From 502403a8d8e1c932d6f6206bb982141bdede7adf Mon Sep 17 00:00:00 2001 From: mhenrixon Date: Thu, 14 May 2026 12:20:47 +0200 Subject: [PATCH] fix(client): wrap notify_stream in with_stale_connection_retry notify_stream was the only PGMQ call site on Pgbus::Client that bypassed with_stale_connection_retry, causing transient idle-socket TLS errors (SSL EOF, SSL SYSCALL) to propagate to callers instead of being absorbed by the one-shot retry that every other method uses. This wraps the body of notify_stream in with_stale_connection_retry with synchronized inside the retry (matching send_message and the rest of Client), so the ephemeral broadcast path has the same resilience as every other producer. --- lib/pgbus/client/notify_stream.rb | 8 ++-- spec/pgbus/client/notify_stream_spec.rb | 63 +++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/lib/pgbus/client/notify_stream.rb b/lib/pgbus/client/notify_stream.rb index 4d2c6bb..1f9087e 100644 --- a/lib/pgbus/client/notify_stream.rb +++ b/lib/pgbus/client/notify_stream.rb @@ -25,9 +25,11 @@ def notify_stream(stream_name, payload) json = payload.is_a?(String) ? payload : JSON.generate(payload) Instrumentation.instrument("pgbus.stream.notify", stream: stream_name, bytes: json.bytesize) do - synchronized do - @pgmq.__send__(:with_connection) do |conn| - conn.exec_params("SELECT pg_notify($1, $2)", [channel, json]) + with_stale_connection_retry do + synchronized do + @pgmq.__send__(:with_connection) do |conn| + conn.exec_params("SELECT pg_notify($1, $2)", [channel, json]) + end end end end diff --git a/spec/pgbus/client/notify_stream_spec.rb b/spec/pgbus/client/notify_stream_spec.rb index a109f1d..06c671c 100644 --- a/spec/pgbus/client/notify_stream_spec.rb +++ b/spec/pgbus/client/notify_stream_spec.rb @@ -66,5 +66,68 @@ def initialize(*args, **kwargs); end ) ) end + + context "with stale pgmq connection recovery" do + before do + stub_const("PGMQ::Errors::ConnectionError", Class.new(StandardError)) unless defined?(PGMQ::Errors::ConnectionError) + end + + let(:ssl_eof_msg) { "Database connection error: PQconsumeInput() SSL error: unexpected eof while reading" } + + it "retries once on an idle-socket SSL EOF" do + call_count = 0 + allow(mock_pgmq).to receive(:with_connection) do |&block| + call_count += 1 + raise PGMQ::Errors::ConnectionError, ssl_eof_msg if call_count == 1 + + block.call(raw_conn) + end + + expect do + client.notify_stream("chat", { "html" => "" }) + end.not_to raise_error + + expect(call_count).to eq(2) + end + + it "does not retry on a non-matching ConnectionError" do + allow(mock_pgmq).to receive(:with_connection) + .and_raise(PGMQ::Errors::ConnectionError, "Connection pool timeout: waited 5.00s") + + expect do + client.notify_stream("chat", { "html" => "X" }) + end.to raise_error(PGMQ::Errors::ConnectionError, /pool timeout/) + end + + it "gives up after one retry (double failure) and re-raises" do + call_count = 0 + allow(mock_pgmq).to receive(:with_connection) do |&_block| + call_count += 1 + raise PGMQ::Errors::ConnectionError, ssl_eof_msg + end + + expect do + client.notify_stream("chat", { "html" => "X" }) + end.to raise_error(PGMQ::Errors::ConnectionError) + + expect(call_count).to eq(2) + end + + it "logs a warning on the retry path" do + call_count = 0 + allow(mock_pgmq).to receive(:with_connection) do |&block| + call_count += 1 + raise PGMQ::Errors::ConnectionError, ssl_eof_msg if call_count == 1 + + block.call(raw_conn) + end + + allow(Pgbus.logger).to receive(:warn) + + client.notify_stream("chat", { "html" => "X" }) + + expect(Pgbus.logger).to have_received(:warn) + end + end end end