Skip to content

Commit 4135aca

Browse files
committed
perf(mollifier): fire-and-forget buffer.accept on the hot path
In Phase 1 the buffer entry is audit-only — engine.trigger below is the primary write path. Awaiting buffer.accept added Redis round-trip latency to every mollified trigger, which is the wrong tradeoff for an audit write. Switch to a fire-and-forget then/catch so a slow or unreachable Redis adds zero latency to the customer-facing path; the buffered/error log lines move to the microtask queue but still fire on the same lines operators alert on today. Updates the orphan-entry test docstring to reflect the scheduled (rather than synchronous) ordering — the invariant the test pins is unchanged.
1 parent a9400b1 commit 4135aca

2 files changed

Lines changed: 33 additions & 30 deletions

File tree

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -398,36 +398,39 @@ export class RunEngineTriggerTaskService {
398398
createdAt: new Date(),
399399
});
400400

401-
try {
402-
const serialisedPayload = serialiseSnapshot(canonicalPayload);
403-
await buffer.accept({
401+
// Fire-and-forget. The buffer entry is audit-only in
402+
// Phase 1 — engine.trigger below is the primary write path,
403+
// so the customer must never wait on Redis here. A slow or
404+
// unreachable buffer adds zero latency to the hot path;
405+
// worst case the audit record is lost and we see a gap
406+
// between `mollifier.would_mollify` (gate decision) and
407+
// `mollifier.drained` (drainer). Errors land on the same
408+
// log line as before so operators can still alert on them.
409+
const serialisedPayload = serialiseSnapshot(canonicalPayload);
410+
void buffer
411+
.accept({
404412
runId: runFriendlyId,
405413
envId: environment.id,
406414
orgId: environment.organizationId,
407415
payload: serialisedPayload,
416+
})
417+
.then(() => {
418+
logger.info("mollifier.buffered", {
419+
runId: runFriendlyId,
420+
envId: environment.id,
421+
orgId: environment.organizationId,
422+
taskId,
423+
payloadBytes: serialisedPayload.length,
424+
});
425+
})
426+
.catch((err) => {
427+
logger.error("mollifier.buffer_accept_failed", {
428+
runId: runFriendlyId,
429+
envId: environment.id,
430+
taskId,
431+
err: err instanceof Error ? err.message : String(err),
432+
});
408433
});
409-
// Light log on the hot path — keep this synchronous work
410-
// O(1) per trigger. The drainer computes the payload hash
411-
// off-path; operators correlate `mollifier.buffered` →
412-
// `mollifier.drained` by runId.
413-
logger.info("mollifier.buffered", {
414-
runId: runFriendlyId,
415-
envId: environment.id,
416-
orgId: environment.organizationId,
417-
taskId,
418-
payloadBytes: serialisedPayload.length,
419-
});
420-
} catch (err) {
421-
// Fail-open: buffer write must never block the customer's
422-
// trigger. engine.trigger below is the primary write path
423-
// in Phase 1 — the customer still gets a valid run.
424-
logger.error("mollifier.buffer_accept_failed", {
425-
runId: runFriendlyId,
426-
envId: environment.id,
427-
taskId,
428-
err: err instanceof Error ? err.message : String(err),
429-
});
430-
}
431434
}
432435
}
433436

apps/webapp/test/engine/triggerTask.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1442,11 +1442,11 @@ describe("RunEngineTriggerTaskService", () => {
14421442
// Phase 2's responsibility once the buffer becomes the primary write
14431443
// — at that point we add the mollifier-specific idempotency index.
14441444
//
1445-
// This test pins the current ordering: buffer.accept fires synchronously
1446-
// BEFORE engine.trigger, and engine.trigger failure does NOT roll back
1447-
// the buffer write. Any future change that reverses the order or adds
1448-
// a silent rollback will fail this assertion and force a design
1449-
// decision rather than a silent behaviour change.
1445+
// This test pins the invariant: buffer.accept is scheduled (fire-and-
1446+
// forget) BEFORE engine.trigger runs, and engine.trigger failure does
1447+
// NOT roll back the buffer write. Any future change that reverses the
1448+
// order or adds a silent rollback will fail this assertion and force a
1449+
// design decision rather than a silent behaviour change.
14501450

14511451
const engine = new RunEngine({
14521452
prisma,

0 commit comments

Comments
 (0)