diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
index 074c7fefe4..d8c039c357 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -54,6 +54,7 @@ mod action;
 pub use action::*;
 mod append;
+mod row_delta;
 mod snapshot;
 mod sort_order;
 mod update_location;
@@ -71,6 +72,7 @@ use crate::spec::TableProperties;
 use crate::table::Table;
 use crate::transaction::action::BoxedTransactionAction;
 use crate::transaction::append::FastAppendAction;
+use crate::transaction::row_delta::RowDeltaAction;
 use crate::transaction::sort_order::ReplaceSortOrderAction;
 use crate::transaction::update_location::UpdateLocationAction;
 use crate::transaction::update_properties::UpdatePropertiesAction;
@@ -141,6 +143,16 @@ impl Transaction {
         FastAppendAction::new()
     }
 
+    /// Creates a row delta action for row-level modifications.
+    ///
+    /// RowDelta supports:
+    /// - Adding new data files (inserts)
+    /// - Removing data files (deletes in Copy-on-Write (COW) mode)
+    /// - Both operations in a single transaction (updates/merges)
+    pub fn row_delta(&self) -> RowDeltaAction {
+        RowDeltaAction::new()
+    }
+
     /// Creates replace sort order action.
     pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
         ReplaceSortOrderAction::new()
diff --git a/crates/iceberg/src/transaction/row_delta.rs b/crates/iceberg/src/transaction/row_delta.rs
new file mode 100644
index 0000000000..f639bd666f
--- /dev/null
+++ b/crates/iceberg/src/transaction/row_delta.rs
@@ -0,0 +1,509 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use uuid::Uuid;
+
+use crate::error::Result;
+use crate::spec::{DataFile, ManifestEntry, ManifestFile, ManifestStatus, Operation};
+use crate::table::Table;
+use crate::transaction::snapshot::{
+    DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
+};
+use crate::transaction::{ActionCommit, TransactionAction};
+
+/// RowDeltaAction handles both data file additions and deletions in a single snapshot.
+/// This is the core transaction type for MERGE, UPDATE, DELETE operations.
+///
+/// Corresponds to `org.apache.iceberg.RowDelta` in the Java implementation.
+///
+/// # Copy-on-Write (COW) Strategy
+///
+/// For row-level modifications:
+/// 1. Read target data files that contain rows to be modified
+/// 2. Apply modifications (UPDATE/DELETE logic)
+/// 3. Write modified rows to new data files via `add_data_files()`
+/// 4. Mark original files as deleted via `remove_data_files()`
+///
+/// For inserts (NOT MATCHED in MERGE):
+/// 1. Write new rows to data files
+/// 2. Add files via `add_data_files()`
+///
+/// # Future: Merge-on-Read Strategy
+///
+/// The `add_delete_files()` method is reserved for future Merge-on-Read support, which uses
+/// delete files instead of rewriting data files.
+pub struct RowDeltaAction {
+    /// New data files to add (for inserts or rewritten files in COW mode)
+    added_data_files: Vec<DataFile>,
+    /// Data files to mark as deleted (for COW mode when rewriting files)
+    removed_data_files: Vec<DataFile>,
+    /// Delete files to add (reserved for future Merge-on-Read mode support)
+    added_delete_files: Vec<DataFile>,
+    /// Optional commit UUID for manifest file naming
+    commit_uuid: Option<Uuid>,
+    /// Additional properties to add to snapshot summary
+    snapshot_properties: HashMap<String, String>,
+    /// Optional starting snapshot ID for conflict detection
+    starting_snapshot_id: Option<i64>,
+}
+
+impl RowDeltaAction {
+    pub(crate) fn new() -> Self {
+        Self {
+            added_data_files: vec![],
+            removed_data_files: vec![],
+            added_delete_files: vec![],
+            commit_uuid: None,
+            snapshot_properties: HashMap::default(),
+            starting_snapshot_id: None,
+        }
+    }
+
+    /// Add new data files to the snapshot. Used for:
+    /// - New rows from INSERT operations
+    /// - Rewritten data files in COW mode (after applying UPDATE/DELETE)
+    pub fn add_data_files(mut self, data_files: impl IntoIterator<Item = DataFile>) -> Self {
+        self.added_data_files.extend(data_files);
+        self
+    }
+
+    /// Mark data files as deleted in the snapshot.
+    /// Used in COW mode to mark original files as deleted when they've been rewritten with modifications.
+    /// Corresponds to `removeRows(DataFile)` in Java implementation.
+    pub fn remove_data_files(mut self, data_files: impl IntoIterator<Item = DataFile>) -> Self {
+        self.removed_data_files.extend(data_files);
+        self
+    }
+
+    /// Add delete files to the snapshot (reserved for future Merge-on-Read mode).
+    /// #Note: This is not yet implemented and is reserved for future Merge-on-Read optimization
+    /// where delete files are used instead of rewriting data files.
+    pub fn add_delete_files(mut self, delete_files: impl IntoIterator<Item = DataFile>) -> Self {
+        self.added_delete_files.extend(delete_files);
+        self
+    }
+
+    /// Set commit UUID for the snapshot.
+    pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self {
+        self.commit_uuid = Some(commit_uuid);
+        self
+    }
+
+    /// Set snapshot summary properties.
+    pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap<String, String>) -> Self {
+        self.snapshot_properties = snapshot_properties;
+        self
+    }
+
+    /// Validate that the operation is applied on top of a specific snapshot.
+    /// This can be used for conflict detection in concurrent modification scenarios.
+    pub fn validate_from_snapshot(mut self, snapshot_id: i64) -> Self {
+        self.starting_snapshot_id = Some(snapshot_id);
+        self
+    }
+}
+
+#[async_trait]
+impl TransactionAction for RowDeltaAction {
+    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
+        // Validate starting snapshot if specified
+        if let Some(expected_snapshot_id) = self.starting_snapshot_id
+            && table.metadata().current_snapshot_id() != Some(expected_snapshot_id)
+        {
+            return Err(crate::Error::new(
+                crate::ErrorKind::DataInvalid,
+                format!(
+                    "Cannot commit RowDelta based on stale snapshot. Expected: {}, Current: {:?}",
+                    expected_snapshot_id,
+                    table.metadata().current_snapshot_id()
+                ),
+            ));
+        }
+
+        let snapshot_producer = SnapshotProducer::new(
+            table,
+            self.commit_uuid.unwrap_or_else(Uuid::now_v7),
+            None, // key_metadata - not used for row delta
+            self.snapshot_properties.clone(),
+            self.added_data_files.clone(),
+        );
+
+        // Validate added files (same validation as FastAppend)
+        snapshot_producer.validate_added_data_files()?;
+
+        // Create RowDeltaOperation with removed files
+        let operation = RowDeltaOperation {
+            removed_data_files: self.removed_data_files.clone(),
+            added_delete_files: self.added_delete_files.clone(),
+        };
+
+        snapshot_producer
+            .commit(operation, DefaultManifestProcess)
+            .await
+    }
+}
+
+/// Implements the snapshot production logic for RowDelta operations.
+///
+/// This determines:
+/// - Which operation type is recorded (Append/Delete/Overwrite)
+/// - Which manifest entries should be marked as deleted
+/// - Which existing manifests should be carried forward
+struct RowDeltaOperation {
+    removed_data_files: Vec<DataFile>,
+    added_delete_files: Vec<DataFile>,
+}
+
+impl SnapshotProduceOperation for RowDeltaOperation {
+    /// Determine operation type based on what's being added/removed.
+    ///
+    /// Logic matches Java implementation in BaseRowDelta:
+    /// - Only adds data files (no deletes, no removes) → Append
+    /// - Only adds delete files → Delete
+    /// - Mixed or removes data files → Overwrite
+    fn operation(&self) -> Operation {
+        let has_added_deletes = !self.added_delete_files.is_empty();
+        let has_removed_data = !self.removed_data_files.is_empty();
+
+        if has_removed_data || has_added_deletes {
+            // If we're removing data files or adding delete files, it's an Overwrite
+            Operation::Overwrite
+        } else {
+            // Pure append of new data files
+            Operation::Append
+        }
+    }
+
+    /// Returns manifest entries for files that should be marked as deleted.
+    /// This creates DELETED entries for removed data files in COW mode.
+    async fn delete_entries(
+        &self,
+        snapshot_produce: &SnapshotProducer<'_>,
+    ) -> Result<Vec<ManifestEntry>> {
+        let snapshot_id = snapshot_produce.table.metadata().current_snapshot_id();
+
+        // Create DELETED manifest entries for removed data files
+        let deleted_entries: Vec<ManifestEntry> = self
+            .removed_data_files
+            .iter()
+            .map(|data_file| {
+                if let Some(snapshot_id) = snapshot_id {
+                    ManifestEntry::builder()
+                        .status(ManifestStatus::Deleted)
+                        .snapshot_id(snapshot_id)
+                        // TODO: Get actual sequence numbers from original manifest entries
+                        // For now, use 0 as a placeholder - this should be the sequence
+                        // number from when the file was originally added
+                        .sequence_number(0)
+                        .file_sequence_number(0)
+                        .data_file(data_file.clone())
+                        .build()
+                } else {
+                    ManifestEntry::builder()
+                        .status(ManifestStatus::Deleted)
+                        .sequence_number(0)
+                        .file_sequence_number(0)
+                        .data_file(data_file.clone())
+                        .build()
+                }
+            })
+            .collect();
+
+        Ok(deleted_entries)
+    }
+
+    /// Returns existing manifest files that should be included in the new snapshot.
+    ///
+    /// For RowDelta in Copy-on-Write mode:
+    /// - We're rewriting entire data files (not just modifying rows)
+    /// - Files being deleted are completely replaced by new files
+    /// - We should NOT carry forward manifests that contain any of the deleted files
+    ///
+    /// Note: For future precision COW or Merge-on-Read modes, this logic may need refinement.
+    async fn existing_manifest(
+        &self,
+        snapshot_produce: &SnapshotProducer<'_>,
+    ) -> Result<Vec<ManifestFile>> {
+        let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else {
+            return Ok(vec![]);
+        };
+
+        let manifest_list = snapshot
+            .load_manifest_list(
+                snapshot_produce.table.file_io(),
+                &snapshot_produce.table.metadata_ref(),
+            )
+            .await?;
+
+        // In COW mode, we rewrite entire files, so we need to exclude manifests
+        // that contain any files we're deleting. Create a set of deleted file paths for fast lookup.
+        let deleted_file_paths: std::collections::HashSet<String> = self
+            .removed_data_files
+            .iter()
+            .map(|f| f.file_path().to_string())
+            .collect();
+
+        // Filter out manifests that contain deleted files
+        let mut filtered_manifests = Vec::new();
+        for manifest_file in manifest_list.entries().iter() {
+            if manifest_file.has_added_files() || manifest_file.has_existing_files() {
+                // Load the manifest to check if it contains any deleted files
+                let manifest = manifest_file
+                    .load_manifest(snapshot_produce.table.file_io())
+                    .await?;
+
+                // Check if any entries in this manifest are files we're deleting
+                let contains_deleted_file = manifest
+                    .entries()
+                    .iter()
+                    .any(|entry| deleted_file_paths.contains(entry.data_file().file_path()));
+
+                if !contains_deleted_file {
+                    // This manifest doesn't contain any files we're deleting, keep it
+                    filtered_manifests.push(manifest_file.clone());
+                }
+            }
+        }
+
+        Ok(filtered_manifests)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    use crate::TableUpdate;
+    use crate::spec::{DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct};
+    use crate::transaction::tests::make_v2_minimal_table;
+    use crate::transaction::{Transaction, TransactionAction};
+
+    #[tokio::test]
+    async fn test_row_delta_add_only() {
+        // Test adding data files only (pure append)
+        let table = make_v2_minimal_table();
+        let tx = Transaction::new(&table);
+
+        let data_file = DataFileBuilder::default()
+            .content(DataContentType::Data)
+            .file_path("test/1.parquet".to_string())
+            .file_format(DataFileFormat::Parquet)
+            .file_size_in_bytes(100)
+            .record_count(10)
+            .partition_spec_id(table.metadata().default_partition_spec_id())
+            .partition(Struct::from_iter([Some(Literal::long(100))]))
+            .build()
+            .unwrap();
+
+        let action = tx.row_delta().add_data_files(vec![data_file.clone()]);
+        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
+        let updates = action_commit.take_updates();
+
+        // Verify snapshot was created
+        assert!(matches!(&updates[0], TableUpdate::AddSnapshot { .. }));
+
+        // Verify the snapshot summary shows Append operation
+        if let TableUpdate::AddSnapshot { snapshot } = &updates[0] {
+            assert_eq!(snapshot.summary().operation, crate::spec::Operation::Append);
+        }
+    }
+
+    #[tokio::test]
+    async fn test_row_delta_remove_only() {
+        // Test removing data files (COW delete) - should succeed
+        let table = make_v2_minimal_table();
+        let tx = Transaction::new(&table);
+
+        let data_file = DataFileBuilder::default()
+            .content(DataContentType::Data)
+            .file_path("test/old.parquet".to_string())
+            .file_format(DataFileFormat::Parquet)
+            .file_size_in_bytes(100)
+            .record_count(10)
+            .partition_spec_id(table.metadata().default_partition_spec_id())
+            .partition(Struct::from_iter([Some(Literal::long(100))]))
+            .build()
+            .unwrap();
+
+        let action = tx.row_delta().remove_data_files(vec![data_file]);
+
+        // This should succeed - delete-only operations are valid
+        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
+        let updates = action_commit.take_updates();
+
+        // Verify snapshot was created with Overwrite operation
+        if let TableUpdate::AddSnapshot { snapshot } = &updates[0] {
+            assert_eq!(
+                snapshot.summary().operation,
+                crate::spec::Operation::Overwrite
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_row_delta_add_and_remove() {
+        // Test COW update: remove old file, add new file
+        let table = make_v2_minimal_table();
+        let tx = Transaction::new(&table);
+
+        let old_file = DataFileBuilder::default()
+            .content(DataContentType::Data)
+            .file_path("test/old.parquet".to_string())
+            .file_format(DataFileFormat::Parquet)
+            .file_size_in_bytes(100)
+            .record_count(10)
+            .partition_spec_id(table.metadata().default_partition_spec_id())
+            .partition(Struct::from_iter([Some(Literal::long(100))]))
+            .build()
+            .unwrap();
+
+        let new_file = DataFileBuilder::default()
+            .content(DataContentType::Data)
+            .file_path("test/new.parquet".to_string())
+            .file_format(DataFileFormat::Parquet)
+            .file_size_in_bytes(120)
+            .record_count(12)
+            .partition_spec_id(table.metadata().default_partition_spec_id())
+            .partition(Struct::from_iter([Some(Literal::long(100))]))
+            .build()
+            .unwrap();
+
+        let action = tx
+            .row_delta()
+            .remove_data_files(vec![old_file])
+            .add_data_files(vec![new_file]);
+
+        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
+        let updates = action_commit.take_updates();
+
+        // Verify snapshot was created with Overwrite operation
+        if let TableUpdate::AddSnapshot { snapshot } = &updates[0] {
+            assert_eq!(
+                snapshot.summary().operation,
+                crate::spec::Operation::Overwrite
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_row_delta_with_snapshot_properties() {
+        let table = make_v2_minimal_table();
+        let tx = Transaction::new(&table);
+
+        let mut snapshot_properties = HashMap::new();
+        snapshot_properties.insert("key".to_string(), "value".to_string());
+
+        let data_file = DataFileBuilder::default()
+            .content(DataContentType::Data)
+            .file_path("test/1.parquet".to_string())
+            .file_format(DataFileFormat::Parquet)
+            .file_size_in_bytes(100)
+            .record_count(10)
+            .partition_spec_id(table.metadata().default_partition_spec_id())
+            .partition(Struct::from_iter([Some(Literal::long(100))]))
+            .build()
+            .unwrap();
+
+        let action = tx
+            .row_delta()
+            .set_snapshot_properties(snapshot_properties)
+            .add_data_files(vec![data_file]);
+
+        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
+        let updates = action_commit.take_updates();
+
+        // Check customized properties in snapshot summary
+        if let TableUpdate::AddSnapshot { snapshot } = &updates[0] {
+            assert_eq!(
+                snapshot.summary().additional_properties.get("key").unwrap(),
+                "value"
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_row_delta_validate_from_snapshot() {
+        // Test the snapshot validation logic
+        let table = make_v2_minimal_table();
+        let tx = Transaction::new(&table);
+
+        let data_file = DataFileBuilder::default()
+            .content(DataContentType::Data)
+            .file_path("test/1.parquet".to_string())
+            .file_format(DataFileFormat::Parquet)
+            .file_size_in_bytes(100)
+            .record_count(10)
+            .partition_spec_id(table.metadata().default_partition_spec_id())
+            .partition(Struct::from_iter([Some(Literal::long(100))]))
+            .build()
+            .unwrap();
+
+        // Test with invalid snapshot ID (table has no snapshot, so any ID should fail)
+        let action = tx
+            .row_delta()
+            .validate_from_snapshot(99999)
+            .add_data_files(vec![data_file.clone()]);
+
+        let result = Arc::new(action).commit(&table).await;
+        assert!(result.is_err());
+
+        // Verify the error message mentions snapshot validation
+        if let Err(e) = result {
+            assert!(
+                e.to_string().contains("stale snapshot") || e.to_string().contains("Cannot commit")
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_row_delta_empty_action() {
+        let table = make_v2_minimal_table();
+        let tx = Transaction::new(&table);
+        let action = tx.row_delta();
+
+        // Empty row delta should fail
+        assert!(Arc::new(action).commit(&table).await.is_err());
+    }
+
+    #[tokio::test]
+    async fn test_row_delta_incompatible_partition_value() {
+        let table = make_v2_minimal_table();
+        let tx = Transaction::new(&table);
+
+        // Create file with incompatible partition value (string instead of long)
+        let data_file = DataFileBuilder::default()
+            .content(DataContentType::Data)
+            .file_path("test/bad.parquet".to_string())
+            .file_format(DataFileFormat::Parquet)
+            .file_size_in_bytes(100)
+            .record_count(10)
+            .partition_spec_id(table.metadata().default_partition_spec_id())
+            .partition(Struct::from_iter([Some(Literal::string("wrong"))]))
+            .build()
+            .unwrap();
+
+        let action = tx.row_delta().add_data_files(vec![data_file]);
+
+        // Should fail validation
+        assert!(Arc::new(action).commit(&table).await.is_err());
+    }
+}
diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs
index c8bf26a174..fb3865dc3a 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -319,20 +319,48 @@ impl<'a> SnapshotProducer<'a> {
         writer.write_manifest_file().await
     }
 
+    // Write manifest file for deleted data files and return the ManifestFile for ManifestList.
+    async fn write_delete_manifest(
+        &mut self,
+        delete_entries: Vec<ManifestEntry>,
+    ) -> Result<ManifestFile> {
+        if delete_entries.is_empty() {
+            return Err(Error::new(
+                ErrorKind::PreconditionFailed,
+                "No delete entries found when writing a delete manifest file",
+            ));
+        }
+
+        let mut writer = self.new_manifest_writer(ManifestContentType::Data)?;
+        for entry in delete_entries {
+            // Use add_delete_entry() to preserve Deleted status instead of add_entry()
+            // which always overwrites status to Added
+            writer.add_delete_entry(entry)?;
+        }
+        writer.write_manifest_file().await
+    }
+
     async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
         &mut self,
         snapshot_produce_operation: &OP,
         manifest_process: &MP,
     ) -> Result<Vec<ManifestFile>> {
+        // Check if there's any content to add to the new snapshot
+        let delete_entries = snapshot_produce_operation.delete_entries(self).await?;
+        let has_delete_entries = !delete_entries.is_empty();
+
         // Assert current snapshot producer contains new content to add to new snapshot.
         //
         // TODO: Allowing snapshot property setup with no added data files is a workaround.
         // We should clean it up after all necessary actions are supported.
         // For details, please refer to https://github.com/apache/iceberg-rust/issues/1548
-        if self.added_data_files.is_empty() && self.snapshot_properties.is_empty() {
+        if self.added_data_files.is_empty()
+            && self.snapshot_properties.is_empty()
+            && !has_delete_entries
+        {
             return Err(Error::new(
                 ErrorKind::PreconditionFailed,
-                "No added data files or added snapshot properties found when write a manifest file",
+                "No added data files, delete entries, or snapshot properties found when write a manifest file",
             ));
         }
 
@@ -345,8 +373,11 @@ impl<'a> SnapshotProducer<'a> {
             manifest_files.push(added_manifest);
         }
 
-        // # TODO
-        // Support process delete entries.
+        // Process delete entries.
+        if has_delete_entries {
+            let delete_manifest = self.write_delete_manifest(delete_entries).await?;
+            manifest_files.push(delete_manifest);
+        }
 
         let manifest_files = manifest_process.process_manifests(self, manifest_files);
         Ok(manifest_files)