Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
b8c1ed6
add partial cache broadcast on misses
ruslandoga Feb 27, 2026
e19b258
credo
ruslandoga Mar 18, 2026
1b5797a
credo again
ruslandoga Mar 18, 2026
b2331ac
Merge branch 'main' into rd/cache-partial-broadcast
ruslandoga Apr 2, 2026
da0a25b
naming
ruslandoga Apr 2, 2026
0370555
comment
ruslandoga Apr 2, 2026
1f11002
comments
ruslandoga Apr 2, 2026
91b6a19
naming
ruslandoga Apr 2, 2026
359b522
continue
ruslandoga Apr 2, 2026
3079a9b
credo
ruslandoga Apr 2, 2026
a44cb6a
wording
ruslandoga Apr 2, 2026
7c4d5bd
continue
ruslandoga Apr 2, 2026
7e597bf
fewer changes
ruslandoga Apr 2, 2026
425b8f9
Merge branch 'main' into rd/cache-partial-broadcast
ruslandoga Apr 2, 2026
3306acf
add refresh
ruslandoga Apr 2, 2026
5960cb5
begin tests
ruslandoga Apr 2, 2026
ecf5931
seems to work
ruslandoga Apr 2, 2026
44d7629
add key to telemetry info
ruslandoga Apr 2, 2026
79c2373
add key to telemetry info
ruslandoga Apr 2, 2026
00bd922
Merge branch 'main' into rd/cache-partial-broadcast
ruslandoga Apr 2, 2026
fd0d40c
doc
ruslandoga Apr 2, 2026
97e956d
fix credo warning
ruslandoga Apr 2, 2026
e62958e
extract gossip functions into own module
ruslandoga Apr 2, 2026
829a725
fix mime error
ruslandoga Apr 2, 2026
43cbc6b
continue
ruslandoga Apr 3, 2026
4d2f286
move epmd setup to ci
ruslandoga Apr 3, 2026
99e2899
use ; instead of &&
ruslandoga Apr 3, 2026
5670b6b
seems to work
ruslandoga Apr 3, 2026
a24f90c
cleanup
ruslandoga Apr 3, 2026
c7200dc
cleanup again
ruslandoga Apr 3, 2026
6cec3ca
add big error if epmd is not running
ruslandoga Apr 3, 2026
327b816
cleanup
ruslandoga Apr 3, 2026
e510a94
format error
ruslandoga Apr 3, 2026
3695f9c
improve float parsing (allow 0 and 1)
ruslandoga Apr 3, 2026
a088f28
add warnings
ruslandoga Apr 3, 2026
2f11b47
pipe
ruslandoga Apr 3, 2026
748cbf5
comment on epmd
ruslandoga Apr 3, 2026
a04cf8b
move comment
ruslandoga Apr 3, 2026
0dc0415
move unboxed runs to setup from setup_all
ruslandoga Apr 3, 2026
dfb9a30
cache user
ruslandoga Apr 3, 2026
5cbc731
continue
ruslandoga Apr 3, 2026
96ed7e6
cleanup
ruslandoga Apr 3, 2026
6eb7e37
continue
ruslandoga Apr 3, 2026
556ac10
more logs
ruslandoga Apr 3, 2026
dd9e5e8
wording
ruslandoga Apr 3, 2026
5550242
comment
ruslandoga Apr 3, 2026
3644f03
naming
ruslandoga Apr 3, 2026
41e3d7a
eh
ruslandoga Apr 3, 2026
35687d3
Merge branch 'main' into rd/cache-partial-broadcast
ruslandoga Apr 10, 2026
21d15ff
notice
ruslandoga Apr 10, 2026
c6842ed
Merge branch 'main' into rd/cache-partial-broadcast
Ziinc Apr 15, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/elixir-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ jobs:
- name: Compilation Warnings
run: mix test.compile
- name: Tests
run: mix do ecto.create + ecto.migrate + test.coverage.ci
# we start epmd for clustered tests
run: epmd -daemon; mix do ecto.create + ecto.migrate + test.coverage.ci
# - name: Security - Sobelow Code Scan
# run: mix test.security
- name: Typing check
Expand Down
34 changes: 34 additions & 0 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -480,3 +480,37 @@ config :logflare, Oban,
]

config :logflare, Logflare.Alerting, enabled: enable_alerting?

