Skip to content

Commit

Permalink
WIP: support INSERT
Browse files Browse the repository at this point in the history
  • Loading branch information
holicc committed Jul 11, 2024
1 parent d94964c commit 6efb797
Show file tree
Hide file tree
Showing 12 changed files with 525 additions and 166 deletions.
21 changes: 20 additions & 1 deletion src/datasource/memory.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::collections::HashMap;
use std::sync::Arc;

use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;

use crate::datatypes::scalar::ScalarValue;
use crate::error::Error;
use crate::error::Result;
use crate::logical::expr::LogicalExpr;
Expand All @@ -14,11 +16,23 @@ use super::DataSource;
pub struct MemoryDataSource {
schema: SchemaRef,
data: Vec<RecordBatch>,
column_defaults: HashMap<String, ScalarValue>,
}

impl MemoryDataSource {
pub fn new(schema: SchemaRef, data: Vec<RecordBatch>) -> Self {
Self { schema, data }
Self {
schema,
data,
column_defaults: HashMap::new(),
}
}

pub fn with_default_values(self, columns_defaults: HashMap<String, ScalarValue>) -> Self {
Self {
column_defaults: columns_defaults,
..self
}
}
}

Expand All @@ -27,6 +41,7 @@ impl Default for MemoryDataSource {
Self {
schema: Arc::new(Schema::empty()),
data: vec![],
column_defaults: HashMap::new(),
}
}
}
Expand All @@ -53,4 +68,8 @@ impl DataSource for MemoryDataSource {
Ok(self.data.clone())
}
}

fn get_column_default(&self, column: &str) -> Option<&ScalarValue> {
self.column_defaults.get(column)
}
}
11 changes: 8 additions & 3 deletions src/datasource/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
pub mod file;
pub mod memory;

use crate::{error::Result, logical::expr::LogicalExpr};
use crate::{datatypes::scalar::ScalarValue, error::Result, logical::expr::LogicalExpr};
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use std::fmt::Debug;
use std::{collections::HashMap, fmt::Debug};

pub trait DataSource: Debug + Sync + Send {
fn schema(&self) -> SchemaRef;

/// Perform a scan of the data source and return the results as RecordBatch
fn scan(&self, projection: Option<Vec<String>>, filters: &[LogicalExpr]) -> Result<Vec<RecordBatch>>;
}

/// Get the default value for a column, if available.
fn get_column_default(&self, _column: &str) -> Option<&ScalarValue> {
None
}
}
8 changes: 7 additions & 1 deletion src/datatypes/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ impl ScalarValue {
}
}

impl From<&str> for ScalarValue {
fn from(s: &str) -> Self {
ScalarValue::Utf8(Some(s.to_string()))
}
}

impl From<ScalarValue> for Field {
fn from(value: ScalarValue) -> Self {
value.to_field()
Expand Down Expand Up @@ -154,4 +160,4 @@ impl Display for ScalarValue {
ScalarValue::Utf8(None) => write!(f, "null"),
}
}
}
}
5 changes: 4 additions & 1 deletion src/logical/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ impl LogicalPlanBuilder {

pub fn empty(produce_one_row: bool) -> Self {
LogicalPlanBuilder {
plan: LogicalPlan::EmptyRelation(EmptyRelation::new(Arc::new(Schema::empty()), produce_one_row)),
plan: LogicalPlan::EmptyRelation(EmptyRelation {
schema: Arc::new(Schema::empty()),
produce_one_row,
}),
}
}

Expand Down
60 changes: 60 additions & 0 deletions src/logical/plan/dml.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::fmt::{self, Display, Formatter};

use arrow::datatypes::SchemaRef;

use crate::logical::plan::LogicalPlan;

#[derive(Debug, Clone)]
pub enum DmlStatement {
Insert(Insert),
Update(Update),
Delete(Delete),
}

impl DmlStatement {
pub fn schema(&self) -> SchemaRef {
match self {
DmlStatement::Insert(i) => i.input.schema(),
DmlStatement::Update(u) => u.input.schema(),
DmlStatement::Delete(d) => d.selection.schema(),
}
}

pub fn children(&self) -> Option<Vec<&LogicalPlan>> {
match self {
DmlStatement::Insert(i) => Some(vec![&i.input]),
DmlStatement::Update(u) => Some(vec![&u.input, &u.selection]),
DmlStatement::Delete(d) => Some(vec![&d.selection]),
}
}
}

impl Display for DmlStatement {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self {
DmlStatement::Insert(i) => write!(f, "Dml: op=[Insert Into] table=[{}]", i.table_name),
DmlStatement::Update(u) => write!(f, "Update: [{}]", u.table_name),
DmlStatement::Delete(d) => write!(f, "Delete: [{}]", d.table_name),
}
}
}

#[derive(Debug, Clone)]
pub struct Insert {
pub table_name: String,
pub table_schema: SchemaRef,
pub input: Box<LogicalPlan>,
}

#[derive(Debug, Clone)]
pub struct Update {
pub table_name: String,
pub input: Box<LogicalPlan>,
pub selection: Box<LogicalPlan>,
}

