Skip to content

Commit

Permalink
assign unique id to each logical plan node
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangzhe committed Apr 16, 2024
1 parent bbabfc2 commit 775ff90
Show file tree
Hide file tree
Showing 15 changed files with 269 additions and 115 deletions.
2 changes: 2 additions & 0 deletions xngin-expr/src/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ impl From<u32> for GlobalID {
}
}

/// Sentinel [`GlobalID`] wrapping the raw value `0`.
///
/// NOTE(review): the name implies that `0` is never handed out as a real
/// identifier, so this constant can mark "not yet assigned" — confirm that
/// every id generator starts numbering above zero.
pub const INVALID_GLOBAL_ID: GlobalID = GlobalID(0);

/// `QueryID` wraps a `u32` and serves as the identifier of a subquery
/// within a single query.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct QueryID(u32);
Expand Down
71 changes: 31 additions & 40 deletions xngin-plan/src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,12 @@ impl Explain for Op {
fn explain<F: Write>(&self, f: &mut F, conf: &ExplainConf) -> fmt::Result {
match &self.kind {
OpKind::Proj { cols, .. } => {
if let OpOutput::Ref(cols) = &self.output {
f.write_str("Proj{")?;
write_refs(f, cols.iter().map(|c| &c.expr), ", ", conf)?;
f.write_str("}")
} else if let Some(cols) = cols {
if let Some(cols) = cols {
f.write_str("Proj{")?;
write_refs(f, cols.iter().map(|c| &c.expr), ", ", conf)?;
f.write_str("}")
} else {
return Err(fmt::Error);
Err(fmt::Error)
}
}
OpKind::Filt { pred, .. } => {
Expand Down Expand Up @@ -117,20 +113,16 @@ impl Explain for Op {
f.write_char('}')
}
OpKind::Row(row) => {
if let OpOutput::Ref(row) = &self.output {
f.write_str("Row{")?;
write_refs(f, row.iter().map(|c| &c.expr), ", ", conf)?;
f.write_char('}')
} else if let Some(row) = row {
if let Some(row) = row {
f.write_str("Row{")?;
write_refs(f, row.iter().map(|c| &c.expr), ", ", conf)?;
f.write_char('}')
} else {
return Err(fmt::Error);
Err(fmt::Error)
}
}
OpKind::Scan(scan) => {
write!(f, "Table{{id={},cols=[", scan.table.value())?;
write!(f, "Table{{name={},cols=[", scan.table)?;
write_refs(f, scan.cols.iter().map(|c| &c.expr), ", ", conf)?;
f.write_str("]}}")
}
Expand Down Expand Up @@ -360,47 +352,46 @@ impl Explain for Const {
impl Explain for Col {
fn explain<F: Write>(&self, f: &mut F, conf: &ExplainConf) -> fmt::Result {
match &self.kind {
ColKind::Table(table_id, alias, _) => {
ColKind::Table(_, alias, _) => {
if conf.show_col_name {
write!(f, "{}#{}", alias, self.gid.value())
} else {
write!(f, "#{}", self.gid.value())
}
}
ColKind::Query(query_id) => {
if conf.show_col_name {
write!(
f,
"t{}.{}[{}]#{}",
table_id.value(),
alias,
"q{}[{}]#{}",
query_id.value(),
self.idx.value(),
self.gid.value()
)
} else {
write!(f, "#{}", self.gid.value())
}
}
ColKind::Correlated(query_id) => {
if conf.show_col_name {
write!(
f,
"t{}[{}]#{}",
table_id.value(),
"cq{}[{}]#{}",
query_id.value(),
self.idx.value(),
self.gid.value()
)
} else {
write!(f, "c#{}", self.gid.value())
}
}
ColKind::Intra(_) => {
if conf.show_col_name {
write!(f, "i[{}]#{}", self.idx.value(), self.gid.value())
} else {
write!(f, "i#{}", self.gid.value())
}
}
ColKind::Query(query_id) => write!(
f,
"q{}[{}]#{}",
query_id.value(),
self.idx.value(),
self.gid.value()
),
ColKind::Correlated(query_id) => write!(
f,
"cq{}[{}]#{}",
query_id.value(),
self.idx.value(),
self.gid.value()
),
ColKind::Intra(child_id) => write!(
f,
"i{}[{}]#{}",
child_id,
self.idx.value(),
self.gid.value()
),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion xngin-plan/src/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::error::{Error, Result};
use crate::lgc::{Op, OpKind};
use std::collections::HashSet;
use std::ops::{Deref, DerefMut};
use xngin_expr::{ExprKind, QueryID};
use xngin_expr::{ExprKind, GlobalID, QueryID};

// alias of join graph
pub use graph::Graph as JoinGraph;
Expand Down
57 changes: 40 additions & 17 deletions xngin-plan/src/lgc/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ use semistr::SemiStr;
use xngin_catalog::{Catalog, SchemaID, TableID};
use xngin_expr::controlflow::ControlFlow;
use xngin_expr::{
self as expr, ColIndex, ExprKind, ExprMutVisitor, Plhd, PredFuncKind, QueryID, Setq, SubqKind,
self as expr, ColIndex, ExprKind, ExprMutVisitor, GlobalID, Plhd, PredFuncKind, QueryID, Setq,
SubqKind,
};
use xngin_sql::ast::*;

pub struct LgcBuilder<'a, C: Catalog> {
catalog: &'a C,
default_schema: SchemaID,
default_schema_id: SchemaID,
default_schema: SemiStr,
qs: QuerySet,
scopes: Scopes,
attaches: Vec<QueryID>,
Expand All @@ -29,14 +31,15 @@ impl<'c, C: Catalog> LgcBuilder<'c, C> {
#[inline]
pub(super) fn new(catalog: &'c C, default_schema: &str) -> Result<Self> {
let qs = QuerySet::default();
let default_schema = if let Some(s) = catalog.find_schema_by_name(default_schema) {
let default_schema_id = if let Some(s) = catalog.find_schema_by_name(default_schema) {
s.id
} else {
return Err(Error::SchemaNotExists(default_schema.to_string()));
};
Ok(LgcBuilder {
catalog,
default_schema,
default_schema_id,
default_schema: SemiStr::new(default_schema),
qs,
scopes: Scopes::default(),
attaches: vec![],
Expand Down Expand Up @@ -338,7 +341,7 @@ impl<'c, C: Catalog> LgcBuilder<'c, C> {
}
}
}
Ok((Op::new(OpKind::Row(Some(cols))), Location::Virtual))
Ok((self.gen_op(OpKind::Row(Some(cols))), Location::Virtual))
}
Query::Table(select_table) => self.setup_select_table(select_table, phc, colgen),
Query::Set(select_set) => self.setup_select_set(select_set, phc, colgen),
Expand Down Expand Up @@ -457,37 +460,37 @@ impl<'c, C: Catalog> LgcBuilder<'c, C> {
let mut root = if from.len() == 1 {
from.into_iter().next().unwrap().into()
} else {
Op::new(OpKind::cross_join(from))
self.gen_op(OpKind::cross_join(from))
};
// b) build Filt operator
if let Some(pred) = filter {
root = Op::new(OpKind::filt(pred.into_conj(), root));
root = self.gen_op(OpKind::filt(pred.into_conj(), root));
}
// c) build Aggr operator
if !groups.is_empty() || scalar_aggr {
root = Op::new(OpKind::aggr(groups, root));
root = self.gen_op(OpKind::aggr(groups, root));
}
// d) build Proj operator or merge into Aggr
match &mut root.kind {
OpKind::Aggr(aggr) => {
aggr.proj = proj_cols;
}
_ => {
root = Op::new(OpKind::proj(proj_cols, root));
root = self.gen_op(OpKind::proj(proj_cols, root));
}
}
// e) build Filter operator from HAVING clause
if let Some(pred) = having {
root = Op::new(OpKind::filt(pred.into_conj(), root));
root = self.gen_op(OpKind::filt(pred.into_conj(), root));
}
// f) build Sort operator
if !order.is_empty() {
// scalar aggr query only returns 1 row, ignore SORT operation
root = Op::new(OpKind::sort(order, root));
root = self.gen_op(OpKind::sort(order, root));
}
// g) build Limit operator
if let Some((start, end)) = limit {
root = Op::new(OpKind::limit(start, end, root));
root = self.gen_op(OpKind::limit(start, end, root));
}
Ok((root, Location::Intermediate))
}
Expand Down Expand Up @@ -517,7 +520,7 @@ impl<'c, C: Catalog> LgcBuilder<'c, C> {
self.build_subquery(&None, &select_set.right, phc.allow_unknown_ident, colgen)?;
let right = SubqOp::query(query_id);
Ok((
Op::new(OpKind::setop(kind, q, left, right)),
self.gen_op(OpKind::setop(kind, q, left, right)),
Location::Intermediate,
))
}
Expand Down Expand Up @@ -838,7 +841,7 @@ impl<'c, C: Catalog> LgcBuilder<'c, C> {
)));
};
// now manually construct a subquery to expose all columns in the table
self.table_to_subquery(schema_id, table_id, colgen)?
self.table_to_subquery(schema_id, schema_name, table_id, tbl_name, colgen)?
} else if let Some(query_id) = self.find_cte(&tbl_name) {
// schema not present, first check CTEs
let query_id = *query_id;
Expand All @@ -848,9 +851,15 @@ impl<'c, C: Catalog> LgcBuilder<'c, C> {
self.qs.copy_query(&query_id)?
} else if let Some(t) = self
.catalog
.find_table_by_name(&self.default_schema, &tbl_name)
.find_table_by_name(&self.default_schema_id, &tbl_name)
{
self.table_to_subquery(t.schema_id, t.id, colgen)?
self.table_to_subquery(
t.schema_id,
self.default_schema.clone(),
t.id,
t.name,
colgen,
)?
} else {
return Err(Error::TableNotExists(tbl_name.to_string()));
};
Expand Down Expand Up @@ -904,7 +913,9 @@ impl<'c, C: Catalog> LgcBuilder<'c, C> {
fn table_to_subquery(
&mut self,
schema_id: SchemaID,
schema_name: SemiStr,
table_id: TableID,
table_name: SemiStr,
colgen: &mut ColGen,
) -> Result<QueryID> {
let all_cols = self.catalog.all_columns_in_table(&table_id);
Expand All @@ -916,7 +927,14 @@ impl<'c, C: Catalog> LgcBuilder<'c, C> {
let col = colgen.gen_tbl_col(qry_id, table_id, idx, c.pty, c.name.clone());
proj_cols.push(ProjCol::implicit_alias(col, c.name))
}
let scan = OpKind::table(qry_id, schema_id, table_id, proj_cols);
let scan = OpKind::table(
qry_id,
schema_id,
schema_name,
table_id,
table_name,
proj_cols,
);
subquery.root = Op::new(scan);
// todo: currently we assume all tables are located on disk.
subquery.location = Location::Disk;
Expand All @@ -931,6 +949,11 @@ impl<'c, C: Catalog> LgcBuilder<'c, C> {
.rev()
.find_map(|s| s.cte_aliases.get(alias))
}

/// Builds an [`Op`] of the given `kind`.
///
/// This is the builder's single entry point for creating operator nodes.
/// As written it simply forwards to `Op::new(kind)` and does not read or
/// modify any builder state, even though it takes `&mut self`.
#[inline]
fn gen_op(&mut self, kind: OpKind) -> Op {
Op::new(kind)
}
}

/// Validate proj and aggr.
Expand Down
5 changes: 3 additions & 2 deletions xngin-plan/src/lgc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ pub(crate) mod tests {
#[inline]
fn enter(&mut self, op: &Op) -> ControlFlow<TableID> {
match &op.kind {
OpKind::Scan(scan) => ControlFlow::Break(scan.table),
OpKind::Scan(scan) => ControlFlow::Break(scan.table_id),
_ => ControlFlow::Continue(()),
}
}
Expand Down Expand Up @@ -935,7 +935,8 @@ pub(crate) mod tests {
use crate::explain::{Explain, ExplainConf};
println!("SQL: {}", sql);
let mut s = String::new();
plan.explain(&mut s, &ExplainConf::default()).unwrap();
let conf = ExplainConf::default();
plan.explain(&mut s, &conf).unwrap();
println!("Plan:\n{}", s)
}
}
Loading

0 comments on commit 775ff90

Please sign in to comment.