diff --git a/Cargo.lock b/Cargo.lock index 9d7865ac..bcc69dd4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3946,7 +3946,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.3.0" -source = "git+https://github.com/splitgraph/iceberg-rust?rev=adf221357451861ebe403961e9e791c6b4093051#adf221357451861ebe403961e9e791c6b4093051" +source = "git+https://github.com/splitgraph/iceberg-rust?rev=dbe5858039e676b45329a5cfc46cc1e7d9b98402#dbe5858039e676b45329a5cfc46cc1e7d9b98402" dependencies = [ "anyhow", "apache-avro", @@ -3992,7 +3992,7 @@ dependencies = [ [[package]] name = "iceberg-datafusion" version = "0.3.0" -source = "git+https://github.com/splitgraph/iceberg-rust?rev=adf221357451861ebe403961e9e791c6b4093051#adf221357451861ebe403961e9e791c6b4093051" +source = "git+https://github.com/splitgraph/iceberg-rust?rev=dbe5858039e676b45329a5cfc46cc1e7d9b98402#dbe5858039e676b45329a5cfc46cc1e7d9b98402" dependencies = [ "anyhow", "async-trait", @@ -5129,6 +5129,7 @@ version = "0.1.0" dependencies = [ "async-trait", "futures", + "iceberg", "object_store", "rstest", "serde", diff --git a/Cargo.toml b/Cargo.toml index 11b3f409..d721e288 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,9 @@ datafusion-functions-nested = "43.0.0" futures = "0.3" +iceberg = { git = "https://github.com/splitgraph/iceberg-rust", rev = "dbe5858039e676b45329a5cfc46cc1e7d9b98402" } +iceberg-datafusion = { git = "https://github.com/splitgraph/iceberg-rust", rev = "dbe5858039e676b45329a5cfc46cc1e7d9b98402" } + itertools = ">=0.10.0" object_store = { version = "0.11", features = ["aws", "azure", "gcp"] } prost = "0.13" @@ -101,8 +104,8 @@ deltalake = { git = "https://github.com/splitgraph/delta-rs", rev = "eff57356982 futures = "0.3" hex = ">=0.4.0" -iceberg = { git = "https://github.com/splitgraph/iceberg-rust", rev = "adf221357451861ebe403961e9e791c6b4093051" } -iceberg-datafusion = { git = "https://github.com/splitgraph/iceberg-rust", rev = "adf221357451861ebe403961e9e791c6b4093051" } +iceberg = { workspace = true } +iceberg-datafusion = { workspace = true } indexmap = "2.6.0" itertools = { workspace = true } diff --git a/object_store_factory/Cargo.toml b/object_store_factory/Cargo.toml index bdb8c19c..f1819460 100644 --- a/object_store_factory/Cargo.toml +++ b/object_store_factory/Cargo.toml @@ -7,6 +7,7 @@ license = "Apache-2.0" [dependencies] async-trait = { workspace = true } futures = { workspace = true } +iceberg = { workspace = true } object_store = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/object_store_factory/src/lib.rs b/object_store_factory/src/lib.rs index 62769f8f..e547c8e0 100644 --- a/object_store_factory/src/lib.rs +++ b/object_store_factory/src/lib.rs @@ -2,6 +2,7 @@ pub mod aws; pub mod google; pub mod local; mod memory; +pub mod utils; use aws::S3Config; use google::GCSConfig; diff --git a/object_store_factory/src/utils.rs b/object_store_factory/src/utils.rs new file mode 100644 index 00000000..563c7b58 --- /dev/null +++ b/object_store_factory/src/utils.rs @@ -0,0 +1,45 @@ +use iceberg::io::{ + S3_ACCESS_KEY_ID, S3_ALLOW_ANONYMOUS, S3_DISABLE_EC2_METADATA, S3_ENDPOINT, + S3_REGION, S3_SECRET_ACCESS_KEY, +}; +use object_store::aws::AmazonS3ConfigKey; +use std::collections::HashMap; +use std::str::FromStr; + +// Go through all known keys for object store and convert them to corresponding file_io ones. +// +// For now only converts S3 keys. +// TODO: At some point this should be redundant, since there is an OpenDAL adapter for object_store, +// https://github.com/apache/iceberg-rust/issues/172 +pub fn object_store_opts_to_file_io_props( + opts: &HashMap, +) -> HashMap { + let mut props = HashMap::new(); + + for (key, val) in opts.iter() { + let key = match AmazonS3ConfigKey::from_str(key) { + Ok(AmazonS3ConfigKey::AccessKeyId) => S3_ACCESS_KEY_ID, + Ok(AmazonS3ConfigKey::SecretAccessKey) => S3_SECRET_ACCESS_KEY, + Ok(AmazonS3ConfigKey::SkipSignature) + if ["true", "t", "1"].contains(&val.to_lowercase().as_str()) => + { + // We need two options on the opendal client in this case + props.insert(S3_ALLOW_ANONYMOUS.to_string(), val.clone()); + props.insert(S3_DISABLE_EC2_METADATA.to_string(), val.clone()); + continue; + } + Ok(AmazonS3ConfigKey::Region) => S3_REGION, + Ok(AmazonS3ConfigKey::Endpoint) => S3_ENDPOINT, + _ => key, // for now just propagate any non-matched keys + }; + + props.insert(key.to_string(), val.clone()); + } + + // FileIO requires the region prop even when the S3 store doesn't (e.g. MinIO) + props + .entry(S3_REGION.to_string()) + .or_insert("dummy-region".to_string()); + + props +} diff --git a/src/catalog/metastore.rs b/src/catalog/metastore.rs index 0f0f14dd..56b5bdf9 100644 --- a/src/catalog/metastore.rs +++ b/src/catalog/metastore.rs @@ -20,13 +20,13 @@ use datafusion::datasource::TableProvider; use super::empty::EmptyStore; use crate::catalog::memory::MemoryStore; -use crate::object_store::utils::object_store_opts_to_file_io_props; use deltalake::DeltaTable; use futures::{stream, StreamExt, TryStreamExt}; use iceberg::io::FileIO; use iceberg::table::StaticTable; use iceberg::TableIdent; use iceberg_datafusion::IcebergTableProvider; +use object_store_factory::utils::object_store_opts_to_file_io_props; use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; diff --git a/src/object_store/utils.rs b/src/object_store/utils.rs index c5069739..c7b932a7 100644 --- a/src/object_store/utils.rs +++ b/src/object_store/utils.rs @@ -1,13 +1,6 @@ use futures::TryFutureExt; -use iceberg::io::{ - S3_ACCESS_KEY_ID, S3_ALLOW_ANONYMOUS, S3_DISABLE_EC2_METADATA, S3_ENDPOINT, - S3_REGION, S3_SECRET_ACCESS_KEY, -}; -use object_store::aws::AmazonS3ConfigKey; use object_store::Error; -use std::collections::HashMap; use std::path::Path as StdPath; -use std::str::FromStr; use tokio::fs::{copy, create_dir_all, remove_file, rename}; use tracing::debug; @@ -51,41 +44,3 @@ pub async fn fast_upload(from: &StdPath, to: String) -> object_store::Result<(), Ok(()) } } - -// Go through all known keys for object store and convert them to corresponding file_io ones. -// -// For now only converts S3 keys. -// TODO: At some point this should be redundant, since there is an OpenDAL adapter for object_store, -// https://github.com/apache/iceberg-rust/issues/172 -pub fn object_store_opts_to_file_io_props( - opts: &HashMap, -) -> HashMap { - let mut props = HashMap::new(); - - for (key, val) in opts.iter() { - let key = match AmazonS3ConfigKey::from_str(key) { - Ok(AmazonS3ConfigKey::AccessKeyId) => S3_ACCESS_KEY_ID, - Ok(AmazonS3ConfigKey::SecretAccessKey) => S3_SECRET_ACCESS_KEY, - Ok(AmazonS3ConfigKey::SkipSignature) - if ["true", "t", "1"].contains(&val.to_lowercase().as_str()) => - { - // We need two options on the opendal client in this case - props.insert(S3_ALLOW_ANONYMOUS.to_string(), val.clone()); - props.insert(S3_DISABLE_EC2_METADATA.to_string(), val.clone()); - continue; - } - Ok(AmazonS3ConfigKey::Region) => S3_REGION, - Ok(AmazonS3ConfigKey::Endpoint) => S3_ENDPOINT, - _ => key, // for now just propagate any non-matched keys - }; - - props.insert(key.to_string(), val.clone()); - } - - // FileIO requires the region prop even when the S3 store doesn't (e.g. MinIO) - props - .entry(S3_REGION.to_string()) - .or_insert("dummy-region".to_string()); - - props -}