Skip to content

feat(streams): in-memory counters and auto-enabled stats for ephemeral broadcasts#156

Merged
mhenrixon merged 4 commits into
mainfrom
feature/transient-metrics
May 14, 2026
Merged

feat(streams): in-memory counters and auto-enabled stats for ephemeral broadcasts#156
mhenrixon merged 4 commits into
mainfrom
feature/transient-metrics

Conversation

@mhenrixon
Copy link
Copy Markdown
Owner

@mhenrixon mhenrixon commented May 14, 2026

Summary

  • StreamCounter: Always-on, zero-cost in-memory counters per stream (broadcasts, active connections, total connections) using Concurrent::AtomicFixnum. Exposed via DataSource#live_stream_metrics and rendered in the Insights dashboard as "Live Stream Counters".
  • Auto-enabled StreamStat for ephemeral broadcasts: Ephemeral broadcasts now always write to pgbus_stream_stats regardless of streams_stats_enabled. Durable broadcasts remain gated by the flag since they already have PGMQ queue-level metrics. This provides historical analytics (top streams, throughput, fanout) for the default ephemeral mode without configuration.
  • Dashboard integration: New "Live Stream Counters" section in Insights view shows per-stream broadcast count, active connections, and total connections — visible even when streams_stats_enabled is false.

Architecture

StreamEventDispatcher
  ├── StreamCounter (always-on, in-memory)
  │   ├── increment_broadcasts() on every wake (durable + ephemeral)
  │   ├── increment_connections() on successful connect
  │   └── decrement_connections() on disconnect
  └── record_stat() (StreamStat DB writes)
      ├── ephemeral broadcasts → always records
      └── durable broadcasts → gated by streams_stats_enabled

Files changed

Area Files
Core lib/pgbus/web/streamer/stream_counter.rb (new), stream_event_dispatcher.rb, instance.rb, streamer.rb
Dashboard data_source.rb, insights_controller.rb, api/insights_controller.rb, show.html.erb
i18n All 12 locale files
Specs stream_counter_spec.rb (new, 10 examples), stream_event_dispatcher_spec.rb (+6 examples), data_source_spec.rb (+2 examples)

Test plan

  • bundle exec rspec — 2003 examples, 0 failures
  • bundle exec rubocop — 359 files, no offenses
  • i18n files normalized, no inconsistent interpolations
  • Manual: verify Insights page renders Live Stream Counters when ephemeral streams are active
  • Manual: verify ephemeral broadcasts create pgbus_stream_stats rows without streams_stats_enabled

Summary by CodeRabbit

  • New Features

    • Insights page now shows live stream counters (totals and per-stream breakdown) and includes live stream metrics from the streaming subsystem.
  • Localization

    • Added localized labels for live stream metrics in 12 languages.
  • Tests

    • Added test coverage for live stream metrics, stream counter behavior, and concurrent counter safety.

Review Change Stack

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 14, 2026

📝 Walkthrough

Walkthrough

Adds a thread-safe per-stream counter, wires it into Streamer and StreamEventDispatcher to track broadcasts and connections, exposes metrics via DataSource, surfaces them in the insights web/API controllers and view, and adds i18n keys and RSpec tests.

Changes

Live Stream Metrics Tracking and Visualization

Layer / File(s) Summary
StreamCounter data structure and API
lib/pgbus/web/streamer/stream_counter.rb, spec/pgbus/web/streamer/stream_counter_spec.rb
Adds a concurrency-safe per-stream counter with increment/decrement APIs, snapshot, and totals aggregation.
Streamer module and Instance integration
lib/pgbus/web/streamer/instance.rb, lib/pgbus/web/streamer.rb
Streamer::Instance creates and exposes a StreamCounter, and Streamer provides module-level access to the current instance's counter.
StreamEventDispatcher event counting
lib/pgbus/web/streamer/stream_event_dispatcher.rb, spec/pgbus/web/streamer/stream_event_dispatcher_spec.rb
Dispatcher accepts an injected stream_counter, increments broadcasts on durable/ephemeral wakes, updates active/total connection counters on connect/disconnect, and supports ephemeral stat recording.
DataSource and metrics exposure
lib/pgbus/web/data_source.rb, spec/pgbus/web/data_source_spec.rb
DataSource#live_stream_metrics returns per-stream snapshot and aggregate totals from Streamer.stream_counter with safe zero defaults; API adds live_streams to JSON.
Controller and view rendering
app/controllers/pgbus/insights_controller.rb, app/controllers/pgbus/api/insights_controller.rb, app/views/pgbus/insights/show.html.erb
Controllers expose live_stream_metrics (JSON and instance var); view conditionally renders Live Stream Counters cards and a per-stream table sorted by broadcasts.
Internationalization support
config/locales/{da,de,en,es,fi,fr,it,ja,nb,nl,pt,sv}.yml
Adds live_streams translation keys for titles, labels, per-stream labels, and empty-state text across locales.
Review orchestration command (doc)
.claude/commands/github-review-pr.md
Adds a two-phase GitHub review runbook document describing Phase A (CI) then Phase B (review threads) with exit criteria and reporting.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • mhenrixon/pgbus#151: Modifies StreamEventDispatcher handling of durable vs ephemeral wakes; intersects with broadcast/stat paths.
  • mhenrixon/pgbus#24: Prior insights UI/API changes; this PR extends that surface with live_stream_metrics and view additions.
  • mhenrixon/pgbus#107: Earlier dispatcher rewiring that this work builds on.

