Skip to content

Commit fccc2fc

Browse files
author
deepshekhardas
committed
Fix error handling gaps in recent releases
1 parent 9c3a802 commit fccc2fc

4 files changed

Lines changed: 284 additions & 3 deletions

File tree

apps/webapp/app/entry.server.tsx

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,11 @@ Worker.init().catch((error) => {
247247
logError(error);
248248
});
249249

250+
import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server";
251+
initMollifierDrainerWorker().catch((error) => {
252+
logger.error("Mollifier drainer initialization failed", { error });
253+
});
254+
250255
bootstrap().catch((error) => {
251256
logError(error);
252257
});
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
import { Logger } from "@trigger.dev/core/logger";
2+
3+
/**
 * Policy for reacting to a lost/broken replication stream.
 *
 * - "reconnect": retry with exponential backoff (defaults applied by
 *   createReplicationErrorRecovery: 1s initial, 60s cap, unlimited attempts).
 * - "exit": log, wait exitDelayMs (default 5s), then process.exit(exitCode ?? 1)
 *   so a supervisor can restart the process.
 * - "log": no recovery action is scheduled.
 */
export type ReplicationErrorRecoveryStrategy =
  | {
      type: "reconnect";
      /** Delay before the first reconnect attempt; doubles on each retry. */
      initialDelayMs?: number;
      /** Upper bound on the backoff delay. */
      maxDelayMs?: number;
      /** Give up after this many attempts; 0 or unset means unlimited. */
      maxAttempts?: number;
    }
  | {
      type: "exit";
      /** Grace period before process.exit (e.g. to let logs flush). */
      exitDelayMs?: number;
      /** Process exit status; defaults to 1. */
      exitCode?: number;
    }
  | { type: "log" };

/** Collaborators injected into createReplicationErrorRecovery. */
export type ReplicationErrorRecoveryDeps = {
  strategy: ReplicationErrorRecoveryStrategy;
  logger: Logger;
  /** Re-establishes the replication stream; a rejection re-schedules a retry. */
  reconnect: () => Promise<void>;
  /** True once shutdown has begun; suppresses all recovery actions. */
  isShuttingDown: () => boolean;
};

