Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

assign unique id to each logical plan node #136

Merged
merged 1 commit into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions xngin-expr/src/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ impl From<u32> for GlobalID {
}
}

pub const INVALID_GLOBAL_ID: GlobalID = GlobalID(0);

/// QueryID wraps u32 to be the identifier of subqueries in single query.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct QueryID(u32);
Expand Down
71 changes: 31 additions & 40 deletions xngin-plan/src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,12 @@ impl Explain for Op {
fn explain<F: Write>(&self, f: &mut F, conf: &ExplainConf) -> fmt::Result {
match &self.kind {
OpKind::Proj { cols, .. } => {
if let OpOutput::Ref(cols) = &self.output {
f.write_str("Proj{")?;
write_refs(f, cols.iter().map(|c| &c.expr), ", ", conf)?;
f.write_str("}")
} else if let Some(cols) = cols {
if let Some(cols) = cols {
f.write_str("Proj{")?;
write_refs(f, cols.iter().map(|c| &c.expr), ", ", conf)?;
f.write_str("}")
} else {
return Err(fmt::Error);
Err(fmt::Error)
}
}
OpKind::Filt { pred, .. } => {
Expand Down Expand Up @@ -117,20 +113,16 @@ impl Explain for Op {
f.write_char('}')
}
OpKind::Row(row) => {
if let OpOutput::Ref(row) = &self.output {
f.write_str("Row{")?;
write_refs(f, row.iter().map(|c| &c.expr), ", ", conf)?;
f.write_char('}')
} else if let Some(row) = row {
if let Some(row) = row {
f.write_str("Row{")?;
write_refs(f, row.iter().map(|c| &c.expr), ", ", conf)?;
f.write_char('}')
} else {
return Err(fmt::Error);
Err(fmt::Error)
}
}
OpKind::Scan(scan) => {
write!(f, "Table{{id={},cols=[", scan.table.value())?;
write!(f, "Table{{name={},cols=[", scan.table)?;
write_refs(f, scan.cols.iter().map(|c| &c.expr), ", ", conf)?;
f.write_str("]}}")
}
Expand Down Expand Up @@ -360,47 +352,46 @@ impl Explain for Const {
impl Explain for Col {
fn explain<F: Write>(&self, f: &mut F, conf: &ExplainConf) -> fmt::Result {
match &self.kind {
ColKind::Table(table_id, alias, _) => {
ColKind::Table(_, alias, _) => {
if conf.show_col_name {
write!(f, "{}#{}", alias, self.gid.value())
} else {
write!(f, "#{}", self.gid.value())
}
}
ColKind::Query(query_id) => {
if conf.show_col_name {
write!(
f,
"t{}.{}[{}]#{}",
table_id.value(),
alias,
"q{}[{}]#{}",
query_id.value(),
self.idx.value(),
self.gid.value()
)
} else {
write!(f, "#{}", self.gid.value())
}
}
ColKind::Correlated(query_id) => {
if conf.show_col_name {
write!(
f,
"t{}[{}]#{}",
table_id.value(),
"cq{}[{}]#{}",
query_id.value(),
self.idx.value(),
self.gid.value()
)
} else {
write!(f, "c#{}", self.gid.value())
}
}
ColKind::Intra(_) => {
if conf.show_col_name {
write!(f, "i[{}]#{}", self.idx.value(), self.gid.value())
} else {
write!(f, "i#{}", self.gid.value())
}
}
ColKind::Query(query_id) => write!(
f,
"q{}[{}]#{}",
query_id.value(),
self.idx.value(),
self.gid.value()
),
ColKind::Correlated(query_id) => write!(
f,
"cq{}[{}]#{}",
query_id.value(),
self.idx.value(),
self.gid.value()
),
ColKind::Intra(child_id) => write!(
f,
"i{}[{}]#{}",
child_id,
self.idx.value(),
self.gid.value()
),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion xngin-plan/src/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::error::{Error, Result};
use crate::lgc::{Op, OpKind};
use std::collections::HashSet;
use std::ops::{Deref, DerefMut};
use xngin_expr::{ExprKind, QueryID};
use xngin_expr::{ExprKind, GlobalID, QueryID};

// alias of join graph
pub use graph::Graph as JoinGraph;
Expand Down
57 changes: 40 additions & 17 deletions xngin-plan/src/lgc/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ use semistr::SemiStr;
use xngin_catalog::{Catalog, SchemaID, TableID};
use xngin_expr::controlflow::ControlFlow;
use xngin_expr::{
self as expr, ColIndex, ExprKind, ExprMutVisitor, Plhd, PredFuncKind, QueryID, Setq, SubqKind,
self as expr, ColIndex, ExprKind, ExprMutVisitor, GlobalID, Plhd, PredFuncKind, QueryID, Setq,
SubqKind,
};
use xngin_sql::ast::*;

pub struct LgcBuilder<'a, C: Catalog> {
catalog: &'a C,
default_schema: SchemaID,
default_schema_id: SchemaID,
default_schema: SemiStr,
qs: QuerySet,
scopes: Scopes,
attaches: Vec<QueryID>,
Expand All @@ -29,14 +31,15 @@ impl<'c, C: Catalog> LgcBuilder<'c, C> {
#[inline]
pub(super) fn new(catalog: &'c C, default_schema: &str) -> Result<Self> {
let qs = QuerySet::default();
let default_schema = if let Some(s) = catalog.find_schema_by_name(default_schema) {
let default_schema_id = if let Some(s) = catalog.find_schema_by_name(default_schema) {
s.id
} else {
return Err(Error::SchemaNotExists(default_schema.to_string()));
};
Ok(LgcBuilder {
catalog,
default_schema,
default_schema_id,
default_schema: SemiStr::new(default_schema),
qs,
scopes: Scopes::default(),
attaches: vec![],
Expand Down Expand Up @@ -338,7 +341,7 @@ impl<'c, C: Catalog> LgcBuilder<'c, C> {
}
}
}
Ok((Op::new(OpKind::Row(Some(cols))), Location::Virtual))
Ok((self.gen_op(OpKind::Row(Some(cols))), Location::Virtual))
}
Query::Table(select_table) => self.setup_select_table(select_table, phc, colgen),
Query::Set(select_set) => self.setup_select_set(select_set, phc, colgen),
Expand Down Expand Up @@ -457,37 +460,37 @@ impl<'c, C: Catalog> LgcBuilder<'c, C> {
let mut root = if from.len() == 1 {
from.into_iter().next().unwrap().into()
} else {
Op::new(OpKind::cross_join(from))
self.gen_op(OpKind::cross_join(from))
};
// b) build Filt operator
if let Some(pred) = filter {
root = Op::new(OpKind::filt(pred.into_conj(), root));
root = self.gen_op(OpKind::filt(pred.into_conj(), root));
}
// c) build Aggr operator
if !groups.is_empty() || scalar_aggr {
root = Op::new(OpKind::aggr(groups, root));
root = self.gen_op(OpKind::aggr(groups, root));
}
// d) build Proj operator or merge into Aggr
match &mut root.kind {
OpKind::Aggr(aggr) => {
aggr.proj = proj_cols;
}
_ => {
root = Op::new(OpKind::proj(proj_cols, root));
root = self.gen_op(OpKind::proj(proj_cols, root));
}
}
// e) build Filter operator from HAVING clause
if let Some(pred) = having {
root = Op::new(OpKind::filt(pred.into_conj(), root));
root = self.gen_op(OpKind::filt(pred.into_conj(), root));
}
// f) build Sort operator
if !order.is_empty() {
// scalar aggr query only returns 1 row, ingore SORT operation
root = Op::new(OpKind::sort(order, root));
root = self.gen_op(OpKind::sort(order, root));
}
// g) build Limit operator
if let Some((start, end)) = limit {
root = Op::new(OpKind::limit(start, end, root));
root = self.gen_op(OpKind::limit(start, end, root));
}
Ok((root, Location::Intermediate))
}
Expand Down Expand Up @@ -517,7 +520,7 @@ impl<'c, C: Catalog> LgcBuilder<'c, C> {
self.build_subquery(&None, &select_set.right, phc.allow_unknown_ident, colgen)?;
let right = SubqOp::query(query_id);
Ok((
Op::new(OpKind::setop(kind, q, left, right)),
self.gen_op(OpKind::setop(kind, q, left, right)),
Location::Intermediate,
))
}
Expand Down Expand Up @@ -838,7 +841,7 @@ impl<'c, C: Catalog> LgcBuilder<'c, C> {
)));
};
// now manually construct a subquery to expose all columns in the table
self.table_to_subquery(schema_id, table_id, colgen)?
self.table_to_subquery(schema_id, schema_name, table_id, tbl_name, colgen)?
} else if let Some(query_id) = self.find_cte(&tbl_name) {
// schema not present, first check CTEs
let query_id = *query_id;
Expand All @@ -848,9 +851,15 @@ impl<'c, C: Catalog> LgcBuilder<'c, C> {
self.qs.copy_query(&query_id)?
} else if let Some(t) = self
.catalog
.find_table_by_name(&self.default_schema, &tbl_name)
.find_table_by_name(&self.default_schema_id, &tbl_name)
{
self.table_to_subquery(t.schema_id, t.id, colgen)?
self.table_to_subquery(
t.schema_id,
self.default_schema.clone(),
t.id,
t.name,
colgen,
)?
} else {
return Err(Error::TableNotExists(tbl_name.to_string()));
};
Expand Down Expand Up @@ -904,7 +913,9 @@ impl<'c, C: Catalog> LgcBuilder<'c, C> {
fn table_to_subquery(
&mut self,
schema_id: SchemaID,
schema_name: SemiStr,
table_id: TableID,
table_name: SemiStr,
colgen: &mut ColGen,
) -> Result<QueryID> {
let all_cols = self.catalog.all_columns_in_table(&table_id);
Expand All @@ -916,7 +927,14 @@ impl<'c, C: Catalog> LgcBuilder<'c, C> {
let col = colgen.gen_tbl_col(qry_id, table_id, idx, c.pty, c.name.clone());
proj_cols.push(ProjCol::implicit_alias(col, c.name))
}
let scan = OpKind::table(qry_id, schema_id, table_id, proj_cols);
let scan = OpKind::table(
qry_id,
schema_id,
schema_name,
table_id,
table_name,
proj_cols,
);
subquery.root = Op::new(scan);
// todo: currently we assume all tables are located on disk.
subquery.location = Location::Disk;
Expand All @@ -931,6 +949,11 @@ impl<'c, C: Catalog> LgcBuilder<'c, C> {
.rev()
.find_map(|s| s.cte_aliases.get(alias))
}

#[inline]
fn gen_op(&mut self, kind: OpKind) -> Op {
Op::new(kind)
}
}

/// Validate proj and aggr.
Expand Down
5 changes: 3 additions & 2 deletions xngin-plan/src/lgc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ pub(crate) mod tests {
#[inline]
fn enter(&mut self, op: &Op) -> ControlFlow<TableID> {
match &op.kind {
OpKind::Scan(scan) => ControlFlow::Break(scan.table),
OpKind::Scan(scan) => ControlFlow::Break(scan.table_id),
_ => ControlFlow::Continue(()),
}
}
Expand Down Expand Up @@ -935,7 +935,8 @@ pub(crate) mod tests {
use crate::explain::{Explain, ExplainConf};
println!("SQL: {}", sql);
let mut s = String::new();
plan.explain(&mut s, &ExplainConf::default()).unwrap();
let conf = ExplainConf::default();
plan.explain(&mut s, &conf).unwrap();
println!("Plan:\n{}", s)
}
}
Loading
Loading