diff --git a/crates/iceberg/src/io/storage/opendal/mod.rs b/crates/iceberg/src/io/storage/opendal/mod.rs index ee0df20fce..3f0c9274a7 100644 --- a/crates/iceberg/src/io/storage/opendal/mod.rs +++ b/crates/iceberg/src/io/storage/opendal/mod.rs @@ -17,8 +17,9 @@ //! OpenDAL-based storage implementation. +use std::collections::HashMap; use std::ops::Range; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use async_trait::async_trait; #[cfg(feature = "storage-azdls")] @@ -122,20 +123,24 @@ impl StorageFactory for OpenDalStorageFactory { configured_scheme: "s3".to_string(), config: s3_config_parse(config.props().clone())?.into(), customized_credential_load: customized_credential_load.clone(), + operator_cache: default_operator_cache(), })), #[cfg(feature = "storage-gcs")] OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs { config: gcs_config_parse(config.props().clone())?.into(), + operator_cache: default_operator_cache(), })), #[cfg(feature = "storage-oss")] OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss { config: oss_config_parse(config.props().clone())?.into(), + operator_cache: default_operator_cache(), })), #[cfg(feature = "storage-azdls")] OpenDalStorageFactory::Azdls { configured_scheme } => { Ok(Arc::new(OpenDalStorage::Azdls { configured_scheme: configured_scheme.clone(), config: azdls_config_parse(config.props().clone())?.into(), + operator_cache: default_operator_cache(), })) } #[cfg(all( @@ -160,6 +165,11 @@ fn default_memory_operator() -> Operator { memory_config_build().expect("Failed to create default memory operator") } +/// Default empty operator cache for serde deserialization. +fn default_operator_cache() -> Arc>> { + Arc::new(Mutex::new(HashMap::new())) +} + /// OpenDAL-based storage implementation. #[derive(Clone, Debug, Serialize, Deserialize)] pub enum OpenDalStorage { @@ -180,18 +190,27 @@ pub enum OpenDalStorage { /// Custom AWS credential loader. #[serde(skip)] customized_credential_load: Option, + /// Reuses operators across calls to avoid rebuilding credential chains. + #[serde(skip, default = "default_operator_cache")] + operator_cache: Arc>>, }, /// GCS storage variant. #[cfg(feature = "storage-gcs")] Gcs { /// GCS configuration. config: Arc, + /// Reuses operators across calls to avoid rebuilding credential chains. + #[serde(skip, default = "default_operator_cache")] + operator_cache: Arc>>, }, /// OSS storage variant. #[cfg(feature = "storage-oss")] Oss { /// OSS configuration. config: Arc, + /// Reuses operators across calls to avoid rebuilding credential chains. + #[serde(skip, default = "default_operator_cache")] + operator_cache: Arc>>, }, /// Azure Data Lake Storage variant. /// Expects paths of the form @@ -206,9 +225,32 @@ pub enum OpenDalStorage { configured_scheme: AzureStorageScheme, /// Azure DLS configuration. config: Arc, + /// Reuses operators across calls to avoid rebuilding credential chains. + #[serde(skip, default = "default_operator_cache")] + operator_cache: Arc>>, }, } +/// Returns a cached operator for the given key, or creates and caches a new one. +fn get_or_create_operator( + cache: &Mutex>, + key: &str, + build: impl FnOnce() -> Result, +) -> Result { + let mut guard = cache.lock().map_err(|e| { + Error::new( + ErrorKind::Unexpected, + format!("Operator cache lock poisoned: {e}"), + ) + })?; + if let Some(op) = guard.get(key) { + return Ok(op.clone()); + } + let op = build()?.layer(RetryLayer::new()); + guard.insert(key.to_string(), op.clone()); + Ok(op) +} + impl OpenDalStorage { /// Convert iceberg config to opendal config. /// @@ -230,14 +272,17 @@ impl OpenDalStorage { customized_credential_load: extensions .get::() .map(Arc::unwrap_or_clone), + operator_cache: default_operator_cache(), }), #[cfg(feature = "storage-gcs")] Scheme::Gcs => Ok(Self::Gcs { config: gcs_config_parse(props)?.into(), + operator_cache: default_operator_cache(), }), #[cfg(feature = "storage-oss")] Scheme::Oss => Ok(Self::Oss { config: oss_config_parse(props)?.into(), + operator_cache: default_operator_cache(), }), #[cfg(feature = "storage-azdls")] Scheme::Azdls => { @@ -245,6 +290,7 @@ impl OpenDalStorage { Ok(Self::Azdls { config: azdls_config_parse(props)?.into(), configured_scheme: scheme, + operator_cache: default_operator_cache(), }) } // Update doc on [`FileIO`] when adding new schemes. @@ -276,15 +322,16 @@ impl OpenDalStorage { let (operator, relative_path): (Operator, &str) = match self { #[cfg(feature = "storage-memory")] OpenDalStorage::Memory(op) => { + let op = op.clone().layer(RetryLayer::new()); if let Some(stripped) = path.strip_prefix("memory:/") { - (op.clone(), stripped) + (op, stripped) } else { - (op.clone(), &path[1..]) + (op, &path[1..]) } } #[cfg(feature = "storage-fs")] OpenDalStorage::LocalFs => { - let op = fs_config_build()?; + let op = fs_config_build()?.layer(RetryLayer::new()); if let Some(stripped) = path.strip_prefix("file:/") { (op, stripped) } else { @@ -296,11 +343,21 @@ impl OpenDalStorage { configured_scheme, config, customized_credential_load, + operator_cache, } => { - let op = s3_config_build(config, customized_credential_load, path)?; + let url = url::Url::parse(path)?; + let bucket = url.host_str().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid s3 url: {path}, missing bucket"), + ) + })?; + + let op = get_or_create_operator(operator_cache, bucket, || { + s3_config_build(config, customized_credential_load, path) + })?; let op_info = op.info(); - // Check prefix of s3 path. let prefix = format!("{}://{}/", configured_scheme, op_info.name()); if path.starts_with(&prefix) { (op, &path[prefix.len()..]) @@ -312,8 +369,21 @@ impl OpenDalStorage { } } #[cfg(feature = "storage-gcs")] - OpenDalStorage::Gcs { config } => { - let operator = gcs_config_build(config, path)?; + OpenDalStorage::Gcs { + config, + operator_cache, + } => { + let url = url::Url::parse(path)?; + let bucket = url.host_str().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid gcs url: {path}, missing bucket"), + ) + })?; + + let operator = get_or_create_operator(operator_cache, bucket, || { + gcs_config_build(config, path) + })?; let prefix = format!("gs://{}/", operator.info().name()); if path.starts_with(&prefix) { (operator, &path[prefix.len()..]) @@ -325,8 +395,21 @@ impl OpenDalStorage { } } #[cfg(feature = "storage-oss")] - OpenDalStorage::Oss { config } => { - let op = oss_config_build(config, path)?; + OpenDalStorage::Oss { + config, + operator_cache, + } => { + let url = url::Url::parse(path)?; + let bucket = url.host_str().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid oss url: {path}, missing bucket"), + ) + })?; + + let op = get_or_create_operator(operator_cache, bucket, || { + oss_config_build(config, path) + })?; let prefix = format!("oss://{}/", op.info().name()); if path.starts_with(&prefix) { (op, &path[prefix.len()..]) @@ -341,7 +424,41 @@ impl OpenDalStorage { OpenDalStorage::Azdls { configured_scheme, config, - } => azdls_create_operator(path, config, configured_scheme)?, + operator_cache, + } => { + let url = url::Url::parse(path)?; + let filesystem = url.username().to_string(); + + let op = { + let guard = operator_cache.lock().map_err(|e| { + Error::new( + ErrorKind::Unexpected, + format!("Operator cache lock poisoned: {e}"), + ) + })?; + guard.get(&filesystem).cloned() + }; + + match op { + Some(op) => { + let relative_path = &path[path.len() - url.path().len()..]; + (op, relative_path) + } + None => { + let (op, relative_path) = + azdls_create_operator(path, config, configured_scheme)?; + let op = op.layer(RetryLayer::new()); + let mut guard = operator_cache.lock().map_err(|e| { + Error::new( + ErrorKind::Unexpected, + format!("Operator cache lock poisoned: {e}"), + ) + })?; + guard.insert(filesystem, op.clone()); + (op, relative_path) + } + } + } #[cfg(all( not(feature = "storage-s3"), not(feature = "storage-fs"), @@ -357,9 +474,6 @@ impl OpenDalStorage { } }; - // Transient errors are common for object stores; however there's no - // harm in retrying temporary failures for other storage backends as well. - let operator = operator.layer(RetryLayer::new()); Ok((operator, relative_path)) }