Skip to content

Commit c7636c1

Browse files
committed
Revert "perf(mollifier): fire-and-forget buffer.accept on the hot path"
This reverts commit 4135aca.
1 parent 825e421 commit c7636c1

2 files changed

Lines changed: 30 additions & 33 deletions

File tree

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -398,39 +398,36 @@ export class RunEngineTriggerTaskService {
398398
createdAt: new Date(),
399399
});
400400

401-
// Fire-and-forget. The buffer entry is audit-only in
402-
// Phase 1 — engine.trigger below is the primary write path,
403-
// so the customer must never wait on Redis here. A slow or
404-
// unreachable buffer adds zero latency to the hot path;
405-
// worst case the audit record is lost and we see a gap
406-
// between `mollifier.would_mollify` (gate decision) and
407-
// `mollifier.drained` (drainer). Errors land on the same
408-
// log line as before so operators can still alert on them.
409-
const serialisedPayload = serialiseSnapshot(canonicalPayload);
410-
void buffer
411-
.accept({
401+
try {
402+
const serialisedPayload = serialiseSnapshot(canonicalPayload);
403+
await buffer.accept({
412404
runId: runFriendlyId,
413405
envId: environment.id,
414406
orgId: environment.organizationId,
415407
payload: serialisedPayload,
416-
})
417-
.then(() => {
418-
logger.info("mollifier.buffered", {
419-
runId: runFriendlyId,
420-
envId: environment.id,
421-
orgId: environment.organizationId,
422-
taskId,
423-
payloadBytes: serialisedPayload.length,
424-
});
425-
})
426-
.catch((err) => {
427-
logger.error("mollifier.buffer_accept_failed", {
428-
runId: runFriendlyId,
429-
envId: environment.id,
430-
taskId,
431-
err: err instanceof Error ? err.message : String(err),
432-
});
433408
});
409+
// Light log on the hot path — keep this synchronous work
410+
// O(1) per trigger. The drainer computes the payload hash
411+
// off-path; operators correlate `mollifier.buffered` →
412+
// `mollifier.drained` by runId.
413+
logger.info("mollifier.buffered", {
414+
runId: runFriendlyId,
415+
envId: environment.id,
416+
orgId: environment.organizationId,
417+
taskId,
418+
payloadBytes: serialisedPayload.length,
419+
});
420+
} catch (err) {
421+
// Fail-open: buffer write must never block the customer's
422+
// trigger. engine.trigger below is the primary write path
423+
// in Phase 1 — the customer still gets a valid run.
424+
logger.error("mollifier.buffer_accept_failed", {
425+
runId: runFriendlyId,
426+
envId: environment.id,
427+
taskId,
428+
err: err instanceof Error ? err.message : String(err),
429+
});
430+
}
434431
}
435432
}
436433

apps/webapp/test/engine/triggerTask.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1442,11 +1442,11 @@ describe("RunEngineTriggerTaskService", () => {
14421442
// Phase 2's responsibility once the buffer becomes the primary write
14431443
// — at that point we add the mollifier-specific idempotency index.
14441444
//
1445-
// This test pins the invariant: buffer.accept is scheduled (fire-and-
1446-
// forget) BEFORE engine.trigger runs, and engine.trigger failure does
1447-
// NOT roll back the buffer write. Any future change that reverses the
1448-
// order or adds a silent rollback will fail this assertion and force a
1449-
// design decision rather than a silent behaviour change.
1445+
// This test pins the current ordering: buffer.accept fires synchronously
1446+
// BEFORE engine.trigger, and engine.trigger failure does NOT roll back
1447+
// the buffer write. Any future change that reverses the order or adds
1448+
// a silent rollback will fail this assertion and force a design
1449+
// decision rather than a silent behaviour change.
14501450

14511451
const engine = new RunEngine({
14521452
prisma,

0 commit comments

Comments
 (0)