Skip to content

Commit

Permalink
TPCH: q1
Browse files Browse the repository at this point in the history
  • Loading branch information
holicc committed Nov 8, 2024
1 parent cfd5194 commit 1ec1c10
Show file tree
Hide file tree
Showing 28 changed files with 709 additions and 349 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ edition = "2021"

[workspace.dependencies]
sqlparser = { path = "sqlparser" }
parquet = "53.0.0"
arrow = "53.0.0"
parquet = "53.2.0"
arrow = "53.2.0"
url = "2.5.0"
log = "^0.4"
dashmap = "6.0.1"
Expand Down
5 changes: 0 additions & 5 deletions qurious/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,3 @@ rayon = { version = "1.10.0" }
harness = false
name = "sqllogictests"
path = "tests/sqllogictests.rs"

[[test]]
harness = false
name = "tpch"
path = "tests/tpch.rs"
10 changes: 10 additions & 0 deletions qurious/src/common/transformed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ impl<T> Transformed<T> {
}
}

/// Applies a fallible function to the wrapped data while keeping the
/// `transformed` flag.
///
/// Consumes `self`, hands `data` to `f`, and on success wraps the value `f`
/// produced together with the original `transformed` flag. If `f` fails, its
/// error is propagated to the caller and no `Transformed` is built.
pub fn map_data<U, F>(self, f: F) -> Result<Transformed<U>>
where
    F: FnOnce(T) -> Result<U>,
{
    // Split the wrapper up front so the flag can be carried over untouched.
    let Transformed { data, transformed } = self;
    let data = f(data)?;
    Ok(Transformed { data, transformed })
}

pub fn update<U, F>(self, f: F) -> Transformed<U>
where
F: FnOnce(T) -> U,
Expand Down
72 changes: 47 additions & 25 deletions qurious/src/datatypes/scalar.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use crate::error::{Error, Result};
use arrow::{
array::{
new_null_array, Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, StringArray, UInt16Array, UInt32Array,
UInt64Array, UInt8Array,
new_null_array, Array, ArrayRef, ArrowPrimitiveType, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, PrimitiveArray, StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array
},
datatypes::{i256, DataType, Field},
};
Expand Down Expand Up @@ -55,10 +53,28 @@ macro_rules! build_decimal_array {
}

