feat: incremental Snapshot update #651

Open · wants to merge 4 commits into base: main
58 changes: 52 additions & 6 deletions kernel/src/log_segment.rs
@@ -166,21 +166,59 @@ impl LogSegment {
.filter_ok(|x| x.is_commit())
.try_collect()?;

// Return a FileNotFound error if there are no new commits.
// Callers can then decide how to handle this case.
require!(
Collaborator:

Splitting it up makes it clearer that we want nonempty 👌

!ascending_commit_files.is_empty(),
Error::file_not_found("No new commits.")
Collaborator:

I think the error should be more like "No commits in range", or "Failed to find commits in the range". "New commits" makes sense for an update operation, but CDF could operate over old commits.

);

// - Here check that the start version is correct.
// - [`LogSegment::try_new`] will verify that the `end_version` is correct if present.
// - [`LogSegment::try_new`] also checks that there are no gaps between commits.
// If all three are satisfied, this implies that all the desired commits are present.
require!(
ascending_commit_files
.first()
.is_some_and(|first_commit| first_commit.version == start_version),
// safety: we validated the list is not empty above
ascending_commit_files[0].version == start_version,
Error::generic(format!(
"Expected the first commit to have version {}",
start_version
))
);
LogSegment::try_new(ascending_commit_files, vec![], log_root, end_version)
}

/// Extends this LogSegment with the contents of another LogSegment.
///
/// The other LogSegment must be contiguous with this one, i.e. the start
/// version of the other LogSegment must be exactly one greater than this
/// LogSegment's end version, and the other LogSegment must contain only commit files.
pub(crate) fn extend(&mut self, other: &LogSegment) -> DeltaResult<()> {
Collaborator:

this passes by reference. I'm not sure when we'd need to keep around such a log segment besides extending an existing one. Should we pass ownership and avoid the clone?

Collaborator:

I'd go a step further -- most of this should probably be inlined at the call site, which eliminates many of the correctness checks that caller already had to perform (or can prove are redundant for other reasons)

(see also my other comment about making an entirely new snapshot instead of mutating an existing one)
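// Hypothetical sketch of the ownership-based alternative suggested above (not part of
// this diff): taking `other` by value would let the final step move the commit files
// instead of cloning them, e.g.
//     pub(crate) fn extend(&mut self, other: LogSegment) -> DeltaResult<()> { ... }
//     self.ascending_commit_files.extend(other.ascending_commit_files);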

require!(
self.log_root == other.log_root,
Error::generic("Cannot merge LogSegments with different log roots")
);
require!(
other.checkpoint_parts.is_empty(),
Error::generic("Cannot extend by LogSegments with checkpoint parts")
);

if other.ascending_commit_files.is_empty() {
return Ok(());
}

require!(
Some(self.end_version + 1) == other.ascending_commit_files.first().map(|f| f.version),
Error::generic("Cannot merge non contiguous LogSegments")
);

self.ascending_commit_files
.extend(other.ascending_commit_files.iter().cloned());
self.end_version = other.end_version;

Ok(())
}

/// Read a stream of log data from this log segment.
///
/// The log files will be read from most recent to oldest.
@@ -226,8 +264,11 @@ impl LogSegment {
Ok(commit_stream.chain(checkpoint_stream))
}

// Get the most up-to-date Protocol and Metadata actions
pub(crate) fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<(Metadata, Protocol)> {
// Scan the log segment for metadata and protocol actions
pub(crate) fn read_metadata_opt(
&self,
engine: &dyn Engine,
) -> DeltaResult<(Option<Metadata>, Option<Protocol>)> {
let data_batches = self.replay_for_metadata(engine)?;
let (mut metadata_opt, mut protocol_opt) = (None, None);
for batch in data_batches {
@@ -243,7 +284,12 @@
break;
}
}
match (metadata_opt, protocol_opt) {
Ok((metadata_opt, protocol_opt))
}

// Get the most up-to-date Protocol and Metadata actions
pub(crate) fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<(Metadata, Protocol)> {
match self.read_metadata_opt(engine)? {
(Some(m), Some(p)) => Ok((m, p)),
(None, Some(_)) => Err(Error::MissingMetadata),
(Some(_), None) => Err(Error::MissingProtocol),
60 changes: 58 additions & 2 deletions kernel/src/log_segment/tests.rs
@@ -9,7 +9,7 @@ use crate::engine::default::filesystem::ObjectStoreFileSystemClient;
use crate::engine::sync::SyncEngine;
use crate::log_segment::LogSegment;
use crate::snapshot::CheckpointMetadata;
use crate::{FileSystemClient, Table};
use crate::{Error, FileSystemClient, Table};
use test_utils::delta_path_for_version;

// NOTE: In addition to testing the meta-predicate for metadata replay, this test also verifies
@@ -500,7 +500,6 @@ fn test_non_contiguous_log() {

#[test]
fn table_changes_fails_with_larger_start_version_than_end() {
// Commit with version 1 is missing
let (client, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
@@ -511,3 +510,60 @@
let log_segment_res = LogSegment::for_table_changes(client.as_ref(), log_root, 1, Some(0));
assert!(log_segment_res.is_err());
}

#[test]
fn empty_log_segment_returns_specific_error() {
let (client, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "json"),
],
None,
);
let log_segment_res = LogSegment::for_table_changes(client.as_ref(), log_root, 2, None);
assert!(matches!(log_segment_res, Err(Error::FileNotFound(_))));
}

#[test]
fn extend_log_segment_by_another_log_segment() {
let (client, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "json"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "json"),
],
None,
);

let mut log_segment =
LogSegment::for_snapshot(client.as_ref(), log_root.clone(), None, 1).unwrap();
assert_eq!(log_segment.end_version, 1);
assert_eq!(log_segment.ascending_commit_files.len(), 2);
let log_segment_changes =
LogSegment::for_table_changes(client.as_ref(), log_root.clone(), 2, None).unwrap();
log_segment.extend(&log_segment_changes).unwrap();
assert_eq!(log_segment.end_version, 3);
assert_eq!(log_segment.ascending_commit_files.len(), 4);

// non-contiguous ranges
let mut log_segment =
LogSegment::for_snapshot(client.as_ref(), log_root.clone(), None, 0).unwrap();
assert!(log_segment.extend(&log_segment_changes).is_err());

// non-matching log roots
let mut log_segment =
LogSegment::for_snapshot(client.as_ref(), log_root.clone(), None, 1).unwrap();
let mut log_segment_changes =
LogSegment::for_table_changes(client.as_ref(), log_root.clone(), 2, None).unwrap();
log_segment_changes.log_root = Url::parse("file:///").unwrap();
assert!(log_segment.extend(&log_segment_changes).is_err());

// non-empty checkpoint parts
let mut log_segment =
LogSegment::for_snapshot(client.as_ref(), log_root.clone(), None, 1).unwrap();
let mut log_segment_changes =
LogSegment::for_table_changes(client.as_ref(), log_root.clone(), 2, None).unwrap();
log_segment_changes.checkpoint_parts = log_segment.ascending_commit_files.clone();
assert!(log_segment.extend(&log_segment_changes).is_err());
}
93 changes: 91 additions & 2 deletions kernel/src/snapshot.rs
@@ -59,7 +59,7 @@ impl Snapshot {
pub fn try_new(
table_root: Url,
engine: &dyn Engine,
version: Option<Version>,
version: impl Into<Option<Version>>,
) -> DeltaResult<Self> {
let fs_client = engine.get_file_system_client();
let log_root = table_root.join("_delta_log/")?;
@@ -152,6 +152,64 @@ impl Snapshot {
pub fn into_scan_builder(self) -> ScanBuilder {
ScanBuilder::new(self)
}

/// Update the `Snapshot` to the target version.
///
/// # Parameters
/// - `target_version`: desired version of the `Snapshot` after update, defaults to latest.
/// - `engine`: Implementation of [`Engine`] apis.
///
/// # Returns
/// - `true` if the `Snapshot` was updated, `false` if there were no new commits to apply.
pub fn update(
&mut self,
Comment on lines +164 to +165
Collaborator:

It's not clear to me that we want a mut update? An engine could conceivably want to keep the original snapshot because e.g. other queries are still using it. And if there is no sharing, then we should probably just consume self.

Either way, it seems better to return an entirely new Snapshot? Especially since that makes this code exception safe (an error partway through doesn't leave any visible changes).

Collaborator:

Also -- if we do take &self then we may want to consider returning a Cow, since a common case is that the passed-in snapshot is still the latest. If consuming self, it's easy enough to return self as needed.

Collaborator:

yea perhaps we need some discussion on what API we want here? I had a PR that gives a new Snapshot with a Snapshot::new_from API: #549
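// Hypothetical sketches of the alternatives discussed above (neither is part of this
// diff). Consuming self and returning a fresh snapshot:
//     pub fn new_from(self, target_version: impl Into<Option<Version>>, engine: &dyn Engine)
//         -> DeltaResult<Snapshot> { ... }
// Or borrowing and returning a Cow so the common "already up to date" case is cheap:
//     pub fn update<'a>(&'a self, target_version: impl Into<Option<Version>>, engine: &dyn Engine)
//         -> DeltaResult<std::borrow::Cow<'a, Snapshot>> { ... } // assumes Snapshot: Clone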

target_version: impl Into<Option<Version>>,
engine: &dyn Engine,
) -> DeltaResult<bool> {
let fs_client = engine.get_file_system_client();
let log_root = self.table_root.join("_delta_log/")?;
let log_segment = match LogSegment::for_table_changes(
fs_client.as_ref(),
log_root,
self.version() + 1,
target_version,
) {
Ok(segment) => segment,
Err(Error::FileNotFound(_)) => return Ok(false),
Err(e) => return Err(e),
};

let (metadata, protocol) = log_segment.read_metadata_opt(engine)?;
if let Some(p) = &protocol {
p.ensure_read_supported()?;
Collaborator:

Should the column mapping mode also be checked in the case where only Protocol is updated? The column_mapping_mode function takes protocol as a parameter, so whatever column_mapping_mode it produces should be re-validated.

Collaborator:

In fact this is another use case for my TableConfiguration PR, where we do all the validations in one place without having to think about these cases.
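// Hypothetical sketch of the re-validation suggested above (not part of this diff):
// when only the Protocol changed, the mapping mode could be re-derived against the
// existing table properties and checked against the current schema, e.g.
//     let mode = column_mapping_mode(p, &self.table_properties);
//     validate_schema_column_mapping(&self.schema, mode)?;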

}
let (schema, table_properties) = if let Some(m) = &metadata {
let schema = m.parse_schema()?;
let table_properties = m.parse_table_properties();
let column_mapping_mode = column_mapping_mode(
protocol.as_ref().unwrap_or(&self.protocol),
&table_properties,
);
validate_schema_column_mapping(&schema, column_mapping_mode)?;
(Some(schema), Some(table_properties))
} else {
(None, None)
};

// NOTE: we try to extend the log segment first, so that if it fails, we don't update the
// snapshot. Otherwise callers might end up with an inconsistent snapshot.
self.log_segment.extend(&log_segment)?;
if let Some(p) = protocol {
self.protocol = p;
}
if let (Some(m), Some(s), Some(t)) = (metadata, schema, table_properties) {
self.metadata = m;
self.schema = s;
self.table_properties = t;
}

Ok(true)
}
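// Minimal usage sketch (hypothetical `table_url`; the test below exercises the real API):
//     let mut snapshot = Snapshot::try_new(table_url, &engine, None)?;
//     // ... new commits land on the table ...
//     let updated = snapshot.update(None, &engine)?; // false if already at the target version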
}

#[derive(Debug, Deserialize, Serialize)]
@@ -217,7 +275,7 @@ mod tests {
use crate::engine::default::filesystem::ObjectStoreFileSystemClient;
use crate::engine::sync::SyncEngine;
use crate::path::ParsedLogPath;
use crate::schema::StructType;
use crate::schema::{DataType, PrimitiveType, StructType};

#[test]
fn test_snapshot_read_metadata() {
@@ -348,4 +406,35 @@
3,
);
}

#[test]
fn test_snapshot_update() {
let path = std::fs::canonicalize(PathBuf::from("./tests/data/type-widening/")).unwrap();

let location = url::Url::from_directory_path(path).unwrap();
let engine = SyncEngine::new();
let mut snapshot = Snapshot::try_new(location, &engine, 0).unwrap();

assert_eq!(snapshot.protocol().min_reader_version(), 1);
assert_eq!(snapshot.table_properties.enable_type_widening, None);

snapshot.update(1, &engine).unwrap();
Collaborator:

Such a clean n simple api to update! 👌

assert_eq!(snapshot.protocol().min_reader_version(), 3);
assert_eq!(snapshot.table_properties.enable_type_widening, Some(true));
assert!(matches!(
snapshot.schema().field("int_decimal").unwrap().data_type(),
&DataType::Primitive(PrimitiveType::Integer)
));

snapshot.update(None, &engine).unwrap();
assert_eq!(snapshot.protocol().min_reader_version(), 3);
assert_eq!(snapshot.table_properties.enable_type_widening, Some(true));
assert!(matches!(
snapshot.schema().field("int_decimal").unwrap().data_type(),
&DataType::Primitive(PrimitiveType::Decimal(11, 1))
));

// update return false if no new version
assert!(!snapshot.update(None, &engine).unwrap());
}
}
5 changes: 5 additions & 0 deletions kernel/src/table_properties.rs
@@ -137,6 +137,9 @@ pub struct TableProperties {
/// whether to enable row tracking during writes.
pub enable_row_tracking: Option<bool>,

/// whether to enable type widening
Collaborator:

I don't see enableTypeWidening in the protocol. How come the protocol doesn't have it when other docs do?

Collaborator:

Looks like it's still an RFC at this point: https://github.com/delta-io/delta/blob/master/protocol_rfcs/type-widening.md. We can totally implement support for it, but details of the feature could technically change before the RFC is merged into the actual spec. I believe the feature name also needs to be typeWidening-preview or similar, until that happens.

pub enable_type_widening: Option<bool>,
Collaborator:

Re the note: "removed the breaking change label, since calling code should not require updates."

The semver check fails here with the message:

A pub struct constructible with a struct literal has a new pub field. Existing struct literals must be updated to include the new field.
ref: https://doc.rust-lang.org/reference/expressions/struct-expr.html
impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.38.0/src/lints/constructible_struct_adds_field.ron

Failed in:
Summary semver requires new major version: 1 major and 0 minor checks failed
field TableProperties.enable_type_widening in /home/runner/work/delta-kernel-rs/delta-kernel-rs/kernel/src/table_properties.rs:141

That seems like a legit (tho annoying) callout of a breaking change?

Collaborator:

yep +1 came here to comment the same thing (this is a breaking change) - also this seems a tangential change, do we want to include it here?
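One way to avoid this class of breakage, if the field is kept in this PR, would be to mark the struct non-exhaustive so downstream crates cannot build it with a struct literal (hypothetical sketch, not something this change does):

#[non_exhaustive]
pub struct TableProperties {
    // existing fields ...
    pub enable_type_widening: Option<bool>,
    pub unknown_properties: HashMap<String, String>,
}

With #[non_exhaustive], adding new public fields is no longer a breaking change for external callers, at the cost of requiring a constructor or Default impl for construction outside the crate.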


/// any unrecognized properties are passed through and ignored by the parser
pub unknown_properties: HashMap<String, String>,
}
@@ -268,6 +271,7 @@ mod tests {
("delta.tuneFileSizesForRewrites", "true"),
("delta.checkpointPolicy", "v2"),
("delta.enableRowTracking", "true"),
("delta.enableTypeWidening", "true"),
];
let actual = TableProperties::from(properties.into_iter());
let expected = TableProperties {
@@ -293,6 +297,7 @@
tune_file_sizes_for_rewrites: Some(true),
checkpoint_policy: Some(CheckpointPolicy::V2),
enable_row_tracking: Some(true),
enable_type_widening: Some(true),
unknown_properties: HashMap::new(),
};
assert_eq!(actual, expected);
1 change: 1 addition & 0 deletions kernel/src/table_properties/deserialize.rs
@@ -76,6 +76,7 @@ fn try_parse(props: &mut TableProperties, k: &str, v: &str) -> Option<()> {
}
"delta.checkpointPolicy" => props.checkpoint_policy = CheckpointPolicy::try_from(v).ok(),
"delta.enableRowTracking" => props.enable_row_tracking = Some(parse_bool(v)?),
"delta.enableTypeWidening" => props.enable_type_widening = Some(parse_bool(v)?),
_ => return None,
}
Some(())
3 changes: 1 addition & 2 deletions kernel/tests/cdf.rs
@@ -398,8 +398,7 @@ fn invalid_range_end_before_start() {
#[test]
fn invalid_range_start_after_last_version_of_table() {
let res = read_cdf_for_table("cdf-table-simple", 3, 4, None);
let expected_msg = "Expected the first commit to have version 3";
assert!(matches!(res, Err(Error::Generic(msg)) if msg == expected_msg));
assert!(matches!(res, Err(Error::FileNotFound(_))));
}

#[test]
Expand Down
Loading