Skip to content

Commit 454f0c9

Browse files
authored
perf(webapp): cache task metadata in Redis for the trigger hotpath (#3625)
## Summary The trigger-task hotpath used to early-return without a DB query when a caller passed both a queue override and a per-trigger TTL — the hottest configuration on the trigger API. Adding `triggerSource` to the resolver so the runs-list "Source" filter could distinguish STANDARD / SCHEDULED / AGENT runs removed those early-returns, costing +2 DB queries per trigger on non-locked calls and +1 on locked calls. This change caches `BackgroundWorkerTask` metadata (`ttl`, `triggerSource`, `queueId`, `queueName`) in Redis so the resolver can satisfy every caller configuration with a single `HGET` on the warm path. PG fallback on miss back-fills the cache. Follow-up to #3542. ## Design Two key spaces: - `task-meta:env:{envId}` — the "current worker" view, refreshed at every deploy promotion. 24h safety TTL. - `task-meta:by-worker:{workerId}` — used for `lockToVersion` triggers. Immutable post-create. 30d sliding TTL so historical workers age out. Cache writes use Lua scripts via `defineCommand` so `DEL` + `HSET` + `EXPIRE` land atomically — concurrent readers never see the empty intermediate state of a naive pipeline. Read-path back-fill uses single-field upserts so concurrent back-fills don't wipe each other's siblings. The cache lives behind its own `TASK_META_CACHE_REDIS_*` env-var prefix that falls back to the default `REDIS_*` set, so operators can route the cache to a dedicated Redis instance if they want. The service/instance file split (`taskMetadataCache.server.ts` for the pure class, `taskMetadataCacheInstance.server.ts` for the env-wired singleton) mirrors the existing `runsReplicationService` / `runsReplicationInstance` pattern. ## Test plan - [ ] `pnpm run typecheck --filter webapp` - [ ] `pnpm run test ./test/engine/triggerTask.test.ts --run` — 8 existing tests untouched + 5 new tests covering warm cache, cold miss with back-fill, queue + ttl path, by-worker vs env keyspace, and the promotion cache write - [ ] End-to-end against a dev worker: registering writes both keyspaces with the expected TTLs, and `redis-cli HGETALL "tr:task-meta:env:<envId>"` returns the cached entries ## Benchmark Measured `DefaultQueueManager.resolveQueueProperties` against a real Postgres + Redis (vitest `containerTest`, single-host docker). 500 sequential calls and 2,000 parallel calls (concurrency=50) per scenario, request shaped as `{ taskId, queue: "bench-queue", ttl: "5m" }` — the hot path this PR restores. ``` sequential (one in flight at a time): [noop cache (baseline)] n=500 mean=1.423ms p50=1.394ms p95=1.735ms p99=2.629ms max=11.100ms [redis cache, cold ] n=500 mean=1.346ms p50=1.283ms p95=1.688ms p99=2.463ms max=5.058ms [redis cache, warm ] n=500 mean=0.084ms p50=0.078ms p95=0.105ms p99=0.156ms max=1.129ms speedup (warm vs baseline, sequential): 16.95x parallel (concurrency=50): [noop cache (baseline)] n=2000 mean=10.069ms p50=8.850ms p95=14.718ms p99=31.887ms total=405ms ops/s=4,940 [redis cache, warm ] n=2000 mean=0.614ms p50=0.568ms p95=1.189ms p99=1.432ms total=25ms ops/s=80,389 throughput speedup (warm vs baseline, parallel): 16.27x ``` Read: - **Warm cache cuts resolver latency 17×** at p50 — from ~1.4 ms to ~78 µs per call. - **Cold cache is on par with baseline** — the extra `HGET` miss adds <50 µs against the two Postgres queries that follow, so the worst case is not worse than today. - **Under burst load (50 concurrent triggers)**, the baseline's p99 jumps to ~32 ms as Postgres connections queue up; warm stays at ~1.4 ms. The cache moves the saturation point from ~5k ops/s (PG pool) to ~80k ops/s (single-client Redis pipelining). Caveats: single-host docker, local Postgres + Redis, resolver-only measurement (excludes the rest of the trigger transaction). Prod adds region-local Redis RTT (~0.3–0.8 ms) which shifts warm absolute numbers up but keeps the ratio intact.
1 parent bff4b46 commit 454f0c9

10 files changed

Lines changed: 1127 additions & 115 deletions
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: improvement
4+
---
5+
6+
Cache task defaults in Redis so the trigger API skips per-request database lookups, restoring the fast trigger path when callers pass queue and TTL options.

apps/webapp/app/env.server.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,30 @@ const EnvironmentSchema = z
235235
CACHE_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
236236
CACHE_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
237237

238+
TASK_META_CACHE_REDIS_HOST: z
239+
.string()
240+
.optional()
241+
.transform((v) => v ?? process.env.REDIS_HOST),
242+
TASK_META_CACHE_REDIS_PORT: z.coerce
243+
.number()
244+
.optional()
245+
.transform(
246+
(v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)
247+
),
248+
TASK_META_CACHE_REDIS_USERNAME: z
249+
.string()
250+
.optional()
251+
.transform((v) => v ?? process.env.REDIS_USERNAME),
252+
TASK_META_CACHE_REDIS_PASSWORD: z
253+
.string()
254+
.optional()
255+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
256+
TASK_META_CACHE_REDIS_TLS_DISABLED: z
257+
.string()
258+
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
259+
TASK_META_CACHE_CURRENT_ENV_TTL_SECONDS: z.coerce.number().default(86400),
260+
TASK_META_CACHE_BY_WORKER_TTL_SECONDS: z.coerce.number().default(2592000),
261+
238262
REALTIME_STREAMS_REDIS_HOST: z
239263
.string()
240264
.optional()

apps/webapp/app/runEngine/concerns/queues.server.ts

Lines changed: 148 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import { tryCatch } from "@trigger.dev/core/v3";
1717
import { ServiceValidationError } from "~/v3/services/common.server";
1818
import { createCache, createLRUMemoryStore, DefaultStatefulContext, Namespace } from "@internal/cache";
1919
import { singleton } from "~/utils/singleton";
20+
import type { TaskMetadataCache, TaskMetadataEntry } from "~/services/taskMetadataCache.server";
21+
import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server";
2022

2123
// LRU cache for environment queue sizes to reduce Redis calls
2224
const queueSizeCache = singleton("queueSizeCache", () => {
@@ -63,13 +65,16 @@ function extractQueueName(queue: { name?: unknown } | undefined): string | undef
6365

6466
export class DefaultQueueManager implements QueueManager {
6567
private readonly replicaPrisma: PrismaClientOrTransaction;
68+
private readonly taskMetaCache: TaskMetadataCache;
6669

6770
constructor(
6871
private readonly prisma: PrismaClientOrTransaction,
6972
private readonly engine: RunEngine,
70-
replicaPrisma?: PrismaClientOrTransaction
73+
replicaPrisma?: PrismaClientOrTransaction,
74+
taskMetaCache: TaskMetadataCache = taskMetadataCacheInstance
7175
) {
7276
this.replicaPrisma = replicaPrisma ?? prisma;
77+
this.taskMetaCache = taskMetaCache;
7378
}
7479

7580
async resolveQueueProperties(
@@ -87,7 +92,10 @@ export class DefaultQueueManager implements QueueManager {
8792
const specifiedQueueName = extractQueueName(request.body.options?.queue);
8893

8994
if (specifiedQueueName) {
90-
// A specific queue name is provided, validate it exists for the locked worker
95+
// A specific queue name is provided, validate it exists for the locked worker.
96+
// Pre-existing query — not cached because TaskQueue rows can be added or
97+
// removed independently of BackgroundWorkerTask, and a stale "queue exists"
98+
// claim would silently route to the wrong queue.
9199
const specifiedQueue = await this.prisma.taskQueue.findFirst({
92100
where: {
93101
name: specifiedQueueName,
@@ -107,49 +115,45 @@ export class DefaultQueueManager implements QueueManager {
107115
queueName = specifiedQueue.name;
108116
lockedQueueId = specifiedQueue.id;
109117

110-
// Always fetch the task so we can resolve `triggerSource` (which
111-
// becomes `taskKind` on annotations and replicates to ClickHouse).
112-
// Without this, AGENT/SCHEDULED runs triggered with
113-
// `lockToVersion` + a queue override would be annotated as
114-
// STANDARD and disappear from the run-list "Source" filter.
115-
// `ttl` is read from the same row but only used when the caller
116-
// didn't specify a per-trigger TTL.
117-
const lockedTask = await this.replicaPrisma.backgroundWorkerTask.findFirst({
118-
where: {
119-
workerId: lockedBackgroundWorker.id,
120-
runtimeEnvironmentId: request.environment.id,
121-
slug: request.taskId,
122-
},
123-
select: { ttl: true, triggerSource: true },
124-
});
118+
// Pull `triggerSource` (for `taskKind` annotation) and `ttl` from cache.
119+
// On cache hit this is 0 PG queries; on miss the helper falls back to
120+
// a BackgroundWorkerTask lookup and back-fills the cache.
121+
//
122+
// If the task slug isn't on this locked worker version, we tolerate
123+
// the missing row and fall through with `taskKind = undefined`
124+
// (coalesced to "STANDARD" downstream) and `taskTtl = undefined`.
125+
// This matches main's pre-PR behavior — the no-override branch below
126+
// still throws because there's no queue to route to in that case,
127+
// but here the caller already named the queue.
128+
const lockedMeta = await this.resolveLockedTaskMetadata(
129+
lockedBackgroundWorker.id,
130+
request.environment.id,
131+
request.taskId
132+
);
125133

126134
if (request.body.options?.ttl === undefined) {
127-
taskTtl = lockedTask?.ttl;
135+
taskTtl = lockedMeta?.ttl ?? undefined;
128136
}
129-
taskKind = lockedTask?.triggerSource;
137+
taskKind = lockedMeta?.triggerSource;
130138
} else {
131-
// No queue override - fetch task with queue to get both default queue and TTL
132-
const lockedTask = await this.replicaPrisma.backgroundWorkerTask.findFirst({
133-
where: {
134-
workerId: lockedBackgroundWorker.id,
135-
runtimeEnvironmentId: request.environment.id,
136-
slug: request.taskId,
137-
},
138-
include: {
139-
queue: true,
140-
},
141-
});
139+
// No queue override - resolve default queue + TTL + triggerSource via cache,
140+
// falling back to a single BackgroundWorkerTask lookup on miss.
141+
const lockedMeta = await this.resolveLockedTaskMetadata(
142+
lockedBackgroundWorker.id,
143+
request.environment.id,
144+
request.taskId
145+
);
142146

143-
if (!lockedTask) {
147+
if (!lockedMeta) {
144148
throw new ServiceValidationError(
145149
`Task '${request.taskId}' not found on locked version '${lockedBackgroundWorker.version ?? "<unknown>"
146150
}'.`
147151
);
148152
}
149153

150-
taskTtl = lockedTask.ttl;
154+
taskTtl = lockedMeta.ttl;
151155

152-
if (!lockedTask.queue) {
156+
if (!lockedMeta.queueName) {
153157
// This case should ideally be prevented by earlier checks or schema constraints,
154158
// but handle it defensively.
155159
logger.error("Task found on locked version, but has no associated queue record", {
@@ -164,9 +168,9 @@ export class DefaultQueueManager implements QueueManager {
164168
}
165169

166170
// Use the task's default queue name
167-
queueName = lockedTask.queue.name;
168-
lockedQueueId = lockedTask.queue.id;
169-
taskKind = lockedTask.triggerSource;
171+
queueName = lockedMeta.queueName;
172+
lockedQueueId = lockedMeta.queueId ?? undefined;
173+
taskKind = lockedMeta.triggerSource;
170174
}
171175
} else {
172176
// Task is not locked to a specific version, use regular logic
@@ -213,76 +217,130 @@ export class DefaultQueueManager implements QueueManager {
213217

214218
const defaultQueueName = `task/${taskId}`;
215219

216-
// Even when the caller provides both a queue override and a
217-
// per-trigger TTL, we still need to fetch the task so `triggerSource`
218-
// (which becomes `taskKind` on annotations and replicates to
219-
// ClickHouse) is populated. Without it, AGENT/SCHEDULED runs hitting
220-
// this path get stamped as STANDARD and disappear from the
221-
// dashboard's `Source` filter. Mirrors the locked-worker fix above
222-
// — `taskTtl` is harmless in the returned value because the call
223-
// site coalesces `body.options.ttl ?? taskTtl`.
224-
225-
// Find the current worker for the environment. Replica is fine here —
226-
// the adjacent `backgroundWorkerTask` lookups below already use
227-
// `replicaPrisma` (replica lag for "just deployed" is bounded the same
228-
// way for both queries; reading the worker from the writer and the
229-
// task from the replica would only widen the inconsistency window).
230-
const worker = await findCurrentWorkerFromEnvironment(environment, this.replicaPrisma);
220+
// Resolve the current worker's task metadata via cache (HGET on warm path,
221+
// BackgroundWorkerTask findFirst + cache back-fill on miss). When this hits,
222+
// both the queue-override + TTL caller and the default-queue caller satisfy
223+
// their full result without any database query.
224+
const meta = await this.resolveCurrentTaskMetadata(environment, taskId);
225+
226+
if (overriddenQueueName) {
227+
// Caller already named the queue. We only need triggerSource (for taskKind)
228+
// and ttl (for the call site to coalesce against body.options.ttl).
229+
return {
230+
queueName: overriddenQueueName,
231+
taskTtl: meta?.ttl ?? undefined,
232+
taskKind: meta?.triggerSource,
233+
};
234+
}
231235

232-
if (!worker) {
233-
logger.debug("Failed to get queue name: No worker found", {
236+
if (!meta) {
237+
logger.debug("Failed to get queue name: No worker or task found", {
234238
taskId,
235239
environmentId: environment.id,
236240
});
237-
238-
return { queueName: overriddenQueueName ?? defaultQueueName, taskTtl: undefined };
241+
return { queueName: defaultQueueName, taskTtl: undefined };
239242
}
240243

241-
// When queue is overridden, we only need TTL from the task (no queue join needed)
242-
if (overriddenQueueName) {
243-
const task = await this.replicaPrisma.backgroundWorkerTask.findFirst({
244-
where: {
245-
workerId: worker.id,
246-
runtimeEnvironmentId: environment.id,
247-
slug: taskId,
248-
},
249-
select: { ttl: true, triggerSource: true },
244+
if (!meta.queueName) {
245+
logger.debug("Failed to get queue name: No queue found", {
246+
taskId,
247+
environmentId: environment.id,
250248
});
251-
252-
return { queueName: overriddenQueueName, taskTtl: task?.ttl, taskKind: task?.triggerSource };
249+
return { queueName: defaultQueueName, taskTtl: meta.ttl, taskKind: meta.triggerSource };
253250
}
254251

255-
const task = await this.replicaPrisma.backgroundWorkerTask.findFirst({
256-
where: {
257-
workerId: worker.id,
258-
runtimeEnvironmentId: environment.id,
259-
slug: taskId,
260-
},
261-
include: {
262-
queue: true,
252+
return { queueName: meta.queueName, taskTtl: meta.ttl, taskKind: meta.triggerSource };
253+
}
254+
255+
/**
256+
* Resolve task metadata for a locked-version trigger. Reads from the
257+
* `task-meta:by-worker:{workerId}` Redis hash; falls back to a single
258+
* BackgroundWorkerTask findFirst on miss and back-fills the cache.
259+
*
260+
* Returns null when no BackgroundWorkerTask row exists.
261+
*/
262+
private async resolveLockedTaskMetadata(
263+
workerId: string,
264+
environmentId: string,
265+
slug: string
266+
): Promise<TaskMetadataEntry | null> {
267+
const cached = await this.taskMetaCache.getByWorker(workerId, slug);
268+
if (cached) return cached;
269+
270+
const row = await this.replicaPrisma.backgroundWorkerTask.findFirst({
271+
where: { workerId, runtimeEnvironmentId: environmentId, slug },
272+
select: {
273+
ttl: true,
274+
triggerSource: true,
275+
queue: { select: { id: true, name: true } },
263276
},
264277
});
265278

266-
if (!task) {
267-
console.log("Failed to get queue name: No task found", {
268-
taskId,
269-
environmentId: environment.id,
270-
});
279+
if (!row) return null;
271280

272-
return { queueName: defaultQueueName, taskTtl: undefined };
273-
}
281+
const entry: TaskMetadataEntry = {
282+
slug,
283+
ttl: row.ttl,
284+
triggerSource: row.triggerSource,
285+
queueId: row.queue?.id ?? null,
286+
queueName: row.queue?.name ?? "",
287+
};
274288

275-
if (!task.queue) {
276-
console.log("Failed to get queue name: No queue found", {
277-
taskId,
278-
environmentId: environment.id,
279-
queueConfig: task.queueConfig,
280-
});
289+
// Fire-and-forget back-fill — `setByWorker` upserts the single field and
290+
// refreshes the hash TTL. Errors are logged inside the cache and swallowed.
291+
void this.taskMetaCache.setByWorker(workerId, entry);
281292

282-
return { queueName: defaultQueueName, taskTtl: task.ttl, taskKind: task.triggerSource };
283-
}
293+
return entry;
294+
}
295+
296+
/**
297+
* Resolve task metadata for a non-locked trigger. Reads from the
298+
* `task-meta:env:{envId}` Redis hash; falls back to
299+
* findCurrentWorkerFromEnvironment + a single BackgroundWorkerTask findFirst
300+
* on miss and back-fills both keyspaces.
301+
*
302+
* Returns null when no current worker or task can be resolved.
303+
*/
304+
private async resolveCurrentTaskMetadata(
305+
environment: AuthenticatedEnvironment,
306+
slug: string
307+
): Promise<TaskMetadataEntry | null> {
308+
const cached = await this.taskMetaCache.getCurrent(environment.id, slug);
309+
if (cached) return cached;
310+
311+
// Cold cache: discover the current worker for the env. Replica is fine —
312+
// the adjacent BackgroundWorkerTask lookup below uses `replicaPrisma` too
313+
// (replica lag for "just deployed" is bounded the same way for both
314+
// queries; reading from the writer here would only widen the window).
315+
const worker = await findCurrentWorkerFromEnvironment(environment, this.replicaPrisma);
316+
if (!worker) return null;
317+
318+
const row = await this.replicaPrisma.backgroundWorkerTask.findFirst({
319+
where: { workerId: worker.id, runtimeEnvironmentId: environment.id, slug },
320+
select: {
321+
ttl: true,
322+
triggerSource: true,
323+
queue: { select: { id: true, name: true } },
324+
},
325+
});
326+
327+
if (!row) return null;
328+
329+
const entry: TaskMetadataEntry = {
330+
slug,
331+
ttl: row.ttl,
332+
triggerSource: row.triggerSource,
333+
queueId: row.queue?.id ?? null,
334+
queueName: row.queue?.name ?? "",
335+
};
336+
337+
// Fire-and-forget back-fill — atomically upserts the slug into both
338+
// keyspaces so a subsequent locked-or-not trigger hits the cache. The
339+
// env-keyspace TTL is preserved (promotion owns it); the by-worker TTL
340+
// is refreshed (sliding window keeps active workers warm).
341+
void this.taskMetaCache.setByCurrentWorker(environment.id, worker.id, entry);
284342

285-
return { queueName: task.queue.name ?? defaultQueueName, taskTtl: task.ttl, taskKind: task.triggerSource };
343+
return entry;
286344
}
287345

288346
async validateQueueLimits(

0 commit comments

Comments
 (0)