From 3305d3a3a12a6b9052252eae96d58b6d9dc33abe Mon Sep 17 00:00:00 2001 From: OussamaSaoudi <45303303+OussamaSaoudi@users.noreply.github.com> Date: Mon, 27 Jan 2025 19:16:53 -0800 Subject: [PATCH] feat: Introduce `TableConfiguration` to jointly manage metadata, protocol, and table properties (#644) ## What changes are proposed in this pull request? This PR introduces the `TableConfiguration` struct which is used to perform feature support and feature enablement checks the table's protocol, metadata, table properties, and schema. #### Problem statement To check that a feature is enabled, you often must check that a certain reader/writer feature is supported and that a table property is set to true. For example, a writer must check both the `delta.enableDeletionVectors` table property, and check that the `deletionVectors` writer/reader features are present in the table's Protocol. Probing two disparate structs to do a single check is error-prone and may lead to these metadata/protocol checks to become out of sync. Moreover checks are performed in the CDF path, snapshot scan path, and in the read path. Thus there are many ways in which protocol and metadata checks can diverge with one another. Put simply, the problems are: 1. When checking feature support over multiple structs (like P&M), it is easy to forget one and violate correctness. 2. Duplicate checks for the same feature may diverge among different code paths #### Solution `TableConfiguration` consolidates all protocol and metadata checks to one place. It also ensures that the logic for checking feature enablement is kept consistent throughout the codebase. This addresses the problems outlined above. Closes: https://github.com/delta-io/delta-kernel-rs/issues/571 ## How was this change tested? We add a couple tests to ensure that: 1) Creating `TableConfiguration` fails on tables for which reading is not supported 2) deletion vector support and enablement checks work as expected. --------- Co-authored-by: Oussama Saoudi --- kernel/examples/inspect-table/src/main.rs | 2 +- kernel/src/actions/set_transaction.rs | 2 +- kernel/src/lib.rs | 1 + kernel/src/log_segment/tests.rs | 2 +- kernel/src/scan/mod.rs | 10 +- kernel/src/snapshot.rs | 57 ++--- kernel/src/table_changes/mod.rs | 16 +- kernel/src/table_changes/scan.rs | 2 +- kernel/src/table_configuration.rs | 295 ++++++++++++++++++++++ 9 files changed, 334 insertions(+), 53 deletions(-) create mode 100644 kernel/src/table_configuration.rs diff --git a/kernel/examples/inspect-table/src/main.rs b/kernel/examples/inspect-table/src/main.rs index 01b1c4e88..243e56a5a 100644 --- a/kernel/examples/inspect-table/src/main.rs +++ b/kernel/examples/inspect-table/src/main.rs @@ -220,7 +220,7 @@ fn try_main() -> DeltaResult<()> { } Commands::Actions { oldest_first } => { let log_schema = get_log_schema(); - let actions = snapshot._log_segment().replay( + let actions = snapshot.log_segment().replay( &engine, log_schema.clone(), log_schema.clone(), diff --git a/kernel/src/actions/set_transaction.rs b/kernel/src/actions/set_transaction.rs index 89c389d3d..ea1ffa6a7 100644 --- a/kernel/src/actions/set_transaction.rs +++ b/kernel/src/actions/set_transaction.rs @@ -60,7 +60,7 @@ impl SetTransactionScanner { )) }); self.snapshot - .log_segment + .log_segment() .replay(engine, schema.clone(), schema, META_PREDICATE.clone()) } diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 49dceea75..8dde21afe 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -82,6 +82,7 @@ pub mod schema; pub mod snapshot; pub mod table; pub mod table_changes; +pub mod table_configuration; pub mod table_features; pub mod table_properties; pub mod transaction; diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index e1c441d7e..5db1c4581 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -31,7 +31,7 @@ fn test_replay_for_metadata() { let table = Table::new(url); let snapshot = table.snapshot(&engine, None).unwrap(); let data: Vec<_> = snapshot - .log_segment + .log_segment() .replay_for_metadata(&engine) .unwrap() .try_collect() diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 418e289fb..4e98eea7f 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -395,7 +395,7 @@ impl Scan { // needed (currently just means no partition cols AND no column mapping but will be extended // for other transforms as we support them) let static_transform = (self.have_partition_cols - || self.snapshot.column_mapping_mode != ColumnMappingMode::None) + || self.snapshot.column_mapping_mode() != ColumnMappingMode::None) .then_some(Arc::new(Scan::get_static_transform(&self.all_fields))); let physical_predicate = match self.physical_predicate.clone() { PhysicalPredicate::StaticSkipAll => return Ok(None.into_iter().flatten()), @@ -423,7 +423,7 @@ impl Scan { // NOTE: We don't pass any meta-predicate because we expect no meaningful row group skipping // when ~every checkpoint file will contain the adds and removes we are looking for. self.snapshot - .log_segment + .log_segment() .replay(engine, commit_read_schema, checkpoint_read_schema, None) } @@ -431,11 +431,11 @@ impl Scan { /// only be called once per scan. pub fn global_scan_state(&self) -> GlobalScanState { GlobalScanState { - table_root: self.snapshot.table_root.to_string(), + table_root: self.snapshot.table_root().to_string(), partition_columns: self.snapshot.metadata().partition_columns.clone(), logical_schema: self.logical_schema.clone(), physical_schema: self.physical_schema.clone(), - column_mapping_mode: self.snapshot.column_mapping_mode, + column_mapping_mode: self.snapshot.column_mapping_mode(), } } @@ -479,7 +479,7 @@ impl Scan { ); let global_state = Arc::new(self.global_scan_state()); - let table_root = self.snapshot.table_root.clone(); + let table_root = self.snapshot.table_root().clone(); let physical_predicate = self.physical_predicate(); let all_fields = self.all_fields.clone(); let have_partition_cols = self.have_partition_cols; diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 75f52ab78..f198b9080 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -10,9 +10,8 @@ use crate::actions::{Metadata, Protocol}; use crate::log_segment::LogSegment; use crate::scan::ScanBuilder; use crate::schema::Schema; -use crate::table_features::{ - column_mapping_mode, validate_schema_column_mapping, ColumnMappingMode, -}; +use crate::table_configuration::TableConfiguration; +use crate::table_features::ColumnMappingMode; use crate::table_properties::TableProperties; use crate::{DeltaResult, Engine, Error, FileSystemClient, Version}; @@ -23,13 +22,8 @@ const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint"; /// have a defined schema (which may change over time for any given table), specific version, and /// frozen log segment. pub struct Snapshot { - pub(crate) table_root: Url, - pub(crate) log_segment: LogSegment, - metadata: Metadata, - protocol: Protocol, - schema: Schema, - table_properties: TableProperties, - pub(crate) column_mapping_mode: ColumnMappingMode, + log_segment: LogSegment, + table_configuration: TableConfiguration, } impl Drop for Snapshot { @@ -43,7 +37,7 @@ impl std::fmt::Debug for Snapshot { f.debug_struct("Snapshot") .field("path", &self.log_segment.log_root.as_str()) .field("version", &self.version()) - .field("metadata", &self.metadata) + .field("metadata", &self.metadata()) .finish() } } @@ -80,67 +74,58 @@ impl Snapshot { engine: &dyn Engine, ) -> DeltaResult { let (metadata, protocol) = log_segment.read_metadata(engine)?; - - // important! before a read/write to the table we must check it is supported - protocol.ensure_read_supported()?; - - // validate column mapping mode -- all schema fields should be correctly (un)annotated - let schema = metadata.parse_schema()?; - let table_properties = metadata.parse_table_properties(); - let column_mapping_mode = column_mapping_mode(&protocol, &table_properties); - validate_schema_column_mapping(&schema, column_mapping_mode)?; - + let table_configuration = + TableConfiguration::try_new(metadata, protocol, location, log_segment.end_version)?; Ok(Self { - table_root: location, log_segment, - metadata, - protocol, - schema, - table_properties, - column_mapping_mode, + table_configuration, }) } /// Log segment this snapshot uses #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] - fn _log_segment(&self) -> &LogSegment { + pub(crate) fn log_segment(&self) -> &LogSegment { &self.log_segment } pub fn table_root(&self) -> &Url { - &self.table_root + self.table_configuration.table_root() } /// Version of this `Snapshot` in the table. pub fn version(&self) -> Version { - self.log_segment.end_version + self.table_configuration().version() } /// Table [`Schema`] at this `Snapshot`s version. pub fn schema(&self) -> &Schema { - &self.schema + self.table_configuration.schema() } /// Table [`Metadata`] at this `Snapshot`s version. pub fn metadata(&self) -> &Metadata { - &self.metadata + self.table_configuration.metadata() } /// Table [`Protocol`] at this `Snapshot`s version. pub fn protocol(&self) -> &Protocol { - &self.protocol + self.table_configuration.protocol() } /// Get the [`TableProperties`] for this [`Snapshot`]. pub fn table_properties(&self) -> &TableProperties { - &self.table_properties + self.table_configuration().table_properties() + } + /// Get the [`TableConfiguration`] for this [`Snapshot`]. + #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] + pub(crate) fn table_configuration(&self) -> &TableConfiguration { + &self.table_configuration } - /// Get the [column mapping /// mode](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#column-mapping) at this /// `Snapshot`s version. pub fn column_mapping_mode(&self) -> ColumnMappingMode { - self.column_mapping_mode + self.table_configuration.column_mapping_mode() } /// Create a [`ScanBuilder`] for an `Arc`. diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index a855668d8..e65b0ae53 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -161,15 +161,15 @@ impl TableChanges { // we support CDF with those features enabled. // // Note: We must still check each metadata and protocol action in the CDF range. - let check_snapshot = |snapshot: &Snapshot| -> DeltaResult<()> { - ensure_cdf_read_supported(snapshot.protocol())?; - check_cdf_table_properties(snapshot.table_properties())?; - Ok(()) + let check_table_config = |snapshot: &Snapshot| { + if snapshot.table_configuration().is_cdf_read_supported() { + Ok(()) + } else { + Err(Error::change_data_feed_unsupported(snapshot.version())) + } }; - check_snapshot(&start_snapshot) - .map_err(|_| Error::change_data_feed_unsupported(start_snapshot.version()))?; - check_snapshot(&end_snapshot) - .map_err(|_| Error::change_data_feed_unsupported(end_snapshot.version()))?; + check_table_config(&start_snapshot)?; + check_table_config(&end_snapshot)?; // Verify that the start and end schemas are compatible. We must still check schema // compatibility for each schema update in the CDF range. diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index dffd40f68..0c2ff3eed 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -211,7 +211,7 @@ impl TableChangesScan { partition_columns: end_snapshot.metadata().partition_columns.clone(), logical_schema: self.logical_schema.clone(), physical_schema: self.physical_schema.clone(), - column_mapping_mode: end_snapshot.column_mapping_mode, + column_mapping_mode: end_snapshot.column_mapping_mode(), } } diff --git a/kernel/src/table_configuration.rs b/kernel/src/table_configuration.rs new file mode 100644 index 000000000..565546d52 --- /dev/null +++ b/kernel/src/table_configuration.rs @@ -0,0 +1,295 @@ +//! This module defines [`TableConfiguration`], a high level api to check feature support and +//! feature enablement for a table at a given version. This encapsulates [`Protocol`], [`Metadata`], +//! [`Schema`], [`TableProperties`], and [`ColumnMappingMode`]. These structs in isolation should +//! be considered raw and unvalidated if they are not a part of [`TableConfiguration`]. We unify +//! these fields because they are deeply intertwined when dealing with table features. For example: +//! To check that deletion vector writes are enabled, you must check both both the protocol's +//! reader/writer features, and ensure that the deletion vector table property is enabled in the +//! [`TableProperties`]. +//! +//! [`Schema`]: crate::schema::Schema +use std::collections::HashSet; +use std::sync::{Arc, LazyLock}; + +use url::Url; + +use crate::actions::{ensure_supported_features, Metadata, Protocol}; +use crate::schema::{Schema, SchemaRef}; +use crate::table_features::{ + column_mapping_mode, validate_schema_column_mapping, ColumnMappingMode, ReaderFeatures, + WriterFeatures, +}; +use crate::table_properties::TableProperties; +use crate::{DeltaResult, Version}; + +/// Holds all the configuration for a table at a specific version. This includes the supported +/// reader and writer features, table properties, schema, version, and table root. This can be used +/// to check whether a table supports a feature or has it enabled. For example, deletion vector +/// support can be checked with [`TableConfiguration::is_deletion_vector_supported`] and deletion +/// vector write enablement can be checked with [`TableConfiguration::is_deletion_vector_enabled`]. +/// +/// [`TableConfiguration`] performs checks upon construction with `TableConfiguration::try_new` +/// to validate that Metadata and Protocol are correctly formatted and mutually compatible. If +/// `try_new` successfully returns `TableConfiguration`, it is also guaranteed that reading the +/// table is supported. +#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] +#[derive(Debug)] +pub(crate) struct TableConfiguration { + metadata: Metadata, + protocol: Protocol, + schema: SchemaRef, + table_properties: TableProperties, + column_mapping_mode: ColumnMappingMode, + table_root: Url, + version: Version, +} + +impl TableConfiguration { + /// Constructs a [`TableConfiguration`] for a table located in `table_root` at `version`. + /// This validates that the [`Metadata`] and [`Protocol`] are compatible with one another + /// and that the kernel supports reading from this table. + /// + /// Note: This only returns successfully kernel supports reading the table. It's important + /// to do this validation is done in `try_new` because all table accesses must first construct + /// the [`TableConfiguration`]. This ensures that developers never forget to check that kernel + /// supports reading the table, and that all table accesses are legal. + /// + /// Note: In the future, we will perform stricter checks on the set of reader and writer + /// features. In particular, we will check that: + /// - Non-legacy features must appear in both reader features and writer features lists. + /// If such a feature is present, the reader version and writer version must be 3, and 5 + /// respectively. + /// - Legacy reader features occur when the reader version is 3, but the writer version is + /// either 5 or 6. In this case, the writer feature list must be empty. + /// - Column mapping is the only legacy feature present in kernel. No future delta versions + /// will introduce new legacy features. + /// See: + pub(crate) fn try_new( + metadata: Metadata, + protocol: Protocol, + table_root: Url, + version: Version, + ) -> DeltaResult { + protocol.ensure_read_supported()?; + + let schema = Arc::new(metadata.parse_schema()?); + let table_properties = metadata.parse_table_properties(); + let column_mapping_mode = column_mapping_mode(&protocol, &table_properties); + + // validate column mapping mode -- all schema fields should be correctly (un)annotated + validate_schema_column_mapping(&schema, column_mapping_mode)?; + Ok(Self { + schema, + metadata, + protocol, + table_properties, + column_mapping_mode, + table_root, + version, + }) + } + /// The [`Metadata`] for this table at this version. + #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] + pub(crate) fn metadata(&self) -> &Metadata { + &self.metadata + } + /// The [`Protocol`] of this table at this version. + #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] + pub(crate) fn protocol(&self) -> &Protocol { + &self.protocol + } + /// The [`Schema`] of for this table at this version. + #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] + pub(crate) fn schema(&self) -> &Schema { + self.schema.as_ref() + } + /// The [`TableProperties`] of this table at this version. + #[allow(unused)] + #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] + pub(crate) fn table_properties(&self) -> &TableProperties { + &self.table_properties + } + /// The [`ColumnMappingMode`] for this table at this version. + #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] + pub(crate) fn column_mapping_mode(&self) -> ColumnMappingMode { + self.column_mapping_mode + } + /// The [`Url`] of the table this [`TableConfiguration`] belongs to + #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] + pub(crate) fn table_root(&self) -> &Url { + &self.table_root + } + /// The [`Version`] which this [`TableConfiguration`] belongs to. + #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] + pub(crate) fn version(&self) -> Version { + self.version + } + /// Returns `true` if the kernel supports writing to this table. This checks that the + /// protocol's writer features are all supported. + #[allow(unused)] + #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] + pub(crate) fn is_write_supported(&self) -> bool { + self.protocol.ensure_write_supported().is_ok() + } + /// Returns `true` if kernel supports reading Change Data Feed on this table. + /// See the documentation of [`TableChanges`] for more details. + /// + /// [`TableChanges`]: crate::table_changes::TableChanges + #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] + pub(crate) fn is_cdf_read_supported(&self) -> bool { + static CDF_SUPPORTED_READER_FEATURES: LazyLock> = + LazyLock::new(|| HashSet::from([ReaderFeatures::DeletionVectors])); + let protocol_supported = match self.protocol.reader_features() { + // if min_reader_version = 3 and all reader features are subset of supported => OK + Some(reader_features) if self.protocol.min_reader_version() == 3 => { + ensure_supported_features(reader_features, &CDF_SUPPORTED_READER_FEATURES).is_ok() + } + // if min_reader_version = 1 and there are no reader features => OK + None => self.protocol.min_reader_version() == 1, + // any other protocol is not supported + _ => false, + }; + let cdf_enabled = self + .table_properties + .enable_change_data_feed + .unwrap_or(false); + let column_mapping_disabled = matches!( + self.table_properties.column_mapping_mode, + None | Some(ColumnMappingMode::None) + ); + protocol_supported && cdf_enabled && column_mapping_disabled + } + /// Returns `true` if deletion vectors is supported on this table. To support deletion vectors, + /// a table must support reader version 3, writer version 7, and the deletionVectors feature in + /// both the protocol's readerFeatures and writerFeatures. + /// + /// See: + #[allow(unused)] + #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] + pub(crate) fn is_deletion_vector_supported(&self) -> bool { + let read_supported = self + .protocol() + .has_reader_feature(&ReaderFeatures::DeletionVectors) + && self.protocol.min_reader_version() == 3; + let write_supported = self + .protocol() + .has_writer_feature(&WriterFeatures::DeletionVectors) + && self.protocol.min_writer_version() == 7; + read_supported && write_supported + } + + /// Returns `true` if writing deletion vectors is enabled for this table. This is the case + /// when the deletion vectors is supported on this table and the `delta.enableDeletionVectors` + /// table property is set to `true`. + /// + /// See: + #[allow(unused)] + #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] + pub(crate) fn is_deletion_vector_enabled(&self) -> bool { + self.is_deletion_vector_supported() + && self + .table_properties + .enable_deletion_vectors + .unwrap_or(false) + } +} + +#[cfg(test)] +mod test { + use std::collections::HashMap; + + use url::Url; + + use crate::actions::{Metadata, Protocol}; + use crate::table_features::{ReaderFeatures, WriterFeatures}; + + use super::TableConfiguration; + + #[test] + fn dv_supported_not_enabled() { + let metadata = Metadata { + configuration: HashMap::from_iter([( + "delta.enableChangeDataFeed".to_string(), + "true".to_string(), + )]), + schema_string: r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#.to_string(), + ..Default::default() + }; + let protocol = Protocol::try_new( + 3, + 7, + Some([ReaderFeatures::DeletionVectors]), + Some([WriterFeatures::DeletionVectors]), + ) + .unwrap(); + let table_root = Url::try_from("file:///").unwrap(); + let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap(); + assert!(table_config.is_deletion_vector_supported()); + assert!(!table_config.is_deletion_vector_enabled()); + } + #[test] + fn dv_enabled() { + let metadata = Metadata { + configuration: HashMap::from_iter([( + "delta.enableChangeDataFeed".to_string(), + "true".to_string(), + ), + ( + "delta.enableDeletionVectors".to_string(), + "true".to_string(), + )]), + schema_string: r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#.to_string(), + ..Default::default() + }; + let protocol = Protocol::try_new( + 3, + 7, + Some([ReaderFeatures::DeletionVectors]), + Some([WriterFeatures::DeletionVectors]), + ) + .unwrap(); + let table_root = Url::try_from("file:///").unwrap(); + let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap(); + assert!(table_config.is_deletion_vector_supported()); + assert!(table_config.is_deletion_vector_enabled()); + } + #[test] + fn fails_on_unsupported_feature() { + let metadata = Metadata { + schema_string: r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#.to_string(), + ..Default::default() + }; + let protocol = Protocol::try_new( + 3, + 7, + Some([ReaderFeatures::V2Checkpoint]), + Some([WriterFeatures::V2Checkpoint]), + ) + .unwrap(); + let table_root = Url::try_from("file:///").unwrap(); + TableConfiguration::try_new(metadata, protocol, table_root, 0) + .expect_err("V2 checkpoint is not supported in kernel"); + } + #[test] + fn dv_not_supported() { + let metadata = Metadata { + configuration: HashMap::from_iter([( + "delta.enableChangeDataFeed".to_string(), + "true".to_string(), + )]), + schema_string: r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#.to_string(), + ..Default::default() + }; + let protocol = Protocol::try_new( + 3, + 7, + Some([ReaderFeatures::TimestampWithoutTimezone]), + Some([WriterFeatures::TimestampWithoutTimezone]), + ) + .unwrap(); + let table_root = Url::try_from("file:///").unwrap(); + let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap(); + assert!(!table_config.is_deletion_vector_supported()); + assert!(!table_config.is_deletion_vector_enabled()); + } +}