From d512855c08dd0fa42517ee7385250db2bfdac20c Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Sun, 29 Mar 2026 09:56:29 +0200 Subject: [PATCH] [SPARK-52428] Add DataFrame::checkpoint() and local_checkpoint() --- crates/connect/src/dataframe.rs | 37 +++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/crates/connect/src/dataframe.rs b/crates/connect/src/dataframe.rs index a69f655..7756c30 100644 --- a/crates/connect/src/dataframe.rs +++ b/crates/connect/src/dataframe.rs @@ -174,6 +174,43 @@ impl DataFrame { .await } + /// Returns a checkpointed version of this [DataFrame]. + /// + /// Checkpointing truncates the logical plan, which is useful for + /// iterative algorithms where the plan can grow exponentially. + /// + /// If `eager` is true, the checkpoint is computed immediately. + /// Otherwise, it is computed lazily when the DataFrame is first accessed. + pub async fn checkpoint(self, eager: bool) -> Result { + let df = self + .persist(storage::StorageLevel::MemoryAndDiskDeser) + .await?; + + if eager { + // Trigger computation by executing a count + let _ = df.clone().count().await?; + } + + Ok(df) + } + + /// Returns a locally checkpointed version of this [DataFrame]. + /// + /// Local checkpointing uses the executor storage and is not reliable + /// (data may be lost if an executor fails), but it avoids the overhead + /// of replicating data. + /// + /// If `eager` is true, the checkpoint is computed immediately. + pub async fn local_checkpoint(self, eager: bool) -> Result { + let df = self.persist(storage::StorageLevel::MemoryAndDisk).await?; + + if eager { + let _ = df.clone().count().await?; + } + + Ok(df) + } + /// Returns a new [DataFrame] that has exactly `num_partitions` partitions. pub fn coalesce(self, num_partitions: u32) -> DataFrame { self.repartition(num_partitions, Some(false))