diff --git a/crates/connect/src/dataframe.rs b/crates/connect/src/dataframe.rs index a69f655..643562b 100644 --- a/crates/connect/src/dataframe.rs +++ b/crates/connect/src/dataframe.rs @@ -1404,6 +1404,35 @@ impl DataFrame { pub fn write_to(self, table: &str) -> DataFrameWriterV2 { DataFrameWriterV2::new(self, table) } + + /// Create a MERGE INTO operation targeting a table using this DataFrame as the source. + /// + /// Requires a Delta Lake or Iceberg table as the target. + /// + /// # Arguments + /// * `target_table` - The target table name + /// * `condition` - The merge condition (e.g., `"target.id = source.id"`) + /// + /// # Example + /// ```rust + /// df.merge_into("target_table", "target.id = source.id") + /// .when_matched_update("target.value = source.value") + /// .when_not_matched_insert("*") + /// .execute() + /// .await?; + /// ``` + pub fn merge_into(self, target_table: &str, condition: &str) -> MergeIntoBuilder { + let session = self.spark_session.clone(); + MergeIntoBuilder { + spark_session: session, + source: self, + target_table: target_table.to_string(), + condition: condition.to_string(), + when_matched_update: None, + when_matched_delete: false, + when_not_matched_insert: None, + } + } } /// Functionality for working with missing data in [DataFrame]. @@ -1503,6 +1532,76 @@ impl DataFrameNaFunctions { } } +/// Builder for constructing MERGE INTO SQL statements. +/// +/// Created via [`DataFrame::merge_into`]. +pub struct MergeIntoBuilder { + spark_session: Box, + target_table: String, + source: DataFrame, + condition: String, + when_matched_update: Option, + when_matched_delete: bool, + when_not_matched_insert: Option, +} + +impl MergeIntoBuilder { + /// Define the UPDATE action when rows match. + /// + /// # Arguments + /// * `set_clause` - SET clause (e.g., `"target.value = source.value, target.updated = current_timestamp()"`) + pub fn when_matched_update(mut self, set_clause: &str) -> Self { + self.when_matched_update = Some(set_clause.to_string()); + self + } + + /// Define a DELETE action when rows match. + pub fn when_matched_delete(mut self) -> Self { + self.when_matched_delete = true; + self + } + + /// Define the INSERT action when rows do not match. + /// + /// # Arguments + /// * `values_clause` - VALUES clause (e.g., `"(source.id, source.value)"`) or `"*"` for all columns + pub fn when_not_matched_insert(mut self, values_clause: &str) -> Self { + self.when_not_matched_insert = Some(values_clause.to_string()); + self + } + + /// Execute the MERGE INTO statement. + pub async fn execute(self) -> Result { + // Create a temp view for the source DataFrame + let source_view = format!( + "_merge_source_{}", + uuid::Uuid::new_v4().to_string().replace('-', "") + ); + self.source + .create_or_replace_temp_view(&source_view) + .await?; + + let mut sql = format!( + "MERGE INTO {} USING {} ON {}", + self.target_table, source_view, self.condition + ); + + if let Some(ref set_clause) = self.when_matched_update { + sql.push_str(&format!(" WHEN MATCHED THEN UPDATE SET {}", set_clause)); + } + + if self.when_matched_delete { + sql.push_str(" WHEN MATCHED THEN DELETE"); + } + + if let Some(ref values_clause) = self.when_not_matched_insert { + sql.push_str(&format!(" WHEN NOT MATCHED THEN INSERT {}", values_clause)); + } + + self.spark_session.sql(&sql).await + } +} + #[cfg(test)] mod tests {