diff --git a/Cargo.toml b/Cargo.toml index 9c4bf54..7f5d14f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ connectorx = { version = "0.3.3-alpha.1", default-features = false, features = [ "dst_arrow", ] } sqlparser = { git = "https://github.com/holicc/sqlparser.git" } +url = "2.5.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/src/datasource/db/postgres.rs b/src/datasource/db/postgres.rs index ee99a81..9befabb 100644 --- a/src/datasource/db/postgres.rs +++ b/src/datasource/db/postgres.rs @@ -193,7 +193,7 @@ mod tests { DataSource, }, datatypes::scalar::ScalarValue, - logical::expr::{column, eq, BinaryExpr, LogicalExpr}, + logical::expr::{column, eq, LogicalExpr}, }; use std::{sync::Arc, vec}; diff --git a/src/datasource/file/csv.rs b/src/datasource/file/csv.rs index e69de29..8e743df 100644 --- a/src/datasource/file/csv.rs +++ b/src/datasource/file/csv.rs @@ -0,0 +1,95 @@ +use std::fs::File; +use std::io::Seek; +use std::sync::Arc; + +use arrow::array::RecordBatch; +use arrow::csv::reader::Format; +use arrow::csv::{Reader, ReaderBuilder}; +use arrow::datatypes::SchemaRef; + +use crate::datasource::memory::MemoryDataSource; +use crate::datasource::DataSource; +use crate::error::{Error, Result}; +use crate::logical::expr::LogicalExpr; + +use super::DataFilePath; + +pub struct CsvReadOptions { + has_header: bool, + delimiter: u8, + quote: Option<u8>, + escape: Option<u8>, +} + +impl Default for CsvReadOptions { + fn default() -> Self { + Self { + has_header: true, + delimiter: b',', + quote: None, + escape: None, + } + } +} + +pub fn read_csv<T: DataFilePath>(path: T, options: CsvReadOptions) -> Result<Arc<dyn DataSource>> { + let url = path.to_url()?; + + match url.scheme() { + "file" => { + // FIXME this may not ok when csv file is too big to read into memory + let mut file = + File::open(url.path()).map_err(|e| Error::InternalError(e.to_string()))?; + + let mut format = Format::default() + .with_header(options.has_header) + 
.with_delimiter(options.delimiter); + + if let Some(quote) = options.quote { + format = format.with_quote(quote); + } + if let Some(escape) = options.escape { + format = format.with_escape(escape); + } + + // max records set None means we scan every record to infer the schema + // first line is header + // second line is data to infer the data type + let (schema, _) = format + .infer_schema(&mut file, None) + .map_err(|e| Error::ArrowError(e))?; + + // rewind the file to the beginning because the schema inference consumed the reader + file.rewind().unwrap(); + + let schema = Arc::new(schema); + + ReaderBuilder::new(schema.clone()) + .with_format(format) + .build(file) + .and_then(|reader| reader.into_iter().collect()) + .map(|data| Arc::new(MemoryDataSource::new(schema, data)) as Arc<dyn DataSource>) + .map_err(|e| Error::ArrowError(e)) + } + _ => unimplemented!(), + } +} + +#[cfg(test)] +mod tests { + use arrow::util; + + use super::*; + + #[test] + fn test_read_csv() { + let options = CsvReadOptions::default(); + + let source = read_csv("tests/testdata/file/case1.csv", options).unwrap(); + + println!( + "{}", + util::pretty::pretty_format_batches(&source.scan(None, &vec![]).unwrap()).unwrap() + ); + } +} diff --git a/src/datasource/file/mod.rs b/src/datasource/file/mod.rs index e69de29..75aaf41 100644 --- a/src/datasource/file/mod.rs +++ b/src/datasource/file/mod.rs @@ -0,0 +1,66 @@ +pub mod csv; + +use std::{ + fs::{self, File}, + vec, +}; + +use arrow::{ + csv::{reader::BufReader, Reader}, + datatypes::SchemaRef, +}; + +use url::Url; + +use crate::{ + error::{Error, Result}, + logical::expr::LogicalExpr, +}; + +use super::DataSource; + +pub trait DataFilePath { + fn to_url(self) -> Result<Url>; +} + +impl DataFilePath for String { + fn to_url(self) -> Result<Url> { + parse_path(self) + } +} + +impl DataFilePath for &str { + fn to_url(self) -> Result<Url> { + parse_path(self) + } +} + +fn parse_path<S: AsRef<str>>(path: S) -> Result<Url> { + match path.as_ref().parse::<Url>() { + Ok(url) => Ok(url), + 
Err(url::ParseError::RelativeUrlWithoutBase) => fs::canonicalize(path.as_ref()) + .and_then(|absolute| Ok(Url::from_file_path(absolute).unwrap())) + .map_err(|e| { + Error::InternalError(format!( + "file path: {}, err: {}", + path.as_ref(), + e.to_string() + )) + }), + Err(e) => Err(Error::InternalError(e.to_string())), + } +} + +/// FileSource is a data source for reading data from a file +/// different file formats have different Readers +/// all readers come from arrow +#[derive(Debug)] +pub struct FileSource<R> { + schema: SchemaRef, + reader: R, +} + +impl FileSource<BufReader<File>> { + pub fn new(schema: SchemaRef, reader: BufReader<File>) -> Self { + Self { schema, reader } + } +} diff --git a/src/execution/session.rs b/src/execution/session.rs index dd50d67..2137b97 100644 --- a/src/execution/session.rs +++ b/src/execution/session.rs @@ -80,4 +80,6 @@ mod tests { Ok(()) } + + } diff --git a/src/planner/sql.rs b/src/planner/sql.rs index 93dc1d5..8f51d99 100644 --- a/src/planner/sql.rs +++ b/src/planner/sql.rs @@ -8,6 +8,7 @@ use sqlparser::{ use crate::{ common::TableRelation, + datasource::file::csv::{self, CsvReadOptions}, datatypes::scalar::ScalarValue, error::{Error, Result}, execution::registry::{ImmutableHashMapTableRegistry, TableRegistry}, @@ -115,7 +116,7 @@ impl SqlQueryPlanner { Ok((builder.build(), Some(name.into()))) } } - From::TableFunction { name, args, alias } => todo!(), + From::TableFunction { name, args, alias } => self.table_func_to_plan(name, args, alias), From::SubQuery { query, alias } => todo!(), From::Join { left, @@ -126,6 +127,31 @@ impl SqlQueryPlanner { } } + + fn table_func_to_plan( + &self, + name: String, + args: Vec, + alias: Option<String>, + ) -> Result<(LogicalPlan, Option<TableRelation>)> { + match name.to_lowercase().as_str() { + "read_csv" => { + let (path, options) = self.parse_csv_options(args)?; + let table_name = "tmp_csv_table"; + let table_source = csv::read_csv(path, options)?; + let plan = LogicalPlanBuilder::scan(table_name, table_source, None).build(); + + 
Ok((plan, Some(table_name.into()))) + } + "read_json" => todo!(), + + _ => todo!(), + } + } + + fn parse_csv_options(&self, args: Vec) -> Result<(String, CsvReadOptions)> { + todo!() + } + fn apply_table_alias(&self, plan: LogicalPlan, alias: String) -> Result { todo!("apply_table_alias") } @@ -354,6 +380,14 @@ mod tests { } } + #[test] + fn test_table_function() { + quick_test( + "SELECT * FROM read_csv('./test.csv')", + "Projection: (Int64(1))\n Empty Relation\n", + ); + } + #[test] fn test_empty_relation() { quick_test("SELECT 1", "Projection: (Int64(1))\n Empty Relation\n"); diff --git a/tests/testdata/file/case1.csv b/tests/testdata/file/case1.csv index e69de29..f08b203 100644 --- a/tests/testdata/file/case1.csv +++ b/tests/testdata/file/case1.csv @@ -0,0 +1,2 @@ +id,name,localtion +1,BeiJing University,China BeiJing \ No newline at end of file