Skip to content

Commit

Permalink
WIP: 🤔 Maybe i have to stop here
Browse files Browse the repository at this point in the history
  • Loading branch information
holicc committed Dec 3, 2024
1 parent 2a331a7 commit ca64346
Show file tree
Hide file tree
Showing 11 changed files with 397 additions and 348 deletions.
120 changes: 117 additions & 3 deletions qurious/src/common/table_schema.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,125 @@
use std::sync::Arc;
use std::{collections::HashSet, fmt::Display, sync::Arc};

use super::table_relation::TableRelation;
use arrow::datatypes::SchemaRef;
use crate::{
error::{Error, Result},
internal_err,
logical::expr::Column,
};
use arrow::datatypes::{Schema, SchemaRef};

pub type TableSchemaRef = Arc<TableSchema>;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableSchema {
pub schema: SchemaRef,
pub qualified_name: Vec<Option<TableRelation>>,
pub field_qualifiers: Vec<Option<TableRelation>>,
}

impl TableSchema {
pub fn try_from_qualified_schema(relation: impl Into<TableRelation>, schema: SchemaRef) -> Result<Self> {
Ok(Self {
field_qualifiers: vec![Some(relation.into()); schema.fields().len()],
schema,
})
}

pub fn empty() -> Self {
Self {
schema: Arc::new(Schema::empty()),
field_qualifiers: vec![],
}
}

pub fn arrow_schema(&self) -> SchemaRef {
self.schema.clone()
}

pub fn has_field(&self, qualifier: Option<TableRelation>, name: &str) -> bool {
match (self.schema.index_of(name).ok(), qualifier) {
(Some(i), Some(q)) => self.field_qualifiers[i] == Some(q),
(Some(_), None) => true,
_ => false,
}
}

pub fn columns(&self) -> Vec<Column> {
self.schema
.fields()
.iter()
.zip(self.field_qualifiers.iter())
.map(|(f, q)| Column::new(f.name(), q.clone()))
.collect()
}
}

impl TableSchema {
pub fn merge(schemas: Vec<TableSchemaRef>) -> Result<Self> {
let fields = schemas
.into_iter()
.map(|s| {
let s = Arc::unwrap_or_clone(s);
let fields = s.schema.fields().iter().map(|f| f.as_ref().clone());
let field_qualifiers = s.field_qualifiers.into_iter();
fields.zip(field_qualifiers).collect::<Vec<_>>()
})
.flatten()
.collect::<Vec<_>>();

// check if the number of fields and qualifiers are the same
let mut new_fields = HashSet::new();
for (f, q) in &fields {
if !new_fields.insert((f, q)) {
return internal_err!(
"Try merge schema failed, column [{}] is ambiguous, please use qualified name to disambiguate",
f.name()
);
}
}

let (fields, field_qualifiers): (Vec<_>, Vec<_>) = fields.into_iter().unzip();

Ok(TableSchema {
schema: Arc::new(Schema::new(fields)),
field_qualifiers,
})
}
}

impl Display for TableSchema {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
self.schema
.fields()
.iter()
.zip(self.field_qualifiers.iter())
.map(|(f, q)| qualified_name(q, &f.name()))
.collect::<Vec<_>>()
.join(", ")
)
}
}

impl From<SchemaRef> for TableSchema {
fn from(value: SchemaRef) -> Self {
TableSchema {
schema: value,
field_qualifiers: vec![],
}
}
}

impl From<Schema> for TableSchema {
fn from(value: Schema) -> Self {
TableSchema::from(Arc::new(value))
}
}

pub fn qualified_name(qualifier: &Option<TableRelation>, name: &str) -> String {
match qualifier {
Some(q) => format!("{}.{}", q, name),
None => name.to_string(),
}
}
26 changes: 11 additions & 15 deletions qurious/src/logical/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ use super::{
expr::{LogicalExpr, SortExpr},
plan::{Aggregate, CrossJoin, EmptyRelation, Filter, Join, Limit, LogicalPlan, Projection, Sort, TableScan},
};
use crate::{common::join_type::JoinType, provider::table::TableProvider};
use crate::{common::table_relation::TableRelation, error::Result};
use crate::{
common::{join_type::JoinType, table_schema::TableSchema},
provider::table::TableProvider,
};

