Skip to content

Commit

Permalink
DeltaLake refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
CHRISCARLON committed Oct 13, 2024
1 parent 6ff3579 commit 3e0246a
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions src/delta_lake/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use aws_sdk_sts::config::ProvideCredentials;
use colored::Colorize;
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::datafusion::execution::context::SessionContext;
use deltalake::{open_table_with_storage_options, DeltaOps, DeltaTableError};
use deltalake::{open_table_with_storage_options, DeltaOps};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -54,7 +54,7 @@ pub async fn get_aws_config() -> Result<HashMap<String, String>, Box<dyn std::er
pub async fn load_remote_delta_lake_table_info(
s3_uri: &str,
credential_hash_map: HashMap<String, String>,
) -> Result<(), DeltaTableError> {
) -> Result<(), Box<dyn std::error::Error>> {
// load credentials
let storage_options: HashMap<String, String> = credential_hash_map;

Expand All @@ -74,16 +74,18 @@ pub async fn load_remote_delta_lake_table_info(
let ctx = SessionContext::new();

// Register table
ctx.register_table("data", Arc::new(table)).unwrap();
// Register table
ctx.register_table("data", Arc::new(table))
.map_err(|e| format!("Failed to register table: {}", e))?;

// Create batches
let batches = ctx
.sql("SELECT * FROM data")
.await
.unwrap()
.map_err(|e| format!("SQL Query Failed: {}", e))?
.collect()
.await
.unwrap();
.map_err(|e| format!("Could not read results: {}", e))?;

for batch in batches {
println!("{}", "DeltaLake Output: Columns and RecordBatch.".green());
Expand Down

0 comments on commit 3e0246a

Please sign in to comment.