macro_rules! format_option {
($F:expr, $EXPR:expr) => {{
($F:expr, $EXPR:expr, $TYPE:expr) => {{
match $EXPR {
Some(e) => write!($F, "{e}"),
None => write!($F, "NULL"),
Some(e) => write!($F, "{}({})", $TYPE, e),
None => write!($F, "{}(NULL)", $TYPE),
}
}};
}

/// Writes an optional decimal value as `TYPE(value,precision,scale)`.
///
/// Arguments, in order: the writer, the optional value, the type label, the
/// precision and the scale. A `None` value is written as
/// `TYPE(NULL,precision,scale)`. The macro evaluates to the result of the
/// underlying `write!` call.
macro_rules! format_decimal {
    ($fmt:expr, $value:expr, $type_name:expr, $precision:expr, $scale:expr) => {{
        if let Some(inner) = $value {
            write!($fmt, "{}({},{},{})", $type_name, inner, $precision, $scale)
        } else {
            write!($fmt, "{}(NULL,{},{})", $type_name, $precision, $scale)
        }
    }};
}

/// Writes an optional string value as `TYPE('value')`.
///
/// Arguments, in order: the writer, the optional value and the type label.
/// A `None` value is written as `TYPE(NULL)`. The value is emitted verbatim
/// between single quotes; no escaping is applied. The macro evaluates to the
/// result of the underlying `write!` call.
macro_rules! format_string {
    ($fmt:expr, $value:expr, $type_name:expr) => {{
        if let Some(text) = $value {
            write!($fmt, "{}('{}')", $type_name, text)
        } else {
            write!($fmt, "{}(NULL)", $type_name)
        }
    }};
}
Expand All @@ -85,6 +101,16 @@ pub enum ScalarValue {
}

impl ScalarValue {
/// Builds a `ScalarValue` from an optional native primitive value.
///
/// * `a` - the native value, or `None` when no value is present.
/// * `d` - the Arrow data type the resulting scalar should carry.
///
/// With `None`, the scalar is obtained by converting `d` itself (presumably a
/// typed null - confirm against the `TryFrom<&DataType>` impl). With
/// `Some(v)`, a one-element primitive array of type `d` is built and the
/// scalar is read back from index 0.
///
/// NOTE(review): arrow documents `with_data_type` as panicking when `d` is
/// not compatible with `T` - confirm callers only pass matching pairs.
pub fn new_primitive<T: ArrowPrimitiveType>(a: Option<T::Native>, d: &DataType) -> Result<Self> {
    let Some(value) = a else {
        return d.try_into();
    };

    // Round-trip through a single-element array so the regular
    // array-to-scalar conversion decides the resulting variant.
    let values = vec![value].into();
    let array = PrimitiveArray::<T>::new(values, None).with_data_type(d.clone());
    Self::try_from_array(&array, 0)
}

pub fn to_field(&self) -> Field {
match self {
ScalarValue::Null => Field::new("null", DataType::Null, true),
Expand Down Expand Up @@ -232,25 +258,21 @@ impl std::hash::Hash for ScalarValue {
impl Display for ScalarValue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ScalarValue::Null => write!(f, "NULL"),
ScalarValue::Boolean(v) => format_option!(f, v),
ScalarValue::Int64(v) => format_option!(f, v),
ScalarValue::Int32(v) => format_option!(f, v),
ScalarValue::Int16(v) => format_option!(f, v),
ScalarValue::Int8(v) => format_option!(f, v),
ScalarValue::UInt64(v) => format_option!(f, v),
ScalarValue::UInt32(v) => format_option!(f, v),
ScalarValue::UInt16(v) => format_option!(f, v),
ScalarValue::UInt8(v) => format_option!(f, v),
ScalarValue::Float64(v) => format_option!(f, v),
ScalarValue::Float32(v) => format_option!(f, v),
ScalarValue::Decimal128(v, p, s) => {
write!(f, "{v:?},{p:?},{s:?}")
}
ScalarValue::Decimal256(v, p, s) => {
write!(f, "{v:?},{p:?},{s:?}")
}
ScalarValue::Utf8(v) => format_option!(f, v),
ScalarValue::Null => write!(f, "Null"),
ScalarValue::Boolean(v) => format_option!(f, v, "Boolean"),
ScalarValue::Int64(v) => format_option!(f, v, "Int64"),
ScalarValue::Int32(v) => format_option!(f, v, "Int32"),
ScalarValue::Int16(v) => format_option!(f, v, "Int16"),
ScalarValue::Int8(v) => format_option!(f, v, "Int8"),
ScalarValue::UInt64(v) => format_option!(f, v, "UInt64"),
ScalarValue::UInt32(v) => format_option!(f, v, "UInt32"),
ScalarValue::UInt16(v) => format_option!(f, v, "UInt16"),
ScalarValue::UInt8(v) => format_option!(f, v, "UInt8"),
ScalarValue::Float64(v) => format_option!(f, v, "Float64"),
ScalarValue::Float32(v) => format_option!(f, v, "Float32"),
ScalarValue::Decimal128(v, p, s) => format_decimal!(f, v, "Decimal128", p, s),
ScalarValue::Decimal256(v, p, s) => format_decimal!(f, v, "Decimal256", p, s),
ScalarValue::Utf8(v) => format_string!(f, v, "Utf8"),
}
}
}
49 changes: 27 additions & 22 deletions qurious/src/execution/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,9 @@ mod tests {
l_rev VARCHAR,
);",
)?;
session.sql("COPY LINEITEM FROM './tests/test.tbl' ( DELIMITER '|' );")?;
// session.sql("COPY LINEITEM FROM './tests/test.tbl' ( DELIMITER '|' );")?;
session.sql("COPY LINEITEM FROM './tests/tpch/data/lineitem.tbl' ( DELIMITER '|' );")?;

// session.sql("create table t(v1 int not null, v2 int not null, v3 double not null)")?;

// session.sql("create table x(a int, b int);")?;
Expand All @@ -299,29 +301,32 @@ mod tests {
println!("++++++++++++++");
let batch = session.sql(
"
select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem
where
l_shipdate <= date '1998-09-02'
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;",
select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem
where
l_shipdate <= date '1998-09-02'
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
",
)?;

// let batch = session.sql("select avg(l_quantity) as count_order from lineitem")?;

print_batches(&batch)?;

Ok(())
Expand Down
25 changes: 23 additions & 2 deletions qurious/src/logical/expr/aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use arrow::datatypes::{DataType, Field, FieldRef};
use arrow::datatypes::{
DataType, Field, FieldRef, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DECIMAL256_MAX_PRECISION,
DECIMAL256_MAX_SCALE,
};

use crate::error::{Error, Result};
use crate::internal_err;
use crate::logical::expr::LogicalExpr;
use crate::logical::plan::LogicalPlan;
use std::convert::TryFrom;
Expand All @@ -22,12 +26,29 @@ impl AggregateOperator {
pub fn infer_type(&self, expr_data_type: &DataType) -> Result<DataType> {
match self {
AggregateOperator::Count => Ok(DataType::Int64),
AggregateOperator::Avg => Ok(DataType::Float64),
AggregateOperator::Avg => avg_return_type(expr_data_type),
_ => Ok(expr_data_type.clone()),
}
}
}

/// Determines the result type of `avg` for an input of type `expr_data_type`.
///
/// * `Decimal128(p, s)` and `Decimal256(p, s)` keep their decimal kind, with
///   precision and scale each widened by 4 and capped at that kind's maximum.
/// * Integer and floating-point inputs produce `Float64`.
///
/// # Errors
///
/// Any other input type yields the error built by `internal_err!`.
fn avg_return_type(expr_data_type: &DataType) -> Result<DataType> {
    match expr_data_type {
        DataType::Decimal128(precision, scale) => {
            // Saturating adds keep an out-of-range precision/scale from
            // overflowing (panic in debug, wrap-around in release) before
            // the cap below is applied.
            let new_precision = DECIMAL128_MAX_PRECISION.min(precision.saturating_add(4));
            let new_scale = DECIMAL128_MAX_SCALE.min(scale.saturating_add(4));
            Ok(DataType::Decimal128(new_precision, new_scale))
        }
        DataType::Decimal256(precision, scale) => {
            // Same overflow guard as the 128-bit case.
            let new_precision = DECIMAL256_MAX_PRECISION.min(precision.saturating_add(4));
            let new_scale = DECIMAL256_MAX_SCALE.min(scale.saturating_add(4));
            Ok(DataType::Decimal256(new_precision, new_scale))
        }
        arg_type if arg_type.is_integer() || arg_type.is_floating() => Ok(DataType::Float64),
        other => internal_err!("avg does not support {other:?}"),
    }
}

impl Display for AggregateOperator {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down
23 changes: 4 additions & 19 deletions qurious/src/logical/expr/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use arrow::datatypes::{DataType, Field, FieldRef, Schema};
use crate::datatypes::operator::Operator;
use crate::error::Result;
use crate::logical::plan::LogicalPlan;
use crate::utils;
use crate::utils::type_coercion::get_result_type;
use std::fmt::Display;
use std::sync::Arc;

Expand Down Expand Up @@ -34,24 +34,9 @@ impl BinaryExpr {
}

pub fn get_result_type(&self, schema: &Arc<Schema>) -> Result<DataType> {
match self.op {
Operator::Eq
| Operator::NotEq
| Operator::Gt
| Operator::GtEq
| Operator::Lt
| Operator::LtEq
| Operator::And
| Operator::Or => {
return Ok(DataType::Boolean);
}
_ => {
let left_type = self.left.data_type(schema)?;
let right_type = self.right.data_type(schema)?;

Ok(utils::get_input_types(&left_type, &right_type))
}
}
let lhs = self.left.data_type(schema)?;
let rhs = self.right.data_type(schema)?;
get_result_type(&lhs, &self.op, &rhs)
}
}

Expand Down
5 changes: 2 additions & 3 deletions qurious/src/logical/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,8 @@ impl LogicalExpr {
LogicalExpr::BinaryExpr(binary_expr) => binary_expr.get_result_type(schema),
LogicalExpr::Cast(cast_expr) => Ok(cast_expr.data_type.clone()),
LogicalExpr::Function(function) => Ok(function.func.return_type()),
LogicalExpr::AggregateExpr(AggregateExpr { expr, .. })
| LogicalExpr::SortExpr(SortExpr { expr, .. })
| LogicalExpr::Negative(expr) => expr.data_type(schema),
LogicalExpr::AggregateExpr(AggregateExpr { op, expr }) => op.infer_type(&expr.data_type(schema)?),
LogicalExpr::SortExpr(SortExpr { expr, .. }) | LogicalExpr::Negative(expr) => expr.data_type(schema),
LogicalExpr::IsNull(_) | LogicalExpr::IsNotNull(_) => Ok(DataType::Boolean),
LogicalExpr::Wildcard => internal_err!("Wildcard has no data type"),
}
Expand Down
Loading

0 comments on commit 1ec1c10

Please sign in to comment.