Skip to content

Commit

Permalink
Add CSV file reader function and test case
Browse files Browse the repository at this point in the history
  • Loading branch information
holicc committed Mar 6, 2024
1 parent ab2f651 commit b22ba4f
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ connectorx = { version = "0.3.3-alpha.1", default-features = false, features = [
"dst_arrow",
] }
sqlparser = { git = "https://github.com/holicc/sqlparser.git" }
url = "2.5.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down
2 changes: 1 addition & 1 deletion src/datasource/db/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ mod tests {
DataSource,
},
datatypes::scalar::ScalarValue,
logical::expr::{column, eq, BinaryExpr, LogicalExpr},
logical::expr::{column, eq, LogicalExpr},
};
use std::{sync::Arc, vec};

Expand Down
95 changes: 95 additions & 0 deletions src/datasource/file/csv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use std::fs::File;
use std::io::Seek;
use std::sync::Arc;

use arrow::array::RecordBatch;
use arrow::csv::reader::Format;
use arrow::csv::{Reader, ReaderBuilder};
use arrow::datatypes::SchemaRef;

use crate::datasource::memory::MemoryDataSource;
use crate::datasource::DataSource;
use crate::error::{Error, Result};
use crate::logical::expr::LogicalExpr;

use super::DataFilePath;

/// Options controlling how `read_csv` parses a CSV file.
///
/// Use [`Default::default`] to obtain the standard configuration
/// (header row present, comma-delimited, no explicit quote/escape byte).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CsvReadOptions {
    /// Forwarded to the arrow CSV format's `with_header` setting.
    has_header: bool,
    /// Field delimiter byte, forwarded to the arrow CSV format's `with_delimiter`.
    delimiter: u8,
    /// Quote byte; when `None`, `read_csv` does not override the format's quote setting.
    quote: Option<u8>,
    /// Escape byte; when `None`, `read_csv` does not override the format's escape setting.
    escape: Option<u8>,
}

impl Default for CsvReadOptions {
fn default() -> Self {
Self {
has_header: true,
delimiter: b',',
quote: None,
escape: None,
}
}
}

pub fn read_csv<T: DataFilePath>(path: T, options: CsvReadOptions) -> Result<Arc<dyn DataSource>> {
let url = path.to_url()?;

match url.scheme() {
"file" => {
// FIXME this may not ok when csv file is too big to read into memory
let mut file =
File::open(url.path()).map_err(|e| Error::InternalError(e.to_string()))?;

let mut format = Format::default()
.with_header(options.has_header)
.with_delimiter(options.delimiter);

if let Some(quote) = options.quote {
format = format.with_quote(quote);
}
if let Some(escape) = options.escape {
format = format.with_escape(escape);
}

// max records set 2 means we only read the first 2 records to infer the schema
// first line is header
// second line is data to infer the data type
let (schema, _) = format
.infer_schema(&mut file, None)
.map_err(|e| Error::ArrowError(e))?;

// rewind the file to the beginning because the schema inference
file.rewind().unwrap();

let schema = Arc::new(schema);

ReaderBuilder::new(schema.clone())
.with_format(format)
.build(file)
.and_then(|reader| reader.into_iter().collect())
.map(|data| Arc::new(MemoryDataSource::new(schema, data)) as Arc<dyn DataSource>)
.map_err(|e| Error::ArrowError(e))
}
_ => unimplemented!(),
}
}

#[cfg(test)]
mod tests {
    use arrow::util;

    use super::*;

    /// Smoke test: loads the sample CSV with default options, scans it and
    /// prints the resulting batches as a table.
    #[test]
    fn test_read_csv() {
        let source = read_csv("tests/testdata/file/case1.csv", CsvReadOptions::default()).unwrap();

        let batches = source.scan(None, &vec![]).unwrap();
        let rendered = util::pretty::pretty_format_batches(&batches).unwrap();

        println!("{}", rendered);
    }
}
66 changes: 66 additions & 0 deletions src/datasource/file/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
pub mod csv;

use std::{
fs::{self, File},
vec,
};

use arrow::{
csv::{reader::BufReader, Reader},
datatypes::SchemaRef,
};
use url::Url;

use crate::{
error::{Error, Result},
logical::expr::LogicalExpr,
};

use super::DataSource;

/// A value that can be converted into a [`Url`] locating a data file.
///
/// Implemented in this module for `String` and `&str`.
pub trait DataFilePath {
/// Consumes `self` and returns the corresponding URL, or an error if none can be built.
fn to_url(self) -> Result<Url>;
}

/// Allows an owned `String` to be used as a data file location.
impl DataFilePath for String {
/// Delegates to `parse_path`, which accepts either a URL or a filesystem path.
fn to_url(self) -> Result<Url> {
parse_path(self)
}
}

