Skip to content

Commit

Permalink
impl agg and subquery plans to sql (apache#9606)
Browse files Browse the repository at this point in the history
  • Loading branch information
devinjdangelo authored Mar 15, 2024
1 parent 3c26e59 commit 219de5f
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 13 deletions.
3 changes: 3 additions & 0 deletions datafusion/sql/src/unparser/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ impl SelectBuilder {
new.projection = value;
new
}
pub fn already_projected(&self) -> bool {
!self.projection.is_empty()
}
pub fn into(&mut self, value: Option<ast::SelectInto>) -> &mut Self {
let new = self;
new.into = value;
Expand Down
102 changes: 89 additions & 13 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
use datafusion_common::{internal_err, not_impl_err, plan_err, DataFusionError, Result};
use datafusion_expr::{expr::Alias, Expr, JoinConstraint, JoinType, LogicalPlan};
use sqlparser::ast;
use sqlparser::ast::{self, Ident, SelectItem};

use super::{
ast::{
BuilderError, QueryBuilder, RelationBuilder, SelectBuilder, TableRelationBuilder,
TableWithJoinsBuilder,
BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
},
Unparser,
};
Expand Down Expand Up @@ -129,14 +129,88 @@ impl Unparser<'_> {
Ok(())
}
LogicalPlan::Projection(p) => {
let items = p
.expr
.iter()
.map(|e| self.select_item_to_sql(e))
.collect::<Result<Vec<_>>>()?;
select.projection(items);

self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
// A second projection implies a derived tablefactor
if !select.already_projected() {
// Special handling when projecting an agregation plan
if let LogicalPlan::Aggregate(agg) = p.input.as_ref() {
let mut items = p
.expr
.iter()
.filter(|e| !matches!(e, Expr::AggregateFunction(_)))
.map(|e| self.select_item_to_sql(e))
.collect::<Result<Vec<_>>>()?;

let proj_aggs = p
.expr
.iter()
.filter(|e| matches!(e, Expr::AggregateFunction(_)))
.zip(agg.aggr_expr.iter())
.map(|(proj, agg_exp)| {
let sql_agg_expr = self.select_item_to_sql(agg_exp)?;
let maybe_aliased =
if let Expr::Alias(Alias { name, .. }) = proj {
if let SelectItem::UnnamedExpr(aggregation_fun) =
sql_agg_expr
{
SelectItem::ExprWithAlias {
expr: aggregation_fun,
alias: Ident {
value: name.to_string(),
quote_style: None,
},
}
} else {
sql_agg_expr
}
} else {
sql_agg_expr
};
Ok(maybe_aliased)
})
.collect::<Result<Vec<_>>>()?;
items.extend(proj_aggs);
select.projection(items);
select.group_by(ast::GroupByExpr::Expressions(
agg.group_expr
.iter()
.map(|expr| self.expr_to_sql(expr))
.collect::<Result<Vec<_>>>()?,
));
self.select_to_sql_recursively(
agg.input.as_ref(),
query,
select,
relation,
)
} else {
let items = p
.expr
.iter()
.map(|e| self.select_item_to_sql(e))
.collect::<Result<Vec<_>>>()?;
select.projection(items);
self.select_to_sql_recursively(
p.input.as_ref(),
query,
select,
relation,
)
}
} else {
let mut derived_builder = DerivedRelationBuilder::default();
derived_builder.lateral(false).alias(None).subquery({
let inner_statment = self.plan_to_sql(plan)?;
if let ast::Statement::Query(inner_query) = inner_statment {
inner_query
} else {
return internal_err!(
"Subquery must be a Query, but found {inner_statment:?}"
);
}
});
relation.derived(derived_builder);
Ok(())
}
}
LogicalPlan::Filter(filter) => {
let filter_expr = self.expr_to_sql(&filter.predicate)?;
Expand Down Expand Up @@ -176,7 +250,9 @@ impl Unparser<'_> {
)
}
LogicalPlan::Aggregate(_agg) => {
not_impl_err!("Unsupported operator: {plan:?}")
not_impl_err!(
"Unsupported aggregation plan not following a projection: {plan:?}"
)
}
LogicalPlan::Distinct(_distinct) => {
not_impl_err!("Unsupported operator: {plan:?}")
Expand Down
24 changes: 24 additions & 0 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4560,6 +4560,26 @@ fn roundtrip_statement() {
"select ta.j1_id, tb.j2_string, tc.j3_string from j1 ta join j2 tb on (ta.j1_id = tb.j2_id) join j3 tc on (ta.j1_id = tc.j3_id);",
r#"SELECT ta.j1_id, tb.j2_string, tc.j3_string FROM j1 AS ta JOIN j2 AS tb ON (ta.j1_id = tb.j2_id) JOIN j3 AS tc ON (ta.j1_id = tc.j3_id)"#,
),
(
"select * from (select id, first_name from person)",
"SELECT person.id, person.first_name FROM (SELECT person.id, person.first_name FROM person)"
),
(
"select * from (select id, first_name from (select * from person))",
"SELECT person.id, person.first_name FROM (SELECT person.id, person.first_name FROM (SELECT person.id, person.first_name, person.last_name, person.age, person.state, person.salary, person.birth_date, person.😀 FROM person))"
),
(
"select id, count(*) as cnt from (select id from person) group by id",
"SELECT person.id, COUNT(*) AS cnt FROM (SELECT person.id FROM person) GROUP BY person.id"
),
(
"select id, count(*) as cnt from (select p1.id as id from person p1 inner join person p2 on p1.id=p2.id) group by id",
"SELECT p1.id, COUNT(*) AS cnt FROM (SELECT p1.id FROM person AS p1 JOIN person AS p2 ON (p1.id = p2.id)) GROUP BY p1.id"
),
(
"select id, count(*), first_name from person group by first_name, id",
"SELECT person.id, COUNT(*), person.first_name FROM person GROUP BY person.first_name, person.id"
),
];

let roundtrip = |sql: &str| -> Result<String> {
Expand All @@ -4570,8 +4590,12 @@ fn roundtrip_statement() {
let sql_to_rel = SqlToRel::new(&context);
let plan = sql_to_rel.sql_statement_to_plan(statement)?;

println!("{}", plan.display_indent());

let ast = plan_to_sql(&plan)?;

println!("{ast}");

Ok(format!("{}", ast))
};

Expand Down

0 comments on commit 219de5f

Please sign in to comment.