# LOGFLARE_CACHE_GOSSIP_ENABLED: Enable or disable cache gossip.
# Defaults to "true" in the test environment and "false" otherwise;
# only the exact string "true" enables gossip.
default_cache_gossip_enabled = if config_env() == :test, do: "true", else: "false"

cache_gossip_enabled? =
  System.get_env("LOGFLARE_CACHE_GOSSIP_ENABLED", default_cache_gossip_enabled) == "true"

# LOGFLARE_CACHE_GOSSIP_RATIO: Ratio of nodes to gossip cache updates to.
# Must parse completely as a float within 0..1 (defaults to "0.05").
raw_cache_gossip_ratio = System.get_env("LOGFLARE_CACHE_GOSSIP_RATIO", "0.05")

cache_gossip_ratio =
  case Float.parse(raw_cache_gossip_ratio) do
    {ratio, ""} when ratio >= 0.0 and ratio <= 1.0 ->
      ratio

    _ ->
      raise ArgumentError,
            "Invalid LOGFLARE_CACHE_GOSSIP_RATIO: #{raw_cache_gossip_ratio}. Must be a float between 0 and 1."
  end

# LOGFLARE_CACHE_GOSSIP_MAX_NODES: Maximum number of nodes to gossip cache updates to.
# Must parse completely as a positive integer (defaults to "3").
raw_cache_gossip_max_nodes = System.get_env("LOGFLARE_CACHE_GOSSIP_MAX_NODES", "3")

# Integer.parse/1 is used instead of String.to_integer/1 so that non-numeric input
# (e.g. "abc" or "3.5") produces the same descriptive error as an out-of-range value,
# mirroring the validation of the ratio above.
cache_gossip_max_nodes =
  case Integer.parse(raw_cache_gossip_max_nodes) do
    {max_nodes, ""} when max_nodes > 0 ->
      max_nodes

    _ ->
      raise ArgumentError,
            "Invalid LOGFLARE_CACHE_GOSSIP_MAX_NODES: #{raw_cache_gossip_max_nodes}. Must be a positive integer."
  end

config :logflare, :context_cache_gossip, %{
  enabled: cache_gossip_enabled?,
  ratio: cache_gossip_ratio,
  max_nodes: cache_gossip_max_nodes
}
17 changes: 17 additions & 0 deletions lib/logflare/cluster/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,23 @@ defmodule Logflare.Cluster.Utils do
[Node.self() | Node.list()]
end

@doc """
Returns a random subset of the current cluster peers.
Does not include the local node.
"""
@spec peer_list_partial(float, pos_integer) :: [Node.t()]
def peer_list_partial(ratio, max_nodes) do
  case Node.list() do
    # no connected peers, nothing to sample from
    [] ->
      []

    peers ->
      # take `ratio` of the peers (rounded up), capped at `max_nodes`
      sample_size =
        peers
        |> length()
        |> Kernel.*(ratio)
        |> ceil()
        |> min(max_nodes)

      Enum.take_random(peers, sample_size)
  end
end

@spec cluster_size() :: non_neg_integer()
def cluster_size do
max(actual_cluster_size(), min_cluster_size())
Expand Down
9 changes: 9 additions & 0 deletions lib/logflare/context_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,16 @@ defmodule Logflare.ContextCache do

In the case functions don't return a response with a primary key, or something else we can
bust the cache on, it will get reverse indexed with `select_key/1` as `:unknown`.

## Gossip

Cache misses are optionally multicast to peer nodes via `:erpc` to warm the cluster.
To prevent race conditions, WAL invalidations write short-lived tombstones that
filter out stale incoming messages.
"""

alias Logflare.ContextCache.Gossip

@doc """
Optional callback implementing custom cache key busting by a keyword of values
"""
Expand Down Expand Up @@ -134,6 +142,7 @@ defmodule Logflare.ContextCache do
{:commit, {:cached, getter_fn.()}}
end) do
{:commit, {:cached, value}} ->
Gossip.multicast(cache, cache_key, value)
value

{:ok, {:cached, value}} ->
Expand Down
1 change: 1 addition & 0 deletions lib/logflare/context_cache/cache_buster_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ defmodule Logflare.ContextCache.CacheBusterWorker do

@impl true
def handle_cast({:to_bust, context_pkeys}, state) do
ContextCache.Gossip.record_tombstones(context_pkeys)
ContextCache.bust_keys(context_pkeys)

for record <- context_pkeys do
Expand Down
188 changes: 188 additions & 0 deletions lib/logflare/context_cache/gossip.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
defmodule Logflare.ContextCache.Gossip do
@moduledoc false

require Logger
require Cachex.Spec

alias Logflare.Cluster.Utils, as: ClusterUtils
alias Logflare.ContextCache
alias Logflare.ContextCache.Tombstones

@telemetry_handler_id "context-cache-gossip-logger"

@doc """
Attaches a telemetry handler that logs gossip decisions and dropped multicasts.

