Skip to content

Commit

Permalink
Refactor codebase to implement PartialEq, Eq, and Hash traits for Ope…
Browse files Browse the repository at this point in the history
…rator, Alias, SortExpr, BinaryExpr, and CastExpr
  • Loading branch information
holicc committed Sep 6, 2024
1 parent ad99f04 commit ee957e7
Show file tree
Hide file tree
Showing 21 changed files with 402 additions and 69 deletions.
2 changes: 1 addition & 1 deletion qurious/src/datatypes/operator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::Display;

#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Operator {
Eq,
NotEq,
Expand Down
8 changes: 8 additions & 0 deletions qurious/src/datatypes/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,14 @@ impl TryFrom<&DataType> for ScalarValue {
}
}

impl Eq for ScalarValue {}

impl std::hash::Hash for ScalarValue {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.to_value_string().hash(state);
}
}

impl Display for ScalarValue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down
2 changes: 1 addition & 1 deletion qurious/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ macro_rules! arrow_err {
#[macro_export]
macro_rules! internal_err {
($($arg:tt)*) => {
Error::InternalError(format!($($arg)*))
Err(Error::InternalError(format!($($arg)*)))
};
}

Expand Down
9 changes: 7 additions & 2 deletions qurious/src/execution/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::common::table_relation::TableRelation;
use crate::datasource::memory::MemoryTable;
use crate::error::Error;
use crate::logical::plan::{CreateMemoryTable, DdlStatement, DmlOperator, DmlStatement, DropTable, LogicalPlan};
use crate::optimizer::Optimzier;
use crate::planner::sql::{parse_csv_options, parse_file_path, SqlQueryPlanner};
use crate::planner::QueryPlanner;
use crate::provider::catalog::CatalogProvider;
Expand All @@ -27,6 +28,7 @@ pub struct ExecuteSession {
planner: Arc<dyn QueryPlanner>,
table_factory: DefaultTableFactory,
catalog_list: CatalogProviderList,
optimizer: Optimzier,
}

impl ExecuteSession {
Expand All @@ -46,6 +48,7 @@ impl ExecuteSession {
planner: Arc::new(DefaultQueryPlanner::default()),
catalog_list,
table_factory: DefaultTableFactory::new(),
optimizer: Optimzier::new(),
})
}

Expand All @@ -61,7 +64,9 @@ impl ExecuteSession {
}