pub struct LogicalPlanBuilder {
plan: LogicalPlan,
Expand Down Expand Up @@ -56,25 +59,18 @@ impl LogicalPlanBuilder {
table_source: Arc<dyn TableProvider>,
filter: Option<LogicalExpr>,
) -> Result<Self> {
TableScan::try_new(relation.into(), table_source, None, filter)
TableScan::try_new(relation.into(), table_source, filter)
.map(|s| LogicalPlanBuilder::from(LogicalPlan::TableScan(s)))
}

pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
let left_fields = self.plan.schema().fields.clone();
let right_fields = right.schema().fields.clone();

// left then right
let schema = Schema::new(
left_fields
.iter()
.chain(right_fields.iter())
.cloned()
.collect::<Vec<_>>(),
);

let schema = TableSchema::merge(vec![self.plan.table_schema(), right.table_schema()])?;
Ok(LogicalPlanBuilder {
plan: LogicalPlan::CrossJoin(CrossJoin::new(Arc::new(self.plan), Arc::new(right), Arc::new(schema))),
plan: LogicalPlan::CrossJoin(CrossJoin {
left: Arc::new(self.plan),
right: Arc::new(right),
schema: Arc::new(schema),
}),
})
}

Expand Down
10 changes: 3 additions & 7 deletions qurious/src/logical/plan/join.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
common::join_type::JoinType,
common::{join_type::JoinType, table_schema::TableSchemaRef},
logical::{expr::LogicalExpr, plan::LogicalPlan},
};
use arrow::datatypes::SchemaRef;
Expand All @@ -9,16 +9,12 @@ use std::{fmt::Display, sync::Arc};
pub struct CrossJoin {
pub left: Arc<LogicalPlan>,
pub right: Arc<LogicalPlan>,
pub schema: SchemaRef,
pub schema: TableSchemaRef,
}

