Skip to content

Commit

Permalink
Merge pull request #622 from splitgraph/pruning-heuristic
Browse files Browse the repository at this point in the history
Add fine-grained flush pruning
  • Loading branch information
gruuya authored Aug 20, 2024
2 parents 779d110 + 148185f commit 004955a
Show file tree
Hide file tree
Showing 4 changed files with 374 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/config/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ impl Default for DataSyncConfig {
Self {
max_in_memory_bytes: 3 * 1024 * 1024 * 1024,
max_replication_lag_s: 600,
max_syncs_per_url: 100,
max_syncs_per_url: 50,
write_lock_timeout_s: 3,
flush_task_interval_s: 900,
}
Expand Down
14 changes: 14 additions & 0 deletions src/frontend/flight/sync/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ const IN_MEMORY_OLDEST: &str =
const SQUASH_TIME: &str = "seafowl_changeset_writer_squash_time_seconds";
const SQUASHED_BYTES: &str = "seafowl_changeset_writer_squashed_bytes_total";
const SQUASHED_ROWS: &str = "seafowl_changeset_writer_squashed_rows_total";
const PRUNING_TIME: &str = "seafowl_changeset_writer_pruning_time_milliseconds";
const PRUNING_FILES: &str = "seafowl_changeset_writer_pruning_files_total";
const FLUSH_TIME: &str = "seafowl_changeset_writer_flush_time_seconds";
const FLUSH_BYTES: &str = "seafowl_changeset_writer_flush_bytes_total";
const FLUSH_ROWS: &str = "seafowl_changeset_writer_flush_rows_total";
Expand All @@ -32,6 +34,8 @@ pub struct SyncMetrics {
pub squash_time: Histogram,
pub squashed_bytes: Counter,
pub squashed_rows: Counter,
pub pruning_time: Histogram,
pub pruning_files: Histogram,
pub flush_time: Histogram,
pub flush_bytes: Counter,
pub flush_rows: Counter,
Expand Down Expand Up @@ -79,6 +83,14 @@ impl SyncMetrics {
SQUASHED_ROWS,
"The reduction in row count due to batch squashing"
);
describe_histogram!(
PRUNING_TIME,
"The time taken to prune partition files to re-write"
);
describe_histogram!(
PRUNING_FILES,
"The file count that partition pruning identified"
);
describe_histogram!(FLUSH_TIME, "The time taken to flush a collections of syncs");
describe_counter!(FLUSH_BYTES, "The total byte size flushed");
describe_counter!(FLUSH_ROWS, "The total row count flushed");
Expand All @@ -99,6 +111,8 @@ impl SyncMetrics {
squash_time: histogram!(SQUASH_TIME),
squashed_bytes: counter!(SQUASHED_BYTES),
squashed_rows: counter!(SQUASHED_ROWS),
pruning_time: histogram!(PRUNING_TIME),
pruning_files: histogram!(PRUNING_FILES),
flush_time: histogram!(FLUSH_TIME),
flush_bytes: counter!(FLUSH_BYTES),
flush_rows: counter!(FLUSH_ROWS),
Expand Down
301 changes: 295 additions & 6 deletions src/frontend/flight/sync/utils.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use crate::frontend::flight::sync::schema::SyncSchema;
use crate::frontend::flight::sync::writer::DataSyncItem;
use arrow::array::{new_null_array, Array, ArrayRef, RecordBatch, UInt64Array};
use arrow::compute::{concat_batches, take};
use crate::frontend::flight::sync::SyncResult;
use arrow::array::{new_null_array, Array, ArrayRef, RecordBatch, Scalar, UInt64Array};
use arrow::compute::kernels::cmp::{gt_eq, lt_eq};
use arrow::compute::{and_kleene, bool_or, concat_batches, filter, is_not_null, take};
use arrow_row::{Row, RowConverter, SortField};
use clade::sync::ColumnRole;
use datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use datafusion::physical_optimizer::pruning::PruningStatistics;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{col, lit, Accumulator, Expr};
use std::collections::{HashMap, HashSet, VecDeque};
use tracing::log::warn;

// Compact a set of record batches into a single one, squashing any chain of changes to a given row
// into a single row in the output batch.
Expand Down Expand Up @@ -278,23 +282,160 @@ pub(super) fn construct_qualifier(
.reduce(|e1: Expr, e2| e1.and(e2)))
}

// Go through each sync and each partition and check whether any single row out of non-NULL PKs can
// possibly be found in that partition, and if so add it to the map.
pub(super) fn get_prune_map(
syncs: &[DataSyncItem],
pruning_stats: &dyn PruningStatistics,
) -> SyncResult<Vec<bool>> {
let partition_count = pruning_stats.num_containers();

// Maps of column, partition -> min max scalar value
let mut min_values: HashMap<(&str, usize), Scalar<ArrayRef>> = HashMap::new();
let mut max_values: HashMap<(&str, usize), Scalar<ArrayRef>> = HashMap::new();

// First gather the stats about the partitions and PK columns
for col in syncs
.first()
.unwrap()
.sync_schema
.columns()
.iter()
.filter(|col| col.role() == ColumnRole::OldPk)
{
let maybe_min_vals = pruning_stats.min_values(&col.name().into());
let maybe_max_vals = pruning_stats.max_values(&col.name().into());

if maybe_min_vals.is_none() && maybe_max_vals.is_none() {
// We have no stats for any partition for this PK column, so short-circuit pruning
// by returning all files
warn!(
"Skipping partition pruning: no min/max stats found for column {}",
col.name(),
);
return Ok(vec![true; partition_count]);
}

if let Some(min_vals) = maybe_min_vals {
for file in 0..partition_count {
min_values.insert(
(col.name().as_str(), file),
Scalar::new(min_vals.slice(file, 1)),
);
}
}
if let Some(max_vals) = maybe_max_vals {
for file in 0..partition_count {
max_values.insert(
(col.name().as_str(), file),
Scalar::new(max_vals.slice(file, 1)),
);
}
}
}

// Start off scanning no files
let mut prune_map = vec![false; partition_count];

for role in [ColumnRole::OldPk, ColumnRole::NewPk] {
for sync in syncs {
for (ind, used) in prune_map.iter_mut().enumerate() {
// Perform pruning only if we don't know whether the partition is needed yet
if !*used {
let mut non_null_map = None;
let mut sync_prune_map = None;
for pk_col in sync
.sync_schema
.columns()
.iter()
.filter(|col| col.role() == role)
{
let array =
sync.batch.column_by_name(pk_col.field().name()).unwrap();

// Scope out any NULL values, which only denote no-PKs when inserting/deleting.
// We re-use the same non-null map since there can't be a scenario where
// some of the PKs are null and others aren't (i.e. NULL PKs are invalid).
let array = match non_null_map {
None => {
let non_nulls = is_not_null(array)?;
let array = filter(array, &non_nulls)?;
non_null_map = Some(non_nulls);
array
}
Some(ref non_nulls) => filter(array, non_nulls)?,
};

if array.is_empty() {
// Old PKs for INSERT/ new PKs for DELETE, nothing to prune here
continue;
}

let col_file = (pk_col.name().as_str(), ind);
let next_sync_prune_map = match (
min_values.get(&col_file),
max_values.get(&col_file),
) {
(Some(min_value), Some(max_value)) => and_kleene(
&gt_eq(&array.as_ref(), min_value)?,
&lt_eq(&array.as_ref(), max_value)?,
)?,
(Some(min_value), None) => gt_eq(&array.as_ref(), min_value)?,
(None, Some(max_value)) => lt_eq(&array.as_ref(), max_value)?,
_ => unreachable!("Validation ensured against this case"),
};

match sync_prune_map {
None => sync_prune_map = Some(next_sync_prune_map),
Some(prev_sync_prune_map) => {
sync_prune_map = Some(and_kleene(
&prev_sync_prune_map,
&next_sync_prune_map,
)?)
}
}
}

if let Some(sync_prune_map) = sync_prune_map
&& (bool_or(&sync_prune_map) == Some(true)
|| sync_prune_map.null_count() > 0)
{
// We've managed to calculate the prune map for all columns and at least one
// PK is either definitely in the current partition file, or it's unknown
// whether it is in the current file, so in either case we must include
// this partition in the scan.
*used = true;
}
}
}
}
}

Ok(prune_map)
}

#[cfg(test)]
mod tests {
use crate::frontend::flight::sync::schema::SyncSchema;
use crate::frontend::flight::sync::utils::{construct_qualifier, squash_batches};
use crate::frontend::flight::sync::utils::{
construct_qualifier, get_prune_map, squash_batches,
};
use crate::frontend::flight::sync::writer::DataSyncItem;
use arrow::array::{
BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray, UInt8Array,
Array, ArrayRef, BooleanArray, Float64Array, Int32Array, RecordBatch,
StringArray, UInt8Array,
};
use arrow_schema::{DataType, Field, Schema};
use clade::sync::{ColumnDescriptor, ColumnRole};
use datafusion_common::assert_batches_eq;
use datafusion::physical_optimizer::pruning::PruningStatistics;
use datafusion_common::{assert_batches_eq, Column, ScalarValue};
use datafusion_expr::{col, lit};
use itertools::Itertools;
use rand::distributions::{Alphanumeric, DistString, Distribution, WeightedIndex};
use rand::seq::IteratorRandom;
use rand::Rng;
use std::collections::HashSet;
use rstest::rstest;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use uuid::Uuid;

Expand Down Expand Up @@ -655,4 +796,152 @@ mod tests {

Ok(())
}

struct TestStatistics {
pub min_values: HashMap<Column, ArrayRef>,
pub max_values: HashMap<Column, ArrayRef>,
}

impl PruningStatistics for TestStatistics {
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
self.min_values.get(column).cloned()
}

fn max_values(&self, column: &Column) -> Option<ArrayRef> {
self.max_values.get(column).cloned()
}

fn num_containers(&self) -> usize {
self.min_values
.values()
.next()
.map(|values| values.len())
.unwrap_or_default()
}

fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
None
}

fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
None
}

fn contained(
&self,
_column: &Column,
_values: &HashSet<ScalarValue>,
) -> Option<BooleanArray> {
None
}
}

