Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions crates/connect/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,22 @@ where
Ok(self.handler.clone())
}

/// Execute a plan and return the result batches exactly as received from
/// the server, without concatenating them into one [RecordBatch].
///
/// Compared to [`to_arrow`], this skips the final concat, so no single
/// contiguous allocation spanning the whole result is required. Note that
/// the full result set is still buffered in memory as a `Vec<RecordBatch>`
/// — this is not a streaming API.
///
/// # Errors
/// Returns [SparkError] if executing or fetching the plan fails.
pub async fn to_arrow_batches(
&mut self,
plan: spark::Plan,
) -> Result<Vec<RecordBatch>, SparkError> {
// Build an ExecutePlanRequest pre-populated with session metadata.
let mut req = self.execute_plan_request_with_metadata();

req.plan = Some(plan);

// Drive the request to completion; the response handler accumulates
// each Arrow batch as it arrives.
self.execute_and_fetch(req).await?;

// NOTE(review): this clones every accumulated batch instead of moving
// them out (e.g. `std::mem::take(&mut self.handler.batches)`), roughly
// doubling peak memory for large results — confirm whether the handler
// is reset per request before switching to a move.
Ok(self.handler.batches.clone())
}

#[allow(clippy::wrong_self_convention)]
pub async fn to_arrow(&mut self, plan: spark::Plan) -> Result<RecordBatch, SparkError> {
let mut req = self.execute_plan_request_with_metadata();
Expand Down
11 changes: 11 additions & 0 deletions crates/connect/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,17 @@ impl DataFrame {
self.spark_session.client().to_arrow(plan).await
}

/// Collects the [DataFrame] into a `Vec` of [RecordBatch]es, one entry per
/// batch received from the server.
///
/// Unlike [collect], which concatenates all batches into a single
/// [RecordBatch], this preserves the server's batch boundaries, avoiding
/// one large contiguous allocation.
///
/// NOTE(review): despite the name, this is not a lazy iterator — the whole
/// result set is still materialized in memory before returning. Consider
/// renaming or returning a true stream in a follow-up.
pub async fn to_local_iterator(self) -> Result<Vec<RecordBatch>, SparkError> {
let plan = self.plan.plan_root();
self.spark_session.client().to_arrow_batches(plan).await
}

/// Retrieves the names of all columns in the [DataFrame] as a `Vec<String>`.
/// The order of the column names in the list reflects their order in the [DataFrame].
pub async fn columns(self) -> Result<Vec<String>, SparkError> {
Expand Down