From dd7b83995afb572e10da0ac46f8c3c2c1b414c3f Mon Sep 17 00:00:00 2001 From: Brandon Dewitt Date: Mon, 14 Jan 2019 18:06:11 -0700 Subject: [PATCH 1/3] reconnect on both async and with_exchange publish calls and consolidate where errors are stored --- lib/active_publisher.rb | 32 +++++++++++++++++-- .../in_memory_adapter/consumer_thread.rb | 10 +----- 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/lib/active_publisher.rb b/lib/active_publisher.rb index 647da66..683362e 100644 --- a/lib/active_publisher.rb +++ b/lib/active_publisher.rb @@ -19,6 +19,23 @@ class UnknownMessageClassError < StandardError; end class ExchangeMismatchError < StandardError; end class FailedPublisherConfirms < StandardError; end + NETWORK_ERRORS = if ::RUBY_PLATFORM == "java" + [ + ::MarchHare::NetworkException, + ::MarchHare::ConnectionRefused, + ::Java::ComRabbitmqClient::AlreadyClosedException, + ::Java::JavaIo::IOException + ].freeze + else + [ + ::Bunny::NetworkFailure, + ::Bunny::TCPConnectionFailed, + ::Bunny::ConnectionTimeout, + ::Timeout::Error, + ::IOError + ].freeze + end + def self.configuration @configuration ||= ::ActivePublisher::Configuration.new end @@ -81,8 +98,19 @@ def self.publishing_options(route, in_options = {}) end def self.with_exchange(exchange_name) - connection = ::ActivePublisher::Connection.connection - channel = connection.create_channel + total_recovery_wait = 0 + + begin + connection = ::ActivePublisher::Connection.connection + channel = connection.create_channel + rescue *NETWORK_ERRORS + # Connection will auto-recover asynchronously; if we are "waiting" for that to happen for longer than 5 minutes then we should + # just disconnect and reconnect the connection (which will be done automatically on disconnect) + total_recovery_wait += 0.5 + sleep 0.5 + ::ActivePublisher::Connection.disconnect! if total_recovery_wait > 600 + end + begin channel.confirm_select if configuration.publisher_confirms exchange = channel.topic(exchange_name) diff --git a/lib/active_publisher/async/in_memory_adapter/consumer_thread.rb b/lib/active_publisher/async/in_memory_adapter/consumer_thread.rb index 346bb32..788f4cb 100644 --- a/lib/active_publisher/async/in_memory_adapter/consumer_thread.rb +++ b/lib/active_publisher/async/in_memory_adapter/consumer_thread.rb @@ -4,14 +4,6 @@ module InMemoryAdapter class ConsumerThread attr_reader :thread, :queue, :sampled_queue_size, :last_tick_at - if ::RUBY_PLATFORM == "java" - NETWORK_ERRORS = [::MarchHare::NetworkException, ::MarchHare::ConnectionRefused, - ::Java::ComRabbitmqClient::AlreadyClosedException, ::Java::JavaIo::IOException].freeze - else - NETWORK_ERRORS = [::Bunny::NetworkFailure, ::Bunny::TCPConnectionFailed, ::Bunny::ConnectionTimeout, - ::Timeout::Error, ::IOError].freeze - end - if ::RUBY_PLATFORM == "java" PRECONDITION_ERRORS = [::MarchHare::PreconditionFailed] else @@ -77,7 +69,7 @@ def start_thread publish_all(@channel, exchange_name, messages) current_messages -= messages end - rescue *NETWORK_ERRORS + rescue *ActivePublisher::NETWORK_ERRORS # Sleep because connection is down await_network_reconnect rescue => unknown_error From f20eb6cb8bc3e8cb061f1f484862c257f7a26dfe Mon Sep 17 00:00:00 2001 From: Brandon Dewitt Date: Mon, 14 Jan 2019 18:38:11 -0700 Subject: [PATCH 2/3] disconnect in an error state when a problem occurs --- lib/active_publisher.rb | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/lib/active_publisher.rb b/lib/active_publisher.rb index 683362e..7c0703f 100644 --- a/lib/active_publisher.rb +++ b/lib/active_publisher.rb @@ -98,17 +98,11 @@ def self.publishing_options(route, in_options = {}) end def self.with_exchange(exchange_name) - total_recovery_wait = 0 - begin connection = ::ActivePublisher::Connection.connection channel = connection.create_channel rescue *NETWORK_ERRORS - # Connection will auto-recover asynchronously; if we are "waiting" for that to happen for longer than 5 minutes then we should - # just disconnect and reconnect the connection (which will be done automatically on disconnect) - total_recovery_wait += 0.5 - sleep 0.5 - ::ActivePublisher::Connection.disconnect! if total_recovery_wait > 600 + ::ActivePublisher::Connection.disconnect! end begin From 7ab4968025c0cd3704d88e314faf6da932e31990 Mon Sep 17 00:00:00 2001 From: Brandon Dewitt Date: Mon, 14 Jan 2019 18:44:30 -0700 Subject: [PATCH 3/3] fix spec --- spec/lib/active_publisher/async/in_memory_adapter_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/lib/active_publisher/async/in_memory_adapter_spec.rb b/spec/lib/active_publisher/async/in_memory_adapter_spec.rb index a8435d9..e7edbd9 100644 --- a/spec/lib/active_publisher/async/in_memory_adapter_spec.rb +++ b/spec/lib/active_publisher/async/in_memory_adapter_spec.rb @@ -110,7 +110,7 @@ end context "when network error occurs" do - let(:error) { ActivePublisher::Async::InMemoryAdapter::ConsumerThread::NETWORK_ERRORS.first } + let(:error) { ::ActivePublisher::NETWORK_ERRORS.first } before { allow(consumer).to receive(:publish_all).and_raise(error) } it "requeues the message" do