From 6db1dcbc037d4bfbfba071067ee64bc449c6681c Mon Sep 17 00:00:00 2001 From: Stefan Stanciulescu Date: Sun, 2 Oct 2022 08:44:37 -0700 Subject: [PATCH 1/5] Add support for GCS --- ballista/rust/client/Cargo.toml | 1 + ballista/rust/core/Cargo.toml | 1 + ballista/rust/core/src/utils.rs | 15 ++++++++++++++- 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml index a7ab2164a..733ee394a 100644 --- a/ballista/rust/client/Cargo.toml +++ b/ballista/rust/client/Cargo.toml @@ -44,4 +44,5 @@ tokio = "1.0" default = [] hdfs = ["ballista-core/hdfs"] s3 = ["ballista-core/s3"] +gcs = ["ballista-core/gcs"] standalone = ["ballista-executor", "ballista-scheduler"] diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml index 6699af013..d512e063c 100644 --- a/ballista/rust/core/Cargo.toml +++ b/ballista/rust/core/Cargo.toml @@ -36,6 +36,7 @@ force_hash_collisions = ["datafusion/force_hash_collisions"] # Used to enable hdfs to be registered in the ObjectStoreRegistry by default hdfs = ["datafusion-objectstore-hdfs"] s3 = ["object_store/aws"] +gcs = ["object_store/gcp"] simd = ["datafusion/simd"] [dependencies] diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index 21626c555..d2a295ee1 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -52,7 +52,8 @@ use futures::StreamExt; use log::error; #[cfg(feature = "s3")] use object_store::aws::AmazonS3Builder; -use object_store::ObjectStore; +#[cfg(feature = "gcs")] +use object_store::gcp::GoogleCloudStorageBuilder; use std::io::{BufWriter, Write}; use std::marker::PhantomData; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -109,6 +110,18 @@ impl ObjectStoreProvider for FeatureBasedObjectStoreProvider { } } + #[cfg(feature = "gcs")] + { + if url.to_string().starts_with("gcs://") { + if let Some(bucket_name) = url.host_str() { + let store = GoogleCloudStorageBuilder::from_env() + .with_bucket_name(bucket_name) + .build()?; + return Ok(Arc::new(store)); + } + } + } + Err(DataFusionError::Execution(format!( "No object store available for {}", url From 4eeff2c1df2ecbde0ddf36962ec8c7193a6633a2 Mon Sep 17 00:00:00 2001 From: Stefan Stanciulescu Date: Sun, 2 Oct 2022 08:49:27 -0700 Subject: [PATCH 2/5] Ups, missed this removal of an import --- ballista/rust/core/src/utils.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index d2a295ee1..cab189d6e 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -50,6 +50,7 @@ use datafusion_proto::logical_plan::{ }; use futures::StreamExt; use log::error; +use object_store::ObjectStore; #[cfg(feature = "s3")] use object_store::aws::AmazonS3Builder; #[cfg(feature = "gcs")] From 3e27410ae95bb79314f0e712f2616db3dd1fecb3 Mon Sep 17 00:00:00 2001 From: Stefan Stanciulescu Date: Sun, 2 Oct 2022 09:04:11 -0700 Subject: [PATCH 3/5] Add support for Azure --- ballista/rust/client/Cargo.toml | 1 + ballista/rust/core/Cargo.toml | 1 + ballista/rust/core/src/utils.rs | 16 +++++++++++++++- 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml index 733ee394a..5e327a2da 100644 --- a/ballista/rust/client/Cargo.toml +++ b/ballista/rust/client/Cargo.toml @@ -45,4 +45,5 @@ default = [] hdfs = ["ballista-core/hdfs"] s3 = ["ballista-core/s3"] gcs = ["ballista-core/gcs"] +azure = ["ballista-core/azure"] standalone = ["ballista-executor", "ballista-scheduler"] diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml index d512e063c..45a192517 100644 --- a/ballista/rust/core/Cargo.toml +++ b/ballista/rust/core/Cargo.toml @@ -37,6 +37,7 @@ force_hash_collisions = ["datafusion/force_hash_collisions"] hdfs = ["datafusion-objectstore-hdfs"] s3 = ["object_store/aws"] gcs = ["object_store/gcp"] +azure = ["object_store/azure"] simd = ["datafusion/simd"] [dependencies] diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index cab189d6e..697567fe0 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -50,11 +50,13 @@ use datafusion_proto::logical_plan::{ }; use futures::StreamExt; use log::error; -use object_store::ObjectStore; #[cfg(feature = "s3")] use object_store::aws::AmazonS3Builder; +#[cfg(feature = "azure")] +use object_store::azure::MicrosoftAzureBuilder; #[cfg(feature = "gcs")] use object_store::gcp::GoogleCloudStorageBuilder; +use object_store::ObjectStore; use std::io::{BufWriter, Write}; use std::marker::PhantomData; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -123,6 +125,18 @@ impl ObjectStoreProvider for FeatureBasedObjectStoreProvider { } } + #[cfg(feature = "azure")] + { + if url.to_string().starts_with("azure://") { + if let Some(bucket_name) = url.host_str() { + let store = MicrosoftAzureBuilder::from_env() + .with_container_name(bucket_name) + .build()?; + return Ok(Arc::new(store)); + } + } + } + Err(DataFusionError::Execution(format!( "No object store available for {}", url From 7928de621e936160a10eeb8c7a3330a4c935f7e8 Mon Sep 17 00:00:00 2001 From: Stefan Stanciulescu Date: Sun, 2 Oct 2022 11:31:30 -0700 Subject: [PATCH 4/5] Fix formatting issue on Cargo.toml --- ballista/rust/core/Cargo.toml | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml index 45a192517..5827b3ae9 100644 --- a/ballista/rust/core/Cargo.toml +++ b/ballista/rust/core/Cargo.toml @@ -14,7 +14,6 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - [package] name = "ballista-core" description = "Ballista Distributed Compute" @@ -31,18 +30,17 @@ build = "build.rs" rustc-args = ["--cfg", "docsrs"] [features] +azure = ["object_store/azure"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = ["datafusion/force_hash_collisions"] +gcs = ["object_store/gcp"] # Used to enable hdfs to be registered in the ObjectStoreRegistry by default hdfs = ["datafusion-objectstore-hdfs"] s3 = ["object_store/aws"] -gcs = ["object_store/gcp"] -azure = ["object_store/azure"] simd = ["datafusion/simd"] [dependencies] ahash = { version = "0.8", default-features = false } - arrow-flight = { version = "23.0.0", features = ["flight-sql-experimental"] } async-trait = "0.1.41" chrono = { version = "0.4", default-features = false } @@ -52,13 +50,11 @@ datafusion-objectstore-hdfs = { version = "0.1.0", optional = true } datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" } futures = "0.3" hashbrown = "0.12" - itertools = "0.10" libloading = "0.7.3" log = "0.4" object_store = "0.5.0" once_cell = "1.9.0" - parking_lot = "0.12" parse_arg = "0.1.3" prost = "0.11" @@ -78,4 +74,7 @@ tempfile = "3" [build-dependencies] rustc_version = "0.4.0" -tonic-build = { version = "0.8", default-features = false, features = ["transport", "prost"] } +tonic-build = { version = "0.8", default-features = false, features = [ + "transport", + "prost", +] } From 78510c48ce264376d282342e1b0fc90c9d03c3fd Mon Sep 17 00:00:00 2001 From: Stefan Stanciulescu Date: Sun, 2 Oct 2022 12:32:13 -0700 Subject: [PATCH 5/5] Try again with the toml formatting...: --- ballista/rust/client/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml index 5e327a2da..450b61f47 100644 --- a/ballista/rust/client/Cargo.toml +++ b/ballista/rust/client/Cargo.toml @@ -41,9 +41,9 @@ tempfile = "3" tokio = "1.0" [features] +azure = ["ballista-core/azure"] default = [] +gcs = ["ballista-core/gcs"] hdfs = ["ballista-core/hdfs"] s3 = ["ballista-core/s3"] -gcs = ["ballista-core/gcs"] -azure = ["ballista-core/azure"] standalone = ["ballista-executor", "ballista-scheduler"]