Intended as a troubleshooting aid (or for checking in with a node): the emitted logs
are probably too noisy to keep attached full-time. Use `detach_logger/0` to remove it.
"""
def attach_logger do
  :telemetry.attach_many(
    @telemetry_handler_id,
    [
      [:logflare, :context_cache_gossip, :multicast, :stop],
      [:logflare, :context_cache_gossip, :receive, :stop]
    ],
    &__MODULE__.handle_telemetry_event/4,
    []
  )
end

@doc """
Detaches the telemetry handler registered by `attach_logger/0`.
"""
def detach_logger, do: :telemetry.detach(@telemetry_handler_id)

@doc false
# Telemetry callback used by `attach_logger/0`; dispatches on the span's stop event.
def handle_telemetry_event(event, measurements, metadata, _config) do
  case event do
    [:logflare, :context_cache_gossip, :multicast, :stop] ->
      log_multicast_stop(measurements, metadata)

    [:logflare, :context_cache_gossip, :receive, :stop] ->
      log_receive_stop(measurements, metadata)
  end
end

# Logs the outcome of an outgoing multicast at notice level.
defp log_multicast_stop(measurements, metadata) do
  %{action: action, cache: cache, key: key} = metadata
  elapsed_ms = System.convert_time_unit(measurements.duration, :native, :millisecond)

  message =
    case action do
      :done ->
        "Multicasted gossip for #{cache} #{inspect(key)} to peer nodes in #{elapsed_ms}ms"

      :disabled ->
        "Context cache gossip is disabled, skipping multicast for #{cache} #{inspect(key)} in #{elapsed_ms}ms"

      :ignore ->
        "Skipped gossip for #{cache} #{inspect(key)} in #{elapsed_ms}ms"
    end

  Logger.notice(message)
end

# Logs the outcome of an incoming gossip message: drops are warnings, writes are notices.
defp log_receive_stop(measurements, metadata) do
  %{action: action, cache: cache, key: key} = metadata
  elapsed_ms = System.convert_time_unit(measurements.duration, :native, :millisecond)

  case action do
    :dropped_no_pkey ->
      Logger.warning("""
      Dropped gossip for #{cache} #{inspect(key)} in #{elapsed_ms}ms: no primary keys \
      could be extracted from the value, so staleness cannot be determined\
      """)

    :dropped_stale ->
      Logger.warning("""
      Dropped gossip for #{cache} #{inspect(key)} in #{elapsed_ms}ms: tombstone cache indicates \
      this record was recently updated or deleted, so the incoming gossip is likely stale\
      """)

    :cached ->
      Logger.notice("Cached gossip for #{cache} #{inspect(key)} in #{elapsed_ms}ms")

    :refreshed ->
      Logger.notice("Refreshed gossip for #{cache} #{inspect(key)} in #{elapsed_ms}ms")
  end
end

@doc """
Gossips `value` stored under `key` in `cache` to a subset of peer nodes.

