diff --git a/.server-changes/task-metadata-cache.md b/.server-changes/task-metadata-cache.md new file mode 100644 index 00000000000..a71bbdf347b --- /dev/null +++ b/.server-changes/task-metadata-cache.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +Cache task defaults in Redis so the trigger API skips per-request database lookups, restoring the fast trigger path when callers pass queue and TTL options. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 97cccbc1710..8eacb9634e1 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -235,6 +235,30 @@ const EnvironmentSchema = z CACHE_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"), CACHE_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"), + TASK_META_CACHE_REDIS_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_HOST), + TASK_META_CACHE_REDIS_PORT: z.coerce + .number() + .optional() + .transform( + (v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined) + ), + TASK_META_CACHE_REDIS_USERNAME: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_USERNAME), + TASK_META_CACHE_REDIS_PASSWORD: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_PASSWORD), + TASK_META_CACHE_REDIS_TLS_DISABLED: z + .string() + .default(process.env.REDIS_TLS_DISABLED ?? "false"), + TASK_META_CACHE_CURRENT_ENV_TTL_SECONDS: z.coerce.number().default(86400), + TASK_META_CACHE_BY_WORKER_TTL_SECONDS: z.coerce.number().default(2592000), + REALTIME_STREAMS_REDIS_HOST: z .string() .optional() diff --git a/apps/webapp/app/runEngine/concerns/queues.server.ts b/apps/webapp/app/runEngine/concerns/queues.server.ts index 4b4298bc935..2fc35fc8435 100644 --- a/apps/webapp/app/runEngine/concerns/queues.server.ts +++ b/apps/webapp/app/runEngine/concerns/queues.server.ts @@ -17,6 +17,8 @@ import { tryCatch } from "@trigger.dev/core/v3"; import { ServiceValidationError } from "~/v3/services/common.server"; import { createCache, createLRUMemoryStore, DefaultStatefulContext, Namespace } from "@internal/cache"; import { singleton } from "~/utils/singleton"; +import type { TaskMetadataCache, TaskMetadataEntry } from "~/services/taskMetadataCache.server"; +import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server"; // LRU cache for environment queue sizes to reduce Redis calls const queueSizeCache = singleton("queueSizeCache", () => { @@ -63,13 +65,16 @@ function extractQueueName(queue: { name?: unknown } | undefined): string | undef export class DefaultQueueManager implements QueueManager { private readonly replicaPrisma: PrismaClientOrTransaction; + private readonly taskMetaCache: TaskMetadataCache; constructor( private readonly prisma: PrismaClientOrTransaction, private readonly engine: RunEngine, - replicaPrisma?: PrismaClientOrTransaction + replicaPrisma?: PrismaClientOrTransaction, + taskMetaCache: TaskMetadataCache = taskMetadataCacheInstance ) { this.replicaPrisma = replicaPrisma ?? prisma; + this.taskMetaCache = taskMetaCache; } async resolveQueueProperties( @@ -87,7 +92,10 @@ export class DefaultQueueManager implements QueueManager { const specifiedQueueName = extractQueueName(request.body.options?.queue); if (specifiedQueueName) { - // A specific queue name is provided, validate it exists for the locked worker + // A specific queue name is provided, validate it exists for the locked worker. 
+ // Pre-existing query — not cached because TaskQueue rows can be added or + // removed independently of BackgroundWorkerTask, and a stale "queue exists" + // claim would silently route to the wrong queue. const specifiedQueue = await this.prisma.taskQueue.findFirst({ where: { name: specifiedQueueName, @@ -107,49 +115,45 @@ export class DefaultQueueManager implements QueueManager { queueName = specifiedQueue.name; lockedQueueId = specifiedQueue.id; - // Always fetch the task so we can resolve `triggerSource` (which - // becomes `taskKind` on annotations and replicates to ClickHouse). - // Without this, AGENT/SCHEDULED runs triggered with - // `lockToVersion` + a queue override would be annotated as - // STANDARD and disappear from the run-list "Source" filter. - // `ttl` is read from the same row but only used when the caller - // didn't specify a per-trigger TTL. - const lockedTask = await this.replicaPrisma.backgroundWorkerTask.findFirst({ - where: { - workerId: lockedBackgroundWorker.id, - runtimeEnvironmentId: request.environment.id, - slug: request.taskId, - }, - select: { ttl: true, triggerSource: true }, - }); + // Pull `triggerSource` (for `taskKind` annotation) and `ttl` from cache. + // On cache hit this is 0 PG queries; on miss the helper falls back to + // a BackgroundWorkerTask lookup and back-fills the cache. + // + // If the task slug isn't on this locked worker version, we tolerate + // the missing row and fall through with `taskKind = undefined` + // (coalesced to "STANDARD" downstream) and `taskTtl = undefined`. + // This matches main's pre-PR behavior — the no-override branch below + // still throws because there's no queue to route to in that case, + // but here the caller already named the queue. + const lockedMeta = await this.resolveLockedTaskMetadata( + lockedBackgroundWorker.id, + request.environment.id, + request.taskId + ); if (request.body.options?.ttl === undefined) { - taskTtl = lockedTask?.ttl; + taskTtl = lockedMeta?.ttl ?? undefined; } - taskKind = lockedTask?.triggerSource; + taskKind = lockedMeta?.triggerSource; } else { - // No queue override - fetch task with queue to get both default queue and TTL - const lockedTask = await this.replicaPrisma.backgroundWorkerTask.findFirst({ - where: { - workerId: lockedBackgroundWorker.id, - runtimeEnvironmentId: request.environment.id, - slug: request.taskId, - }, - include: { - queue: true, - }, - }); + // No queue override - resolve default queue + TTL + triggerSource via cache, + // falling back to a single BackgroundWorkerTask lookup on miss. + const lockedMeta = await this.resolveLockedTaskMetadata( + lockedBackgroundWorker.id, + request.environment.id, + request.taskId + ); - if (!lockedTask) { + if (!lockedMeta) { throw new ServiceValidationError( `Task '${request.taskId}' not found on locked version '${lockedBackgroundWorker.version ?? "" }'.` ); } - taskTtl = lockedTask.ttl; + taskTtl = lockedMeta.ttl; - if (!lockedTask.queue) { + if (!lockedMeta.queueName) { // This case should ideally be prevented by earlier checks or schema constraints, // but handle it defensively. logger.error("Task found on locked version, but has no associated queue record", { @@ -164,9 +168,9 @@ export class DefaultQueueManager implements QueueManager { } // Use the task's default queue name - queueName = lockedTask.queue.name; - lockedQueueId = lockedTask.queue.id; - taskKind = lockedTask.triggerSource; + queueName = lockedMeta.queueName; + lockedQueueId = lockedMeta.queueId ?? 
undefined; + taskKind = lockedMeta.triggerSource; } } else { // Task is not locked to a specific version, use regular logic @@ -213,76 +217,130 @@ export class DefaultQueueManager implements QueueManager { const defaultQueueName = `task/${taskId}`; - // Even when the caller provides both a queue override and a - // per-trigger TTL, we still need to fetch the task so `triggerSource` - // (which becomes `taskKind` on annotations and replicates to - // ClickHouse) is populated. Without it, AGENT/SCHEDULED runs hitting - // this path get stamped as STANDARD and disappear from the - // dashboard's `Source` filter. Mirrors the locked-worker fix above - // — `taskTtl` is harmless in the returned value because the call - // site coalesces `body.options.ttl ?? taskTtl`. - - // Find the current worker for the environment. Replica is fine here — - // the adjacent `backgroundWorkerTask` lookups below already use - // `replicaPrisma` (replica lag for "just deployed" is bounded the same - // way for both queries; reading the worker from the writer and the - // task from the replica would only widen the inconsistency window). - const worker = await findCurrentWorkerFromEnvironment(environment, this.replicaPrisma); + // Resolve the current worker's task metadata via cache (HGET on warm path, + // BackgroundWorkerTask findFirst + cache back-fill on miss). When this hits, + // both the queue-override + TTL caller and the default-queue caller satisfy + // their full result without any database query. + const meta = await this.resolveCurrentTaskMetadata(environment, taskId); + + if (overriddenQueueName) { + // Caller already named the queue. We only need triggerSource (for taskKind) + // and ttl (for the call site to coalesce against body.options.ttl). + return { + queueName: overriddenQueueName, + taskTtl: meta?.ttl ?? undefined, + taskKind: meta?.triggerSource, + }; + } - if (!worker) { - logger.debug("Failed to get queue name: No worker found", { + if (!meta) { + logger.debug("Failed to get queue name: No worker or task found", { taskId, environmentId: environment.id, }); - - return { queueName: overriddenQueueName ?? defaultQueueName, taskTtl: undefined }; + return { queueName: defaultQueueName, taskTtl: undefined }; } - // When queue is overridden, we only need TTL from the task (no queue join needed) - if (overriddenQueueName) { - const task = await this.replicaPrisma.backgroundWorkerTask.findFirst({ - where: { - workerId: worker.id, - runtimeEnvironmentId: environment.id, - slug: taskId, - }, - select: { ttl: true, triggerSource: true }, + if (!meta.queueName) { + logger.debug("Failed to get queue name: No queue found", { + taskId, + environmentId: environment.id, }); - - return { queueName: overriddenQueueName, taskTtl: task?.ttl, taskKind: task?.triggerSource }; + return { queueName: defaultQueueName, taskTtl: meta.ttl, taskKind: meta.triggerSource }; } - const task = await this.replicaPrisma.backgroundWorkerTask.findFirst({ - where: { - workerId: worker.id, - runtimeEnvironmentId: environment.id, - slug: taskId, - }, - include: { - queue: true, + return { queueName: meta.queueName, taskTtl: meta.ttl, taskKind: meta.triggerSource }; + } + + /** + * Resolve task metadata for a locked-version trigger. Reads from the + * `task-meta:by-worker:{workerId}` Redis hash; falls back to a single + * BackgroundWorkerTask findFirst on miss and back-fills the cache. + * + * Returns null when no BackgroundWorkerTask row exists. 
+   */
+  private async resolveLockedTaskMetadata(
+    workerId: string,
+    environmentId: string,
+    slug: string
+  ): Promise<TaskMetadataEntry | null> {
+    const cached = await this.taskMetaCache.getByWorker(workerId, slug);
+    if (cached) return cached;
+
+    const row = await this.replicaPrisma.backgroundWorkerTask.findFirst({
+      where: { workerId, runtimeEnvironmentId: environmentId, slug },
+      select: {
+        ttl: true,
+        triggerSource: true,
+        queue: { select: { id: true, name: true } },
+      },
     });
 
-    if (!task) {
-      console.log("Failed to get queue name: No task found", {
-        taskId,
-        environmentId: environment.id,
-      });
+    if (!row) return null;
 
-      return { queueName: defaultQueueName, taskTtl: undefined };
-    }
+    const entry: TaskMetadataEntry = {
+      slug,
+      ttl: row.ttl,
+      triggerSource: row.triggerSource,
+      queueId: row.queue?.id ?? null,
+      queueName: row.queue?.name ?? "",
+    };
 
-    if (!task.queue) {
-      console.log("Failed to get queue name: No queue found", {
-        taskId,
-        environmentId: environment.id,
-        queueConfig: task.queueConfig,
-      });
+    // Fire-and-forget back-fill — `setByWorker` upserts the single field and
+    // refreshes the hash TTL. Errors are logged inside the cache and swallowed.
+    void this.taskMetaCache.setByWorker(workerId, entry);
 
-      return { queueName: defaultQueueName, taskTtl: task.ttl, taskKind: task.triggerSource };
-    }
+    return entry;
+  }
+
+  /**
+   * Resolve task metadata for a non-locked trigger. Reads from the
+   * `task-meta:env:{envId}` Redis hash; falls back to
+   * findCurrentWorkerFromEnvironment + a single BackgroundWorkerTask findFirst
+   * on miss and back-fills both keyspaces.
+   *
+   * Returns null when no current worker or task can be resolved.
+   */
+  private async resolveCurrentTaskMetadata(
+    environment: AuthenticatedEnvironment,
+    slug: string
+  ): Promise<TaskMetadataEntry | null> {
+    const cached = await this.taskMetaCache.getCurrent(environment.id, slug);
+    if (cached) return cached;
+
+    // Cold cache: discover the current worker for the env. Replica is fine —
+    // the adjacent BackgroundWorkerTask lookup below uses `replicaPrisma` too
+    // (replica lag for "just deployed" is bounded the same way for both
+    // queries; reading from the writer here would only widen the window).
+    const worker = await findCurrentWorkerFromEnvironment(environment, this.replicaPrisma);
+    if (!worker) return null;
+
+    const row = await this.replicaPrisma.backgroundWorkerTask.findFirst({
+      where: { workerId: worker.id, runtimeEnvironmentId: environment.id, slug },
+      select: {
+        ttl: true,
+        triggerSource: true,
+        queue: { select: { id: true, name: true } },
+      },
+    });
+
+    if (!row) return null;
+
+    const entry: TaskMetadataEntry = {
+      slug,
+      ttl: row.ttl,
+      triggerSource: row.triggerSource,
+      queueId: row.queue?.id ?? null,
+      queueName: row.queue?.name ?? "",
+    };
+
+    // Fire-and-forget back-fill — atomically upserts the slug into both
+    // keyspaces so a subsequent locked-or-not trigger hits the cache. The
+    // env-keyspace TTL is preserved (promotion owns it); the by-worker TTL
+    // is refreshed (sliding window keeps active workers warm).
+    void this.taskMetaCache.setByCurrentWorker(environment.id, worker.id, entry);
 
-    return { queueName: task.queue.name ?? defaultQueueName, taskTtl: task.ttl, taskKind: task.triggerSource };
+    return entry;
   }
 
   async validateQueueLimits(
diff --git a/apps/webapp/app/services/taskMetadataCache.server.ts b/apps/webapp/app/services/taskMetadataCache.server.ts
new file mode 100644
index 00000000000..6130295a73f
--- /dev/null
+++ b/apps/webapp/app/services/taskMetadataCache.server.ts
@@ -0,0 +1,427 @@
+import type { Redis, Result, Callback } from "ioredis";
+import type { TaskTriggerSource } from "@trigger.dev/database";
+import { logger } from "./logger.server";
+
+export type TaskMetadataEntry = {
+  slug: string;
+  ttl: string | null;
+  triggerSource: TaskTriggerSource;
+  queueId: string | null;
+  queueName: string;
+};
+
+export interface TaskMetadataCache {
+  /** Read a slug's metadata from the env keyspace (current pointer). */
+  getCurrent(envId: string, slug: string): Promise<TaskMetadataEntry | null>;
+  /** Read a slug's metadata from the by-worker keyspace (locked-version lookups). */
+  getByWorker(workerId: string, slug: string): Promise<TaskMetadataEntry | null>;
+  /**
+   * Atomically replace both `task-meta:env:{envId}` and
+   * `task-meta:by-worker:{workerId}` with the given entries. Used at deploy
+   * promotion sites where the worker just became current for the env.
+   */
+  populateByCurrentWorker(
+    envId: string,
+    workerId: string,
+    entries: TaskMetadataEntry[]
+  ): Promise<void>;
+  /**
+   * Replace `task-meta:by-worker:{workerId}` only. Used at deploy build sites
+   * (V4) where the worker is created but not yet promoted.
+   */
+  populateByWorker(workerId: string, entries: TaskMetadataEntry[]): Promise<void>;
+  /**
+   * Atomically upsert one slug in both keyspaces. Used by the non-locked
+   * read-path back-fill. The env-keyspace TTL is only set when no TTL is
+   * present (preserves the promotion boundary); the by-worker TTL is
+   * refreshed on every call (sliding expiry).
+   */
+  setByCurrentWorker(envId: string, workerId: string, entry: TaskMetadataEntry): Promise<void>;
+  /**
+   * Upsert one slug in `task-meta:by-worker:{workerId}` only. Used by the
+   * locked-version read-path back-fill; refreshes the by-worker TTL.
+   */
+  setByWorker(workerId: string, entry: TaskMetadataEntry): Promise<void>;
+}
+
+export type RedisTaskMetadataCacheOptions = {
+  redis: Redis;
+  /** Safety TTL on `task-meta:env:{envId}`. Default 24h. Use 0 for no expiry. */
+  currentEnvTtlSeconds?: number;
+  /** Idle TTL on `task-meta:by-worker:{workerId}`. Default 30d. Use 0 for no expiry. */
+  byWorkerTtlSeconds?: number;
+};
+
+type EncodedEntry = {
+  t: string | null;
+  k: TaskTriggerSource;
+  q: string | null;
+  n: string;
+};
+
+function encode(entry: TaskMetadataEntry): string {
+  const payload: EncodedEntry = {
+    t: entry.ttl,
+    k: entry.triggerSource,
+    q: entry.queueId,
+    n: entry.queueName,
+  };
+  return JSON.stringify(payload);
+}
+
+function decode(slug: string, raw: string): TaskMetadataEntry | null {
+  try {
+    const parsed = JSON.parse(raw) as EncodedEntry;
+    return {
+      slug,
+      ttl: parsed.t,
+      triggerSource: parsed.k,
+      queueId: parsed.q,
+      queueName: parsed.n,
+    };
+  } catch (error) {
+    logger.error("Failed to decode task metadata cache entry", { slug, error });
+    return null;
+  }
+}
+
+function currentEnvKey(envId: string): string {
+  return `task-meta:env:${envId}`;
+}
+
+function byWorkerKey(workerId: string): string {
+  return `task-meta:by-worker:${workerId}`;
+}
+
+/**
+ * Atomically replace a single HASH's contents and reset its TTL.
+ * + * KEYS[1] = hash key + * ARGV[1] = ttl seconds (0 = no TTL) + * ARGV[2..N] = alternating field, value pairs + */ +const REPLACE_HASH_LUA = ` +redis.call("DEL", KEYS[1]) +if #ARGV > 1 then + local fv = {} + for i = 2, #ARGV do + fv[#fv + 1] = ARGV[i] + end + redis.call("HSET", KEYS[1], unpack(fv)) +end +local ttl = tonumber(ARGV[1]) +if ttl and ttl > 0 then + redis.call("EXPIRE", KEYS[1], ttl) +end +return 1 +`; + +/** + * Reserved field name on env hashes that records the worker currently + * "owning" the env keyspace. The back-fill Lua script reads this and skips + * its env-side write if the owner has flipped — closing the race where a + * concurrent promotion atomically replaces the env hash between a resolver's + * PG read and its back-fill write. Customer task slugs are kebab/camelCase + * and never start with `__`, so collisions are not a concern; an accidental + * `getCurrent(envId, "__owner_worker_id")` would JSON.parse-fail and fall + * back to PG, not corrupt anything. + */ +const OWNER_FIELD = "__owner_worker_id"; + +/** + * Atomically replace BOTH keyspaces in one Redis transaction. Used at deploy + * promotion — the worker just became current for the env, so the env keyspace + * and the worker keyspace get the same field set, and the env hash is + * stamped with the new owner workerId. + * + * KEYS[1] = env hash key + * KEYS[2] = by-worker hash key + * ARGV[1] = env ttl seconds (0 = no TTL) + * ARGV[2] = by-worker ttl seconds (0 = no TTL) + * ARGV[3] = workerId (env-hash owner marker) + * ARGV[4..N] = alternating field, value pairs (same for both hashes) + */ +const REPLACE_TWO_HASHES_LUA = ` +redis.call("DEL", KEYS[1]) +redis.call("DEL", KEYS[2]) +if #ARGV > 3 then + local fv = {} + for i = 4, #ARGV do + fv[#fv + 1] = ARGV[i] + end + redis.call("HSET", KEYS[1], unpack(fv)) + redis.call("HSET", KEYS[2], unpack(fv)) +end +redis.call("HSET", KEYS[1], "${OWNER_FIELD}", ARGV[3]) +local envTtl = tonumber(ARGV[1]) +if envTtl and envTtl > 0 then + redis.call("EXPIRE", KEYS[1], envTtl) +end +local workerTtl = tonumber(ARGV[2]) +if workerTtl and workerTtl > 0 then + redis.call("EXPIRE", KEYS[2], workerTtl) +end +return 1 +`; + +/** + * Set a single field and refresh the HASH TTL. Used by the locked-version + * back-fill path — sliding expiry keeps active workers warm. + * + * KEYS[1] = hash key + * ARGV[1] = ttl seconds (0 = no TTL refresh) + * ARGV[2] = field + * ARGV[3] = value + */ +const SET_FIELD_REFRESH_TTL_LUA = ` +redis.call("HSET", KEYS[1], ARGV[2], ARGV[3]) +local ttl = tonumber(ARGV[1]) +if ttl and ttl > 0 then + redis.call("EXPIRE", KEYS[1], ttl) +end +return 1 +`; + +/** + * Atomically upsert one field in BOTH keyspaces. Used by the non-locked + * back-fill path. + * + * The by-worker hash always gets written (the key contains the workerId, so + * stale data lands in a dead worker's keyspace and is never read by anyone + * not pinned to that version). + * + * The env hash is CAS-guarded by `${OWNER_FIELD}`: if a concurrent promotion + * has replaced the hash between this resolver's PG read and this write, the + * stored owner won't match the workerId the back-filler resolved to, so the + * env write is skipped — preventing the back-fill from overwriting a freshly + * promoted slug with stale data from the previous worker. 
+ *
+ * KEYS[1] = env hash key
+ * KEYS[2] = by-worker hash key
+ * ARGV[1] = env ttl seconds (0 = no TTL)
+ * ARGV[2] = by-worker ttl seconds (0 = no TTL)
+ * ARGV[3] = writer's expected env-hash owner workerId
+ * ARGV[4] = field
+ * ARGV[5] = value
+ */
+const SET_TWO_FIELDS_LUA = `
+redis.call("HSET", KEYS[2], ARGV[4], ARGV[5])
+local workerTtl = tonumber(ARGV[2])
+if workerTtl and workerTtl > 0 then
+  redis.call("EXPIRE", KEYS[2], workerTtl)
+end
+
+local owner = redis.call("HGET", KEYS[1], "${OWNER_FIELD}")
+if owner == false or owner == ARGV[3] then
+  redis.call("HSET", KEYS[1], ARGV[4], ARGV[5])
+  if owner == false then
+    redis.call("HSET", KEYS[1], "${OWNER_FIELD}", ARGV[3])
+  end
+  local envTtl = tonumber(ARGV[1])
+  if envTtl and envTtl > 0 and redis.call("TTL", KEYS[1]) == -1 then
+    redis.call("EXPIRE", KEYS[1], envTtl)
+  end
+end
+return 1
+`;
+
+declare module "ioredis" {
+  interface RedisCommander<Context> {
+    taskMetaReplaceHash(
+      key: string,
+      ttlSeconds: string,
+      ...fieldValues: string[]
+    ): Result<number, Context>;
+    taskMetaReplaceTwoHashes(
+      envKey: string,
+      workerKey: string,
+      envTtlSeconds: string,
+      workerTtlSeconds: string,
+      workerId: string,
+      ...fieldValues: string[]
+    ): Result<number, Context>;
+    taskMetaSetFieldRefreshTtl(
+      key: string,
+      ttlSeconds: string,
+      field: string,
+      value: string,
+      callback?: Callback<number>
+    ): Result<number, Context>;
+    taskMetaSetTwoFields(
+      envKey: string,
+      workerKey: string,
+      envTtlSeconds: string,
+      workerTtlSeconds: string,
+      workerId: string,
+      field: string,
+      value: string,
+      callback?: Callback<number>
+    ): Result<number, Context>;
+  }
+}
+
+export class RedisTaskMetadataCache implements TaskMetadataCache {
+  private readonly redis: Redis;
+  private readonly currentEnvTtlSeconds: number;
+  private readonly byWorkerTtlSeconds: number;
+
+  constructor(options: RedisTaskMetadataCacheOptions) {
+    this.redis = options.redis;
+    this.currentEnvTtlSeconds = options.currentEnvTtlSeconds ?? 86400;
+    this.byWorkerTtlSeconds = options.byWorkerTtlSeconds ?? 30 * 24 * 60 * 60;
+
+    this.redis.defineCommand("taskMetaReplaceHash", {
+      numberOfKeys: 1,
+      lua: REPLACE_HASH_LUA,
+    });
+    this.redis.defineCommand("taskMetaReplaceTwoHashes", {
+      numberOfKeys: 2,
+      lua: REPLACE_TWO_HASHES_LUA,
+    });
+    this.redis.defineCommand("taskMetaSetFieldRefreshTtl", {
+      numberOfKeys: 1,
+      lua: SET_FIELD_REFRESH_TTL_LUA,
+    });
+    this.redis.defineCommand("taskMetaSetTwoFields", {
+      numberOfKeys: 2,
+      lua: SET_TWO_FIELDS_LUA,
+    });
+  }
+
+  async getCurrent(envId: string, slug: string): Promise<TaskMetadataEntry | null> {
+    return this.#get(currentEnvKey(envId), slug);
+  }
+
+  async getByWorker(workerId: string, slug: string): Promise<TaskMetadataEntry | null> {
+    return this.#get(byWorkerKey(workerId), slug);
+  }
+
+  async populateByCurrentWorker(
+    envId: string,
+    workerId: string,
+    entries: TaskMetadataEntry[]
+  ): Promise<void> {
+    try {
+      // Always invoke the script — empty `entries` is valid and causes both
+      // keyspaces to be cleared (DEL + no HSET), which is the right behavior
+      // when promoting a worker with no tasks.
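+      //
+      // Illustrative call shape only (ids, slug, and payload are hypothetical),
+      // matching the script's ARGV layout [envTtl, workerTtl, workerId, field, value, ...]:
+      //   taskMetaReplaceTwoHashes(
+      //     "task-meta:env:env_123", "task-meta:by-worker:worker_456",
+      //     "86400", "2592000", "worker_456",
+      //     "send-email", '{"t":null,"k":"STANDARD","q":"q_1","n":"task/send-email"}'
+      //   )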
+      const fieldValues: string[] = [];
+      for (const entry of entries) {
+        fieldValues.push(entry.slug, encode(entry));
+      }
+      await this.redis.taskMetaReplaceTwoHashes(
+        currentEnvKey(envId),
+        byWorkerKey(workerId),
+        String(this.currentEnvTtlSeconds),
+        String(this.byWorkerTtlSeconds),
+        workerId,
+        ...fieldValues
+      );
+    } catch (error) {
+      logger.error("Failed to populate task metadata cache (current worker)", {
+        envId,
+        workerId,
+        error,
+      });
+    }
+  }
+
+  async populateByWorker(workerId: string, entries: TaskMetadataEntry[]): Promise<void> {
+    try {
+      // Always invoke the script — empty `entries` clears the keyspace.
+      const fieldValues: string[] = [];
+      for (const entry of entries) {
+        fieldValues.push(entry.slug, encode(entry));
+      }
+      await this.redis.taskMetaReplaceHash(
+        byWorkerKey(workerId),
+        String(this.byWorkerTtlSeconds),
+        ...fieldValues
+      );
+    } catch (error) {
+      logger.error("Failed to populate task metadata cache (by worker)", {
+        workerId,
+        error,
+      });
+    }
+  }
+
+  async setByCurrentWorker(
+    envId: string,
+    workerId: string,
+    entry: TaskMetadataEntry
+  ): Promise<void> {
+    try {
+      await this.redis.taskMetaSetTwoFields(
+        currentEnvKey(envId),
+        byWorkerKey(workerId),
+        String(this.currentEnvTtlSeconds),
+        String(this.byWorkerTtlSeconds),
+        workerId,
+        entry.slug,
+        encode(entry)
+      );
+    } catch (error) {
+      logger.error("Failed to set task metadata cache field (current worker)", {
+        envId,
+        workerId,
+        slug: entry.slug,
+        error,
+      });
+    }
+  }
+
+  async setByWorker(workerId: string, entry: TaskMetadataEntry): Promise<void> {
+    try {
+      await this.redis.taskMetaSetFieldRefreshTtl(
+        byWorkerKey(workerId),
+        String(this.byWorkerTtlSeconds),
+        entry.slug,
+        encode(entry)
+      );
+    } catch (error) {
+      logger.error("Failed to set task metadata cache field (by worker)", {
+        workerId,
+        slug: entry.slug,
+        error,
+      });
+    }
+  }
+
+  async #get(key: string, slug: string): Promise<TaskMetadataEntry | null> {
+    try {
+      const raw = await this.redis.hget(key, slug);
+      if (!raw) return null;
+      return decode(slug, raw);
+    } catch (error) {
+      logger.error("Failed to read task metadata from cache", { key, slug, error });
+      return null;
+    }
+  }
+}
+
+export class NoopTaskMetadataCache implements TaskMetadataCache {
+  async getCurrent(): Promise<TaskMetadataEntry | null> {
+    return null;
+  }
+
+  async getByWorker(): Promise<TaskMetadataEntry | null> {
+    return null;
+  }
+
+  async populateByCurrentWorker(): Promise<void> {
+    // intentionally empty
+  }
+
+  async populateByWorker(): Promise<void> {
+    // intentionally empty
+  }
+
+  async setByCurrentWorker(): Promise<void> {
+    // intentionally empty
+  }
+
+  async setByWorker(): Promise<void> {
+    // intentionally empty
+  }
+}
diff --git a/apps/webapp/app/services/taskMetadataCacheInstance.server.ts b/apps/webapp/app/services/taskMetadataCacheInstance.server.ts
new file mode 100644
index 00000000000..b673cf122ed
--- /dev/null
+++ b/apps/webapp/app/services/taskMetadataCacheInstance.server.ts
@@ -0,0 +1,38 @@
+import { Redis } from "ioredis";
+import { defaultReconnectOnError } from "@internal/redis";
+import { env } from "~/env.server";
+import { singleton } from "~/utils/singleton";
+import {
+  NoopTaskMetadataCache,
+  RedisTaskMetadataCache,
+  type TaskMetadataCache,
+} from "./taskMetadataCache.server";
+
+export const taskMetadataCacheInstance: TaskMetadataCache = singleton(
+  "taskMetadataCacheInstance",
+  initializeTaskMetadataCache
+);
+
+function initializeTaskMetadataCache(): TaskMetadataCache {
+  if (!env.TASK_META_CACHE_REDIS_HOST) {
+    return new NoopTaskMetadataCache();
+  }
+
+  const redis = new Redis({
+    connectionName: "taskMetadataCache",
+    host:
env.TASK_META_CACHE_REDIS_HOST, + port: env.TASK_META_CACHE_REDIS_PORT, + username: env.TASK_META_CACHE_REDIS_USERNAME, + password: env.TASK_META_CACHE_REDIS_PASSWORD, + keyPrefix: "tr:", + enableAutoPipelining: true, + reconnectOnError: defaultReconnectOnError, + ...(env.TASK_META_CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + }); + + return new RedisTaskMetadataCache({ + redis, + currentEnvTtlSeconds: env.TASK_META_CACHE_CURRENT_ENV_TTL_SECONDS, + byWorkerTtlSeconds: env.TASK_META_CACHE_BY_WORKER_TTL_SECONDS, + }); +} diff --git a/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts b/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts index 288374ec61a..6022c172908 100644 --- a/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts +++ b/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts @@ -1,8 +1,13 @@ import { BackgroundWorkerMetadata, tryCatch } from "@trigger.dev/core/v3"; import { CURRENT_DEPLOYMENT_LABEL } from "@trigger.dev/core/v3/isomorphic"; -import { WorkerDeployment } from "@trigger.dev/database"; +import { PrismaClientOrTransaction, WorkerDeployment } from "@trigger.dev/database"; import { logger } from "~/services/logger.server"; import { syncTaskIdentifiers } from "~/services/taskIdentifierRegistry.server"; +import { + type TaskMetadataCache, + type TaskMetadataEntry, +} from "~/services/taskMetadataCache.server"; +import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server"; import { BaseService, ServiceValidationError } from "./baseService.server"; import { syncDeclarativeSchedules } from "./createBackgroundWorker.server"; import { ExecuteTasksWaitingForDeployService } from "./executeTasksWaitingForDeploy"; @@ -11,6 +16,17 @@ import { compareDeploymentVersions } from "../utils/deploymentVersions"; export type ChangeCurrentDeploymentDirection = "promote" | "rollback"; export class ChangeCurrentDeploymentService extends BaseService { + private readonly _taskMetaCache: TaskMetadataCache; + + constructor( + prisma?: PrismaClientOrTransaction, + replica?: PrismaClientOrTransaction, + taskMetaCache: TaskMetadataCache = taskMetadataCacheInstance + ) { + super(prisma, replica); + this._taskMetaCache = taskMetaCache; + } + public async call( deployment: WorkerDeployment, direction: ChangeCurrentDeploymentDirection, @@ -96,23 +112,59 @@ export class ChangeCurrentDeploymentService extends BaseService { }, }); - const [syncError] = await tryCatch( - (async () => { - const tasks = await this._prisma.backgroundWorkerTask.findMany({ - where: { workerId: deployment.workerId! }, - select: { slug: true, triggerSource: true }, - }); - await syncTaskIdentifiers( + const [fetchTasksError, tasks] = await tryCatch( + this._prisma.backgroundWorkerTask.findMany({ + where: { workerId: deployment.workerId! }, + select: { + slug: true, + triggerSource: true, + ttl: true, + queue: { select: { id: true, name: true } }, + }, + }) + ); + + if (fetchTasksError) { + logger.error("Error fetching worker tasks on deployment change", { + error: fetchTasksError, + }); + } + + if (tasks) { + // Side effect 1: refresh the `TaskIdentifier` table and the existing + // `tids:` Redis cache so the task-listing UI reflects the new deploy. 
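+      // For illustration (hypothetical slugs), the sync receives pairs shaped like
+      //   [{ id: "send-email", triggerSource: "STANDARD" },
+      //    { id: "daily-report", triggerSource: "SCHEDULED" }]
+      // derived from the same `tasks` rows that feed the metadata cache below.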
+ const [syncIdentifiersError] = await tryCatch( + syncTaskIdentifiers( deployment.environmentId, deployment.projectId, deployment.workerId!, tasks.map((t) => ({ id: t.slug, triggerSource: t.triggerSource })) - ); - })() - ); + ) + ); - if (syncError) { - logger.error("Error syncing task identifiers on deployment change", { error: syncError }); + if (syncIdentifiersError) { + logger.error("Error syncing task identifiers on deployment change", { + error: syncIdentifiersError, + }); + } + + // Side effect 2: refresh the `task-meta:` cache that the queue resolver + // reads from. Independent of side effect 1 — if `syncTaskIdentifiers` + // throws, the queue resolver still gets a warm cache for the new worker. + const metadataEntries: TaskMetadataEntry[] = tasks.map((t) => ({ + slug: t.slug, + ttl: t.ttl, + triggerSource: t.triggerSource, + queueId: t.queue?.id ?? null, + queueName: t.queue?.name ?? "", + })); + + // Cache calls log+swallow internally. + await this._taskMetaCache.populateByCurrentWorker( + deployment.environmentId, + deployment.workerId!, + metadataEntries + ); } const [scheduleSyncError] = await tryCatch(this.#syncSchedulesForDeployment(deployment)); diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index da79e386afb..ec3f6d077ad 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -14,6 +14,11 @@ import { sanitizeQueueName } from "~/models/taskQueue.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { syncTaskIdentifiers } from "~/services/taskIdentifierRegistry.server"; +import { + type TaskMetadataCache, + type TaskMetadataEntry, +} from "~/services/taskMetadataCache.server"; +import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; import { removeQueueConcurrencyLimits, @@ -56,6 +61,17 @@ export function stripBackgroundWorkerMetadataForStorage( } export class CreateBackgroundWorkerService extends BaseService { + private readonly _taskMetaCache: TaskMetadataCache; + + constructor( + prisma?: PrismaClientOrTransaction, + replica?: PrismaClientOrTransaction, + taskMetaCache: TaskMetadataCache = taskMetadataCacheInstance + ) { + super(prisma, replica); + this._taskMetaCache = taskMetaCache; + } + public async call( projectRef: string, environment: AuthenticatedEnvironment, @@ -147,7 +163,7 @@ export class CreateBackgroundWorkerService extends BaseService { throw new ServiceValidationError("Error creating background worker files"); } - const [resourcesError] = await tryCatch( + const [resourcesError, workerTaskEntries] = await tryCatch( createWorkerResources( body.metadata, backgroundWorker, @@ -212,6 +228,26 @@ export class CreateBackgroundWorkerService extends BaseService { }); } + // Populate task metadata cache. DEV workers are always "current" because + // `findCurrentWorkerFromEnvironment` resolves DEV current as the latest + // worker by createdAt. Non-DEV (deploy-built) workers are not promoted + // here — promotion writes the `:env:` keyspace later in + // changeCurrentDeployment / createDeploymentBackgroundWorkerV3. + // Cache calls log+swallow internally, so a Redis blip can't break + // anything else here. Empty `workerTaskEntries` is intentional — the + // populate methods clear stale hashes for zero-task deploys. 
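+    // Sketch of the resulting writes, using the key shapes from
+    // taskMetadataCache.server.ts (ids hypothetical):
+    //   DEVELOPMENT  → task-meta:env:{envId} and task-meta:by-worker:{workerId}
+    //   deploy-built → task-meta:by-worker:{workerId} only; the env keyspace
+    //                  is written later, at promotion.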
+    if (workerTaskEntries) {
+      if (environment.type === "DEVELOPMENT") {
+        await this._taskMetaCache.populateByCurrentWorker(
+          environment.id,
+          backgroundWorker.id,
+          workerTaskEntries
+        );
+      } else {
+        await this._taskMetaCache.populateByWorker(backgroundWorker.id, workerTaskEntries);
+      }
+    }
+
     const [updateConcurrencyLimitsError] = await tryCatch(
       updateEnvConcurrencyLimits(environment)
     );
@@ -265,17 +301,26 @@ export async function createWorkerResources(
   environment: AuthenticatedEnvironment,
   prisma: PrismaClientOrTransaction,
   tasksToBackgroundFiles?: Map<string, string>
-) {
+): Promise<TaskMetadataEntry[]> {
   // Create the queues
   const queues = await createWorkerQueues(metadata, worker, environment, prisma);
 
   // Create the tasks
-  await createWorkerTasks(metadata, queues, worker, environment, prisma, tasksToBackgroundFiles);
+  const taskEntries = await createWorkerTasks(
+    metadata,
+    queues,
+    worker,
+    environment,
+    prisma,
+    tasksToBackgroundFiles
+  );
 
   // Register prompts
   if (metadata.prompts && metadata.prompts.length > 0) {
     await createWorkerPrompts(metadata.prompts, worker, environment, prisma);
   }
+
+  return taskEntries;
 }
 
 async function createWorkerTasks(
@@ -285,17 +330,22 @@
   environment: AuthenticatedEnvironment,
   prisma: PrismaClientOrTransaction,
   tasksToBackgroundFiles?: Map<string, string>
-) {
+): Promise<TaskMetadataEntry[]> {
   // Create tasks in chunks of 20
   const CHUNK_SIZE = 20;
+  const entries: TaskMetadataEntry[] = [];
   for (let i = 0; i < metadata.tasks.length; i += CHUNK_SIZE) {
     const chunk = metadata.tasks.slice(i, i + CHUNK_SIZE);
-    await Promise.all(
+    const chunkEntries = await Promise.all(
       chunk.map((task) =>
         createWorkerTask(task, queues, worker, environment, prisma, tasksToBackgroundFiles)
       )
     );
+    for (const entry of chunkEntries) {
+      if (entry) entries.push(entry);
+    }
   }
+  return entries;
 }
 
 async function createWorkerTask(
@@ -305,7 +355,7 @@
   environment: AuthenticatedEnvironment,
   prisma: PrismaClientOrTransaction,
   tasksToBackgroundFiles?: Map<string, string>
-) {
+): Promise<TaskMetadataEntry | null> {
   try {
     let queue = queues.find((queue) => queue.name === task.queue?.name);
@@ -331,6 +381,9 @@
         ? ("AGENT" as const)
         : ("STANDARD" as const);
 
+    const resolvedTtl =
+      typeof task.ttl === "number" ? stringifyDuration(task.ttl) ?? null : task.ttl ?? null;
+
     await prisma.backgroundWorkerTask.create({
       data: {
         friendlyId: generateFriendlyId("task"),
@@ -348,12 +401,19 @@
         config: task.agentConfig ? (task.agentConfig as any) : undefined,
         fileId: tasksToBackgroundFiles?.get(task.id) ?? null,
         maxDurationInSeconds: task.maxDuration ? clampMaxDuration(task.maxDuration) : null,
-        ttl:
-          typeof task.ttl === "number" ? stringifyDuration(task.ttl) ?? null : task.ttl ?? null,
+        ttl: resolvedTtl,
         queueId: queue.id,
         payloadSchema: task.payloadSchema as any,
       },
     });
+
+    return {
+      slug: task.id,
+      ttl: resolvedTtl,
+      triggerSource: resolvedTriggerSource,
+      queueId: queue.id,
+      queueName: queue.name,
+    };
   } catch (error) {
     if (error instanceof Prisma.PrismaClientKnownRequestError) {
       // The error code for unique constraint violation in Prisma is P2002
@@ -389,6 +449,7 @@
         worker,
       });
     }
+    return null;
   }
 }
diff --git a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts
index f20a024d2d6..d8f13227d78 100644
--- a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts
+++ b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts
@@ -1,8 +1,10 @@
 import { CreateBackgroundWorkerRequestBody, tryCatch } from "@trigger.dev/core/v3";
-import type { BackgroundWorker } from "@trigger.dev/database";
+import type { BackgroundWorker, PrismaClientOrTransaction } from "@trigger.dev/database";
 import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
 import { logger } from "~/services/logger.server";
 import { syncTaskIdentifiers } from "~/services/taskIdentifierRegistry.server";
+import { type TaskMetadataCache } from "~/services/taskMetadataCache.server";
+import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server";
 import { socketIo } from "../handleSocketIo.server";
 import { updateEnvConcurrencyLimits } from "../runQueue.server";
 import { PerformDeploymentAlertsService } from "./alerts/performDeploymentAlerts.server";
@@ -24,6 +26,17 @@ import { CURRENT_DEPLOYMENT_LABEL, BackgroundWorkerId } from "@trigger.dev/core/
  * @deprecated
  */
 export class CreateDeploymentBackgroundWorkerServiceV3 extends BaseService {
+  private readonly _taskMetaCache: TaskMetadataCache;
+
+  constructor(
+    prisma?: PrismaClientOrTransaction,
+    replica?: PrismaClientOrTransaction,
+    taskMetaCache: TaskMetadataCache = taskMetadataCacheInstance
+  ) {
+    super(prisma, replica);
+    this._taskMetaCache = taskMetaCache;
+  }
+
   public async call(
     projectRef: string,
     environment: AuthenticatedEnvironment,
@@ -74,8 +87,14 @@
       });
     }
 
+    let workerTaskEntries: Awaited<ReturnType<typeof createWorkerResources>> = [];
     try {
-      await createWorkerResources(body.metadata, backgroundWorker, environment, this._prisma);
+      workerTaskEntries = await createWorkerResources(
+        body.metadata,
+        backgroundWorker,
+        environment,
+        this._prisma
+      );
       await syncDeclarativeSchedules(
         body.metadata.tasks,
         backgroundWorker,
@@ -147,6 +166,16 @@
       logger.error("Error syncing task identifiers", { error: syncIdError });
     }
 
+    // V3 promotes the deployment immediately above, so this worker is now
+    // current for the env — write both keyspaces atomically. Cache calls
+    // log+swallow internally. Empty `workerTaskEntries` is intentional: the
+    // populate methods clear stale hashes for zero-task deploys.
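+    // Concretely, `populateByCurrentWorker` executes one Lua script
+    // (REPLACE_TWO_HASHES_LUA): DEL both hashes, HSET the same field set into
+    // each, stamp `__owner_worker_id` on the env hash, then EXPIRE each with
+    // its own TTL, so readers never observe a half-replaced keyspace.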
+ await this._taskMetaCache.populateByCurrentWorker( + environment.id, + backgroundWorker.id, + workerTaskEntries + ); + try { //send a notification that a new worker has been created await projectPubSub.publish( diff --git a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts index f764d39dc7b..ff041359bb0 100644 --- a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts +++ b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts @@ -1,7 +1,9 @@ import { CreateBackgroundWorkerRequestBody, logger, tryCatch } from "@trigger.dev/core/v3"; import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic"; -import type { BackgroundWorker, WorkerDeployment } from "@trigger.dev/database"; +import type { BackgroundWorker, PrismaClientOrTransaction, WorkerDeployment } from "@trigger.dev/database"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { type TaskMetadataCache } from "~/services/taskMetadataCache.server"; +import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server"; import { BaseService, ServiceValidationError } from "./baseService.server"; import { createBackgroundFiles, @@ -13,6 +15,17 @@ import { TimeoutDeploymentService } from "./timeoutDeployment.server"; import { env } from "~/env.server"; export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService { + private readonly _taskMetaCache: TaskMetadataCache; + + constructor( + prisma?: PrismaClientOrTransaction, + replica?: PrismaClientOrTransaction, + taskMetaCache: TaskMetadataCache = taskMetadataCacheInstance + ) { + super(prisma, replica); + this._taskMetaCache = taskMetaCache; + } + public async call( environment: AuthenticatedEnvironment, deploymentId: string, @@ -110,7 +123,7 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService { throw serviceError; } - const [resourcesError] = await tryCatch( + const [resourcesError, workerTaskEntries] = await tryCatch( createWorkerResources( body.metadata, backgroundWorker, @@ -134,6 +147,16 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService { throw serviceError; } + // V4 build path: worker created but NOT yet promoted to current. Write + // only the `task-meta:by-worker:{workerId}` keyspace so locked-version + // triggers against this build hit the cache. Promotion (which writes the + // env keyspace) happens later via finalizeDeployment → changeCurrentDeployment. + // Cache calls log+swallow internally, so a Redis blip can't stall the + // deployment state machine. Empty entries clears stale hashes. 
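+    // For example (version string hypothetical), a caller pinned via
+    // `trigger(..., { lockToVersion: "20240101.1" })` resolves this worker and
+    // reads HGET task-meta:by-worker:{workerId} <slug>, so the lookup is warm
+    // even before the deployment is promoted.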
+ if (workerTaskEntries) { + await this._taskMetaCache.populateByWorker(backgroundWorker.id, workerTaskEntries); + } + const [schedulesError] = await tryCatch( syncDeclarativeSchedules(body.metadata.tasks, backgroundWorker, environment, this._prisma) ); diff --git a/apps/webapp/test/engine/triggerTask.test.ts b/apps/webapp/test/engine/triggerTask.test.ts index ddceb8754c1..798e39e0601 100644 --- a/apps/webapp/test/engine/triggerTask.test.ts +++ b/apps/webapp/test/engine/triggerTask.test.ts @@ -20,8 +20,10 @@ import { assertNonNullable, containerTest } from "@internal/testcontainers"; import { trace } from "@opentelemetry/api"; import { IOPacket } from "@trigger.dev/core/v3"; import { TaskRun } from "@trigger.dev/database"; +import { Redis } from "ioredis"; import { IdempotencyKeyConcern } from "~/runEngine/concerns/idempotencyKeys.server"; import { DefaultQueueManager } from "~/runEngine/concerns/queues.server"; +import { RedisTaskMetadataCache } from "~/services/taskMetadataCache.server"; import { EntitlementValidationParams, MaxAttemptsValidationParams, @@ -1173,3 +1175,295 @@ describe("RunEngineTriggerTaskService", () => { } ); }); + +describe("DefaultQueueManager task metadata cache", () => { + containerTest( + "warm cache returns metadata without falling through to PG", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, + queue: { redis: redisOptions }, + runLock: { redis: redisOptions }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { name: "small-1x", cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const environment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const taskIdentifier = "cached-task"; + const setup = await setupBackgroundWorker(engine, environment, taskIdentifier); + + const redis = new Redis(redisOptions); + const cache = new RedisTaskMetadataCache({ redis }); + + // Pre-populate cache with AGENT triggerSource; DB row has the default STANDARD. + // If the read path hits the cache, the resulting TaskRun.taskKind reflects the + // cached value. If it falls through to PG, it reflects STANDARD. 
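+      // The populate below should leave roughly this hash state (field encoding
+      // per taskMetadataCache.server.ts; no "tr:" prefix here, since this test's
+      // Redis client sets no keyPrefix):
+      //   HGETALL task-meta:env:{environment.id} →
+      //     { "cached-task": '{"t":null,"k":"AGENT","q":null,"n":"task/cached-task"}',
+      //       "__owner_worker_id": setup.worker.id }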
+      await cache.populateByCurrentWorker(environment.id, setup.worker.id, [
+        {
+          slug: taskIdentifier,
+          ttl: null,
+          triggerSource: "AGENT",
+          queueId: null,
+          queueName: `task/${taskIdentifier}`,
+        },
+      ]);
+
+      const queuesManager = new DefaultQueueManager(prisma, engine, undefined, cache);
+      const triggerTaskService = new RunEngineTriggerTaskService({
+        engine,
+        prisma,
+        payloadProcessor: new MockPayloadProcessor(),
+        queueConcern: queuesManager,
+        idempotencyKeyConcern: new IdempotencyKeyConcern(prisma, engine, new MockTraceEventConcern()),
+        validator: new MockTriggerTaskValidator(),
+        traceEventConcern: new MockTraceEventConcern(),
+        tracer: trace.getTracer("test", "0.0.0"),
+        metadataMaximumSize: 1024 * 1024,
+      });
+
+      const result = await triggerTaskService.call({
+        taskId: taskIdentifier,
+        environment,
+        body: { payload: { test: "x" } },
+      });
+
+      assertNonNullable(result);
+      expect(result.run.taskIdentifier).toBe(taskIdentifier);
+      expect((result.run.annotations as { taskKind?: string } | null)?.taskKind).toBe("AGENT");
+
+      await redis.quit();
+      await engine.quit();
+    }
+  );
+
+  containerTest(
+    "cache miss falls through to PG and back-fills the cache",
+    async ({ prisma, redisOptions }) => {
+      const engine = new RunEngine({
+        prisma,
+        worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 },
+        queue: { redis: redisOptions },
+        runLock: { redis: redisOptions },
+        machines: {
+          defaultMachine: "small-1x",
+          machines: {
+            "small-1x": { name: "small-1x", cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 },
+          },
+          baseCostInCents: 0.0005,
+        },
+        tracer: trace.getTracer("test", "0.0.0"),
+      });
+
+      const environment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
+      const taskIdentifier = "miss-task";
+      await setupBackgroundWorker(engine, environment, taskIdentifier);
+
+      const redis = new Redis(redisOptions);
+      const cache = new RedisTaskMetadataCache({ redis });
+
+      // Cache starts empty. Sanity-check the env keyspace before triggering.
+      expect(await cache.getCurrent(environment.id, taskIdentifier)).toBeNull();
+
+      const queuesManager = new DefaultQueueManager(prisma, engine, undefined, cache);
+      const triggerTaskService = new RunEngineTriggerTaskService({
+        engine,
+        prisma,
+        payloadProcessor: new MockPayloadProcessor(),
+        queueConcern: queuesManager,
+        idempotencyKeyConcern: new IdempotencyKeyConcern(prisma, engine, new MockTraceEventConcern()),
+        validator: new MockTriggerTaskValidator(),
+        traceEventConcern: new MockTraceEventConcern(),
+        tracer: trace.getTracer("test", "0.0.0"),
+        metadataMaximumSize: 1024 * 1024,
+      });
+
+      const result = await triggerTaskService.call({
+        taskId: taskIdentifier,
+        environment,
+        body: { payload: { test: "x" } },
+      });
+
+      assertNonNullable(result);
+      expect((result.run.annotations as { taskKind?: string } | null)?.taskKind).toBe("STANDARD");
+
+      // Back-fill is fire-and-forget; poll with a bounded timeout to avoid CI flakes.
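+      // (Assumes the promise-returning `setTimeout` from "node:timers/promises"
+      // is in scope here rather than the global callback form; the loop below
+      // bounds the wait at 40 * 25ms = 1s.)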
+ let backfilled = await cache.getCurrent(environment.id, taskIdentifier); + for (let i = 0; i < 40 && !backfilled; i++) { + await setTimeout(25); + backfilled = await cache.getCurrent(environment.id, taskIdentifier); + } + expect(backfilled).not.toBeNull(); + expect(backfilled?.triggerSource).toBe("STANDARD"); + expect(backfilled?.queueName).toBe(`task/${taskIdentifier}`); + + await redis.quit(); + await engine.quit(); + } + ); + + containerTest( + "queue-override + ttl path returns taskKind from cache without a BWT lookup", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, + queue: { redis: redisOptions }, + runLock: { redis: redisOptions }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { name: "small-1x", cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const environment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const taskIdentifier = "override-task"; + const setup = await setupBackgroundWorker(engine, environment, taskIdentifier); + + const redis = new Redis(redisOptions); + const cache = new RedisTaskMetadataCache({ redis }); + + // Cache says AGENT; DB row says STANDARD. Caller provides both a queue + // override and an explicit TTL — the hot path the PR regressed. + await cache.populateByCurrentWorker(environment.id, setup.worker.id, [ + { + slug: taskIdentifier, + ttl: null, + triggerSource: "AGENT", + queueId: null, + queueName: `task/${taskIdentifier}`, + }, + ]); + + const queuesManager = new DefaultQueueManager(prisma, engine, undefined, cache); + const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern: new IdempotencyKeyConcern(prisma, engine, new MockTraceEventConcern()), + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + tracer: trace.getTracer("test", "0.0.0"), + metadataMaximumSize: 1024 * 1024, + }); + + const result = await triggerTaskService.call({ + taskId: taskIdentifier, + environment, + body: { + payload: { test: "x" }, + options: { + queue: { name: "caller-queue" }, + ttl: "5m", + }, + }, + }); + + assertNonNullable(result); + expect(result.run.queue).toBe("caller-queue"); + expect((result.run.annotations as { taskKind?: string } | null)?.taskKind).toBe("AGENT"); + + await redis.quit(); + await engine.quit(); + } + ); + + containerTest( + "locked-version trigger reads from by-worker keyspace, not env keyspace", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, + queue: { redis: redisOptions }, + runLock: { redis: redisOptions }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { name: "small-1x", cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const environment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const taskIdentifier = "keyspace-task"; + const worker = await setupBackgroundWorker(engine, environment, taskIdentifier); + + const redis = new Redis(redisOptions); + const cache = new RedisTaskMetadataCache({ redis }); + + // Populate the two keyspaces with conflicting triggerSource values so we 
+ // can tell which keyspace the read used. The real worker's by-worker + // hash gets AGENT; the env hash gets SCHEDULED (seeded via a throwaway + // worker id since `populateByCurrentWorker` writes both keyspaces and + // we want the real worker's by-worker hash untouched). + await cache.populateByWorker(worker.worker.id, [ + { + slug: taskIdentifier, + ttl: null, + triggerSource: "AGENT", + queueId: null, + queueName: `task/${taskIdentifier}`, + }, + ]); + await cache.populateByCurrentWorker(environment.id, "dummy-worker-for-env-seed", [ + { + slug: taskIdentifier, + ttl: null, + triggerSource: "SCHEDULED", + queueId: null, + queueName: `task/${taskIdentifier}`, + }, + ]); + + const queuesManager = new DefaultQueueManager(prisma, engine, undefined, cache); + const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern: new IdempotencyKeyConcern(prisma, engine, new MockTraceEventConcern()), + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + tracer: trace.getTracer("test", "0.0.0"), + metadataMaximumSize: 1024 * 1024, + }); + + // Locked → by-worker keyspace → AGENT + const locked = await triggerTaskService.call({ + taskId: taskIdentifier, + environment, + body: { + payload: { test: "x" }, + options: { lockToVersion: worker.worker.version }, + }, + }); + assertNonNullable(locked); + expect((locked.run.annotations as { taskKind?: string } | null)?.taskKind).toBe("AGENT"); + + // Not locked → env keyspace → SCHEDULED + const current = await triggerTaskService.call({ + taskId: taskIdentifier, + environment, + body: { payload: { test: "y" } }, + }); + assertNonNullable(current); + expect((current.run.annotations as { taskKind?: string } | null)?.taskKind).toBe("SCHEDULED"); + + await redis.quit(); + await engine.quit(); + } + ); +});
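A minimal usage sketch of the cache API introduced in taskMetadataCache.server.ts, assuming a reachable local Redis and the webapp's `~/` path alias; the env id, worker ids, queue id, and task slug below are hypothetical:

import { Redis } from "ioredis";
import {
  RedisTaskMetadataCache,
  type TaskMetadataEntry,
} from "~/services/taskMetadataCache.server";

async function demo() {
  const redis = new Redis({ host: "127.0.0.1", port: 6379 });
  const cache = new RedisTaskMetadataCache({
    redis,
    currentEnvTtlSeconds: 86400, // mirrors TASK_META_CACHE_CURRENT_ENV_TTL_SECONDS
    byWorkerTtlSeconds: 2592000, // mirrors TASK_META_CACHE_BY_WORKER_TTL_SECONDS
  });

  const entry: TaskMetadataEntry = {
    slug: "send-email", // hypothetical task slug
    ttl: "10m",
    triggerSource: "STANDARD",
    queueId: "queue_123", // hypothetical
    queueName: "task/send-email",
  };

  // Promotion-style write: atomically replaces both keyspaces and stamps the
  // env hash with "__owner_worker_id" = "worker_v2".
  await cache.populateByCurrentWorker("env_abc", "worker_v2", [entry]);

  // Warm-path read: a single HGET, no Postgres involved.
  const hit = await cache.getCurrent("env_abc", "send-email");
  console.log(hit?.queueName); // "task/send-email"

  // Back-fill from a resolver that still believes worker_v1 owns the env: the
  // Lua CAS compares the stored owner ("worker_v2") against "worker_v1",
  // skips the env-hash write, and only updates worker_v1's by-worker hash.
  await cache.setByCurrentWorker("env_abc", "worker_v1", { ...entry, ttl: "1m" });
  const after = await cache.getCurrent("env_abc", "send-email");
  console.log(after?.ttl); // still "10m": the stale back-fill did not win

  await redis.quit();
}

demo().catch(console.error);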