Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions examples/redis_flag_cache.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# frozen_string_literal: true

# Redis-based distributed cache for PostHog feature flag definitions.
#
# This example demonstrates how to implement a FlagDefinitionCacheProvider
# using Redis for multi-instance deployments (leader election pattern).
#
# Usage:
# require 'redis'
# require 'posthog'
# require_relative 'redis_flag_cache'
#
# redis = Redis.new(host: 'localhost', port: 6379)
# cache = RedisFlagCache.new(redis, service_key: 'my-service')
#
# posthog = PostHog::Client.new(
# api_key: '<project_api_key>',
# personal_api_key: '<personal_api_key>',
# flag_definition_cache_provider: cache
# )
#
# Requirements:
# gem install redis

require 'json'
require 'securerandom'

# A distributed cache for PostHog feature flag definitions using Redis.
#
# In a multi-instance deployment (e.g., multiple serverless functions or
# containers), we want only ONE instance to poll PostHog for flag updates,
# while all instances share the cached results. This prevents N instances
# from making N redundant API calls.
#
# Uses leader election:
# - One instance "wins" and becomes responsible for fetching
# - Other instances read from the shared cache
# - If the leader dies, the lock expires (TTL) and another instance takes over
#
# Uses Lua scripts for atomic operations, following Redis distributed lock
# best practices: https://redis.io/docs/latest/develop/clients/patterns/distributed-locks/
class RedisFlagCache
LOCK_TTL_MS = 60 * 1000 # 60 seconds, should be longer than the flags poll interval
CACHE_TTL_SECONDS = 60 * 60 * 24 # 24 hours

# Lua script: acquire lock if free, or extend if we own it
LUA_TRY_LEAD = <<~LUA
local current = redis.call('GET', KEYS[1])
if current == false then
redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])
return 1
elseif current == ARGV[1] then
redis.call('PEXPIRE', KEYS[1], ARGV[2])
return 1
end
return 0
LUA

# Lua script: release lock only if we own it
LUA_STOP_LEAD = <<~LUA
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
end
return 0
LUA

# @param redis [Redis] A redis client instance
# @param service_key [String] Unique identifier for this service/environment,
# used to scope Redis keys. Examples: "my-api-prod", "checkout-service"
#
# Redis keys created:
# - posthog:flags:{service_key} — cached flag definitions (JSON)
# - posthog:flags:{service_key}:lock — leader election lock
def initialize(redis, service_key:)
@redis = redis
@cache_key = "posthog:flags:#{service_key}"
@lock_key = "posthog:flags:#{service_key}:lock"
@instance_id = SecureRandom.uuid
end

# Retrieve cached flag definitions from Redis.
#
# @return [Hash, nil] Cached flag definitions, or nil if cache is empty
def flag_definitions
cached = @redis.get(@cache_key)
return nil unless cached

JSON.parse(cached)
end

# Determine if this instance should fetch flag definitions from PostHog.
#
# Atomically either acquires the lock (if free) or extends it (if we own it).
#
# @return [Boolean] true if this instance is the leader and should fetch
def should_fetch_flag_definitions?
result = @redis.eval(LUA_TRY_LEAD, keys: [@lock_key], argv: [@instance_id, LOCK_TTL_MS.to_s])
result == 1
end

# Store fetched flag definitions in Redis.
#
# @param data [Hash] Flag definitions to cache
def on_flag_definitions_received(data)
@redis.set(@cache_key, JSON.dump(data), ex: CACHE_TTL_SECONDS)
end

# Release leadership if we hold it. Safe to call even if not the leader.
def shutdown
@redis.eval(LUA_STOP_LEAD, keys: [@lock_key], argv: [@instance_id])
end
end
1 change: 1 addition & 0 deletions lib/posthog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@
require 'posthog/exception_capture'
require 'posthog/feature_flag_error'
require 'posthog/feature_flag_result'
require 'posthog/flag_definition_cache'
6 changes: 5 additions & 1 deletion lib/posthog/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ def _decrement_instance_count(api_key)
# to be sent to PostHog or nil to prevent the event from being sent. e.g. `before_send: ->(event) { event }`
# @option opts [Bool] :disable_singleton_warning +true+ to suppress the warning when multiple clients
# share the same API key. Use only when you intentionally need multiple clients. Defaults to +false+.
# @option opts [Object] :flag_definition_cache_provider An object implementing the
# {FlagDefinitionCacheProvider} interface for distributed flag definition caching.
# EXPERIMENTAL: This API may change in future minor version bumps.
def initialize(opts = {})
symbolize_keys!(opts)

