
feat: add partial broadcast for cached values #3218

Open
ruslandoga wants to merge 3 commits into Logflare:main from ruslandoga:rd/cache-partial-broadcast

Conversation

@ruslandoga (Contributor)

ANL-1352

end

defp maybe_broadcast_cached(cache, cache_key, value) do
Logflare.Utils.Tasks.start_child(fn ->
Contributor

thinking that instead of creating a task for each broadcast, we could have a dedicated GenServer for this, to avoid the overhead of ephemeral task processes
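A minimal sketch of the dedicated-GenServer idea being suggested (module and function names are hypothetical, not from the PR; the cross-node dispatch shown is only illustrative):

```elixir
defmodule BroadcastWorker do
  # Illustrative sketch: one long-lived process serializes all cache
  # broadcasts, avoiding a spawned ephemeral Task per broadcast.
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, Keyword.put_new(opts, :name, __MODULE__))
  end

  # Fire-and-forget from the caller's perspective: cast never blocks.
  def broadcast(cache, cache_key, value) do
    GenServer.cast(__MODULE__, {:broadcast, cache, cache_key, value})
  end

  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_cast({:broadcast, cache, cache_key, value}, state) do
    # Hypothetical dispatch: push the value into the same-named cache on
    # every connected node. A slow node connection could still back up
    # this single process -- the concern raised in the reply below.
    for node <- Node.list() do
      :erpc.cast(node, Cachex, :put, [cache, cache_key, value])
    end

    {:noreply, state}
  end
end
```

The trade-off discussed in this thread is exactly the single-process mailbox: every broadcast funnels through one queue, versus one throwaway process per broadcast.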

@ruslandoga (Contributor, Author), Mar 18, 2026

I can add a benchmark to compare the two, but I'd rather avoid a single process handling cross-node broadcasts. AFAIK :erpc.multicast still awaits the spawn_request reply, so if one node's connection is slow, it could block all cache broadcasts from being sent and grow the process's message queue.

I looked into it a bit more, and it seems like Tasks are unnecessary: multicast uses spawn_request with reply: :no, which means it returns immediately.

# since node=nil doesn't exist, it probably means we are not waiting for `send/2` to finish
iex> :erlang.spawn_request(_bad_node_name = nil, :erpc, :execute_cast, [Function, :identity, [1]], reply: :no)
#==> #Reference<0.2294785123.1844445189.249014>

# and it's pretty much instant
iex> :timer.tc fn -> :erlang.spawn_request(_bad_node_name = nil, :erpc, :execute_cast, [Function, :identity, [1]], reply: :no) end
#==> {12, #Reference<0.2294785123.1844445189.249084>}

Side-note: maybe we can add global cache broadcast rate limiting similar to https://github.com/getsentry/sentry-elixir/blob/master/lib/sentry/logger_handler/rate_limiter.ex to avoid sending out too many messages, just as a pre-caution.
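A sketch of what such a global rate limiter could look like: a fixed-window counter similar in spirit to the linked Sentry rate limiter (module name, limits, and API are all hypothetical):

```elixir
defmodule BroadcastRateLimiter do
  # Illustrative sketch of a fixed-window rate limiter for cache broadcasts.
  # Each window allows at most `max_per_window` broadcasts; the counter
  # resets on a timer.
  use GenServer

  @max_per_window 1_000
  @window_ms 1_000

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: Keyword.get(opts, :name, __MODULE__))
  end

  # Returns :ok if a broadcast may be sent in the current window,
  # :rate_limited otherwise.
  def check(server \\ __MODULE__), do: GenServer.call(server, :check)

  @impl true
  def init(opts) do
    max = Keyword.get(opts, :max_per_window, @max_per_window)
    window = Keyword.get(opts, :window_ms, @window_ms)
    Process.send_after(self(), :reset, window)
    {:ok, %{count: 0, max: max, window: window}}
  end

  @impl true
  def handle_call(:check, _from, %{count: count, max: max} = state) when count < max do
    {:reply, :ok, %{state | count: count + 1}}
  end

  def handle_call(:check, _from, state), do: {:reply, :rate_limited, state}

  @impl true
  def handle_info(:reset, state) do
    # Start a fresh window.
    Process.send_after(self(), :reset, state.window)
    {:noreply, %{state | count: 0}}
  end
end
```

Callers would ask `check/1` before dispatching and silently drop the broadcast when rate limited, since the WAL broadcast remains the source of truth.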

@ruslandoga force-pushed the rd/cache-partial-broadcast branch 2 times, most recently from 29cd132 to 40d61bc (March 18, 2026 19:20)
@ruslandoga force-pushed the rd/cache-partial-broadcast branch from 40d61bc to b8c1ed6 (March 18, 2026 19:21)
@ruslandoga marked this pull request as ready for review March 18, 2026 20:00
@ruslandoga (Contributor, Author)

@Ziinc I think it's ready for the initial review!

@ruslandoga requested a review from Ziinc, March 18, 2026 20:00
"LOGFLARE_CACHE_BROADCAST_RATIO" |> System.get_env("0.1") |> String.to_float()

cache_broadcast_max_nodes =
"LOGFLARE_CACHE_BROADCAST_MAX_NODES" |> System.get_env("5") |> String.to_integer()
@ruslandoga (Contributor, Author), Mar 18, 2026

I'm not sure how this should be configured. For now, I went with "global" env vars that configure all context caches with the same values.

Contributor

Let's default to 3 nodes.
LOGFLARE_CACHE_GOSSIP_MAX_NODES

@ruslandoga (Contributor, Author), Mar 20, 2026

I wonder if we really want to name it "gossip". In the current implementation it's not really gossip; it's just a one-hop partial broadcast. And from the discussion on Slack, we don't actually want real gossiping, since it would result in more network load due to redundant messaging (at least in my current understanding).

@Ziinc (Contributor), Mar 23, 2026

For the initial implementation we're looking at 1 hop, but if network messaging does not go through the roof, then multi-hop is ideal due to the benefits at larger cluster sizes.
Having the hop count, hop probability, and max message throughput as env var configs would be good so that we can experiment with this.

end

@spec peer_list_partial(float, pos_integer) :: [Node.t()]
def peer_list_partial(ratio, max_nodes) do
@ruslandoga (Contributor, Author)

Not sure if this function belongs here or in ContextCache.
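For context, a minimal sketch of what a peer_list_partial/2 implementation could look like (this is not the PR's actual body; it assumes the function samples from Node.list/0, which already excludes the local node):

```elixir
defmodule PeerSampling do
  # Illustrative sketch: pick a random subset of connected peers, sized as
  # `ratio` of the peer count and capped at `max_nodes`. Node.list/0
  # excludes the local node, hence "peer" in the name.
  def peer_list_partial(ratio, max_nodes, peers \\ Node.list()) do
    count =
      (length(peers) * ratio)
      |> ceil()
      |> min(max_nodes)

    # take_random/2 returns a random subset without duplicates.
    Enum.take_random(peers, count)
  end
end
```

With ratio 0.1 and max_nodes 5 (the defaults shown above), a 30-peer cluster would broadcast to 3 random peers, and a 100-peer cluster would cap out at 5.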

Contributor

Here is good.
Maybe node_list_random/2 might be a better name (to be similar to node_list_all/0).

@ruslandoga (Contributor, Author)

It differs from node_list_all/0 in that it doesn't list the current node itself, so I tried to highlight that with "peer" in the name. But I guess it's not that important.

Contributor

Ah, I see; I would add a @doc on the use case if that is so.

For the multi-hop scenario, perhaps we could partition the node list and send that with the message; then the receiver will know which partitions not to select and broadcast to, to avoid duplicating messages. Wonder if it would be worthwhile 🤔
Just a thought.
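One way the partitioning idea could be sketched (hypothetical names; not from the PR): the sender splits its remaining peers into one partition per first-hop recipient, so second-hop relays cannot overlap.

```elixir
defmodule GossipPartition do
  # Illustrative sketch: given the full peer list and a fan-out, pick the
  # first-hop recipients and assign each one a disjoint partition of the
  # remaining peers to relay to. Disjoint partitions mean no node receives
  # the message twice via the second hop.
  def partitions(peers, fanout) do
    {first_hop, rest} = Enum.split(Enum.shuffle(peers), fanout)

    # Round-robin the remaining peers into `fanout` disjoint groups.
    groups =
      rest
      |> Enum.with_index()
      |> Enum.group_by(fn {_node, i} -> rem(i, max(fanout, 1)) end, fn {node, _i} -> node end)

    first_hop
    |> Enum.with_index()
    |> Enum.map(fn {node, i} -> {node, Map.get(groups, i, [])} end)
  end
end
```

Each `{recipient, partition}` pair would be sent as one message, with the partition embedded so the recipient relays only within it.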

Enum.map(list_caches_with_metrics(), fn {cache, _} -> cache end)
end

defp wal_tombstone_specs do
@ruslandoga (Contributor, Author), Mar 18, 2026

I didn't want to create another module for it since it's not like a normal ContextCache, so I wrote the spec here. This "cache" remembers recently deleted records so that they can be dropped from incoming "cache broadcasts", avoiding a race condition: record fetch miss -> lookup -> cache broadcast sent -> record deleted -> WAL broadcast sent -> WAL broadcast arrives first -> cache broadcast arrives later and would re-insert the deleted record.
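The receiving side of that race could look roughly like this (a sketch with hypothetical names, using plain ETS in place of the PR's actual cache library so it is self-contained; in practice tombstone entries would carry a TTL):

```elixir
defmodule TombstoneGuard do
  # Illustrative sketch of the WAL-tombstone check: before applying an
  # incoming cache broadcast, look for a recent deletion of the same key.
  # A tombstone means the broadcast is stale and must be dropped.
  def new, do: :ets.new(:wal_tombstones, [:set, :public])

  # Called when a WAL delete arrives: remember the deleted key.
  def record_deletion(tombstones, cache, cache_key) do
    :ets.insert(tombstones, {{cache, cache_key}, :deleted})
    :ok
  end

  # Called when a cache broadcast arrives. `store` stands in for the
  # receiving cache (a map here, for illustration).
  def apply_broadcast(tombstones, store, cache, cache_key, value) do
    if :ets.member(tombstones, {cache, cache_key}) do
      {:dropped, store}
    else
      {:cached, Map.put(store, {cache, cache_key}, value)}
    end
  end
end
```

The key point of the design: the WAL remains authoritative for deletions, and cache broadcasts are only an optimistic fill that must lose any race against a delete.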

Contributor

A separate ContextCache.WalTombstoneCache would probably be better. We have a few ephemeral caches like these, as you have already identified, so it is fine.
They're differentiated by the .Cache suffix.

end
end

describe "broadcasts" do
@ruslandoga (Contributor, Author), Mar 18, 2026

I can also add a more realistic test, similar to the Phoenix PubSub distributed tests, where extra nodes are started and connected and the "cache broadcast" side effects are tested on them. Or I can use mocks.

Contributor

A distributed test harness would be good for this and other broadcasting tests.
I had tried to use LocalCluster for this before but couldn't get a nice testing flow.

Contributor

would prefer avoiding mocks where possible

System.get_env("LOGFLARE_CACHE_BROADCAST_ENABLED", default_cache_broadcast_enabled) == "true"

cache_broadcast_ratio =
"LOGFLARE_CACHE_BROADCAST_RATIO" |> System.get_env("0.1") |> String.to_float()
Contributor

Let's hardcode this value in the logic and let the user set the max broadcast nodes.

Contributor

A default of 0.05 would be better (5% of the cluster),
so in a 100-node cluster, at most 5 would receive an update (assuming the user sets a high max).

Contributor

LOGFLARE_CACHE_GOSSIP_RATIO

"LOGFLARE_CACHE_BROADCAST_RATIO" |> System.get_env("0.1") |> String.to_float()

cache_broadcast_max_nodes =
"LOGFLARE_CACHE_BROADCAST_MAX_NODES" |> System.get_env("5") |> String.to_integer()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets default to 3 nodes.
LOGFLARE_CACHE_GOSSIP_MAX_NODES

Comment on lines +494 to +499
# allows excluding heavy caches via: LOGFLARE_CACHE_BROADCAST_EXCLUDE="auth,saved_searches"
excluded_caches =
System.get_env("LOGFLARE_CACHE_BROADCAST_EXCLUDE", "")
|> String.split(",", trim: true)
|> Enum.map(&String.trim/1)
|> MapSet.new()
Contributor

I don't think having an explicit exclusion is a good approach; it would be too much configuration for something that should be a transparent optimization.

Comment on lines +511 to +522
cache_broadcasts =
Map.new(known_caches, fn {short_name, name} ->
config = %{
ratio: cache_broadcast_ratio,
max_nodes: cache_broadcast_max_nodes,
enabled: cache_broadcast_enabled? and not MapSet.member?(excluded_caches, short_name)
}

{name, config}
end)

config :logflare, :cache_broadcasts, cache_broadcasts
Contributor

Storing the ratio and the max_nodes in the app env would be enough; this seems unnecessary. All caches should be broadcast.



def start_link(arg), do: Supervisor.start_link(__MODULE__, arg, name: __MODULE__)

context_caches_with_metrics = Logflare.ContextCache.Supervisor.list_caches_with_metrics()
wal_tombstones = Logflare.ContextCache.wal_tombstones_cache_name()
Contributor

A separate module name would make this easier.

Comment on lines +231 to +249
counter("logflare.context_cache.broadcast.count",
event_name: "logflare.context_cache.broadcast.stop",
tags: [:cache, :enabled],
description: "Total cache broadcast attempts"
),
distribution("logflare.context_cache.broadcast.stop.duration",
tags: [:cache, :enabled],
unit: {:native, :millisecond},
description: "Latency of dispatching the cache broadcast"
),
counter("logflare.context_cache.receive_broadcast.count",
event_name: "logflare.context_cache.receive_broadcast.stop",
tags: [:cache, :action],
description: "Total cache broadcasts received and their outcome (cached or dropped)"
),
distribution("logflare.context_cache.receive_broadcast.stop.duration",
tags: [:cache, :action],
unit: {:native, :millisecond},
description: "Latency of processing an incoming cache broadcast"
Contributor

We should make clear in the event_name that this is for the local gossip mechanism of the ContextCache on fetching nodes, and not necessarily from the node whose TransactionBroadcaster broadcasts WAL updates.
Clarification is needed due to upcoming changes here, where the TransactionBroadcaster will be broadcasting updated values with the WAL.



@Ziinc (Contributor) commented Mar 23, 2026

@ruslandoga just to clarify, we do want a full gossip protocol, but with safe initial defaults to minimize risk, since this is experimental. We want to make all inputs to the propagation configurable so that we can experiment with it.
