Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions lib/posthog/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
require 'posthog/utils'
require 'posthog/send_worker'
require 'posthog/noop_worker'
require 'posthog/message_batch'
require 'posthog/transport'
require 'posthog/feature_flags'
require 'posthog/send_feature_flags_options'
require 'posthog/exception_capture'
Expand Down Expand Up @@ -53,6 +55,9 @@ def _decrement_instance_count(api_key)
# remain queued. Defaults to 10_000.
# @option opts [Bool] :test_mode +true+ if messages should remain
# queued for testing. Defaults to +false+.
# @option opts [Bool] :sync_mode +true+ to send events synchronously
# on the calling thread. Useful in forking environments like Sidekiq
# and Resque. Defaults to +false+.
# @option opts [Proc] :on_error Handles error calls from the API.
# @option opts [String] :host Fully qualified hostname of the PostHog server. Defaults to `https://app.posthog.com`
# @option opts [Integer] :feature_flags_polling_interval How often to poll for feature flag definition changes.
Expand All @@ -72,11 +77,23 @@ def initialize(opts = {})
@api_key = opts[:api_key]
@max_queue_size = opts[:max_queue_size] || Defaults::Queue::MAX_SIZE
@worker_mutex = Mutex.new
@sync_mode = opts[:sync_mode] == true && !opts[:test_mode]
@on_error = opts[:on_error] || proc { |status, error| }
@worker = if opts[:test_mode]
NoopWorker.new(@queue)
elsif @sync_mode
nil
else
SendWorker.new(@queue, @api_key, opts)
end
if @sync_mode
@transport = Transport.new(
api_host: opts[:host],
skip_ssl_verification: opts[:skip_ssl_verification],
retries: 3
)
@sync_lock = Mutex.new
end
@worker_thread = nil
@feature_flags_poller = nil
@personal_api_key = opts[:personal_api_key]
Expand Down Expand Up @@ -118,6 +135,12 @@ def initialize(opts = {})
# Use only for scripts which are not long-running, and will specifically
# exit
def flush
if @sync_mode
# Wait for any in-flight sync send to complete
@sync_lock.synchronize {} # rubocop:disable Lint/EmptyBlock
return
end

while !@queue.empty? || @worker.is_requesting?
ensure_worker_running
sleep(0.1)
Expand Down Expand Up @@ -491,6 +514,11 @@ def shutdown
self.class._decrement_instance_count(@api_key) if @api_key
@feature_flags_poller.shutdown_poller
flush
if @sync_mode
@sync_lock.synchronize { @transport&.shutdown }
else
@worker&.shutdown
end
end

private
Expand Down Expand Up @@ -528,6 +556,11 @@ def enqueue(action)
# add our request id for tracing purposes
action[:messageId] ||= uid

if @sync_mode
send_sync(action)
return true
end

if @queue.length < @max_queue_size
@queue << action
ensure_worker_running
Expand Down Expand Up @@ -558,6 +591,22 @@ def ensure_worker_running
end
end

def send_sync(action)
batch = MessageBatch.new(1)
begin
batch << action
rescue MessageBatch::JSONGenerationError => e
@on_error.call(-1, e.to_s)
return
end
return if batch.empty?

@sync_lock.synchronize do
res = @transport.send(@api_key, batch)
@on_error.call(res.status, res.error) unless res.status == 200
end
end

def worker_running?
@worker_thread&.alive?
end
Expand Down
4 changes: 4 additions & 0 deletions lib/posthog/noop_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,9 @@ def run
def is_requesting? # rubocop:disable Naming/PredicateName
false
end

def shutdown
# Does nothing
end
end
end
4 changes: 4 additions & 0 deletions lib/posthog/send_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ def run
@transport.shutdown
end

def shutdown
@transport.shutdown
end

# public: Check whether we have outstanding requests.
#
# TODO: Rename to `requesting?` in future version
Expand Down
2 changes: 1 addition & 1 deletion lib/posthog/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module PostHog
VERSION = '3.5.5'
VERSION = '3.6.0'
end
4 changes: 4 additions & 0 deletions posthog-rails/lib/posthog/rails/railtie.rb
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ def test_mode=(value)
@base_options[:test_mode] = value
end

def sync_mode=(value)
@base_options[:sync_mode] = value
end

def on_error=(value)
@base_options[:on_error] = value
end
Expand Down
66 changes: 66 additions & 0 deletions spec/posthog/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,72 @@ module PostHog
end
end

describe 'sync_mode' do
around do |example|
PostHog::Transport.stub = true
example.call
PostHog::Transport.stub = false
end

it 'sends events inline without using a queue or worker' do
sync_client = Client.new(api_key: API_KEY, sync_mode: true)
sync_client.capture(Queued::CAPTURE)

worker = sync_client.instance_variable_get(:@worker)
worker_thread = sync_client.instance_variable_get(:@worker_thread)
expect(worker).to be_nil
expect(worker_thread).to be_nil
expect(sync_client.queued_messages).to eq(0)
end

it 'calls on_error when the request fails' do
error_status = nil
on_error = proc { |status, _error| error_status = status }

allow_any_instance_of(PostHog::Transport).to(
receive(:send).and_return(PostHog::Response.new(400, 'Bad request'))
)

sync_client = Client.new(api_key: API_KEY, sync_mode: true, on_error: on_error)
sync_client.capture(Queued::CAPTURE)

expect(error_status).to eq(400)
end

it 'flush does not attempt to run a worker' do
sync_client = Client.new(api_key: API_KEY, sync_mode: true)
expect(sync_client).not_to receive(:ensure_worker_running)
sync_client.flush
end

it 'calls on_error with status -1 when message serialization fails' do
error_status = nil
error_message = nil
on_error = proc { |status, error|
error_status = status
error_message = error
}

allow_any_instance_of(PostHog::MessageBatch).to(
receive(:<<).and_raise(PostHog::MessageBatch::JSONGenerationError, 'Serialization error')
)

sync_client = Client.new(api_key: API_KEY, sync_mode: true, on_error: on_error)
sync_client.capture(Queued::CAPTURE)

expect(error_status).to eq(-1)
expect(error_message).to include('Serialization error')
end

it 'prefers test_mode over sync_mode' do
both_client = Client.new(api_key: API_KEY, test_mode: true, sync_mode: true)
worker = both_client.instance_variable_get(:@worker)
sync_mode = both_client.instance_variable_get(:@sync_mode)
expect(worker).to be_a(PostHog::NoopWorker)
expect(sync_mode).to eq(false)
end
end

describe '#capture' do
it 'errors without an event' do
expect { client.capture(distinct_id: 'user') }.to raise_error(
Expand Down
Loading