Suggested labels

enhancement, streaming

Poem

🐰 I hop through channels, counters in paw,
Counting broadcasts, connections I saw.
From dispatcher hops to the UI's bright beams,
Live metrics sparkle in tiny streams.
A rabbit's tally, coded in dreams.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 18.18% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the primary change: introducing in-memory stream counters and enabling stats recording for ephemeral broadcasts, which aligns with the substantial updates to StreamCounter, StreamEventDispatcher, and the metrics integration throughout the codebase.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/transient-metrics

Comment @coderabbitai help to get the list of available commands and usage tips.

@mhenrixon mhenrixon self-assigned this May 14, 2026
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@app/controllers/pgbus/insights_controller.rb`:
- Line 12: The controller currently calls data_source.live_stream_metrics and
raises NoMethodError for legacy/stub datasources like
Pgbus::Test::StubDataSource; update the assignment of `@live_stream_metrics` in
insights_controller (the line setting `@live_stream_metrics`) to first guard for
the method (e.g., respond_to?(:live_stream_metrics)) and fall back to a safe
default (such as an empty array or nil) when the data source doesn't implement
it, so tests and extension points using older stubs don't break.

In `@app/views/pgbus/insights/show.html.erb`:
- Around line 181-189: Replace the hardcoded English headings "Streams" and
"Per-Stream Counters" in the ERB template with i18n lookups (e.g. call
t('pgbus.insights.streams') and t('pgbus.insights.per_stream_counters')) so they
follow the rest of the view's localization; update the locale YAML (add
pgbus.insights.streams and pgbus.insights.per_stream_counters keys) and keep the
surrounding markup (the <dt> containing pgbus_number and the <h3> section
header) unchanged so only the displayed strings are switched to I18n.

In `@spec/pgbus/web/streamer/stream_event_dispatcher_spec.rb`:
- Around line 635-644: The test title says the connect should not increment
counters when the connection dies before registry but the assertion expects
total_connections to be 1; change the test to be consistent by either (A)
updating the assertion to expect
dispatcher.stream_counter.total_connections("chat") to eq(0) if a
dead-before-registry connect should not count, or (B) if the intended behavior
is that total_connections tracks attempted connections regardless of death,
rename the example title to reflect that (e.g., "increments total_connections
even if connect dies before registry") and keep the total_connections
expectation; update the spec around the c (built by build_conn, then
c.mark_dead!) and the dispatcher.send(:handle,
described_class::ConnectMessage.new(connection: c)) accordingly.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI (base), Organization UI (inherited)

Review profile: ASSERTIVE

Plan: Pro

Run ID: b107cae5-a3a6-4924-90f7-f55976b23030

📥 Commits

Reviewing files that changed from the base of the PR and between 216a32a and eb2724f.

📒 Files selected for processing (23)
  • app/controllers/pgbus/api/insights_controller.rb
  • app/controllers/pgbus/insights_controller.rb
  • app/views/pgbus/insights/show.html.erb
  • config/locales/da.yml
  • config/locales/de.yml
  • config/locales/en.yml
  • config/locales/es.yml
  • config/locales/fi.yml
  • config/locales/fr.yml
  • config/locales/it.yml
  • config/locales/ja.yml
  • config/locales/nb.yml
  • config/locales/nl.yml
  • config/locales/pt.yml
  • config/locales/sv.yml
  • lib/pgbus/web/data_source.rb
  • lib/pgbus/web/streamer.rb
  • lib/pgbus/web/streamer/instance.rb
  • lib/pgbus/web/streamer/stream_counter.rb
  • lib/pgbus/web/streamer/stream_event_dispatcher.rb
  • spec/pgbus/web/data_source_spec.rb
  • spec/pgbus/web/streamer/stream_counter_spec.rb
  • spec/pgbus/web/streamer/stream_event_dispatcher_spec.rb

Comment thread app/controllers/pgbus/insights_controller.rb
Comment thread app/views/pgbus/insights/show.html.erb Outdated
Comment thread spec/pgbus/web/streamer/stream_event_dispatcher_spec.rb Outdated
@mhenrixon mhenrixon force-pushed the feature/transient-metrics branch from eb2724f to b733ea5 Compare May 14, 2026 14:56
mhenrixon added 2 commits May 14, 2026 16:57
…l broadcasts

Ephemeral (transient) streams previously had zero observability — no PGMQ
queue meant no queue-level metrics in the dashboard. This change adds two
complementary mechanisms so operators never fly blind:

1. StreamCounter — always-on, zero-cost in-memory counters (broadcasts,
   active connections, total connections) per stream. Uses
   Concurrent::AtomicFixnum for thread safety. Exposed via DataSource
   and rendered in the Insights dashboard as "Live Stream Counters".

2. Auto-enabled StreamStat for ephemeral broadcasts — ephemeral broadcasts
   now always write to pgbus_stream_stats regardless of the
   streams_stats_enabled flag. Durable broadcasts remain gated by the
   flag since they already have PGMQ queue-level metrics. This ensures
   historical analytics (top streams, throughput, fanout) work out of
   the box for the default ephemeral mode.
Both StubDataSource classes (system test + dummy app) were missing the
new live_stream_metrics method, causing system tests to crash with
NoMethodError when the Insights controller called it.
@mhenrixon mhenrixon force-pushed the feature/transient-metrics branch from b733ea5 to 85fbc60 Compare May 14, 2026 14:58
mhenrixon added 2 commits May 14, 2026 17:01
- Replace hardcoded "Streams" and "Per-Stream Counters" text in the
  insights view with i18n lookups (per_stream_title, streams keys)
- Add translations to all 12 locale files
- Rename misleading spec title to match actual assertion semantics
  (total_connections IS incremented, active_connections is not)
Orchestrator command that runs /github-review-failures first (fix CI),
then /github-review-comments (address review threads). Adapted from
zazu/app-2's version for pgbus tooling (rubocop, rspec).
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@lib/pgbus/web/streamer/stream_event_dispatcher.rb`:
- Around line 351-354: The unconditional call to
`@stream_counter.decrement_connections`(stream) can run multiple times for the
same dead connection; change the logic to only decrement when the connection was
actually present (make it idempotent) by checking the result of removing the
connection before decrementing — e.g. call `@scanned_cursor.delete`(connection)
(or use the boolean/returned value from `@registry.unregister`(connection)) and
wrap `@stream_counter.decrement_connections`(stream) in an if that only runs when
the delete/unregister indicates the connection was present; keep
cleanup_stream_if_unused(stream) unchanged.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI (base), Organization UI (inherited)

