Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
c00b148
feat: trigger mollifier phase 1 scaffolding
d-cs May 12, 2026
ae05184
feat(mollifier): trigger burst smoothing — Phase 1 (trip evaluator + …
d-cs May 12, 2026
31aefa1
chore(mollifier): address CodeRabbit review for phase-1 PR
d-cs May 14, 2026
f4bac2d
fix(mollifier): guard drainer shutdown registration against listener …
d-cs May 14, 2026
cfa6e9e
feat(mollifier): make resolveOrgFlag actually org-scoped via Organiza…
d-cs May 14, 2026
2dee88c
fix(mollifier): mock db.server in gate test to avoid eager prisma con…
d-cs May 14, 2026
4f5978a
chore(mollifier): address review follow-ups for phase-1 PR
d-cs May 14, 2026
6c55bf8
fix(mollifier): extend MollifierEvaluateGate input to carry orgFeatur…
d-cs May 14, 2026
74fe441
fix(mollifier): raise mollifierGate test timeout to 30s for postgresT…
d-cs May 14, 2026
2afbe12
fix(mollifier): keep trigger hot path DB-free and fail open on flag e…
d-cs May 14, 2026
8469561
fix(mollifier): bound drainer shutdown so a hung handler can't block …
d-cs May 14, 2026
d76bb9e
fix(mollifier): keep drainer loop alive across transient redis errors
d-cs May 14, 2026
f1efc41
fix(mollifier): add missing imports to readFallback.server.ts
d-cs May 14, 2026
275a5ba
chore(mollifier): fix misleading rate-counter comment + symmetric eva…
d-cs May 14, 2026
e699034
chore(mollifier): merge phase-1 and phase-2 server-changes into one file
d-cs May 14, 2026
e734490
chore(mollifier): drop fuzz tests to keep phase-1 PR focused
d-cs May 14, 2026
0bf53e7
chore(mollifier): drop drive-by enqueueSystem comment change
d-cs May 14, 2026
edd3250
chore(mollifier): rewrite server-changes note for external readers
d-cs May 14, 2026
1e087e2
chore(mollifier): clarify server-changes note — monitoring only, no d…
d-cs May 14, 2026
7d74b8a
refactor(mollifier): move DI seam types to the modules that define them
d-cs May 14, 2026
5f06709
fix(mollifier): degrade to disabled when redis host is unset, no main…
d-cs May 14, 2026
f91cbf2
fix(mollifier): bound drainer per-tick env fan-out via maxEnvsPerTick
d-cs May 14, 2026
b7e2655
refactor(mollifier): align drainer stop semantics with FairQueue / Ba…
d-cs May 14, 2026
24407fa
fix(mollifier): preserve env fairness when drainer slices
d-cs May 14, 2026
adc29fc
test(mollifier): pin no-starvation property for light env behind heav…
d-cs May 14, 2026
cb8a54d
fix(mollifier): typecheck — destructure popsPerTick to satisfy noUnch…
d-cs May 15, 2026
3daee33
test(mollifier): cover six previously-untested drainer behaviours
d-cs May 15, 2026
2cad05f
feat(mollifier): two-level org→env rotation in drainer for tenant-lev…
d-cs May 15, 2026
2348bf2
chore(mollifier): rewrite changeset as feature intro (drop delta-lang…
d-cs May 15, 2026
5610099
feat(mollifier): track org→envs in the buffer for clean org-level fai…
d-cs May 15, 2026
a1a0a85
revert(mollifier): use standard REDIS_* fallback and fail loud on mis…
d-cs May 15, 2026
650f025
refactor(mollifier): drop the redundant mollifier:envs SET
d-cs May 15, 2026
5163a65
refactor(mollifier): drop global FeatureFlag fallback in hot-path res…
d-cs May 15, 2026
c31eb22
fix(mollifier): pipeline per-tick org→env fan-out and reconcile shutd…
d-cs May 15, 2026
ed0c468
chore(mollifier): refresh redis-worker changeset for buffer-side org …
d-cs May 15, 2026
a467e9e
Merge branch 'main' into mollifier-phase-2
d-cs May 15, 2026
7344211
test(redis-worker): drop vi.fn handler spies from drainer tests
d-cs May 15, 2026
673c7d0
fix(webapp): validate mollifier drain shutdown timeout before startin…
d-cs May 15, 2026
60f2fb9
fix(webapp): validate mollifier drain shutdown timeout before startin…
d-cs May 15, 2026
9007053
switch info logging to debug
d-cs May 15, 2026
6487461
refactor(webapp): split mollifier drainer factory into create + start
d-cs May 15, 2026
02c0b71
refactor(webapp): move mollifier drainer bootstrap out of legacy work…
d-cs May 15, 2026
ad90fe3
feat(webapp): MOLLIFIER_DRAINER_ENABLED for per-service drainer control
d-cs May 15, 2026
e5d403e
refactor(webapp): prefix mollifier env vars with TRIGGER_
d-cs May 15, 2026
50868ff
docs(review): clarify what the no-mocking rule is actually for
d-cs May 15, 2026
ee474b5
Merge branch 'main' into mollifier-phase-2
d-cs May 15, 2026
92d0841
fix(redis-worker): clear MollifierDrainer.stop() timeout timer when l…
d-cs May 15, 2026
0d12e7b
refactor(webapp): wire mollifier drainer shutdown through signalsEmitter
d-cs May 15, 2026
f2f4ba6
chore(review): revert the no-mocking-rule clarification
d-cs May 15, 2026
5255c47
perf(webapp): short-circuit mollifier gate when globally disabled
d-cs May 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/mollifier-redis-worker-primitives.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@trigger.dev/redis-worker": patch
---

Add MollifierBuffer and MollifierDrainer primitives for trigger burst smoothing.

MollifierBuffer (`accept`, `pop`, `ack`, `requeue`, `fail`, `evaluateTrip`) is a per-env FIFO over Redis with atomic Lua transitions for status tracking. `evaluateTrip` is a sliding-window trip evaluator the webapp gate uses to detect per-env trigger bursts.

MollifierDrainer pops entries through a polling loop with a user-supplied handler. The loop survives transient Redis errors via capped exponential backoff (up to 5s), and per-env pop failures don't poison the rest of the batch — one env's blip is logged and counted as failed for that tick. Rotation is two-level: orgs at the top, envs within each org. The buffer maintains `mollifier:orgs` and `mollifier:org-envs:${orgId}` atomically with per-env queues, so the drainer walks orgs → envs directly without an in-memory cache. The `maxOrgsPerTick` option (default 500) caps how many orgs are scheduled per tick; for each picked org, one env is popped (rotating round-robin within the org). An org with N envs gets the same per-tick scheduling slot as an org with 1 env, so tenant-level drainage throughput is determined by org count rather than env count.
6 changes: 6 additions & 0 deletions .server-changes/mollifier-burst-protection.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Lay the groundwork for an opt-in burst-protection layer on the trigger hot path. This release ships **monitoring only** — operators can observe per-env trigger storms via two opt-in modes, but no trigger calls are diverted or rate-limited yet (active burst smoothing follows in a later release). All new env vars default off, so existing deployments see no behaviour change. With `TRIGGER_MOLLIFIER_SHADOW_MODE=1`, each trigger evaluates a per-env rate counter and logs `mollifier.would_mollify` when the threshold is crossed. With `TRIGGER_MOLLIFIER_ENABLED=1` plus a per-org `mollifierEnabled` flag, over-threshold triggers are also recorded in a Redis audit buffer alongside the normal `engine.trigger` call, drained by a background no-op consumer. Emits the `mollifier.decisions` OTel counter for per-env rate visibility.
3 changes: 3 additions & 0 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import isbot from "isbot";
import { renderToPipeableStream } from "react-dom/server";
import { PassThrough } from "stream";
import * as Worker from "~/services/worker.server";
import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server";
import { bootstrap } from "./bootstrap";
import { LocaleContextProvider } from "./components/primitives/LocaleProvider";
import {
Expand Down Expand Up @@ -247,6 +248,8 @@ Worker.init().catch((error) => {
logError(error);
});

initMollifierDrainerWorker();

bootstrap().catch((error) => {
logError(error);
});
Expand Down
41 changes: 41 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1054,6 +1054,47 @@ const EnvironmentSchema = z
COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
Comment thread
d-cs marked this conversation as resolved.

TRIGGER_MOLLIFIER_ENABLED: z.string().default("0"),
// Separate switch for the drainer (consumer side) so it can be split
// off onto a dedicated worker service. Unset → inherits
// TRIGGER_MOLLIFIER_ENABLED, so single-container self-hosters don't have to
// flip two switches. In multi-replica deployments, set this to "0"
// explicitly on every replica except the one dedicated drainer
// service — otherwise every replica's polling loop races for the
// same buffer entries. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill
// switch; setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a
// no-op because the gate-side singleton refuses to construct a
// buffer when the system is off.
TRIGGER_MOLLIFIER_DRAINER_ENABLED: z.string().default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
TRIGGER_MOLLIFIER_SHADOW_MODE: z.string().default("0"),
TRIGGER_MOLLIFIER_REDIS_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_HOST),
TRIGGER_MOLLIFIER_REDIS_PORT: z.coerce
.number()
.optional()
.transform(
(v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined),
),
TRIGGER_MOLLIFIER_REDIS_USERNAME: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_USERNAME),
TRIGGER_MOLLIFIER_REDIS_PASSWORD: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_PASSWORD),
TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
TRIGGER_MOLLIFIER_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200),
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100),
TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500),
TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY: z.coerce.number().int().positive().default(50),
TRIGGER_MOLLIFIER_ENTRY_TTL_S: z.coerce.number().int().positive().default(600),
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500),

