diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs
index a4f7cc2b33d..f060533781b 100644
--- a/influxdb3_write/src/lib.rs
+++ b/influxdb3_write/src/lib.rs
@@ -9,14 +9,29 @@ pub mod paths;
 pub mod persister;
 pub mod write_buffer;
 
+use anyhow::Context;
 use async_trait::async_trait;
 use data_types::{NamespaceName, TimestampMinMax};
-use datafusion::{catalog::Session, error::DataFusionError, prelude::Expr};
+use datafusion::{
+    catalog::Session,
+    common::{Column, DFSchema},
+    error::DataFusionError,
+    execution::context::ExecutionProps,
+    logical_expr::interval_arithmetic::Interval,
+    physical_expr::{
+        analyze, create_physical_expr,
+        utils::{Guarantee, LiteralGuarantee},
+        AnalysisContext, ExprBoundaries,
+    },
+    prelude::Expr,
+    scalar::ScalarValue,
+};
+use hashbrown::{HashMap, HashSet};
 use influxdb3_cache::{
     distinct_cache::{CreateDistinctCacheArgs, DistinctCacheProvider},
     last_cache::LastCacheProvider,
 };
-use influxdb3_catalog::catalog::{Catalog, CatalogSequenceNumber, DatabaseSchema};
+use influxdb3_catalog::catalog::{Catalog, CatalogSequenceNumber, DatabaseSchema, TableDefinition};
 use influxdb3_id::{ColumnId, DbId, ParquetFileId, SerdeVecMap, TableId};
 use influxdb3_wal::{
     DistinctCacheDefinition, LastCacheDefinition, SnapshotSequenceNumber, Wal,
@@ -24,6 +39,7 @@ use influxdb3_wal::{
 };
 use iox_query::QueryChunk;
 use iox_time::Time;
+use schema::{InfluxColumnType, TIME_COLUMN_NAME};
 use serde::{Deserialize, Serialize};
 use std::{fmt::Debug, sync::Arc, time::Duration};
 use thiserror::Error;
@@ -41,6 +57,9 @@ pub enum Error {
 
     #[error("persister error: {0}")]
     Persister(#[from] persister::Error),
+
+    #[error(transparent)]
+    Anyhow(#[from] anyhow::Error),
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
@@ -90,7 +109,16 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
     fn wal(&self) -> Arc<dyn Wal>;
 
     /// Returns the parquet files for a given database and table
-    fn parquet_files(&self, db_id: DbId, table_id: TableId) -> Vec<ParquetFile>;
+    fn parquet_files(&self, db_id: DbId, table_id: TableId) -> Vec<ParquetFile> {
+        self.parquet_files_filtered(db_id, table_id, &BufferFilter::default())
+    }
+
+    fn parquet_files_filtered(
+        &self,
+        db_id: DbId,
+        table_id: TableId,
+        filter: &BufferFilter,
+    ) -> Vec<ParquetFile>;
 
     /// A channel to watch for when new persisted snapshots are created
     fn watch_persisted_snapshots(&self) -> tokio::sync::watch::Receiver<Option<PersistedSnapshot>>;
@@ -462,6 +490,193 @@ pub(crate) mod test_help {
     }
 }
 
+/// A derived set of filters that are used to prune data in the buffer when serving queries
+#[derive(Debug, Default)]
+pub struct BufferFilter {
+    time_lower_bound: Option<i64>,
+    time_upper_bound: Option<i64>,
+    guarantees: HashMap<ColumnId, BufferGuarantee>,
+}
+
+#[derive(Debug)]
+pub struct BufferGuarantee {
+    pub guarantee: Guarantee,
+    pub literals: HashSet<Arc<str>>,
+}
+
+impl BufferFilter {
+    /// Create a new `BufferFilter` given a [`TableDefinition`] and set of filter [`Expr`]s from
+    /// a logical query plan.
+    ///
+    /// This method analyzes the incoming `exprs` to do two things:
+    ///
+    /// - determine if there are any filters on the `time` column, in which case, attempt to derive
+    ///   an interval that defines the boundaries on `time` from the query.
+    /// - determine if there are any _literal guarantees_ on tag columns contained in the filter
+    ///   predicates of the query.
+    pub fn new(table_def: &Arc<TableDefinition>, exprs: &[Expr]) -> Result<Self> {
+        let mut time_interval: Option<Interval> = None;
+        let arrow_schema = table_def.schema.as_arrow();
+        let time_col_index = arrow_schema
+            .fields()
+            .iter()
+            .position(|f| f.name() == TIME_COLUMN_NAME)
+            .context("table should have a time column")?;
+        let mut guarantees = HashMap::new();
+
+        // DF schema and execution properties used for handling physical expressions:
+        let df_schema = DFSchema::try_from(Arc::clone(&arrow_schema))
+            .context("table schema was not able to convert to datafusion schema")?;
+        let props = ExecutionProps::new();
+
+        for expr in exprs.iter().filter(|e| {
+            // NOTE: filter out most expression types, as they are not relevant to time bound
+            // analysis, or deriving literal guarantees on tag columns
+            matches!(
+                e,
+                Expr::BinaryExpr(_) | Expr::Not(_) | Expr::Between(_) | Expr::InList(_)
+            )
+        }) {
+            let Ok(physical_expr) = create_physical_expr(expr, &df_schema, &props) else {
+                continue;
+            };
+            // Check if the expression refers to the `time` column:
+            if expr
+                .column_refs()
+                .contains(&Column::new_unqualified(TIME_COLUMN_NAME))
+            {
+                // Determine time bounds, if provided:
+                let boundaries = ExprBoundaries::try_new_unbounded(&arrow_schema)
+                    .context("unable to create unbounded expr boundaries on incoming expression")?;
+                let mut analysis = analyze(
+                    &physical_expr,
+                    AnalysisContext::new(boundaries),
+                    &arrow_schema,
+                )
+                .context("unable to analyze provided filters for a boundary on the time column")?;
+
+                // Set the boundaries on the time column using the evaluated interval, if it exists.
+                // If an interval was already derived from a previous expression, we take their
+                // intersection, or produce an error if:
+                // - the derived intervals are not compatible (different types)
+                // - the derived intervals do not intersect, this should be a user error, i.e., a
+                //   poorly formed query
+                if let Some(ExprBoundaries { interval, .. }) = (time_col_index
+                    < analysis.boundaries.len())
+                    .then_some(analysis.boundaries.remove(time_col_index))
+                {
+                    if let Some(existing) = time_interval.take() {
+                        let intersection = existing.intersect(interval).context(
+                            "failed to derive a time interval from provided filters",
+                        )?.context("provided filters on time column did not produce a valid set of boundaries")?;
+                        time_interval.replace(intersection);
+                    } else {
+                        time_interval.replace(interval);
+                    }
+                }
+            }
+
+            // Determine any literal guarantees made on tag columns:
+            let literal_guarantees = LiteralGuarantee::analyze(&physical_expr);
+            for LiteralGuarantee {
+                column,
+                guarantee,
+                literals,
+            } in literal_guarantees
+            {
+                // We are only interested in literal guarantees on tag columns for the buffer index:
+                let Some((column_id, InfluxColumnType::Tag)) = table_def
+                    .column_definition(column.name())
+                    .map(|def| (def.id, def.data_type))
+                else {
+                    continue;
+                };
+
+                // We are only interested in string literals with respect to tag columns:
+                let literals = literals
+                    .into_iter()
+                    .filter_map(|l| match l {
+                        ScalarValue::Utf8(Some(s)) | ScalarValue::Utf8View(Some(s)) => {
+                            Some(Arc::<str>::from(s.as_str()))
+                        }
+                        _ => None,
+                    })
+                    .collect::<HashSet<Arc<str>>>();
+
+                if literals.is_empty() {
+                    continue;
+                }
+
+                // Update the guarantees on this column. We handle multiple guarantees here, i.e.,
+                // if there are multiple Expr's that lead to multiple guarantees.
+                //
+                // NOTE: it may not be likely that there would be multiple literal guarantees on a
+                // single column from the Expr set, as DataFusion simplifies incoming expressions.
+                guarantees
+                    .entry(column_id)
+                    .and_modify(|e: &mut BufferGuarantee| {
+                        use Guarantee::*;
+                        match (e.guarantee, guarantee) {
+                            (In, In) | (NotIn, NotIn) => {
+                                e.literals = e.literals.union(&literals).cloned().collect()
+                            }
+                            (In, NotIn) => {
+                                e.literals = e.literals.difference(&literals).cloned().collect()
+                            }
+                            (NotIn, In) => {
+                                e.literals = literals.difference(&e.literals).cloned().collect()
+                            }
+                        }
+                    })
+                    .or_insert(BufferGuarantee {
+                        guarantee,
+                        literals,
+                    });
+            }
+        }
+
+        // Determine the lower and upper bound from the derived interval on time:
+        let (time_lower_bound, time_upper_bound) = if let Some(i) = time_interval {
+            let low = if let ScalarValue::TimestampNanosecond(Some(l), _) = i.lower() {
+                Some(*l)
+            } else {
+                None
+            };
+            let high = if let ScalarValue::TimestampNanosecond(Some(h), _) = i.upper() {
+                Some(*h)
+            } else {
+                None
+            };
+
+            (low, high)
+        } else {
+            (None, None)
+        };
+
+        // Only hold onto guarantees that have literals in them:
+        guarantees.retain(|_, g| !g.literals.is_empty());
+
+        Ok(Self {
+            time_lower_bound,
+            time_upper_bound,
+            guarantees,
+        })
+    }
+
+    pub fn test_time_stamp_min_max(&self, min: i64, max: i64) -> bool {
+        match (self.time_lower_bound, self.time_upper_bound) {
+            (None, None) => true,
+            (None, Some(u)) => min <= u,
+            (Some(l), None) => max >= l,
+            (Some(l), Some(u)) => min <= u && max >= l,
+        }
+    }
+
+    pub fn guarantees(&self) -> impl Iterator<Item = (&ColumnId, &BufferGuarantee)> {
+        self.guarantees.iter()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use influxdb3_catalog::catalog::CatalogSequenceNumber;
diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs
index b4456d36a77..cf9d9deba58 100644
--- a/influxdb3_write/src/write_buffer/mod.rs
+++ b/influxdb3_write/src/write_buffer/mod.rs
@@ -12,8 +12,8 @@ use crate::write_buffer::queryable_buffer::QueryableBuffer;
 use crate::write_buffer::validator::WriteValidator;
 use crate::{chunk::ParquetChunk, DatabaseManager};
 use crate::{
-    BufferedWriteRequest, Bufferer, ChunkContainer, DistinctCacheManager, LastCacheManager,
-    ParquetFile, PersistedSnapshot, Precision, WriteBuffer, WriteLineError,
+    BufferFilter, BufferedWriteRequest, Bufferer, ChunkContainer, DistinctCacheManager,
+    LastCacheManager, ParquetFile, PersistedSnapshot, Precision, WriteBuffer, WriteLineError,
 };
 use async_trait::async_trait;
 use data_types::{
@@ -319,30 +319,35 @@ impl WriteBufferImpl {
             DataFusionError::Execution(format!("database {} not found", database_name))
         })?;
 
-        let (table_id, table_schema) =
-            db_schema.table_id_and_schema(table_name).ok_or_else(|| {
-                DataFusionError::Execution(format!(
-                    "table {} not found in db {}",
-                    table_name, database_name
-                ))
-            })?;
+        let table_def = db_schema.table_definition(table_name).ok_or_else(|| {
+            DataFusionError::Execution(format!(
+                "table {} not found in db {}",
+                table_name, database_name
+            ))
+        })?;
+
+        let buffer_filter = BufferFilter::new(&table_def, filters)
+            .inspect_err(|error| warn!(?error, "buffer filter generation failed"))
+            .map_err(|error| DataFusionError::External(Box::new(error)))?;
 
         let mut chunks = self.buffer.get_table_chunks(
             Arc::clone(&db_schema),
             table_name,
-            filters,
+            &buffer_filter,
             projection,
             ctx,
         )?;
 
-        let parquet_files = self.persisted_files.get_files(db_schema.id, table_id);
+        let parquet_files =
+            self.persisted_files
+                .get_files(db_schema.id, table_def.table_id, &buffer_filter);
 
         let mut chunk_order = chunks.len() as i64;
 
         for parquet_file in parquet_files {
             let parquet_chunk = parquet_chunk_from_file(
                 &parquet_file,
-                &table_schema,
+                &table_def.schema,
                 self.persister.object_store_url().clone(),
                 self.persister.object_store(),
                 chunk_order,
@@ -427,8 +432,13 @@ impl Bufferer for WriteBufferImpl {
         Arc::clone(&self.wal)
     }
 
-    fn parquet_files(&self, db_id: DbId, table_id: TableId) -> Vec<ParquetFile> {
-        self.buffer.persisted_parquet_files(db_id, table_id)
+    fn parquet_files_filtered(
+        &self,
+        db_id: DbId,
+        table_id: TableId,
+        filter: &BufferFilter,
+    ) -> Vec<ParquetFile> {
+        self.buffer.persisted_parquet_files(db_id, table_id, filter)
     }
 
     fn watch_persisted_snapshots(&self) -> Receiver<Option<PersistedSnapshot>> {
@@ -2092,7 +2102,9 @@ mod tests {
         verify_snapshot_count(1, &wbuf.persister).await;
 
         // get the path for the created parquet file:
-        let persisted_files = wbuf.persisted_files().get_files(db_id, tbl_id);
+        let persisted_files =
+            wbuf.persisted_files()
+                .get_files(db_id, tbl_id, &BufferFilter::default());
         assert_eq!(1, persisted_files.len());
         let path = ObjPath::from(persisted_files[0].path.as_str());
 
@@ -2198,7 +2210,9 @@ mod tests {
         verify_snapshot_count(1, &wbuf.persister).await;
 
         // get the path for the created parquet file:
-        let persisted_files = wbuf.persisted_files().get_files(db_id, tbl_id);
+        let persisted_files =
+            wbuf.persisted_files()
+                .get_files(db_id, tbl_id, &BufferFilter::default());
         assert_eq!(1, persisted_files.len());
         let path = ObjPath::from(persisted_files[0].path.as_str());
 
diff --git a/influxdb3_write/src/write_buffer/persisted_files.rs b/influxdb3_write/src/write_buffer/persisted_files.rs
index e43ca76dd4e..3b724998cdf 100644
--- a/influxdb3_write/src/write_buffer/persisted_files.rs
+++ b/influxdb3_write/src/write_buffer/persisted_files.rs
@@ -2,6 +2,7 @@
 //! When queries come in they will combine whatever chunks exist from `QueryableBuffer` with
 //! the persisted files to get the full set of data to query.
+use crate::BufferFilter; use crate::{ParquetFile, PersistedSnapshot}; use hashbrown::HashMap; use influxdb3_id::DbId; @@ -47,7 +48,12 @@ impl PersistedFiles { } /// Get the list of files for a given database and table, always return in descending order of min_time - pub fn get_files(&self, db_id: DbId, table_id: TableId) -> Vec { + pub fn get_files( + &self, + db_id: DbId, + table_id: TableId, + filter: &BufferFilter, + ) -> Vec { let three_days_ago = (self.time_provider.now() - crate::THREE_DAYS).timestamp_nanos(); let mut files = { let inner = self.inner.read(); @@ -58,7 +64,8 @@ impl PersistedFiles { .cloned() .unwrap_or_default() .into_iter() - .filter(|file| dbg!(file.min_time) > dbg!(three_days_ago)) + .filter(|file| filter.test_time_stamp_min_max(file.min_time, file.max_time)) + .filter(|file| file.min_time > three_days_ago) .collect::>() }; @@ -169,12 +176,18 @@ fn update_persisted_files_with_snapshot( #[cfg(test)] mod tests { + use datafusion::prelude::col; + use datafusion::prelude::lit_timestamp_nano; + use datafusion::prelude::Expr; use influxdb3_catalog::catalog::CatalogSequenceNumber; + use influxdb3_catalog::catalog::TableDefinition; + use influxdb3_id::ColumnId; use influxdb3_wal::{SnapshotSequenceNumber, WalFileSequenceNumber}; use iox_time::MockProvider; use iox_time::Time; use observability_deps::tracing::info; use pretty_assertions::assert_eq; + use schema::InfluxColumnType; use crate::ParquetFileId; @@ -262,6 +275,125 @@ mod tests { assert_eq!(150, row_count); } + #[test] + fn test_get_files_with_filters() { + let parquet_files = (0..100) + .step_by(10) + .map(|i| { + let chunk_time = i; + ParquetFile { + id: ParquetFileId::new(), + path: format!("/path/{i:03}.parquet"), + size_bytes: 1, + row_count: 1, + chunk_time, + min_time: chunk_time, + max_time: chunk_time + 10, + } + }) + .collect(); + let persisted_snapshots = vec![build_snapshot(parquet_files, 0, 0, 0)]; + let persisted_files = PersistedFiles::new_from_persisted_snapshots( + Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))), + persisted_snapshots, + ); + + struct TestCase<'a> { + filter: &'a [Expr], + expected_n_files: usize, + } + + let test_cases = [ + TestCase { + filter: &[], + expected_n_files: 10, + }, + TestCase { + filter: &[col("time").gt(lit_timestamp_nano(0))], + expected_n_files: 10, + }, + TestCase { + filter: &[col("time").gt(lit_timestamp_nano(50))], + expected_n_files: 5, + }, + TestCase { + filter: &[col("time").gt(lit_timestamp_nano(90))], + expected_n_files: 1, + }, + TestCase { + filter: &[col("time").gt(lit_timestamp_nano(100))], + expected_n_files: 0, + }, + TestCase { + filter: &[col("time").lt(lit_timestamp_nano(100))], + expected_n_files: 10, + }, + TestCase { + filter: &[col("time").lt(lit_timestamp_nano(50))], + expected_n_files: 5, + }, + TestCase { + filter: &[col("time").lt(lit_timestamp_nano(10))], + expected_n_files: 1, + }, + TestCase { + filter: &[col("time").lt(lit_timestamp_nano(0))], + expected_n_files: 0, + }, + TestCase { + filter: &[col("time") + .gt(lit_timestamp_nano(20)) + .and(col("time").lt(lit_timestamp_nano(40)))], + expected_n_files: 2, + }, + TestCase { + filter: &[col("time") + .gt(lit_timestamp_nano(20)) + .and(col("time").lt(lit_timestamp_nano(30)))], + expected_n_files: 1, + }, + TestCase { + filter: &[col("time") + .gt(lit_timestamp_nano(21)) + .and(col("time").lt(lit_timestamp_nano(29)))], + expected_n_files: 1, + }, + TestCase { + filter: &[col("time") + .gt(lit_timestamp_nano(0)) + .and(col("time").lt(lit_timestamp_nano(100)))], + 
expected_n_files: 10, + }, + ]; + + let table_def = Arc::new( + TableDefinition::new( + TableId::from(0), + "test-tbl".into(), + vec![( + ColumnId::from(0), + "time".into(), + InfluxColumnType::Timestamp, + )], + vec![], + ) + .unwrap(), + ); + + for t in test_cases { + let filter = BufferFilter::new(&table_def, t.filter).unwrap(); + let filtered_files = + persisted_files.get_files(DbId::from(0), TableId::from(0), &filter); + assert_eq!( + t.expected_n_files, + filtered_files.len(), + "wrong number of filtered files:\n\ + result: {filtered_files:?}\n\ + filter provided: {filter:?}" + ); + } + } + fn build_persisted_snapshots() -> Vec { let mut all_persisted_snapshot_files = Vec::new(); let parquet_files_1 = build_parquet_files(5); diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index 43c0f67080a..585d152037a 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -3,7 +3,7 @@ use crate::paths::ParquetFilePath; use crate::persister::Persister; use crate::write_buffer::persisted_files::PersistedFiles; use crate::write_buffer::table_buffer::TableBuffer; -use crate::{ParquetFile, ParquetFileId, PersistedSnapshot}; +use crate::{BufferFilter, ParquetFile, ParquetFileId, PersistedSnapshot}; use anyhow::Context; use arrow::record_batch::RecordBatch; use async_trait::async_trait; @@ -13,7 +13,6 @@ use data_types::{ }; use datafusion::catalog::Session; use datafusion::common::DataFusionError; -use datafusion::logical_expr::Expr; use datafusion_util::stream_from_batches; use hashbrown::HashMap; use influxdb3_cache::distinct_cache::DistinctCacheProvider; @@ -100,7 +99,7 @@ impl QueryableBuffer { &self, db_schema: Arc, table_name: &str, - filters: &[Expr], + buffer_filter: &BufferFilter, _projection: Option<&Vec>, _ctx: &dyn Session, ) -> Result>, DataFusionError> { @@ -120,7 +119,7 @@ impl QueryableBuffer { }; Ok(table_buffer - .partitioned_record_batches(Arc::clone(&table_def), filters) + .partitioned_record_batches(Arc::clone(&table_def), buffer_filter) .map_err(|e| DataFusionError::Execution(format!("error getting batches {}", e)))? 
.into_iter() .filter(|(_, (ts_min_max, _))| { @@ -416,8 +415,13 @@ impl QueryableBuffer { receiver } - pub fn persisted_parquet_files(&self, db_id: DbId, table_id: TableId) -> Vec { - self.persisted_files.get_files(db_id, table_id) + pub fn persisted_parquet_files( + &self, + db_id: DbId, + table_id: TableId, + filter: &BufferFilter, + ) -> Vec { + self.persisted_files.get_files(db_id, table_id, filter) } pub fn persisted_snapshot_notify_rx( @@ -877,9 +881,11 @@ mod tests { // validate we have a single persisted file let db = catalog.db_schema("testdb").unwrap(); let table = db.table_definition("foo").unwrap(); - let files = queryable_buffer - .persisted_files - .get_files(db.id, table.table_id); + let files = queryable_buffer.persisted_files.get_files( + db.id, + table.table_id, + &BufferFilter::default(), + ); assert_eq!(files.len(), 1); // now force another snapshot, persisting the data to parquet file @@ -908,9 +914,11 @@ mod tests { .unwrap(); // validate we have two persisted files - let files = queryable_buffer - .persisted_files - .get_files(db.id, table.table_id); + let files = queryable_buffer.persisted_files.get_files( + db.id, + table.table_id, + &BufferFilter::default(), + ); assert_eq!(files.len(), 2); } } diff --git a/influxdb3_write/src/write_buffer/table_buffer.rs b/influxdb3_write/src/write_buffer/table_buffer.rs index d1ca35764c7..9304dcbbcc4 100644 --- a/influxdb3_write/src/write_buffer/table_buffer.rs +++ b/influxdb3_write/src/write_buffer/table_buffer.rs @@ -8,20 +8,22 @@ use arrow::array::{ use arrow::datatypes::{GenericStringType, Int32Type}; use arrow::record_batch::RecordBatch; use data_types::TimestampMinMax; -use datafusion::logical_expr::{BinaryExpr, Expr}; -use hashbrown::HashMap; +use datafusion::physical_expr::utils::Guarantee; +use hashbrown::{HashMap, HashSet}; use influxdb3_catalog::catalog::TableDefinition; use influxdb3_id::ColumnId; use influxdb3_wal::{FieldData, Row}; -use observability_deps::tracing::{debug, error}; +use observability_deps::tracing::error; use schema::sort::SortKey; use schema::{InfluxColumnType, InfluxFieldType, Schema, SchemaBuilder}; use std::collections::btree_map::Entry; -use std::collections::{BTreeMap, HashSet}; +use std::collections::BTreeMap; use std::mem::size_of; use std::sync::Arc; use thiserror::Error; +use crate::{BufferFilter, BufferGuarantee}; + #[derive(Debug, Error)] pub enum Error { #[error("Field not found in table buffer: {0}")] @@ -71,11 +73,13 @@ impl TableBuffer { pub fn partitioned_record_batches( &self, table_def: Arc, - filter: &[Expr], + filter: &BufferFilter, ) -> Result)>> { let mut batches = HashMap::new(); let schema = table_def.schema.as_arrow(); - for sc in &self.snapshotting_chunks { + for sc in self.snapshotting_chunks.iter().filter(|sc| { + filter.test_time_stamp_min_max(sc.timestamp_min_max.min, sc.timestamp_min_max.max) + }) { let cols: std::result::Result, _> = schema .fields() .iter() @@ -95,7 +99,11 @@ impl TableBuffer { *ts = ts.union(&sc.timestamp_min_max); v.push(rb); } - for (t, c) in &self.chunk_time_to_chunks { + for (t, c) in self + .chunk_time_to_chunks + .iter() + .filter(|(_, c)| filter.test_time_stamp_min_max(c.timestamp_min, c.timestamp_max)) + { let ts_min_max = TimestampMinMax::new(c.timestamp_min, c.timestamp_max); let (ts, v) = batches .entry(*t) @@ -106,40 +114,6 @@ impl TableBuffer { Ok(batches) } - pub fn record_batches( - &self, - table_def: Arc, - filter: &[Expr], - ) -> Result> { - let mut batches = - Vec::with_capacity(self.snapshotting_chunks.len() + 
self.chunk_time_to_chunks.len()); - let schema = table_def.schema.as_arrow(); - - for sc in &self.snapshotting_chunks { - let cols: std::result::Result, _> = schema - .fields() - .iter() - .map(|f| { - let col = sc - .record_batch - .column_by_name(f.name()) - .ok_or(Error::FieldNotFound(f.name().to_string())); - col.cloned() - }) - .collect(); - let cols = cols?; - let rb = RecordBatch::try_new(schema.clone(), cols)?; - - batches.push(rb); - } - - for c in self.chunk_time_to_chunks.values() { - batches.push(c.record_batch(Arc::clone(&table_def), filter)?) - } - - Ok(batches) - } - pub fn timestamp_min_max(&self) -> TimestampMinMax { let (min, max) = if self.chunk_time_to_chunks.is_empty() { (0, 0) @@ -265,7 +239,6 @@ impl MutableTableChunk { self.timestamp_max = self.timestamp_max.max(*v); let b = self.data.entry(f.id).or_insert_with(|| { - debug!("Creating new timestamp builder"); let mut time_builder = TimestampNanosecondBuilder::new(); // append nulls for all previous rows time_builder.append_nulls(row_index + self.row_count); @@ -383,7 +356,6 @@ impl MutableTableChunk { // add nulls for any columns not present for (name, builder) in &mut self.data { if !value_added.contains(name) { - debug!("Adding null for column {}", name); match builder { Builder::Bool(b) => b.append_null(), Builder::F64(b) => b.append_null(), @@ -408,18 +380,16 @@ impl MutableTableChunk { fn record_batch( &self, table_def: Arc, - filter: &[Expr], + filter: &BufferFilter, ) -> Result { - let row_ids = self - .index - .get_rows_from_index_for_filter(Arc::clone(&table_def), filter); + let row_ids = self.index.get_rows_from_index_for_filter(filter); let schema = table_def.schema.as_arrow(); let mut cols = Vec::with_capacity(schema.fields().len()); for f in schema.fields() { match row_ids { - Some(row_ids) => { + Some(ref row_ids) => { let b = table_def .column_name_to_id(f.name().as_str()) .and_then(|id| self.data.get(&id)); @@ -576,7 +546,7 @@ impl std::fmt::Debug for MutableTableChunk { #[derive(Debug, Clone)] struct BufferIndex { // column id -> string value -> row indexes - columns: HashMap>>, + columns: HashMap>>, } impl BufferIndex { @@ -594,34 +564,52 @@ impl BufferIndex { if let Some(column) = self.columns.get_mut(&column_id) { column .entry_ref(value) - .and_modify(|c| c.push(row_index)) - .or_insert(vec![row_index]); + .and_modify(|c| { + c.insert(row_index); + }) + .or_insert([row_index].into_iter().collect()); } } - fn get_rows_from_index_for_filter( - &self, - table_def: Arc, - filter: &[Expr], - ) -> Option<&Vec> { - for expr in filter { - if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr { - if *op == datafusion::logical_expr::Operator::Eq { - if let Expr::Column(c) = left.as_ref() { - if let Expr::Literal(datafusion::scalar::ScalarValue::Utf8(Some(v))) = - right.as_ref() - { - return table_def - .column_name_to_id(c.name()) - .and_then(|id| self.columns.get(&id)) - .and_then(|m| m.get(v.as_str())); - } + fn get_rows_from_index_for_filter(&self, filter: &BufferFilter) -> Option> { + let mut row_ids = HashSet::new(); + for ( + col_id, + BufferGuarantee { + guarantee, + literals, + }, + ) in filter.guarantees() + { + let Some(row_map) = self.columns.get(col_id) else { + continue; + }; + match guarantee { + Guarantee::In => { + for literal in literals { + let Some(row) = row_map.get(literal.as_ref()) else { + continue; + }; + row_ids = row_ids.union(row).copied().collect(); + } + } + Guarantee::NotIn => { + row_ids.extend(row_map.values().flatten().copied()); + for literal in literals { + let 
Some(row) = row_map.get(literal.as_ref()) else { + continue; + }; + row_ids = row_ids.difference(row).copied().collect(); } } } } - None + if row_ids.is_empty() { + None + } else { + Some(row_ids.into_iter().collect()) + } } #[allow(dead_code)] @@ -695,7 +683,7 @@ impl Builder { } } - fn get_rows(&self, rows: &[usize]) -> ArrayRef { + fn get_rows(&self, rows: &HashSet) -> ArrayRef { match self { Self::Bool(b) => { let b = b.finish_cloned(); @@ -795,82 +783,79 @@ impl Builder { #[cfg(test)] mod tests { + use crate::{write_buffer::validator::WriteValidator, Precision}; + use super::*; - use arrow_util::{assert_batches_eq, assert_batches_sorted_eq}; - use datafusion::common::Column; - use influxdb3_id::TableId; - use influxdb3_wal::Field; - use schema::InfluxFieldType; + use arrow_util::assert_batches_sorted_eq; + use data_types::NamespaceName; + use datafusion::prelude::{col, lit, lit_timestamp_nano, Expr}; + use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; + use iox_time::Time; + + struct TestWriter { + catalog: Arc, + } + + impl TestWriter { + const DB_NAME: &str = "test-db"; + + fn new() -> Self { + let catalog = Arc::new(Catalog::new("test-node".into(), "test-instance".into())); + Self { catalog } + } + + fn write_to_rows(&self, lp: impl AsRef, ingest_time_sec: i64) -> Vec { + let db = NamespaceName::try_from(Self::DB_NAME).unwrap(); + let ingest_time_ns = ingest_time_sec * 1_000_000_000; + let validator = + WriteValidator::initialize(db, Arc::clone(&self.catalog), ingest_time_ns).unwrap(); + validator + .v1_parse_lines_and_update_schema( + lp.as_ref(), + false, + Time::from_timestamp_nanos(ingest_time_ns), + Precision::Nanosecond, + ) + .map(|r| r.into_inner().to_rows()) + .unwrap() + } + + fn db_schema(&self) -> Arc { + self.catalog.db_schema(Self::DB_NAME).unwrap() + } + } #[test] - fn partitioned_table_buffer_batches() { - let table_def = Arc::new( - TableDefinition::new( - TableId::new(), - "test_table".into(), - vec![ - (ColumnId::from(0), "tag".into(), InfluxColumnType::Tag), - ( - ColumnId::from(1), - "val".into(), - InfluxColumnType::Field(InfluxFieldType::String), - ), - ( - ColumnId::from(2), - "time".into(), - InfluxColumnType::Timestamp, - ), - ], - vec![0.into()], - ) - .unwrap(), - ); - let mut table_buffer = TableBuffer::new(vec![ColumnId::from(0)], SortKey::empty()); + fn test_partitioned_table_buffer_batches() { + let writer = TestWriter::new(); + let mut row_batches = Vec::new(); for t in 0..10 { let offset = t * 10; - let rows = vec![ - Row { - time: offset + 1, - fields: vec![ - Field { - id: ColumnId::from(0), - value: FieldData::Tag("a".to_string()), - }, - Field { - id: ColumnId::from(1), - value: FieldData::String(format!("thing {t}-1")), - }, - Field { - id: ColumnId::from(2), - value: FieldData::Timestamp(offset + 1), - }, - ], - }, - Row { - time: offset + 2, - fields: vec![ - Field { - id: ColumnId::from(0), - value: FieldData::Tag("b".to_string()), - }, - Field { - id: ColumnId::from(1), - value: FieldData::String(format!("thing {t}-2")), - }, - Field { - id: ColumnId::from(2), - value: FieldData::Timestamp(offset + 2), - }, - ], - }, - ]; + let rows = writer.write_to_rows( + format!( + "\ + tbl,tag=a val=\"thing {t}-1\" {o1}\n\ + tbl,tag=b val=\"thing {t}-2\" {o2}\n\ + ", + o1 = offset + 1, + o2 = offset + 2, + ), + offset, + ); + row_batches.push((rows, offset)); + } + + let table_def = writer.db_schema().table_definition("tbl").unwrap(); + let tag_col_id = table_def.column_name_to_id("tag").unwrap(); + let mut table_buffer = 
TableBuffer::new(vec![tag_col_id], SortKey::empty()); + for (rows, offset) in row_batches { table_buffer.buffer_chunk(offset, &rows); } let partitioned_batches = table_buffer - .partitioned_record_batches(Arc::clone(&table_def), &[]) + .partitioned_record_batches(Arc::clone(&table_def), &BufferFilter::default()) .unwrap(); assert_eq!(10, partitioned_batches.len()); @@ -902,209 +887,165 @@ mod tests { } #[test] - fn tag_row_index() { - let table_def = Arc::new( - TableDefinition::new( - TableId::new(), - "test_table".into(), - vec![ - (ColumnId::from(0), "tag".into(), InfluxColumnType::Tag), - ( - ColumnId::from(1), - "value".into(), - InfluxColumnType::Field(InfluxFieldType::Integer), - ), - ( - ColumnId::from(2), - "time".into(), - InfluxColumnType::Timestamp, - ), - ], - vec![0.into()], - ) - .unwrap(), + fn test_row_index_tag_filtering() { + let writer = TestWriter::new(); + let rows = writer.write_to_rows( + "\ + tbl,tag=a value=1i 1\n\ + tbl,tag=b value=2i 1\n\ + tbl,tag=a value=3i 2\n\ + tbl,tag=b value=4i 2\n\ + tbl,tag=a value=5i 3\n\ + tbl,tag=c value=6i 3", + 0, ); - let mut table_buffer = TableBuffer::new(vec![ColumnId::from(0)], SortKey::empty()); - - let rows = vec![ - Row { - time: 1, - fields: vec![ - Field { - id: ColumnId::from(0), - value: FieldData::Tag("a".to_string()), - }, - Field { - id: ColumnId::from(1), - value: FieldData::Integer(1), - }, - Field { - id: ColumnId::from(2), - value: FieldData::Timestamp(1), - }, + let table_def = writer.db_schema().table_definition("tbl").unwrap(); + let tag_id = table_def.column_name_to_id("tag").unwrap(); + let mut table_buffer = TableBuffer::new(vec![tag_id], SortKey::empty()); + + table_buffer.buffer_chunk(0, &rows); + + struct TestCase<'a> { + filter: &'a [Expr], + expected_rows: &'a [usize], + expected_output: &'a [&'a str], + } + + let test_cases = [ + TestCase { + filter: &[col("tag").eq(lit("a"))], + expected_rows: &[0, 2, 4], + expected_output: &[ + "+-----+--------------------------------+-------+", + "| tag | time | value |", + "+-----+--------------------------------+-------+", + "| a | 1970-01-01T00:00:00.000000001Z | 1 |", + "| a | 1970-01-01T00:00:00.000000002Z | 3 |", + "| a | 1970-01-01T00:00:00.000000003Z | 5 |", + "+-----+--------------------------------+-------+", ], }, - Row { - time: 2, - fields: vec![ - Field { - id: ColumnId::from(0), - value: FieldData::Tag("b".to_string()), - }, - Field { - id: ColumnId::from(1), - value: FieldData::Integer(2), - }, - Field { - id: ColumnId::from(2), - value: FieldData::Timestamp(2), - }, + TestCase { + filter: &[col("tag").eq(lit("b"))], + expected_rows: &[1, 3], + expected_output: &[ + "+-----+--------------------------------+-------+", + "| tag | time | value |", + "+-----+--------------------------------+-------+", + "| b | 1970-01-01T00:00:00.000000001Z | 2 |", + "| b | 1970-01-01T00:00:00.000000002Z | 4 |", + "+-----+--------------------------------+-------+", ], }, - Row { - time: 3, - fields: vec![ - Field { - id: ColumnId::from(0), - value: FieldData::Tag("a".to_string()), - }, - Field { - id: ColumnId::from(1), - value: FieldData::Integer(3), - }, - Field { - id: ColumnId::from(2), - value: FieldData::Timestamp(3), - }, + TestCase { + filter: &[col("tag").eq(lit("c"))], + expected_rows: &[5], + expected_output: &[ + "+-----+--------------------------------+-------+", + "| tag | time | value |", + "+-----+--------------------------------+-------+", + "| c | 1970-01-01T00:00:00.000000003Z | 6 |", + "+-----+--------------------------------+-------+", ], }, - ]; - 
- table_buffer.buffer_chunk(0, &rows); - - let filter = &[Expr::BinaryExpr(BinaryExpr { - left: Box::new(Expr::Column(Column { - relation: None, - name: "tag".to_string(), - })), - op: datafusion::logical_expr::Operator::Eq, - right: Box::new(Expr::Literal(datafusion::scalar::ScalarValue::Utf8(Some( - "a".to_string(), - )))), - })]; - let a_rows = table_buffer - .chunk_time_to_chunks - .get(&0) - .unwrap() - .index - .get_rows_from_index_for_filter(Arc::clone(&table_def), filter) - .unwrap(); - assert_eq!(a_rows, &[0, 2]); - - let a = table_buffer - .record_batches(Arc::clone(&table_def), filter) - .unwrap(); - let expected_a = vec![ - "+-----+--------------------------------+-------+", - "| tag | time | value |", - "+-----+--------------------------------+-------+", - "| a | 1970-01-01T00:00:00.000000001Z | 1 |", - "| a | 1970-01-01T00:00:00.000000003Z | 3 |", - "+-----+--------------------------------+-------+", - ]; - assert_batches_eq!(&expected_a, &a); - - let filter = &[Expr::BinaryExpr(BinaryExpr { - left: Box::new(Expr::Column(Column { - relation: None, - name: "tag".to_string(), - })), - op: datafusion::logical_expr::Operator::Eq, - right: Box::new(Expr::Literal(datafusion::scalar::ScalarValue::Utf8(Some( - "b".to_string(), - )))), - })]; - - let b_rows = table_buffer - .chunk_time_to_chunks - .get(&0) - .unwrap() - .index - .get_rows_from_index_for_filter(Arc::clone(&table_def), filter) - .unwrap(); - assert_eq!(b_rows, &[1]); - - let b = table_buffer - .record_batches(Arc::clone(&table_def), filter) - .unwrap(); - let expected_b = vec![ - "+-----+--------------------------------+-------+", - "| tag | time | value |", - "+-----+--------------------------------+-------+", - "| b | 1970-01-01T00:00:00.000000002Z | 2 |", - "+-----+--------------------------------+-------+", - ]; - assert_batches_eq!(&expected_b, &b); - } - - #[test] - fn computed_size_of_buffer() { - let mut table_buffer = TableBuffer::new(vec![ColumnId::from(0)], SortKey::empty()); - - let rows = vec![ - Row { - time: 1, - fields: vec![ - Field { - id: ColumnId::from(0), - value: FieldData::Tag("a".to_string()), - }, - Field { - id: ColumnId::from(1), - value: FieldData::Integer(1), - }, - Field { - id: ColumnId::from(2), - value: FieldData::Timestamp(1), - }, + TestCase { + filter: &[col("tag").eq(lit("a")).or(col("tag").eq(lit("c")))], + expected_rows: &[0, 2, 4, 5], + expected_output: &[ + "+-----+--------------------------------+-------+", + "| tag | time | value |", + "+-----+--------------------------------+-------+", + "| a | 1970-01-01T00:00:00.000000001Z | 1 |", + "| a | 1970-01-01T00:00:00.000000002Z | 3 |", + "| a | 1970-01-01T00:00:00.000000003Z | 5 |", + "| c | 1970-01-01T00:00:00.000000003Z | 6 |", + "+-----+--------------------------------+-------+", ], }, - Row { - time: 2, - fields: vec![ - Field { - id: ColumnId::from(0), - value: FieldData::Tag("b".to_string()), - }, - Field { - id: ColumnId::from(1), - value: FieldData::Integer(2), - }, - Field { - id: ColumnId::from(2), - value: FieldData::Timestamp(2), - }, + TestCase { + filter: &[col("tag").not_eq(lit("a"))], + expected_rows: &[1, 3, 5], + expected_output: &[ + "+-----+--------------------------------+-------+", + "| tag | time | value |", + "+-----+--------------------------------+-------+", + "| b | 1970-01-01T00:00:00.000000001Z | 2 |", + "| b | 1970-01-01T00:00:00.000000002Z | 4 |", + "| c | 1970-01-01T00:00:00.000000003Z | 6 |", + "+-----+--------------------------------+-------+", ], }, - Row { - time: 3, - fields: vec![ - Field { - 
id: ColumnId::from(0), - value: FieldData::Tag("this is a long tag value to store".to_string()), - }, - Field { - id: ColumnId::from(1), - value: FieldData::Integer(3), - }, - Field { - id: ColumnId::from(2), - value: FieldData::Timestamp(3), - }, + TestCase { + filter: &[col("tag").in_list(vec![lit("a"), lit("c")], false)], + expected_rows: &[0, 2, 4, 5], + expected_output: &[ + "+-----+--------------------------------+-------+", + "| tag | time | value |", + "+-----+--------------------------------+-------+", + "| a | 1970-01-01T00:00:00.000000001Z | 1 |", + "| a | 1970-01-01T00:00:00.000000002Z | 3 |", + "| a | 1970-01-01T00:00:00.000000003Z | 5 |", + "| c | 1970-01-01T00:00:00.000000003Z | 6 |", + "+-----+--------------------------------+-------+", + ], + }, + TestCase { + filter: &[col("tag").in_list(vec![lit("a"), lit("c")], true)], + expected_rows: &[1, 3], + expected_output: &[ + "+-----+--------------------------------+-------+", + "| tag | time | value |", + "+-----+--------------------------------+-------+", + "| b | 1970-01-01T00:00:00.000000001Z | 2 |", + "| b | 1970-01-01T00:00:00.000000002Z | 4 |", + "+-----+--------------------------------+-------+", ], }, ]; + for t in test_cases { + let filter = BufferFilter::new(&table_def, t.filter).unwrap(); + let rows = table_buffer + .chunk_time_to_chunks + .get(&0) + .unwrap() + .index + .get_rows_from_index_for_filter(&filter) + .unwrap(); + assert_eq!( + rows, + HashSet::::from_iter(t.expected_rows.iter().copied()) + ); + let batches = table_buffer + .partitioned_record_batches(Arc::clone(&table_def), &filter) + .unwrap() + .into_values() + .flat_map(|(_, batch)| batch.into_iter()) + .collect::>(); + assert_batches_sorted_eq!(t.expected_output, &batches); + } + } + + #[test] + fn test_computed_size_of_buffer() { + let writer = TestWriter::new(); + + let rows = writer.write_to_rows( + "\ + tbl,tag=a value=1i 1\n\ + tbl,tag=b value=2i 2\n\ + tbl,tag=this\\ is\\ a\\ long\\ tag\\ value\\ to\\ store value=3i 3\n\ + ", + 0, + ); + + let tag_col_id = writer + .db_schema() + .table_definition("tbl") + .and_then(|tbl| tbl.column_name_to_id("tag")) + .unwrap(); + + let mut table_buffer = TableBuffer::new(vec![tag_col_id], SortKey::empty()); table_buffer.buffer_chunk(0, &rows); let size = table_buffer.computed_size(); @@ -1118,4 +1059,94 @@ mod tests { assert_eq!(timestamp_min_max.min, 0); assert_eq!(timestamp_min_max.max, 0); } + + #[test_log::test] + fn test_time_filters() { + let writer = TestWriter::new(); + + let mut row_batches = Vec::new(); + for offset in 0..100 { + let rows = writer.write_to_rows( + format!( + "\ + tbl,tag=a val={}\n\ + tbl,tag=b val={}\n\ + ", + offset + 1, + offset + 2 + ), + offset, + ); + row_batches.push((offset, rows)); + } + let table_def = writer.db_schema().table_definition("tbl").unwrap(); + let tag_col_id = table_def.column_name_to_id("tag").unwrap(); + let mut table_buffer = TableBuffer::new(vec![tag_col_id], SortKey::empty()); + + for (offset, rows) in row_batches { + table_buffer.buffer_chunk(offset, &rows); + } + + struct TestCase<'a> { + filter: &'a [Expr], + expected_output: &'a [&'a str], + } + + let test_cases = [ + TestCase { + filter: &[col("time").gt(lit_timestamp_nano(97_000_000_000i64))], + expected_output: &[ + "+-----+----------------------+-------+", + "| tag | time | val |", + "+-----+----------------------+-------+", + "| a | 1970-01-01T00:01:38Z | 99.0 |", + "| a | 1970-01-01T00:01:39Z | 100.0 |", + "| b | 1970-01-01T00:01:38Z | 100.0 |", + "| b | 1970-01-01T00:01:39Z | 101.0 |", + 
"+-----+----------------------+-------+", + ], + }, + TestCase { + filter: &[col("time").lt(lit_timestamp_nano(3_000_000_000i64))], + expected_output: &[ + "+-----+----------------------+-----+", + "| tag | time | val |", + "+-----+----------------------+-----+", + "| a | 1970-01-01T00:00:00Z | 1.0 |", + "| a | 1970-01-01T00:00:01Z | 2.0 |", + "| a | 1970-01-01T00:00:02Z | 3.0 |", + "| b | 1970-01-01T00:00:00Z | 2.0 |", + "| b | 1970-01-01T00:00:01Z | 3.0 |", + "| b | 1970-01-01T00:00:02Z | 4.0 |", + "+-----+----------------------+-----+", + ], + }, + TestCase { + filter: &[col("time") + .gt(lit_timestamp_nano(3_000_000_000i64)) + .and(col("time").lt(lit_timestamp_nano(6_000_000_000i64)))], + expected_output: &[ + "+-----+----------------------+-----+", + "| tag | time | val |", + "+-----+----------------------+-----+", + "| a | 1970-01-01T00:00:04Z | 5.0 |", + "| a | 1970-01-01T00:00:05Z | 6.0 |", + "| b | 1970-01-01T00:00:04Z | 6.0 |", + "| b | 1970-01-01T00:00:05Z | 7.0 |", + "+-----+----------------------+-----+", + ], + }, + ]; + + for t in test_cases { + let filter = BufferFilter::new(&table_def, t.filter).unwrap(); + let batches = table_buffer + .partitioned_record_batches(Arc::clone(&table_def), &filter) + .unwrap() + .into_values() + .flat_map(|(_, batches)| batches) + .collect::>(); + assert_batches_sorted_eq!(t.expected_output, &batches); + } + } }