Skip to content

Commit f2ab40b

Browse files
committed
feat: trigger mollifier phase 1 scaffolding
Redis-backed burst-smoothing layer behind MOLLIFIER_ENABLED=0 (default). With the kill switch off, the gate short-circuits on its first env check and production behaviour is identical to main. @trigger.dev/redis-worker: - MollifierBuffer: atomic Lua-backed FIFO with accept / pop / ack / requeue / fail + TTL. Per-env queues with HSET entry storage, atomic RPOP + status transition, FIFO retry ordering. - MollifierDrainer: generic round-robin worker with concurrency cap, retry semantics, and a stop deadline to avoid livelock on a hung handler. Phase 3 will wire the handler to engine.trigger(). - Full testcontainers-backed test suite (21 tests). apps/webapp: - evaluateGate cascade-check (kill switch -> org feature flag -> shadow mode -> trip evaluator -> mollify / shadow_log / pass_through). Dependencies injected for testability; the trip evaluator stub returns { divert: false } in phase 1. - Inserted into RunEngineTriggerTaskService.call() before traceEventConcern.traceRun. The mollify branch throws (unreachable in phase 1). - Lazy MollifierBuffer + MollifierDrainer singletons; no Redis connection unless MOLLIFIER_ENABLED=1. - 12 MOLLIFIER_* env vars (all safe defaults) and a mollifierEnabled feature flag in the global catalog. - Drainer booted from worker.server.ts on first import. - Read-fallback stub for phase 3. - Gate cascade tests + .env loader so env.server validates in vitest workers. Phase 2 will land the real trip evaluator; phase 3 will activate the buffer-write + drain path.
1 parent 759214e commit f2ab40b

20 files changed

Lines changed: 1473 additions & 0 deletions
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/redis-worker": patch
3+
---
4+
5+
Add MollifierBuffer and MollifierDrainer primitives for burst smoothing (scaffolding only — not active without webapp wiring).
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Add scaffolding for the trigger mollifier (phase 1). New env vars (all default off), `evaluateGate` (the mollifier gate) wired into the trigger hot path as a no-op, lazy singletons for the dedicated mollifier Redis client and drainer. No behavioural change while `MOLLIFIER_ENABLED=0`.

apps/webapp/app/env.server.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1030,6 +1030,34 @@ const EnvironmentSchema = z
10301030
COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
10311031
COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
10321032

1033+
MOLLIFIER_ENABLED: z.string().default("0"),
1034+
MOLLIFIER_SHADOW_MODE: z.string().default("0"),
1035+
MOLLIFIER_REDIS_HOST: z
1036+
.string()
1037+
.optional()
1038+
.transform((v) => v ?? process.env.REDIS_HOST),
1039+
MOLLIFIER_REDIS_PORT: z.coerce
1040+
.number()
1041+
.optional()
1042+
.transform(
1043+
(v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined),
1044+
),
1045+
MOLLIFIER_REDIS_USERNAME: z
1046+
.string()
1047+
.optional()
1048+
.transform((v) => v ?? process.env.REDIS_USERNAME),
1049+
MOLLIFIER_REDIS_PASSWORD: z
1050+
.string()
1051+
.optional()
1052+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
1053+
MOLLIFIER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
1054+
MOLLIFIER_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200),
1055+
MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100),
1056+
MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500),
1057+
MOLLIFIER_DRAIN_CONCURRENCY: z.coerce.number().int().positive().default(50),
1058+
MOLLIFIER_ENTRY_TTL_S: z.coerce.number().int().positive().default(600),
1059+
MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
1060+
10331061
BATCH_TRIGGER_PROCESS_JOB_VISIBILITY_TIMEOUT_MS: z.coerce
10341062
.number()
10351063
.int()

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import type {
4040
TriggerTaskRequest,
4141
TriggerTaskValidator,
4242
} from "../types";
43+
import { evaluateGate } from "~/v3/mollifier/mollifierGate.server";
4344
import { QueueSizeLimitExceededError, ServiceValidationError } from "~/v3/services/common.server";
4445

