Skip to content

Commit f91cbf2

Browse files
committed
fix(mollifier): bound drainer per-tick env fan-out via maxEnvsPerTick
mollifier:envs is a Redis SET that grows with the count of envs that currently have buffered entries. Under normal operation that's small, but an extended drainer outage can leave entries piled up across thousands of envs — at which point runOnce would queue one processOneFromEnv per env through pLimit, ballooning per-tick latency and event-loop queue depth. Cap per-tick fan-out at MOLLIFIER_DRAIN_MAX_ENVS_PER_TICK (default 500). When the set fits within the cap, behaviour is unchanged (take all, rotate cursor by 1 for fairness). When the set exceeds the cap, take a rotating slice and advance the cursor by the slice size so successive ticks sweep through the full set. Tests use a stub buffer to drive listEnvs() deterministically with thousands of envs without provisioning a real Redis.
1 parent 5f06709 commit f91cbf2

5 files changed

Lines changed: 144 additions & 5 deletions

File tree

.changeset/mollifier-redis-worker-primitives.md

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -5,3 +5,5 @@
55
Add MollifierBuffer (with `accept`, `pop`, `ack`, `requeue`, `fail`, and `evaluateTrip`) and MollifierDrainer primitives for trigger burst smoothing. `evaluateTrip` is an atomic Lua sliding-window trip evaluator used by the webapp gate to detect per-env trigger bursts. Phase 1 wires MollifierBuffer dual-write monitoring alongside the real trigger path and runs MollifierDrainer's pop/ack loop end-to-end with a no-op handler; full buffering and replayed drainer-side triggers land in later phases.
66

77
MollifierDrainer's polling loop now survives transient Redis errors. `processOneFromEnv` catches `buffer.pop()` failures so one env's hiccup doesn't poison the rest of the batch, and the loop wraps each `runOnce` in a try/catch with capped exponential backoff (up to 5s) instead of dying permanently on the first `listEnvs`/`pop` error.
8+
9+
MollifierDrainer accepts a new `maxEnvsPerTick` option (default 500) that bounds per-tick fan-out across the `mollifier:envs` SET. When the set grows beyond the cap (e.g. after an extended drainer outage left entries piled up across many envs), `runOnce` processes a rotating slice rather than queuing one `processOneFromEnv` job per env, and the cursor advances by the slice size so successive ticks sweep through the full set.

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1049,6 +1049,7 @@ const EnvironmentSchema = z
10491049
MOLLIFIER_ENTRY_TTL_S: z.coerce.number().int().positive().default(600),
10501050
MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
10511051
MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
1052+
MOLLIFIER_DRAIN_MAX_ENVS_PER_TICK: z.coerce.number().int().positive().default(500),
10521053

10531054
BATCH_TRIGGER_PROCESS_JOB_VISIBILITY_TIMEOUT_MS: z.coerce
10541055
.number()

apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,7 @@ function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload>
5151
},
5252
concurrency: env.MOLLIFIER_DRAIN_CONCURRENCY,
5353
maxAttempts: env.MOLLIFIER_DRAIN_MAX_ATTEMPTS,
54+
maxEnvsPerTick: env.MOLLIFIER_DRAIN_MAX_ENVS_PER_TICK,
5455
// A no-op handler shouldn't throw, but if something does (e.g. an
5556
// unexpected deserialise failure), don't loop — let it FAIL terminally
5657
// so the entry is observable in metrics.

packages/redis-worker/src/mollifier/drainer.test.ts

Lines changed: 113 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -326,6 +326,119 @@ describe("MollifierDrainer resilience to transient buffer errors", () => {
326326
});
327327
});
328328

