Skip to content

Commit

Permalink
added in delta lake functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
CHRISCARLON committed Oct 12, 2024
1 parent 512e3a2 commit 1d5f05e
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 0 deletions.
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ serde_json = "1.0.124"
indicatif = "0.17.8"
bytes = "1.7.2"
csv = "1.3.0"
aws-config = "1.5.5"
aws-credential-types = "1.2.0"
aws-sdk-sts = "1.39.0"
aws-types = "1.3.3"
deltalake = "0.19.0"
deltalake-aws = "0.1.4"

[lib]
name = "nebby"
Expand Down
64 changes: 64 additions & 0 deletions src/delta_lake/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use aws_config;
use aws_config::BehaviorVersion;
use aws_sdk_sts::config::ProvideCredentials;
use deltalake::{open_table_with_storage_options, DeltaTable, DeltaTableError};
use std::collections::HashMap;
use std::time::Duration;

// Load AWS Creds into a hashmap for use with delta lake reader
pub fn get_aws_config() -> Result<HashMap<String, String>, Box<dyn std::error::Error>> {
let runtime = tokio::runtime::Runtime::new().unwrap();
let config = runtime.block_on(async {
aws_config::defaults(BehaviorVersion::latest())
.retry_config(aws_config::retry::RetryConfig::standard().with_max_attempts(5))
.timeout_config(
aws_config::timeout::TimeoutConfig::builder()
.operation_timeout(Duration::from_secs(30))
.build(),
)
.load()
.await
});

let mut aws_info = HashMap::new();
// Add credentials to HashMap if available
if let Some(creds_provider) = config.credentials_provider() {
match runtime.block_on(creds_provider.provide_credentials()) {
Ok(creds) => {
aws_info.insert(
"AWS_ACCESS_KEY_ID".to_string(),
creds.access_key_id().to_string(),
);
aws_info.insert(
"AWS_SECRET_ACCESS_KEY".to_string(),
creds.secret_access_key().to_string(),
);
if let Some(session_token) = creds.session_token() {
aws_info.insert("AWS_SESSION_TOKEN".to_string(), session_token.to_string());
}
}
Err(e) => return Err(format!("Failed to retrieve credentials: {}", e).into()),
}
} else {
return Err("No credentials provider found in the configuration".into());
}
// Add success message
println!("AWS configuration loaded successfully and added to HashMap.");
Ok(aws_info)
}

// Read basic info about delta lake stored in S3
pub fn load_remote_delta_lake_table_info(
s3_uri: &str,
credential_hash_map: HashMap<String, String>,
) -> Result<DeltaTable, DeltaTableError> {
let storage_options: HashMap<String, String> = credential_hash_map;

deltalake_aws::register_handlers(None);

let remote_delta_lake_table = open_table(s3_uri, Some(storage_options))?;

println!("version: {}", remote_delta_lake_table.version());
println!("metadata: {:?}", remote_delta_lake_table.metadata());
Ok(remote_delta_lake_table)
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod api;
pub mod bytes;
pub mod csv;
pub mod delta_lake;
pub mod excel;
pub mod utils;
32 changes: 32 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
mod api;
mod bytes;
mod csv;
mod delta_lake;
mod excel;
mod utils;
use api::analyze_json_nesting;
use bytes::{get_file_type_string, view_bytes};
use clap::{Parser, Subcommand};
use csv::{fetch_remote_csv, process_basic_csv};
use delta_lake::{get_aws_config, load_remote_delta_lake_table_info};
use excel::{
analyze_excel_formatting, display_remote_basic_info,
display_remote_basic_info_specify_header_idx, excel_quick_view, fetch_remote_file,
Expand Down Expand Up @@ -58,6 +60,12 @@ enum Commands {
},
/// Basic CSV feature
BasicCsv { url: String },
/// Process Delta Lake table
DeltaLake {
/// S3 URI of the Delta Lake table
#[arg(short, long)]
s3_uri: String,
},
}

// Call commands and file logic
Expand All @@ -71,6 +79,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
Commands::BasicJson { url } => process_json(url),
Commands::Nibble { url } => process_view_bytes(url),
Commands::BasicCsv { url } => process_csv(url),
Commands::DeltaLake { s3_uri } => process_delta_lake(s3_uri),
}
}

Expand Down Expand Up @@ -148,6 +157,29 @@ fn process_csv(url: &str) -> Result<(), Box<dyn std::error::Error>> {
result
}

fn process_delta_lake(s3_uri: &str) -> Result<(), Box<dyn std::error::Error>> {
let pb = create_progress_bar("Processing Delta Lake table...");

match get_aws_config() {
Ok(config) => match load_remote_delta_lake_table_info(s3_uri, config) {
Ok(_table) => {
pb.finish_with_message("Successfully loaded the Delta table");
Ok(())
}
Err(e) => {
pb.finish_with_message("Error loading the Delta table");
eprintln!("Error loading the Delta table: {}", e);
Err(e.into())
}
},
Err(e) => {
pb.finish_with_message("Error getting AWS configuration");
eprintln!("Error getting AWS configuration: {}", e);
Err(e.into())
}
}
}

fn validate_url(url: &str) -> Result<(), Box<dyn std::error::Error>> {
if url.is_empty() {
return Err("Error: URL cannot be empty".into());
Expand Down

0 comments on commit 1d5f05e

Please sign in to comment.