/// Allows a borrowed string slice to be used as a data file location.
impl DataFilePath for &str {
/// Delegates to `parse_path`, which accepts either a URL or a filesystem path.
fn to_url(self) -> Result<Url> {
parse_path(self)
}
}

fn parse_path<S: AsRef<str>>(path: S) -> Result<Url> {
match path.as_ref().parse::<Url>() {
Ok(url) => Ok(url),
Err(url::ParseError::RelativeUrlWithoutBase) => fs::canonicalize(path.as_ref())
.and_then(|absolute| Ok(Url::from_file_path(absolute).unwrap()))
.map_err(|e| {
Error::InternalError(format!(
"file path: {}, err: {}",
path.as_ref(),
e.to_string()
))
}),
Err(e) => Err(Error::InternalError(e.to_string())),
}
}

/// `FileSource` is a data source that reads its data from a file.
///
/// Different file formats use different reader types, hence the generic
/// parameter `R`; the readers are intended to come from the arrow crates.
#[derive(Debug)]
pub struct FileSource<R> {
// Schema associated with the data produced by `reader`.
schema: SchemaRef,
// Format-specific reader the data is pulled from.
reader: R,
}

/// Constructor for file sources backed by a `BufReader<File>` reader.
impl FileSource<BufReader<File>> {
/// Creates a `FileSource` that stores the given `schema` and `reader` as-is;
/// nothing is read from the reader here.
pub fn new(schema: SchemaRef, reader: BufReader<File>) -> Self {
Self { schema, reader }
}
}
2 changes: 2 additions & 0 deletions src/execution/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,6 @@ mod tests {

Ok(())
}


}
36 changes: 35 additions & 1 deletion src/planner/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use sqlparser::{

use crate::{
common::TableRelation,
datasource::file::csv::{self, CsvReadOptions},
datatypes::scalar::ScalarValue,
error::{Error, Result},
execution::registry::{ImmutableHashMapTableRegistry, TableRegistry},
Expand Down Expand Up @@ -115,7 +116,7 @@ impl SqlQueryPlanner {
Ok((builder.build(), Some(name.into())))
}
}
From::TableFunction { name, args, alias } => todo!(),
From::TableFunction { name, args, alias } => self.table_func_to_plan(name, args, alias),
From::SubQuery { query, alias } => todo!(),
From::Join {
left,
Expand All @@ -126,6 +127,31 @@ impl SqlQueryPlanner {
}
}

fn table_func_to_plan(
&self,
name: String,
args: Vec<Expression>,
alias: Option<String>,
) -> Result<(LogicalPlan, Option<TableRelation>)> {
match name.to_lowercase().as_str() {
"read_csv" => {
let (path, options) = self.parse_csv_options(args)?;
let table_name = "tmp_csv_table";
let table_srouce = csv::read_csv(path, options)?;
let plan = LogicalPlanBuilder::scan(table_name, table_srouce, None).build();

Ok((plan, Some(table_name.into())))
}
"read_json" => todo!(),

_ => todo!(),
}
}

/// Stub: not implemented yet, so calling this always panics via `todo!()`.
///
/// Judging by the signature, this is meant to turn the table-function
/// arguments into a file path plus `CsvReadOptions` — TODO confirm once implemented.
fn parse_csv_options(&self, args: Vec<Expression>) -> Result<(String, CsvReadOptions)> {
todo!()
}

/// Stub: not implemented yet, so calling this always panics via `todo!()`.
///
/// Presumably intended to return `plan` with `alias` applied as its table
/// name — TODO confirm once implemented.
fn apply_table_alias(&self, plan: LogicalPlan, alias: String) -> Result<LogicalPlan> {
todo!("apply_table_alias")
}
Expand Down Expand Up @@ -354,6 +380,14 @@ mod tests {
}
}

// Checks planning of a `read_csv(...)` table function in the FROM clause.
// NOTE(review): the expected plan below is identical to the one asserted by
// `test_empty_relation`, which does not look right for a CSV scan — confirm.
// NOTE(review): `parse_csv_options` is still `todo!()`, so this presumably
// panics if `quick_test` runs the planner on the query — TODO confirm.
#[test]
fn test_table_function() {
quick_test(
"SELECT * FROM read_csv('./test.csv')",
"Projection: (Int64(1))\n Empty Relation\n",
);
}

#[test]
fn test_empty_relation() {
quick_test("SELECT 1", "Projection: (Int64(1))\n Empty Relation\n");
Expand Down
2 changes: 2 additions & 0 deletions tests/testdata/file/case1.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
id,name,location
1,BeiJing University,China BeiJing

0 comments on commit b22ba4f

Please sign in to comment.