From 0cba1cd7e9c0527e440fafb85ea260800f4208f7 Mon Sep 17 00:00:00 2001 From: Phil Haack Date: Mon, 16 Mar 2026 11:26:32 -0700 Subject: [PATCH 1/4] feat: Add sync_mode for Sidekiq/Resque compatibility Send events synchronously on the calling thread instead of queuing them for a background worker. Follows the same pattern as the Python SDK: in sync_mode, events bypass the queue entirely and are sent inline via Transport#send. Retries are capped at 3 in sync mode (vs 10 in async) to avoid blocking the calling thread for extended periods during API outages. test_mode takes precedence over sync_mode to prevent accidental network calls in test environments. Usage: PostHog::Client.new(api_key: 'key', sync_mode: true) Fixes #10 --- lib/posthog/client.rb | 42 ++++++++++++++ lib/posthog/noop_worker.rb | 4 ++ lib/posthog/send_worker.rb | 4 ++ posthog-rails/lib/posthog/rails/railtie.rb | 4 ++ spec/posthog/client_spec.rb | 66 ++++++++++++++++++++++ 5 files changed, 120 insertions(+) diff --git a/lib/posthog/client.rb b/lib/posthog/client.rb index c9fc619..5e00d3c 100644 --- a/lib/posthog/client.rb +++ b/lib/posthog/client.rb @@ -8,6 +8,8 @@ require 'posthog/utils' require 'posthog/send_worker' require 'posthog/noop_worker' +require 'posthog/message_batch' +require 'posthog/transport' require 'posthog/feature_flags' require 'posthog/send_feature_flags_options' require 'posthog/exception_capture' @@ -53,6 +55,9 @@ def _decrement_instance_count(api_key) # remain queued. Defaults to 10_000. # @option opts [Bool] :test_mode +true+ if messages should remain # queued for testing. Defaults to +false+. + # @option opts [Bool] :sync_mode +true+ to send events synchronously + # on the calling thread. Useful in forking environments like Sidekiq + # and Resque. Defaults to +false+. # @option opts [Proc] :on_error Handles error calls from the API. # @option opts [String] :host Fully qualified hostname of the PostHog server. Defaults to `https://app.posthog.com` # @option opts [Integer] :feature_flags_polling_interval How often to poll for feature flag definition changes. @@ -72,11 +77,22 @@ def initialize(opts = {}) @api_key = opts[:api_key] @max_queue_size = opts[:max_queue_size] || Defaults::Queue::MAX_SIZE @worker_mutex = Mutex.new + @sync_mode = opts[:sync_mode] == true && !opts[:test_mode] + @on_error = opts[:on_error] || proc { |status, error| } @worker = if opts[:test_mode] NoopWorker.new(@queue) + elsif @sync_mode + nil else SendWorker.new(@queue, @api_key, opts) end + @transport = if @sync_mode + Transport.new( + api_host: opts[:host], + skip_ssl_verification: opts[:skip_ssl_verification], + retries: 3 + ) + end @worker_thread = nil @feature_flags_poller = nil @personal_api_key = opts[:personal_api_key] @@ -118,6 +134,8 @@ def initialize(opts = {}) # Use only for scripts which are not long-running, and will specifically # exit def flush + return if @sync_mode + while !@queue.empty? || @worker.is_requesting? ensure_worker_running sleep(0.1) @@ -491,6 +509,11 @@ def shutdown self.class._decrement_instance_count(@api_key) if @api_key @feature_flags_poller.shutdown_poller flush + if @sync_mode + @transport&.shutdown + else + @worker&.shutdown + end end private @@ -528,6 +551,11 @@ def enqueue(action) # add our request id for tracing purposes action[:messageId] ||= uid + if @sync_mode + send_sync(action) + return true + end + if @queue.length < @max_queue_size @queue << action ensure_worker_running @@ -558,6 +586,20 @@ def ensure_worker_running end end + def send_sync(action) + batch = MessageBatch.new(1) + begin + batch << action + rescue MessageBatch::JSONGenerationError => e + @on_error.call(-1, e.to_s) + return + end + return if batch.empty? + + res = @transport.send(@api_key, batch) + @on_error.call(res.status, res.error) unless res.status == 200 + end + def worker_running? @worker_thread&.alive? end diff --git a/lib/posthog/noop_worker.rb b/lib/posthog/noop_worker.rb index f1d2e58..a30e347 100644 --- a/lib/posthog/noop_worker.rb +++ b/lib/posthog/noop_worker.rb @@ -15,5 +15,9 @@ def run def is_requesting? # rubocop:disable Naming/PredicateName false end + + def shutdown + # Does nothing + end end end diff --git a/lib/posthog/send_worker.rb b/lib/posthog/send_worker.rb index 5e1fa8f..794559d 100644 --- a/lib/posthog/send_worker.rb +++ b/lib/posthog/send_worker.rb @@ -54,6 +54,10 @@ def run @transport.shutdown end + def shutdown + @transport.shutdown + end + # public: Check whether we have outstanding requests. # # TODO: Rename to `requesting?` in future version diff --git a/posthog-rails/lib/posthog/rails/railtie.rb b/posthog-rails/lib/posthog/rails/railtie.rb index 62bb72a..519e55b 100644 --- a/posthog-rails/lib/posthog/rails/railtie.rb +++ b/posthog-rails/lib/posthog/rails/railtie.rb @@ -160,6 +160,10 @@ def test_mode=(value) @base_options[:test_mode] = value end + def sync_mode=(value) + @base_options[:sync_mode] = value + end + def on_error=(value) @base_options[:on_error] = value end diff --git a/spec/posthog/client_spec.rb b/spec/posthog/client_spec.rb index 002486f..28ce781 100644 --- a/spec/posthog/client_spec.rb +++ b/spec/posthog/client_spec.rb @@ -93,6 +93,72 @@ module PostHog end end + describe 'sync_mode' do + around do |example| + PostHog::Transport.stub = true + example.call + PostHog::Transport.stub = false + end + + it 'sends events inline without using a queue or worker' do + sync_client = Client.new(api_key: API_KEY, sync_mode: true) + sync_client.capture(Queued::CAPTURE) + + worker = sync_client.instance_variable_get(:@worker) + worker_thread = sync_client.instance_variable_get(:@worker_thread) + expect(worker).to be_nil + expect(worker_thread).to be_nil + expect(sync_client.queued_messages).to eq(0) + end + + it 'calls on_error when the request fails' do + error_status = nil + on_error = proc { |status, _error| error_status = status } + + allow_any_instance_of(PostHog::Transport).to( + receive(:send).and_return(PostHog::Response.new(400, 'Bad request')) + ) + + sync_client = Client.new(api_key: API_KEY, sync_mode: true, on_error: on_error) + sync_client.capture(Queued::CAPTURE) + + expect(error_status).to eq(400) + end + + it 'flush does not attempt to run a worker' do + sync_client = Client.new(api_key: API_KEY, sync_mode: true) + expect(sync_client).not_to receive(:ensure_worker_running) + sync_client.flush + end + + it 'calls on_error with status -1 when message serialization fails' do + error_status = nil + error_message = nil + on_error = proc { |status, error| + error_status = status + error_message = error + } + + allow_any_instance_of(PostHog::MessageBatch).to( + receive(:<<).and_raise(PostHog::MessageBatch::JSONGenerationError, 'Serialization error') + ) + + sync_client = Client.new(api_key: API_KEY, sync_mode: true, on_error: on_error) + sync_client.capture(Queued::CAPTURE) + + expect(error_status).to eq(-1) + expect(error_message).to include('Serialization error') + end + + it 'prefers test_mode over sync_mode' do + both_client = Client.new(api_key: API_KEY, test_mode: true, sync_mode: true) + worker = both_client.instance_variable_get(:@worker) + sync_mode = both_client.instance_variable_get(:@sync_mode) + expect(worker).to be_a(PostHog::NoopWorker) + expect(sync_mode).to eq(false) + end + end + describe '#capture' do it 'errors without an event' do expect { client.capture(distinct_id: 'user') }.to raise_error( From a3880866e23ad2cef3334710e40825ac24cca347 Mon Sep 17 00:00:00 2001 From: Phil Haack Date: Mon, 16 Mar 2026 14:46:07 -0700 Subject: [PATCH 2/4] Bump version to 3.6.0 --- lib/posthog/version.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/posthog/version.rb b/lib/posthog/version.rb index d65ddb0..35da7d1 100644 --- a/lib/posthog/version.rb +++ b/lib/posthog/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module PostHog - VERSION = '3.5.5' + VERSION = '3.6.0' end From 4a881b86c780719849da180f3def02d2375cd25b Mon Sep 17 00:00:00 2001 From: Phil Haack Date: Mon, 16 Mar 2026 14:59:21 -0700 Subject: [PATCH 3/4] Guard sync_mode transport with mutex for thread safety --- lib/posthog/client.rb | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/lib/posthog/client.rb b/lib/posthog/client.rb index 5e00d3c..56f573c 100644 --- a/lib/posthog/client.rb +++ b/lib/posthog/client.rb @@ -86,13 +86,14 @@ def initialize(opts = {}) else SendWorker.new(@queue, @api_key, opts) end - @transport = if @sync_mode - Transport.new( - api_host: opts[:host], - skip_ssl_verification: opts[:skip_ssl_verification], - retries: 3 - ) - end + if @sync_mode + @transport = Transport.new( + api_host: opts[:host], + skip_ssl_verification: opts[:skip_ssl_verification], + retries: 3 + ) + @sync_lock = Mutex.new + end @worker_thread = nil @feature_flags_poller = nil @personal_api_key = opts[:personal_api_key] @@ -596,8 +597,10 @@ def send_sync(action) end return if batch.empty? - res = @transport.send(@api_key, batch) - @on_error.call(res.status, res.error) unless res.status == 200 + @sync_lock.synchronize do + res = @transport.send(@api_key, batch) + @on_error.call(res.status, res.error) unless res.status == 200 + end end def worker_running? From 7f2e7f9be23e0c81ae3aed2a8c5e51262d1af78e Mon Sep 17 00:00:00 2001 From: Phil Haack Date: Mon, 16 Mar 2026 15:23:02 -0700 Subject: [PATCH 4/4] Coordinate flush and shutdown with sync_lock --- lib/posthog/client.rb | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/posthog/client.rb b/lib/posthog/client.rb index 56f573c..5e65860 100644 --- a/lib/posthog/client.rb +++ b/lib/posthog/client.rb @@ -135,7 +135,11 @@ def initialize(opts = {}) # Use only for scripts which are not long-running, and will specifically # exit def flush - return if @sync_mode + if @sync_mode + # Wait for any in-flight sync send to complete + @sync_lock.synchronize {} # rubocop:disable Lint/EmptyBlock + return + end while !@queue.empty? || @worker.is_requesting? ensure_worker_running @@ -511,7 +515,7 @@ def shutdown @feature_flags_poller.shutdown_poller flush if @sync_mode - @transport&.shutdown + @sync_lock.synchronize { @transport&.shutdown } else @worker&.shutdown end