Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

use clap::{Parser, value_parser};
use std::{env, fs, path::PathBuf};
use std::{env, fs, ops::Div, path::PathBuf};

use url::Url;

Expand Down Expand Up @@ -194,6 +194,16 @@ pub struct Options {
)]
pub max_connections: usize,

// Number of DataFusion target partitions; defaults to half the detected CPU count, with a minimum of 1
#[arg(
long,
env = "P_DATAFUSION_TARGET_PARTITIONS",
default_value_t = num_cpus::get().div(2).max(1) as u64,
value_parser = value_parser!(u64).range(1..),
help = "Number of partitions for DF to split execution into"
)]
pub target_partitions: u64,

#[arg(
long = "origin",
env = "P_ORIGIN_URI",
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ impl FlightService for AirServiceImpl {

if event.is_some() {
// Clear staging of stream once airplane has taxied
PARSEABLE.get_or_create_stream(&stream_name, &None).clear();
let _ = PARSEABLE.get_or_create_stream(&stream_name, &None).clear();
}

let time = time.elapsed().as_secs_f64();
Expand Down
25 changes: 16 additions & 9 deletions src/parseable/staging/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,17 @@ impl<const N: usize> Default for MemWriter<N> {
}

impl<const N: usize> MemWriter<N> {
pub fn push(&mut self, schema_key: &str, rb: &RecordBatch) {
pub fn push(&mut self, schema_key: &str, rb: &RecordBatch) -> Result<(), StagingError> {
if !self.schema_map.contains(schema_key) {
self.schema_map.insert(schema_key.to_owned());
self.schema = Schema::try_merge([self.schema.clone(), (*rb.schema()).clone()]).unwrap();
self.schema = Schema::try_merge([self.schema.clone(), (*rb.schema()).clone()])?;
}

if let Some(record) = self.mutable_buffer.push(rb) {
let record = concat_records(&Arc::new(self.schema.clone()), &record);
let record = concat_records(&Arc::new(self.schema.clone()), &record)?;
self.read_buffer.push(record);
}
Ok(())
}

pub fn clear(&mut self) {
Expand All @@ -154,23 +155,29 @@ impl<const N: usize> MemWriter<N> {
self.mutable_buffer.inner.clear();
}

pub fn recordbatch_cloned(&self, schema: &Arc<Schema>) -> Vec<RecordBatch> {
pub fn recordbatch_cloned(
&self,
schema: &Arc<Schema>,
) -> Result<Vec<RecordBatch>, StagingError> {
let mut read_buffer = self.read_buffer.clone();
if !self.mutable_buffer.inner.is_empty() {
let rb = concat_records(schema, &self.mutable_buffer.inner);
let rb = concat_records(schema, &self.mutable_buffer.inner)?;
read_buffer.push(rb)
}

read_buffer
Ok(read_buffer
.into_iter()
.map(|rb| adapt_batch(schema, &rb))
.collect()
.collect())
}
}

fn concat_records(schema: &Arc<Schema>, record: &[RecordBatch]) -> RecordBatch {
fn concat_records(
schema: &Arc<Schema>,
record: &[RecordBatch],
) -> Result<RecordBatch, StagingError> {
let records = record.iter().map(|x| adapt_batch(schema, x)).collect_vec();
concat_batches(schema, records.iter()).unwrap()
Ok(concat_batches(schema, records.iter())?)
}

#[derive(Debug, Default)]
Expand Down
56 changes: 38 additions & 18 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,16 +186,15 @@ impl Stream {
OBJECT_STORE_DATA_GRANULARITY,
);
let file_path = self.data_path.join(&filename);
let mut writer = DiskWriter::try_new(file_path, &record.schema(), range)
.expect("File and RecordBatch both are checked");
let mut writer = DiskWriter::try_new(file_path, &record.schema(), range)?;

writer.write(record)?;
guard.disk.insert(filename, writer);
}
};
}

guard.mem.push(schema_key, record);
guard.mem.push(schema_key, record)?;

Ok(())
}
Expand Down Expand Up @@ -532,26 +531,46 @@ impl Stream {
Ok(())
}

pub fn recordbatches_cloned(&self, schema: &Arc<Schema>) -> Vec<RecordBatch> {
self.writer.lock().unwrap().mem.recordbatch_cloned(schema)
}

pub fn clear(&self) {
self.writer.lock().unwrap().mem.clear();
pub fn recordbatches_cloned(
&self,
schema: &Arc<Schema>,
) -> Result<Vec<RecordBatch>, StagingError> {
let writer = self.writer.lock().map_err(|poisoned| {
StagingError::PoisonError(PoisonError::new(format!(
"Writer lock poisoned while cloning record batches for stream {} - {}",
self.stream_name, poisoned
)))
})?;

writer.mem.recordbatch_cloned(schema)
}

pub fn clear(&self) -> Result<(), StagingError> {
self.writer
.lock()
.map_err(|poisoned| {
StagingError::PoisonError(PoisonError::new(format!(
"Writer lock poisoned while clearing stream {} - {}",
self.stream_name, poisoned
)))
})?
.mem
.clear();
Ok(())
}

pub fn flush(&self, forced: bool) {
pub fn flush(&self, forced: bool) -> Result<(), StagingError> {
let _span = info_span!("flush", stream_name = %self.stream_name, forced).entered();
// Swap out stale writers under the lock, drop them after releasing it.
// DiskWriter::Drop does I/O (IPC finish + file rename) so dropping
// outside the lock avoids blocking concurrent push() calls.
let stale_writers = {
let mut writer = self.writer.lock().unwrap_or_else(|_| {
panic!(
"Writer lock poisoned while flushing data for stream {}",
self.stream_name
)
});
let mut writer = self.writer.lock().map_err(|poisoned| {
StagingError::PoisonError(PoisonError::new(format!(
"Writer lock poisoned while flushing data for stream {} - {}",
self.stream_name, poisoned
)))
})?;
writer.mem.clear();

let mut old_disk = HashMap::new();
Expand All @@ -567,6 +586,7 @@ impl Stream {
};
// DiskWriter::Drop I/O happens here, outside the lock
drop(stale_writers);
Ok(())
}

fn parquet_writer_props(
Expand Down Expand Up @@ -1306,7 +1326,7 @@ impl Stream {
// Force flush for init or shutdown signals to convert all .part files to .arrows
// For regular cycles, use false to only flush non-current writers
let forced = init_signal || shutdown_signal;
self.flush(forced);
self.flush(forced)?;
info!(
"Flushing stream ({}) took: {}s",
self.stream_name,
Expand Down Expand Up @@ -1717,7 +1737,7 @@ mod tests {
StreamType::UserDefined,
)
.unwrap();
staging.flush(true);
staging.flush(true).unwrap();
}

#[test]
Expand Down
32 changes: 22 additions & 10 deletions src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use datafusion::sql::parser::DFParser;
use datafusion::sql::resolve::resolve_table_references;
use datafusion::sql::sqlparser::dialect::PostgreSqlDialect;
use futures::Stream;
use futures::StreamExt;
use itertools::Itertools;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
Expand All @@ -54,7 +55,7 @@ use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use sysinfo::System;
use tokio::runtime::Runtime;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::Instrument;

use self::error::ExecuteError;
Expand Down Expand Up @@ -217,7 +218,8 @@ impl Query {
.with_prefer_existing_sort(true)
//batch size has been made configurable via environment variable
//default value is 20000
.with_batch_size(PARSEABLE.options.execution_batch_size);
.with_batch_size(PARSEABLE.options.execution_batch_size)
.with_target_partitions(PARSEABLE.options.target_partitions as usize);

// Pushdown filters allows DF to push the filters as far down in the plan as possible
// and thus, reducing the number of rows decoded
Expand All @@ -226,10 +228,22 @@ impl Query {
// Reorder filters allows DF to decide the order of filters minimizing the cost of filter evaluation
config.options_mut().execution.parquet.reorder_filters = true;
config.options_mut().execution.parquet.binary_as_string = true;
// Bump footer-read hint from the 512 KiB default. Streams with
// many label columns + page-indexed value columns can have
// parquet footers in the 1-2 MiB range; sizing the hint above
// the actual footer collapses the two-read fallback into a
// single GET per file, saving a round trip on every file open.
config.options_mut().execution.parquet.metadata_size_hint = Some(2 * 1024 * 1024);
config
.options_mut()
.execution
.use_row_number_estimates_to_optimize_partitioning = true;
config.options_mut().execution.parquet.enable_page_index = true;
config
.options_mut()
.execution
.parquet
.max_predicate_cache_size = Some(1024 * 1024 * 1024);

//adding this config as it improves query performance as explained here -
// https://github.com/apache/datafusion/pull/13101
Expand Down Expand Up @@ -308,23 +322,21 @@ impl Query {
});

let partition_streams = execute_stream_partitioned(plan.clone(), task_ctx.clone())?;
let n = partition_streams.len();
// Bound channel so a slow consumer backpressures producers — caps peak memory.
let (tx, rx) = tokio::sync::mpsc::channel::<

let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<
Result<RecordBatch, datafusion::error::DataFusionError>,
>((num_cpus::get() * 4).max(n * 2).max(1));
>();

for s in partition_streams {
for s in partition_streams.into_iter() {
let wrapped =
PartitionedMetricMonitor::new(s, monitor_state.clone(), tenant_id.clone());
let tx = tx.clone();
let span = tracing::Span::current();
tokio::spawn(
async move {
let mut stream: SendableRecordBatchStream = Box::pin(wrapped);
use futures::StreamExt;
while let Some(batch) = stream.next().await {
if tx.send(batch).await.is_err() {
if tx.send(batch).is_err() {
break;
}
}
Expand All @@ -334,7 +346,7 @@ impl Query {
}
drop(tx);

let merged = ReceiverStream::new(rx);
let merged = UnboundedReceiverStream::new(rx);
let final_stream = RecordBatchStreamAdapter::new(plan.schema(), merged);
Either::Right(Box::pin(final_stream) as SendableRecordBatchStream)
};
Expand Down
13 changes: 10 additions & 3 deletions src/query/stream_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,19 @@ impl StandardTableProvider {

// parquet file source, default table parquet options
let file_source = if let Some(phyiscal_expr) = filters {
ParquetSource::new(self.schema.clone()).with_predicate(phyiscal_expr)
ParquetSource::new(self.schema.clone())
.with_predicate(phyiscal_expr)
.with_pushdown_filters(true)
.with_reorder_filters(true)
} else {
ParquetSource::new(self.schema.clone())
.with_pushdown_filters(true)
.with_reorder_filters(true)
};

let mut conf_builder = FileScanConfigBuilder::new(object_store_url, file_source.into())
.with_statistics(statistics)
.with_batch_size(Some(20000))
.with_batch_size(Some(PARSEABLE.options.execution_batch_size))
.with_constraints(Constraints::default())
.with_file_groups(file_groups)
.with_output_ordering(vec![LexOrdering::new([sort_expr]).unwrap()]);
Expand Down Expand Up @@ -253,7 +258,9 @@ impl StandardTableProvider {
};

// Staging arrow execution plan
let records = staging.recordbatches_cloned(&self.schema);
let records = staging
.recordbatches_cloned(&self.schema)
.map_err(|e| DataFusionError::Internal(format!("Error during staging exec- {e}")))?;
let arrow_exec = reversed_mem_table(records, self.schema.clone())?
.scan(state, projection, filters, limit)
.await?;
Expand Down
Loading