Skip to content

Commit

Permalink
Refactor DmlStatement enum and its associated types
Browse files Browse the repository at this point in the history
  • Loading branch information
holicc committed Aug 2, 2024
1 parent 7d8cf85 commit 1c62d08
Show file tree
Hide file tree
Showing 14 changed files with 342 additions and 86 deletions.
21 changes: 19 additions & 2 deletions qurious/src/datasource/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
pub mod file;
pub mod memory;

use crate::{datatypes::scalar::ScalarValue, error::Result, logical::expr::LogicalExpr};
use crate::{datatypes::scalar::ScalarValue, error::Result, logical::expr::LogicalExpr, physical::plan::PhysicalPlan};
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use std::fmt::Debug;
use std::{fmt::Debug, sync::Arc};

pub trait DataSource: Debug + Sync + Send {
fn schema(&self) -> SchemaRef;
Expand All @@ -15,4 +15,21 @@ pub trait DataSource: Debug + Sync + Send {
fn get_column_default(&self, _column: &str) -> Option<&ScalarValue> {
None
}

/// Insert a new record batch into the data source
fn insert_into(&self, _input: Arc<dyn PhysicalPlan>) -> Result<Arc<dyn PhysicalPlan>> {
unimplemented!("insert_into not implemented for {:?}", self)
}

/// Delete records from the data source
/// The input plan is the filter expression to apply to the data source
fn delete(&self, _input: Arc<dyn PhysicalPlan>) -> Result<Arc<dyn PhysicalPlan>> {
unimplemented!("delete not implemented for {:?}", self)
}

/// Update records in the data source
/// The input plan is the filter expression to apply to the data source
fn update(&self, _input: Arc<dyn PhysicalPlan>) -> Result<Arc<dyn PhysicalPlan>> {
unimplemented!("update not implemented for {:?}", self)
}
}
17 changes: 9 additions & 8 deletions qurious/src/execution/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,24 @@ use crate::datasource::DataSource;
use crate::error::Error;
use crate::execution::registry::DefaultTableRegistry;
use crate::logical::plan::LogicalPlan;
use crate::optimizer::Optimzier;
use crate::planner::sql::SqlQueryPlanner;
use crate::planner::QueryPlanner;
use crate::{error::Result, planner::DefaultQueryPlanner};

pub type TableRegistryRef = Arc<RwLock<dyn TableRegistry>>;

pub struct ExecuteSession {
table_registry: Arc<RwLock<dyn TableRegistry>>,
query_planner: Box<dyn QueryPlanner>,
optimizer: Optimzier,
planner: Arc<dyn QueryPlanner>,
table_registry: TableRegistryRef,
}

impl Default for ExecuteSession {
fn default() -> Self {
let table_registry = Arc::new(RwLock::new(DefaultTableRegistry::default()));

Self {
table_registry: Arc::new(RwLock::new(DefaultTableRegistry::default())),
query_planner: Box::new(DefaultQueryPlanner),
optimizer: Optimzier::new(),
planner: Arc::new(DefaultQueryPlanner::new(table_registry.clone())),
table_registry,
}
}
}
Expand All @@ -45,7 +46,7 @@ impl ExecuteSession {

pub fn execute_logical_plan(&self, plan: &LogicalPlan) -> Result<Vec<RecordBatch>> {
// let plan = self.optimizer.optimize(plan)?;
self.query_planner.create_physical_plan(&plan)?.execute()
self.planner.create_physical_plan(&plan)?.execute()
}

pub fn register_table(&mut self, name: &str, table: Arc<dyn DataSource>) -> Result<()> {
Expand Down
6 changes: 3 additions & 3 deletions qurious/src/logical/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ impl LogicalPlanBuilder {
})
}

pub fn limit(self, limit: i64, offset: i64) -> Self {
pub fn limit(self, fetch: Option<usize>, skip: usize) -> Self {
LogicalPlanBuilder {
plan: LogicalPlan::Limit(Limit {
input: Box::new(self.plan),
fetch: limit as usize,
offset: offset as usize,
fetch,
skip,
}),
}
}
Expand Down
17 changes: 11 additions & 6 deletions qurious/src/logical/plan/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,27 @@ use crate::logical::plan::LogicalPlan;
#[derive(Debug, Clone)]
pub struct Limit {
pub input: Box<LogicalPlan>,
pub fetch: usize,
pub offset: usize,
pub fetch: Option<usize>,
pub skip: usize,
}

impl Display for Limit {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "Limit: fetch={}, offset={}", self.fetch, self.offset)
write!(
f,
"Limit: fetch={}, skip={}",
self.fetch.map(|x| x.to_string()).unwrap_or("None".to_owned()),
self.skip
)
}
}

