From a89a5571b8b1a1afee5b9f2ed2fc69a76bf3b529 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 5 May 2026 11:20:38 +0100 Subject: [PATCH 1/4] test(run-engine): repro for stale worker queue entry after sweeper ack Adds a failing test in concurrencySweeper.test.ts that demonstrates an inconsistency between the worker queue list and the message body store: - Fast-path enqueue RPUSHes the messageKey value onto the worker queue list and SADDs the run into currentConcurrency. - The sweeper marks the run as completed (via test callback) and processMarkedRun acks with removeFromWorkerQueue: false, which DELs the message body but skips the LREM on the worker queue list. - A subsequent blocking dequeue BLPOPs the stale messageKey, the GET returns nil, and the dequeue path emits "Failed to dequeue message from worker queue" with workerQueueLength: 0. The test asserts that the dequeue path does not log this error after the sweeper has acked the run. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../tests/concurrencySweeper.test.ts | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts b/internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts index 739a6bb190b..22bd7b483d4 100644 --- a/internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts +++ b/internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts @@ -175,4 +175,100 @@ describe("RunQueue Concurrency Sweeper", () => { } } ); + + // When the sweeper acks a run whose messageKey value is still sitting on the worker + // queue list (e.g. fast-path enqueued, never BLPOP'd), it deletes the message body but + // leaves a stale tombstone on the list. The next BLPOP returns that tombstone, GET + // returns nil, and the dequeue path logs "Failed to dequeue message from worker queue". + redisTest( + "should not produce 'Failed to dequeue message from worker queue' after sweeper acks a fast-path-enqueued run", + async ({ redisContainer }) => { + let enableConcurrencySweeper = false; + const logger = new Logger("RunQueue", "debug"); + const errorSpy = vi.spyOn(logger, "error"); + + const queue = new RunQueue({ + ...testOptions, + logger, + logLevel: "debug", + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + workerOptions: { + pollIntervalMs: 100, + immediatePollIntervalMs: 100, + }, + redis: { + keyPrefix: "runqueue:test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + concurrencySweeper: { + scanSchedule: "* * * * * *", + scanJitterInMs: 5, + processMarkedSchedule: "* * * * * *", + processMarkedJitterInMs: 5, + callback: async (runIds) => { + if (!enableConcurrencySweeper) { + return []; + } + return [{ id: messageDev.runId, orgId: "o1234" }]; + }, + }, + }); + + try { + await queue.updateEnvConcurrencyLimits(authenticatedEnvDev); + + // Fast-path enqueue: SET messageKey, RPUSH messageKeyValue onto worker queue list, + // SADD runId into currentConcurrency. The message is on the list waiting to be popped. + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: messageDev, + workerQueue: authenticatedEnvDev.id, + enableFastPath: true, + }); + + // Pre-conditions: list has the entry, run is "in-flight" per operational concurrency, + // body exists. Fast-path bumps the operational currentConcurrency (SADD) but not + // currentDequeued — the displayed concurrency is bumped only when a worker BLPOPs. + expect(await queue.peekAllOnWorkerQueue(authenticatedEnvDev.id)).toHaveLength(1); + expect(await queue.operationalCurrentConcurrencyOfEnvironment(authenticatedEnvDev)).toBe(1); + expect(await queue.readMessage(messageDev.orgId, messageDev.runId)).toBeDefined(); + + // Sweeper now considers the run completed (test callback returns it), so + // processMarkedRun acks with removeFromWorkerQueue: false. + enableConcurrencySweeper = true; + await setTimeout(5_000); + + // Sweeper has run: operational concurrency released, message body deleted. + expect(await queue.operationalCurrentConcurrencyOfEnvironment(authenticatedEnvDev)).toBe(0); + expect(await queue.readMessage(messageDev.orgId, messageDev.runId)).toBeUndefined(); + + // Trigger a blocking dequeue with a short timeout — production uses blockingPop:true, + // which is the only path that emits this error log. + const dequeued = await queue.dequeueMessageFromWorkerQueue( + "test_consumer", + authenticatedEnvDev.id, + { blockingPop: true, blockingPopTimeoutSeconds: 2 } + ); + expect(dequeued).toBeUndefined(); + + // BUG: the dequeue path logs the exact Sentry-visible error. Match the message and + // the structured fields one-to-one with what TRIGGER-CLOUD-VC reports. + const failedDequeueErrors = errorSpy.mock.calls.filter( + ([msg]) => msg === "Failed to dequeue message from worker queue" + ); + expect(failedDequeueErrors).toHaveLength(0); + } finally { + errorSpy.mockRestore(); + await queue.quit(); + } + } + ); }); From dc986984b597e8d96e81aa5f3d8cb42ea5971cad Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 5 May 2026 14:03:45 +0100 Subject: [PATCH 2/4] fix(run-engine): sweeper LREMs the worker queue list when acking marked runs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit processMarkedRun previously called acknowledgeMessage with removeFromWorkerQueue: false. The Lua script always DELs the message body but only LREMs the worker queue list when the flag is set, leaving a stale messageKey on the list whenever the swept run had been pushed to the worker queue list (fast-path enqueue or processQueueForWorkerQueue promotion) but never BLPOP'd. The next BLPOP returns the tombstone, GET on the messageKey returns nil, and the dequeue path logs "Failed to dequeue message from worker queue". Switch to removeFromWorkerQueue: true. Cost is one extra LREM per swept run (O(N) on the list); production telemetry shows sweeper acks at <0.33/sec hard upper bound on worker queue lists averaging 200-500 entries — under 0.05% Redis CPU even at peak. The repro test added in the previous commit flips to passing with this change. Co-Authored-By: Claude Opus 4.7 (1M context) --- internal-packages/run-engine/src/run-queue/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index de0df73ad05..74f08471bb9 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -2847,7 +2847,7 @@ export class RunQueue { await this.acknowledgeMessage(run.orgId, run.messageId, { skipDequeueProcessing: true, - removeFromWorkerQueue: false, + removeFromWorkerQueue: true, }); } From eacd023535bcb30e6c38a95d3b3e8ba2605f6872 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 5 May 2026 14:46:50 +0100 Subject: [PATCH 3/4] chore: add server-changes entry for run-queue sweeper stale-entry fix Co-Authored-By: Claude Opus 4.7 (1M context) --- .server-changes/run-queue-sweeper-stale-entry.md | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 .server-changes/run-queue-sweeper-stale-entry.md diff --git a/.server-changes/run-queue-sweeper-stale-entry.md b/.server-changes/run-queue-sweeper-stale-entry.md new file mode 100644 index 00000000000..04fe688399b --- /dev/null +++ b/.server-changes/run-queue-sweeper-stale-entry.md @@ -0,0 +1,9 @@ +--- +area: webapp +type: fix +--- + +Concurrency sweeper now removes the message from the worker queue list +when acking marked runs, eliminating stale `messageKey` tombstones that +produced "Failed to dequeue message from worker queue" errors when +consumed by a later BLPOP. From 0a1199af2e20378aaf2a5bbd90378940189432e9 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 5 May 2026 15:39:11 +0100 Subject: [PATCH 4/4] test(run-engine): assert worker queue list state instead of spying on logger Address CodeRabbit feedback on concurrencySweeper.test.ts: - Replace vi.spyOn(logger, "error") with a peekAllOnWorkerQueue assertion. The state-based check proves the same invariant (no tombstone exists, so no future BLPOP can fail) and matches the repo's "never mock anything" testing convention. - Update the stale inline comment that said the sweeper acks with removeFromWorkerQueue: false; the implementation now uses true. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../tests/concurrencySweeper.test.ts | 31 +++++++------------ 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts b/internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts index 22bd7b483d4..07c5890d046 100644 --- a/internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts +++ b/internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts @@ -177,19 +177,17 @@ describe("RunQueue Concurrency Sweeper", () => { ); // When the sweeper acks a run whose messageKey value is still sitting on the worker - // queue list (e.g. fast-path enqueued, never BLPOP'd), it deletes the message body but - // leaves a stale tombstone on the list. The next BLPOP returns that tombstone, GET - // returns nil, and the dequeue path logs "Failed to dequeue message from worker queue". + // queue list (e.g. fast-path enqueued, never BLPOP'd), it must remove the entry from + // the list as well as deleting the message body. Otherwise the list keeps a stale + // tombstone — the next BLPOP returns the messageKey, GET returns nil, and the dequeue + // path logs "Failed to dequeue message from worker queue". redisTest( - "should not produce 'Failed to dequeue message from worker queue' after sweeper acks a fast-path-enqueued run", + "should clear the worker queue list when sweeper acks a fast-path-enqueued run", async ({ redisContainer }) => { let enableConcurrencySweeper = false; - const logger = new Logger("RunQueue", "debug"); - const errorSpy = vi.spyOn(logger, "error"); const queue = new RunQueue({ ...testOptions, - logger, logLevel: "debug", queueSelectionStrategy: new FairQueueSelectionStrategy({ redis: { @@ -242,31 +240,26 @@ describe("RunQueue Concurrency Sweeper", () => { expect(await queue.readMessage(messageDev.orgId, messageDev.runId)).toBeDefined(); // Sweeper now considers the run completed (test callback returns it), so - // processMarkedRun acks with removeFromWorkerQueue: false. + // processMarkedRun acks with removeFromWorkerQueue: true. enableConcurrencySweeper = true; await setTimeout(5_000); - // Sweeper has run: operational concurrency released, message body deleted. + // Sweeper has run: operational concurrency released, message body deleted, AND + // the messageKey value has been LREM'd from the worker queue list. Without the + // LREM, the list would still contain the messageKey, and the next BLPOP would + // pop the tombstone and emit "Failed to dequeue message from worker queue". expect(await queue.operationalCurrentConcurrencyOfEnvironment(authenticatedEnvDev)).toBe(0); expect(await queue.readMessage(messageDev.orgId, messageDev.runId)).toBeUndefined(); + expect(await queue.peekAllOnWorkerQueue(authenticatedEnvDev.id)).toEqual([]); - // Trigger a blocking dequeue with a short timeout — production uses blockingPop:true, - // which is the only path that emits this error log. + // A subsequent blocking dequeue finds nothing — no real message and no tombstone. const dequeued = await queue.dequeueMessageFromWorkerQueue( "test_consumer", authenticatedEnvDev.id, { blockingPop: true, blockingPopTimeoutSeconds: 2 } ); expect(dequeued).toBeUndefined(); - - // BUG: the dequeue path logs the exact Sentry-visible error. Match the message and - // the structured fields one-to-one with what TRIGGER-CLOUD-VC reports. - const failedDequeueErrors = errorSpy.mock.calls.filter( - ([msg]) => msg === "Failed to dequeue message from worker queue" - ); - expect(failedDequeueErrors).toHaveLength(0); } finally { - errorSpy.mockRestore(); await queue.quit(); } }