Expand Down Expand Up @@ -120,7 +123,8 @@ def initialize(opts = {})
@api_key,
opts[:host],
opts[:feature_flag_request_timeout_seconds] || Defaults::FeatureFlags::FLAG_REQUEST_TIMEOUT_SECONDS,
opts[:on_error]
opts[:on_error],
flag_definition_cache_provider: opts[:flag_definition_cache_provider]
)

@distinct_id_has_sent_flag_calls = SizeLimitedHash.new(Defaults::MAX_HASH_SIZE) do |hash, key|
Expand Down
94 changes: 83 additions & 11 deletions lib/posthog/feature_flags.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require 'posthog/version'
require 'posthog/logging'
require 'posthog/feature_flag'
require 'posthog/flag_definition_cache'
require 'digest'

module PostHog
Expand All @@ -29,7 +30,8 @@ def initialize(
project_api_key,
host,
feature_flag_request_timeout_seconds,
on_error = nil
on_error = nil,
flag_definition_cache_provider: nil
)
@polling_interval = polling_interval || 30
@personal_api_key = personal_api_key
Expand All @@ -44,6 +46,10 @@ def initialize(
@on_error = on_error || proc { |status, error| }
@quota_limited = Concurrent::AtomicBoolean.new(false)
@flags_etag = Concurrent::AtomicReference.new(nil)

@flag_definition_cache_provider = flag_definition_cache_provider
FlagDefinitionCacheProvider.validate!(@flag_definition_cache_provider) if @flag_definition_cache_provider

@task =
Concurrent::TimerTask.new(
execution_interval: polling_interval
Expand Down Expand Up @@ -372,6 +378,13 @@ def get_feature_flag_payload(

def shutdown_poller
@task.shutdown
return unless @flag_definition_cache_provider

begin
@flag_definition_cache_provider.shutdown
rescue StandardError => e
logger.error("[FEATURE FLAGS] Cache provider shutdown error: #{e}")
end
end

# Class methods
Expand Down Expand Up @@ -1006,6 +1019,38 @@ def variant_lookup_table(flag)
end

def _load_feature_flags
should_fetch = true

if @flag_definition_cache_provider
begin
should_fetch = @flag_definition_cache_provider.should_fetch_flag_definitions?
rescue StandardError => e
logger.error("[FEATURE FLAGS] Cache provider should_fetch error: #{e}")
should_fetch = true
end
end

unless should_fetch
begin
cached_data = @flag_definition_cache_provider.flag_definitions
if cached_data
logger.debug '[FEATURE FLAGS] Using cached flag definitions from external cache'
_apply_flag_definitions(cached_data)
return
elsif @feature_flags.empty?
# Emergency fallback: cache empty and no flags loaded -> fetch from API
should_fetch = true
end
rescue StandardError => e
logger.error("[FEATURE FLAGS] Cache provider get error: #{e}")
should_fetch = true
end
end

_fetch_and_apply_flag_definitions if should_fetch
end

def _fetch_and_apply_flag_definitions
begin
res = _request_feature_flag_definitions(etag: @flags_etag.value)
rescue StandardError => e
Expand Down Expand Up @@ -1040,21 +1085,48 @@ def _load_feature_flags
# Only update ETag on successful responses with flag data
@flags_etag.value = res[:etag]

@feature_flags = res[:flags] || []
@feature_flags_by_key = {}
@feature_flags.each do |flag|
@feature_flags_by_key[flag[:key]] = flag unless flag[:key].nil?
end
@group_type_mapping = res[:group_type_mapping] || {}
@cohorts = res[:cohorts] || {}

logger.debug "Loaded #{@feature_flags.length} feature flags and #{@cohorts.length} cohorts"
@loaded_flags_successfully_once.make_true if @loaded_flags_successfully_once.false?
_apply_flag_definitions(res)
_store_in_cache_provider
else
logger.debug "Failed to load feature flags: #{res}"
end
end

def _store_in_cache_provider
return unless @flag_definition_cache_provider

begin
data = {
flags: @feature_flags.to_a,
group_type_mapping: @group_type_mapping.to_h,
cohorts: @cohorts.to_h
}
@flag_definition_cache_provider.on_flag_definitions_received(data)
rescue StandardError => e
logger.error("[FEATURE FLAGS] Cache provider store error: #{e}")
end
end

def _apply_flag_definitions(data)
flags = get_by_symbol_or_string_key(data, 'flags') || []
group_type_mapping = get_by_symbol_or_string_key(data, 'group_type_mapping') || {}
cohorts = get_by_symbol_or_string_key(data, 'cohorts') || {}

@feature_flags = Concurrent::Array.new(flags.map { |f| deep_symbolize_keys(f) })

new_by_key = {}
@feature_flags.each do |flag|
new_by_key[flag[:key]] = flag unless flag[:key].nil?
end
@feature_flags_by_key = new_by_key

@group_type_mapping = Concurrent::Hash[deep_symbolize_keys(group_type_mapping)]
@cohorts = Concurrent::Hash[deep_symbolize_keys(cohorts)]

logger.debug "Loaded #{@feature_flags.length} feature flags and #{@cohorts.length} cohorts"
@loaded_flags_successfully_once.make_true if @loaded_flags_successfully_once.false?
end

def _request_feature_flag_definitions(etag: nil)
uri = URI("#{@host}/api/feature_flag/local_evaluation")
uri.query = URI.encode_www_form([['token', @project_api_key], %w[send_cohorts true]])
Expand Down
78 changes: 78 additions & 0 deletions lib/posthog/flag_definition_cache.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# frozen_string_literal: true

module PostHog
# Interface for external caching of feature flag definitions.
#
# EXPERIMENTAL: This API may change in future minor version bumps.
#
# Enables multi-worker environments (Kubernetes, load-balanced servers,
# serverless functions) to share flag definitions via an external cache,
# reducing redundant API calls.
#
# Implement the four required methods on any object and pass it as the
# +:flag_definition_cache_provider+ option when creating a {Client}.
#
# == Required Methods
#
# [+flag_definitions+]
# Retrieve cached flag definitions. Return a Hash with +:flags+,
# +:group_type_mapping+, and +:cohorts+ keys, or +nil+ if the cache
# is empty. Returning +nil+ triggers an API fetch when no flags are
# loaded yet (emergency fallback).
#
# [+should_fetch_flag_definitions?+]
# Return +true+ if this instance should fetch new definitions from the
# API, +false+ to read from cache instead. Use for distributed lock
# coordination so only one worker fetches at a time.
#
# [+on_flag_definitions_received(data)+]
# Called after successfully fetching new definitions from the API.
# +data+ is a Hash with +:flags+, +:group_type_mapping+, and +:cohorts+
# keys (plain Ruby types, not Concurrent:: wrappers). Store it in your
# external cache.
#
# [+shutdown+]
# Called when the PostHog client shuts down. Release any distributed
# locks and clean up resources.
#
# == Error Handling
#
# All methods are wrapped in +begin/rescue+. Errors are logged but never
# break flag evaluation:
# - +should_fetch_flag_definitions?+ errors default to fetching (fail-safe)
# - +flag_definitions+ errors fall back to API fetch
# - +on_flag_definitions_received+ errors are logged; flags remain in memory
# - +shutdown+ errors are logged; shutdown continues
#
# == Example
#
# cache = RedisFlagCache.new(redis, service_key: 'my-service')
# client = PostHog::Client.new(
# api_key: '<project_api_key>',
# personal_api_key: '<personal_api_key>',
# flag_definition_cache_provider: cache
# )
#
module FlagDefinitionCacheProvider
REQUIRED_METHODS = %i[
flag_definitions
should_fetch_flag_definitions?
on_flag_definitions_received
shutdown
].freeze

# Validates that +provider+ implements all required methods.
# Raises +ArgumentError+ listing any missing methods.
#
# @param provider [Object] the cache provider to validate
# @raise [ArgumentError] if any required methods are missing
def self.validate!(provider)
missing = REQUIRED_METHODS.reject { |m| provider.respond_to?(m) }
return if missing.empty?

raise ArgumentError,
"Flag definition cache provider is missing required methods: #{missing.join(', ')}. " \
'See PostHog::FlagDefinitionCacheProvider for the required interface.'
end
end
end
15 changes: 15 additions & 0 deletions lib/posthog/utils.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ def stringify_keys(hash)
hash.transform_keys(&:to_s)
end

# public: Recursively convert all keys to symbols in a Hash/Array tree
#
def deep_symbolize_keys(obj)
case obj
when Hash
obj.each_with_object({}) do |(key, value), result|
result[key.to_sym] = deep_symbolize_keys(value)
end
when Array
obj.map { |item| deep_symbolize_keys(item) }
else
obj
end
end

# public: Returns a new hash with all the date values in the into iso8601
# strings
#
Expand Down
Loading
Loading