feat: expr analyzer for buffer to filter table chunks #25866
@@ -9,21 +9,38 @@ pub mod paths;
pub mod persister;
pub mod write_buffer;

use anyhow::Context;
use async_trait::async_trait;
use data_types::{NamespaceName, TimestampMinMax};
-use datafusion::{catalog::Session, error::DataFusionError, prelude::Expr};
+use datafusion::{
+    catalog::Session,
+    common::{Column, DFSchema},
+    error::DataFusionError,
+    execution::context::ExecutionProps,
+    logical_expr::interval_arithmetic::Interval,
+    physical_expr::{
+        analyze, create_physical_expr,
+        utils::{Guarantee, LiteralGuarantee},
+        AnalysisContext, ExprBoundaries,
+    },
+    prelude::Expr,
+    scalar::ScalarValue,
+};
use hashbrown::{HashMap, HashSet};
use influxdb3_cache::{
    distinct_cache::{CreateDistinctCacheArgs, DistinctCacheProvider},
    last_cache::LastCacheProvider,
};
-use influxdb3_catalog::catalog::{Catalog, CatalogSequenceNumber, DatabaseSchema};
+use influxdb3_catalog::catalog::{Catalog, CatalogSequenceNumber, DatabaseSchema, TableDefinition};
use influxdb3_id::{ColumnId, DbId, ParquetFileId, SerdeVecMap, TableId};
use influxdb3_wal::{
    DistinctCacheDefinition, LastCacheDefinition, SnapshotSequenceNumber, Wal,
    WalFileSequenceNumber,
};
use iox_query::QueryChunk;
use iox_time::Time;
use observability_deps::tracing::debug;
use schema::{InfluxColumnType, TIME_COLUMN_NAME};
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, sync::Arc, time::Duration};
use thiserror::Error;

@@ -41,6 +58,9 @@ pub enum Error {

    #[error("persister error: {0}")]
    Persister(#[from] persister::Error),

+    #[error(transparent)]
+    Anyhow(#[from] anyhow::Error),
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
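
The new `Anyhow` variant is what lets the filter code further down use `anyhow::Context` with `?` directly; here is a minimal sketch of that conversion path (hypothetical helper, not in this diff, mirroring the time-column lookup below):

```rust
// Illustrative (not in this diff): `.context(...)` converts an Option into an
// anyhow::Result, and `?` then converts anyhow::Error into Error::Anyhow via
// the #[from] attribute, so ad hoc failures need no dedicated variant.
fn time_column_index(schema: &datafusion::arrow::datatypes::Schema) -> Result<usize> {
    use anyhow::Context;
    let idx = schema
        .fields()
        .iter()
        .position(|f| f.name() == "time")
        .context("table should have a time column")?;
    Ok(idx)
}
```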

@@ -90,7 +110,17 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
    fn wal(&self) -> Arc<dyn Wal>;

    /// Returns the parquet files for a given database and table
-    fn parquet_files(&self, db_id: DbId, table_id: TableId) -> Vec<ParquetFile>;
+    fn parquet_files(&self, db_id: DbId, table_id: TableId) -> Vec<ParquetFile> {
+        self.parquet_files_filtered(db_id, table_id, &BufferFilter::default())
+    }
+
+    /// Returns the parquet files for a given database and table that satisfy the given filter
+    fn parquet_files_filtered(
+        &self,
+        db_id: DbId,
+        table_id: TableId,
+        filter: &BufferFilter,
+    ) -> Vec<ParquetFile>;

    /// A channel to watch for when new persisted snapshots are created
    fn watch_persisted_snapshots(&self) -> tokio::sync::watch::Receiver<Option<PersistedSnapshot>>;
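
To show how the new trait method is meant to be driven, here is a minimal caller-side sketch (a hypothetical helper, not part of this diff); `table_def` and `exprs` are assumed to come from the query planner's pushed-down predicates, and `BufferFilter` is defined in the hunk below:

```rust
// Hypothetical helper: build a BufferFilter from the query's pushed-down
// predicates, then fetch only the parquet files whose data could satisfy them.
fn files_for_query(
    buffer: &impl Bufferer,
    db_id: DbId,
    table_id: TableId,
    table_def: &Arc<TableDefinition>,
    exprs: &[Expr],
) -> Result<Vec<ParquetFile>> {
    let filter = BufferFilter::new(table_def, exprs)?;
    Ok(buffer.parquet_files_filtered(db_id, table_id, &filter))
}
```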

@@ -462,6 +492,194 @@ pub(crate) mod test_help {
    }
}

/// A derived set of filters that are used to prune data in the buffer when serving queries
#[derive(Debug, Default)]
pub struct BufferFilter {
    time_lower_bound_ns: Option<i64>,
    time_upper_bound_ns: Option<i64>,
    guarantees: HashMap<ColumnId, BufferGuarantee>,
}

#[derive(Debug)]
pub struct BufferGuarantee {
    pub guarantee: Guarantee,
    pub literals: HashSet<Arc<str>>,
}

impl BufferFilter {
    /// Create a new `BufferFilter` given a [`TableDefinition`] and a set of filter [`Expr`]s
    /// from a logical query plan.
    ///
    /// This method analyzes the incoming `exprs` to do two things:
    ///
    /// - determine if there are any filters on the `time` column, in which case, attempt to
    ///   derive an interval that defines the boundaries on `time` from the query.
    /// - determine if there are any [`LiteralGuarantee`]s on tag columns contained in the
    ///   filter predicates of the query.
    pub fn new(table_def: &Arc<TableDefinition>, exprs: &[Expr]) -> Result<Self> {
        debug!(input = ?exprs, ">>> creating buffer filter");
        let mut time_interval: Option<Interval> = None;
        let arrow_schema = table_def.schema.as_arrow();
        let time_col_index = arrow_schema
            .fields()
            .iter()
            .position(|f| f.name() == TIME_COLUMN_NAME)
            .context("table should have a time column")?;
        let mut guarantees = HashMap::new();

        // DF schema and execution properties used for handling physical expressions:
        let df_schema = DFSchema::try_from(Arc::clone(&arrow_schema))
            .context("table schema could not be converted to a datafusion schema")?;
        let props = ExecutionProps::new();

        for expr in exprs.iter().filter(|e| {
            // NOTE: filter out most expression types, as they are not relevant to time
            // bound analysis or to deriving literal guarantees on tag columns
            matches!(
                e,
                Expr::BinaryExpr(_) | Expr::Not(_) | Expr::Between(_) | Expr::InList(_)
            )
        }) {
            let Ok(physical_expr) = create_physical_expr(expr, &df_schema, &props) else {
                continue;
            };
            // Check if the expression refers to the `time` column:
            if expr
                .column_refs()
                .contains(&Column::new_unqualified(TIME_COLUMN_NAME))
            {
                // Determine time bounds, if provided:
                let boundaries = ExprBoundaries::try_new_unbounded(&arrow_schema)
                    .context("unable to create unbounded expr boundaries on incoming expression")?;
                let mut analysis = analyze(
                    &physical_expr,
                    AnalysisContext::new(boundaries),
                    &arrow_schema,
                )
                .context("unable to analyze provided filters for a boundary on the time column")?;

                // Set the boundaries on the time column using the evaluated interval, if it
                // exists. If an interval was already derived from a previous expression, we
                // take the intersection, or produce an error if:
                // - the derived intervals are not compatible (different types)
                // - the derived intervals do not intersect, which should be a user error,
                //   i.e., a poorly formed query
                if let Some(ExprBoundaries { interval, .. }) = (time_col_index
                    < analysis.boundaries.len())
                    .then_some(analysis.boundaries.remove(time_col_index))
                {
                    if let Some(existing) = time_interval.take() {
                        let intersection = existing
                            .intersect(interval)
                            .context("failed to derive a time interval from provided filters")?
                            .context("provided filters on time column did not produce a valid set of boundaries")?;
                        time_interval.replace(intersection);
                    } else {
                        time_interval.replace(interval);
                    }
                }
            }

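            // Illustrative (not from the PR): a single expression such as
            // `time >= 1000 AND time < 2000` analyzes to one bounded interval
            // on `time`; a later expression like `time < 1500` is intersected
            // with it, tightening the upper bound. Disjoint filters (e.g.
            // `time < 10` and `time > 20`) trip the second error above, since
            // no row could satisfy both.
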
            // Determine any literal guarantees made on tag columns:
            let literal_guarantees = LiteralGuarantee::analyze(&physical_expr);
            for LiteralGuarantee {
                column,
                guarantee,
                literals,
            } in literal_guarantees
            {
                // We are only interested in literal guarantees on tag columns for the
                // buffer index:
                let Some((column_id, InfluxColumnType::Tag)) = table_def
                    .column_definition(column.name())
                    .map(|def| (def.id, def.data_type))
                else {
                    continue;
                };

                // We are only interested in string literals with respect to tag columns:

> Review comment: This is still true, the index is scoped to string fields or tags.

                let literals = literals
                    .into_iter()
                    .filter_map(|l| match l {
                        ScalarValue::Utf8(Some(s)) | ScalarValue::Utf8View(Some(s)) => {
                            Some(Arc::<str>::from(s.as_str()))
                        }
                        _ => None,
                    })
                    .collect::<HashSet<Arc<str>>>();

                if literals.is_empty() {
                    continue;
                }

                // Update the guarantees on this column. We handle multiple guarantees here,
                // i.e., if there are multiple `Expr`s that lead to multiple guarantees on a
                // given column.
                guarantees

> Review comment: I'm not sure how these work without pointers to the index. In Enterprise, the part that walks the expression tree for index matches pulls the list of file ids (i.e. the posting list) and then does actual intersection and unique with the list of files that match the expression. The resulting set of IDs are the ones that potentially apply to the query. I'm not quite understanding what this is doing without access to the posting list that Enterprise uses.
>
> Follow-up: Oh wait, looking further down you have an actual row index in the buffer. Might be better to talk through this one on a call.
>
> Author reply: I had a row index in a long ago iteration of the buffer, but I don't think I brought it along with a number of different refactorings.

                    .entry(column_id)
                    .and_modify(|e: &mut BufferGuarantee| {
                        debug!(current = ?e.guarantee, incoming = ?guarantee, ">>> updating existing guarantee");
                        use Guarantee::*;
                        match (e.guarantee, guarantee) {
                            (In, In) | (NotIn, NotIn) => {
                                e.literals = e.literals.union(&literals).cloned().collect()
                            }
                            (In, NotIn) => {
                                e.literals = e.literals.difference(&literals).cloned().collect()
                            }
                            (NotIn, In) => {
                                e.literals = literals.difference(&e.literals).cloned().collect()
                            }
                        }
                    })
                    .or_insert(BufferGuarantee {
                        guarantee,
                        literals,
                    });
                debug!(?guarantees, ">>> updated guarantees");
            }
        }

        // Determine the lower and upper bound from the derived interval on time:
        // TODO: we may open this up more to other scalar types, e.g., other timestamp
        // types, depending on how users define time bounds.
        let (time_lower_bound_ns, time_upper_bound_ns) = if let Some(i) = time_interval {
            let low = if let ScalarValue::TimestampNanosecond(Some(l), _) = i.lower() {
                Some(*l)
            } else {
                None
            };
            let high = if let ScalarValue::TimestampNanosecond(Some(h), _) = i.upper() {
                Some(*h)
            } else {
                None
            };

            (low, high)
        } else {
            (None, None)
        };

        Ok(Self {
            time_lower_bound_ns,
            time_upper_bound_ns,
            guarantees,
        })
    }

    /// Test a `min` and `max` time against this filter to check if the range they define
    /// overlaps with the range defined by the bounds in this filter.
    pub fn test_time_stamp_min_max(&self, min_time_ns: i64, max_time_ns: i64) -> bool {
        match (self.time_lower_bound_ns, self.time_upper_bound_ns) {
            (None, None) => true,
            (None, Some(u)) => min_time_ns <= u,
            (Some(l), None) => max_time_ns >= l,
            (Some(l), Some(u)) => min_time_ns <= u && max_time_ns >= l,
        }
    }
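    // Overlap semantics (illustrative, not from the PR): with bounds l = 10, u = 20,
    //   (min = 25, max = 30) -> false: the chunk lies entirely after the window
    //   (min = 1,  max = 5)  -> false: the chunk lies entirely before the window
    //   (min = 15, max = 25) -> true:  the chunk partially overlaps the window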

    pub fn guarantees(&self) -> impl Iterator<Item = (&ColumnId, &BufferGuarantee)> {
        self.guarantees.iter()
    }
}

#[cfg(test)]
mod tests {
    use influxdb3_catalog::catalog::CatalogSequenceNumber;

> Review comment: Should use the index columns from the table definition, which may or may not be tags and may not be the entire set of tags.
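
Taken together, here is a hedged usage sketch of the new filter (illustrative only, not from the diff): it assumes a `table_def` whose schema has a `host` tag alongside the standard `time` column, and uses DataFusion's `col`/`lit` expression builders.

```rust
use datafusion::prelude::{col, lit};
use datafusion::scalar::ScalarValue;

// Assumed in scope: `table_def: Arc<TableDefinition>` with a `host` tag column.
let exprs = vec![
    // time >= 1000 (nanoseconds since the epoch) -> lower bound on the interval
    col("time").gt_eq(lit(ScalarValue::TimestampNanosecond(Some(1_000), None))),
    // host = 'a' -> a Guarantee::In on the `host` tag column
    col("host").eq(lit("a")),
];
let filter = BufferFilter::new(&table_def, &exprs)?;

// A chunk spanning [500, 900] ends before the lower bound, so it is pruned:
assert!(!filter.test_time_stamp_min_max(500, 900));
// A chunk spanning [900, 2_000] overlaps the derived bound, so it is kept:
assert!(filter.test_time_stamp_min_max(900, 2_000));
```

Under those assumptions, `filter.guarantees()` would also report a `Guarantee::In` with the literal `"a"` for the `host` column's id, which is what the buffer uses to skip chunks that cannot contain matching rows.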