From d15deeb8697965983eb1b00fd67e82d17c91839e Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 12 Mar 2026 15:06:56 -0700 Subject: [PATCH] adding support for purge_table --- crates/catalog/glue/src/catalog.rs | 2 +- crates/catalog/hms/src/catalog.rs | 2 +- crates/catalog/rest/src/catalog.rs | 13 +- crates/catalog/s3tables/src/catalog.rs | 2 +- crates/catalog/sql/src/catalog.rs | 2 +- crates/iceberg/src/catalog/memory/catalog.rs | 2 +- crates/iceberg/src/catalog/mod.rs | 12 +- crates/iceberg/src/catalog/utils.rs | 151 +++++++++++++++++++ 8 files changed, 176 insertions(+), 10 deletions(-) create mode 100644 crates/iceberg/src/catalog/utils.rs diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 9e9d4580c3..5d7802912c 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -642,7 +642,7 @@ impl Catalog for GlueCatalog { /// attempting to drop the table. This includes scenarios where /// the table does not exist. /// - Any network or communication error occurs with the database backend. - async fn drop_table(&self, table: &TableIdent) -> Result<()> { + async fn drop_table_with_purge(&self, table: &TableIdent, _purge: bool) -> Result<()> { let db_name = validate_namespace(table.namespace())?; let table_name = table.name(); diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index bd78193732..a2161707ff 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -580,7 +580,7 @@ impl Catalog for HmsCatalog { /// attempting to drop the table. This includes scenarios where /// the table does not exist. /// - Any network or communication error occurs with the database backend. - async fn drop_table(&self, table: &TableIdent) -> Result<()> { + async fn drop_table_with_purge(&self, table: &TableIdent, _purge: bool) -> Result<()> { let db_name = validate_namespace(table.namespace())?; if !self.namespace_exists(table.namespace()).await? 
{ return Err(Error::new( diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 3551b05160..da490d7d95 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -827,13 +827,18 @@ impl Catalog for RestCatalog { } /// Drop a table from the catalog. - async fn drop_table(&self, table: &TableIdent) -> Result<()> { + async fn drop_table_with_purge(&self, table: &TableIdent, purge: bool) -> Result<()> { let context = self.context().await?; - let request = context + let mut request_builder = context .client - .request(Method::DELETE, context.config.table_endpoint(table)) - .build()?; + .request(Method::DELETE, context.config.table_endpoint(table)); + + if purge { + request_builder = request_builder.query(&[("purgeRequested", "true")]); + } + + let request = request_builder.build()?; let http_response = context.client.query_catalog(request).await?; diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index a416c38f22..2bd57d9c74 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -570,7 +570,7 @@ impl Catalog for S3TablesCatalog { /// This function can return an error in the following situations: /// - Errors from the underlying database deletion process, converted using /// `from_aws_sdk_error`. 
- async fn drop_table(&self, table: &TableIdent) -> Result<()> { + async fn drop_table_with_purge(&self, table: &TableIdent, _purge: bool) -> Result<()> { let req = self .s3tables_client .delete_table() diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 195f6c9de4..55f738b9c6 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -729,7 +729,7 @@ impl Catalog for SqlCatalog { } } - async fn drop_table(&self, identifier: &TableIdent) -> Result<()> { + async fn drop_table_with_purge(&self, identifier: &TableIdent, _purge: bool) -> Result<()> { if !self.table_exists(identifier).await? { return no_such_table_err(identifier); } diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 25ae004417..ffbcb88b45 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -319,7 +319,7 @@ impl Catalog for MemoryCatalog { } /// Drop a table from the catalog. - async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> { + async fn drop_table_with_purge(&self, table_ident: &TableIdent, _purge: bool) -> Result<()> { let mut root_namespace_state = self.root_namespace_state.lock().await; root_namespace_state.remove_existing_table(table_ident)?; diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 06326917ec..59c4947cf5 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -19,6 +19,7 @@ pub mod memory; mod metadata_location; +pub mod utils; use std::collections::HashMap; use std::fmt::{Debug, Display}; @@ -95,8 +96,17 @@ pub trait Catalog: Debug + Sync + Send { /// Load table from the catalog. async fn load_table(&self, table: &TableIdent) -> Result<Table>; + /// Drop a table from the catalog and purge its data, or returns error if it doesn't exist. + /// + /// This is equivalent to calling `drop_table_with_purge(table, true)`. 
+ async fn drop_table(&self, table: &TableIdent) -> Result<()> { + self.drop_table_with_purge(table, true).await + } + /// Drop a table from the catalog, or returns error if it doesn't exist. - async fn drop_table(&self, table: &TableIdent) -> Result<()>; + /// + /// If `purge` is true, the catalog should also delete the underlying table data. + async fn drop_table_with_purge(&self, table: &TableIdent, purge: bool) -> Result<()>; /// Check if a table exists in the catalog. async fn table_exists(&self, table: &TableIdent) -> Result<bool>; diff --git a/crates/iceberg/src/catalog/utils.rs b/crates/iceberg/src/catalog/utils.rs new file mode 100644 index 0000000000..000de96b22 --- /dev/null +++ b/crates/iceberg/src/catalog/utils.rs @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utility functions for catalog operations. + +use std::collections::HashSet; + +use crate::io::FileIO; +use crate::spec::TableMetadata; +use crate::Result; + +/// Property key for enabling garbage collection on drop. +/// When set to `false`, data files will not be deleted when a table is dropped. +/// Defaults to `true`. 
+pub const GC_ENABLED: &str = "gc.enabled"; +const GC_ENABLED_DEFAULT: bool = true; + +/// Deletes all data and metadata files referenced by the given table metadata. +/// +/// This mirrors the Java implementation's `CatalogUtil.dropTableData`. +/// It collects all manifest files, manifest lists, previous metadata files, +/// statistics files, and partition statistics files, then deletes them. +/// +/// Data files within manifests are only deleted if the `gc.enabled` table +/// property is `true` (the default), to avoid corrupting other tables that +/// may share the same data files. +/// +/// Individual file deletion failures are suppressed to complete as much +/// cleanup as possible, matching the Java behavior. +pub async fn drop_table_data( + io: &FileIO, + metadata: &TableMetadata, + metadata_location: Option<&str>, +) -> Result<()> { + let mut manifest_lists_to_delete: HashSet<String> = HashSet::new(); + let mut manifests_to_delete: HashSet<String> = HashSet::new(); + + for snapshot in metadata.snapshots() { + // Collect the manifest list location + let manifest_list_location = snapshot.manifest_list(); + if !manifest_list_location.is_empty() { + manifest_lists_to_delete.insert(manifest_list_location.to_string()); + } + + // Load all manifests from this snapshot + match snapshot.load_manifest_list(io, metadata).await { + Ok(manifest_list) => { + for manifest_file in manifest_list.entries() { + manifests_to_delete.insert(manifest_file.manifest_path.clone()); + } + } + Err(_) => { + // Suppress failure to continue cleanup + } + } + } + + let gc_enabled = metadata + .properties() + .get(GC_ENABLED) + .and_then(|v| v.parse::<bool>().ok()) + .unwrap_or(GC_ENABLED_DEFAULT); + + // Delete data files only if gc.enabled is true, to avoid corrupting shared tables + if gc_enabled { + delete_data_files(io, &manifests_to_delete).await; + } + + // Delete manifest files + delete_files(io, manifests_to_delete.iter().map(String::as_str)).await; + + // Delete manifest lists + delete_files(io, 
manifest_lists_to_delete.iter().map(String::as_str)).await; + + // Delete previous metadata files + delete_files( + io, + metadata.metadata_log().iter().map(|m| m.metadata_file.as_str()), + ) + .await; + + // Delete statistics files + delete_files( + io, + metadata + .statistics_iter() + .map(|s| s.statistics_path.as_str()), + ) + .await; + + // Delete partition statistics files + delete_files( + io, + metadata + .partition_statistics_iter() + .map(|s| s.statistics_path.as_str()), + ) + .await; + + // Delete the current metadata file + if let Some(location) = metadata_location { + let _ = io.delete(location).await; + } + + Ok(()) +} + +/// Reads each manifest and deletes the data files referenced within. +async fn delete_data_files(io: &FileIO, manifest_paths: &HashSet<String>) { + for manifest_path in manifest_paths { + let input = match io.new_input(manifest_path) { + Ok(input) => input, + Err(_) => continue, + }; + + let manifest_content = match input.read().await { + Ok(content) => content, + Err(_) => continue, + }; + + let manifest = match crate::spec::Manifest::parse_avro(&manifest_content) { + Ok(manifest) => manifest, + Err(_) => continue, + }; + + for entry in manifest.entries() { + let _ = io.delete(entry.data_file.file_path()).await; + } + } +} + +/// Deletes a collection of files, suppressing individual failures. +async fn delete_files<'a>(io: &FileIO, paths: impl Iterator<Item = &'a str>) { + for path in paths { + let _ = io.delete(path).await; + } +}