diff --git a/src/cli.rs b/src/cli.rs index 50f1202c9..d6664ff2e 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -17,7 +17,7 @@ */ use clap::{Parser, value_parser}; -use std::{env, fs, path::PathBuf}; +use std::{env, fs, ops::Div, path::PathBuf}; use url::Url; @@ -194,6 +194,16 @@ pub struct Options { )] pub max_connections: usize, + // DataFusion target partitions + #[arg( + long, + env = "P_DATAFUSION_TARGET_PARTITIONS", + default_value_t = num_cpus::get().div(2).max(1) as u64, + value_parser = value_parser!(u64).range(1..), + help = "Number of partitions for DF to split execution into" + )] + pub target_partitions: u64, + #[arg( long = "origin", env = "P_ORIGIN_URI", diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index 88b4955ae..8352ea3f9 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -235,7 +235,7 @@ impl FlightService for AirServiceImpl { if event.is_some() { // Clear staging of stream once airplane has taxied - PARSEABLE.get_or_create_stream(&stream_name, &None).clear(); + let _ = PARSEABLE.get_or_create_stream(&stream_name, &None).clear(); } let time = time.elapsed().as_secs_f64(); diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index 56a059d9f..ec58800a3 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -135,16 +135,17 @@ impl Default for MemWriter { } impl MemWriter { - pub fn push(&mut self, schema_key: &str, rb: &RecordBatch) { + pub fn push(&mut self, schema_key: &str, rb: &RecordBatch) -> Result<(), StagingError> { if !self.schema_map.contains(schema_key) { self.schema_map.insert(schema_key.to_owned()); - self.schema = Schema::try_merge([self.schema.clone(), (*rb.schema()).clone()]).unwrap(); + self.schema = Schema::try_merge([self.schema.clone(), (*rb.schema()).clone()])?; } if let Some(record) = self.mutable_buffer.push(rb) { - let record = concat_records(&Arc::new(self.schema.clone()), &record); + let record = concat_records(&Arc::new(self.schema.clone()), &record)?; self.read_buffer.push(record); } + Ok(()) } pub fn clear(&mut self) { @@ -154,23 +155,29 @@ impl MemWriter { self.mutable_buffer.inner.clear(); } - pub fn recordbatch_cloned(&self, schema: &Arc) -> Vec { + pub fn recordbatch_cloned( + &self, + schema: &Arc, + ) -> Result, StagingError> { let mut read_buffer = self.read_buffer.clone(); if !self.mutable_buffer.inner.is_empty() { - let rb = concat_records(schema, &self.mutable_buffer.inner); + let rb = concat_records(schema, &self.mutable_buffer.inner)?; read_buffer.push(rb) } - read_buffer + Ok(read_buffer .into_iter() .map(|rb| adapt_batch(schema, &rb)) - .collect() + .collect()) } } -fn concat_records(schema: &Arc, record: &[RecordBatch]) -> RecordBatch { +fn concat_records( + schema: &Arc, + record: &[RecordBatch], +) -> Result { let records = record.iter().map(|x| adapt_batch(schema, x)).collect_vec(); - concat_batches(schema, records.iter()).unwrap() + Ok(concat_batches(schema, records.iter())?) } #[derive(Debug, Default)] diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 6c75dec75..8c1e55115 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -186,8 +186,7 @@ impl Stream { OBJECT_STORE_DATA_GRANULARITY, ); let file_path = self.data_path.join(&filename); - let mut writer = DiskWriter::try_new(file_path, &record.schema(), range) - .expect("File and RecordBatch both are checked"); + let mut writer = DiskWriter::try_new(file_path, &record.schema(), range)?; writer.write(record)?; guard.disk.insert(filename, writer); @@ -195,7 +194,7 @@ impl Stream { }; } - guard.mem.push(schema_key, record); + guard.mem.push(schema_key, record)?; Ok(()) } @@ -532,26 +531,46 @@ impl Stream { Ok(()) } - pub fn recordbatches_cloned(&self, schema: &Arc) -> Vec { - self.writer.lock().unwrap().mem.recordbatch_cloned(schema) - } - - pub fn clear(&self) { - self.writer.lock().unwrap().mem.clear(); + pub fn recordbatches_cloned( + &self, + schema: &Arc, + ) -> Result, StagingError> { + let writer = self.writer.lock().map_err(|poisoned| { + StagingError::PoisonError(PoisonError::new(format!( + "Writer lock poisoned while cloning record batches for stream {} - {}", + self.stream_name, poisoned + ))) + })?; + + writer.mem.recordbatch_cloned(schema) + } + + pub fn clear(&self) -> Result<(), StagingError> { + self.writer + .lock() + .map_err(|poisoned| { + StagingError::PoisonError(PoisonError::new(format!( + "Writer lock poisoned while clearing stream {} - {}", + self.stream_name, poisoned + ))) + })? + .mem + .clear(); + Ok(()) } - pub fn flush(&self, forced: bool) { + pub fn flush(&self, forced: bool) -> Result<(), StagingError> { let _span = info_span!("flush", stream_name = %self.stream_name, forced).entered(); // Swap out stale writers under the lock, drop them after releasing it. // DiskWriter::Drop does I/O (IPC finish + file rename) so dropping // outside the lock avoids blocking concurrent push() calls. let stale_writers = { - let mut writer = self.writer.lock().unwrap_or_else(|_| { - panic!( - "Writer lock poisoned while flushing data for stream {}", - self.stream_name - ) - }); + let mut writer = self.writer.lock().map_err(|poisoned| { + StagingError::PoisonError(PoisonError::new(format!( + "Writer lock poisoned while flushing data for stream {} - {}", + self.stream_name, poisoned + ))) + })?; writer.mem.clear(); let mut old_disk = HashMap::new(); @@ -567,6 +586,7 @@ impl Stream { }; // DiskWriter::Drop I/O happens here, outside the lock drop(stale_writers); + Ok(()) } fn parquet_writer_props( @@ -1306,7 +1326,7 @@ impl Stream { // Force flush for init or shutdown signals to convert all .part files to .arrows // For regular cycles, use false to only flush non-current writers let forced = init_signal || shutdown_signal; - self.flush(forced); + self.flush(forced)?; info!( "Flushing stream ({}) took: {}s", self.stream_name, @@ -1717,7 +1737,7 @@ mod tests { StreamType::UserDefined, ) .unwrap(); - staging.flush(true); + staging.flush(true).unwrap(); } #[test] diff --git a/src/query/mod.rs b/src/query/mod.rs index 2cc49b068..effa5e44f 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -43,6 +43,7 @@ use datafusion::sql::parser::DFParser; use datafusion::sql::resolve::resolve_table_references; use datafusion::sql::sqlparser::dialect::PostgreSqlDialect; use futures::Stream; +use futures::StreamExt; use itertools::Itertools; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; @@ -54,7 +55,7 @@ use std::sync::{Arc, RwLock}; use std::task::{Context, Poll}; use sysinfo::System; use tokio::runtime::Runtime; -use tokio_stream::wrappers::ReceiverStream; +use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::Instrument; use self::error::ExecuteError; @@ -217,7 +218,8 @@ impl Query { .with_prefer_existing_sort(true) //batch size has been made configurable via environment variable //default value is 20000 - .with_batch_size(PARSEABLE.options.execution_batch_size); + .with_batch_size(PARSEABLE.options.execution_batch_size) + .with_target_partitions(PARSEABLE.options.target_partitions as usize); // Pushdown filters allows DF to push the filters as far down in the plan as possible // and thus, reducing the number of rows decoded @@ -226,10 +228,22 @@ impl Query { // Reorder filters allows DF to decide the order of filters minimizing the cost of filter evaluation config.options_mut().execution.parquet.reorder_filters = true; config.options_mut().execution.parquet.binary_as_string = true; + // Bump footer-read hint from the 512 KiB default. Streams with + // many label columns + page-indexed value columns can have + // parquet footers in the 1-2 MiB range; sizing the hint above + // the actual footer collapses the two-read fallback into a + // single GET per file, saving a round trip on every file open. + config.options_mut().execution.parquet.metadata_size_hint = Some(2 * 1024 * 1024); config .options_mut() .execution .use_row_number_estimates_to_optimize_partitioning = true; + config.options_mut().execution.parquet.enable_page_index = true; + config + .options_mut() + .execution + .parquet + .max_predicate_cache_size = Some(1024 * 1024 * 1024); //adding this config as it improves query performance as explained here - // https://github.com/apache/datafusion/pull/13101 @@ -308,13 +322,12 @@ impl Query { }); let partition_streams = execute_stream_partitioned(plan.clone(), task_ctx.clone())?; - let n = partition_streams.len(); - // Bound channel so a slow consumer backpressures producers — caps peak memory. - let (tx, rx) = tokio::sync::mpsc::channel::< + + let (tx, rx) = tokio::sync::mpsc::unbounded_channel::< Result, - >((num_cpus::get() * 4).max(n * 2).max(1)); + >(); - for s in partition_streams { + for s in partition_streams.into_iter() { let wrapped = PartitionedMetricMonitor::new(s, monitor_state.clone(), tenant_id.clone()); let tx = tx.clone(); @@ -322,9 +335,8 @@ impl Query { tokio::spawn( async move { let mut stream: SendableRecordBatchStream = Box::pin(wrapped); - use futures::StreamExt; while let Some(batch) = stream.next().await { - if tx.send(batch).await.is_err() { + if tx.send(batch).is_err() { break; } } @@ -334,7 +346,7 @@ impl Query { } drop(tx); - let merged = ReceiverStream::new(rx); + let merged = UnboundedReceiverStream::new(rx); let final_stream = RecordBatchStreamAdapter::new(plan.schema(), merged); Either::Right(Box::pin(final_stream) as SendableRecordBatchStream) }; diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index 97afa1cfb..2f4308a32 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -150,14 +150,19 @@ impl StandardTableProvider { // parquet file source, default table parquet options let file_source = if let Some(phyiscal_expr) = filters { - ParquetSource::new(self.schema.clone()).with_predicate(phyiscal_expr) + ParquetSource::new(self.schema.clone()) + .with_predicate(phyiscal_expr) + .with_pushdown_filters(true) + .with_reorder_filters(true) } else { ParquetSource::new(self.schema.clone()) + .with_pushdown_filters(true) + .with_reorder_filters(true) }; let mut conf_builder = FileScanConfigBuilder::new(object_store_url, file_source.into()) .with_statistics(statistics) - .with_batch_size(Some(20000)) + .with_batch_size(Some(PARSEABLE.options.execution_batch_size)) .with_constraints(Constraints::default()) .with_file_groups(file_groups) .with_output_ordering(vec![LexOrdering::new([sort_expr]).unwrap()]); @@ -253,7 +258,9 @@ impl StandardTableProvider { }; // Staging arrow exection plan - let records = staging.recordbatches_cloned(&self.schema); + let records = staging + .recordbatches_cloned(&self.schema) + .map_err(|e| DataFusionError::Internal(format!("Error during staging exec- {e}")))?; let arrow_exec = reversed_mem_table(records, self.schema.clone())? .scan(state, projection, filters, limit) .await?;