Skip to content

feat(streams): per-broadcast and per-stream durable opt-in#152

Merged
mhenrixon merged 2 commits into
mainfrom
feature/durable-opt-in-controls
May 8, 2026
Merged

feat(streams): per-broadcast and per-stream durable opt-in#152
mhenrixon merged 2 commits into
mainfrom
feature/durable-opt-in-controls

Conversation

@mhenrixon
Copy link
Copy Markdown
Owner

@mhenrixon mhenrixon commented May 8, 2026

Summary

Builds on #151 (ephemeral broadcast mode) to add finer-grained opt-in controls for durable streams. Apps no longer have to choose between "all broadcasts ephemeral" or "all broadcasts durable" — they can mix per-broadcast and per-stream-pattern.

Two new opt-in mechanisms

1. Per-broadcast override

# Stream is ephemeral by default, but this single broadcast persists:
Pgbus.stream("notifications").broadcast(html, durable: true)

# Stream is durable, but this one broadcast is fire-and-forget:
Pgbus.stream("chat:42").broadcast(html, durable: false)

durable: on Stream#broadcast defaults to nil (defer to stream-level setting). Explicit true/false flips the mode for that one call.

2. Per-stream pattern config

Pgbus.configure do |c|
  c.streams_default_broadcast_mode = :ephemeral

  # These streams need backlog replay, so create durable PGMQ queues:
  c.streams_durable_patterns = [
    "chat",                # exact match
    /^orders:/,            # regex match
    /^audit:invoice_/
  ]
end

Pgbus.stream("chat").broadcast(html)              # → durable
Pgbus.stream("orders:42:items").broadcast(html)   # → durable (regex)
Pgbus.stream("notifications:7").broadcast(html)   # → ephemeral (default)

Pgbus.stream(name) without an explicit durable: kwarg resolves the mode via:

  1. streams_durable_patterns (string equality or regex match)
  2. streams_default_broadcast_mode fallback

Explicit Pgbus.stream(name, durable: true/false) still bypasses the resolver.

Behavior change

Pgbus.stream's durable: parameter default changed from truenil (resolver-driven). Apps relying on the old "always durable when called with no kwarg" behavior should either:

  • Pass durable: true explicitly, or
  • Set c.streams_default_broadcast_mode = :durable, or
  • Add the stream name/pattern to c.streams_durable_patterns

This aligns the explicit-API path with the Turbo-patch path that #151 introduced.

Follow-up

Per-Turbo-helper overrides (broadcasts_to :account, durable: true) tracked separately — needs to patch every Turbo broadcast helper signature, larger surface than this change. Will open a follow-up issue.

Test plan

  • bundle exec rspec — passes (1925+ examples, 0 failures, same pre-existing I18n pendings as main)
  • bundle exec rubocop — clean
  • Per-broadcast durable: true on ephemeral stream creates queue + sends message
  • Per-broadcast durable: false on durable stream uses NOTIFY-only path
  • streams_durable_patterns exact-string match marks stream durable
  • streams_durable_patterns regex match marks stream durable
  • Explicit durable: kwarg on Pgbus.stream overrides the resolver
  • Configuration validation rejects non-Array streams_durable_patterns

Summary by CodeRabbit

  • New Features

    • Stream durability can be selected per stream name via configurable patterns (strings or regex) and a configurable default mode.
    • Broadcasts may now specify durability per call, overriding the stream’s default.
  • Behavior

    • Omitting a per-call durability causes the system to resolve durability from patterns/defaults.
  • Tests

    • Added tests covering pattern matching, per-broadcast overrides, validation, and default behavior.

Adds two complementary opt-in mechanisms on top of the global
streams_default_broadcast_mode setting introduced in #151:

1. Per-broadcast override: stream.broadcast(html, durable: true/false)
   flips the mode for a single call. nil (the default) defers to the
   stream's own setting.

2. Per-stream pattern config: streams_durable_patterns accepts an Array
   of strings (exact match) or Regexps. Pgbus.stream(name) without an
   explicit durable: kwarg now resolves the mode through the patterns
   first, then falls back to streams_default_broadcast_mode.

Pgbus.stream's durable: parameter changed default from true to nil so
the resolver can run. Explicit durable: true/false still bypasses the
resolver.

Example:
  Pgbus.configure do |c|
    c.streams_durable_patterns = ["chat", /^orders:/]
  end

  Pgbus.stream("chat").broadcast(html)         # durable
  Pgbus.stream("notifications").broadcast(html) # ephemeral
  Pgbus.stream("notifications").broadcast(html, durable: true) # one-off durable
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 8, 2026

Review Change Stack

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Repository UI (base), Organization UI (inherited)

Review profile: ASSERTIVE

Plan: Pro

