Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add table changes example, improve TableChanges documentation #597

Merged
merged 9 commits into from
Dec 14, 2024
4 changes: 2 additions & 2 deletions kernel/examples/read-table-changes/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ fn main() -> DeltaResult<()> {
)?);
let table_changes = table.table_changes(engine.as_ref(), cli.start_version, cli.end_version)?;

let x = table_changes.into_scan_builder().build()?;
let batches: Vec<RecordBatch> = x
let table_changes_scan = table_changes.into_scan_builder().build()?;
let batches: Vec<RecordBatch> = table_changes_scan
.execute(engine.clone())?
.map(|scan_result| -> DeltaResult<_> {
let scan_result = scan_result?;
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-single-threaded
//! [read-table-multi-threaded]:
//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-multi-threaded
//![read-table-changes]:
//! [read-table-changes]:
//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-changes
//!
//! Simple write examples can be found in the [`write.rs`] integration tests. Standalone write
Expand Down
41 changes: 20 additions & 21 deletions kernel/src/table_changes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
//! // Construct a table from a path oaeuhoanut
//! let table = Table::try_from_uri(path)?;
//!
//! // Declare the version range for the table's change data feed
//! // Get the table changes (change data feed) between version 0 and 1
//! let table_changes = table.table_changes(engine.as_ref(), 0, 1)?;
//!
//! // Optionally specify a schema and predicate for the table changes scan
//! // Optionally specify a schema and predicate to apply to the table changes scan
//! let schema = table_changes
//! .schema()
//! .project(&["id", "_commit_version"])?;
Expand All @@ -28,7 +28,7 @@
//! .build()?;
//!
//! // Execute the table changes scan to get a fallible iterator of `ScanResult`s
//! let batches = table_changes_scan.execute(engine.clone())?;
//! let table_change_batches = table_changes_scan.execute(engine.clone())?;
//! # Ok::<(), Error>(())
//! ```
use std::collections::HashSet;
Expand Down Expand Up @@ -71,21 +71,28 @@ static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| {
/// - `_change_type`: String representing the type of change that for that commit. This may be one
/// of `delete`, `insert`, `update_preimage`, or `update_postimage`.
/// - `_commit_version`: Long representing the commit the change occurred in.
/// - `_commit_timestamp`: Time at which the commit occurred. If In-commit timestamps is enabled,
/// this is retrieved from the [`CommitInfo`] action. Otherwise, the timestamp is the same as the
/// commit file's modification timestamp.
/// - `_commit_timestamp`: Time at which the commit occurred. The timestamp is retrieved from the
/// file modification time of the log file. No timezone is associated with the timestamp.
///
/// Currently, in-commit timestamps (ICT) is not supported. In the future when ICT is enabled, the
/// timestamp will be retrieved from the `inCommitTimestamp` field of the CommitInfo` action.
/// See issue [#559](https://github.com/delta-io/delta-kernel-rs/issues/559)
/// For details on In-Commit Timestamps, see the [Protocol](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#in-commit-timestamps).
///
///
/// Three properties must hold for the entire CDF range:
/// - Reading must be supported for every commit in the range. This is determined using [`ensure_read_supported`]
/// - Reading must be supported for every commit in the range. Currently the only read feature allowed
/// is deletion vectors. This will be expanded in the future to support more delta table features.
/// Because only deletion vectors are supported, reader version 2 will not be allowed. That is
// because version 2 requires that column mapping is enabled. Reader versions 1 and 3 are allowed.
/// - Change Data Feed must be enabled for the entire range with the `delta.enableChangeDataFeed`
/// table property set to 'true'.
/// table property set to `true`. Performing change data feed on tables with column mapping is
/// currently disallowed. We check that column mapping is disabled, or the column mapping mode is `None`.
/// - The schema for each commit must be compatible with the end schema. This means that all the
/// same fields and their nullability are the same. Schema compatibility will be expanded in the
/// future to allow compatible schemas that are not the exact same.
/// See issue [#523](https://github.com/delta-io/delta-kernel-rs/issues/523)
///
/// [`CommitInfo`]: crate::actions::CommitInfo
/// [`ensure_read_supported`]: crate::actions::Protocol::ensure_read_supported
/// # Examples
/// Get `TableChanges` for versions 0 to 1 (inclusive)
/// ```rust
Expand Down Expand Up @@ -225,11 +232,8 @@ impl TableChanges {
}
}

/// Ensures that change data feed is enabled in `table_properties`.
///
/// Performing change data feed on tables with column mapping is currently disallowed.
/// This will be less restrictive in the future. Because column mapping is disallowed, we also
/// check that column mapping is disabled, or the column mapping mode is `None`.
/// Ensures that change data feed is enabled in `table_properties`. See the documentation
/// of [`TableChanges`] for more details.
fn check_cdf_table_properties(table_properties: &TableProperties) -> DeltaResult<()> {
require!(
table_properties.enable_change_data_feed.unwrap_or(false),
Expand All @@ -246,12 +250,7 @@ fn check_cdf_table_properties(table_properties: &TableProperties) -> DeltaResult
}

/// Ensures that Change Data Feed is supported for a table with this [`Protocol`] .
//
// Currently the only read feature allowed is deletion vectors. This will be expanded in the
// future to support more delta table features.
//
// Because only deletion vectors are supported, reader version 2 will not be allowed. That is
// because version 2 requires that column mapping is enabled. Reader versions 1 and 3 are allowed.
//See the documentation of [`TableChanges`] for more details.
OussamaSaoudi-db marked this conversation as resolved.
Show resolved Hide resolved
fn ensure_cdf_read_supported(protocol: &Protocol) -> DeltaResult<()> {
static CDF_SUPPORTED_READER_FEATURES: LazyLock<HashSet<ReaderFeatures>> =
LazyLock::new(|| HashSet::from([ReaderFeatures::DeletionVectors]));
Expand Down
Loading