4546
class NoopTriggerRacepointSystem implements TriggerRacepointSystem {
@@ -315,6 +316,16 @@ export class RunEngineTriggerTaskService {
315316
rootScheduleId: parentAnnotations?.rootScheduleId || options.scheduleId || undefined,
316317
};
317318

319+
const mollifierOutcome = await evaluateGate({
320+
envId: environment.id,
321+
orgId: environment.organizationId,
322+
});
323+
if (mollifierOutcome.action === "mollify") {
324+
throw new Error(
325+
"MollifierGate.mollify reached in phase 1 — should be unreachable until phase 3 wiring lands",
326+
);
327+
}
328+
318329
try {
319330
return await this.traceEventConcern.traceRun(
320331
triggerRequest,

apps/webapp/app/services/worker.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import { ResumeBatchRunService } from "~/v3/services/resumeBatchRun.server";
2626
import { ResumeTaskDependencyService } from "~/v3/services/resumeTaskDependency.server";
2727
import { RetryAttemptService } from "~/v3/services/retryAttempt.server";
2828
import { TimeoutDeploymentService } from "~/v3/services/timeoutDeployment.server";
29+
import { getMollifierDrainer } from "~/v3/mollifier/mollifierDrainer.server";
2930
import { GraphileMigrationHelperService } from "./db/graphileMigrationHelper.server";
3031
import { sendEmail } from "./email.server";
3132
import { logger } from "./logger.server";
@@ -128,6 +129,8 @@ export async function init() {
128129
if (env.WORKER_ENABLED === "true") {
129130
await workerQueue.initialize();
130131
}
132+
133+
getMollifierDrainer();
131134
}
132135

133136
function getWorkerQueue() {

apps/webapp/app/v3/featureFlags.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ export const FEATURE_FLAG = {
88
hasAiAccess: "hasAiAccess",
99
hasComputeAccess: "hasComputeAccess",
1010
hasPrivateConnections: "hasPrivateConnections",
11+
mollifierEnabled: "mollifierEnabled",
1112
} as const;
1213

1314
export const FeatureFlagCatalog = {
@@ -18,6 +19,7 @@ export const FeatureFlagCatalog = {
1819
[FEATURE_FLAG.hasAiAccess]: z.coerce.boolean(),
1920
[FEATURE_FLAG.hasComputeAccess]: z.coerce.boolean(),
2021
[FEATURE_FLAG.hasPrivateConnections]: z.coerce.boolean(),
22+
[FEATURE_FLAG.mollifierEnabled]: z.coerce.boolean(),
2123
};
2224

2325
export type FeatureFlagKey = keyof typeof FeatureFlagCatalog;
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { MollifierBuffer } from "@trigger.dev/redis-worker";
2+
import { env } from "~/env.server";
3+
import { logger } from "~/services/logger.server";
4+
import { singleton } from "~/utils/singleton";
5+
6+
function initializeMollifierBuffer(): MollifierBuffer {
7+
logger.debug("Initializing mollifier buffer", {
8+
host: env.MOLLIFIER_REDIS_HOST,
9+
});
10+
11+
return new MollifierBuffer({
12+
redisOptions: {
13+
keyPrefix: "",
14+
host: env.MOLLIFIER_REDIS_HOST,
15+
port: env.MOLLIFIER_REDIS_PORT,
16+
username: env.MOLLIFIER_REDIS_USERNAME,
17+
password: env.MOLLIFIER_REDIS_PASSWORD,
18+
enableAutoPipelining: true,
19+
...(env.MOLLIFIER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
20+
},
21+
entryTtlSeconds: env.MOLLIFIER_ENTRY_TTL_S,
22+
});
23+
}
24+
25+
export function getMollifierBuffer(): MollifierBuffer | null {
26+
if (env.MOLLIFIER_ENABLED !== "1") return null;
27+
return singleton("mollifierBuffer", initializeMollifierBuffer);
28+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { MollifierDrainer } from "@trigger.dev/redis-worker";
2+
import { env } from "~/env.server";
3+
import { logger } from "~/services/logger.server";
4+
import { singleton } from "~/utils/singleton";
5+
import { getMollifierBuffer } from "./mollifierBuffer.server";
6+
7+
function initializeMollifierDrainer(): MollifierDrainer {
8+
const buffer = getMollifierBuffer();
9+
if (!buffer) {
10+
// Should be unreachable: getMollifierDrainer() guards on the same env flag as getMollifierBuffer().
11+
throw new Error("MollifierDrainer initialised without a buffer — env vars inconsistent");
12+
}
13+
14+
logger.debug("Initializing mollifier drainer", {
15+
concurrency: env.MOLLIFIER_DRAIN_CONCURRENCY,
16+
maxAttempts: env.MOLLIFIER_DRAIN_MAX_ATTEMPTS,
17+
});
18+
19+
const drainer = new MollifierDrainer({
20+
buffer,
21+
handler: async () => {
22+
throw new Error("MollifierDrainer phase 1: no handler wired");
23+
},
24+
concurrency: env.MOLLIFIER_DRAIN_CONCURRENCY,
25+
maxAttempts: env.MOLLIFIER_DRAIN_MAX_ATTEMPTS,
26+
isRetryable: () => false,
27+
});
28+
29+
drainer.start();
30+
return drainer;
31+
}
32+
33+
export function getMollifierDrainer(): MollifierDrainer | null {
34+
if (env.MOLLIFIER_ENABLED !== "1") return null;
35+
return singleton("mollifierDrainer", initializeMollifierDrainer);
36+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import { env } from "~/env.server";
2+
import { logger } from "~/services/logger.server";
3+
import { flag } from "~/v3/featureFlags.server";
4+
import { FEATURE_FLAG } from "~/v3/featureFlags";
5+
6+
export type TripDecision =
7+
| { divert: false }
8+
| { divert: true; reason: "per_env_rate" };
9+
10+
export type GateOutcome =
11+
| { action: "pass_through" }
12+
| { action: "mollify"; decision: Extract<TripDecision, { divert: true }> }
13+
| { action: "shadow_log"; decision: Extract<TripDecision, { divert: true }> };
14+
15+
export type GateInputs = {
16+
envId: string;
17+
orgId: string;
18+
};
19+
20+
export type TripEvaluator = (inputs: GateInputs) => Promise<TripDecision>;
21+
22+
export type GateDependencies = {
23+
isMollifierEnabled: () => boolean;
24+
isShadowModeOn: () => boolean;
25+
resolveOrgFlag: () => Promise<boolean>;
26+
evaluator: TripEvaluator;
27+
logShadow: (inputs: GateInputs, reason: "per_env_rate") => void;
28+
};
29+
30+
const stubTripEvaluator: TripEvaluator = async () => ({ divert: false });
31+
32+
export const defaultGateDependencies: GateDependencies = {
33+
isMollifierEnabled: () => env.MOLLIFIER_ENABLED === "1",
34+
isShadowModeOn: () => env.MOLLIFIER_SHADOW_MODE === "1",
35+
resolveOrgFlag: () =>
36+
flag({ key: FEATURE_FLAG.mollifierEnabled, defaultValue: false }),
37+
evaluator: stubTripEvaluator,
38+
logShadow: (inputs, reason) =>
39+
logger.info("mollifier shadow decision", {
40+
envId: inputs.envId,
41+
orgId: inputs.orgId,
42+
reason,
43+
}),
44+
};
45+
46+
export async function evaluateGate(
47+
inputs: GateInputs,
48+
deps: Partial<GateDependencies> = {},
49+
): Promise<GateOutcome> {
50+
const d = { ...defaultGateDependencies, ...deps };
51+
52+
if (!d.isMollifierEnabled()) {
53+
return { action: "pass_through" };
54+
}
55+
56+
const orgFlagEnabled = await d.resolveOrgFlag();
57+
const shadowOn = d.isShadowModeOn();
58+
59+
if (!orgFlagEnabled && !shadowOn) {
60+
return { action: "pass_through" };
61+
}
62+
63+
const decision = await d.evaluator(inputs);
64+
if (!decision.divert) {
65+
return { action: "pass_through" };
66+
}
67+
68+
if (orgFlagEnabled) {
69+
return { action: "mollify", decision };
70+
}
71+
72+
d.logShadow(inputs, decision.reason);
73+
return { action: "shadow_log", decision };
74+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import { logger } from "~/services/logger.server";
2+
3+
export type ReadFallbackInput = {
4+
runId: string;
5+
environmentId: string;
6+
organizationId: string;
7+
};
8+
9+
export async function findRunByIdWithMollifierFallback(
10+
input: ReadFallbackInput,
11+
): Promise<null> {
12+
logger.debug("mollifier read-fallback called (phase 1 stub)", {
13+
runId: input.runId,
14+
});
15+
return null;
16+
}

0 commit comments

Comments
 (0)