feat(streams): in-memory counters and auto-enabled stats for ephemeral broadcasts#156
Conversation
📝 WalkthroughWalkthroughAdds a thread-safe per-stream counter, wires it into Streamer and StreamEventDispatcher to track broadcasts and connections, exposes metrics via DataSource, surfaces them in the insights web/API controllers and view, and adds i18n keys and RSpec tests. ChangesLive Stream Metrics Tracking and Visualization
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested labels
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@app/controllers/pgbus/insights_controller.rb`:
- Line 12: The controller currently calls data_source.live_stream_metrics and
raises NoMethodError for legacy/stub datasources like
Pgbus::Test::StubDataSource; update the assignment of `@live_stream_metrics` in
insights_controller (the line setting `@live_stream_metrics`) to first guard for
the method (e.g., respond_to?(:live_stream_metrics)) and fall back to a safe
default (such as an empty array or nil) when the data source doesn't implement
it, so tests and extension points using older stubs don't break.
In `@app/views/pgbus/insights/show.html.erb`:
- Around line 181-189: Replace the hardcoded English headings "Streams" and
"Per-Stream Counters" in the ERB template with i18n lookups (e.g. call
t('pgbus.insights.streams') and t('pgbus.insights.per_stream_counters')) so they
follow the rest of the view's localization; update the locale YAML (add
pgbus.insights.streams and pgbus.insights.per_stream_counters keys) and keep the
surrounding markup (the <dt> containing pgbus_number and the <h3> section
header) unchanged so only the displayed strings are switched to I18n.
In `@spec/pgbus/web/streamer/stream_event_dispatcher_spec.rb`:
- Around line 635-644: The test title says the connect should not increment
counters when the connection dies before registry but the assertion expects
total_connections to be 1; change the test to be consistent by either (A)
updating the assertion to expect
dispatcher.stream_counter.total_connections("chat") to eq(0) if a
dead-before-registry connect should not count, or (B) if the intended behavior
is that total_connections tracks attempted connections regardless of death,
rename the example title to reflect that (e.g., "increments total_connections
even if connect dies before registry") and keep the total_connections
expectation; update the spec around the c (built by build_conn, then
c.mark_dead!) and the dispatcher.send(:handle,
described_class::ConnectMessage.new(connection: c)) accordingly.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI (base), Organization UI (inherited)
Review profile: ASSERTIVE
Plan: Pro
Run ID: b107cae5-a3a6-4924-90f7-f55976b23030
📒 Files selected for processing (23)
app/controllers/pgbus/api/insights_controller.rbapp/controllers/pgbus/insights_controller.rbapp/views/pgbus/insights/show.html.erbconfig/locales/da.ymlconfig/locales/de.ymlconfig/locales/en.ymlconfig/locales/es.ymlconfig/locales/fi.ymlconfig/locales/fr.ymlconfig/locales/it.ymlconfig/locales/ja.ymlconfig/locales/nb.ymlconfig/locales/nl.ymlconfig/locales/pt.ymlconfig/locales/sv.ymllib/pgbus/web/data_source.rblib/pgbus/web/streamer.rblib/pgbus/web/streamer/instance.rblib/pgbus/web/streamer/stream_counter.rblib/pgbus/web/streamer/stream_event_dispatcher.rbspec/pgbus/web/data_source_spec.rbspec/pgbus/web/streamer/stream_counter_spec.rbspec/pgbus/web/streamer/stream_event_dispatcher_spec.rb
eb2724f to
b733ea5
Compare
…l broadcasts Ephemeral (transient) streams previously had zero observability — no PGMQ queue meant no queue-level metrics in the dashboard. This change adds two complementary mechanisms so operators never fly blind: 1. StreamCounter — always-on, zero-cost in-memory counters (broadcasts, active connections, total connections) per stream. Uses Concurrent::AtomicFixnum for thread safety. Exposed via DataSource and rendered in the Insights dashboard as "Live Stream Counters". 2. Auto-enabled StreamStat for ephemeral broadcasts — ephemeral broadcasts now always write to pgbus_stream_stats regardless of the streams_stats_enabled flag. Durable broadcasts remain gated by the flag since they already have PGMQ queue-level metrics. This ensures historical analytics (top streams, throughput, fanout) work out of the box for the default ephemeral mode.
Both StubDataSource classes (system test + dummy app) were missing the new live_stream_metrics method, causing system tests to crash with NoMethodError when the Insights controller called it.
b733ea5 to
85fbc60
Compare
- Replace hardcoded "Streams" and "Per-Stream Counters" text in the insights view with i18n lookups (per_stream_title, streams keys) - Add translations to all 12 locale files - Rename misleading spec title to match actual assertion semantics (total_connections IS incremented, active_connections is not)
Orchestrator command that runs /github-review-failures first (fix CI), then /github-review-comments (address review threads). Adapted from zazu/app-2's version for pgbus tooling (rubocop, rspec).
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@lib/pgbus/web/streamer/stream_event_dispatcher.rb`:
- Around line 351-354: The unconditional call to
`@stream_counter.decrement_connections`(stream) can run multiple times for the
same dead connection; change the logic to only decrement when the connection was
actually present (make it idempotent) by checking the result of removing the
connection before decrementing — e.g. call `@scanned_cursor.delete`(connection)
(or use the boolean/returned value from `@registry.unregister`(connection)) and
wrap `@stream_counter.decrement_connections`(stream) in an if that only runs when
the delete/unregister indicates the connection was present; keep
cleanup_stream_if_unused(stream) unchanged.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI (base), Organization UI (inherited)
Review profile: ASSERTIVE
Plan: Pro
Run ID: e0455852-2c9d-4381-ba79-280aed3b4032
📒 Files selected for processing (26)
.claude/commands/github-review-pr.mdapp/controllers/pgbus/api/insights_controller.rbapp/controllers/pgbus/insights_controller.rbapp/views/pgbus/insights/show.html.erbconfig/locales/da.ymlconfig/locales/de.ymlconfig/locales/en.ymlconfig/locales/es.ymlconfig/locales/fi.ymlconfig/locales/fr.ymlconfig/locales/it.ymlconfig/locales/ja.ymlconfig/locales/nb.ymlconfig/locales/nl.ymlconfig/locales/pt.ymlconfig/locales/sv.ymllib/pgbus/web/data_source.rblib/pgbus/web/streamer.rblib/pgbus/web/streamer/instance.rblib/pgbus/web/streamer/stream_counter.rblib/pgbus/web/streamer/stream_event_dispatcher.rbspec/dummy/lib/stub_data_source.rbspec/pgbus/web/data_source_spec.rbspec/pgbus/web/streamer/stream_counter_spec.rbspec/pgbus/web/streamer/stream_event_dispatcher_spec.rbspec/system/support/data_source_stub.rb
| @registry.unregister(connection) | ||
| @scanned_cursor.delete(connection) | ||
| @stream_counter.decrement_connections(stream) | ||
| cleanup_stream_if_unused(stream) |
There was a problem hiding this comment.
Make active-connection decrement idempotent on disconnect.
prune_dead can enqueue duplicate disconnect messages for the same dead connection while wake messages are being drained; with unconditional decrement on Line 353, active counters can drift below the real value.
Suggested fix
def handle_disconnect(msg)
started_at = monotonic_ms
connection = msg.connection
stream = connection.stream_name
+ was_registered = `@registry.connections_for`(stream).any? { |c| c.equal?(connection) }
`@registry.unregister`(connection)
`@scanned_cursor.delete`(connection)
- `@stream_counter.decrement_connections`(stream)
+ `@stream_counter.decrement_connections`(stream) if was_registered
cleanup_stream_if_unused(stream)
record_stat(🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@lib/pgbus/web/streamer/stream_event_dispatcher.rb` around lines 351 - 354,
The unconditional call to `@stream_counter.decrement_connections`(stream) can run
multiple times for the same dead connection; change the logic to only decrement
when the connection was actually present (make it idempotent) by checking the
result of removing the connection before decrementing — e.g. call
`@scanned_cursor.delete`(connection) (or use the boolean/returned value from
`@registry.unregister`(connection)) and wrap
`@stream_counter.decrement_connections`(stream) in an if that only runs when the
delete/unregister indicates the connection was present; keep
cleanup_stream_if_unused(stream) unchanged.
Summary
Concurrent::AtomicFixnum. Exposed viaDataSource#live_stream_metricsand rendered in the Insights dashboard as "Live Stream Counters".pgbus_stream_statsregardless ofstreams_stats_enabled. Durable broadcasts remain gated by the flag since they already have PGMQ queue-level metrics. This provides historical analytics (top streams, throughput, fanout) for the default ephemeral mode without configuration.streams_stats_enabledis false.Architecture
Files changed
lib/pgbus/web/streamer/stream_counter.rb(new),stream_event_dispatcher.rb,instance.rb,streamer.rbdata_source.rb,insights_controller.rb,api/insights_controller.rb,show.html.erbstream_counter_spec.rb(new, 10 examples),stream_event_dispatcher_spec.rb(+6 examples),data_source_spec.rb(+2 examples)Test plan
bundle exec rspec— 2003 examples, 0 failuresbundle exec rubocop— 359 files, no offensespgbus_stream_statsrows withoutstreams_stats_enabledSummary by CodeRabbit
New Features
Localization
Tests