From 5474769876b1cf53775c3be7823a1309efd80d20 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 16 Jan 2025 22:08:21 -0800 Subject: [PATCH] addressed more pr comments --- kernel/src/table_changes/log_replay.rs | 43 ++++++++++---------- kernel/src/table_changes/log_replay/tests.rs | 2 +- kernel/src/table_changes/scan_file.rs | 8 ++-- 3 files changed, 27 insertions(+), 26 deletions(-) diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs index bae517b69..8396cd9b7 100644 --- a/kernel/src/table_changes/log_replay.rs +++ b/kernel/src/table_changes/log_replay.rs @@ -19,7 +19,7 @@ use crate::schema::{ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef use crate::table_changes::scan_file::{cdf_scan_row_expression, cdf_scan_row_schema}; use crate::table_configuration::TableConfiguration; use crate::utils::require; -use crate::{DeltaResult, Engine, EngineData, Error, ExpressionRef, RowVisitor}; +use crate::{table, DeltaResult, Engine, EngineData, Error, ExpressionRef, RowVisitor}; use itertools::Itertools; @@ -181,30 +181,31 @@ fn process_cdf_commit( protocol: None, metadata: None, }; - visitor.visit_rows_of(actions.as_ref())?; - //table_configuration.with_protocol(visitor.protocol)?; - //if let Some(metadata) = visitor.metadata { - // table_configuration.with_metadata(metadata)?; - // // Currently, schema compatibility is defined as having equal schema types. In the - // // future, more permisive schema evolution will be supported. - // // See: https://github.com/delta-io/delta-kernel-rs/issues/523 - // require!( - // table_schema.as_ref() == table_configuration.schema(), - // Error::change_data_feed_incompatible_schema( - // table_schema, - // table_configuration.schema() - // ) - // ); - //} - if !table_configuration.is_cdf_read_supported() { - return Err(Error::change_data_feed_unsupported(commit_file.version)); + visitor.visit_rows_of(actions.as_ref())?; + let has_metadata = visitor.metadata.is_some(); + match (visitor.protocol, visitor.metadata) { + (None, None) => {} + (p, m) => { + let p = p.unwrap_or_else(|| table_configuration.protocol().clone()); + let m = m.unwrap_or_else(|| table_configuration.metadata().clone()); + *table_configuration = TableConfiguration::try_new(m, p)?; + if !table_configuration.is_cdf_read_supported() { + return Err(Error::change_data_feed_unsupported(commit_file.version)); + } + } + } + if has_metadata { + require!( + table_schema.as_ref() == table_configuration.schema(), + Error::change_data_feed_incompatible_schema( + table_schema, + table_configuration.schema() + ) + ); } } - table_configuration - .can_read_cdf() - .map_err(|_| Error::change_data_feed_unsupported(commit_file.version))?; // We resolve the remove deletion vector map after visiting the entire commit. if has_cdc_action { remove_dvs.clear(); diff --git a/kernel/src/table_changes/log_replay/tests.rs b/kernel/src/table_changes/log_replay/tests.rs index e3941fb6a..35778196e 100644 --- a/kernel/src/table_changes/log_replay/tests.rs +++ b/kernel/src/table_changes/log_replay/tests.rs @@ -49,7 +49,7 @@ fn table_config() -> TableConfiguration { Some([WriterFeatures::ColumnMapping]), ) .unwrap(); - TableConfiguration::new(metadata, protocol).unwrap() + TableConfiguration::try_new(metadata, protocol).unwrap() } fn get_segment( diff --git a/kernel/src/table_changes/scan_file.rs b/kernel/src/table_changes/scan_file.rs index 1ca91f94d..7a82889b9 100644 --- a/kernel/src/table_changes/scan_file.rs +++ b/kernel/src/table_changes/scan_file.rs @@ -251,7 +251,7 @@ mod tests { use crate::schema::{DataType, StructField, StructType}; use crate::table_changes::log_replay::table_changes_action_iter; use crate::table_configuration::TableConfiguration; - use crate::table_features::ReaderFeatures; + use crate::table_features::{ReaderFeatures, WriterFeatures}; use crate::utils::test_utils::{Action, LocalMockTable}; use crate::Engine; @@ -357,11 +357,11 @@ mod tests { let protocol = Protocol::try_new( 3, 7, - Some([ReaderFeatures::DeletionVectors]), - Some([ReaderFeatures::ColumnMapping]), + Some::>(vec![]), + Some::>(vec![]), ) .unwrap(); - let table_config = TableConfiguration::new(metadata, protocol).unwrap(); + let table_config = TableConfiguration::try_new(metadata, protocol).unwrap(); let scan_data = table_changes_action_iter( Arc::new(engine),