From 1d5f05e1fd33e09200ff37dab71c90b7470a4790 Mon Sep 17 00:00:00 2001
From: Chris Carlon
Date: Sat, 12 Oct 2024 15:57:54 +0100
Subject: [PATCH] added in delta lake functionality

---
NOTE(review): generic type arguments were lost from this patch in extraction
and have been restored from context. `Box<dyn std::error::Error>` in the
src/main.rs signatures is assumed -- confirm against the tree before applying.

 Cargo.toml            |  6 ++++
 src/delta_lake/mod.rs | 82 +++++++++++++++++++++++++++++++++++++++++++
 src/lib.rs            |  1 +
 src/main.rs           | 32 +++++++++++++++++
 4 files changed, 121 insertions(+)
 create mode 100644 src/delta_lake/mod.rs

diff --git a/Cargo.toml b/Cargo.toml
index 6fc6a13..67edf54 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,6 +25,12 @@ serde_json = "1.0.124"
 indicatif = "0.17.8"
 bytes = "1.7.2"
 csv = "1.3.0"
+aws-config = "1.5.5"
+aws-credential-types = "1.2.0"
+aws-sdk-sts = "1.39.0"
+aws-types = "1.3.3"
+deltalake = "0.19.0"
+deltalake-aws = "0.1.4"
 
 [lib]
 name = "nebby"
diff --git a/src/delta_lake/mod.rs b/src/delta_lake/mod.rs
new file mode 100644
index 0000000..9855a7a
--- /dev/null
+++ b/src/delta_lake/mod.rs
@@ -0,0 +1,82 @@
+use aws_config;
+use aws_config::BehaviorVersion;
+use aws_sdk_sts::config::ProvideCredentials;
+use deltalake::{open_table_with_storage_options, DeltaTable, DeltaTableError};
+use std::collections::HashMap;
+use std::time::Duration;
+
+/// Load AWS credentials into a `HashMap` for use with the Delta Lake reader.
+///
+/// # Errors
+///
+/// Returns an error if the Tokio runtime cannot be created, if the loaded
+/// configuration has no credentials provider, or if the provider fails to
+/// return credentials.
+pub fn get_aws_config() -> Result<HashMap<String, String>, Box<dyn std::error::Error>> {
+    // Propagate runtime start-up failures instead of panicking.
+    let runtime = tokio::runtime::Runtime::new()?;
+    let config = runtime.block_on(async {
+        aws_config::defaults(BehaviorVersion::latest())
+            .retry_config(aws_config::retry::RetryConfig::standard().with_max_attempts(5))
+            .timeout_config(
+                aws_config::timeout::TimeoutConfig::builder()
+                    .operation_timeout(Duration::from_secs(30))
+                    .build(),
+            )
+            .load()
+            .await
+    });
+
+    let mut aws_info = HashMap::new();
+    // Add credentials to HashMap if available
+    if let Some(creds_provider) = config.credentials_provider() {
+        match runtime.block_on(creds_provider.provide_credentials()) {
+            Ok(creds) => {
+                aws_info.insert(
+                    "AWS_ACCESS_KEY_ID".to_string(),
+                    creds.access_key_id().to_string(),
+                );
+                aws_info.insert(
+                    "AWS_SECRET_ACCESS_KEY".to_string(),
+                    creds.secret_access_key().to_string(),
+                );
+                if let Some(session_token) = creds.session_token() {
+                    aws_info.insert("AWS_SESSION_TOKEN".to_string(), session_token.to_string());
+                }
+            }
+            Err(e) => return Err(format!("Failed to retrieve credentials: {}", e).into()),
+        }
+    } else {
+        return Err("No credentials provider found in the configuration".into());
+    }
+    // Add success message
+    println!("AWS configuration loaded successfully and added to HashMap.");
+    Ok(aws_info)
+}
+
+/// Open a Delta Lake table stored in S3 and print its version and metadata.
+///
+/// `credential_hash_map` is passed to the reader as its storage options.
+///
+/// # Errors
+///
+/// Returns a `DeltaTableError` if the Tokio runtime cannot be created or the
+/// table cannot be opened.
+pub fn load_remote_delta_lake_table_info(
+    s3_uri: &str,
+    credential_hash_map: HashMap<String, String>,
+) -> Result<DeltaTable, DeltaTableError> {
+    deltalake_aws::register_handlers(None);
+
+    // `open_table_with_storage_options` is async, so drive it to completion on a
+    // runtime owned by this synchronous function.
+    let runtime = tokio::runtime::Runtime::new().map_err(|e| {
+        DeltaTableError::Generic(format!("Failed to create Tokio runtime: {}", e))
+    })?;
+    let remote_delta_lake_table =
+        runtime.block_on(open_table_with_storage_options(s3_uri, credential_hash_map))?;
+
+    println!("version: {}", remote_delta_lake_table.version());
+    println!("metadata: {:?}", remote_delta_lake_table.metadata());
+    Ok(remote_delta_lake_table)
+}
diff --git a/src/lib.rs b/src/lib.rs
index 2bde365..7024c15 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,5 +1,6 @@
 pub mod api;
 pub mod bytes;
 pub mod csv;
+pub mod delta_lake;
 pub mod excel;
 pub mod utils;
diff --git a/src/main.rs b/src/main.rs
index 033c563..e0fa0a4 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,12 +1,14 @@
 mod api;
 mod bytes;
 mod csv;
+mod delta_lake;
 mod excel;
 mod utils;
 use api::analyze_json_nesting;
 use bytes::{get_file_type_string, view_bytes};
 use clap::{Parser, Subcommand};
 use csv::{fetch_remote_csv, process_basic_csv};
+use delta_lake::{get_aws_config, load_remote_delta_lake_table_info};
 use excel::{
     analyze_excel_formatting, display_remote_basic_info,
     display_remote_basic_info_specify_header_idx, excel_quick_view, fetch_remote_file,
@@ -58,6 +60,12 @@ enum Commands {
     },
     /// Basic CSV feature
     BasicCsv { url: String },
+    /// Process Delta Lake table
+    DeltaLake {
+        /// S3 URI of the Delta Lake table
+        #[arg(short, long)]
+        s3_uri: String,
+    },
 }
 
 // Call commands and file logic
@@ -71,6 +79,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
         Commands::BasicJson { url } => process_json(url),
         Commands::Nibble { url } => process_view_bytes(url),
         Commands::BasicCsv { url } => process_csv(url),
+        Commands::DeltaLake { s3_uri } => process_delta_lake(s3_uri),
     }
 }
 
@@ -148,6 +157,29 @@ fn process_csv(url: &str) -> Result<(), Box<dyn std::error::Error>> {
     result
 }
 
+fn process_delta_lake(s3_uri: &str) -> Result<(), Box<dyn std::error::Error>> {
+    let pb = create_progress_bar("Processing Delta Lake table...");
+
+    match get_aws_config() {
+        Ok(config) => match load_remote_delta_lake_table_info(s3_uri, config) {
+            Ok(_table) => {
+                pb.finish_with_message("Successfully loaded the Delta table");
+                Ok(())
+            }
+            Err(e) => {
+                pb.finish_with_message("Error loading the Delta table");
+                eprintln!("Error loading the Delta table: {}", e);
+                Err(e.into())
+            }
+        },
+        Err(e) => {
+            pb.finish_with_message("Error getting AWS configuration");
+            eprintln!("Error getting AWS configuration: {}", e);
+            Err(e)
+        }
+    }
+}
+
 fn validate_url(url: &str) -> Result<(), Box<dyn std::error::Error>> {
     if url.is_empty() {
         return Err("Error: URL cannot be empty".into());