Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions crates/connect/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,35 @@ impl DataFrame {
/// Create a [`DataFrameWriterV2`] for saving this DataFrame to the given table.
///
/// Consumes the DataFrame; configure the write (partitioning, properties, etc.)
/// and commit it via the returned builder.
///
/// # Arguments
/// * `table` - Name of the destination table
pub fn write_to(self, table: &str) -> DataFrameWriterV2 {
DataFrameWriterV2::new(self, table)
}

/// Create a MERGE INTO operation targeting a table using this DataFrame as the source.
///
/// Requires a Delta Lake or Iceberg table as the target.
///
/// # Arguments
/// * `target_table` - The target table name
/// * `condition` - The merge condition (e.g., `"target.id = source.id"`)
///
/// # Example
/// ```rust
/// df.merge_into("target_table", "target.id = source.id")
/// .when_matched_update("target.value = source.value")
/// .when_not_matched_insert("*")
/// .execute()
/// .await?;
/// ```
/// Create a MERGE INTO operation targeting a table using this DataFrame as the source.
///
/// The returned [`MergeIntoBuilder`] configures the `WHEN MATCHED` / `WHEN NOT MATCHED`
/// actions and executes the statement. Requires a table format that supports MERGE
/// as the target (e.g. Delta Lake or Iceberg).
///
/// # Arguments
/// * `target_table` - The target table name
/// * `condition` - The merge condition (e.g., `"target.id = source.id"`)
///
/// # Example
/// ```rust,ignore
/// df.merge_into("target_table", "target.id = source.id")
///     .when_matched_update("target.value = source.value")
///     .when_not_matched_insert("*")
///     .execute()
///     .await?;
/// ```
pub fn merge_into(self, target_table: &str, condition: &str) -> MergeIntoBuilder {
    // `self` moves into `source` below, so take a handle on the session first.
    let spark_session = self.spark_session.clone();
    MergeIntoBuilder {
        spark_session,
        source: self,
        target_table: target_table.to_string(),
        condition: condition.to_string(),
        when_matched_update: None,
        when_matched_delete: false,
        when_not_matched_insert: None,
    }
}
}

/// Functionality for working with missing data in [DataFrame].
Expand Down Expand Up @@ -1503,6 +1532,76 @@ impl DataFrameNaFunctions {
}
}

/// Builder for constructing MERGE INTO SQL statements.
///
/// Created via [`DataFrame::merge_into`].
/// Builder for constructing MERGE INTO SQL statements.
///
/// Created via [`DataFrame::merge_into`]. Configure actions with the
/// `when_matched_*` / `when_not_matched_*` methods, then run `execute`.
pub struct MergeIntoBuilder {
    // Session used to register the source temp view and submit the final SQL.
    spark_session: Box<SparkSession>,
    // Name of the MERGE target table (must be a format that supports MERGE).
    target_table: String,
    // Source DataFrame; registered as a temp view when `execute` runs.
    source: DataFrame,
    // ON condition relating target and source rows.
    condition: String,
    // SET clause for `WHEN MATCHED THEN UPDATE`, if requested.
    when_matched_update: Option<String>,
    // Whether to emit `WHEN MATCHED THEN DELETE`.
    when_matched_delete: bool,
    // Clause for `WHEN NOT MATCHED THEN INSERT`, if requested.
    when_not_matched_insert: Option<String>,
}

impl MergeIntoBuilder {
    /// Define the UPDATE action when rows match.
    ///
    /// NOTE: Spark permits at most one unconditional `WHEN MATCHED` clause, so
    /// combining this with [`Self::when_matched_delete`] produces SQL the server
    /// will reject at analysis time.
    ///
    /// # Arguments
    /// * `set_clause` - SET clause (e.g., `"target.value = source.value, target.updated = current_timestamp()"`)
    #[must_use]
    pub fn when_matched_update(mut self, set_clause: &str) -> Self {
        self.when_matched_update = Some(set_clause.to_string());
        self
    }

    /// Define a DELETE action when rows match.
    ///
    /// See the note on [`Self::when_matched_update`] about combining unconditional
    /// `WHEN MATCHED` clauses.
    #[must_use]
    pub fn when_matched_delete(mut self) -> Self {
        self.when_matched_delete = true;
        self
    }

    /// Define the INSERT action when rows do not match.
    ///
    /// # Arguments
    /// * `values_clause` - VALUES clause (e.g., `"(source.id, source.value)"`) or `"*"` for all columns
    #[must_use]
    pub fn when_not_matched_insert(mut self, values_clause: &str) -> Self {
        self.when_not_matched_insert = Some(values_clause.to_string());
        self
    }

    /// Execute the MERGE INTO statement.
    ///
    /// Registers the source DataFrame under a unique temp-view name, assembles
    /// the MERGE SQL from the configured actions, and submits it via the session.
    ///
    /// # Errors
    /// Returns any [`SparkError`] raised while creating the temp view or running
    /// the SQL — including analysis errors when no action clause was configured
    /// or when the clause combination is invalid on the server side.
    pub async fn execute(self) -> Result<DataFrame, SparkError> {
        // Unique view name so concurrent merges in one session cannot collide.
        // `simple()` formats the UUID without hyphens directly, avoiding the
        // extra allocation of `to_string().replace('-', "")`.
        let source_view = format!("_merge_source_{}", uuid::Uuid::new_v4().simple());
        self.source
            .create_or_replace_temp_view(&source_view)
            .await?;

        let mut sql = format!(
            "MERGE INTO {} USING {} ON {}",
            self.target_table, source_view, self.condition
        );

        if let Some(ref set_clause) = self.when_matched_update {
            sql.push_str(&format!(" WHEN MATCHED THEN UPDATE SET {}", set_clause));
        }

        if self.when_matched_delete {
            sql.push_str(" WHEN MATCHED THEN DELETE");
        }

        if let Some(ref values_clause) = self.when_not_matched_insert {
            sql.push_str(&format!(" WHEN NOT MATCHED THEN INSERT {}", values_clause));
        }

        // NOTE(review): the temp view is deliberately not dropped here — the
        // returned DataFrame may still be evaluated lazily against it.
        self.spark_session.sql(&sql).await
    }
}

#[cfg(test)]
mod tests {

Expand Down