Commit 452ebda

feat(mollifier): trigger burst smoothing — Phase 1 (trip evaluator + dual-write monitoring + drainer ack loop)
Phase 1 of the trigger-burst smoothing initiative. Adds the A-side trip evaluator (atomic Lua sliding-window per env) and wires it into the trigger hot path. When the per-org mollifierEnabled feature flag is on AND the evaluator says divert, the canonical replay payload is buffered to Redis (via buffer.accept) AND the trigger continues through engine.trigger, i.e. dual-write. The drainer pops + acks (no-op handler) to prove the dequeue mechanism works end-to-end. Operators audit by joining mollifier.buffered (write) and mollifier.drained (consume) logs by runId.

Buffer primitives hardened:
- accept is idempotent on duplicate runId (Lua EXISTS guard)
- pop skips orphan queue references (entry HASH TTL'd while runId queued)
- fail no-ops on missing entry (no partial FAILED hash leak)
- mollifier:envs set pruned on draining pop, restored on requeue
- 16-row truth-table test enumerates the gate cascade
- BufferedTriggerPayload defines the canonical replay shape Phase 2 will use to invoke engine.trigger
- payload hash for audit-equivalence computed off the hot path (in the drainer) to avoid CPU during a spike

Regression tests in apps/webapp/test/engine/triggerTask.test.ts pin the mollifier integration:
- validation throws BEFORE the gate runs (no orphan buffer write on rejected triggers)
- mollify dual-write happy path (Postgres + Redis both reflect the run)
- pass_through path does NOT call buffer.accept
- engine.trigger throwing AFTER buffer.accept leaves an orphan (documented behaviour: drainer auto-cleans; audit-trail surfaces it)
- idempotency-key match short-circuits BEFORE the gate is consulted
- debounce match produces an orphan (documented behaviour: Phase 2 must lift handleDebounce upfront before buffer.accept)

Behaviour with MOLLIFIER_ENABLED=0 (default) is byte-identical to main. With MOLLIFIER_ENABLED=1 and the flag off, only mollifier.would_mollify logs fire (no buffer state). With the flag on, dual-write activates.

Includes two opt-in *.fuzz.test.ts suites (gated on FUZZ=1) that randomise operation sequences against evaluateTrip and the drainer to find timing edges. They are clearly marked TEMPORARY in their headers.
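
To make the "sliding-window per env" idea in the message above concrete, here is a minimal in-memory sketch. It is not the commit's Lua script: the class name, the `windowMs` and `threshold` parameters, and the returned shape are assumptions chosen for illustration, and the real evaluator runs atomically inside Redis rather than in process memory.

```typescript
// Hypothetical in-memory stand-in for the atomic Lua trip evaluator.
type TripDecision = { tripped: boolean; count: number };

class SlidingWindowTripEvaluator {
  // envId -> timestamps (ms) of triggers still inside the window
  private readonly hits = new Map<string, number[]>();
  private readonly windowMs: number;
  private readonly threshold: number;

  constructor(windowMs: number, threshold: number) {
    this.windowMs = windowMs;
    this.threshold = threshold;
  }

  evaluateTrip(envId: string, nowMs: number): TripDecision {
    const cutoff = nowMs - this.windowMs;
    // Drop timestamps that have slid out of the window, then record this hit.
    const recent = (this.hits.get(envId) ?? []).filter((t) => t > cutoff);
    recent.push(nowMs);
    this.hits.set(envId, recent);
    // Trip only when the count inside the window exceeds the threshold.
    return { tripped: recent.length > this.threshold, count: recent.length };
  }
}

// Usage: a 1-second window that tolerates 2 triggers per env.
const evaluator = new SlidingWindowTripEvaluator(1000, 2);
evaluator.evaluateTrip("env_a", 0);
evaluator.evaluateTrip("env_a", 100);
const third = evaluator.evaluateTrip("env_a", 200);
console.log(third.tripped, third.count);
```

Each environment keeps its own window, so a burst in one env does not trip the evaluator for another.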
1 parent f2ab40b commit 452ebda

17 files changed

Lines changed: 2464 additions & 119 deletions

.changeset/mollifier-redis-worker-primitives.md

Lines changed: 1 addition & 1 deletion
@@ -2,4 +2,4 @@
 "@trigger.dev/redis-worker": patch
 ---

-Add MollifierBuffer and MollifierDrainer primitives for burst smoothing (scaffolding only — not active without webapp wiring).
+Add MollifierBuffer (with `accept`, `pop`, `ack`, `requeue`, `fail`, and `evaluateTrip`) and MollifierDrainer primitives for trigger burst smoothing. `evaluateTrip` is an atomic Lua sliding-window trip evaluator used by the webapp gate to detect per-env trigger bursts. Webapp shadow-mode logging is wired; buffer writes and drainer activation are deferred to a follow-up.
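
The buffer guarantees listed in the commit message (idempotent `accept`, `pop` skipping orphaned queue references, `fail` as a no-op on a missing entry) can be sketched in memory as follows. This is an illustration under stated assumptions, not the `@trigger.dev/redis-worker` implementation: `InMemoryMollifierBuffer`, its return values, and the `expireEntry` helper are hypothetical, and the real primitives are Lua scripts over Redis hashes and queues.

```typescript
// Hypothetical in-memory model of the buffer semantics described above.
type Entry = {
  runId: string;
  envId: string;
  payload: string;
  status: "QUEUED" | "DRAINING" | "FAILED";
};

class InMemoryMollifierBuffer {
  private readonly entries = new Map<string, Entry>(); // runId -> entry
  private readonly queues = new Map<string, string[]>(); // envId -> queued runIds

  // Idempotent: a duplicate runId is ignored (mirrors the EXISTS guard).
  accept(input: { runId: string; envId: string; payload: string }): boolean {
    if (this.entries.has(input.runId)) return false;
    this.entries.set(input.runId, { ...input, status: "QUEUED" });
    const queue = this.queues.get(input.envId) ?? [];
    queue.push(input.runId);
    this.queues.set(input.envId, queue);
    return true;
  }

  // Skips queue references whose entry has already expired (orphans).
  pop(envId: string): Entry | null {
    const queue = this.queues.get(envId) ?? [];
    while (queue.length > 0) {
      const runId = queue.shift()!;
      const entry = this.entries.get(runId);
      if (!entry) continue; // orphan reference: keep looking
      entry.status = "DRAINING";
      return entry;
    }
    return null;
  }

  ack(runId: string): void {
    this.entries.delete(runId);
  }

  // No-op when the entry is gone, so no partial FAILED record is created.
  fail(runId: string): boolean {
    const entry = this.entries.get(runId);
    if (!entry) return false;
    entry.status = "FAILED";
    return true;
  }

  // Illustration-only hook: simulates the entry's TTL expiring while queued.
  expireEntry(runId: string): void {
    this.entries.delete(runId);
  }
}
```

The point of the orphan skip is that a queue reference can outlive its entry, so `pop` has to tolerate a missing entry rather than return a half-formed one.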
Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+---
+area: webapp
+type: feature
+---
+
+Wire the real A-side trip evaluator into the mollifier gate. With `MOLLIFIER_SHADOW_MODE=1`, each trigger evaluates the per-env sliding-window rate counter; bursts above threshold are logged as `mollifier.would_mollify` (no buffer write — phase 3 activates that). Emits the `mollifier.decisions` OTel counter. Behaviour with `MOLLIFIER_ENABLED=0` (default) is unchanged.
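
One way to read the gate cascade that the 16-row truth-table test enumerates is as a function of four booleans. The sketch below is an assumption about that cascade, not the code in `mollifierGate.server`: the input names, the order of the checks, and the `shadow_log` action label are hypothetical (only `mollify` and `pass_through` appear in the commit).

```typescript
// Hypothetical gate cascade over four boolean inputs (2^4 = 16 rows).
type GateInputs = {
  enabled: boolean; // MOLLIFIER_ENABLED=1
  shadowMode: boolean; // MOLLIFIER_SHADOW_MODE=1
  orgFlag: boolean; // per-org mollifierEnabled feature flag
  tripped: boolean; // trip evaluator says divert
};
type GateAction = "pass_through" | "shadow_log" | "mollify";
type GateRow = GateInputs & { action: GateAction };

function decide(inputs: GateInputs): GateAction {
  if (!inputs.enabled) return "pass_through"; // kill switch wins
  if (!inputs.tripped) return "pass_through"; // no burst detected
  if (inputs.orgFlag) return "mollify"; // dual-write path
  // Burst detected but org not opted in: log only, if shadow mode is on.
  return inputs.shadowMode ? "shadow_log" : "pass_through";
}

function truthTable(): GateRow[] {
  const rows: GateRow[] = [];
  const bools = [false, true];
  for (const enabled of bools) {
    for (const shadowMode of bools) {
      for (const orgFlag of bools) {
        for (const tripped of bools) {
          const inputs = { enabled, shadowMode, orgFlag, tripped };
          rows.push({ ...inputs, action: decide(inputs) });
        }
      }
    }
  }
  return rows;
}

for (const row of truthTable()) {
  console.log(JSON.stringify(row));
}
```

Enumerating every combination this way makes it cheap to assert that the disabled state never produces anything other than `pass_through`.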

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 92 additions & 7 deletions
@@ -40,9 +40,21 @@ import type {
   TriggerTaskRequest,
   TriggerTaskValidator,
 } from "../types";
-import { evaluateGate } from "~/v3/mollifier/mollifierGate.server";
+import {
+  evaluateGate as defaultEvaluateGate,
+  type GateOutcome,
+} from "~/v3/mollifier/mollifierGate.server";
+import { getMollifierBuffer as defaultGetMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server";
+import { buildBufferedTriggerPayload } from "~/v3/mollifier/bufferedTriggerPayload.server";
+import { serialiseSnapshot, type MollifierBuffer } from "@trigger.dev/redis-worker";
 import { QueueSizeLimitExceededError, ServiceValidationError } from "~/v3/services/common.server";

+export type MollifierEvaluateGate = (
+  inputs: { envId: string; orgId: string; taskId: string },
+) => Promise<GateOutcome>;
+
+export type MollifierGetBuffer = () => MollifierBuffer | null;
+
 class NoopTriggerRacepointSystem implements TriggerRacepointSystem {
   async waitForRacepoint(options: { racepoint: TriggerRacepoints; id: string }): Promise<void> {
     return;
@@ -60,6 +72,11 @@ export class RunEngineTriggerTaskService {
   private readonly traceEventConcern: TraceEventConcern;
   private readonly triggerRacepointSystem: TriggerRacepointSystem;
   private readonly metadataMaximumSize: number;
+  // Mollifier hooks are DI'd so tests can drive the call-site's mollify branch
+  // deterministically (stub the gate to return mollify, inject a real or fake
+  // buffer). In production both default to the live module-level singletons.
+  private readonly evaluateGate: MollifierEvaluateGate;
+  private readonly getMollifierBuffer: MollifierGetBuffer;

   constructor(opts: {
     prisma: PrismaClientOrTransaction;
@@ -72,6 +89,8 @@ export class RunEngineTriggerTaskService {
     tracer: Tracer;
     metadataMaximumSize: number;
     triggerRacepointSystem?: TriggerRacepointSystem;
+    evaluateGate?: MollifierEvaluateGate;
+    getMollifierBuffer?: MollifierGetBuffer;
   }) {
     this.prisma = opts.prisma;
     this.engine = opts.engine;
@@ -83,6 +102,8 @@ export class RunEngineTriggerTaskService {
     this.traceEventConcern = opts.traceEventConcern;
     this.metadataMaximumSize = opts.metadataMaximumSize;
     this.triggerRacepointSystem = opts.triggerRacepointSystem ?? new NoopTriggerRacepointSystem();
+    this.evaluateGate = opts.evaluateGate ?? defaultEvaluateGate;
+    this.getMollifierBuffer = opts.getMollifierBuffer ?? defaultGetMollifierBuffer;
   }

   public async call({
@@ -316,15 +337,11 @@ export class RunEngineTriggerTaskService {
       rootScheduleId: parentAnnotations?.rootScheduleId || options.scheduleId || undefined,
     };

-    const mollifierOutcome = await evaluateGate({
+    const mollifierOutcome = await this.evaluateGate({
       envId: environment.id,
       orgId: environment.organizationId,
+      taskId,
     });
-    if (mollifierOutcome.action === "mollify") {
-      throw new Error(
-        "MollifierGate.mollify reached in phase 1 — should be unreachable until phase 3 wiring lands",
-      );
-    }

     try {
       return await this.traceEventConcern.traceRun(
@@ -338,6 +355,74 @@ export class RunEngineTriggerTaskService {

           const payloadPacket = await this.payloadProcessor.process(triggerRequest);

+          // Phase 1 dual-write: if the org has the mollifier feature flag
+          // enabled and the per-env trip evaluator says divert, write the
+          // canonical replay payload to the buffer AND continue through
+          // engine.trigger as normal. The buffer entry is an audit/preview
+          // copy; the drainer's no-op handler consumes it to prove the
+          // dequeue mechanism works. Phase 2 will replace engine.trigger
+          // (below) with a synthesised 200 response and rely on the
+          // drainer to perform the Postgres write via replay.
+          if (mollifierOutcome.action === "mollify") {
+            const buffer = this.getMollifierBuffer();
+            if (buffer) {
+              const canonicalPayload = buildBufferedTriggerPayload({
+                runFriendlyId,
+                taskId,
+                envId: environment.id,
+                envType: environment.type,
+                envSlug: environment.slug,
+                orgId: environment.organizationId,
+                orgSlug: environment.organization.slug,
+                projectId: environment.projectId,
+                projectRef: environment.project.externalRef,
+                body,
+                idempotencyKey: idempotencyKey ?? null,
+                idempotencyKeyExpiresAt: idempotencyKey
+                  ? idempotencyKeyExpiresAt ?? null
+                  : null,
+                tags,
+                parentRunFriendlyId: parentRun?.friendlyId ?? null,
+                traceContext: event.traceContext,
+                triggerSource,
+                triggerAction,
+                serviceOptions: options,
+                createdAt: new Date(),
+              });
+
+              try {
+                const serialisedPayload = serialiseSnapshot(canonicalPayload);
+                await buffer.accept({
+                  runId: runFriendlyId,
+                  envId: environment.id,
+                  orgId: environment.organizationId,
+                  payload: serialisedPayload,
+                });
+                // Light log on the hot path — keep this synchronous work
+                // O(1) per trigger. The drainer computes the payload hash
+                // off-path; operators correlate `mollifier.buffered` →
+                // `mollifier.drained` by runId.
+                logger.info("mollifier.buffered", {
+                  runId: runFriendlyId,
+                  envId: environment.id,
+                  orgId: environment.organizationId,
+                  taskId,
+                  payloadBytes: serialisedPayload.length,
+                });
+              } catch (err) {
+                // Fail-open: buffer write must never block the customer's
+                // trigger. engine.trigger below is the primary write path
+                // in Phase 1 — the customer still gets a valid run.
+                logger.error("mollifier.buffer_accept_failed", {
+                  runId: runFriendlyId,
+                  envId: environment.id,
+                  taskId,
+                  err: err instanceof Error ? err.message : String(err),
+                });
+              }
+            }
+          }
+
           const taskRun = await this.engine.trigger(
             {
               friendlyId: runFriendlyId,
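
The injected `evaluateGate` and `getMollifierBuffer` hooks in the diff above exist so tests can force the mollify branch. A reduced sketch of that pattern follows. Everything in it is hypothetical (`SketchTriggerService`, `BufferLike`, `engineTrigger`, `loggedErrors`), and it is synchronous only to stay short, whereas the real call site awaits both the gate and `buffer.accept`. What it illustrates is the fail-open rule: a throwing buffer must not stop the primary write.

```typescript
// Hypothetical, simplified shapes; not the real service types.
type GateOutcome = { action: "pass_through" | "mollify" };
type BufferLike = { accept(input: { runId: string; payload: string }): void };

const passThroughGate = (_envId: string): GateOutcome => ({ action: "pass_through" });

class SketchTriggerService {
  private readonly evaluateGate: (envId: string) => GateOutcome;
  private readonly getBuffer: () => BufferLike | null;
  private readonly engineTrigger: (runId: string) => string;
  readonly loggedErrors: string[] = [];

  constructor(opts: {
    engineTrigger: (runId: string) => string;
    evaluateGate?: (envId: string) => GateOutcome;
    getBuffer?: () => BufferLike | null;
  }) {
    this.engineTrigger = opts.engineTrigger;
    // Defaults stand in for the production singletons.
    this.evaluateGate = opts.evaluateGate ?? passThroughGate;
    this.getBuffer = opts.getBuffer ?? (() => null);
  }

  call(runId: string, envId: string, payload: string): string {
    const outcome = this.evaluateGate(envId);
    if (outcome.action === "mollify") {
      const buffer = this.getBuffer();
      if (buffer) {
        try {
          buffer.accept({ runId, payload });
        } catch (err) {
          // Fail-open: record the error and fall through to the primary write.
          this.loggedErrors.push(err instanceof Error ? err.message : String(err));
        }
      }
    }
    return this.engineTrigger(runId); // always runs in this dual-write sketch
  }
}

// Usage: stub the gate to mollify and inject a buffer that always throws.
const service = new SketchTriggerService({
  engineTrigger: (runId) => `created:${runId}`,
  evaluateGate: () => ({ action: "mollify" }),
  getBuffer: () => ({
    accept: () => {
      throw new Error("redis down");
    },
  }),
});
console.log(service.call("run_1", "env_a", "{}"), service.loggedErrors);
```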
Lines changed: 106 additions & 0 deletions
@@ -0,0 +1,106 @@
+import type { TriggerTaskRequestBody } from "@trigger.dev/core/v3";
+import type { TriggerTaskServiceOptions } from "~/v3/services/triggerTask.server";
+
+// Canonical payload shape written to the mollifier buffer when the gate
+// decides to mollify a trigger. Phase 1 ALSO calls engine.trigger directly
+// (dual-write) so this is currently an audit/preview record. Phase 2 will
+// make the buffer the primary write path: the drainer's handler will read
+// this payload and replay it through engine.trigger to create the run in
+// Postgres, and read-fallback endpoints will synthesise a Run view from it
+// while it is still QUEUED.
+//
+// CONTRACT: this shape must contain everything needed for Phase 2's
+// drainer-replay to reconstruct an equivalent engine.trigger call. Phase 1
+// emits it to logs; Phase 2 will serialise it into Redis and rebuild it on
+// the drain side. Keep it serialisable — no functions, no class instances.
+export type BufferedTriggerPayload = {
+  runFriendlyId: string;
+
+  // Routing identifiers — let the drainer re-fetch full AuthenticatedEnvironment
+  // at replay time rather than embedding it in the payload.
+  envId: string;
+  envType: string;
+  envSlug: string;
+  orgId: string;
+  orgSlug: string;
+  projectId: string;
+  projectRef: string;
+
+  // Task identifier — looked up against the locked BackgroundWorkerTask
+  // at replay time to recover task-defaults.
+  taskId: string;
+
+  // Customer-supplied trigger body (payload, options, context).
+  body: TriggerTaskRequestBody;
+
+  // Resolved values from upstream concerns. The drainer should NOT re-resolve
+  // these — that would create a second idempotency-key check, etc.
+  idempotencyKey: string | null;
+  idempotencyKeyExpiresAt: string | null;
+  tags: string[];
+
+  // Parent/root linkage for nested triggers.
+  parentRunFriendlyId: string | null;
+
+  // Trace context — propagates the original triggering span across the
+  // buffer→drain boundary so the run's lifecycle stays under one trace.
+  traceContext: Record<string, unknown>;
+
+  // Annotations + service options that influence routing/replay.
+  triggerSource: string;
+  triggerAction: string;
+  serviceOptions: TriggerTaskServiceOptions;
+
+  // Wall-clock instants relevant to the run.
+  createdAt: string;
+};
+
+// Assemble the canonical payload from the inputs available at the point
+// `evaluateGate` returns "mollify" in `RunEngineTriggerTaskService.call`.
+// All fields must be derivable from data already in scope at that call site;
+// nothing should require an extra DB lookup.
+export function buildBufferedTriggerPayload(input: {
+  runFriendlyId: string;
+  taskId: string;
+  envId: string;
+  envType: string;
+  envSlug: string;
+  orgId: string;
+  orgSlug: string;
+  projectId: string;
+  projectRef: string;
+  body: TriggerTaskRequestBody;
+  idempotencyKey: string | null;
+  idempotencyKeyExpiresAt: Date | null;
+  tags: string[];
+  parentRunFriendlyId: string | null;
+  traceContext: Record<string, unknown>;
+  triggerSource: string;
+  triggerAction: string;
+  serviceOptions: TriggerTaskServiceOptions;
+  createdAt: Date;
+}): BufferedTriggerPayload {
+  return {
+    runFriendlyId: input.runFriendlyId,
+    envId: input.envId,
+    envType: input.envType,
+    envSlug: input.envSlug,
+    orgId: input.orgId,
+    orgSlug: input.orgSlug,
+    projectId: input.projectId,
+    projectRef: input.projectRef,
+    taskId: input.taskId,
+    body: input.body,
+    idempotencyKey: input.idempotencyKey,
+    idempotencyKeyExpiresAt: input.idempotencyKeyExpiresAt
+      ? input.idempotencyKeyExpiresAt.toISOString()
+      : null,
+    tags: input.tags,
+    parentRunFriendlyId: input.parentRunFriendlyId,
+    traceContext: input.traceContext,
+    triggerSource: input.triggerSource,
+    triggerAction: input.triggerAction,
+    serviceOptions: input.serviceOptions,
+    createdAt: input.createdAt.toISOString(),
+  };
+}
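
The builder above converts `Date` inputs to ISO strings so the payload survives a round trip through Redis. The reduced sketch below shows why that matters. `MiniPayload` and `buildMiniPayload` are hypothetical cut-down versions of the real shape, and plain `JSON.stringify` stands in for `serialiseSnapshot`, whose exact behaviour is not shown in this diff.

```typescript
// Hypothetical cut-down payload: only the date-bearing fields plus one id.
type MiniPayload = {
  runFriendlyId: string;
  idempotencyKeyExpiresAt: string | null;
  createdAt: string;
};

function buildMiniPayload(input: {
  runFriendlyId: string;
  idempotencyKeyExpiresAt: Date | null;
  createdAt: Date;
}): MiniPayload {
  return {
    runFriendlyId: input.runFriendlyId,
    // Dates become ISO-8601 strings so the value is plain JSON data.
    idempotencyKeyExpiresAt: input.idempotencyKeyExpiresAt
      ? input.idempotencyKeyExpiresAt.toISOString()
      : null,
    createdAt: input.createdAt.toISOString(),
  };
}

const miniPayload = buildMiniPayload({
  runFriendlyId: "run_123",
  idempotencyKeyExpiresAt: null,
  createdAt: new Date("2024-01-01T00:00:00.000Z"),
});

// Write side: serialise. Drain side: parse and serialise again.
const written = JSON.stringify(miniPayload);
const reparsed = JSON.parse(written) as MiniPayload;
console.log(JSON.stringify(reparsed) === written);
```

Had `createdAt` stayed a `Date`, the drain side would receive a string where the type promises a `Date`, so the type would no longer describe the stored value.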
Lines changed: 33 additions & 6 deletions
@@ -1,10 +1,12 @@
-import { MollifierDrainer } from "@trigger.dev/redis-worker";
+import { createHash } from "node:crypto";
+import { MollifierDrainer, serialiseSnapshot } from "@trigger.dev/redis-worker";
 import { env } from "~/env.server";
 import { logger } from "~/services/logger.server";
 import { singleton } from "~/utils/singleton";
 import { getMollifierBuffer } from "./mollifierBuffer.server";
+import type { BufferedTriggerPayload } from "./bufferedTriggerPayload.server";

-function initializeMollifierDrainer(): MollifierDrainer {
+function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload> {
   const buffer = getMollifierBuffer();
   if (!buffer) {
     // Should be unreachable: getMollifierDrainer() guards on the same env flag as getMollifierBuffer().
@@ -16,21 +18,46 @@ function initializeMollifierDrainer(): MollifierDrainer {
     maxAttempts: env.MOLLIFIER_DRAIN_MAX_ATTEMPTS,
   });

-  const drainer = new MollifierDrainer({
+  // Phase 1 handler: no-op ack. The trigger has ALREADY been written to
+  // Postgres via engine.trigger (dual-write at the call site). Popping +
+  // acking here proves the dequeue mechanism works end-to-end without
+  // duplicating the work. Phase 2 will replace this with an engine.trigger
+  // replay that performs the actual Postgres write.
+  const drainer = new MollifierDrainer<BufferedTriggerPayload>({
     buffer,
-    handler: async () => {
-      throw new Error("MollifierDrainer phase 1: no handler wired");
+    handler: async (input) => {
+      // Hash the (re-serialised, canonical) payload on the drain side rather
+      // than on the trigger hot path. Burst-time CPU stays with engine.trigger;
+      // the drainer is the natural place for the audit-equivalence checksum.
+      // Re-serialisation is identity for the BufferedTriggerPayload shape
+      // (only strings/numbers/plain objects), so this hash matches what the
+      // call site wrote into Redis.
+      const reserialised = serialiseSnapshot(input.payload);
+      const payloadHash = createHash("sha256").update(reserialised).digest("hex");
+      logger.info("mollifier.drained", {
+        runId: input.runId,
+        envId: input.envId,
+        orgId: input.orgId,
+        taskId: input.payload.taskId,
+        attempts: input.attempts,
+        ageMs: Date.now() - input.createdAt.getTime(),
+        payloadBytes: reserialised.length,
+        payloadHash,
+      });
     },
     concurrency: env.MOLLIFIER_DRAIN_CONCURRENCY,
     maxAttempts: env.MOLLIFIER_DRAIN_MAX_ATTEMPTS,
+    // A no-op handler shouldn't throw, but if something does (e.g. an
+    // unexpected deserialise failure), don't loop — let it FAIL terminally
+    // so the entry is observable in metrics.
     isRetryable: () => false,
   });

   drainer.start();
   return drainer;
 }

-export function getMollifierDrainer(): MollifierDrainer | null {
+export function getMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload> | null {
   if (env.MOLLIFIER_ENABLED !== "1") return null;
   return singleton("mollifierDrainer", initializeMollifierDrainer);
 }
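
The handler above relies on re-serialisation being an identity operation, so that a hash computed in the drainer matches the bytes written at the call site. The sketch below shows one way that property can hold. `canonicalJson` is a hypothetical stand-in for `serialiseSnapshot` (whose implementation is not part of this diff) and assumes JSON-safe input; `createHash` is the real `node:crypto` API used in the commit.

```typescript
import { createHash } from "node:crypto";

// Hypothetical canonical serialiser: sorted keys give a stable byte sequence.
function canonicalJson(value: unknown): string {
  if (Array.isArray(value)) {
    return `[${value.map(canonicalJson).join(",")}]`;
  }
  if (value !== null && typeof value === "object") {
    const obj = value as Record<string, unknown>;
    const parts = Object.keys(obj)
      .filter((key) => obj[key] !== undefined) // JSON drops undefined fields
      .sort()
      .map((key) => `${JSON.stringify(key)}:${canonicalJson(obj[key])}`);
    return `{${parts.join(",")}}`;
  }
  return JSON.stringify(value) ?? "null";
}

function payloadHash(serialised: string): string {
  return createHash("sha256").update(serialised).digest("hex");
}

// Write side: serialise once and store the string.
const stored = canonicalJson({ taskId: "my-task", tags: ["a", "b"], envId: "env_a" });

// Drain side: parse, re-serialise, then hash off the hot path.
const reserialisedPayload = canonicalJson(JSON.parse(stored));
console.log(payloadHash(reserialisedPayload) === payloadHash(stored));
```

Hashing on the drain side keeps the SHA-256 cost out of the trigger request, at the price of depending on a deterministic serialiser, which is why the key order is fixed here.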
