diff --git a/lib/pgbus/client/notify_stream.rb b/lib/pgbus/client/notify_stream.rb index 4d2c6bb..1f9087e 100644 --- a/lib/pgbus/client/notify_stream.rb +++ b/lib/pgbus/client/notify_stream.rb @@ -25,9 +25,11 @@ def notify_stream(stream_name, payload) json = payload.is_a?(String) ? payload : JSON.generate(payload) Instrumentation.instrument("pgbus.stream.notify", stream: stream_name, bytes: json.bytesize) do - synchronized do - @pgmq.__send__(:with_connection) do |conn| - conn.exec_params("SELECT pg_notify($1, $2)", [channel, json]) + with_stale_connection_retry do + synchronized do + @pgmq.__send__(:with_connection) do |conn| + conn.exec_params("SELECT pg_notify($1, $2)", [channel, json]) + end end end end diff --git a/spec/pgbus/client/notify_stream_spec.rb b/spec/pgbus/client/notify_stream_spec.rb index a109f1d..06c671c 100644 --- a/spec/pgbus/client/notify_stream_spec.rb +++ b/spec/pgbus/client/notify_stream_spec.rb @@ -66,5 +66,68 @@ def initialize(*args, **kwargs); end ) ) end + + context "with stale pgmq connection recovery" do + before do + stub_const("PGMQ::Errors::ConnectionError", Class.new(StandardError)) unless defined?(PGMQ::Errors::ConnectionError) + end + + let(:ssl_eof_msg) { "Database connection error: PQconsumeInput() SSL error: unexpected eof while reading" } + + it "retries once on an idle-socket SSL EOF" do + call_count = 0 + allow(mock_pgmq).to receive(:with_connection) do |&block| + call_count += 1 + raise PGMQ::Errors::ConnectionError, ssl_eof_msg if call_count == 1 + + block.call(raw_conn) + end + + expect do + client.notify_stream("chat", { "html" => "" }) + end.not_to raise_error + + expect(call_count).to eq(2) + end + + it "does not retry on a non-matching ConnectionError" do + allow(mock_pgmq).to receive(:with_connection) + .and_raise(PGMQ::Errors::ConnectionError, "Connection pool timeout: waited 5.00s") + + expect do + client.notify_stream("chat", { "html" => "X" }) + end.to raise_error(PGMQ::Errors::ConnectionError, /pool timeout/) + end + + it "gives up after one retry (double failure) and re-raises" do + call_count = 0 + allow(mock_pgmq).to receive(:with_connection) do |&_block| + call_count += 1 + raise PGMQ::Errors::ConnectionError, ssl_eof_msg + end + + expect do + client.notify_stream("chat", { "html" => "X" }) + end.to raise_error(PGMQ::Errors::ConnectionError) + + expect(call_count).to eq(2) + end + + it "logs a warning on the retry path" do + call_count = 0 + allow(mock_pgmq).to receive(:with_connection) do |&block| + call_count += 1 + raise PGMQ::Errors::ConnectionError, ssl_eof_msg if call_count == 1 + + block.call(raw_conn) + end + + allow(Pgbus.logger).to receive(:warn) + + client.notify_stream("chat", { "html" => "X" }) + + expect(Pgbus.logger).to have_received(:warn) + end + end end end