diff --git a/crates/storage/opendal/src/lib.rs b/crates/storage/opendal/src/lib.rs index 7c11f80add..8160680523 100644 --- a/crates/storage/opendal/src/lib.rs +++ b/crates/storage/opendal/src/lib.rs @@ -90,6 +90,9 @@ cfg_if! { } } +mod resolving; +pub use resolving::{OpenDalResolvingStorage, OpenDalResolvingStorageFactory}; + /// OpenDAL-based storage factory. /// /// Maps scheme to the corresponding OpenDalStorage storage variant. diff --git a/crates/storage/opendal/src/resolving.rs b/crates/storage/opendal/src/resolving.rs new file mode 100644 index 0000000000..7c06cf96a5 --- /dev/null +++ b/crates/storage/opendal/src/resolving.rs @@ -0,0 +1,319 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Resolving storage that auto-detects the scheme from a path and delegates +//! to the appropriate [`OpenDalStorage`] variant. 
+
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::StreamExt;
+use futures::stream::BoxStream;
+use iceberg::io::{
+    FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
+    StorageFactory,
+};
+use iceberg::{Error, ErrorKind, Result};
+use opendal::Scheme;
+use serde::{Deserialize, Serialize};
+use url::Url;
+
+use crate::OpenDalStorage;
+#[cfg(feature = "opendal-s3")]
+use crate::s3::CustomAwsCredentialLoader;
+
+/// Schemes supported by OpenDalResolvingStorage
+pub const SCHEME_MEMORY: &str = "memory";
+pub const SCHEME_FILE: &str = "file";
+pub const SCHEME_S3: &str = "s3";
+pub const SCHEME_S3A: &str = "s3a";
+pub const SCHEME_S3N: &str = "s3n";
+pub const SCHEME_GS: &str = "gs";
+pub const SCHEME_GCS: &str = "gcs";
+pub const SCHEME_OSS: &str = "oss";
+pub const SCHEME_ABFSS: &str = "abfss";
+pub const SCHEME_ABFS: &str = "abfs";
+pub const SCHEME_WASBS: &str = "wasbs";
+pub const SCHEME_WASB: &str = "wasb";
+
+/// Parse a URL scheme string into an [`opendal::Scheme`].
+fn parse_scheme(scheme: &str) -> Result<Scheme> {
+    match scheme {
+        SCHEME_MEMORY => Ok(Scheme::Memory),
+        SCHEME_FILE | "" => Ok(Scheme::Fs),
+        SCHEME_S3 | SCHEME_S3A | SCHEME_S3N => Ok(Scheme::S3),
+        SCHEME_GS | SCHEME_GCS => Ok(Scheme::Gcs),
+        SCHEME_OSS => Ok(Scheme::Oss),
+        SCHEME_ABFSS | SCHEME_ABFS | SCHEME_WASBS | SCHEME_WASB => Ok(Scheme::Azdls),
+        s => s.parse::<Scheme>().map_err(|e| {
+            Error::new(
+                ErrorKind::FeatureUnsupported,
+                format!("Unsupported storage scheme: {s}: {e}"),
+            )
+        }),
+    }
+}
+
+/// Extract the scheme string from a path URL.
+fn extract_scheme(path: &str) -> Result<String> {
+    let url = Url::parse(path).map_err(|e| {
+        Error::new(
+            ErrorKind::DataInvalid,
+            format!("Invalid path: {path}, failed to parse URL: {e}"),
+        )
+    })?;
+    Ok(url.scheme().to_string())
+}
+
+/// Build an [`OpenDalStorage`] variant for the given scheme and config properties.
+fn build_storage_for_scheme(
+    scheme: &str,
+    props: &HashMap<String, String>,
+    #[cfg(feature = "opendal-s3")] customized_credential_load: &Option<CustomAwsCredentialLoader>,
+) -> Result<OpenDalStorage> {
+    match parse_scheme(scheme)? {
+        #[cfg(feature = "opendal-s3")]
+        Scheme::S3 => {
+            let config = crate::s3::s3_config_parse(props.clone())?;
+            Ok(OpenDalStorage::S3 {
+                configured_scheme: scheme.to_string(),
+                config: Arc::new(config),
+                customized_credential_load: customized_credential_load.clone(),
+            })
+        }
+        #[cfg(feature = "opendal-gcs")]
+        Scheme::Gcs => {
+            let config = crate::gcs::gcs_config_parse(props.clone())?;
+            Ok(OpenDalStorage::Gcs {
+                config: Arc::new(config),
+            })
+        }
+        #[cfg(feature = "opendal-oss")]
+        Scheme::Oss => {
+            let config = crate::oss::oss_config_parse(props.clone())?;
+            Ok(OpenDalStorage::Oss {
+                config: Arc::new(config),
+            })
+        }
+        #[cfg(feature = "opendal-azdls")]
+        Scheme::Azdls => {
+            let configured_scheme: crate::azdls::AzureStorageScheme = scheme.parse()?;
+            let config = crate::azdls::azdls_config_parse(props.clone())?;
+            Ok(OpenDalStorage::Azdls {
+                configured_scheme,
+                config: Arc::new(config),
+            })
+        }
+        #[cfg(feature = "opendal-fs")]
+        Scheme::Fs => Ok(OpenDalStorage::LocalFs),
+        #[cfg(feature = "opendal-memory")]
+        Scheme::Memory => Ok(OpenDalStorage::Memory(crate::memory::memory_config_build()?)),
+        unsupported => Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            format!("Unsupported storage scheme: {unsupported}"),
+        )),
+    }
+}
+
+/// A resolving storage factory that creates [`OpenDalResolvingStorage`] instances.
+///
+/// This factory accepts paths from any supported storage system and dynamically
+/// delegates operations to the appropriate [`OpenDalStorage`] variant based on
+/// the path scheme.
+///
+/// # Example
+///
+/// ```rust,ignore
+/// use std::sync::Arc;
+/// use iceberg::io::FileIOBuilder;
+/// use iceberg_storage_opendal::OpenDalResolvingStorageFactory;
+///
+/// let factory = OpenDalResolvingStorageFactory::new();
+/// let file_io = FileIOBuilder::new(Arc::new(factory))
+///     .with_prop("s3.region", "us-east-1")
+///     .build();
+/// ```
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct OpenDalResolvingStorageFactory {
+    /// Custom AWS credential loader for S3 storage.
+    #[cfg(feature = "opendal-s3")]
+    #[serde(skip)]
+    customized_credential_load: Option<CustomAwsCredentialLoader>,
+}
+
+impl Default for OpenDalResolvingStorageFactory {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl OpenDalResolvingStorageFactory {
+    /// Create a new resolving storage factory.
+    pub fn new() -> Self {
+        Self {
+            #[cfg(feature = "opendal-s3")]
+            customized_credential_load: None,
+        }
+    }
+
+    /// Set a custom AWS credential loader for S3 storage.
+    #[cfg(feature = "opendal-s3")]
+    pub fn with_s3_credential_loader(mut self, loader: CustomAwsCredentialLoader) -> Self {
+        self.customized_credential_load = Some(loader);
+        self
+    }
+}
+
+#[typetag::serde]
+impl StorageFactory for OpenDalResolvingStorageFactory {
+    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
+        Ok(Arc::new(OpenDalResolvingStorage {
+            props: config.props().clone(),
+            storages: RwLock::new(HashMap::new()),
+            #[cfg(feature = "opendal-s3")]
+            customized_credential_load: self.customized_credential_load.clone(),
+        }))
+    }
+}
+
+/// A resolving storage that auto-detects the scheme from a path and delegates
+/// to the appropriate [`OpenDalStorage`] variant.
+///
+/// Sub-storages are lazily created on first use for each scheme and cached
+/// for subsequent operations.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct OpenDalResolvingStorage {
+    /// Configuration properties shared across all backends.
+    props: HashMap<String, String>,
+    /// Cache of scheme → storage mappings.
+    #[serde(skip, default)]
+    storages: RwLock<HashMap<String, Arc<OpenDalStorage>>>,
+    /// Custom AWS credential loader for S3 storage.
+    #[cfg(feature = "opendal-s3")]
+    #[serde(skip)]
+    customized_credential_load: Option<CustomAwsCredentialLoader>,
+}
+
+impl OpenDalResolvingStorage {
+    /// Resolve the storage for the given path by extracting the scheme and
+    /// returning the cached or newly-created [`OpenDalStorage`].
+    fn resolve(&self, path: &str) -> Result<Arc<OpenDalStorage>> {
+        let scheme = extract_scheme(path)?;
+
+        // Fast path: check read lock first.
+        {
+            let cache = self
+                .storages
+                .read()
+                .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
+            if let Some(storage) = cache.get(&scheme) {
+                return Ok(storage.clone());
+            }
+        }
+
+        // Slow path: build and insert under write lock.
+        let mut cache = self
+            .storages
+            .write()
+            .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
+
+        // Double-check after acquiring write lock.
+        if let Some(storage) = cache.get(&scheme) {
+            return Ok(storage.clone());
+        }
+
+        let storage = build_storage_for_scheme(
+            &scheme,
+            &self.props,
+            #[cfg(feature = "opendal-s3")]
+            &self.customized_credential_load,
+        )?;
+        let storage = Arc::new(storage);
+        cache.insert(scheme, storage.clone());
+        Ok(storage)
+    }
+}
+
+#[async_trait]
+#[typetag::serde]
+impl Storage for OpenDalResolvingStorage {
+    async fn exists(&self, path: &str) -> Result<bool> {
+        self.resolve(path)?.exists(path).await
+    }
+
+    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
+        self.resolve(path)?.metadata(path).await
+    }
+
+    async fn read(&self, path: &str) -> Result<Bytes> {
+        self.resolve(path)?.read(path).await
+    }
+
+    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
+        self.resolve(path)?.reader(path).await
+    }
+
+    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
+        self.resolve(path)?.write(path, bs).await
+    }
+
+    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
+        self.resolve(path)?.writer(path).await
+    }
+
+    async fn delete(&self, path: &str) -> Result<()> {
+        self.resolve(path)?.delete(path).await
+    }
+
+    async fn delete_prefix(&self, path: &str) -> Result<()> {
+        self.resolve(path)?.delete_prefix(path).await
+    }
+
+    async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
+        // Group paths by scheme so each resolved storage receives a batch,
+        // avoiding repeated operator creation per path.
+        let mut grouped: HashMap<String, Vec<String>> = HashMap::new();
+        while let Some(path) = paths.next().await {
+            let scheme = extract_scheme(&path)?;
+            grouped.entry(scheme).or_default().push(path);
+        }
+
+        for (_, paths) in grouped {
+            let storage = self.resolve(&paths[0])?;
+            storage
+                .delete_stream(futures::stream::iter(paths).boxed())
+                .await?;
+        }
+        Ok(())
+    }
+
+    fn new_input(&self, path: &str) -> Result<InputFile> {
+        Ok(InputFile::new(
+            Arc::new(self.resolve(path)?.as_ref().clone()),
+            path.to_string(),
+        ))
+    }
+
+    fn new_output(&self, path: &str) -> Result<OutputFile> {
+        Ok(OutputFile::new(
+            Arc::new(self.resolve(path)?.as_ref().clone()),
+            path.to_string(),
+        ))
+    }
+}
diff --git a/crates/storage/opendal/tests/resolving_storage_test.rs b/crates/storage/opendal/tests/resolving_storage_test.rs
new file mode 100644
index 0000000000..4572ad2c2d
--- /dev/null
+++ b/crates/storage/opendal/tests/resolving_storage_test.rs
@@ -0,0 +1,297 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for OpenDalResolvingStorage.
+//!
+//! These tests assume Docker containers are started externally via `make docker-up`.
+//! Each test uses unique file paths based on module path to avoid conflicts.
+
+#[cfg(all(
+    feature = "opendal-s3",
+    feature = "opendal-fs",
+    feature = "opendal-memory"
+))]
+mod tests {
+    use std::sync::Arc;
+
+    use iceberg::io::{
+        FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
+    };
+    use iceberg_storage_opendal::OpenDalResolvingStorageFactory;
+    use iceberg_test_utils::{get_minio_endpoint, normalize_test_name_with_parts, set_up};
+
+    fn get_resolving_file_io() -> iceberg::io::FileIO {
+        set_up();
+
+        let minio_endpoint = get_minio_endpoint();
+
+        FileIOBuilder::new(Arc::new(OpenDalResolvingStorageFactory::new()))
+            .with_props(vec![
+                (S3_ENDPOINT, minio_endpoint),
+                (S3_ACCESS_KEY_ID, "admin".to_string()),
+                (S3_SECRET_ACCESS_KEY, "password".to_string()),
+                (S3_REGION, "us-east-1".to_string()),
+            ])
+            .build()
+    }
+
+    fn temp_fs_path(name: &str) -> String {
+        let dir = std::env::temp_dir().join("iceberg_resolving_tests");
+        std::fs::create_dir_all(&dir).unwrap();
+        let path = dir.join(name);
+        // Clean up from previous runs
+        let _ = std::fs::remove_file(&path);
+        format!("file:/{}", path.display())
+    }
+
+    #[tokio::test]
+    async fn test_mixed_scheme_write_and_read() {
+        let file_io = get_resolving_file_io();
+
+        let s3_path = format!(
+            "s3://bucket1/{}",
+            normalize_test_name_with_parts!("test_mixed_scheme_write_and_read")
+        );
+        let fs_path = temp_fs_path("mixed_write_and_read.txt");
+        let mem_path = "memory://test_mixed_scheme_write_and_read";
+
+        // Write to all three schemes
+        file_io
+            .new_output(&s3_path)
+            .unwrap()
+            .write("from_s3".into())
+            .await
+            .unwrap();
+        file_io
+            .new_output(&fs_path)
+            .unwrap()
+            .write("from_fs".into())
+            .await
+            .unwrap();
+        file_io
+            .new_output(mem_path)
+            .unwrap()
+            .write("from_memory".into())
+            .await
+            .unwrap();
+
+        // Read back from all three
+        assert_eq!(
+            file_io.new_input(&s3_path).unwrap().read().await.unwrap(),
+            bytes::Bytes::from("from_s3")
+        );
+        assert_eq!(
+            file_io.new_input(&fs_path).unwrap().read().await.unwrap(),
+            bytes::Bytes::from("from_fs")
+        );
+        assert_eq!(
+            file_io.new_input(mem_path).unwrap().read().await.unwrap(),
+            bytes::Bytes::from("from_memory")
+        );
+    }
+
+    #[tokio::test]
+    async fn test_mixed_scheme_exists_independently() {
+        let file_io = get_resolving_file_io();
+
+        let s3_path = format!(
+            "s3://bucket1/{}",
+            normalize_test_name_with_parts!("test_mixed_scheme_exists_independently")
+        );
+        let fs_path = temp_fs_path("mixed_exists_independently.txt");
+        let mem_path = "memory://test_mixed_scheme_exists_independently";
+
+        // Clean up S3 from previous runs
+        let _ = file_io.delete(&s3_path).await;
+
+        // None exist initially
+        assert!(!file_io.exists(&s3_path).await.unwrap());
+        assert!(!file_io.exists(&fs_path).await.unwrap());
+        assert!(!file_io.exists(mem_path).await.unwrap());
+
+        // Write only to fs
+        file_io
+            .new_output(&fs_path)
+            .unwrap()
+            .write("fs_only".into())
+            .await
+            .unwrap();
+
+        // Only fs exists
+        assert!(!file_io.exists(&s3_path).await.unwrap());
+        assert!(file_io.exists(&fs_path).await.unwrap());
+        assert!(!file_io.exists(mem_path).await.unwrap());
+    }
+
+    #[tokio::test]
+    async fn test_mixed_scheme_delete_one_keeps_others() {
+        let file_io = get_resolving_file_io();
+
+        let s3_path = format!(
+            "s3://bucket1/{}",
+            normalize_test_name_with_parts!("test_mixed_scheme_delete_one_keeps_others")
+        );
+        let fs_path = temp_fs_path("mixed_delete_one_keeps_others.txt");
+        let mem_path = "memory://test_mixed_scheme_delete_one_keeps_others";
+
+        // Write to all three
+        file_io
+            .new_output(&s3_path)
+            .unwrap()
+            .write("s3".into())
+            .await
+            .unwrap();
+        file_io
+            .new_output(&fs_path)
+            .unwrap()
+            .write("fs".into())
+            .await
+            .unwrap();
+        file_io
+            .new_output(mem_path)
+            .unwrap()
+            .write("mem".into())
+            .await
+            .unwrap();
+
+        // Delete only the fs file
+        file_io.delete(&fs_path).await.unwrap();
+
+        // fs gone, S3 and memory still there
+        assert!(file_io.exists(&s3_path).await.unwrap());
+        assert!(!file_io.exists(&fs_path).await.unwrap());
+        assert!(file_io.exists(mem_path).await.unwrap());
+
+        assert_eq!(
+            file_io.new_input(&s3_path).unwrap().read().await.unwrap(),
+            bytes::Bytes::from("s3")
+        );
+        assert_eq!(
+            file_io.new_input(mem_path).unwrap().read().await.unwrap(),
+            bytes::Bytes::from("mem")
+        );
+    }
+
+    #[tokio::test]
+    async fn test_mixed_scheme_interleaved_operations() {
+        let file_io = get_resolving_file_io();
+
+        let s3_path = format!(
+            "s3://bucket1/{}",
+            normalize_test_name_with_parts!("test_mixed_scheme_interleaved")
+        );
+        let fs_path = temp_fs_path("mixed_interleaved.txt");
+        let mem_path = "memory://test_mixed_scheme_interleaved";
+
+        // Interleave: write fs, write memory, write s3
+        file_io
+            .new_output(&fs_path)
+            .unwrap()
+            .write("fs_data".into())
+            .await
+            .unwrap();
+        file_io
+            .new_output(mem_path)
+            .unwrap()
+            .write("mem_data".into())
+            .await
+            .unwrap();
+        file_io
+            .new_output(&s3_path)
+            .unwrap()
+            .write("s3_data".into())
+            .await
+            .unwrap();
+
+        // Read in reverse order: s3, memory, fs
+        assert_eq!(
+            file_io.new_input(&s3_path).unwrap().read().await.unwrap(),
+            bytes::Bytes::from("s3_data")
+        );
+        assert_eq!(
+            file_io.new_input(mem_path).unwrap().read().await.unwrap(),
+            bytes::Bytes::from("mem_data")
+        );
+        assert_eq!(
+            file_io.new_input(&fs_path).unwrap().read().await.unwrap(),
+            bytes::Bytes::from("fs_data")
+        );
+    }
+
+    #[tokio::test]
+    async fn test_invalid_scheme() {
+        let file_io = get_resolving_file_io();
+        let result = file_io.exists("unknown://bucket/key").await;
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("Unsupported storage scheme"),
+        );
+    }
+
+    #[tokio::test]
+    async fn test_missing_scheme() {
+        let file_io = get_resolving_file_io();
+        let result = file_io.exists("no-scheme-path").await;
+        assert!(result.is_err());
+    }
+
+    #[cfg(feature = "opendal-s3")]
+    #[tokio::test]
+    async fn test_with_custom_credential_loader() {
+        use async_trait::async_trait;
+        use iceberg_storage_opendal::CustomAwsCredentialLoader;
+        use reqsign::{AwsCredential, AwsCredentialLoad};
+        use reqwest::Client;
+
+        struct MinioCredentialLoader;
+
+        #[async_trait]
+        impl AwsCredentialLoad for MinioCredentialLoader {
+            async fn load_credential(
+                &self,
+                _client: Client,
+            ) -> anyhow::Result<Option<AwsCredential>> {
+                Ok(Some(AwsCredential {
+                    access_key_id: "admin".to_string(),
+                    secret_access_key: "password".to_string(),
+                    session_token: None,
+                    expires_in: None,
+                }))
+            }
+        }
+
+        set_up();
+        let minio_endpoint = get_minio_endpoint();
+
+        let factory = OpenDalResolvingStorageFactory::new().with_s3_credential_loader(
+            CustomAwsCredentialLoader::new(Arc::new(MinioCredentialLoader)),
+        );
+
+        let file_io = FileIOBuilder::new(Arc::new(factory))
+            .with_props(vec![
+                (S3_ENDPOINT, minio_endpoint),
+                (S3_REGION, "us-east-1".to_string()),
+            ])
+            .build();
+
+        // Should be able to access S3 using the custom credential loader
+        assert!(file_io.exists("s3://bucket1/").await.unwrap());
+    }
+}