pub fn execute_logical_plan(&self, plan: &LogicalPlan) -> Result<Vec<RecordBatch>> {
match plan {
// let plan = self.optimizer.optimize(plan)?;

match &plan {
LogicalPlan::Ddl(ddl) => self.execute_ddl(ddl),
LogicalPlan::Dml(DmlStatement {
relation, op, input, ..
Expand Down Expand Up @@ -221,7 +226,7 @@ mod tests {
session.sql(sql)?;
session.sql("insert into t values(1,4,2), (2,3,3), (3,4,4), (4,3,5)")?;

let batch = session.sql("select sum(v1), count(v3), min(v3), max(v1) from t group by v2, v2")?;
let batch = session.sql("select 1+0.1")?;

print_batches(&batch)?;

Expand Down
5 changes: 5 additions & 0 deletions qurious/src/logical/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ impl LogicalPlanBuilder {
Projection::try_new(input, exprs.into_iter().map(|exp| exp.into()).collect()).map(LogicalPlan::Projection)
}

pub fn add_project(self, exprs: impl IntoIterator<Item = impl Into<LogicalExpr>>) -> Result<Self> {
Projection::try_new(self.plan, exprs.into_iter().map(|exp| exp.into()).collect())
.map(|s| LogicalPlanBuilder::from(LogicalPlan::Projection(s)))
}

pub fn empty(produce_one_row: bool) -> Self {
LogicalPlanBuilder {
plan: LogicalPlan::EmptyRelation(EmptyRelation {
Expand Down
34 changes: 25 additions & 9 deletions qurious/src/logical/expr/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use crate::logical::plan::LogicalPlan;
use std::fmt::Display;
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq)]
use super::Column;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AggregateOperator {
Sum,
Min,
Expand Down Expand Up @@ -40,22 +42,36 @@ impl From<String> for AggregateOperator {
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AggregateExpr {
pub op: AggregateOperator,
pub expr: Box<LogicalExpr>,
}

impl AggregateExpr {
pub fn field(&self, plan: &LogicalPlan) -> Result<FieldRef> {
match self.op {
AggregateOperator::Count => Ok(Arc::new(Field::new(
self.op.to_string(),
self.expr.field(plan)?.data_type().clone(),
self.expr.field(plan).map(|field| {
let col_name = if let LogicalExpr::Column(inner) = self.expr.as_ref() {
&inner.quanlified_name()
} else {
field.name()
};

Arc::new(Field::new(
format!("{}({})", self.op, col_name),
field.data_type().clone(),
false,
))),
_ => self.expr.field(plan),
}
))
})
}

pub fn as_column(&self) -> Result<LogicalExpr> {
self.expr.as_column().map(|inner_col| {
LogicalExpr::Column(Column {
name: format!("{}({})", self.op, inner_col),
relation: None,
})
})
}
}

Expand Down
2 changes: 1 addition & 1 deletion qurious/src/logical/expr/alias.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::fmt::Display;

use super::LogicalExpr;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Alias {
pub expr: Box<LogicalExpr>,
pub name: String,
Expand Down
2 changes: 1 addition & 1 deletion qurious/src/logical/expr/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::sync::Arc;
use super::cast::CastExpr;
use super::LogicalExpr;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BinaryExpr {
pub left: Box<LogicalExpr>,
pub op: Operator,
Expand Down
2 changes: 1 addition & 1 deletion qurious/src/logical/expr/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::logical::expr::LogicalExpr;
use crate::logical::plan::LogicalPlan;
use arrow::datatypes::{DataType, Field, FieldRef};

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CastExpr {
pub expr: Box<LogicalExpr>,
pub data_type: DataType,
Expand Down
13 changes: 12 additions & 1 deletion qurious/src/logical/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ pub use column::*;
pub use literal::*;
pub use sort::*;

use crate::common::table_relation::TableRelation;
use crate::datatypes::scalar::ScalarValue;
use crate::error::{Error, Result};
use crate::logical::plan::LogicalPlan;
use arrow::datatypes::{DataType, Field, FieldRef};

use self::alias::Alias;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LogicalExpr {
Alias(Alias),
Column(Column),
Expand Down Expand Up @@ -92,6 +93,16 @@ impl LogicalExpr {
name: name.into(),
})
}

pub fn as_column(&self) -> Result<LogicalExpr> {
match self {
LogicalExpr::Column(_) => Ok(self.clone()),
LogicalExpr::Wildcard | LogicalExpr::BinaryExpr(_) | LogicalExpr::AggregateExpr(_) => Ok(
LogicalExpr::Column(Column::new(format!("{}", self), None::<TableRelation>)),
),
_ => Err(Error::InternalError(format!("Expect column, got {:?}", self))),
}
}
}

impl Display for LogicalExpr {
Expand Down
2 changes: 1 addition & 1 deletion qurious/src/logical/expr/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::fmt::Display;

use crate::logical::expr::LogicalExpr;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq,Hash)]
pub struct SortExpr {
pub expr: Box<LogicalExpr>,
pub asc: bool,
Expand Down
6 changes: 5 additions & 1 deletion qurious/src/logical/plan/aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use arrow::datatypes::{Schema, SchemaRef};
use itertools::Itertools;

use super::LogicalPlan;
use crate::error::Result;
Expand All @@ -17,8 +18,11 @@ pub struct Aggregate {

impl Aggregate {
pub fn try_new(input: LogicalPlan, group_expr: Vec<LogicalExpr>, aggr_expr: Vec<AggregateExpr>) -> Result<Self> {
let group_expr = group_expr.into_iter().unique().collect::<Vec<_>>();
let aggr_expr = aggr_expr.into_iter().unique().collect::<Vec<_>>();

let group_fields = group_expr.iter().map(|f| f.field(&input));
let agg_fields = aggr_expr.iter().map(|f| &f.expr).map(|f| f.field(&input));
let agg_fields = aggr_expr.iter().map(|f| f.field(&input));

Ok(Self {
schema: Arc::new(Schema::new(group_fields.chain(agg_fields).collect::<Result<Vec<_>>>()?)),
Expand Down
7 changes: 7 additions & 0 deletions qurious/src/logical/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ impl LogicalPlan {
}
}

pub fn base_plan(plan: &LogicalPlan) -> &LogicalPlan {
match plan {
LogicalPlan::Aggregate(Aggregate { input, .. }) => base_plan(&input),
_ => plan,
}
}

impl std::fmt::Display for LogicalPlan {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down
15 changes: 14 additions & 1 deletion qurious/src/logical/plan/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use crate::{logical::expr::LogicalExpr, logical::plan::LogicalPlan};
use std::fmt::Display;
use std::sync::Arc;

use super::base_plan;

#[derive(Debug, Clone)]
pub struct Projection {
pub schema: SchemaRef,
Expand All @@ -21,7 +23,10 @@ impl Projection {
LogicalExpr::Column(i) => Some(i.field(&input)),
LogicalExpr::Literal(i) => Some(Ok(Arc::new(i.to_field()))),
LogicalExpr::Alias(i) => Some(i.expr.field(&input)),
LogicalExpr::AggregateExpr(i) => Some(i.field(&input)),
LogicalExpr::AggregateExpr(i) => {
let plan = base_plan(&input);
Some(i.field(plan))
}
LogicalExpr::BinaryExpr(i) => Some(i.field(&input)),
a => todo!("Projection::try_new: {:?}", a),
})
Expand All @@ -32,6 +37,14 @@ impl Projection {
})
}

pub fn try_new_with_schema(input: LogicalPlan, exprs: Vec<LogicalExpr>, schema: SchemaRef) -> Result<Self> {
Ok(Self {
schema,
input: Box::new(input),
exprs,
})
}

pub fn schema(&self) -> SchemaRef {
self.schema.clone()
}
Expand Down
6 changes: 3 additions & 3 deletions qurious/src/logical/plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ impl TableScan {

impl Display for TableScan {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.projections.is_some() {
if let Some(projections) = &self.projections {
write!(
f,
"TableScan: {} Projection: {:?}",
"TableScan: {} Projection: [{}]",
self.relation.to_quanlify_name(),
self.projections
projections.join(", ")
)
} else {
write!(f, "TableScan: {}", self.relation.to_quanlify_name())
Expand Down
5 changes: 3 additions & 2 deletions qurious/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod optimize_projections;
mod push_down_projections;

use push_down_projections::ProjectionPushDownRule;
use optimize_projections::OptimizeProjections;

use crate::{error::Result, logical::plan::LogicalPlan};

Expand All @@ -17,7 +18,7 @@ pub struct Optimzier {
impl Optimzier {
pub fn new() -> Self {
Self {
rules: vec![Box::new(ProjectionPushDownRule)],
rules: vec![Box::new(OptimizeProjections::default())],
}
}

Expand Down
Loading

0 comments on commit ee957e7

Please sign in to comment.