Review profile: ASSERTIVE

Plan: Pro

Run ID: e0455852-2c9d-4381-ba79-280aed3b4032

📥 Commits

Reviewing files that changed from the base of the PR and between b733ea5 and e475caf.

📒 Files selected for processing (26)
  • .claude/commands/github-review-pr.md
  • app/controllers/pgbus/api/insights_controller.rb
  • app/controllers/pgbus/insights_controller.rb
  • app/views/pgbus/insights/show.html.erb
  • config/locales/da.yml
  • config/locales/de.yml
  • config/locales/en.yml
  • config/locales/es.yml
  • config/locales/fi.yml
  • config/locales/fr.yml
  • config/locales/it.yml
  • config/locales/ja.yml
  • config/locales/nb.yml
  • config/locales/nl.yml
  • config/locales/pt.yml
  • config/locales/sv.yml
  • lib/pgbus/web/data_source.rb
  • lib/pgbus/web/streamer.rb
  • lib/pgbus/web/streamer/instance.rb
  • lib/pgbus/web/streamer/stream_counter.rb
  • lib/pgbus/web/streamer/stream_event_dispatcher.rb
  • spec/dummy/lib/stub_data_source.rb
  • spec/pgbus/web/data_source_spec.rb
  • spec/pgbus/web/streamer/stream_counter_spec.rb
  • spec/pgbus/web/streamer/stream_event_dispatcher_spec.rb
  • spec/system/support/data_source_stub.rb

Comment on lines 351 to 354
@registry.unregister(connection)
@scanned_cursor.delete(connection)
@stream_counter.decrement_connections(stream)
cleanup_stream_if_unused(stream)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Make active-connection decrement idempotent on disconnect.

prune_dead can enqueue duplicate disconnect messages for the same dead connection while wake messages are being drained; with unconditional decrement on Line 353, active counters can drift below the real value.

Suggested fix
 def handle_disconnect(msg)
   started_at = monotonic_ms
   connection = msg.connection
   stream = connection.stream_name
+  was_registered = `@registry.connections_for`(stream).any? { |c| c.equal?(connection) }
   `@registry.unregister`(connection)
   `@scanned_cursor.delete`(connection)
-  `@stream_counter.decrement_connections`(stream)
+  `@stream_counter.decrement_connections`(stream) if was_registered
   cleanup_stream_if_unused(stream)

   record_stat(
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/pgbus/web/streamer/stream_event_dispatcher.rb` around lines 351 - 354,
The unconditional call to `@stream_counter.decrement_connections`(stream) can run
multiple times for the same dead connection; change the logic to only decrement
when the connection was actually present (make it idempotent) by checking the
result of removing the connection before decrementing — e.g. call
`@scanned_cursor.delete`(connection) (or use the boolean/returned value from
`@registry.unregister`(connection)) and wrap
`@stream_counter.decrement_connections`(stream) in an if that only runs when the
delete/unregister indicates the connection was present; keep
cleanup_stream_if_unused(stream) unchanged.

@mhenrixon mhenrixon merged commit d0d4bc9 into main May 14, 2026
9 checks passed
@mhenrixon mhenrixon deleted the feature/transient-metrics branch May 14, 2026 15:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant