From e21cc261cb023302f79ce395ee3f38b1b50c7889 Mon Sep 17 00:00:00 2001 From: Brandon Dewitt Date: Sun, 11 Oct 2020 16:54:58 -0400 Subject: [PATCH 1/2] add the ability to pause publishing in the redis queue and lengthen the value of in memory queue before flushing to redis --- lib/active_publisher/async/redis_adapter.rb | 3 ++- .../async/redis_adapter/redis_multi_pop_queue.rb | 9 +++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/lib/active_publisher/async/redis_adapter.rb b/lib/active_publisher/async/redis_adapter.rb index c418e60..d940b6d 100644 --- a/lib/active_publisher/async/redis_adapter.rb +++ b/lib/active_publisher/async/redis_adapter.rb @@ -13,6 +13,7 @@ def self.new(*args) end class Adapter + QUEUE_FLUSH_SIZE = 100 SUPERVISOR_INTERVAL = { :execution_interval => 1.5, # seconds :timeout_interval => 1, # seconds @@ -41,7 +42,7 @@ def initialize(new_redis_pool) def publish(route, payload, exchange_name, options = {}) message = ::ActivePublisher::Message.new(route, payload, exchange_name, options) queue << ::Marshal.dump(message) - flush_queue! if queue.size >= 20 || options[:flush_queue] + flush_queue! if queue.size >= QUEUE_FLUSH_SIZE || options[:flush_queue] nil end diff --git a/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb b/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb index 80ecbf9..abbd8b5 100644 --- a/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb +++ b/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb @@ -2,6 +2,8 @@ module ActivePublisher module Async module RedisAdapter class RedisMultiPopQueue + RABBITMQ_PAUSED_KEY = "ACTIVE_PUBLISHER_RABBITMQ_PAUSED".freeze + attr_reader :list_key, :redis_pool def initialize(redis_connection_pool, new_list_key) @@ -36,6 +38,12 @@ def empty? size <= 0 end + def pause_publishing? + redis_pool.with do |redis| + redis.exists(RABBITMQ_PAUSED_KEY) + end + end + def pop_up_to(num_to_pop, opts = {}) case opts when TrueClass, FalseClass @@ -73,6 +81,7 @@ def pop_up_to(num_to_pop, opts = {}) def shift(number) number = [number, size].min return [] if number <= 0 + return [] if pause_publishing? messages = [] multi_response = [] From e1eda6c6e177667aa43025ff963e5db39b361838 Mon Sep 17 00:00:00 2001 From: Brandon Dewitt Date: Mon, 12 Oct 2020 09:01:08 -0400 Subject: [PATCH 2/2] use boolean version --- .../async/redis_adapter/redis_multi_pop_queue.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb b/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb index abbd8b5..154b2b3 100644 --- a/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb +++ b/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb @@ -40,7 +40,7 @@ def empty? def pause_publishing? redis_pool.with do |redis| - redis.exists(RABBITMQ_PAUSED_KEY) + redis.exists?(RABBITMQ_PAUSED_KEY) end end