feat: add partial broadcast for cached values #3218
ruslandoga wants to merge 3 commits into Logflare:main
Conversation
lib/logflare/context_cache.ex
Outdated
```elixir
end

defp maybe_broadcast_cached(cache, cache_key, value) do
  Logflare.Utils.Tasks.start_child(fn ->
```
Thinking that instead of creating a task for each broadcast, we can have a dedicated GenServer for this, to avoid the overhead of ephemeral task procs.
I can add a benchmark to compare the two, but I'd rather avoid a single process handling cross-node broadcasts. AFAIK :erpc.multicast still awaits the spawn_request reply, so if one of the node connections is slow, it could potentially block all cache broadcasts from sending and grow the process's message queue.
I looked into it a bit more and it seems like Tasks are unnecessary, since multicast uses spawn_request with reply=no, which means it should return immediately.
```elixir
# since node=nil doesn't exist, it probably means we are not waiting for `send/2` to finish
iex> :erlang.spawn_request(_bad_node_name = nil, :erpc, :execute_cast, [Function, :identity, [1]], reply: :no)
#==> #Reference<0.2294785123.1844445189.249014>

# and it's pretty much instant
iex> :timer.tc fn -> :erlang.spawn_request(_bad_node_name = nil, :erpc, :execute_cast, [Function, :identity, [1]], reply: :no) end
#==> {12, #Reference<0.2294785123.1844445189.249084>}
```
Side-note: maybe we can add global cache broadcast rate limiting similar to https://github.com/getsentry/sentry-elixir/blob/master/lib/sentry/logger_handler/rate_limiter.ex to avoid sending out too many messages, just as a precaution.
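Assuming the fire-and-forget conclusion above holds, the Task per broadcast could be dropped in favor of a plain `:erpc.cast/4` loop. This is a hypothetical sketch, not the PR's implementation: the module and function names are illustrative, and a Cachex-backed insert is assumed on the receiving side.

```elixir
defmodule CacheBroadcastSketch do
  # Fire-and-forget broadcast to a subset of peers. :erpc.cast/4 is built on
  # spawn_request with reply: :no, so it returns immediately and never blocks
  # the caller on a slow peer connection.
  def broadcast_cached(peers, cache, cache_key, value) do
    Enum.each(peers, fn node ->
      :erpc.cast(node, __MODULE__, :receive_broadcast, [cache, cache_key, value])
    end)
  end

  # Runs on each receiving node (hypothetical Cachex-backed insert).
  def receive_broadcast(cache, cache_key, value) do
    Cachex.put(cache, cache_key, value)
  end
end
```

Since `:erpc.cast/4` never delivers a result or an error back, any failure on the remote node is silently dropped, which is acceptable for a best-effort cache warm-up.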
Force-pushed 29cd132 to 40d61bc, then 40d61bc to b8c1ed6.
@Ziinc I think it's ready for the initial review!
```elixir
cache_broadcast_ratio =
  "LOGFLARE_CACHE_BROADCAST_RATIO" |> System.get_env("0.1") |> String.to_float()

cache_broadcast_max_nodes =
  "LOGFLARE_CACHE_BROADCAST_MAX_NODES" |> System.get_env("5") |> String.to_integer()
```
I'm not sure how it should be configured. For now I went with "global" env vars that configure all context caches with the same values.
Let's default to 3 nodes. Suggested name: `LOGFLARE_CACHE_GOSSIP_MAX_NODES`
I wonder if we really want to name it gossip. In the current implementation it's not really gossip, it's just a one-hop partial broadcast. And from the discussion on Slack, we don't actually want real gossiping since it would result in more network load due to redundant messaging (at least in my current understanding).
For the initial implementation we're looking at 1 hop, but if network messaging does not go through the roof then multi-hop is ideal, due to the benefits at large cluster sizes.
Having the hop count, hop probability, and the max message throughput as env var configs would be good, so that we can experiment with this.
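A hedged sketch of what those experiment knobs could look like in `config/runtime.exs` — the variable and key names here are hypothetical suggestions, not code from the PR:

```elixir
# Illustrative env-var knobs for experimenting with propagation
# (names are hypothetical, not from the PR):
gossip_hops =
  "LOGFLARE_CACHE_GOSSIP_HOPS" |> System.get_env("1") |> String.to_integer()

gossip_probability =
  "LOGFLARE_CACHE_GOSSIP_PROBABILITY" |> System.get_env("0.05") |> String.to_float()

gossip_max_nodes =
  "LOGFLARE_CACHE_GOSSIP_MAX_NODES" |> System.get_env("3") |> String.to_integer()

config :logflare, :cache_gossip,
  hops: gossip_hops,
  probability: gossip_probability,
  max_nodes: gossip_max_nodes
```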
```elixir
end

@spec peer_list_partial(float, pos_integer) :: [Node.t()]
def peer_list_partial(ratio, max_nodes) do
```
Not sure if this function belongs here or in ContextCache.
Here is good.
Maybe `node_list_random/2` might be a better name (to be similar to `node_list_all/0`).
It's different from node_list_all in that it doesn't list the current node, so I tried to highlight that with "peer" in the name. But I guess it's not that important.
Ah, I see. Would add a @doc on the use case if that is so.
For the multi-hop scenario, perhaps we could partition the node list and send that with the message; then the receiver will know which partitions not to select and broadcast to, avoiding duplication of messages. Wonder if it would be worthwhile 🤔
Just a thought.
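The partitioning idea could be sketched roughly like this (a hypothetical `GossipPartitionSketch` module, not code from the PR): each first-hop node would receive one disjoint partition of the remaining peers to forward to, so no peer is reached by two branches.

```elixir
defmodule GossipPartitionSketch do
  # Split the peer list into at most `fanout` disjoint partitions.
  # The sender broadcasts to the head of each partition and includes the
  # tail in the message, so the receiver only re-broadcasts within its
  # own partition and never duplicates another branch's messages.
  def partitions([], _fanout), do: []

  def partitions(peers, fanout) do
    chunk = max(ceil(length(peers) / fanout), 1)

    peers
    |> Enum.shuffle()
    |> Enum.chunk_every(chunk)
  end
end
```

Every peer appears in exactly one partition, which is what prevents message duplication across hops.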
```elixir
Enum.map(list_caches_with_metrics(), fn {cache, _} -> cache end)
end

defp wal_tombstone_specs do
```
I didn't want to create another module for it since it's not like a normal ContextCache, so I wrote the spec here. This "cache" is used to remember recently deleted records so that they can be dropped from incoming "cache broadcasts", avoiding a race condition: record fetch miss -> lookup -> cache broadcast -> record deleted -> WAL broadcast -> WAL broadcast arrives first -> cache broadcast arrives later (re-caching the deleted record).
A separate ContextCache.WalTombstoneCache would probably be better. We have a few ephemeral caches like these, as you have already identified, so it is fine.
They're differentiated by the .Cache suffix.
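The tombstone guard described above could look something like this on the receiving side — a hypothetical sketch (module, function, and key shapes are illustrative, and Cachex calls are assumed), not the PR's code:

```elixir
defmodule ReceiveBroadcastSketch do
  # Drop an incoming cache broadcast if the key was recently deleted,
  # so a stale broadcast that arrives after the WAL delete cannot
  # re-insert a deleted record.
  def handle_broadcast(tombstones_cache, cache, cache_key, value) do
    case Cachex.exists?(tombstones_cache, {cache, cache_key}) do
      {:ok, true} ->
        :dropped

      _ ->
        Cachex.put(cache, cache_key, value)
        :cached
    end
  end
end
```

Tombstone entries only need a short TTL: long enough to outlive the worst-case gap between the WAL broadcast and a straggling cache broadcast.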
```elixir
  end
end

describe "broadcasts" do
```
I can also add a more realistic test similar to Phoenix PubSub distributed tests where extra nodes are started and connected and "cache broadcast" side-effects are tested on them. Or I can use mocks.
A distributed test harness would be good for this and other broadcasting tests.
I had tried to use LocalCluster before for this but couldn't get a nice testing flow.
would prefer avoiding mocks where possible
```elixir
System.get_env("LOGFLARE_CACHE_BROADCAST_ENABLED", default_cache_broadcast_enabled) == "true"

cache_broadcast_ratio =
  "LOGFLARE_CACHE_BROADCAST_RATIO" |> System.get_env("0.1") |> String.to_float()
```
Let's hardcode this value in the logic and let the user set the max broadcast nodes.
A default of 0.05 would be better (5% of the cluster). So in a 100-node cluster, at most 5 would receive an update (assuming the user sets a high max).
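The suggested defaults combine into a simple fanout calculation — a hypothetical sketch (the module and function are illustrative, not the PR's code):

```elixir
defmodule FanoutSketch do
  # Number of peers to broadcast to: ratio of the peer count, rounded up,
  # capped by max_nodes and by the number of peers actually available.
  def fanout(peer_count, ratio \\ 0.05, max_nodes \\ 3) do
    (peer_count * ratio)
    |> ceil()
    |> min(max_nodes)
    |> min(peer_count)
  end
end

FanoutSketch.fanout(99)           #=> 3 (ceil(4.95) = 5, capped at max_nodes = 3)
FanoutSketch.fanout(99, 0.05, 10) #=> 5
FanoutSketch.fanout(2)            #=> 1
```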
```elixir
# allows excluding heavy caches via: LOGFLARE_CACHE_BROADCAST_EXCLUDE="auth,saved_searches"
excluded_caches =
  System.get_env("LOGFLARE_CACHE_BROADCAST_EXCLUDE", "")
  |> String.split(",", trim: true)
  |> Enum.map(&String.trim/1)
  |> MapSet.new()
```
I don't think having an explicit exclusion is a good approach; it would be too much configuration for what should be a transparent optimization.
```elixir
cache_broadcasts =
  Map.new(known_caches, fn {short_name, name} ->
    config = %{
      ratio: cache_broadcast_ratio,
      max_nodes: cache_broadcast_max_nodes,
      enabled: cache_broadcast_enabled? and not MapSet.member?(excluded_caches, short_name)
    }

    {name, config}
  end)

config :logflare, :cache_broadcasts, cache_broadcasts
```
Storing the ratio and the max_nodes in the app env would be enough; this seems unnecessary. All caches should be broadcast.
```elixir
def start_link(arg), do: Supervisor.start_link(__MODULE__, arg, name: __MODULE__)

context_caches_with_metrics = Logflare.ContextCache.Supervisor.list_caches_with_metrics()
wal_tombstones = Logflare.ContextCache.wal_tombstones_cache_name()
```
A separate module name would make this easier.
```elixir
counter("logflare.context_cache.broadcast.count",
  event_name: "logflare.context_cache.broadcast.stop",
  tags: [:cache, :enabled],
  description: "Total cache broadcast attempts"
),
distribution("logflare.context_cache.broadcast.stop.duration",
  tags: [:cache, :enabled],
  unit: {:native, :millisecond},
  description: "Latency of dispatching the cache broadcast"
),
counter("logflare.context_cache.receive_broadcast.count",
  event_name: "logflare.context_cache.receive_broadcast.stop",
  tags: [:cache, :action],
  description: "Total cache broadcasts received and their outcome (cached or dropped)"
),
distribution("logflare.context_cache.receive_broadcast.stop.duration",
  tags: [:cache, :action],
  unit: {:native, :millisecond},
  description: "Latency of processing an incoming cache broadcast"
```
Should make clear in the event_name that this is for the local gossip mechanism of the ContextCache when fetching nodes, and not necessarily from the node running the TransactionBroadcaster that broadcasts WAL updates.
Clarification is needed due to upcoming changes here, where the TransactionBroadcaster will be broadcasting updated values with the WAL.
@ruslandoga just to clarify, we do want a full gossip protocol, but with safe initial defaults to minimize risk, since this is experimental. We want to make all inputs to the propagation configurable so that we can experiment with it.
ANL-1352