@@ -18,6 +18,24 @@ export type DynamicFlushSchedulerConfig<T> = {
1818 isDroppableEvent ?: ( item : T ) => boolean ; // Function to determine if an event can be dropped
1919} ;
2020
21+ // Bound on the recursive batch-split safety net. 8 → up to 256-way split,
22+ // which narrows a bad row to a ⌈batchSize / 256⌉-row leaf (~40 rows for a
23+ // 10k batch). Reaching a true singleton would need ⌈log2(batchSize)⌉ levels;
24+ // instead, once this depth is exhausted we drop the whole leaf (see the
25+ // parse-error handling in tryFlush). Each split level only adds one extra
26+ // failing ClickHouse call on the bad-row path, so worst-case latency is
27+ // bounded.
28+ const MAX_SPLIT_DEPTH = 8 ;
29+
30+ function isClickHouseJsonParseError ( error : unknown ) : boolean {
31+ if ( ! error ) return false ;
32+ const message =
33+ typeof error === "object" && error !== null && "message" in error
34+ ? String ( ( error as { message ?: unknown } ) . message ?? "" )
35+ : String ( error ) ;
36+ return message . includes ( "Cannot parse JSON object" ) ;
37+ }
38+
2139export class DynamicFlushScheduler < T > {
2240 private batchQueue : T [ ] [ ] ;
2341 private currentBatch : T [ ] ;
@@ -43,6 +61,10 @@ export class DynamicFlushScheduler<T> {
4361 totalItemsFlushed : 0 ,
4462 droppedEvents : 0 ,
4563 droppedEventsByKind : new Map < string , number > ( ) ,
64+ // Rows dropped at flush time because ClickHouse rejected them and the
65+ // batch-split safety net couldn't isolate them further. Distinct from
66+ // `droppedEvents`, which counts pre-batch load-shedding drops.
67+ droppedRows : 0 ,
4668 } ;
4769 private isShuttingDown : boolean = false ;
4870
@@ -196,40 +218,115 @@ export class DynamicFlushScheduler<T> {
196218 // Schedule all batches for concurrent processing
197219 const flushPromises = batchesToFlush . map ( ( batch ) =>
198220 this . limiter ( async ( ) => {
199- const itemCount = batch . length ;
200-
201221 const self = this ;
202222
203- async function tryFlush ( flushId : string , batchToFlush : T [ ] , attempt : number = 1 ) {
223+ async function tryFlush (
224+ flushId : string ,
225+ batchToFlush : T [ ] ,
226+ attempt : number = 1 ,
227+ splitDepth : number = 0
228+ ) {
229+ const subBatchSize = batchToFlush . length ;
230+
204231 try {
205232 const startTime = Date . now ( ) ;
206233 await self . callback ( flushId , batchToFlush ) ;
207234
208235 const duration = Date . now ( ) - startTime ;
209- self . totalQueuedItems -= itemCount ;
236+ self . totalQueuedItems -= subBatchSize ;
210237 self . consecutiveFlushFailures = 0 ;
211238 self . lastFlushTime = Date . now ( ) ;
212239 self . metrics . flushedBatches ++ ;
213- self . metrics . totalItemsFlushed += itemCount ;
240+ self . metrics . totalItemsFlushed += subBatchSize ;
214241
215242 self . logger . debug ( "Batch flushed successfully" , {
216243 flushId,
217- itemCount,
244+ itemCount : subBatchSize ,
218245 duration,
219246 remainingQueueDepth : self . totalQueuedItems ,
220247 activeConcurrency : self . limiter . activeCount ,
221248 pendingConcurrency : self . limiter . pendingCount ,
222249 } ) ;
223250 } catch ( error ) {
251+ // ClickHouse rejects an entire batch when a single row's
252+ // attributes JSON is unparseable. Retrying the same batch will
253+ // just fail again, so split-and-retry isolates the offender
254+ // instead of poisoning the whole 5–10k-row batch.
255+ const isParseError = isClickHouseJsonParseError ( error ) ;
256+
257+ if ( isParseError && subBatchSize > 1 && splitDepth < MAX_SPLIT_DEPTH ) {
258+ const mid = Math . floor ( subBatchSize / 2 ) ;
259+ const left = batchToFlush . slice ( 0 , mid ) ;
260+ const right = batchToFlush . slice ( mid ) ;
261+
262+ self . logger . warn (
263+ "Splitting OTel batch after ClickHouse JSON parse failure" ,
264+ {
265+ flushId,
266+ itemCount : subBatchSize ,
267+ splitDepth,
268+ leftSize : left . length ,
269+ rightSize : right . length ,
270+ }
271+ ) ;
272+
273+ // Run halves concurrently and tolerate independent failures —
274+ // a rejection from one half must not prevent the other half
275+ // from completing. Each leaf's tryFlush updates totalQueuedItems
276+ // and metrics on its own success/drop paths.
277+ const results = await Promise . allSettled ( [
278+ tryFlush ( flushId + "-L" , left , 1 , splitDepth + 1 ) ,
279+ tryFlush ( flushId + "-R" , right , 1 , splitDepth + 1 ) ,
280+ ] ) ;
281+
282+ for ( const [ index , result ] of results . entries ( ) ) {
283+ if ( result . status === "rejected" ) {
284+ self . metrics . failedBatches ++ ;
285+ self . logger . error (
286+ "Split half failed after exhausting retries" ,
287+ {
288+ flushId : flushId + ( index === 0 ? "-L" : "-R" ) ,
289+ error : result . reason ,
290+ splitDepth : splitDepth + 1 ,
291+ }
292+ ) ;
293+ }
294+ }
295+ return ;
296+ }
297+
298+ if ( isParseError && ( subBatchSize === 1 || splitDepth >= MAX_SPLIT_DEPTH ) ) {
299+ // Either a singleton ClickHouse still rejects, or a leaf we can
300+ // no longer split. Drop it so the rest of the queue keeps
301+ // flowing, decrement the queue counter, and log a 1KB sample of
302+ // the first row so the offender can be investigated later
303+ // without dumping multi-KB of attributes into the log.
304+ self . metrics . droppedRows += subBatchSize ;
305+ self . metrics . failedBatches ++ ;
306+ self . totalQueuedItems -= subBatchSize ;
307+ self . logger . error (
308+ "Dropping OTel rows rejected by ClickHouse JSON parser" ,
309+ {
310+ flushId,
311+ droppedCount : subBatchSize ,
312+ sample : JSON . stringify ( batchToFlush [ 0 ] ) . slice ( 0 , 1024 ) ,
313+ splitDepth,
314+ reason : subBatchSize === 1 ? "singleton" : "split_depth_exhausted" ,
315+ }
316+ ) ;
317+ return ;
318+ }
319+
224320 self . consecutiveFlushFailures ++ ;
225321 self . metrics . failedBatches ++ ;
226322
227323 self . logger . error ( "Error attempting to flush batch" , {
228324 flushId,
229- itemCount,
325+ itemCount : subBatchSize ,
230326 error,
231327 consecutiveFailures : self . consecutiveFlushFailures ,
232328 attempt,
329+ splitDepth,
233330 } ) ;
234331
235332 // Back off on failures
@@ -239,7 +336,7 @@ export class DynamicFlushScheduler<T> {
239336
240337 if ( attempt <= 3 ) {
241338 await new Promise ( ( resolve ) => setTimeout ( resolve , 500 ) ) ;
242- return await tryFlush ( flushId , batchToFlush , attempt + 1 ) ;
339+ return await tryFlush ( flushId , batchToFlush , attempt + 1 , splitDepth ) ;
243340 } else {
244341 throw error ;
245342 }
0 commit comments