Commit c31eb22

d-csclaude and Claude committed
fix(mollifier): pipeline per-tick org→env fan-out and reconcile shutdown deadlines
Two correctness/perf fixes on top of the phase-2 drainer:

1. `runOnce` was awaiting `listEnvsForOrg` serially before any pop ran. At the default `maxOrgsPerTick=500` and a ~1ms RTT, that's a ~500ms per-tick latency floor before `pLimit` even sees work. `Promise.all` over the org slice lets ioredis auto-pipeline the SMEMBERS calls into a single round-trip. Order is preserved, so the org→envs pairing stays deterministic and `pickEnvForOrg` still rotates per org.

2. The SIGTERM handler is sync fire-and-forget: `drainer.stop({ timeoutMs })` returns a promise that keeps the loop alive, but in cluster mode the primary process runs its own `GRACEFUL_SHUTDOWN_TIMEOUT` and will hit `process.exit(0)` independently. If the drainer's deadline exceeds the primary's, the drainer's "log a warning on timeout" turns into "hard exit with no log". Assert at boot that `MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS < GRACEFUL_SHUTDOWN_TIMEOUT - 1s` so a misconfig fails loud instead of disappearing at shutdown.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 5163a65 commit c31eb22

2 files changed: 33 additions & 6 deletions


apps/webapp/app/services/worker.server.ts

Lines changed: 20 additions & 0 deletions
```diff
@@ -142,6 +142,26 @@ export async function init() {
   try {
     const drainer = getMollifierDrainer();
     if (drainer && !global.__mollifierShutdownRegistered__) {
+      // The SIGTERM handler is sync fire-and-forget: it kicks off
+      // `drainer.stop(...)` and returns. The unresolved promise keeps the
+      // event loop alive, but in cluster mode the primary process runs its
+      // own graceful-shutdown timer (`GRACEFUL_SHUTDOWN_TIMEOUT`) and will
+      // call `process.exit(0)` independently. If the drainer's deadline
+      // exceeds the primary's, the drainer gets cut off mid-wait — which
+      // turns "log a warning on timeout" into "hard exit with no log".
+      // Reconcile the two timeouts at boot rather than discovering the
+      // misconfig from a missing warning at shutdown. Margin gives the
+      // primary room to do its own teardown after the drainer settles.
+      const SHUTDOWN_MARGIN_MS = 1_000;
+      if (
+        env.MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS >=
+        env.GRACEFUL_SHUTDOWN_TIMEOUT - SHUTDOWN_MARGIN_MS
+      ) {
+        throw new Error(
+          `MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS (${env.MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS}) must be at least ${SHUTDOWN_MARGIN_MS}ms below GRACEFUL_SHUTDOWN_TIMEOUT (${env.GRACEFUL_SHUTDOWN_TIMEOUT}); otherwise the primary's hard exit shadows the drainer's deadline.`,
+        );
+      }
+
       // The drainer owns a polling loop and a Redis client; let it drain
       // in-flight pops on shutdown rather than tearing the process down
       // mid-handler. `init()` is called per request from entry.server.tsx,
```
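The guard above can be exercised in isolation. This is a minimal sketch of the same invariant under stated assumptions: `assertShutdownBudget` is a hypothetical standalone helper, not the repo's actual API, with the two env values passed in as plain numbers.

```typescript
// Hypothetical standalone version of the boot-time check in the diff above.
const SHUTDOWN_MARGIN_MS = 1_000;

function assertShutdownBudget(
  drainTimeoutMs: number, // stands in for env.MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS
  gracefulTimeoutMs: number, // stands in for env.GRACEFUL_SHUTDOWN_TIMEOUT
): void {
  // The drainer's deadline must settle at least SHUTDOWN_MARGIN_MS before
  // the primary's hard exit, or the drainer's timeout warning can never
  // be logged: the process is gone before the deadline fires.
  if (drainTimeoutMs >= gracefulTimeoutMs - SHUTDOWN_MARGIN_MS) {
    throw new Error(
      `drain timeout (${drainTimeoutMs}ms) must be at least ` +
        `${SHUTDOWN_MARGIN_MS}ms below the graceful-shutdown timeout ` +
        `(${gracefulTimeoutMs}ms)`,
    );
  }
}

// ok: a 5s drain deadline fits inside a 10s graceful budget with margin
assertShutdownBudget(5_000, 10_000);

// misconfig: a 9.5s drain deadline leaves only 500ms of margin and throws
// assertShutdownBudget(9_500, 10_000);
```

Failing at boot turns a silent shutdown race into an immediate, attributable crash, which is the cheapest place to catch a config drift between the two timeouts.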

packages/redis-worker/src/mollifier/drainer.ts

Lines changed: 13 additions & 6 deletions
```diff
@@ -74,13 +74,20 @@ export class MollifierDrainer<TPayload = unknown> {
 
     const orgSlice = this.takeOrgSlice(orgs);
 
-    // For each picked org, pick one env from its active-envs set. The
-    // listEnvsForOrg calls are independent and could be parallelised; we
-    // do them sequentially for simplicity since they're each a fast
-    // SMEMBERS. The actual pops happen concurrently below.
+    // Fan the per-org SMEMBERS out in a single pipelined round-trip. Serial
+    // awaits would otherwise add `orgSlice.length × RTT` of dead time before
+    // pops start — at the default `maxOrgsPerTick=500` and a ~1ms ElastiCache
+    // RTT that's a ~500ms per-tick floor. ioredis auto-pipelines concurrent
+    // commands into one batch, so the burst is cheap; SMEMBERS on a small set
+    // is O(N) per org and trivial at this scale. `Promise.all` preserves
+    // order, so the org→envs pairing below stays deterministic.
+    const envsByOrg = await Promise.all(
+      orgSlice.map((orgId) => this.buffer.listEnvsForOrg(orgId)),
+    );
     const targets: string[] = [];
-    for (const orgId of orgSlice) {
-      const envsForOrg = await this.buffer.listEnvsForOrg(orgId);
+    for (let i = 0; i < orgSlice.length; i++) {
+      const orgId = orgSlice[i]!;
+      const envsForOrg = envsByOrg[i]!;
       if (envsForOrg.length === 0) continue;
       const envId = this.pickEnvForOrg(orgId, envsForOrg);
       targets.push(envId);
```
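The order-preservation property the comment relies on can be demonstrated with a test double. This is a sketch under stated assumptions: `FakeBuffer` is an invented stand-in for the Redis-backed buffer that resolves each org's env set after a simulated RTT, not the package's real class.

```typescript
// FakeBuffer is a hypothetical test double for the drainer's buffer:
// each listEnvsForOrg resolves after one simulated RTT, the way a real
// SMEMBERS round-trip would.
type EnvsByOrg = Record<string, string[]>;

class FakeBuffer {
  constructor(
    private envs: EnvsByOrg,
    private rttMs: number,
  ) {}

  listEnvsForOrg(orgId: string): Promise<string[]> {
    return new Promise((resolve) =>
      setTimeout(() => resolve(this.envs[orgId] ?? []), this.rttMs),
    );
  }
}

async function fanOut(
  buffer: FakeBuffer,
  orgSlice: string[],
): Promise<string[][]> {
  // Every lookup is issued before any is awaited, so the total wait is
  // ~one RTT rather than orgSlice.length RTTs. With a real ioredis
  // client, the concurrent SMEMBERS commands would be auto-pipelined
  // into a single batch the same way.
  return Promise.all(orgSlice.map((orgId) => buffer.listEnvsForOrg(orgId)));
}

const buffer = new FakeBuffer(
  { "org-a": ["env-1", "env-2"], "org-b": [], "org-c": ["env-3"] },
  1,
);

fanOut(buffer, ["org-a", "org-b", "org-c"]).then((envsByOrg) => {
  // Promise.all resolves in input order regardless of which promise
  // settles first: resolves to [["env-1", "env-2"], [], ["env-3"]],
  // so index i safely pairs orgSlice[i] with its envs.
  console.log(envsByOrg);
});
```

This is why the diff can switch from `for...of` with per-org awaits to an index-based loop over `envsByOrg`: the positional pairing is guaranteed by `Promise.all`, not by timing.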