BATCH_TRIGGER_PROCESS_JOB_VISIBILITY_TIMEOUT_MS: z.coerce
.number()
.int()
Expand Down
106 changes: 106 additions & 0 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ import type {
TriggerTaskRequest,
TriggerTaskValidator,
} from "../types";
import { env } from "~/env.server";
import {
evaluateGate as defaultEvaluateGate,
type GateOutcome,
type MollifierEvaluateGate,
} from "~/v3/mollifier/mollifierGate.server";
import {
getMollifierBuffer as defaultGetMollifierBuffer,
type MollifierGetBuffer,
} from "~/v3/mollifier/mollifierBuffer.server";
import { buildBufferedTriggerPayload } from "~/v3/mollifier/bufferedTriggerPayload.server";
import { serialiseSnapshot } from "@trigger.dev/redis-worker";
import { QueueSizeLimitExceededError, ServiceValidationError } from "~/v3/services/common.server";

class NoopTriggerRacepointSystem implements TriggerRacepointSystem {
Expand All @@ -59,6 +71,11 @@ export class RunEngineTriggerTaskService {
private readonly traceEventConcern: TraceEventConcern;
private readonly triggerRacepointSystem: TriggerRacepointSystem;
private readonly metadataMaximumSize: number;
// Mollifier hooks are DI'd so tests can drive the call-site's mollify branch
// deterministically (stub the gate to return mollify, inject a real or fake
// buffer). In production both default to the live module-level singletons.
private readonly evaluateGate: MollifierEvaluateGate;
private readonly getMollifierBuffer: MollifierGetBuffer;

constructor(opts: {
prisma: PrismaClientOrTransaction;
Expand All @@ -71,6 +88,8 @@ export class RunEngineTriggerTaskService {
tracer: Tracer;
metadataMaximumSize: number;
triggerRacepointSystem?: TriggerRacepointSystem;
evaluateGate?: MollifierEvaluateGate;
getMollifierBuffer?: MollifierGetBuffer;
}) {
this.prisma = opts.prisma;
this.engine = opts.engine;
Expand All @@ -82,6 +101,8 @@ export class RunEngineTriggerTaskService {
this.traceEventConcern = opts.traceEventConcern;
this.metadataMaximumSize = opts.metadataMaximumSize;
this.triggerRacepointSystem = opts.triggerRacepointSystem ?? new NoopTriggerRacepointSystem();
this.evaluateGate = opts.evaluateGate ?? defaultEvaluateGate;
this.getMollifierBuffer = opts.getMollifierBuffer ?? defaultGetMollifierBuffer;
}

public async call({
Expand Down Expand Up @@ -316,6 +337,23 @@ export class RunEngineTriggerTaskService {
taskKind: taskKind ?? "STANDARD",
};

// Short-circuit before the gate when mollifier is globally off (the
// default for every deployment that hasn't opted in). Avoids the
// GateInputs allocation, the deps spread inside `evaluateGate`, and
// the `mollifier.decisions{outcome=pass_through}` OTel increment on
// every trigger — `triggerTask` is the highest-throughput code path
// in the system. When the flag is on, behaviour is unchanged.
const mollifierOutcome: GateOutcome | null =
env.TRIGGER_MOLLIFIER_ENABLED === "1"
? await this.evaluateGate({
envId: environment.id,
orgId: environment.organizationId,
taskId,
orgFeatureFlags:
(environment.organization.featureFlags as Record<string, unknown> | null) ?? null,
})
: null;
Comment on lines +346 to +355
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Hard-coded env.TRIGGER_MOLLIFIER_ENABLED check bypasses DI'd evaluateGate, making mollifier integration tests unreachable

The short-circuit at triggerTask.server.ts:347 reads the global env singleton directly (env.TRIGGER_MOLLIFIER_ENABLED === "1") instead of going through the DI'd evaluateGate. When this env var is not set (defaults to "0" per env.server.ts:1057), mollifierOutcome is unconditionally null, so the injected gate is never called and the buffer-write branch at line 377 is unreachable.

This means three mollifier integration tests will fail:

  • "mollify action triggers dual-write" (line 1328: expect(buffer.accepted).toHaveLength(1) gets 0)
  • "engine.trigger throwing AFTER buffer.accept" (line 1491: same assertion)
  • "debounce match produces an orphan buffer entry" (line 1730: same assertion)

The .env file does not exist in CI (confirmed: apps/webapp/.env is a symlink to ../../.env which is absent), and .env.example contains no TRIGGER_MOLLIFIER vars, so dotenv silently no-ops and the Zod default of "0" takes effect.

The DI pattern is correctly wired at construction (lines 91–105) but defeated by the env read at the call site

The constructor accepts optional evaluateGate / getMollifierBuffer hooks and stores them as instance fields. But the call site gates on the module-level env singleton before reaching those hooks. The fix is to make the enabled check part of the DI surface — e.g. check this.evaluateGate !== defaultEvaluateGate to skip the env guard when a custom gate is injected, or make the env check itself injectable.

Prompt for agents
The issue is in RunEngineTriggerTaskService.call() in apps/webapp/app/runEngine/services/triggerTask.server.ts at lines 346-355. The code reads env.TRIGGER_MOLLIFIER_ENABLED directly from the global env singleton to short-circuit the mollifier gate evaluation. This bypasses the DI'd evaluateGate that was injected via the constructor (lines 91-105), making the mollifier integration tests in test/engine/triggerTask.test.ts unreachable because the env var defaults to '0' in test environments.

The production intent is correct: skip the gate when the feature is globally off. The problem is that the env check is not part of the DI surface.

Possible approaches:
1. Add an isMollifierEnabled callback to the constructor options (like evaluateGate and getMollifierBuffer), defaulting to () => env.TRIGGER_MOLLIFIER_ENABLED === '1'. Tests can override it to return true.
2. Move the env check inside evaluateGate itself (it already has isMollifierEnabled in GateDependencies), and remove the outer check. The DI'd evaluateGate in tests already returns a fixed outcome, so the inner check is irrelevant. The production evaluateGate already checks isMollifierEnabled internally, so the outer check is redundant.
3. Skip the env guard when a non-default evaluateGate is injected (detect by comparing to the default import).

Approach 2 is simplest: just remove the ternary at lines 346-355 and always call this.evaluateGate. When TRIGGER_MOLLIFIER_ENABLED is '0' in production, the default evaluateGate's isMollifierEnabled() check returns false and it returns pass_through with an OTel counter increment — but the comment at line 340-345 explicitly says the short-circuit exists to avoid that counter increment. If that overhead is acceptable, approach 2 works. Otherwise approach 1 is the cleanest DI fix.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


try {
return await this.traceEventConcern.traceRun(
triggerRequest,
Expand All @@ -328,6 +366,74 @@ export class RunEngineTriggerTaskService {

const payloadPacket = await this.payloadProcessor.process(triggerRequest);

// Phase 1 dual-write: if the org has the mollifier feature flag
// enabled and the per-env trip evaluator says divert, write the
// canonical replay payload to the buffer AND continue through
// engine.trigger as normal. The buffer entry is an audit/preview
// copy; the drainer's no-op handler consumes it to prove the
// dequeue mechanism works. Phase 2 will replace engine.trigger
// (below) with a synthesised 200 response and rely on the
// drainer to perform the Postgres write via replay.
if (mollifierOutcome?.action === "mollify") {
const buffer = this.getMollifierBuffer();
if (buffer) {
const canonicalPayload = buildBufferedTriggerPayload({
runFriendlyId,
taskId,
envId: environment.id,
envType: environment.type,
envSlug: environment.slug,
orgId: environment.organizationId,
orgSlug: environment.organization.slug,
projectId: environment.projectId,
projectRef: environment.project.externalRef,
body,
idempotencyKey: idempotencyKey ?? null,
idempotencyKeyExpiresAt: idempotencyKey
? idempotencyKeyExpiresAt ?? null
: null,
tags,
parentRunFriendlyId: parentRun?.friendlyId ?? null,
traceContext: event.traceContext,
triggerSource,
triggerAction,
serviceOptions: options,
createdAt: new Date(),
});

try {
const serialisedPayload = serialiseSnapshot(canonicalPayload);
await buffer.accept({
runId: runFriendlyId,
envId: environment.id,
orgId: environment.organizationId,
payload: serialisedPayload,
});
// Light log on the hot path — keep this synchronous work
// O(1) per trigger. The drainer computes the payload hash
// off-path; operators correlate `mollifier.buffered` →
// `mollifier.drained` by runId.
logger.debug("mollifier.buffered", {
runId: runFriendlyId,
envId: environment.id,
orgId: environment.organizationId,
taskId,
payloadBytes: serialisedPayload.length,
});
} catch (err) {
// Fail-open: buffer write must never block the customer's
// trigger. engine.trigger below is the primary write path
// in Phase 1 — the customer still gets a valid run.
logger.error("mollifier.buffer_accept_failed", {
runId: runFriendlyId,
envId: environment.id,
taskId,
err: err instanceof Error ? err.message : String(err),
});
}
}
}

const taskRun = await this.engine.trigger(
{
friendlyId: runFriendlyId,
Expand Down
21 changes: 21 additions & 0 deletions apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,24 @@
/**
* ⚠️ LEGACY — Graphile-worker / ZodWorker setup. Do not touch.
*
* This file wires the original background-job system the webapp was
* built on (`@internal/zod-worker` → graphile-worker → Postgres). It is
* now in deprecation mode: every task in `workerCatalog` below is
* annotated with `@deprecated, moved to <new home>` and the live jobs
* for new features all run on `@trigger.dev/redis-worker` instead.
*
* Where to put new things:
* - Background jobs / queues → use redis-worker, alongside
* `~/v3/commonWorker.server.ts`, `~/v3/alertsWorker.server.ts`, or
* `~/v3/batchTriggerWorker.server.ts`.
* - Run lifecycle → `@internal/run-engine` via `~/v3/runEngine.server`.
* - Custom polling loops with their own Redis connection → keep them
* in their own lifecycle module (e.g. `~/v3/mollifierDrainerWorker.server.ts`)
* and wire the bootstrap from `entry.server.tsx`. Don't reach into
* `init()` below.
*
* Edit only when removing legacy paths.
*/
import { ZodWorker } from "@internal/zod-worker";
import { DeliverEmailSchema } from "emails";
import { z } from "zod";
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/featureFlags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export const FEATURE_FLAG = {
hasAiAccess: "hasAiAccess",
hasComputeAccess: "hasComputeAccess",
hasPrivateConnections: "hasPrivateConnections",
mollifierEnabled: "mollifierEnabled",
} as const;

export const FeatureFlagCatalog = {
Expand All @@ -18,6 +19,7 @@ export const FeatureFlagCatalog = {
[FEATURE_FLAG.hasAiAccess]: z.coerce.boolean(),
[FEATURE_FLAG.hasComputeAccess]: z.coerce.boolean(),
[FEATURE_FLAG.hasPrivateConnections]: z.coerce.boolean(),
[FEATURE_FLAG.mollifierEnabled]: z.coerce.boolean(),
};

export type FeatureFlagKey = keyof typeof FeatureFlagCatalog;
Expand Down
107 changes: 107 additions & 0 deletions apps/webapp/app/v3/mollifier/bufferedTriggerPayload.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import type { TriggerTaskRequestBody } from "@trigger.dev/core/v3";
import type { TriggerTaskServiceOptions } from "~/v3/services/triggerTask.server";

// Canonical payload shape written to the mollifier buffer when the gate
// decides to mollify a trigger. Phase 1 ALSO calls engine.trigger directly
// (dual-write) so this is currently an audit/preview record. Phase 2 will
// make the buffer the primary write path: the drainer's handler will read
// this payload and replay it through engine.trigger to create the run in
// Postgres, and read-fallback endpoints will synthesise a Run view from it
// while it is still QUEUED.
//
// CONTRACT: this shape must contain everything needed for Phase 2's
// drainer-replay to reconstruct an equivalent engine.trigger call. Phase 1
// emits it to logs; Phase 2 will serialise it into Redis and rebuild it on
// the drain side. Keep it serialisable — no functions, no class instances.
export type BufferedTriggerPayload = {
  // Friendly id minted for the run before the gate decision, so the buffered
  // copy and the eventual Postgres row share one identifier.
  runFriendlyId: string;

  // Routing identifiers — let the drainer re-fetch full AuthenticatedEnvironment
  // at replay time rather than embedding it in the payload.
  envId: string;
  envType: string;
  envSlug: string;
  orgId: string;
  orgSlug: string;
  projectId: string;
  projectRef: string;

  // Task identifier — looked up against the locked BackgroundWorkerTask
  // at replay time to recover task-defaults.
  taskId: string;

  // Customer-supplied trigger body (payload, options, context).
  body: TriggerTaskRequestBody;

  // Resolved values from upstream concerns. The drainer should NOT re-resolve
  // these — that would create a second idempotency-key check, etc.
  // `idempotencyKeyExpiresAt` is an ISO-8601 string (Dates don't survive
  // serialisation) and is only ever non-null when `idempotencyKey` is set.
  idempotencyKey: string | null;
  idempotencyKeyExpiresAt: string | null;
  tags: string[];

  // Parent/root linkage for nested triggers.
  parentRunFriendlyId: string | null;

  // Trace context — propagates the original triggering span across the
  // buffer→drain boundary so the run's lifecycle stays under one trace.
  traceContext: Record<string, unknown>;

  // Annotations + service options that influence routing/replay.
  triggerSource: string;
  triggerAction: string;
  serviceOptions: TriggerTaskServiceOptions;

  // Wall-clock instants relevant to the run, as ISO-8601 strings for the
  // same serialisability reason as the idempotency expiry above.
  createdAt: string;
};

// Assemble the canonical payload from the inputs available at the point
// `evaluateGate` returns "mollify" in `RunEngineTriggerTaskService.call`.
// All fields must be derivable from data already in scope at that call site;
// nothing should require an extra DB lookup.
// Assemble the canonical buffer payload from values already in scope when
// `evaluateGate` returns "mollify" inside `RunEngineTriggerTaskService.call`.
// Pure field mapping — no DB lookups; the only transformation is converting
// Date instances to ISO-8601 strings so the record survives serialisation.
export function buildBufferedTriggerPayload(input: {
  runFriendlyId: string;
  taskId: string;
  envId: string;
  envType: string;
  envSlug: string;
  orgId: string;
  orgSlug: string;
  projectId: string;
  projectRef: string;
  body: TriggerTaskRequestBody;
  idempotencyKey: string | null;
  idempotencyKeyExpiresAt: Date | null;
  tags: string[];
  parentRunFriendlyId: string | null;
  traceContext: Record<string, unknown>;
  triggerSource: string;
  triggerAction: string;
  serviceOptions: TriggerTaskServiceOptions;
  createdAt: Date;
}): BufferedTriggerPayload {
  const {
    runFriendlyId,
    taskId,
    envId,
    envType,
    envSlug,
    orgId,
    orgSlug,
    projectId,
    projectRef,
    body,
    idempotencyKey,
    idempotencyKeyExpiresAt,
    tags,
    parentRunFriendlyId,
    traceContext,
    triggerSource,
    triggerAction,
    serviceOptions,
    createdAt,
  } = input;

  // An expiry only has meaning alongside a key, so a dangling expiry is
  // normalised to null here rather than carried into the buffered record.
  const serialisedExpiry =
    idempotencyKey && idempotencyKeyExpiresAt ? idempotencyKeyExpiresAt.toISOString() : null;

  return {
    runFriendlyId,
    envId,
    envType,
    envSlug,
    orgId,
    orgSlug,
    projectId,
    projectRef,
    taskId,
    body,
    idempotencyKey,
    idempotencyKeyExpiresAt: serialisedExpiry,
    tags,
    parentRunFriendlyId,
    traceContext,
    triggerSource,
    triggerAction,
    serviceOptions,
    createdAt: createdAt.toISOString(),
  };
}
Loading
Loading