fix: Surface ingestion errors#1652
No actionable comments were generated in the recent review. 🎉
ℹ️ Recent review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
Walkthrough
Refactors staging writer and Stream to return Results, adds a CLI
Changes
Streaming error propagation and query optimization
Sequence Diagram(s)
sequenceDiagram
    participant ProducerTask as ProducerTask
    participant UnboundedSender as UnboundedSender
    participant UnboundedReceiverStream as UnboundedReceiverStream
    participant RecordBatchAdapter as RecordBatchStreamAdapter
    participant QueryExecutor as QueryExecutor
    ProducerTask->>UnboundedSender: tx.send(batch)
    UnboundedSender-->>UnboundedReceiverStream: deliver batch
    UnboundedReceiverStream->>RecordBatchAdapter: stream next batch
    RecordBatchAdapter->>QueryExecutor: supply RecordBatch
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks | ✅ 3 | ❌ 2
❌ Failed checks (2 warnings)
✅ Passed checks (3 passed)
🧹 Nitpick comments (1)
src/query/mod.rs (1)
325-349: ⚖️ Poor tradeoff
Unbounded channel can grow without limit under slow consumers.
Switching from a bounded channel to `unbounded_channel` removes backpressure. If the downstream consumer (e.g., HTTP response writer) stalls, batches will accumulate in memory until the DataFusion memory pool limit is hit. The existing memory pool provides some protection, but this could lead to OOM in edge cases where the pool check doesn't account for channel-buffered batches.
Consider whether the original bounded channel caused specific deadlock issues, or if a larger but still bounded buffer (e.g., `channel(1024)`) would provide sufficient throughput while maintaining backpressure.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/query/mod.rs` around lines 325 - 349, The unbounded mpsc channel (tx/rx with UnboundedReceiverStream) can let batches accumulate and OOM; replace it with a bounded tokio::sync::mpsc::channel with a configured capacity (e.g., 1024) to restore backpressure, change the spawned tasks to use the bounded Sender clone and await tx.send(batch).await (or handle TrySendError appropriately) and replace UnboundedReceiverStream::new(rx) with the appropriate bounded receiver wrapper (e.g., ReceiverStream::new(rx) / tokio_stream::wrappers::ReceiverStream) so sends await when the consumer is slow; keep using PartitionedMetricMonitor::new(...) and drop(tx) after spawning to close the channel when all producers finish.
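To make the backpressure point concrete, here is a minimal sketch using only the standard library. It is not the code in `src/query/mod.rs`: it swaps in `std::sync::mpsc::sync_channel` as a synchronous stand-in for a bounded `tokio::sync::mpsc::channel`, and the function names `run_bounded_pipeline` and `is_full_after` are hypothetical. The idea it illustrates is the same, though: once the buffer holds `capacity` items, the producer has to wait for the consumer instead of queueing more batches in memory.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Pushes `total` dummy batches through a bounded channel of `capacity`
/// and returns how many the consumer received.
/// (Hypothetical helper; `Vec<u8>` stands in for a record batch.)
fn run_bounded_pipeline(capacity: usize, total: usize) -> usize {
    let (tx, rx) = sync_channel::<Vec<u8>>(capacity);

    let producer = thread::spawn(move || {
        for i in 0..total {
            // `send` blocks while the buffer is full: this is the backpressure.
            if tx.send(vec![i as u8; 4]).is_err() {
                break; // receiver went away, stop producing
            }
        }
        // `tx` is dropped here, which closes the channel for the consumer.
    });

    let mut received = 0;
    for _batch in rx {
        received += 1;
    }
    producer.join().expect("producer thread panicked");
    received
}

/// Returns true if a channel of `capacity` rejects one more non-blocking
/// send after `capacity` items are already buffered.
fn is_full_after(capacity: usize) -> bool {
    let (tx, _rx) = sync_channel::<u32>(capacity);
    for i in 0..capacity {
        tx.try_send(i as u32).expect("buffer should still have room");
    }
    matches!(tx.try_send(0), Err(TrySendError::Full(_)))
}
```

Calling `run_bounded_pipeline(2, 10)` delivers all ten batches while never buffering more than two at a time. In the async version the review suggests, the blocking `send` would become an awaited send on the bounded tokio sender.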
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: e917816f-8ecc-4a9e-9ae8-6f3bec081236
📒 Files selected for processing (5)
src/cli.rs
src/parseable/staging/writer.rs
src/parseable/streams.rs
src/query/mod.rs
src/query/stream_schema_provider.rs
- DiskWriter and MemWriter: `expect` and `unwrap` calls replaced with returned `Result`s
- New CLI env var `P_DATAFUSION_TARGET_PARTITIONS` for controlling the number of partitions (default: number of CPUs / 2)
- Streaming response now uses an unbounded channel
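As an illustration of the default described for `P_DATAFUSION_TARGET_PARTITIONS`, the following is a rough sketch using only the standard library. It is not the code in `src/cli.rs`. The function names are hypothetical, and two behaviours are assumptions added here rather than taken from the PR: falling back to the default when the value is missing, unparsable, or zero, and clamping the default to at least 1.

```rust
use std::thread;

/// Name of the env var as given in the PR description.
const ENV_VAR: &str = "P_DATAFUSION_TARGET_PARTITIONS";

/// Picks the partition count from an optional raw value, falling back to
/// half the CPU count. (Hypothetical helper; fallback rules are assumptions.)
fn resolve_target_partitions(raw: Option<&str>, cpus: usize) -> usize {
    raw.and_then(|v| v.trim().parse::<usize>().ok())
        .filter(|&n| n > 0) // assumption: zero partitions is treated as unset
        .unwrap_or_else(|| (cpus / 2).max(1)) // assumption: never go below 1
}

/// Reads the env var and the machine's parallelism, then resolves the value.
fn target_partitions_from_env() -> usize {
    let cpus = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    let raw = std::env::var(ENV_VAR).ok();
    resolve_target_partitions(raw.as_deref(), cpus)
}
```

Keeping the parsing in a pure function that takes the raw value and CPU count as arguments makes the default easy to check without touching the process environment.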
2e182cd to 535bf40
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/parseable/streams.rs`:
- Around line 534-536: The recordbatches_cloned method currently calls
self.writer.lock().unwrap() which will panic on a poisoned lock; change it to
handle PoisonError like the push method does by using lock().map_err(|e|
StagingError::from_lock_poison(e)) (or the same conversion used in push) and
propagate a StagingError instead of unwrapping, then call
mem.recordbatch_cloned(schema) on the guarded value; reference:
recordbatches_cloned, push, writer.lock().unwrap(), mem.recordbatch_cloned and
StagingError to ensure consistent lock-error handling.
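The lock-handling change requested above can be sketched roughly as follows. This is a standalone illustration, not the code in `src/parseable/streams.rs`: the `Stream` struct, the `StagingError::LockPoisoned` variant, and the `poison` helper are hypothetical stand-ins, and a `Vec<String>` takes the place of the real record batches. It shows `lock().map_err(...)` turning a poisoned mutex into an error the caller can handle instead of a panic.

```rust
use std::fmt;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical error type; the real `StagingError` may look different.
#[derive(Debug, PartialEq)]
enum StagingError {
    LockPoisoned(String),
}

impl fmt::Display for StagingError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StagingError::LockPoisoned(msg) => write!(f, "writer lock poisoned: {msg}"),
        }
    }
}

/// Simplified stand-in for the stream holding a mutex-guarded writer.
struct Stream {
    writer: Mutex<Vec<String>>,
}

impl Stream {
    /// Clones the buffered items, returning an error on a poisoned lock
    /// rather than unwrapping.
    fn recordbatches_cloned(&self) -> Result<Vec<String>, StagingError> {
        let guard = self
            .writer
            .lock()
            .map_err(|e| StagingError::LockPoisoned(e.to_string()))?;
        Ok(guard.clone())
    }
}

/// Poisons the writer lock by panicking in another thread while holding it.
fn poison(stream: &Arc<Stream>) {
    let s = Arc::clone(stream);
    let _ = thread::spawn(move || {
        let _guard = s.writer.lock().unwrap();
        panic!("simulated failure while holding the lock");
    })
    .join();
}
```

After `poison(&stream)` runs, `recordbatches_cloned` returns `Err(StagingError::LockPoisoned(_))` where a bare `lock().unwrap()` would have panicked the calling thread.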
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 92f7934e-4704-49d7-b206-d93549f5a259
📒 Files selected for processing (5)
src/cli.rs
src/parseable/staging/writer.rs
src/parseable/streams.rs
src/query/mod.rs
src/query/stream_schema_provider.rs
🧹 Nitpick comments (1)
src/handlers/airplane.rs (1)
236-239: ⚡ Quick win
Log errors from staging clear instead of silently ignoring them.
The `clear()` method now returns `Result<(), StagingError>`, but this code ignores failures via `let _ = ...`. If clearing staging fails (e.g., due to lock poisoning), temporary events remain in memory without any visibility, potentially causing memory growth or stale data to affect subsequent queries.
📝 Proposed fix to log clear errors
 if event.is_some() {
     // Clear staging of stream once airplane has taxied
-    let _ = PARSEABLE.get_or_create_stream(&stream_name, &None).clear();
+    if let Err(e) = PARSEABLE.get_or_create_stream(&stream_name, &None).clear() {
+        error!("Failed to clear staging for stream {}: {}", stream_name, e);
+    }
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/handlers/airplane.rs` around lines 236 - 239, The code currently ignores errors from PARSEABLE.get_or_create_stream(&stream_name, &None).clear() when event.is_some(), which can hide failures; change this to handle the Result by matching or using .map_err/.unwrap_or_else to log any StagingError via the existing logger (or create one) instead of discarding it—locate the clear() call inside the conditional near event.is_some() and replace the let _ = ... with code that captures the Result and calls something like logger.error or tracing::error with context including stream_name and the error so clear failures are visible.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: f808ab35-78e6-4d8b-9365-e6dfc270eac7
📒 Files selected for processing (2)
src/handlers/airplane.rs
src/parseable/streams.rs
`P_DATAFUSION_TARGET_PARTITIONS` for controlling number of partitions (default num cpu / 2)
Fixes #XXXX.