Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
holicc committed Sep 27, 2024
1 parent c5cf695 commit d3744b7
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 38 deletions.
2 changes: 1 addition & 1 deletion qurious/src/logical/expr/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl AggregateExpr {
Arc::new(Field::new(
format!("{}({})", self.op, col_name),
field.data_type().clone(),
false,
true,
))
})
}
Expand Down
10 changes: 9 additions & 1 deletion qurious/src/logical/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,15 @@ impl Display for LogicalExpr {
LogicalExpr::Negative(e) => write!(f, "- {}", e),
LogicalExpr::Literal(v) => write!(f, "{}", v),
LogicalExpr::Wildcard => write!(f, "*"),
_ => write!(f, "{}", self),
LogicalExpr::Alias(alias) => write!(f, "{} AS {}", alias.expr, alias.name),
LogicalExpr::Column(column) => write!(f, "{column}"),
LogicalExpr::BinaryExpr(binary_expr) => write!(f, "{binary_expr}",),
LogicalExpr::AggregateExpr(aggregate_expr) => write!(f, "{aggregate_expr}",),
LogicalExpr::SortExpr(sort_expr) => write!(f, "{sort_expr}",),
LogicalExpr::Cast(cast_expr) => write!(f, "CAST ({} AS {})", cast_expr.expr, cast_expr.data_type),
LogicalExpr::Function(function) => write!(f, "{function}",),
LogicalExpr::IsNull(logical_expr) => write!(f, "{} IS NULL", logical_expr),
LogicalExpr::IsNotNull(logical_expr) => write!(f, "{} IS NOT NULL", logical_expr),
}
}
}
Expand Down
21 changes: 15 additions & 6 deletions qurious/src/logical/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,24 @@ impl LogicalPlan {
if schema.is_none() {
schema = Some(plan.schema());
}
if let LogicalPlan::TableScan(scan) = plan {
result.push((&scan.relation, schema.take().unwrap()));
} else {
if let Some(children) = plan.children() {
list.extend(children);

match plan {
LogicalPlan::Join(Join { left, right, .. }) => {
list.push(left);
list.push(right);
}
LogicalPlan::TableScan(scan) => {
result.push((&scan.relation, schema.take().unwrap()));
}
_ => {
if let Some(children) = plan.children() {
list.extend(children);
}
}
}
}

}

result
}

Expand Down
2 changes: 1 addition & 1 deletion qurious/src/physical/expr/aggregate/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl<T: ArrowNumericType> Accumulator for SumAccumulator<T> {
_ => internal_err!("Unsupported data type: {}", T::DATA_TYPE),
}
}
None => Ok(ScalarValue::Null),
None => ScalarValue::try_from(T::DATA_TYPE),
}
}
}
11 changes: 4 additions & 7 deletions qurious/src/physical/plan/aggregate/no_grouping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::error::{Error, Result};
use crate::physical::expr::AggregateExpr;
use crate::physical::plan::PhysicalPlan;
use arrow::array::RecordBatch;
use arrow::compute::concat;
use arrow::datatypes::SchemaRef;
use std::sync::Arc;

Expand Down Expand Up @@ -43,15 +42,13 @@ impl PhysicalPlan for NoGroupingAggregate {
.iter()
.map(|batch| expr.expression().evaluate(&batch))
.collect::<Result<Vec<_>>>()
.and_then(|array| {
let array_ref = array.iter().map(|v| v.as_ref()).collect::<Vec<_>>();
concat(&array_ref).map_err(|e| arrow_err!(e))
})
})
.collect::<Result<Vec<_>>>()?;