329+
describe("MollifierDrainer per-tick env cap", () => {
330+
// Bounding fan-out prevents one runOnce from queuing thousands of
331+
// processOneFromEnv jobs when `mollifier:envs` is unexpectedly large.
332+
// These tests use a stub buffer so we can drive the env list count
333+
// deterministically without provisioning a real Redis with thousands
334+
// of envs.
335+
type StubBuffer = Partial<MollifierBuffer> & { [K in keyof MollifierBuffer]?: any };
336+
function makeStubBuffer(overrides: StubBuffer): MollifierBuffer {
337+
const base: StubBuffer = {
338+
listEnvs: async () => [],
339+
pop: async () => null,
340+
ack: async () => {},
341+
requeue: async () => {},
342+
fail: async () => true,
343+
getEntry: async () => null,
344+
close: async () => {},
345+
};
346+
return { ...base, ...overrides } as unknown as MollifierBuffer;
347+
}
348+
349+
it("processes at most maxEnvsPerTick envs per runOnce", async () => {
350+
const allEnvs = Array.from({ length: 20 }, (_, i) => `env_${i}`);
351+
const popped: string[] = [];
352+
const buffer = makeStubBuffer({
353+
listEnvs: async () => allEnvs,
354+
pop: async (envId: string) => {
355+
popped.push(envId);
356+
return null; // empty queue — runOnce records this as "empty"
357+
},
358+
});
359+
360+
const drainer = new MollifierDrainer({
361+
buffer,
362+
handler: async () => {},
363+
concurrency: 5,
364+
maxAttempts: 3,
365+
isRetryable: () => false,
366+
maxEnvsPerTick: 5,
367+
logger: new Logger("test-drainer", "log"),
368+
});
369+
370+
await drainer.runOnce();
371+
expect(popped).toHaveLength(5);
372+
});
373+
374+
it("rotates through the full set across successive ticks when sliced", async () => {
375+
const allEnvs = Array.from({ length: 12 }, (_, i) => `env_${i}`);
376+
const popped: string[] = [];
377+
const buffer = makeStubBuffer({
378+
listEnvs: async () => allEnvs,
379+
pop: async (envId: string) => {
380+
popped.push(envId);
381+
return null;
382+
},
383+
});
384+
385+
const drainer = new MollifierDrainer({
386+
buffer,
387+
handler: async () => {},
388+
concurrency: 4,
389+
maxAttempts: 3,
390+
isRetryable: () => false,
391+
maxEnvsPerTick: 4,
392+
logger: new Logger("test-drainer", "log"),
393+
});
394+
395+
// Three ticks = 12 / 4 → exactly one full sweep.
396+
await drainer.runOnce();
397+
await drainer.runOnce();
398+
await drainer.runOnce();
399+
400+
expect(new Set(popped)).toEqual(new Set(allEnvs));
401+
expect(popped).toHaveLength(12);
402+
});
403+
404+
it("takes all envs and rotates by 1 when the set fits within the cap", async () => {
405+
const allEnvs = ["env_a", "env_b", "env_c"];
406+
const popsPerTick: string[][] = [];
407+
let tick: string[] = [];
408+
const buffer = makeStubBuffer({
409+
listEnvs: async () => allEnvs,
410+
pop: async (envId: string) => {
411+
tick.push(envId);
412+
return null;
413+
},
414+
});
415+
416+
const drainer = new MollifierDrainer({
417+
buffer,
418+
handler: async () => {},
419+
concurrency: 3,
420+
maxAttempts: 3,
421+
isRetryable: () => false,
422+
maxEnvsPerTick: 100, // way above n
423+
logger: new Logger("test-drainer", "log"),
424+
});
425+
426+
for (let i = 0; i < 3; i++) {
427+
tick = [];
428+
await drainer.runOnce();
429+
popsPerTick.push(tick);
430+
}
431+
432+
// Every tick covers every env (because cap > n), but the head-of-line
433+
// env rotates by 1 each tick — preserves the original fairness behaviour.
434+
for (const popped of popsPerTick) {
435+
expect(new Set(popped)).toEqual(new Set(allEnvs));
436+
}
437+
expect(popsPerTick[0][0]).not.toEqual(popsPerTick[1][0]);
438+
expect(popsPerTick[1][0]).not.toEqual(popsPerTick[2][0]);
439+
});
440+
});
441+
329442
describe("MollifierDrainer.start/stop", () => {
330443
redisTest("start polls and processes, stop halts the loop", { timeout: 20_000 }, async ({ redisContainer }) => {
331444
const buffer = new MollifierBuffer({

packages/redis-worker/src/mollifier/drainer.ts

Lines changed: 27 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,15 @@ export type MollifierDrainerOptions<TPayload> = {
1919
maxAttempts: number;
2020
isRetryable: (err: unknown) => boolean;
2121
pollIntervalMs?: number;
22+
// Cap on how many envs `runOnce` processes per tick. When the
23+
// `mollifier:envs` SET grows large (e.g. an extended drainer outage left
24+
// entries piled up across thousands of envs), an uncapped fan-out queues
25+
// one `processOneFromEnv` job per env through `pLimit`, ballooning
26+
// per-tick latency and event-loop queue depth. With this cap the
27+
// drainer rotates through the full set across multiple ticks instead.
28+
// Defaults to 500; size for "typical worst-case envs-with-pending-
29+
// entries" rather than total system env count.
30+
maxEnvsPerTick?: number;
2231
logger?: Logger;
2332
};
2433

@@ -33,6 +42,7 @@ export class MollifierDrainer<TPayload = unknown> {
3342
private readonly maxAttempts: number;
3443
private readonly isRetryable: (err: unknown) => boolean;
3544
private readonly pollIntervalMs: number;
45+
private readonly maxEnvsPerTick: number;
3646
private readonly logger: Logger;
3747
private readonly limit: ReturnType<typeof pLimit>;
3848
private envCursor = 0;
@@ -45,6 +55,7 @@ export class MollifierDrainer<TPayload = unknown> {
4555
this.maxAttempts = options.maxAttempts;
4656
this.isRetryable = options.isRetryable;
4757
this.pollIntervalMs = options.pollIntervalMs ?? 100;
58+
this.maxEnvsPerTick = options.maxEnvsPerTick ?? 500;
4859
this.logger = options.logger ?? new Logger("MollifierDrainer", "debug");
4960
this.limit = pLimit(options.concurrency);
5061
}
@@ -53,7 +64,7 @@ export class MollifierDrainer<TPayload = unknown> {
5364
const envs = await this.buffer.listEnvs();
5465
if (envs.length === 0) return { drained: 0, failed: 0 };
5566

56-
const ordered = this.rotate(envs);
67+
const ordered = this.takeRotatingSlice(envs);
5768

5869
const inflight: Promise<"drained" | "failed" | "empty">[] = [];
5970
for (const envId of ordered) {
@@ -131,10 +142,21 @@ export class MollifierDrainer<TPayload = unknown> {
131142
return new Promise((resolve) => setTimeout(resolve, ms));
132143
}
133144

134-
private rotate(envs: string[]): string[] {
135-
const start = this.envCursor % envs.length;
136-
this.envCursor = (this.envCursor + 1) % Math.max(envs.length, 1);
137-
return [...envs.slice(start), ...envs.slice(0, start)];
145+
// Take up to `maxEnvsPerTick` envs starting at the current cursor, with
// wrap-around. When the full set fits within the cap we take everything
// and advance the cursor by 1 — preserves the original head-of-line
// fairness rotation. When we have to slice, we advance the cursor by the
// slice size so successive ticks sweep through the full set rather than
// re-processing the same prefix on each tick.
//
// Defensive behaviour:
// - An empty `envs` returns [] and resets the cursor to 0 instead of
//   evaluating `cursor % 0` (which is NaN).
// - The cap is normalised to a whole number >= 1 before use. A cap of 0
//   would otherwise produce an empty slice AND a zero cursor advance, so
//   every tick would return nothing; negative, fractional or NaN values
//   would yield wrong-sized slices or a fractional cursor. `Infinity`
//   stays `Infinity`, which simply means "take every env".
private takeRotatingSlice(envs: string[]): string[] {
  const n = envs.length;
  if (n === 0) {
    this.envCursor = 0;
    return [];
  }

  // NaN fails the `>= 1` comparison, so it falls through to the minimum of 1.
  const requestedCap = Math.floor(this.maxEnvsPerTick);
  const cap = requestedCap >= 1 ? requestedCap : 1;
  const sliceSize = Math.min(cap, n);

  const start = this.envCursor % n;
  // Whole set fits: rotate the head-of-line env by one.
  // Set exceeds the cap: jump past the slice we are about to return.
  const advance = sliceSize < n ? sliceSize : 1;
  this.envCursor = (start + advance) % n;

  const end = start + sliceSize;
  if (end <= n) return envs.slice(start, end);
  // The slice runs off the end of the array: stitch the tail to the head.
  return [...envs.slice(start), ...envs.slice(0, end - n)];
}
139161

140162
// A `pop()` failure for one env (e.g. a Redis hiccup mid-batch) must not

0 commit comments

Comments
 (0)