Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions crates/connect/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,43 @@ impl DataFrame {
.await
}

/// Persists this [DataFrame] and returns the persisted handle as its "checkpoint".
///
/// The frame is persisted with `StorageLevel::MemoryAndDiskDeser`. When
/// `eager` is `true`, a `count` is executed on a clone of the persisted
/// frame before returning, so the data is computed right away. When
/// `eager` is `false`, no action is triggered by this method.
///
/// Checkpointing is typically used in iterative algorithms to keep a
/// growing plan in check.
/// NOTE(review): this implementation only calls `persist` (plus an optional
/// `count`); whether the logical plan is actually truncated is not evident
/// from this method — confirm against the server-side behaviour.
///
/// # Errors
///
/// Propagates any [SparkError] produced by the `persist` call or, when
/// `eager` is set, by the `count` call.
pub async fn checkpoint(self, eager: bool) -> Result<DataFrame, SparkError> {
    let level = storage::StorageLevel::MemoryAndDiskDeser;
    let persisted = self.persist(level).await?;

    // Lazy mode: hand the persisted frame back without running anything.
    if !eager {
        return Ok(persisted);
    }

    // Eager mode: force computation with a count. The count runs on a clone
    // so `persisted` is still available to return; the result is discarded.
    let _row_count = persisted.clone().count().await?;

    Ok(persisted)
}

/// Persists this [DataFrame] and returns the persisted handle as its "local checkpoint".
///
/// The frame is persisted with `StorageLevel::MemoryAndDisk`. When `eager`
/// is `true`, a `count` is executed on a clone of the persisted frame
/// before returning, so the data is computed right away; otherwise no
/// action is triggered by this method.
///
/// Intended as the counterpart of a local checkpoint: executor-local
/// storage that skips replication overhead but may be lost if an executor
/// fails.
/// NOTE(review): this implementation only calls `persist` (plus an optional
/// `count`); confirm that it actually provides those semantics.
///
/// # Errors
///
/// Propagates any [SparkError] produced by the `persist` call or, when
/// `eager` is set, by the `count` call.
pub async fn local_checkpoint(self, eager: bool) -> Result<DataFrame, SparkError> {
    let cached = self
        .persist(storage::StorageLevel::MemoryAndDisk)
        .await?;

    if eager {
        // Run the count on a clone so `cached` can still be returned;
        // only the side effect of computing the data matters here.
        let materialize = cached.clone().count();
        let _row_count = materialize.await?;
    }

    Ok(cached)
}

/// Returns a new [DataFrame] that has exactly `num_partitions` partitions.
pub fn coalesce(self, num_partitions: u32) -> DataFrame {
self.repartition(num_partitions, Some(false))
Expand Down