diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs
index 9e9d4580c3..5d7802912c 100644
--- a/crates/catalog/glue/src/catalog.rs
+++ b/crates/catalog/glue/src/catalog.rs
@@ -642,7 +642,7 @@ impl Catalog for GlueCatalog {
/// attempting to drop the table. This includes scenarios where
/// the table does not exist.
/// - Any network or communication error occurs with the database backend.
- async fn drop_table(&self, table: &TableIdent) -> Result<()> {
+ async fn drop_table_with_purge(&self, table: &TableIdent, _purge: bool) -> Result<()> {
let db_name = validate_namespace(table.namespace())?;
let table_name = table.name();
diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs
index bd78193732..a2161707ff 100644
--- a/crates/catalog/hms/src/catalog.rs
+++ b/crates/catalog/hms/src/catalog.rs
@@ -580,7 +580,7 @@ impl Catalog for HmsCatalog {
/// attempting to drop the table. This includes scenarios where
/// the table does not exist.
/// - Any network or communication error occurs with the database backend.
- async fn drop_table(&self, table: &TableIdent) -> Result<()> {
+ async fn drop_table_with_purge(&self, table: &TableIdent, _purge: bool) -> Result<()> {
let db_name = validate_namespace(table.namespace())?;
if !self.namespace_exists(table.namespace()).await? {
return Err(Error::new(
diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs
index 3551b05160..da490d7d95 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -827,13 +827,18 @@ impl Catalog for RestCatalog {
}
/// Drop a table from the catalog.
- async fn drop_table(&self, table: &TableIdent) -> Result<()> {
+ async fn drop_table_with_purge(&self, table: &TableIdent, purge: bool) -> Result<()> {
let context = self.context().await?;
- let request = context
+ let mut request_builder = context
.client
- .request(Method::DELETE, context.config.table_endpoint(table))
- .build()?;
+ .request(Method::DELETE, context.config.table_endpoint(table));
+
+ if purge {
+ request_builder = request_builder.query(&[("purgeRequested", "true")]);
+ }
+
+ let request = request_builder.build()?;
let http_response = context.client.query_catalog(request).await?;
diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs
index a416c38f22..2bd57d9c74 100644
--- a/crates/catalog/s3tables/src/catalog.rs
+++ b/crates/catalog/s3tables/src/catalog.rs
@@ -570,7 +570,7 @@ impl Catalog for S3TablesCatalog {
/// This function can return an error in the following situations:
/// - Errors from the underlying database deletion process, converted using
/// `from_aws_sdk_error`.
- async fn drop_table(&self, table: &TableIdent) -> Result<()> {
+ async fn drop_table_with_purge(&self, table: &TableIdent, _purge: bool) -> Result<()> {
let req = self
.s3tables_client
.delete_table()
diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs
index 195f6c9de4..55f738b9c6 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -729,7 +729,7 @@ impl Catalog for SqlCatalog {
}
}
- async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
+ async fn drop_table_with_purge(&self, identifier: &TableIdent, _purge: bool) -> Result<()> {
if !self.table_exists(identifier).await? {
return no_such_table_err(identifier);
}
diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs
index 25ae004417..ffbcb88b45 100644
--- a/crates/iceberg/src/catalog/memory/catalog.rs
+++ b/crates/iceberg/src/catalog/memory/catalog.rs
@@ -319,7 +319,7 @@ impl Catalog for MemoryCatalog {
}
/// Drop a table from the catalog.
- async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> {
+ async fn drop_table_with_purge(&self, table_ident: &TableIdent, _purge: bool) -> Result<()> {
let mut root_namespace_state = self.root_namespace_state.lock().await;
root_namespace_state.remove_existing_table(table_ident)?;
diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs
index 06326917ec..59c4947cf5 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -19,6 +19,7 @@
pub mod memory;
mod metadata_location;
+pub mod utils;
use std::collections::HashMap;
use std::fmt::{Debug, Display};
@@ -95,8 +96,17 @@ pub trait Catalog: Debug + Sync + Send {
/// Load table from the catalog.
async fn load_table(&self, table: &TableIdent) -> Result
;
+ /// Drop a table from the catalog and purge its data, or returns error if it doesn't exist.
+ ///
+ /// This is equivalent to calling `drop_table_with_purge(table, true)`.
+ ///
+ /// Provided as a default method: it only forwards to `drop_table_with_purge`, which
+ /// implementors must supply. Whether data is actually removed therefore depends on how
+ /// the implementation handles the `purge` flag.
+ async fn drop_table(&self, table: &TableIdent) -> Result<()> {
+ self.drop_table_with_purge(table, true).await
+ }
+
/// Drop a table from the catalog, or returns error if it doesn't exist.
- async fn drop_table(&self, table: &TableIdent) -> Result<()>;
+ ///
+ /// If `purge` is true, the catalog should also delete the underlying table data.
+ /// `drop_table` forwards to this method with `purge` set to `true`.
+ async fn drop_table_with_purge(&self, table: &TableIdent, purge: bool) -> Result<()>;
/// Check if a table exists in the catalog.
async fn table_exists(&self, table: &TableIdent) -> Result;
diff --git a/crates/iceberg/src/catalog/utils.rs b/crates/iceberg/src/catalog/utils.rs
new file mode 100644
index 0000000000..000de96b22
--- /dev/null
+++ b/crates/iceberg/src/catalog/utils.rs
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utility functions for catalog operations.
+
+use std::collections::HashSet;
+
+use crate::io::FileIO;
+use crate::spec::TableMetadata;
+use crate::Result;
+
+/// Table property controlling garbage collection on drop; when set to `false`, data
+/// files will not be deleted when a table is dropped. Defaults to `true`.
+pub const GC_ENABLED: &str = "gc.enabled";
+/// Fallback for `gc.enabled` when the table properties do not supply a usable value.
+const GC_ENABLED_DEFAULT: bool = true;
+
+/// Best-effort deletion of the files referenced by `metadata`, mirroring Java's
+/// `CatalogUtil.dropTableData`: manifests, manifest lists, previous metadata files,
+/// statistics and partition statistics files, and finally `metadata_location` if given.
+///
+/// Data files listed in the manifests are deleted only when the `gc.enabled` table
+/// property allows it, to avoid corrupting other tables that may share those files.
+/// A present value enables GC only if it equals `true` ignoring ASCII case; any other
+/// value (e.g. `FALSE`, `no`) keeps the data files. Absent, it defaults to `true`.
+///
+/// `io` performs every read and delete. Failures to load a manifest list or delete a
+/// file are suppressed so cleanup continues; the function currently always returns `Ok(())`.
+pub async fn drop_table_data(
+    io: &FileIO,
+    metadata: &TableMetadata,
+    metadata_location: Option<&str>,
+) -> Result<()> {
+    let mut manifest_lists_to_delete: HashSet<String> = HashSet::new();
+    let mut manifests_to_delete: HashSet<String> = HashSet::new();
+
+    for snapshot in metadata.snapshots() {
+        // Collect the manifest list location
+        let manifest_list_location = snapshot.manifest_list();
+        if !manifest_list_location.is_empty() {
+            manifest_lists_to_delete.insert(manifest_list_location.to_string());
+        }
+
+        // Load all manifests from this snapshot
+        match snapshot.load_manifest_list(io, metadata).await {
+            Ok(manifest_list) => {
+                for manifest_file in manifest_list.entries() {
+                    manifests_to_delete.insert(manifest_file.manifest_path.clone());
+                }
+            }
+            Err(_) => {
+                // Suppress failure to continue cleanup
+            }
+        }
+    }
+
+    // Like Java's `Boolean.parseBoolean`: a malformed value must never enable deletion.
+    let gc_enabled = metadata
+        .properties()
+        .get(GC_ENABLED)
+        .map(|v| v.eq_ignore_ascii_case("true"))
+        .unwrap_or(GC_ENABLED_DEFAULT);
+
+    // Delete data files only if gc.enabled is true, to avoid corrupting shared tables
+    if gc_enabled {
+        delete_data_files(io, &manifests_to_delete).await;
+    }
+
+    // Delete manifest files (after the data files, which are discovered by reading them)
+    delete_files(io, manifests_to_delete.iter().map(String::as_str)).await;
+
+    // Delete manifest lists
+    delete_files(io, manifest_lists_to_delete.iter().map(String::as_str)).await;
+
+    // Delete previous metadata files
+    delete_files(
+        io,
+        metadata.metadata_log().iter().map(|m| m.metadata_file.as_str()),
+    )
+    .await;
+
+    // Delete statistics files
+    delete_files(
+        io,
+        metadata
+            .statistics_iter()
+            .map(|s| s.statistics_path.as_str()),
+    )
+    .await;
+
+    // Delete partition statistics files
+    delete_files(
+        io,
+        metadata
+            .partition_statistics_iter()
+            .map(|s| s.statistics_path.as_str()),
+    )
+    .await;
+
+    // Delete the current metadata file last; a failure here is ignored like the others
+    if let Some(location) = metadata_location {
+        let _ = io.delete(location).await;
+    }
+
+    Ok(())
+}
+
+/// Best-effort deletion of the data files referenced by each manifest in `manifest_paths`.
+/// A manifest that cannot be opened, read, or parsed by `Manifest::parse_avro` is skipped,
+/// and failed deletes are ignored, so one bad file never aborts the rest of the cleanup.
+async fn delete_data_files(io: &FileIO, manifest_paths: &HashSet<String>) {
+    for manifest_path in manifest_paths {
+        let input = match io.new_input(manifest_path) {
+            Ok(input) => input,
+            Err(_) => continue,
+        };
+        let manifest_content = match input.read().await {
+            Ok(content) => content,
+            Err(_) => continue,
+        };
+        let manifest = match crate::spec::Manifest::parse_avro(&manifest_content) {
+            Ok(manifest) => manifest,
+            Err(_) => continue,
+        };
+
+        for entry in manifest.entries() {
+            let _ = io.delete(entry.data_file.file_path()).await;
+        }
+    }
+}
+
+/// Deletes every path yielded by `paths` through `io`, discarding individual failures.
+async fn delete_files<'a>(io: &FileIO, paths: impl Iterator<Item = &'a str>) {
+    for path in paths {
+        let _ = io.delete(path).await;
+    }
+}