#[rstest]
#[case(
vec![Some(70), None, Some(30)],
vec![Some(90), Some(40), Some(60)],
vec![Some("aa"), Some("bb"), None],
vec![Some("rr"), Some("gg"), Some("a")],
vec![false, true, false],
)]
#[test]
fn test_sync_pruning(
#[case] c1_min_values: Vec<Option<i32>>,
#[case] c1_max_values: Vec<Option<i32>>,
#[case] c2_min_values: Vec<Option<&str>>,
#[case] c2_max_values: Vec<Option<&str>>,
#[case] expected_prune_map: Vec<bool>,
) -> Result<(), Box<dyn std::error::Error>> {
let schema = Arc::new(Schema::new(vec![
Field::new("old_c1", DataType::Int32, true),
Field::new("old_c2", DataType::Utf8, true),
Field::new("new_c1", DataType::Int32, true),
Field::new("new_c2", DataType::Utf8, true),
]));

let column_descriptors = vec![
ColumnDescriptor {
role: ColumnRole::OldPk as i32,
name: "c1".to_string(),
},
ColumnDescriptor {
role: ColumnRole::OldPk as i32,
name: "c2".to_string(),
},
ColumnDescriptor {
role: ColumnRole::NewPk as i32,
name: "c1".to_string(),
},
ColumnDescriptor {
role: ColumnRole::NewPk as i32,
name: "c2".to_string(),
},
];

let sync_schema = SyncSchema::try_new(column_descriptors, schema.clone())?;

// UPDATE, INSERT
let batch_1 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![Some(20), None])),
Arc::new(StringArray::from(vec![Some("ddd"), None])),
Arc::new(Int32Array::from(vec![40, 30])),
Arc::new(StringArray::from(vec!["ccc", "bbb"])),
],
)?;

