WIP: support pgwire
holicc committed Jun 26, 2024
1 parent d2f108e commit c2d1614
Showing 23 changed files with 792 additions and 199 deletions.
3 changes: 3 additions & 0 deletions .vscode/settings.json
@@ -2,4 +2,7 @@
"rust-analyzer.linkedProjects": [
"./Cargo.toml"
],
"rust-analyzer.cargo.features": [
"postgresql"
],
}
12 changes: 10 additions & 2 deletions Cargo.toml
@@ -9,10 +9,18 @@ arrow = "51.0.0"
sqlparser = { git = "https://github.com/holicc/sqlparser.git" }
url = "2.5.0"
parquet = "51.0.0"
pgwire = "0.22.0"
tokio = { version = "1.37.0", features = ["macros","time"] }

tokio = { version = "1.37.0", features = ["full"] }
async-trait = "0.1.80"
log = "0.4.21"

pgwire = { version = "0.23.0", optional = true }
tokio-postgres = { version = "0.7.10", optional = true }
chrono = "0.4.38"
futures = "0.3.30"

[features]
postgresql = ["dep:pgwire", "dep:tokio-postgres"]

[dev-dependencies]
arrow = { version = "51.0.0", features = ["prettyprint", "test_utils"] }
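
The manifest changes pin tokio to the "full" feature set and make pgwire and tokio-postgres optional, grouped behind a new postgresql cargo feature (which the updated .vscode settings also enable for rust-analyzer), so default builds skip the wire-protocol dependencies. A minimal sketch of how such a gate is typically consumed; the module name below is hypothetical and not taken from the commit:

// Hypothetical sketch: code gated on the `postgresql` feature is compiled
// only with `cargo build --features postgresql`; default builds never
// reference pgwire or tokio-postgres.
#[cfg(feature = "postgresql")]
pub mod pg_server {
    pub fn protocol_name() -> &'static str {
        "postgresql"
    }
}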
2 changes: 1 addition & 1 deletion src/datasource/mod.rs
@@ -5,7 +5,7 @@ use crate::{error::Result, logical::expr::LogicalExpr};
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use std::fmt::Debug;

pub trait DataSource: Debug {
pub trait DataSource: Debug + Sync + Send {
fn schema(&self) -> SchemaRef;

/// Perform a scan of the data source and return the results as RecordBatch
2 changes: 1 addition & 1 deletion src/datatypes/scalar.rs
@@ -154,4 +154,4 @@ impl Display for ScalarValue {
ScalarValue::Utf8(None) => write!(f, "null"),
}
}
}
}
21 changes: 4 additions & 17 deletions src/error.rs
@@ -1,6 +1,5 @@
use std::{fmt::Display, io};

use arrow::error::ArrowError;
use std::fmt::Display;

pub type Result<T, E = Error> = std::result::Result<T, E>;

@@ -17,21 +16,9 @@ pub enum Error {
TableNotFound(String),
}

impl From<ArrowError> for Error {
fn from(e: ArrowError) -> Self {
Error::ArrowError(e)
}
}

impl From<io::Error> for Error {
fn from(e: io::Error) -> Self {
Error::InternalError(e.to_string())
}
}

impl From<parquet::errors::ParquetError> for Error {
fn from(e: parquet::errors::ParquetError) -> Self {
Error::InternalError(e.to_string())
impl<T: std::error::Error> From<T> for Error {
fn from(value: T) -> Self {
Error::InternalError(value.to_string())
}
}
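
The three hand-written conversions (ArrowError, io::Error, ParquetError) collapse into a single blanket impl over any T: std::error::Error, so the ? operator now folds every such error into Error::InternalError; note this only compiles while Error itself does not implement std::error::Error, since the impl would otherwise overlap with the standard library's reflexive From. An illustrative sketch, not from the commit (read_port is a made-up helper, and Result is the crate's alias with E = Error):

// Illustrative only: with the blanket impl, `?` converts any std error
// (here std::num::ParseIntError) into Error::InternalError.
fn read_port(raw: &str) -> Result<u16> {
    let port: u16 = raw.trim().parse()?; // ParseIntError -> Error::InternalError
    Ok(port)
}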

4 changes: 3 additions & 1 deletion src/execution/registry/mod.rs
@@ -1,14 +1,16 @@
use crate::datasource::DataSource;
use crate::error::{Error, Result};
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;

pub trait TableRegistry {
pub trait TableRegistry: Debug + Sync + Send {
fn register_table(&mut self, name: &str, table: Arc<dyn DataSource>) -> Result<()>;

fn get_table_source(&self, name: &str) -> Result<Arc<dyn DataSource>>;
}

#[derive(Debug)]
pub struct HashMapTableRegistry {
tables: HashMap<String, Arc<dyn DataSource>>,
}
24 changes: 11 additions & 13 deletions src/execution/session.rs
@@ -1,40 +1,38 @@
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use arrow::array::RecordBatch;

use super::registry::TableRegistry;
use crate::datasource::DataSource;
use crate::execution::registry::HashMapTableRegistry;
use crate::planner::sql::SqlQueryPlanner;
use crate::planner::QueryPlanner;
use crate::{error::Result, planner::DefaultQueryPlanner};

use crate::execution::registry::HashMapTableRegistry;

use super::registry::TableRegistry;

pub struct ExecuteSession {
tables: Box<dyn TableRegistry>,
tables: Arc<RwLock<dyn TableRegistry>>,
query_planner: Box<dyn QueryPlanner>,
}

impl Default for ExecuteSession {
fn default() -> Self {
Self {
tables: Box::new(HashMapTableRegistry::default()),
tables: Arc::new(RwLock::new(HashMapTableRegistry::default())),
query_planner: Box::new(DefaultQueryPlanner),
}
}
}

impl ExecuteSession {
pub fn sql(&mut self, sql: &str) -> Result<Vec<RecordBatch>> {
let mut sql_planner = SqlQueryPlanner::new(&mut *self.tables);
let plan = sql_planner.create_logical_plan(sql)?;
let plan = self.query_planner.create_physical_plan(&plan)?;
plan.execute()
pub fn sql(&self, sql: &str) -> Result<Vec<RecordBatch>> {
SqlQueryPlanner::create_logical_plan(self.tables.clone(), sql)
.and_then(|logical_plan| self.query_planner.create_physical_plan(&logical_plan))
.and_then(|plan| plan.execute())
}

pub fn register_table(&mut self, name: &str, table: Arc<dyn DataSource>) -> Result<()> {
self.tables.register_table(name, table)
let mut write = self.tables.write()?;
write.register_table(name, table)
}
}
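
The session now keeps its registry behind Arc<RwLock<dyn TableRegistry>> and sql takes &self instead of &mut self, which is what allows one ExecuteSession to be shared by the connection handlers of a wire-protocol server. A hedged sketch of that sharing; the crate and module paths are assumptions (your_crate stands in for this repository's crate name), and it presumes a query like SELECT 1 resolves through the new one-row EmptyRelation path:

// Illustrative only: one session, queried from another thread, with no &mut.
use std::sync::Arc;
use your_crate::execution::session::ExecuteSession;

fn main() {
    let session = Arc::new(ExecuteSession::default());
    let worker = {
        let session = Arc::clone(&session);
        std::thread::spawn(move || session.sql("SELECT 1"))
    };
    let _batches = worker.join().expect("query thread panicked");
}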

2 changes: 1 addition & 1 deletion src/lib.rs
@@ -8,8 +8,8 @@ pub mod logical;
pub mod optimizer;
pub mod physical;
pub mod planner;
pub mod utils;
pub mod server;
pub mod utils;

#[cfg(test)]
pub mod test_utils;
4 changes: 2 additions & 2 deletions src/logical/builder.rs
@@ -27,9 +27,9 @@ impl LogicalPlanBuilder {
Projection::try_new(input, exprs.into_iter().map(|exp| exp.into()).collect()).map(LogicalPlan::Projection)
}

pub fn empty() -> Self {
pub fn empty(produce_one_row: bool) -> Self {
LogicalPlanBuilder {
plan: LogicalPlan::EmptyRelation(EmptyRelation::new(Arc::new(Schema::empty()))),
plan: LogicalPlan::EmptyRelation(EmptyRelation::new(Arc::new(Schema::empty()), produce_one_row)),
}
}
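
LogicalPlanBuilder::empty now takes produce_one_row, distinguishing a relation with zero rows from the single placeholder row that FROM-less projections are planned over. A minimal sketch of the two call sites one would expect (module path assumed, your_crate is hypothetical):

// Illustrative only: the two flavours of the empty relation after this change.
use your_crate::logical::builder::LogicalPlanBuilder;

fn main() {
    let _one_placeholder_row = LogicalPlanBuilder::empty(true); // backs `SELECT 1`-style queries
    let _no_rows = LogicalPlanBuilder::empty(false); // genuinely empty relation
}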

10 changes: 7 additions & 3 deletions src/logical/plan/mod.rs
@@ -20,12 +20,16 @@ use arrow::datatypes::SchemaRef;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EmptyRelation {
schema: SchemaRef,
pub produce_one_row: bool,
pub schema: SchemaRef,
}

impl EmptyRelation {
pub fn new(schema: SchemaRef) -> Self {
Self { schema }
pub fn new(schema: SchemaRef, produce_one_row: bool) -> Self {
Self {
produce_one_row,
schema,
}
}

pub fn schema(&self) -> SchemaRef {
1 change: 1 addition & 0 deletions src/logical/plan/projection.rs
@@ -19,6 +19,7 @@ impl Projection {
.iter()
.filter_map(|f| match f {
LogicalExpr::Column(i) => Some(i.field(&input)),
LogicalExpr::Literal(i) => Some(Ok(Arc::new(i.to_field()))),
_ => None,
})
.collect::<Result<Vec<FieldRef>>>()
59 changes: 59 additions & 0 deletions src/physical/plan/empty.rs
@@ -0,0 +1,59 @@
use std::sync::Arc;

use crate::error::Result;
use crate::physical::plan::PhysicalPlan;
use arrow::array::{ArrayRef, NullArray, RecordBatch, RecordBatchOptions};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};

pub struct EmptyRelation {
produce_one_row: bool,
schema: SchemaRef,
}

impl EmptyRelation {
pub fn new(schema: SchemaRef, produce_one_row: bool) -> Self {
Self {
schema,
produce_one_row,
}
}

fn data(&self) -> Result<Vec<RecordBatch>> {
Ok({
let n_field = self.schema.fields.len();
vec![RecordBatch::try_new_with_options(
Arc::new(Schema::new(
(0..n_field)
.map(|i| Field::new(format!("placeholder_{i}"), DataType::Null, true))
.collect::<Fields>(),
)),
(0..n_field)
.map(|_i| {
let ret: ArrayRef = Arc::new(NullArray::new(1));
ret
})
.collect(),
// Even when the schema has zero columns we can still produce a single row.
&RecordBatchOptions::new().with_row_count(Some(1)),
)?]
})
}
}

impl PhysicalPlan for EmptyRelation {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn execute(&self) -> Result<Vec<RecordBatch>> {
if self.produce_one_row {
self.data()
} else {
Ok(vec![])
}
}

fn children(&self) -> Option<Vec<Arc<dyn PhysicalPlan>>> {
None
}
}
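
This new physical operator returns either no batches or a single all-null placeholder batch with exactly one row, which a projection above it can evaluate literal expressions against to answer constant queries. A hedged sketch of executing it directly; the import paths are assumptions (your_crate stands in for this repository's crate name), and the PhysicalPlan trait must be in scope for execute:

// Illustrative only: the one-row, zero-column placeholder batch.
use std::sync::Arc;
use arrow::datatypes::Schema;
use your_crate::physical::plan::{EmptyRelation, PhysicalPlan};

fn main() {
    let plan = EmptyRelation::new(Arc::new(Schema::empty()), true);
    let batches = plan.execute().unwrap();
    assert_eq!(batches.len(), 1); // one placeholder batch
    assert_eq!(batches[0].num_rows(), 1); // a single row with no columns
}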
6 changes: 4 additions & 2 deletions src/physical/plan/mod.rs
@@ -1,17 +1,19 @@
mod aggregate;
mod empty;
mod filter;
mod join;
mod limit;
mod projection;
mod scan;
mod sort;
mod limit;

pub use aggregate::HashAggregate;
pub use empty::EmptyRelation;
pub use filter::Filter;
pub use join::{join_schema, ColumnIndex, CrossJoin, Join, JoinFilter, JoinSide};
pub use limit::Limit;
pub use projection::Projection;
pub use scan::Scan;
pub use limit::Limit;

use std::sync::Arc;

22 changes: 11 additions & 11 deletions src/physical/plan/projection.rs
@@ -25,17 +25,17 @@ impl PhysicalPlan for Projection {
}

fn execute(&self) -> Result<Vec<RecordBatch>> {
let batches = self.input.execute()?;
let mut reulsts = vec![];
for batch in batches {
let mut columns = vec![];
for expr in &self.exprs {
columns.push(expr.evaluate(&batch)?);
}
reulsts.push(RecordBatch::try_new(self.schema(), columns).map_err(|e| Error::ArrowError(e))?);
}

Ok(reulsts)
self.input
.execute()?
.into_iter()
.map(|batch| {
let mut columns = vec![];
for expr in &self.exprs {
columns.push(expr.evaluate(&batch)?);
}
RecordBatch::try_new(self.schema(), columns).map_err(|e| Error::ArrowError(e))
})
.collect()
}

fn children(&self) -> Option<Vec<Arc<dyn PhysicalPlan>>> {
13 changes: 10 additions & 3 deletions src/planner/mod.rs
@@ -10,7 +10,7 @@ use crate::{
error::{Error, Result},
logical::{
expr::{AggregateOperator, BinaryExpr, Column, LogicalExpr},
plan::{Aggregate, CrossJoin, Filter, Join, LogicalPlan, Projection, TableScan},
plan::{Aggregate, CrossJoin, EmptyRelation, Filter, Join, LogicalPlan, Projection, TableScan},
},
physical::{
self,
@@ -19,7 +19,7 @@
},
};

pub trait QueryPlanner: Debug {
pub trait QueryPlanner: Debug + Sync + Send {
fn create_physical_plan(&self, logical_plan: &LogicalPlan) -> Result<Arc<dyn PhysicalPlan>>;

fn create_physical_expr(&self, schema: &SchemaRef, expr: &LogicalExpr) -> Result<Arc<dyn PhysicalExpr>>;
@@ -146,6 +146,13 @@ impl DefaultQueryPlanner {
physical::plan::Join::try_new(left, right, join.join_type, Some(join_filter))
.map(|j| Arc::new(j) as Arc<dyn PhysicalPlan>)
}

fn physical_empty_relation(&self, empty: &EmptyRelation) -> Result<Arc<dyn PhysicalPlan>> {
Ok(Arc::new(physical::plan::EmptyRelation::new(
empty.schema(),
empty.produce_one_row,
)))
}
}

impl QueryPlanner for DefaultQueryPlanner {
@@ -155,7 +162,7 @@ impl QueryPlanner for DefaultQueryPlanner {
LogicalPlan::Filter(f) => self.physical_plan_filter(f),
LogicalPlan::Aggregate(a) => self.physical_plan_aggregate(a),
LogicalPlan::TableScan(t) => self.physical_plan_table_scan(t),
LogicalPlan::EmptyRelation(_) => todo!(),
LogicalPlan::EmptyRelation(v) => self.physical_empty_relation(v),
LogicalPlan::CrossJoin(j) => self.physical_plan_cross_join(j),
LogicalPlan::SubqueryAlias(_) => todo!(),
LogicalPlan::Join(join) => self.physical_plan_join(join),