for (accum, value) in accums.iter_mut().zip(values.iter()) {
accum.accumluate(value)?;
for (accum, values) in accums.iter_mut().zip(values.iter()) {
for ele in values {
accum.accumluate(ele)?;
}
}

let columns = accums
Expand Down
32 changes: 21 additions & 11 deletions qurious/src/planner/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl<'a> SqlQueryPlanner<'a> {
.into_iter()
.map(|col| {
let name = col.name.clone();
let data_type = sql_to_arrow_data_type(&col.datatype);
let data_type = sql_to_arrow_data_type(&col.datatype)?;
Ok(Field::new(&name, data_type, col.nullable))
})
.collect::<Result<Vec<_>>>()?,
Expand Down Expand Up @@ -707,11 +707,11 @@ impl<'a> SqlQueryPlanner<'a> {
}
Expression::Cast { expr, data_type } => {
let expr = self.sql_to_expr(*expr)?;
Ok(expr.cast_to(&sql_to_arrow_data_type(&data_type)))
Ok(expr.cast_to(&sql_to_arrow_data_type(&data_type)?))
}
Expression::TypedString { data_type, value } => Ok(LogicalExpr::Cast(CastExpr {
expr: Box::new(LogicalExpr::Literal(ScalarValue::Utf8(Some(value)))),
data_type: sql_to_arrow_data_type(&data_type),
data_type: sql_to_arrow_data_type(&data_type)?,
})),
Expression::Extract { field, expr } => self.handle_function(
"EXTRACT",
Expand Down Expand Up @@ -985,15 +985,25 @@ pub(crate) fn parse_csv_options(mut args: Vec<FunctionArgument>) -> Result<CsvRe
Ok(options)
}

/// Maps a parsed SQL column/cast type (`sqlparser::datatype::DataType`) to the
/// Arrow data type used by the planner.
///
/// NOTE(review): this listing is a rendered diff without +/- markers, so the
/// signature and the first seven match arms each appear twice. Presumably the
/// forms returning a bare `DataType` are the removed lines and the forms
/// wrapped in `Result`/`Ok(..)` are the added ones — confirm against the
/// repository before editing.
///
/// # Errors
///
/// In the `Result`-returning form, the `Decimal` arm yields an error (via
/// `internal_err!`) when the precision is 0 or greater than 76, when the
/// absolute scale exceeds the precision, or when the precision is missing but
/// a scale is present.
fn sql_to_arrow_data_type(data_type: &sqlparser::datatype::DataType) -> arrow::datatypes::DataType {
fn sql_to_arrow_data_type(data_type: &sqlparser::datatype::DataType) -> Result<arrow::datatypes::DataType> {
match data_type {
// Infallible mapping arms (bare `DataType` form).
sqlparser::datatype::DataType::Integer => arrow::datatypes::DataType::Int64,
sqlparser::datatype::DataType::Boolean => arrow::datatypes::DataType::Boolean,
sqlparser::datatype::DataType::Float => arrow::datatypes::DataType::Float64,
sqlparser::datatype::DataType::String => arrow::datatypes::DataType::Utf8,
sqlparser::datatype::DataType::Date => arrow::datatypes::DataType::Date32,
sqlparser::datatype::DataType::Timestamp => arrow::datatypes::DataType::Timestamp(TimeUnit::Millisecond, None),
sqlparser::datatype::DataType::Int16 => arrow::datatypes::DataType::Int16,
// The same mappings, wrapped in `Ok(..)` to match the `Result` return type.
sqlparser::datatype::DataType::Integer => Ok(arrow::datatypes::DataType::Int64),
sqlparser::datatype::DataType::Boolean => Ok(arrow::datatypes::DataType::Boolean),
sqlparser::datatype::DataType::Float => Ok(arrow::datatypes::DataType::Float64),
sqlparser::datatype::DataType::String =>Ok( arrow::datatypes::DataType::Utf8),
sqlparser::datatype::DataType::Date => Ok(arrow::datatypes::DataType::Date32),
sqlparser::datatype::DataType::Timestamp =>Ok(arrow::datatypes::DataType::Timestamp(TimeUnit::Millisecond, None)),
sqlparser::datatype::DataType::Int16 => Ok(arrow::datatypes::DataType::Int16),
// Decimal: the guards are order-dependent — the validation guard must come
// first so the later `<= 38` / `> 38` guards only ever see a valid precision.
sqlparser::datatype::DataType::Decimal(precision, scale) => match (precision,scale){
// Reject precision outside 1..=76 or |scale| larger than the precision.
(Some(precision),Some(scale)) if *precision == 0 || *precision > 76 || (*scale as i8).unsigned_abs() > (*precision as u8) => internal_err!("Decimal(precision = {precision}, scale = {scale}) should satisfy `0 < precision <= 76`, and `scale <= precision`."),
// Precision 39..=76 maps to Decimal256; precision up to 38 maps to Decimal128.
(Some(precision),Some(scale)) if *precision > 38 && *precision <= 76 => Ok(arrow::datatypes::DataType::Decimal256(*precision, *scale)),
(Some(precision),Some(scale)) if *precision <= 38 => Ok(arrow::datatypes::DataType::Decimal128(*precision, *scale)),
// Precision only: same range check and width split, with the scale set to 0.
(Some(precision),None) if *precision == 0 || *precision > 76 => internal_err!("Decimal(precision = {precision}) should satisfy `0 < precision <= 76`."),
(Some(precision),None) if *precision <= 38 => Ok(arrow::datatypes::DataType::Decimal128(*precision, 0)),
(Some(precision),None) if *precision > 38 && *precision <= 76 => Ok(arrow::datatypes::DataType::Decimal256(*precision, 0)),
// Neither precision nor scale given: falls back to Decimal128(38, 10).
(None,None) => Ok(arrow::datatypes::DataType::Decimal128(38, 10)),
// Remaining combinations end here; the message indicates the intended
// case is a scale supplied without a precision.
_ => internal_err!("Cannot specify only scale for decimal data type")
},
}
}

Expand Down
10 changes: 0 additions & 10 deletions qurious/tests/sql/aggregation.slt
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,5 @@ CREATE TABLE test(x INT, y INT);
statement ok
INSERT INTO test VALUES (1, 1), (2, 2), (3, 3), (3, 5), (NULL, NULL);

query I
SELECT first(x) FROM test;
----
1

query I
SELECT last(y) FROM test;
----
NULL

statement ok
DROP TABLE test;
4 changes: 4 additions & 0 deletions sqlparser/src/datatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub enum DataType {
String,
Date,
Timestamp,
Decimal(Option<u8>, Option<i8>),
Int16,
}

Expand All @@ -22,6 +23,9 @@ impl Display for DataType {
DataType::Date => write!(f, "Date"),
DataType::Timestamp => write!(f, "Timestamp"),
DataType::Int16 => write!(f, "Int16"),
DataType::Decimal(precision, scale) => {
write!(f, "Decimal({:?}, {:?})", precision, scale)
}
}
}
}
Expand Down
40 changes: 39 additions & 1 deletion sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl<'a> Parser<'a> {
}
let mut nullable = true;
let name = self.next_ident()?;
let datatype = self.next_token().and_then(|t| t.datatype())?;
let datatype = self.parse_data_type()?;
let primary_key = if self.next_if_token(TokenType::Keyword(Keyword::Primary)).is_some() {
self.next_except(TokenType::Keyword(Keyword::Key))?;

Expand Down Expand Up @@ -735,6 +735,26 @@ impl<'a> Parser<'a> {
TokenType::Float | TokenType::Keyword(Keyword::Double) => Ok(DataType::Float),
TokenType::Keyword(Keyword::Bool) | TokenType::Keyword(Keyword::Boolean) => Ok(DataType::Boolean),
TokenType::Keyword(Keyword::Date) => Ok(DataType::Date),
TokenType::Keyword(Keyword::Decimal) => {
let (precision, scale) = if self.next_if_token(TokenType::LParen).is_some() {
let precision = self
.next_token()?
.literal
.parse()
.map_err(|e| Error::ParseIntError(e, token.clone()))?;
self.next_except(TokenType::Comma)?;
let scale = self
.next_token()?
.literal
.parse()
.map_err(|e| Error::ParseIntError(e, token.clone()))?;
self.next_except(TokenType::RParen)?;
(Some(precision), Some(scale))
} else {
(None, None)
};
Ok(DataType::Decimal(precision, scale))
}
_ => Err(Error::ParserError(format!(
"[parse_data_type] unexpected token {:?}",
token
Expand Down Expand Up @@ -1320,6 +1340,24 @@ mod tests {

#[test]
fn test_parse_create_table() {
assert_stmt_eq(
"create table t(v1 decimal(10, 2) not null)",
Statement::CreateTable {
query: None,
table: "t".to_owned(),
columns: vec![ast::Column {
name: "v1".to_owned(),
datatype: DataType::Decimal(Some(10), Some(2)),
nullable: false,
unique: false,
references: None,
primary_key: false,
index: false,
}],
check_exists: false,
},
);

assert_stmt_eq(
"create table t (a smallint not null);",
Statement::CreateTable {
Expand Down
3 changes: 3 additions & 0 deletions sqlparser/src/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub enum Keyword {
Timestamp,
Double,
SmallInt,
Decimal,
/// extract a field from a timestamp
Extract,
Year,
Expand Down Expand Up @@ -129,6 +130,7 @@ impl TokenType {
"timestamp" => TokenType::Keyword(Keyword::Timestamp),
"double" | "float" => TokenType::Keyword(Keyword::Double),
"smallint" => TokenType::Keyword(Keyword::SmallInt),
"decimal" => TokenType::Keyword(Keyword::Decimal),
"primary" => TokenType::Keyword(Keyword::Primary),
"key" => TokenType::Keyword(Keyword::Key),
"with" => TokenType::Keyword(Keyword::With),
Expand Down Expand Up @@ -242,6 +244,7 @@ impl Token {
TokenType::Keyword(Keyword::Datetime) => Ok(DataType::Timestamp),
TokenType::Keyword(Keyword::VarChar) => Ok(DataType::String),
TokenType::Keyword(Keyword::Double) => Ok(DataType::Float),

_ => Err(Error::UnKnownDataType(self.clone())),
}
}
Expand Down

0 comments on commit d3744b7

Please sign in to comment.