From 376640253da8ffc56856d1f9c22dd7618995e880 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 11 Mar 2026 14:45:58 -0700 Subject: [PATCH 1/8] implement opendal resolving storage --- crates/storage/opendal/src/lib.rs | 3 + crates/storage/opendal/src/resolving.rs | 264 ++++++++ .../opendal/tests/resolving_storage_test.rs | 605 ++++++++++++++++++ 3 files changed, 872 insertions(+) create mode 100644 crates/storage/opendal/src/resolving.rs create mode 100644 crates/storage/opendal/tests/resolving_storage_test.rs diff --git a/crates/storage/opendal/src/lib.rs b/crates/storage/opendal/src/lib.rs index 1e5043acae..73047ceea0 100644 --- a/crates/storage/opendal/src/lib.rs +++ b/crates/storage/opendal/src/lib.rs @@ -86,6 +86,9 @@ cfg_if! { } } +mod resolving; +pub use resolving::{OpenDalResolvingStorage, OpenDalResolvingStorageFactory}; + /// OpenDAL-based storage factory. /// /// Maps scheme to the corresponding OpenDalStorage storage variant. diff --git a/crates/storage/opendal/src/resolving.rs b/crates/storage/opendal/src/resolving.rs new file mode 100644 index 0000000000..23e3c8865d --- /dev/null +++ b/crates/storage/opendal/src/resolving.rs @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Resolving storage that auto-detects the scheme from a path and delegates
+//! to the appropriate [`OpenDalStorage`] variant.
+
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use iceberg::io::{
+    FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
+    StorageFactory,
+};
+use iceberg::{Error, ErrorKind, Result};
+use serde::{Deserialize, Serialize};
+use url::Url;
+
+use crate::OpenDalStorage;
+#[cfg(feature = "opendal-s3")]
+use crate::s3::CustomAwsCredentialLoader;
+
+/// Extract the scheme from a path string (e.g., `"s3://bucket/key"` → `"s3"`).
+fn extract_scheme(path: &str) -> Result<String> {
+    let url = Url::parse(path).map_err(|e| {
+        Error::new(
+            ErrorKind::DataInvalid,
+            format!("Invalid path: {path}, failed to parse URL: {e}"),
+        )
+    })?;
+    Ok(url.scheme().to_string())
+}
+
+/// Build an [`OpenDalStorage`] variant for the given scheme and config properties.
+fn build_storage_for_scheme(
+    scheme: &str,
+    props: &HashMap<String, String>,
+    #[cfg(feature = "opendal-s3")] customized_credential_load: &Option<CustomAwsCredentialLoader>,
+) -> Result<OpenDalStorage> {
+    match scheme {
+        #[cfg(feature = "opendal-s3")]
+        "s3" | "s3a" | "s3n" => {
+            let config = crate::s3::s3_config_parse(props.clone())?;
+            Ok(OpenDalStorage::S3 {
+                configured_scheme: scheme.to_string(),
+                config: Arc::new(config),
+                customized_credential_load: customized_credential_load.clone(),
+            })
+        }
+        #[cfg(feature = "opendal-gcs")]
+        "gs" => {
+            let config = crate::gcs::gcs_config_parse(props.clone())?;
+            Ok(OpenDalStorage::Gcs {
+                config: Arc::new(config),
+            })
+        }
+        #[cfg(feature = "opendal-oss")]
+        "oss" => {
+            let config = crate::oss::oss_config_parse(props.clone())?;
+            Ok(OpenDalStorage::Oss {
+                config: Arc::new(config),
+            })
+        }
+        #[cfg(feature = "opendal-azdls")]
+        "abfs" | "abfss" | "wasb" | "wasbs" => {
+            let configured_scheme: crate::azdls::AzureStorageScheme = scheme.parse()?;
+            let config = crate::azdls::azdls_config_parse(props.clone())?;
+            Ok(OpenDalStorage::Azdls {
+                configured_scheme,
+                config: Arc::new(config),
+            })
+        }
+        #[cfg(feature = "opendal-fs")]
+        "file" => Ok(OpenDalStorage::LocalFs),
+        #[cfg(feature = "opendal-memory")]
+        "memory" => Ok(OpenDalStorage::Memory(crate::memory::memory_config_build()?)),
+        _ => Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            format!("Unsupported storage scheme: {scheme}"),
+        )),
+    }
+}
+
+/// A resolving storage factory that creates [`OpenDalResolvingStorage`] instances.
+///
+/// This factory accepts paths from any supported storage system and dynamically
+/// delegates operations to the appropriate [`OpenDalStorage`] variant based on
+/// the path scheme.
+///
+/// # Example
+///
+/// ```rust,ignore
+/// use std::sync::Arc;
+/// use iceberg::io::FileIOBuilder;
+/// use iceberg_storage_opendal::OpenDalResolvingStorageFactory;
+///
+/// let factory = OpenDalResolvingStorageFactory::new();
+/// let file_io = FileIOBuilder::new(Arc::new(factory))
+///     .with_prop("s3.region", "us-east-1")
+///     .build();
+/// ```
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct OpenDalResolvingStorageFactory {
+    /// Custom AWS credential loader for S3 storage.
+    #[cfg(feature = "opendal-s3")]
+    #[serde(skip)]
+    customized_credential_load: Option<CustomAwsCredentialLoader>,
+}
+
+impl Default for OpenDalResolvingStorageFactory {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl OpenDalResolvingStorageFactory {
+    /// Create a new resolving storage factory.
+    pub fn new() -> Self {
+        Self {
+            #[cfg(feature = "opendal-s3")]
+            customized_credential_load: None,
+        }
+    }
+
+    /// Set a custom AWS credential loader for S3 storage.
+    #[cfg(feature = "opendal-s3")]
+    pub fn with_s3_credential_loader(mut self, loader: CustomAwsCredentialLoader) -> Self {
+        self.customized_credential_load = Some(loader);
+        self
+    }
+}
+
+#[typetag::serde]
+impl StorageFactory for OpenDalResolvingStorageFactory {
+    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
+        Ok(Arc::new(OpenDalResolvingStorage {
+            props: config.props().clone(),
+            storages: RwLock::new(HashMap::new()),
+            #[cfg(feature = "opendal-s3")]
+            customized_credential_load: self.customized_credential_load.clone(),
+        }))
+    }
+}
+
+/// A resolving storage that auto-detects the scheme from a path and delegates
+/// to the appropriate [`OpenDalStorage`] variant.
+///
+/// Sub-storages are lazily created on first use for each scheme and cached
+/// for subsequent operations.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct OpenDalResolvingStorage {
+    /// Configuration properties shared across all backends.
+    props: HashMap<String, String>,
+    /// Cache of scheme → storage mappings.
+    #[serde(skip, default)]
+    storages: RwLock<HashMap<String, Arc<OpenDalStorage>>>,
+    /// Custom AWS credential loader for S3 storage.
+    #[cfg(feature = "opendal-s3")]
+    #[serde(skip)]
+    customized_credential_load: Option<CustomAwsCredentialLoader>,
+}
+
+impl OpenDalResolvingStorage {
+    /// Resolve the storage for the given path by extracting the scheme and
+    /// returning the cached or newly-created [`OpenDalStorage`].
+    fn resolve(&self, path: &str) -> Result<Arc<OpenDalStorage>> {
+        let scheme = extract_scheme(path)?;
+
+        // Fast path: check read lock first.
+        let cache = self
+            .storages
+            .read()
+            .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
+        if let Some(storage) = cache.get(&scheme) {
+            return Ok(storage.clone());
+        }
+        drop(cache); // Release the read guard first; `write()` below would self-deadlock.
+        // Slow path: build and insert under write lock.
+        let mut cache = self
+            .storages
+            .write()
+            .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
+
+        // Double-check after acquiring write lock.
+        if let Some(storage) = cache.get(&scheme) {
+            return Ok(storage.clone());
+        }
+
+        let storage = build_storage_for_scheme(
+            &scheme,
+            &self.props,
+            #[cfg(feature = "opendal-s3")]
+            &self.customized_credential_load,
+        )?;
+        let storage = Arc::new(storage);
+        cache.insert(scheme, storage.clone());
+        Ok(storage)
+    }
+}
+
+#[async_trait]
+#[typetag::serde]
+impl Storage for OpenDalResolvingStorage {
+    async fn exists(&self, path: &str) -> Result<bool> {
+        self.resolve(path)?.exists(path).await
+    }
+
+    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
+        self.resolve(path)?.metadata(path).await
+    }
+
+    async fn read(&self, path: &str) -> Result<Bytes> {
+        self.resolve(path)?.read(path).await
+    }
+
+    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
+        self.resolve(path)?.reader(path).await
+    }
+
+    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
+        self.resolve(path)?.write(path, bs).await
+    }
+
+    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
+        self.resolve(path)?.writer(path).await
+    }
+
+    async fn delete(&self, path: &str) -> Result<()> {
+        self.resolve(path)?.delete(path).await
+    }
+
+    async fn delete_prefix(&self, path: &str) -> Result<()> {
+        self.resolve(path)?.delete_prefix(path).await
+    }
+
+    fn new_input(&self, path: &str) -> Result<InputFile> {
+        Ok(InputFile::new(
+            Arc::new(self.resolve(path)?.as_ref().clone()),
+            path.to_string(),
+        ))
+    }
+
+    fn new_output(&self, path: &str) -> Result<OutputFile> {
+        Ok(OutputFile::new(
+            Arc::new(self.resolve(path)?.as_ref().clone()),
+            path.to_string(),
+        ))
+    }
+}
diff --git a/crates/storage/opendal/tests/resolving_storage_test.rs b/crates/storage/opendal/tests/resolving_storage_test.rs
new file mode 100644
index 0000000000..8701d13f77
--- /dev/null
+++ b/crates/storage/opendal/tests/resolving_storage_test.rs
@@ -0,0 +1,605 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for OpenDalResolvingStorage. +//! +//! These tests assume Docker containers are started externally via `make docker-up`. +//! Each test uses unique file paths based on module path to avoid conflicts. +#[cfg(feature = "opendal-s3")] +mod tests { + use std::sync::Arc; + + use iceberg::io::{ + FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, + }; + use iceberg_storage_opendal::OpenDalResolvingStorageFactory; + use iceberg_test_utils::{get_minio_endpoint, normalize_test_name_with_parts, set_up}; + + fn get_resolving_file_io() -> iceberg::io::FileIO { + set_up(); + + let minio_endpoint = get_minio_endpoint(); + + FileIOBuilder::new(Arc::new(OpenDalResolvingStorageFactory::new())) + .with_props(vec![ + (S3_ENDPOINT, minio_endpoint), + (S3_ACCESS_KEY_ID, "admin".to_string()), + (S3_SECRET_ACCESS_KEY, "password".to_string()), + (S3_REGION, "us-east-1".to_string()), + ]) + .build() + } + + #[tokio::test] + async fn test_resolving_storage_s3_exists() { + let file_io = get_resolving_file_io(); + assert!(!file_io.exists("s3://bucket2/any").await.unwrap()); + assert!(file_io.exists("s3://bucket1/").await.unwrap()); + } + + #[tokio::test] + async fn test_resolving_storage_s3_write_and_read() { + let file_io = get_resolving_file_io(); + let path = format!( + "s3://bucket1/{}", + 
normalize_test_name_with_parts!("test_resolving_storage_s3_write_and_read") + ); + + // Clean up from any previous test runs + let _ = file_io.delete(&path).await; + assert!(!file_io.exists(&path).await.unwrap()); + + // Write via output file + let output_file = file_io.new_output(&path).unwrap(); + output_file.write("hello_resolving".into()).await.unwrap(); + assert!(file_io.exists(&path).await.unwrap()); + + // Read via input file + let input_file = file_io.new_input(&path).unwrap(); + let content = input_file.read().await.unwrap(); + assert_eq!(content, bytes::Bytes::from("hello_resolving")); + } + + #[tokio::test] + async fn test_resolving_storage_s3_delete() { + let file_io = get_resolving_file_io(); + let path = format!( + "s3://bucket1/{}", + normalize_test_name_with_parts!("test_resolving_storage_s3_delete") + ); + + // Write a file first + let output_file = file_io.new_output(&path).unwrap(); + output_file.write("to_delete".into()).await.unwrap(); + assert!(file_io.exists(&path).await.unwrap()); + + // Delete it + file_io.delete(&path).await.unwrap(); + assert!(!file_io.exists(&path).await.unwrap()); + } + + #[tokio::test] + async fn test_resolving_storage_caches_backend() { + let file_io = get_resolving_file_io(); + let path1 = format!( + "s3://bucket1/{}", + normalize_test_name_with_parts!("test_resolving_storage_caches_backend_1") + ); + let path2 = format!( + "s3://bucket1/{}", + normalize_test_name_with_parts!("test_resolving_storage_caches_backend_2") + ); + + // Both operations should succeed, using the same cached S3 backend + let output1 = file_io.new_output(&path1).unwrap(); + output1.write("file1".into()).await.unwrap(); + + let output2 = file_io.new_output(&path2).unwrap(); + output2.write("file2".into()).await.unwrap(); + + let input1 = file_io.new_input(&path1).unwrap(); + assert_eq!(input1.read().await.unwrap(), bytes::Bytes::from("file1")); + + let input2 = file_io.new_input(&path2).unwrap(); + assert_eq!(input2.read().await.unwrap(), 
bytes::Bytes::from("file2")); + } + + #[tokio::test] + async fn test_resolving_storage_invalid_scheme() { + let file_io = get_resolving_file_io(); + let result = file_io.exists("unknown://bucket/key").await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.to_string().contains("Unsupported storage scheme"), + "Expected unsupported scheme error, got: {err}" + ); + } + + #[tokio::test] + async fn test_resolving_storage_missing_scheme() { + let file_io = get_resolving_file_io(); + let result = file_io.exists("no-scheme-path").await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.to_string().contains("missing scheme"), + "Expected missing scheme error, got: {err}" + ); + } + + #[cfg(feature = "opendal-s3")] + #[tokio::test] + async fn test_resolving_storage_with_custom_credential_loader() { + use async_trait::async_trait; + use iceberg_storage_opendal::CustomAwsCredentialLoader; + use reqsign::{AwsCredential, AwsCredentialLoad}; + use reqwest::Client; + + struct MinioCredentialLoader; + + #[async_trait] + impl AwsCredentialLoad for MinioCredentialLoader { + async fn load_credential( + &self, + _client: Client, + ) -> anyhow::Result> { + Ok(Some(AwsCredential { + access_key_id: "admin".to_string(), + secret_access_key: "password".to_string(), + session_token: None, + expires_in: None, + })) + } + } + + set_up(); + let minio_endpoint = get_minio_endpoint(); + + let factory = OpenDalResolvingStorageFactory::new().with_s3_credential_loader( + CustomAwsCredentialLoader::new(Arc::new(MinioCredentialLoader)), + ); + + let file_io = FileIOBuilder::new(Arc::new(factory)) + .with_props(vec![ + (S3_ENDPOINT, minio_endpoint), + (S3_REGION, "us-east-1".to_string()), + ]) + .build(); + + // Should be able to access S3 using the custom credential loader + assert!(file_io.exists("s3://bucket1/").await.unwrap()); + } +} + +#[cfg(all(feature = "opendal-s3", feature = "opendal-memory"))] +mod mixed_scheme_tests { + use 
std::sync::Arc; + + use iceberg::io::{ + FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, + }; + use iceberg_storage_opendal::OpenDalResolvingStorageFactory; + use iceberg_test_utils::{get_minio_endpoint, normalize_test_name_with_parts, set_up}; + + fn get_resolving_file_io() -> iceberg::io::FileIO { + set_up(); + + let minio_endpoint = get_minio_endpoint(); + + FileIOBuilder::new(Arc::new(OpenDalResolvingStorageFactory::new())) + .with_props(vec![ + (S3_ENDPOINT, minio_endpoint), + (S3_ACCESS_KEY_ID, "admin".to_string()), + (S3_SECRET_ACCESS_KEY, "password".to_string()), + (S3_REGION, "us-east-1".to_string()), + ]) + .build() + } + + #[tokio::test] + async fn test_mixed_s3_and_memory_write_read() { + let file_io = get_resolving_file_io(); + + let s3_path = format!( + "s3://bucket1/{}", + normalize_test_name_with_parts!("test_mixed_s3_and_memory_write_read") + ); + let memory_path = "memory://test_mixed_s3_and_memory_write_read"; + + // Write to both schemes through the same FileIO + let s3_output = file_io.new_output(&s3_path).unwrap(); + s3_output.write("s3_content".into()).await.unwrap(); + + let mem_output = file_io.new_output(memory_path).unwrap(); + mem_output.write("memory_content".into()).await.unwrap(); + + // Read back from both schemes + let s3_input = file_io.new_input(&s3_path).unwrap(); + assert_eq!( + s3_input.read().await.unwrap(), + bytes::Bytes::from("s3_content") + ); + + let mem_input = file_io.new_input(memory_path).unwrap(); + assert_eq!( + mem_input.read().await.unwrap(), + bytes::Bytes::from("memory_content") + ); + } + + #[tokio::test] + async fn test_mixed_schemes_exist_independently() { + let file_io = get_resolving_file_io(); + + let s3_path = format!( + "s3://bucket1/{}", + normalize_test_name_with_parts!("test_mixed_schemes_exist_independently") + ); + let memory_path = "memory://test_mixed_schemes_exist_independently"; + + // Clean up S3 from previous runs + let _ = file_io.delete(&s3_path).await; + + // 
Neither should exist initially + assert!(!file_io.exists(&s3_path).await.unwrap()); + assert!(!file_io.exists(memory_path).await.unwrap()); + + // Write only to S3 + let s3_output = file_io.new_output(&s3_path).unwrap(); + s3_output.write("only_s3".into()).await.unwrap(); + + // S3 path exists, memory path does not + assert!(file_io.exists(&s3_path).await.unwrap()); + assert!(!file_io.exists(memory_path).await.unwrap()); + + // Now write to memory + let mem_output = file_io.new_output(memory_path).unwrap(); + mem_output.write("only_memory".into()).await.unwrap(); + + // Both exist + assert!(file_io.exists(&s3_path).await.unwrap()); + assert!(file_io.exists(memory_path).await.unwrap()); + } + + #[tokio::test] + async fn test_mixed_schemes_delete_one_keeps_other() { + let file_io = get_resolving_file_io(); + + let s3_path = format!( + "s3://bucket1/{}", + normalize_test_name_with_parts!("test_mixed_schemes_delete_one_keeps_other") + ); + let memory_path = "memory://test_mixed_schemes_delete_one_keeps_other"; + + // Write to both + file_io + .new_output(&s3_path) + .unwrap() + .write("s3_data".into()) + .await + .unwrap(); + file_io + .new_output(memory_path) + .unwrap() + .write("mem_data".into()) + .await + .unwrap(); + + // Delete only the S3 file + file_io.delete(&s3_path).await.unwrap(); + + // S3 gone, memory still there + assert!(!file_io.exists(&s3_path).await.unwrap()); + assert!(file_io.exists(memory_path).await.unwrap()); + + let mem_input = file_io.new_input(memory_path).unwrap(); + assert_eq!( + mem_input.read().await.unwrap(), + bytes::Bytes::from("mem_data") + ); + } + + #[tokio::test] + async fn test_mixed_schemes_interleaved_operations() { + let file_io = get_resolving_file_io(); + + let s3_path_1 = format!( + "s3://bucket1/{}", + normalize_test_name_with_parts!("test_mixed_interleaved_1") + ); + let mem_path_1 = "memory://test_mixed_interleaved_1"; + let s3_path_2 = format!( + "s3://bucket1/{}", + 
normalize_test_name_with_parts!("test_mixed_interleaved_2") + ); + let mem_path_2 = "memory://test_mixed_interleaved_2"; + + // Interleave writes across schemes + file_io + .new_output(&s3_path_1) + .unwrap() + .write("s3_1".into()) + .await + .unwrap(); + file_io + .new_output(mem_path_1) + .unwrap() + .write("mem_1".into()) + .await + .unwrap(); + file_io + .new_output(&s3_path_2) + .unwrap() + .write("s3_2".into()) + .await + .unwrap(); + file_io + .new_output(mem_path_2) + .unwrap() + .write("mem_2".into()) + .await + .unwrap(); + + // Read back all four, interleaved + let r1 = file_io.new_input(mem_path_2).unwrap().read().await.unwrap(); + let r2 = file_io.new_input(&s3_path_1).unwrap().read().await.unwrap(); + let r3 = file_io.new_input(mem_path_1).unwrap().read().await.unwrap(); + let r4 = file_io.new_input(&s3_path_2).unwrap().read().await.unwrap(); + + assert_eq!(r1, bytes::Bytes::from("mem_2")); + assert_eq!(r2, bytes::Bytes::from("s3_1")); + assert_eq!(r3, bytes::Bytes::from("mem_1")); + assert_eq!(r4, bytes::Bytes::from("s3_2")); + } +} + +#[cfg(all( + feature = "opendal-s3", + feature = "opendal-gcs", + feature = "opendal-memory" +))] +mod mixed_s3_gcs_memory_tests { + use std::collections::HashMap; + use std::sync::Arc; + + use iceberg::io::{ + FileIOBuilder, GCS_NO_AUTH, GCS_SERVICE_PATH, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, + S3_SECRET_ACCESS_KEY, + }; + use iceberg_storage_opendal::OpenDalResolvingStorageFactory; + use iceberg_test_utils::{ + get_gcs_endpoint, get_minio_endpoint, normalize_test_name_with_parts, set_up, + }; + + static FAKE_GCS_BUCKET: &str = "resolving-test-bucket"; + + async fn ensure_gcs_bucket(endpoint: &str) { + let mut bucket_data = HashMap::new(); + bucket_data.insert("name", FAKE_GCS_BUCKET); + let client = reqwest::Client::new(); + let url = format!("{endpoint}/storage/v1/b"); + let _ = client.post(url).json(&bucket_data).send().await; + } + + async fn get_resolving_file_io() -> iceberg::io::FileIO { + set_up(); + + 
let minio_endpoint = get_minio_endpoint(); + let gcs_endpoint = get_gcs_endpoint(); + + ensure_gcs_bucket(&gcs_endpoint).await; + + FileIOBuilder::new(Arc::new(OpenDalResolvingStorageFactory::new())) + .with_props(vec![ + (S3_ENDPOINT, minio_endpoint), + (S3_ACCESS_KEY_ID, "admin".to_string()), + (S3_SECRET_ACCESS_KEY, "password".to_string()), + (S3_REGION, "us-east-1".to_string()), + (GCS_SERVICE_PATH, gcs_endpoint), + (GCS_NO_AUTH, "true".to_string()), + ]) + .build() + } + + #[tokio::test] + async fn test_three_scheme_write_and_read() { + let file_io = get_resolving_file_io().await; + + let s3_path = format!( + "s3://bucket1/{}", + normalize_test_name_with_parts!("test_three_scheme_write_and_read") + ); + let gcs_path = format!( + "gs://{FAKE_GCS_BUCKET}/{}", + normalize_test_name_with_parts!("test_three_scheme_write_and_read") + ); + let mem_path = "memory://test_three_scheme_write_and_read"; + + // Write to all three schemes + file_io + .new_output(&s3_path) + .unwrap() + .write("from_s3".into()) + .await + .unwrap(); + file_io + .new_output(&gcs_path) + .unwrap() + .write("from_gcs".into()) + .await + .unwrap(); + file_io + .new_output(mem_path) + .unwrap() + .write("from_memory".into()) + .await + .unwrap(); + + // Read back from all three + assert_eq!( + file_io.new_input(&s3_path).unwrap().read().await.unwrap(), + bytes::Bytes::from("from_s3") + ); + assert_eq!( + file_io.new_input(&gcs_path).unwrap().read().await.unwrap(), + bytes::Bytes::from("from_gcs") + ); + assert_eq!( + file_io.new_input(mem_path).unwrap().read().await.unwrap(), + bytes::Bytes::from("from_memory") + ); + } + + #[tokio::test] + async fn test_three_scheme_exists_independently() { + let file_io = get_resolving_file_io().await; + + let s3_path = format!( + "s3://bucket1/{}", + normalize_test_name_with_parts!("test_three_scheme_exists_independently") + ); + let gcs_path = format!( + "gs://{FAKE_GCS_BUCKET}/{}", + normalize_test_name_with_parts!("test_three_scheme_exists_independently") + 
); + let mem_path = "memory://test_three_scheme_exists_independently"; + + // Clean up from previous runs + let _ = file_io.delete(&s3_path).await; + let _ = file_io.delete(&gcs_path).await; + + // None exist initially + assert!(!file_io.exists(&s3_path).await.unwrap()); + assert!(!file_io.exists(&gcs_path).await.unwrap()); + assert!(!file_io.exists(mem_path).await.unwrap()); + + // Write only to GCS + file_io + .new_output(&gcs_path) + .unwrap() + .write("gcs_only".into()) + .await + .unwrap(); + + // Only GCS exists + assert!(!file_io.exists(&s3_path).await.unwrap()); + assert!(file_io.exists(&gcs_path).await.unwrap()); + assert!(!file_io.exists(mem_path).await.unwrap()); + } + + #[tokio::test] + async fn test_three_scheme_interleaved_operations() { + let file_io = get_resolving_file_io().await; + + let s3_path = format!( + "s3://bucket1/{}", + normalize_test_name_with_parts!("test_three_scheme_interleaved") + ); + let gcs_path = format!( + "gs://{FAKE_GCS_BUCKET}/{}", + normalize_test_name_with_parts!("test_three_scheme_interleaved") + ); + let mem_path = "memory://test_three_scheme_interleaved"; + + // Interleave: write gcs, write memory, write s3 + file_io + .new_output(&gcs_path) + .unwrap() + .write("gcs_data".into()) + .await + .unwrap(); + file_io + .new_output(mem_path) + .unwrap() + .write("mem_data".into()) + .await + .unwrap(); + file_io + .new_output(&s3_path) + .unwrap() + .write("s3_data".into()) + .await + .unwrap(); + + // Read in reverse order: s3, memory, gcs + assert_eq!( + file_io.new_input(&s3_path).unwrap().read().await.unwrap(), + bytes::Bytes::from("s3_data") + ); + assert_eq!( + file_io.new_input(mem_path).unwrap().read().await.unwrap(), + bytes::Bytes::from("mem_data") + ); + assert_eq!( + file_io.new_input(&gcs_path).unwrap().read().await.unwrap(), + bytes::Bytes::from("gcs_data") + ); + } + + #[tokio::test] + async fn test_three_scheme_delete_one_keeps_others() { + let file_io = get_resolving_file_io().await; + + let s3_path = format!( 
+ "s3://bucket1/{}", + normalize_test_name_with_parts!("test_three_scheme_delete_one_keeps_others") + ); + let gcs_path = format!( + "gs://{FAKE_GCS_BUCKET}/{}", + normalize_test_name_with_parts!("test_three_scheme_delete_one_keeps_others") + ); + let mem_path = "memory://test_three_scheme_delete_one_keeps_others"; + + // Write to all three + file_io + .new_output(&s3_path) + .unwrap() + .write("s3".into()) + .await + .unwrap(); + file_io + .new_output(&gcs_path) + .unwrap() + .write("gcs".into()) + .await + .unwrap(); + file_io + .new_output(mem_path) + .unwrap() + .write("mem".into()) + .await + .unwrap(); + + // Delete only GCS + file_io.delete(&gcs_path).await.unwrap(); + + // GCS gone, S3 and memory still there + assert!(file_io.exists(&s3_path).await.unwrap()); + assert!(!file_io.exists(&gcs_path).await.unwrap()); + assert!(file_io.exists(mem_path).await.unwrap()); + + assert_eq!( + file_io.new_input(&s3_path).unwrap().read().await.unwrap(), + bytes::Bytes::from("s3") + ); + assert_eq!( + file_io.new_input(mem_path).unwrap().read().await.unwrap(), + bytes::Bytes::from("mem") + ); + } +} From 0a5de6918bfa3c097455181264d8f17b70e25e54 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 11 Mar 2026 16:24:08 -0700 Subject: [PATCH 2/8] optimize tests --- .../opendal/tests/resolving_storage_test.rs | 583 +++++------------- 1 file changed, 138 insertions(+), 445 deletions(-) diff --git a/crates/storage/opendal/tests/resolving_storage_test.rs b/crates/storage/opendal/tests/resolving_storage_test.rs index 8701d13f77..256ba680f2 100644 --- a/crates/storage/opendal/tests/resolving_storage_test.rs +++ b/crates/storage/opendal/tests/resolving_storage_test.rs @@ -19,175 +19,13 @@ //! //! These tests assume Docker containers are started externally via `make docker-up`. //! Each test uses unique file paths based on module path to avoid conflicts. 
-#[cfg(feature = "opendal-s3")] -mod tests { - use std::sync::Arc; - - use iceberg::io::{ - FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, - }; - use iceberg_storage_opendal::OpenDalResolvingStorageFactory; - use iceberg_test_utils::{get_minio_endpoint, normalize_test_name_with_parts, set_up}; - - fn get_resolving_file_io() -> iceberg::io::FileIO { - set_up(); - - let minio_endpoint = get_minio_endpoint(); - - FileIOBuilder::new(Arc::new(OpenDalResolvingStorageFactory::new())) - .with_props(vec![ - (S3_ENDPOINT, minio_endpoint), - (S3_ACCESS_KEY_ID, "admin".to_string()), - (S3_SECRET_ACCESS_KEY, "password".to_string()), - (S3_REGION, "us-east-1".to_string()), - ]) - .build() - } - - #[tokio::test] - async fn test_resolving_storage_s3_exists() { - let file_io = get_resolving_file_io(); - assert!(!file_io.exists("s3://bucket2/any").await.unwrap()); - assert!(file_io.exists("s3://bucket1/").await.unwrap()); - } - - #[tokio::test] - async fn test_resolving_storage_s3_write_and_read() { - let file_io = get_resolving_file_io(); - let path = format!( - "s3://bucket1/{}", - normalize_test_name_with_parts!("test_resolving_storage_s3_write_and_read") - ); - - // Clean up from any previous test runs - let _ = file_io.delete(&path).await; - assert!(!file_io.exists(&path).await.unwrap()); - - // Write via output file - let output_file = file_io.new_output(&path).unwrap(); - output_file.write("hello_resolving".into()).await.unwrap(); - assert!(file_io.exists(&path).await.unwrap()); - - // Read via input file - let input_file = file_io.new_input(&path).unwrap(); - let content = input_file.read().await.unwrap(); - assert_eq!(content, bytes::Bytes::from("hello_resolving")); - } - - #[tokio::test] - async fn test_resolving_storage_s3_delete() { - let file_io = get_resolving_file_io(); - let path = format!( - "s3://bucket1/{}", - normalize_test_name_with_parts!("test_resolving_storage_s3_delete") - ); - - // Write a file first - let output_file = 
file_io.new_output(&path).unwrap(); - output_file.write("to_delete".into()).await.unwrap(); - assert!(file_io.exists(&path).await.unwrap()); - // Delete it - file_io.delete(&path).await.unwrap(); - assert!(!file_io.exists(&path).await.unwrap()); - } - - #[tokio::test] - async fn test_resolving_storage_caches_backend() { - let file_io = get_resolving_file_io(); - let path1 = format!( - "s3://bucket1/{}", - normalize_test_name_with_parts!("test_resolving_storage_caches_backend_1") - ); - let path2 = format!( - "s3://bucket1/{}", - normalize_test_name_with_parts!("test_resolving_storage_caches_backend_2") - ); - - // Both operations should succeed, using the same cached S3 backend - let output1 = file_io.new_output(&path1).unwrap(); - output1.write("file1".into()).await.unwrap(); - - let output2 = file_io.new_output(&path2).unwrap(); - output2.write("file2".into()).await.unwrap(); - - let input1 = file_io.new_input(&path1).unwrap(); - assert_eq!(input1.read().await.unwrap(), bytes::Bytes::from("file1")); - - let input2 = file_io.new_input(&path2).unwrap(); - assert_eq!(input2.read().await.unwrap(), bytes::Bytes::from("file2")); - } - - #[tokio::test] - async fn test_resolving_storage_invalid_scheme() { - let file_io = get_resolving_file_io(); - let result = file_io.exists("unknown://bucket/key").await; - assert!(result.is_err()); - let err = result.unwrap_err(); - assert!( - err.to_string().contains("Unsupported storage scheme"), - "Expected unsupported scheme error, got: {err}" - ); - } - - #[tokio::test] - async fn test_resolving_storage_missing_scheme() { - let file_io = get_resolving_file_io(); - let result = file_io.exists("no-scheme-path").await; - assert!(result.is_err()); - let err = result.unwrap_err(); - assert!( - err.to_string().contains("missing scheme"), - "Expected missing scheme error, got: {err}" - ); - } - - #[cfg(feature = "opendal-s3")] - #[tokio::test] - async fn test_resolving_storage_with_custom_credential_loader() { - use 
async_trait::async_trait; - use iceberg_storage_opendal::CustomAwsCredentialLoader; - use reqsign::{AwsCredential, AwsCredentialLoad}; - use reqwest::Client; - - struct MinioCredentialLoader; - - #[async_trait] - impl AwsCredentialLoad for MinioCredentialLoader { - async fn load_credential( - &self, - _client: Client, - ) -> anyhow::Result> { - Ok(Some(AwsCredential { - access_key_id: "admin".to_string(), - secret_access_key: "password".to_string(), - session_token: None, - expires_in: None, - })) - } - } - - set_up(); - let minio_endpoint = get_minio_endpoint(); - - let factory = OpenDalResolvingStorageFactory::new().with_s3_credential_loader( - CustomAwsCredentialLoader::new(Arc::new(MinioCredentialLoader)), - ); - - let file_io = FileIOBuilder::new(Arc::new(factory)) - .with_props(vec![ - (S3_ENDPOINT, minio_endpoint), - (S3_REGION, "us-east-1".to_string()), - ]) - .build(); - - // Should be able to access S3 using the custom credential loader - assert!(file_io.exists("s3://bucket1/").await.unwrap()); - } -} - -#[cfg(all(feature = "opendal-s3", feature = "opendal-memory"))] -mod mixed_scheme_tests { +#[cfg(all( + feature = "opendal-s3", + feature = "opendal-fs", + feature = "opendal-memory" +))] +mod tests { use std::sync::Arc; use iceberg::io::{ @@ -211,224 +49,25 @@ mod mixed_scheme_tests { .build() } - #[tokio::test] - async fn test_mixed_s3_and_memory_write_read() { - let file_io = get_resolving_file_io(); - - let s3_path = format!( - "s3://bucket1/{}", - normalize_test_name_with_parts!("test_mixed_s3_and_memory_write_read") - ); - let memory_path = "memory://test_mixed_s3_and_memory_write_read"; - - // Write to both schemes through the same FileIO - let s3_output = file_io.new_output(&s3_path).unwrap(); - s3_output.write("s3_content".into()).await.unwrap(); - - let mem_output = file_io.new_output(memory_path).unwrap(); - mem_output.write("memory_content".into()).await.unwrap(); - - // Read back from both schemes - let s3_input = 
file_io.new_input(&s3_path).unwrap(); - assert_eq!( - s3_input.read().await.unwrap(), - bytes::Bytes::from("s3_content") - ); - - let mem_input = file_io.new_input(memory_path).unwrap(); - assert_eq!( - mem_input.read().await.unwrap(), - bytes::Bytes::from("memory_content") - ); - } - - #[tokio::test] - async fn test_mixed_schemes_exist_independently() { - let file_io = get_resolving_file_io(); - - let s3_path = format!( - "s3://bucket1/{}", - normalize_test_name_with_parts!("test_mixed_schemes_exist_independently") - ); - let memory_path = "memory://test_mixed_schemes_exist_independently"; - - // Clean up S3 from previous runs - let _ = file_io.delete(&s3_path).await; - - // Neither should exist initially - assert!(!file_io.exists(&s3_path).await.unwrap()); - assert!(!file_io.exists(memory_path).await.unwrap()); - - // Write only to S3 - let s3_output = file_io.new_output(&s3_path).unwrap(); - s3_output.write("only_s3".into()).await.unwrap(); - - // S3 path exists, memory path does not - assert!(file_io.exists(&s3_path).await.unwrap()); - assert!(!file_io.exists(memory_path).await.unwrap()); - - // Now write to memory - let mem_output = file_io.new_output(memory_path).unwrap(); - mem_output.write("only_memory".into()).await.unwrap(); - - // Both exist - assert!(file_io.exists(&s3_path).await.unwrap()); - assert!(file_io.exists(memory_path).await.unwrap()); - } - - #[tokio::test] - async fn test_mixed_schemes_delete_one_keeps_other() { - let file_io = get_resolving_file_io(); - - let s3_path = format!( - "s3://bucket1/{}", - normalize_test_name_with_parts!("test_mixed_schemes_delete_one_keeps_other") - ); - let memory_path = "memory://test_mixed_schemes_delete_one_keeps_other"; - - // Write to both - file_io - .new_output(&s3_path) - .unwrap() - .write("s3_data".into()) - .await - .unwrap(); - file_io - .new_output(memory_path) - .unwrap() - .write("mem_data".into()) - .await - .unwrap(); - - // Delete only the S3 file - file_io.delete(&s3_path).await.unwrap(); - - 
// S3 gone, memory still there - assert!(!file_io.exists(&s3_path).await.unwrap()); - assert!(file_io.exists(memory_path).await.unwrap()); - - let mem_input = file_io.new_input(memory_path).unwrap(); - assert_eq!( - mem_input.read().await.unwrap(), - bytes::Bytes::from("mem_data") - ); + fn temp_fs_path(name: &str) -> String { + let dir = std::env::temp_dir().join("iceberg_resolving_tests"); + std::fs::create_dir_all(&dir).unwrap(); + let path = dir.join(name); + // Clean up from previous runs + let _ = std::fs::remove_file(&path); + format!("file:/{}", path.display()) } #[tokio::test] - async fn test_mixed_schemes_interleaved_operations() { + async fn test_mixed_scheme_write_and_read() { let file_io = get_resolving_file_io(); - let s3_path_1 = format!( - "s3://bucket1/{}", - normalize_test_name_with_parts!("test_mixed_interleaved_1") - ); - let mem_path_1 = "memory://test_mixed_interleaved_1"; - let s3_path_2 = format!( - "s3://bucket1/{}", - normalize_test_name_with_parts!("test_mixed_interleaved_2") - ); - let mem_path_2 = "memory://test_mixed_interleaved_2"; - - // Interleave writes across schemes - file_io - .new_output(&s3_path_1) - .unwrap() - .write("s3_1".into()) - .await - .unwrap(); - file_io - .new_output(mem_path_1) - .unwrap() - .write("mem_1".into()) - .await - .unwrap(); - file_io - .new_output(&s3_path_2) - .unwrap() - .write("s3_2".into()) - .await - .unwrap(); - file_io - .new_output(mem_path_2) - .unwrap() - .write("mem_2".into()) - .await - .unwrap(); - - // Read back all four, interleaved - let r1 = file_io.new_input(mem_path_2).unwrap().read().await.unwrap(); - let r2 = file_io.new_input(&s3_path_1).unwrap().read().await.unwrap(); - let r3 = file_io.new_input(mem_path_1).unwrap().read().await.unwrap(); - let r4 = file_io.new_input(&s3_path_2).unwrap().read().await.unwrap(); - - assert_eq!(r1, bytes::Bytes::from("mem_2")); - assert_eq!(r2, bytes::Bytes::from("s3_1")); - assert_eq!(r3, bytes::Bytes::from("mem_1")); - assert_eq!(r4, 
bytes::Bytes::from("s3_2")); - } -} - -#[cfg(all( - feature = "opendal-s3", - feature = "opendal-gcs", - feature = "opendal-memory" -))] -mod mixed_s3_gcs_memory_tests { - use std::collections::HashMap; - use std::sync::Arc; - - use iceberg::io::{ - FileIOBuilder, GCS_NO_AUTH, GCS_SERVICE_PATH, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, - S3_SECRET_ACCESS_KEY, - }; - use iceberg_storage_opendal::OpenDalResolvingStorageFactory; - use iceberg_test_utils::{ - get_gcs_endpoint, get_minio_endpoint, normalize_test_name_with_parts, set_up, - }; - - static FAKE_GCS_BUCKET: &str = "resolving-test-bucket"; - - async fn ensure_gcs_bucket(endpoint: &str) { - let mut bucket_data = HashMap::new(); - bucket_data.insert("name", FAKE_GCS_BUCKET); - let client = reqwest::Client::new(); - let url = format!("{endpoint}/storage/v1/b"); - let _ = client.post(url).json(&bucket_data).send().await; - } - - async fn get_resolving_file_io() -> iceberg::io::FileIO { - set_up(); - - let minio_endpoint = get_minio_endpoint(); - let gcs_endpoint = get_gcs_endpoint(); - - ensure_gcs_bucket(&gcs_endpoint).await; - - FileIOBuilder::new(Arc::new(OpenDalResolvingStorageFactory::new())) - .with_props(vec![ - (S3_ENDPOINT, minio_endpoint), - (S3_ACCESS_KEY_ID, "admin".to_string()), - (S3_SECRET_ACCESS_KEY, "password".to_string()), - (S3_REGION, "us-east-1".to_string()), - (GCS_SERVICE_PATH, gcs_endpoint), - (GCS_NO_AUTH, "true".to_string()), - ]) - .build() - } - - #[tokio::test] - async fn test_three_scheme_write_and_read() { - let file_io = get_resolving_file_io().await; - let s3_path = format!( "s3://bucket1/{}", - normalize_test_name_with_parts!("test_three_scheme_write_and_read") - ); - let gcs_path = format!( - "gs://{FAKE_GCS_BUCKET}/{}", - normalize_test_name_with_parts!("test_three_scheme_write_and_read") + normalize_test_name_with_parts!("test_mixed_scheme_write_and_read") ); - let mem_path = "memory://test_three_scheme_write_and_read"; + let fs_path = temp_fs_path("mixed_write_and_read.txt"); 
+ let mem_path = "memory://test_mixed_scheme_write_and_read"; // Write to all three schemes file_io @@ -438,9 +77,9 @@ mod mixed_s3_gcs_memory_tests { .await .unwrap(); file_io - .new_output(&gcs_path) + .new_output(&fs_path) .unwrap() - .write("from_gcs".into()) + .write("from_fs".into()) .await .unwrap(); file_io @@ -456,8 +95,8 @@ mod mixed_s3_gcs_memory_tests { bytes::Bytes::from("from_s3") ); assert_eq!( - file_io.new_input(&gcs_path).unwrap().read().await.unwrap(), - bytes::Bytes::from("from_gcs") + file_io.new_input(&fs_path).unwrap().read().await.unwrap(), + bytes::Bytes::from("from_fs") ); assert_eq!( file_io.new_input(mem_path).unwrap().read().await.unwrap(), @@ -466,140 +105,194 @@ mod mixed_s3_gcs_memory_tests { } #[tokio::test] - async fn test_three_scheme_exists_independently() { - let file_io = get_resolving_file_io().await; + async fn test_mixed_scheme_exists_independently() { + let file_io = get_resolving_file_io(); let s3_path = format!( "s3://bucket1/{}", - normalize_test_name_with_parts!("test_three_scheme_exists_independently") - ); - let gcs_path = format!( - "gs://{FAKE_GCS_BUCKET}/{}", - normalize_test_name_with_parts!("test_three_scheme_exists_independently") + normalize_test_name_with_parts!("test_mixed_scheme_exists_independently") ); - let mem_path = "memory://test_three_scheme_exists_independently"; + let fs_path = temp_fs_path("mixed_exists_independently.txt"); + let mem_path = "memory://test_mixed_scheme_exists_independently"; - // Clean up from previous runs + // Clean up S3 from previous runs let _ = file_io.delete(&s3_path).await; - let _ = file_io.delete(&gcs_path).await; // None exist initially assert!(!file_io.exists(&s3_path).await.unwrap()); - assert!(!file_io.exists(&gcs_path).await.unwrap()); + assert!(!file_io.exists(&fs_path).await.unwrap()); assert!(!file_io.exists(mem_path).await.unwrap()); - // Write only to GCS + // Write only to fs file_io - .new_output(&gcs_path) + .new_output(&fs_path) .unwrap() - 
.write("gcs_only".into()) + .write("fs_only".into()) .await .unwrap(); - // Only GCS exists + // Only fs exists assert!(!file_io.exists(&s3_path).await.unwrap()); - assert!(file_io.exists(&gcs_path).await.unwrap()); + assert!(file_io.exists(&fs_path).await.unwrap()); assert!(!file_io.exists(mem_path).await.unwrap()); } #[tokio::test] - async fn test_three_scheme_interleaved_operations() { - let file_io = get_resolving_file_io().await; + async fn test_mixed_scheme_delete_one_keeps_others() { + let file_io = get_resolving_file_io(); let s3_path = format!( "s3://bucket1/{}", - normalize_test_name_with_parts!("test_three_scheme_interleaved") + normalize_test_name_with_parts!("test_mixed_scheme_delete_one_keeps_others") ); - let gcs_path = format!( - "gs://{FAKE_GCS_BUCKET}/{}", - normalize_test_name_with_parts!("test_three_scheme_interleaved") - ); - let mem_path = "memory://test_three_scheme_interleaved"; + let fs_path = temp_fs_path("mixed_delete_one_keeps_others.txt"); + let mem_path = "memory://test_mixed_scheme_delete_one_keeps_others"; - // Interleave: write gcs, write memory, write s3 + // Write to all three file_io - .new_output(&gcs_path) + .new_output(&s3_path) .unwrap() - .write("gcs_data".into()) + .write("s3".into()) .await .unwrap(); file_io - .new_output(mem_path) + .new_output(&fs_path) .unwrap() - .write("mem_data".into()) + .write("fs".into()) .await .unwrap(); file_io - .new_output(&s3_path) + .new_output(mem_path) .unwrap() - .write("s3_data".into()) + .write("mem".into()) .await .unwrap(); - // Read in reverse order: s3, memory, gcs + // Delete only the fs file + file_io.delete(&fs_path).await.unwrap(); + + // fs gone, S3 and memory still there + assert!(file_io.exists(&s3_path).await.unwrap()); + assert!(!file_io.exists(&fs_path).await.unwrap()); + assert!(file_io.exists(mem_path).await.unwrap()); + assert_eq!( file_io.new_input(&s3_path).unwrap().read().await.unwrap(), - bytes::Bytes::from("s3_data") + bytes::Bytes::from("s3") ); assert_eq!( 
file_io.new_input(mem_path).unwrap().read().await.unwrap(), - bytes::Bytes::from("mem_data") - ); - assert_eq!( - file_io.new_input(&gcs_path).unwrap().read().await.unwrap(), - bytes::Bytes::from("gcs_data") + bytes::Bytes::from("mem") ); } #[tokio::test] - async fn test_three_scheme_delete_one_keeps_others() { - let file_io = get_resolving_file_io().await; + async fn test_mixed_scheme_interleaved_operations() { + let file_io = get_resolving_file_io(); let s3_path = format!( "s3://bucket1/{}", - normalize_test_name_with_parts!("test_three_scheme_delete_one_keeps_others") + normalize_test_name_with_parts!("test_mixed_scheme_interleaved") ); - let gcs_path = format!( - "gs://{FAKE_GCS_BUCKET}/{}", - normalize_test_name_with_parts!("test_three_scheme_delete_one_keeps_others") - ); - let mem_path = "memory://test_three_scheme_delete_one_keeps_others"; + let fs_path = temp_fs_path("mixed_interleaved.txt"); + let mem_path = "memory://test_mixed_scheme_interleaved"; - // Write to all three + // Interleave: write fs, write memory, write s3 file_io - .new_output(&s3_path) + .new_output(&fs_path) .unwrap() - .write("s3".into()) + .write("fs_data".into()) .await .unwrap(); file_io - .new_output(&gcs_path) + .new_output(mem_path) .unwrap() - .write("gcs".into()) + .write("mem_data".into()) .await .unwrap(); file_io - .new_output(mem_path) + .new_output(&s3_path) .unwrap() - .write("mem".into()) + .write("s3_data".into()) .await .unwrap(); - // Delete only GCS - file_io.delete(&gcs_path).await.unwrap(); - - // GCS gone, S3 and memory still there - assert!(file_io.exists(&s3_path).await.unwrap()); - assert!(!file_io.exists(&gcs_path).await.unwrap()); - assert!(file_io.exists(mem_path).await.unwrap()); - + // Read in reverse order: s3, memory, fs assert_eq!( file_io.new_input(&s3_path).unwrap().read().await.unwrap(), - bytes::Bytes::from("s3") + bytes::Bytes::from("s3_data") ); assert_eq!( file_io.new_input(mem_path).unwrap().read().await.unwrap(), - bytes::Bytes::from("mem") + 
bytes::Bytes::from("mem_data") ); + assert_eq!( + file_io.new_input(&fs_path).unwrap().read().await.unwrap(), + bytes::Bytes::from("fs_data") + ); + } + + #[tokio::test] + async fn test_invalid_scheme() { + let file_io = get_resolving_file_io(); + let result = file_io.exists("unknown://bucket/key").await; + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("Unsupported storage scheme"), + ); + } + + #[tokio::test] + async fn test_missing_scheme() { + let file_io = get_resolving_file_io(); + let result = file_io.exists("no-scheme-path").await; + assert!(result.is_err()); + } + + #[cfg(feature = "opendal-s3")] + #[tokio::test] + async fn test_with_custom_credential_loader() { + use async_trait::async_trait; + use iceberg_storage_opendal::CustomAwsCredentialLoader; + use reqsign::{AwsCredential, AwsCredentialLoad}; + use reqwest::Client; + + struct MinioCredentialLoader; + + #[async_trait] + impl AwsCredentialLoad for MinioCredentialLoader { + async fn load_credential( + &self, + _client: Client, + ) -> anyhow::Result> { + Ok(Some(AwsCredential { + access_key_id: "admin".to_string(), + secret_access_key: "password".to_string(), + session_token: None, + expires_in: None, + })) + } + } + + set_up(); + let minio_endpoint = get_minio_endpoint(); + + let factory = OpenDalResolvingStorageFactory::new() + .with_s3_credential_loader(CustomAwsCredentialLoader::new(Arc::new( + MinioCredentialLoader, + ))); + + let file_io = FileIOBuilder::new(Arc::new(factory)) + .with_props(vec![ + (S3_ENDPOINT, minio_endpoint), + (S3_REGION, "us-east-1".to_string()), + ]) + .build(); + + // Should be able to access S3 using the custom credential loader + assert!(file_io.exists("s3://bucket1/").await.unwrap()); } } From 03987544c73d9a0b846e47cbb7965165aa18a9ab Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 11 Mar 2026 16:26:36 -0700 Subject: [PATCH 3/8] fmt --- crates/storage/opendal/tests/resolving_storage_test.rs | 7 +++---- 1 file changed, 3 
insertions(+), 4 deletions(-) diff --git a/crates/storage/opendal/tests/resolving_storage_test.rs b/crates/storage/opendal/tests/resolving_storage_test.rs index 256ba680f2..4572ad2c2d 100644 --- a/crates/storage/opendal/tests/resolving_storage_test.rs +++ b/crates/storage/opendal/tests/resolving_storage_test.rs @@ -280,10 +280,9 @@ mod tests { set_up(); let minio_endpoint = get_minio_endpoint(); - let factory = OpenDalResolvingStorageFactory::new() - .with_s3_credential_loader(CustomAwsCredentialLoader::new(Arc::new( - MinioCredentialLoader, - ))); + let factory = OpenDalResolvingStorageFactory::new().with_s3_credential_loader( + CustomAwsCredentialLoader::new(Arc::new(MinioCredentialLoader)), + ); let file_io = FileIOBuilder::new(Arc::new(factory)) .with_props(vec![ From b7abf4d41874f40b6511aaf1a176e505d91f13fe Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 11 Mar 2026 16:41:05 -0700 Subject: [PATCH 4/8] drop read lock --- crates/storage/opendal/src/resolving.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/crates/storage/opendal/src/resolving.rs b/crates/storage/opendal/src/resolving.rs index 23e3c8865d..9229ccc1f5 100644 --- a/crates/storage/opendal/src/resolving.rs +++ b/crates/storage/opendal/src/resolving.rs @@ -182,12 +182,14 @@ impl OpenDalResolvingStorage { let scheme = extract_scheme(path)?; // Fast path: check read lock first. - let cache = self - .storages - .read() - .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?; - if let Some(storage) = cache.get(&scheme) { - return Ok(storage.clone()); + { + let cache = self + .storages + .read() + .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?; + if let Some(storage) = cache.get(&scheme) { + return Ok(storage.clone()); + } } // Slow path: build and insert under write lock. 
From 80817b419025f28cee9b96332345d26058ba2d70 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 16 Mar 2026 11:02:36 -0700 Subject: [PATCH 5/8] add parse scheme --- crates/storage/opendal/src/resolving.rs | 42 +++++++++++++++++++------ 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/crates/storage/opendal/src/resolving.rs b/crates/storage/opendal/src/resolving.rs index 9229ccc1f5..efab7d48d8 100644 --- a/crates/storage/opendal/src/resolving.rs +++ b/crates/storage/opendal/src/resolving.rs @@ -28,6 +28,7 @@ use iceberg::io::{ StorageFactory, }; use iceberg::{Error, ErrorKind, Result}; +use opendal::Scheme; use serde::{Deserialize, Serialize}; use url::Url; @@ -35,7 +36,28 @@ use crate::OpenDalStorage; #[cfg(feature = "opendal-s3")] use crate::s3::CustomAwsCredentialLoader; -/// Extract the scheme from a path string (e.g., `"s3://bucket/key"` → `"s3"`). +/// Parse a URL scheme string into an [`opendal::Scheme`]. +/// +/// Handles Iceberg/Hadoop-specific aliases that opendal doesn't know about +/// (e.g. `s3a`, `s3n`, `gcs`, `abfs`, `abfss`, `wasb`, `wasbs`). +fn parse_scheme(scheme: &str) -> Result { + match scheme { + "memory" => Ok(Scheme::Memory), + "file" | "" => Ok(Scheme::Fs), + "s3" | "s3a" | "s3n" => Ok(Scheme::S3), + "gs" | "gcs" => Ok(Scheme::Gcs), + "oss" => Ok(Scheme::Oss), + "abfss" | "abfs" | "wasbs" | "wasb" => Ok(Scheme::Azdls), + s => s.parse::().map_err(|e| { + Error::new( + ErrorKind::FeatureUnsupported, + format!("Unsupported storage scheme: {s}: {e}"), + ) + }), + } +} + +/// Extract the scheme string from a path URL. fn extract_scheme(path: &str) -> Result { let url = Url::parse(path).map_err(|e| { Error::new( @@ -52,9 +74,9 @@ fn build_storage_for_scheme( props: &HashMap, #[cfg(feature = "opendal-s3")] customized_credential_load: &Option, ) -> Result { - match scheme { + match parse_scheme(scheme)? 
{ #[cfg(feature = "opendal-s3")] - "s3" | "s3a" | "s3n" => { + Scheme::S3 => { let config = crate::s3::s3_config_parse(props.clone())?; Ok(OpenDalStorage::S3 { configured_scheme: scheme.to_string(), @@ -63,21 +85,21 @@ fn build_storage_for_scheme( }) } #[cfg(feature = "opendal-gcs")] - "gs" => { + Scheme::Gcs => { let config = crate::gcs::gcs_config_parse(props.clone())?; Ok(OpenDalStorage::Gcs { config: Arc::new(config), }) } #[cfg(feature = "opendal-oss")] - "oss" => { + Scheme::Oss => { let config = crate::oss::oss_config_parse(props.clone())?; Ok(OpenDalStorage::Oss { config: Arc::new(config), }) } #[cfg(feature = "opendal-azdls")] - "abfs" | "abfss" | "wasb" | "wasbs" => { + Scheme::Azdls => { let configured_scheme: crate::azdls::AzureStorageScheme = scheme.parse()?; let config = crate::azdls::azdls_config_parse(props.clone())?; Ok(OpenDalStorage::Azdls { @@ -86,12 +108,12 @@ fn build_storage_for_scheme( }) } #[cfg(feature = "opendal-fs")] - "file" => Ok(OpenDalStorage::LocalFs), + Scheme::Fs => Ok(OpenDalStorage::LocalFs), #[cfg(feature = "opendal-memory")] - "memory" => Ok(OpenDalStorage::Memory(crate::memory::memory_config_build()?)), - _ => Err(Error::new( + Scheme::Memory => Ok(OpenDalStorage::Memory(crate::memory::memory_config_build()?)), + unsupported => Err(Error::new( ErrorKind::FeatureUnsupported, - format!("Unsupported storage scheme: {scheme}"), + format!("Unsupported storage scheme: {unsupported}"), )), } } From b5a77be0d3529d477c1546202e85ae9228eba5c7 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 16 Mar 2026 19:26:33 -0700 Subject: [PATCH 6/8] const schemes --- crates/storage/opendal/src/resolving.rs | 26 +++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/crates/storage/opendal/src/resolving.rs b/crates/storage/opendal/src/resolving.rs index efab7d48d8..6645f03f30 100644 --- a/crates/storage/opendal/src/resolving.rs +++ b/crates/storage/opendal/src/resolving.rs @@ -36,18 +36,32 @@ use 
crate::OpenDalStorage; #[cfg(feature = "opendal-s3")] use crate::s3::CustomAwsCredentialLoader; +/// Schemes supported by OpenDalResolvingStorage +pub const SCHEME_MEMORY: &str = "memory"; +pub const SCHEME_FILE: &str = "file"; +pub const SCHEME_S3: &str = "s3"; +pub const SCHEME_S3A: &str = "s3a"; +pub const SCHEME_S3N: &str = "s3n"; +pub const SCHEME_GS: &str = "gs"; +pub const SCHEME_GCS: &str = "gcs"; +pub const SCHEME_OSS: &str = "oss"; +pub const SCHEME_ABFSS: &str = "abfss"; +pub const SCHEME_ABFS: &str = "abfs"; +pub const SCHEME_WASBS: &str = "wasbs"; +pub const SCHEME_WASB: &str = "wasb"; + /// Parse a URL scheme string into an [`opendal::Scheme`]. /// /// Handles Iceberg/Hadoop-specific aliases that opendal doesn't know about /// (e.g. `s3a`, `s3n`, `gcs`, `abfs`, `abfss`, `wasb`, `wasbs`). fn parse_scheme(scheme: &str) -> Result { match scheme { - "memory" => Ok(Scheme::Memory), - "file" | "" => Ok(Scheme::Fs), - "s3" | "s3a" | "s3n" => Ok(Scheme::S3), - "gs" | "gcs" => Ok(Scheme::Gcs), - "oss" => Ok(Scheme::Oss), - "abfss" | "abfs" | "wasbs" | "wasb" => Ok(Scheme::Azdls), + SCHEME_MEMORY => Ok(Scheme::Memory), + SCHEME_FILE | "" => Ok(Scheme::Fs), + SCHEME_S3 | SCHEME_S3A | SCHEME_S3N => Ok(Scheme::S3), + SCHEME_GS | SCHEME_GCS => Ok(Scheme::Gcs), + SCHEME_OSS => Ok(Scheme::Oss), + SCHEME_ABFSS | SCHEME_ABFS | SCHEME_WASBS | SCHEME_WASB => Ok(Scheme::Azdls), s => s.parse::().map_err(|e| { Error::new( ErrorKind::FeatureUnsupported, From 84387bbbdef2d1856048921cc728ca9f9187c7fc Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 16 Mar 2026 19:27:19 -0700 Subject: [PATCH 7/8] minor --- crates/storage/opendal/src/resolving.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/crates/storage/opendal/src/resolving.rs b/crates/storage/opendal/src/resolving.rs index 6645f03f30..006c3d0575 100644 --- a/crates/storage/opendal/src/resolving.rs +++ b/crates/storage/opendal/src/resolving.rs @@ -51,9 +51,6 @@ pub const SCHEME_WASBS: &str = "wasbs"; pub const 
SCHEME_WASB: &str = "wasb"; /// Parse a URL scheme string into an [`opendal::Scheme`]. -/// -/// Handles Iceberg/Hadoop-specific aliases that opendal doesn't know about -/// (e.g. `s3a`, `s3n`, `gcs`, `abfs`, `abfss`, `wasb`, `wasbs`). fn parse_scheme(scheme: &str) -> Result { match scheme { SCHEME_MEMORY => Ok(Scheme::Memory), From ed1dd66235605671c6d470d8312fc6d0e598b775 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 16 Mar 2026 19:46:14 -0700 Subject: [PATCH 8/8] implement delete_stream --- crates/storage/opendal/src/resolving.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/crates/storage/opendal/src/resolving.rs b/crates/storage/opendal/src/resolving.rs index 006c3d0575..7c06cf96a5 100644 --- a/crates/storage/opendal/src/resolving.rs +++ b/crates/storage/opendal/src/resolving.rs @@ -23,6 +23,8 @@ use std::sync::{Arc, RwLock}; use async_trait::async_trait; use bytes::Bytes; +use futures::StreamExt; +use futures::stream::BoxStream; use iceberg::io::{ FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig, StorageFactory, @@ -283,6 +285,24 @@ impl Storage for OpenDalResolvingStorage { self.resolve(path)?.delete_prefix(path).await } + async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> { + // Group paths by scheme so each resolved storage receives a batch, + // avoiding repeated operator creation per path. + let mut grouped: HashMap> = HashMap::new(); + while let Some(path) = paths.next().await { + let scheme = extract_scheme(&path)?; + grouped.entry(scheme).or_default().push(path); + } + + for (_, paths) in grouped { + let storage = self.resolve(&paths[0])?; + storage + .delete_stream(futures::stream::iter(paths).boxed()) + .await?; + } + Ok(()) + } + fn new_input(&self, path: &str) -> Result { Ok(InputFile::new( Arc::new(self.resolve(path)?.as_ref().clone()),