From bdaaf060260d2710a8947a4d7ea3bce9868979ac Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 15 May 2026 06:40:31 +0100 Subject: [PATCH 1/7] perf(webapp): cache task metadata in Redis for the trigger hotpath MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The trigger-task hotpath used to early-return without a DB query when a caller passed both a queue override and a per-trigger TTL — the hottest configuration on the trigger API. Adding triggerSource to the resolver so the runs-list Source filter could distinguish STANDARD / SCHEDULED / AGENT runs removed those early-returns, costing +2 DB queries per trigger on non-locked calls and +1 on locked calls. The resolver now reads BackgroundWorkerTask metadata (ttl, triggerSource, queueId, queueName) from a Redis HASH keyed by env or by worker, with PG fallback on miss that back-fills the cache. Two key spaces: - task-meta:env:{envId} refreshed at every deploy promotion; 24h safety TTL. - task-meta:by-worker:{workerId} used for lockToVersion triggers; 30d sliding TTL. Cache writes use Lua scripts via defineCommand so DEL + HSET + EXPIRE land atomically (concurrent readers never see the empty intermediate state of a naive pipeline). Read-path back-fill uses single-field upserts so concurrent back-fills don't wipe each other's siblings. Configurable via a new TASK_META_CACHE_REDIS_* env-var prefix that falls back to the default REDIS_* set, so operators can route the cache to a dedicated Redis instance if they want. --- .server-changes/task-metadata-cache.md | 6 + apps/webapp/app/env.server.ts | 24 ++ .../app/runEngine/concerns/queues.server.ts | 232 +++++++----- .../app/services/taskMetadataCache.server.ts | 304 ++++++++++++++++ .../taskMetadataCacheInstance.server.ts | 38 ++ .../app/services/taskMetadataSync.server.ts | 22 ++ .../changeCurrentDeployment.server.ts | 88 ++++- .../services/createBackgroundWorker.server.ts | 73 +++- ...eateDeploymentBackgroundWorkerV3.server.ts | 36 +- ...eateDeploymentBackgroundWorkerV4.server.ts | 32 +- apps/webapp/test/engine/triggerTask.test.ts | 339 ++++++++++++++++++ 11 files changed, 1079 insertions(+), 115 deletions(-) create mode 100644 .server-changes/task-metadata-cache.md create mode 100644 apps/webapp/app/services/taskMetadataCache.server.ts create mode 100644 apps/webapp/app/services/taskMetadataCacheInstance.server.ts create mode 100644 apps/webapp/app/services/taskMetadataSync.server.ts diff --git a/.server-changes/task-metadata-cache.md b/.server-changes/task-metadata-cache.md new file mode 100644 index 00000000000..a71bbdf347b --- /dev/null +++ b/.server-changes/task-metadata-cache.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +Cache task defaults in Redis so the trigger API skips per-request database lookups, restoring the fast trigger path when callers pass queue and TTL options. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 97cccbc1710..8eacb9634e1 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -235,6 +235,30 @@ const EnvironmentSchema = z CACHE_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"), CACHE_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"), + TASK_META_CACHE_REDIS_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_HOST), + TASK_META_CACHE_REDIS_PORT: z.coerce + .number() + .optional() + .transform( + (v) => v ?? (process.env.REDIS_PORT ? 
parseInt(process.env.REDIS_PORT) : undefined) + ), + TASK_META_CACHE_REDIS_USERNAME: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_USERNAME), + TASK_META_CACHE_REDIS_PASSWORD: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_PASSWORD), + TASK_META_CACHE_REDIS_TLS_DISABLED: z + .string() + .default(process.env.REDIS_TLS_DISABLED ?? "false"), + TASK_META_CACHE_CURRENT_ENV_TTL_SECONDS: z.coerce.number().default(86400), + TASK_META_CACHE_BY_WORKER_TTL_SECONDS: z.coerce.number().default(2592000), + REALTIME_STREAMS_REDIS_HOST: z .string() .optional() diff --git a/apps/webapp/app/runEngine/concerns/queues.server.ts b/apps/webapp/app/runEngine/concerns/queues.server.ts index 4b4298bc935..e0599e0e6c8 100644 --- a/apps/webapp/app/runEngine/concerns/queues.server.ts +++ b/apps/webapp/app/runEngine/concerns/queues.server.ts @@ -17,6 +17,8 @@ import { tryCatch } from "@trigger.dev/core/v3"; import { ServiceValidationError } from "~/v3/services/common.server"; import { createCache, createLRUMemoryStore, DefaultStatefulContext, Namespace } from "@internal/cache"; import { singleton } from "~/utils/singleton"; +import type { TaskMetadataCache, TaskMetadataEntry } from "~/services/taskMetadataCache.server"; +import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server"; // LRU cache for environment queue sizes to reduce Redis calls const queueSizeCache = singleton("queueSizeCache", () => { @@ -63,13 +65,16 @@ function extractQueueName(queue: { name?: unknown } | undefined): string | undef export class DefaultQueueManager implements QueueManager { private readonly replicaPrisma: PrismaClientOrTransaction; + private readonly taskMetaCache: TaskMetadataCache; constructor( private readonly prisma: PrismaClientOrTransaction, private readonly engine: RunEngine, - replicaPrisma?: PrismaClientOrTransaction + replicaPrisma?: PrismaClientOrTransaction, + taskMetaCache: TaskMetadataCache = taskMetadataCacheInstance ) { this.replicaPrisma = replicaPrisma ?? prisma; + this.taskMetaCache = taskMetaCache; } async resolveQueueProperties( @@ -87,7 +92,10 @@ export class DefaultQueueManager implements QueueManager { const specifiedQueueName = extractQueueName(request.body.options?.queue); if (specifiedQueueName) { - // A specific queue name is provided, validate it exists for the locked worker + // A specific queue name is provided, validate it exists for the locked worker. + // Pre-existing query — not cached because TaskQueue rows can be added or + // removed independently of BackgroundWorkerTask, and a stale "queue exists" + // claim would silently route to the wrong queue. const specifiedQueue = await this.prisma.taskQueue.findFirst({ where: { name: specifiedQueueName, @@ -107,49 +115,38 @@ export class DefaultQueueManager implements QueueManager { queueName = specifiedQueue.name; lockedQueueId = specifiedQueue.id; - // Always fetch the task so we can resolve `triggerSource` (which - // becomes `taskKind` on annotations and replicates to ClickHouse). - // Without this, AGENT/SCHEDULED runs triggered with - // `lockToVersion` + a queue override would be annotated as - // STANDARD and disappear from the run-list "Source" filter. - // `ttl` is read from the same row but only used when the caller - // didn't specify a per-trigger TTL. 
- const lockedTask = await this.replicaPrisma.backgroundWorkerTask.findFirst({ - where: { - workerId: lockedBackgroundWorker.id, - runtimeEnvironmentId: request.environment.id, - slug: request.taskId, - }, - select: { ttl: true, triggerSource: true }, - }); + // Pull `triggerSource` (for `taskKind` annotation) and `ttl` from cache. + // On cache hit this is 0 PG queries; on miss the helper falls back to + // a BackgroundWorkerTask lookup and back-fills the cache. + const lockedMeta = await this.resolveLockedTaskMetadata( + lockedBackgroundWorker.id, + request.environment.id, + request.taskId + ); if (request.body.options?.ttl === undefined) { - taskTtl = lockedTask?.ttl; + taskTtl = lockedMeta?.ttl ?? undefined; } - taskKind = lockedTask?.triggerSource; + taskKind = lockedMeta?.triggerSource; } else { - // No queue override - fetch task with queue to get both default queue and TTL - const lockedTask = await this.replicaPrisma.backgroundWorkerTask.findFirst({ - where: { - workerId: lockedBackgroundWorker.id, - runtimeEnvironmentId: request.environment.id, - slug: request.taskId, - }, - include: { - queue: true, - }, - }); + // No queue override - resolve default queue + TTL + triggerSource via cache, + // falling back to a single BackgroundWorkerTask lookup on miss. + const lockedMeta = await this.resolveLockedTaskMetadata( + lockedBackgroundWorker.id, + request.environment.id, + request.taskId + ); - if (!lockedTask) { + if (!lockedMeta) { throw new ServiceValidationError( `Task '${request.taskId}' not found on locked version '${lockedBackgroundWorker.version ?? "" }'.` ); } - taskTtl = lockedTask.ttl; + taskTtl = lockedMeta.ttl; - if (!lockedTask.queue) { + if (!lockedMeta.queueName) { // This case should ideally be prevented by earlier checks or schema constraints, // but handle it defensively. logger.error("Task found on locked version, but has no associated queue record", { @@ -164,9 +161,9 @@ export class DefaultQueueManager implements QueueManager { } // Use the task's default queue name - queueName = lockedTask.queue.name; - lockedQueueId = lockedTask.queue.id; - taskKind = lockedTask.triggerSource; + queueName = lockedMeta.queueName; + lockedQueueId = lockedMeta.queueId ?? undefined; + taskKind = lockedMeta.triggerSource; } } else { // Task is not locked to a specific version, use regular logic @@ -213,76 +210,131 @@ export class DefaultQueueManager implements QueueManager { const defaultQueueName = `task/${taskId}`; - // Even when the caller provides both a queue override and a - // per-trigger TTL, we still need to fetch the task so `triggerSource` - // (which becomes `taskKind` on annotations and replicates to - // ClickHouse) is populated. Without it, AGENT/SCHEDULED runs hitting - // this path get stamped as STANDARD and disappear from the - // dashboard's `Source` filter. Mirrors the locked-worker fix above - // — `taskTtl` is harmless in the returned value because the call - // site coalesces `body.options.ttl ?? taskTtl`. - - // Find the current worker for the environment. Replica is fine here — - // the adjacent `backgroundWorkerTask` lookups below already use - // `replicaPrisma` (replica lag for "just deployed" is bounded the same - // way for both queries; reading the worker from the writer and the - // task from the replica would only widen the inconsistency window). 
- const worker = await findCurrentWorkerFromEnvironment(environment, this.replicaPrisma); + // Resolve the current worker's task metadata via cache (HGET on warm path, + // BackgroundWorkerTask findFirst + cache back-fill on miss). When this hits, + // both the queue-override + TTL caller and the default-queue caller satisfy + // their full result without any database query. + const meta = await this.resolveCurrentTaskMetadata(environment, taskId); + + if (overriddenQueueName) { + // Caller already named the queue. We only need triggerSource (for taskKind) + // and ttl (for the call site to coalesce against body.options.ttl). + return { + queueName: overriddenQueueName, + taskTtl: meta?.ttl ?? undefined, + taskKind: meta?.triggerSource, + }; + } - if (!worker) { - logger.debug("Failed to get queue name: No worker found", { + if (!meta) { + logger.debug("Failed to get queue name: No worker or task found", { taskId, environmentId: environment.id, }); - - return { queueName: overriddenQueueName ?? defaultQueueName, taskTtl: undefined }; + return { queueName: defaultQueueName, taskTtl: undefined }; } - // When queue is overridden, we only need TTL from the task (no queue join needed) - if (overriddenQueueName) { - const task = await this.replicaPrisma.backgroundWorkerTask.findFirst({ - where: { - workerId: worker.id, - runtimeEnvironmentId: environment.id, - slug: taskId, - }, - select: { ttl: true, triggerSource: true }, + if (!meta.queueName) { + logger.debug("Failed to get queue name: No queue found", { + taskId, + environmentId: environment.id, }); - - return { queueName: overriddenQueueName, taskTtl: task?.ttl, taskKind: task?.triggerSource }; + return { queueName: defaultQueueName, taskTtl: meta.ttl, taskKind: meta.triggerSource }; } - const task = await this.replicaPrisma.backgroundWorkerTask.findFirst({ - where: { - workerId: worker.id, - runtimeEnvironmentId: environment.id, - slug: taskId, - }, - include: { - queue: true, + return { queueName: meta.queueName, taskTtl: meta.ttl, taskKind: meta.triggerSource }; + } + + /** + * Resolve task metadata for a locked-version trigger. Reads from the + * `task-meta:by-worker:{workerId}` Redis hash; falls back to a single + * BackgroundWorkerTask findFirst on miss and back-fills the cache. + * + * Returns null when no BackgroundWorkerTask row exists. + */ + private async resolveLockedTaskMetadata( + workerId: string, + environmentId: string, + slug: string + ): Promise { + const cached = await this.taskMetaCache.getByWorker(workerId, slug); + if (cached) return cached; + + const row = await this.replicaPrisma.backgroundWorkerTask.findFirst({ + where: { workerId, runtimeEnvironmentId: environmentId, slug }, + select: { + ttl: true, + triggerSource: true, + queue: { select: { id: true, name: true } }, }, }); - if (!task) { - console.log("Failed to get queue name: No task found", { - taskId, - environmentId: environment.id, - }); + if (!row) return null; - return { queueName: defaultQueueName, taskTtl: undefined }; - } + const entry: TaskMetadataEntry = { + slug, + ttl: row.ttl, + triggerSource: row.triggerSource, + queueId: row.queue?.id ?? null, + queueName: row.queue?.name ?? "", + }; - if (!task.queue) { - console.log("Failed to get queue name: No queue found", { - taskId, - environmentId: environment.id, - queueConfig: task.queueConfig, - }); + // Fire-and-forget back-fill — `setByWorker` upserts the single field and + // refreshes the hash TTL. Errors are logged inside the cache and swallowed. 
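+    // The trigger response never blocks on this write; the worst case for a
+    // dropped write is that the next trigger misses again and repeats the same
+    // PG lookup. The encoded hash field it stores is tiny, e.g. (illustrative
+    // values only):
+    //   HGET tr:task-meta:by-worker:{workerId} my-task
+    //   -> {"t":"5m","k":"STANDARD","q":"q_123","n":"task/my-task"}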
+    this.taskMetaCache.setByWorker(workerId, entry).catch(() => {});
+
+    return entry;
+  }
+
+  /**
+   * Resolve task metadata for a non-locked trigger. Reads from the
+   * `task-meta:env:{envId}` Redis hash; falls back to
+   * findCurrentWorkerFromEnvironment + a single BackgroundWorkerTask findFirst
+   * on miss and back-fills both keyspaces.
+   *
+   * Returns null when no current worker or task can be resolved.
+   */
+  private async resolveCurrentTaskMetadata(
+    environment: AuthenticatedEnvironment,
+    slug: string
+  ): Promise<TaskMetadataEntry | null> {
+    const cached = await this.taskMetaCache.getCurrent(environment.id, slug);
+    if (cached) return cached;
+
+    // Cold cache: discover the current worker for the env. Replica is fine —
+    // the adjacent BackgroundWorkerTask lookup below uses `replicaPrisma` too
+    // (replica lag for "just deployed" is bounded the same way for both
+    // queries; reading from the writer here would only widen the window).
+    const worker = await findCurrentWorkerFromEnvironment(environment, this.replicaPrisma);
+    if (!worker) return null;
+
+    const row = await this.replicaPrisma.backgroundWorkerTask.findFirst({
+      where: { workerId: worker.id, runtimeEnvironmentId: environment.id, slug },
+      select: {
+        ttl: true,
+        triggerSource: true,
+        queue: { select: { id: true, name: true } },
+      },
+    });
+
+    if (!row) return null;
+
+    const entry: TaskMetadataEntry = {
+      slug,
+      ttl: row.ttl,
+      triggerSource: row.triggerSource,
+      queueId: row.queue?.id ?? null,
+      queueName: row.queue?.name ?? "",
+    };
+
+    // Back-fill both keyspaces so a subsequent locked-or-not trigger hits the
+    // cache. `setCurrent` preserves the env hash's existing TTL boundary
+    // (promotion owns it); `setByWorker` refreshes the by-worker TTL to keep
+    // active workers warm.
+    this.taskMetaCache.setCurrent(environment.id, entry).catch(() => {});
+    this.taskMetaCache.setByWorker(worker.id, entry).catch(() => {});
-    return { queueName: task.queue.name ?? defaultQueueName, taskTtl: task.ttl, taskKind: task.triggerSource };
+    return entry;
   }
 
   async validateQueueLimits(
diff --git a/apps/webapp/app/services/taskMetadataCache.server.ts b/apps/webapp/app/services/taskMetadataCache.server.ts
new file mode 100644
index 00000000000..c77cd6a1e40
--- /dev/null
+++ b/apps/webapp/app/services/taskMetadataCache.server.ts
@@ -0,0 +1,304 @@
+import type { Redis, Result, Callback } from "ioredis";
+import type { TaskTriggerSource } from "@trigger.dev/database";
+import { logger } from "./logger.server";
+
+export type TaskMetadataEntry = {
+  slug: string;
+  ttl: string | null;
+  triggerSource: TaskTriggerSource;
+  queueId: string | null;
+  queueName: string;
+};
+
+export interface TaskMetadataCache {
+  getCurrent(envId: string, slug: string): Promise<TaskMetadataEntry | null>;
+  getByWorker(workerId: string, slug: string): Promise<TaskMetadataEntry | null>;
+  populateCurrent(envId: string, entries: TaskMetadataEntry[]): Promise<void>;
+  populateByWorker(workerId: string, entries: TaskMetadataEntry[]): Promise<void>;
+  /** Add a single field to the env keyspace without resetting the hash TTL. */
+  setCurrent(envId: string, entry: TaskMetadataEntry): Promise<void>;
+  /** Add a single field to the by-worker keyspace and refresh the hash TTL. */
+  setByWorker(workerId: string, entry: TaskMetadataEntry): Promise<void>;
+  invalidateCurrent(envId: string): Promise<void>;
+}
+
+export type RedisTaskMetadataCacheOptions = {
+  redis: Redis;
+  /** Safety TTL on `task-meta:env:{envId}`. Default 24h. Use 0 for no expiry. 
*/ + currentEnvTtlSeconds?: number; + /** Idle TTL on `task-meta:by-worker:{workerId}`. Default 30d. Use 0 for no expiry. */ + byWorkerTtlSeconds?: number; +}; + +type EncodedEntry = { + t: string | null; + k: TaskTriggerSource; + q: string | null; + n: string; +}; + +function encode(entry: TaskMetadataEntry): string { + const payload: EncodedEntry = { + t: entry.ttl, + k: entry.triggerSource, + q: entry.queueId, + n: entry.queueName, + }; + return JSON.stringify(payload); +} + +function decode(slug: string, raw: string): TaskMetadataEntry | null { + try { + const parsed = JSON.parse(raw) as EncodedEntry; + return { + slug, + ttl: parsed.t, + triggerSource: parsed.k, + queueId: parsed.q, + queueName: parsed.n, + }; + } catch (error) { + logger.error("Failed to decode task metadata cache entry", { slug, error }); + return null; + } +} + +function currentEnvKey(envId: string): string { + return `task-meta:env:${envId}`; +} + +function byWorkerKey(workerId: string): string { + return `task-meta:by-worker:${workerId}`; +} + +/** + * Atomically replace a HASH's contents and reset its TTL. + * + * KEYS[1] = hash key + * ARGV[1] = ttl seconds (0 = no TTL) + * ARGV[2..N] = alternating field, value pairs + * + * One round-trip; readers never observe the empty intermediate state that a + * naive DEL + HSET pipeline exposes. + */ +const REPLACE_HASH_LUA = ` +redis.call("DEL", KEYS[1]) +if #ARGV > 1 then + local fv = {} + for i = 2, #ARGV do + fv[#fv + 1] = ARGV[i] + end + redis.call("HSET", KEYS[1], unpack(fv)) +end +local ttl = tonumber(ARGV[1]) +if ttl and ttl > 0 then + redis.call("EXPIRE", KEYS[1], ttl) +end +return 1 +`; + +/** + * Set a single field and refresh the HASH TTL. + * + * KEYS[1] = hash key + * ARGV[1] = ttl seconds (0 = no TTL refresh) + * ARGV[2] = field + * ARGV[3] = value + * + * Used by the by-worker back-fill path — sliding-window expiry keeps active + * workers warm and lets idle workers age out. + */ +const SET_FIELD_REFRESH_TTL_LUA = ` +redis.call("HSET", KEYS[1], ARGV[2], ARGV[3]) +local ttl = tonumber(ARGV[1]) +if ttl and ttl > 0 then + redis.call("EXPIRE", KEYS[1], ttl) +end +return 1 +`; + +/** + * Set a single field and only set the HASH TTL if no TTL is set yet. + * + * KEYS[1] = hash key + * ARGV[1] = ttl seconds (0 = no TTL) + * ARGV[2] = field + * ARGV[3] = value + * + * Used by the env back-fill path — the env keyspace TTL boundary is owned by + * `populateCurrent` (called at promotion). Back-fills shouldn't extend it; if + * a hash already has a TTL, we leave it alone so the safety net still expires + * on schedule. 
+ */ +const SET_FIELD_PRESERVE_TTL_LUA = ` +redis.call("HSET", KEYS[1], ARGV[2], ARGV[3]) +local ttl = tonumber(ARGV[1]) +if ttl and ttl > 0 and redis.call("TTL", KEYS[1]) == -1 then + redis.call("EXPIRE", KEYS[1], ttl) +end +return 1 +`; + +declare module "ioredis" { + interface RedisCommander { + taskMetaReplaceHash( + key: string, + ttlSeconds: string, + ...fieldValues: string[] + ): Result; + taskMetaSetFieldRefreshTtl( + key: string, + ttlSeconds: string, + field: string, + value: string, + callback?: Callback + ): Result; + taskMetaSetFieldPreserveTtl( + key: string, + ttlSeconds: string, + field: string, + value: string, + callback?: Callback + ): Result; + } +} + +export class RedisTaskMetadataCache implements TaskMetadataCache { + private readonly redis: Redis; + private readonly currentEnvTtlSeconds: number; + private readonly byWorkerTtlSeconds: number; + + constructor(options: RedisTaskMetadataCacheOptions) { + this.redis = options.redis; + this.currentEnvTtlSeconds = options.currentEnvTtlSeconds ?? 86400; + this.byWorkerTtlSeconds = options.byWorkerTtlSeconds ?? 30 * 24 * 60 * 60; + + this.redis.defineCommand("taskMetaReplaceHash", { + numberOfKeys: 1, + lua: REPLACE_HASH_LUA, + }); + this.redis.defineCommand("taskMetaSetFieldRefreshTtl", { + numberOfKeys: 1, + lua: SET_FIELD_REFRESH_TTL_LUA, + }); + this.redis.defineCommand("taskMetaSetFieldPreserveTtl", { + numberOfKeys: 1, + lua: SET_FIELD_PRESERVE_TTL_LUA, + }); + } + + async getCurrent(envId: string, slug: string): Promise { + return this.#get(currentEnvKey(envId), slug); + } + + async getByWorker(workerId: string, slug: string): Promise { + return this.#get(byWorkerKey(workerId), slug); + } + + async populateCurrent(envId: string, entries: TaskMetadataEntry[]): Promise { + await this.#replaceHash(currentEnvKey(envId), entries, this.currentEnvTtlSeconds); + } + + async populateByWorker(workerId: string, entries: TaskMetadataEntry[]): Promise { + await this.#replaceHash(byWorkerKey(workerId), entries, this.byWorkerTtlSeconds); + } + + async setCurrent(envId: string, entry: TaskMetadataEntry): Promise { + try { + await this.redis.taskMetaSetFieldPreserveTtl( + currentEnvKey(envId), + String(this.currentEnvTtlSeconds), + entry.slug, + encode(entry) + ); + } catch (error) { + logger.error("Failed to set task metadata current cache field", { + envId, + slug: entry.slug, + error, + }); + } + } + + async setByWorker(workerId: string, entry: TaskMetadataEntry): Promise { + try { + await this.redis.taskMetaSetFieldRefreshTtl( + byWorkerKey(workerId), + String(this.byWorkerTtlSeconds), + entry.slug, + encode(entry) + ); + } catch (error) { + logger.error("Failed to set task metadata by-worker cache field", { + workerId, + slug: entry.slug, + error, + }); + } + } + + async invalidateCurrent(envId: string): Promise { + try { + await this.redis.del(currentEnvKey(envId)); + } catch (error) { + logger.error("Failed to invalidate task metadata current cache", { envId, error }); + } + } + + async #get(key: string, slug: string): Promise { + try { + const raw = await this.redis.hget(key, slug); + if (!raw) return null; + return decode(slug, raw); + } catch (error) { + logger.error("Failed to read task metadata from cache", { key, slug, error }); + return null; + } + } + + async #replaceHash( + key: string, + entries: TaskMetadataEntry[], + ttlSeconds: number + ): Promise { + try { + const argv: string[] = [String(ttlSeconds)]; + for (const entry of entries) { + argv.push(entry.slug, encode(entry)); + } + await 
+    } catch (error) {
+      logger.error("Failed to replace task metadata cache hash", { key, error });
+    }
+  }
+}
+
+export class NoopTaskMetadataCache implements TaskMetadataCache {
+  async getCurrent(): Promise<TaskMetadataEntry | null> {
+    return null;
+  }
+
+  async getByWorker(): Promise<TaskMetadataEntry | null> {
+    return null;
+  }
+
+  async populateCurrent(): Promise<void> {
+    // intentionally empty
+  }
+
+  async populateByWorker(): Promise<void> {
+    // intentionally empty
+  }
+
+  async setCurrent(): Promise<void> {
+    // intentionally empty
+  }
+
+  async setByWorker(): Promise<void> {
+    // intentionally empty
+  }
+
+  async invalidateCurrent(): Promise<void> {
+    // intentionally empty
+  }
+}
diff --git a/apps/webapp/app/services/taskMetadataCacheInstance.server.ts b/apps/webapp/app/services/taskMetadataCacheInstance.server.ts
new file mode 100644
index 00000000000..b673cf122ed
--- /dev/null
+++ b/apps/webapp/app/services/taskMetadataCacheInstance.server.ts
@@ -0,0 +1,38 @@
+import { Redis } from "ioredis";
+import { defaultReconnectOnError } from "@internal/redis";
+import { env } from "~/env.server";
+import { singleton } from "~/utils/singleton";
+import {
+  NoopTaskMetadataCache,
+  RedisTaskMetadataCache,
+  type TaskMetadataCache,
+} from "./taskMetadataCache.server";
+
+export const taskMetadataCacheInstance: TaskMetadataCache = singleton(
+  "taskMetadataCacheInstance",
+  initializeTaskMetadataCache
+);
+
+function initializeTaskMetadataCache(): TaskMetadataCache {
+  if (!env.TASK_META_CACHE_REDIS_HOST) {
+    return new NoopTaskMetadataCache();
+  }
+
+  const redis = new Redis({
+    connectionName: "taskMetadataCache",
+    host: env.TASK_META_CACHE_REDIS_HOST,
+    port: env.TASK_META_CACHE_REDIS_PORT,
+    username: env.TASK_META_CACHE_REDIS_USERNAME,
+    password: env.TASK_META_CACHE_REDIS_PASSWORD,
+    keyPrefix: "tr:",
+    enableAutoPipelining: true,
+    reconnectOnError: defaultReconnectOnError,
+    ...(env.TASK_META_CACHE_REDIS_TLS_DISABLED === "true" ?
{} : { tls: {} }), + }); + + return new RedisTaskMetadataCache({ + redis, + currentEnvTtlSeconds: env.TASK_META_CACHE_CURRENT_ENV_TTL_SECONDS, + byWorkerTtlSeconds: env.TASK_META_CACHE_BY_WORKER_TTL_SECONDS, + }); +} diff --git a/apps/webapp/app/services/taskMetadataSync.server.ts b/apps/webapp/app/services/taskMetadataSync.server.ts new file mode 100644 index 00000000000..882202ea5de --- /dev/null +++ b/apps/webapp/app/services/taskMetadataSync.server.ts @@ -0,0 +1,22 @@ +import { logger } from "./logger.server"; +import { taskMetadataCacheInstance } from "./taskMetadataCacheInstance.server"; +import type { TaskMetadataCache, TaskMetadataEntry } from "./taskMetadataCache.server"; + +export async function syncTaskMetadataCache( + envId: string, + workerId: string, + isCurrent: boolean, + entries: TaskMetadataEntry[], + cache: TaskMetadataCache = taskMetadataCacheInstance +): Promise { + if (entries.length === 0) return; + + try { + await cache.populateByWorker(workerId, entries); + if (isCurrent) { + await cache.populateCurrent(envId, entries); + } + } catch (error) { + logger.error("Failed to sync task metadata cache", { envId, workerId, error }); + } +} diff --git a/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts b/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts index 288374ec61a..15a2824dd0c 100644 --- a/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts +++ b/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts @@ -1,8 +1,14 @@ import { BackgroundWorkerMetadata, tryCatch } from "@trigger.dev/core/v3"; import { CURRENT_DEPLOYMENT_LABEL } from "@trigger.dev/core/v3/isomorphic"; -import { WorkerDeployment } from "@trigger.dev/database"; +import { PrismaClientOrTransaction, WorkerDeployment } from "@trigger.dev/database"; import { logger } from "~/services/logger.server"; import { syncTaskIdentifiers } from "~/services/taskIdentifierRegistry.server"; +import { + type TaskMetadataCache, + type TaskMetadataEntry, +} from "~/services/taskMetadataCache.server"; +import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server"; +import { syncTaskMetadataCache } from "~/services/taskMetadataSync.server"; import { BaseService, ServiceValidationError } from "./baseService.server"; import { syncDeclarativeSchedules } from "./createBackgroundWorker.server"; import { ExecuteTasksWaitingForDeployService } from "./executeTasksWaitingForDeploy"; @@ -11,6 +17,17 @@ import { compareDeploymentVersions } from "../utils/deploymentVersions"; export type ChangeCurrentDeploymentDirection = "promote" | "rollback"; export class ChangeCurrentDeploymentService extends BaseService { + private readonly _taskMetaCache: TaskMetadataCache; + + constructor( + prisma?: PrismaClientOrTransaction, + replica?: PrismaClientOrTransaction, + taskMetaCache: TaskMetadataCache = taskMetadataCacheInstance + ) { + super(prisma, replica); + this._taskMetaCache = taskMetaCache; + } + public async call( deployment: WorkerDeployment, direction: ChangeCurrentDeploymentDirection, @@ -96,23 +113,68 @@ export class ChangeCurrentDeploymentService extends BaseService { }, }); - const [syncError] = await tryCatch( - (async () => { - const tasks = await this._prisma.backgroundWorkerTask.findMany({ - where: { workerId: deployment.workerId! }, - select: { slug: true, triggerSource: true }, - }); - await syncTaskIdentifiers( + const [fetchTasksError, tasks] = await tryCatch( + this._prisma.backgroundWorkerTask.findMany({ + where: { workerId: deployment.workerId! 
}, + select: { + slug: true, + triggerSource: true, + ttl: true, + queue: { select: { id: true, name: true } }, + }, + }) + ); + + if (fetchTasksError) { + logger.error("Error fetching worker tasks on deployment change", { + error: fetchTasksError, + }); + } + + if (tasks) { + // Side effect 1: refresh the `TaskIdentifier` table and the existing + // `tids:` Redis cache so the task-listing UI reflects the new deploy. + const [syncIdentifiersError] = await tryCatch( + syncTaskIdentifiers( deployment.environmentId, deployment.projectId, deployment.workerId!, tasks.map((t) => ({ id: t.slug, triggerSource: t.triggerSource })) - ); - })() - ); + ) + ); + + if (syncIdentifiersError) { + logger.error("Error syncing task identifiers on deployment change", { + error: syncIdentifiersError, + }); + } - if (syncError) { - logger.error("Error syncing task identifiers on deployment change", { error: syncError }); + // Side effect 2: refresh the `task-meta:` cache that the queue resolver + // reads from. Independent of side effect 1 — if `syncTaskIdentifiers` + // throws, the queue resolver still gets a warm cache for the new worker. + const metadataEntries: TaskMetadataEntry[] = tasks.map((t) => ({ + slug: t.slug, + ttl: t.ttl, + triggerSource: t.triggerSource, + queueId: t.queue?.id ?? null, + queueName: t.queue?.name ?? "", + })); + + const [metaError] = await tryCatch( + syncTaskMetadataCache( + deployment.environmentId, + deployment.workerId!, + true, + metadataEntries, + this._taskMetaCache + ) + ); + + if (metaError) { + logger.error("Error syncing task metadata cache on deployment change", { + error: metaError, + }); + } } const [scheduleSyncError] = await tryCatch(this.#syncSchedulesForDeployment(deployment)); diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index da79e386afb..4b8fdee4ca4 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -14,6 +14,12 @@ import { sanitizeQueueName } from "~/models/taskQueue.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { syncTaskIdentifiers } from "~/services/taskIdentifierRegistry.server"; +import { + type TaskMetadataCache, + type TaskMetadataEntry, +} from "~/services/taskMetadataCache.server"; +import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server"; +import { syncTaskMetadataCache } from "~/services/taskMetadataSync.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; import { removeQueueConcurrencyLimits, @@ -56,6 +62,17 @@ export function stripBackgroundWorkerMetadataForStorage( } export class CreateBackgroundWorkerService extends BaseService { + private readonly _taskMetaCache: TaskMetadataCache; + + constructor( + prisma?: PrismaClientOrTransaction, + replica?: PrismaClientOrTransaction, + taskMetaCache: TaskMetadataCache = taskMetadataCacheInstance + ) { + super(prisma, replica); + this._taskMetaCache = taskMetaCache; + } + public async call( projectRef: string, environment: AuthenticatedEnvironment, @@ -147,7 +164,7 @@ export class CreateBackgroundWorkerService extends BaseService { throw new ServiceValidationError("Error creating background worker files"); } - const [resourcesError] = await tryCatch( + const [resourcesError, workerTaskEntries] = await tryCatch( createWorkerResources( body.metadata, backgroundWorker, @@ -212,6 +229,21 
@@ export class CreateBackgroundWorkerService extends BaseService { }); } + // Populate task metadata cache. DEV workers are always "current" because + // `findCurrentWorkerFromEnvironment` resolves DEV current as the latest + // worker by createdAt. Non-DEV (deploy-built) workers are not promoted + // here — promotion writes the `:env:` keyspace later in + // changeCurrentDeployment / createDeploymentBackgroundWorkerV3. + if (workerTaskEntries && workerTaskEntries.length > 0) { + await syncTaskMetadataCache( + environment.id, + backgroundWorker.id, + environment.type === "DEVELOPMENT", + workerTaskEntries, + this._taskMetaCache + ); + } + const [updateConcurrencyLimitsError] = await tryCatch( updateEnvConcurrencyLimits(environment) ); @@ -265,17 +297,26 @@ export async function createWorkerResources( environment: AuthenticatedEnvironment, prisma: PrismaClientOrTransaction, tasksToBackgroundFiles?: Map -) { +): Promise { // Create the queues const queues = await createWorkerQueues(metadata, worker, environment, prisma); // Create the tasks - await createWorkerTasks(metadata, queues, worker, environment, prisma, tasksToBackgroundFiles); + const taskEntries = await createWorkerTasks( + metadata, + queues, + worker, + environment, + prisma, + tasksToBackgroundFiles + ); // Register prompts if (metadata.prompts && metadata.prompts.length > 0) { await createWorkerPrompts(metadata.prompts, worker, environment, prisma); } + + return taskEntries; } async function createWorkerTasks( @@ -285,17 +326,22 @@ async function createWorkerTasks( environment: AuthenticatedEnvironment, prisma: PrismaClientOrTransaction, tasksToBackgroundFiles?: Map -) { +): Promise { // Create tasks in chunks of 20 const CHUNK_SIZE = 20; + const entries: TaskMetadataEntry[] = []; for (let i = 0; i < metadata.tasks.length; i += CHUNK_SIZE) { const chunk = metadata.tasks.slice(i, i + CHUNK_SIZE); - await Promise.all( + const chunkEntries = await Promise.all( chunk.map((task) => createWorkerTask(task, queues, worker, environment, prisma, tasksToBackgroundFiles) ) ); + for (const entry of chunkEntries) { + if (entry) entries.push(entry); + } } + return entries; } async function createWorkerTask( @@ -305,7 +351,7 @@ async function createWorkerTask( environment: AuthenticatedEnvironment, prisma: PrismaClientOrTransaction, tasksToBackgroundFiles?: Map -) { +): Promise { try { let queue = queues.find((queue) => queue.name === task.queue?.name); @@ -331,6 +377,9 @@ async function createWorkerTask( ? ("AGENT" as const) : ("STANDARD" as const); + const resolvedTtl = + typeof task.ttl === "number" ? stringifyDuration(task.ttl) ?? null : task.ttl ?? null; + await prisma.backgroundWorkerTask.create({ data: { friendlyId: generateFriendlyId("task"), @@ -348,12 +397,19 @@ async function createWorkerTask( config: task.agentConfig ? (task.agentConfig as any) : undefined, fileId: tasksToBackgroundFiles?.get(task.id) ?? null, maxDurationInSeconds: task.maxDuration ? clampMaxDuration(task.maxDuration) : null, - ttl: - typeof task.ttl === "number" ? stringifyDuration(task.ttl) ?? null : task.ttl ?? 
null, + ttl: resolvedTtl, queueId: queue.id, payloadSchema: task.payloadSchema as any, }, }); + + return { + slug: task.id, + ttl: resolvedTtl, + triggerSource: resolvedTriggerSource, + queueId: queue.id, + queueName: queue.name, + }; } catch (error) { if (error instanceof Prisma.PrismaClientKnownRequestError) { // The error code for unique constraint violation in Prisma is P2002 @@ -389,6 +445,7 @@ async function createWorkerTask( worker, }); } + return null; } } diff --git a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts index f20a024d2d6..81542ddc057 100644 --- a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts +++ b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts @@ -1,8 +1,11 @@ import { CreateBackgroundWorkerRequestBody, tryCatch } from "@trigger.dev/core/v3"; -import type { BackgroundWorker } from "@trigger.dev/database"; +import type { BackgroundWorker, PrismaClientOrTransaction } from "@trigger.dev/database"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { syncTaskIdentifiers } from "~/services/taskIdentifierRegistry.server"; +import { type TaskMetadataCache } from "~/services/taskMetadataCache.server"; +import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server"; +import { syncTaskMetadataCache } from "~/services/taskMetadataSync.server"; import { socketIo } from "../handleSocketIo.server"; import { updateEnvConcurrencyLimits } from "../runQueue.server"; import { PerformDeploymentAlertsService } from "./alerts/performDeploymentAlerts.server"; @@ -24,6 +27,17 @@ import { CURRENT_DEPLOYMENT_LABEL, BackgroundWorkerId } from "@trigger.dev/core/ * @deprecated */ export class CreateDeploymentBackgroundWorkerServiceV3 extends BaseService { + private readonly _taskMetaCache: TaskMetadataCache; + + constructor( + prisma?: PrismaClientOrTransaction, + replica?: PrismaClientOrTransaction, + taskMetaCache: TaskMetadataCache = taskMetadataCacheInstance + ) { + super(prisma, replica); + this._taskMetaCache = taskMetaCache; + } + public async call( projectRef: string, environment: AuthenticatedEnvironment, @@ -74,8 +88,14 @@ export class CreateDeploymentBackgroundWorkerServiceV3 extends BaseService { }); } + let workerTaskEntries: Awaited> = []; try { - await createWorkerResources(body.metadata, backgroundWorker, environment, this._prisma); + workerTaskEntries = await createWorkerResources( + body.metadata, + backgroundWorker, + environment, + this._prisma + ); await syncDeclarativeSchedules( body.metadata.tasks, backgroundWorker, @@ -147,6 +167,18 @@ export class CreateDeploymentBackgroundWorkerServiceV3 extends BaseService { logger.error("Error syncing task identifiers", { error: syncIdError }); } + // V3 promotes the deployment immediately above, so this worker is now + // current for the env — write both keyspaces. 
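+    // The length guard is belt-and-braces: syncTaskMetadataCache already
+    // no-ops on an empty list, and populateCurrent with zero entries would
+    // otherwise DEL the env hash and leave it empty.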
+ if (workerTaskEntries.length > 0) { + await syncTaskMetadataCache( + environment.id, + backgroundWorker.id, + true, + workerTaskEntries, + this._taskMetaCache + ); + } + try { //send a notification that a new worker has been created await projectPubSub.publish( diff --git a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts index f764d39dc7b..a31cf342762 100644 --- a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts +++ b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts @@ -1,7 +1,10 @@ import { CreateBackgroundWorkerRequestBody, logger, tryCatch } from "@trigger.dev/core/v3"; import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic"; -import type { BackgroundWorker, WorkerDeployment } from "@trigger.dev/database"; +import type { BackgroundWorker, PrismaClientOrTransaction, WorkerDeployment } from "@trigger.dev/database"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { type TaskMetadataCache } from "~/services/taskMetadataCache.server"; +import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server"; +import { syncTaskMetadataCache } from "~/services/taskMetadataSync.server"; import { BaseService, ServiceValidationError } from "./baseService.server"; import { createBackgroundFiles, @@ -13,6 +16,17 @@ import { TimeoutDeploymentService } from "./timeoutDeployment.server"; import { env } from "~/env.server"; export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService { + private readonly _taskMetaCache: TaskMetadataCache; + + constructor( + prisma?: PrismaClientOrTransaction, + replica?: PrismaClientOrTransaction, + taskMetaCache: TaskMetadataCache = taskMetadataCacheInstance + ) { + super(prisma, replica); + this._taskMetaCache = taskMetaCache; + } + public async call( environment: AuthenticatedEnvironment, deploymentId: string, @@ -110,7 +124,7 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService { throw serviceError; } - const [resourcesError] = await tryCatch( + const [resourcesError, workerTaskEntries] = await tryCatch( createWorkerResources( body.metadata, backgroundWorker, @@ -134,6 +148,20 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService { throw serviceError; } + // V4 build path: worker created but NOT yet promoted to current. Write + // only the `task-meta:by-worker:{workerId}` keyspace so locked-version + // triggers against this build hit the cache. Promotion (which writes the + // env keyspace) happens later via finalizeDeployment → changeCurrentDeployment. 
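+    // Writing the env keyspace here would be wrong: until this deployment is
+    // promoted, non-locked triggers must keep resolving against the previous
+    // current worker's metadata.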
+ if (workerTaskEntries && workerTaskEntries.length > 0) { + await syncTaskMetadataCache( + environment.id, + backgroundWorker.id, + false, + workerTaskEntries, + this._taskMetaCache + ); + } + const [schedulesError] = await tryCatch( syncDeclarativeSchedules(body.metadata.tasks, backgroundWorker, environment, this._prisma) ); diff --git a/apps/webapp/test/engine/triggerTask.test.ts b/apps/webapp/test/engine/triggerTask.test.ts index ddceb8754c1..f7c487d4dd4 100644 --- a/apps/webapp/test/engine/triggerTask.test.ts +++ b/apps/webapp/test/engine/triggerTask.test.ts @@ -20,8 +20,11 @@ import { assertNonNullable, containerTest } from "@internal/testcontainers"; import { trace } from "@opentelemetry/api"; import { IOPacket } from "@trigger.dev/core/v3"; import { TaskRun } from "@trigger.dev/database"; +import { Redis } from "ioredis"; import { IdempotencyKeyConcern } from "~/runEngine/concerns/idempotencyKeys.server"; import { DefaultQueueManager } from "~/runEngine/concerns/queues.server"; +import { RedisTaskMetadataCache } from "~/services/taskMetadataCache.server"; +import { ChangeCurrentDeploymentService } from "~/v3/services/changeCurrentDeployment.server"; import { EntitlementValidationParams, MaxAttemptsValidationParams, @@ -1173,3 +1176,339 @@ describe("RunEngineTriggerTaskService", () => { } ); }); + +describe("DefaultQueueManager task metadata cache", () => { + containerTest( + "warm cache returns metadata without falling through to PG", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, + queue: { redis: redisOptions }, + runLock: { redis: redisOptions }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { name: "small-1x", cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const environment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const taskIdentifier = "cached-task"; + await setupBackgroundWorker(engine, environment, taskIdentifier); + + const redis = new Redis(redisOptions); + const cache = new RedisTaskMetadataCache({ redis }); + + // Pre-populate cache with AGENT triggerSource; DB row has the default STANDARD. + // If the read path hits the cache, the resulting TaskRun.taskKind reflects the + // cached value. If it falls through to PG, it reflects STANDARD. 
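+      // Seeding a value that deliberately disagrees with the DB is what makes
+      // a cache hit observable from the outside, without counting queries.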
+ await cache.populateCurrent(environment.id, [ + { + slug: taskIdentifier, + ttl: null, + triggerSource: "AGENT", + queueId: null, + queueName: `task/${taskIdentifier}`, + }, + ]); + + const queuesManager = new DefaultQueueManager(prisma, engine, undefined, cache); + const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern: new IdempotencyKeyConcern(prisma, engine, new MockTraceEventConcern()), + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + tracer: trace.getTracer("test", "0.0.0"), + metadataMaximumSize: 1024 * 1024, + }); + + const result = await triggerTaskService.call({ + taskId: taskIdentifier, + environment, + body: { payload: { test: "x" } }, + }); + + assertNonNullable(result); + expect(result.run.taskIdentifier).toBe(taskIdentifier); + expect((result.run.annotations as { taskKind?: string } | null)?.taskKind).toBe("AGENT"); + + await redis.quit(); + await engine.quit(); + } + ); + + containerTest( + "cache miss falls through to PG and back-fills the cache", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, + queue: { redis: redisOptions }, + runLock: { redis: redisOptions }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { name: "small-1x", cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const environment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const taskIdentifier = "miss-task"; + await setupBackgroundWorker(engine, environment, taskIdentifier); + + const redis = new Redis(redisOptions); + const cache = new RedisTaskMetadataCache({ redis }); + + // Cache starts empty. Sanity-check both keyspaces. + expect(await cache.getCurrent(environment.id, taskIdentifier)).toBeNull(); + + const queuesManager = new DefaultQueueManager(prisma, engine, undefined, cache); + const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern: new IdempotencyKeyConcern(prisma, engine, new MockTraceEventConcern()), + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + tracer: trace.getTracer("test", "0.0.0"), + metadataMaximumSize: 1024 * 1024, + }); + + const result = await triggerTaskService.call({ + taskId: taskIdentifier, + environment, + body: { payload: { test: "x" } }, + }); + + assertNonNullable(result); + expect((result.run.annotations as { taskKind?: string } | null)?.taskKind).toBe("STANDARD"); + + // Back-fill is fire-and-forget; allow a turn of the event loop for it to land. 
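+      // (The back-fill promise is not awaited by the resolver, so the test
+      // must yield before asserting on cache contents.)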
+ await setTimeout(50); + + const backfilled = await cache.getCurrent(environment.id, taskIdentifier); + expect(backfilled).not.toBeNull(); + expect(backfilled?.triggerSource).toBe("STANDARD"); + expect(backfilled?.queueName).toBe(`task/${taskIdentifier}`); + + await redis.quit(); + await engine.quit(); + } + ); + + containerTest( + "queue-override + ttl path returns taskKind from cache without a BWT lookup", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, + queue: { redis: redisOptions }, + runLock: { redis: redisOptions }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { name: "small-1x", cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const environment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const taskIdentifier = "override-task"; + await setupBackgroundWorker(engine, environment, taskIdentifier); + + const redis = new Redis(redisOptions); + const cache = new RedisTaskMetadataCache({ redis }); + + // Cache says AGENT; DB row says STANDARD. Caller provides both a queue + // override and an explicit TTL — the hot path the PR regressed. + await cache.populateCurrent(environment.id, [ + { + slug: taskIdentifier, + ttl: null, + triggerSource: "AGENT", + queueId: null, + queueName: `task/${taskIdentifier}`, + }, + ]); + + const queuesManager = new DefaultQueueManager(prisma, engine, undefined, cache); + const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern: new IdempotencyKeyConcern(prisma, engine, new MockTraceEventConcern()), + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + tracer: trace.getTracer("test", "0.0.0"), + metadataMaximumSize: 1024 * 1024, + }); + + const result = await triggerTaskService.call({ + taskId: taskIdentifier, + environment, + body: { + payload: { test: "x" }, + options: { + queue: { name: "caller-queue" }, + ttl: "5m", + }, + }, + }); + + assertNonNullable(result); + expect(result.run.queue).toBe("caller-queue"); + expect((result.run.annotations as { taskKind?: string } | null)?.taskKind).toBe("AGENT"); + + await redis.quit(); + await engine.quit(); + } + ); + + containerTest( + "locked-version trigger reads from by-worker keyspace, not env keyspace", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, + queue: { redis: redisOptions }, + runLock: { redis: redisOptions }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { name: "small-1x", cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const environment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const taskIdentifier = "keyspace-task"; + const worker = await setupBackgroundWorker(engine, environment, taskIdentifier); + + const redis = new Redis(redisOptions); + const cache = new RedisTaskMetadataCache({ redis }); + + // Populate the two keyspaces with conflicting triggerSource values so we + // can tell which keyspace the read used. 
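+      // AGENT vs SCHEDULED are arbitrary distinct markers; neither matches the
+      // STANDARD row in PG, so an accidental DB fallthrough is also detectable.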
+ await cache.populateByWorker(worker.worker.id, [ + { + slug: taskIdentifier, + ttl: null, + triggerSource: "AGENT", + queueId: null, + queueName: `task/${taskIdentifier}`, + }, + ]); + await cache.populateCurrent(environment.id, [ + { + slug: taskIdentifier, + ttl: null, + triggerSource: "SCHEDULED", + queueId: null, + queueName: `task/${taskIdentifier}`, + }, + ]); + + const queuesManager = new DefaultQueueManager(prisma, engine, undefined, cache); + const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern: new IdempotencyKeyConcern(prisma, engine, new MockTraceEventConcern()), + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + tracer: trace.getTracer("test", "0.0.0"), + metadataMaximumSize: 1024 * 1024, + }); + + // Locked → by-worker keyspace → AGENT + const locked = await triggerTaskService.call({ + taskId: taskIdentifier, + environment, + body: { + payload: { test: "x" }, + options: { lockToVersion: worker.worker.version }, + }, + }); + assertNonNullable(locked); + expect((locked.run.annotations as { taskKind?: string } | null)?.taskKind).toBe("AGENT"); + + // Not locked → env keyspace → SCHEDULED + const current = await triggerTaskService.call({ + taskId: taskIdentifier, + environment, + body: { payload: { test: "y" } }, + }); + assertNonNullable(current); + expect((current.run.annotations as { taskKind?: string } | null)?.taskKind).toBe("SCHEDULED"); + + await redis.quit(); + await engine.quit(); + } + ); + + containerTest( + "ChangeCurrentDeploymentService promotes the env cache to the new worker", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, + queue: { redis: redisOptions }, + runLock: { redis: redisOptions }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { name: "small-1x", cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const environment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const taskIdentifier = "promotion-task"; + + // Worker A → setupBackgroundWorker auto-creates a deployment + promotes it. + const workerA = await setupBackgroundWorker(engine, environment, taskIdentifier); + // Worker B → setupBackgroundWorker overrides the promotion to point at B. + const workerB = await setupBackgroundWorker(engine, environment, taskIdentifier); + + assertNonNullable(workerA.deployment); + assertNonNullable(workerB.deployment); + + const redis = new Redis(redisOptions); + const cache = new RedisTaskMetadataCache({ redis }); + + // Manually rollback to A to exercise the cache-write side effect. + const service = new ChangeCurrentDeploymentService(prisma, undefined, cache); + await service.call(workerA.deployment, "rollback", true /* disableVersionCheck */); + + // Allow the awaited cache write to settle. 
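+      // (Unlike the read-path back-fill, the promotion write is awaited inside
+      // the service, so the entry is visible as soon as call() returns.)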
+ const entry = await cache.getCurrent(environment.id, taskIdentifier); + expect(entry).not.toBeNull(); + // by-worker keyspace also written by syncTaskMetadataCache(isCurrent=true) + const byWorkerEntry = await cache.getByWorker(workerA.worker.id, taskIdentifier); + expect(byWorkerEntry).not.toBeNull(); + expect(byWorkerEntry?.queueName).toBe(`task/${taskIdentifier}`); + + await redis.quit(); + await engine.quit(); + } + ); +}); From 1a06ef6407d8af9d047e7029757632ef1ffb0451 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 15 May 2026 06:54:21 +0100 Subject: [PATCH 2/7] fix(webapp): address CodeRabbit review on the task metadata cache MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - TaskMetadataCache: interface → type alias, matching the repo's 'prefer type unless extending' standard. - syncTaskMetadataCache calls in createBackgroundWorker, createDeploymentBackgroundWorkerV3, and createDeploymentBackgroundWorkerV4 are now wrapped in tryCatch and log on failure. Matches the pattern in changeCurrentDeployment so a Redis blip can't strand a successful worker build or deploy promotion. - Back-fill assertion in 'cache miss falls through to PG' now polls with a bounded timeout instead of a fixed sleep, removing the CI flake surface. - Promotion assertion in 'ChangeCurrentDeploymentService promotes the env cache' now pre-clears the by-worker + env keys and asserts they're null before the call, so the test proves the service did the write. --- .../app/services/taskMetadataCache.server.ts | 4 +-- .../services/createBackgroundWorker.server.ts | 24 ++++++++++++----- ...eateDeploymentBackgroundWorkerV3.server.ts | 26 ++++++++++++++----- ...eateDeploymentBackgroundWorkerV4.server.ts | 25 +++++++++++++----- apps/webapp/test/engine/triggerTask.test.ts | 22 +++++++++++----- 5 files changed, 74 insertions(+), 27 deletions(-) diff --git a/apps/webapp/app/services/taskMetadataCache.server.ts b/apps/webapp/app/services/taskMetadataCache.server.ts index c77cd6a1e40..1e98ae1e106 100644 --- a/apps/webapp/app/services/taskMetadataCache.server.ts +++ b/apps/webapp/app/services/taskMetadataCache.server.ts @@ -10,7 +10,7 @@ export type TaskMetadataEntry = { queueName: string; }; -export interface TaskMetadataCache { +export type TaskMetadataCache = { getCurrent(envId: string, slug: string): Promise; getByWorker(workerId: string, slug: string): Promise; populateCurrent(envId: string, entries: TaskMetadataEntry[]): Promise; @@ -20,7 +20,7 @@ export interface TaskMetadataCache { /** Add a single field to the by-worker keyspace and refresh the hash TTL. */ setByWorker(workerId: string, entry: TaskMetadataEntry): Promise; invalidateCurrent(envId: string): Promise; -} +}; export type RedisTaskMetadataCacheOptions = { redis: Redis; diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index 4b8fdee4ca4..9035a2d4f84 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -234,14 +234,26 @@ export class CreateBackgroundWorkerService extends BaseService { // worker by createdAt. Non-DEV (deploy-built) workers are not promoted // here — promotion writes the `:env:` keyspace later in // changeCurrentDeployment / createDeploymentBackgroundWorkerV3. + // Wrap in tryCatch so a Redis blip can't break the post-cache side + // effects below. 
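+      // syncTaskMetadataCache already logs and swallows Redis errors
+      // internally; this wrapper also catches anything thrown before that
+      // guard is reached.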
if (workerTaskEntries && workerTaskEntries.length > 0) { - await syncTaskMetadataCache( - environment.id, - backgroundWorker.id, - environment.type === "DEVELOPMENT", - workerTaskEntries, - this._taskMetaCache + const [metaCacheError] = await tryCatch( + syncTaskMetadataCache( + environment.id, + backgroundWorker.id, + environment.type === "DEVELOPMENT", + workerTaskEntries, + this._taskMetaCache + ) ); + + if (metaCacheError) { + logger.error("Error syncing task metadata cache", { + error: metaCacheError, + backgroundWorker, + environment, + }); + } } const [updateConcurrencyLimitsError] = await tryCatch( diff --git a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts index 81542ddc057..2a122872887 100644 --- a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts +++ b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts @@ -168,15 +168,27 @@ export class CreateDeploymentBackgroundWorkerServiceV3 extends BaseService { } // V3 promotes the deployment immediately above, so this worker is now - // current for the env — write both keyspaces. + // current for the env — write both keyspaces. Wrap in tryCatch so a + // Redis blip can't strand the post-cache side effects below (waiting + // runs flush, alerts, timeout cancellation). if (workerTaskEntries.length > 0) { - await syncTaskMetadataCache( - environment.id, - backgroundWorker.id, - true, - workerTaskEntries, - this._taskMetaCache + const [metaCacheError] = await tryCatch( + syncTaskMetadataCache( + environment.id, + backgroundWorker.id, + true, + workerTaskEntries, + this._taskMetaCache + ) ); + + if (metaCacheError) { + logger.error("Error syncing task metadata cache on deployment", { + error: metaCacheError, + deploymentId: deployment.id, + workerId: backgroundWorker.id, + }); + } } try { diff --git a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts index a31cf342762..23c8651e4ca 100644 --- a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts +++ b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts @@ -152,14 +152,27 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService { // only the `task-meta:by-worker:{workerId}` keyspace so locked-version // triggers against this build hit the cache. Promotion (which writes the // env keyspace) happens later via finalizeDeployment → changeCurrentDeployment. + // Wrap in tryCatch so a Redis blip can't break the deployment state + // machine — without this the build would stall in BUILDING and only + // surface as a misleading "Indexing timed out". 
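+      // A failed write only costs cache warmth: locked-version triggers fall
+      // back to the PG lookup and back-fill the by-worker hash on first read.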
if (workerTaskEntries && workerTaskEntries.length > 0) { - await syncTaskMetadataCache( - environment.id, - backgroundWorker.id, - false, - workerTaskEntries, - this._taskMetaCache + const [metaCacheError] = await tryCatch( + syncTaskMetadataCache( + environment.id, + backgroundWorker.id, + false, + workerTaskEntries, + this._taskMetaCache + ) ); + + if (metaCacheError) { + logger.error("Error syncing task metadata cache on build", { + error: metaCacheError, + deploymentId: deployment.id, + workerId: backgroundWorker.id, + }); + } } const [schedulesError] = await tryCatch( diff --git a/apps/webapp/test/engine/triggerTask.test.ts b/apps/webapp/test/engine/triggerTask.test.ts index f7c487d4dd4..ab3e520c8d3 100644 --- a/apps/webapp/test/engine/triggerTask.test.ts +++ b/apps/webapp/test/engine/triggerTask.test.ts @@ -1294,10 +1294,12 @@ describe("DefaultQueueManager task metadata cache", () => { assertNonNullable(result); expect((result.run.annotations as { taskKind?: string } | null)?.taskKind).toBe("STANDARD"); - // Back-fill is fire-and-forget; allow a turn of the event loop for it to land. - await setTimeout(50); - - const backfilled = await cache.getCurrent(environment.id, taskIdentifier); + // Back-fill is fire-and-forget; poll with a bounded timeout to avoid CI flakes. + let backfilled = await cache.getCurrent(environment.id, taskIdentifier); + for (let i = 0; i < 40 && !backfilled; i++) { + await setTimeout(25); + backfilled = await cache.getCurrent(environment.id, taskIdentifier); + } expect(backfilled).not.toBeNull(); expect(backfilled?.triggerSource).toBe("STANDARD"); expect(backfilled?.queueName).toBe(`task/${taskIdentifier}`); @@ -1495,14 +1497,22 @@ describe("DefaultQueueManager task metadata cache", () => { const redis = new Redis(redisOptions); const cache = new RedisTaskMetadataCache({ redis }); + // Pre-clear any pre-existing cache state so the assertions below prove + // the rollback service did the write — not some other path. The test + // helpers don't currently populate the cache, but pre-clearing keeps the + // test bulletproof against future helper changes. + await redis.del(`task-meta:by-worker:${workerA.worker.id}`); + await redis.del(`task-meta:env:${environment.id}`); + expect(await cache.getByWorker(workerA.worker.id, taskIdentifier)).toBeNull(); + expect(await cache.getCurrent(environment.id, taskIdentifier)).toBeNull(); + // Manually rollback to A to exercise the cache-write side effect. const service = new ChangeCurrentDeploymentService(prisma, undefined, cache); await service.call(workerA.deployment, "rollback", true /* disableVersionCheck */); - // Allow the awaited cache write to settle. + // Both keyspaces should now reflect workerA. 
const entry = await cache.getCurrent(environment.id, taskIdentifier); expect(entry).not.toBeNull(); - // by-worker keyspace also written by syncTaskMetadataCache(isCurrent=true) const byWorkerEntry = await cache.getByWorker(workerA.worker.id, taskIdentifier); expect(byWorkerEntry).not.toBeNull(); expect(byWorkerEntry?.queueName).toBe(`task/${taskIdentifier}`); From 344135a0b00d300530050e9b4dc819dc96ae8302 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 15 May 2026 08:51:07 +0100 Subject: [PATCH 3/7] refactor(webapp): make task-metadata cache writes atomic per-call MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The cache now exposes populateByCurrentWorker(envId, workerId, entries) and setByCurrentWorker(envId, workerId, entry) which write both task-meta:env and task-meta:by-worker keyspaces in a single Lua transaction. Drops the syncTaskMetadataCache helper and its isCurrent boolean — each caller picks the right method based on context (DEV worker create + deploy promotion go through populateByCurrentWorker; V4 deploy build goes through populateByWorker). Two new Lua scripts via defineCommand: - taskMetaReplaceTwoHashes: DEL + HSET + EXPIRE both keys atomically - taskMetaSetTwoFields: HSET both keys + env-preserve-TTL + worker-refresh-TTL atomically The read-path back-fill for non-locked triggers now also writes both keyspaces atomically via setByCurrentWorker, so concurrent back-fills can't leave one keyspace populated and the other empty. Cache methods log+swallow Redis errors internally, so the outer tryCatch wrappers at the four call sites are gone — a Redis blip can't break any post-cache side effect. --- .../app/runEngine/concerns/queues.server.ts | 13 +- .../app/services/taskMetadataCache.server.ts | 228 ++++++++++++------ .../app/services/taskMetadataSync.server.ts | 22 -- .../changeCurrentDeployment.server.ts | 20 +- .../services/createBackgroundWorker.server.ts | 25 +- ...eateDeploymentBackgroundWorkerV3.server.ts | 26 +- ...eateDeploymentBackgroundWorkerV4.server.ts | 25 +- apps/webapp/test/engine/triggerTask.test.ts | 15 +- 8 files changed, 194 insertions(+), 180 deletions(-) delete mode 100644 apps/webapp/app/services/taskMetadataSync.server.ts diff --git a/apps/webapp/app/runEngine/concerns/queues.server.ts b/apps/webapp/app/runEngine/concerns/queues.server.ts index e0599e0e6c8..fc0d707132e 100644 --- a/apps/webapp/app/runEngine/concerns/queues.server.ts +++ b/apps/webapp/app/runEngine/concerns/queues.server.ts @@ -281,7 +281,7 @@ export class DefaultQueueManager implements QueueManager { // Fire-and-forget back-fill — `setByWorker` upserts the single field and // refreshes the hash TTL. Errors are logged inside the cache and swallowed. - this.taskMetaCache.setByWorker(workerId, entry).catch(() => {}); + void this.taskMetaCache.setByWorker(workerId, entry); return entry; } @@ -327,12 +327,11 @@ export class DefaultQueueManager implements QueueManager { queueName: row.queue?.name ?? "", }; - // Back-fill both keyspaces so a subsequent locked-or-not trigger hits the - // cache. `setCurrent` preserves the env hash's existing TTL boundary - // (promotion owns it); `setByWorker` refreshes the by-worker TTL to keep - // active workers warm. - this.taskMetaCache.setCurrent(environment.id, entry).catch(() => {}); - this.taskMetaCache.setByWorker(worker.id, entry).catch(() => {}); + // Fire-and-forget back-fill — atomically upserts the slug into both + // keyspaces so a subsequent locked-or-not trigger hits the cache. 
The + // env-keyspace TTL is preserved (promotion owns it); the by-worker TTL + // is refreshed (sliding window keeps active workers warm). + void this.taskMetaCache.setByCurrentWorker(environment.id, worker.id, entry); return entry; } diff --git a/apps/webapp/app/services/taskMetadataCache.server.ts b/apps/webapp/app/services/taskMetadataCache.server.ts index 1e98ae1e106..94349a61bd2 100644 --- a/apps/webapp/app/services/taskMetadataCache.server.ts +++ b/apps/webapp/app/services/taskMetadataCache.server.ts @@ -10,17 +10,39 @@ export type TaskMetadataEntry = { queueName: string; }; -export type TaskMetadataCache = { +export interface TaskMetadataCache { + /** Read a slug's metadata from the env keyspace (current pointer). */ getCurrent(envId: string, slug: string): Promise; + /** Read a slug's metadata from the by-worker keyspace (locked-version lookups). */ getByWorker(workerId: string, slug: string): Promise; - populateCurrent(envId: string, entries: TaskMetadataEntry[]): Promise; + /** + * Atomically replace both `task-meta:env:{envId}` and + * `task-meta:by-worker:{workerId}` with the given entries. Used at deploy + * promotion sites where the worker just became current for the env. + */ + populateByCurrentWorker( + envId: string, + workerId: string, + entries: TaskMetadataEntry[] + ): Promise; + /** + * Replace `task-meta:by-worker:{workerId}` only. Used at deploy build sites + * (V4) where the worker is created but not yet promoted. + */ populateByWorker(workerId: string, entries: TaskMetadataEntry[]): Promise; - /** Add a single field to the env keyspace without resetting the hash TTL. */ - setCurrent(envId: string, entry: TaskMetadataEntry): Promise; - /** Add a single field to the by-worker keyspace and refresh the hash TTL. */ + /** + * Atomically upsert one slug in both keyspaces. Used by the non-locked + * read-path back-fill. The env-keyspace TTL is only set when no TTL is + * present (preserves the promotion boundary); the by-worker TTL is + * refreshed on every call (sliding expiry). + */ + setByCurrentWorker(envId: string, workerId: string, entry: TaskMetadataEntry): Promise; + /** + * Upsert one slug in `task-meta:by-worker:{workerId}` only. Used by the + * locked-version read-path back-fill; refreshes the by-worker TTL. + */ setByWorker(workerId: string, entry: TaskMetadataEntry): Promise; - invalidateCurrent(envId: string): Promise; -}; +} export type RedisTaskMetadataCacheOptions = { redis: Redis; @@ -72,14 +94,11 @@ function byWorkerKey(workerId: string): string { } /** - * Atomically replace a HASH's contents and reset its TTL. + * Atomically replace a single HASH's contents and reset its TTL. * * KEYS[1] = hash key * ARGV[1] = ttl seconds (0 = no TTL) * ARGV[2..N] = alternating field, value pairs - * - * One round-trip; readers never observe the empty intermediate state that a - * naive DEL + HSET pipeline exposes. */ const REPLACE_HASH_LUA = ` redis.call("DEL", KEYS[1]) @@ -98,15 +117,46 @@ return 1 `; /** - * Set a single field and refresh the HASH TTL. + * Atomically replace BOTH keyspaces in one Redis transaction. Used at deploy + * promotion — the worker just became current for the env, so the env keyspace + * and the worker keyspace get the same field set. 
+ * + * KEYS[1] = env hash key + * KEYS[2] = by-worker hash key + * ARGV[1] = env ttl seconds (0 = no TTL) + * ARGV[2] = by-worker ttl seconds (0 = no TTL) + * ARGV[3..N] = alternating field, value pairs (same for both hashes) + */ +const REPLACE_TWO_HASHES_LUA = ` +redis.call("DEL", KEYS[1]) +redis.call("DEL", KEYS[2]) +if #ARGV > 2 then + local fv = {} + for i = 3, #ARGV do + fv[#fv + 1] = ARGV[i] + end + redis.call("HSET", KEYS[1], unpack(fv)) + redis.call("HSET", KEYS[2], unpack(fv)) +end +local envTtl = tonumber(ARGV[1]) +if envTtl and envTtl > 0 then + redis.call("EXPIRE", KEYS[1], envTtl) +end +local workerTtl = tonumber(ARGV[2]) +if workerTtl and workerTtl > 0 then + redis.call("EXPIRE", KEYS[2], workerTtl) +end +return 1 +`; + +/** + * Set a single field and refresh the HASH TTL. Used by the locked-version + * back-fill path — sliding expiry keeps active workers warm. * * KEYS[1] = hash key * ARGV[1] = ttl seconds (0 = no TTL refresh) * ARGV[2] = field * ARGV[3] = value - * - * Used by the by-worker back-fill path — sliding-window expiry keeps active - * workers warm and lets idle workers age out. */ const SET_FIELD_REFRESH_TTL_LUA = ` redis.call("HSET", KEYS[1], ARGV[2], ARGV[3]) @@ -118,23 +168,27 @@ return 1 `; /** - * Set a single field and only set the HASH TTL if no TTL is set yet. - * - * KEYS[1] = hash key - * ARGV[1] = ttl seconds (0 = no TTL) - * ARGV[2] = field - * ARGV[3] = value + * Atomically upsert one field in BOTH keyspaces. Used by the non-locked + * back-fill path. The env-keyspace TTL is only set if no TTL is present + * (preserves the promotion boundary); the by-worker TTL is refreshed. * - * Used by the env back-fill path — the env keyspace TTL boundary is owned by - * `populateCurrent` (called at promotion). Back-fills shouldn't extend it; if - * a hash already has a TTL, we leave it alone so the safety net still expires - * on schedule. 
+ * KEYS[1] = env hash key + * KEYS[2] = by-worker hash key + * ARGV[1] = env ttl seconds (0 = no TTL) + * ARGV[2] = by-worker ttl seconds (0 = no TTL) + * ARGV[3] = field + * ARGV[4] = value */ -const SET_FIELD_PRESERVE_TTL_LUA = ` -redis.call("HSET", KEYS[1], ARGV[2], ARGV[3]) -local ttl = tonumber(ARGV[1]) -if ttl and ttl > 0 and redis.call("TTL", KEYS[1]) == -1 then - redis.call("EXPIRE", KEYS[1], ttl) +const SET_TWO_FIELDS_LUA = ` +redis.call("HSET", KEYS[1], ARGV[3], ARGV[4]) +local envTtl = tonumber(ARGV[1]) +if envTtl and envTtl > 0 and redis.call("TTL", KEYS[1]) == -1 then + redis.call("EXPIRE", KEYS[1], envTtl) +end +redis.call("HSET", KEYS[2], ARGV[3], ARGV[4]) +local workerTtl = tonumber(ARGV[2]) +if workerTtl and workerTtl > 0 then + redis.call("EXPIRE", KEYS[2], workerTtl) end return 1 `; @@ -146,6 +200,13 @@ declare module "ioredis" { ttlSeconds: string, ...fieldValues: string[] ): Result; + taskMetaReplaceTwoHashes( + envKey: string, + workerKey: string, + envTtlSeconds: string, + workerTtlSeconds: string, + ...fieldValues: string[] + ): Result; taskMetaSetFieldRefreshTtl( key: string, ttlSeconds: string, @@ -153,9 +214,11 @@ declare module "ioredis" { value: string, callback?: Callback ): Result; - taskMetaSetFieldPreserveTtl( - key: string, - ttlSeconds: string, + taskMetaSetTwoFields( + envKey: string, + workerKey: string, + envTtlSeconds: string, + workerTtlSeconds: string, field: string, value: string, callback?: Callback @@ -177,13 +240,17 @@ export class RedisTaskMetadataCache implements TaskMetadataCache { numberOfKeys: 1, lua: REPLACE_HASH_LUA, }); + this.redis.defineCommand("taskMetaReplaceTwoHashes", { + numberOfKeys: 2, + lua: REPLACE_TWO_HASHES_LUA, + }); this.redis.defineCommand("taskMetaSetFieldRefreshTtl", { numberOfKeys: 1, lua: SET_FIELD_REFRESH_TTL_LUA, }); - this.redis.defineCommand("taskMetaSetFieldPreserveTtl", { - numberOfKeys: 1, - lua: SET_FIELD_PRESERVE_TTL_LUA, + this.redis.defineCommand("taskMetaSetTwoFields", { + numberOfKeys: 2, + lua: SET_TWO_FIELDS_LUA, }); } @@ -195,25 +262,68 @@ export class RedisTaskMetadataCache implements TaskMetadataCache { return this.#get(byWorkerKey(workerId), slug); } - async populateCurrent(envId: string, entries: TaskMetadataEntry[]): Promise { - await this.#replaceHash(currentEnvKey(envId), entries, this.currentEnvTtlSeconds); + async populateByCurrentWorker( + envId: string, + workerId: string, + entries: TaskMetadataEntry[] + ): Promise { + if (entries.length === 0) return; + try { + const argv: string[] = [ + String(this.currentEnvTtlSeconds), + String(this.byWorkerTtlSeconds), + ]; + for (const entry of entries) { + argv.push(entry.slug, encode(entry)); + } + await this.redis.taskMetaReplaceTwoHashes( + currentEnvKey(envId), + byWorkerKey(workerId), + ...argv + ); + } catch (error) { + logger.error("Failed to populate task metadata cache (current worker)", { + envId, + workerId, + error, + }); + } } async populateByWorker(workerId: string, entries: TaskMetadataEntry[]): Promise { - await this.#replaceHash(byWorkerKey(workerId), entries, this.byWorkerTtlSeconds); + if (entries.length === 0) return; + try { + const argv: string[] = [String(this.byWorkerTtlSeconds)]; + for (const entry of entries) { + argv.push(entry.slug, encode(entry)); + } + await this.redis.taskMetaReplaceHash(byWorkerKey(workerId), ...argv); + } catch (error) { + logger.error("Failed to populate task metadata cache (by worker)", { + workerId, + error, + }); + } } - async setCurrent(envId: string, entry: TaskMetadataEntry): Promise { + async 
setByCurrentWorker( + envId: string, + workerId: string, + entry: TaskMetadataEntry + ): Promise { try { - await this.redis.taskMetaSetFieldPreserveTtl( + await this.redis.taskMetaSetTwoFields( currentEnvKey(envId), + byWorkerKey(workerId), String(this.currentEnvTtlSeconds), + String(this.byWorkerTtlSeconds), entry.slug, encode(entry) ); } catch (error) { - logger.error("Failed to set task metadata current cache field", { + logger.error("Failed to set task metadata cache field (current worker)", { envId, + workerId, slug: entry.slug, error, }); @@ -229,7 +339,7 @@ export class RedisTaskMetadataCache implements TaskMetadataCache { encode(entry) ); } catch (error) { - logger.error("Failed to set task metadata by-worker cache field", { + logger.error("Failed to set task metadata cache field (by worker)", { workerId, slug: entry.slug, error, @@ -237,14 +347,6 @@ export class RedisTaskMetadataCache implements TaskMetadataCache { } } - async invalidateCurrent(envId: string): Promise { - try { - await this.redis.del(currentEnvKey(envId)); - } catch (error) { - logger.error("Failed to invalidate task metadata current cache", { envId, error }); - } - } - async #get(key: string, slug: string): Promise { try { const raw = await this.redis.hget(key, slug); @@ -255,22 +357,6 @@ export class RedisTaskMetadataCache implements TaskMetadataCache { return null; } } - - async #replaceHash( - key: string, - entries: TaskMetadataEntry[], - ttlSeconds: number - ): Promise { - try { - const argv: string[] = [String(ttlSeconds)]; - for (const entry of entries) { - argv.push(entry.slug, encode(entry)); - } - await this.redis.taskMetaReplaceHash(key, ...argv); - } catch (error) { - logger.error("Failed to replace task metadata cache hash", { key, error }); - } - } } export class NoopTaskMetadataCache implements TaskMetadataCache { @@ -282,7 +368,7 @@ export class NoopTaskMetadataCache implements TaskMetadataCache { return null; } - async populateCurrent(): Promise { + async populateByCurrentWorker(): Promise { // intentionally empty } @@ -290,15 +376,11 @@ export class NoopTaskMetadataCache implements TaskMetadataCache { // intentionally empty } - async setCurrent(): Promise { + async setByCurrentWorker(): Promise { // intentionally empty } async setByWorker(): Promise { // intentionally empty } - - async invalidateCurrent(): Promise { - // intentionally empty - } } diff --git a/apps/webapp/app/services/taskMetadataSync.server.ts b/apps/webapp/app/services/taskMetadataSync.server.ts deleted file mode 100644 index 882202ea5de..00000000000 --- a/apps/webapp/app/services/taskMetadataSync.server.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { logger } from "./logger.server"; -import { taskMetadataCacheInstance } from "./taskMetadataCacheInstance.server"; -import type { TaskMetadataCache, TaskMetadataEntry } from "./taskMetadataCache.server"; - -export async function syncTaskMetadataCache( - envId: string, - workerId: string, - isCurrent: boolean, - entries: TaskMetadataEntry[], - cache: TaskMetadataCache = taskMetadataCacheInstance -): Promise { - if (entries.length === 0) return; - - try { - await cache.populateByWorker(workerId, entries); - if (isCurrent) { - await cache.populateCurrent(envId, entries); - } - } catch (error) { - logger.error("Failed to sync task metadata cache", { envId, workerId, error }); - } -} diff --git a/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts b/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts index 15a2824dd0c..6022c172908 100644 --- 
a/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts +++ b/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts @@ -8,7 +8,6 @@ import { type TaskMetadataEntry, } from "~/services/taskMetadataCache.server"; import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server"; -import { syncTaskMetadataCache } from "~/services/taskMetadataSync.server"; import { BaseService, ServiceValidationError } from "./baseService.server"; import { syncDeclarativeSchedules } from "./createBackgroundWorker.server"; import { ExecuteTasksWaitingForDeployService } from "./executeTasksWaitingForDeploy"; @@ -160,21 +159,12 @@ export class ChangeCurrentDeploymentService extends BaseService { queueName: t.queue?.name ?? "", })); - const [metaError] = await tryCatch( - syncTaskMetadataCache( - deployment.environmentId, - deployment.workerId!, - true, - metadataEntries, - this._taskMetaCache - ) + // Cache calls log+swallow internally. + await this._taskMetaCache.populateByCurrentWorker( + deployment.environmentId, + deployment.workerId!, + metadataEntries ); - - if (metaError) { - logger.error("Error syncing task metadata cache on deployment change", { - error: metaError, - }); - } } const [scheduleSyncError] = await tryCatch(this.#syncSchedulesForDeployment(deployment)); diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index 9035a2d4f84..17fc93a1abb 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -19,7 +19,6 @@ import { type TaskMetadataEntry, } from "~/services/taskMetadataCache.server"; import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server"; -import { syncTaskMetadataCache } from "~/services/taskMetadataSync.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; import { removeQueueConcurrencyLimits, @@ -234,25 +233,17 @@ export class CreateBackgroundWorkerService extends BaseService { // worker by createdAt. Non-DEV (deploy-built) workers are not promoted // here — promotion writes the `:env:` keyspace later in // changeCurrentDeployment / createDeploymentBackgroundWorkerV3. - // Wrap in tryCatch so a Redis blip can't break the post-cache side - // effects below. + // Cache calls log+swallow internally, so a Redis blip can't break + // anything else here. 
if (workerTaskEntries && workerTaskEntries.length > 0) { - const [metaCacheError] = await tryCatch( - syncTaskMetadataCache( + if (environment.type === "DEVELOPMENT") { + await this._taskMetaCache.populateByCurrentWorker( environment.id, backgroundWorker.id, - environment.type === "DEVELOPMENT", - workerTaskEntries, - this._taskMetaCache - ) - ); - - if (metaCacheError) { - logger.error("Error syncing task metadata cache", { - error: metaCacheError, - backgroundWorker, - environment, - }); + workerTaskEntries + ); + } else { + await this._taskMetaCache.populateByWorker(backgroundWorker.id, workerTaskEntries); } } diff --git a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts index 2a122872887..6a943cfd6c6 100644 --- a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts +++ b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts @@ -5,7 +5,6 @@ import { logger } from "~/services/logger.server"; import { syncTaskIdentifiers } from "~/services/taskIdentifierRegistry.server"; import { type TaskMetadataCache } from "~/services/taskMetadataCache.server"; import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server"; -import { syncTaskMetadataCache } from "~/services/taskMetadataSync.server"; import { socketIo } from "../handleSocketIo.server"; import { updateEnvConcurrencyLimits } from "../runQueue.server"; import { PerformDeploymentAlertsService } from "./alerts/performDeploymentAlerts.server"; @@ -168,27 +167,14 @@ export class CreateDeploymentBackgroundWorkerServiceV3 extends BaseService { } // V3 promotes the deployment immediately above, so this worker is now - // current for the env — write both keyspaces. Wrap in tryCatch so a - // Redis blip can't strand the post-cache side effects below (waiting - // runs flush, alerts, timeout cancellation). + // current for the env — write both keyspaces atomically. Cache calls + // log+swallow internally. 
if (workerTaskEntries.length > 0) { - const [metaCacheError] = await tryCatch( - syncTaskMetadataCache( - environment.id, - backgroundWorker.id, - true, - workerTaskEntries, - this._taskMetaCache - ) + await this._taskMetaCache.populateByCurrentWorker( + environment.id, + backgroundWorker.id, + workerTaskEntries ); - - if (metaCacheError) { - logger.error("Error syncing task metadata cache on deployment", { - error: metaCacheError, - deploymentId: deployment.id, - workerId: backgroundWorker.id, - }); - } } try { diff --git a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts index 23c8651e4ca..466b68a7d54 100644 --- a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts +++ b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts @@ -4,7 +4,6 @@ import type { BackgroundWorker, PrismaClientOrTransaction, WorkerDeployment } fr import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { type TaskMetadataCache } from "~/services/taskMetadataCache.server"; import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server"; -import { syncTaskMetadataCache } from "~/services/taskMetadataSync.server"; import { BaseService, ServiceValidationError } from "./baseService.server"; import { createBackgroundFiles, @@ -152,27 +151,13 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService { // only the `task-meta:by-worker:{workerId}` keyspace so locked-version // triggers against this build hit the cache. Promotion (which writes the // env keyspace) happens later via finalizeDeployment → changeCurrentDeployment. - // Wrap in tryCatch so a Redis blip can't break the deployment state - // machine — without this the build would stall in BUILDING and only - // surface as a misleading "Indexing timed out". + // Cache calls log+swallow internally, so a Redis blip can't stall the + // deployment state machine. if (workerTaskEntries && workerTaskEntries.length > 0) { - const [metaCacheError] = await tryCatch( - syncTaskMetadataCache( - environment.id, - backgroundWorker.id, - false, - workerTaskEntries, - this._taskMetaCache - ) + await this._taskMetaCache.populateByWorker( + backgroundWorker.id, + workerTaskEntries ); - - if (metaCacheError) { - logger.error("Error syncing task metadata cache on build", { - error: metaCacheError, - deploymentId: deployment.id, - workerId: backgroundWorker.id, - }); - } } const [schedulesError] = await tryCatch( diff --git a/apps/webapp/test/engine/triggerTask.test.ts b/apps/webapp/test/engine/triggerTask.test.ts index ab3e520c8d3..62296632bbe 100644 --- a/apps/webapp/test/engine/triggerTask.test.ts +++ b/apps/webapp/test/engine/triggerTask.test.ts @@ -1198,7 +1198,7 @@ describe("DefaultQueueManager task metadata cache", () => { const environment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); const taskIdentifier = "cached-task"; - await setupBackgroundWorker(engine, environment, taskIdentifier); + const setup = await setupBackgroundWorker(engine, environment, taskIdentifier); const redis = new Redis(redisOptions); const cache = new RedisTaskMetadataCache({ redis }); @@ -1206,7 +1206,7 @@ describe("DefaultQueueManager task metadata cache", () => { // Pre-populate cache with AGENT triggerSource; DB row has the default STANDARD. // If the read path hits the cache, the resulting TaskRun.taskKind reflects the // cached value. If it falls through to PG, it reflects STANDARD. 
- await cache.populateCurrent(environment.id, [ + await cache.populateByCurrentWorker(environment.id, setup.worker.id, [ { slug: taskIdentifier, ttl: null, @@ -1329,14 +1329,14 @@ describe("DefaultQueueManager task metadata cache", () => { const environment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); const taskIdentifier = "override-task"; - await setupBackgroundWorker(engine, environment, taskIdentifier); + const setup = await setupBackgroundWorker(engine, environment, taskIdentifier); const redis = new Redis(redisOptions); const cache = new RedisTaskMetadataCache({ redis }); // Cache says AGENT; DB row says STANDARD. Caller provides both a queue // override and an explicit TTL — the hot path the PR regressed. - await cache.populateCurrent(environment.id, [ + await cache.populateByCurrentWorker(environment.id, setup.worker.id, [ { slug: taskIdentifier, ttl: null, @@ -1406,7 +1406,10 @@ describe("DefaultQueueManager task metadata cache", () => { const cache = new RedisTaskMetadataCache({ redis }); // Populate the two keyspaces with conflicting triggerSource values so we - // can tell which keyspace the read used. + // can tell which keyspace the read used. The real worker's by-worker + // hash gets AGENT; the env hash gets SCHEDULED (seeded via a throwaway + // worker id since `populateByCurrentWorker` writes both keyspaces and + // we want the real worker's by-worker hash untouched). await cache.populateByWorker(worker.worker.id, [ { slug: taskIdentifier, @@ -1416,7 +1419,7 @@ describe("DefaultQueueManager task metadata cache", () => { queueName: `task/${taskIdentifier}`, }, ]); - await cache.populateCurrent(environment.id, [ + await cache.populateByCurrentWorker(environment.id, "dummy-worker-for-env-seed", [ { slug: taskIdentifier, ttl: null, From 09a1a3b1df486b1e68344389b5a0e2a395f33b87 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 15 May 2026 09:04:37 +0100 Subject: [PATCH 4/7] fix(webapp): reject locked triggers when the task is missing on that worker, clear stale cache on zero-task deploys Two CodeRabbit findings: 1. The locked + queue-override branch silently accepted a missing task slug on the locked worker version, leaving `taskKind` / `ttl` as undefined instead of erroring. Now matches the no-override branch and throws a ServiceValidationError if the task isn't on the locked worker. 2. The cache populate methods short-circuited on empty entries, leaving stale hashes in Redis when a worker registered or got promoted with zero tasks. The Lua scripts already handle the empty-entries case (DEL + no HSET), so just drop the gates at the cache class and at the four call sites so promotion correctly clears prior data. 
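For illustration, the clearing semantics the dropped gates restore — a minimal
sketch using the `RedisTaskMetadataCache` setup from the test file (`redis` is
an ioredis client; ids are placeholders and the entry literal is abbreviated
to the fields this PR reads):

    const cache = new RedisTaskMetadataCache({ redis });

    // Worker A registered one task; promotion writes both keyspaces.
    await cache.populateByCurrentWorker("env_1", "worker_a", [
      { slug: "my-task", ttl: null, triggerSource: "STANDARD",
        queueId: "q_1", queueName: "task/my-task" },
    ]);

    // Worker B is promoted with zero tasks. The gated version no-opped here,
    // leaving `my-task` readable from the env hash. Ungated, the Lua script
    // still runs its DELs, so both hashes are cleared.
    await cache.populateByCurrentWorker("env_1", "worker_b", []);
    await cache.getCurrent("env_1", "my-task"); // → null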
--- .../app/runEngine/concerns/queues.server.ts | 11 +++++++++-- .../app/services/taskMetadataCache.server.ts | 6 ++++-- .../v3/services/createBackgroundWorker.server.ts | 5 +++-- .../createDeploymentBackgroundWorkerV3.server.ts | 15 +++++++-------- .../createDeploymentBackgroundWorkerV4.server.ts | 9 +++------ 5 files changed, 26 insertions(+), 20 deletions(-) diff --git a/apps/webapp/app/runEngine/concerns/queues.server.ts b/apps/webapp/app/runEngine/concerns/queues.server.ts index fc0d707132e..2e5e5f0dfe5 100644 --- a/apps/webapp/app/runEngine/concerns/queues.server.ts +++ b/apps/webapp/app/runEngine/concerns/queues.server.ts @@ -124,10 +124,17 @@ export class DefaultQueueManager implements QueueManager { request.taskId ); + if (!lockedMeta) { + throw new ServiceValidationError( + `Task '${request.taskId}' not found on locked version '${lockedBackgroundWorker.version ?? "" + }'.` + ); + } + if (request.body.options?.ttl === undefined) { - taskTtl = lockedMeta?.ttl ?? undefined; + taskTtl = lockedMeta.ttl ?? undefined; } - taskKind = lockedMeta?.triggerSource; + taskKind = lockedMeta.triggerSource; } else { // No queue override - resolve default queue + TTL + triggerSource via cache, // falling back to a single BackgroundWorkerTask lookup on miss. diff --git a/apps/webapp/app/services/taskMetadataCache.server.ts b/apps/webapp/app/services/taskMetadataCache.server.ts index 94349a61bd2..a67bc699866 100644 --- a/apps/webapp/app/services/taskMetadataCache.server.ts +++ b/apps/webapp/app/services/taskMetadataCache.server.ts @@ -267,8 +267,10 @@ export class RedisTaskMetadataCache implements TaskMetadataCache { workerId: string, entries: TaskMetadataEntry[] ): Promise { - if (entries.length === 0) return; try { + // Always invoke the script — empty `entries` is valid and causes both + // keyspaces to be cleared (DEL + no HSET), which is the right behavior + // when promoting a worker with no tasks. const argv: string[] = [ String(this.currentEnvTtlSeconds), String(this.byWorkerTtlSeconds), @@ -291,8 +293,8 @@ export class RedisTaskMetadataCache implements TaskMetadataCache { } async populateByWorker(workerId: string, entries: TaskMetadataEntry[]): Promise { - if (entries.length === 0) return; try { + // Always invoke the script — empty `entries` clears the keyspace. const argv: string[] = [String(this.byWorkerTtlSeconds)]; for (const entry of entries) { argv.push(entry.slug, encode(entry)); diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index 17fc93a1abb..ec3f6d077ad 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -234,8 +234,9 @@ export class CreateBackgroundWorkerService extends BaseService { // here — promotion writes the `:env:` keyspace later in // changeCurrentDeployment / createDeploymentBackgroundWorkerV3. // Cache calls log+swallow internally, so a Redis blip can't break - // anything else here. - if (workerTaskEntries && workerTaskEntries.length > 0) { + // anything else here. Empty `workerTaskEntries` is intentional — the + // populate methods clear stale hashes for zero-task deploys. 
+ if (workerTaskEntries) { if (environment.type === "DEVELOPMENT") { await this._taskMetaCache.populateByCurrentWorker( environment.id, diff --git a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts index 6a943cfd6c6..d8f13227d78 100644 --- a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts +++ b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts @@ -168,14 +168,13 @@ export class CreateDeploymentBackgroundWorkerServiceV3 extends BaseService { // V3 promotes the deployment immediately above, so this worker is now // current for the env — write both keyspaces atomically. Cache calls - // log+swallow internally. - if (workerTaskEntries.length > 0) { - await this._taskMetaCache.populateByCurrentWorker( - environment.id, - backgroundWorker.id, - workerTaskEntries - ); - } + // log+swallow internally. Empty `workerTaskEntries` is intentional: the + // populate methods clear stale hashes for zero-task deploys. + await this._taskMetaCache.populateByCurrentWorker( + environment.id, + backgroundWorker.id, + workerTaskEntries + ); try { //send a notification that a new worker has been created diff --git a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts index 466b68a7d54..ff041359bb0 100644 --- a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts +++ b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts @@ -152,12 +152,9 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService { // triggers against this build hit the cache. Promotion (which writes the // env keyspace) happens later via finalizeDeployment → changeCurrentDeployment. // Cache calls log+swallow internally, so a Redis blip can't stall the - // deployment state machine. - if (workerTaskEntries && workerTaskEntries.length > 0) { - await this._taskMetaCache.populateByWorker( - backgroundWorker.id, - workerTaskEntries - ); + // deployment state machine. Empty entries clears stale hashes. + if (workerTaskEntries) { + await this._taskMetaCache.populateByWorker(backgroundWorker.id, workerTaskEntries); } const [schedulesError] = await tryCatch( From 2c0fae226d290e1f8ac18f17b2d54e9a88fd96c6 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 15 May 2026 09:45:40 +0100 Subject: [PATCH 5/7] fix(webapp): unblock CI on the task-metadata cache PR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two CI failures on the prior push: 1. Typecheck TS2556 in taskMetadataCache.server.ts: the populate methods were spreading a mixed string[] into the Lua `defineCommand` rest parameter. Pass the fixed positional args (key, ttl) directly and spread only the variable field/value pairs. 2. Webapp shard 6: the cache test file pulled in ChangeCurrentDeploymentService, whose transitive import chain reaches triggerTaskV1.server.ts → autoIncrementCounter.server.ts which throws at module load when REDIS_HOST/REDIS_PORT aren't in the shard's env. Drop test 5 (deploy-promotion side effect) — it doesn't belong in triggerTask.test.ts anyway. 
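For reference, the TS2556 shape in miniature (standalone sketch, not the real
ioredis `defineCommand` typings):

    declare function taskMetaReplaceHash(
      key: string,
      ttlSeconds: string,
      ...fieldValues: string[]
    ): Promise<number>;

    const encoded = JSON.stringify({ ttl: null }); // stand-in for encode(entry)
    const argv: string[] = ["2592000", "my-task", encoded];

    // TS2556: a spread argument must either have a tuple type or be passed
    // to a rest parameter. `argv` would also have to satisfy the required
    // `ttlSeconds` positional, and string[] carries no length information.
    // void taskMetaReplaceHash("task-meta:by-worker:w1", ...argv);

    // Fix: pass the fixed positionals directly; spread only the variadic tail.
    const fieldValues = ["my-task", encoded];
    void taskMetaReplaceHash("task-meta:by-worker:w1", "2592000", ...fieldValues);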
--- .../app/services/taskMetadataCache.server.ts | 21 ++++--- apps/webapp/test/engine/triggerTask.test.ts | 58 ------------------- 2 files changed, 12 insertions(+), 67 deletions(-) diff --git a/apps/webapp/app/services/taskMetadataCache.server.ts b/apps/webapp/app/services/taskMetadataCache.server.ts index a67bc699866..4b2ee56f253 100644 --- a/apps/webapp/app/services/taskMetadataCache.server.ts +++ b/apps/webapp/app/services/taskMetadataCache.server.ts @@ -271,17 +271,16 @@ export class RedisTaskMetadataCache implements TaskMetadataCache { // Always invoke the script — empty `entries` is valid and causes both // keyspaces to be cleared (DEL + no HSET), which is the right behavior // when promoting a worker with no tasks. - const argv: string[] = [ - String(this.currentEnvTtlSeconds), - String(this.byWorkerTtlSeconds), - ]; + const fieldValues: string[] = []; for (const entry of entries) { - argv.push(entry.slug, encode(entry)); + fieldValues.push(entry.slug, encode(entry)); } await this.redis.taskMetaReplaceTwoHashes( currentEnvKey(envId), byWorkerKey(workerId), - ...argv + String(this.currentEnvTtlSeconds), + String(this.byWorkerTtlSeconds), + ...fieldValues ); } catch (error) { logger.error("Failed to populate task metadata cache (current worker)", { @@ -295,11 +294,15 @@ export class RedisTaskMetadataCache implements TaskMetadataCache { async populateByWorker(workerId: string, entries: TaskMetadataEntry[]): Promise { try { // Always invoke the script — empty `entries` clears the keyspace. - const argv: string[] = [String(this.byWorkerTtlSeconds)]; + const fieldValues: string[] = []; for (const entry of entries) { - argv.push(entry.slug, encode(entry)); + fieldValues.push(entry.slug, encode(entry)); } - await this.redis.taskMetaReplaceHash(byWorkerKey(workerId), ...argv); + await this.redis.taskMetaReplaceHash( + byWorkerKey(workerId), + String(this.byWorkerTtlSeconds), + ...fieldValues + ); } catch (error) { logger.error("Failed to populate task metadata cache (by worker)", { workerId, diff --git a/apps/webapp/test/engine/triggerTask.test.ts b/apps/webapp/test/engine/triggerTask.test.ts index 62296632bbe..798e39e0601 100644 --- a/apps/webapp/test/engine/triggerTask.test.ts +++ b/apps/webapp/test/engine/triggerTask.test.ts @@ -24,7 +24,6 @@ import { Redis } from "ioredis"; import { IdempotencyKeyConcern } from "~/runEngine/concerns/idempotencyKeys.server"; import { DefaultQueueManager } from "~/runEngine/concerns/queues.server"; import { RedisTaskMetadataCache } from "~/services/taskMetadataCache.server"; -import { ChangeCurrentDeploymentService } from "~/v3/services/changeCurrentDeployment.server"; import { EntitlementValidationParams, MaxAttemptsValidationParams, @@ -1467,61 +1466,4 @@ describe("DefaultQueueManager task metadata cache", () => { await engine.quit(); } ); - - containerTest( - "ChangeCurrentDeploymentService promotes the env cache to the new worker", - async ({ prisma, redisOptions }) => { - const engine = new RunEngine({ - prisma, - worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, - queue: { redis: redisOptions }, - runLock: { redis: redisOptions }, - machines: { - defaultMachine: "small-1x", - machines: { - "small-1x": { name: "small-1x", cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 }, - }, - baseCostInCents: 0.0005, - }, - tracer: trace.getTracer("test", "0.0.0"), - }); - - const environment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); - const taskIdentifier = "promotion-task"; - - // Worker A → setupBackgroundWorker 
auto-creates a deployment + promotes it. - const workerA = await setupBackgroundWorker(engine, environment, taskIdentifier); - // Worker B → setupBackgroundWorker overrides the promotion to point at B. - const workerB = await setupBackgroundWorker(engine, environment, taskIdentifier); - - assertNonNullable(workerA.deployment); - assertNonNullable(workerB.deployment); - - const redis = new Redis(redisOptions); - const cache = new RedisTaskMetadataCache({ redis }); - - // Pre-clear any pre-existing cache state so the assertions below prove - // the rollback service did the write — not some other path. The test - // helpers don't currently populate the cache, but pre-clearing keeps the - // test bulletproof against future helper changes. - await redis.del(`task-meta:by-worker:${workerA.worker.id}`); - await redis.del(`task-meta:env:${environment.id}`); - expect(await cache.getByWorker(workerA.worker.id, taskIdentifier)).toBeNull(); - expect(await cache.getCurrent(environment.id, taskIdentifier)).toBeNull(); - - // Manually rollback to A to exercise the cache-write side effect. - const service = new ChangeCurrentDeploymentService(prisma, undefined, cache); - await service.call(workerA.deployment, "rollback", true /* disableVersionCheck */); - - // Both keyspaces should now reflect workerA. - const entry = await cache.getCurrent(environment.id, taskIdentifier); - expect(entry).not.toBeNull(); - const byWorkerEntry = await cache.getByWorker(workerA.worker.id, taskIdentifier); - expect(byWorkerEntry).not.toBeNull(); - expect(byWorkerEntry?.queueName).toBe(`task/${taskIdentifier}`); - - await redis.quit(); - await engine.quit(); - } - ); }); From b9fc87b0cffe6272199d6c4a229365084f9559c0 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 15 May 2026 10:47:38 +0100 Subject: [PATCH 6/7] fix(webapp): close back-fill-vs-promotion race on env-keyspace cache writes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stamp the env hash with an `__owner_worker_id` field at promotion time; the back-fill Lua script reads it and CAS-skips the env write if it doesn't match the worker the back-filler resolved to. Scenario the guard closes: 1. Resolver reads env cache, misses 2. Resolver reads PG: findCurrentWorker -> A, BWT for A -> A's data 3. [promotion to B fires, atomically replaces env hash + by-worker:B] 4. Resolver fires back-fill with workerId=A -> env hash now stamped owner=B; A != B -> back-fill skips env write -> by-worker:A still written (harmless; key contains workerId, dead worker) The by-worker keyspace doesn't need a CAS check — the key itself contains the workerId, so stale writes land in a dead worker's keyspace and are never read. Devin Review flagged this as a sub-millisecond race bounded by the 24h env TTL. Cost to close it is one extra HGET inside the Lua call (atomic with the HSET, no extra round trip) and one extra HSET on the promotion write. --- .../app/services/taskMetadataCache.server.ts | 64 +++++++++++++++---- 1 file changed, 50 insertions(+), 14 deletions(-) diff --git a/apps/webapp/app/services/taskMetadataCache.server.ts b/apps/webapp/app/services/taskMetadataCache.server.ts index 4b2ee56f253..6130295a73f 100644 --- a/apps/webapp/app/services/taskMetadataCache.server.ts +++ b/apps/webapp/app/services/taskMetadataCache.server.ts @@ -116,28 +116,43 @@ end return 1 `; +/** + * Reserved field name on env hashes that records the worker currently + * "owning" the env keyspace. 
The back-fill Lua script reads this and skips + * its env-side write if the owner has flipped — closing the race where a + * concurrent promotion atomically replaces the env hash between a resolver's + * PG read and its back-fill write. Customer task slugs are kebab/camelCase + * and never start with `__`, so collisions are not a concern; an accidental + * `getCurrent(envId, "__owner_worker_id")` would JSON.parse-fail and fall + * back to PG, not corrupt anything. + */ +const OWNER_FIELD = "__owner_worker_id"; + /** * Atomically replace BOTH keyspaces in one Redis transaction. Used at deploy * promotion — the worker just became current for the env, so the env keyspace - * and the worker keyspace get the same field set. + * and the worker keyspace get the same field set, and the env hash is + * stamped with the new owner workerId. * * KEYS[1] = env hash key * KEYS[2] = by-worker hash key * ARGV[1] = env ttl seconds (0 = no TTL) * ARGV[2] = by-worker ttl seconds (0 = no TTL) - * ARGV[3..N] = alternating field, value pairs (same for both hashes) + * ARGV[3] = workerId (env-hash owner marker) + * ARGV[4..N] = alternating field, value pairs (same for both hashes) */ const REPLACE_TWO_HASHES_LUA = ` redis.call("DEL", KEYS[1]) redis.call("DEL", KEYS[2]) -if #ARGV > 2 then +if #ARGV > 3 then local fv = {} - for i = 3, #ARGV do + for i = 4, #ARGV do fv[#fv + 1] = ARGV[i] end redis.call("HSET", KEYS[1], unpack(fv)) redis.call("HSET", KEYS[2], unpack(fv)) end +redis.call("HSET", KEYS[1], "${OWNER_FIELD}", ARGV[3]) local envTtl = tonumber(ARGV[1]) if envTtl and envTtl > 0 then redis.call("EXPIRE", KEYS[1], envTtl) @@ -169,27 +184,44 @@ return 1 /** * Atomically upsert one field in BOTH keyspaces. Used by the non-locked - * back-fill path. The env-keyspace TTL is only set if no TTL is present - * (preserves the promotion boundary); the by-worker TTL is refreshed. + * back-fill path. + * + * The by-worker hash always gets written (the key contains the workerId, so + * stale data lands in a dead worker's keyspace and is never read by anyone + * not pinned to that version). + * + * The env hash is CAS-guarded by `${OWNER_FIELD}`: if a concurrent promotion + * has replaced the hash between this resolver's PG read and this write, the + * stored owner won't match the workerId the back-filler resolved to, so the + * env write is skipped — preventing the back-fill from overwriting a freshly + * promoted slug with stale data from the previous worker. 
* * KEYS[1] = env hash key * KEYS[2] = by-worker hash key * ARGV[1] = env ttl seconds (0 = no TTL) * ARGV[2] = by-worker ttl seconds (0 = no TTL) - * ARGV[3] = field - * ARGV[4] = value + * ARGV[3] = writer's expected env-hash owner workerId + * ARGV[4] = field + * ARGV[5] = value */ const SET_TWO_FIELDS_LUA = ` -redis.call("HSET", KEYS[1], ARGV[3], ARGV[4]) -local envTtl = tonumber(ARGV[1]) -if envTtl and envTtl > 0 and redis.call("TTL", KEYS[1]) == -1 then - redis.call("EXPIRE", KEYS[1], envTtl) -end -redis.call("HSET", KEYS[2], ARGV[3], ARGV[4]) +redis.call("HSET", KEYS[2], ARGV[4], ARGV[5]) local workerTtl = tonumber(ARGV[2]) if workerTtl and workerTtl > 0 then redis.call("EXPIRE", KEYS[2], workerTtl) end + +local owner = redis.call("HGET", KEYS[1], "${OWNER_FIELD}") +if owner == false or owner == ARGV[3] then + redis.call("HSET", KEYS[1], ARGV[4], ARGV[5]) + if owner == false then + redis.call("HSET", KEYS[1], "${OWNER_FIELD}", ARGV[3]) + end + local envTtl = tonumber(ARGV[1]) + if envTtl and envTtl > 0 and redis.call("TTL", KEYS[1]) == -1 then + redis.call("EXPIRE", KEYS[1], envTtl) + end +end return 1 `; @@ -205,6 +237,7 @@ declare module "ioredis" { workerKey: string, envTtlSeconds: string, workerTtlSeconds: string, + workerId: string, ...fieldValues: string[] ): Result; taskMetaSetFieldRefreshTtl( @@ -219,6 +252,7 @@ declare module "ioredis" { workerKey: string, envTtlSeconds: string, workerTtlSeconds: string, + workerId: string, field: string, value: string, callback?: Callback @@ -280,6 +314,7 @@ export class RedisTaskMetadataCache implements TaskMetadataCache { byWorkerKey(workerId), String(this.currentEnvTtlSeconds), String(this.byWorkerTtlSeconds), + workerId, ...fieldValues ); } catch (error) { @@ -322,6 +357,7 @@ export class RedisTaskMetadataCache implements TaskMetadataCache { byWorkerKey(workerId), String(this.currentEnvTtlSeconds), String(this.byWorkerTtlSeconds), + workerId, entry.slug, encode(entry) ); From 710df372e5db846e1360b17661b64631d55c917b Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 15 May 2026 11:36:39 +0100 Subject: [PATCH 7/7] fix(webapp): tolerate missing task on locked + queue-override branch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Match main's pre-PR behavior — when a caller passes `lockToVersion` and a queue override and the task slug isn't registered on that worker version, fall through with `taskKind = undefined` (coalesced to "STANDARD" downstream) and `taskTtl = undefined` instead of throwing. This reverts the throw added earlier in this PR. The trade-off review landed on "strict consistency check is better" twice (CodeRabbit recommended it, Devin pushed back, we kept the throw) — but the net is a behavioral change in the trigger API, and customers running `lockToVersion` + a queue override against a slug that doesn't exist on that version would start getting 4xx errors where they got silent defaults before. Default-to-silent matches main and avoids the surprise. The no-override branch keeps its throw because there's no queue to route to in that case — the failure mode there is unrecoverable. 
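In caller terms, the tolerated shape looks like this — a sketch with
`tasks.trigger` standing in for the SDK call and the option names illustrative
(the resolver reads `options.queue`, `options.ttl`, and the lockToVersion
worker from the request body):

    const payload = {};

    // "my-task" exists on 20260515.1 but was removed in 20260515.3.
    await tasks.trigger("my-task", payload, {
      lockToVersion: "20260515.3", // slug missing on this version
      queue: { name: "my-queue" }, // caller already named the queue
      // no ttl
    });

    // No ServiceValidationError: the run routes to `my-queue`, `taskKind`
    // stays undefined and coalesces to "STANDARD" downstream, and no
    // task-level TTL is applied — exactly what main did before this PR.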
--- .../app/runEngine/concerns/queues.server.ts | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/apps/webapp/app/runEngine/concerns/queues.server.ts b/apps/webapp/app/runEngine/concerns/queues.server.ts index 2e5e5f0dfe5..2fc35fc8435 100644 --- a/apps/webapp/app/runEngine/concerns/queues.server.ts +++ b/apps/webapp/app/runEngine/concerns/queues.server.ts @@ -118,23 +118,23 @@ export class DefaultQueueManager implements QueueManager { // Pull `triggerSource` (for `taskKind` annotation) and `ttl` from cache. // On cache hit this is 0 PG queries; on miss the helper falls back to // a BackgroundWorkerTask lookup and back-fills the cache. + // + // If the task slug isn't on this locked worker version, we tolerate + // the missing row and fall through with `taskKind = undefined` + // (coalesced to "STANDARD" downstream) and `taskTtl = undefined`. + // This matches main's pre-PR behavior — the no-override branch below + // still throws because there's no queue to route to in that case, + // but here the caller already named the queue. const lockedMeta = await this.resolveLockedTaskMetadata( lockedBackgroundWorker.id, request.environment.id, request.taskId ); - if (!lockedMeta) { - throw new ServiceValidationError( - `Task '${request.taskId}' not found on locked version '${lockedBackgroundWorker.version ?? "" - }'.` - ); - } - if (request.body.options?.ttl === undefined) { - taskTtl = lockedMeta.ttl ?? undefined; + taskTtl = lockedMeta?.ttl ?? undefined; } - taskKind = lockedMeta.triggerSource; + taskKind = lockedMeta?.triggerSource; } else { // No queue override - resolve default queue + TTL + triggerSource via cache, // falling back to a single BackgroundWorkerTask lookup on miss.
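To make the PATCH 6 owner-guard concrete, the race it closes plays out like
this at the cache API (sketch; ids are placeholders, `entryFromA` /
`entriesFromB` stand for TaskMetadataEntry values read from PG for workers A
and B, and the step numbers mirror the scenario in that commit message):

    // (1)-(2) resolver misses the env cache, reads PG: current worker = A.
    // (3) promotion to B lands first — env hash replaced atomically and
    //     stamped with __owner_worker_id = "worker_b".
    await cache.populateByCurrentWorker("env_1", "worker_b", entriesFromB);

    // (4) the stale back-fill arrives with workerId = "worker_a". Inside the
    //     Lua script: owner "worker_b" != "worker_a", so the env-side HSET is
    //     skipped; by-worker:worker_a is still written, which is harmless —
    //     that keyspace is only read by triggers pinned to worker A.
    await cache.setByCurrentWorker("env_1", "worker_a", entryFromA);

    // The env hash keeps serving B's data; no stale window bounded only by
    // the 24h TTL.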