impl CrossJoin {
pub fn new(left: Arc<LogicalPlan>, right: Arc<LogicalPlan>, schema: SchemaRef) -> Self {
Self { left, right, schema }
}

pub fn schema(&self) -> SchemaRef {
self.schema.clone()
self.schema.arrow_schema()
}

pub fn children(&self) -> Option<Vec<&LogicalPlan>> {
Expand Down
14 changes: 13 additions & 1 deletion qurious/src/logical/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use arrow::datatypes::SchemaRef;

use super::expr::LogicalExpr;
use crate::common::table_relation::TableRelation;
use crate::common::table_schema::TableSchemaRef;
use crate::common::transformed::{TransformNode, Transformed, TransformedResult};
use crate::error::Result;

Expand Down Expand Up @@ -71,12 +72,13 @@ pub enum LogicalPlan {
impl LogicalPlan {
pub fn relation(&self) -> Option<TableRelation> {
match self {
LogicalPlan::TableScan(s) => Some(s.relation.clone()),
LogicalPlan::TableScan(s) => Some(s.table_name.clone()),
LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => Some(alias.clone()),
_ => None,
}
}

// FIXME: remove this method when table schema is implemented
pub fn schema(&self) -> SchemaRef {
match self {
LogicalPlan::Projection(p) => p.schema(),
Expand All @@ -95,6 +97,16 @@ impl LogicalPlan {
}
}

pub fn table_schema(&self) -> TableSchemaRef {
match self {
LogicalPlan::TableScan(s) => s.schema.clone(),
LogicalPlan::CrossJoin(s) => s.schema.clone(),
LogicalPlan::SubqueryAlias(s) => s.schema.clone(),
LogicalPlan::Filter(f) => f.input.table_schema(),
_ => todo!("[{}] not implement table_schema", self),
}
}

pub fn children(&self) -> Option<Vec<&LogicalPlan>> {
match self {
LogicalPlan::EmptyRelation(_) | LogicalPlan::Values(_) => None,
Expand Down
84 changes: 20 additions & 64 deletions qurious/src/logical/plan/scan.rs
Original file line number Diff line number Diff line change
@@ -1,113 +1,69 @@
use std::{fmt::Display, sync::Arc, hash::{Hash, Hasher}};
use std::{
fmt::Display,
hash::{Hash, Hasher},
sync::Arc,
};

use arrow::datatypes::{Schema, SchemaRef};
use arrow::datatypes::SchemaRef;

use crate::arrow_err;
use crate::common::table_relation::TableRelation;
use crate::error::{Error, Result};
use crate::common::table_schema::TableSchemaRef;
use crate::common::{table_relation::TableRelation, table_schema::TableSchema};
use crate::error::Result;
use crate::logical::expr::LogicalExpr;
use crate::provider::table::TableProvider;

use super::LogicalPlan;

#[derive(Debug, Clone)]
pub struct TableScan {
pub relation: TableRelation,
pub table_name: TableRelation,
pub source: Arc<dyn TableProvider>,
pub projections: Option<Vec<String>>,
pub projected_schema: SchemaRef,
pub filter: Option<LogicalExpr>,
pub schema: TableSchemaRef,
}

impl TableScan {
pub fn try_new(
relation: impl Into<TableRelation>,
source: Arc<dyn TableProvider>,
projections: Option<Vec<String>>,
filter: Option<LogicalExpr>,
) -> Result<Self> {
let relation = relation.into();

let projected_schema = projections
.as_ref()
.map(|pj| {
pj.iter()
.map(|name| {
source
.schema()
.field_with_name(name)
.map_err(|err| arrow_err!(err))
.cloned()
})
.collect::<Result<Vec<_>>>()
})
.unwrap_or(Ok(source
.schema()
.fields()
.iter()
.map(|f| f.as_ref().clone())
.collect()))
.map(|fields| Arc::new(Schema::new(fields)))?;

let table_name = relation.into();
Ok(Self {
relation,
source,
projections,
projected_schema,
filter,
schema: TableSchema::try_from_qualified_schema(table_name.clone(), source.schema()).map(Arc::new)?,
source,
table_name,
})
}

pub fn schema(&self) -> SchemaRef {
self.projected_schema.clone()
self.schema.arrow_schema()
}

pub fn children(&self) -> Option<Vec<&LogicalPlan>> {
None
}

pub fn set_metadata(&mut self, k: &str, v: &str) {
let schema = self.projected_schema.as_ref().clone();
let mut metadata = schema.metadata;
metadata.insert(k.to_owned(), v.to_owned());

self.projected_schema = Arc::new(Schema::new_with_metadata(schema.fields, metadata));
}
}

impl Display for TableScan {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(projections) = &self.projections {
write!(
f,
"TableScan: {} Projection: [{}]",
self.relation.to_quanlify_name(),
projections.join(", ")
)
} else {
write!(f, "TableScan: {}", self.relation.to_quanlify_name())
}
write!(f, "TableScan: {}", self.table_name.to_quanlify_name(),)
}
}

impl PartialEq for TableScan {
fn eq(&self, other: &Self) -> bool {
self.relation == other.relation
&& Arc::ptr_eq(&self.source, &other.source)
&& self.projections == other.projections
&& Arc::ptr_eq(&self.projected_schema, &other.projected_schema)
&& self.filter == other.filter
self.table_name == other.table_name && Arc::ptr_eq(&self.source, &other.source)
}
}

impl Eq for TableScan {}

impl Hash for TableScan {
fn hash<H: Hasher>(&self, state: &mut H) {
self.relation.hash(state);
Arc::as_ptr(&self.source).hash(state);
self.projections.hash(state);
Arc::as_ptr(&self.projected_schema).hash(state);
self.table_name.hash(state);
self.schema.hash(state);
self.filter.hash(state);
}
}
Loading

0 comments on commit ca64346

Please sign in to comment.