impl Limit {
pub fn new(input: LogicalPlan, fetch: usize, offset: usize) -> Self {
pub fn new(input: LogicalPlan, fetch: Option<usize>, skip: usize) -> Self {
Self {
input: Box::new(input),
fetch,
offset,
skip,
}
}

Expand All @@ -31,6 +36,6 @@ impl Limit {
}

pub fn children(&self) -> Option<Vec<&LogicalPlan>> {
self.input.children()
Some(vec![&self.input])
}
}
2 changes: 1 addition & 1 deletion qurious/src/physical/expr/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{fmt::Display, sync::Arc};
use crate::error::Result;
use crate::physical::expr::PhysicalExpr;
use arrow::{
array::{new_null_array, Array, ArrayRef, AsArray, GenericStringBuilder, OffsetSizeTrait, RecordBatch},
array::{new_null_array, Array, ArrayRef, GenericStringBuilder, OffsetSizeTrait, RecordBatch},
datatypes::DataType::{self, *},
util::display::{ArrayFormatter, FormatOptions},
};
Expand Down
37 changes: 37 additions & 0 deletions qurious/src/physical/plan/ddl/create_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::sync::Arc;

use crate::common::table_relation::TableRelation;
use crate::error::Result;
use crate::physical::plan::PhysicalPlan;
use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;

pub struct CreateTable {
table: TableRelation,
schema: SchemaRef,
source: Arc<dyn PhysicalPlan>,
}

impl CreateTable {
pub fn new(table: String, schema: SchemaRef, source: Arc<dyn PhysicalPlan>) -> Self {
Self {
table: table.into(),
schema,
source,
}
}
}

impl PhysicalPlan for CreateTable {
fn schema(&self) -> SchemaRef {
todo!()
}

fn execute(&self) -> Result<Vec<RecordBatch>> {
todo!()
}

fn children(&self) -> Option<Vec<Arc<dyn PhysicalPlan>>> {
todo!()
}
}
26 changes: 26 additions & 0 deletions qurious/src/physical/plan/ddl/drop_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use crate::physical::plan::PhysicalPlan;

pub struct DropTable {
pub name: String,
pub if_exists: bool,
}

impl DropTable {
pub fn new(name: String, if_exists: bool) -> Self {
Self { name, if_exists }
}
}

impl PhysicalPlan for DropTable {
fn schema(&self) -> arrow::datatypes::SchemaRef {
todo!()
}

fn execute(&self) -> crate::error::Result<Vec<arrow::array::RecordBatch>> {
todo!()
}

fn children(&self) -> Option<Vec<std::sync::Arc<dyn PhysicalPlan>>> {
todo!()
}
}
5 changes: 5 additions & 0 deletions qurious/src/physical/plan/ddl/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod create_table;
mod drop_table;

pub use create_table::*;
pub use drop_table::*;
51 changes: 36 additions & 15 deletions qurious/src/physical/plan/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,17 @@ use crate::error::Result;
use crate::physical::plan::PhysicalPlan;

use std::sync::Arc;
use std::usize;

pub struct Limit {
pub input: Arc<dyn PhysicalPlan>,
pub fetch: usize,
pub offset: usize,
pub fetch: Option<usize>,
pub skip: usize,
}

impl Limit {
pub fn new(input: Arc<dyn PhysicalPlan>, fetch: usize, offset: Option<usize>) -> Self {
Self {
input,
fetch,
offset: offset.unwrap_or_default(),
}
pub fn new(input: Arc<dyn PhysicalPlan>, fetch: Option<usize>, skip: usize) -> Self {
Self { input, fetch, skip }
}
}

Expand All @@ -28,12 +25,36 @@ impl PhysicalPlan for Limit {
}

fn execute(&self) -> Result<Vec<RecordBatch>> {
Ok(self
.input
.execute()?
.into_iter()
.map(|batch| batch.slice(self.offset, self.fetch))
.collect())
let max_fetch = self.fetch.unwrap_or(usize::MAX);
let batchs = self.input.execute()?;

let mut results = vec![];
let mut fetched = 0;
let mut skip = self.skip;

for batch in batchs {
let rows = batch.num_rows();

if rows <= skip {
skip -= rows;
continue;
}

let new_batch = batch.slice(skip, rows - skip);

let new_rows = new_batch.num_rows();
let remaining = max_fetch - fetched;

if new_rows <= remaining {
results.push(new_batch);
fetched += new_rows;
} else {
results.push(new_batch.slice(0, remaining));
break;
}
}

Ok(results)
}

fn children(&self) -> Option<Vec<Arc<dyn PhysicalPlan>>> {
Expand All @@ -55,7 +76,7 @@ mod test {
("c", UInt64Type, DataType::UInt64, vec![1, 2, 3, 4]),
);

let limit = Limit::new(input, 3, Some(1));
let limit = Limit::new(input, Some(3), 1);

let reuslts = limit.execute().unwrap();

Expand Down
11 changes: 7 additions & 4 deletions qurious/src/physical/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ mod limit;
mod projection;
mod scan;
mod sort;
mod values;

pub mod ddl;

pub use aggregate::HashAggregate;
pub use empty::EmptyRelation;
Expand All @@ -14,12 +17,12 @@ pub use join::{join_schema, ColumnIndex, CrossJoin, Join, JoinFilter, JoinSide};
pub use limit::Limit;
pub use projection::Projection;
pub use scan::Scan;

use std::sync::Arc;

use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
pub use sort::*;
pub use values::*;

use crate::error::Result;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use std::sync::Arc;

pub trait PhysicalPlan {
fn schema(&self) -> SchemaRef;
Expand Down
33 changes: 33 additions & 0 deletions qurious/src/physical/plan/values.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::sync::Arc;

use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;

use crate::error::Result;
use crate::physical::expr::PhysicalExpr;
use crate::physical::plan::PhysicalPlan;

pub struct Values {
schema: SchemaRef,
exprs: Vec<Vec<Arc<dyn PhysicalExpr>>>,
}

impl Values {
pub fn new(schema: SchemaRef, exprs: Vec<Vec<Arc<dyn PhysicalExpr>>>) -> Self {
Self { schema, exprs }
}
}

impl PhysicalPlan for Values {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn execute(&self) -> Result<Vec<RecordBatch>> {
todo!()
}

fn children(&self) -> Option<Vec<Arc<dyn PhysicalPlan>>> {
None
}
}
Loading

0 comments on commit 1c62d08

Please sign in to comment.