feat: Introduce TableConfiguration to jointly manage metadata, protocol, and table properties (#644)

## What changes are proposed in this pull request?
This PR introduces the `TableConfiguration` struct, which is used to
perform feature-support and feature-enablement checks against the table's
protocol, metadata, table properties, and schema.

#### Problem statement

To check that a feature is enabled, you often must verify both that a certain
reader/writer feature is supported and that a table property is set to true.
For example, a writer must check the `delta.enableDeletionVectors` table
property and also check that the `deletionVectors` reader/writer features are
present in the table's Protocol. Probing two disparate structs to perform a
single check is error-prone and may cause the metadata and protocol checks to
fall out of sync. Moreover, these checks are performed in the CDF path, the
snapshot scan path, and the read path, so there are many places where the
protocol and metadata checks can diverge from one another (a sketch of this
pattern follows).
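
As a rough, self-contained illustration (stand-in types only, not the actual
`delta_kernel` API):

```rust
// Sketch only: stand-in types, not delta_kernel's real structs. The answer to
// "are deletion vectors enabled?" is spread across two structs, and every call
// site must remember to consult both of them.
struct Protocol {
    writer_features: Vec<String>,
}

struct TableProperties {
    enable_deletion_vectors: Option<bool>,
}

fn deletion_vectors_enabled(protocol: &Protocol, props: &TableProperties) -> bool {
    // 1) The feature must be declared in the protocol's writer features...
    let supported = protocol
        .writer_features
        .iter()
        .any(|f| f == "deletionVectors");
    // 2) ...AND the table property must be set to true. Forgetting either half
    //    of this check is exactly the bug described above.
    let enabled = props.enable_deletion_vectors.unwrap_or(false);
    supported && enabled
}

fn main() {
    let protocol = Protocol {
        writer_features: vec!["deletionVectors".to_string()],
    };
    let props = TableProperties {
        enable_deletion_vectors: Some(true),
    };
    assert!(deletion_vectors_enabled(&protocol, &props));
}
```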

Put simply, the problems are:
1. When checking feature support across multiple structs (like protocol and
metadata), it is easy to forget one and violate correctness.
2. Duplicate checks for the same feature may diverge across different code
paths.

#### Solution

`TableConfiguration` consolidates all protocol and metadata checks in one
place, and it ensures that the logic for checking feature enablement stays
consistent throughout the codebase. This addresses both problems outlined
above.
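
A minimal sketch of the consolidation idea, again with stand-in types and
hypothetical method names (the real implementation lives in
`kernel/src/table_configuration.rs`):

```rust
// Sketch of the consolidation idea with stand-in types and hypothetical method
// names. Protocol and table properties are owned by one struct, so a call site
// cannot check one and forget the other.
struct Protocol {
    writer_features: Vec<String>,
}

struct TableProperties {
    enable_deletion_vectors: Option<bool>,
}

struct TableConfiguration {
    protocol: Protocol,
    table_properties: TableProperties,
}

impl TableConfiguration {
    /// Is the feature listed in the protocol's writer features?
    fn is_deletion_vector_supported(&self) -> bool {
        self.protocol
            .writer_features
            .iter()
            .any(|f| f == "deletionVectors")
    }

    /// Is the feature both supported *and* switched on via the table property?
    fn is_deletion_vector_enabled(&self) -> bool {
        self.is_deletion_vector_supported()
            && self
                .table_properties
                .enable_deletion_vectors
                .unwrap_or(false)
    }
}

fn main() {
    let config = TableConfiguration {
        protocol: Protocol {
            writer_features: vec!["deletionVectors".to_string()],
        },
        table_properties: TableProperties {
            enable_deletion_vectors: Some(true),
        },
    };
    assert!(config.is_deletion_vector_enabled());
}
```

Call sites then ask a single question of a single struct; for example, the CDF
path in this diff checks `snapshot.table_configuration().is_cdf_read_supported()`
rather than probing the protocol and table properties separately.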

Closes: #571

## How was this change tested?
We add a couple of tests to ensure that:
1) Creating a `TableConfiguration` fails on tables for which reading is not
supported.
2) Deletion vector support and enablement checks work as expected (a rough
sketch of the shape of this check follows the list).
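
For a rough idea of the shape of the second check (an illustrative stand-in,
not the PR's actual test code or fixtures):

```rust
// Illustrative only, not the PR's actual tests or fixtures. The real tests
// build a TableConfiguration from table metadata and protocol; this stand-in
// just shows the shape of the support/enablement assertions.
struct StandInTableConfiguration {
    dv_in_writer_features: bool,
    dv_table_property: Option<bool>,
}

impl StandInTableConfiguration {
    fn is_deletion_vector_supported(&self) -> bool {
        self.dv_in_writer_features
    }

    fn is_deletion_vector_enabled(&self) -> bool {
        self.is_deletion_vector_supported() && self.dv_table_property.unwrap_or(false)
    }
}

#[test]
fn dv_enablement_requires_both_protocol_and_property() {
    // Supported but not enabled: the table property is absent.
    let supported_only = StandInTableConfiguration {
        dv_in_writer_features: true,
        dv_table_property: None,
    };
    assert!(supported_only.is_deletion_vector_supported());
    assert!(!supported_only.is_deletion_vector_enabled());

    // Supported and enabled: the protocol feature and the property agree.
    let enabled = StandInTableConfiguration {
        dv_in_writer_features: true,
        dv_table_property: Some(true),
    };
    assert!(enabled.is_deletion_vector_enabled());
}
```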

---------

Co-authored-by: Oussama Saoudi <[email protected]>
OussamaSaoudi and OussamaSaoudi-db authored Jan 28, 2025
1 parent 6751838 commit 3305d3a
Showing 9 changed files with 334 additions and 53 deletions.
2 changes: 1 addition & 1 deletion kernel/examples/inspect-table/src/main.rs
@@ -220,7 +220,7 @@ fn try_main() -> DeltaResult<()> {
}
Commands::Actions { oldest_first } => {
let log_schema = get_log_schema();
let actions = snapshot._log_segment().replay(
let actions = snapshot.log_segment().replay(
&engine,
log_schema.clone(),
log_schema.clone(),
2 changes: 1 addition & 1 deletion kernel/src/actions/set_transaction.rs
@@ -60,7 +60,7 @@ impl SetTransactionScanner {
))
});
self.snapshot
.log_segment
.log_segment()
.replay(engine, schema.clone(), schema, META_PREDICATE.clone())
}

1 change: 1 addition & 0 deletions kernel/src/lib.rs
@@ -82,6 +82,7 @@ pub mod schema;
pub mod snapshot;
pub mod table;
pub mod table_changes;
pub mod table_configuration;
pub mod table_features;
pub mod table_properties;
pub mod transaction;
2 changes: 1 addition & 1 deletion kernel/src/log_segment/tests.rs
@@ -31,7 +31,7 @@ fn test_replay_for_metadata() {
let table = Table::new(url);
let snapshot = table.snapshot(&engine, None).unwrap();
let data: Vec<_> = snapshot
.log_segment
.log_segment()
.replay_for_metadata(&engine)
.unwrap()
.try_collect()
10 changes: 5 additions & 5 deletions kernel/src/scan/mod.rs
@@ -395,7 +395,7 @@ impl Scan {
// needed (currently just means no partition cols AND no column mapping but will be extended
// for other transforms as we support them)
let static_transform = (self.have_partition_cols
|| self.snapshot.column_mapping_mode != ColumnMappingMode::None)
|| self.snapshot.column_mapping_mode() != ColumnMappingMode::None)
.then_some(Arc::new(Scan::get_static_transform(&self.all_fields)));
let physical_predicate = match self.physical_predicate.clone() {
PhysicalPredicate::StaticSkipAll => return Ok(None.into_iter().flatten()),
@@ -423,19 +423,19 @@
// NOTE: We don't pass any meta-predicate because we expect no meaningful row group skipping
// when ~every checkpoint file will contain the adds and removes we are looking for.
self.snapshot
.log_segment
.log_segment()
.replay(engine, commit_read_schema, checkpoint_read_schema, None)
}

/// Get global state that is valid for the entire scan. This is somewhat expensive so should
/// only be called once per scan.
pub fn global_scan_state(&self) -> GlobalScanState {
GlobalScanState {
table_root: self.snapshot.table_root.to_string(),
table_root: self.snapshot.table_root().to_string(),
partition_columns: self.snapshot.metadata().partition_columns.clone(),
logical_schema: self.logical_schema.clone(),
physical_schema: self.physical_schema.clone(),
column_mapping_mode: self.snapshot.column_mapping_mode,
column_mapping_mode: self.snapshot.column_mapping_mode(),
}
}

@@ -479,7 +479,7 @@ impl Scan {
);

let global_state = Arc::new(self.global_scan_state());
let table_root = self.snapshot.table_root.clone();
let table_root = self.snapshot.table_root().clone();
let physical_predicate = self.physical_predicate();
let all_fields = self.all_fields.clone();
let have_partition_cols = self.have_partition_cols;
57 changes: 21 additions & 36 deletions kernel/src/snapshot.rs
@@ -10,9 +10,8 @@ use crate::actions::{Metadata, Protocol};
use crate::log_segment::LogSegment;
use crate::scan::ScanBuilder;
use crate::schema::Schema;
use crate::table_features::{
column_mapping_mode, validate_schema_column_mapping, ColumnMappingMode,
};
use crate::table_configuration::TableConfiguration;
use crate::table_features::ColumnMappingMode;
use crate::table_properties::TableProperties;
use crate::{DeltaResult, Engine, Error, FileSystemClient, Version};

@@ -23,13 +22,8 @@ const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint";
/// have a defined schema (which may change over time for any given table), specific version, and
/// frozen log segment.
pub struct Snapshot {
pub(crate) table_root: Url,
pub(crate) log_segment: LogSegment,
metadata: Metadata,
protocol: Protocol,
schema: Schema,
table_properties: TableProperties,
pub(crate) column_mapping_mode: ColumnMappingMode,
log_segment: LogSegment,
table_configuration: TableConfiguration,
}

impl Drop for Snapshot {
@@ -43,7 +37,7 @@ impl std::fmt::Debug for Snapshot {
f.debug_struct("Snapshot")
.field("path", &self.log_segment.log_root.as_str())
.field("version", &self.version())
.field("metadata", &self.metadata)
.field("metadata", &self.metadata())
.finish()
}
}
@@ -80,67 +74,58 @@ impl Snapshot {
engine: &dyn Engine,
) -> DeltaResult<Self> {
let (metadata, protocol) = log_segment.read_metadata(engine)?;

// important! before a read/write to the table we must check it is supported
protocol.ensure_read_supported()?;

// validate column mapping mode -- all schema fields should be correctly (un)annotated
let schema = metadata.parse_schema()?;
let table_properties = metadata.parse_table_properties();
let column_mapping_mode = column_mapping_mode(&protocol, &table_properties);
validate_schema_column_mapping(&schema, column_mapping_mode)?;

let table_configuration =
TableConfiguration::try_new(metadata, protocol, location, log_segment.end_version)?;
Ok(Self {
table_root: location,
log_segment,
metadata,
protocol,
schema,
table_properties,
column_mapping_mode,
table_configuration,
})
}

/// Log segment this snapshot uses
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
fn _log_segment(&self) -> &LogSegment {
pub(crate) fn log_segment(&self) -> &LogSegment {
&self.log_segment
}

pub fn table_root(&self) -> &Url {
&self.table_root
self.table_configuration.table_root()
}

/// Version of this `Snapshot` in the table.
pub fn version(&self) -> Version {
self.log_segment.end_version
self.table_configuration().version()
}

/// Table [`Schema`] at this `Snapshot`s version.
pub fn schema(&self) -> &Schema {
&self.schema
self.table_configuration.schema()
}

/// Table [`Metadata`] at this `Snapshot`s version.
pub fn metadata(&self) -> &Metadata {
&self.metadata
self.table_configuration.metadata()
}

/// Table [`Protocol`] at this `Snapshot`s version.
pub fn protocol(&self) -> &Protocol {
&self.protocol
self.table_configuration.protocol()
}

/// Get the [`TableProperties`] for this [`Snapshot`].
pub fn table_properties(&self) -> &TableProperties {
&self.table_properties
self.table_configuration().table_properties()
}
/// Get the [`TableConfiguration`] for this [`Snapshot`].
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) fn table_configuration(&self) -> &TableConfiguration {
&self.table_configuration
}

/// Get the [column mapping
/// mode](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#column-mapping) at this
/// `Snapshot`s version.
pub fn column_mapping_mode(&self) -> ColumnMappingMode {
self.column_mapping_mode
self.table_configuration.column_mapping_mode()
}

/// Create a [`ScanBuilder`] for an `Arc<Snapshot>`.
16 changes: 8 additions & 8 deletions kernel/src/table_changes/mod.rs
@@ -161,15 +161,15 @@ impl TableChanges {
// we support CDF with those features enabled.
//
// Note: We must still check each metadata and protocol action in the CDF range.
let check_snapshot = |snapshot: &Snapshot| -> DeltaResult<()> {
ensure_cdf_read_supported(snapshot.protocol())?;
check_cdf_table_properties(snapshot.table_properties())?;
Ok(())
let check_table_config = |snapshot: &Snapshot| {
if snapshot.table_configuration().is_cdf_read_supported() {
Ok(())
} else {
Err(Error::change_data_feed_unsupported(snapshot.version()))
}
};
check_snapshot(&start_snapshot)
.map_err(|_| Error::change_data_feed_unsupported(start_snapshot.version()))?;
check_snapshot(&end_snapshot)
.map_err(|_| Error::change_data_feed_unsupported(end_snapshot.version()))?;
check_table_config(&start_snapshot)?;
check_table_config(&end_snapshot)?;

// Verify that the start and end schemas are compatible. We must still check schema
// compatibility for each schema update in the CDF range.
2 changes: 1 addition & 1 deletion kernel/src/table_changes/scan.rs
@@ -211,7 +211,7 @@ impl TableChangesScan {
partition_columns: end_snapshot.metadata().partition_columns.clone(),
logical_schema: self.logical_schema.clone(),
physical_schema: self.physical_schema.clone(),
column_mapping_mode: end_snapshot.column_mapping_mode,
column_mapping_mode: end_snapshot.column_mapping_mode(),
}
}
