Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions lib/pgbus/client/notify_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ def notify_stream(stream_name, payload)
json = payload.is_a?(String) ? payload : JSON.generate(payload)

Instrumentation.instrument("pgbus.stream.notify", stream: stream_name, bytes: json.bytesize) do
synchronized do
@pgmq.__send__(:with_connection) do |conn|
conn.exec_params("SELECT pg_notify($1, $2)", [channel, json])
with_stale_connection_retry do
synchronized do
@pgmq.__send__(:with_connection) do |conn|
conn.exec_params("SELECT pg_notify($1, $2)", [channel, json])
end
end
end
end
Expand Down
63 changes: 63 additions & 0 deletions spec/pgbus/client/notify_stream_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,68 @@ def initialize(*args, **kwargs); end
)
)
end

context "with stale pgmq connection recovery" do
before do
stub_const("PGMQ::Errors::ConnectionError", Class.new(StandardError)) unless defined?(PGMQ::Errors::ConnectionError)
end

let(:ssl_eof_msg) { "Database connection error: PQconsumeInput() SSL error: unexpected eof while reading" }

it "retries once on an idle-socket SSL EOF" do
call_count = 0
allow(mock_pgmq).to receive(:with_connection) do |&block|
call_count += 1
raise PGMQ::Errors::ConnectionError, ssl_eof_msg if call_count == 1

block.call(raw_conn)
end

expect do
client.notify_stream("chat", { "html" => "<turbo-stream/>" })
end.not_to raise_error

expect(call_count).to eq(2)
end

it "does not retry on a non-matching ConnectionError" do
allow(mock_pgmq).to receive(:with_connection)
.and_raise(PGMQ::Errors::ConnectionError, "Connection pool timeout: waited 5.00s")

expect do
client.notify_stream("chat", { "html" => "X" })
end.to raise_error(PGMQ::Errors::ConnectionError, /pool timeout/)
end

it "gives up after one retry (double failure) and re-raises" do
call_count = 0
allow(mock_pgmq).to receive(:with_connection) do |&_block|
call_count += 1
raise PGMQ::Errors::ConnectionError, ssl_eof_msg
end

expect do
client.notify_stream("chat", { "html" => "X" })
end.to raise_error(PGMQ::Errors::ConnectionError)

expect(call_count).to eq(2)
end

it "logs a warning on the retry path" do
call_count = 0
allow(mock_pgmq).to receive(:with_connection) do |&block|
call_count += 1
raise PGMQ::Errors::ConnectionError, ssl_eof_msg if call_count == 1

block.call(raw_conn)
end

allow(Pgbus.logger).to receive(:warn)

client.notify_stream("chat", { "html" => "X" })

expect(Pgbus.logger).to have_received(:warn)
end
end
end
end