Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Drop RowConverter from GroupOrderingPartial #14566

Merged
merged 8 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datafusion/physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,7 @@ tokio = { workspace = true, features = [
[[bench]]
harness = false
name = "spm"

[[bench]]
harness = false
name = "partial_ordering"
77 changes: 77 additions & 0 deletions datafusion/physical-plan/benches/partial_ordering.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow_schema::{DataType, Field, Schema, SortOptions};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion_physical_expr::{expressions::col, LexOrdering, PhysicalSortExpr};
use datafusion_physical_plan::aggregates::order::GroupOrderingPartial;

const BATCH_SIZE: usize = 8192;

fn create_test_arrays(num_columns: usize) -> Vec<ArrayRef> {
(0..num_columns)
.map(|i| {
Arc::new(Int32Array::from_iter_values(
(0..BATCH_SIZE as i32).map(|x| x * (i + 1) as i32),
)) as ArrayRef
})
.collect()
}
fn bench_new_groups(c: &mut Criterion) {
let mut group = c.benchmark_group("group_ordering_partial");

// Test with 1, 2, 4, and 8 order indices
for num_columns in [1, 2, 4, 8] {
let fields: Vec<Field> = (0..num_columns)
.map(|i| Field::new(format!("col{}", i), DataType::Int32, false))
.collect();
let schema = Schema::new(fields);

let order_indices: Vec<usize> = (0..num_columns).collect();
let ordering = LexOrdering::new(
(0..num_columns)
.map(|i| {
PhysicalSortExpr::new(
col(&format!("col{}", i), &schema).unwrap(),
SortOptions::default(),
)
})
.collect(),
);

group.bench_function(format!("order_indices_{}", num_columns), |b| {
let batch_group_values = create_test_arrays(num_columns);
let group_indices: Vec<usize> = (0..BATCH_SIZE).collect();

b.iter(|| {
let mut ordering =
GroupOrderingPartial::try_new(&schema, &order_indices, &ordering)
.unwrap();
ordering
.new_groups(&batch_group_values, &group_indices, BATCH_SIZE)
.unwrap();
});
});
}
group.finish();
}

criterion_group!(benches, bench_new_groups);
criterion_main!(benches);
217 changes: 169 additions & 48 deletions datafusion/physical-plan/src/aggregates/order/partial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
// under the License.

use arrow::array::ArrayRef;
use arrow::compute::SortOptions;
use arrow::datatypes::Schema;
use arrow::row::{OwnedRow, RowConverter, Rows, SortField};
use datafusion_common::Result;
use arrow_ord::partition::partition;
use datafusion_common::utils::{compare_rows, get_row_at_idx};
use datafusion_common::{Result, ScalarValue};
use datafusion_execution::memory_pool::proxy::VecAllocExt;
use datafusion_expr::EmitTo;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use std::cmp::Ordering;
use std::mem::size_of;
use std::sync::Arc;

Expand Down Expand Up @@ -69,13 +72,9 @@ pub struct GroupOrderingPartial {
/// For example if grouping by `id, state` and ordered by `state`
/// this would be `[1]`.
order_indices: Vec<usize>,

/// Converter for the sort key (used on the group columns
/// specified in `order_indexes`)
row_converter: RowConverter,
}

#[derive(Debug, Default)]
#[derive(Debug, Default, PartialEq)]
enum State {
/// The ordering was temporarily taken. `Self::Taken` is left
/// when state must be temporarily taken to satisfy the borrow
Expand All @@ -93,7 +92,7 @@ enum State {
/// Smallest group index with the sort_key
current_sort: usize,
/// The sort key of group_index `current_sort`
sort_key: OwnedRow,
sort_key: Vec<ScalarValue>,
/// index of the current group for which values are being
/// generated
current: usize,
Expand All @@ -103,47 +102,47 @@ enum State {
Complete,
}

impl State {
fn size(&self) -> usize {
match self {
State::Taken => 0,
State::Start => 0,
State::InProgress { sort_key, .. } => sort_key
.iter()
.map(|scalar_value| scalar_value.size())
.sum(),
State::Complete => 0,
}
}
}

impl GroupOrderingPartial {
/// TODO: Remove unnecessary `input_schema` parameter.
pub fn try_new(
input_schema: &Schema,
_input_schema: &Schema,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not remove this arg?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This way we can avoid API change, though I don't have a strong preference

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can add a comment or something as a follow on pr

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additionally, the input_schema argument in order::GroupOrdering::try_new will also be unnecessary

impl GroupOrdering {
/// Create a `GroupOrdering` for the specified ordering
pub fn try_new(
input_schema: &Schema,
mode: &InputOrderMode,
ordering: &LexOrdering,
) -> Result<Self> {
match mode {
InputOrderMode::Linear => Ok(GroupOrdering::None),
InputOrderMode::PartiallySorted(order_indices) => {
GroupOrderingPartial::try_new(input_schema, order_indices, ordering)
.map(GroupOrdering::Partial)
}
InputOrderMode::Sorted => Ok(GroupOrdering::Full(GroupOrderingFull::new())),
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a TODO comment to the code

order_indices: &[usize],
ordering: &LexOrdering,
) -> Result<Self> {
assert!(!order_indices.is_empty());
assert!(order_indices.len() <= ordering.len());

// get only the section of ordering, that consist of group by expressions.
let fields = ordering[0..order_indices.len()]
.iter()
.map(|sort_expr| {
Ok(SortField::new_with_options(
sort_expr.expr.data_type(input_schema)?,
sort_expr.options,
))
})
.collect::<Result<Vec<_>>>()?;

Ok(Self {
state: State::Start,
order_indices: order_indices.to_vec(),
row_converter: RowConverter::new(fields)?,
})
}

/// Creates sort keys from the group values
/// Select sort keys from the group values
///
/// For example, if group_values had `A, B, C` but the input was
/// only sorted on `B` and `C` this should return rows for (`B`,
/// `C`)
fn compute_sort_keys(&mut self, group_values: &[ArrayRef]) -> Result<Rows> {
fn compute_sort_keys(&mut self, group_values: &[ArrayRef]) -> Vec<ArrayRef> {
// Take only the columns that are in the sort key
let sort_values: Vec<_> = self
.order_indices
self.order_indices
.iter()
.map(|&idx| Arc::clone(&group_values[idx]))
.collect();

Ok(self.row_converter.convert_columns(&sort_values)?)
.collect()
}

/// How many groups be emitted, or None if no data can be emitted
Expand Down Expand Up @@ -194,6 +193,23 @@ impl GroupOrderingPartial {
};
}

fn updated_sort_key(
current_sort: usize,
sort_key: Option<Vec<ScalarValue>>,
range_current_sort: usize,
range_sort_key: Vec<ScalarValue>,
) -> Result<(usize, Vec<ScalarValue>)> {
if let Some(sort_key) = sort_key {
let sort_options = vec![SortOptions::new(false, false); sort_key.len()];
let ordering = compare_rows(&sort_key, &range_sort_key, &sort_options)?;
if ordering == Ordering::Equal {
return Ok((current_sort, sort_key));
}
}

Ok((range_current_sort, range_sort_key))
}

/// Called when new groups are added in a batch. See documentation
/// on [`super::GroupOrdering::new_groups`]
pub fn new_groups(
Expand All @@ -207,46 +223,151 @@ impl GroupOrderingPartial {

let max_group_index = total_num_groups - 1;

// compute the sort key values for each group
let sort_keys = self.compute_sort_keys(batch_group_values)?;

let old_state = std::mem::take(&mut self.state);
let (mut current_sort, mut sort_key) = match &old_state {
let (current_sort, sort_key) = match std::mem::take(&mut self.state) {
State::Taken => unreachable!("State previously taken"),
State::Start => (0, sort_keys.row(0)),
State::Start => (0, None),
State::InProgress {
current_sort,
sort_key,
..
} => (*current_sort, sort_key.row()),
} => (current_sort, Some(sort_key)),
State::Complete => {
panic!("Saw new group after the end of input");
}
};

// Find latest sort key
let iter = group_indices.iter().zip(sort_keys.iter());
for (&group_index, group_sort_key) in iter {
// Does this group have seen a new sort_key?
if sort_key != group_sort_key {
current_sort = group_index;
sort_key = group_sort_key;
}
}
// Select the sort key columns
let sort_keys = self.compute_sort_keys(batch_group_values);

// Check if the sort keys indicate a boundary inside the batch
let ranges = partition(&sort_keys)?.ranges();
let last_range = ranges.last().unwrap();

let range_current_sort = group_indices[last_range.start];
let range_sort_key = get_row_at_idx(&sort_keys, last_range.start)?;

let (current_sort, sort_key) = if last_range.start == 0 {
// There was no boundary in the batch. Compare with the previous sort_key (if present)
// to check if there was a boundary between the current batch and the previous one.
Self::updated_sort_key(
current_sort,
sort_key,
range_current_sort,
range_sort_key,
)?
} else {
(range_current_sort, range_sort_key)
};

self.state = State::InProgress {
current_sort,
sort_key: sort_key.owned(),
current: max_group_index,
sort_key,
};

Ok(())
}

/// Return the size of memory allocated by this structure
pub(crate) fn size(&self) -> usize {
size_of::<Self>()
+ self.order_indices.allocated_size()
+ self.row_converter.size()
size_of::<Self>() + self.order_indices.allocated_size() + self.state.size()
}
}

#[cfg(test)]
mod tests {
use arrow::array::Int32Array;
use arrow_schema::{DataType, Field};
use datafusion_physical_expr::{expressions::col, PhysicalSortExpr};

use super::*;

#[test]
fn test_group_ordering_partial() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);

// Ordered on column a
let order_indices = vec![0];

let ordering = LexOrdering::new(vec![PhysicalSortExpr::new(
col("a", &schema)?,
SortOptions::default(),
)]);

let mut group_ordering =
GroupOrderingPartial::try_new(&schema, &order_indices, &ordering)?;

let batch_group_values: Vec<ArrayRef> = vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![2, 1, 3])),
];

let group_indices = vec![0, 1, 2];
let total_num_groups = 3;

group_ordering.new_groups(
&batch_group_values,
&group_indices,
total_num_groups,
)?;

assert_eq!(
group_ordering.state,
State::InProgress {
current_sort: 2,
sort_key: vec![ScalarValue::Int32(Some(3))],
current: 2
}
);

// push without a boundary
let batch_group_values: Vec<ArrayRef> = vec![
Arc::new(Int32Array::from(vec![3, 3, 3])),
Arc::new(Int32Array::from(vec![2, 1, 7])),
];
let group_indices = vec![3, 4, 5];
let total_num_groups = 6;

group_ordering.new_groups(
&batch_group_values,
&group_indices,
total_num_groups,
)?;

assert_eq!(
group_ordering.state,
State::InProgress {
current_sort: 2,
sort_key: vec![ScalarValue::Int32(Some(3))],
current: 5
}
);

// push with only a boundary to previous batch
let batch_group_values: Vec<ArrayRef> = vec![
Arc::new(Int32Array::from(vec![4, 4, 4])),
Arc::new(Int32Array::from(vec![1, 1, 1])),
];
let group_indices = vec![6, 7, 8];
let total_num_groups = 9;

group_ordering.new_groups(
&batch_group_values,
&group_indices,
total_num_groups,
)?;
assert_eq!(
group_ordering.state,
State::InProgress {
current_sort: 6,
sort_key: vec![ScalarValue::Int32(Some(4))],
current: 8
}
);

Ok(())
}
}
Loading