Skip to content

Commit

Permalink
Faced a problem with the optimizer 😣
Browse files Browse the repository at this point in the history
  • Loading branch information
holicc committed Jun 28, 2024
1 parent c8e7af9 commit b313c3c
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 16 deletions.
6 changes: 6 additions & 0 deletions src/datasource/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ pub trait DataFilePath {
fn to_url(self) -> Result<Url>;
}

// Lets a borrowed `String` be used wherever a data-file path is expected;
// delegates straight to `parse_path`.
// NOTE(review): `&String` impls are unidiomatic — an `impl for &str` would be
// more general, but switching could break existing generic call sites; confirm
// before changing.
impl DataFilePath for &String {
fn to_url(self) -> Result<Url> {
parse_path(self)
}
}

impl DataFilePath for String {
fn to_url(self) -> Result<Url> {
parse_path(self)
Expand Down
6 changes: 5 additions & 1 deletion src/execution/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,23 @@ use crate::datasource::DataSource;
use crate::error::Error;
use crate::execution::registry::HashMapTableRegistry;
use crate::logical::plan::LogicalPlan;
use crate::optimizer::Optimzier;
use crate::planner::sql::SqlQueryPlanner;
use crate::planner::QueryPlanner;
use crate::{error::Result, planner::DefaultQueryPlanner};

// Per-session execution state: the registered tables, the planner that lowers
// logical plans to physical plans, and the logical-plan optimizer.
pub struct ExecuteSession {
// Registered data sources, behind a lock so lookups can be shared.
tables: Arc<RwLock<dyn TableRegistry>>,
// Strategy object turning a LogicalPlan into an executable physical plan.
query_planner: Box<dyn QueryPlanner>,
// NOTE(review): the type name is misspelled ("Optimzier" -> "Optimizer");
// renaming requires touching its definition in src/optimizer/mod.rs as well.
optimizer: Optimzier,
}

impl Default for ExecuteSession {
fn default() -> Self {
Self {
tables: Arc::new(RwLock::new(HashMapTableRegistry::default())),
query_planner: Box::new(DefaultQueryPlanner),
optimizer: Optimzier::new(),
}
}
}
Expand All @@ -32,7 +35,8 @@ impl ExecuteSession {
}

/// Optimizes `plan` and executes the resulting physical plan, returning the
/// produced record batches.
///
/// # Errors
/// Propagates failures from the optimizer, the physical planner, and plan
/// execution.
pub fn execute_logical_plan(&self, plan: &LogicalPlan) -> Result<Vec<RecordBatch>> {
    // Run the optimizer first so the physical planner sees the rewritten plan.
    let plan = self.optimizer.optimize(plan)?;
    self.query_planner.create_physical_plan(&plan)?.execute()
}

pub fn register_table(&mut self, name: &str, table: Arc<dyn DataSource>) -> Result<()> {
Expand Down
32 changes: 32 additions & 0 deletions src/optimizer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,41 @@
mod push_down_projections;

use push_down_projections::ProjectionPushDownRule;

use crate::{error::Result, logical::plan::LogicalPlan};

/// A single rewrite rule applied by the optimizer.
pub trait OptimizerRule {
/// Human-readable rule name, useful for diagnostics and logging.
fn name(&self) -> &str;

/// Attempts to rewrite `plan`. Returns `Ok(Some(..))` with the rewritten
/// plan when the rule applied, or `Ok(None)` when it made no change.
fn optimize(&self, plan: &LogicalPlan) -> Result<Option<LogicalPlan>>;
}

/// Driver that owns the rule set and repeatedly applies it to logical plans.
/// NOTE(review): the name is misspelled ("Optimzier" -> "Optimizer");
/// renaming requires updating every use site (e.g. the execution session).
pub struct Optimzier {
// Rules as boxed trait objects; `Sync + Send` so the optimizer itself can
// be shared across threads.
rules: Vec<Box<dyn OptimizerRule + Sync + Send>>,
}

impl Optimzier {
    /// Creates an optimizer preloaded with the default rule set.
    pub fn new() -> Self {
        Self {
            rules: vec![Box::new(ProjectionPushDownRule)],
        }
    }

    /// Applies every rule in turn, repeating until a full pass makes no
    /// change (a fixpoint), then returns the rewritten plan.
    ///
    /// NOTE(review): termination relies on each rule eventually returning
    /// `None` once its rewrite has been applied; a rule that always returns
    /// `Some(..)` would loop forever — confirm this invariant when adding
    /// new rules.
    ///
    /// # Errors
    /// Propagates the first error returned by any rule.
    pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
        let mut plan = plan.clone();
        let mut changed = true;
        while changed {
            changed = false;
            for rule in &self.rules {
                // `if let` replaces the previous `match` with an empty
                // `None => {}` arm (clippy: single_match).
                if let Some(new_plan) = rule.optimize(&plan)? {
                    plan = new_plan;
                    changed = true;
                }
            }
        }
        Ok(plan)
    }
}
42 changes: 30 additions & 12 deletions src/planner/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use sqlparser::{
use crate::{
common::{JoinType, OwnedTableRelation, TableRelation},
datasource::{
file::csv::{self, CsvReadOptions},
file::{
csv::{self, CsvReadOptions},
parquet::read_parquet,
},
DataSource,
},
datatypes::scalar::ScalarValue,
Expand Down Expand Up @@ -175,22 +178,37 @@ impl SqlQueryPlanner {
}

fn table_func_to_plan(&mut self, name: String, args: Vec<Assignment>) -> Result<LogicalPlan> {
match name.to_lowercase().as_str() {
let (table_name, source) = match name.to_lowercase().as_str() {
"read_csv" => {
let (path, options) = self.parse_csv_options(args)?;
let table_name = "tmp_csv_table";
let table_srouce = csv::read_csv(path, options)?;
let plan = LogicalPlanBuilder::scan(table_name, table_srouce.clone(), None)?.build();
// register the table to the table registry
// TODO: we should use a unique name for the table and apply the alias
self.add_new_table(table_name, table_srouce)?;

Ok(plan)
("tmp_csv_table", csv::read_csv(path, options)?)
}
"read_parquet" => {
let path = match args.get(0) {
Some(Assignment {
value: Expression::Literal(Literal::String(s)),
..
}) => s,
_ => {
return Err(Error::InternalError(
"read_parquet function requires the first argument to be a string".to_owned(),
))
}
};

("tmp_parquet_table", read_parquet(path)?)
}
"read_json" => todo!(),

_ => todo!(),
}
};

let plan = LogicalPlanBuilder::scan(table_name, source.clone(), None)?.build();
// register the table to the table registry
// TODO: we should use a unique name for the table and apply the alias
self.add_new_table(table_name, source)?;

Ok(plan)
// register the table to the table registry
}

fn filter_expr(&self, mut plan: LogicalPlan, expr: Option<Expression>) -> Result<LogicalPlan> {
Expand Down
13 changes: 10 additions & 3 deletions src/server/postgresql/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ fn get_utf8_value(arr: &Arc<dyn Array>, idx: usize) -> &str {
arr.as_any().downcast_ref::<StringArray>().unwrap().value(idx)
}

// Reads row `idx` of a LargeUtf8 column as `&str`.
// Panics if `arr` is not a `LargeStringArray` — same contract as the
// sibling typed getters in this file.
fn get_large_utf8_value(arr: &Arc<dyn Array>, idx: usize) -> &str {
arr.as_any().downcast_ref::<LargeStringArray>().unwrap().value(idx)
}

// Reads row `idx` of a Date32 column as a calendar date; `None` when the
// stored value does not map to a valid `NaiveDate`.
// Panics if `arr` is not a `Date32Array` — same contract as the sibling
// typed getters in this file.
fn get_date32_value(arr: &Arc<dyn Array>, idx: usize) -> Option<NaiveDate> {
arr.as_any().downcast_ref::<Date32Array>().unwrap().value_as_date(idx)
}
Expand Down Expand Up @@ -221,6 +225,7 @@ fn encode_value(encoder: &mut DataRowEncoder, arr: &Arc<dyn Array>, idx: usize)
DataType::Float32 => encoder.encode_field(&get_f32_value(arr, idx))?,
DataType::Float64 => encoder.encode_field(&get_f64_value(arr, idx))?,
DataType::Utf8 => encoder.encode_field(&get_utf8_value(arr, idx))?,
DataType::LargeUtf8 => encoder.encode_field(&get_large_utf8_value(arr, idx))?,
DataType::Date32 => encoder.encode_field(&get_date32_value(arr, idx))?,
DataType::Date64 => encoder.encode_field(&get_date64_value(arr, idx))?,
DataType::Time32(unit) => match unit {
Expand Down Expand Up @@ -253,7 +258,6 @@ fn encode_value(encoder: &mut DataRowEncoder, arr: &Arc<dyn Array>, idx: usize)
DataType::Float32 => encoder.encode_field(&get_f32_list_value(arr, idx))?,
DataType::Float64 => encoder.encode_field(&get_f64_list_value(arr, idx))?,
DataType::Utf8 => encoder.encode_field(&get_utf8_list_value(arr, idx))?,

// TODO: more types
list_type => {
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
Expand All @@ -263,10 +267,11 @@ fn encode_value(encoder: &mut DataRowEncoder, arr: &Arc<dyn Array>, idx: usize)
))))
}
},

_ => {
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"XX000".to_owned(),
"XX001".to_owned(),
format!("Unsupported Datatype {} and array {:?}", arr.data_type(), &arr),
))))
}
Expand All @@ -289,6 +294,7 @@ pub(crate) fn into_pg_type(f: &Field) -> PgWireResult<FieldInfo> {
DataType::Float32 => Type::FLOAT4,
DataType::Float64 => Type::FLOAT8,
DataType::Utf8 => Type::VARCHAR,
DataType::LargeUtf8 => Type::TEXT,
DataType::List(field) => match field.data_type() {
DataType::Boolean => Type::BOOL_ARRAY,
DataType::Int8 | DataType::UInt8 => Type::CHAR_ARRAY,
Expand All @@ -302,6 +308,7 @@ pub(crate) fn into_pg_type(f: &Field) -> PgWireResult<FieldInfo> {
DataType::Float32 => Type::FLOAT4_ARRAY,
DataType::Float64 => Type::FLOAT8_ARRAY,
DataType::Utf8 => Type::VARCHAR_ARRAY,
DataType::LargeUtf8 => Type::TEXT_ARRAY,
list_type => {
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
Expand All @@ -313,7 +320,7 @@ pub(crate) fn into_pg_type(f: &Field) -> PgWireResult<FieldInfo> {
_ => {
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"XX000".to_owned(),
"XX002".to_owned(),
format!("Unsupported Datatype {}", f.data_type()),
))));
}
Expand Down

0 comments on commit b313c3c

Please sign in to comment.