diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 24d78a986..2de9239b6 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -166,14 +166,20 @@ impl LogSegment { .filter_ok(|x| x.is_commit()) .try_collect()?; + // Return a FileNotFound error if there are no new commits. + // Callers can then decide how to handle this case. + require!( + !ascending_commit_files.is_empty(), + Error::file_not_found("No new commits.") + ); + // - Here check that the start version is correct. // - [`LogSegment::try_new`] will verify that the `end_version` is correct if present. // - [`LogSegment::try_new`] also checks that there are no gaps between commits. // If all three are satisfied, this implies that all the desired commits are present. require!( - ascending_commit_files - .first() - .is_some_and(|first_commit| first_commit.version == start_version), + // safety: we validated the list is not empty above + ascending_commit_files[0].version == start_version, Error::generic(format!( "Expected the first commit to have version {}", start_version @@ -181,6 +187,38 @@ impl LogSegment { ); LogSegment::try_new(ascending_commit_files, vec![], log_root, end_version) } + + /// Extends this LogSegment with the contents of another LogSegment. + /// + /// The other LogSegment must be contiguous with this one, i.e. the end + /// version of this LogSegment must be one less than the start version of + /// the other LogSegment and contain only commit files. 
+ pub(crate) fn extend(&mut self, other: &LogSegment) -> DeltaResult<()> { + require!( + self.log_root == other.log_root, + Error::generic("Cannot merge LogSegments with different log roots") + ); + require!( + other.checkpoint_parts.is_empty(), + Error::generic("Cannot extend by LogSegments with checkpoint parts") + ); + + if other.ascending_commit_files.is_empty() { + return Ok(()); + } + + require!( + Some(self.end_version + 1) == other.ascending_commit_files.first().map(|f| f.version), + Error::generic("Cannot merge non contiguous LogSegments") + ); + + self.ascending_commit_files + .extend(other.ascending_commit_files.iter().cloned()); + self.end_version = other.end_version; + + Ok(()) + } + /// Read a stream of log data from this log segment. /// /// The log files will be read from most recent to oldest. @@ -226,8 +264,11 @@ impl LogSegment { Ok(commit_stream.chain(checkpoint_stream)) } - // Get the most up-to-date Protocol and Metadata actions - pub(crate) fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<(Metadata, Protocol)> { + // Scan the log segment for metadata and protocol actions + pub(crate) fn read_metadata_opt( + &self, + engine: &dyn Engine, + ) -> DeltaResult<(Option<Metadata>, Option<Protocol>)> { let data_batches = self.replay_for_metadata(engine)?; let (mut metadata_opt, mut protocol_opt) = (None, None); for batch in data_batches { @@ -243,7 +284,12 @@ impl LogSegment { break; } } - match (metadata_opt, protocol_opt) { + Ok((metadata_opt, protocol_opt)) + } + + // Get the most up-to-date Protocol and Metadata actions + pub(crate) fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<(Metadata, Protocol)> { + match self.read_metadata_opt(engine)?
{ (Some(m), Some(p)) => Ok((m, p)), (None, Some(_)) => Err(Error::MissingMetadata), (Some(_), None) => Err(Error::MissingProtocol), diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index ed029b006..598e38d7a 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -9,7 +9,7 @@ use crate::engine::default::filesystem::ObjectStoreFileSystemClient; use crate::engine::sync::SyncEngine; use crate::log_segment::LogSegment; use crate::snapshot::CheckpointMetadata; -use crate::{FileSystemClient, Table}; +use crate::{Error, FileSystemClient, Table}; use test_utils::delta_path_for_version; // NOTE: In addition to testing the meta-predicate for metadata replay, this test also verifies @@ -500,7 +500,6 @@ fn test_non_contiguous_log() { #[test] fn table_changes_fails_with_larger_start_version_than_end() { - // Commit with version 1 is missing let (client, log_root) = build_log_with_paths_and_checkpoint( &[ delta_path_for_version(0, "json"), @@ -511,3 +510,60 @@ fn table_changes_fails_with_larger_start_version_than_end() { let log_segment_res = LogSegment::for_table_changes(client.as_ref(), log_root, 1, Some(0)); assert!(log_segment_res.is_err()); } + +#[test] +fn empty_log_segment_returns_specific_error() { + let (client, log_root) = build_log_with_paths_and_checkpoint( + &[ + delta_path_for_version(0, "json"), + delta_path_for_version(1, "json"), + ], + None, + ); + let log_segment_res = LogSegment::for_table_changes(client.as_ref(), log_root, 2, None); + assert!(matches!(log_segment_res, Err(Error::FileNotFound(_)))); +} + +#[test] +fn extend_log_segment_by_another_log_segment() { + let (client, log_root) = build_log_with_paths_and_checkpoint( + &[ + delta_path_for_version(0, "json"), + delta_path_for_version(1, "json"), + delta_path_for_version(2, "json"), + delta_path_for_version(3, "json"), + ], + None, + ); + + let mut log_segment = + LogSegment::for_snapshot(client.as_ref(), log_root.clone(), None, 1).unwrap(); +
+ assert_eq!(log_segment.end_version, 1); + assert_eq!(log_segment.ascending_commit_files.len(), 2); + let log_segment_changes = + LogSegment::for_table_changes(client.as_ref(), log_root.clone(), 2, None).unwrap(); + log_segment.extend(&log_segment_changes).unwrap(); + assert_eq!(log_segment.end_version, 3); + assert_eq!(log_segment.ascending_commit_files.len(), 4); + + // non contiguous ranges + let mut log_segment = + LogSegment::for_snapshot(client.as_ref(), log_root.clone(), None, 0).unwrap(); + assert!(log_segment.extend(&log_segment_changes).is_err()); + + // non matching log roots + let mut log_segment = + LogSegment::for_snapshot(client.as_ref(), log_root.clone(), None, 1).unwrap(); + let mut log_segment_changes = + LogSegment::for_table_changes(client.as_ref(), log_root.clone(), 2, None).unwrap(); + log_segment_changes.log_root = Url::parse("file:///").unwrap(); + assert!(log_segment.extend(&log_segment_changes).is_err()); + + // non-empty checkpoint parts + let mut log_segment = + LogSegment::for_snapshot(client.as_ref(), log_root.clone(), None, 1).unwrap(); + let mut log_segment_changes = + LogSegment::for_table_changes(client.as_ref(), log_root.clone(), 2, None).unwrap(); + log_segment_changes.checkpoint_parts = log_segment.ascending_commit_files.clone(); + assert!(log_segment.extend(&log_segment_changes).is_err()); +} diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 75f52ab78..38829dd2b 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -59,7 +59,7 @@ impl Snapshot { pub fn try_new( table_root: Url, engine: &dyn Engine, - version: Option<Version>, + version: impl Into<Option<Version>>, ) -> DeltaResult<Self> { let fs_client = engine.get_file_system_client(); let log_root = table_root.join("_delta_log/")?; @@ -152,6 +152,64 @@ impl Snapshot { pub fn into_scan_builder(self) -> ScanBuilder { ScanBuilder::new(self) } + + /// Update the `Snapshot` to the target version.
+ /// + /// # Parameters + /// - `target_version`: desired version of the `Snapshot` after update, defaults to latest. + /// - `engine`: Implementation of [`Engine`] apis. + /// + /// # Returns + /// - boolean flag indicating if the `Snapshot` was updated. + pub fn update( + &mut self, + target_version: impl Into<Option<Version>>, + engine: &dyn Engine, + ) -> DeltaResult<bool> { + let fs_client = engine.get_file_system_client(); + let log_root = self.table_root.join("_delta_log/")?; + let log_segment = match LogSegment::for_table_changes( + fs_client.as_ref(), + log_root, + self.version() + 1, + target_version, + ) { + Ok(segment) => segment, + Err(Error::FileNotFound(_)) => return Ok(false), + Err(e) => return Err(e), + }; + + let (metadata, protocol) = log_segment.read_metadata_opt(engine)?; + if let Some(p) = &protocol { + p.ensure_read_supported()?; + } + let (schema, table_properties) = if let Some(m) = &metadata { + let schema = m.parse_schema()?; + let table_properties = m.parse_table_properties(); + let column_mapping_mode = column_mapping_mode( + protocol.as_ref().unwrap_or(&self.protocol), + &table_properties, + ); + validate_schema_column_mapping(&schema, column_mapping_mode)?; + (Some(schema), Some(table_properties)) + } else { + (None, None) + }; + + // NOTE: we try to extend the log segment first, so that if it fails, we don't update the + // snapshot. Otherwise callers might end up with an inconsistent snapshot.
+ self.log_segment.extend(&log_segment)?; + if let Some(p) = protocol { + self.protocol = p; + } + if let (Some(m), Some(s), Some(t)) = (metadata, schema, table_properties) { + self.metadata = m; + self.schema = s; + self.table_properties = t; + } + + Ok(true) + } } #[derive(Debug, Deserialize, Serialize)] @@ -217,7 +275,7 @@ mod tests { use crate::engine::default::filesystem::ObjectStoreFileSystemClient; use crate::engine::sync::SyncEngine; use crate::path::ParsedLogPath; - use crate::schema::StructType; + use crate::schema::{DataType, PrimitiveType, StructType}; #[test] fn test_snapshot_read_metadata() { @@ -348,4 +406,35 @@ mod tests { 3, ); } + + #[test] + fn test_snapshot_update() { + let path = std::fs::canonicalize(PathBuf::from("./tests/data/type-widening/")).unwrap(); + + let location = url::Url::from_directory_path(path).unwrap(); + let engine = SyncEngine::new(); + let mut snapshot = Snapshot::try_new(location, &engine, 0).unwrap(); + + assert_eq!(snapshot.protocol().min_reader_version(), 1); + assert_eq!(snapshot.table_properties.enable_type_widening, None); + + snapshot.update(1, &engine).unwrap(); + assert_eq!(snapshot.protocol().min_reader_version(), 3); + assert_eq!(snapshot.table_properties.enable_type_widening, Some(true)); + assert!(matches!( + snapshot.schema().field("int_decimal").unwrap().data_type(), + &DataType::Primitive(PrimitiveType::Integer) + )); + + snapshot.update(None, &engine).unwrap(); + assert_eq!(snapshot.protocol().min_reader_version(), 3); + assert_eq!(snapshot.table_properties.enable_type_widening, Some(true)); + assert!(matches!( + snapshot.schema().field("int_decimal").unwrap().data_type(), + &DataType::Primitive(PrimitiveType::Decimal(11, 1)) + )); + + // update returns false if there is no new version + assert!(!snapshot.update(None, &engine).unwrap()); + } } diff --git a/kernel/src/table_properties.rs b/kernel/src/table_properties.rs index 949bd5ac4..0363b202c 100644 --- a/kernel/src/table_properties.rs +++
b/kernel/src/table_properties.rs @@ -137,6 +137,9 @@ pub struct TableProperties { /// whether to enable row tracking during writes. pub enable_row_tracking: Option<bool>, + /// whether to enable type widening + pub enable_type_widening: Option<bool>, + /// any unrecognized properties are passed through and ignored by the parser pub unknown_properties: HashMap<String, String>, } @@ -268,6 +271,7 @@ mod tests { ("delta.tuneFileSizesForRewrites", "true"), ("delta.checkpointPolicy", "v2"), ("delta.enableRowTracking", "true"), + ("delta.enableTypeWidening", "true"), ]; let actual = TableProperties::from(properties.into_iter()); let expected = TableProperties { @@ -293,6 +297,7 @@ mod tests { tune_file_sizes_for_rewrites: Some(true), checkpoint_policy: Some(CheckpointPolicy::V2), enable_row_tracking: Some(true), + enable_type_widening: Some(true), unknown_properties: HashMap::new(), }; assert_eq!(actual, expected); diff --git a/kernel/src/table_properties/deserialize.rs b/kernel/src/table_properties/deserialize.rs index c9da1495b..35690d4d4 100644 --- a/kernel/src/table_properties/deserialize.rs +++ b/kernel/src/table_properties/deserialize.rs @@ -76,6 +76,7 @@ fn try_parse(props: &mut TableProperties, k: &str, v: &str) -> Option<()> { } "delta.checkpointPolicy" => props.checkpoint_policy = CheckpointPolicy::try_from(v).ok(), "delta.enableRowTracking" => props.enable_row_tracking = Some(parse_bool(v)?), + "delta.enableTypeWidening" => props.enable_type_widening = Some(parse_bool(v)?), _ => return None, } Some(()) diff --git a/kernel/tests/cdf.rs b/kernel/tests/cdf.rs index 2560dc71d..b7dfe603f 100644 --- a/kernel/tests/cdf.rs +++ b/kernel/tests/cdf.rs @@ -398,8 +398,7 @@ fn invalid_range_end_before_start() { #[test] fn invalid_range_start_after_last_version_of_table() { let res = read_cdf_for_table("cdf-table-simple", 3, 4, None); - let expected_msg = "Expected the first commit to have version 3"; - assert!(matches!(res, Err(Error::Generic(msg)) if msg == expected_msg)); + assert!(matches!(res,
Err(Error::FileNotFound(_)))); } #[test]