Skip to content

Commit

Permalink
addressed more pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
OussamaSaoudi committed Jan 17, 2025
1 parent a12e7ed commit 5474769
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 26 deletions.
43 changes: 22 additions & 21 deletions kernel/src/table_changes/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::schema::{ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef
use crate::table_changes::scan_file::{cdf_scan_row_expression, cdf_scan_row_schema};
use crate::table_configuration::TableConfiguration;
use crate::utils::require;
use crate::{DeltaResult, Engine, EngineData, Error, ExpressionRef, RowVisitor};
use crate::{table, DeltaResult, Engine, EngineData, Error, ExpressionRef, RowVisitor};

use itertools::Itertools;

Expand Down Expand Up @@ -181,30 +181,31 @@ fn process_cdf_commit(
protocol: None,
metadata: None,
};
visitor.visit_rows_of(actions.as_ref())?;

//table_configuration.with_protocol(visitor.protocol)?;
//if let Some(metadata) = visitor.metadata {
// table_configuration.with_metadata(metadata)?;
// // Currently, schema compatibility is defined as having equal schema types. In the
// // future, more permisive schema evolution will be supported.
// // See: https://github.com/delta-io/delta-kernel-rs/issues/523
// require!(
// table_schema.as_ref() == table_configuration.schema(),
// Error::change_data_feed_incompatible_schema(
// table_schema,
// table_configuration.schema()
// )
// );
//}
if !table_configuration.is_cdf_read_supported() {
return Err(Error::change_data_feed_unsupported(commit_file.version));
visitor.visit_rows_of(actions.as_ref())?;
let has_metadata = visitor.metadata.is_some();
match (visitor.protocol, visitor.metadata) {
(None, None) => {}
(p, m) => {
let p = p.unwrap_or_else(|| table_configuration.protocol().clone());
let m = m.unwrap_or_else(|| table_configuration.metadata().clone());
*table_configuration = TableConfiguration::try_new(m, p)?;
if !table_configuration.is_cdf_read_supported() {
return Err(Error::change_data_feed_unsupported(commit_file.version));
}
}
}
if has_metadata {
require!(
table_schema.as_ref() == table_configuration.schema(),
Error::change_data_feed_incompatible_schema(
table_schema,
table_configuration.schema()
)
);
}
}

table_configuration
.can_read_cdf()
.map_err(|_| Error::change_data_feed_unsupported(commit_file.version))?;
// We resolve the remove deletion vector map after visiting the entire commit.
if has_cdc_action {
remove_dvs.clear();
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/table_changes/log_replay/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn table_config() -> TableConfiguration {
Some([WriterFeatures::ColumnMapping]),
)
.unwrap();
TableConfiguration::new(metadata, protocol).unwrap()
TableConfiguration::try_new(metadata, protocol).unwrap()
}

fn get_segment(
Expand Down
8 changes: 4 additions & 4 deletions kernel/src/table_changes/scan_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ mod tests {
use crate::schema::{DataType, StructField, StructType};
use crate::table_changes::log_replay::table_changes_action_iter;
use crate::table_configuration::TableConfiguration;
use crate::table_features::ReaderFeatures;
use crate::table_features::{ReaderFeatures, WriterFeatures};
use crate::utils::test_utils::{Action, LocalMockTable};
use crate::Engine;

Expand Down Expand Up @@ -357,11 +357,11 @@ mod tests {
let protocol = Protocol::try_new(
3,
7,
Some([ReaderFeatures::DeletionVectors]),
Some([ReaderFeatures::ColumnMapping]),
Some::<Vec<String>>(vec![]),
Some::<Vec<String>>(vec![]),
)
.unwrap();
let table_config = TableConfiguration::new(metadata, protocol).unwrap();
let table_config = TableConfiguration::try_new(metadata, protocol).unwrap();

let scan_data = table_changes_action_iter(
Arc::new(engine),
Expand Down

0 comments on commit 5474769

Please sign in to comment.