diff --git a/.gitignore b/.gitignore index a7faecad9..11bf875df 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ .idea/ .vscode/ .vim +.zed # Rust .cargo/ diff --git a/README.md b/README.md index 46ec1b10f..6e25a2ddb 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ Delta-kernel-rs is split into a few different crates: - kernel: The actual core kernel crate - acceptance: Acceptance tests that validate correctness via the [Delta Acceptance Tests][dat] - derive-macros: A crate for our [derive-macros] to live in -- ffi: Functionallity that enables delta-kernel-rs to be used from `C` or `C++` See the [ffi](ffi) +- ffi: Functionality that enables delta-kernel-rs to be used from `C` or `C++` See the [ffi](ffi) directory for more information. ## Building @@ -66,12 +66,12 @@ are still unstable. We therefore may break APIs within minor releases (that is, we will not break APIs in patch releases (`0.1.0` -> `0.1.1`). ## Arrow versioning -If you enable the `default-engine` or `sync-engine` features, you get an implemenation of the +If you enable the `default-engine` or `sync-engine` features, you get an implementation of the `Engine` trait that uses [Arrow] as its data format. The [`arrow crate`](https://docs.rs/arrow/latest/arrow/) tends to release new major versions rather quickly. To enable engines that already integrate arrow to also integrate kernel and not force them -to track a specific version of arrow that kernel depends on, we take as broad dependecy on arrow +to track a specific version of arrow that kernel depends on, we take as broad dependency on arrow versions as we can. This means you can force kernel to rely on the specific arrow version that your engine already uses, @@ -96,7 +96,7 @@ arrow-schema = "53.0" parquet = "53.0" ``` -Note that unfortunatly patching in `cargo` requires that _exactly one_ version matches your +Note that unfortunately patching in `cargo` requires that _exactly one_ version matches your specification. If only arrow "53.0.0" had been released the above will work, but if "53.0.1" where to be released, the specification will break and you will need to provide a more restrictive specification like `"=53.0.0"`. @@ -111,7 +111,7 @@ and then checking what version of `object_store` it depends on. ## Documentation - [API Docs](https://docs.rs/delta_kernel/latest/delta_kernel/) -- [arcitecture.md](doc/architecture.md) document describing the kernel architecture (currently wip) +- [architecture.md](doc/architecture.md) document describing the kernel architecture (currently wip) ## Examples diff --git a/ffi/README.md b/ffi/README.md index 6106b685f..47ea31e60 100644 --- a/ffi/README.md +++ b/ffi/README.md @@ -8,7 +8,7 @@ This crate provides a c foreign function internface (ffi) for delta-kernel-rs. You can build static and shared-libraries, as well as the include headers by simply running: ```sh -cargo build [--release] [--features default-engine] +cargo build [--release] ``` This will place libraries in the root `target` dir (`../target/[debug,release]` from the directory containing this README), and headers in `../target/ffi-headers`. In that directory there will be a `delta_kernel_ffi.h` file, which is the C header, and a `delta_kernel_ffi.hpp` which is the C++ header. diff --git a/ffi/examples/read-table/README.md b/ffi/examples/read-table/README.md index 4debb048e..e70d1e42a 100644 --- a/ffi/examples/read-table/README.md +++ b/ffi/examples/read-table/README.md @@ -10,9 +10,9 @@ This example is built with [cmake]. 
Instructions below assume you start in the d Note that prior to building these examples you must build `delta_kernel_ffi` (see [the FFI readme] for details). TLDR: ```bash # from repo root -$ cargo build -p delta_kernel_ffi [--release] [--features default-engine, tracing] +$ cargo build -p delta_kernel_ffi [--release] --features tracing # from ffi/ dir -$ cargo build [--release] [--features default-engine, tracing] +$ cargo build [--release] --features tracing ``` There are two configurations that can currently be configured in cmake: diff --git a/ffi/examples/read-table/arrow.c b/ffi/examples/read-table/arrow.c index d58a2fa2d..7eb32b7c3 100644 --- a/ffi/examples/read-table/arrow.c +++ b/ffi/examples/read-table/arrow.c @@ -97,7 +97,7 @@ static GArrowRecordBatch* add_partition_columns( } GArrowArray* partition_col = garrow_array_builder_finish((GArrowArrayBuilder*)builder, &error); - if (report_g_error("Can't build string array for parition column", error)) { + if (report_g_error("Can't build string array for partition column", error)) { printf("Giving up on column %s\n", col); g_error_free(error); g_object_unref(builder); @@ -144,7 +144,7 @@ static void add_batch_to_context( } record_batch = add_partition_columns(record_batch, partition_cols, partition_values); if (record_batch == NULL) { - printf("Failed to add parition columns, not adding batch\n"); + printf("Failed to add partition columns, not adding batch\n"); return; } context->batches = g_list_append(context->batches, record_batch); diff --git a/ffi/examples/read-table/read_table.c b/ffi/examples/read-table/read_table.c index 0aa8caa41..7b1a7f2c7 100644 --- a/ffi/examples/read-table/read_table.c +++ b/ffi/examples/read-table/read_table.c @@ -43,7 +43,7 @@ void print_partition_info(struct EngineContext* context, const CStringMap* parti } // Kernel will call this function for each file that should be scanned. The arguments include enough -// context to constuct the correct logical data from the physically read parquet +// context to construct the correct logical data from the physically read parquet void scan_row_callback( void* engine_context, KernelStringSlice path, diff --git a/ffi/src/engine_funcs.rs b/ffi/src/engine_funcs.rs index f8534dfc0..1afb60510 100644 --- a/ffi/src/engine_funcs.rs +++ b/ffi/src/engine_funcs.rs @@ -42,7 +42,7 @@ impl Drop for FileReadResultIterator { } } -/// Call the engine back with the next `EngingeData` batch read by Parquet/Json handler. The +/// Call the engine back with the next `EngineData` batch read by Parquet/Json handler. The /// _engine_ "owns" the data that is passed into the `engine_visitor`, since it is allocated by the /// `Engine` being used for log-replay. If the engine wants the kernel to free this data, it _must_ /// call [`free_engine_data`] on it. diff --git a/ffi/src/expressions/kernel.rs b/ffi/src/expressions/kernel.rs index f2ed8b1a3..a5116db47 100644 --- a/ffi/src/expressions/kernel.rs +++ b/ffi/src/expressions/kernel.rs @@ -83,7 +83,7 @@ pub struct EngineExpressionVisitor { /// Visit a 64bit timestamp belonging to the list identified by `sibling_list_id`. /// The timestamp is microsecond precision with no timezone. pub visit_literal_timestamp_ntz: VisitLiteralFn, - /// Visit a 32bit intger `date` representing days since UNIX epoch 1970-01-01. The `date` belongs + /// Visit a 32bit integer `date` representing days since UNIX epoch 1970-01-01. The `date` belongs /// to the list identified by `sibling_list_id`. 
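An aside for engines implementing these literal callbacks: `date` and `timestamp_ntz` arrive as raw integers, and decoding them is the engine's job. A minimal sketch of one way to do that (illustrative helpers only, not part of the FFI surface, assuming the `chrono` crate that the kernel itself already uses):

```rust
use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, Utc};

/// Decode a kernel `date` literal: days since the UNIX epoch 1970-01-01.
fn decode_date(days_since_epoch: i32) -> Option<NaiveDate> {
    NaiveDate::from_ymd_opt(1970, 1, 1)?
        .checked_add_signed(Duration::days(days_since_epoch as i64))
}

/// Decode a kernel `timestamp_ntz` literal: microseconds since the epoch, no timezone.
fn decode_timestamp_ntz(micros_since_epoch: i64) -> Option<NaiveDateTime> {
    DateTime::<Utc>::from_timestamp_micros(micros_since_epoch).map(|dt| dt.naive_utc())
}

fn main() {
    assert_eq!(decode_date(0).unwrap().to_string(), "1970-01-01");
    assert_eq!(decode_date(19_723).unwrap().to_string(), "2024-01-01");
    println!("{:?}", decode_timestamp_ntz(1_700_000_000_000_000));
}
```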
pub visit_literal_date: VisitLiteralFn, /// Visit binary data at the `buffer` with length `len` belonging to the list identified by diff --git a/ffi/src/handle.rs b/ffi/src/handle.rs index 27b35bea5..30b695ecc 100644 --- a/ffi/src/handle.rs +++ b/ffi/src/handle.rs @@ -2,8 +2,8 @@ //! boundary. //! //! Creating a [`Handle`] always implies some kind of ownership transfer. A mutable handle takes -//! ownership of the object itself (analagous to [`Box`]), while a non-mutable (shared) handle -//! takes ownership of a shared reference to the object (analagous to [`std::sync::Arc`]). Thus, a created +//! ownership of the object itself (analogous to [`Box`]), while a non-mutable (shared) handle +//! takes ownership of a shared reference to the object (analogous to [`std::sync::Arc`]). Thus, a created //! handle remains [valid][Handle#Validity], and its underlying object remains accessible, until the //! handle is explicitly dropped or consumed. Dropping a mutable handle always drops the underlying //! object as well; dropping a shared handle only drops the underlying object if the handle was the diff --git a/ffi/src/scan.rs b/ffi/src/scan.rs index d5695c130..86f5e7e5f 100644 --- a/ffi/src/scan.rs +++ b/ffi/src/scan.rs @@ -383,7 +383,7 @@ struct ContextWrapper { /// data which provides the data handle and selection vector as each element in the iterator. /// /// # Safety -/// engine is responsbile for passing a valid [`ExclusiveEngineData`] and selection vector. +/// engine is responsible for passing a valid [`ExclusiveEngineData`] and selection vector. #[no_mangle] pub unsafe extern "C" fn visit_scan_data( data: Handle, diff --git a/ffi/src/test_ffi.rs b/ffi/src/test_ffi.rs index 27c7063fa..55456d7e5 100644 --- a/ffi/src/test_ffi.rs +++ b/ffi/src/test_ffi.rs @@ -12,7 +12,7 @@ use delta_kernel::{ /// output expression can be found in `ffi/tests/test_expression_visitor/expected.txt`. 
/// /// # Safety -/// The caller is responsible for freeing the retured memory, either by calling +/// The caller is responsible for freeing the returned memory, either by calling /// [`free_kernel_predicate`], or [`Handle::drop_handle`] #[no_mangle] pub unsafe extern "C" fn get_testing_kernel_expression() -> Handle { @@ -25,18 +25,17 @@ pub unsafe extern "C" fn get_testing_kernel_expression() -> Handle arrow::datatypes::Schema { fn create_kernel_schema() -> delta_kernel::schema::Schema { use delta_kernel::schema::{DataType, Schema, StructField}; - let field_a = StructField::new("a", DataType::LONG, false); - let field_b = StructField::new("b", DataType::BOOLEAN, false); + let field_a = StructField::not_null("a", DataType::LONG); + let field_b = StructField::not_null("b", DataType::BOOLEAN); Schema::new(vec![field_a, field_b]) } fn main() { let arrow_schema = create_arrow_schema(); let kernel_schema = create_kernel_schema(); - let convereted: delta_kernel::schema::Schema = + let converted: delta_kernel::schema::Schema = delta_kernel::schema::Schema::try_from(&arrow_schema).expect("couldn't convert"); - assert!(kernel_schema == convereted); + assert!(kernel_schema == converted); println!("Okay, made it"); } diff --git a/kernel/examples/inspect-table/src/main.rs b/kernel/examples/inspect-table/src/main.rs index ea25a8404..194530004 100644 --- a/kernel/examples/inspect-table/src/main.rs +++ b/kernel/examples/inspect-table/src/main.rs @@ -184,7 +184,7 @@ fn print_scan_file( fn try_main() -> DeltaResult<()> { let cli = Cli::parse(); - // build a table and get the lastest snapshot from it + // build a table and get the latest snapshot from it let table = Table::try_from_uri(&cli.path)?; let engine = DefaultEngine::try_new( diff --git a/kernel/examples/read-table-multi-threaded/src/main.rs b/kernel/examples/read-table-multi-threaded/src/main.rs index d97b6c2d3..e689a4ef4 100644 --- a/kernel/examples/read-table-multi-threaded/src/main.rs +++ b/kernel/examples/read-table-multi-threaded/src/main.rs @@ -104,7 +104,7 @@ fn truncate_batch(batch: RecordBatch, rows: usize) -> RecordBatch { RecordBatch::try_new(batch.schema(), cols).unwrap() } -// This is the callback that will be called fo each valid scan row +// This is the callback that will be called for each valid scan row fn send_scan_file( scan_tx: &mut spmc::Sender, path: &str, @@ -125,7 +125,7 @@ fn send_scan_file( fn try_main() -> DeltaResult<()> { let cli = Cli::parse(); - // build a table and get the lastest snapshot from it + // build a table and get the latest snapshot from it let table = Table::try_from_uri(&cli.path)?; println!("Reading {}", table.location()); @@ -279,7 +279,7 @@ fn do_work( // this example uses the parquet_handler from the engine, but an engine could // choose to use whatever method it might want to read a parquet file. The reader - // could, for example, fill in the parition columns, or apply deletion vectors. Here + // could, for example, fill in the partition columns, or apply deletion vectors. Here // we assume a more naive parquet reader and fix the data up after the fact. // further parallelism would also be possible here as we could read the parquet file // in chunks where each thread reads one chunk. 
The engine would need to ensure diff --git a/kernel/examples/read-table-single-threaded/src/main.rs b/kernel/examples/read-table-single-threaded/src/main.rs index 32ad3173d..9bbc9476d 100644 --- a/kernel/examples/read-table-single-threaded/src/main.rs +++ b/kernel/examples/read-table-single-threaded/src/main.rs @@ -69,7 +69,7 @@ fn main() -> ExitCode { fn try_main() -> DeltaResult<()> { let cli = Cli::parse(); - // build a table and get the lastest snapshot from it + // build a table and get the latest snapshot from it let table = Table::try_from_uri(&cli.path)?; println!("Reading {}", table.location()); diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index c62486873..2352e0db7 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -331,8 +331,13 @@ where struct CommitInfo { /// The time this logical file was created, as milliseconds since the epoch. /// Read: optional, write: required (that is, kernel always writes). - /// If in-commit timestamps are enabled, this is always required. pub(crate) timestamp: Option<i64>, + /// The time this logical file was created, as milliseconds since the epoch. Unlike + /// `timestamp`, this field is guaranteed to be monotonically increasing with each commit. + /// Note: If in-commit timestamps are enabled, both of the following must be true: + /// - The `inCommitTimestamp` field must always be present in CommitInfo. + /// - The CommitInfo action must always be the first one in a commit. + pub(crate) in_commit_timestamp: Option<i64>, /// An arbitrary string that identifies the operation associated with this commit. This is /// specified by the engine. Read: optional, write: required (that is, kernel alwarys writes). pub(crate) operation: Option<String>, @@ -375,7 +380,7 @@ pub struct Add { /// in the added file must be contained in one or more remove actions in the same version. pub data_change: bool, - /// Contains [statistics] (e.g., count, min/max values for columns) about the data in this logical file. + /// Contains [statistics] (e.g., count, min/max values for columns) about the data in this logical file, encoded as a JSON string. 
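The doc tweak above calls out that `stats` is carried as a JSON string rather than structured data. For reference, a tiny sketch of what such a string contains and how it can be inspected (assuming `serde_json`, which the kernel already depends on; the sample value matches the one used by the add-visitor test later in this diff):

```rust
fn main() {
    // Per-file statistics as they appear in an `add` action's `stats` field.
    let stats = r#"{"numRecords":1,"minValues":{"c3":5},"maxValues":{"c3":5},"nullCount":{"c3":0}}"#;
    let parsed: serde_json::Value = serde_json::from_str(stats).expect("valid stats JSON");
    assert_eq!(parsed["numRecords"], 1);
    assert_eq!(parsed["minValues"]["c3"], 5);
    println!("null count for c3: {}", parsed["nullCount"]["c3"]);
}
```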
/// /// [statistics]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Per-file-Statistics #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))] @@ -519,38 +524,30 @@ mod tests { .project(&[METADATA_NAME]) .expect("Couldn't get metaData field"); - let expected = Arc::new(StructType::new([StructField::new( + let expected = Arc::new(StructType::new([StructField::nullable( "metaData", StructType::new([ - StructField::new("id", DataType::STRING, false), - StructField::new("name", DataType::STRING, true), - StructField::new("description", DataType::STRING, true), - StructField::new( + StructField::not_null("id", DataType::STRING), + StructField::nullable("name", DataType::STRING), + StructField::nullable("description", DataType::STRING), + StructField::not_null( "format", StructType::new([ - StructField::new("provider", DataType::STRING, false), - StructField::new( + StructField::not_null("provider", DataType::STRING), + StructField::not_null( "options", MapType::new(DataType::STRING, DataType::STRING, false), - false, ), ]), - false, ), - StructField::new("schemaString", DataType::STRING, false), - StructField::new( - "partitionColumns", - ArrayType::new(DataType::STRING, false), - false, - ), - StructField::new("createdTime", DataType::LONG, true), - StructField::new( + StructField::not_null("schemaString", DataType::STRING), + StructField::not_null("partitionColumns", ArrayType::new(DataType::STRING, false)), + StructField::nullable("createdTime", DataType::LONG), + StructField::not_null( "configuration", MapType::new(DataType::STRING, DataType::STRING, false), - false, ), ]), - true, )])); assert_eq!(schema, expected); } @@ -561,61 +558,55 @@ mod tests { .project(&[ADD_NAME]) .expect("Couldn't get add field"); - let expected = Arc::new(StructType::new([StructField::new( + let expected = Arc::new(StructType::new([StructField::nullable( "add", StructType::new([ - StructField::new("path", DataType::STRING, false), - StructField::new( + StructField::not_null("path", DataType::STRING), + StructField::not_null( "partitionValues", MapType::new(DataType::STRING, DataType::STRING, true), - false, ), - StructField::new("size", DataType::LONG, false), - StructField::new("modificationTime", DataType::LONG, false), - StructField::new("dataChange", DataType::BOOLEAN, false), - StructField::new("stats", DataType::STRING, true), - StructField::new( + StructField::not_null("size", DataType::LONG), + StructField::not_null("modificationTime", DataType::LONG), + StructField::not_null("dataChange", DataType::BOOLEAN), + StructField::nullable("stats", DataType::STRING), + StructField::nullable( "tags", MapType::new(DataType::STRING, DataType::STRING, false), - true, ), deletion_vector_field(), - StructField::new("baseRowId", DataType::LONG, true), - StructField::new("defaultRowCommitVersion", DataType::LONG, true), - StructField::new("clusteringProvider", DataType::STRING, true), + StructField::nullable("baseRowId", DataType::LONG), + StructField::nullable("defaultRowCommitVersion", DataType::LONG), + StructField::nullable("clusteringProvider", DataType::STRING), ]), - true, )])); assert_eq!(schema, expected); } fn tags_field() -> StructField { - StructField::new( + StructField::nullable( "tags", MapType::new(DataType::STRING, DataType::STRING, false), - true, ) } fn partition_values_field() -> StructField { - StructField::new( + StructField::nullable( "partitionValues", MapType::new(DataType::STRING, DataType::STRING, false), - true, ) } fn deletion_vector_field() -> StructField { - 
StructField::new( + StructField::nullable( "deletionVector", DataType::struct_type([ - StructField::new("storageType", DataType::STRING, false), - StructField::new("pathOrInlineDv", DataType::STRING, false), - StructField::new("offset", DataType::INTEGER, true), - StructField::new("sizeInBytes", DataType::INTEGER, false), - StructField::new("cardinality", DataType::LONG, false), + StructField::not_null("storageType", DataType::STRING), + StructField::not_null("pathOrInlineDv", DataType::STRING), + StructField::nullable("offset", DataType::INTEGER), + StructField::not_null("sizeInBytes", DataType::INTEGER), + StructField::not_null("cardinality", DataType::LONG), ]), - true, ) } @@ -624,21 +615,20 @@ mod tests { let schema = get_log_schema() .project(&[REMOVE_NAME]) .expect("Couldn't get remove field"); - let expected = Arc::new(StructType::new([StructField::new( + let expected = Arc::new(StructType::new([StructField::nullable( "remove", StructType::new([ - StructField::new("path", DataType::STRING, false), - StructField::new("deletionTimestamp", DataType::LONG, true), - StructField::new("dataChange", DataType::BOOLEAN, false), - StructField::new("extendedFileMetadata", DataType::BOOLEAN, true), + StructField::not_null("path", DataType::STRING), + StructField::nullable("deletionTimestamp", DataType::LONG), + StructField::not_null("dataChange", DataType::BOOLEAN), + StructField::nullable("extendedFileMetadata", DataType::BOOLEAN), partition_values_field(), - StructField::new("size", DataType::LONG, true), + StructField::nullable("size", DataType::LONG), tags_field(), deletion_vector_field(), - StructField::new("baseRowId", DataType::LONG, true), - StructField::new("defaultRowCommitVersion", DataType::LONG, true), + StructField::nullable("baseRowId", DataType::LONG), + StructField::nullable("defaultRowCommitVersion", DataType::LONG), ]), - true, )])); assert_eq!(schema, expected); } @@ -648,20 +638,18 @@ mod tests { let schema = get_log_schema() .project(&[CDC_NAME]) .expect("Couldn't get remove field"); - let expected = Arc::new(StructType::new([StructField::new( + let expected = Arc::new(StructType::new([StructField::nullable( "cdc", StructType::new([ - StructField::new("path", DataType::STRING, false), - StructField::new( + StructField::not_null("path", DataType::STRING), + StructField::not_null( "partitionValues", MapType::new(DataType::STRING, DataType::STRING, true), - false, ), - StructField::new("size", DataType::LONG, false), - StructField::new("dataChange", DataType::BOOLEAN, false), + StructField::not_null("size", DataType::LONG), + StructField::not_null("dataChange", DataType::BOOLEAN), tags_field(), ]), - true, )])); assert_eq!(schema, expected); } @@ -672,14 +660,13 @@ mod tests { .project(&["txn"]) .expect("Couldn't get transaction field"); - let expected = Arc::new(StructType::new([StructField::new( + let expected = Arc::new(StructType::new([StructField::nullable( "txn", StructType::new([ - StructField::new("appId", DataType::STRING, false), - StructField::new("version", DataType::LONG, false), - StructField::new("lastUpdated", DataType::LONG, true), + StructField::not_null("appId", DataType::STRING), + StructField::not_null("version", DataType::LONG), + StructField::nullable("lastUpdated", DataType::LONG), ]), - true, )])); assert_eq!(schema, expected); } @@ -690,24 +677,22 @@ mod tests { .project(&["commitInfo"]) .expect("Couldn't get commitInfo field"); - let expected = Arc::new(StructType::new(vec![StructField::new( + let expected = 
Arc::new(StructType::new(vec![StructField::nullable( "commitInfo", StructType::new(vec![ - StructField::new("timestamp", DataType::LONG, true), - StructField::new("operation", DataType::STRING, true), - StructField::new( + StructField::nullable("timestamp", DataType::LONG), + StructField::nullable("inCommitTimestamp", DataType::LONG), + StructField::nullable("operation", DataType::STRING), + StructField::nullable( "operationParameters", MapType::new(DataType::STRING, DataType::STRING, false), - true, ), - StructField::new("kernelVersion", DataType::STRING, true), - StructField::new( + StructField::nullable("kernelVersion", DataType::STRING), + StructField::nullable( "engineCommitInfo", MapType::new(DataType::STRING, DataType::STRING, false), - true, ), ]), - true, )])); assert_eq!(schema, expected); } diff --git a/kernel/src/actions/schemas.rs b/kernel/src/actions/schemas.rs index 0588c04d4..aa3b3e47b 100644 --- a/kernel/src/actions/schemas.rs +++ b/kernel/src/actions/schemas.rs @@ -84,20 +84,20 @@ pub(crate) trait GetNullableContainerStructField { // nullable values impl GetNullableContainerStructField for T { fn get_nullable_container_struct_field(name: impl Into) -> StructField { - StructField::new(name, T::to_nullable_container_type(), false) + StructField::not_null(name, T::to_nullable_container_type()) } } // Normal types produce non-nullable fields impl GetStructField for T { fn get_struct_field(name: impl Into) -> StructField { - StructField::new(name, T::to_data_type(), false) + StructField::not_null(name, T::to_data_type()) } } // Option types produce nullable fields impl GetStructField for Option { fn get_struct_field(name: impl Into) -> StructField { - StructField::new(name, T::to_data_type(), true) + StructField::nullable(name, T::to_data_type()) } } diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index 7d4be1a82..957befe80 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -267,7 +267,8 @@ impl RemoveVisitor { let extended_file_metadata: Option = getters[3].get_opt(row_index, "remove.extendedFileMetadata")?; - // TODO(nick) handle partition values in getters[4] + let partition_values: Option> = + getters[4].get_opt(row_index, "remove.partitionValues")?; let size: Option = getters[5].get_opt(row_index, "remove.size")?; @@ -284,7 +285,7 @@ impl RemoveVisitor { data_change, deletion_timestamp, extended_file_metadata, - partition_values: None, + partition_values, size, tags: None, deletion_vector, @@ -305,10 +306,9 @@ impl RowVisitor for RemoveVisitor { } fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> { for i in 0..row_count { - // Since path column is required, use it to detect presence of an Remove action + // Since path column is required, use it to detect presence of a Remove action if let Some(path) = getters[0].get_opt(i, "remove.path")? { self.removes.push(Self::visit_remove(i, path, getters)?); - break; } } Ok(()) @@ -367,7 +367,7 @@ impl RowVisitor for CdcVisitor { pub type SetTransactionMap = HashMap; -/// Extact application transaction actions from the log into a map +/// Extract application transaction actions from the log into a map /// /// This visitor maintains the first entry for each application id it /// encounters. 
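Stepping back from this hunk: the mechanical change running through the schema code above and below swaps the boolean nullability argument of `StructField::new` for the self-describing `not_null`/`nullable` constructors. A small sketch of the intended equivalence, assuming the three-argument `new` remains available alongside the helpers:

```rust
use delta_kernel::schema::{DataType, StructField};

fn main() {
    // The convenience constructors are shorthand for the old three-argument form.
    assert_eq!(
        StructField::not_null("id", DataType::STRING),
        StructField::new("id", DataType::STRING, false)
    );
    assert_eq!(
        StructField::nullable("name", DataType::STRING),
        StructField::new("name", DataType::STRING, true)
    );
}
```

Either form yields the same field; the helpers simply make nullability explicit at the call site instead of hiding it behind a bare `true`/`false`.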
When a specific application id is required then @@ -603,11 +603,7 @@ mod tests { modification_time: 1670892998135, data_change: true, stats: Some("{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}".into()), - tags: None, - deletion_vector: None, - base_row_id: None, - default_row_commit_version: None, - clustering_provider: None, + ..Default::default() }; let add2 = Add { path: "c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet".into(), @@ -630,11 +626,51 @@ mod tests { ..add1.clone() }; let expected = vec![add1, add2, add3]; + assert_eq!(add_visitor.adds.len(), expected.len()); for (add, expected) in add_visitor.adds.into_iter().zip(expected.into_iter()) { assert_eq!(add, expected); } } + #[test] + fn test_parse_remove_partitioned() { + let engine = SyncEngine::new(); + let json_handler = engine.get_json_handler(); + let json_strings: StringArray = vec![ + r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#, + r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#, + r#"{"remove":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","deletionTimestamp":1670892998135,"dataChange":true,"partitionValues":{"c1":"4","c2":"c"},"size":452}}"#, + ] + .into(); + let output_schema = get_log_schema().clone(); + let batch = json_handler + .parse_json(string_array_to_engine_data(json_strings), output_schema) + .unwrap(); + let mut remove_visitor = RemoveVisitor::default(); + remove_visitor.visit_rows_of(batch.as_ref()).unwrap(); + let expected_remove = Remove { + path: "c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet" + .into(), + deletion_timestamp: Some(1670892998135), + data_change: true, + partition_values: Some(HashMap::from([ + ("c1".to_string(), "4".to_string()), + ("c2".to_string(), "c".to_string()), + ])), + size: Some(452), + ..Default::default() + }; + assert_eq!( + remove_visitor.removes.len(), + 1, + "Unexpected number of remove actions" + ); + assert_eq!( + remove_visitor.removes[0], expected_remove, + "Unexpected remove action" + ); + } + #[test] fn test_parse_txn() { let engine = SyncEngine::new(); diff --git a/kernel/src/engine/arrow_conversion.rs b/kernel/src/engine/arrow_conversion.rs index fbfdb487a..0b905ff3a 100644 --- a/kernel/src/engine/arrow_conversion.rs +++ b/kernel/src/engine/arrow_conversion.rs @@ -263,8 +263,7 @@ mod tests { fn test_metadata_string_conversion() -> DeltaResult<()> { let mut metadata = HashMap::new(); metadata.insert("description", "hello world".to_owned()); - let struct_field = - StructField::new("name", DataType::STRING, false).with_metadata(metadata); + let struct_field = StructField::not_null("name", DataType::STRING).with_metadata(metadata); let arrow_field = ArrowField::try_from(&struct_field)?; let new_metadata = arrow_field.metadata(); diff --git a/kernel/src/engine/arrow_utils.rs b/kernel/src/engine/arrow_utils.rs index 4700b72c0..06441b9d4 100644 --- a/kernel/src/engine/arrow_utils.rs +++ b/kernel/src/engine/arrow_utils.rs @@ -55,9 +55,9 @@ macro_rules! 
prim_array_cmp { pub(crate) use prim_array_cmp; -/// Get the indicies in `parquet_schema` of the specified columns in `requested_schema`. This -/// returns a tuples of (mask_indicies: Vec, reorder_indicies: -/// Vec). `mask_indicies` is used for generating the mask for reading from the +/// Get the indices in `parquet_schema` of the specified columns in `requested_schema`. This +/// returns a tuples of (mask_indices: Vec, reorder_indices: +/// Vec). `mask_indices` is used for generating the mask for reading from the pub(crate) fn make_arrow_error(s: impl Into) -> Error { Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(s.into())).with_backtrace() } @@ -763,9 +763,9 @@ mod tests { #[test] fn simple_mask_indices() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new("s", DataType::STRING, true), - StructField::new("i2", DataType::INTEGER, true), + StructField::not_null("i", DataType::INTEGER), + StructField::nullable("s", DataType::STRING), + StructField::nullable("i2", DataType::INTEGER), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", ArrowDataType::Int32, false), @@ -787,8 +787,8 @@ mod tests { #[test] fn ensure_data_types_fails_correctly() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new("s", DataType::INTEGER, true), + StructField::not_null("i", DataType::INTEGER), + StructField::nullable("s", DataType::INTEGER), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", ArrowDataType::Int32, false), @@ -798,8 +798,8 @@ mod tests { assert!(res.is_err()); let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new("s", DataType::STRING, true), + StructField::not_null("i", DataType::INTEGER), + StructField::nullable("s", DataType::STRING), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", ArrowDataType::Int32, false), @@ -811,10 +811,9 @@ mod tests { #[test] fn mask_with_map() { - let requested_schema = Arc::new(StructType::new([StructField::new( + let requested_schema = Arc::new(StructType::new([StructField::not_null( "map", MapType::new(DataType::INTEGER, DataType::STRING, false), - false, )])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new_map( "map", @@ -835,9 +834,9 @@ mod tests { #[test] fn simple_reorder_indices() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new("s", DataType::STRING, true), - StructField::new("i2", DataType::INTEGER, true), + StructField::not_null("i", DataType::INTEGER), + StructField::nullable("s", DataType::STRING), + StructField::nullable("i2", DataType::INTEGER), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i2", ArrowDataType::Int32, true), @@ -859,9 +858,9 @@ mod tests { #[test] fn simple_nullable_field_missing() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new("s", DataType::STRING, true), - StructField::new("i2", DataType::INTEGER, true), + StructField::not_null("i", DataType::INTEGER), + StructField::nullable("s", DataType::STRING), + StructField::nullable("i2", DataType::INTEGER), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", ArrowDataType::Int32, false), @@ -882,16 +881,15 @@ mod tests { #[test] fn nested_indices() { let requested_schema = 
Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new( + StructField::not_null("i", DataType::INTEGER), + StructField::not_null( "nested", StructType::new([ - StructField::new("int32", DataType::INTEGER, false), - StructField::new("string", DataType::STRING, false), + StructField::not_null("int32", DataType::INTEGER), + StructField::not_null("string", DataType::STRING), ]), - false, ), - StructField::new("j", DataType::INTEGER, false), + StructField::not_null("j", DataType::INTEGER), ])); let parquet_schema = nested_parquet_schema(); let (mask_indices, reorder_indices) = @@ -912,16 +910,15 @@ mod tests { #[test] fn nested_indices_reorder() { let requested_schema = Arc::new(StructType::new([ - StructField::new( + StructField::not_null( "nested", StructType::new([ - StructField::new("string", DataType::STRING, false), - StructField::new("int32", DataType::INTEGER, false), + StructField::not_null("string", DataType::STRING), + StructField::not_null("int32", DataType::INTEGER), ]), - false, ), - StructField::new("j", DataType::INTEGER, false), - StructField::new("i", DataType::INTEGER, false), + StructField::not_null("j", DataType::INTEGER), + StructField::not_null("i", DataType::INTEGER), ])); let parquet_schema = nested_parquet_schema(); let (mask_indices, reorder_indices) = @@ -942,13 +939,12 @@ mod tests { #[test] fn nested_indices_mask_inner() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new( + StructField::not_null("i", DataType::INTEGER), + StructField::not_null( "nested", - StructType::new([StructField::new("int32", DataType::INTEGER, false)]), - false, + StructType::new([StructField::not_null("int32", DataType::INTEGER)]), ), - StructField::new("j", DataType::INTEGER, false), + StructField::not_null("j", DataType::INTEGER), ])); let parquet_schema = nested_parquet_schema(); let (mask_indices, reorder_indices) = @@ -966,9 +962,9 @@ mod tests { #[test] fn simple_list_mask() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new("list", ArrayType::new(DataType::INTEGER, false), false), - StructField::new("j", DataType::INTEGER, false), + StructField::not_null("i", DataType::INTEGER), + StructField::not_null("list", ArrayType::new(DataType::INTEGER, false)), + StructField::not_null("j", DataType::INTEGER), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", ArrowDataType::Int32, false), @@ -997,10 +993,9 @@ mod tests { #[test] fn list_skip_earlier_element() { - let requested_schema = Arc::new(StructType::new([StructField::new( + let requested_schema = Arc::new(StructType::new([StructField::not_null( "list", ArrayType::new(DataType::INTEGER, false), - false, )])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", ArrowDataType::Int32, false), @@ -1025,20 +1020,19 @@ mod tests { #[test] fn nested_indices_list() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new( + StructField::not_null("i", DataType::INTEGER), + StructField::not_null( "list", ArrayType::new( StructType::new([ - StructField::new("int32", DataType::INTEGER, false), - StructField::new("string", DataType::STRING, false), + StructField::not_null("int32", DataType::INTEGER), + StructField::not_null("string", DataType::STRING), ]) .into(), false, ), - false, ), - StructField::new("j", DataType::INTEGER, false), + 
StructField::not_null("j", DataType::INTEGER), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", ArrowDataType::Int32, false), @@ -1077,8 +1071,8 @@ mod tests { #[test] fn nested_indices_unselected_list() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new("j", DataType::INTEGER, false), + StructField::not_null("i", DataType::INTEGER), + StructField::not_null("j", DataType::INTEGER), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", ArrowDataType::Int32, false), @@ -1110,16 +1104,15 @@ mod tests { #[test] fn nested_indices_list_mask_inner() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new( + StructField::not_null("i", DataType::INTEGER), + StructField::not_null( "list", ArrayType::new( - StructType::new([StructField::new("int32", DataType::INTEGER, false)]).into(), + StructType::new([StructField::not_null("int32", DataType::INTEGER)]).into(), false, ), - false, ), - StructField::new("j", DataType::INTEGER, false), + StructField::not_null("j", DataType::INTEGER), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", ArrowDataType::Int32, false), @@ -1155,20 +1148,19 @@ mod tests { #[test] fn nested_indices_list_mask_inner_reorder() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new( + StructField::not_null("i", DataType::INTEGER), + StructField::not_null( "list", ArrayType::new( StructType::new([ - StructField::new("string", DataType::STRING, false), - StructField::new("int2", DataType::INTEGER, false), + StructField::not_null("string", DataType::STRING), + StructField::not_null("int2", DataType::INTEGER), ]) .into(), false, ), - false, ), - StructField::new("j", DataType::INTEGER, false), + StructField::not_null("j", DataType::INTEGER), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", ArrowDataType::Int32, false), // field 0 @@ -1208,16 +1200,15 @@ mod tests { #[test] fn skipped_struct() { let requested_schema = Arc::new(StructType::new([ - StructField::new("i", DataType::INTEGER, false), - StructField::new( + StructField::not_null("i", DataType::INTEGER), + StructField::not_null( "nested", StructType::new([ - StructField::new("int32", DataType::INTEGER, false), - StructField::new("string", DataType::STRING, false), + StructField::not_null("int32", DataType::INTEGER), + StructField::not_null("string", DataType::STRING), ]), - false, ), - StructField::new("j", DataType::INTEGER, false), + StructField::not_null("j", DataType::INTEGER), ])); let parquet_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new( @@ -1386,8 +1377,8 @@ mod tests { #[test] fn no_matches() { let requested_schema = Arc::new(StructType::new([ - StructField::new("s", DataType::STRING, true), - StructField::new("i2", DataType::INTEGER, true), + StructField::nullable("s", DataType::STRING), + StructField::nullable("i2", DataType::INTEGER), ])); let nots_field = ArrowField::new("NOTs", ArrowDataType::Utf8, true); let noti2_field = ArrowField::new("NOTi2", ArrowDataType::Int32, true); diff --git a/kernel/src/engine/default/json.rs b/kernel/src/engine/default/json.rs index 1912a7b34..ab296e12a 100644 --- a/kernel/src/engine/default/json.rs +++ b/kernel/src/engine/default/json.rs @@ -29,7 +29,7 @@ pub struct DefaultJsonHandler { store: Arc, /// The executor to run async tasks on task_executor: Arc, - 
/// The maximun number of batches to read ahead + /// The maximum number of batches to read ahead readahead: usize, /// The number of rows to read per batch batch_size: usize, diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index 1acc4ef4a..a65d329a2 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -258,7 +258,7 @@ impl FileOpener for ParquetOpener { let mut reader = ParquetObjectReader::new(store, meta); let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default()).await?; let parquet_schema = metadata.schema(); - let (indicies, requested_ordering) = + let (indices, requested_ordering) = get_requested_indices(&table_schema, parquet_schema)?; let options = ArrowReaderOptions::new(); //.with_page_index(enable_page_index); let mut builder = @@ -267,7 +267,7 @@ impl FileOpener for ParquetOpener { &table_schema, parquet_schema, builder.parquet_schema(), - &indicies, + &indices, ) { builder = builder.with_projection(mask) } @@ -330,7 +330,7 @@ impl FileOpener for PresignedUrlOpener { let reader = client.get(file_meta.location).send().await?.bytes().await?; let metadata = ArrowReaderMetadata::load(&reader, Default::default())?; let parquet_schema = metadata.schema(); - let (indicies, requested_ordering) = + let (indices, requested_ordering) = get_requested_indices(&table_schema, parquet_schema)?; let options = ArrowReaderOptions::new(); @@ -340,7 +340,7 @@ impl FileOpener for PresignedUrlOpener { &table_schema, parquet_schema, builder.parquet_schema(), - &indicies, + &indices, ) { builder = builder.with_projection(mask) } diff --git a/kernel/src/engine/ensure_data_types.rs b/kernel/src/engine/ensure_data_types.rs index 9b7ea7819..88ff01626 100644 --- a/kernel/src/engine/ensure_data_types.rs +++ b/kernel/src/engine/ensure_data_types.rs @@ -400,36 +400,33 @@ mod tests { #[test] fn ensure_struct() { - let schema = DataType::struct_type([StructField::new( + let schema = DataType::struct_type([StructField::nullable( "a", ArrayType::new( DataType::struct_type([ - StructField::new("w", DataType::LONG, true), - StructField::new("x", ArrayType::new(DataType::LONG, true), true), - StructField::new( + StructField::nullable("w", DataType::LONG), + StructField::nullable("x", ArrayType::new(DataType::LONG, true)), + StructField::nullable( "y", MapType::new(DataType::LONG, DataType::STRING, true), - true, ), - StructField::new( + StructField::nullable( "z", DataType::struct_type([ - StructField::new("n", DataType::LONG, true), - StructField::new("m", DataType::STRING, true), + StructField::nullable("n", DataType::LONG), + StructField::nullable("m", DataType::STRING), ]), - true, ), ]), true, ), - true, )]); let arrow_struct: ArrowDataType = (&schema).try_into().unwrap(); assert!(ensure_data_types(&schema, &arrow_struct, true).is_ok()); let kernel_simple = DataType::struct_type([ - StructField::new("w", DataType::LONG, true), - StructField::new("x", DataType::LONG, true), + StructField::nullable("w", DataType::LONG), + StructField::nullable("x", DataType::LONG), ]); let arrow_simple_ok = ArrowField::new_struct( diff --git a/kernel/src/engine/parquet_row_group_skipping.rs b/kernel/src/engine/parquet_row_group_skipping.rs index 20eea0acb..0adae6c4b 100644 --- a/kernel/src/engine/parquet_row_group_skipping.rs +++ b/kernel/src/engine/parquet_row_group_skipping.rs @@ -1,8 +1,6 @@ //! An implementation of parquet row group skipping using data skipping predicates over footer stats. 
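One note ahead of the row-group-skipping hunk below: the change to `RowGroupFilter::apply` keeps the existing decision rule while routing it through `eval_sql_where`. The rule is worth spelling out, because `None` ("can't tell", e.g. missing footer stats) must never cause a skip. A standalone sketch of that rule (names here are illustrative, not kernel API):

```rust
// A row group is read unless the predicate evaluates to a definitive FALSE.
fn should_read_row_group(predicate_result: Option<bool>) -> bool {
    predicate_result != Some(false)
}

fn main() {
    assert!(should_read_row_group(Some(true)));   // predicate may match: read
    assert!(should_read_row_group(None));         // unknown (missing stats): read
    assert!(!should_read_row_group(Some(false))); // provably no match: skip
}
```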
-use crate::predicates::parquet_stats_skipping::{ - ParquetStatsProvider, ParquetStatsSkippingFilter as _, -}; use crate::expressions::{ColumnName, Expression, Scalar, UnaryExpression, BinaryExpression, VariadicExpression}; +use crate::predicates::parquet_stats_skipping::ParquetStatsProvider; use crate::schema::{DataType, PrimitiveType}; use chrono::{DateTime, Days}; use parquet::arrow::arrow_reader::ArrowReaderBuilder; @@ -57,6 +55,7 @@ impl<'a> RowGroupFilter<'a> { /// Applies a filtering predicate to a row group. Return value false means to skip it. fn apply(row_group: &'a RowGroupMetaData, predicate: &Expression) -> bool { + use crate::predicates::PredicateEvaluator as _; RowGroupFilter::new(row_group, predicate).eval_sql_where(predicate) != Some(false) } diff --git a/kernel/src/engine/sync/parquet.rs b/kernel/src/engine/sync/parquet.rs index 2a54e2e86..260ef321b 100644 --- a/kernel/src/engine/sync/parquet.rs +++ b/kernel/src/engine/sync/parquet.rs @@ -21,9 +21,8 @@ fn try_create_from_parquet( let metadata = ArrowReaderMetadata::load(&file, Default::default())?; let parquet_schema = metadata.schema(); let mut builder = ParquetRecordBatchReaderBuilder::try_new(file)?; - let (indicies, requested_ordering) = get_requested_indices(&schema, parquet_schema)?; - if let Some(mask) = generate_mask(&schema, parquet_schema, builder.parquet_schema(), &indicies) - { + let (indices, requested_ordering) = get_requested_indices(&schema, parquet_schema)?; + if let Some(mask) = generate_mask(&schema, parquet_schema, builder.parquet_schema(), &indices) { builder = builder.with_projection(mask); } if let Some(predicate) = predicate { diff --git a/kernel/src/engine_data.rs b/kernel/src/engine_data.rs index e421d0ad6..25a7e84bd 100644 --- a/kernel/src/engine_data.rs +++ b/kernel/src/engine_data.rs @@ -199,7 +199,7 @@ pub trait RowVisitor { /// "getter" of type [`GetData`] will be present. This can be used to actually get at the data /// for each row. You can `use` the `TypedGetData` trait if you want to have a way to extract /// typed data that will fail if the "getter" is for an unexpected type. The data in `getters` - /// does not outlive the call to this funtion (i.e. it should be copied if needed). + /// does not outlive the call to this function (i.e. it should be copied if needed). fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()>; /// Visit the rows of an [`EngineData`], selecting the leaf column names given by diff --git a/kernel/src/error.rs b/kernel/src/error.rs index e3230aeb9..815ef3e51 100644 --- a/kernel/src/error.rs +++ b/kernel/src/error.rs @@ -1,4 +1,4 @@ -//! Defintions of errors that the delta kernel can encounter +//! Definitions of errors that the delta kernel can encounter use std::{ backtrace::{Backtrace, BacktraceStatus}, @@ -58,7 +58,7 @@ pub enum Error { #[error("Internal error {0}. 
This is a kernel bug, please report.")] InternalError(String), - /// An error enountered while working with parquet data + /// An error encountered while working with parquet data #[cfg(feature = "parquet")] #[error("Arrow error: {0}")] Parquet(#[from] parquet::errors::ParquetError), @@ -99,7 +99,7 @@ pub enum Error { #[error("No table version found.")] MissingVersion, - /// An error occured while working with deletion vectors + /// An error occurred while working with deletion vectors #[error("Deletion Vector error: {0}")] DeletionVector(String), diff --git a/kernel/src/expressions/mod.rs b/kernel/src/expressions/mod.rs index bad20aea4..47d35afa1 100644 --- a/kernel/src/expressions/mod.rs +++ b/kernel/src/expressions/mod.rs @@ -47,6 +47,17 @@ pub enum BinaryOperator { } impl BinaryOperator { + /// True if this is a comparison for which NULL input always produces NULL output + pub(crate) fn is_null_intolerant_comparison(&self) -> bool { + use BinaryOperator::*; + match self { + Plus | Minus | Multiply | Divide => false, // not a comparison + LessThan | LessThanOrEqual | GreaterThan | GreaterThanOrEqual => true, + Equal | NotEqual => true, + Distinct | In | NotIn => false, // tolerates NULL input + } + } + /// Returns `` (if any) such that `B A` is equivalent to `A B`. pub(crate) fn commute(&self) -> Option { use BinaryOperator::*; @@ -737,7 +748,7 @@ mod tests { ), ]); - // Similer to ExpressionDepthChecker::check, but also returns call count + // Similar to ExpressionDepthChecker::check, but also returns call count let check_with_call_count = |depth_limit| ExpressionDepthChecker::check_with_call_count(&expr, depth_limit); diff --git a/kernel/src/expressions/scalars.rs b/kernel/src/expressions/scalars.rs index 5283c08c5..2ce2fd41a 100644 --- a/kernel/src/expressions/scalars.rs +++ b/kernel/src/expressions/scalars.rs @@ -393,7 +393,7 @@ impl PrimitiveType { // Timestamps may additionally be encoded as a ISO 8601 formatted string such as // `1970-01-01T00:00:00.123456Z`. // - // The difference arrises mostly in how they are to be handled on the engine side - i.e. timestampNTZ + // The difference arises mostly in how they are to be handled on the engine side - i.e. timestampNTZ // is not adjusted to UTC, this is just so we can (de-)serialize it as a date sting. // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization TimestampNtz | Timestamp => { diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index f27907bcd..49dceea75 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -202,7 +202,7 @@ impl FileMeta { /// let b: Arc = a.downcast().unwrap(); /// ``` /// -/// In contrast, very similer code that relies only on `Any` would fail to compile: +/// In contrast, very similar code that relies only on `Any` would fail to compile: /// /// ```fail_compile /// # use std::any::Any; @@ -371,8 +371,20 @@ pub trait JsonHandler: AsAny { output_schema: SchemaRef, ) -> DeltaResult>; - /// Read and parse the JSON format file at given locations and return - /// the data as EngineData with the columns requested by physical schema. + /// Read and parse the JSON format file at given locations and return the data as EngineData with + /// the columns requested by physical schema. Note: The [`FileDataReadResultIterator`] must emit + /// data from files in the order that `files` is given. For example if files ["a", "b"] is provided, + /// then the engine data iterator must first return all the engine data from file "a", _then_ all + /// the engine data from file "b". 
Moreover, for a given file, all of its [`EngineData`] and + /// constituent rows must be in order that they occur in the file. Consider a file with rows + /// (1, 2, 3). The following are legal iterator batches: + /// iter: [EngineData(1, 2), EngineData(3)] + /// iter: [EngineData(1), EngineData(2, 3)] + /// iter: [EngineData(1, 2, 3)] + /// The following are illegal batches: + /// iter: [EngineData(3), EngineData(1, 2)] + /// iter: [EngineData(1), EngineData(3, 2)] + /// iter: [EngineData(2, 1, 3)] /// /// # Parameters /// @@ -404,7 +416,7 @@ pub trait JsonHandler: AsAny { /// /// - `path` - URL specifying the location to write the JSON file /// - `data` - Iterator of EngineData to write to the JSON file. Each row should be written as - /// a new JSON object appended to the file. (that is, the file is newline-delimeted JSON, and + /// a new JSON object appended to the file. (that is, the file is newline-delimited JSON, and /// each row is a JSON object on a single line) /// - `overwrite` - If true, overwrite the file if it exists. If false, the call must fail if /// the file exists. diff --git a/kernel/src/predicates/mod.rs b/kernel/src/predicates/mod.rs index f13ed9a3b..54004db6f 100644 --- a/kernel/src/predicates/mod.rs +++ b/kernel/src/predicates/mod.rs @@ -20,7 +20,21 @@ mod tests; /// /// Because inversion (`NOT` operator) has special semantics and can often be optimized away by /// pushing it down, most methods take an `inverted` flag. That allows operations like -/// [`UnaryOperator::Not`] to simply evaluate their operand with a flipped `inverted` flag, +/// [`UnaryOperator::Not`] to simply evaluate their operand with a flipped `inverted` flag, and +/// greatly simplifies the implementations of most operators (other than those which have to +/// directly implement NOT semantics, which are unavoidably complex in that regard). +/// +/// # Parameterized output type +/// +/// The types involved in predicate evaluation are parameterized and implementation-specific. For +/// example, [`crate::engine::parquet_stats_skipping::ParquetStatsProvider`] directly evaluates the +/// predicate over parquet footer stats and returns boolean results, while +/// [`crate::scan::data_skipping::DataSkippingPredicateCreator`] instead transforms the input +/// predicate expression to a data skipping predicate expresion that the engine can evaluated +/// directly against Delta data skipping stats during log replay. Although this approach is harder +/// to read and reason about at first, the majority of expressions can be implemented generically, +/// which greatly reduces redundancy and ensures that all flavors of predicate evaluation have the +/// same semantics. /// /// # NULL and error semantics /// @@ -44,6 +58,9 @@ mod tests; pub(crate) trait PredicateEvaluator { type Output; + /// A (possibly inverted) scalar NULL test, e.g. ` IS [NOT] NULL`. + fn eval_scalar_is_null(&self, val: &Scalar, inverted: bool) -> Option; + /// A (possibly inverted) boolean scalar value, e.g. `[NOT] `. fn eval_scalar(&self, val: &Scalar, inverted: bool) -> Option; @@ -123,14 +140,19 @@ pub(crate) trait PredicateEvaluator { fn eval_unary(&self, op: UnaryOperator, expr: &Expr, inverted: bool) -> Option { match op { UnaryOperator::Not => self.eval_expr(expr, !inverted), - UnaryOperator::IsNull => { - // Data skipping only supports IS [NOT] NULL over columns (not expressions) - let Expr::Column(col) = expr else { + UnaryOperator::IsNull => match expr { + // WARNING: Only literals and columns can be safely null-checked. 
Attempting to + // null-check an expressions such as `a < 10` could wrongly produce FALSE in case + // `a` is just plain missing (rather than known to be NULL. A missing-value can + // arise e.g. if data skipping encounters a column with missing stats, or if + // partition pruning encounters a non-partition column. + Expr::Literal(val) => self.eval_scalar_is_null(val, inverted), + Expr::Column(col) => self.eval_is_null(col, inverted), + _ => { debug!("Unsupported operand: IS [NOT] NULL: {expr:?}"); - return None; - }; - self.eval_is_null(col, inverted) - } + None + } + }, } } @@ -229,12 +251,137 @@ pub(crate) trait PredicateEvaluator { Variadic(VariadicExpression { op, exprs }) => self.eval_variadic(*op, exprs, inverted), } } + + /// Evaluates a predicate with SQL WHERE semantics. + /// + /// By default, [`eval_expr`] behaves badly for comparisons involving NULL columns (e.g. `a < + /// 10` when `a` is NULL), because the comparison correctly evaluates to NULL, but NULL + /// expressions are interpreted as "stats missing" (= cannot skip). This ambiguity can "poison" + /// the entire expression, causing it to return NULL instead of FALSE that would allow skipping: + /// + /// ```text + /// WHERE a < 10 -- NULL (can't skip file) + /// WHERE a < 10 AND TRUE -- NULL (can't skip file) + /// WHERE a < 10 OR FALSE -- NULL (can't skip file) + /// ``` + /// + /// Meanwhile, SQL WHERE semantics only keeps rows for which the filter evaluates to + /// TRUE (discarding rows that evaluate to FALSE or NULL): + /// + /// ```text + /// WHERE a < 10 -- NULL (discard row) + /// WHERE a < 10 AND TRUE -- NULL (discard row) + /// WHERE a < 10 OR FALSE -- NULL (discard row) + /// ``` + /// + /// Conceptually, the behavior difference between data skipping and SQL WHERE semantics can be + /// addressed by evaluating with null-safe semantics, as if by ` IS NOT NULL AND `: + /// + /// ```text + /// WHERE (a < 10) IS NOT NULL AND (a < 10) -- FALSE (skip file) + /// WHERE (a < 10 AND TRUE) IS NOT NULL AND (a < 10 AND TRUE) -- FALSE (skip file) + /// WHERE (a < 10 OR FALSE) IS NOT NULL AND (a < 10 OR FALSE) -- FALSE (skip file) + /// ``` + /// + /// HOWEVER, we cannot safely NULL-check the result of an arbitrary data skipping predicate + /// because an expression will also produce NULL if the value is just plain missing (e.g. data + /// skipping over a column that lacks stats), and if that NULL should propagate all the way to + /// top-level, it would be wrongly interpreted as FALSE (= skippable). + /// + /// To prevent wrong data skipping, the predicate evaluator always returns NULL for a NULL check + /// over anything except for literals and columns with known values. 
So we must push the NULL + /// check down through supported operations (AND as well as null-intolerant comparisons like + /// `<`, `!=`, etc) until it reaches columns and literals where it can do some good, e.g.: + /// + /// ```text + /// WHERE a < 10 AND (b < 20 OR c < 30) + /// ``` + /// + /// would conceptually be interpreted as + /// + /// ```text + /// WHERE + /// (a < 10 AND (b < 20 OR c < 30)) IS NOT NULL AND + /// (a < 10 AND (b < 20 OR c < 30)) + /// ``` + /// + /// We then push the NULL check down through the top-level AND: + /// + /// ```text + /// WHERE + /// (a < 10 IS NOT NULL AND a < 10) AND + /// ((b < 20 OR c < 30) IS NOT NULL AND (b < 20 OR c < 30)) + /// ``` + /// + /// and attempt to push it further into the `a < 10` and `OR` clauses: + /// + /// ```text + /// WHERE + /// (a IS NOT NULL AND 10 IS NOT NULL AND a < 10) AND + /// (b < 20 OR c < 30) + /// ``` + /// + /// Any time the push-down reaches an operator that does not support push-down (such as OR), we + /// simply drop the NULL check. This way, the top-level NULL check only applies to + /// sub-expressions that can safely implement it, while ignoring other sub-expressions. The + /// unsupported sub-expressions could produce nulls at runtime that prevent skipping, but false + /// positives are OK -- the query will still correctly filter out the unwanted rows that result. + /// + /// At expression evaluation time, a NULL value of `a` (from our example) would evaluate as: + /// + /// ```text + /// AND(..., AND(a IS NOT NULL, 10 IS NOT NULL, a < 10), ...) + /// AND(..., AND(FALSE, TRUE, NULL), ...) + /// AND(..., FALSE, ...) + /// FALSE + /// ``` + /// + /// While a non-NULL value of `a` would instead evaluate as: + /// + /// ```text + /// AND(..., AND(a IS NOT NULL, 10 IS NOT NULL, a < 10), ...) + /// AND(..., AND(TRUE, TRUE, ), ...) + /// AND(..., , ...) + /// ``` + /// + /// And a missing value for `a` would safely disable the clause: + /// + /// ```text + /// AND(..., AND(a IS NOT NULL, 10 IS NOT NULL, a < 10), ...) + /// AND(..., AND(NULL, TRUE, NULL), ...) + /// AND(..., NULL, ...) + /// ``` + fn eval_sql_where(&self, filter: &Expr) -> Option { + use Expr::{Binary, Variadic}; + match filter { + Variadic(v) => { + // Recursively invoke `eval_sql_where` instead of the usual `eval_expr` for AND/OR. + let exprs = v.exprs.iter().map(|expr| self.eval_sql_where(expr)); + self.finish_eval_variadic(v.op, exprs, false) + } + Binary(BinaryExpression { op, left, right }) if op.is_null_intolerant_comparison() => { + // Perform a nullsafe comparison instead of the usual `eval_binary` + let exprs = [ + self.eval_unary(UnaryOperator::IsNull, left, true), + self.eval_unary(UnaryOperator::IsNull, right, true), + self.eval_binary(*op, left, right, false), + ]; + self.finish_eval_variadic(VariadicOperator::And, exprs, false) + } + _ => self.eval_expr(filter, false), + } + } } /// A collection of provided methods from the [`PredicateEvaluator`] trait, factored out to allow -/// reuse by the different predicate evaluator implementations. +/// reuse by multiple bool-output predicate evaluator implementations. pub(crate) struct PredicateEvaluatorDefaults; impl PredicateEvaluatorDefaults { + /// Directly null-tests a scalar. See [`PredicateEvaluator::eval_scalar_is_null`]. + pub(crate) fn eval_scalar_is_null(val: &Scalar, inverted: bool) -> Option { + Some(val.is_null() != inverted) + } + /// Directly evaluates a boolean scalar. See [`PredicateEvaluator::eval_scalar`]. 
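To make the evaluation traces above concrete, here is a standalone sketch (plain Rust, not the kernel's `PredicateEvaluator`) that models the null-safe rewrite of `a < 10` using `Option<bool>` as a three-valued boolean, where `None` stands in for NULL / "unknown":

```rust
// Three-valued AND: FALSE dominates, otherwise any NULL makes the result NULL.
fn kleene_and(inputs: &[Option<bool>]) -> Option<bool> {
    if inputs.iter().any(|v| *v == Some(false)) {
        Some(false)
    } else if inputs.iter().any(|v| v.is_none()) {
        None
    } else {
        Some(true)
    }
}

fn main() {
    // `a` is known to be NULL: AND(a IS NOT NULL, 10 IS NOT NULL, a < 10) -> FALSE (skip).
    assert_eq!(kleene_and(&[Some(false), Some(true), None]), Some(false));

    // `a` has no stats at all: the clause collapses to NULL and cannot cause a skip.
    assert_eq!(kleene_and(&[None, Some(true), None]), None);

    // `a` is known and non-NULL: the result is just the comparison itself.
    assert_eq!(kleene_and(&[Some(true), Some(true), Some(true)]), Some(true));
}
```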
pub(crate) fn eval_scalar(val: &Scalar, inverted: bool) -> Option { match val { @@ -326,6 +473,14 @@ impl ResolveColumnAsScalar for UnimplementedColumnResolver { } } +// Used internally and by some tests +pub(crate) struct EmptyColumnResolver; +impl ResolveColumnAsScalar for EmptyColumnResolver { + fn resolve_column(&self, _col: &ColumnName) -> Option { + None + } +} + // In testing, it is convenient to just build a hashmap of scalar values. #[cfg(test)] impl ResolveColumnAsScalar for std::collections::HashMap { @@ -358,13 +513,17 @@ impl From for DefaultPredicateEvaluator PredicateEvaluator for DefaultPredicateEvaluator { type Output = bool; + fn eval_scalar_is_null(&self, val: &Scalar, inverted: bool) -> Option { + PredicateEvaluatorDefaults::eval_scalar_is_null(val, inverted) + } + fn eval_scalar(&self, val: &Scalar, inverted: bool) -> Option { PredicateEvaluatorDefaults::eval_scalar(val, inverted) } fn eval_is_null(&self, col: &ColumnName, inverted: bool) -> Option { let col = self.resolve_column(col)?; - Some(matches!(col, Scalar::Null(_)) != inverted) + self.eval_scalar_is_null(&col, inverted) } fn eval_lt(&self, col: &ColumnName, val: &Scalar) -> Option { @@ -428,12 +587,6 @@ impl PredicateEvaluator for DefaultPredicateEvaluator< /// example, comparisons involving a column are converted into comparisons over that column's /// min/max stats, and NULL checks are converted into comparisons involving the column's nullcount /// and rowcount stats. -/// -/// The types involved in these operations are parameterized and implementation-specific. For -/// example, [`crate::engine::parquet_stats_skipping::ParquetStatsProvider`] directly evaluates data -/// skipping expressions and returnss boolean results, while -/// [`crate::scan::data_skipping::DataSkippingPredicateCreator`] instead converts the input -/// predicate to a data skipping predicate that can be evaluated directly later. pub(crate) trait DataSkippingPredicateEvaluator { /// The output type produced by this expression evaluator type Output; @@ -454,6 +607,9 @@ pub(crate) trait DataSkippingPredicateEvaluator { /// Retrieves the row count of a column (parquet footers always include this stat). fn get_rowcount_stat(&self) -> Option; + /// See [`PredicateEvaluator::eval_scalar_is_null`] + fn eval_scalar_is_null(&self, val: &Scalar, inverted: bool) -> Option; + /// See [`PredicateEvaluator::eval_scalar`] fn eval_scalar(&self, val: &Scalar, inverted: bool) -> Option; @@ -589,6 +745,10 @@ pub(crate) trait DataSkippingPredicateEvaluator { impl PredicateEvaluator for T { type Output = T::Output; + fn eval_scalar_is_null(&self, val: &Scalar, inverted: bool) -> Option { + self.eval_scalar_is_null(val, inverted) + } + fn eval_scalar(&self, val: &Scalar, inverted: bool) -> Option { self.eval_scalar(val, inverted) } diff --git a/kernel/src/predicates/parquet_stats_skipping.rs b/kernel/src/predicates/parquet_stats_skipping.rs index a8c679d69..ff7536f40 100644 --- a/kernel/src/predicates/parquet_stats_skipping.rs +++ b/kernel/src/predicates/parquet_stats_skipping.rs @@ -1,11 +1,6 @@ //! An implementation of data skipping that leverages parquet stats from the file footer. 
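Editor's note: to make the shape of the `eval_sql_where` recursion added above easier to follow, here is a small, self-contained sketch. It uses a toy `Expr` enum, a `Row` map, and plain `Option<bool>` three-valued logic (None = NULL); none of these are the kernel's real types, and the helper names (`kleene`, `value`) are illustrative only. The point it demonstrates is the same as the doc comment: AND/OR recurse, null-intolerant comparisons get wrapped in injected IS-NOT-NULL checks, and everything else falls back to plain evaluation.

```rust
use std::collections::HashMap;

#[derive(Clone)]
enum Expr {
    Col(&'static str),
    Lit(Option<i64>),         // `None` models a NULL literal
    Lt(Box<Expr>, Box<Expr>), // a null-intolerant comparison
    And(Vec<Expr>),
    Or(Vec<Expr>),
}

type Row = HashMap<&'static str, Option<i64>>;

/// Resolve a column or literal to a (possibly NULL) value.
fn value(e: &Expr, row: &Row) -> Option<i64> {
    match e {
        Expr::Col(name) => row.get(name).copied().flatten(),
        Expr::Lit(v) => *v,
        _ => None,
    }
}

/// Kleene AND/OR over `Option<bool>` (None = NULL): the dominant value short-circuits.
fn kleene(is_and: bool, vals: impl Iterator<Item = Option<bool>>) -> Option<bool> {
    let mut saw_null = false;
    for v in vals {
        match v {
            Some(b) if b != is_and => return Some(b), // FALSE dominates AND, TRUE dominates OR
            None => saw_null = true,
            _ => {}
        }
    }
    if saw_null { None } else { Some(is_and) }
}

/// Plain three-valued evaluation: a NULL operand makes the comparison NULL.
fn eval(e: &Expr, row: &Row) -> Option<bool> {
    match e {
        Expr::Lt(a, b) => Some(value(a, row)? < value(b, row)?),
        Expr::And(es) => kleene(true, es.iter().map(|e| eval(e, row))),
        Expr::Or(es) => kleene(false, es.iter().map(|e| eval(e, row))),
        _ => None,
    }
}

/// SQL WHERE evaluation: recurse through AND/OR, wrap each null-intolerant comparison
/// in injected IS-NOT-NULL checks, and fall back to `eval` for anything else.
fn eval_sql_where(e: &Expr, row: &Row) -> Option<bool> {
    match e {
        Expr::And(es) => kleene(true, es.iter().map(|e| eval_sql_where(e, row))),
        Expr::Or(es) => kleene(false, es.iter().map(|e| eval_sql_where(e, row))),
        Expr::Lt(a, b) => kleene(
            true,
            [
                Some(value(a, row).is_some()), // a IS NOT NULL
                Some(value(b, row).is_some()), // b IS NOT NULL
                eval(e, row),                  // the comparison itself
            ]
            .into_iter(),
        ),
        _ => eval(e, row),
    }
}

fn main() {
    // WHERE a < 10, with `a` NULL: plain evaluation is NULL (can't skip), while the
    // SQL WHERE evaluation collapses to FALSE (safe to skip).
    let filter = Expr::Lt(Box::new(Expr::Col("a")), Box::new(Expr::Lit(Some(10))));
    let row: Row = HashMap::from([("a", None)]);
    assert_eq!(eval(&filter, &row), None);
    assert_eq!(eval_sql_where(&filter, &row), Some(false));

    // AND with a NULL leg: the comparison leg still drives the result to FALSE.
    let and_filter = Expr::And(vec![Expr::Lit(None), filter.clone()]);
    assert_eq!(eval_sql_where(&and_filter, &row), Some(false));

    // The recursion also visits each OR leg with the same semantics.
    let or_filter = Expr::Or(vec![filter.clone(), filter.clone()]);
    assert_eq!(eval_sql_where(&or_filter, &row), Some(false));
}
```

The real trait method delegates to `finish_eval_variadic`, `eval_unary`, and `eval_binary` so that both the boolean evaluator and the expression-rewriting data skipping evaluator can share this logic; the sketch above hard-codes the boolean case only.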
-use crate::expressions::{ - BinaryExpression, BinaryOperator, ColumnName, Expression as Expr, Scalar, UnaryOperator, - VariadicExpression, VariadicOperator, -}; -use crate::predicates::{ - DataSkippingPredicateEvaluator, PredicateEvaluator, PredicateEvaluatorDefaults, -}; +use crate::expressions::{BinaryOperator, ColumnName, Scalar, VariadicOperator}; +use crate::predicates::{DataSkippingPredicateEvaluator, PredicateEvaluatorDefaults}; use crate::schema::DataType; use std::cmp::Ordering; @@ -65,6 +60,10 @@ impl DataSkippingPredicateEvaluator for T { PredicateEvaluatorDefaults::partial_cmp_scalars(ord, &col, val, inverted) } + fn eval_scalar_is_null(&self, val: &Scalar, inverted: bool) -> Option { + PredicateEvaluatorDefaults::eval_scalar_is_null(val, inverted) + } + fn eval_scalar(&self, val: &Scalar, inverted: bool) -> Option { PredicateEvaluatorDefaults::eval_scalar(val, inverted) } @@ -96,109 +95,3 @@ impl DataSkippingPredicateEvaluator for T { PredicateEvaluatorDefaults::finish_eval_variadic(op, exprs, inverted) } } - -/// Data skipping based on parquet footer stats (e.g. row group skipping). The required methods -/// fetch stats values for requested columns (if available and with compatible types), and the -/// provided methods implement the actual skipping logic. -/// -/// NOTE: We are given a row-based filter, but stats-based predicate evaluation -- which applies to -/// a SET of rows -- has different semantics than row-based predicate evaluation. The provided -/// methods of this class convert various supported expressions into data skipping predicates, and -/// then return the result of evaluating the translated filter. -pub(crate) trait ParquetStatsSkippingFilter { - /// Attempts to filter using SQL WHERE semantics. - /// - /// By default, [`apply_expr`] can produce unwelcome behavior for comparisons involving all-NULL - /// columns (e.g. `a == 10`), because the (legitimately NULL) min/max stats are interpreted as - /// stats-missing that produces a NULL data skipping result). The resulting NULL can "poison" - /// the entire expression, causing it to return NULL instead of FALSE that would allow skipping. - /// - /// Meanwhile, SQL WHERE semantics only keep rows for which the filter evaluates to TRUE -- - /// effectively turning `` into the null-safe predicate `AND( IS NOT NULL, )`. - /// - /// We cannot safely evaluate an arbitrary data skipping expression with null-safe semantics - /// (because NULL could also mean missing-stats), but we CAN safely turn a column reference in a - /// comparison into a null-safe comparison, as long as the comparison's parent expressions are - /// all AND. To see why, consider a WHERE clause filter of the form: - /// - /// ```text - /// AND(..., a {cmp} b, ...) - /// ``` - /// - /// In order allow skipping based on the all-null `a` or `b`, we want to actually evaluate: - /// ```text - /// AND(..., AND(a IS NOT NULL, b IS NOT NULL, a {cmp} b), ...) - /// ``` - /// - /// This optimization relies on the fact that we only support IS [NOT] NULL skipping for - /// columns, and we only support skipping for comparisons between columns and literals. Thus, a - /// typical case such as: `AND(..., x < 10, ...)` would in the all-null case be evaluated as: - /// ```text - /// AND(..., AND(x IS NOT NULL, 10 IS NOT NULL, x < 10), ...) - /// AND(..., AND(FALSE, NULL, NULL), ...) - /// AND(..., FALSE, ...) 
- /// FALSE - /// ``` - /// - /// In the not all-null case, it would instead evaluate as: - /// ```text - /// AND(..., AND(x IS NOT NULL, 10 IS NOT NULL, x < 10), ...) - /// AND(..., AND(TRUE, NULL, ), ...) - /// ``` - /// - /// If the result was FALSE, it forces both inner and outer AND to FALSE, as desired. If the - /// result was TRUE or NULL, then it does not contribute to data skipping but also does not - /// block it if other legs of the AND evaluate to FALSE. - // TODO: If these are generally useful, we may want to move them into PredicateEvaluator? - fn eval_sql_where(&self, filter: &Expr) -> Option; - fn eval_binary_nullsafe(&self, op: BinaryOperator, left: &Expr, right: &Expr) -> Option; -} - -impl> ParquetStatsSkippingFilter for T { - fn eval_sql_where(&self, filter: &Expr) -> Option { - use Expr::{Binary, Variadic}; - match filter { - Variadic(VariadicExpression { - op: VariadicOperator::And, - exprs, - }) => { - let exprs: Vec<_> = exprs - .iter() - .map(|expr| self.eval_sql_where(expr)) - .map(|result| match result { - Some(value) => Expr::literal(value), - None => Expr::null_literal(DataType::BOOLEAN), - }) - .collect(); - self.eval_variadic(VariadicOperator::And, &exprs, false) - } - Binary(BinaryExpression { op, left, right }) => { - self.eval_binary_nullsafe(*op, left, right) - } - _ => self.eval_expr(filter, false), - } - } - - /// Helper method for [`apply_sql_where`], that evaluates `{a} {cmp} {b}` as - /// ```text - /// AND({a} IS NOT NULL, {b} IS NOT NULL, {a} {cmp} {b}) - /// ``` - /// - /// The null checks only apply to column expressions, so at least one of them will always be - /// NULL (since we don't support skipping over column-column comparisons). If any NULL check - /// fails (producing FALSE), it short-circuits the entire AND without ever evaluating the - /// comparison. Otherwise, the original comparison will run and -- if FALSE -- can cause data - /// skipping as usual. - fn eval_binary_nullsafe(&self, op: BinaryOperator, left: &Expr, right: &Expr) -> Option { - use UnaryOperator::IsNull; - // Convert `a {cmp} b` to `AND(a IS NOT NULL, b IS NOT NULL, a {cmp} b)`, - // and only evaluate the comparison if the null checks don't short circuit. 
- if let Some(false) = self.eval_unary(IsNull, left, true) { - return Some(false); - } - if let Some(false) = self.eval_unary(IsNull, right, true) { - return Some(false); - } - self.eval_binary(op, left, right, false) - } -} diff --git a/kernel/src/predicates/parquet_stats_skipping/tests.rs b/kernel/src/predicates/parquet_stats_skipping/tests.rs index b1de88e6b..949a4cb68 100644 --- a/kernel/src/predicates/parquet_stats_skipping/tests.rs +++ b/kernel/src/predicates/parquet_stats_skipping/tests.rs @@ -1,6 +1,6 @@ use super::*; use crate::expressions::{column_expr, Expression as Expr}; -use crate::predicates::PredicateEvaluator; +use crate::predicates::PredicateEvaluator as _; use crate::DataType; const TRUE: Option = Some(true); @@ -257,126 +257,3 @@ fn test_eval_is_null() { // all nulls do_test(2, &[TRUE, FALSE]); } - -struct AllNullTestFilter; -impl ParquetStatsProvider for AllNullTestFilter { - fn get_parquet_min_stat(&self, _col: &ColumnName, _data_type: &DataType) -> Option { - None - } - - fn get_parquet_max_stat(&self, _col: &ColumnName, _data_type: &DataType) -> Option { - None - } - - fn get_parquet_nullcount_stat(&self, _col: &ColumnName) -> Option { - Some(self.get_parquet_rowcount_stat()) - } - - fn get_parquet_rowcount_stat(&self) -> i64 { - 10 - } -} - -#[test] -fn test_sql_where() { - let col = &column_expr!("x"); - const VAL: Expr = Expr::Literal(Scalar::Integer(1)); - const NULL: Expr = Expr::Literal(Scalar::Null(DataType::BOOLEAN)); - const FALSE: Expr = Expr::Literal(Scalar::Boolean(false)); - const TRUE: Expr = Expr::Literal(Scalar::Boolean(true)); - - // Basic sanity checks - expect_eq!(AllNullTestFilter.eval_sql_where(&VAL), None, "WHERE {VAL}"); - expect_eq!(AllNullTestFilter.eval_sql_where(col), None, "WHERE {col}"); - expect_eq!( - AllNullTestFilter.eval_sql_where(&Expr::is_null(col.clone())), - Some(true), // No injected NULL checks - "WHERE {col} IS NULL" - ); - expect_eq!( - AllNullTestFilter.eval_sql_where(&Expr::lt(TRUE, FALSE)), - Some(false), // Injected NULL checks don't short circuit when inputs are NOT NULL - "WHERE {TRUE} < {FALSE}" - ); - - // Constrast normal vs SQL WHERE semantics - comparison - expect_eq!( - AllNullTestFilter.eval_expr(&Expr::lt(col.clone(), VAL), false), - None, - "{col} < {VAL}" - ); - expect_eq!( - AllNullTestFilter.eval_sql_where(&Expr::lt(col.clone(), VAL)), - Some(false), - "WHERE {col} < {VAL}" - ); - expect_eq!( - AllNullTestFilter.eval_expr(&Expr::lt(VAL, col.clone()), false), - None, - "{VAL} < {col}" - ); - expect_eq!( - AllNullTestFilter.eval_sql_where(&Expr::lt(VAL, col.clone())), - Some(false), - "WHERE {VAL} < {col}" - ); - - // Constrast normal vs SQL WHERE semantics - comparison inside AND - expect_eq!( - AllNullTestFilter.eval_expr(&Expr::and(NULL, Expr::lt(col.clone(), VAL)), false), - None, - "{NULL} AND {col} < {VAL}" - ); - expect_eq!( - AllNullTestFilter.eval_sql_where(&Expr::and(NULL, Expr::lt(col.clone(), VAL),)), - Some(false), - "WHERE {NULL} AND {col} < {VAL}" - ); - - expect_eq!( - AllNullTestFilter.eval_expr(&Expr::and(TRUE, Expr::lt(col.clone(), VAL)), false), - None, // NULL (from the NULL check) is stronger than TRUE - "{TRUE} AND {col} < {VAL}" - ); - expect_eq!( - AllNullTestFilter.eval_sql_where(&Expr::and(TRUE, Expr::lt(col.clone(), VAL),)), - Some(false), // FALSE (from the NULL check) is stronger than TRUE - "WHERE {TRUE} AND {col} < {VAL}" - ); - - // Contrast normal vs. 
SQL WHERE semantics - comparison inside AND inside AND - expect_eq!( - AllNullTestFilter.eval_expr( - &Expr::and(TRUE, Expr::and(NULL, Expr::lt(col.clone(), VAL)),), - false, - ), - None, - "{TRUE} AND ({NULL} AND {col} < {VAL})" - ); - expect_eq!( - AllNullTestFilter.eval_sql_where(&Expr::and( - TRUE, - Expr::and(NULL, Expr::lt(col.clone(), VAL)), - )), - Some(false), - "WHERE {TRUE} AND ({NULL} AND {col} < {VAL})" - ); - - // Semantics are the same for comparison inside OR inside AND - expect_eq!( - AllNullTestFilter.eval_expr( - &Expr::or(FALSE, Expr::and(NULL, Expr::lt(col.clone(), VAL)),), - false, - ), - None, - "{FALSE} OR ({NULL} AND {col} < {VAL})" - ); - expect_eq!( - AllNullTestFilter.eval_sql_where(&Expr::or( - FALSE, - Expr::and(NULL, Expr::lt(col.clone(), VAL)), - )), - None, - "WHERE {FALSE} OR ({NULL} AND {col} < {VAL})" - ); -} diff --git a/kernel/src/predicates/tests.rs b/kernel/src/predicates/tests.rs index ce273e7b8..fcfb08eb9 100644 --- a/kernel/src/predicates/tests.rs +++ b/kernel/src/predicates/tests.rs @@ -2,7 +2,6 @@ use super::*; use crate::expressions::{ column_expr, column_name, ArrayData, Expression, StructData, UnaryOperator, }; -use crate::predicates::PredicateEvaluator; use crate::schema::ArrayType; use crate::DataType; @@ -51,7 +50,7 @@ fn test_default_eval_scalar() { } } -// verifies that partial orderings behave as excpected for all Scalar types +// verifies that partial orderings behave as expected for all Scalar types #[test] fn test_default_partial_cmp_scalars() { use Ordering::*; @@ -394,12 +393,12 @@ fn test_eval_is_null() { let expr = Expression::literal(1); expect_eq!( filter.eval_unary(UnaryOperator::IsNull, &expr, true), - None, + Some(true), "1 IS NOT NULL" ); expect_eq!( filter.eval_unary(UnaryOperator::IsNull, &expr, false), - None, + Some(false), "1 IS NULL" ); } @@ -570,3 +569,81 @@ fn eval_binary() { ); } } + +// NOTE: `None` is NOT equivalent to `Some(Scalar::Null)` +struct NullColumnResolver; +impl ResolveColumnAsScalar for NullColumnResolver { + fn resolve_column(&self, _col: &ColumnName) -> Option { + Some(Scalar::Null(DataType::INTEGER)) + } +} + +#[test] +fn test_sql_where() { + let col = &column_expr!("x"); + const VAL: Expr = Expr::Literal(Scalar::Integer(1)); + const NULL: Expr = Expr::Literal(Scalar::Null(DataType::BOOLEAN)); + const FALSE: Expr = Expr::Literal(Scalar::Boolean(false)); + const TRUE: Expr = Expr::Literal(Scalar::Boolean(true)); + let null_filter = DefaultPredicateEvaluator::from(NullColumnResolver); + let empty_filter = DefaultPredicateEvaluator::from(EmptyColumnResolver); + + // Basic sanity checks + expect_eq!(null_filter.eval_sql_where(&VAL), None, "WHERE {VAL}"); + expect_eq!(null_filter.eval_sql_where(col), None, "WHERE {col}"); + + // SQL eval does not modify behavior of IS NULL + let expr = &Expr::is_null(col.clone()); + expect_eq!(null_filter.eval_sql_where(expr), Some(true), "{expr}"); + + // Injected NULL checks only short circuit if inputs are NULL + let expr = &Expr::lt(FALSE, TRUE); + expect_eq!(null_filter.eval_sql_where(expr), Some(true), "{expr}"); + expect_eq!(empty_filter.eval_sql_where(expr), Some(true), "{expr}"); + + // Constrast normal vs SQL WHERE semantics - comparison + let expr = &Expr::lt(col.clone(), VAL); + expect_eq!(null_filter.eval_expr(expr, false), None, "{expr}"); + expect_eq!(null_filter.eval_sql_where(expr), Some(false), "{expr}"); + // NULL check produces NULL due to missing column + expect_eq!(empty_filter.eval_sql_where(expr), None, "{expr}"); + + let expr = &Expr::lt(VAL, 
col.clone()); + expect_eq!(null_filter.eval_expr(expr, false), None, "{expr}"); + expect_eq!(null_filter.eval_sql_where(expr), Some(false), "{expr}"); + expect_eq!(empty_filter.eval_sql_where(expr), None, "{expr}"); + + let expr = &Expr::distinct(VAL, col.clone()); + expect_eq!(null_filter.eval_expr(expr, false), Some(true), "{expr}"); + expect_eq!(null_filter.eval_sql_where(expr), Some(true), "{expr}"); + expect_eq!(empty_filter.eval_sql_where(expr), None, "{expr}"); + + let expr = &Expr::distinct(NULL, col.clone()); + expect_eq!(null_filter.eval_expr(expr, false), Some(false), "{expr}"); + expect_eq!(null_filter.eval_sql_where(expr), Some(false), "{expr}"); + expect_eq!(empty_filter.eval_sql_where(expr), None, "{expr}"); + + // Constrast normal vs SQL WHERE semantics - comparison inside AND + let expr = &Expr::and(NULL, Expr::lt(col.clone(), VAL)); + expect_eq!(null_filter.eval_expr(expr, false), None, "{expr}"); + expect_eq!(null_filter.eval_sql_where(expr), Some(false), "{expr}"); + expect_eq!(empty_filter.eval_sql_where(expr), None, "{expr}"); + + // NULL/FALSE (from the NULL check) is stronger than TRUE + let expr = &Expr::and(TRUE, Expr::lt(col.clone(), VAL)); + expect_eq!(null_filter.eval_expr(expr, false), None, "{expr}"); + expect_eq!(null_filter.eval_sql_where(expr), Some(false), "{expr}"); + expect_eq!(empty_filter.eval_sql_where(expr), None, "{expr}"); + + // Contrast normal vs. SQL WHERE semantics - comparison inside AND inside AND + let expr = &Expr::and(TRUE, Expr::and(NULL, Expr::lt(col.clone(), VAL))); + expect_eq!(null_filter.eval_expr(expr, false), None, "{expr}"); + expect_eq!(null_filter.eval_sql_where(expr), Some(false), "{expr}"); + expect_eq!(empty_filter.eval_sql_where(expr), None, "{expr}"); + + // Ditto for comparison inside OR inside AND + let expr = &Expr::or(FALSE, Expr::and(NULL, Expr::lt(col.clone(), VAL))); + expect_eq!(null_filter.eval_expr(expr, false), None, "{expr}"); + expect_eq!(null_filter.eval_sql_where(expr), Some(false), "{expr}"); + expect_eq!(empty_filter.eval_sql_where(expr), None, "{expr}"); +} diff --git a/kernel/src/scan/data_skipping.rs b/kernel/src/scan/data_skipping.rs index 54eb5344c..cea54c4c9 100644 --- a/kernel/src/scan/data_skipping.rs +++ b/kernel/src/scan/data_skipping.rs @@ -24,7 +24,7 @@ mod tests; /// Returns `None` if the predicate is not eligible for data skipping. /// /// We normalize each binary operation to a comparison between a column and a literal value and -/// rewite that in terms of the min/max values of the column. +/// rewrite that in terms of the min/max values of the column. /// For example, `1 < a` is rewritten as `minValues.a > 1`. /// /// For Unary `Not`, we push the Not down using De Morgan's Laws to invert everything below the Not. @@ -36,8 +36,15 @@ mod tests; /// are not eligible for data skipping. /// - `OR` is rewritten only if all operands are eligible for data skipping. Otherwise, the whole OR /// expression is dropped. -fn as_data_skipping_predicate(expr: &Expr, inverted: bool) -> Option { - DataSkippingPredicateCreator.eval_expr(expr, inverted) +#[cfg(test)] +fn as_data_skipping_predicate(expr: &Expr) -> Option { + DataSkippingPredicateCreator.eval_expr(expr, false) +} + +/// Like `as_data_skipping_predicate`, but invokes [`PredicateEvaluator::eval_sql_where`] instead +/// of [`PredicateEvaluator::eval_expr`]. 
+fn as_sql_data_skipping_predicate(expr: &Expr) -> Option { + DataSkippingPredicateCreator.eval_sql_where(expr) } pub(crate) struct DataSkippingFilter { @@ -59,7 +66,7 @@ impl DataSkippingFilter { physical_predicate: Option<(ExpressionRef, SchemaRef)>, ) -> Option { static PREDICATE_SCHEMA: LazyLock = LazyLock::new(|| { - DataType::struct_type([StructField::new("predicate", DataType::BOOLEAN, true)]) + DataType::struct_type([StructField::nullable("predicate", DataType::BOOLEAN)]) }); static STATS_EXPR: LazyLock = LazyLock::new(|| column_expr!("add.stats")); static FILTER_EXPR: LazyLock = @@ -82,10 +89,10 @@ impl DataSkippingFilter { .transform_struct(&referenced_schema)? .into_owned(); let stats_schema = Arc::new(StructType::new([ - StructField::new("numRecords", DataType::LONG, true), - StructField::new("nullCount", nullcount_schema, true), - StructField::new("minValues", referenced_schema.clone(), true), - StructField::new("maxValues", referenced_schema, true), + StructField::nullable("numRecords", DataType::LONG), + StructField::nullable("nullCount", nullcount_schema), + StructField::nullable("minValues", referenced_schema.clone()), + StructField::nullable("maxValues", referenced_schema), ])); // Skipping happens in several steps: @@ -108,7 +115,7 @@ impl DataSkippingFilter { let skipping_evaluator = engine.get_expression_handler().get_evaluator( stats_schema.clone(), - Expr::struct_from([as_data_skipping_predicate(&predicate, false)?]), + Expr::struct_from([as_sql_data_skipping_predicate(&predicate)?]), PREDICATE_SCHEMA.clone(), ); @@ -205,6 +212,10 @@ impl DataSkippingPredicateEvaluator for DataSkippingPredicateCreator { Some(Expr::binary(op, col, val.clone())) } + fn eval_scalar_is_null(&self, val: &Scalar, inverted: bool) -> Option { + PredicateEvaluatorDefaults::eval_scalar_is_null(val, inverted).map(Expr::literal) + } + fn eval_scalar(&self, val: &Scalar, inverted: bool) -> Option { PredicateEvaluatorDefaults::eval_scalar(val, inverted).map(Expr::literal) } diff --git a/kernel/src/scan/data_skipping/tests.rs b/kernel/src/scan/data_skipping/tests.rs index e12adb526..4f1d74f63 100644 --- a/kernel/src/scan/data_skipping/tests.rs +++ b/kernel/src/scan/data_skipping/tests.rs @@ -34,7 +34,7 @@ fn test_eval_is_null() { ]); let filter = DefaultPredicateEvaluator::from(resolver); for (expr, expect) in expressions.iter().zip(expected) { - let pred = as_data_skipping_predicate(expr, false).unwrap(); + let pred = as_data_skipping_predicate(expr).unwrap(); expect_eq!( filter.eval_expr(&pred, false), *expect, @@ -77,7 +77,7 @@ fn test_eval_binary_comparisons() { ]); let filter = DefaultPredicateEvaluator::from(resolver); for (expr, expect) in expressions.iter().zip(expected.iter()) { - let pred = as_data_skipping_predicate(expr, false).unwrap(); + let pred = as_data_skipping_predicate(expr).unwrap(); expect_eq!( filter.eval_expr(&pred, false), *expect, @@ -160,7 +160,7 @@ fn test_eval_variadic() { .collect(); let expr = Expr::and_from(inputs.clone()); - let pred = as_data_skipping_predicate(&expr, false).unwrap(); + let pred = as_data_skipping_predicate(&expr).unwrap(); expect_eq!( filter.eval_expr(&pred, false), *expect_and, @@ -168,19 +168,19 @@ fn test_eval_variadic() { ); let expr = Expr::or_from(inputs.clone()); - let pred = as_data_skipping_predicate(&expr, false).unwrap(); + let pred = as_data_skipping_predicate(&expr).unwrap(); expect_eq!(filter.eval_expr(&pred, false), *expect_or, "OR({inputs:?})"); - let expr = Expr::and_from(inputs.clone()); - let pred = 
as_data_skipping_predicate(&expr, true).unwrap(); + let expr = !Expr::and_from(inputs.clone()); + let pred = as_data_skipping_predicate(&expr).unwrap(); expect_eq!( filter.eval_expr(&pred, false), expect_and.map(|val| !val), "NOT AND({inputs:?})" ); - let expr = Expr::or_from(inputs.clone()); - let pred = as_data_skipping_predicate(&expr, true).unwrap(); + let expr = !Expr::or_from(inputs.clone()); + let pred = as_data_skipping_predicate(&expr).unwrap(); expect_eq!( filter.eval_expr(&pred, false), expect_or.map(|val| !val), @@ -216,7 +216,7 @@ fn test_eval_distinct() { ]); let filter = DefaultPredicateEvaluator::from(resolver); for (expr, expect) in expressions.iter().zip(expected) { - let pred = as_data_skipping_predicate(expr, false).unwrap(); + let pred = as_data_skipping_predicate(expr).unwrap(); expect_eq!( filter.eval_expr(&pred, false), *expect, @@ -252,3 +252,93 @@ fn test_eval_distinct() { // min < value < max, all nulls do_test(five, fifteen, 2, &[TRUE, FALSE, FALSE, TRUE]); } + +#[test] +fn test_sql_where() { + let col = &column_expr!("x"); + const VAL: Expr = Expr::Literal(Scalar::Integer(10)); + const NULL: Expr = Expr::Literal(Scalar::Null(DataType::BOOLEAN)); + const FALSE: Expr = Expr::Literal(Scalar::Boolean(false)); + const TRUE: Expr = Expr::Literal(Scalar::Boolean(true)); + + const ROWCOUNT: i64 = 2; + const ALL_NULL: i64 = ROWCOUNT; + const SOME_NULL: i64 = 1; + const NO_NULL: i64 = 0; + let do_test = + |nulls: i64, expr: &Expr, missing: bool, expect: Option, expect_sql: Option| { + assert!((0..=ROWCOUNT).contains(&nulls)); + let (min, max) = if nulls < ROWCOUNT { + (Scalar::Integer(5), Scalar::Integer(15)) + } else { + ( + Scalar::Null(DataType::INTEGER), + Scalar::Null(DataType::INTEGER), + ) + }; + let resolver = if missing { + HashMap::new() + } else { + HashMap::from_iter([ + (column_name!("numRecords"), Scalar::from(ROWCOUNT)), + (column_name!("nullCount.x"), Scalar::from(nulls)), + (column_name!("minValues.x"), min.clone()), + (column_name!("maxValues.x"), max.clone()), + ]) + }; + let filter = DefaultPredicateEvaluator::from(resolver); + let pred = as_data_skipping_predicate(expr).unwrap(); + expect_eq!( + filter.eval_expr(&pred, false), + expect, + "{expr:#?} became {pred:#?} ({min}..{max}, {nulls} nulls)" + ); + let sql_pred = as_sql_data_skipping_predicate(expr).unwrap(); + expect_eq!( + filter.eval_expr(&sql_pred, false), + expect_sql, + "{expr:#?} became {sql_pred:#?} ({min}..{max}, {nulls} nulls)" + ); + }; + + // Sanity tests -- only all-null columns should behave differently between normal and SQL WHERE. + const MISSING: bool = true; + const PRESENT: bool = false; + let expr = &Expr::lt(TRUE, FALSE); + do_test(ALL_NULL, expr, MISSING, Some(false), Some(false)); + + let expr = &Expr::is_not_null(col.clone()); + do_test(ALL_NULL, expr, PRESENT, Some(false), Some(false)); + do_test(ALL_NULL, expr, MISSING, None, None); + + // SQL WHERE allows a present-but-all-null column to be pruned, but not a missing column. 
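Editor's note: the comment above (a present-but-all-null column can be pruned, a missing column cannot) comes down to whether the injected null check gets a definite answer from the file's stats. The following standalone illustration uses a hypothetical `FileStats` struct and an `is_not_null` helper, not the kernel's `DataSkippingPredicateEvaluator`, and the nullCount/rowCount comparison is a deliberate simplification of how the real evaluator derives the check:

```rust
use std::collections::HashMap;

struct FileStats {
    /// Per-column null counts, when the stat is available.
    nullcount: HashMap<&'static str, i64>,
    rowcount: i64,
}

/// Skipping-oriented `col IS NOT NULL`: a definite answer if the null count is
/// known, NULL (`None`) if the stat is missing entirely.
fn is_not_null(stats: &FileStats, col: &str) -> Option<bool> {
    stats.nullcount.get(col).map(|n| *n != stats.rowcount)
}

fn main() {
    // Present but all-NULL column: nullCount == rowCount, so the injected
    // `x IS NOT NULL` check is FALSE, the surrounding AND collapses to FALSE,
    // and the file can be skipped.
    let present_all_null = FileStats {
        nullcount: HashMap::from([("x", 2)]),
        rowcount: 2,
    };
    assert_eq!(is_not_null(&present_all_null, "x"), Some(false));

    // Missing column: no nullCount stat at all, so the check is NULL and the
    // file cannot be skipped.
    let missing = FileStats {
        nullcount: HashMap::new(),
        rowcount: 2,
    };
    assert_eq!(is_not_null(&missing, "x"), None);
}
```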
+ let expr = &Expr::lt(col.clone(), VAL); + do_test(NO_NULL, expr, PRESENT, Some(true), Some(true)); + do_test(SOME_NULL, expr, PRESENT, Some(true), Some(true)); + do_test(ALL_NULL, expr, PRESENT, None, Some(false)); + do_test(ALL_NULL, expr, MISSING, None, None); + + // Comparison inside AND works + let expr = &Expr::and(NULL, Expr::lt(col.clone(), VAL)); + do_test(ALL_NULL, expr, PRESENT, None, Some(false)); + do_test(ALL_NULL, expr, MISSING, None, None); + + let expr = &Expr::and(TRUE, Expr::lt(VAL, col.clone())); + do_test(ALL_NULL, expr, PRESENT, None, Some(false)); + do_test(ALL_NULL, expr, MISSING, None, None); + + // Comparison inside AND inside AND works + let expr = &Expr::and(TRUE, Expr::and(NULL, Expr::lt(col.clone(), VAL))); + do_test(ALL_NULL, expr, PRESENT, None, Some(false)); + do_test(ALL_NULL, expr, MISSING, None, None); + + // Comparison inside OR works + let expr = &Expr::or(FALSE, Expr::lt(col.clone(), VAL)); + do_test(ALL_NULL, expr, PRESENT, None, Some(false)); + do_test(ALL_NULL, expr, MISSING, None, None); + + // Comparison inside AND inside OR works + let expr = &Expr::or(FALSE, Expr::and(TRUE, Expr::lt(col.clone(), VAL))); + do_test(ALL_NULL, expr, PRESENT, None, Some(false)); + do_test(ALL_NULL, expr, MISSING, None, None); +} diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index fb5c2b0fa..d7f83a4fa 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -162,21 +162,21 @@ pub(crate) static SCAN_ROW_SCHEMA: LazyLock> = LazyLock::new(|| // Note that fields projected out of a nullable struct must be nullable let partition_values = MapType::new(DataType::STRING, DataType::STRING, true); let file_constant_values = - StructType::new([StructField::new("partitionValues", partition_values, true)]); + StructType::new([StructField::nullable("partitionValues", partition_values)]); let deletion_vector = StructType::new([ - StructField::new("storageType", DataType::STRING, true), - StructField::new("pathOrInlineDv", DataType::STRING, true), - StructField::new("offset", DataType::INTEGER, true), - StructField::new("sizeInBytes", DataType::INTEGER, true), - StructField::new("cardinality", DataType::LONG, true), + StructField::nullable("storageType", DataType::STRING), + StructField::nullable("pathOrInlineDv", DataType::STRING), + StructField::nullable("offset", DataType::INTEGER), + StructField::nullable("sizeInBytes", DataType::INTEGER), + StructField::nullable("cardinality", DataType::LONG), ]); Arc::new(StructType::new([ - StructField::new("path", DataType::STRING, true), - StructField::new("size", DataType::LONG, true), - StructField::new("modificationTime", DataType::LONG, true), - StructField::new("stats", DataType::STRING, true), - StructField::new("deletionVector", deletion_vector, true), - StructField::new("fileConstantValues", file_constant_values, true), + StructField::nullable("path", DataType::STRING), + StructField::nullable("size", DataType::LONG), + StructField::nullable("modificationTime", DataType::LONG), + StructField::nullable("stats", DataType::STRING), + StructField::nullable("deletionVector", deletion_vector), + StructField::nullable("fileConstantValues", file_constant_values), ])) }); diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index e0d345b56..49af0222c 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -13,9 +13,7 @@ use crate::actions::deletion_vector::{ }; use crate::actions::{get_log_add_schema, get_log_schema, ADD_NAME, REMOVE_NAME}; use 
crate::expressions::{ColumnName, Expression, ExpressionRef, ExpressionTransform, Scalar}; -use crate::predicates::parquet_stats_skipping::{ - ParquetStatsProvider, ParquetStatsSkippingFilter as _, -}; +use crate::predicates::{DefaultPredicateEvaluator, EmptyColumnResolver}; use crate::scan::state::{DvInfo, Stats}; use crate::schema::{ ArrayType, DataType, MapType, PrimitiveType, Schema, SchemaRef, SchemaTransform, StructField, @@ -184,27 +182,10 @@ impl PhysicalPredicate { // Evaluates a static data skipping predicate, ignoring any column references, and returns true if // the predicate allows to statically skip all files. Since this is direct evaluation (not an -// expression rewrite), we use a dummy `ParquetStatsProvider` that provides no stats. +// expression rewrite), we use a `DefaultPredicateEvaluator` with an empty column resolver. fn can_statically_skip_all_files(predicate: &Expression) -> bool { - struct NoStats; - impl ParquetStatsProvider for NoStats { - fn get_parquet_min_stat(&self, _: &ColumnName, _: &DataType) -> Option { - None - } - - fn get_parquet_max_stat(&self, _: &ColumnName, _: &DataType) -> Option { - None - } - - fn get_parquet_nullcount_stat(&self, _: &ColumnName) -> Option { - None - } - - fn get_parquet_rowcount_stat(&self) -> i64 { - 0 - } - } - NoStats.eval_sql_where(predicate) == Some(false) + use crate::predicates::PredicateEvaluator as _; + DefaultPredicateEvaluator::from(EmptyColumnResolver).eval_sql_where(predicate) == Some(false) } // Build the stats read schema filtering the table schema to keep only skipping-eligible @@ -820,34 +801,32 @@ mod tests { #[test] fn test_physical_predicate() { let logical_schema = StructType::new(vec![ - StructField::new("a", DataType::LONG, true), - StructField::new("b", DataType::LONG, true).with_metadata([( + StructField::nullable("a", DataType::LONG), + StructField::nullable("b", DataType::LONG).with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), "phys_b", )]), - StructField::new("phys_b", DataType::LONG, true).with_metadata([( + StructField::nullable("phys_b", DataType::LONG).with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), "phys_c", )]), - StructField::new( + StructField::nullable( "nested", StructType::new(vec![ - StructField::new("x", DataType::LONG, true), - StructField::new("y", DataType::LONG, true).with_metadata([( + StructField::nullable("x", DataType::LONG), + StructField::nullable("y", DataType::LONG).with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), "phys_y", )]), ]), - true, ), - StructField::new( + StructField::nullable( "mapped", - StructType::new(vec![StructField::new("n", DataType::LONG, true) + StructType::new(vec![StructField::nullable("n", DataType::LONG) .with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), "phys_n", )])]), - true, ) .with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), @@ -868,14 +847,14 @@ mod tests { column_expr!("a"), Some(PhysicalPredicate::Some( column_expr!("a").into(), - StructType::new(vec![StructField::new("a", DataType::LONG, true)]).into(), + StructType::new(vec![StructField::nullable("a", DataType::LONG)]).into(), )), ), ( column_expr!("b"), Some(PhysicalPredicate::Some( column_expr!("phys_b").into(), - StructType::new(vec![StructField::new("phys_b", DataType::LONG, true) + StructType::new(vec![StructField::nullable("phys_b", DataType::LONG) .with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), "phys_b", @@ -887,10 +866,9 @@ mod tests { 
column_expr!("nested.x"), Some(PhysicalPredicate::Some( column_expr!("nested.x").into(), - StructType::new(vec![StructField::new( + StructType::new(vec![StructField::nullable( "nested", - StructType::new(vec![StructField::new("x", DataType::LONG, true)]), - true, + StructType::new(vec![StructField::nullable("x", DataType::LONG)]), )]) .into(), )), @@ -899,14 +877,13 @@ mod tests { column_expr!("nested.y"), Some(PhysicalPredicate::Some( column_expr!("nested.phys_y").into(), - StructType::new(vec![StructField::new( + StructType::new(vec![StructField::nullable( "nested", - StructType::new(vec![StructField::new("phys_y", DataType::LONG, true) + StructType::new(vec![StructField::nullable("phys_y", DataType::LONG) .with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), "phys_y", )])]), - true, )]) .into(), )), @@ -915,14 +892,13 @@ mod tests { column_expr!("mapped.n"), Some(PhysicalPredicate::Some( column_expr!("phys_mapped.phys_n").into(), - StructType::new(vec![StructField::new( + StructType::new(vec![StructField::nullable( "phys_mapped", - StructType::new(vec![StructField::new("phys_n", DataType::LONG, true) + StructType::new(vec![StructField::nullable("phys_n", DataType::LONG) .with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), "phys_n", )])]), - true, ) .with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), @@ -935,14 +911,13 @@ mod tests { Expression::and(column_expr!("mapped.n"), true), Some(PhysicalPredicate::Some( Expression::and(column_expr!("phys_mapped.phys_n"), true).into(), - StructType::new(vec![StructField::new( + StructType::new(vec![StructField::nullable( "phys_mapped", - StructType::new(vec![StructField::new("phys_n", DataType::LONG, true) + StructType::new(vec![StructField::nullable("phys_n", DataType::LONG) .with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), "phys_n", )])]), - true, ) .with_metadata([( ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(), diff --git a/kernel/src/schema.rs b/kernel/src/schema.rs index 42901751f..a4cd44a6a 100644 --- a/kernel/src/schema.rs +++ b/kernel/src/schema.rs @@ -122,6 +122,16 @@ impl StructField { } } + /// Creates a new nullable field + pub fn nullable(name: impl Into, data_type: impl Into) -> Self { + Self::new(name, data_type, true) + } + + /// Creates a new non-nullable field + pub fn not_null(name: impl Into, data_type: impl Into) -> Self { + Self::new(name, data_type, false) + } + pub fn with_metadata( mut self, metadata: impl IntoIterator, impl Into)>, @@ -223,7 +233,7 @@ pub struct StructType { pub type_name: String, /// The type of element stored in this array // We use indexmap to preserve the order of fields as they are defined in the schema - // while also allowing for fast lookup by name. The atlerative to do a liner search + // while also allowing for fast lookup by name. The alternative is to do a linear search // for each field by name would be potentially quite expensive for large schemas. 
pub fields: IndexMap, } @@ -421,7 +431,7 @@ impl MapType { /// Create a schema assuming the map is stored as a struct with the specified key and value field names pub fn as_struct_schema(&self, key_name: String, val_name: String) -> Schema { StructType::new([ - StructField::new(key_name, self.key_type.clone(), false), + StructField::not_null(key_name, self.key_type.clone()), StructField::new(val_name, self.value_type.clone(), self.value_contains_null), ]) } @@ -1090,73 +1100,65 @@ mod tests { #[test] fn test_depth_checker() { let schema = DataType::struct_type([ - StructField::new( + StructField::nullable( "a", ArrayType::new( DataType::struct_type([ - StructField::new("w", DataType::LONG, true), - StructField::new("x", ArrayType::new(DataType::LONG, true), true), - StructField::new( + StructField::nullable("w", DataType::LONG), + StructField::nullable("x", ArrayType::new(DataType::LONG, true)), + StructField::nullable( "y", MapType::new(DataType::LONG, DataType::STRING, true), - true, ), - StructField::new( + StructField::nullable( "z", DataType::struct_type([ - StructField::new("n", DataType::LONG, true), - StructField::new("m", DataType::STRING, true), + StructField::nullable("n", DataType::LONG), + StructField::nullable("m", DataType::STRING), ]), - true, ), ]), true, ), - true, ), - StructField::new( + StructField::nullable( "b", DataType::struct_type([ - StructField::new("o", ArrayType::new(DataType::LONG, true), true), - StructField::new( + StructField::nullable("o", ArrayType::new(DataType::LONG, true)), + StructField::nullable( "p", MapType::new(DataType::LONG, DataType::STRING, true), - true, ), - StructField::new( + StructField::nullable( "q", DataType::struct_type([ - StructField::new( + StructField::nullable( "s", DataType::struct_type([ - StructField::new("u", DataType::LONG, true), - StructField::new("v", DataType::LONG, true), + StructField::nullable("u", DataType::LONG), + StructField::nullable("v", DataType::LONG), ]), - true, ), - StructField::new("t", DataType::LONG, true), + StructField::nullable("t", DataType::LONG), ]), - true, ), - StructField::new("r", DataType::LONG, true), + StructField::nullable("r", DataType::LONG), ]), - true, ), - StructField::new( + StructField::nullable( "c", MapType::new( DataType::LONG, DataType::struct_type([ - StructField::new("f", DataType::LONG, true), - StructField::new("g", DataType::STRING, true), + StructField::nullable("f", DataType::LONG), + StructField::nullable("g", DataType::STRING), ]), true, ), - true, ), ]); - // Similer to SchemaDepthChecker::check, but also returns call count + // Similar to SchemaDepthChecker::check, but also returns call count let check_with_call_count = |depth_limit| SchemaDepthChecker::check_with_call_count(&schema, depth_limit); diff --git a/kernel/src/table_changes/log_replay/tests.rs b/kernel/src/table_changes/log_replay/tests.rs index f2dbdd956..35c4a99f8 100644 --- a/kernel/src/table_changes/log_replay/tests.rs +++ b/kernel/src/table_changes/log_replay/tests.rs @@ -23,8 +23,8 @@ use std::sync::Arc; fn get_schema() -> StructType { StructType::new([ - StructField::new("id", DataType::INTEGER, true), - StructField::new("value", DataType::STRING, true), + StructField::nullable("id", DataType::INTEGER), + StructField::nullable("value", DataType::STRING), ]) } @@ -219,17 +219,17 @@ async fn incompatible_schemas_fail() { // The CDF schema has fields: `id: int` and `value: string`. // This commit has schema with fields: `id: long`, `value: string` and `year: int` (nullable). 
let schema = StructType::new([ - StructField::new("id", DataType::LONG, true), - StructField::new("value", DataType::STRING, true), - StructField::new("year", DataType::INTEGER, true), + StructField::nullable("id", DataType::LONG), + StructField::nullable("value", DataType::STRING), + StructField::nullable("year", DataType::INTEGER), ]); assert_incompatible_schema(schema, get_schema()).await; // The CDF schema has fields: `id: int` and `value: string`. // This commit has schema with fields: `id: long` and `value: string`. let schema = StructType::new([ - StructField::new("id", DataType::LONG, true), - StructField::new("value", DataType::STRING, true), + StructField::nullable("id", DataType::LONG), + StructField::nullable("value", DataType::STRING), ]); assert_incompatible_schema(schema, get_schema()).await; @@ -238,12 +238,12 @@ async fn incompatible_schemas_fail() { // The CDF schema has fields: `id: long` and `value: string`. // This commit has schema with fields: `id: int` and `value: string`. let cdf_schema = StructType::new([ - StructField::new("id", DataType::LONG, true), - StructField::new("value", DataType::STRING, true), + StructField::nullable("id", DataType::LONG), + StructField::nullable("value", DataType::STRING), ]); let commit_schema = StructType::new([ - StructField::new("id", DataType::INTEGER, true), - StructField::new("value", DataType::STRING, true), + StructField::nullable("id", DataType::INTEGER), + StructField::nullable("value", DataType::STRING), ]); assert_incompatible_schema(cdf_schema, commit_schema).await; @@ -252,16 +252,16 @@ async fn incompatible_schemas_fail() { // The CDF schema has fields: nullable `id` and nullable `value`. // This commit has schema with fields: non-nullable `id` and nullable `value`. let schema = StructType::new([ - StructField::new("id", DataType::LONG, false), - StructField::new("value", DataType::STRING, true), + StructField::not_null("id", DataType::LONG), + StructField::nullable("value", DataType::STRING), ]); assert_incompatible_schema(schema, get_schema()).await; // The CDF schema has fields: `id: int` and `value: string`. // This commit has schema with fields:`id: string` and `value: string`. let schema = StructType::new([ - StructField::new("id", DataType::STRING, true), - StructField::new("value", DataType::STRING, true), + StructField::nullable("id", DataType::STRING), + StructField::nullable("value", DataType::STRING), ]); assert_incompatible_schema(schema, get_schema()).await; @@ -469,6 +469,7 @@ async fn dv() { assert_eq!(sv, &[false, true, true]); } +// Note: Data skipping does not work on Remove actions. 
#[tokio::test] async fn data_skipping_filter() { let engine = Arc::new(SyncEngine::new()); diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index b74f65b7a..a855668d8 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -60,9 +60,9 @@ static ADD_CHANGE_TYPE: &str = "insert"; static REMOVE_CHANGE_TYPE: &str = "delete"; static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| { [ - StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false), - StructField::new(COMMIT_VERSION_COL_NAME, DataType::LONG, false), - StructField::new(COMMIT_TIMESTAMP_COL_NAME, DataType::TIMESTAMP, false), + StructField::not_null(CHANGE_TYPE_COL_NAME, DataType::STRING), + StructField::not_null(COMMIT_VERSION_COL_NAME, DataType::LONG), + StructField::not_null(COMMIT_TIMESTAMP_COL_NAME, DataType::TIMESTAMP), ] }); @@ -138,6 +138,14 @@ impl TableChanges { start_version: Version, end_version: Option, ) -> DeltaResult { + let log_root = table_root.join("_delta_log/")?; + let log_segment = LogSegment::for_table_changes( + engine.get_file_system_client().as_ref(), + log_root, + start_version, + end_version, + )?; + // Both snapshots ensure that reading is supported at the start and end version using // `ensure_read_supported`. Note that we must still verify that reading is // supported for every protocol action in the CDF range. @@ -173,14 +181,6 @@ impl TableChanges { ))); } - let log_root = table_root.join("_delta_log/")?; - let log_segment = LogSegment::for_table_changes( - engine.get_file_system_client().as_ref(), - log_root, - start_version, - end_version, - )?; - let schema = StructType::new( end_snapshot .schema() @@ -316,8 +316,8 @@ mod tests { let engine = Box::new(SyncEngine::new()); let table = Table::try_from_uri(path).unwrap(); let expected_schema = [ - StructField::new("part", DataType::INTEGER, true), - StructField::new("id", DataType::INTEGER, true), + StructField::nullable("part", DataType::INTEGER), + StructField::nullable("id", DataType::INTEGER), ] .into_iter() .chain(CDF_FIELDS.clone()); diff --git a/kernel/src/table_changes/physical_to_logical.rs b/kernel/src/table_changes/physical_to_logical.rs index bc8488081..a953048a9 100644 --- a/kernel/src/table_changes/physical_to_logical.rs +++ b/kernel/src/table_changes/physical_to_logical.rs @@ -69,7 +69,7 @@ pub(crate) fn scan_file_physical_schema( physical_schema: &StructType, ) -> SchemaRef { if scan_file.scan_type == CdfScanFileType::Cdc { - let change_type = StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false); + let change_type = StructField::not_null(CHANGE_TYPE_COL_NAME, DataType::STRING); let fields = physical_schema.fields().cloned().chain(Some(change_type)); StructType::new(fields).into() } else { @@ -104,11 +104,11 @@ mod tests { commit_timestamp: 1234, }; let logical_schema = StructType::new([ - StructField::new("id", DataType::STRING, true), - StructField::new("age", DataType::LONG, false), - StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false), - StructField::new(COMMIT_VERSION_COL_NAME, DataType::LONG, false), - StructField::new(COMMIT_TIMESTAMP_COL_NAME, DataType::TIMESTAMP, false), + StructField::nullable("id", DataType::STRING), + StructField::not_null("age", DataType::LONG), + StructField::not_null(CHANGE_TYPE_COL_NAME, DataType::STRING), + StructField::not_null(COMMIT_VERSION_COL_NAME, DataType::LONG), + StructField::not_null(COMMIT_TIMESTAMP_COL_NAME, DataType::TIMESTAMP), ]); let all_fields = vec![ ColumnType::Selected("id".to_string()), 
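Editor's note: the schema literals in the files above and below all switch from `StructField::new(name, data_type, bool)` to the `StructField::nullable` / `StructField::not_null` constructors added in kernel/src/schema.rs earlier in this diff. As a quick reminder of what that refactor encapsulates, here is a minimal sketch with a toy `StructField` (no `DataType`, not the kernel's type); the two spellings are equivalent, the named constructors just read better in large schema definitions:

```rust
#[derive(Debug, PartialEq)]
struct StructField {
    name: String,
    nullable: bool,
}

impl StructField {
    fn new(name: impl Into<String>, nullable: bool) -> Self {
        Self { name: name.into(), nullable }
    }

    /// Shorthand for a nullable field.
    fn nullable(name: impl Into<String>) -> Self {
        Self::new(name, true)
    }

    /// Shorthand for a non-nullable field.
    fn not_null(name: impl Into<String>) -> Self {
        Self::new(name, false)
    }
}

fn main() {
    assert_eq!(StructField::nullable("path"), StructField::new("path", true));
    assert_eq!(StructField::not_null("size"), StructField::new("size", false));
}
```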
diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index 9b0ba3067..dffd40f68 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -420,8 +420,8 @@ mod tests { assert_eq!( scan.logical_schema, StructType::new([ - StructField::new("id", DataType::INTEGER, true), - StructField::new("_commit_version", DataType::LONG, false), + StructField::nullable("id", DataType::INTEGER), + StructField::not_null("_commit_version", DataType::LONG), ]) .into() ); @@ -429,7 +429,7 @@ mod tests { scan.physical_predicate, PhysicalPredicate::Some( predicate, - StructType::new([StructField::new("id", DataType::INTEGER, true),]).into() + StructType::new([StructField::nullable("id", DataType::INTEGER),]).into() ) ); } diff --git a/kernel/src/table_changes/scan_file.rs b/kernel/src/table_changes/scan_file.rs index cc4514186..f428e09df 100644 --- a/kernel/src/table_changes/scan_file.rs +++ b/kernel/src/table_changes/scan_file.rs @@ -176,37 +176,37 @@ impl RowVisitor for CdfScanFileVisitor<'_, T> { pub(crate) fn cdf_scan_row_schema() -> SchemaRef { static CDF_SCAN_ROW_SCHEMA: LazyLock> = LazyLock::new(|| { let deletion_vector = StructType::new([ - StructField::new("storageType", DataType::STRING, true), - StructField::new("pathOrInlineDv", DataType::STRING, true), - StructField::new("offset", DataType::INTEGER, true), - StructField::new("sizeInBytes", DataType::INTEGER, true), - StructField::new("cardinality", DataType::LONG, true), + StructField::nullable("storageType", DataType::STRING), + StructField::nullable("pathOrInlineDv", DataType::STRING), + StructField::nullable("offset", DataType::INTEGER), + StructField::nullable("sizeInBytes", DataType::INTEGER), + StructField::nullable("cardinality", DataType::LONG), ]); let partition_values = MapType::new(DataType::STRING, DataType::STRING, true); let file_constant_values = - StructType::new([StructField::new("partitionValues", partition_values, true)]); + StructType::new([StructField::nullable("partitionValues", partition_values)]); let add = StructType::new([ - StructField::new("path", DataType::STRING, true), - StructField::new("deletionVector", deletion_vector.clone(), true), - StructField::new("fileConstantValues", file_constant_values.clone(), true), + StructField::nullable("path", DataType::STRING), + StructField::nullable("deletionVector", deletion_vector.clone()), + StructField::nullable("fileConstantValues", file_constant_values.clone()), ]); let remove = StructType::new([ - StructField::new("path", DataType::STRING, true), - StructField::new("deletionVector", deletion_vector, true), - StructField::new("fileConstantValues", file_constant_values.clone(), true), + StructField::nullable("path", DataType::STRING), + StructField::nullable("deletionVector", deletion_vector), + StructField::nullable("fileConstantValues", file_constant_values.clone()), ]); let cdc = StructType::new([ - StructField::new("path", DataType::STRING, true), - StructField::new("fileConstantValues", file_constant_values, true), + StructField::nullable("path", DataType::STRING), + StructField::nullable("fileConstantValues", file_constant_values), ]); Arc::new(StructType::new([ - StructField::new("add", add, true), - StructField::new("remove", remove, true), - StructField::new("cdc", cdc, true), - StructField::new("timestamp", DataType::LONG, false), - StructField::new("commit_version", DataType::LONG, false), + StructField::nullable("add", add), + StructField::nullable("remove", remove), + StructField::nullable("cdc", cdc), + 
StructField::not_null("timestamp", DataType::LONG), + StructField::not_null("commit_version", DataType::LONG), ])) }); CDF_SCAN_ROW_SCHEMA.clone() @@ -334,8 +334,8 @@ mod tests { ) .unwrap(); let table_schema = StructType::new([ - StructField::new("id", DataType::INTEGER, true), - StructField::new("value", DataType::STRING, true), + StructField::nullable("id", DataType::INTEGER), + StructField::nullable("value", DataType::STRING), ]); let scan_data = table_changes_action_iter( Arc::new(engine), diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index c6e93ea7b..d74c2456a 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -241,7 +241,7 @@ impl WriteContext { /// Result after committing a transaction. If 'committed', the version is the new version written /// to the log. If 'conflict', the transaction is returned so the caller can resolve the conflict /// (along with the version which conflicted). -// TODO(zach): in order to make the returning of a transcation useful, we need to add APIs to +// TODO(zach): in order to make the returning of a transaction useful, we need to add APIs to // update the transaction to a new version etc. #[derive(Debug)] pub enum CommitResult { @@ -277,10 +277,9 @@ fn generate_commit_info( // HACK (part 1/2): since we don't have proper map support, we create a literal struct with // one null field to create data that serializes as "operationParameters": {} Expression::literal(Scalar::Struct(StructData::try_new( - vec![StructField::new( + vec![StructField::nullable( "operation_parameter_int", DataType::INTEGER, - true, )], vec![Scalar::Null(DataType::INTEGER)], )?)), @@ -304,10 +303,9 @@ fn generate_commit_info( }; let engine_commit_info_schema = commit_info_data_type.project_as_struct(&["engineCommitInfo"])?; - let hack_data_type = DataType::Struct(Box::new(StructType::new(vec![StructField::new( + let hack_data_type = DataType::Struct(Box::new(StructType::new(vec![StructField::nullable( "hack_operation_parameter_int", DataType::INTEGER, - true, )]))); commit_info_data_type @@ -315,6 +313,12 @@ fn generate_commit_info( .get_mut("operationParameters") .ok_or_else(|| Error::missing_column("operationParameters"))? 
.data_type = hack_data_type; + + // Since writing in-commit timestamps is not supported, we remove the field so it is not + // written to the log + commit_info_data_type + .fields + .shift_remove("inCommitTimestamp"); commit_info_field.data_type = DataType::Struct(commit_info_data_type); let commit_info_evaluator = engine.get_expression_handler().get_evaluator( @@ -671,15 +675,14 @@ mod tests { fn test_write_metadata_schema() { let schema = get_write_metadata_schema(); let expected = StructType::new(vec![ - StructField::new("path", DataType::STRING, false), - StructField::new( + StructField::not_null("path", DataType::STRING), + StructField::not_null( "partitionValues", MapType::new(DataType::STRING, DataType::STRING, true), - false, ), - StructField::new("size", DataType::LONG, false), - StructField::new("modificationTime", DataType::LONG, false), - StructField::new("dataChange", DataType::BOOLEAN, false), + StructField::not_null("size", DataType::LONG), + StructField::not_null("modificationTime", DataType::LONG), + StructField::not_null("dataChange", DataType::BOOLEAN), ]); assert_eq!(*schema, expected.into()); } diff --git a/kernel/tests/cdf.rs b/kernel/tests/cdf.rs index 2be5324fc..2560dc71d 100644 --- a/kernel/tests/cdf.rs +++ b/kernel/tests/cdf.rs @@ -6,7 +6,7 @@ use delta_kernel::engine::sync::SyncEngine; use itertools::Itertools; use delta_kernel::engine::arrow_data::ArrowEngineData; -use delta_kernel::{DeltaResult, Table, Version}; +use delta_kernel::{DeltaResult, Error, ExpressionRef, Table, Version}; mod common; use common::{load_test_data, to_arrow}; @@ -15,6 +15,7 @@ fn read_cdf_for_table( test_name: impl AsRef, start_version: Version, end_version: impl Into>, + predicate: impl Into>, ) -> DeltaResult> { let test_dir = load_test_data("tests/data", test_name.as_ref()).unwrap(); let test_path = test_dir.path().join(test_name.as_ref()); @@ -34,6 +35,7 @@ fn read_cdf_for_table( let scan = table_changes .into_scan_builder() .with_schema(schema) + .with_predicate(predicate) .build()?; let batches: Vec = scan .execute(engine)? @@ -53,7 +55,7 @@ fn read_cdf_for_table( #[test] fn cdf_with_deletion_vector() -> Result<(), Box> { - let batches = read_cdf_for_table("cdf-table-with-dv", 0, None)?; + let batches = read_cdf_for_table("cdf-table-with-dv", 0, None, None)?; // Each commit performs the following: // 0. Insert 0..=9 // 1. 
Remove [0, 9] @@ -99,7 +101,7 @@ fn cdf_with_deletion_vector() -> Result<(), Box> { #[test] fn basic_cdf() -> Result<(), Box> { - let batches = read_cdf_for_table("cdf-table", 0, None)?; + let batches = read_cdf_for_table("cdf-table", 0, None, None)?; let mut expected = vec![ "+----+--------+------------+------------------+-----------------+", "| id | name | birthday | _change_type | _commit_version |", @@ -136,7 +138,7 @@ fn basic_cdf() -> Result<(), Box> { #[test] fn cdf_non_partitioned() -> Result<(), Box> { - let batches = read_cdf_for_table("cdf-table-non-partitioned", 0, None)?; + let batches = read_cdf_for_table("cdf-table-non-partitioned", 0, None, None)?; let mut expected = vec![ "+----+--------+------------+-------------------+---------------+--------------+----------------+------------------+-----------------+", "| id | name | birthday | long_field | boolean_field | double_field | smallint_field | _change_type | _commit_version |", @@ -175,7 +177,7 @@ fn cdf_non_partitioned() -> Result<(), Box> { #[test] fn cdf_with_cdc_and_dvs() -> Result<(), Box> { - let batches = read_cdf_for_table("cdf-table-with-cdc-and-dvs", 0, None)?; + let batches = read_cdf_for_table("cdf-table-with-cdc-and-dvs", 0, None, None)?; let mut expected = vec![ "+----+--------------------+------------------+-----------------+", "| id | comment | _change_type | _commit_version |", @@ -229,3 +231,312 @@ fn cdf_with_cdc_and_dvs() -> Result<(), Box> { assert_batches_sorted_eq!(expected, &batches); Ok(()) } + +#[test] +fn simple_cdf_version_ranges() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-simple", 0, 0, None)?; + let mut expected = vec![ + "+----+--------------+-----------------+", + "| id | _change_type | _commit_version |", + "+----+--------------+-----------------+", + "| 0 | insert | 0 |", + "| 1 | insert | 0 |", + "| 2 | insert | 0 |", + "| 3 | insert | 0 |", + "| 4 | insert | 0 |", + "| 5 | insert | 0 |", + "| 6 | insert | 0 |", + "| 7 | insert | 0 |", + "| 8 | insert | 0 |", + "| 9 | insert | 0 |", + "+----+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + + let batches = read_cdf_for_table("cdf-table-simple", 1, 1, None)?; + let mut expected = vec![ + "+----+--------------+-----------------+", + "| id | _change_type | _commit_version |", + "+----+--------------+-----------------+", + "| 0 | delete | 1 |", + "| 1 | delete | 1 |", + "| 2 | delete | 1 |", + "| 3 | delete | 1 |", + "| 4 | delete | 1 |", + "| 5 | delete | 1 |", + "| 6 | delete | 1 |", + "| 7 | delete | 1 |", + "| 8 | delete | 1 |", + "| 9 | delete | 1 |", + "+----+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + + let batches = read_cdf_for_table("cdf-table-simple", 2, 2, None)?; + let mut expected = vec![ + "+----+--------------+-----------------+", + "| id | _change_type | _commit_version |", + "+----+--------------+-----------------+", + "| 20 | insert | 2 |", + "| 21 | insert | 2 |", + "| 22 | insert | 2 |", + "| 23 | insert | 2 |", + "| 24 | insert | 2 |", + "+----+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + + let batches = read_cdf_for_table("cdf-table-simple", 0, 2, None)?; + let mut expected = vec![ + "+----+--------------+-----------------+", + "| id | _change_type | _commit_version |", + "+----+--------------+-----------------+", + "| 0 | insert | 0 |", + "| 1 | insert | 0 |", + "| 2 | insert | 0 |", + "| 
3 | insert | 0 |", + "| 4 | insert | 0 |", + "| 5 | insert | 0 |", + "| 6 | insert | 0 |", + "| 7 | insert | 0 |", + "| 8 | insert | 0 |", + "| 9 | insert | 0 |", + "| 0 | delete | 1 |", + "| 1 | delete | 1 |", + "| 2 | delete | 1 |", + "| 3 | delete | 1 |", + "| 4 | delete | 1 |", + "| 5 | delete | 1 |", + "| 6 | delete | 1 |", + "| 7 | delete | 1 |", + "| 8 | delete | 1 |", + "| 9 | delete | 1 |", + "| 20 | insert | 2 |", + "| 21 | insert | 2 |", + "| 22 | insert | 2 |", + "| 23 | insert | 2 |", + "| 24 | insert | 2 |", + "+----+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} + +#[test] +fn update_operations() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-update-ops", 0, 2, None)?; + // Note: `update_pre` and `update_post` are technically not part of the delta spec, and instead + // should be `update_preimage` and `update_postimage` respectively. However, the tests in + // delta-spark use the post and pre. + let mut expected = vec![ + "+----+--------------+-----------------+", + "| id | _change_type | _commit_version |", + "+----+--------------+-----------------+", + "| 0 | insert | 0 |", + "| 1 | insert | 0 |", + "| 2 | insert | 0 |", + "| 3 | insert | 0 |", + "| 4 | insert | 0 |", + "| 5 | insert | 0 |", + "| 6 | insert | 0 |", + "| 7 | insert | 0 |", + "| 8 | insert | 0 |", + "| 9 | insert | 0 |", + "| 20 | update_pre | 1 |", + "| 21 | update_pre | 1 |", + "| 22 | update_pre | 1 |", + "| 23 | update_pre | 1 |", + "| 24 | update_pre | 1 |", + "| 30 | update_post | 2 |", + "| 31 | update_post | 2 |", + "| 32 | update_post | 2 |", + "| 33 | update_post | 2 |", + "| 34 | update_post | 2 |", + "+----+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} + +#[test] +fn false_data_change_is_ignored() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-data-change", 0, 1, None)?; + let mut expected = vec![ + "+----+--------------+-----------------+", + "| id | _change_type | _commit_version |", + "+----+--------------+-----------------+", + "| 0 | insert | 0 |", + "| 1 | insert | 0 |", + "| 2 | insert | 0 |", + "| 3 | insert | 0 |", + "| 4 | insert | 0 |", + "| 5 | insert | 0 |", + "| 6 | insert | 0 |", + "| 7 | insert | 0 |", + "| 8 | insert | 0 |", + "| 9 | insert | 0 |", + "+----+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} + +#[test] +fn invalid_range_end_before_start() { + let res = read_cdf_for_table("cdf-table-simple", 1, 0, None); + let expected_msg = + "Failed to build LogSegment: start_version cannot be greater than end_version"; + assert!(matches!(res, Err(Error::Generic(msg)) if msg == expected_msg)); +} + +#[test] +fn invalid_range_start_after_last_version_of_table() { + let res = read_cdf_for_table("cdf-table-simple", 3, 4, None); + let expected_msg = "Expected the first commit to have version 3"; + assert!(matches!(res, Err(Error::Generic(msg)) if msg == expected_msg)); +} + +#[test] +fn partition_table() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-partitioned", 0, 2, None)?; + let mut expected = vec![ + "+----+------+------+------------------+-----------------+", + "| id | text | part | _change_type | _commit_version |", + "+----+------+------+------------------+-----------------+", + "| 0 | old | 0 | insert | 0 |", + "| 1 | old | 1 | insert | 0 |", + "| 2 | old | 0 | insert | 0 |", + "| 3 
| old | 1 | insert | 0 |", + "| 4 | old | 0 | insert | 0 |", + "| 5 | old | 1 | insert | 0 |", + "| 3 | old | 1 | delete | 1 |", + "| 1 | old | 1 | update_preimage | 1 |", + "| 1 | new | 1 | update_postimage | 1 |", + "| 0 | old | 0 | delete | 2 |", + "| 2 | old | 0 | delete | 2 |", + "| 4 | old | 0 | delete | 2 |", + "+----+------+------+------------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} + +#[test] +fn backtick_column_names() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-backtick-column-names", 0, None, None)?; + let mut expected = vec![ + "+--------+----------+--------------------------+--------------+-----------------+", + "| id.num | id.num`s | struct_col | _change_type | _commit_version |", + "+--------+----------+--------------------------+--------------+-----------------+", + "| 2 | 10 | {field: 1, field.one: 2} | insert | 0 |", + "| 4 | 10 | {field: 1, field.one: 2} | insert | 0 |", + "| 1 | 10 | {field: 1, field.one: 2} | insert | 1 |", + "| 3 | 10 | {field: 1, field.one: 2} | insert | 1 |", + "| 5 | 10 | {field: 1, field.one: 2} | insert | 1 |", + "+--------+----------+--------------------------+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} + +#[test] +fn unconditional_delete() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-delete-unconditional", 0, None, None)?; + let mut expected = vec![ + "+----+--------------+-----------------+", + "| id | _change_type | _commit_version |", + "+----+--------------+-----------------+", + "| 0 | insert | 0 |", + "| 1 | insert | 0 |", + "| 2 | insert | 0 |", + "| 3 | insert | 0 |", + "| 4 | insert | 0 |", + "| 5 | insert | 0 |", + "| 6 | insert | 0 |", + "| 7 | insert | 0 |", + "| 8 | insert | 0 |", + "| 9 | insert | 0 |", + "| 0 | delete | 1 |", + "| 1 | delete | 1 |", + "| 2 | delete | 1 |", + "| 3 | delete | 1 |", + "| 4 | delete | 1 |", + "| 5 | delete | 1 |", + "| 6 | delete | 1 |", + "| 7 | delete | 1 |", + "| 8 | delete | 1 |", + "| 9 | delete | 1 |", + "+----+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} + +#[test] +fn conditional_delete_all_rows() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-delete-conditional-all-rows", 0, None, None)?; + let mut expected = vec![ + "+----+--------------+-----------------+", + "| id | _change_type | _commit_version |", + "+----+--------------+-----------------+", + "| 0 | insert | 0 |", + "| 1 | insert | 0 |", + "| 2 | insert | 0 |", + "| 3 | insert | 0 |", + "| 4 | insert | 0 |", + "| 5 | insert | 0 |", + "| 6 | insert | 0 |", + "| 7 | insert | 0 |", + "| 8 | insert | 0 |", + "| 9 | insert | 0 |", + "| 0 | delete | 1 |", + "| 1 | delete | 1 |", + "| 2 | delete | 1 |", + "| 3 | delete | 1 |", + "| 4 | delete | 1 |", + "| 5 | delete | 1 |", + "| 6 | delete | 1 |", + "| 7 | delete | 1 |", + "| 8 | delete | 1 |", + "| 9 | delete | 1 |", + "+----+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} + +#[test] +fn conditional_delete_two_rows() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-delete-conditional-two-rows", 0, None, None)?; + let mut expected = vec![ + "+----+--------------+-----------------+", + "| id | _change_type | _commit_version |", + "+----+--------------+-----------------+", + "| 0 | insert | 0 
|", + "| 1 | insert | 0 |", + "| 2 | insert | 0 |", + "| 3 | insert | 0 |", + "| 4 | insert | 0 |", + "| 5 | insert | 0 |", + "| 6 | insert | 0 |", + "| 7 | insert | 0 |", + "| 8 | insert | 0 |", + "| 9 | insert | 0 |", + "| 2 | delete | 1 |", + "| 8 | delete | 1 |", + "+----+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} diff --git a/kernel/tests/data/cdf-table-backtick-column-names.tar.zst b/kernel/tests/data/cdf-table-backtick-column-names.tar.zst new file mode 100644 index 000000000..a72a65511 Binary files /dev/null and b/kernel/tests/data/cdf-table-backtick-column-names.tar.zst differ diff --git a/kernel/tests/data/cdf-table-data-change.tar.zst b/kernel/tests/data/cdf-table-data-change.tar.zst new file mode 100644 index 000000000..974f7587f Binary files /dev/null and b/kernel/tests/data/cdf-table-data-change.tar.zst differ diff --git a/kernel/tests/data/cdf-table-delete-conditional-all-rows.tar.zst b/kernel/tests/data/cdf-table-delete-conditional-all-rows.tar.zst new file mode 100644 index 000000000..fe38e64b1 Binary files /dev/null and b/kernel/tests/data/cdf-table-delete-conditional-all-rows.tar.zst differ diff --git a/kernel/tests/data/cdf-table-delete-conditional-two-rows.tar.zst b/kernel/tests/data/cdf-table-delete-conditional-two-rows.tar.zst new file mode 100644 index 000000000..e10371991 Binary files /dev/null and b/kernel/tests/data/cdf-table-delete-conditional-two-rows.tar.zst differ diff --git a/kernel/tests/data/cdf-table-delete-unconditional.tar.zst b/kernel/tests/data/cdf-table-delete-unconditional.tar.zst new file mode 100644 index 000000000..4e57df8be Binary files /dev/null and b/kernel/tests/data/cdf-table-delete-unconditional.tar.zst differ diff --git a/kernel/tests/data/cdf-table-non-partitioned.tar.zst b/kernel/tests/data/cdf-table-non-partitioned.tar.zst index 2a2f08cf0..f97e1ea8a 100644 Binary files a/kernel/tests/data/cdf-table-non-partitioned.tar.zst and b/kernel/tests/data/cdf-table-non-partitioned.tar.zst differ diff --git a/kernel/tests/data/cdf-table-partitioned.tar.zst b/kernel/tests/data/cdf-table-partitioned.tar.zst new file mode 100644 index 000000000..9d47d85a6 Binary files /dev/null and b/kernel/tests/data/cdf-table-partitioned.tar.zst differ diff --git a/kernel/tests/data/cdf-table-simple.tar.zst b/kernel/tests/data/cdf-table-simple.tar.zst new file mode 100644 index 000000000..0051f05f4 Binary files /dev/null and b/kernel/tests/data/cdf-table-simple.tar.zst differ diff --git a/kernel/tests/data/cdf-table-update-ops.tar.zst b/kernel/tests/data/cdf-table-update-ops.tar.zst new file mode 100644 index 000000000..33134b22b Binary files /dev/null and b/kernel/tests/data/cdf-table-update-ops.tar.zst differ diff --git a/kernel/tests/data/cdf-table-with-cdc-and-dvs.tar.zst b/kernel/tests/data/cdf-table-with-cdc-and-dvs.tar.zst index 6304ccb06..110aa0e0e 100644 Binary files a/kernel/tests/data/cdf-table-with-cdc-and-dvs.tar.zst and b/kernel/tests/data/cdf-table-with-cdc-and-dvs.tar.zst differ diff --git a/kernel/tests/data/cdf-table-with-dv.tar.zst b/kernel/tests/data/cdf-table-with-dv.tar.zst index 76c9f09da..d3aa6574d 100644 Binary files a/kernel/tests/data/cdf-table-with-dv.tar.zst and b/kernel/tests/data/cdf-table-with-dv.tar.zst differ diff --git a/kernel/tests/data/cdf-table.tar.zst b/kernel/tests/data/cdf-table.tar.zst index 30868b12c..9bdb37d44 100644 Binary files a/kernel/tests/data/cdf-table.tar.zst and b/kernel/tests/data/cdf-table.tar.zst differ diff --git 
a/kernel/tests/golden_tables.rs b/kernel/tests/golden_tables.rs index cd9023db1..120271ef2 100644 --- a/kernel/tests/golden_tables.rs +++ b/kernel/tests/golden_tables.rs @@ -26,7 +26,7 @@ use delta_kernel::engine::default::DefaultEngine; mod common; use common::{load_test_data, to_arrow}; -// NB adapated from DAT: read all parquet files in the directory and concatenate them +// NB adapted from DAT: read all parquet files in the directory and concatenate them async fn read_expected(path: &Path) -> DeltaResult { let store = Arc::new(LocalFileSystem::new_with_prefix(path)?); let files = store.list(None).try_collect::>().await?; @@ -368,7 +368,7 @@ golden_test!("deltalog-getChanges", latest_snapshot_test); golden_test!("dv-partitioned-with-checkpoint", latest_snapshot_test); golden_test!("dv-with-columnmapping", latest_snapshot_test); -skip_test!("hive": "test not yet implmented - different file structure"); +skip_test!("hive": "test not yet implemented - different file structure"); golden_test!("kernel-timestamp-int96", latest_snapshot_test); golden_test!("kernel-timestamp-pst", latest_snapshot_test); golden_test!("kernel-timestamp-timestamp_micros", latest_snapshot_test); @@ -436,11 +436,11 @@ skip_test!("canonicalized-paths-special-b": "BUG: path canonicalization"); // // We added two add files with the same path `foo`. The first should have been removed. // // The second should remain, and should have a hard-coded modification time of 1700000000000L // assert(foundFiles.find(_.getPath.endsWith("foo")).exists(_.getModificationTime == 1700000000000L)) -skip_test!("delete-re-add-same-file-different-transactions": "test not yet implmented"); +skip_test!("delete-re-add-same-file-different-transactions": "test not yet implemented"); // data file doesn't exist, get the relative path to compare // assert(new File(addFileStatus.getPath).getName == "special p@#h") -skip_test!("log-replay-special-characters-b": "test not yet implmented"); +skip_test!("log-replay-special-characters-b": "test not yet implemented"); negative_test!("deltalog-invalid-protocol-version"); negative_test!("deltalog-state-reconstruction-from-checkpoint-missing-metadata"); diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index e62f8fd7c..2ee6dfdd5 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -147,10 +147,9 @@ async fn test_commit_info() -> Result<(), Box> { let (store, engine, table_location) = setup("test_table", true); // create a simple table: one int column named 'number' - let schema = Arc::new(StructType::new(vec![StructField::new( + let schema = Arc::new(StructType::new(vec![StructField::nullable( "number", DataType::INTEGER, - true, )])); let table = create_table(store.clone(), table_location, schema, &[]).await?; @@ -201,10 +200,9 @@ async fn test_empty_commit() -> Result<(), Box> { let (store, engine, table_location) = setup("test_table", true); // create a simple table: one int column named 'number' - let schema = Arc::new(StructType::new(vec![StructField::new( + let schema = Arc::new(StructType::new(vec![StructField::nullable( "number", DataType::INTEGER, - true, )])); let table = create_table(store.clone(), table_location, schema, &[]).await?; @@ -224,10 +222,9 @@ async fn test_invalid_commit_info() -> Result<(), Box> { let (store, engine, table_location) = setup("test_table", true); // create a simple table: one int column named 'number' - let schema = Arc::new(StructType::new(vec![StructField::new( + let schema = Arc::new(StructType::new(vec![StructField::nullable( "number", 
DataType::INTEGER, - true, )])); let table = create_table(store.clone(), table_location, schema, &[]).await?; @@ -336,10 +333,9 @@ async fn test_append() -> Result<(), Box> { let (store, engine, table_location) = setup("test_table", true); // create a simple table: one int column named 'number' - let schema = Arc::new(StructType::new(vec![StructField::new( + let schema = Arc::new(StructType::new(vec![StructField::nullable( "number", DataType::INTEGER, - true, )])); let table = create_table(store.clone(), table_location, schema.clone(), &[]).await?; @@ -466,13 +462,12 @@ async fn test_append_partitioned() -> Result<(), Box> { // create a simple partitioned table: one int column named 'number', partitioned by string // column named 'partition' let table_schema = Arc::new(StructType::new(vec![ - StructField::new("number", DataType::INTEGER, true), - StructField::new("partition", DataType::STRING, true), + StructField::nullable("number", DataType::INTEGER), + StructField::nullable("partition", DataType::STRING), ])); - let data_schema = Arc::new(StructType::new(vec![StructField::new( + let data_schema = Arc::new(StructType::new(vec![StructField::nullable( "number", DataType::INTEGER, - true, )])); let table = create_table( store.clone(), @@ -611,16 +606,14 @@ async fn test_append_invalid_schema() -> Result<(), Box> let (store, engine, table_location) = setup("test_table", true); // create a simple table: one int column named 'number' - let table_schema = Arc::new(StructType::new(vec![StructField::new( + let table_schema = Arc::new(StructType::new(vec![StructField::nullable( "number", DataType::INTEGER, - true, )])); // incompatible data schema: one string column named 'string' - let data_schema = Arc::new(StructType::new(vec![StructField::new( + let data_schema = Arc::new(StructType::new(vec![StructField::nullable( "string", DataType::STRING, - true, )])); let table = create_table(store.clone(), table_location, table_schema.clone(), &[]).await?; diff --git a/release.sh b/release.sh index 640fb4faa..b16605c9d 100755 --- a/release.sh +++ b/release.sh @@ -53,7 +53,7 @@ is_working_tree_clean() { is_version_published() { local crate_name="$1" local version - version=get_current_version "$crate_name" + version=$(get_current_version "$crate_name") if [[ -z "$version" ]]; then log_error "Could not find crate '$crate_name' in workspace" @@ -122,14 +122,20 @@ handle_release_branch() { # Handle main branch workflow (publish and tag) handle_main_branch() { # could potentially just use full 'cargo release' command here - publish "delta_kernel_derive" "$current_version" - publish "delta_kernel" "$current_version" + publish "delta_kernel_derive" + publish "delta_kernel" + + # hack: just redo getting the version + local version + version=$(get_current_version "delta_kernel") if confirm "Would you like to tag this release?"; then - log_info "Tagging release $current_version..." - git tag -a "v$current_version" -m "Release $current_version" - git push upstream "v$current_version" - log_success "Tagged release $current_version" + log_info "Tagging release $version..." + if confirm "Tagging as v$version. 
continue?"; then + git tag -a "v$version" -m "Release v$version" + git push upstream tag "v$version" + log_success "Tagged release $version" + fi fi } @@ -138,20 +144,20 @@ publish() { local current_version current_version=$(get_current_version "$crate_name") - if is_version_published "delta_kernel_derive"; then - log_error "delta_kernel_derive version $current_version is already published to crates.io" + if is_version_published "$crate_name"; then + log_error "$crate_name version $current_version is already published to crates.io" fi - log_info "[DRY RUN] Publishing $crate_name version $version to crates.io..." + log_info "[DRY RUN] Publishing $crate_name version $current_version to crates.io..." if ! cargo publish --dry-run -p "$crate_name"; then log_error "Failed to publish $crate_name to crates.io" fi if confirm "Dry run complete. Continue with publishing?"; then - log_info "Publishing $crate_name version $version to crates.io..." + log_info "Publishing $crate_name version $current_version to crates.io..." if ! cargo publish -p "$crate_name"; then log_error "Failed to publish $crate_name to crates.io" fi - log_success "Successfully published $crate_name version $version to crates.io" + log_success "Successfully published $crate_name version $current_version to crates.io" fi }
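
A note on the `release.sh` hunks above: the original `version=get_current_version "$crate_name"` never actually invokes the helper; bash parses `version=get_current_version` as a temporary environment assignment scoped to the command `"$crate_name"`, so the local `version` stays empty and the subsequent `-z` check reports "Could not find crate". The diff switches to command substitution (and `handle_main_branch` re-derives the version the same way). Below is a minimal, runnable sketch of the difference; the `get_current_version` stub and its output are hypothetical stand-ins for the real helper defined elsewhere in `release.sh`.

```bash
#!/usr/bin/env bash
# Hypothetical stub standing in for the real helper defined elsewhere in release.sh.
get_current_version() {
  echo "1.2.3"   # dummy version, for illustration only
}

crate_name="delta_kernel"

# Broken pattern (the old line): bash reads `version=get_current_version` as a
# temporary variable assignment for the command `"$crate_name"`, so it tries to
# execute `delta_kernel` itself and `version` is never set in this shell.
#version=get_current_version "$crate_name"    # -> "delta_kernel: command not found"

# Fixed pattern (the new line): command substitution captures the helper's output.
version=$(get_current_version "$crate_name")
echo "version is: $version"                   # prints: version is: 1.2.3
```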