diff --git a/src/context/iceberg.rs b/src/context/iceberg.rs
index 92ac17f7..bf23ee66 100644
--- a/src/context/iceberg.rs
+++ b/src/context/iceberg.rs
@@ -12,7 +12,7 @@ use datafusion::error::Result;
 use datafusion::execution::{RecordBatchStream, TaskContext};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::physical_plan::ExecutionPlanProperties;
-use datafusion_common::{DataFusionError, TableReference};
+use datafusion_common::DataFusionError;
 use futures::stream::select_all;
 use futures::{pin_mut, StreamExt, TryStream, TryStreamExt};
 use iceberg::io::FileIO;
@@ -22,7 +22,6 @@ use iceberg::spec::{
     ManifestWriter, Operation, PartitionSpec, Snapshot, SnapshotReference,
     SnapshotRetention, Struct, Summary, TableMetadata, TableMetadataBuilder,
 };
-use iceberg::table::Table;
 use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
@@ -35,7 +34,7 @@ use tracing::{info, warn};
 use url::Url;
 use uuid::Uuid;
 
-use super::{LakehouseTableProvider, SeafowlContext};
+use super::SeafowlContext;
 
 use thiserror::Error;
 
@@ -156,15 +155,13 @@ const DEFAULT_SCHEMA_ID: i32 = 0;
 pub async fn record_batches_to_iceberg(
     record_batch_stream: impl TryStream<Item = Result<RecordBatch, DataLoadingError>>,
     arrow_schema: SchemaRef,
-    table: &Table,
+    file_io: &FileIO,
+    table_location: &str,
 ) -> Result<(), DataLoadingError> {
     pin_mut!(record_batch_stream);
 
-    let table_location = table.metadata().location();
     let table_base_url = Url::parse(table_location).unwrap();
 
-    let file_io = table.file_io();
-
     let version_hint_location = format!("{}/metadata/version-hint.text", table_base_url);
     let version_hint_input = file_io.new_input(&version_hint_location)?;
     let old_version_hint: Option<u64> = if version_hint_input.exists().await? {
@@ -386,18 +383,10 @@ impl SeafowlContext {
 
     pub async fn plan_to_iceberg_table(
         &self,
-        name: impl Into<TableReference>,
+        file_io: &FileIO,
+        table_location: &str,
         plan: &Arc<dyn ExecutionPlan>,
     ) -> Result<()> {
-        let provider = match self.get_lakehouse_table_provider(name).await? {
-            LakehouseTableProvider::Iceberg(p) => p,
-            _ => {
-                return Err(DataFusionError::Internal(
-                    "Expected iceberg provider".to_string(),
-                ));
-            }
-        };
-        let table = provider.table();
         let schema = plan.schema();
         let mut streams: Vec<Pin<Box<dyn RecordBatchStream + Send>>> = vec![];
         for i in 0..plan.output_partitioning().partition_count() {
@@ -411,7 +400,8 @@ impl SeafowlContext {
                 DataLoadingError::BadInputError(format!("Datafusion error: {}", e))
            }),
            schema,
-           &table,
+           file_io,
+           table_location,
        )
        .await
        .map_err(|e| DataFusionError::External(Box::new(e)))?;
diff --git a/src/context/physical.rs b/src/context/physical.rs
index ddbd5ab6..885be71a 100644
--- a/src/context/physical.rs
+++ b/src/context/physical.rs
@@ -206,8 +206,11 @@ impl SeafowlContext {
                     .await?;
                 Ok(make_dummy_exec())
             }
-            LakehouseTableProvider::Iceberg(_) => {
-                self.plan_to_iceberg_table(table_name.clone(), &physical)
+            LakehouseTableProvider::Iceberg(provider) => {
+                let table = provider.table();
+                let table_location = table.metadata().location();
+                let file_io = table.file_io();
+                self.plan_to_iceberg_table(file_io, table_location, &physical)
                     .await?;
                 Ok(make_dummy_exec())
             }
diff --git a/src/frontend/flight/handler.rs b/src/frontend/flight/handler.rs
index 36287b9f..48b48b6e 100644
--- a/src/frontend/flight/handler.rs
+++ b/src/frontend/flight/handler.rs
@@ -10,7 +10,9 @@ use dashmap::DashMap;
 use datafusion::common::Result;
 use datafusion::execution::SendableRecordBatchStream;
 use datafusion_common::DataFusionError;
+use iceberg::io::FileIO;
 use lazy_static::lazy_static;
+use object_store_factory::object_store_opts_to_file_io_props;
 use prost::Message;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
@@ -23,7 +25,7 @@ use url::Url;
 use crate::context::SeafowlContext;
 use crate::sync::schema::SyncSchema;
 use crate::sync::writer::SeafowlDataSyncWriter;
-use crate::sync::{LakehouseSyncTarget, SyncError, SyncResult};
+use crate::sync::{IcebergSyncTarget, LakehouseSyncTarget, SyncError, SyncResult};
 
 lazy_static! {
     pub static ref SEAFOWL_SQL_DATA: SqlInfoData = {
@@ -163,7 +165,7 @@ impl SeafowlFlightHandler {
             });
         }
 
-        let (sync_target, url) = match cmd.format() {
+        let sync_target = match cmd.format() {
             TableFormat::Delta => {
                 let log_store = match cmd.store {
                     None => self
@@ -186,14 +188,40 @@ impl SeafowlFlightHandler {
                         .await?
                     }
                 };
-                let url = log_store.root_uri();
-                (LakehouseSyncTarget::Delta(log_store), url)
+                LakehouseSyncTarget::Delta(log_store)
             }
             TableFormat::Iceberg => {
-                return Err(SyncError::NotImplemented);
+                let (location, file_io) = match cmd.store {
+                    None => {
+                        return Err(SyncError::NotImplemented);
+                    }
+                    Some(store_loc) => {
+                        let location = store_loc.location;
+                        let options = store_loc.options;
+                        let file_io_props = object_store_opts_to_file_io_props(&options);
+                        let file_io = FileIO::from_path(&location)
+                            .unwrap()
+                            .with_props(file_io_props)
+                            .build()?;
+                        (location, file_io)
+                    }
+                };
+
+                // Create the full path to table metadata by combining the object store location and
+                // relative table metadata path
+                let absolute_path = format!(
+                    "{}/{}",
+                    location.trim_end_matches("/"),
+                    cmd.path.trim_start_matches("/")
+                );
+                LakehouseSyncTarget::Iceberg(IcebergSyncTarget {
+                    file_io,
+                    url: absolute_path.clone(),
+                })
             }
         };
+        let url = sync_target.get_url();
 
         let num_batches = batches.len();
 
         debug!("Processing data change with {num_rows} rows, {num_batches} batches, descriptor {sync_schema}, url {url} from origin {:?} at position {:?}",
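
(Not part of the patch.) For reference, a minimal sketch of what the new TableFormat::Iceberg arm above does for a custom store: translate the object store options into FileIO properties, build a FileIO rooted at the store location, and join the store root with the relative metadata path. It assumes the same iceberg and object_store_factory crates the handler uses, and that the options arrive as a string map as in StorageLocation; the helper name and example values are made up.

    use std::collections::HashMap;

    use iceberg::io::FileIO;
    use object_store_factory::object_store_opts_to_file_io_props;

    /// Hypothetical helper mirroring the Iceberg arm in handler.rs.
    fn iceberg_sync_target(
        location: &str,                    // store root, e.g. "s3://bucket/db"
        path: &str,                        // relative metadata path from the sync command
        options: &HashMap<String, String>, // object store options (credentials etc.)
    ) -> Result<(FileIO, String), iceberg::Error> {
        // Convert object store options into FileIO properties and build the FileIO
        let file_io = FileIO::from_path(location)?
            .with_props(object_store_opts_to_file_io_props(options))
            .build()?;

        // Join the store root and the relative metadata path, avoiding double slashes
        let url = format!(
            "{}/{}",
            location.trim_end_matches("/"),
            path.trim_start_matches("/")
        );
        Ok((file_io, url))
    }

Note that the handler still returns SyncError::NotImplemented when no store location is supplied, so Iceberg syncs require an explicit custom store.
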
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 01181583..4c13969e 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -1,5 +1,6 @@
 use crate::sync::writer::SeafowlDataSyncWriter;
 use deltalake::logstore::LogStore;
+use iceberg::io::FileIO;
 use serde::{Deserialize, Serialize};
 use std::sync::Arc;
 use std::time::Duration;
@@ -38,6 +39,9 @@ pub enum SyncError {
     #[error(transparent)]
     DeltaTableError(#[from] deltalake::errors::DeltaTableError),
 
+    #[error(transparent)]
+    IcebergError(#[from] iceberg::Error),
+
     #[error(transparent)]
     ObjectStoreError(#[from] object_store::Error),
 }
@@ -59,7 +63,8 @@ pub(super) struct SyncCommitInfo {
 
 #[derive(Clone, Debug)]
 pub struct IcebergSyncTarget {
-    url: String,
+    pub file_io: FileIO,
+    pub url: String,
 }
 
 #[derive(Clone, Debug)]
@@ -68,6 +73,15 @@ pub enum LakehouseSyncTarget {
     Delta(Arc<dyn LogStore>),
     Iceberg(IcebergSyncTarget),
 }
 
+impl LakehouseSyncTarget {
+    pub fn get_url(&self) -> String {
+        match self {
+            LakehouseSyncTarget::Iceberg(IcebergSyncTarget { url, .. }) => url.clone(),
+            LakehouseSyncTarget::Delta(log_store) => log_store.root_uri(),
+        }
+    }
+}
+
 impl SyncCommitInfo {
     pub(super) fn new(
         origin: impl Into<Origin>,
diff --git a/src/sync/planner.rs b/src/sync/planner.rs
index e60dd173..2544d36b 100644
--- a/src/sync/planner.rs
+++ b/src/sync/planner.rs
@@ -15,10 +15,12 @@ use datafusion::execution::session_state::{SessionState, SessionStateBuilder};
 use datafusion::physical_expr::create_physical_expr;
 use datafusion::physical_optimizer::pruning::PruningPredicate;
 use datafusion::physical_plan::ExecutionPlan;
-use datafusion_common::{Column, DFSchemaRef, JoinType, ScalarValue, ToDFSchema};
+use datafusion_common::{
+    Column, DFSchema, DFSchemaRef, JoinType, ScalarValue, ToDFSchema,
+};
 use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::{
-    col, lit, when, Expr, LogicalPlan, LogicalPlanBuilder, Projection,
+    col, lit, when, EmptyRelation, Expr, LogicalPlan, LogicalPlanBuilder, Projection,
 };
 use datafusion_functions_nested::expr_fn::make_array;
 use deltalake::delta_datafusion::DeltaTableProvider;
@@ -30,6 +32,8 @@ use std::sync::Arc;
 use std::time::Instant;
 use tracing::{debug, info, trace, warn};
 
+use super::SyncError;
+
 pub(super) const LOWER_REL: &str = "__lower_rel";
 pub(super) const UPPER_REL: &str = "__upper_rel";
 pub(super) const UPSERT_COL: &str = "__upsert_col";
@@ -48,6 +52,51 @@ impl SeafowlSyncPlanner {
         }
     }
 
+    pub(super) async fn plan_iceberg_syncs(
+        &self,
+        syncs: &[DataSyncItem],
+        table_schema: Arc<Schema>,
+    ) -> SyncResult<Arc<dyn ExecutionPlan>> {
+        for sync in syncs {
+            for sc in sync.sync_schema.columns() {
+                if sc.role() == ColumnRole::OldPk {
+                    for batch in &sync.data {
+                        let null_count = batch
+                            .column_by_name(sc.field().name())
+                            .expect("Old PK array must exist")
+                            .null_count();
+                        if batch.num_rows() != null_count {
+                            return Err(SyncError::InvalidMessage {
+                                reason: "Old PK for Iceberg contains non-null value"
+                                    .to_string(),
+                            });
+                        }
+                    }
+                }
+            }
+        }
+
+        let df_table_schema = DFSchema::try_from(table_schema.clone())?;
+
+        let base_plan =
+            LogicalPlanBuilder::new(LogicalPlan::EmptyRelation(EmptyRelation {
+                produce_one_row: false,
+                schema: Arc::new(df_table_schema),
+            }))
+            .alias(LOWER_REL)?
+            .build()?;
+
+        let base_df = DataFrame::new(self.session_state(), base_plan);
+
+        let (sync_schema, sync_df) = self.squash_syncs(syncs)?;
+        let (sync_schema, sync_df) = self.normalize_syncs(&sync_schema, sync_df)?;
+        let input_df = self
+            .apply_syncs(table_schema, base_df, sync_df, &sync_schema)
+            .await?;
+        let input_plan = input_df.create_physical_plan().await?;
+        Ok(input_plan)
+    }
+
     // Construct a plan for flushing the pending syncs to the provided table.
     // Return the plan and the files that are re-written by it (to be removed from the table state).
     pub(super) async fn plan_delta_syncs(
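
(Not part of the patch.) A standalone illustration of the append-only rule that plan_iceberg_syncs enforces above: every batch's old-PK column must be entirely NULL, otherwise the sync carries an update or delete and is rejected. The snippet assumes the arrow crate; the column names are made up.

    use std::sync::Arc;

    use arrow::array::{Array, Int32Array};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("old_pk", DataType::Int32, true),
            Field::new("new_pk", DataType::Int32, true),
        ]));

        // Accepted: the old-PK column is all NULL, i.e. the sync only appends rows
        let insert_only = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::new_null(3)),
                Arc::new(Int32Array::from(vec![1, 2, 3])),
            ],
        )?;
        assert_eq!(insert_only.column(0).null_count(), insert_only.num_rows());

        // Rejected: a non-NULL old PK denotes an update/delete, which the Iceberg
        // path does not handle yet
        let with_update = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from(vec![Some(1), None, None])),
                Arc::new(Int32Array::from(vec![4, 5, 6])),
            ],
        )?;
        assert_ne!(with_update.column(0).null_count(), with_update.num_rows());
        Ok(())
    }

This is also why the new integration test below sends an all-NULL old-PK column.
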
diff --git a/src/sync/writer.rs b/src/sync/writer.rs
index 46e99bdd..2027c2d3 100644
--- a/src/sync/writer.rs
+++ b/src/sync/writer.rs
@@ -5,22 +5,32 @@ use deltalake::kernel::{Action, Schema};
 use deltalake::operations::create::CreateBuilder;
 use deltalake::protocol::{DeltaOperation, SaveMode};
 use deltalake::DeltaTable;
+use futures::stream;
+use iceberg::arrow::schema_to_arrow_schema;
+use iceberg::table::StaticTable;
+use iceberg::TableIdent;
+use iceberg_datafusion::IcebergTableProvider;
 use indexmap::IndexMap;
 use itertools::Itertools;
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use std::time::{Instant, SystemTime, UNIX_EPOCH};
 use tracing::{debug, info};
+use url::Url;
 use uuid::Uuid;
 
+use crate::catalog::DEFAULT_SCHEMA;
 use crate::context::delta::plan_to_delta_adds;
+use crate::context::iceberg::record_batches_to_iceberg;
 use crate::context::SeafowlContext;
 use crate::sync::metrics::SyncWriterMetrics;
 use crate::sync::planner::SeafowlSyncPlanner;
 use crate::sync::schema::SyncSchema;
 use crate::sync::utils::{get_size_and_rows, squash_batches};
-use crate::sync::{Origin, SequenceNumber, SyncCommitInfo, SyncError, SyncResult};
+use crate::sync::{
+    IcebergSyncTarget, Origin, SequenceNumber, SyncCommitInfo, SyncError, SyncResult,
+};
 
 use super::LakehouseSyncTarget;
 
@@ -292,7 +302,7 @@ impl SeafowlDataSyncWriter {
         &self,
         sync_target: LakehouseSyncTarget,
         sync_schema: &SyncSchema,
-    ) -> SyncResult<DeltaTable> {
+    ) -> SyncResult<()> {
         // Get the actual table schema by removing the OldPk and Changed column roles from the schema.
         let mut builder = SchemaBuilder::new();
         sync_schema.columns().iter().for_each(|col| {
@@ -305,16 +315,58 @@ impl SeafowlDataSyncWriter {
             LakehouseSyncTarget::Delta(log_store) => {
                 let delta_schema = Schema::try_from(&builder.finish())?;
 
-                Ok(CreateBuilder::new()
+                CreateBuilder::new()
                     .with_log_store(log_store)
                     .with_columns(delta_schema.fields().cloned())
                     .with_comment(format!(
                         "Synced by Seafowl {}",
                         env!("CARGO_PKG_VERSION")
                     ))
-                    .await?)
+                    .await?;
+                Ok(())
+            }
+            LakehouseSyncTarget::Iceberg(IcebergSyncTarget { file_io, url, .. }) => {
+                let mut table_location = Url::parse(&url).unwrap();
+
+                // Turn metadata location into table location
+                // E.g. s3://a/b/metadata/v3.metadata.json -> s3://a/b
+                match table_location.path_segments_mut() {
+                    Ok(mut segments) => segments.pop_if_empty().pop(),
+                    Err(_) => {
+                        return Err(SyncError::InvalidMessage {
+                            reason: format!(
+                                "Could not compute metadata directory from URL: {}",
+                                url
+                            ),
+                        })
+                    }
+                };
+                match table_location.path_segments_mut() {
+                    Ok(mut segments) => segments.pop_if_empty().pop(),
+                    Err(_) => {
+                        return Err(SyncError::InvalidMessage {
+                            reason: format!(
+                                "Could not compute table directory from URL: {}",
+                                url
+                            ),
+                        })
+                    }
+                };
+
+                match record_batches_to_iceberg(
+                    stream::empty(),
+                    builder.finish().into(),
+                    &file_io,
+                    table_location.as_str(),
+                )
+                .await
+                {
+                    Err(e) => Err(SyncError::InvalidMessage {
+                        reason: e.to_string(),
+                    }),
+                    Ok(_) => Ok(()),
+                }
             }
-            LakehouseSyncTarget::Iceberg(..) => Err(SyncError::NotImplemented),
         }
     }
 
@@ -491,8 +543,33 @@ impl SeafowlDataSyncWriter {
                         "Committed data sync up to {new_sync_commit:?} for location {url}"
                     );
                 }
-                LakehouseSyncTarget::Iceberg(..) => {
-                    return Err(SyncError::NotImplemented);
+                LakehouseSyncTarget::Iceberg(IcebergSyncTarget { file_io, url, .. }) => {
+                    if !file_io.exists(url).await.unwrap() {
+                        self.create_table(
+                            entry.sync_target.clone(),
+                            &entry.syncs.first().unwrap().sync_schema,
+                        )
+                        .await?;
+                    }
+                    let iceberg_table = StaticTable::from_metadata_file(
+                        url,
+                        TableIdent::from_strs(vec![DEFAULT_SCHEMA, "dummy_name"]).unwrap(),
+                        file_io.clone(),
+                    )
+                    .await?
+                    .into_table();
+                    let table_provider =
+                        IcebergTableProvider::try_new_from_table(iceberg_table).await?;
+                    let table = table_provider.table();
+                    let schema =
+                        schema_to_arrow_schema(table.metadata().current_schema()).unwrap();
+                    let planner = SeafowlSyncPlanner::new(self.context.clone());
+                    let plan = planner
+                        .plan_iceberg_syncs(&entry.syncs, Arc::new(schema))
+                        .await?;
+                    self.context
+                        .plan_to_iceberg_table(file_io, url, &plan)
+                        .await?;
                 }
             };
 
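
(Not part of the patch.) The metadata-file-to-table-location conversion that create_table performs above, shown on a throwaway URL; the bucket and table names are made up. The patch does the two pops as separate match blocks so it can report distinct errors for the metadata and table directories, while this sketch just loops.

    use url::Url;

    fn main() {
        // A sync command points at a concrete metadata file...
        let mut table_location =
            Url::parse("s3://my-bucket/db/my_table/metadata/v3.metadata.json").unwrap();

        // ...and the table location is obtained by popping two path segments:
        // first the metadata file name, then the `metadata` directory.
        for _ in 0..2 {
            if let Ok(mut segments) = table_location.path_segments_mut() {
                segments.pop_if_empty().pop();
            }
        }

        assert_eq!(table_location.as_str(), "s3://my-bucket/db/my_table");
    }
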
diff --git a/tests/flight/sync.rs b/tests/flight/sync.rs
index 4c3494bb..6feff33c 100644
--- a/tests/flight/sync.rs
+++ b/tests/flight/sync.rs
@@ -609,3 +609,78 @@ async fn test_sync_custom_store(
 
     Ok(())
 }
+
+#[rstest]
+#[case(false)]
+#[case(true)]
+#[tokio::test]
+async fn test_sync_iceberg_custom_store(
+    #[case] is_existing_table: bool,
+) -> std::result::Result<(), Box<dyn std::error::Error>> {
+    let (_ctx, mut client) = flight_server(TestServerType::Memory).await;
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Int32, true),
+        Field::new("b", DataType::Int32, true),
+        Field::new("c", DataType::Utf8, true),
+    ]));
+    let column_descriptors = vec![
+        ColumnDescriptor {
+            role: ColumnRole::OldPk as _,
+            name: "key".to_string(),
+        },
+        ColumnDescriptor {
+            role: ColumnRole::NewPk as _,
+            name: "key".to_string(),
+        },
+        ColumnDescriptor {
+            role: ColumnRole::Value as _,
+            name: "value".to_string(),
+        },
+    ];
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::new_null(100_000)),
+            Arc::new(Int32Array::from((0..100_000).collect::<Vec<i32>>())),
+            Arc::new(StringArray::from(vec!["a"; 100_000])),
+        ],
+    )?;
+
+    let location = "s3://seafowl-test-bucket/test-data/iceberg/default.db";
+    let path = if is_existing_table {
+        "iceberg_table_2/metadata/v1.metadata.json"
+    } else {
+        "iceberg_table_3/metadata/v0.metadata.json"
+    };
+    let options = minio_options();
+
+    let store = StorageLocation {
+        name: "custom-store".to_string(),
+        location: location.to_string(),
+        options,
+    };
+
+    let cmd = DataSyncCommand {
+        path: path.to_string(),
+        store: Some(store),
+        column_descriptors,
+        origin: "42".to_string(),
+        sequence_number: Some(1000),
+        format: TableFormat::Iceberg.into(),
+    };
+
+    let sync_result = do_put_sync(cmd.clone(), batch.clone(), &mut client).await?;
+    assert_eq!(
+        sync_result,
+        DataSyncResponse {
+            accepted: true,
+            memory_sequence_number: Some(1000),
+            durable_sequence_number: Some(1000),
+            first: true,
+        }
+    );
+
+    Ok(())
+}