Skip to content

Commit 344135a

Browse files
committed
refactor(webapp): make task-metadata cache writes atomic per-call
The cache now exposes populateByCurrentWorker(envId, workerId, entries) and setByCurrentWorker(envId, workerId, entry) which write both task-meta:env and task-meta:by-worker keyspaces in a single Lua transaction. Drops the syncTaskMetadataCache helper and its isCurrent boolean — each caller picks the right method based on context (DEV worker create + deploy promotion go through populateByCurrentWorker; V4 deploy build goes through populateByWorker). Two new Lua scripts via defineCommand: - taskMetaReplaceTwoHashes: DEL + HSET + EXPIRE both keys atomically - taskMetaSetTwoFields: HSET both keys + env-preserve-TTL + worker-refresh-TTL atomically The read-path back-fill for non-locked triggers now also writes both keyspaces atomically via setByCurrentWorker, so concurrent back-fills can't leave one keyspace populated and the other empty. Cache methods log+swallow Redis errors internally, so the outer tryCatch wrappers at the four call sites are gone — a Redis blip can't break any post-cache side effect.
1 parent 1a06ef6 commit 344135a

8 files changed

Lines changed: 194 additions & 180 deletions

apps/webapp/app/runEngine/concerns/queues.server.ts

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ export class DefaultQueueManager implements QueueManager {
281281

282282
// Fire-and-forget back-fill — `setByWorker` upserts the single field and
283283
// refreshes the hash TTL. Errors are logged inside the cache and swallowed.
284-
this.taskMetaCache.setByWorker(workerId, entry).catch(() => {});
284+
void this.taskMetaCache.setByWorker(workerId, entry);
285285

286286
return entry;
287287
}
@@ -327,12 +327,11 @@ export class DefaultQueueManager implements QueueManager {
327327
queueName: row.queue?.name ?? "",
328328
};
329329

330-
// Back-fill both keyspaces so a subsequent locked-or-not trigger hits the
331-
// cache. `setCurrent` preserves the env hash's existing TTL boundary
332-
// (promotion owns it); `setByWorker` refreshes the by-worker TTL to keep
333-
// active workers warm.
334-
this.taskMetaCache.setCurrent(environment.id, entry).catch(() => {});
335-
this.taskMetaCache.setByWorker(worker.id, entry).catch(() => {});
330+
// Fire-and-forget back-fill — atomically upserts the slug into both
331+
// keyspaces so a subsequent locked-or-not trigger hits the cache. The
332+
// env-keyspace TTL is preserved (promotion owns it); the by-worker TTL
333+
// is refreshed (sliding window keeps active workers warm).
334+
void this.taskMetaCache.setByCurrentWorker(environment.id, worker.id, entry);
336335

337336
return entry;
338337
}

apps/webapp/app/services/taskMetadataCache.server.ts

Lines changed: 155 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,39 @@ export type TaskMetadataEntry = {
1010
queueName: string;
1111
};
1212

13-
export type TaskMetadataCache = {
13+
export interface TaskMetadataCache {
14+
/** Read a slug's metadata from the env keyspace (current pointer). */
1415
getCurrent(envId: string, slug: string): Promise<TaskMetadataEntry | null>;
16+
/** Read a slug's metadata from the by-worker keyspace (locked-version lookups). */
1517
getByWorker(workerId: string, slug: string): Promise<TaskMetadataEntry | null>;
16-
populateCurrent(envId: string, entries: TaskMetadataEntry[]): Promise<void>;
18+
/**
19+
* Atomically replace both `task-meta:env:{envId}` and
20+
* `task-meta:by-worker:{workerId}` with the given entries. Used at deploy
21+
* promotion sites where the worker just became current for the env.
22+
*/
23+
populateByCurrentWorker(
24+
envId: string,
25+
workerId: string,
26+
entries: TaskMetadataEntry[]
27+
): Promise<void>;
28+
/**
29+
* Replace `task-meta:by-worker:{workerId}` only. Used at deploy build sites
30+
* (V4) where the worker is created but not yet promoted.
31+
*/
1732
populateByWorker(workerId: string, entries: TaskMetadataEntry[]): Promise<void>;
18-
/** Add a single field to the env keyspace without resetting the hash TTL. */
19-
setCurrent(envId: string, entry: TaskMetadataEntry): Promise<void>;
20-
/** Add a single field to the by-worker keyspace and refresh the hash TTL. */
33+
/**
34+
* Atomically upsert one slug in both keyspaces. Used by the non-locked
35+
* read-path back-fill. The env-keyspace TTL is only set when no TTL is
36+
* present (preserves the promotion boundary); the by-worker TTL is
37+
* refreshed on every call (sliding expiry).
38+
*/
39+
setByCurrentWorker(envId: string, workerId: string, entry: TaskMetadataEntry): Promise<void>;
40+
/**
41+
* Upsert one slug in `task-meta:by-worker:{workerId}` only. Used by the
42+
* locked-version read-path back-fill; refreshes the by-worker TTL.
43+
*/
2144
setByWorker(workerId: string, entry: TaskMetadataEntry): Promise<void>;
22-
invalidateCurrent(envId: string): Promise<void>;
23-
};
45+
}
2446

