-
Notifications
You must be signed in to change notification settings - Fork 81
feat: add partial broadcast for cached values #3218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b8c1ed6
e19b258
1b5797a
b2331ac
da0a25b
0370555
1f11002
91b6a19
359b522
3079a9b
a44cb6a
7c4d5bd
7e597bf
425b8f9
3306acf
5960cb5
ecf5931
44d7629
79c2373
00bd922
fd0d40c
97e956d
e62958e
829a725
43cbc6b
4d2f286
99e2899
5670b6b
a24f90c
c7200dc
6cec3ca
327b816
e510a94
3695f9c
a088f28
2f11b47
748cbf5
a04cf8b
0dc0415
dfb9a30
5cbc731
96ed7e6
6eb7e37
556ac10
dd9e5e8
5550242
3644f03
41e3d7a
35687d3
21d15ff
c6842ed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,188 @@ | ||
| defmodule Logflare.ContextCache.Gossip do | ||
| @moduledoc false | ||
|
|
||
| require Logger | ||
| require Cachex.Spec | ||
|
|
||
| alias Logflare.Cluster.Utils, as: ClusterUtils | ||
| alias Logflare.ContextCache | ||
| alias Logflare.ContextCache.Tombstones | ||
|
|
||
| @telemetry_handler_id "context-cache-gossip-logger" | ||
|
|
||
| # Optional diagnostic logger: attach it to get visibility into gossip decisions and dropped multicasts. | ||
| # Registers handle_telemetry_event/4 under @telemetry_handler_id for the :stop events of the | ||
| # multicast and receive telemetry spans, with an empty handler config. | ||
| # Returns whatever :telemetry.attach_many/4 returns. | ||
| def attach_logger do | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can aid troubleshooting or just checking in with the node. The logs I'm adding there are probably too noisy to be on full-time. These functions can be removed later. |
||
| # only the :stop events are subscribed to; they carry the span duration and the final metadata | ||
| events = [ | ||
| [:logflare, :context_cache_gossip, :multicast, :stop], | ||
| [:logflare, :context_cache_gossip, :receive, :stop] | ||
| ] | ||
|
|
||
| :telemetry.attach_many( | ||
| @telemetry_handler_id, | ||
| events, | ||
| &__MODULE__.handle_telemetry_event/4, | ||
| _no_config = [] | ||
| ) | ||
| end | ||
|
|
||
| # Removes the telemetry handler registered under @telemetry_handler_id (the id used by attach_logger/0). | ||
| # Returns whatever :telemetry.detach/1 returns. | ||
| def detach_logger do | ||
| :telemetry.detach(@telemetry_handler_id) | ||
| end | ||
|
|
||
| # Telemetry handler installed by attach_logger/0: turns gossip span :stop events into log lines. | ||
| # `measurements.duration` is converted from native time units to milliseconds and the `:action` | ||
| # entry of the metadata selects the message. Neither `case` below has a catch-all clause, so an | ||
| # event or action that is not listed raises a CaseClauseError. | ||
| @doc false | ||
| def handle_telemetry_event(event, measurements, metadata, _config) do | ||
| case event do | ||
| # sender side: :action is the atom returned by do_multicast/3 (:done, :disabled or :ignore) | ||
| [:logflare, :context_cache_gossip, :multicast, :stop] -> | ||
| %{action: action, cache: cache, key: key} = metadata | ||
|
|
||
| duration = System.convert_time_unit(measurements.duration, :native, :millisecond) | ||
|
|
||
| msg = | ||
| case action do | ||
| :done -> | ||
| "Multicasted gossip for #{cache} #{inspect(key)} to peer nodes in #{duration}ms" | ||
|
|
||
| :disabled -> | ||
| "Context cache gossip is disabled, skipping multicast for #{cache} #{inspect(key)} in #{duration}ms" | ||
|
|
||
| :ignore -> | ||
| "Skipped gossip for #{cache} #{inspect(key)} in #{duration}ms" | ||
| end | ||
|
|
||
| # every sender-side outcome is logged at :notice level | ||
| Logger.notice(msg) | ||
|
|
||
| # receiver side: :action is the atom returned by do_receive/3 | ||
| [:logflare, :context_cache_gossip, :receive, :stop] -> | ||
| %{action: action, cache: cache, key: key} = metadata | ||
|
|
||
| duration = System.convert_time_unit(measurements.duration, :native, :millisecond) | ||
|
|
||
| # dropped gossip is logged as a warning, accepted gossip as a notice | ||
| case action do | ||
| :dropped_no_pkey -> | ||
| Logger.warning(""" | ||
| Dropped gossip for #{cache} #{inspect(key)} in #{duration}ms: no primary keys \ | ||
| could be extracted from the value, so staleness cannot be determined\ | ||
| """) | ||
|
|
||
| :dropped_stale -> | ||
| Logger.warning(""" | ||
| Dropped gossip for #{cache} #{inspect(key)} in #{duration}ms: tombstone cache indicates \ | ||
| this record was recently updated or deleted, so the incoming gossip is likely stale\ | ||
| """) | ||
|
|
||
| :cached -> | ||
| Logger.notice("Cached gossip for #{cache} #{inspect(key)} in #{duration}ms") | ||
|
|
||
| :refreshed -> | ||
| Logger.notice("Refreshed gossip for #{cache} #{inspect(key)} in #{duration}ms") | ||
| end | ||
| end | ||
| end | ||
|
|
||
| # Gossips a cached key/value pair to peer nodes. | ||
| # This clause accepts a Cachex cache record, pulls the cache name out of it and delegates | ||
| # to the atom-named clause below. | ||
| def multicast(Cachex.Spec.cache(name: cache), key, value) do | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. needs @doc
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added in #3375 |
||
| multicast(cache, key, value) | ||
| end | ||
|
|
||
| # Runs do_multicast/3 inside a [:logflare, :context_cache_gossip, :multicast] telemetry span. | ||
| # The start metadata carries cache, key and value; the stop metadata additionally carries the | ||
| # :action atom returned by do_multicast/3. The span callback yields :ok as its result. | ||
| def multicast(cache, key, value) when is_atom(cache) do | ||
| meta = %{cache: cache, key: key, value: value} | ||
|
|
||
| :telemetry.span([:logflare, :context_cache_gossip, :multicast], meta, fn -> | ||
| action = do_multicast(cache, key, value) | ||
| {:ok, Map.put(meta, :action, action)} | ||
| end) | ||
| end | ||
|
|
||
| # Decides whether a value is gossiped and returns the decision as an atom: | ||
| # :ignore (never gossiped), :disabled (gossip switched off in config) or :done (cast sent). | ||
| # Clause order matters: the value-based and cache-based exclusions are tried before the general clause. | ||
| # | ||
| # Negative lookups (`nil` or `[]`) are not gossiped. If Node A caches `nil`, | ||
| # and the record is immediately created, a delayed `nil` cast to Node B | ||
| # would cause phantom "not found" lookups while the record actually exists in the database. | ||
| defp do_multicast(_cache, _key, nil), do: :ignore | ||
| defp do_multicast(_cache, _key, []), do: :ignore | ||
|
|
||
| # Explicitly ignore high-volume/ephemeral caches | ||
| defp do_multicast(Logflare.Logs.LogEvents.Cache, _key, _value), do: :ignore | ||
|
|
||
| # Ignore caches with complicated or missing primary key structures, where staleness cannot be reliably detected | ||
| defp do_multicast(Logflare.Auth.Cache, _key, _value), do: :ignore | ||
| defp do_multicast(Logflare.Rules.Cache, _key, _value), do: :ignore | ||
|
|
||
| # General case: every other atom-named cache is gossiped, subject to runtime config. | ||
| # The :context_cache_gossip env must be a map with :enabled, :ratio and :max_nodes; | ||
| # fetch_env!/2 raises when it is unset and the match raises when a key is missing. | ||
| defp do_multicast(cache, key, value) when is_atom(cache) do | ||
| %{enabled: enabled, ratio: ratio, max_nodes: max_nodes} = | ||
| Application.fetch_env!(:logflare, :context_cache_gossip) | ||
|
|
||
| if enabled do | ||
| # NOTE(review): presumably a subset of peers bounded by ratio and max_nodes - confirm in ClusterUtils | ||
| peers = ClusterUtils.peer_list_partial(ratio, max_nodes) | ||
| # invokes receive/3 of this module on each peer; the result of the multicast is not inspected | ||
| :erpc.multicast(peers, __MODULE__, :receive, [cache, key, value]) | ||
| :done | ||
| else | ||
| :disabled | ||
| end | ||
| end | ||
|
|
||
| # Entry point invoked on peer nodes through the :erpc.multicast call in do_multicast/3. | ||
| # Runs do_receive/3 inside a [:logflare, :context_cache_gossip, :receive] telemetry span; | ||
| # the stop metadata carries cache, key, value and the :action atom returned by do_receive/3. | ||
| @doc false | ||
| def receive(cache, key, value) do | ||
| meta = %{cache: cache, key: key, value: value} | ||
|
|
||
| :telemetry.span([:logflare, :context_cache_gossip, :receive], meta, fn -> | ||
| action = do_receive(cache, key, value) | ||
| {:ok, Map.put(meta, :action, action)} | ||
| end) | ||
| end | ||
|
|
||
| # Applies an incoming gossip message to the local cache and returns what was done: | ||
| # :refreshed, :dropped_no_pkey, :dropped_stale or :cached. | ||
| defp do_receive(cache, key, value) do | ||
| if Cachex.exists?(cache, key) == {:ok, true} do | ||
| # refresh if the node already has this cache key | ||
| # (the incoming value is not written in this branch; the local entry is kept) | ||
| Cachex.refresh(cache, key) | ||
| :refreshed | ||
| else | ||
| pkeys = pkeys_from_cached_value(value) | ||
|
|
||
| # the checks run in order: missing pkeys first, then tombstones, then the write | ||
| cond do | ||
| # if we can't extract any primary keys from the cache key/value, | ||
| # we have no way to detect staleness, so we drop it to be safe | ||
| pkeys == [] -> | ||
| :dropped_no_pkey | ||
|
|
||
| # do nothing if the WAL recently busted this specific record | ||
| # (a single tombstoned pkey is enough to drop the whole value) | ||
| Enum.any?(pkeys, fn pkey -> Tombstones.Cache.tombstoned?(cache, pkey) end) -> | ||
| :dropped_stale | ||
|
|
||
| true -> | ||
| # the value is stored wrapped in a {:cached, value} tuple | ||
| Cachex.put(cache, key, {:cached, value}) | ||
| :cached | ||
| end | ||
| end | ||
| end | ||
|
|
||
| # Collects the primary keys found in a cached value, as a flat list. | ||
| # Lists are traversed element by element and the results concatenated. | ||
| defp pkeys_from_cached_value(values) when is_list(values) do | ||
| Enum.flat_map(values, &pkeys_from_cached_value/1) | ||
| end | ||
|
|
||
| # {:ok, value} tuples are unwrapped; any map or struct with an :id key contributes that id. | ||
| # NOTE(review): the id is not type-checked here, so %{id: nil} yields [nil] - confirm that is intended. | ||
| # Every other shape contributes nothing. | ||
| defp pkeys_from_cached_value({:ok, value}), do: pkeys_from_cached_value(value) | ||
| defp pkeys_from_cached_value(%{id: id}), do: [id] | ||
| defp pkeys_from_cached_value(_value), do: [] | ||
|
|
||
| # Takes a list of {context, pkey} tuples and writes one tombstone per usable primary key. | ||
| # Enum.each/2 is used, so the function returns :ok. | ||
| @doc false | ||
| def record_tombstones(context_pkeys) when is_list(context_pkeys) do | ||
| # Writes a short-lived marker for a primary key indicating it was recently updated or deleted. | ||
| # Incoming cache multicasts check this tombstone cache to determine if their payload could be stale. | ||
|
Comment on lines
+163
to
+166
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. inline docs should be
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved in #3375 |
||
| Enum.each(context_pkeys, fn | ||
| # don't need to tombstone new records | ||
| {_context, :not_found} -> | ||
| :ignore | ||
|
|
||
| {context, pkey} -> | ||
| # format_busted_pkey/1 returns nil for shapes it cannot reduce to an integer or binary id; | ||
| # those entries are skipped | ||
| if pkey = format_busted_pkey(pkey) do | ||
| # the tombstone is stored per cache name, not per context | ||
| cache = ContextCache.cache_name(context) | ||
| Tombstones.Cache.put_tombstone(cache, pkey) | ||
| end | ||
| end) | ||
| end | ||
|
|
||
| # Normalises a busted primary key to a plain integer or binary id, or nil when none can be found. | ||
| # Integers and binaries are already in the final form and are returned unchanged. | ||
| defp format_busted_pkey(pkey) when is_integer(pkey) or is_binary(pkey), do: pkey | ||
|
|
||
| # Lists are treated as keyword lists: the :id entry is looked up and normalised recursively. | ||
| # A missing :id gives nil, which falls through to the catch-all clause. | ||
| defp format_busted_pkey(info) when is_list(info) do | ||
| info |> Keyword.get(:id) |> format_busted_pkey() | ||
| end | ||
|
|
||
| # Maps and structs with an :id key are reduced to that id; everything else is not a usable pkey. | ||
| defp format_busted_pkey(%{id: id}), do: format_busted_pkey(id) | ||
| defp format_busted_pkey(_), do: nil | ||
| end | ||
| end | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| defmodule Logflare.ContextCache.Tombstones.Cache do | ||
| # Cachex-backed store of short-lived tombstone markers, keyed by {cache, tombstone}. | ||
| # The Cachex instance is registered under this module's name. | ||
| @moduledoc false | ||
| require Cachex.Spec | ||
|
|
||
| @name __MODULE__ | ||
|
|
||
| # Builds the supervisor child spec for the tombstone cache; the argument is ignored. | ||
| def child_spec(_options) do | ||
| # Cachex expiration options: `default` is the TTL applied to entries (1 minute), | ||
| # `interval` is the period of the background cleanup (30 seconds), and `lazy: true` | ||
| # also checks expiry when an entry is read - see the Cachex expiration docs. | ||
| expiration = | ||
| Cachex.Spec.expiration( | ||
| interval: to_timeout(second: 30), | ||
| default: to_timeout(minute: 1), | ||
| lazy: true | ||
| ) | ||
|
|
||
| # the stats hook is only installed when :cache_stats is enabled; otherwise this `if` yields nil | ||
| hooks = | ||
| if Application.get_env(:logflare, :cache_stats, false) do | ||
| [Cachex.Spec.hook(module: Cachex.Stats)] | ||
| end | ||
|
|
||
| # List.wrap/1 turns the nil from the disabled case into an empty hook list | ||
| options = [ | ||
| expiration: expiration, | ||
| hooks: List.wrap(hooks) | ||
| ] | ||
|
|
||
| Supervisor.child_spec({Cachex, [@name, options]}, id: @name) | ||
| end | ||
|
|
||
| # Records a tombstone for `tombstone` scoped to `cache`. The stored value is just `true`; | ||
| # only the presence of the key matters. Returns the result of Cachex.put/3. | ||
| def put_tombstone(cache, tombstone) do | ||
| Cachex.put(@name, {cache, tombstone}, true) | ||
| end | ||
|
|
||
| # Returns true only when Cachex reports the {cache, tombstone} key as present; | ||
| # any other result from Cachex.exists?/2 counts as false. | ||
| def tombstoned?(cache, tombstone) do | ||
| Cachex.exists?(@name, {cache, tombstone}) == {:ok, true} | ||
| end | ||
| end |
Uh oh!
There was an error while loading. Please reload this page.