Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
cfca793
feat: Refresh cache when busting entries
bblaszkow06 Feb 20, 2026
09287a1
Merge branch 'main' into bb/cainophile-cache-update
Ziinc Feb 20, 2026
c3c1d47
Merge branch 'main' into bb/cainophile-cache-update
Ziinc Feb 24, 2026
df30197
Merge branch 'main' into bb/cainophile-cache-update
Ziinc Feb 24, 2026
1b345e9
fix: refresh_keys typing error
bblaszkow06 Feb 24, 2026
120e425
fix tests after merging main
bblaszkow06 Feb 24, 2026
6d28026
Merge branch 'main' into bb/cainophile-cache-update
bblaszkow06 Feb 24, 2026
d62e242
Merge branch 'main' into bb/cainophile-cache-update
Ziinc Feb 24, 2026
6429f0b
perf: Fetch changed entity before broadcasting
bblaszkow06 Feb 25, 2026
59fe042
Merge branch 'main' into bb/cainophile-cache-update
Ziinc Feb 26, 2026
277ebaf
perf: Speed up context-cache test
bblaszkow06 Feb 26, 2026
45bf071
Merge remote-tracking branch 'upstream/main' into bb/cainophile-cache…
bblaszkow06 Feb 26, 2026
35d9c1a
fix: refreshing Auth cache with unusual tuple shape
bblaszkow06 Feb 26, 2026
9fd95be
refactor: Remove dead code
bblaszkow06 Feb 26, 2026
0af75da
Merge branch 'main' into bb/cainophile-cache-update
Ziinc Feb 27, 2026
d187748
Merge branch 'main' into bb/cainophile-cache-update
Ziinc Feb 27, 2026
0224a05
Merge remote-tracking branch 'upstream/main' into bb/cainophile-cache…
bblaszkow06 Feb 27, 2026
ec83c83
Cleanup CacheBusterWorker
bblaszkow06 Feb 27, 2026
651e952
Merge branch 'main' into bb/cainophile-cache-update
bblaszkow06 Mar 2, 2026
105144e
Merge branch 'main' into bb/cainophile-cache-update
Ziinc Mar 2, 2026
8557ac5
Merge branch 'main' into bb/cainophile-cache-update
Ziinc Mar 11, 2026
096bccf
Merge branch 'main' into bb/cainophile-cache-update
bblaszkow06 Mar 11, 2026
c262428
fix: Remove merge leftover
bblaszkow06 Mar 11, 2026
6373c10
refactor: CacheBuster compation
bblaszkow06 Mar 12, 2026
19f7fd6
Merge branch 'main' into bb/cainophile-cache-update
bblaszkow06 Apr 10, 2026
778d9c6
Add bust_actions replacing fetch_by_id & keys_to bust
bblaszkow06 Apr 23, 2026
83e1df7
Merge branch 'main' into bb/cainophile-cache-update
bblaszkow06 Apr 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions lib/logflare/auth.ex
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ defmodule Logflare.Auth do
)
end

@doc "Gets an access token by its integer primary key."
@spec get_access_token_by_id(integer()) :: OauthAccessToken.t() | nil
def get_access_token_by_id(id) when is_integer(id) do
Repo.get(OauthAccessToken, id)
end

