fix(streams): deliver ephemeral broadcasts, idempotent disconnect counter#157
Merged
Merged
Conversation
…nter Two related streamer bugs surfaced after 0.8.1. 1. Ephemeral broadcasts never reached browsers The Dispatcher tags ephemeral envelopes with negative msg_ids (-@ephemeral_seq) so they don't pollute the PGMQ cursor space, but Connection#enqueue applied the same `msg_id <= last_msg_id_sent` filter used for durable replay. Every fresh connection starts at since_id=0, so every ephemeral envelope (-1, -2, ...) was rejected before being written to the SSE socket. Symptom: a second tab broadcasts via Pgbus.stream(...).broadcast(html), the streamer logs the wake, but the browser tab listening on the same stream sees nothing. Fix: ephemeral envelopes (negative msg_id) bypass the cursor check and do not advance last_msg_id_sent (they have no replay semantics, so the cursor must continue tracking only durable PGMQ ids). 2. Duplicate DisconnectMessages over-decremented active_connections prune_dead enqueues a DisconnectMessage on every wake when a connection is dead. If two wakes fire before the first DisconnectMessage drains, two DisconnectMessages exist for the same connection. The second decrement_connections call drove the counter negative, undercounting subsequent legitimate connections. Fix: Registry#unregister now returns true/false depending on whether it actually removed a connection. handle_disconnect only decrements the counter when unregister returns true.
📝 WalkthroughWalkthroughThis PR adds support for ephemeral envelopes (negative msg_id) that skip cursor-based filtering and do not advance the replay cursor, updates Registry#unregister to return a boolean indicating whether removal occurred, and uses that contract to make disconnect event handling idempotent against duplicate disconnects from race conditions. ChangesEphemeral Message Handling and Idempotent Disconnect
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Possibly related PRs
Suggested labels
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Two related streamer bugs surfaced after 0.8.1. Both ship together because they affect the same delivery path and share a fix branch.
1. Ephemeral broadcasts never reach browsers (the load-bearing fix)
**Ephemeral broadcasts arrive at the streamer as PG NOTIFY payloads but never reach the browser.
The
StreamEventDispatchertags ephemeral envelopes with negativemsg_ids (-@ephemeral_seq) so they don't pollute the PGMQ cursor space:But
Connection#enqueueapplied the same cursor check used for durable replay:A fresh connection starts at
since_id = 0, solast_msg_id_sent = 0. Every ephemeral envelope hasmsg_id = -1, -2, ..., and-1 <= 0is true — every ephemeral broadcast got dropped before being written to the SSE socket.Failure scenario (Juraj's reproduction):
Pgbus.stream(...).broadcast(html).msg_id = -1.-1 <= 0.Fix: ephemeral envelopes (negative
msg_id) bypass the cursor check and do not advancelast_msg_id_sent. They have no replay semantics, so the cursor must continue tracking only durable PGMQ ids — otherwise a single ephemeral broadcast would set the cursor to a negative value and break subsequent durable replay.2. Duplicate DisconnectMessages over-decremented active_connections (CodeRabbit comment)
prune_deadenqueues aDisconnectMessageon every wake when a connection is dead. Under burst traffic, two wakes can fire before the firstDisconnectMessagedrains the queue — leaving twoDisconnectMessages for the same connection. The seconddecrement_connectionscall drove the active-connection counter negative, undercounting subsequent legitimate connections.Fix:
Registry#unregisternow returnstruewhen it actually removed a connection,falsewhen the connection was already gone.handle_disconnectonly decrements whenunregisterreturns true.Test plan
bundle exec rspec spec/pgbus/web/streamer/— 158 examples, 0 failuresConnection#enqueue ephemeral envelopes (negative msg_id)— 4 examples covering since_id=0, post-replay cursor positions, mixed durable+ephemeral, and cursor non-advancementis idempotent on duplicate disconnect (does not over-decrement when prune_dead races)bundle exec rubocop— cleanFiles changed
lib/pgbus/web/streamer/connection.rblib/pgbus/web/streamer/registry.rbunregisterreturns true/falselib/pgbus/web/streamer/stream_event_dispatcher.rbspec/pgbus/web/streamer/connection_spec.rbspec/pgbus/web/streamer/registry_spec.rbunregisterspec/pgbus/web/streamer/stream_event_dispatcher_spec.rbSummary by CodeRabbit
New Features
Bug Fixes
Tests