feat: expr analyzer for buffer to filter table chunks
hiltontj committed Jan 20, 2025
1 parent 1d8d3d6 commit 2eef092
Showing 4 changed files with 542 additions and 341 deletions.
168 changes: 166 additions & 2 deletions influxdb3_write/src/lib.rs
@@ -9,21 +9,37 @@ pub mod paths;
pub mod persister;
pub mod write_buffer;

use anyhow::Context;
use async_trait::async_trait;
use data_types::{NamespaceName, TimestampMinMax};
use datafusion::{catalog::Session, error::DataFusionError, prelude::Expr};
use datafusion::{
catalog::Session,
common::{Column, DFSchema},
error::DataFusionError,
execution::context::ExecutionProps,
physical_expr::{
analyze, create_physical_expr,
utils::{Guarantee, LiteralGuarantee},
AnalysisContext, ExprBoundaries,
},
prelude::Expr,
scalar::ScalarValue,
};
use hashbrown::{HashMap, HashSet};
use influxdb3_cache::{
distinct_cache::{CreateDistinctCacheArgs, DistinctCacheProvider},
last_cache::LastCacheProvider,
};
use influxdb3_catalog::catalog::{Catalog, CatalogSequenceNumber, DatabaseSchema};
use influxdb3_catalog::catalog::{Catalog, CatalogSequenceNumber, DatabaseSchema, TableDefinition};
use influxdb3_id::{ColumnId, DbId, ParquetFileId, SerdeVecMap, TableId};
use influxdb3_wal::{
DistinctCacheDefinition, LastCacheDefinition, SnapshotSequenceNumber, Wal,
WalFileSequenceNumber,
};
use iox_query::QueryChunk;
use iox_time::Time;
use observability_deps::tracing::{debug, info, warn};
use schema::{InfluxColumnType, TIME_COLUMN_NAME};
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, sync::Arc, time::Duration};
use thiserror::Error;
@@ -41,6 +57,9 @@ pub enum Error {

#[error("persister error: {0}")]
Persister(#[from] persister::Error),

#[error(transparent)]
Anyhow(#[from] anyhow::Error),
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -462,6 +481,151 @@ pub(crate) mod test_help {
}
}

#[derive(Debug, Default)]
pub struct BufferFilter {
time_lower_bound: Option<i64>,
time_upper_bound: Option<i64>,
guarantees: HashMap<ColumnId, BufferGuarantee>,
}

#[derive(Debug)]
pub struct BufferGuarantee {
pub guarantee: Guarantee,
pub literals: HashSet<Arc<str>>,
}

impl BufferFilter {
pub fn generate(table_def: &Arc<TableDefinition>, exprs: &[Expr]) -> Result<Self> {
let mut time_lower_bound = None;
let mut time_upper_bound = None;
let arrow_schema = table_def.schema.as_arrow();
let mut guarantees = HashMap::new();
let df_schema = DFSchema::try_from(Arc::clone(&arrow_schema))
.context("table schema was not able to convert to datafusion schema")?;
let props = ExecutionProps::new();
info!(?exprs, "analyzing expressions");
for expr in exprs.iter().filter(|e| match e {
Expr::BinaryExpr(_) | Expr::Not(_) | Expr::Between(_) | Expr::InList(_) => true,
_ => false,
}) {
let Ok(physical_expr) = create_physical_expr(expr, &df_schema, &props) else {
continue;
};
if expr
.column_refs()
.contains(&Column::new_unqualified(TIME_COLUMN_NAME))
{
debug!(">>> has time col expr");
let time_col_index = arrow_schema
.fields()
.iter()
.position(|f| f.name() == TIME_COLUMN_NAME)
.expect("table should have a time column");
// Determine time bounds, if provided:
let boundaries = ExprBoundaries::try_new_unbounded(&arrow_schema)
.context("unable to create unbounded expr boundaries on incoming expression")?;
let analysis = analyze(
&physical_expr,
AnalysisContext::new(boundaries),
&arrow_schema,
)
.inspect_err(|error| {
warn!(?physical_expr, ?arrow_schema, ?error, "failed to analyze")
})
.context("unable to analyze provided filters")?;
// Set the time boundaries by the analyzed expression, if they have not already been
// set. If they have been set, we remove the bounds, because it is not clear how to
// evaluate multiple intervals.
if let Some(ExprBoundaries { interval, .. }) =
analysis.boundaries.get(time_col_index)
{
debug!(?interval, ">>> got the interval");
if let ScalarValue::TimestampNanosecond(Some(lower), _) = interval.lower() {
if let None = time_lower_bound.take() {
time_lower_bound.replace(*lower);
}
}
if let ScalarValue::TimestampNanosecond(Some(upper), _) = interval.upper() {
if let None = time_upper_bound.take() {
time_upper_bound.replace(*upper);
}
}
}
}

// Determine any literal guarantees made on tag columns:
let literal_guarantees = LiteralGuarantee::analyze(&physical_expr);
for LiteralGuarantee {
column,
guarantee,
literals,
} in literal_guarantees
{
// NOTE: only retaining string literals for matching
// on tag columns for the buffer index:
let Some((column_id, InfluxColumnType::Tag)) = table_def
.column_definition(column.name())
.map(|def| (def.id, def.data_type))
else {
continue;
};
let literals = literals
.into_iter()
.filter_map(|l| match l {
ScalarValue::Utf8(Some(s)) | ScalarValue::Utf8View(Some(s)) => {
Some(Arc::<str>::from(s.as_str()))
}
_ => None,
})
.collect::<HashSet<Arc<str>>>();
guarantees
.entry(column_id)
.and_modify(|e: &mut BufferGuarantee| {
// NOTE: it seems unlikely that there would be
// multiple literal guarantees on a single
// column from the Expr set. But we handle
// that here:
use Guarantee::*;
match (e.guarantee, guarantee) {
(In, In) | (NotIn, NotIn) => {
e.literals = e.literals.union(&literals).cloned().collect()
}
(In, NotIn) => {
e.literals = e.literals.difference(&literals).cloned().collect()
}
(NotIn, In) => {
e.literals = literals.difference(&e.literals).cloned().collect()
}
}
})
.or_insert(BufferGuarantee {
guarantee,
literals,
});
}
}

Ok(Self {
time_lower_bound,
time_upper_bound,
guarantees,
})
}

pub fn test_time_stamp_min_max(&self, min: i64, max: i64) -> bool {
match (self.time_lower_bound, self.time_upper_bound) {
(None, None) => true,
(None, Some(u)) => min <= u,
(Some(l), None) => max >= l,
(Some(l), Some(u)) => min <= u && max >= l,
}
}

pub fn guarantees(&self) -> impl Iterator<Item = (&ColumnId, &BufferGuarantee)> {
self.guarantees.iter()
}
}

#[cfg(test)]
mod tests {
use influxdb3_catalog::catalog::CatalogSequenceNumber;
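For context (not part of the commit): a minimal standalone sketch of the same DataFusion analysis path that `BufferFilter::generate` relies on — `create_physical_expr`, interval analysis via `analyze`, and `LiteralGuarantee::analyze` — exercised against a toy two-column schema. The module paths mirror the imports added above, but exact signatures can shift between DataFusion releases.

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::common::DFSchema;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_expr::{
    analyze, create_physical_expr,
    utils::{Guarantee, LiteralGuarantee},
    AnalysisContext, ExprBoundaries,
};
use datafusion::prelude::{col, lit};
use datafusion::scalar::ScalarValue;

fn main() -> Result<()> {
    // Toy stand-in for a table's arrow schema: a time column plus one tag column.
    let schema = Arc::new(Schema::new(vec![
        Field::new("time", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
        Field::new("region", DataType::Utf8, true),
    ]));
    let df_schema = DFSchema::try_from(Arc::clone(&schema))?;
    let props = ExecutionProps::new();

    // `time >= 1000`: the boundary analysis reports a lower bound on the time column.
    let expr = col("time").gt_eq(lit(ScalarValue::TimestampNanosecond(Some(1_000), None)));
    let physical = create_physical_expr(&expr, &df_schema, &props)?;
    let boundaries = ExprBoundaries::try_new_unbounded(&schema)?;
    let analysis = analyze(&physical, AnalysisContext::new(boundaries), &schema)?;
    let time_bounds = &analysis.boundaries[0].interval; // index 0 == the "time" field
    println!(
        "time lower = {:?}, upper = {:?}",
        time_bounds.lower(),
        time_bounds.upper()
    );

    // `region = 'us-west'`: literal-guarantee analysis reports an `In` guarantee with
    // a single string literal, which is what generate() indexes per tag column.
    let expr = col("region").eq(lit("us-west"));
    let physical = create_physical_expr(&expr, &df_schema, &props)?;
    for g in LiteralGuarantee::analyze(&physical) {
        let verb = match g.guarantee {
            Guarantee::In => "IN",
            Guarantee::NotIn => "NOT IN",
        };
        println!("{} {} {:?}", g.column.name(), verb, g.literals);
    }
    Ok(())
}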
29 changes: 17 additions & 12 deletions influxdb3_write/src/write_buffer/mod.rs
@@ -12,8 +12,8 @@ use crate::write_buffer::queryable_buffer::QueryableBuffer;
use crate::write_buffer::validator::WriteValidator;
use crate::{chunk::ParquetChunk, DatabaseManager};
use crate::{
BufferedWriteRequest, Bufferer, ChunkContainer, DistinctCacheManager, LastCacheManager,
ParquetFile, PersistedSnapshot, Precision, WriteBuffer, WriteLineError,
BufferFilter, BufferedWriteRequest, Bufferer, ChunkContainer, DistinctCacheManager,
LastCacheManager, ParquetFile, PersistedSnapshot, Precision, WriteBuffer, WriteLineError,
};
use async_trait::async_trait;
use data_types::{
@@ -319,30 +319,35 @@ impl WriteBufferImpl {
DataFusionError::Execution(format!("database {} not found", database_name))
})?;

let (table_id, table_schema) =
db_schema.table_id_and_schema(table_name).ok_or_else(|| {
DataFusionError::Execution(format!(
"table {} not found in db {}",
table_name, database_name
))
})?;
let table_def = db_schema.table_definition(table_name).ok_or_else(|| {
DataFusionError::Execution(format!(
"table {} not found in db {}",
table_name, database_name
))
})?;

let buffer_filter = BufferFilter::generate(&table_def, filters)
.inspect_err(|error| warn!(?error, "filter gen failed"))
.map_err(|error| DataFusionError::External(Box::new(error)))?;

let mut chunks = self.buffer.get_table_chunks(
Arc::clone(&db_schema),
table_name,
filters,
&buffer_filter,
projection,
ctx,
)?;

let parquet_files = self.persisted_files.get_files(db_schema.id, table_id);
let parquet_files = self
.persisted_files
.get_files(db_schema.id, table_def.table_id);

let mut chunk_order = chunks.len() as i64;

for parquet_file in parquet_files {
let parquet_chunk = parquet_chunk_from_file(
&parquet_file,
&table_schema,
&table_def.schema,
self.persister.object_store_url().clone(),
self.persister.object_store(),
chunk_order,
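Not part of the diff — a minimal sketch of the error-bridging pattern the call site above relies on: a thiserror enum with the transparent `anyhow::Error` variant (as added to `influxdb3_write::Error` in lib.rs), surfaced to DataFusion as `DataFusionError::External`. Crate names and the error message here are illustrative assumptions, not taken from the repository.

use anyhow::Context;
use datafusion::error::DataFusionError;
use thiserror::Error;

#[derive(Debug, Error)]
enum BufferError {
    // Mirrors the `#[error(transparent)] Anyhow(#[from] anyhow::Error)` variant in lib.rs.
    #[error(transparent)]
    Anyhow(#[from] anyhow::Error),
}

fn generate_filter() -> Result<(), BufferError> {
    // `.context(..)` wraps the underlying error in anyhow::Error, which `#[from]`
    // then lifts into BufferError, like the schema-conversion path in generate().
    let failed: Result<(), std::io::Error> = Err(std::io::Error::new(
        std::io::ErrorKind::Other,
        "schema conversion failed",
    ));
    failed.context("table schema was not able to convert to datafusion schema")?;
    Ok(())
}

fn get_table_chunks() -> Result<(), DataFusionError> {
    // Same shape as the call site in write_buffer/mod.rs: surface the buffer
    // error to DataFusion as an external error.
    generate_filter().map_err(|error| DataFusionError::External(Box::new(error)))?;
    Ok(())
}

fn main() {
    println!("{:?}", get_table_chunks());
}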
7 changes: 3 additions & 4 deletions influxdb3_write/src/write_buffer/queryable_buffer.rs
@@ -3,7 +3,7 @@ use crate::paths::ParquetFilePath;
use crate::persister::Persister;
use crate::write_buffer::persisted_files::PersistedFiles;
use crate::write_buffer::table_buffer::TableBuffer;
use crate::{ParquetFile, ParquetFileId, PersistedSnapshot};
use crate::{BufferFilter, ParquetFile, ParquetFileId, PersistedSnapshot};
use anyhow::Context;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
@@ -13,7 +13,6 @@ use data_types::{
};
use datafusion::catalog::Session;
use datafusion::common::DataFusionError;
use datafusion::logical_expr::Expr;
use datafusion_util::stream_from_batches;
use hashbrown::HashMap;
use influxdb3_cache::distinct_cache::DistinctCacheProvider;
@@ -100,7 +99,7 @@ impl QueryableBuffer {
&self,
db_schema: Arc<DatabaseSchema>,
table_name: &str,
filters: &[Expr],
buffer_filter: &BufferFilter,
_projection: Option<&Vec<usize>>,
_ctx: &dyn Session,
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
@@ -120,7 +119,7 @@
};

Ok(table_buffer
.partitioned_record_batches(Arc::clone(&table_def), filters)
.partitioned_record_batches(Arc::clone(&table_def), buffer_filter)
.map_err(|e| DataFusionError::Execution(format!("error getting batches {}", e)))?
.into_iter()
.filter(|(_, (ts_min_max, _))| {
(Diff for the fourth changed file was not loaded in this view.)
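Not part of the diff — a standalone restatement of `BufferFilter::test_time_stamp_min_max`, to make the chunk-pruning rule concrete: a chunk whose timestamps span `[min, max]` is kept only if that range can intersect the filter's `[lower, upper]` bounds, with a missing bound treated as unbounded. The truncated `.filter` closure in `QueryableBuffer::get_table_chunks` above presumably applies a check of this shape to each chunk's `TimestampMinMax`.

/// Same match arms as test_time_stamp_min_max, with free-standing arguments.
fn overlaps(min: i64, max: i64, lower: Option<i64>, upper: Option<i64>) -> bool {
    match (lower, upper) {
        // No time predicate: every chunk passes.
        (None, None) => true,
        // Only `time <= upper`: keep chunks whose earliest row is not past the bound.
        (None, Some(u)) => min <= u,
        // Only `time >= lower`: keep chunks whose latest row reaches the bound.
        (Some(l), None) => max >= l,
        // Both bounds: the two intervals must intersect.
        (Some(l), Some(u)) => min <= u && max >= l,
    }
}

fn main() {
    // A chunk covering nanoseconds 100..=200 against `time >= 150` is kept...
    assert!(overlaps(100, 200, Some(150), None));
    // ...pruned by `time >= 250`...
    assert!(!overlaps(100, 200, Some(250), None));
    // ...and kept by `150 <= time <= 180`.
    assert!(overlaps(100, 200, Some(150), Some(180)));
    println!("all overlap checks passed");
}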


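Also not part of the diff — a small sketch of the set algebra `BufferFilter::generate` uses when it merges a second literal guarantee for the same tag column (the `match (e.guarantee, guarantee)` block in lib.rs), using plain `HashSet<&str>` values in place of the crate's `Arc<str>` literals.

use std::collections::HashSet;

#[derive(Clone, Copy, Debug, PartialEq)]
enum Guarantee {
    In,
    NotIn,
}

/// Merge a new guarantee into the existing one for a column, mirroring the
/// `and_modify` arm in BufferFilter::generate (the existing kind is kept).
fn merge(
    existing: (Guarantee, HashSet<&'static str>),
    new_kind: Guarantee,
    new_literals: HashSet<&'static str>,
) -> (Guarantee, HashSet<&'static str>) {
    use Guarantee::*;
    let (kind, literals) = existing;
    let merged = match (kind, new_kind) {
        // Two positive (or two negative) guarantees widen the literal set.
        (In, In) | (NotIn, NotIn) => literals.union(&new_literals).copied().collect(),
        // `IN (..)` followed by `NOT IN (..)` drops the newly excluded values.
        (In, NotIn) => literals.difference(&new_literals).copied().collect(),
        // `NOT IN (..)` followed by `IN (..)` keeps the new values minus the existing ones.
        (NotIn, In) => new_literals.difference(&literals).copied().collect(),
    };
    (kind, merged)
}

fn main() {
    // region IN ('us-east', 'us-west') combined with region != 'us-west'
    // leaves a single allowed literal: 'us-east'.
    let existing = (Guarantee::In, HashSet::from(["us-east", "us-west"]));
    let (kind, merged) = merge(existing, Guarantee::NotIn, HashSet::from(["us-west"]));
    assert_eq!(kind, Guarantee::In);
    assert_eq!(merged, HashSet::from(["us-east"]));
    println!("{kind:?} {merged:?}");
}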