/** Handle returned by createReplicationErrorRecovery. */
export type ReplicationErrorRecovery = {
  /** React to a stream error according to the configured strategy. */
  handle(error: unknown): void;
  /** Call when the stream (re)starts successfully; resets the backoff counter. */
  notifyStreamStarted(): void;
  /** Treat loss of leader election like a stream error (reconnect strategy only). */
  notifyLeaderElectionLost(error: unknown): void;
  /** Cancel any pending reconnect/exit timers. */
  dispose(): void;
};
30+
31+
export function createReplicationErrorRecovery(
32+
deps: ReplicationErrorRecoveryDeps
33+
): ReplicationErrorRecovery {
34+
const { strategy, logger, reconnect, isShuttingDown } = deps;
35+
let attempt = 0;
36+
let pendingReconnect: NodeJS.Timeout | null = null;
37+
let pendingExit: NodeJS.Timeout | null = null;
38+
39+
function scheduleReconnect(error: unknown): void {
40+
if (strategy.type !== "reconnect") return;
41+
if (pendingReconnect) return;
42+
43+
attempt += 1;
44+
const maxAttempts = strategy.maxAttempts ?? 0;
45+
if (maxAttempts > 0 && attempt > maxAttempts) {
46+
logger.error("Replication reconnect exceeded maxAttempts; giving up", {
47+
attempt,
48+
maxAttempts,
49+
error,
50+
});
51+
return;
52+
}
53+
54+
const initialDelay = strategy.initialDelayMs ?? 1_000;
55+
const maxDelay = strategy.maxDelayMs ?? 60_000;
56+
const delay = Math.min(initialDelay * Math.pow(2, attempt - 1), maxDelay);
57+
58+
logger.error("Replication stream lost — scheduling reconnect", {
59+
attempt,
60+
delayMs: delay,
61+
error,
62+
});
63+
64+
pendingReconnect = setTimeout(async () => {
65+
pendingReconnect = null;
66+
67+
if (isShuttingDown()) {
68+
logger.info("Replication reconnect skipped — shutting down");
69+
return;
70+
}
71+
72+
try {
73+
await reconnect();
74+
} catch (err) {
75+
logger.error("Replication reconnect failed", { error: err });
76+
scheduleReconnect(err);
77+
}
78+
}, delay);
79+
}
80+
81+
function scheduleExit(): void {
82+
if (strategy.type !== "exit") return;
83+
if (pendingExit) return;
84+
85+
const delay = strategy.exitDelayMs ?? 5_000;
86+
const exitCode = strategy.exitCode ?? 1;
87+
88+
logger.error("Replication stream lost — exiting", { delayMs: delay, exitCode });
89+
90+
pendingExit = setTimeout(() => {
91+
process.exit(exitCode);
92+
}, delay);
93+
}
94+
95+
return {
96+
handle(error: unknown) {
97+
if (isShuttingDown()) return;
98+
99+
switch (strategy.type) {
100+
case "log":
101+
return;
102+
case "exit":
103+
return scheduleExit();
104+
case "reconnect":
105+
return scheduleReconnect(error);
106+
}
107+
},
108+
notifyStreamStarted() {
109+
if (attempt > 0) {
110+
logger.info("Replication reconnect succeeded", { attempt });
111+
attempt = 0;
112+
}
113+
},
114+
notifyLeaderElectionLost(error: unknown) {
115+
if (isShuttingDown()) return;
116+
if (strategy.type !== "reconnect") return;
117+
scheduleReconnect(error);
118+
},
119+
dispose() {
120+
if (pendingReconnect) {
121+
clearTimeout(pendingReconnect);
122+
pendingReconnect = null;
123+
}
124+
if (pendingExit) {
125+
clearTimeout(pendingExit);
126+
pendingExit = null;
127+
}
128+
},
129+
};
130+
}
131+
132+
export type ReplicationErrorRecoveryEnv = {
133+
strategy: "reconnect" | "exit" | "log";
134+
reconnectInitialDelayMs?: number;
135+
reconnectMaxDelayMs?: number;
136+
reconnectMaxAttempts?: number;
137+
exitDelayMs?: number;
138+
exitCode?: number;
139+
};
140+
141+
export function strategyFromEnv(
142+
env: ReplicationErrorRecoveryEnv
143+
): ReplicationErrorRecoveryStrategy {
144+
switch (env.strategy) {
145+
case "exit":
146+
return {
147+
type: "exit",
148+
exitDelayMs: env.exitDelayMs,
149+
exitCode: env.exitCode,
150+
};
151+
case "log":
152+
return { type: "log" };
153+
case "reconnect":
154+
default:
155+
return {
156+
type: "reconnect",
157+
initialDelayMs: env.reconnectInitialDelayMs,
158+
maxDelayMs: env.reconnectMaxDelayMs,
159+
maxAttempts: env.reconnectMaxAttempts,
160+
};
161+
}
162+
}

apps/webapp/app/v3/dynamicFlushScheduler.server.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,14 +193,13 @@ export class DynamicFlushScheduler<T> {
193193

194194
if (batchesToFlush.length === 0) return;
195195

196-
// Schedule all batches for concurrent processing
196+
// Schedule all batches for concurrent processing
197197
const flushPromises = batchesToFlush.map((batch) =>
198198
this.limiter(async () => {
199-
const itemCount = batch.length;
200-
201199
const self = this;
202200

203201
async function tryFlush(flushId: string, batchToFlush: T[], attempt: number = 1) {
202+
const itemCount = batchToFlush.length;
204203
try {
205204
const startTime = Date.now();
206205
await self.callback(flushId, batchToFlush);
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
export type AttributeValue = string | number | boolean | undefined;
2+
export type AttributeMap = Record<string, AttributeValue>;
3+
4+
export type AttributeKeyOverride = { prefix: string; limit: number };
5+
6+
export type SpanAttributeLimits = {
7+
defaultValueLengthLimit: number;
8+
aiContentValueLengthLimit: number;
9+
totalAttributesLengthLimit: number;
10+
};
11+
12+
export const AI_CONTENT_KEY_OVERRIDES = (limit: number): AttributeKeyOverride[] => [
13+
{ prefix: "ai.prompt", limit },
14+
{ prefix: "ai.response.text", limit },
15+
{ prefix: "ai.response.object", limit },
16+
{ prefix: "ai.response.toolCalls", limit },
17+
{ prefix: "ai.response.reasoning", limit },
18+
{ prefix: "ai.response.reasoningDetails", limit },
19+
{ prefix: "gen_ai.prompt", limit },
20+
{ prefix: "gen_ai.completion", limit },
21+
{ prefix: "gen_ai.request.messages", limit },
22+
{ prefix: "gen_ai.response.text", limit },
23+
];
24+
25+
export const AI_CONTENT_DROP_PRIORITY: string[] = [
26+
"ai.prompt.messages",
27+
"ai.prompt",
28+
"ai.response.object",
29+
"ai.response.text",
30+
"ai.response.toolCalls",
31+
"ai.response.reasoning",
32+
"ai.response.reasoningDetails",
33+
"gen_ai.prompt",
34+
"gen_ai.completion",
35+
"gen_ai.request.messages",
36+
"gen_ai.response.text",
37+
];
38+
39+
function matchPrefix(key: string, prefix: string): boolean {
40+
return key === prefix || key.startsWith(prefix + ".");
41+
}
42+
43+
function getMatchingOverride(key: string, overrides: AttributeKeyOverride[]): number | null {
44+
for (const { prefix, limit } of overrides) {
45+
if (matchPrefix(key, prefix)) return limit;
46+
}
47+
return null;
48+
}
49+
50+
function truncateValue(value: string, limit: number): string {
51+
if (value.length <= limit) return value;
52+
return value.slice(0, limit);
53+
}
54+
55+
export function truncateAttributes(
56+
attributes: AttributeMap,
57+
limits: SpanAttributeLimits,
58+
overrides: AttributeKeyOverride[]
59+
): AttributeMap {
60+
const result: AttributeMap = {};
61+
62+
for (const [key, value] of Object.entries(attributes)) {
63+
if (typeof value !== "string") {
64+
result[key] = value;
65+
continue;
66+
}
67+
68+
const override = getMatchingOverride(key, overrides);
69+
const limit = override ?? limits.defaultValueLengthLimit;
70+
result[key] = truncateValue(value, limit);
71+
}
72+
73+
return result;
74+
}
75+
76+
export function applyTotalSizeBackstop(
77+
attributes: AttributeMap,
78+
limits: SpanAttributeLimits,
79+
dropPriority: string[]
80+
): AttributeMap {
81+
const json = JSON.stringify(attributes);
82+
if (json.length <= limits.totalAttributesLengthLimit) return attributes;
83+
84+
const result: AttributeMap = { ...attributes };
85+
const aiKeys = new Set<string>();
86+
87+
for (const key of Object.keys(result)) {
88+
for (const prefix of dropPriority) {
89+
if (matchPrefix(key, prefix)) {
90+
aiKeys.add(key);
91+
break;
92+
}
93+
}
94+
}
95+
96+
const sortedAiKeys = dropPriority.filter((k) => aiKeys.has(k));
97+
98+
for (const key of sortedAiKeys) {
99+
delete result[key];
100+
const remainingJson = JSON.stringify(result);
101+
if (remainingJson.length <= limits.totalAttributesLengthLimit) break;
102+
}
103+
104+
return result;
105+
}
106+
107+
export function truncateSpanAttributes(
108+
attributes: AttributeMap,
109+
limits: SpanAttributeLimits
110+
): AttributeMap {
111+
const overrides = AI_CONTENT_KEY_OVERRIDES(limits.aiContentValueLengthLimit);
112+
let result = truncateAttributes(attributes, limits, overrides);
113+
result = applyTotalSizeBackstop(result, limits, AI_CONTENT_DROP_PRIORITY);
114+
return result;
115+
}

0 commit comments

Comments
 (0)