From 85d03d79f4af6e215ebfec1a89aa16c929db3382 Mon Sep 17 00:00:00 2001
From: Rafael Fernandez
Date: Sun, 29 Mar 2026 09:56:25 +0200
Subject: [PATCH] [SPARK-52428] Add DataFrame::to_local_iterator() for
 batch-by-batch result access

---
 crates/connect/src/client/mod.rs | 16 ++++++++++++++++
 crates/connect/src/dataframe.rs  | 11 +++++++++++
 2 files changed, 27 insertions(+)

diff --git a/crates/connect/src/client/mod.rs b/crates/connect/src/client/mod.rs
index 7677d14..e569fe2 100644
--- a/crates/connect/src/client/mod.rs
+++ b/crates/connect/src/client/mod.rs
@@ -540,6 +540,22 @@ where
         Ok(self.handler.clone())
     }
 
+    /// Execute a plan and return the result batches without concatenating them.
+    /// This avoids concatenating all data into a single RecordBatch, which is
+    /// useful for large result sets.
+    pub async fn to_arrow_batches(
+        &mut self,
+        plan: spark::Plan,
+    ) -> Result<Vec<RecordBatch>, SparkError> {
+        let mut req = self.execute_plan_request_with_metadata();
+
+        req.plan = Some(plan);
+
+        self.execute_and_fetch(req).await?;
+
+        Ok(self.handler.batches.clone())
+    }
+
     #[allow(clippy::wrong_self_convention)]
     pub async fn to_arrow(&mut self, plan: spark::Plan) -> Result<RecordBatch, SparkError> {
         let mut req = self.execute_plan_request_with_metadata();
diff --git a/crates/connect/src/dataframe.rs b/crates/connect/src/dataframe.rs
index a69f655..63fadb7 100644
--- a/crates/connect/src/dataframe.rs
+++ b/crates/connect/src/dataframe.rs
@@ -206,6 +206,17 @@ impl DataFrame {
         self.spark_session.client().to_arrow(plan).await
     }
 
+    /// Returns the [RecordBatch]es of the [DataFrame] without concatenating
+    /// them into a single batch.
+    ///
+    /// Unlike [collect], which concatenates all batches into one RecordBatch,
+    /// this method returns each batch as it was received from the server.
+    /// Note: all batches are still fetched into client memory before returning.
+    pub async fn to_local_iterator(self) -> Result<Vec<RecordBatch>, SparkError> {
+        let plan = self.plan.plan_root();
+        self.spark_session.client().to_arrow_batches(plan).await
+    }
+
     /// Retrieves the names of all columns in the [DataFrame] as a `Vec<String>`.
     /// The order of the column names in the list reflects their order in the [DataFrame].
     pub async fn columns(self) -> Result<Vec<String>, SparkError> {