#[derive(Debug, Clone)]
pub struct Delete {
pub table_name: String,
pub selection: Box<LogicalPlan>,
}
75 changes: 50 additions & 25 deletions src/logical/plan/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod aggregate;
mod ddl;
mod dml;
mod filter;
mod join;
mod limit;
Expand All @@ -8,8 +9,11 @@ mod scan;
mod sort;
mod sub_query;

use std::fmt::{self, Display, Formatter};

pub use aggregate::Aggregate;
pub use ddl::*;
pub use dml::*;
pub use filter::Filter;
pub use join::*;
pub use limit::Limit;
Expand All @@ -20,6 +24,8 @@ pub use sub_query::SubqueryAlias;

use arrow::datatypes::SchemaRef;

use super::expr::LogicalExpr;

#[macro_export]
macro_rules! impl_logical_plan {
($name:ident) => {
Expand All @@ -35,29 +41,6 @@ macro_rules! impl_logical_plan {
};
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EmptyRelation {
pub produce_one_row: bool,
pub schema: SchemaRef,
}

impl EmptyRelation {
pub fn new(schema: SchemaRef, produce_one_row: bool) -> Self {
Self {
produce_one_row,
schema,
}
}

pub fn schema(&self) -> SchemaRef {
self.schema.clone()
}

pub fn children(&self) -> Option<Vec<&LogicalPlan>> {
None
}
}

#[derive(Debug, Clone)]
pub enum LogicalPlan {
/// Apply Cross Join to two logical plans.
Expand All @@ -68,6 +51,8 @@ pub enum LogicalPlan {
Aggregate(Aggregate),
TableScan(TableScan),
EmptyRelation(EmptyRelation),
/// VALUES (1, 2), (3, 4)
Values(Values),
/// Aliased relation provides, or changes, the name of a relation.
SubqueryAlias(SubqueryAlias),
/// Sort the result set by the specified expressions.
Expand All @@ -76,6 +61,8 @@ pub enum LogicalPlan {
Limit(Limit),
/// Data Definition Language (DDL) statements. CREATE, DROP, etc.
Ddl(DdlStatement),
/// Data Manipulation Language (DML) statements. INSERT, UPDATE, DELETE, etc.
Dml(DmlStatement),
}

impl LogicalPlan {
Expand All @@ -85,29 +72,33 @@ impl LogicalPlan {
LogicalPlan::Filter(f) => f.schema(),
LogicalPlan::Aggregate(a) => a.schema(),
LogicalPlan::TableScan(t) => t.schema(),
LogicalPlan::EmptyRelation(e) => e.schema(),
LogicalPlan::EmptyRelation(e) => e.schema.clone(),
LogicalPlan::CrossJoin(s) => s.schema(),
LogicalPlan::SubqueryAlias(s) => s.schema(),
LogicalPlan::Join(j) => j.schema(),
LogicalPlan::Sort(s) => s.schema(),
LogicalPlan::Limit(l) => l.schema(),
LogicalPlan::Ddl(d) => d.schema(),
LogicalPlan::Dml(d) => d.schema(),
LogicalPlan::Values(v) => v.schema.clone(),
}
}

pub fn children(&self) -> Option<Vec<&LogicalPlan>> {
match self {
LogicalPlan::EmptyRelation(_) | LogicalPlan::Values(_) => None,

LogicalPlan::Projection(p) => p.children(),
LogicalPlan::Filter(f) => f.children(),
LogicalPlan::Aggregate(a) => a.children(),
LogicalPlan::TableScan(t) => t.children(),
LogicalPlan::EmptyRelation(e) => e.children(),
LogicalPlan::CrossJoin(s) => s.children(),
LogicalPlan::SubqueryAlias(s) => s.children(),
LogicalPlan::Join(j) => j.children(),
LogicalPlan::Sort(s) => s.children(),
LogicalPlan::Limit(l) => l.children(),
LogicalPlan::Ddl(l) => l.children(),
LogicalPlan::Dml(l) => l.children(),
}
}
}
Expand All @@ -126,6 +117,40 @@ impl std::fmt::Display for LogicalPlan {
LogicalPlan::Sort(s) => write!(f, "{}", s),
LogicalPlan::Limit(l) => write!(f, "{}", l),
LogicalPlan::Ddl(l) => write!(f, "{}", l),
LogicalPlan::Values(v) => write!(f, "{}", v),
LogicalPlan::Dml(d) => write!(f, "{}", d),
}
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EmptyRelation {
pub produce_one_row: bool,
pub schema: SchemaRef,
}

#[derive(Debug, Clone)]
pub struct Values {
pub values: Vec<Vec<LogicalExpr>>,
pub schema: SchemaRef,
}

impl Display for Values {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "Values: [")?;
for (i, row) in self.values.iter().enumerate() {
write!(f, "[")?;
for (j, value) in row.iter().enumerate() {
write!(f, "{}", value)?;
if j < row.len() - 1 {
write!(f, ", ")?;
}
}
write!(f, "]")?;
if i < self.values.len() - 1 {
write!(f, ", ")?;
}
}
write!(f, "]")
}
}
Loading

0 comments on commit 6efb797

Please sign in to comment.