// DELETE, UPDATE
let batch_2 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![10, 50])),
Arc::new(StringArray::from(vec!["aaa", "eee"])),
Arc::new(Int32Array::from(vec![None, Some(60)])),
Arc::new(StringArray::from(vec![None, Some("fff")])),
],
)?;

let syncs = &[
DataSyncItem {
tx_id: Uuid::new_v4(),
sync_schema: sync_schema.clone(),
batch: batch_1,
},
DataSyncItem {
tx_id: Uuid::new_v4(),
sync_schema,
batch: batch_2,
},
];

let test_statistics = TestStatistics {
min_values: HashMap::from([
(
Column::from("c1"),
Arc::new(Int32Array::from(c1_min_values)) as ArrayRef,
),
(
Column::from("c2"),
Arc::new(StringArray::from(c2_min_values)) as ArrayRef,
),
]),
max_values: HashMap::from([
(
Column::from("c1"),
Arc::new(Int32Array::from(c1_max_values)) as ArrayRef,
),
(
Column::from("c2"),
Arc::new(StringArray::from(c2_max_values)) as ArrayRef,
),
]),
};

let prune_map = get_prune_map(syncs, &test_statistics).unwrap();

assert_eq!(prune_map, expected_prune_map);

Ok(())
}
}
Loading

0 comments on commit 004955a

Please sign in to comment.