diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 3551b05160..e8140b7af0 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -24,6 +24,7 @@ use std::sync::Arc; use async_trait::async_trait; use iceberg::io::{FileIO, FileIOBuilder, StorageFactory}; +use iceberg::spec::TableMetadata; use iceberg::table::Table; use iceberg::{ Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, @@ -33,7 +34,7 @@ use itertools::Itertools; use reqwest::header::{ HeaderMap, HeaderName, HeaderValue, {self}, }; -use reqwest::{Client, Method, StatusCode, Url}; +use reqwest::{Client, Method, StatusCode}; use tokio::sync::OnceCell; use typed_builder::TypedBuilder; @@ -406,7 +407,7 @@ impl RestCatalog { async fn load_file_io( &self, - metadata_location: Option<&str>, + metadata: &TableMetadata, extra_config: Option>, ) -> Result { let mut props = self.context().await?.config.props.clone(); @@ -414,21 +415,6 @@ impl RestCatalog { props.extend(config); } - // If the warehouse is a logical identifier instead of a URL we don't want - // to raise an exception - let warehouse_path = match self.context().await?.config.warehouse.as_deref() { - Some(url) if Url::parse(url).is_ok() => Some(url), - Some(_) => None, - None => None, - }; - - if metadata_location.or(warehouse_path).is_none() { - return Err(Error::new( - ErrorKind::Unexpected, - "Unable to load file io, neither warehouse nor metadata location is set!", - )); - } - // Require a StorageFactory to be provided let factory = self .storage_factory @@ -438,7 +424,8 @@ impl RestCatalog { ErrorKind::Unexpected, "StorageFactory must be provided for RestCatalog. Use `with_storage_factory` to configure it.", ) - })?; + })? + .with_metadata(metadata)?; let file_io = FileIOBuilder::new(factory).with_props(props).build(); @@ -743,10 +730,12 @@ impl Catalog for RestCatalog { } }; - let metadata_location = response.metadata_location.as_ref().ok_or(Error::new( - ErrorKind::DataInvalid, - "Metadata location missing in `create_table` response!", - ))?; + if response.metadata_location.is_none() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Metadata location missing in `create_table` response!", + )); + } let config = response .config @@ -754,9 +743,7 @@ impl Catalog for RestCatalog { .chain(self.user_config.props.clone()) .collect(); - let file_io = self - .load_file_io(Some(metadata_location), Some(config)) - .await?; + let file_io = self.load_file_io(&response.metadata, Some(config)).await?; let table_builder = Table::builder() .identifier(table_ident.clone()) @@ -810,9 +797,7 @@ impl Catalog for RestCatalog { .chain(self.user_config.props.clone()) .collect(); - let file_io = self - .load_file_io(response.metadata_location.as_deref(), Some(config)) - .await?; + let file_io = self.load_file_io(&response.metadata, Some(config)).await?; let table_builder = Table::builder() .identifier(table_ident.clone()) @@ -960,7 +945,7 @@ impl Catalog for RestCatalog { "Metadata location missing in `register_table` response!", ))?; - let file_io = self.load_file_io(Some(metadata_location), None).await?; + let file_io = self.load_file_io(&response.metadata, None).await?; Table::builder() .identifier(table_ident.clone()) @@ -1030,9 +1015,7 @@ impl Catalog for RestCatalog { } }; - let file_io = self - .load_file_io(Some(&response.metadata_location), None) - .await?; + let file_io = self.load_file_io(&response.metadata, None).await?; Table::builder() .identifier(commit.identifier().clone()) diff --git a/crates/iceberg/src/io/storage/local_fs.rs b/crates/iceberg/src/io/storage/local_fs.rs index e96e951baa..36c93990f0 100644 --- a/crates/iceberg/src/io/storage/local_fs.rs +++ b/crates/iceberg/src/io/storage/local_fs.rs @@ -328,6 +328,13 @@ pub struct LocalFsStorageFactory; #[typetag::serde] impl StorageFactory for LocalFsStorageFactory { + fn with_metadata( + &self, + _metadata: &crate::spec::TableMetadata, + ) -> Result> { + Ok(Arc::new(self.clone())) + } + fn build(&self, _config: &StorageConfig) -> Result> { Ok(Arc::new(LocalFsStorage::new())) } diff --git a/crates/iceberg/src/io/storage/memory.rs b/crates/iceberg/src/io/storage/memory.rs index f33dbd07b1..a0230f9130 100644 --- a/crates/iceberg/src/io/storage/memory.rs +++ b/crates/iceberg/src/io/storage/memory.rs @@ -248,6 +248,13 @@ pub struct MemoryStorageFactory; #[typetag::serde] impl StorageFactory for MemoryStorageFactory { + fn with_metadata( + &self, + _metadata: &crate::spec::TableMetadata, + ) -> Result> { + Ok(Arc::new(self.clone())) + } + fn build(&self, _config: &StorageConfig) -> Result> { Ok(Arc::new(MemoryStorage::new())) } diff --git a/crates/iceberg/src/io/storage/mod.rs b/crates/iceberg/src/io/storage/mod.rs index 5276c7771f..59122c7df1 100644 --- a/crates/iceberg/src/io/storage/mod.rs +++ b/crates/iceberg/src/io/storage/mod.rs @@ -33,6 +33,7 @@ pub use memory::{MemoryStorage, MemoryStorageFactory}; use super::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile}; use crate::Result; +use crate::spec::TableMetadata; /// Trait for storage operations in Iceberg. /// @@ -128,6 +129,19 @@ pub trait Storage: Debug + Send + Sync { /// ``` #[typetag::serde(tag = "type")] pub trait StorageFactory: Debug + Send + Sync { + /// Create a new factory instance enriched with table metadata. + /// + /// This allows storage factories to incorporate table-level metadata + /// (e.g., table properties) into the storage initialization. + /// + /// Implementations that don't need table metadata should return + /// a clone of themselves: `Ok(Arc::new(self.clone()))`. + /// + /// # Arguments + /// + /// * `metadata` - The table metadata to incorporate + fn with_metadata(&self, metadata: &TableMetadata) -> Result>; + /// Build a new Storage instance from the given configuration. /// /// # Arguments diff --git a/crates/storage/opendal/src/lib.rs b/crates/storage/opendal/src/lib.rs index 8160680523..ccf18bb6ef 100644 --- a/crates/storage/opendal/src/lib.rs +++ b/crates/storage/opendal/src/lib.rs @@ -131,6 +131,13 @@ pub enum OpenDalStorageFactory { #[typetag::serde(name = "OpenDalStorageFactory")] impl StorageFactory for OpenDalStorageFactory { + fn with_metadata( + &self, + _metadata: &iceberg::spec::TableMetadata, + ) -> Result> { + Ok(Arc::new(self.clone())) + } + #[allow(unused_variables)] fn build(&self, config: &StorageConfig) -> Result> { match self { diff --git a/crates/storage/opendal/src/resolving.rs b/crates/storage/opendal/src/resolving.rs index 7c06cf96a5..63f777fd55 100644 --- a/crates/storage/opendal/src/resolving.rs +++ b/crates/storage/opendal/src/resolving.rs @@ -182,6 +182,13 @@ impl OpenDalResolvingStorageFactory { #[typetag::serde] impl StorageFactory for OpenDalResolvingStorageFactory { + fn with_metadata( + &self, + _metadata: &iceberg::spec::TableMetadata, + ) -> Result> { + Ok(Arc::new(self.clone())) + } + fn build(&self, config: &StorageConfig) -> Result> { Ok(Arc::new(OpenDalResolvingStorage { props: config.props().clone(),