@doc """
Retrieves access token struct by token value.
Requires resource owner to be provided.
Expand Down
2 changes: 2 additions & 0 deletions lib/logflare/auth/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ defmodule Logflare.Auth.Cache do
}
end

@behaviour Logflare.ContextCache

@spec verify_access_token(OauthAccessToken.t() | String.t()) ::
{:ok, OauthAccessToken.t(), User.t()} | {:error, term()}
def verify_access_token(access_token_or_api_key),
Expand Down
13 changes: 13 additions & 0 deletions lib/logflare/backends/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,19 @@ defmodule Logflare.Backends.Cache do
}
end

@behaviour Logflare.ContextCache

@impl Logflare.ContextCache
def bust_actions(action, id) when is_integer(id) do
value =
case action do
:update -> Backends.get_backend(id)
:delete -> :bust
end

{:partial, %{{:get_backend, [id]} => value}}
end

def list_backends(arg), do: apply_repo_fun(__ENV__.function, [arg])
def get_backend(arg), do: apply_repo_fun(__ENV__.function, [arg])

Expand Down
8 changes: 8 additions & 0 deletions lib/logflare/billing.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ defmodule Logflare.Billing do
@spec get_billing_account_by(keyword()) :: BillingAccount.t() | nil
def get_billing_account_by(kv), do: Repo.get_by(BillingAccount, kv)

@doc "Gets a single billing_account by user id."
@spec get_billing_account_by_user(integer()) :: BillingAccount.t() | nil
def get_billing_account_by_user(user_id), do: Repo.get_by(BillingAccount, user_id: user_id)

@doc "Gets a single billing_account by id."
@spec get_billing_account(number()) :: BillingAccount.t() | nil
def get_billing_account(id), do: Repo.get(BillingAccount, id)

@doc "Gets a single billing_account. Raises `Ecto.NoResultsError` if the Billing account does not exist."
@spec get_billing_account!(String.t() | number()) :: BillingAccount.t()
def get_billing_account!(id), do: Repo.get!(BillingAccount, id)
Expand Down
17 changes: 15 additions & 2 deletions lib/logflare/billing/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,21 @@ defmodule Logflare.Billing.Cache do
}
end

def get_billing_account_by(keyword) do
apply_fun(__ENV__.function, [keyword])
@behaviour Logflare.ContextCache

@impl Logflare.ContextCache
def bust_actions(action, id) when is_integer(id) do
value =
case action do
:update -> Billing.get_billing_account(id)
:delete -> :bust
end

{:partial, %{{:get_billing_account_by_user, [id]} => value}}
end

def get_billing_account_by_user(user_id) when is_integer(user_id) do
apply_fun(__ENV__.function, [user_id])
end

def get_plan_by_user(user), do: apply_fun(__ENV__.function, [user])
Expand Down
191 changes: 125 additions & 66 deletions lib/logflare/context_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ defmodule Logflare.ContextCache do
primary key checking within the matchspec. This approach queries across a narrower set of records,
providing better performance compared to a reverse index approach.

If customization of busting is needed, cache module may implement `c:bust_by/1` callback expecting
a keyword list instead of primary key for entry.
If customization of busting is needed, cache module may implement `c:bust_actions/1` callback.

## List Busting

Expand All @@ -38,10 +37,35 @@ defmodule Logflare.ContextCache do

alias Logflare.ContextCache.Gossip

@type key() :: {fun :: atom(), args :: list()}

@type actions() :: %{key() => any() | :bust}

@type bust_ctx() :: integer() | keyword(integer())

@doc """
Optional callback implementing custom cache key busting by a keyword of values
Returns a tagged tuple of cache actions to perform when a DB change is detected.

The tag indicates coverage:
- `:full` — the map covers all cached functions in this module. Only the map is applied.
- `:partial` — the map covers some cached functions. The map is applied, and for integer
triggers an ETS scan also runs to bust any remaining entries containing the trigger ID.
This is a temporary state during migration; the goal is to reach `:full` coverage.

Each key in the map is a cache key (`{fun, args}` tuple). The value is either a fresh
value to store under that key, or `:bust` to delete the entry.

Used by `CacheBuster` to pre-fetch and broadcast cache updates cluster-wide.
"""
@callback bust_by(keyword()) :: {:ok, non_neg_integer()} | {:error, term()}
@callback bust_actions(action, bust_ctx()) :: {:full | :partial, actions()}
when action: :update | :delete

@optional_callbacks [bust_actions: 2]

@spec cache_name(atom()) :: atom()
def cache_name(context) do
Module.concat(context, Cache)
end

@spec apply_fun(module(), tuple() | atom(), list()) :: any()
def apply_fun(context, {fun, _arity}, args), do: apply_fun(context, fun, args)
Expand Down Expand Up @@ -77,59 +101,28 @@ defmodule Logflare.ContextCache do
1. Queries the relevant context cache using a matchspec to find entries to bust
2. Handles both single records and lists of records containing matching IDs
3. Deletes matching cache entries

For keywords, it expects the cache to handle busting by implementing `c:bust_by/1`
"""
@spec bust_keys(list()) :: {:ok, non_neg_integer()}
@spec bust_keys(list()) :: :ok
def bust_keys(values) when is_list(values) do
busted =
for {context, primary_key} <- values, reduce: 0 do
acc ->
{:ok, n} = bust_key({context, primary_key})
acc + n
end

{:ok, busted}
end
for {context, primary_key} <- values do
context_cache = cache_name(context)
bust_key(context_cache, primary_key)
end

defp bust_key({context, kw}) when is_list(kw) do
context_cache = cache_name(context)
context_cache.bust_by(kw)
:ok
end

defp bust_key({context, pkey}) do
context_cache = cache_name(context)

