Skip to content

Commit b7e2655

Browse files
committed
refactor(mollifier): align drainer stop semantics with FairQueue / BatchQueue
The MollifierDrainer's stop() was polling `isRunning` every 20ms until the loop exited, which differs from the codebase's convention for similar polling loops (FairQueue, BatchQueue both hold the loop promise as a field and await it directly on stop). Switch to the same pattern: store the loop promise on start(), then in stop() race it against the timeout via Promise.race. With no timeout we just await the loop directly. With a timeout the warn-and-return behaviour is unchanged. No polling, no separate `isRunning` poll loop. Behaviour is identical to the previous implementation, including the hung-handler timeout path (covered by the existing "stop returns after timeoutMs even if a handler is hung" test).
1 parent f91cbf2 commit b7e2655

1 file changed

Lines changed: 22 additions & 12 deletions

File tree

packages/redis-worker/src/mollifier/drainer.ts

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ export class MollifierDrainer<TPayload = unknown> {
4848
private envCursor = 0;
4949
private isRunning = false;
5050
private stopping = false;
51+
private loopPromise: Promise<void> | null = null;
5152

5253
constructor(options: MollifierDrainerOptions<TPayload>) {
5354
this.buffer = options.buffer;
@@ -82,22 +83,31 @@ export class MollifierDrainer<TPayload = unknown> {
8283
if (this.isRunning) return;
8384
this.isRunning = true;
8485
this.stopping = false;
85-
void this.loop();
86+
this.loopPromise = this.loop();
8687
}
8788

89+
// Signal the loop to exit (`stopping = true`) and wait for it. With no
90+
// timeout, wait indefinitely for the in-flight `runOnce` and its handlers
91+
// to settle — same semantic as FairQueue / BatchQueue's `stop()`. With a
92+
// timeout, race the loop promise against a deadline so a hung handler
93+
// can't wedge the process past its termination grace period.
8894
async stop(options: { timeoutMs?: number } = {}): Promise<void> {
89-
if (!this.isRunning) return;
95+
if (!this.isRunning || !this.loopPromise) return;
9096
this.stopping = true;
91-
const deadline = options.timeoutMs != null ? Date.now() + options.timeoutMs : Infinity;
92-
while (this.isRunning) {
93-
if (Date.now() >= deadline) {
94-
this.logger.warn(
95-
"MollifierDrainer.stop: deadline exceeded; returning while loop iteration is in flight",
96-
{ timeoutMs: options.timeoutMs },
97-
);
98-
return;
99-
}
100-
await this.delay(20);
97+
if (options.timeoutMs == null) {
98+
await this.loopPromise;
99+
return;
100+
}
101+
const timeoutSentinel = Symbol("mollifier.stop.timeout");
102+
const winner = await Promise.race([
103+
this.loopPromise.then(() => "done" as const),
104+
this.delay(options.timeoutMs).then(() => timeoutSentinel),
105+
]);
106+
if (winner === timeoutSentinel) {
107+
this.logger.warn(
108+
"MollifierDrainer.stop: deadline exceeded; returning while loop iteration is in flight",
109+
{ timeoutMs: options.timeoutMs },
110+
);
101111
}
102112
}
103113

0 commit comments

Comments
 (0)