diff --git a/lib/posthog/client.rb b/lib/posthog/client.rb index c9fc619..5e65860 100644 --- a/lib/posthog/client.rb +++ b/lib/posthog/client.rb @@ -8,6 +8,8 @@ require 'posthog/utils' require 'posthog/send_worker' require 'posthog/noop_worker' +require 'posthog/message_batch' +require 'posthog/transport' require 'posthog/feature_flags' require 'posthog/send_feature_flags_options' require 'posthog/exception_capture' @@ -53,6 +55,9 @@ def _decrement_instance_count(api_key) # remain queued. Defaults to 10_000. # @option opts [Bool] :test_mode +true+ if messages should remain # queued for testing. Defaults to +false+. + # @option opts [Bool] :sync_mode +true+ to send events synchronously + # on the calling thread. Useful in forking environments like Sidekiq + # and Resque. Defaults to +false+. # @option opts [Proc] :on_error Handles error calls from the API. # @option opts [String] :host Fully qualified hostname of the PostHog server. Defaults to `https://app.posthog.com` # @option opts [Integer] :feature_flags_polling_interval How often to poll for feature flag definition changes. @@ -72,11 +77,23 @@ def initialize(opts = {}) @api_key = opts[:api_key] @max_queue_size = opts[:max_queue_size] || Defaults::Queue::MAX_SIZE @worker_mutex = Mutex.new + @sync_mode = opts[:sync_mode] == true && !opts[:test_mode] + @on_error = opts[:on_error] || proc { |status, error| } @worker = if opts[:test_mode] NoopWorker.new(@queue) + elsif @sync_mode + nil else SendWorker.new(@queue, @api_key, opts) end + if @sync_mode + @transport = Transport.new( + api_host: opts[:host], + skip_ssl_verification: opts[:skip_ssl_verification], + retries: 3 + ) + @sync_lock = Mutex.new + end @worker_thread = nil @feature_flags_poller = nil @personal_api_key = opts[:personal_api_key] @@ -118,6 +135,12 @@ def initialize(opts = {}) # Use only for scripts which are not long-running, and will specifically # exit def flush + if @sync_mode + # Wait for any in-flight sync send to complete + @sync_lock.synchronize {} # rubocop:disable Lint/EmptyBlock + return + end + while !@queue.empty? || @worker.is_requesting? ensure_worker_running sleep(0.1) @@ -491,6 +514,11 @@ def shutdown self.class._decrement_instance_count(@api_key) if @api_key @feature_flags_poller.shutdown_poller flush + if @sync_mode + @sync_lock.synchronize { @transport&.shutdown } + else + @worker&.shutdown + end end private @@ -528,6 +556,11 @@ def enqueue(action) # add our request id for tracing purposes action[:messageId] ||= uid + if @sync_mode + send_sync(action) + return true + end + if @queue.length < @max_queue_size @queue << action ensure_worker_running @@ -558,6 +591,22 @@ def ensure_worker_running end end + def send_sync(action) + batch = MessageBatch.new(1) + begin + batch << action + rescue MessageBatch::JSONGenerationError => e + @on_error.call(-1, e.to_s) + return + end + return if batch.empty? + + @sync_lock.synchronize do + res = @transport.send(@api_key, batch) + @on_error.call(res.status, res.error) unless res.status == 200 + end + end + def worker_running? @worker_thread&.alive? end diff --git a/lib/posthog/noop_worker.rb b/lib/posthog/noop_worker.rb index f1d2e58..a30e347 100644 --- a/lib/posthog/noop_worker.rb +++ b/lib/posthog/noop_worker.rb @@ -15,5 +15,9 @@ def run def is_requesting? # rubocop:disable Naming/PredicateName false end + + def shutdown + # Does nothing + end end end diff --git a/lib/posthog/send_worker.rb b/lib/posthog/send_worker.rb index 5e1fa8f..794559d 100644 --- a/lib/posthog/send_worker.rb +++ b/lib/posthog/send_worker.rb @@ -54,6 +54,10 @@ def run @transport.shutdown end + def shutdown + @transport.shutdown + end + # public: Check whether we have outstanding requests. # # TODO: Rename to `requesting?` in future version diff --git a/lib/posthog/version.rb b/lib/posthog/version.rb index d65ddb0..35da7d1 100644 --- a/lib/posthog/version.rb +++ b/lib/posthog/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module PostHog - VERSION = '3.5.5' + VERSION = '3.6.0' end diff --git a/posthog-rails/lib/posthog/rails/railtie.rb b/posthog-rails/lib/posthog/rails/railtie.rb index 62bb72a..519e55b 100644 --- a/posthog-rails/lib/posthog/rails/railtie.rb +++ b/posthog-rails/lib/posthog/rails/railtie.rb @@ -160,6 +160,10 @@ def test_mode=(value) @base_options[:test_mode] = value end + def sync_mode=(value) + @base_options[:sync_mode] = value + end + def on_error=(value) @base_options[:on_error] = value end diff --git a/spec/posthog/client_spec.rb b/spec/posthog/client_spec.rb index 002486f..28ce781 100644 --- a/spec/posthog/client_spec.rb +++ b/spec/posthog/client_spec.rb @@ -93,6 +93,72 @@ module PostHog end end + describe 'sync_mode' do + around do |example| + PostHog::Transport.stub = true + example.call + PostHog::Transport.stub = false + end + + it 'sends events inline without using a queue or worker' do + sync_client = Client.new(api_key: API_KEY, sync_mode: true) + sync_client.capture(Queued::CAPTURE) + + worker = sync_client.instance_variable_get(:@worker) + worker_thread = sync_client.instance_variable_get(:@worker_thread) + expect(worker).to be_nil + expect(worker_thread).to be_nil + expect(sync_client.queued_messages).to eq(0) + end + + it 'calls on_error when the request fails' do + error_status = nil + on_error = proc { |status, _error| error_status = status } + + allow_any_instance_of(PostHog::Transport).to( + receive(:send).and_return(PostHog::Response.new(400, 'Bad request')) + ) + + sync_client = Client.new(api_key: API_KEY, sync_mode: true, on_error: on_error) + sync_client.capture(Queued::CAPTURE) + + expect(error_status).to eq(400) + end + + it 'flush does not attempt to run a worker' do + sync_client = Client.new(api_key: API_KEY, sync_mode: true) + expect(sync_client).not_to receive(:ensure_worker_running) + sync_client.flush + end + + it 'calls on_error with status -1 when message serialization fails' do + error_status = nil + error_message = nil + on_error = proc { |status, error| + error_status = status + error_message = error + } + + allow_any_instance_of(PostHog::MessageBatch).to( + receive(:<<).and_raise(PostHog::MessageBatch::JSONGenerationError, 'Serialization error') + ) + + sync_client = Client.new(api_key: API_KEY, sync_mode: true, on_error: on_error) + sync_client.capture(Queued::CAPTURE) + + expect(error_status).to eq(-1) + expect(error_message).to include('Serialization error') + end + + it 'prefers test_mode over sync_mode' do + both_client = Client.new(api_key: API_KEY, test_mode: true, sync_mode: true) + worker = both_client.instance_variable_get(:@worker) + sync_mode = both_client.instance_variable_get(:@sync_mode) + expect(worker).to be_a(PostHog::NoopWorker) + expect(sync_mode).to eq(false) + end + end + describe '#capture' do it 'errors without an event' do expect { client.capture(distinct_id: 'user') }.to raise_error(