filter =
{
# use orelse to prevent 2nd condition failing as value is not a map
:orelse,
{
:orelse,
# handle lists
{:is_list, {:element, 2, :value}},
# handle :ok tuples when struct with id is in 2nd element pos.
{:andalso, {:is_tuple, {:element, 2, :value}},
{:andalso, {:==, {:element, 1, {:element, 2, :value}}, :ok},
{:andalso, {:is_map, {:element, 2, {:element, 2, :value}}},
{:==, {:map_get, :id, {:element, 2, {:element, 2, :value}}}, pkey}}}}
},
# handle single maps
{:andalso, {:is_map, {:element, 2, :value}},
{:==, {:map_get, :id, {:element, 2, :value}}, pkey}}
}

query =
Cachex.Query.build(where: filter, output: {:key, :value})

context_cache
|> Cachex.stream!(query)
|> delete_matching_entries(context_cache, pkey)
defp bust_key(context_cache, pkey) when is_integer(pkey) do
Cachex.execute!(context_cache, fn cache ->
keys_with_id(cache, pkey)
|> delete_entries(cache)
end)
end

@spec cache_name(atom()) :: atom()
def cache_name(context) do
Module.concat(context, Cache)
defp delete_entries(entries, cache) do
Enum.each(entries, fn key ->
Cachex.del(cache, key)
end)
end

@doc """
Expand All @@ -152,22 +145,88 @@ defmodule Logflare.ContextCache do
end
end

defp delete_matching_entries(entries, context_cache, pkey) do
to_delete =
entries
|> Stream.filter(fn
{_k, {:cached, v}} when is_list(v) ->
Enum.any?(v, &(&1.id == pkey))

{_k, _v} ->
true
end)

Cachex.execute(context_cache, fn worker ->
Enum.reduce(to_delete, 0, fn {k, _v}, acc ->
Cachex.del(worker, k)
acc + 1
end)
@spec refresh_keys([{module(), integer() | keyword(), {:full | :partial, actions()}}]) :: :ok
def refresh_keys(values) when is_list(values) do
Enum.each(values, fn {context, trigger, {tag, actions}} ->
cache = cache_name(context)

case {tag, trigger} do
{:partial, trigger} when is_integer(trigger) ->
# Remeber present keys to bring back only existing keys after busting
present_keys = present_action_keys(cache, actions)
bust_key(cache, trigger)
apply_partial_actions(cache, actions, present_keys)

_ ->
apply_actions(cache, actions)
end
end)

:ok
end

defp present_action_keys(cache, actions) do
Cachex.execute!(cache, fn worker ->
for {cache_key, value} <- actions,
value != :bust,
{:ok, true} == Cachex.exists?(worker, cache_key),
into: MapSet.new() do
cache_key
end
end)
end

defp apply_actions(cache, actions) do
Enum.each(actions, fn
{cache_key, :bust} -> Cachex.del(cache, cache_key)
{cache_key, value} -> Cachex.update(cache, cache_key, {:cached, value})
end)
end

defp apply_partial_actions(cache, actions, present_keys) do
Enum.each(actions, fn
{cache_key, :bust} ->
Cachex.del(cache, cache_key)

{cache_key, value} ->
if MapSet.member?(present_keys, cache_key),
do: Cachex.put(cache, cache_key, {:cached, value})
end)
end

defp keys_with_id(cache, id) do
keys_with_id_stream(cache, id)
|> Stream.map(&elem(&1, 0))
end

defp keys_with_id_stream(cache, id) do
# match {_cached, %{id: ^pkey}}
direct_filter =
{:andalso, {:is_map, {:element, 2, :value}},
{:==, {:map_get, :id, {:element, 2, :value}}, id}}

# match {_cached, {:ok, %{id: ^pkey}}} and larger ok-tuples
tuple_filter =
{:andalso, {:is_tuple, {:element, 2, :value}},
{:andalso, {:==, {:element, 1, {:element, 2, :value}}, :ok},
{:andalso, {:is_map, {:element, 2, {:element, 2, :value}}},
{:==, {:map_get, :id, {:element, 2, {:element, 2, :value}}}, id}}}}

element_filter =
{:orelse, direct_filter, tuple_filter}

list_filter = {:is_list, {:element, 2, :value}}
filter = {:orelse, list_filter, element_filter}
query = Cachex.Query.build(where: filter, output: {:key, :value})

cache
|> Cachex.stream!(query)
|> Stream.filter(fn
{_k, {:cached, v}} when is_list(v) ->
Enum.any?(v, &(&1.id == id))

{_k, _v} ->
true
end)
end
end
Loading
Loading