diff --git a/lib/active_publisher/async/redis_adapter.rb b/lib/active_publisher/async/redis_adapter.rb index 74007b2..0610d72 100644 --- a/lib/active_publisher/async/redis_adapter.rb +++ b/lib/active_publisher/async/redis_adapter.rb @@ -6,7 +6,7 @@ module ActivePublisher module Async module RedisAdapter - REDIS_LIST_KEY = "ACTIVE_PUBLISHER_LIST".freeze + REDIS_LIST_KEY = "ACTIVE_PUBLISHER_LIST.V2".freeze def self.new(*args) ::ActivePublisher::Async::RedisAdapter::Adapter.new(*args) @@ -42,7 +42,7 @@ def initialize(new_redis_pool) def publish(route, payload, exchange_name, options = {}) message = ::ActivePublisher::Message.new(route, payload, exchange_name, options) - queue << ::Marshal.dump(message) + queue << message.to_json flush_queue! if queue.size >= flush_min || options[:flush_queue] nil diff --git a/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb b/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb index 80ecbf9..516bea6 100644 --- a/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb +++ b/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb @@ -1,3 +1,5 @@ +require "json" + module ActivePublisher module Async module RedisAdapter @@ -10,7 +12,7 @@ def initialize(redis_connection_pool, new_list_key) end def <<(message) - encoded_message = ::Marshal.dump(message) + encoded_message = message.to_json redis_pool.with do |redis| redis.rpush(list_key, encoded_message) @@ -24,7 +26,7 @@ def concat(*messages) encoded_messages = [] messages.each do |message| - encoded_messages << ::Marshal.dump(message) + encoded_messages << message.to_json end redis_pool.with do |redis| @@ -92,10 +94,10 @@ def shift(number) messages = [messages] unless messages.respond_to?(:each) shifted_messages = [] + messages.each do |message| next if message.nil? - - shifted_messages << ::Marshal.load(message) + shifted_messages << ::ActivePublisher::Message.from_json(message) end shifted_messages diff --git a/lib/active_publisher/message.rb b/lib/active_publisher/message.rb index 6136684..72f92af 100644 --- a/lib/active_publisher/message.rb +++ b/lib/active_publisher/message.rb @@ -1,3 +1,28 @@ +require "base64" +require "json" +require "active_support/core_ext/hash/keys" + module ActivePublisher - class Message < Struct.new(:route, :payload, :exchange_name, :options); end + class Message < Struct.new(:route, :payload, :exchange_name, :options) + class << self + def from_json(payload) + parsed = JSON.load(payload) + self.new( + parsed["route"], + Base64.decode64(parsed["payload"]), + parsed["exchange_name"], + parsed["options"].symbolize_keys, + ) + end + end + + def to_json + { + route: self.route, + payload: Base64.encode64(self.payload), + exchange_name: self.exchange_name, + options: self.options, + }.to_json + end + end end diff --git a/spec/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue_spec.rb b/spec/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue_spec.rb index 67257eb..61168b4 100644 --- a/spec/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue_spec.rb +++ b/spec/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue_spec.rb @@ -1,6 +1,8 @@ describe ::ActivePublisher::Async::RedisAdapter::RedisMultiPopQueue do let(:list_key) { ::ActivePublisher::Async::RedisAdapter::REDIS_LIST_KEY } let(:redis_pool) { ::ConnectionPool.new(:size => 5) { ::Redis.new } } + let(:message) { ::ActivePublisher::Message.new('rtg.key', 'payload', 'some.exchange', {})} + let(:ten_messages) { 10.times.map { message } } subject { described_class.new(redis_pool, list_key) } describe "initialize with a redis_pool and list_key" do @@ -13,29 +15,18 @@ describe "#<<" do it "pushes 1 item on the list" do - subject << "derp" + subject << message expect(subject.size).to be 1 - expect(subject.pop_up_to(100)).to eq(["derp"]) + expect(subject.pop_up_to(100)).to eq([message]) end it "pushes 10 items on the list" do 10.times do - subject << "derp" + subject << message end expect(subject.size).to be 10 - expect(subject.pop_up_to(100)).to eq([ - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - ]) + expect(subject.pop_up_to(100)).to eq(ten_messages) end end @@ -45,86 +36,40 @@ end it "pushes 1 item on the list" do - subject.concat("derp") + subject.concat(message) expect(subject.size).to be 1 - expect(subject.pop_up_to(100)).to eq(["derp"]) + expect(subject.pop_up_to(100)).to eq([message]) end it "pushes 10 items on the list" do 10.times do - subject.concat("derp") + subject.concat(message) end expect(subject.size).to be 10 - expect(subject.pop_up_to(100)).to eq([ - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - ]) + expect(subject.pop_up_to(100)).to eq(ten_messages) end it "pushes 10 items on the list in single concat" do - subject.concat("derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp") + subject.concat(message, + message, + message, + message, + message, + message, + message, + message, + message, + message) expect(subject.size).to be 10 - expect(subject.pop_up_to(100)).to eq([ - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - ]) + expect(subject.pop_up_to(100)).to eq(ten_messages) end it "pushes 10 items on the list in single concat (with array)" do - array = [ - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp" - ] - - subject.concat(array) + subject.concat(ten_messages) expect(subject.size).to be 10 - expect(subject.pop_up_to(100)).to eq([ - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - ]) + expect(subject.pop_up_to(100)).to eq(ten_messages) end end @@ -135,7 +80,7 @@ it "is false when a single item is inserted to the list_key List" do redis_pool.with do |redis| - redis.rpush(list_key, "derp") + redis.rpush(list_key, message.to_json) end expect(subject.empty?).to be false @@ -144,7 +89,7 @@ it "is false when ten items are inserted to the list_key List" do redis_pool.with do |redis| 10.times do - redis.rpush(list_key, "derp") + redis.rpush(list_key, message.to_json) end end @@ -159,31 +104,20 @@ it "returns 1 item when a single item is inserted to the list_key List" do redis_pool.with do |redis| - redis.rpush(list_key, ::Marshal.dump("derp")) + redis.rpush(list_key, message.to_json) end - expect(subject.pop_up_to(100)).to eq(["derp"]) + expect(subject.pop_up_to(100)).to eq([message]) end it "is 10 when ten items are inserted to the list_key List" do redis_pool.with do |redis| 10.times do - redis.rpush(list_key, ::Marshal.dump("derp")) + redis.rpush(list_key, message.to_json) end end - expect(subject.pop_up_to(100)).to eq([ - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - ]) + expect(subject.pop_up_to(100)).to eq(ten_messages) end end @@ -194,31 +128,20 @@ it "returns 1 item when a single item is inserted to the list_key List" do redis_pool.with do |redis| - redis.rpush(list_key, ::Marshal.dump("derp")) + redis.rpush(list_key, message.to_json) end - expect(subject.shift(100)).to eq(["derp"]) + expect(subject.shift(100)).to eq([message]) end it "is 10 when ten items are inserted to the list_key List" do redis_pool.with do |redis| 10.times do - redis.rpush(list_key, ::Marshal.dump("derp")) + redis.rpush(list_key, message.to_json) end end - expect(subject.shift(100)).to eq([ - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - ]) + expect(subject.shift(100)).to eq(ten_messages) end end @@ -229,7 +152,7 @@ it "is 1 when a single item is inserted to the list_key List" do redis_pool.with do |redis| - redis.rpush(list_key, "derp") + redis.rpush(list_key, message.to_json) end expect(subject.size).to be 1