Run ID: 882872bd-d918-4e2d-8cad-d0cbbc5cb4b0

📥 Commits

Reviewing files that changed from the base of the PR and between 02e698c and 04831d2.

📒 Files selected for processing (2)
  • lib/pgbus/client/notify_stream.rb
  • spec/integration_helper.rb

📝 Walkthrough

Walkthrough

This PR adds configurable stream durability patterns to pgbus. Streams can now be marked durable via configuration pattern rules, with explicit durable: parameters at stream creation and per-broadcast call levels overriding configured patterns.

Changes

Stream Durability Configuration

Layer / File(s) Summary
Configuration Settings and Validation
lib/pgbus/configuration.rb
Adds streams_durable_patterns attribute (defaults to empty array), validates it as an Array, and implements stream_durable?(name) to resolve durability by testing string/regex patterns against stream names with fallback to streams_default_broadcast_mode.
Stream Caching and Per-Broadcast Overrides
lib/pgbus.rb, lib/pgbus/streams.rb
Pgbus.stream now resolves durability from configuration when durable: nil (default) and caches streams by resolved mode ("d" vs "e"). Stream#broadcast accepts optional durable: parameter to override stream-level durability for a single call.
NotifyStream call-site
lib/pgbus/client/notify_stream.rb
Uses @pgmq.__send__(:with_connection) to invoke the internal connection wrapper while preserving instrumentation and notify behavior.
Configuration and Override Tests
spec/pgbus/streams/durable_patterns_spec.rb, spec/pgbus/streams/durable_opt_in_spec.rb
Validates pattern-based stream selection (exact string and regex matching), configuration attribute defaults and validation, per-broadcast durable: overrides with correct queue routing, and payload propagation for visible_to.
Integration Test Setup
spec/integration_helper.rb
Integration tests set streams_default_broadcast_mode = :durable.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related issues

  • mhenrixon/pgbus#149: Both changes directly address stream durability configuration and per-call override behavior.

Possibly related PRs

  • mhenrixon/pgbus#152: The main PR implements per-broadcast durable: overrides and per-stream durable pattern/configuration changes—exactly the same code-level changes described in the retrieved PR (#152).
  • mhenrixon/pgbus#81: Both PRs modify Pgbus::Streams::Stream#broadcast (adding a per-call durable: override in the main PR and transactional deferral behavior in the retrieved PR), so they are related at the code level.
  • mhenrixon/pgbus#88: Both PRs modify Pgbus.stream’s caching behavior (this main PR extends the cache key to include resolved durability and changes the method signature/semantics), so they are related.

Suggested labels

streaming, enhancement

Poem

🐰 A rabbit sniffs patterns in the code,

Hop-hop, choosing durable mode.
Per-broadcast choices—soft and spry,
Cached by mode as messages fly.
Cheers for flexible streams, oh my!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 60.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat(streams): per-broadcast and per-stream durable opt-in' directly and comprehensively summarizes the main changes: introducing optional durable controls at both the broadcast and stream level.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/durable-opt-in-controls

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@lib/pgbus/configuration.rb`:
- Line 370: The current check only ensures streams_durable_patterns is an Array
but not that each element is a String or Regexp, causing later NoMethodError in
stream_durable? when non-matching types are present; update the validation at
the existing raise to iterate streams_durable_patterns and verify every element
is a String or Regexp (e.g., using is_a?(String) || is_a?(Regexp)), and if any
invalid elements exist raise an ArgumentError that lists the offending values or
their types so the user gets a clear configuration-time error referencing
streams_durable_patterns and preventing runtime failures in stream_durable?.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI (base), Organization UI (inherited)

Review profile: ASSERTIVE

Plan: Pro

Run ID: bb9ab0e8-2acf-4107-8199-e8f166a985c5

📥 Commits

Reviewing files that changed from the base of the PR and between 6bee6bb and 02e698c.

📒 Files selected for processing (5)
  • lib/pgbus.rb
  • lib/pgbus/configuration.rb
  • lib/pgbus/streams.rb
  • spec/pgbus/streams/durable_opt_in_spec.rb
  • spec/pgbus/streams/durable_patterns_spec.rb

Comment thread lib/pgbus/configuration.rb
- Use __send__(:with_connection) since PGMQ::Client#with_connection is
  private — unit tests passed because mocks respond to private methods,
  but integration tests hit the real PGMQ client
- Set streams_default_broadcast_mode = :durable in integration_helper so
  integration specs using Pgbus.stream(name).broadcast() get durable
  queues for read_after verification
@mhenrixon mhenrixon merged commit 4ad2852 into main May 8, 2026
8 of 9 checks passed
@mhenrixon mhenrixon deleted the feature/durable-opt-in-controls branch May 8, 2026 15:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant