21 changes: 21 additions & 0 deletions .server-changes/otel-ai-sdk-attribute-truncation.md
@@ -0,0 +1,21 @@
---
area: webapp
type: fix
---

Tighten OTel span attribute truncation for Vercel AI SDK content keys
(`ai.prompt*`, `ai.response.text/object/toolCalls/reasoning*`,
`gen_ai.prompt`, `gen_ai.completion`, `gen_ai.request.messages`,
`gen_ai.response.text`) to a 1KB per-attribute cap, plus a 32KB per-span
backstop that drops these content keys in priority order if the assembled
attributes JSON still exceeds it. Cost/token metadata (`ai.usage.*`,
`ai.model.*`, `gen_ai.usage.*`, `gen_ai.response.model`, etc.) keeps the
default 8KB cap so LLM enrichment continues to work.

Adds a parse-error-aware safety net in `DynamicFlushScheduler`: when
ClickHouse rejects a batch with `Cannot parse JSON object here`, the
batch is split in half and retried (up to 8 split levels / 256-way
isolation) instead of failing all 5–10k rows at once. Leaves that
Comment on lines +17 to +18
Contributor

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Correct split-depth numbers to match implemented behavior.

Line 17 says up to 8 split levels / 256-way isolation, but this PR’s described behavior is up to 4 split levels / 16-way isolation. Please align this changelog text with the actual scheduler limits to avoid operational confusion.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @.server-changes/otel-ai-sdk-attribute-truncation.md around lines 17-18,
update the changelog text that currently reads "up to 8 split levels / 256-way
isolation" to the correct behavior "up to 4 split levels / 16-way isolation" so
the wording matches the scheduler limits; locate and replace that exact phrase
in the .md content (search for the string "up to 8 split levels / 256-way
isolation") and ensure the surrounding sentence remains grammatically correct.

still fail — whether a single row or a split-exhausted chunk — are
logged with a 1KB sample, counted in `droppedRows`, and removed from
Comment on lines +19 to +20
Contributor

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Fix wording typo in failure-row sentence.

Line 19 should read Rows that still fail ... (not Leaves that still fail ...) for clear changelog wording.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @.server-changes/otel-ai-sdk-attribute-truncation.md around lines 19-20,
update the changelog sentence that currently reads "Leaves that still fail —
whether a single row or a split-exhausted chunk — are logged..." by replacing
"Leaves" with "Rows" so it reads "Rows that still fail — whether a single row or
a split-exhausted chunk — are logged..."; locate the exact phrase "Leaves that
still fail" in the .server-changes/otel-ai-sdk-attribute-truncation.md diff and
perform the single-word correction to fix the wording typo.

the queue so the rest keeps flowing.
11 changes: 11 additions & 0 deletions apps/webapp/app/env.server.ts
@@ -498,6 +498,17 @@ const EnvironmentSchema = z
TRIGGER_OTEL_ATTRIBUTE_PER_LINK_COUNT_LIMIT: z.string().default("10"),
TRIGGER_OTEL_ATTRIBUTE_PER_EVENT_COUNT_LIMIT: z.string().default("10"),

// Server-side OTel ingestion limits applied in otlpExporter.server.ts.
// Default per-attribute cap (8KB) is enough for nearly all keys, but
// Vercel AI SDK content keys (ai.prompt*, ai.response.text/object/etc.,
// gen_ai.prompt, gen_ai.completion) carry tens of KB and have a tighter
// dedicated cap. The total cap is a backstop applied to the assembled
// attributes JSON; if exceeded, AI content keys are dropped in priority
// order. Both prevent oversized JSON from breaking ClickHouse inserts.
SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT: z.coerce.number().int().default(8192),
SERVER_OTEL_AI_CONTENT_ATTRIBUTE_VALUE_LENGTH_LIMIT: z.coerce.number().int().default(1024),
SERVER_OTEL_SPAN_TOTAL_ATTRIBUTES_LENGTH_LIMIT: z.coerce.number().int().default(32768),

CHECKPOINT_THRESHOLD_IN_MS: z.coerce.number().int().default(30000),

// Internal OTEL environment variables
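The three caps above are consumed in otlpExporter.server.ts, which is not part of this diff. A minimal sketch of how such a truncation pass might look — the helper names (`isAiContentKey`, `truncateAttributes`) and the exact drop order are assumptions for illustration, not the PR's actual implementation:

```ts
// Hypothetical sketch of the truncation the env comments above describe.
// The real logic lives in otlpExporter.server.ts (not shown in this diff).
const AI_CONTENT_KEY_PREFIXES = [
  "ai.prompt",
  "ai.response.text",
  "ai.response.object",
  "ai.response.toolCalls",
  "ai.response.reasoning",
  "gen_ai.prompt",
  "gen_ai.completion",
  "gen_ai.request.messages",
  "gen_ai.response.text",
];

function isAiContentKey(key: string): boolean {
  return AI_CONTENT_KEY_PREFIXES.some((prefix) => key.startsWith(prefix));
}

function truncateAttributes(
  attributes: Record<string, string>,
  perAttributeLimit = 8192, // SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT
  aiContentLimit = 1024, // SERVER_OTEL_AI_CONTENT_ATTRIBUTE_VALUE_LENGTH_LIMIT
  totalLimit = 32768 // SERVER_OTEL_SPAN_TOTAL_ATTRIBUTES_LENGTH_LIMIT
): Record<string, string> {
  const result: Record<string, string> = {};

  // Per-attribute caps: AI content keys get the tighter 1KB limit.
  for (const [key, value] of Object.entries(attributes)) {
    const limit = isAiContentKey(key) ? aiContentLimit : perAttributeLimit;
    result[key] = value.length > limit ? value.slice(0, limit) : value;
  }

  // Backstop: if the assembled JSON still exceeds the per-span total cap,
  // drop AI content keys until the span fits (here in declaration order;
  // the PR describes a priority order).
  const dropCandidates = Object.keys(result).filter(isAiContentKey);
  while (JSON.stringify(result).length > totalLimit && dropCandidates.length > 0) {
    delete result[dropCandidates.shift()!];
  }

  return result;
}
```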
113 changes: 105 additions & 8 deletions apps/webapp/app/v3/dynamicFlushScheduler.server.ts
@@ -18,6 +18,24 @@ export type DynamicFlushSchedulerConfig<T> = {
isDroppableEvent?: (item: T) => boolean; // Function to determine if an event can be dropped
};

// Bound on the recursive batch-split safety net. 8 → up to 256-way split,
// which narrows a bad row to a ⌈batchSize / 256⌉-row leaf (~40 rows for a
// 10k batch). Reaching a true singleton would need ⌈log2(batchSize)⌉ levels;
// instead, once this depth is exhausted we drop the whole leaf (see the
// parse-error handling in tryFlush). Each split level only adds one extra
// failing ClickHouse call on the bad-row path, so worst-case latency is
// bounded.
const MAX_SPLIT_DEPTH = 8;

function isClickHouseJsonParseError(error: unknown): boolean {
if (!error) return false;
const message =
typeof error === "object" && error !== null && "message" in error
? String((error as { message?: unknown }).message ?? "")
: String(error);
return message.includes("Cannot parse JSON object");
}

export class DynamicFlushScheduler<T> {
private batchQueue: T[][];
private currentBatch: T[];
@@ -43,6 +61,10 @@ export class DynamicFlushScheduler<T> {
totalItemsFlushed: 0,
droppedEvents: 0,
droppedEventsByKind: new Map<string, number>(),
// Rows dropped at flush time because ClickHouse rejected them and the
// batch-split safety net couldn't isolate them further. Distinct from
// `droppedEvents`, which counts pre-batch load-shedding drops.
droppedRows: 0,
};
private isShuttingDown: boolean = false;

@@ -196,40 +218,115 @@
// Schedule all batches for concurrent processing
const flushPromises = batchesToFlush.map((batch) =>
this.limiter(async () => {
const itemCount = batch.length;

const self = this;

async function tryFlush(flushId: string, batchToFlush: T[], attempt: number = 1) {
async function tryFlush(
flushId: string,
batchToFlush: T[],
attempt: number = 1,
splitDepth: number = 0
) {
const subBatchSize = batchToFlush.length;

try {
const startTime = Date.now();
await self.callback(flushId, batchToFlush);

const duration = Date.now() - startTime;
self.totalQueuedItems -= itemCount;
self.totalQueuedItems -= subBatchSize;
self.consecutiveFlushFailures = 0;
self.lastFlushTime = Date.now();
self.metrics.flushedBatches++;
self.metrics.totalItemsFlushed += itemCount;
self.metrics.totalItemsFlushed += subBatchSize;

self.logger.debug("Batch flushed successfully", {
flushId,
itemCount,
itemCount: subBatchSize,
duration,
remainingQueueDepth: self.totalQueuedItems,
activeConcurrency: self.limiter.activeCount,
pendingConcurrency: self.limiter.pendingCount,
});
} catch (error) {
// ClickHouse rejects an entire batch when a single row's
// attributes JSON is unparseable. Retrying the same batch will
// just fail again, so split-and-retry isolates the offender
// instead of poisoning the whole 5–10k-row batch.
const isParseError = isClickHouseJsonParseError(error);

if (isParseError && subBatchSize > 1 && splitDepth < MAX_SPLIT_DEPTH) {
const mid = Math.floor(subBatchSize / 2);
const left = batchToFlush.slice(0, mid);
const right = batchToFlush.slice(mid);

self.logger.warn(
"Splitting OTel batch after ClickHouse JSON parse failure",
{
flushId,
itemCount: subBatchSize,
splitDepth,
leftSize: left.length,
rightSize: right.length,
}
);

// Run halves concurrently and tolerate independent failures —
// a rejection from one half must not prevent the other half
// from completing. Each leaf's tryFlush updates totalQueuedItems
// and metrics on its own success/drop paths.
const results = await Promise.allSettled([
tryFlush(flushId + "-L", left, 1, splitDepth + 1),
tryFlush(flushId + "-R", right, 1, splitDepth + 1),
]);

for (const [index, result] of results.entries()) {
if (result.status === "rejected") {
self.metrics.failedBatches++;
self.logger.error(
"Split half failed after exhausting retries",
{
flushId: flushId + (index === 0 ? "-L" : "-R"),
error: result.reason,
splitDepth: splitDepth + 1,
}
);
}
}
return;
}

if (isParseError && (subBatchSize === 1 || splitDepth >= MAX_SPLIT_DEPTH)) {
// Either a singleton ClickHouse still rejects, or a leaf we can
// no longer split. Drop it so the rest of the queue keeps
// flowing, decrement the queue counter, and log a 1KB sample of
// the first row so the offender can be investigated later
// without dumping multi-KB of attributes into the log.
self.metrics.droppedRows += subBatchSize;
self.metrics.failedBatches++;
self.totalQueuedItems -= subBatchSize;
self.logger.error(
"Dropping OTel rows rejected by ClickHouse JSON parser",
{
flushId,
droppedCount: subBatchSize,
sample: JSON.stringify(batchToFlush[0]).slice(0, 1024),
splitDepth,
reason: subBatchSize === 1 ? "singleton" : "split_depth_exhausted",
}
);
return;
}

self.consecutiveFlushFailures++;
self.metrics.failedBatches++;

self.logger.error("Error attempting to flush batch", {
flushId,
itemCount,
itemCount: subBatchSize,
error,
consecutiveFailures: self.consecutiveFlushFailures,
attempt,
splitDepth,
});

// Back off on failures
Expand All @@ -239,7 +336,7 @@ export class DynamicFlushScheduler<T> {

if (attempt <= 3) {
await new Promise((resolve) => setTimeout(resolve, 500));
return await tryFlush(flushId, batchToFlush, attempt + 1);
return await tryFlush(flushId, batchToFlush, attempt + 1, splitDepth);
} else {
throw error;
}
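As a standalone sanity check (not part of the PR), the arithmetic in the MAX_SPLIT_DEPTH comment works out as follows:

```ts
// Each split level halves the failing chunk, so depth d narrows a bad row
// to a ceil(batchSize / 2^d)-row leaf.
function leafSizeAfterSplits(batchSize: number, maxDepth: number): number {
  return Math.ceil(batchSize / 2 ** maxDepth);
}

console.log(leafSizeAfterSplits(10_000, 8)); // 40 — the ~40-row leaf the comment cites
console.log(Math.ceil(Math.log2(10_000)));   // 14 — levels needed to reach true singletons
```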