diff --git a/.server-changes/otel-ai-sdk-attribute-truncation.md b/.server-changes/otel-ai-sdk-attribute-truncation.md new file mode 100644 index 0000000000..4dcd32f785 --- /dev/null +++ b/.server-changes/otel-ai-sdk-attribute-truncation.md @@ -0,0 +1,21 @@ +--- +area: webapp +type: fix +--- + +Tighten OTel span attribute truncation for Vercel AI SDK content keys +(`ai.prompt*`, `ai.response.text/object/toolCalls/reasoning*`, +`gen_ai.prompt`, `gen_ai.completion`, `gen_ai.request.messages`, +`gen_ai.response.text`) to a 1KB per-attribute cap, plus a 32KB per-span +backstop that drops these content keys in priority order if the assembled +attributes JSON still exceeds it. Cost/token metadata (`ai.usage.*`, +`ai.model.*`, `gen_ai.usage.*`, `gen_ai.response.model`, etc.) keeps the +default 8KB cap so LLM enrichment continues to work. + +Adds a parse-error-aware safety net in `DynamicFlushScheduler`: when +ClickHouse rejects a batch with `Cannot parse JSON object here`, the +batch is split in half and retried (up to 8 split levels / 256-way +isolation) instead of failing all 5–10k rows at once. Leaves that +still fail — whether a single row or a split-exhausted chunk — are +logged with a 1KB sample, counted in `droppedRows`, and removed from +the queue so the rest keeps flowing. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 97cccbc171..8c11dba319 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -498,6 +498,17 @@ const EnvironmentSchema = z TRIGGER_OTEL_ATTRIBUTE_PER_LINK_COUNT_LIMIT: z.string().default("10"), TRIGGER_OTEL_ATTRIBUTE_PER_EVENT_COUNT_LIMIT: z.string().default("10"), + // Server-side OTel ingestion limits applied in otlpExporter.server.ts. + // Default per-attribute cap (8KB) is enough for nearly all keys, but + // Vercel AI SDK content keys (ai.prompt*, ai.response.text/object/etc., + // gen_ai.prompt, gen_ai.completion) carry tens of KB and have a tighter + // dedicated cap. The total cap is a backstop applied to the assembled + // attributes JSON; if exceeded, AI content keys are dropped in priority + // order. Both prevent oversized JSON from breaking ClickHouse inserts. + SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT: z.coerce.number().int().default(8192), + SERVER_OTEL_AI_CONTENT_ATTRIBUTE_VALUE_LENGTH_LIMIT: z.coerce.number().int().default(1024), + SERVER_OTEL_SPAN_TOTAL_ATTRIBUTES_LENGTH_LIMIT: z.coerce.number().int().default(32768), + CHECKPOINT_THRESHOLD_IN_MS: z.coerce.number().int().default(30000), // Internal OTEL environment variables diff --git a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts index 9f2cffec9e..17e4aeb93d 100644 --- a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts +++ b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts @@ -18,6 +18,24 @@ export type DynamicFlushSchedulerConfig = { isDroppableEvent?: (item: T) => boolean; // Function to determine if an event can be dropped }; +// Bound on the recursive batch-split safety net. 8 → up to 256-way split, +// which narrows a bad row to a ⌈batchSize / 256⌉-row leaf (~40 rows for a +// 10k batch). Reaching a true singleton would need ⌈log2(batchSize)⌉ levels; +// instead, once this depth is exhausted we drop the whole leaf (see the +// parse-error handling in tryFlush). Each split level only adds one extra +// failing ClickHouse call on the bad-row path, so worst-case latency is +// bounded. +const MAX_SPLIT_DEPTH = 8; + +function isClickHouseJsonParseError(error: unknown): boolean { + if (!error) return false; + const message = + typeof error === "object" && error !== null && "message" in error + ? String((error as { message?: unknown }).message ?? "") + : String(error); + return message.includes("Cannot parse JSON object"); +} + export class DynamicFlushScheduler { private batchQueue: T[][]; private currentBatch: T[]; @@ -43,6 +61,10 @@ export class DynamicFlushScheduler { totalItemsFlushed: 0, droppedEvents: 0, droppedEventsByKind: new Map(), + // Rows dropped at flush time because ClickHouse rejected them and the + // batch-split safety net couldn't isolate them further. Distinct from + // `droppedEvents`, which counts pre-batch load-shedding drops. + droppedRows: 0, }; private isShuttingDown: boolean = false; @@ -196,40 +218,115 @@ export class DynamicFlushScheduler { // Schedule all batches for concurrent processing const flushPromises = batchesToFlush.map((batch) => this.limiter(async () => { - const itemCount = batch.length; - const self = this; - async function tryFlush(flushId: string, batchToFlush: T[], attempt: number = 1) { + async function tryFlush( + flushId: string, + batchToFlush: T[], + attempt: number = 1, + splitDepth: number = 0 + ) { + const subBatchSize = batchToFlush.length; + try { const startTime = Date.now(); await self.callback(flushId, batchToFlush); const duration = Date.now() - startTime; - self.totalQueuedItems -= itemCount; + self.totalQueuedItems -= subBatchSize; self.consecutiveFlushFailures = 0; self.lastFlushTime = Date.now(); self.metrics.flushedBatches++; - self.metrics.totalItemsFlushed += itemCount; + self.metrics.totalItemsFlushed += subBatchSize; self.logger.debug("Batch flushed successfully", { flushId, - itemCount, + itemCount: subBatchSize, duration, remainingQueueDepth: self.totalQueuedItems, activeConcurrency: self.limiter.activeCount, pendingConcurrency: self.limiter.pendingCount, }); } catch (error) { + // ClickHouse rejects an entire batch when a single row's + // attributes JSON is unparseable. Retrying the same batch will + // just fail again, so split-and-retry isolates the offender + // instead of poisoning the whole 5–10k-row batch. + const isParseError = isClickHouseJsonParseError(error); + + if (isParseError && subBatchSize > 1 && splitDepth < MAX_SPLIT_DEPTH) { + const mid = Math.floor(subBatchSize / 2); + const left = batchToFlush.slice(0, mid); + const right = batchToFlush.slice(mid); + + self.logger.warn( + "Splitting OTel batch after ClickHouse JSON parse failure", + { + flushId, + itemCount: subBatchSize, + splitDepth, + leftSize: left.length, + rightSize: right.length, + } + ); + + // Run halves concurrently and tolerate independent failures — + // a rejection from one half must not prevent the other half + // from completing. Each leaf's tryFlush updates totalQueuedItems + // and metrics on its own success/drop paths. + const results = await Promise.allSettled([ + tryFlush(flushId + "-L", left, 1, splitDepth + 1), + tryFlush(flushId + "-R", right, 1, splitDepth + 1), + ]); + + for (const [index, result] of results.entries()) { + if (result.status === "rejected") { + self.metrics.failedBatches++; + self.logger.error( + "Split half failed after exhausting retries", + { + flushId: flushId + (index === 0 ? "-L" : "-R"), + error: result.reason, + splitDepth: splitDepth + 1, + } + ); + } + } + return; + } + + if (isParseError && (subBatchSize === 1 || splitDepth >= MAX_SPLIT_DEPTH)) { + // Either a singleton ClickHouse still rejects, or a leaf we can + // no longer split. Drop it so the rest of the queue keeps + // flowing, decrement the queue counter, and log a 1KB sample of + // the first row so the offender can be investigated later + // without dumping multi-KB of attributes into the log. + self.metrics.droppedRows += subBatchSize; + self.metrics.failedBatches++; + self.totalQueuedItems -= subBatchSize; + self.logger.error( + "Dropping OTel rows rejected by ClickHouse JSON parser", + { + flushId, + droppedCount: subBatchSize, + sample: JSON.stringify(batchToFlush[0]).slice(0, 1024), + splitDepth, + reason: subBatchSize === 1 ? "singleton" : "split_depth_exhausted", + } + ); + return; + } + self.consecutiveFlushFailures++; self.metrics.failedBatches++; self.logger.error("Error attempting to flush batch", { flushId, - itemCount, + itemCount: subBatchSize, error, consecutiveFailures: self.consecutiveFlushFailures, attempt, + splitDepth, }); // Back off on failures @@ -239,7 +336,7 @@ export class DynamicFlushScheduler { if (attempt <= 3) { await new Promise((resolve) => setTimeout(resolve, 500)); - return await tryFlush(flushId, batchToFlush, attempt + 1); + return await tryFlush(flushId, batchToFlush, attempt + 1, splitDepth); } else { throw error; } diff --git a/apps/webapp/app/v3/otlpAttributeLimits.ts b/apps/webapp/app/v3/otlpAttributeLimits.ts new file mode 100644 index 0000000000..4f66314f0b --- /dev/null +++ b/apps/webapp/app/v3/otlpAttributeLimits.ts @@ -0,0 +1,207 @@ +/** + * Pure helpers for OTel attribute truncation and per-span size capping. + * Lives in a separate module from `otlpExporter.server.ts` so tests can + * import the helpers without dragging in the env-parsing side effect of + * the server module's transitive dependencies. + */ + +export type AttributeValue = string | number | boolean | undefined; +export type AttributeMap = Record; + +/** + * Per-key cap overrides for `truncateAttributes`. A key matches an override + * when `key === prefix` or `key.startsWith(prefix + ".")` — i.e. the prefix + * covers the attribute itself and any dotted children. First matching entry + * wins; later entries are ignored. + */ +export type AttributeKeyOverride = { prefix: string; limit: number }; + +export type SpanAttributeLimits = { + /** Per-attribute cap applied to every string-valued attribute. */ + defaultValueLengthLimit: number; + /** + * Per-attribute cap applied only to known Vercel AI SDK content keys. + * These attributes (`ai.prompt*`, `ai.response.text/object/toolCalls/reasoning*`, + * `gen_ai.prompt`, `gen_ai.completion`, `gen_ai.request.messages`, + * `gen_ai.response.text`) routinely carry tens of KB of user prompt or + * model response, which is enough to push the assembled per-row JSON past + * ClickHouse's parse tolerance even after the default 8KB cap. + */ + aiContentValueLengthLimit: number; + /** + * Backstop: if the serialized size of all truncated attributes still + * exceeds this many bytes, the AI content keys are dropped in priority + * order until the assembled JSON is under budget. Cost/token metadata is + * preserved. + */ + totalAttributesLengthLimit: number; +}; + +/** + * Vercel AI SDK content keys to cap aggressively. Keep cost/token metadata + * out of this list — `ai.usage.*`, `ai.model.*`, `ai.operationId`, + * `ai.settings.*`, `ai.telemetry.*`, `gen_ai.usage.*`, + * `gen_ai.response.model`, `gen_ai.request.model`, `gen_ai.system`, and + * `gen_ai.operation.name` are needed by `enrichCreatableEvents` for cost + * and LLM enrichment. + */ +export const AI_CONTENT_KEY_OVERRIDES = (limit: number): AttributeKeyOverride[] => [ + // `ai.prompt` covers `ai.prompt`, `ai.prompt.messages`, `ai.prompt.format`, + // `ai.prompt.tools`, `ai.prompt.toolChoice`, `ai.prompt.system`. + { prefix: "ai.prompt", limit }, + { prefix: "ai.response.text", limit }, + { prefix: "ai.response.object", limit }, + { prefix: "ai.response.toolCalls", limit }, + { prefix: "ai.response.reasoning", limit }, + { prefix: "ai.response.reasoningDetails", limit }, + { prefix: "gen_ai.prompt", limit }, + { prefix: "gen_ai.completion", limit }, + { prefix: "gen_ai.request.messages", limit }, + { prefix: "gen_ai.response.text", limit }, +]; + +/** + * Priority list of keys to drop when the assembled attributes JSON exceeds + * the total-size budget. Higher up = dropped first. Each entry is a prefix + * (same semantics as `AttributeKeyOverride`). + */ +export const AI_CONTENT_DROP_PRIORITY: string[] = [ + "ai.prompt.messages", + "ai.prompt", + "ai.response.object", + "ai.response.text", + "ai.response.toolCalls", + "ai.response.reasoning", + "ai.response.reasoningDetails", + "gen_ai.prompt", + "gen_ai.completion", + "gen_ai.request.messages", + "gen_ai.response.text", +]; + +function matchKeyOverride( + key: string, + overrides: AttributeKeyOverride[] | undefined +): AttributeKeyOverride | undefined { + if (!overrides) return undefined; + for (const override of overrides) { + if (key === override.prefix || key.startsWith(override.prefix + ".")) { + return override; + } + } + return undefined; +} + +export function truncateAttributes( + attributes: AttributeMap | undefined, + maximumLength: number = 1024, + keyOverrides?: AttributeKeyOverride[] +): AttributeMap | undefined { + if (!attributes) return undefined; + + const truncatedAttributes: AttributeMap = {}; + + for (const [key, value] of Object.entries(attributes)) { + if (!key) continue; + + if (typeof value === "string") { + const override = matchKeyOverride(key, keyOverrides); + const limit = override ? override.limit : maximumLength; + truncatedAttributes[key] = truncateAndDetectUnpairedSurrogate(value, limit); + } else { + truncatedAttributes[key] = value; + } + } + + return truncatedAttributes; +} + +/** + * Backstop applied after per-attribute truncation. If `JSON.stringify(attrs)` + * is still over `maxBytes`, walk `AI_CONTENT_DROP_PRIORITY` and remove any + * attributes that match (by `key === prefix` or `key.startsWith(prefix + ".")`) + * until the assembled size is under budget or the list is exhausted. + * + * Returns the original `attributes` reference unchanged when already under + * budget; otherwise returns a new object with the offending keys removed. + * + * If the size is still over budget after exhausting the drop list, calls + * `onResidualOverflow` (if provided) with the remaining size so the caller + * can log it. Downstream protection lives in + * `DynamicFlushScheduler.tryFlush`'s batch-split branch. + */ +export function capAssembledAttributesSize( + attributes: AttributeMap | undefined, + maxBytes: number, + onResidualOverflow?: (info: { remainingBytes: number; maxBytes: number }) => void +): AttributeMap { + if (!attributes) return {}; + if (maxBytes <= 0) return attributes; + + let serialized = JSON.stringify(attributes); + if (serialized.length <= maxBytes) return attributes; + + const result: AttributeMap = { ...attributes }; + + for (const prefix of AI_CONTENT_DROP_PRIORITY) { + for (const key of Object.keys(result)) { + if (key === prefix || key.startsWith(prefix + ".")) { + delete result[key]; + } + } + serialized = JSON.stringify(result); + if (serialized.length <= maxBytes) return result; + } + + onResidualOverflow?.({ remainingBytes: serialized.length, maxBytes }); + return result; +} + +function truncateAndDetectUnpairedSurrogate(str: string, maximumLength: number): string { + const truncatedString = smartTruncateString(str, maximumLength); + + if (hasUnpairedSurrogateAtEnd(truncatedString)) { + return smartTruncateString(truncatedString, [...truncatedString].length - 1); + } + + return truncatedString; +} + +const ASCII_ONLY_REGEX = /^[\p{ASCII}]*$/u; + +function smartTruncateString(str: string, maximumLength: number): string { + if (!str) return ""; + if (str.length <= maximumLength) return str; + + const checkLength = Math.min(str.length, maximumLength * 2 + 2); + + if (ASCII_ONLY_REGEX.test(str.slice(0, checkLength))) { + return str.slice(0, maximumLength); + } + + return [...str.slice(0, checkLength)].slice(0, maximumLength).join(""); +} + +function hasUnpairedSurrogateAtEnd(str: string): boolean { + if (str.length === 0) return false; + + const lastCode = str.charCodeAt(str.length - 1); + + // Check if last character is an unpaired high surrogate + if (lastCode >= 0xd800 && lastCode <= 0xdbff) { + return true; // High surrogate at end = unpaired + } + + // Check if last character is an unpaired low surrogate + if (lastCode >= 0xdc00 && lastCode <= 0xdfff) { + // Low surrogate is only valid if preceded by high surrogate + if (str.length === 1) return true; // Single low surrogate + + const secondLastCode = str.charCodeAt(str.length - 2); + if (secondLastCode < 0xd800 || secondLastCode > 0xdbff) { + return true; // Low surrogate not preceded by high surrogate + } + } + + return false; +} diff --git a/apps/webapp/app/v3/otlpExporter.server.ts b/apps/webapp/app/v3/otlpExporter.server.ts index 7505693e3a..f4cfbd9c9a 100644 --- a/apps/webapp/app/v3/otlpExporter.server.ts +++ b/apps/webapp/app/v3/otlpExporter.server.ts @@ -41,6 +41,12 @@ import { waitForLlmPricingReady } from "./llmPricingRegistry.server"; import { env } from "~/env.server"; import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; import { singleton } from "~/utils/singleton"; +import { + AI_CONTENT_KEY_OVERRIDES, + capAssembledAttributesSize, + truncateAttributes, + type SpanAttributeLimits, +} from "./otlpAttributeLimits"; class OTLPExporter { private _tracer: Tracer; @@ -51,7 +57,7 @@ class OTLPExporter { private readonly _clickhouseEventRepositoryV2: ClickhouseEventRepository, private readonly _metricsFlushScheduler: DynamicFlushScheduler, private readonly _verbose: boolean, - private readonly _spanAttributeValueLengthLimit: number + private readonly _attributeLimits: SpanAttributeLimits ) { this._tracer = trace.getTracer("otlp-exporter"); } @@ -62,7 +68,7 @@ class OTLPExporter { const eventsWithStores = this.#filterResourceSpans(request.resourceSpans).flatMap( (resourceSpan) => { - return convertSpansToCreateableEvents(resourceSpan, this._spanAttributeValueLengthLimit); + return convertSpansToCreateableEvents(resourceSpan, this._attributeLimits); } ); @@ -82,7 +88,7 @@ class OTLPExporter { (resourceMetrics) => { return convertMetricsToClickhouseRows( resourceMetrics, - this._spanAttributeValueLengthLimit + this._attributeLimits.defaultValueLengthLimit ); } ); @@ -103,7 +109,7 @@ class OTLPExporter { const eventsWithStores = this.#filterResourceLogs(request.resourceLogs).flatMap( (resourceLog) => { - return convertLogsToCreateableEvents(resourceLog, this._spanAttributeValueLengthLimit); + return convertLogsToCreateableEvents(resourceLog, this._attributeLimits); } ); @@ -250,12 +256,13 @@ class OTLPExporter { function convertLogsToCreateableEvents( resourceLog: ResourceLogs, - spanAttributeValueLengthLimit: number + attributeLimits: SpanAttributeLimits ): { events: Array; taskEventStore: string } { const resourceAttributes = resourceLog.resource?.attributes ?? []; const resourceProperties = extractEventProperties(resourceAttributes); + // Resource attributes don't carry AI SDK content; no per-key overrides needed. const userDefinedResourceAttributes = truncateAttributes( convertKeyValueItemsToMap(resourceAttributes ?? [], [], undefined, [ SemanticInternalAttributes.USAGE, @@ -271,7 +278,7 @@ function convertLogsToCreateableEvents( "cli", "cloud", ]), - spanAttributeValueLengthLimit + attributeLimits.defaultValueLengthLimit ); const taskEventStore = @@ -292,7 +299,7 @@ function convertLogsToCreateableEvents( SemanticInternalAttributes.METADATA ); - const properties = + const properties = capAssembledAttributesSize( truncateAttributes( convertKeyValueItemsToMap(log.attributes ?? [], [], undefined, [ SemanticInternalAttributes.USAGE, @@ -302,8 +309,11 @@ function convertLogsToCreateableEvents( SemanticInternalAttributes.METRIC_EVENTS, SemanticInternalAttributes.TRIGGER, ]), - spanAttributeValueLengthLimit - ) ?? {}; + attributeLimits.defaultValueLengthLimit, + AI_CONTENT_KEY_OVERRIDES(attributeLimits.aiContentValueLengthLimit) + ) ?? {}, + attributeLimits.totalAttributesLengthLimit + ); return { traceId: binaryToHex(log.traceId), @@ -351,12 +361,13 @@ function convertLogsToCreateableEvents( function convertSpansToCreateableEvents( resourceSpan: ResourceSpans, - spanAttributeValueLengthLimit: number + attributeLimits: SpanAttributeLimits ): { events: Array; taskEventStore: string } { const resourceAttributes = resourceSpan.resource?.attributes ?? []; const resourceProperties = extractEventProperties(resourceAttributes); + // Resource attributes don't carry AI SDK content; no per-key overrides needed. const userDefinedResourceAttributes = truncateAttributes( convertKeyValueItemsToMap(resourceAttributes ?? [], [], undefined, [ SemanticInternalAttributes.USAGE, @@ -372,7 +383,7 @@ function convertSpansToCreateableEvents( "cli", "cloud", ]), - spanAttributeValueLengthLimit + attributeLimits.defaultValueLengthLimit ); const taskEventStore = @@ -395,7 +406,7 @@ function convertSpansToCreateableEvents( const runTags = extractArrayAttribute(span.attributes ?? [], SemanticInternalAttributes.RUN_TAGS); - const properties = + const properties = capAssembledAttributesSize( truncateAttributes( convertKeyValueItemsToMap(span.attributes ?? [], [], undefined, [ SemanticInternalAttributes.USAGE, @@ -405,8 +416,11 @@ function convertSpansToCreateableEvents( SemanticInternalAttributes.METRIC_EVENTS, SemanticInternalAttributes.TRIGGER, ]), - spanAttributeValueLengthLimit - ) ?? {}; + attributeLimits.defaultValueLengthLimit, + AI_CONTENT_KEY_OVERRIDES(attributeLimits.aiContentValueLengthLimit) + ) ?? {}, + attributeLimits.totalAttributesLengthLimit + ); return { traceId: binaryToHex(span.traceId), @@ -469,7 +483,7 @@ function floorToTenSecondBucket(timeUnixNano: bigint | number): string { function convertMetricsToClickhouseRows( resourceMetrics: ResourceMetrics, - spanAttributeValueLengthLimit: number + _spanAttributeValueLengthLimit: number ): MetricsV1Input[] { const resourceAttributes = resourceMetrics.resource?.attributes ?? []; const resourceProperties = extractEventProperties(resourceAttributes); @@ -1100,76 +1114,6 @@ function binaryToHex(buffer: Buffer | string | undefined): string | undefined { return Buffer.from(Array.from(buffer)).toString("hex"); } -function truncateAttributes( - attributes: Record | undefined, - maximumLength: number = 1024 -): Record | undefined { - if (!attributes) return undefined; - - const truncatedAttributes: Record = {}; - - for (const [key, value] of Object.entries(attributes)) { - if (!key) continue; - - if (typeof value === "string") { - truncatedAttributes[key] = truncateAndDetectUnpairedSurrogate(value, maximumLength); - } else { - truncatedAttributes[key] = value; - } - } - - return truncatedAttributes; -} - -function truncateAndDetectUnpairedSurrogate(str: string, maximumLength: number): string { - const truncatedString = smartTruncateString(str, maximumLength); - - if (hasUnpairedSurrogateAtEnd(truncatedString)) { - return smartTruncateString(truncatedString, [...truncatedString].length - 1); - } - - return truncatedString; -} - -const ASCII_ONLY_REGEX = /^[\p{ASCII}]*$/u; - -function smartTruncateString(str: string, maximumLength: number): string { - if (!str) return ""; - if (str.length <= maximumLength) return str; - - const checkLength = Math.min(str.length, maximumLength * 2 + 2); - - if (ASCII_ONLY_REGEX.test(str.slice(0, checkLength))) { - return str.slice(0, maximumLength); - } - - return [...str.slice(0, checkLength)].slice(0, maximumLength).join(""); -} - -function hasUnpairedSurrogateAtEnd(str: string): boolean { - if (str.length === 0) return false; - - const lastCode = str.charCodeAt(str.length - 1); - - // Check if last character is an unpaired high surrogate - if (lastCode >= 0xd800 && lastCode <= 0xdbff) { - return true; // High surrogate at end = unpaired - } - - // Check if last character is an unpaired low surrogate - if (lastCode >= 0xdc00 && lastCode <= 0xdfff) { - // Low surrogate is only valid if preceded by high surrogate - if (str.length === 1) return true; // Single low surrogate - - const secondLastCode = str.charCodeAt(str.length - 2); - if (secondLastCode < 0xd800 || secondLastCode > 0xdbff) { - return true; // Low surrogate not preceded by high surrogate - } - } - - return false; -} - export const otlpExporter = singleton("otlpExporter", initializeOTLPExporter); function initializeOTLPExporter() { @@ -1184,14 +1128,18 @@ function initializeOTLPExporter() { loadSheddingEnabled: false, }); + const attributeLimits: SpanAttributeLimits = { + defaultValueLengthLimit: env.SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT, + aiContentValueLengthLimit: env.SERVER_OTEL_AI_CONTENT_ATTRIBUTE_VALUE_LENGTH_LIMIT, + totalAttributesLengthLimit: env.SERVER_OTEL_SPAN_TOTAL_ATTRIBUTES_LENGTH_LIMIT, + }; + return new OTLPExporter( eventRepository, clickhouseEventRepository, clickhouseEventRepositoryV2, metricsFlushScheduler, process.env.OTLP_EXPORTER_VERBOSE === "1", - process.env.SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT - ? parseInt(process.env.SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT, 10) - : 8192 + attributeLimits ); } \ No newline at end of file diff --git a/apps/webapp/test/otlpAttributeLimits.test.ts b/apps/webapp/test/otlpAttributeLimits.test.ts new file mode 100644 index 0000000000..56d47757c0 --- /dev/null +++ b/apps/webapp/test/otlpAttributeLimits.test.ts @@ -0,0 +1,194 @@ +import { + AI_CONTENT_DROP_PRIORITY, + AI_CONTENT_KEY_OVERRIDES, + capAssembledAttributesSize, + truncateAttributes, +} from "~/v3/otlpAttributeLimits"; + +describe("truncateAttributes", () => { + it("truncates string values to the default cap", () => { + const out = truncateAttributes({ "user.message": "x".repeat(10_000) }, 100); + expect(typeof out?.["user.message"]).toBe("string"); + expect((out?.["user.message"] as string).length).toBeLessThanOrEqual(100); + }); + + it("leaves non-string values untouched", () => { + const out = truncateAttributes( + { + "user.count": 42, + "user.flag": true, + "user.missing": undefined, + }, + 100 + ); + expect(out?.["user.count"]).toBe(42); + expect(out?.["user.flag"]).toBe(true); + expect(out?.["user.missing"]).toBeUndefined(); + }); + + it("applies a per-key override to matching prefix and dotted children", () => { + const overrides = [{ prefix: "ai.prompt", limit: 32 }]; + const out = truncateAttributes( + { + "ai.prompt": "p".repeat(1000), + "ai.prompt.messages": "m".repeat(1000), + "ai.prompt.tools": "t".repeat(1000), + "ai.model.id": "claude-sonnet-4-20250514", + }, + 4096, + overrides + ); + expect((out?.["ai.prompt"] as string).length).toBeLessThanOrEqual(32); + expect((out?.["ai.prompt.messages"] as string).length).toBeLessThanOrEqual(32); + expect((out?.["ai.prompt.tools"] as string).length).toBeLessThanOrEqual(32); + // Non-matching key keeps the default cap (well below it). + expect(out?.["ai.model.id"]).toBe("claude-sonnet-4-20250514"); + }); + + it("first-matching override wins", () => { + const overrides = [ + { prefix: "ai.prompt", limit: 8 }, + { prefix: "ai.prompt.messages", limit: 1024 }, // shadowed + ]; + const out = truncateAttributes( + { "ai.prompt.messages": "x".repeat(500) }, + 4096, + overrides + ); + expect((out?.["ai.prompt.messages"] as string).length).toBeLessThanOrEqual(8); + }); + + it("does not match a key that only shares a prefix substring (no dot boundary)", () => { + const overrides = [{ prefix: "ai.prompt", limit: 8 }]; + const out = truncateAttributes( + // "ai.prompts" should NOT match "ai.prompt"; the override requires + // exact match or a dot boundary. + { "ai.prompts": "x".repeat(500) }, + 4096, + overrides + ); + expect((out?.["ai.prompts"] as string).length).toBe(500); + }); + + it("returns undefined when input is undefined", () => { + expect(truncateAttributes(undefined, 100)).toBeUndefined(); + }); +}); + +describe("AI_CONTENT_KEY_OVERRIDES", () => { + it("targets prompt content keys but not cost / model metadata", () => { + const overrides = AI_CONTENT_KEY_OVERRIDES(1024); + const prefixes = overrides.map((o) => o.prefix); + + // Content keys we want capped. + expect(prefixes).toContain("ai.prompt"); + expect(prefixes).toContain("ai.response.text"); + expect(prefixes).toContain("ai.response.object"); + expect(prefixes).toContain("ai.response.toolCalls"); + expect(prefixes).toContain("gen_ai.prompt"); + expect(prefixes).toContain("gen_ai.completion"); + expect(prefixes).toContain("gen_ai.request.messages"); + + // Cost/model metadata MUST NOT be in the override list — those keys + // feed enrichCreatableEvents and the dashboard's LLM pills. + expect(prefixes).not.toContain("ai.usage"); + expect(prefixes).not.toContain("ai.model"); + expect(prefixes).not.toContain("ai.operationId"); + expect(prefixes).not.toContain("ai.settings"); + expect(prefixes).not.toContain("ai.telemetry"); + expect(prefixes).not.toContain("gen_ai.usage"); + expect(prefixes).not.toContain("gen_ai.response.model"); + expect(prefixes).not.toContain("gen_ai.request.model"); + expect(prefixes).not.toContain("gen_ai.system"); + expect(prefixes).not.toContain("gen_ai.operation.name"); + + // Every override carries the limit we passed in. + for (const o of overrides) { + expect(o.limit).toBe(1024); + } + }); +}); + +describe("capAssembledAttributesSize", () => { + it("is a no-op when input is already under budget", () => { + const input = { + "ai.prompt.messages": "small", + "ai.usage.input_tokens": 10, + }; + const out = capAssembledAttributesSize(input, 4096); + expect(out).toEqual(input); + }); + + it("returns an empty object when input is undefined", () => { + expect(capAssembledAttributesSize(undefined, 4096)).toEqual({}); + }); + + it("drops AI content keys in priority order until under budget", () => { + // Build a payload where the AI content alone overflows 2KB but cost + // metadata fits comfortably. After dropping `ai.prompt.messages` the + // remaining payload is well under budget, so subsequent priority + // entries should NOT be dropped. + const input = { + "ai.prompt.messages": "x".repeat(4000), + "ai.response.text": "y", + "ai.response.object": "z", + "ai.usage.input_tokens": 100, + "ai.usage.output_tokens": 50, + "gen_ai.response.model": "claude-sonnet-4-20250514", + "ai.model.id": "claude-sonnet-4-20250514", + } as Record; + + const out = capAssembledAttributesSize(input, 2048); + + expect(out["ai.prompt.messages"]).toBeUndefined(); + // Cost / model metadata is preserved. + expect(out["ai.usage.input_tokens"]).toBe(100); + expect(out["ai.usage.output_tokens"]).toBe(50); + expect(out["gen_ai.response.model"]).toBe("claude-sonnet-4-20250514"); + expect(out["ai.model.id"]).toBe("claude-sonnet-4-20250514"); + // Lower-priority content keys that weren't needed to fit the budget + // are preserved. + expect(out["ai.response.text"]).toBe("y"); + expect(out["ai.response.object"]).toBe("z"); + expect(JSON.stringify(out).length).toBeLessThanOrEqual(2048); + }); + + it("drops keys with dotted children matching a priority prefix", () => { + const input = { + "ai.prompt": "p".repeat(1000), + "ai.prompt.messages": "m".repeat(1000), + "ai.prompt.tools": "t".repeat(1000), + "ai.usage.input_tokens": 5, + } as Record; + + // Cap below the ai.prompt.* payload total so the whole prompt namespace + // gets dropped. + const out = capAssembledAttributesSize(input, 256); + + expect(out["ai.prompt"]).toBeUndefined(); + expect(out["ai.prompt.messages"]).toBeUndefined(); + expect(out["ai.prompt.tools"]).toBeUndefined(); + expect(out["ai.usage.input_tokens"]).toBe(5); + }); + + it("AI_CONTENT_DROP_PRIORITY puts highest-volume content first", () => { + // ai.prompt.messages is the heaviest in practice (full conversation + // history). It should be the first thing to go. + expect(AI_CONTENT_DROP_PRIORITY[0]).toBe("ai.prompt.messages"); + }); + + it("preserves non-AI attributes regardless of priority list", () => { + const input = { + "user.event": "y".repeat(5000), + "ai.prompt.messages": "x".repeat(5000), + } as Record; + + const out = capAssembledAttributesSize(input, 1024); + + expect(out["ai.prompt.messages"]).toBeUndefined(); + // User-defined non-AI attribute stays even though it's the only thing + // pushing the budget. The drop list only covers AI content prefixes — + // we don't silently shrink customer data. + expect(typeof out["user.event"]).toBe("string"); + }); +});