`cache` may be either a Cachex cache record or the cache name atom. The work runs inside a
`[:logflare, :context_cache_gossip, :multicast]` telemetry span whose stop metadata carries
the resulting `:action` (`:done`, `:disabled` or `:ignore`).
"""
def multicast(Cachex.Spec.cache(name: cache), key, value) do
  # unwrap the cache record and continue with the plain cache name
  multicast(cache, key, value)
end

def multicast(cache, key, value) when is_atom(cache) do
  span_meta = %{cache: cache, key: key, value: value}

  :telemetry.span([:logflare, :context_cache_gossip, :multicast], span_meta, fn ->
    {:ok, Map.put(span_meta, :action, do_multicast(cache, key, value))}
  end)
end

# Negative lookups (`nil` or `[]`) are not gossiped. If Node A caches `nil`,
# and the record is immediately created, a delayed `nil` cast to Node B
# would cause phantom "not found" lookups while the record actually exists in the database.
defp do_multicast(_cache, _key, value) when value in [nil, []], do: :ignore

# Caches that are never gossiped:
#   * Logflare.Logs.LogEvents.Cache - high-volume/ephemeral
#   * Logflare.Auth.Cache, Logflare.Rules.Cache - complicated or missing primary key
#     structures, where staleness cannot be reliably detected
defp do_multicast(cache, _key, _value)
     when cache in [Logflare.Logs.LogEvents.Cache, Logflare.Auth.Cache, Logflare.Rules.Cache],
     do: :ignore

# Sends the entry to a random subset of peers when gossip is enabled in the app config.
defp do_multicast(cache, key, value) when is_atom(cache) do
  gossip_config = Application.fetch_env!(:logflare, :context_cache_gossip)
  %{enabled: enabled, ratio: ratio, max_nodes: max_nodes} = gossip_config

  if enabled do
    ratio
    |> ClusterUtils.peer_list_partial(max_nodes)
    |> :erpc.multicast(__MODULE__, :receive, [cache, key, value])

    :done
  else
    :disabled
  end
end

@doc false
# Entry point invoked on peer nodes by `multicast/3` via `:erpc.multicast`.
# Wraps the handling in a `[:logflare, :context_cache_gossip, :receive]` telemetry span and
# records the resulting action in the stop metadata.
def receive(cache, key, value) do
  span_meta = %{cache: cache, key: key, value: value}

  :telemetry.span([:logflare, :context_cache_gossip, :receive], span_meta, fn ->
    outcome = do_receive(cache, key, value)
    {:ok, Map.put(span_meta, :action, outcome)}
  end)
end

# Decides what to do with a gossiped entry and returns the action taken:
# `:refreshed`, `:dropped_no_pkey`, `:dropped_stale` or `:cached`.
defp do_receive(cache, key, value) do
  case Cachex.exists?(cache, key) do
    {:ok, true} ->
      # refresh if the node already has this cache key
      Cachex.refresh(cache, key)
      :refreshed

    _absent_or_error ->
      store_unless_stale(cache, key, value)
  end
end

# Writes the gossiped value into the local cache unless its staleness cannot be ruled out.
defp store_unless_stale(cache, key, value) do
  case pkeys_from_cached_value(value) do
    # if we can't extract any primary keys from the cache key/value,
    # we have no way to detect staleness, so we drop it to be safe
    [] ->
      :dropped_no_pkey

    pkeys ->
      if Enum.any?(pkeys, &Tombstones.Cache.tombstoned?(cache, &1)) do
        # do nothing if the WAL recently busted this specific record
        :dropped_stale
      else
        Cachex.put(cache, key, {:cached, value})
        :cached
      end
  end
end

# Extracts the primary keys carried by a cached value so they can be checked against
# the tombstone cache. Handles lists of values, `{:ok, value}` tuples and maps/structs
# with an `:id` key; anything else yields no primary keys.
defp pkeys_from_cached_value(values) when is_list(values) do
  Enum.flat_map(values, &pkeys_from_cached_value/1)
end

defp pkeys_from_cached_value({:ok, value}), do: pkeys_from_cached_value(value)

# Only integer or binary ids count as primary keys: these are the only shapes that
# `format_busted_pkey/1` ever writes as tombstones. A `nil` (or otherwise shaped) id could
# never match a tombstone, so staleness would be undetectable; returning no key lets the
# caller drop such values instead of caching them unchecked.
defp pkeys_from_cached_value(%{id: id}) when is_integer(id) or is_binary(id), do: [id]
defp pkeys_from_cached_value(_value), do: []

@doc """
Writes a short-lived marker for each busted primary key, indicating that the record was
recently updated or deleted.

