Skip to content

Commit

Permalink
Merge pull request #758 from splitgraph/obj-store-factory-file-io-props
Browse files Browse the repository at this point in the history
Move FileIO props conversion into the object_store_factory
gruuya authored Dec 9, 2024
2 parents aea2f76 + ffebc28 commit d21ef72
Showing 7 changed files with 56 additions and 50 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -21,6 +21,9 @@ datafusion-functions-nested = "43.0.0"

futures = "0.3"

iceberg = { git = "https://github.com/splitgraph/iceberg-rust", rev = "dbe5858039e676b45329a5cfc46cc1e7d9b98402" }
iceberg-datafusion = { git = "https://github.com/splitgraph/iceberg-rust", rev = "dbe5858039e676b45329a5cfc46cc1e7d9b98402" }

itertools = ">=0.10.0"
object_store = { version = "0.11", features = ["aws", "azure", "gcp"] }
prost = "0.13"
@@ -101,8 +104,8 @@ deltalake = { git = "https://github.com/splitgraph/delta-rs", rev = "eff57356982
futures = "0.3"
hex = ">=0.4.0"

iceberg = { git = "https://github.com/splitgraph/iceberg-rust", rev = "adf221357451861ebe403961e9e791c6b4093051" }
iceberg-datafusion = { git = "https://github.com/splitgraph/iceberg-rust", rev = "adf221357451861ebe403961e9e791c6b4093051" }
iceberg = { workspace = true }
iceberg-datafusion = { workspace = true }
indexmap = "2.6.0"
itertools = { workspace = true }

1 change: 1 addition & 0 deletions object_store_factory/Cargo.toml
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ license = "Apache-2.0"
[dependencies]
async-trait = { workspace = true }
futures = { workspace = true }
iceberg = { workspace = true }
object_store = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
1 change: 1 addition & 0 deletions object_store_factory/src/lib.rs
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ pub mod aws;
pub mod google;
pub mod local;
mod memory;
pub mod utils;

use aws::S3Config;
use google::GCSConfig;
45 changes: 45 additions & 0 deletions object_store_factory/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use iceberg::io::{
S3_ACCESS_KEY_ID, S3_ALLOW_ANONYMOUS, S3_DISABLE_EC2_METADATA, S3_ENDPOINT,
S3_REGION, S3_SECRET_ACCESS_KEY,
};
use object_store::aws::AmazonS3ConfigKey;
use std::collections::HashMap;
use std::str::FromStr;

// Go through all known keys for object store and convert them to corresponding file_io ones.
//
// For now only converts S3 keys.
// TODO: At some point this should be redundant, since there is an OpenDAL adapter for object_store,
// https://github.com/apache/iceberg-rust/issues/172
pub fn object_store_opts_to_file_io_props(
opts: &HashMap<String, String>,
) -> HashMap<String, String> {
let mut props = HashMap::new();

for (key, val) in opts.iter() {
let key = match AmazonS3ConfigKey::from_str(key) {
Ok(AmazonS3ConfigKey::AccessKeyId) => S3_ACCESS_KEY_ID,
Ok(AmazonS3ConfigKey::SecretAccessKey) => S3_SECRET_ACCESS_KEY,
Ok(AmazonS3ConfigKey::SkipSignature)
if ["true", "t", "1"].contains(&val.to_lowercase().as_str()) =>
{
// We need two options on the opendal client in this case
props.insert(S3_ALLOW_ANONYMOUS.to_string(), val.clone());
props.insert(S3_DISABLE_EC2_METADATA.to_string(), val.clone());
continue;
}
Ok(AmazonS3ConfigKey::Region) => S3_REGION,
Ok(AmazonS3ConfigKey::Endpoint) => S3_ENDPOINT,
_ => key, // for now just propagate any non-matched keys
};

props.insert(key.to_string(), val.clone());
}

// FileIO requires the region prop even when the S3 store doesn't (e.g. MinIO)
props
.entry(S3_REGION.to_string())
.or_insert("dummy-region".to_string());

props
}
2 changes: 1 addition & 1 deletion src/catalog/metastore.rs
Original file line number Diff line number Diff line change
@@ -20,13 +20,13 @@ use datafusion::datasource::TableProvider;

use super::empty::EmptyStore;
use crate::catalog::memory::MemoryStore;
use crate::object_store::utils::object_store_opts_to_file_io_props;
use deltalake::DeltaTable;
use futures::{stream, StreamExt, TryStreamExt};
use iceberg::io::FileIO;
use iceberg::table::StaticTable;
use iceberg::TableIdent;
use iceberg_datafusion::IcebergTableProvider;
use object_store_factory::utils::object_store_opts_to_file_io_props;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
45 changes: 0 additions & 45 deletions src/object_store/utils.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
use futures::TryFutureExt;
use iceberg::io::{
S3_ACCESS_KEY_ID, S3_ALLOW_ANONYMOUS, S3_DISABLE_EC2_METADATA, S3_ENDPOINT,
S3_REGION, S3_SECRET_ACCESS_KEY,
};
use object_store::aws::AmazonS3ConfigKey;
use object_store::Error;
use std::collections::HashMap;
use std::path::Path as StdPath;
use std::str::FromStr;
use tokio::fs::{copy, create_dir_all, remove_file, rename};
use tracing::debug;

@@ -51,41 +44,3 @@ pub async fn fast_upload(from: &StdPath, to: String) -> object_store::Result<(),
Ok(())
}
}

// Go through all known keys for object store and convert them to corresponding file_io ones.
//
// For now only converts S3 keys.
// TODO: At some point this should be redundant, since there is an OpenDAL adapter for object_store,
// https://github.com/apache/iceberg-rust/issues/172
pub fn object_store_opts_to_file_io_props(
opts: &HashMap<String, String>,
) -> HashMap<String, String> {
let mut props = HashMap::new();

for (key, val) in opts.iter() {
let key = match AmazonS3ConfigKey::from_str(key) {
Ok(AmazonS3ConfigKey::AccessKeyId) => S3_ACCESS_KEY_ID,
Ok(AmazonS3ConfigKey::SecretAccessKey) => S3_SECRET_ACCESS_KEY,
Ok(AmazonS3ConfigKey::SkipSignature)
if ["true", "t", "1"].contains(&val.to_lowercase().as_str()) =>
{
// We need two options on the opendal client in this case
props.insert(S3_ALLOW_ANONYMOUS.to_string(), val.clone());
props.insert(S3_DISABLE_EC2_METADATA.to_string(), val.clone());
continue;
}
Ok(AmazonS3ConfigKey::Region) => S3_REGION,
Ok(AmazonS3ConfigKey::Endpoint) => S3_ENDPOINT,
_ => key, // for now just propagate any non-matched keys
};

props.insert(key.to_string(), val.clone());
}

// FileIO requires the region prop even when the S3 store doesn't (e.g. MinIO)
props
.entry(S3_REGION.to_string())
.or_insert("dummy-region".to_string());

props
}

0 comments on commit d21ef72

Please sign in to comment.