@@ -17,6 +17,8 @@ import { tryCatch } from "@trigger.dev/core/v3";
1717import { ServiceValidationError } from "~/v3/services/common.server" ;
1818import { createCache , createLRUMemoryStore , DefaultStatefulContext , Namespace } from "@internal/cache" ;
1919import { singleton } from "~/utils/singleton" ;
20+ import type { TaskMetadataCache , TaskMetadataEntry } from "~/services/taskMetadataCache.server" ;
21+ import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server" ;
2022
2123// LRU cache for environment queue sizes to reduce Redis calls
2224const queueSizeCache = singleton ( "queueSizeCache" , ( ) => {
@@ -63,13 +65,16 @@ function extractQueueName(queue: { name?: unknown } | undefined): string | undef
6365
6466export class DefaultQueueManager implements QueueManager {
6567 private readonly replicaPrisma : PrismaClientOrTransaction ;
68+ private readonly taskMetaCache : TaskMetadataCache ;
6669
/**
 * Wires up the queue manager's collaborators.
 *
 * @param prisma - Prisma client (or transaction) stored as a private field;
 *   also used as the replica client when `replicaPrisma` is not supplied.
 * @param engine - Run engine instance, stored as a private field.
 * @param replicaPrisma - Optional separate client assigned to `this.replicaPrisma`.
 * @param taskMetaCache - Task metadata cache; defaults to the shared
 *   `taskMetadataCacheInstance`, but can be swapped out by the caller.
 */
constructor(
  private readonly prisma: PrismaClientOrTransaction,
  private readonly engine: RunEngine,
  replicaPrisma?: PrismaClientOrTransaction,
  taskMetaCache: TaskMetadataCache = taskMetadataCacheInstance
) {
  this.taskMetaCache = taskMetaCache;
  // Without a dedicated replica client, fall back to the main client.
  this.replicaPrisma = replicaPrisma == null ? prisma : replicaPrisma;
}
7479
7580 async resolveQueueProperties (
@@ -87,7 +92,10 @@ export class DefaultQueueManager implements QueueManager {
8792 const specifiedQueueName = extractQueueName ( request . body . options ?. queue ) ;
8893
8994 if ( specifiedQueueName ) {
90- // A specific queue name is provided, validate it exists for the locked worker
95+ // A specific queue name is provided, validate it exists for the locked worker.
96+ // Pre-existing query — not cached because TaskQueue rows can be added or
97+ // removed independently of BackgroundWorkerTask, and a stale "queue exists"
98+ // claim would silently route to the wrong queue.
9199 const specifiedQueue = await this . prisma . taskQueue . findFirst ( {
92100 where : {
93101 name : specifiedQueueName ,
@@ -107,49 +115,38 @@ export class DefaultQueueManager implements QueueManager {
107115 queueName = specifiedQueue . name ;
108116 lockedQueueId = specifiedQueue . id ;
109117
110- // Always fetch the task so we can resolve `triggerSource` (which
111- // becomes `taskKind` on annotations and replicates to ClickHouse).
112- // Without this, AGENT/SCHEDULED runs triggered with
113- // `lockToVersion` + a queue override would be annotated as
114- // STANDARD and disappear from the run-list "Source" filter.
115- // `ttl` is read from the same row but only used when the caller
116- // didn't specify a per-trigger TTL.
117- const lockedTask = await this . replicaPrisma . backgroundWorkerTask . findFirst ( {
118- where : {
119- workerId : lockedBackgroundWorker . id ,
120- runtimeEnvironmentId : request . environment . id ,
121- slug : request . taskId ,
122- } ,
123- select : { ttl : true , triggerSource : true } ,
124- } ) ;
118+ // Pull `triggerSource` (for `taskKind` annotation) and `ttl` from cache.
119+ // On cache hit this is 0 PG queries; on miss the helper falls back to
120+ // a BackgroundWorkerTask lookup and back-fills the cache.
121+ const lockedMeta = await this . resolveLockedTaskMetadata (
122+ lockedBackgroundWorker . id ,
123+ request . environment . id ,
124+ request . taskId
125+ ) ;
125126
126127 if ( request . body . options ?. ttl === undefined ) {
127- taskTtl = lockedTask ?. ttl ;
128+ taskTtl = lockedMeta ?. ttl ?? undefined ;
128129 }
129- taskKind = lockedTask ?. triggerSource ;
130+ taskKind = lockedMeta ?. triggerSource ;
130131 } else {
131- // No queue override - fetch task with queue to get both default queue and TTL
132- const lockedTask = await this . replicaPrisma . backgroundWorkerTask . findFirst ( {
133- where : {
134- workerId : lockedBackgroundWorker . id ,
135- runtimeEnvironmentId : request . environment . id ,
136- slug : request . taskId ,
137- } ,
138- include : {
139- queue : true ,
140- } ,
141- } ) ;
132+ // No queue override - resolve default queue + TTL + triggerSource via cache,
133+ // falling back to a single BackgroundWorkerTask lookup on miss.
134+ const lockedMeta = await this . resolveLockedTaskMetadata (
135+ lockedBackgroundWorker . id ,
136+ request . environment . id ,
137+ request . taskId
138+ ) ;
142139
143- if ( ! lockedTask ) {
140+ if ( ! lockedMeta ) {
144141 throw new ServiceValidationError (
145142 `Task '${ request . taskId } ' not found on locked version '${ lockedBackgroundWorker . version ?? "<unknown>"
146143 } '.`
147144 ) ;
148145 }
149146
150- taskTtl = lockedTask . ttl ;
147+ taskTtl = lockedMeta . ttl ;
151148
152- if ( ! lockedTask . queue ) {
149+ if ( ! lockedMeta . queueName ) {
153150 // This case should ideally be prevented by earlier checks or schema constraints,
154151 // but handle it defensively.
155152 logger . error ( "Task found on locked version, but has no associated queue record" , {
@@ -164,9 +161,9 @@ export class DefaultQueueManager implements QueueManager {
164161 }
165162
166163 // Use the task's default queue name
167- queueName = lockedTask . queue . name ;
168- lockedQueueId = lockedTask . queue . id ;
169- taskKind = lockedTask . triggerSource ;
164+ queueName = lockedMeta . queueName ;
165+ lockedQueueId = lockedMeta . queueId ?? undefined ;
166+ taskKind = lockedMeta . triggerSource ;
170167 }
171168 } else {
172169 // Task is not locked to a specific version, use regular logic
@@ -213,76 +210,131 @@ export class DefaultQueueManager implements QueueManager {
213210
214211 const defaultQueueName = `task/${ taskId } ` ;
215212
216- // Even when the caller provides both a queue override and a
217- // per-trigger TTL, we still need to fetch the task so `triggerSource`
218- // (which becomes `taskKind` on annotations and replicates to
219- // ClickHouse) is populated. Without it, AGENT/SCHEDULED runs hitting
220- // this path get stamped as STANDARD and disappear from the
221- // dashboard's `Source` filter. Mirrors the locked-worker fix above
222- // — `taskTtl` is harmless in the returned value because the call
223- // site coalesces `body.options.ttl ?? taskTtl`.
224-
225- // Find the current worker for the environment. Replica is fine here —
226- // the adjacent `backgroundWorkerTask` lookups below already use
227- // `replicaPrisma` (replica lag for "just deployed" is bounded the same
228- // way for both queries; reading the worker from the writer and the
229- // task from the replica would only widen the inconsistency window).
230- const worker = await findCurrentWorkerFromEnvironment ( environment , this . replicaPrisma ) ;
213+ // Resolve the current worker's task metadata via cache (HGET on warm path,
214+ // BackgroundWorkerTask findFirst + cache back-fill on miss). When this hits,
215+ // both the queue-override + TTL caller and the default-queue caller satisfy
216+ // their full result without any database query.
217+ const meta = await this . resolveCurrentTaskMetadata ( environment , taskId ) ;
218+
219+ if ( overriddenQueueName ) {
220+ // Caller already named the queue. We only need triggerSource (for taskKind)
221+ // and ttl (for the call site to coalesce against body.options.ttl).
222+ return {
223+ queueName : overriddenQueueName ,
224+ taskTtl : meta ?. ttl ?? undefined ,
225+ taskKind : meta ?. triggerSource ,
226+ } ;
227+ }
231228
232- if ( ! worker ) {
233- logger . debug ( "Failed to get queue name: No worker found" , {
229+ if ( ! meta ) {
230+ logger . debug ( "Failed to get queue name: No worker or task found" , {
234231 taskId,
235232 environmentId : environment . id ,
236233 } ) ;
237-
238- return { queueName : overriddenQueueName ?? defaultQueueName , taskTtl : undefined } ;
234+ return { queueName : defaultQueueName , taskTtl : undefined } ;
239235 }
240236
241- // When queue is overridden, we only need TTL from the task (no queue join needed)
242- if ( overriddenQueueName ) {
243- const task = await this . replicaPrisma . backgroundWorkerTask . findFirst ( {
244- where : {
245- workerId : worker . id ,
246- runtimeEnvironmentId : environment . id ,
247- slug : taskId ,
248- } ,
249- select : { ttl : true , triggerSource : true } ,
237+ if ( ! meta . queueName ) {
238+ logger . debug ( "Failed to get queue name: No queue found" , {
239+ taskId,
240+ environmentId : environment . id ,
250241 } ) ;
251-
252- return { queueName : overriddenQueueName , taskTtl : task ?. ttl , taskKind : task ?. triggerSource } ;
242+ return { queueName : defaultQueueName , taskTtl : meta . ttl , taskKind : meta . triggerSource } ;
253243 }
254244
255- const task = await this . replicaPrisma . backgroundWorkerTask . findFirst ( {
256- where : {
257- workerId : worker . id ,
258- runtimeEnvironmentId : environment . id ,
259- slug : taskId ,
260- } ,
261- include : {
262- queue : true ,
245+ return { queueName : meta . queueName , taskTtl : meta . ttl , taskKind : meta . triggerSource } ;
246+ }
247+
/**
 * Looks up task metadata for a trigger that is locked to a specific
 * background worker.
 *
 * The by-worker cache (`getByWorker`) is consulted first. On a miss, a single
 * `BackgroundWorkerTask` row is read through `replicaPrisma`, converted into a
 * `TaskMetadataEntry`, and written back to the by-worker cache without
 * awaiting the write.
 *
 * @param workerId - ID of the locked background worker.
 * @param environmentId - Runtime environment ID the task row must match.
 * @param slug - Task slug to look up.
 * @returns The cached or freshly loaded entry, or `null` when no matching
 *   `BackgroundWorkerTask` row exists.
 */
private async resolveLockedTaskMetadata(
  workerId: string,
  environmentId: string,
  slug: string
): Promise<TaskMetadataEntry | null> {
  const hit = await this.taskMetaCache.getByWorker(workerId, slug);
  if (hit) {
    return hit;
  }

  const task = await this.replicaPrisma.backgroundWorkerTask.findFirst({
    where: { workerId, runtimeEnvironmentId: environmentId, slug },
    select: {
      ttl: true,
      triggerSource: true,
      queue: { select: { id: true, name: true } },
    },
  });
  if (!task) {
    return null;
  }

  const { ttl, triggerSource, queue } = task;
  // A task without a queue relation is stored with a null id and an empty
  // name; callers treat the empty name as "no queue".
  const metadata: TaskMetadataEntry = {
    slug,
    ttl,
    triggerSource,
    queueId: queue?.id ?? null,
    queueName: queue?.name ?? "",
  };

  // Best-effort back-fill: the write is not awaited and a rejection is
  // deliberately ignored so a cache failure never fails the trigger.
  // NOTE(review): the cache is expected to log its own errors — confirm.
  void this.taskMetaCache.setByWorker(workerId, metadata).catch(() => {});

  return metadata;
}
288+
/**
 * Looks up task metadata for a trigger that is not locked to a version.
 *
 * The per-environment cache (`getCurrent`) is consulted first. On a miss, the
 * environment's current worker is discovered via
 * `findCurrentWorkerFromEnvironment`, a single `BackgroundWorkerTask` row is
 * read for that worker, and the resulting entry is written back to both the
 * per-environment and the by-worker caches without awaiting either write.
 *
 * @param environment - Authenticated environment the task is triggered in.
 * @param slug - Task slug to look up.
 * @returns The cached or freshly loaded entry, or `null` when there is no
 *   current worker or no matching task row.
 */
private async resolveCurrentTaskMetadata(
  environment: AuthenticatedEnvironment,
  slug: string
): Promise<TaskMetadataEntry | null> {
  const hit = await this.taskMetaCache.getCurrent(environment.id, slug);
  if (hit) {
    return hit;
  }

  // Cold cache. Both the worker lookup and the task lookup go through
  // `replicaPrisma`, so the two reads share the same client.
  const currentWorker = await findCurrentWorkerFromEnvironment(environment, this.replicaPrisma);
  if (!currentWorker) {
    return null;
  }

  const task = await this.replicaPrisma.backgroundWorkerTask.findFirst({
    where: { workerId: currentWorker.id, runtimeEnvironmentId: environment.id, slug },
    select: {
      ttl: true,
      triggerSource: true,
      queue: { select: { id: true, name: true } },
    },
  });
  if (!task) {
    return null;
  }

  const { ttl, triggerSource, queue } = task;
  // A task without a queue relation is stored with a null id and an empty
  // name; callers treat the empty name as "no queue".
  const metadata: TaskMetadataEntry = {
    slug,
    ttl,
    triggerSource,
    queueId: queue?.id ?? null,
    queueName: queue?.name ?? "",
  };

  // Best-effort back-fill of both keyspaces so later locked and non-locked
  // triggers can be served from cache. Neither write is awaited and
  // rejections are deliberately ignored.
  void this.taskMetaCache.setCurrent(environment.id, metadata).catch(() => {});
  void this.taskMetaCache.setByWorker(currentWorker.id, metadata).catch(() => {});

  return metadata;
}
287339
288340 async validateQueueLimits (
0 commit comments