feat(storage): implement opendal resolving storage #2231
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits (9):
- 3766402 implement opendal resolving storage (CTTY)
- 0a5de69 optimize tests (CTTY)
- 0398754 fmt (CTTY)
- b7abf4d drop read lock (CTTY)
- 80817b4 add parse scheme (CTTY)
- b5a77be const schemes (CTTY)
- 84387bb minor (CTTY)
- 17d71db Merge branch 'main' into ctty/opendal-resolving (CTTY)
- ed1dd66 implement delete_stream (CTTY)
New file (+319 lines):

```rust
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Resolving storage that auto-detects the scheme from a path and delegates
//! to the appropriate [`OpenDalStorage`] variant.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use futures::stream::BoxStream;
use iceberg::io::{
    FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
    StorageFactory,
};
use iceberg::{Error, ErrorKind, Result};
use opendal::Scheme;
use serde::{Deserialize, Serialize};
use url::Url;

use crate::OpenDalStorage;
#[cfg(feature = "opendal-s3")]
use crate::s3::CustomAwsCredentialLoader;

/// Schemes supported by OpenDalResolvingStorage.
pub const SCHEME_MEMORY: &str = "memory";
pub const SCHEME_FILE: &str = "file";
pub const SCHEME_S3: &str = "s3";
pub const SCHEME_S3A: &str = "s3a";
pub const SCHEME_S3N: &str = "s3n";
pub const SCHEME_GS: &str = "gs";
pub const SCHEME_GCS: &str = "gcs";
pub const SCHEME_OSS: &str = "oss";
pub const SCHEME_ABFSS: &str = "abfss";
pub const SCHEME_ABFS: &str = "abfs";
pub const SCHEME_WASBS: &str = "wasbs";
pub const SCHEME_WASB: &str = "wasb";

/// Parse a URL scheme string into an [`opendal::Scheme`].
fn parse_scheme(scheme: &str) -> Result<Scheme> {
    match scheme {
        SCHEME_MEMORY => Ok(Scheme::Memory),
        SCHEME_FILE | "" => Ok(Scheme::Fs),
        SCHEME_S3 | SCHEME_S3A | SCHEME_S3N => Ok(Scheme::S3),
        SCHEME_GS | SCHEME_GCS => Ok(Scheme::Gcs),
        SCHEME_OSS => Ok(Scheme::Oss),
        SCHEME_ABFSS | SCHEME_ABFS | SCHEME_WASBS | SCHEME_WASB => Ok(Scheme::Azdls),
        s => s.parse::<Scheme>().map_err(|e| {
            Error::new(
                ErrorKind::FeatureUnsupported,
                format!("Unsupported storage scheme: {s}: {e}"),
            )
        }),
    }
}

/// Extract the scheme string from a path URL.
fn extract_scheme(path: &str) -> Result<String> {
    let url = Url::parse(path).map_err(|e| {
        Error::new(
            ErrorKind::DataInvalid,
            format!("Invalid path: {path}, failed to parse URL: {e}"),
        )
    })?;
    Ok(url.scheme().to_string())
}

/// Build an [`OpenDalStorage`] variant for the given scheme and config properties.
fn build_storage_for_scheme(
    scheme: &str,
    props: &HashMap<String, String>,
    #[cfg(feature = "opendal-s3")] customized_credential_load: &Option<CustomAwsCredentialLoader>,
) -> Result<OpenDalStorage> {
    match parse_scheme(scheme)? {
        #[cfg(feature = "opendal-s3")]
        Scheme::S3 => {
            let config = crate::s3::s3_config_parse(props.clone())?;
            Ok(OpenDalStorage::S3 {
                configured_scheme: scheme.to_string(),
                config: Arc::new(config),
                customized_credential_load: customized_credential_load.clone(),
            })
        }
        #[cfg(feature = "opendal-gcs")]
        Scheme::Gcs => {
            let config = crate::gcs::gcs_config_parse(props.clone())?;
            Ok(OpenDalStorage::Gcs {
                config: Arc::new(config),
            })
        }
        #[cfg(feature = "opendal-oss")]
        Scheme::Oss => {
            let config = crate::oss::oss_config_parse(props.clone())?;
            Ok(OpenDalStorage::Oss {
                config: Arc::new(config),
            })
        }
        #[cfg(feature = "opendal-azdls")]
        Scheme::Azdls => {
            let configured_scheme: crate::azdls::AzureStorageScheme = scheme.parse()?;
            let config = crate::azdls::azdls_config_parse(props.clone())?;
            Ok(OpenDalStorage::Azdls {
                configured_scheme,
                config: Arc::new(config),
            })
        }
        #[cfg(feature = "opendal-fs")]
        Scheme::Fs => Ok(OpenDalStorage::LocalFs),
        #[cfg(feature = "opendal-memory")]
        Scheme::Memory => Ok(OpenDalStorage::Memory(crate::memory::memory_config_build()?)),
        unsupported => Err(Error::new(
            ErrorKind::FeatureUnsupported,
            format!("Unsupported storage scheme: {unsupported}"),
        )),
    }
}

/// A resolving storage factory that creates [`OpenDalResolvingStorage`] instances.
///
/// This factory accepts paths from any supported storage system and dynamically
/// delegates operations to the appropriate [`OpenDalStorage`] variant based on
/// the path scheme.
///
/// # Example
///
/// ```rust,ignore
/// use std::sync::Arc;
/// use iceberg::io::FileIOBuilder;
/// use iceberg_storage_opendal::OpenDalResolvingStorageFactory;
///
/// let factory = OpenDalResolvingStorageFactory::new();
/// let file_io = FileIOBuilder::new(Arc::new(factory))
///     .with_prop("s3.region", "us-east-1")
///     .build();
/// ```
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OpenDalResolvingStorageFactory {
    /// Custom AWS credential loader for S3 storage.
    #[cfg(feature = "opendal-s3")]
    #[serde(skip)]
    customized_credential_load: Option<CustomAwsCredentialLoader>,
}

impl Default for OpenDalResolvingStorageFactory {
    fn default() -> Self {
        Self::new()
    }
}

impl OpenDalResolvingStorageFactory {
    /// Create a new resolving storage factory.
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "opendal-s3")]
            customized_credential_load: None,
        }
    }

    /// Set a custom AWS credential loader for S3 storage.
    #[cfg(feature = "opendal-s3")]
    pub fn with_s3_credential_loader(mut self, loader: CustomAwsCredentialLoader) -> Self {
        self.customized_credential_load = Some(loader);
        self
    }
}

#[typetag::serde]
impl StorageFactory for OpenDalResolvingStorageFactory {
    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
        Ok(Arc::new(OpenDalResolvingStorage {
            props: config.props().clone(),
            storages: RwLock::new(HashMap::new()),
            #[cfg(feature = "opendal-s3")]
            customized_credential_load: self.customized_credential_load.clone(),
        }))
    }
}

/// A resolving storage that auto-detects the scheme from a path and delegates
/// to the appropriate [`OpenDalStorage`] variant.
///
/// Sub-storages are lazily created on first use for each scheme and cached
/// for subsequent operations.
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenDalResolvingStorage {
    /// Configuration properties shared across all backends.
    props: HashMap<String, String>,
    /// Cache of scheme → storage mappings.
    #[serde(skip, default)]
    storages: RwLock<HashMap<String, Arc<OpenDalStorage>>>,
    /// Custom AWS credential loader for S3 storage.
    #[cfg(feature = "opendal-s3")]
    #[serde(skip)]
    customized_credential_load: Option<CustomAwsCredentialLoader>,
}

impl OpenDalResolvingStorage {
    /// Resolve the storage for the given path by extracting the scheme and
    /// returning the cached or newly-created [`OpenDalStorage`].
    fn resolve(&self, path: &str) -> Result<Arc<OpenDalStorage>> {
        let scheme = extract_scheme(path)?;

        // Fast path: check read lock first.
        {
            let cache = self
                .storages
                .read()
                .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
            if let Some(storage) = cache.get(&scheme) {
                return Ok(storage.clone());
            }
        }

        // Slow path: build and insert under write lock.
        let mut cache = self
            .storages
            .write()
            .map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;

        // Double-check after acquiring write lock.
        if let Some(storage) = cache.get(&scheme) {
            return Ok(storage.clone());
        }

        let storage = build_storage_for_scheme(
            &scheme,
            &self.props,
            #[cfg(feature = "opendal-s3")]
            &self.customized_credential_load,
        )?;
        let storage = Arc::new(storage);
        cache.insert(scheme, storage.clone());
        Ok(storage)
    }
}

#[async_trait]
#[typetag::serde]
impl Storage for OpenDalResolvingStorage {
    async fn exists(&self, path: &str) -> Result<bool> {
        self.resolve(path)?.exists(path).await
    }

    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
        self.resolve(path)?.metadata(path).await
    }

    async fn read(&self, path: &str) -> Result<Bytes> {
        self.resolve(path)?.read(path).await
    }

    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
        self.resolve(path)?.reader(path).await
    }

    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
        self.resolve(path)?.write(path, bs).await
    }

    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
        self.resolve(path)?.writer(path).await
    }

    async fn delete(&self, path: &str) -> Result<()> {
        self.resolve(path)?.delete(path).await
    }

    async fn delete_prefix(&self, path: &str) -> Result<()> {
        self.resolve(path)?.delete_prefix(path).await
    }

    async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
        // Group paths by scheme so each resolved storage receives a batch,
        // avoiding repeated operator creation per path.
        let mut grouped: HashMap<String, Vec<String>> = HashMap::new();
        while let Some(path) = paths.next().await {
            let scheme = extract_scheme(&path)?;
            grouped.entry(scheme).or_default().push(path);
        }

        for (_, paths) in grouped {
            let storage = self.resolve(&paths[0])?;
            storage
                .delete_stream(futures::stream::iter(paths).boxed())
                .await?;
        }
        Ok(())
    }

    fn new_input(&self, path: &str) -> Result<InputFile> {
        Ok(InputFile::new(
            Arc::new(self.resolve(path)?.as_ref().clone()),
            path.to_string(),
        ))
    }

    fn new_output(&self, path: &str) -> Result<OutputFile> {
        Ok(OutputFile::new(
            Arc::new(self.resolve(path)?.as_ref().clone()),
            path.to_string(),
        ))
    }
}
```
Should this be a multimap? For example, we may need to support both s3 and s3a for S3 storage.
The key here is a string representing a scheme, so you can have both "s3" and "s3a" within one map.
Or are we thinking of mapping one scheme to multiple storages?
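A minimal sketch of the point being discussed: since the cache is keyed by scheme string, aliased schemes like "s3" and "s3a" can be separate keys that still share one backend instance via `Arc`. (`StorageHandle` and `aliased_cache` below are hypothetical stand-ins for `OpenDalStorage` and the real cache-population logic, not code from this PR.)

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-in for the real OpenDalStorage backend.
#[derive(Debug)]
struct StorageHandle {
    name: &'static str,
}

// Build a string-keyed cache where the aliased schemes "s3" and "s3a"
// both point at the same Arc-shared backend instance.
fn aliased_cache() -> HashMap<String, Arc<StorageHandle>> {
    let shared = Arc::new(StorageHandle { name: "s3-backend" });
    let mut cache = HashMap::new();
    cache.insert("s3".to_string(), Arc::clone(&shared));
    cache.insert("s3a".to_string(), Arc::clone(&shared));
    cache
}

fn main() {
    let cache = aliased_cache();
    // Same underlying instance, so heavyweight resources such as
    // connection pools are not duplicated per alias.
    println!("shared: {}", Arc::ptr_eq(&cache["s3"], &cache["s3a"]));
}
```

This keeps the flat `HashMap<String, Arc<OpenDalStorage>>` shape from the PR while still addressing the resource-duplication concern.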
I was thinking we should map them to the same storage. A storage instance holds a lot of resources, like connection pools, etc.
Currently there is a configured_scheme for OpenDalStorage::{S3, Azdls}; the path it handles should match the configured scheme, so technically it shouldn't be using the same storage instance if the schemes are different: https://github.com/apache/iceberg-rust/blob/main/crates/storage/opendal/src/lib.rs#L110
I'm not quite sure about the reason, though; maybe it's an OpenDAL limitation? I think we can improve this in a different PR if needed.
IIRC the configured_scheme was a legacy setting from before we refactored the Storage trait. I think we no longer need this field since Storage now accepts the full URL. Please create an issue to track it.
Created a tracking issue: #2245