Iceberg sync #771

Merged: 7 commits, Jan 8, 2025 (changes from all commits shown below)
26 changes: 8 additions & 18 deletions src/context/iceberg.rs
@@ -12,7 +12,7 @@ use datafusion::error::Result;
use datafusion::execution::{RecordBatchStream, TaskContext};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::ExecutionPlanProperties;
use datafusion_common::{DataFusionError, TableReference};
use datafusion_common::DataFusionError;
use futures::stream::select_all;
use futures::{pin_mut, StreamExt, TryStream, TryStreamExt};
use iceberg::io::FileIO;
@@ -22,7 +22,6 @@ use iceberg::spec::{
ManifestWriter, Operation, PartitionSpec, Snapshot, SnapshotReference,
SnapshotRetention, Struct, Summary, TableMetadata, TableMetadataBuilder,
};
use iceberg::table::Table;
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
@@ -35,7 +34,7 @@ use tracing::{info, warn};
use url::Url;
use uuid::Uuid;

use super::{LakehouseTableProvider, SeafowlContext};
use super::SeafowlContext;

use thiserror::Error;

@@ -156,15 +155,13 @@ const DEFAULT_SCHEMA_ID: i32 = 0;
pub async fn record_batches_to_iceberg(
record_batch_stream: impl TryStream<Item = Result<RecordBatch, DataLoadingError>>,
arrow_schema: SchemaRef,
table: &Table,
file_io: &FileIO,
table_location: &str,
) -> Result<(), DataLoadingError> {
pin_mut!(record_batch_stream);

let table_location = table.metadata().location();
let table_base_url = Url::parse(table_location).unwrap();

let file_io = table.file_io();

let version_hint_location = format!("{}/metadata/version-hint.text", table_base_url);
let version_hint_input = file_io.new_input(&version_hint_location)?;
let old_version_hint: Option<u64> = if version_hint_input.exists().await? {
@@ -386,18 +383,10 @@ pub async fn record_batches_to_iceberg(
impl SeafowlContext {
pub async fn plan_to_iceberg_table(
&self,
name: impl Into<TableReference>,
file_io: &FileIO,
table_location: &str,
plan: &Arc<dyn ExecutionPlan>,
) -> Result<()> {
let provider = match self.get_lakehouse_table_provider(name).await? {
LakehouseTableProvider::Iceberg(p) => p,
_ => {
return Err(DataFusionError::Internal(
"Expected iceberg provider".to_string(),
));
}
};
let table = provider.table();
let schema = plan.schema();
let mut streams: Vec<Pin<Box<dyn RecordBatchStream + Send>>> = vec![];
for i in 0..plan.output_partitioning().partition_count() {
@@ -411,7 +400,8 @@ impl SeafowlContext {
DataLoadingError::BadInputError(format!("Datafusion error: {}", e))
}),
schema,
&table,
file_io,
table_location,
)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
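Note (not part of the diff): after this change `record_batches_to_iceberg` no longer needs a catalog-loaded `Table`; callers pass a `FileIO` plus the table's root location directly. A minimal sketch of the new call shape, assuming the function's own module is in scope and using a hypothetical table location:

```rust
// Sketch only: imports and error plumbing mirror the surrounding module, which
// already converts iceberg::Error into DataLoadingError.
use futures::stream;
use iceberg::io::FileIO;

async fn write_batches(
    batches: Vec<arrow_array::RecordBatch>,
    arrow_schema: arrow_schema::SchemaRef,
) -> Result<(), DataLoadingError> {
    // Hypothetical table root; in this PR it comes either from the Iceberg table
    // provider (physical.rs) or from the sync command's store location (handler.rs).
    let table_location = "s3://bucket/warehouse/table_1";
    let file_io = FileIO::from_path(table_location)?.build()?;

    // Wrap already-materialized batches into the TryStream the function expects.
    let batch_stream = stream::iter(batches.into_iter().map(Ok::<_, DataLoadingError>));
    record_batches_to_iceberg(batch_stream, arrow_schema, &file_io, table_location).await
}
```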
7 changes: 5 additions & 2 deletions src/context/physical.rs
@@ -206,8 +206,11 @@ impl SeafowlContext {
.await?;
Ok(make_dummy_exec())
}
LakehouseTableProvider::Iceberg(_) => {
self.plan_to_iceberg_table(table_name.clone(), &physical)
LakehouseTableProvider::Iceberg(provider) => {
let table = provider.table();
let table_location = table.metadata().location();
let file_io = table.file_io();
self.plan_to_iceberg_table(file_io, table_location, &physical)
.await?;
Ok(make_dummy_exec())
}
38 changes: 33 additions & 5 deletions src/frontend/flight/handler.rs
@@ -10,7 +10,9 @@ use dashmap::DashMap;
use datafusion::common::Result;
use datafusion::execution::SendableRecordBatchStream;
use datafusion_common::DataFusionError;
use iceberg::io::FileIO;
use lazy_static::lazy_static;
use object_store_factory::object_store_opts_to_file_io_props;
use prost::Message;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -23,7 +25,7 @@ use url::Url;
use crate::context::SeafowlContext;
use crate::sync::schema::SyncSchema;
use crate::sync::writer::SeafowlDataSyncWriter;
use crate::sync::{LakehouseSyncTarget, SyncError, SyncResult};
use crate::sync::{IcebergSyncTarget, LakehouseSyncTarget, SyncError, SyncResult};

lazy_static! {
pub static ref SEAFOWL_SQL_DATA: SqlInfoData = {
@@ -163,7 +165,7 @@ impl SeafowlFlightHandler {
});
}

let (sync_target, url) = match cmd.format() {
let sync_target = match cmd.format() {
TableFormat::Delta => {
let log_store = match cmd.store {
None => self
@@ -186,14 +188,40 @@
.await?
}
};
let url = log_store.root_uri();
(LakehouseSyncTarget::Delta(log_store), url)
LakehouseSyncTarget::Delta(log_store)
}
TableFormat::Iceberg => {
return Err(SyncError::NotImplemented);
let (location, file_io) = match cmd.store {
None => {
return Err(SyncError::NotImplemented);
}
Some(store_loc) => {
let location = store_loc.location;
let options = store_loc.options;
let file_io_props = object_store_opts_to_file_io_props(&options);
let file_io = FileIO::from_path(&location)
.unwrap()
.with_props(file_io_props)
.build()?;
(location, file_io)
}
};

// Create the full path to table metadata by combining the object store location and
// relative table metadata path
let absolute_path = format!(
"{}/{}",
location.trim_end_matches("/"),
cmd.path.trim_start_matches("/")
);
LakehouseSyncTarget::Iceberg(IcebergSyncTarget {
file_io,
url: absolute_path.clone(),
})
}
};

let url = sync_target.get_url();
let num_batches = batches.len();

debug!("Processing data change with {num_rows} rows, {num_batches} batches, descriptor {sync_schema}, url {url} from origin {:?} at position {:?}",
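Note (not part of the diff): a condensed sketch of what the Iceberg branch above does with the store location and the relative table path. The helper name and example values are hypothetical; `object_store_opts_to_file_io_props` and `FileIO::from_path` are the same calls used in the handler.

```rust
use std::collections::HashMap;

use iceberg::io::FileIO;
use object_store_factory::object_store_opts_to_file_io_props;

fn build_iceberg_target(
    location: &str,
    options: &HashMap<String, String>,
    table_path: &str,
) -> iceberg::Result<(FileIO, String)> {
    // Translate generic object store options into FileIO properties and build the FileIO.
    let file_io_props = object_store_opts_to_file_io_props(options);
    let file_io = FileIO::from_path(location)?
        .with_props(file_io_props)
        .build()?;

    // Join the store root and the relative metadata path without doubling the slash.
    let url = format!(
        "{}/{}",
        location.trim_end_matches('/'),
        table_path.trim_start_matches('/')
    );
    Ok((file_io, url))
}
```

With, say, `location = "s3://bucket/warehouse/"` and `table_path = "/table_1/metadata/v1.metadata.json"`, the resulting URL is `s3://bucket/warehouse/table_1/metadata/v1.metadata.json`.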
16 changes: 15 additions & 1 deletion src/sync/mod.rs
@@ -1,5 +1,6 @@
use crate::sync::writer::SeafowlDataSyncWriter;
use deltalake::logstore::LogStore;
use iceberg::io::FileIO;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
@@ -38,6 +39,9 @@ pub enum SyncError {
#[error(transparent)]
DeltaTableError(#[from] deltalake::errors::DeltaTableError),

#[error(transparent)]
IcebergError(#[from] iceberg::Error),

#[error(transparent)]
ObjectStoreError(#[from] object_store::Error),
}
@@ -59,7 +63,8 @@ pub(super) struct SyncCommitInfo {

#[derive(Clone, Debug)]
pub struct IcebergSyncTarget {
url: String,
pub file_io: FileIO,
pub url: String,
}

#[derive(Clone, Debug)]
@@ -68,6 +73,15 @@ pub enum LakehouseSyncTarget {
Iceberg(IcebergSyncTarget),
}

impl LakehouseSyncTarget {
pub fn get_url(&self) -> String {
match self {
LakehouseSyncTarget::Iceberg(IcebergSyncTarget { url, .. }) => url.clone(),
LakehouseSyncTarget::Delta(log_store) => log_store.root_uri(),
}
}
}

impl SyncCommitInfo {
pub(super) fn new(
origin: impl Into<Origin>,
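Note (not part of the diff): the new `get_url` helper lets the flight handler report one URL regardless of the target format. A small illustrative sketch (the function name is hypothetical):

```rust
use crate::sync::LakehouseSyncTarget;

fn log_target(sync_target: &LakehouseSyncTarget) {
    // Delta targets resolve to the log store root URI, Iceberg targets to the metadata URL.
    let url = sync_target.get_url();
    tracing::debug!("Flushing syncs to {url}");
}
```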
53 changes: 51 additions & 2 deletions src/sync/planner.rs
@@ -15,10 +15,12 @@ use datafusion::execution::session_state::{SessionState, SessionStateBuilder};
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::{Column, DFSchemaRef, JoinType, ScalarValue, ToDFSchema};
use datafusion_common::{
Column, DFSchema, DFSchemaRef, JoinType, ScalarValue, ToDFSchema,
};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{
col, lit, when, Expr, LogicalPlan, LogicalPlanBuilder, Projection,
col, lit, when, EmptyRelation, Expr, LogicalPlan, LogicalPlanBuilder, Projection,
};
use datafusion_functions_nested::expr_fn::make_array;
use deltalake::delta_datafusion::DeltaTableProvider;
@@ -30,6 +32,8 @@ use std::sync::Arc;
use std::time::Instant;
use tracing::{debug, info, trace, warn};

use super::SyncError;

pub(super) const LOWER_REL: &str = "__lower_rel";
pub(super) const UPPER_REL: &str = "__upper_rel";
pub(super) const UPSERT_COL: &str = "__upsert_col";
@@ -48,6 +52,51 @@ impl SeafowlSyncPlanner {
}
}

pub(super) async fn plan_iceberg_syncs(
&self,
syncs: &[DataSyncItem],
table_schema: Arc<arrow_schema::Schema>,
) -> SyncResult<Arc<dyn ExecutionPlan>> {
for sync in syncs {
for sc in sync.sync_schema.columns() {
if sc.role() == ColumnRole::OldPk {
for batch in &sync.data {
let null_count = batch
.column_by_name(sc.field().name())
.expect("Old PK array must exist")
.null_count();
if batch.num_rows() != null_count {
return Err(SyncError::InvalidMessage {
reason: "Old PK for Iceberg contains non-null value"
.to_string(),
});
}
}
}
}
}

let df_table_schema = DFSchema::try_from(table_schema.clone())?;

let base_plan =
LogicalPlanBuilder::new(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(df_table_schema),
}))
.alias(LOWER_REL)?
.build()?;

let base_df = DataFrame::new(self.session_state(), base_plan);

let (sync_schema, sync_df) = self.squash_syncs(syncs)?;
let (sync_schema, sync_df) = self.normalize_syncs(&sync_schema, sync_df)?;
let input_df = self
.apply_syncs(table_schema, base_df, sync_df, &sync_schema)
.await?;
let input_plan = input_df.create_physical_plan().await?;
Ok(input_plan)
}

// Construct a plan for flushing the pending syncs to the provided table.
// Return the plan and the files that are re-written by it (to be removed from the table state).
pub(super) async fn plan_delta_syncs(
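Note (not part of the diff): `plan_iceberg_syncs` rejects any sync whose old-PK column contains a non-null value, so only inserts are currently supported for Iceberg targets. A standalone sketch of that invariant check, using a hypothetical `__old_pk` column name and assuming the `arrow_array`/`arrow_schema` crates referenced by the surrounding code:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn old_pk_is_all_null(batch: &RecordBatch, old_pk_col: &str) -> bool {
    let col = batch
        .column_by_name(old_pk_col)
        .expect("Old PK array must exist");
    // Same condition the planner checks: every row's old PK is null.
    col.null_count() == batch.num_rows()
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "__old_pk",
        DataType::Int64,
        true,
    )]));

    // Insert-only sync: all old PKs are null, so the check passes.
    let inserts = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![None::<i64>, None])) as ArrayRef],
    )
    .unwrap();
    assert!(old_pk_is_all_null(&inserts, "__old_pk"));

    // An update or delete carries a non-null old PK and would be rejected for Iceberg.
    let updates = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int64Array::from(vec![Some(1), None])) as ArrayRef],
    )
    .unwrap();
    assert!(!old_pk_is_all_null(&updates, "__old_pk"));
}
```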