Commit 2956a62 ("WIP")
holicc committed Aug 19, 2024
1 parent cb0c5bb
Showing 40 changed files with 800 additions and 806 deletions.
4 changes: 1 addition & 3 deletions .vscode/settings.json
@@ -2,7 +2,5 @@
"rust-analyzer.linkedProjects": [
"./Cargo.toml"
],
"rust-analyzer.cargo.features": [
"postgresql"
],
"rust-analyzer.cargo.features": [],
}
7 changes: 2 additions & 5 deletions Cargo.toml
@@ -1,5 +1,5 @@
[workspace]
members = ["qurious", "server", "sqlparser"]
members = ["qurious", "sqlparser"]
resolver = "2"


@@ -19,10 +19,7 @@ sqlparser = { path = "sqlparser" }
parquet = "52.0.0"
arrow = "52.0.0"
url = "2.5.0"
tokio = { version = "1.37.0", features = ["full"] }
async-trait = "0.1.80"
tokio-stream = "0.1.15"
log = "0.4.21"
log = "^0.4"
dashmap = "6.0.1"
connectorx = { git = "https://github.com/holicc/connector-x.git" }

9 changes: 9 additions & 0 deletions qurious/Cargo.toml
@@ -11,10 +11,19 @@ arrow = { workspace = true }
url = { workspace = true }
dashmap = { workspace = true }
connectorx = { workspace = true, features = ["src_postgres", "dst_arrow"] }
log = { workspace = true }
postgres = "0.19.8"
itertools = "0.13.0"
rayon = "1.10.0"


[dev-dependencies]
arrow = { version = "52.0.0", features = ["prettyprint", "test_utils"] }
async-trait = "0.1.81"
env_logger = "0.11.5"
sqllogictest = "0.21.0"

[[test]]
harness = false
name = "sqllogictests"
path = "tests/sqllogictests.rs"
7 changes: 2 additions & 5 deletions qurious/src/common/table_relation.rs
@@ -92,9 +92,6 @@ impl TableRelation {
}
}
}



}

impl Display for TableRelation {
@@ -105,13 +102,13 @@ impl Display for TableRelation {

impl From<String> for TableRelation {
fn from(value: String) -> Self {
TableRelation::parse_str(&value)
TableRelation::parse_str(&value.to_ascii_lowercase())
}
}

impl From<&str> for TableRelation {
fn from(value: &str) -> Self {
TableRelation::parse_str(value)
TableRelation::parse_str(value.to_ascii_lowercase().as_str())
}
}

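Both From impls now lowercase the input before parsing, so differently cased spellings resolve to the same relation. A small illustration (assuming TableRelation implements PartialEq, which this diff does not show):

    let a: TableRelation = "public.Orders".into();
    let b: TableRelation = "PUBLIC.ORDERS".into();
    assert_eq!(a, b); // both parse as "public.orders"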
2 changes: 1 addition & 1 deletion qurious/src/datasource/file/csv.rs
@@ -62,8 +62,8 @@ pub fn read_csv<T: DataFilePath>(path: T, options: CsvReadOptions) -> Result<Arc
.with_format(format)
.build(file)
.and_then(|reader| reader.into_iter().collect())
.map(|data| Arc::new(MemoryTable::new(schema, data)) as Arc<dyn TableProvider>)
.map_err(|e| Error::ArrowError(e))
.and_then(|data| MemoryTable::try_new(schema, data).map(|v| Arc::new(v) as Arc<dyn TableProvider>))
}
_ => unimplemented!(),
}
2 changes: 1 addition & 1 deletion qurious/src/datasource/file/json.rs
@@ -23,8 +23,8 @@ pub fn read_json<T: DataFilePath>(path: T) -> Result<Arc<dyn TableProvider>> {
ReaderBuilder::new(schema.clone())
.build(reader)
.and_then(|builder| builder.into_iter().collect())
.map(|data| Arc::new(MemoryTable::new(schema, data)) as Arc<dyn TableProvider>)
.map_err(|e| Error::ArrowError(e))
.and_then(|data| MemoryTable::try_new(schema, data).map(|v| Arc::new(v) as Arc<dyn TableProvider>))
}

#[cfg(test)]
19 changes: 1 addition & 18 deletions qurious/src/datasource/file/mod.rs
@@ -2,9 +2,7 @@ pub mod csv;
pub mod json;
pub mod parquet;

use std::fs::{self, File};

use arrow::{csv::reader::BufReader, datatypes::SchemaRef};
use std::fs::{self};
use url::Url;

use crate::error::{Error, Result};
@@ -40,18 +38,3 @@ pub fn parse_path<S: AsRef<str>>(path: S) -> Result<Url> {
Err(e) => Err(Error::InternalError(e.to_string())),
}
}

/// FileSource is a data source for reading data from a file
/// different file formats have different Readers
/// all readers from arrows
#[derive(Debug)]
pub struct FileSource<R> {
schema: SchemaRef,
reader: R,
}

impl FileSource<BufReader<File>> {
pub fn new(schema: SchemaRef, reader: BufReader<File>) -> Self {
Self { schema, reader }
}
}
2 changes: 1 addition & 1 deletion qurious/src/datasource/file/parquet.rs
@@ -13,7 +13,7 @@ pub fn read_parquet<T: DataFilePath>(path: T) -> Result<Arc<dyn TableProvider>>
let schema = builder.schema().clone();
let data = builder.build()?.collect::<Result<Vec<_>, arrow::error::ArrowError>>()?;

Ok(Arc::new(MemoryTable::new(schema, data)))
MemoryTable::try_new(schema, data).map(|v| Arc::new(v) as Arc<dyn TableProvider>)
}

#[cfg(test)]
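The csv, json, and parquet readers above now share one shape: collect the record batches, then construct the table fallibly and upcast to the trait object. A stripped-down sketch of that chain with stand-in types (names here are illustrative only):

    use std::sync::Arc;

    // Stand-ins for the real types, just to show the refactor's shape.
    trait TableProvider {}

    struct MemoryTable {
        data: Vec<u64>,
    }

    impl TableProvider for MemoryTable {}

    impl MemoryTable {
        fn try_new(data: Vec<u64>) -> Result<Self, String> {
            Ok(Self { data })
        }
    }

    fn collect_batches() -> Result<Vec<u64>, String> {
        Ok(vec![1, 2, 3])
    }

    fn build() -> Result<Arc<dyn TableProvider>, String> {
        // `and_then` keeps the fallible constructor from nesting
        // Result<Result<...>>, and the `as` cast unsizes Arc<MemoryTable>
        // into the Arc<dyn TableProvider> that callers expect.
        collect_batches().and_then(|data| MemoryTable::try_new(data).map(|v| Arc::new(v) as Arc<dyn TableProvider>))
    }

    fn main() {
        assert!(build().is_ok());
    }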
47 changes: 29 additions & 18 deletions qurious/src/datasource/memory.rs
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;

use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
@@ -9,22 +10,23 @@ use crate::datatypes::scalar::ScalarValue;
use crate::error::Error;
use crate::error::Result;
use crate::logical::expr::LogicalExpr;
use crate::physical::plan::PhysicalPlan;
use crate::provider::table::TableProvider;

#[derive(Clone, Debug)]
pub struct MemoryTable {
schema: SchemaRef,
data: Vec<RecordBatch>,
data: Arc<RwLock<Vec<RecordBatch>>>,
column_defaults: HashMap<String, ScalarValue>,
}

impl MemoryTable {
pub fn new(schema: SchemaRef, data: Vec<RecordBatch>) -> Self {
Self {
pub fn try_new(schema: SchemaRef, data: Vec<RecordBatch>) -> Result<Self> {
Ok(Self {
schema,
data,
data: Arc::new(RwLock::new(data)),
column_defaults: HashMap::new(),
}
})
}

pub fn with_default_values(self, columns_defaults: HashMap<String, ScalarValue>) -> Self {
@@ -39,7 +41,7 @@ impl Default for MemoryTable {
fn default() -> Self {
Self {
schema: Arc::new(Schema::empty()),
data: vec![],
data: Arc::new(RwLock::new(vec![])),
column_defaults: HashMap::new(),
}
}
@@ -51,24 +53,33 @@ impl TableProvider for MemoryTable {
}

fn scan(&self, projection: Option<Vec<String>>, _filters: &[LogicalExpr]) -> Result<Vec<RecordBatch>> {
let batches = self.data.read().map_err(|e| Error::InternalError(e.to_string()))?;

if let Some(projection) = projection {
let mut r = vec![];
for p in projection {
let index = self
.schema
.fields
.iter()
.position(|f| f.name() == &p)
.ok_or(Error::ColumnNotFound(p))?;
r.push(self.data[index].clone());
}
Ok(r)
let indices = projection
.iter()
.map(|name| self.schema.index_of(name).map_err(|e| Error::ArrowError(e)))
.collect::<Result<Vec<_>>>()?;

batches
.iter()
.map(|batch| batch.project(&indices).map_err(|e| Error::ArrowError(e)))
.collect()
} else {
Ok(self.data.clone())
Ok(batches.clone())
}
}

fn get_column_default(&self, column: &str) -> Option<ScalarValue> {
self.column_defaults.get(column).map(|v| v.clone())
}

fn insert(&self, input: Arc<dyn PhysicalPlan>) -> Result<u64> {
let mut batches = self.data.write().map_err(|e| Error::InternalError(e.to_string()))?;
let mut input_batches = input.execute()?;
// Count the rows before `append` drains the input vector; counting afterwards would always yield 0.
let rows_affected = input_batches.iter().map(|batch| batch.num_rows()).sum::<usize>() as u64;

batches.append(&mut input_batches);

Ok(rows_affected)
}
}
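
With the data behind an Arc<RwLock<...>>, scans copy batches out under a read lock and projections go through Schema::index_of and RecordBatch::project. A usage sketch, assuming MemoryTable and TableProvider are publicly reachable at these paths and that qurious's Error implements From<ArrowError> (both inferred from this diff, not guaranteed):

    use std::sync::Arc;

    use arrow::array::{Int32Array, RecordBatch};
    use arrow::datatypes::{DataType, Field, Schema};
    use qurious::datasource::memory::MemoryTable;
    use qurious::provider::table::TableProvider;

    fn main() -> qurious::error::Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("v", DataType::Int32, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(vec![1, 2])),
                Arc::new(Int32Array::from(vec![10, 20])),
            ],
        )?;
        let table = MemoryTable::try_new(schema, vec![batch])?;

        // An unknown projected column now surfaces as an ArrowError from
        // Schema::index_of instead of a positional lookup gone wrong.
        let projected = table.scan(Some(vec!["v".to_string()]), &[])?;
        assert_eq!(projected[0].num_columns(), 1);
        Ok(())
    }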
41 changes: 30 additions & 11 deletions qurious/src/execution/session.rs
@@ -14,6 +14,7 @@ use crate::planner::QueryPlanner;
use crate::provider::catalog::CatalogProvider;
use crate::provider::schema::SchemaProvider;
use crate::provider::table::TableProvider;
use crate::utils::batch::make_count_batch;
use crate::{error::Result, planner::DefaultQueryPlanner};

use crate::execution::providers::CatalogProviderList;
@@ -68,11 +69,12 @@ impl ExecuteSession {
let source = self.find_table_provider(relation)?;
let input = self.planner.create_physical_plan(input)?;

match op {
DmlOperator::Insert => source.insert(input)?.execute(),
DmlOperator::Update => source.update(input)?.execute(),
DmlOperator::Delete => source.delete(input)?.execute(),
}
let rows_affected = match op {
DmlOperator::Insert => source.insert(input)?,
_ => todo!(),
};

Ok(vec![make_count_batch(rows_affected)])
}
plan => self.planner.create_physical_plan(&plan)?.execute(),
}
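
Only inserts are wired up so far; Update and Delete fall into todo!(). The affected-row count comes back as a one-row batch via make_count_batch, whose body is not shown in this diff. A plausible reconstruction (an assumption; the real helper in crate::utils::batch may differ in column name and type):

    use std::sync::Arc;

    use arrow::array::{RecordBatch, UInt64Array};
    use arrow::datatypes::{DataType, Field, Schema};

    // Hypothetical shape of utils::batch::make_count_batch.
    fn make_count_batch(rows_affected: u64) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("count", DataType::UInt64, false)]));
        RecordBatch::try_new(schema, vec![Arc::new(UInt64Array::from(vec![rows_affected]))])
            .expect("a single-column count batch always matches its schema")
    }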
Expand Down Expand Up @@ -167,7 +169,7 @@ impl ExecuteSession {
schema_provider
.register_table(
table.table().to_owned(),
Arc::new(MemoryTable::new(schema.clone(), batch)),
Arc::new(MemoryTable::try_new(schema.clone(), batch)?),
)
.map(|_| vec![])
}
Expand Down Expand Up @@ -196,10 +198,7 @@ mod tests {
datasource::{connectorx::postgres::PostgresCatalogProvider, memory::MemoryTable},
test_utils::assert_batch_eq,
};
use arrow::{
array::{Int32Array, StringArray},
util::pretty::print_batches,
};
use arrow::array::{Int32Array, StringArray};

use super::*;

@@ -211,6 +210,26 @@
assert_batch_eq(&batch, expected);
}

#[test]
fn test_create_table() -> Result<()> {
let session = ExecuteSession::new()?;
let sql = r#"create table t(v1 int)"#;

session.sql(sql)?;
session.sql("insert into T values (1)")?;

let batch = session.sql("select * from T")?;
assert_batch_eq(&batch, vec![
"+----+",
"| v1 |",
"+----+",
"| 1 |",
"+----+",
]);

Ok(())
}

#[test]
fn test_execute_sql() -> Result<()> {
let session = ExecuteSession::new()?;
@@ -226,7 +245,7 @@
Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
],
)?];
let datasource = MemoryTable::new(schema, data.clone());
let datasource = MemoryTable::try_new(schema, data.clone())?;

session.register_table("t", Arc::new(datasource))?;

1 change: 1 addition & 0 deletions qurious/src/logical/expr/mod.rs
@@ -45,6 +45,7 @@ impl LogicalExpr {
LogicalExpr::Literal(v) => Ok(Arc::new(v.to_field())),
LogicalExpr::Alias(a) => a.expr.field(plan),
LogicalExpr::Wildcard => Ok(Arc::new(Field::new("*", DataType::Null, true))),
LogicalExpr::Cast(c) => c.field(plan),
_ => Err(Error::InternalError(format!(
"Cannot determine schema for expression: {:?}",
self
3 changes: 2 additions & 1 deletion qurious/src/logical/plan/projection.rs
@@ -20,7 +20,8 @@ impl Projection {
.filter_map(|f| match f {
LogicalExpr::Column(i) => Some(i.field(&input)),
LogicalExpr::Literal(i) => Some(Ok(Arc::new(i.to_field()))),
_ => None,
LogicalExpr::Alias(i) => Some(i.expr.field(&input)),
a => todo!("Projection::try_new: {:?}", a),
})
.collect::<Result<Vec<FieldRef>>>()
.map(|fields| Arc::new(Schema::new(fields)))?,
43 changes: 14 additions & 29 deletions qurious/src/physical/expr/cast.rs
@@ -4,8 +4,17 @@ use crate::error::Result;
use crate::physical::expr::PhysicalExpr;
use arrow::{
array::{new_null_array, Array, ArrayRef, GenericStringBuilder, OffsetSizeTrait, RecordBatch},
compute::{cast_with_options, CastOptions},
datatypes::DataType::{self, *},
util::display::{ArrayFormatter, FormatOptions},
util::display::{ArrayFormatter, DurationFormat, FormatOptions},
};

pub const DEFAULT_FORMAT_OPTIONS: FormatOptions<'static> =
FormatOptions::new().with_duration_format(DurationFormat::Pretty);

pub const DEFAULT_CAST_OPTIONS: CastOptions<'static> = CastOptions {
safe: false,
format_options: DEFAULT_FORMAT_OPTIONS,
};

#[derive(Debug)]
@@ -14,24 +23,17 @@ pub struct CastExpr {
data_type: DataType,
}

impl CastExpr{
impl CastExpr {
pub fn new(expr: Arc<dyn PhysicalExpr>, data_type: DataType) -> Self {
Self { expr, data_type }
}
}

impl PhysicalExpr for CastExpr {
fn evaluate(&self, input: &RecordBatch) -> Result<ArrayRef> {
let array = self.expr.evaluate(input)?;

let from_type = array.data_type();
let to_type = &self.data_type;
match (from_type, to_type) {
(Null, _) => Ok(new_null_array(to_type, array.len())),
(_, Utf8) => to_str_array::<i32>(&array),
(_, LargeUtf8) => to_str_array::<i64>(&array),
_ => todo!(),
}
self.expr
.evaluate(input)
.and_then(|array| cast_with_options(&array, &self.data_type, &DEFAULT_CAST_OPTIONS).map_err(|e| e.into()))
}
}

@@ -40,20 +42,3 @@ impl Display for CastExpr {
write!(f, "CAST({} AS {})", self.expr, self.data_type)
}
}

pub(crate) fn to_str_array<O: OffsetSizeTrait>(array: &dyn Array) -> Result<ArrayRef> {
let mut builder = GenericStringBuilder::<O>::new();
let formatter = ArrayFormatter::try_new(array, &FormatOptions::default())?;
let nulls = array.nulls();
for i in 0..array.len() {
match nulls.map(|x| x.is_null(i)).unwrap_or_default() {
true => builder.append_null(),
false => {
formatter.value(i).write(&mut builder)?;
// tell the builder the row is finished
builder.append_value("");
}
}
}
Ok(Arc::new(builder.finish()))
}
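
The hand-rolled Null/Utf8 dispatch and to_str_array above are gone; CastExpr now defers every conversion to arrow's cast_with_options kernel. A standalone sketch of that kernel (the input values are illustrative):

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int32Array};
    use arrow::compute::{cast_with_options, CastOptions};
    use arrow::datatypes::DataType;
    use arrow::error::ArrowError;
    use arrow::util::display::FormatOptions;

    fn main() -> Result<(), ArrowError> {
        let input: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        // safe: false matches DEFAULT_CAST_OPTIONS above: an invalid cast
        // returns an error instead of silently becoming null.
        let options = CastOptions {
            safe: false,
            format_options: FormatOptions::new(),
        };
        let as_strings = cast_with_options(&input, &DataType::Utf8, &options)?;
        assert_eq!(as_strings.len(), 3);
        Ok(())
    }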
[Diff truncated: the remaining changed files are not shown.]