2547
export type RedisTaskMetadataCacheOptions = {
2648
redis: Redis;
@@ -72,14 +94,11 @@ function byWorkerKey(workerId: string): string {
7294
}
7395

7496
/**
75-
* Atomically replace a HASH's contents and reset its TTL.
97+
* Atomically replace a single HASH's contents and reset its TTL.
7698
*
7799
* KEYS[1] = hash key
78100
* ARGV[1] = ttl seconds (0 = no TTL)
79101
* ARGV[2..N] = alternating field, value pairs
80-
*
81-
* One round-trip; readers never observe the empty intermediate state that a
82-
* naive DEL + HSET pipeline exposes.
83102
*/
84103
const REPLACE_HASH_LUA = `
85104
redis.call("DEL", KEYS[1])
@@ -98,15 +117,46 @@ return 1
98117
`;
99118

100119
/**
101-
* Set a single field and refresh the HASH TTL.
120+
* Atomically replace BOTH keyspaces in one Redis transaction. Used at deploy
121+
* promotion — the worker just became current for the env, so the env keyspace
122+
* and the worker keyspace get the same field set.
123+
*
124+
* KEYS[1] = env hash key
125+
* KEYS[2] = by-worker hash key
126+
* ARGV[1] = env ttl seconds (0 = no TTL)
127+
* ARGV[2] = by-worker ttl seconds (0 = no TTL)
128+
* ARGV[3..N] = alternating field, value pairs (same for both hashes)
129+
*/
130+
const REPLACE_TWO_HASHES_LUA = `
131+
redis.call("DEL", KEYS[1])
132+
redis.call("DEL", KEYS[2])
133+
if #ARGV > 2 then
134+
local fv = {}
135+
for i = 3, #ARGV do
136+
fv[#fv + 1] = ARGV[i]
137+
end
138+
redis.call("HSET", KEYS[1], unpack(fv))
139+
redis.call("HSET", KEYS[2], unpack(fv))
140+
end
141+
local envTtl = tonumber(ARGV[1])
142+
if envTtl and envTtl > 0 then
143+
redis.call("EXPIRE", KEYS[1], envTtl)
144+
end
145+
local workerTtl = tonumber(ARGV[2])
146+
if workerTtl and workerTtl > 0 then
147+
redis.call("EXPIRE", KEYS[2], workerTtl)
148+
end
149+
return 1
150+
`;
151+
152+
/**
153+
* Set a single field and refresh the HASH TTL. Used by the locked-version
154+
* back-fill path — sliding expiry keeps active workers warm.
102155
*
103156
* KEYS[1] = hash key
104157
* ARGV[1] = ttl seconds (0 = no TTL refresh)
105158
* ARGV[2] = field
106159
* ARGV[3] = value
107-
*
108-
* Used by the by-worker back-fill path — sliding-window expiry keeps active
109-
* workers warm and lets idle workers age out.
110160
*/
111161
const SET_FIELD_REFRESH_TTL_LUA = `
112162
redis.call("HSET", KEYS[1], ARGV[2], ARGV[3])
@@ -118,23 +168,27 @@ return 1
118168
`;
119169

120170
/**
121-
* Set a single field and only set the HASH TTL if no TTL is set yet.
122-
*
123-
* KEYS[1] = hash key
124-
* ARGV[1] = ttl seconds (0 = no TTL)
125-
* ARGV[2] = field
126-
* ARGV[3] = value
171+
* Atomically upsert one field in BOTH keyspaces. Used by the non-locked
172+
* back-fill path. The env-keyspace TTL is only set if no TTL is present
173+
* (preserves the promotion boundary); the by-worker TTL is refreshed.
127174
*
128-
* Used by the env back-fill path — the env keyspace TTL boundary is owned by
129-
* `populateCurrent` (called at promotion). Back-fills shouldn't extend it; if
130-
* a hash already has a TTL, we leave it alone so the safety net still expires
131-
* on schedule.
175+
* KEYS[1] = env hash key
176+
* KEYS[2] = by-worker hash key
177+
* ARGV[1] = env ttl seconds (0 = no TTL)
178+
* ARGV[2] = by-worker ttl seconds (0 = no TTL)
179+
* ARGV[3] = field
180+
* ARGV[4] = value
132181
*/
133-
const SET_FIELD_PRESERVE_TTL_LUA = `
134-
redis.call("HSET", KEYS[1], ARGV[2], ARGV[3])
135-
local ttl = tonumber(ARGV[1])
136-
if ttl and ttl > 0 and redis.call("TTL", KEYS[1]) == -1 then
137-
redis.call("EXPIRE", KEYS[1], ttl)
182+
const SET_TWO_FIELDS_LUA = `
183+
redis.call("HSET", KEYS[1], ARGV[3], ARGV[4])
184+
local envTtl = tonumber(ARGV[1])
185+
if envTtl and envTtl > 0 and redis.call("TTL", KEYS[1]) == -1 then
186+
redis.call("EXPIRE", KEYS[1], envTtl)
187+
end
188+
redis.call("HSET", KEYS[2], ARGV[3], ARGV[4])
189+
local workerTtl = tonumber(ARGV[2])
190+
if workerTtl and workerTtl > 0 then
191+
redis.call("EXPIRE", KEYS[2], workerTtl)
138192
end
139193
return 1
140194
`;
@@ -146,16 +200,25 @@ declare module "ioredis" {
146200
ttlSeconds: string,
147201
...fieldValues: string[]
148202
): Result<number, Context>;
203+
taskMetaReplaceTwoHashes(
204+
envKey: string,
205+
workerKey: string,
206+
envTtlSeconds: string,
207+
workerTtlSeconds: string,
208+
...fieldValues: string[]
209+
): Result<number, Context>;
149210
taskMetaSetFieldRefreshTtl(
150211
key: string,
151212
ttlSeconds: string,
152213
field: string,
153214
value: string,
154215
callback?: Callback<number>
155216
): Result<number, Context>;
156-
taskMetaSetFieldPreserveTtl(
157-
key: string,
158-
ttlSeconds: string,
217+
taskMetaSetTwoFields(
218+
envKey: string,
219+
workerKey: string,
220+
envTtlSeconds: string,
221+
workerTtlSeconds: string,
159222
field: string,
160223
value: string,
161224
callback?: Callback<number>
@@ -177,13 +240,17 @@ export class RedisTaskMetadataCache implements TaskMetadataCache {
177240
numberOfKeys: 1,
178241
lua: REPLACE_HASH_LUA,
179242
});
243+
this.redis.defineCommand("taskMetaReplaceTwoHashes", {
244+
numberOfKeys: 2,
245+
lua: REPLACE_TWO_HASHES_LUA,
246+
});
180247
this.redis.defineCommand("taskMetaSetFieldRefreshTtl", {
181248
numberOfKeys: 1,
182249
lua: SET_FIELD_REFRESH_TTL_LUA,
183250
});
184-
this.redis.defineCommand("taskMetaSetFieldPreserveTtl", {
185-
numberOfKeys: 1,
186-
lua: SET_FIELD_PRESERVE_TTL_LUA,
251+
this.redis.defineCommand("taskMetaSetTwoFields", {
252+
numberOfKeys: 2,
253+
lua: SET_TWO_FIELDS_LUA,
187254
});
188255
}
189256

@@ -195,25 +262,68 @@ export class RedisTaskMetadataCache implements TaskMetadataCache {
195262
return this.#get(byWorkerKey(workerId), slug);
196263
}
197264

198-
async populateCurrent(envId: string, entries: TaskMetadataEntry[]): Promise<void> {
199-
await this.#replaceHash(currentEnvKey(envId), entries, this.currentEnvTtlSeconds);
265+
async populateByCurrentWorker(
266+
envId: string,
267+
workerId: string,
268+
entries: TaskMetadataEntry[]
269+
): Promise<void> {
270+
if (entries.length === 0) return;
271+
try {
272+
const argv: string[] = [
273+
String(this.currentEnvTtlSeconds),
274+
String(this.byWorkerTtlSeconds),
275+
];
276+
for (const entry of entries) {
277+
argv.push(entry.slug, encode(entry));
278+
}
279+
await this.redis.taskMetaReplaceTwoHashes(
280+
currentEnvKey(envId),
281+
byWorkerKey(workerId),
282+
...argv
283+
);
284+
} catch (error) {
285+
logger.error("Failed to populate task metadata cache (current worker)", {
286+
envId,
287+
workerId,
288+
error,
289+
});
290+
}
200291
}
201292

202293
async populateByWorker(workerId: string, entries: TaskMetadataEntry[]): Promise<void> {
203-
await this.#replaceHash(byWorkerKey(workerId), entries, this.byWorkerTtlSeconds);
294+
if (entries.length === 0) return;
295+
try {
296+
const argv: string[] = [String(this.byWorkerTtlSeconds)];
297+
for (const entry of entries) {
298+
argv.push(entry.slug, encode(entry));
299+
}
300+
await this.redis.taskMetaReplaceHash(byWorkerKey(workerId), ...argv);
301+
} catch (error) {
302+
logger.error("Failed to populate task metadata cache (by worker)", {
303+
workerId,
304+
error,
305+
});
306+
}
204307
}
205308

206-
async setCurrent(envId: string, entry: TaskMetadataEntry): Promise<void> {
309+
async setByCurrentWorker(
310+
envId: string,
311+
workerId: string,
312+
entry: TaskMetadataEntry
313+
): Promise<void> {
207314
try {
208-
await this.redis.taskMetaSetFieldPreserveTtl(
315+
await this.redis.taskMetaSetTwoFields(
209316
currentEnvKey(envId),
317+
byWorkerKey(workerId),
210318
String(this.currentEnvTtlSeconds),
319+
String(this.byWorkerTtlSeconds),
211320
entry.slug,
212321
encode(entry)
213322
);
214323
} catch (error) {
215-
logger.error("Failed to set task metadata current cache field", {
324+
logger.error("Failed to set task metadata cache field (current worker)", {
216325
envId,
326+
workerId,
217327
slug: entry.slug,
218328
error,
219329
});
@@ -229,22 +339,14 @@ export class RedisTaskMetadataCache implements TaskMetadataCache {
229339
encode(entry)
230340
);
231341
} catch (error) {
232-
logger.error("Failed to set task metadata by-worker cache field", {
342+
logger.error("Failed to set task metadata cache field (by worker)", {
233343
workerId,
234344
slug: entry.slug,
235345
error,
236346
});
237347
}
238348
}
239349

240-
async invalidateCurrent(envId: string): Promise<void> {
241-
try {
242-
await this.redis.del(currentEnvKey(envId));
243-
} catch (error) {
244-
logger.error("Failed to invalidate task metadata current cache", { envId, error });
245-
}
246-
}
247-
248350
async #get(key: string, slug: string): Promise<TaskMetadataEntry | null> {
249351
try {
250352
const raw = await this.redis.hget(key, slug);
@@ -255,22 +357,6 @@ export class RedisTaskMetadataCache implements TaskMetadataCache {
255357
return null;
256358
}
257359
}
258-
259-
async #replaceHash(
260-
key: string,
261-
entries: TaskMetadataEntry[],
262-
ttlSeconds: number
263-
): Promise<void> {
264-
try {
265-
const argv: string[] = [String(ttlSeconds)];
266-
for (const entry of entries) {
267-
argv.push(entry.slug, encode(entry));
268-
}
269-
await this.redis.taskMetaReplaceHash(key, ...argv);
270-
} catch (error) {
271-
logger.error("Failed to replace task metadata cache hash", { key, error });
272-
}
273-
}
274360
}
275361

276362
export class NoopTaskMetadataCache implements TaskMetadataCache {
@@ -282,23 +368,19 @@ export class NoopTaskMetadataCache implements TaskMetadataCache {
282368
return null;
283369
}
284370

285-
async populateCurrent(): Promise<void> {
371+
async populateByCurrentWorker(): Promise<void> {
286372
// intentionally empty
287373
}
288374

289375
async populateByWorker(): Promise<void> {
290376
// intentionally empty
291377
}
292378

293-
async setCurrent(): Promise<void> {
379+
async setByCurrentWorker(): Promise<void> {
294380
// intentionally empty
295381
}
296382

297383
async setByWorker(): Promise<void> {
298384
// intentionally empty
299385
}
300-
301-
async invalidateCurrent(): Promise<void> {
302-
// intentionally empty
303-
}
304386
}

0 commit comments

Comments
 (0)