Skip to content

Commit

Permalink
[MINOR]: Move some repetitive codes to functions (apache#9810)
Browse files Browse the repository at this point in the history
* Minor changes

* Accept both owned and reference
  • Loading branch information
mustafasrepo authored Mar 27, 2024
1 parent 8d3504c commit 56c735c
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 70 deletions.
17 changes: 6 additions & 11 deletions datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::physical_plan::{
common, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
Partitioning, SendableRecordBatchStream,
};
use crate::physical_planner::create_physical_sort_expr;
use crate::physical_planner::create_physical_sort_exprs;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -231,16 +231,11 @@ impl TableProvider for MemTable {
let file_sort_order = sort_order
.iter()
.map(|sort_exprs| {
sort_exprs
.iter()
.map(|expr| {
create_physical_sort_expr(
expr,
&df_schema,
state.execution_props(),
)
})
.collect::<Result<Vec<_>>>()
create_physical_sort_exprs(
sort_exprs,
&df_schema,
state.execution_props(),
)
})
.collect::<Result<Vec<_>>>()?;
exec = exec.with_sort_information(file_sort_order);
Expand Down
63 changes: 27 additions & 36 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::logical_expr::{
Repartition, Union, UserDefinedLogicalNode,
};
use crate::logical_expr::{Limit, Values};
use crate::physical_expr::create_physical_expr;
use crate::physical_expr::{create_physical_expr, create_physical_exprs};
use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use crate::physical_plan::analyze::AnalyzeExec;
Expand Down Expand Up @@ -96,6 +96,7 @@ use datafusion_sql::utils::window_expr_common_partition_keys;

use async_trait::async_trait;
use datafusion_common::config::FormatOptions;
use datafusion_physical_expr::LexOrdering;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use itertools::{multiunzip, Itertools};
Expand Down Expand Up @@ -958,14 +959,7 @@ impl DefaultPhysicalPlanner {
LogicalPlan::Sort(Sort { expr, input, fetch, .. }) => {
let physical_input = self.create_initial_plan(input, session_state).await?;
let input_dfschema = input.as_ref().schema();
let sort_expr = expr
.iter()
.map(|e| create_physical_sort_expr(
e,
input_dfschema,
session_state.execution_props(),
))
.collect::<Result<Vec<_>>>()?;
let sort_expr = create_physical_sort_exprs(expr, input_dfschema, session_state.execution_props())?;
let new_sort = SortExec::new(sort_expr, physical_input)
.with_fetch(*fetch);
Ok(Arc::new(new_sort))
Expand Down Expand Up @@ -1592,18 +1586,11 @@ pub fn create_window_expr_with_name(
window_frame,
null_treatment,
}) => {
let args = args
.iter()
.map(|e| create_physical_expr(e, logical_schema, execution_props))
.collect::<Result<Vec<_>>>()?;
let partition_by = partition_by
.iter()
.map(|e| create_physical_expr(e, logical_schema, execution_props))
.collect::<Result<Vec<_>>>()?;
let order_by = order_by
.iter()
.map(|e| create_physical_sort_expr(e, logical_schema, execution_props))
.collect::<Result<Vec<_>>>()?;
let args = create_physical_exprs(args, logical_schema, execution_props)?;
let partition_by =
create_physical_exprs(partition_by, logical_schema, execution_props)?;
let order_by =
create_physical_sort_exprs(order_by, logical_schema, execution_props)?;

if !is_window_frame_bound_valid(window_frame) {
return plan_err!(
Expand Down Expand Up @@ -1670,10 +1657,8 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
order_by,
null_treatment,
}) => {
let args = args
.iter()
.map(|e| create_physical_expr(e, logical_input_schema, execution_props))
.collect::<Result<Vec<_>>>()?;
let args =
create_physical_exprs(args, logical_input_schema, execution_props)?;
let filter = match filter {
Some(e) => Some(create_physical_expr(
e,
Expand All @@ -1683,17 +1668,11 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
None => None,
};
let order_by = match order_by {
Some(e) => Some(
e.iter()
.map(|expr| {
create_physical_sort_expr(
expr,
logical_input_schema,
execution_props,
)
})
.collect::<Result<Vec<_>>>()?,
),
Some(e) => Some(create_physical_sort_exprs(
e,
logical_input_schema,
execution_props,
)?),
None => None,
};
let ignore_nulls = null_treatment
Expand Down Expand Up @@ -1780,6 +1759,18 @@ pub fn create_physical_sort_expr(
}
}

/// Converts a slice of logical sort expressions into physical sort expressions.
///
/// Each element of `exprs` is passed to [`create_physical_sort_expr`] together
/// with `input_dfschema` and `execution_props`; the results are returned in the
/// same order as the input.
///
/// # Errors
///
/// Returns the first error produced by [`create_physical_sort_expr`]. Conversion
/// stops at that point, so later expressions are not processed.
pub fn create_physical_sort_exprs(
    exprs: &[Expr],
    input_dfschema: &DFSchema,
    execution_props: &ExecutionProps,
) -> Result<LexOrdering> {
    let mut ordering = Vec::with_capacity(exprs.len());
    for logical in exprs {
        // `?` bails out on the first failing expression, exactly like
        // collecting an iterator of `Result`s would.
        let physical =
            create_physical_sort_expr(logical, input_dfschema, execution_props)?;
        ordering.push(physical);
    }
    Ok(ordering)
}

impl DefaultPhysicalPlanner {
/// Handles capturing the various plans for EXPLAIN queries
///
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub use physical_expr::{
physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
PhysicalExpr, PhysicalExprRef,
};
pub use planner::create_physical_expr;
pub use planner::{create_physical_expr, create_physical_exprs};
pub use scalar_function::ScalarFunctionExpr;
pub use sort_expr::{
LexOrdering, LexOrderingRef, LexRequirement, LexRequirementRef, PhysicalSortExpr,
Expand Down
48 changes: 26 additions & 22 deletions datafusion/physical-expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,20 +168,15 @@ pub fn create_physical_expr(
} else {
None
};
let when_expr = case
let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case
.when_then_expr
.iter()
.map(|(w, _)| {
create_physical_expr(w.as_ref(), input_dfschema, execution_props)
})
.collect::<Result<Vec<_>>>()?;
let then_expr = case
.when_then_expr
.iter()
.map(|(_, t)| {
create_physical_expr(t.as_ref(), input_dfschema, execution_props)
})
.collect::<Result<Vec<_>>>()?;
.map(|(w, t)| (w.as_ref(), t.as_ref()))
.unzip();
let when_expr =
create_physical_exprs(when_expr, input_dfschema, execution_props)?;
let then_expr =
create_physical_exprs(then_expr, input_dfschema, execution_props)?;
let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
when_expr
.iter()
Expand Down Expand Up @@ -248,10 +243,8 @@ pub fn create_physical_expr(
}

Expr::ScalarFunction(ScalarFunction { func_def, args }) => {
let physical_args = args
.iter()
.map(|e| create_physical_expr(e, input_dfschema, execution_props))
.collect::<Result<Vec<_>>>()?;
let physical_args =
create_physical_exprs(args, input_dfschema, execution_props)?;

match func_def {
ScalarFunctionDefinition::BuiltIn(fun) => {
Expand Down Expand Up @@ -310,12 +303,8 @@ pub fn create_physical_expr(
let value_expr =
create_physical_expr(expr, input_dfschema, execution_props)?;

let list_exprs = list
.iter()
.map(|expr| {
create_physical_expr(expr, input_dfschema, execution_props)
})
.collect::<Result<Vec<_>>>()?;
let list_exprs =
create_physical_exprs(list, input_dfschema, execution_props)?;
expressions::in_list(value_expr, list_exprs, negated, input_schema)
}
},
Expand All @@ -325,6 +314,21 @@ pub fn create_physical_expr(
}
}

/// Converts a sequence of logical expressions into physical expressions.
///
/// `exprs` may be anything that iterates over `&Expr` (for example a `&[Expr]`,
/// a `&Vec<Expr>`, or a `Vec<&Expr>`). Every item is passed to
/// [`create_physical_expr`] with `input_dfschema` and `execution_props`, and the
/// results are returned in iteration order.
///
/// # Errors
///
/// Returns the first error produced by [`create_physical_expr`]. Conversion
/// stops at that point, so later expressions are not processed.
pub fn create_physical_exprs<'a, I>(
    exprs: I,
    input_dfschema: &DFSchema,
    execution_props: &ExecutionProps,
) -> Result<Vec<Arc<dyn PhysicalExpr>>>
where
    I: IntoIterator<Item = &'a Expr>,
{
    let mut physical_exprs = Vec::new();
    for logical in exprs {
        // Propagate the first failure immediately instead of continuing.
        let physical = create_physical_expr(logical, input_dfschema, execution_props)?;
        physical_exprs.push(physical);
    }
    Ok(physical_exprs)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 56c735c

Please sign in to comment.