Make eval_sql_where available to DefaultPredicateEvaluator #627

Merged · 17 commits · Jan 16, 2025
5 changes: 2 additions & 3 deletions kernel/src/engine/parquet_row_group_skipping.rs
@@ -1,8 +1,6 @@
//! An implementation of parquet row group skipping using data skipping predicates over footer stats.
-use crate::predicates::parquet_stats_skipping::{
@scovich (Collaborator, Author) commented on Jan 8, 2025:

aside: Not sure how this out-of-order import escaped `cargo fmt` before now?

Collaborator replied:

Looks like it's just flattening it and you moved the import anyways.

@scovich (Collaborator, Author) replied:

Doesn't `cargo fmt` order imports alphabetically? If so, how did `use crate::predicates` end up before `use crate::expressions`?

-    ParquetStatsProvider, ParquetStatsSkippingFilter as _,
-};
 use crate::expressions::{ColumnName, Expression, Scalar, UnaryExpression, BinaryExpression, VariadicExpression};
+use crate::predicates::parquet_stats_skipping::ParquetStatsProvider;
use crate::schema::{DataType, PrimitiveType};
use chrono::{DateTime, Days};
use parquet::arrow::arrow_reader::ArrowReaderBuilder;
@@ -57,6 +55,7 @@ impl<'a> RowGroupFilter<'a> {

/// Applies a filtering predicate to a row group. Return value false means to skip it.
fn apply(row_group: &'a RowGroupMetaData, predicate: &Expression) -> bool {
use crate::predicates::PredicateEvaluator as _;
Collaborator commented:

aside: I didn't know this could be used to import something as unnamed. cool stuff!

RowGroupFilter::new(row_group, predicate).eval_sql_where(predicate) != Some(false)
}

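As an aside for readers, a minimal sketch of the `use ... as _` trick discussed in the thread above (generic Rust, not kernel code): importing a trait anonymously brings its methods into scope without claiming the trait's name.

```rust
// Bring the trait's methods into scope without binding the name `Write`,
// leaving it free for, say, std::io::Write in the same module.
use std::fmt::Write as _;

fn render(n: u64) -> String {
    let mut out = String::new();
    // write! resolves via fmt::Write::write_fmt, which the `as _` import enables.
    let _ = write!(out, "n = {n}");
    out
}
```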
11 changes: 11 additions & 0 deletions kernel/src/expressions/mod.rs
@@ -47,6 +47,17 @@ pub enum BinaryOperator {
}

impl BinaryOperator {
/// True if this is a comparison for which NULL input always produces NULL output
pub(crate) fn is_null_intolerant_comparison(&self) -> bool {
use BinaryOperator::*;
match self {
Plus | Minus | Multiply | Divide => false, // not a comparison
LessThan | LessThanOrEqual | GreaterThan | GreaterThanOrEqual => true,
Equal | NotEqual => true,
Distinct | In | NotIn => false, // tolerates NULL input
}
}

/// Returns `<op2>` (if any) such that `B <op2> A` is equivalent to `A <op> B`.
pub(crate) fn commute(&self) -> Option<BinaryOperator> {
use BinaryOperator::*;
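This classification feeds the null-safe rewrite introduced later in this diff (see `eval_sql_where` in kernel/src/predicates/mod.rs). A toy Option-based sketch, not kernel code, of the behavioral split:

```rust
// Null-intolerant comparison: NULL (None) input always yields NULL output, so
// guarding it with `x IS NOT NULL AND ...` preserves SQL WHERE semantics.
fn lt(a: Option<i64>, b: Option<i64>) -> Option<bool> {
    Some(a? < b?)
}

// DISTINCT tolerates NULL input: it returns a real boolean even when one side
// is NULL, so the null-check push-down must not be applied to it.
fn distinct(a: Option<i64>, b: Option<i64>) -> Option<bool> {
    Some(a != b)
}

fn main() {
    assert_eq!(lt(None, Some(10)), None); // NULL < 10  =>  NULL
    assert_eq!(distinct(None, Some(10)), Some(true)); // NULL IS DISTINCT FROM 10  =>  TRUE
}
```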
192 changes: 176 additions & 16 deletions kernel/src/predicates/mod.rs
@@ -20,7 +20,21 @@ mod tests;
///
/// Because inversion (`NOT` operator) has special semantics and can often be optimized away by
/// pushing it down, most methods take an `inverted` flag. That allows operations like
-/// [`UnaryOperator::Not`] to simply evaluate their operand with a flipped `inverted` flag.
+/// [`UnaryOperator::Not`] to simply evaluate their operand with a flipped `inverted` flag, and
/// greatly simplifies the implementations of most operators (other than those which have to
/// directly implement NOT semantics, which are unavoidably complex in that regard).
///
/// # Parameterized output type
///
/// The types involved in predicate evaluation are parameterized and implementation-specific. For
/// example, [`crate::engine::parquet_stats_skipping::ParquetStatsProvider`] directly evaluates the
/// predicate over parquet footer stats and returns boolean results, while
/// [`crate::scan::data_skipping::DataSkippingPredicateCreator`] instead transforms the input
/// predicate expression to a data skipping predicate expression that the engine can evaluate
/// directly against Delta data skipping stats during log replay. Although this approach is harder
/// to read and reason about at first, the majority of expressions can be implemented generically,
/// which greatly reduces redundancy and ensures that all flavors of predicate evaluation have the
/// same semantics.
///
/// # NULL and error semantics
///
@@ -44,6 +58,9 @@
pub(crate) trait PredicateEvaluator {
type Output;

/// A (possibly inverted) scalar NULL test, e.g. `<value> IS [NOT] NULL`.
fn eval_scalar_is_null(&self, val: &Scalar, inverted: bool) -> Option<Self::Output>;
Collaborator commented:

sorry slight tangent: it feels like the IS [NOT] NULL implies that output = bool? For the default case it has output = bool but then for data skipping that output is an expr?

@scovich (Collaborator, Author) replied on Jan 16, 2025:

PredicateEvaluator is generic.

The default and parquet predicate evaluators (which do "direct" evaluation) do directly return bool.

The data skipping predicate evaluator is different -- it is "indirect" and translates a normal predicate expression into a data skipping predicate expression that can later be applied multiple times to different stats rows (by engine expression handler in prod, or with the default predicate evaluator in tests). In that case eval_scalar_is_null (like all methods) returns Option<Expr>. The returned expression will return boolean values once we do evaluate it.

Collaborator replied:

thanks @scovich! to ensure my understanding: for example,

  1. default/parquet predicate eval would yield immediate evaluation to bool of col < 5
  2. whereas data skipping predicate eval would yield some expr like minValues.col < 5. the 'evaluation' of the predicate in PredicateEvaluator is more like a transformation in that case?

@scovich (Collaborator, Author) replied:

exactly!
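To make the exchange concrete, here is a toy sketch (simplified types, not the kernel's actual API) of one generic interface serving both the "direct" and "indirect" flavors:

```rust
// A toy predicate AST (not the kernel's Expression type).
#[derive(Debug, PartialEq)]
enum Pred {
    Col(String),
    Lit(i64),
    Lt(Box<Pred>, Box<Pred>),
}

// One generic interface, parameterized on what "evaluating" produces.
trait EvalLt {
    type Output;
    fn eval_lt(&self, col: &str, val: i64) -> Self::Output;
}

// Direct flavor (like the default/parquet evaluators): resolve the column
// against a known value and return a boolean immediately.
struct Direct {
    a: i64,
}
impl EvalLt for Direct {
    type Output = bool;
    fn eval_lt(&self, _col: &str, val: i64) -> bool {
        self.a < val
    }
}

// Indirect flavor (like data skipping): emit a *new* predicate over stats
// columns, to be evaluated later against each stats row.
struct SkippingRewrite;
impl EvalLt for SkippingRewrite {
    type Output = Pred;
    fn eval_lt(&self, col: &str, val: i64) -> Pred {
        // `col < val` can only hold for some row of a file if minValues.col < val.
        Pred::Lt(
            Box::new(Pred::Col(format!("minValues.{col}"))),
            Box::new(Pred::Lit(val)),
        )
    }
}

fn main() {
    assert!(Direct { a: 3 }.eval_lt("col", 5)); // direct: a bool, right now
    let rewritten = SkippingRewrite.eval_lt("col", 5); // indirect: a Pred for later
    assert_eq!(
        rewritten,
        Pred::Lt(Box::new(Pred::Col("minValues.col".into())), Box::new(Pred::Lit(5)))
    );
}
```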


/// A (possibly inverted) boolean scalar value, e.g. `[NOT] <value>`.
fn eval_scalar(&self, val: &Scalar, inverted: bool) -> Option<Self::Output>;

@@ -123,14 +140,19 @@ pub(crate) trait PredicateEvaluator {
fn eval_unary(&self, op: UnaryOperator, expr: &Expr, inverted: bool) -> Option<Self::Output> {
match op {
UnaryOperator::Not => self.eval_expr(expr, !inverted),
-UnaryOperator::IsNull => {
-    // Data skipping only supports IS [NOT] NULL over columns (not expressions)
-    let Expr::Column(col) = expr else {
+UnaryOperator::IsNull => match expr {
+    // WARNING: Only literals and columns can be safely null-checked. Attempting to
+    // null-check an expression such as `a < 10` could wrongly produce FALSE in case
+    // `a` is just plain missing (rather than known to be NULL). A missing value can
+    // arise e.g. if data skipping encounters a column with missing stats, or if
+    // partition pruning encounters a non-partition column.
+    Expr::Literal(val) => self.eval_scalar_is_null(val, inverted),
+    Expr::Column(col) => self.eval_is_null(col, inverted),
+    _ => {
         debug!("Unsupported operand: IS [NOT] NULL: {expr:?}");
-        return None;
-    };
-    self.eval_is_null(col, inverted)
-}
+        None
+    }
+},
}
}

@@ -229,12 +251,137 @@ pub(crate) trait PredicateEvaluator {
Variadic(VariadicExpression { op, exprs }) => self.eval_variadic(*op, exprs, inverted),
}
}

/// Evaluates a predicate with SQL WHERE semantics.
Collaborator commented:

EXTREMELY helpful comment + examples :)

///
/// By default, [`eval_expr`] behaves badly for comparisons involving NULL columns (e.g. `a <
/// 10` when `a` is NULL), because the comparison correctly evaluates to NULL, but NULL
/// expressions are interpreted as "stats missing" (= cannot skip). This ambiguity can "poison"
/// the entire expression, causing it to return NULL instead of the FALSE that would allow skipping:
///
/// ```text
/// WHERE a < 10 -- NULL (can't skip file)
/// WHERE a < 10 AND TRUE -- NULL (can't skip file)
/// WHERE a < 10 OR FALSE -- NULL (can't skip file)
/// ```
///
/// Meanwhile, SQL WHERE semantics only keeps rows for which the filter evaluates to
/// TRUE (discarding rows that evaluate to FALSE or NULL):
///
/// ```text
/// WHERE a < 10 -- NULL (discard row)
/// WHERE a < 10 AND TRUE -- NULL (discard row)
/// WHERE a < 10 OR FALSE -- NULL (discard row)
/// ```
///
/// Conceptually, the behavior difference between data skipping and SQL WHERE semantics can be
/// addressed by evaluating with null-safe semantics, as if by `<expr> IS NOT NULL AND <expr>`:
///
/// ```text
/// WHERE (a < 10) IS NOT NULL AND (a < 10) -- FALSE (skip file)
/// WHERE (a < 10 AND TRUE) IS NOT NULL AND (a < 10 AND TRUE) -- FALSE (skip file)
/// WHERE (a < 10 OR FALSE) IS NOT NULL AND (a < 10 OR FALSE) -- FALSE (skip file)
/// ```
///
/// HOWEVER, we cannot safely NULL-check the result of an arbitrary data skipping predicate
/// because an expression will also produce NULL if the value is just plain missing (e.g. data
/// skipping over a column that lacks stats), and if that NULL should propagate all the way to
/// top-level, it would be wrongly interpreted as FALSE (= skippable).
///
/// To prevent wrong data skipping, the predicate evaluator always returns NULL for a NULL check
/// over anything except for literals and columns with known values. So we must push the NULL
/// check down through supported operations (AND as well as null-intolerant comparisons like
/// `<`, `!=`, etc) until it reaches columns and literals where it can do some good, e.g.:
///
/// ```text
/// WHERE a < 10 AND (b < 20 OR c < 30)
/// ```
///
/// would conceptually be interpreted as
///
/// ```text
/// WHERE
/// (a < 10 AND (b < 20 OR c < 30)) IS NOT NULL AND
/// (a < 10 AND (b < 20 OR c < 30))
/// ```
///
/// We then push the NULL check down through the top-level AND:
///
/// ```text
/// WHERE
/// (a < 10 IS NOT NULL AND a < 10) AND
/// ((b < 20 OR c < 30) IS NOT NULL AND (b < 20 OR c < 30))
/// ```
///
/// and attempt to push it further into the `a < 10` and `OR` clauses:
///
/// ```text
/// WHERE
/// (a IS NOT NULL AND 10 IS NOT NULL AND a < 10) AND
/// (b < 20 OR c < 30)
/// ```
///
/// Any time the push-down reaches an operator that does not support push-down (such as OR), we
/// simply drop the NULL check. This way, the top-level NULL check only applies to
/// sub-expressions that can safely implement it, while ignoring other sub-expressions. The
/// unsupported sub-expressions could produce nulls at runtime that prevent skipping, but false
/// positives are OK -- the query will still correctly filter out the unwanted rows that result.
///
/// At expression evaluation time, a NULL value of `a` (from our example) would evaluate as:
///
/// ```text
/// AND(..., AND(a IS NOT NULL, 10 IS NOT NULL, a < 10), ...)
/// AND(..., AND(FALSE, TRUE, NULL), ...)
/// AND(..., FALSE, ...)
/// FALSE
/// ```
///
/// While a non-NULL value of `a` would instead evaluate as:
///
/// ```text
/// AND(..., AND(a IS NOT NULL, 10 IS NOT NULL, a < 10), ...)
/// AND(..., AND(TRUE, TRUE, <result>), ...)
/// AND(..., <result>, ...)
/// ```
///
/// And a missing value for `a` would safely disable the clause:
///
/// ```text
/// AND(..., AND(a IS NOT NULL, 10 IS NOT NULL, a < 10), ...)
/// AND(..., AND(NULL, TRUE, NULL), ...)
/// AND(..., NULL, ...)
/// ```
fn eval_sql_where(&self, filter: &Expr) -> Option<Self::Output> {
Collaborator commented:

okay so we are moving this from ParquetStatsSkippingFilter to the more general PredicateEvaluator, and then replacing instances that needed the ParquetStatsSkippingFilter with just a (Default)PredicateEvaluator and feeding it whatever stats data it needs?

@scovich (Collaborator, Author) replied:

Exactly. There was nothing inherent to parquet stats skipping in the logic, that was just the place it happened to land first.

use Expr::{Binary, Variadic};
match filter {
Variadic(v) => {
// Recursively invoke `eval_sql_where` instead of the usual `eval_expr` for AND/OR.
let exprs = v.exprs.iter().map(|expr| self.eval_sql_where(expr));
self.finish_eval_variadic(v.op, exprs, false)
}
Binary(BinaryExpression { op, left, right }) if op.is_null_intolerant_comparison() => {
// Perform a nullsafe comparison instead of the usual `eval_binary`
let exprs = [
self.eval_unary(UnaryOperator::IsNull, left, true),
self.eval_unary(UnaryOperator::IsNull, right, true),
Collaborator commented (on lines +365 to +366):

aside: ahh yea now I really feel the usefulness of our new APIs in #646.. I found myself having to think through the inverted=true cases here and below...

@scovich (Collaborator, Author) replied on Jan 16, 2025:

Yeah this predicate eval code is a bit mind bending for sure. Inversion and generic Output type are really powerful but also a lot harder to grok than "normal" code.

I kept worrying about that while designing this code but:

  • I actually started with non-invertible versions everywhere, but the complexity blew up in a different way -- you had to manually enumerate all the cases in redundant code, which is way bulkier and way more error prone. I eventually gave up and introduced the inverted flag everywhere for a significant code simplification.
  • The generic output type allows turning 2-3 very similar implementations into one, which vastly reduces duplication and bug surface (from getting one of the similar copies slightly wrong). Even in earlier versions of this PR, there were two implementations of the eval_sql_where logic -- one for boolean output and a different one for expression output. Eventually I realized they were logically equivalent, in spite of looking rather different, which is what allowed me to hoist it all the way up to PredicateEvaluator and ditch the annoyingly separate SqlWherePredicateEvaluator trait I had been using.

So, given a choice between compact and robust but harder to understand, vs. easy to understand but redundant and error-prone... the former seemed like a net win in spite of the learning curve to new entrants.

Collaborator replied:

completely agree, and if I could then turn this into a little request: I think this is super useful context, any chance we could embed it into a (doc)comment somewhere?

@scovich (Collaborator, Author) replied:

There is already a doc comment on PredicateEvaluator about inversion; does it suffice, or are there gaps?

/// # Inverted expression semantics
///
/// Because inversion (`NOT` operator) has special semantics and can often be optimized away by
/// pushing it down, most methods take an `inverted` flag. That allows operations like
/// [`UnaryOperator::Not`] to simply evaluate their operand with a flipped `inverted` flag.

I guess it doesn't directly speak to the complexity tho...

There is also a doc comment about the parametrized Output type on DataSkippingPredicateEvaluator:

/// The types involved in these operations are parameterized and implementation-specific. For
/// example, [`crate::engine::parquet_stats_skipping::ParquetStatsProvider`] directly evaluates data
/// skipping expressions and returns boolean results, while
/// [`crate::scan::data_skipping::DataSkippingPredicateCreator`] instead converts the input
/// predicate to a data skipping predicate that can be evaluated directly later.

Maybe I should move it to PredicateEvaluator?

@scovich (Collaborator, Author) followed up:

Did some editing, PTAL?

Collaborator replied:

lgtm!
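For later readers, a toy sketch (simplified AST, not kernel code) of the inversion design discussed above: NOT is absorbed by flipping a flag instead of wrapping results, and leaves fold the flag in with a single XOR.

```rust
enum P {
    Lit(bool),
    Not(Box<P>),
    And(Vec<P>),
}

fn eval(p: &P, inverted: bool) -> bool {
    match p {
        // A leaf folds the pending NOT in with one XOR.
        P::Lit(b) => *b != inverted,
        // NOT just flips the flag -- no wrapper, no post-processing.
        P::Not(inner) => eval(inner, !inverted),
        // By De Morgan, an inverted AND is an OR over inverted children.
        P::And(children) => {
            if inverted {
                children.iter().any(|c| eval(c, true))
            } else {
                children.iter().all(|c| eval(c, false))
            }
        }
    }
}

fn main() {
    // NOT(true AND NOT false) == false
    let p = P::Not(Box::new(P::And(vec![P::Lit(true), P::Not(Box::new(P::Lit(false)))])));
    assert!(!eval(&p, false));
}
```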

self.eval_binary(*op, left, right, false),
];
self.finish_eval_variadic(VariadicOperator::And, exprs, false)
}
_ => self.eval_expr(filter, false),
}
}
}
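A hedged usage sketch of the new entry point (test-style code: the constructor names `ColumnName::new`, `column_expr!`, `Expression::lt`, and `Expression::literal` are assumptions based on surrounding kernel code, not verified API):

```rust
use std::collections::HashMap;

// Assumed setup: the HashMap resolver is the test-only convenience
// implemented below, and column `a` resolves to a known NULL.
let resolver: HashMap<ColumnName, Scalar> =
    [(ColumnName::new(["a"]), Scalar::Null(DataType::INTEGER))].into();
let evaluator = DefaultPredicateEvaluator::from(resolver);

// Hypothetical constructors for the predicate `a < 10`.
let filter = Expression::lt(column_expr!("a"), Expression::literal(10));

// Plain evaluation: `NULL < 10` is NULL, which reads as "cannot skip".
assert_eq!(evaluator.eval_expr(&filter, false), None);

// SQL WHERE evaluation: the null-safe rewrite turns NULL into FALSE,
// so the file (or row) can be confidently discarded.
assert_eq!(evaluator.eval_sql_where(&filter), Some(false));
```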

/// A collection of provided methods from the [`PredicateEvaluator`] trait, factored out to allow
-/// reuse by the different predicate evaluator implementations.
+/// reuse by multiple bool-output predicate evaluator implementations.
pub(crate) struct PredicateEvaluatorDefaults;
impl PredicateEvaluatorDefaults {
/// Directly null-tests a scalar. See [`PredicateEvaluator::eval_scalar_is_null`].
pub(crate) fn eval_scalar_is_null(val: &Scalar, inverted: bool) -> Option<bool> {
Some(val.is_null() != inverted)
}

/// Directly evaluates a boolean scalar. See [`PredicateEvaluator::eval_scalar`].
pub(crate) fn eval_scalar(val: &Scalar, inverted: bool) -> Option<bool> {
match val {
@@ -326,6 +473,14 @@ impl ResolveColumnAsScalar for UnimplementedColumnResolver {
}
}

// Used internally and by some tests
pub(crate) struct EmptyColumnResolver;
impl ResolveColumnAsScalar for EmptyColumnResolver {
fn resolve_column(&self, _col: &ColumnName) -> Option<Scalar> {
None
}
}

// In testing, it is convenient to just build a hashmap of scalar values.
#[cfg(test)]
impl ResolveColumnAsScalar for std::collections::HashMap<ColumnName, Scalar> {
@@ -358,13 +513,17 @@ impl<R: ResolveColumnAsScalar + 'static> From<R> for DefaultPredicateEvaluator<R
impl<R: ResolveColumnAsScalar> PredicateEvaluator for DefaultPredicateEvaluator<R> {
type Output = bool;

fn eval_scalar_is_null(&self, val: &Scalar, inverted: bool) -> Option<bool> {
PredicateEvaluatorDefaults::eval_scalar_is_null(val, inverted)
}

fn eval_scalar(&self, val: &Scalar, inverted: bool) -> Option<bool> {
PredicateEvaluatorDefaults::eval_scalar(val, inverted)
}

fn eval_is_null(&self, col: &ColumnName, inverted: bool) -> Option<bool> {
let col = self.resolve_column(col)?;
-    Some(matches!(col, Scalar::Null(_)) != inverted)
+    self.eval_scalar_is_null(&col, inverted)
}

fn eval_lt(&self, col: &ColumnName, val: &Scalar) -> Option<bool> {
@@ -428,12 +587,6 @@ impl<R: ResolveColumnAsScalar> PredicateEvaluator for DefaultPredicateEvaluator<
/// example, comparisons involving a column are converted into comparisons over that column's
/// min/max stats, and NULL checks are converted into comparisons involving the column's nullcount
/// and rowcount stats.
-///
-/// The types involved in these operations are parameterized and implementation-specific. For
-/// example, [`crate::engine::parquet_stats_skipping::ParquetStatsProvider`] directly evaluates data
-/// skipping expressions and returns boolean results, while
-/// [`crate::scan::data_skipping::DataSkippingPredicateCreator`] instead converts the input
-/// predicate to a data skipping predicate that can be evaluated directly later.
pub(crate) trait DataSkippingPredicateEvaluator {
/// The output type produced by this expression evaluator
type Output;
@@ -454,6 +607,9 @@ pub(crate) trait DataSkippingPredicateEvaluator {
/// Retrieves the row count of a column (parquet footers always include this stat).
fn get_rowcount_stat(&self) -> Option<Self::IntStat>;

/// See [`PredicateEvaluator::eval_scalar_is_null`]
fn eval_scalar_is_null(&self, val: &Scalar, inverted: bool) -> Option<Self::Output>;

/// See [`PredicateEvaluator::eval_scalar`]
fn eval_scalar(&self, val: &Scalar, inverted: bool) -> Option<Self::Output>;

@@ -589,6 +745,10 @@ pub(crate) trait DataSkippingPredicateEvaluator {
impl<T: DataSkippingPredicateEvaluator> PredicateEvaluator for T {
type Output = T::Output;

fn eval_scalar_is_null(&self, val: &Scalar, inverted: bool) -> Option<Self::Output> {
self.eval_scalar_is_null(val, inverted)
}

fn eval_scalar(&self, val: &Scalar, inverted: bool) -> Option<Self::Output> {
self.eval_scalar(val, inverted)
}