Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 151 additions & 20 deletions packages/appkit/src/cache/index.ts
Comment thread
pkosiec marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,29 @@ import { InMemoryStorage, PersistentStorage } from "./storage";

const logger = createLogger("cache");

/**
* Reference-counted in-flight cache execution entry.
*
* `sharedController` decouples the cached `fn()` from any single caller's
* abort signal. Callers join an in-flight entry by incrementing `refCount`;
* when a caller aborts, refCount is decremented. The shared controller is
* aborted only when refCount drops to 0 — i.e. all callers have abandoned
* the request. This prevents one caller's cancellation (e.g. React
* StrictMode unmount) from poisoning the in-flight result for other still-
* connected awaiters.
*/
interface InFlightEntry<T> {
promise: Promise<T>;
refCount: number;
sharedController: AbortController;
abortTimer?: ReturnType<typeof setTimeout>;
}

function createAbortError(signal: AbortSignal): unknown {
if (signal.reason !== undefined) return signal.reason;
return new DOMException("The operation was aborted.", "AbortError");
}

/**
* Cache manager class to handle cache operations.
* Can be used with in-memory storage or persistent storage (Lakebase).
Expand All @@ -34,7 +57,7 @@ export class CacheManager {

private storage: CacheStorage;
private config: CacheConfig;
private inFlightRequests: Map<string, Promise<unknown>>;
private inFlightRequests: Map<string, InFlightEntry<unknown>>;
private cleanupInProgress: boolean;
private lastCleanupAttempt: number;

Expand Down Expand Up @@ -174,20 +197,37 @@ export class CacheManager {
}

/**
* Get or execute a function and cache the result
* @param key - Cache key
* @param fn - Function to execute
* @param userKey - User key
* Get or execute a function and cache the result.
*
* Multiple concurrent callers with the same `cacheKey` are deduplicated
* onto a single in-flight execution. Each caller may pass its own
* `callerSignal`; the underlying `fn()` is run with a shared, internally
* managed `AbortSignal` that aborts only when *all* callers have
* abandoned the request (reference counted). This decouples a single
* caller's cancellation (e.g. React StrictMode unmount) from the shared
* result, so other still-connected callers receive the cached value
* normally.
*
* @param key - Cache key parts
* @param fn - Function to execute. Receives the cache-owned shared signal;
* pass it through to the underlying I/O so the work is cancelled when
* no caller is left waiting.
* @param userKey - User key for cache namespacing
* @param options - Options for the cache
* @returns Promise of the result
*/
async getOrExecute<T>(
key: (string | number | object)[],
fn: () => Promise<T>,
fn: (sharedSignal?: AbortSignal) => Promise<T>,
userKey: string,
options?: { ttl?: number },
options?: { ttl?: number; callerSignal?: AbortSignal },
): Promise<T> {
if (!this.config.enabled) return fn();
if (!this.config.enabled) return fn(options?.callerSignal);

const callerSignal = options?.callerSignal;
if (callerSignal?.aborted) {
throw createAbortError(callerSignal);
}

const cacheKey = this.generateKey(key, userKey);

Expand Down Expand Up @@ -218,9 +258,19 @@ export class CacheManager {
return cached.value;
}

// check if the value is being processed by another request
const inFlight = this.inFlightRequests.get(cacheKey);
if (inFlight) {
// check if the value is being processed by another request — join
// the existing in-flight entry under reference counting so this
// caller's abort doesn't poison the shared result.
const existing = this.inFlightRequests.get(cacheKey) as
| InFlightEntry<T>
| undefined;
if (existing && !existing.sharedController.signal.aborted) {
existing.refCount++;
// Cancel any pending abort timer — a new caller has joined
if (existing.abortTimer) {
clearTimeout(existing.abortTimer);
existing.abortTimer = undefined;
}
span.setAttribute("cache.hit", true);
span.setAttribute("cache.deduplication", true);
span.addEvent("cache.deduplication_used", {
Expand All @@ -238,11 +288,10 @@ export class CacheManager {
cache_deduplication: true,
});

span.end();
return inFlight as Promise<T>;
return await this._waitWithRefCount(existing, callerSignal);
}

// cache miss - execute function
// cache miss - execute function under a shared abort controller
span.setAttribute("cache.hit", false);
span.addEvent("cache.miss", { "cache.key": cacheKey });
this.telemetryMetrics.cacheMissCount.add(1, {
Expand All @@ -254,7 +303,14 @@ export class CacheManager {
cache_key: cacheKey,
});

const promise = fn()
const sharedController = new AbortController();
const entry: InFlightEntry<T> = {
promise: undefined as unknown as Promise<T>,
refCount: 1,
sharedController,
};

entry.promise = fn(sharedController.signal)
.then(async (result) => {
await this.set(cacheKey, result, options);
span.addEvent("cache.value_stored", {
Expand All @@ -266,8 +322,13 @@ export class CacheManager {
.catch((error) => {
span.recordException(error);
span.setStatus({ code: SpanStatusCode.ERROR });
// Preserve AppKit errors and Databricks API errors (with status codes)
// so route handlers can map them to proper HTTP responses.
// If the shared controller aborted, all callers have already
// abandoned the request (or are about to via their own signals)
// — propagate the original error without wrapping. No live
// awaiter will observe this rejection.
if (sharedController.signal.aborted) {
throw error;
}
if (error instanceof AppKitError || error instanceof ApiError) {
throw error;
}
Expand All @@ -276,12 +337,19 @@ export class CacheManager {
);
})
.finally(() => {
this.inFlightRequests.delete(cacheKey);
if (this.inFlightRequests.get(cacheKey) === entry) {
this.inFlightRequests.delete(cacheKey);
}
});

this.inFlightRequests.set(cacheKey, promise);
// Suppress unhandled rejection warnings when every caller bailed
// before fn() resolved (their own promises rejected via
// _waitWithRefCount; the underlying entry.promise has no awaiter).
entry.promise.catch(() => {});

const result = await promise;
this.inFlightRequests.set(cacheKey, entry as InFlightEntry<unknown>);

const result = await this._waitWithRefCount(entry, callerSignal);
span.setStatus({ code: SpanStatusCode.OK });
return result;
} catch (error) {
Expand All @@ -296,6 +364,69 @@ export class CacheManager {
);
}

/**
* Wait on an in-flight entry, racing the underlying promise against the
* caller's abort signal. When the caller aborts, the entry's refCount is
* decremented; if it hits zero the shared controller is aborted so the
* underlying `fn()` can stop. Other callers continue to await the same
* entry and receive the result when it arrives.
*/
private _waitWithRefCount<T>(
entry: InFlightEntry<T>,
callerSignal?: AbortSignal,
): Promise<T> {
if (!callerSignal) return entry.promise;

return new Promise<T>((resolve, reject) => {
let settled = false;

const release = () => {
if (entry.refCount > 0) entry.refCount--;
if (entry.refCount <= 0 && !entry.sharedController.signal.aborted) {
// Grace period: delay abort so a StrictMode remount can join
// the in-flight entry before the shared execution is cancelled.
entry.abortTimer = setTimeout(() => {
if (entry.refCount <= 0 && !entry.sharedController.signal.aborted) {
entry.sharedController.abort(
callerSignal.reason ?? "all cache callers aborted",
);
}
}, 100);
}
};

const onAbort = () => {
if (settled) return;
settled = true;
callerSignal.removeEventListener("abort", onAbort);
release();
reject(createAbortError(callerSignal));
};

if (callerSignal.aborted) {
onAbort();
return;
}

callerSignal.addEventListener("abort", onAbort, { once: true });

entry.promise.then(
(value) => {
if (settled) return;
settled = true;
callerSignal.removeEventListener("abort", onAbort);
resolve(value);
},
(error) => {
if (settled) return;
settled = true;
callerSignal.removeEventListener("abort", onAbort);
reject(error);
},
);
});
}

/**
* Get a cached value
* @param key - Cache key
Expand Down
Loading
Loading