diff --git a/lib/active_publisher.rb b/lib/active_publisher.rb index 647da66..7c0703f 100644 --- a/lib/active_publisher.rb +++ b/lib/active_publisher.rb @@ -19,6 +19,23 @@ class UnknownMessageClassError < StandardError; end class ExchangeMismatchError < StandardError; end class FailedPublisherConfirms < StandardError; end + NETWORK_ERRORS = if ::RUBY_PLATFORM == "java" + [ + ::MarchHare::NetworkException, + ::MarchHare::ConnectionRefused, + ::Java::ComRabbitmqClient::AlreadyClosedException, + ::Java::JavaIo::IOException + ].freeze + else + [ + ::Bunny::NetworkFailure, + ::Bunny::TCPConnectionFailed, + ::Bunny::ConnectionTimeout, + ::Timeout::Error, + ::IOError + ].freeze + end + def self.configuration @configuration ||= ::ActivePublisher::Configuration.new end @@ -81,8 +98,13 @@ def self.publishing_options(route, in_options = {}) end def self.with_exchange(exchange_name) - connection = ::ActivePublisher::Connection.connection - channel = connection.create_channel + begin + connection = ::ActivePublisher::Connection.connection + channel = connection.create_channel + rescue *NETWORK_ERRORS + ::ActivePublisher::Connection.disconnect! + end + begin channel.confirm_select if configuration.publisher_confirms exchange = channel.topic(exchange_name) diff --git a/lib/active_publisher/async/in_memory_adapter/consumer_thread.rb b/lib/active_publisher/async/in_memory_adapter/consumer_thread.rb index 346bb32..788f4cb 100644 --- a/lib/active_publisher/async/in_memory_adapter/consumer_thread.rb +++ b/lib/active_publisher/async/in_memory_adapter/consumer_thread.rb @@ -4,14 +4,6 @@ module InMemoryAdapter class ConsumerThread attr_reader :thread, :queue, :sampled_queue_size, :last_tick_at - if ::RUBY_PLATFORM == "java" - NETWORK_ERRORS = [::MarchHare::NetworkException, ::MarchHare::ConnectionRefused, - ::Java::ComRabbitmqClient::AlreadyClosedException, ::Java::JavaIo::IOException].freeze - else - NETWORK_ERRORS = [::Bunny::NetworkFailure, ::Bunny::TCPConnectionFailed, ::Bunny::ConnectionTimeout, - ::Timeout::Error, ::IOError].freeze - end - if ::RUBY_PLATFORM == "java" PRECONDITION_ERRORS = [::MarchHare::PreconditionFailed] else @@ -77,7 +69,7 @@ def start_thread publish_all(@channel, exchange_name, messages) current_messages -= messages end - rescue *NETWORK_ERRORS + rescue *ActivePublisher::NETWORK_ERRORS # Sleep because connection is down await_network_reconnect rescue => unknown_error diff --git a/spec/lib/active_publisher/async/in_memory_adapter_spec.rb b/spec/lib/active_publisher/async/in_memory_adapter_spec.rb index a8435d9..e7edbd9 100644 --- a/spec/lib/active_publisher/async/in_memory_adapter_spec.rb +++ b/spec/lib/active_publisher/async/in_memory_adapter_spec.rb @@ -110,7 +110,7 @@ end context "when network error occurs" do - let(:error) { ActivePublisher::Async::InMemoryAdapter::ConsumerThread::NETWORK_ERRORS.first } + let(:error) { ::ActivePublisher::NETWORK_ERRORS.first } before { allow(consumer).to receive(:publish_all).and_raise(error) } it "requeues the message" do