Incoming cache multicasts check this tombstone cache to determine if their payload could
be stale. Expects a list of `{context, pkey}` tuples.
"""
def record_tombstones(context_pkeys) when is_list(context_pkeys) do
  Enum.each(context_pkeys, &record_tombstone/1)
end

# don't need to tombstone new records
defp record_tombstone({_context, :not_found}), do: :ignore

# Tombstones a single busted record, skipping entries whose primary key cannot be normalized.
defp record_tombstone({context, pkey}) do
  case format_busted_pkey(pkey) do
    nil ->
      nil

    busted_pkey ->
      context
      |> ContextCache.cache_name()
      |> Tombstones.Cache.put_tombstone(busted_pkey)
  end
end

# Normalizes a busted primary key to a plain integer/binary id.
# Keyword lists and maps are unwrapped through their `:id` entry;
# anything that does not resolve to an integer or binary yields `nil`.
defp format_busted_pkey(id) when is_integer(id) or is_binary(id), do: id

defp format_busted_pkey(keyword) when is_list(keyword) do
  format_busted_pkey(Keyword.get(keyword, :id))
end

defp format_busted_pkey(%{id: nested_id}), do: format_busted_pkey(nested_id)
defp format_busted_pkey(_unsupported), do: nil
end
37 changes: 24 additions & 13 deletions lib/logflare/context_cache/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Logflare.ContextCache.Supervisor do
alias Logflare.Backends
alias Logflare.ContextCache.CacheBuster
alias Logflare.ContextCache.CacheBusterWorker
alias Logflare.ContextCache.Tombstones
alias Logflare.Billing
alias Logflare.ContextCache
alias Logflare.Backends
Expand Down Expand Up @@ -33,19 +34,28 @@ defmodule Logflare.ContextCache.Supervisor do
|> Supervisor.init(strategy: :one_for_one)
end

defp get_children(:test),
do:
list_caches() ++
[
{GenSingleton, child_spec: cainophile_child_spec()}
]

defp get_children(_env) do
list_caches() ++
[
ContextCache.TransactionBroadcaster,
defp get_children(env) do
caches = list_caches()

maybe_transaction_broadcaster =
if env != :test do
ContextCache.TransactionBroadcaster
end

maybe_cainophile =
if Application.get_env(:logflare, :enable_cainophile, true) do
{GenSingleton, child_spec: cainophile_child_spec()}
] ++ buster_specs()
end

maybe_busters =
if env != :test do
buster_specs()
end

caches ++
List.wrap(maybe_transaction_broadcaster) ++
List.wrap(maybe_cainophile) ++
List.wrap(maybe_busters)
end

def buster_specs do
Expand All @@ -68,7 +78,8 @@ defmodule Logflare.ContextCache.Supervisor do
{Endpoints.Cache, :endpoints},
{Rules.Cache, :rules},
{KeyValues.Cache, :key_values},
{SavedSearches.Cache, :saved_searches}
{SavedSearches.Cache, :saved_searches},
{Tombstones.Cache, :context_cache_tombstones}
]
end

Expand Down
35 changes: 35 additions & 0 deletions lib/logflare/context_cache/tombstones/cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
defmodule Logflare.ContextCache.Tombstones.Cache do
  @moduledoc false
  require Cachex.Spec

  @name __MODULE__

  # Child spec for the Cachex instance that stores tombstones.
  # Expiration is configured with a 1 minute default, a 30 second interval and `lazy: true`.
  # The Cachex.Stats hook is only attached when `:cache_stats` is enabled for :logflare.
  def child_spec(_options) do
    stats_hooks =
      if Application.get_env(:logflare, :cache_stats, false) do
        [Cachex.Spec.hook(module: Cachex.Stats)]
      else
        []
      end

    cachex_options = [
      expiration:
        Cachex.Spec.expiration(
          interval: to_timeout(second: 30),
          default: to_timeout(minute: 1),
          lazy: true
        ),
      hooks: stats_hooks
    ]

    Supervisor.child_spec({Cachex, [@name, cachex_options]}, id: @name)
  end

  # Stores a marker for `tombstone` scoped to `cache`; the stored value itself is just `true`.
  def put_tombstone(cache, tombstone), do: Cachex.put(@name, {cache, tombstone}, true)

  # Returns `true` only when a marker for `{cache, tombstone}` is currently present;
  # any other result from Cachex (including errors) counts as `false`.
  def tombstoned?(cache, tombstone) do
    case Cachex.exists?(@name, {cache, tombstone}) do
      {:ok, true} -> true
      _other -> false
    end
  end
end
Loading
Loading