Part 1, Read transforms via expressions: Just compute the expression and return it. (#607)


## What changes are proposed in this pull request?

This is the initial part of moving to using expressions to express
transformations when reading data. What this PR does:
- Computes a "static" transform: a set of column expressions that can be
passed directly through unchanged, plus enough metadata for lower levels
to fill in a "fixup" expression where one is needed
- Passes the static transform into the iterator that parses each `Add` file
- While parsing an `Add` file, creates the correct expression for any
needed fix-ups (just partition columns today) and inserts it into a
row-indexed map
- Returns this map so the caller can look up, for a given row, which
expression, if any, needs to be applied when reading that row (see the
sketch below)
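
To make the new return value concrete, here is a minimal sketch of how a
caller might consume the `(data, selection_vector, transforms)` triple. The
`scan`/`engine` setup and the `apply_transform` helper are assumptions for
illustration, not part of this PR:

```rust
// A minimal consumer sketch, not this PR's code: `scan` and `engine` are
// assumed to be set up as in the kernel's examples, and `apply_transform`
// is a hypothetical helper standing in for expression evaluation.
for res in scan.scan_data(&engine)? {
    let (data, selection_vector, transforms) = res?;
    for (row, &selected) in selection_vector.iter().enumerate() {
        if !selected {
            continue; // non-selected rows must be ignored
        }
        // Rows past the end of `transforms`, or mapped to `None`, need no fixup.
        if let Some(expr) = transforms.get(row).and_then(|t| t.as_ref()) {
            // For a table with schema (value INT, date DATE) partitioned on
            // `date`, `expr` is a struct expression of the form
            //   Struct(Column("value"), Literal(Date(...)))
            // i.e. physical columns pass through unchanged and partition
            // values are injected as literals parsed from `add.partitionValues`.
            apply_transform(expr, &data); // hypothetical
        }
    }
}
```

Callers that don't handle transforms yet can simply ignore the third tuple
element, which is exactly what the updated examples in this diff do with
`_transforms`.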

Follow-up PRs:
* #612: Propagate this information through when using `visit_scan_files`
* #613: Actually use the data to do the transformation, and remove
`transform_to_logical` entirely
* #614: Make this work over ffi and use it
* (TODO): Clean up any existing code that's now overcomplicated in the
scan building

Each of those is more invasive and ends up touching a significant amount of
code, so I'm staging this as much as possible to make reviews easier.



## How was this change tested?

Unit tests, plus inspection of the resulting expressions when run against
real tables.
nicklan authored Jan 23, 2025
1 parent bf7e212 commit 45eedf2
Showing 8 changed files with 250 additions and 29 deletions.
ffi/src/scan.rs (1 addition, 1 deletion)
@@ -230,7 +230,7 @@ fn kernel_scan_data_next_impl(
.data
.lock()
.map_err(|_| Error::generic("poisoned mutex"))?;
if let Some((data, sel_vec)) = data.next().transpose()? {
if let Some((data, sel_vec, _transforms)) = data.next().transpose()? {
let bool_slice = KernelBoolSlice::from(sel_vec);
(engine_visitor)(engine_context, data.into(), bool_slice);
Ok(true)
kernel/examples/inspect-table/src/main.rs (1 addition, 1 deletion)
@@ -209,7 +209,7 @@ fn try_main() -> DeltaResult<()> {
let scan = ScanBuilder::new(snapshot).build()?;
let scan_data = scan.scan_data(&engine)?;
for res in scan_data {
let (data, vector) = res?;
let (data, vector, _transforms) = res?;
delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
&vector,
kernel/examples/read-table-multi-threaded/src/main.rs (1 addition, 1 deletion)
@@ -210,7 +210,7 @@ fn try_main() -> DeltaResult<()> {
drop(record_batch_tx);

for res in scan_data {
let (data, vector) = res?;
let (data, vector, _transforms) = res?;
scan_file_tx = delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
&vector,
kernel/src/engine_data.rs (3 additions, 1 deletion)
@@ -129,7 +129,9 @@ pub trait TypedGetData<'a, T> {
fn get_opt(&'a self, row_index: usize, field_name: &str) -> DeltaResult<Option<T>>;
fn get(&'a self, row_index: usize, field_name: &str) -> DeltaResult<T> {
let val = self.get_opt(row_index, field_name)?;
val.ok_or_else(|| Error::MissingData(format!("Data missing for field {field_name}")))
val.ok_or_else(|| {
Error::MissingData(format!("Data missing for field {field_name}")).with_backtrace()
})
}
}

kernel/src/scan/log_replay.rs (167 additions, 18 deletions)
@@ -1,15 +1,16 @@
use std::clone::Clone;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use itertools::Itertools;
use tracing::debug;

use super::data_skipping::DataSkippingFilter;
use super::ScanData;
use super::{ScanData, Transform};
use crate::actions::get_log_add_schema;
use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
use crate::expressions::{column_expr, column_name, ColumnName, Expression, ExpressionRef};
use crate::scan::DeletionVectorDescriptor;
use crate::scan::{DeletionVectorDescriptor, TransformExpr};
use crate::schema::{ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType};
use crate::utils::require;
use crate::{DeltaResult, Engine, EngineData, Error, ExpressionEvaluator};
@@ -44,12 +45,17 @@ struct LogReplayScanner {
struct AddRemoveDedupVisitor<'seen> {
seen: &'seen mut HashSet<FileActionKey>,
selection_vector: Vec<bool>,
logical_schema: SchemaRef,
transform: Option<Arc<Transform>>,
row_transform_exprs: Vec<Option<ExpressionRef>>,
is_log_batch: bool,
}

impl AddRemoveDedupVisitor<'_> {
/// Checks if log replay already processed this logical file (in which case the current action
/// should be ignored). If not already seen, register it so we can recognize future duplicates.
/// Returns `true` if we have seen the file and should ignore it, `false` if we have not seen it
/// and should process it.
fn check_and_record_seen(&mut self, key: FileActionKey) -> bool {
// Note: each (add.path + add.dv_unique_id()) pair has a
// unique Add + Remove pair in the log. For example:
@@ -76,18 +82,49 @@ impl AddRemoveDedupVisitor<'_> {
}
}

/// Compute an expression that will transform from physical to logical for a given Add file action
fn get_transform_expr<'a>(
&self,
i: usize,
transform: &Transform,
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<ExpressionRef> {
let partition_values: HashMap<_, _> = getters[1].get(i, "add.partitionValues")?;
let transforms = transform
.iter()
.map(|transform_expr| match transform_expr {
TransformExpr::Partition(field_idx) => {
let field = self.logical_schema.fields.get_index(*field_idx);
let Some((_, field)) = field else {
return Err(Error::Generic(
format!("logical schema did not contain expected field at {field_idx}, can't transform data")
));
};
let name = field.physical_name();
let partition_value = super::parse_partition_value(
partition_values.get(name),
field.data_type(),
)?;
Ok(partition_value.into())
}
TransformExpr::Static(field_expr) => Ok(field_expr.clone()),
})
.try_collect()?;
Ok(Arc::new(Expression::Struct(transforms)))
}

/// True if this row contains an Add action that should survive log replay. Skip it if the row
/// is not an Add action, or the file has already been seen previously.
fn is_valid_add<'a>(&mut self, i: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<bool> {
// Add will have a path at index 0 if it is valid; otherwise, if it is a log batch, we may
// have a remove with a path at index 4. In either case, extract the three dv getters at
// indexes that immediately follow a valid path index.
let (path, dv_getters, is_add) = if let Some(path) = getters[0].get_str(i, "add.path")? {
(path, &getters[1..4], true)
(path, &getters[2..5], true)
} else if !self.is_log_batch {
return Ok(false);
} else if let Some(path) = getters[4].get_opt(i, "remove.path")? {
(path, &getters[5..8], false)
} else if let Some(path) = getters[5].get_opt(i, "remove.path")? {
(path, &getters[6..9], false)
} else {
return Ok(false);
};
@@ -101,9 +138,22 @@ impl AddRemoveDedupVisitor<'_> {
None => None,
};

// Process both adds and removes, but only return not already-seen adds
// Check both adds and removes (skipping already-seen), but only transform and return adds
let file_key = FileActionKey::new(path, dv_unique_id);
Ok(!self.check_and_record_seen(file_key) && is_add)
if self.check_and_record_seen(file_key) || !is_add {
return Ok(false);
}
let transform = self
.transform
.as_ref()
.map(|transform| self.get_transform_expr(i, transform, getters))
.transpose()?;
if transform.is_some() {
// fill in any needed `None`s for previous rows
self.row_transform_exprs.resize_with(i, Default::default);
self.row_transform_exprs.push(transform);
}
Ok(true)
}
}

@@ -113,8 +163,10 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> {
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
const STRING: DataType = DataType::STRING;
const INTEGER: DataType = DataType::INTEGER;
let ss_map: DataType = MapType::new(STRING, STRING, true).into();
let types_and_names = vec![
(STRING, column_name!("add.path")),
(ss_map, column_name!("add.partitionValues")),
(STRING, column_name!("add.deletionVector.storageType")),
(STRING, column_name!("add.deletionVector.pathOrInlineDv")),
(INTEGER, column_name!("add.deletionVector.offset")),
@@ -132,12 +184,12 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> {
} else {
// All checkpoint actions are already reconciled and Remove actions in checkpoint files
// only serve as tombstones for vacuum jobs. So we only need to examine the adds here.
(&names[..4], &types[..4])
(&names[..5], &types[..5])
}
}

fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
let expected_getters = if self.is_log_batch { 8 } else { 4 };
let expected_getters = if self.is_log_batch { 9 } else { 5 };
require!(
getters.len() == expected_getters,
Error::InternalError(format!(
@@ -207,6 +259,8 @@ impl LogReplayScanner {
&mut self,
add_transform: &dyn ExpressionEvaluator,
actions: &dyn EngineData,
logical_schema: SchemaRef,
transform: Option<Arc<Transform>>,
is_log_batch: bool,
) -> DeltaResult<ScanData> {
// Apply data skipping to get back a selection vector for actions that passed skipping. We
@@ -220,24 +274,29 @@
let mut visitor = AddRemoveDedupVisitor {
seen: &mut self.seen,
selection_vector,
logical_schema,
transform,
row_transform_exprs: Vec::new(),
is_log_batch,
};
visitor.visit_rows_of(actions)?;

// TODO: Teach expression eval to respect the selection vector we just computed so carefully!
let selection_vector = visitor.selection_vector;
let result = add_transform.evaluate(actions)?;
Ok((result, selection_vector))
Ok((result, selection_vector, visitor.row_transform_exprs))
}
}

/// Given an iterator of (engine_data, bool) tuples and a predicate, returns an iterator of
/// `(engine_data, selection_vec)`. Each row that is selected in the returned `engine_data` _must_
/// be processed to complete the scan. Non-selected rows _must_ be ignored. The boolean flag
/// indicates whether the record batch is a log or checkpoint batch.
pub fn scan_action_iter(
pub(crate) fn scan_action_iter(
engine: &dyn Engine,
action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>>,
logical_schema: SchemaRef,
transform: Option<Arc<Transform>>,
physical_predicate: Option<(ExpressionRef, SchemaRef)>,
) -> impl Iterator<Item = DeltaResult<ScanData>> {
let mut log_scanner = LogReplayScanner::new(engine, physical_predicate);
Expand All @@ -249,20 +308,37 @@ pub fn scan_action_iter(
action_iter
.map(move |action_res| {
let (batch, is_log_batch) = action_res?;
log_scanner.process_scan_batch(add_transform.as_ref(), batch.as_ref(), is_log_batch)
log_scanner.process_scan_batch(
add_transform.as_ref(),
batch.as_ref(),
logical_schema.clone(),
transform.clone(),
is_log_batch,
)
})
.filter(|res| res.as_ref().map_or(true, |(_, sv)| sv.contains(&true)))
.filter(|res| res.as_ref().map_or(true, |(_, sv, _)| sv.contains(&true)))
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};

use crate::scan::{
state::{DvInfo, Stats},
test_utils::{add_batch_simple, add_batch_with_remove, run_with_validate_callback},
use crate::expressions::{column_name, Scalar};
use crate::scan::state::{DvInfo, Stats};
use crate::scan::test_utils::{
add_batch_simple, add_batch_with_partition_col, add_batch_with_remove,
run_with_validate_callback,
};
use crate::scan::{get_state_info, Scan};
use crate::Expression;
use crate::{
engine::sync::SyncEngine,
schema::{DataType, SchemaRef, StructField, StructType},
ExpressionRef,
};

use super::scan_action_iter;

// dv-info is more complex to validate, we validate that works in the test for visit_scan_files
// in state.rs
fn validate_simple(
@@ -288,6 +364,8 @@ mod tests {
fn test_scan_action_iter() {
run_with_validate_callback(
vec![add_batch_simple()],
None, // not testing schema
None, // not testing transform
&[true, false],
(),
validate_simple,
@@ -298,9 +376,80 @@ mod tests {
fn test_scan_action_iter_with_remove() {
run_with_validate_callback(
vec![add_batch_with_remove()],
None, // not testing schema
None, // not testing transform
&[false, false, true, false],
(),
validate_simple,
);
}

#[test]
fn test_no_transforms() {
let batch = vec![add_batch_simple()];
let logical_schema = Arc::new(crate::schema::StructType::new(vec![]));
let iter = scan_action_iter(
&SyncEngine::new(),
batch.into_iter().map(|batch| Ok((batch as _, true))),
logical_schema,
None,
None,
);
for res in iter {
let (_batch, _sel, transforms) = res.unwrap();
assert!(transforms.is_empty(), "Should have no transforms");
}
}

#[test]
fn test_simple_transform() {
let schema: SchemaRef = Arc::new(StructType::new([
StructField::new("value", DataType::INTEGER, true),
StructField::new("date", DataType::DATE, true),
]));
let partition_cols = ["date".to_string()];
let state_info = get_state_info(schema.as_ref(), &partition_cols).unwrap();
let static_transform = Some(Arc::new(Scan::get_static_transform(&state_info.all_fields)));
let batch = vec![add_batch_with_partition_col()];
let iter = scan_action_iter(
&SyncEngine::new(),
batch.into_iter().map(|batch| Ok((batch as _, true))),
schema,
static_transform,
None,
);

fn validate_transform(transform: Option<&ExpressionRef>, expected_date_offset: i32) {
assert!(transform.is_some());
let Expression::Struct(inner) = transform.unwrap().as_ref() else {
panic!("Transform should always be a struct expr");
};
assert_eq!(inner.len(), 2, "expected two items in transform struct");

let Expression::Column(ref name) = inner[0] else {
panic!("Expected first expression to be a column");
};
assert_eq!(name, &column_name!("value"), "First col should be 'value'");

let Expression::Literal(ref scalar) = inner[1] else {
panic!("Expected second expression to be a literal");
};
assert_eq!(
scalar,
&Scalar::Date(expected_date_offset),
"Didn't get expected date offset"
);
}

for res in iter {
let (_batch, _sel, transforms) = res.unwrap();
// in this case we have a metadata action first and protocol 3rd, so we expect 4 items,
// the first and 3rd being a `None`
assert_eq!(transforms.len(), 4, "Should have 4 transforms");
assert!(transforms[0].is_none(), "transform at [0] should be None");
assert!(transforms[2].is_none(), "transform at [2] should be None");
validate_transform(transforms[1].as_ref(), 17511);
validate_transform(transforms[3].as_ref(), 17510);
}
}
}
[Diffs for the remaining 3 changed files are not shown.]