From 162e7403fde9a4ae157c2fddce3dffe3b9506434 Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Wed, 10 Jan 2024 16:45:42 -0800 Subject: [PATCH 01/10] add config flag for recursive ctes update docs from script update slt test for doc change --- datafusion/common/src/config.rs | 5 +++++ datafusion/sql/src/query.rs | 13 ++++++++++++- .../sqllogictest/test_files/information_schema.slt | 2 ++ docs/source/user-guide/configs.md | 1 + testing | 2 +- 5 files changed, 21 insertions(+), 2 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 996a505dea80..e00c17930850 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -290,6 +290,11 @@ config_namespace! { /// Hive. Note that this setting does not affect reading partitioned /// tables (e.g. `/table/year=2021/month=01/data.parquet`). pub listing_table_ignore_subdirectory: bool, default = true + + /// Should DataFusion support recursive CTEs + /// Defaults to false since this feature is a work in progress and may not + /// behave as expected + pub enable_recursive_ctes: bool, default = false } } diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index dd4cab126261..388377e3ee6b 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -54,7 +54,18 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // Process CTEs from top to bottom // do not allow self-references if with.recursive { - return not_impl_err!("Recursive CTEs are not supported"); + if self + .context_provider + .options() + .execution + .enable_recursive_ctes + { + return plan_err!( + "Recursive CTEs are enabled but are not yet supported" + ); + } else { + return not_impl_err!("Recursive CTEs are not supported"); + } } for cte in with.cte_tables { diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 44daa5141677..b37b78ab6d79 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -150,6 +150,7 @@ datafusion.execution.aggregate.scalar_update_factor 10 datafusion.execution.batch_size 8192 datafusion.execution.coalesce_batches true datafusion.execution.collect_statistics false +datafusion.execution.enable_recursive_ctes false datafusion.execution.listing_table_ignore_subdirectory true datafusion.execution.max_buffered_batches_per_output_file 2 datafusion.execution.meta_fetch_concurrency 32 @@ -225,6 +226,7 @@ datafusion.execution.aggregate.scalar_update_factor 10 Specifies the threshold f datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files +datafusion.execution.enable_recursive_ctes false Should DataFusion support recursive CTEs Defaults to false since this feature is a work in progress and may not behave as expected datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 5e26e2b205dd..a812b74284cf 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -83,6 +83,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max | | datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption | | datafusion.execution.listing_table_ignore_subdirectory | true | Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). | +| datafusion.execution.enable_recursive_ctes | false | Should DataFusion support recursive CTEs Defaults to false since this feature is a work in progress and may not behave as expected | | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | diff --git a/testing b/testing index 98fceecd024d..bb8b92eb0ba7 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 98fceecd024dccd2f8a00e32fc144975f218acf4 +Subproject commit bb8b92eb0ba7d9d1ae2348f454d97dd361d36ade From cbe79e5edec9bdbf20c89a5f8eba5ee6d2bbe78d Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Tue, 16 Jan 2024 09:35:08 -0800 Subject: [PATCH 02/10] restore testing pin --- testing | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing b/testing index bb8b92eb0ba7..98fceecd024d 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit bb8b92eb0ba7d9d1ae2348f454d97dd361d36ade +Subproject commit 98fceecd024dccd2f8a00e32fc144975f218acf4 From 6ef0ff2458f0e7ed2056a4a56c4a0222af4c4c49 Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Sat, 13 Jan 2024 16:09:34 -0800 Subject: [PATCH 03/10] add sql -> logical plan support * impl cte as work table * move SharedState to continuance * impl WorkTableState wip: readying pr to implement only logical plan fix sql integration test wip: add sql test for logical plan wip: format test assertion --- datafusion-cli/Cargo.lock | 2 + datafusion/common/src/dfschema.rs | 5 + datafusion/core/src/datasource/cte.rs | 89 +++++++++++ datafusion/core/src/datasource/mod.rs | 1 + datafusion/core/src/execution/context/mod.rs | 10 ++ datafusion/core/src/physical_planner.rs | 7 +- datafusion/expr/src/logical_plan/builder.rs | 23 +++ datafusion/expr/src/logical_plan/mod.rs | 4 +- datafusion/expr/src/logical_plan/plan.rs | 46 ++++++ .../optimizer/src/common_subexpr_eliminate.rs | 1 + .../optimizer/src/optimize_projections.rs | 1 + datafusion/proto/src/logical_plan/mod.rs | 3 + datafusion/sql/src/planner.rs | 9 ++ datafusion/sql/src/query.rs | 149 +++++++++++++++--- datafusion/sql/tests/sql_integration.rs | 51 +++++- 15 files changed, 372 insertions(+), 29 deletions(-) create mode 100644 datafusion/core/src/datasource/cte.rs diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 5663e736dbd8..eed8ec1876d0 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1207,6 +1207,7 @@ dependencies = [ "parking_lot", "rand", "tempfile", + "tokio", "url", ] @@ -1298,6 +1299,7 @@ dependencies = [ "pin-project-lite", "rand", "tokio", + "tokio-stream", "uuid", ] diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index c715fad1122f..dd7365c45782 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -909,6 +909,11 @@ impl DFField { self.field = f.into(); self } + + pub fn with_qualifier(mut self, qualifier: impl Into) -> Self { + self.qualifier = Some(qualifier.into()); + self + } } impl From for DFField { diff --git a/datafusion/core/src/datasource/cte.rs b/datafusion/core/src/datasource/cte.rs new file mode 100644 index 000000000000..9fb241f49db3 --- /dev/null +++ b/datafusion/core/src/datasource/cte.rs @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! CteWorkTable implementation used for recursive queries + +use std::any::Any; +use std::sync::Arc; + +use arrow::datatypes::SchemaRef; +use async_trait::async_trait; +use datafusion_common::not_impl_err; + +use crate::{ + error::Result, + logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown}, + physical_plan::ExecutionPlan, +}; + +use datafusion_common::DataFusionError; + +use crate::datasource::{TableProvider, TableType}; +use crate::execution::context::SessionState; + +/// TODO: add docs +pub struct CteWorkTable { + name: String, + table_schema: SchemaRef, +} + +impl CteWorkTable { + /// TODO: add doc + pub fn new(name: &str, table_schema: SchemaRef) -> Self { + Self { + name: name.to_owned(), + table_schema, + } + } +} + +#[async_trait] +impl TableProvider for CteWorkTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_logical_plan(&self) -> Option<&LogicalPlan> { + None + } + + fn schema(&self) -> SchemaRef { + self.table_schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Temporary + } + + async fn scan( + &self, + _state: &SessionState, + _projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + not_impl_err!("scan not implemented for CteWorkTable yet") + } + + fn supports_filter_pushdown( + &self, + _filter: &Expr, + ) -> Result { + // TODO: should we support filter pushdown? + Ok(TableProviderFilterPushDown::Unsupported) + } +} diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 2e516cc36a01..93f197ec9438 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -20,6 +20,7 @@ //! [`ListingTable`]: crate::datasource::listing::ListingTable pub mod avro_to_arrow; +pub mod cte; pub mod default_table_source; pub mod empty; pub mod file_format; diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 1e378541b624..30033aefccd5 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -26,6 +26,7 @@ mod parquet; use crate::{ catalog::{CatalogList, MemoryCatalogList}, datasource::{ + cte::CteWorkTable, function::{TableFunction, TableFunctionImpl}, listing::{ListingOptions, ListingTable}, provider::TableProviderFactory, @@ -1899,6 +1900,15 @@ impl<'a> ContextProvider for SessionContextProvider<'a> { Ok(provider_as_source(provider)) } + fn create_cte_work_table( + &self, + name: &str, + schema: SchemaRef, + ) -> Result> { + let table = Arc::new(CteWorkTable::new(name, schema)); + Ok(provider_as_source(table)) + } + fn get_function_meta(&self, name: &str) -> Option> { self.state.scalar_functions().get(name).cloned() } diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 98390ac271d0..bc448fe06fcf 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -87,8 +87,8 @@ use datafusion_expr::expr::{ use datafusion_expr::expr_rewriter::unnormalize_cols; use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary; use datafusion_expr::{ - DescribeTable, DmlStatement, ScalarFunctionDefinition, StringifiedPlan, WindowFrame, - WindowFrameBound, WriteOp, + DescribeTable, DmlStatement, RecursiveQuery, ScalarFunctionDefinition, + StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp, }; use datafusion_physical_expr::expressions::Literal; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; @@ -1290,6 +1290,9 @@ impl DefaultPhysicalPlanner { Ok(plan) } } + LogicalPlan::RecursiveQuery(RecursiveQuery { name: _, static_term: _, recursive_term: _, is_distinct: _,.. }) => { + not_impl_err!("Physical counterpart of RecursiveQuery is not implemented yet") + } }; exec_plan }.boxed() diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 847fbbbf61c7..ef70d30f77b4 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -55,6 +55,8 @@ use datafusion_common::{ ScalarValue, TableReference, ToDFSchema, UnnestOptions, }; +use super::plan::RecursiveQuery; + /// Default table name for unnamed table pub const UNNAMED_TABLE: &str = "?table?"; @@ -121,6 +123,27 @@ impl LogicalPlanBuilder { })) } + /// Convert a regular plan into a recursive query. + pub fn to_recursive_query( + &self, + name: String, + recursive_term: LogicalPlan, + is_distinct: bool, + ) -> Result { + // TODO: we need to do a bunch of validation here. Maybe more. + if is_distinct { + return Err(DataFusionError::NotImplemented( + "Recursive queries with distinct is not supported".to_string(), + )); + } + Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery { + name, + static_term: Arc::new(self.plan.clone()), + recursive_term: Arc::new(recursive_term), + is_distinct, + }))) + } + /// Create a values list based relation, and the schema is inferred from data, consuming /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html) /// documentation for more details. diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index bc722dd69ace..f6e6000897a5 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -36,8 +36,8 @@ pub use plan::{ projection_schema, Aggregate, Analyze, CrossJoin, DescribeTable, Distinct, DistinctOn, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare, Projection, - Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan, - ToStringifiedPlan, Union, Unnest, Values, Window, + RecursiveQuery, Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias, + TableScan, ToStringifiedPlan, Union, Unnest, Values, Window, }; pub use statement::{ SetVariable, Statement, TransactionAccessMode, TransactionConclusion, TransactionEnd, diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 93a38fb40df5..1cf325caa90c 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -154,6 +154,8 @@ pub enum LogicalPlan { /// Unnest a column that contains a nested list type such as an /// ARRAY. This is used to implement SQL `UNNEST` Unnest(Unnest), + /// A variadic query (e.g. "Recursive CTEs") + RecursiveQuery(RecursiveQuery), } impl LogicalPlan { @@ -191,6 +193,10 @@ impl LogicalPlan { LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(), LogicalPlan::Ddl(ddl) => ddl.schema(), LogicalPlan::Unnest(Unnest { schema, .. }) => schema, + LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => { + // we take the schema of the static term as the schema of the entire recursive query + static_term.schema() + } } } @@ -243,6 +249,10 @@ impl LogicalPlan { | LogicalPlan::TableScan(_) => { vec![self.schema()] } + LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => { + // return only the schema of the static term + static_term.all_schemas() + } // return children schemas LogicalPlan::Limit(_) | LogicalPlan::Subquery(_) @@ -384,6 +394,7 @@ impl LogicalPlan { .try_for_each(f), // plans without expressions LogicalPlan::EmptyRelation(_) + | LogicalPlan::RecursiveQuery(_) | LogicalPlan::Subquery(_) | LogicalPlan::SubqueryAlias(_) | LogicalPlan::Limit(_) @@ -430,6 +441,11 @@ impl LogicalPlan { LogicalPlan::Ddl(ddl) => ddl.inputs(), LogicalPlan::Unnest(Unnest { input, .. }) => vec![input], LogicalPlan::Prepare(Prepare { input, .. }) => vec![input], + LogicalPlan::RecursiveQuery(RecursiveQuery { + static_term, + recursive_term, + .. + }) => vec![static_term, recursive_term], // plans without inputs LogicalPlan::TableScan { .. } | LogicalPlan::Statement { .. } @@ -510,6 +526,9 @@ impl LogicalPlan { cross.left.head_output_expr() } } + LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => { + static_term.head_output_expr() + } LogicalPlan::Union(union) => Ok(Some(Expr::Column( union.schema.fields()[0].qualified_column(), ))), @@ -835,6 +854,14 @@ impl LogicalPlan { }; Ok(LogicalPlan::Distinct(distinct)) } + LogicalPlan::RecursiveQuery(RecursiveQuery { + name, is_distinct, .. + }) => Ok(LogicalPlan::RecursiveQuery(RecursiveQuery { + name: name.clone(), + static_term: Arc::new(inputs[0].clone()), + recursive_term: Arc::new(inputs[1].clone()), + is_distinct: *is_distinct, + })), LogicalPlan::Analyze(a) => { assert!(expr.is_empty()); assert_eq!(inputs.len(), 1); @@ -1073,6 +1100,7 @@ impl LogicalPlan { }), LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch, LogicalPlan::EmptyRelation(_) => Some(0), + LogicalPlan::RecursiveQuery(_) => None, LogicalPlan::Subquery(_) => None, LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(), LogicalPlan::Limit(Limit { fetch, .. }) => *fetch, @@ -1408,6 +1436,11 @@ impl LogicalPlan { fn fmt(&self, f: &mut Formatter) -> fmt::Result { match self.0 { LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"), + LogicalPlan::RecursiveQuery(RecursiveQuery { + is_distinct, .. + }) => { + write!(f, "RecursiveQuery: is_distinct={}", is_distinct) + } LogicalPlan::Values(Values { ref values, .. }) => { let str_values: Vec<_> = values .iter() @@ -1718,6 +1751,19 @@ pub struct EmptyRelation { pub schema: DFSchemaRef, } +/// A variadic query operation +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct RecursiveQuery { + /// Name of the query + pub name: String, + /// The static term + pub static_term: Arc, + /// The recursive term + pub recursive_term: Arc, + /// Distinction + pub is_distinct: bool, +} + /// Values expression. See /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html) /// documentation for more details. diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index fc867df23c36..f29c7406acc9 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -365,6 +365,7 @@ impl OptimizerRule for CommonSubexprEliminate { | LogicalPlan::Dml(_) | LogicalPlan::Copy(_) | LogicalPlan::Unnest(_) + | LogicalPlan::RecursiveQuery(_) | LogicalPlan::Prepare(_) => { // apply the optimization to all inputs of the plan utils::optimize_children(self, plan, config)? diff --git a/datafusion/optimizer/src/optimize_projections.rs b/datafusion/optimizer/src/optimize_projections.rs index d9c45510972c..ab0cb0a26551 100644 --- a/datafusion/optimizer/src/optimize_projections.rs +++ b/datafusion/optimizer/src/optimize_projections.rs @@ -163,6 +163,7 @@ fn optimize_projections( .collect::>() } LogicalPlan::EmptyRelation(_) + | LogicalPlan::RecursiveQuery(_) | LogicalPlan::Statement(_) | LogicalPlan::Values(_) | LogicalPlan::Extension(_) diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 6ca95519a9b1..09398b48cab6 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1715,6 +1715,9 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlan::DescribeTable(_) => Err(proto_error( "LogicalPlan serde is not yet implemented for DescribeTable", )), + LogicalPlan::RecursiveQuery(_) => Err(proto_error( + "LogicalPlan serde is not yet implemented for RecursiveQuery", + )), } } } diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index a04df5589b85..aac61f446d88 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -61,6 +61,15 @@ pub trait ContextProvider { not_impl_err!("Table Functions are not supported") } + /// TODO: add doc + fn create_cte_work_table( + &self, + _name: &str, + _schema: SchemaRef, + ) -> Result> { + not_impl_err!("Recursive CTE is not implemented") + } + /// Getter for a UDF description fn get_function_meta(&self, name: &str) -> Option>; /// Getter for a UDAF description diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 388377e3ee6b..a69752187542 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use arrow::datatypes::Schema; use datafusion_common::{ not_impl_err, plan_err, sql_err, Constraints, DataFusionError, Result, ScalarValue, }; @@ -26,7 +27,8 @@ use datafusion_expr::{ CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder, }; use sqlparser::ast::{ - Expr as SQLExpr, Offset as SQLOffset, OrderByExpr, Query, SetExpr, Value, + Expr as SQLExpr, Offset as SQLOffset, OrderByExpr, Query, SetExpr, SetOperator, + SetQuantifier, Value, }; use sqlparser::parser::ParserError::ParserError; @@ -52,21 +54,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let set_expr = query.body; if let Some(with) = query.with { // Process CTEs from top to bottom - // do not allow self-references - if with.recursive { - if self - .context_provider - .options() - .execution - .enable_recursive_ctes - { - return plan_err!( - "Recursive CTEs are enabled but are not yet supported" - ); - } else { - return not_impl_err!("Recursive CTEs are not supported"); - } - } + + let is_recursive = with.recursive; for cte in with.cte_tables { // A `WITH` block can't use the same name more than once @@ -76,16 +65,128 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { "WITH query name {cte_name:?} specified more than once" ))); } - // create logical plan & pass backreferencing CTEs - // CTE expr don't need extend outer_query_schema - let logical_plan = - self.query_to_plan(*cte.query, &mut planner_context.clone())?; + let cte_query = cte.query; + + if is_recursive { + if !self + .context_provider + .options() + .execution + .enable_recursive_ctes + { + return not_impl_err!("Recursive CTEs are not enabled"); + } + + match *cte_query.body { + SetExpr::SetOperation { + op: SetOperator::Union, + left, + right, + set_quantifier, + } => { + let distinct = set_quantifier != SetQuantifier::All; + + // Each recursive CTE consists from two parts in the logical plan: + // 1. A static term (the left hand side on the SQL, where the + // referencing to the same CTE is not allowed) + // + // 2. A recursive term (the right hand side, and the recursive + // part) + + // Since static term does not have any specific properties, it can + // be compiled as if it was a regular expression. This will + // allow us to infer the schema to be used in the recursive term. + + // ---------- Step 1: Compile the static term ------------------ + let static_plan = self + .set_expr_to_plan(*left, &mut planner_context.clone())?; + + // Since the recursive CTEs include a component that references a + // table with its name, like the example below: + // + // WITH RECURSIVE values(n) AS ( + // SELECT 1 as n -- static term + // UNION ALL + // SELECT n + 1 + // FROM values -- self reference + // WHERE n < 100 + // ) + // + // We need a temporary 'relation' to be referenced and used. PostgreSQL + // calls this a 'working table', but it is entirely an implementation + // detail and a 'real' table with that name might not even exist (as + // in the case of DataFusion). + // + // Since we can't simply register a table during planning stage (it is + // an execution problem), we'll use a relation object that preserves the + // schema of the input perfectly and also knows which recursive CTE it is + // bound to. - // Each `WITH` block can change the column names in the last - // projection (e.g. "WITH table(t1, t2) AS SELECT 1, 2"). - let logical_plan = self.apply_table_alias(logical_plan, cte.alias)?; + // ---------- Step 2: Create a temporary relation ------------------ + // Step 2.1: Create a table source for the temporary relation + let work_table_source = + self.context_provider.create_cte_work_table( + &cte_name, + Arc::new(Schema::from(static_plan.schema().as_ref())), + )?; - planner_context.insert_cte(cte_name, logical_plan); + // Step 2.2: Create a temporary relation logical plan that will be used + // as the input to the recursive term + let work_table_plan = LogicalPlanBuilder::scan( + cte_name.to_string(), + work_table_source, + None, + )? + .build()?; + + let name = cte_name.clone(); + + // Step 2.3: Register the temporary relation in the planning context + // For all the self references in the variadic term, we'll replace it + // with the temporary relation we created above by temporarily registering + // it as a CTE. This temporary relation in the planning context will be + // replaced by the actual CTE plan once we're done with the planning. + planner_context.insert_cte(cte_name.clone(), work_table_plan); + + // ---------- Step 3: Compile the recursive term ------------------ + // this uses the named_relation we inserted above to resolve the + // relation. This ensures that the recursive term uses the named relation logical plan + // and thus the 'continuance' physical plan as its input and source + let recursive_plan = self + .set_expr_to_plan(*right, &mut planner_context.clone())?; + + // ---------- Step 4: Create the final plan ------------------ + // Step 4.1: Compile the final plan + let logical_plan = LogicalPlanBuilder::from(static_plan) + .to_recursive_query(name, recursive_plan, distinct)? + .build()?; + + let final_plan = + self.apply_table_alias(logical_plan, cte.alias)?; + + // Step 4.2: Remove the temporary relation from the planning context and replace it + // with the final plan. + planner_context.insert_cte(cte_name.clone(), final_plan); + } + _ => { + return Err(DataFusionError::SQL( + ParserError("Invalid recursive CTE".to_string()), + None, + )); + } + }; + } else { + // create logical plan & pass backreferencing CTEs + // CTE expr don't need extend outer_query_schema + let logical_plan = + self.query_to_plan(*cte_query, &mut planner_context.clone())?; + + // Each `WITH` block can change the column names in the last + // projection (e.g. "WITH table(t1, t2) AS SELECT 1, 2"). + let logical_plan = self.apply_table_alias(logical_plan, cte.alias)?; + + planner_context.insert_cte(cte_name, logical_plan); + } } } let plan = self.set_expr_to_plan(*(set_expr.clone()), planner_context)?; diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 44da4cd4d836..c88e2d1130ed 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -1394,11 +1394,46 @@ fn recursive_ctes() { select * from numbers;"; let err = logical_plan(sql).expect_err("query should have failed"); assert_eq!( - "This feature is not implemented: Recursive CTEs are not supported", + "This feature is not implemented: Recursive CTEs are not enabled", err.strip_backtrace() ); } +#[test] +fn recursive_ctes_enabled() { + let sql = " + WITH RECURSIVE numbers AS ( + select 1 as n + UNION ALL + select n + 1 FROM numbers WHERE N < 10 + ) + select * from numbers;"; + + // manually setting up test here so that we can enable recursive ctes + let mut context = MockContextProvider::default(); + context.options_mut().execution.enable_recursive_ctes = true; + + let planner = SqlToRel::new_with_options(&context, ParserOptions::default()); + let result = DFParser::parse_sql_with_dialect(sql, &GenericDialect {}); + let mut ast = result.unwrap(); + + let plan = planner + .statement_to_plan(ast.pop_front().unwrap()) + .expect("recursive cte plan creation failed"); + + assert_eq!( + format!("{plan:?}"), + "Projection: numbers.n\ + \n SubqueryAlias: numbers\ + \n RecursiveQuery: is_distinct=false\ + \n Projection: Int64(1) AS n\ + \n EmptyRelation\ + \n Projection: numbers.n + Int64(1)\ + \n Filter: numbers.n < Int64(10)\ + \n TableScan: numbers" + ); +} + #[test] fn select_simple_aggregate_with_groupby_and_column_is_in_aggregate_and_groupby() { quick_test( @@ -2692,6 +2727,12 @@ struct MockContextProvider { udafs: HashMap>, } +impl MockContextProvider { + fn options_mut(&mut self) -> &mut ConfigOptions { + &mut self.options + } +} + impl ContextProvider for MockContextProvider { fn get_table_source(&self, name: TableReference) -> Result> { let schema = match name.table() { @@ -2801,6 +2842,14 @@ impl ContextProvider for MockContextProvider { fn options(&self) -> &ConfigOptions { &self.options } + + fn create_cte_work_table( + &self, + _name: &str, + schema: SchemaRef, + ) -> Result> { + Ok(Arc::new(EmptyTable::new(schema))) + } } #[test] From 8d38c92974a626b7a7d2372f0b10c0af1bed1a3e Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Mon, 15 Jan 2024 13:37:59 -0800 Subject: [PATCH 04/10] wip: remove uncessary with qualifier method some docs more docs --- datafusion/common/src/dfschema.rs | 5 ----- .../core/src/datasource/{cte.rs => cte_worktable.rs} | 9 +++++++-- datafusion/core/src/datasource/mod.rs | 2 +- datafusion/core/src/execution/context/mod.rs | 5 ++++- 4 files changed, 12 insertions(+), 9 deletions(-) rename datafusion/core/src/datasource/{cte.rs => cte_worktable.rs} (80%) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index dd7365c45782..c715fad1122f 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -909,11 +909,6 @@ impl DFField { self.field = f.into(); self } - - pub fn with_qualifier(mut self, qualifier: impl Into) -> Self { - self.qualifier = Some(qualifier.into()); - self - } } impl From for DFField { diff --git a/datafusion/core/src/datasource/cte.rs b/datafusion/core/src/datasource/cte_worktable.rs similarity index 80% rename from datafusion/core/src/datasource/cte.rs rename to datafusion/core/src/datasource/cte_worktable.rs index 9fb241f49db3..147d4040c8c7 100644 --- a/datafusion/core/src/datasource/cte.rs +++ b/datafusion/core/src/datasource/cte_worktable.rs @@ -35,14 +35,19 @@ use datafusion_common::DataFusionError; use crate::datasource::{TableProvider, TableType}; use crate::execution::context::SessionState; -/// TODO: add docs +/// The temporary working table where the previous iteration of a recursive query is stored +/// Naming is based on PostgreSQL's implementation. +/// See here for more details: www.postgresql.org/docs/11/queries-with.html#id-1.5.6.12.5.4 pub struct CteWorkTable { name: String, + /// This schema must be shared across both the static and recursive terms of a recursive query table_schema: SchemaRef, } impl CteWorkTable { - /// TODO: add doc + /// construct a new CteWorkTable with the given name and schema + /// This schema must match the schema of the recursive term of the query + /// Since the scan method will contain an physical plan that assumes this schema pub fn new(name: &str, table_schema: SchemaRef) -> Self { Self { name: name.to_owned(), diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 93f197ec9438..8f20da183a93 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -20,7 +20,7 @@ //! [`ListingTable`]: crate::datasource::listing::ListingTable pub mod avro_to_arrow; -pub mod cte; +pub mod cte_worktable; pub mod default_table_source; pub mod empty; pub mod file_format; diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 30033aefccd5..9b623d7a51ec 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -26,7 +26,7 @@ mod parquet; use crate::{ catalog::{CatalogList, MemoryCatalogList}, datasource::{ - cte::CteWorkTable, + cte_worktable::CteWorkTable, function::{TableFunction, TableFunctionImpl}, listing::{ListingOptions, ListingTable}, provider::TableProviderFactory, @@ -1900,6 +1900,9 @@ impl<'a> ContextProvider for SessionContextProvider<'a> { Ok(provider_as_source(provider)) } + /// Create a new CTE work table for a recursive CTE logical plan + /// This table will be used in conjunction with a Worktable physical plan + /// to read and write each iteration of a recursive CTE fn create_cte_work_table( &self, name: &str, From 675a6df8eb2de2a8bacc7359b2c7b73c24b3831f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 18 Jan 2024 14:14:28 -0500 Subject: [PATCH 05/10] Add comments to `RecursiveQuery` --- datafusion/expr/src/logical_plan/plan.rs | 31 +++++++++++++++++++++--- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 1cf325caa90c..5ab8a9c99cd0 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1751,16 +1751,39 @@ pub struct EmptyRelation { pub schema: DFSchemaRef, } -/// A variadic query operation +/// A variadic query operation, Recursive CTE. +/// +/// # Recursive Query Evaluation +/// +/// From the [Postgres Docs]: +/// +/// 1. Evaluate the non-recursive term. For `UNION` (but not `UNION ALL`), +/// discard duplicate rows. Include all remaining rows in the result of the +/// recursive query, and also place them in a temporary working table. +// +/// 2. So long as the working table is not empty, repeat these steps: +/// +/// * Evaluate the recursive term, substituting the current contents of the +/// working table for the recursive self-reference. For `UNION` (but not `UNION +/// ALL`), discard duplicate rows and rows that duplicate any previous result +/// row. Include all remaining rows in the result of the recursive query, and +/// also place them in a temporary intermediate table. +/// +/// * Replace the contents of the working table with the contents of the +/// intermediate table, then empty the intermediate table. +/// +/// [Postgres Docs]: https://www.postgresql.org/docs/current/queries-with.html#QUERIES-WITH-RECURSIVE #[derive(Clone, PartialEq, Eq, Hash)] pub struct RecursiveQuery { /// Name of the query pub name: String, - /// The static term + /// The static term (initial contents of the working table) pub static_term: Arc, - /// The recursive term + /// The recursive term (evaluated on the contents of the working table until + /// it returns an empty set) pub recursive_term: Arc, - /// Distinction + /// Should the output of the recursive term be deduplicated (`UNION`) or + /// not (`UNION ALL`). pub is_distinct: bool, } From 85c276426f61ea42dca81e2b9f3415b3d9ffe0c6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 18 Jan 2024 14:17:54 -0500 Subject: [PATCH 06/10] Update datfusion-cli Cargo.lock --- datafusion-cli/Cargo.lock | 68 ++++++++++++++++----------------------- 1 file changed, 28 insertions(+), 40 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index eed8ec1876d0..db5913503aaa 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -360,9 +360,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc2d0cfb2a7388d34f590e76686704c494ed7aaceed62ee1ba35cbf363abc2a5" +checksum = "a116f46a969224200a0a97f29cfd4c50e7534e4b4826bd23ea2c3c533039c82c" dependencies = [ "bzip2", "flate2", @@ -733,9 +733,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.1" +version = "2.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" +checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" [[package]] name = "blake2" @@ -1125,7 +1125,7 @@ dependencies = [ "half", "hashbrown 0.14.3", "indexmap 2.1.0", - "itertools 0.12.0", + "itertools", "log", "num-traits", "num_cpus", @@ -1207,7 +1207,6 @@ dependencies = [ "parking_lot", "rand", "tempfile", - "tokio", "url", ] @@ -1236,7 +1235,7 @@ dependencies = [ "datafusion-expr", "datafusion-physical-expr", "hashbrown 0.14.3", - "itertools 0.12.0", + "itertools", "log", "regex-syntax", ] @@ -1261,7 +1260,7 @@ dependencies = [ "hashbrown 0.14.3", "hex", "indexmap 2.1.0", - "itertools 0.12.0", + "itertools", "log", "md-5", "paste", @@ -1292,14 +1291,13 @@ dependencies = [ "half", "hashbrown 0.14.3", "indexmap 2.1.0", - "itertools 0.12.0", + "itertools", "log", "once_cell", "parking_lot", "pin-project-lite", "rand", "tokio", - "tokio-stream", "uuid", ] @@ -1654,9 +1652,9 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "h2" -version = "0.3.23" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b553656127a00601c8ae5590fcfdc118e4083a7924b6cf4ffc1ea4b99dc429d7" +checksum = "bb2c4422095b67ee78da96fbb51a4cc413b3b25883c7717ff7ca1ab31022c9c9" dependencies = [ "bytes", "fnv", @@ -1724,9 +1722,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" +checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f" [[package]] name = "hex" @@ -1910,15 +1908,6 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" -[[package]] -name = "itertools" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.12.0" @@ -2074,16 +2063,16 @@ version = "0.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85c833ca1e66078851dba29046874e38f08b2c883700aa29a03ddd3b23814ee8" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.4.2", "libc", "redox_syscall", ] [[package]] name = "linux-raw-sys" -version = "0.4.12" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" +checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" [[package]] name = "lock_api" @@ -2281,7 +2270,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi 0.3.3", + "hermit-abi 0.3.4", "libc", ] @@ -2307,7 +2296,7 @@ dependencies = [ "futures", "humantime", "hyper", - "itertools 0.12.0", + "itertools", "parking_lot", "percent-encoding", "quick-xml", @@ -2518,9 +2507,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkg-config" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" +checksum = "2900ede94e305130c13ddd391e0ab7cbaeb783945ae07a279c268cb05109c6cb" [[package]] name = "powerfmt" @@ -2536,14 +2525,13 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "predicates" -version = "3.0.4" +version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dfc28575c2e3f19cb3c73b93af36460ae898d426eba6fc15b9bd2a5220758a0" +checksum = "68b87bfd4605926cdfefc1c3b5f8fe560e3feca9d5552cf68c466d3d8236c7e8" dependencies = [ "anstyle", "difflib", "float-cmp", - "itertools 0.11.0", "normalize-line-endings", "predicates-core", "regex", @@ -2838,11 +2826,11 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.29" +version = "0.38.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a1a81a2478639a14e68937903356dbac62cf52171148924f754bb8a8cd7a96c" +checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.4.2", "errno", "libc", "linux-raw-sys", @@ -3104,9 +3092,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.2" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" +checksum = "2593d31f82ead8df961d8bd23a64c2ccf2eb5dd34b0a34bfb4dd54011c72009e" [[package]] name = "snafu" @@ -3565,9 +3553,9 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "unicode-bidi" -version = "0.3.14" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416" +checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" [[package]] name = "unicode-ident" From 63445d874fede05ed05d8d7b17f3114160b6e7f8 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 18 Jan 2024 14:29:46 -0500 Subject: [PATCH 07/10] Fix clippy --- datafusion/core/src/datasource/cte_worktable.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/core/src/datasource/cte_worktable.rs b/datafusion/core/src/datasource/cte_worktable.rs index 147d4040c8c7..de13e73e003b 100644 --- a/datafusion/core/src/datasource/cte_worktable.rs +++ b/datafusion/core/src/datasource/cte_worktable.rs @@ -39,6 +39,9 @@ use crate::execution::context::SessionState; /// Naming is based on PostgreSQL's implementation. /// See here for more details: www.postgresql.org/docs/11/queries-with.html#id-1.5.6.12.5.4 pub struct CteWorkTable { + /// The name of the CTE work table + // WIP, see https://github.com/apache/arrow-datafusion/issues/462 + #[allow(dead_code)] name: String, /// This schema must be shared across both the static and recursive terms of a recursive query table_schema: SchemaRef, From 129d41ab054aaf1c2320b2567ec18311f228af53 Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Thu, 18 Jan 2024 11:50:18 -0800 Subject: [PATCH 08/10] better errors and comments --- datafusion/expr/src/logical_plan/builder.rs | 3 ++- datafusion/sql/src/query.rs | 7 +++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index ef70d30f77b4..eb5e5bd42634 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -124,6 +124,7 @@ impl LogicalPlanBuilder { } /// Convert a regular plan into a recursive query. + /// `is_distinct` indicates whether the recursive term should be de-duplicated (`UNION`) after each iteration or not (`UNION ALL`). pub fn to_recursive_query( &self, name: String, @@ -133,7 +134,7 @@ impl LogicalPlanBuilder { // TODO: we need to do a bunch of validation here. Maybe more. if is_distinct { return Err(DataFusionError::NotImplemented( - "Recursive queries with distinct is not supported".to_string(), + "Recursive queries with a distinct 'UNION' (in which the previous iteration's results will be de-duplicated) is not supported".to_string(), )); } Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery { diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index a69752187542..af0b91ae6c7e 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -65,7 +65,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { "WITH query name {cte_name:?} specified more than once" ))); } - let cte_query = cte.query; if is_recursive { if !self @@ -77,7 +76,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { return not_impl_err!("Recursive CTEs are not enabled"); } - match *cte_query.body { + match *cte.query.body { SetExpr::SetOperation { op: SetOperator::Union, left, @@ -170,7 +169,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } _ => { return Err(DataFusionError::SQL( - ParserError("Invalid recursive CTE".to_string()), + ParserError(format!("Unsupported CTE: {cte}")), None, )); } @@ -179,7 +178,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // create logical plan & pass backreferencing CTEs // CTE expr don't need extend outer_query_schema let logical_plan = - self.query_to_plan(*cte_query, &mut planner_context.clone())?; + self.query_to_plan(*cte.query, &mut planner_context.clone())?; // Each `WITH` block can change the column names in the last // projection (e.g. "WITH table(t1, t2) AS SELECT 1, 2"). From cd95b7948477d8d76141c9608853152892b01dd7 Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Thu, 18 Jan 2024 14:02:18 -0800 Subject: [PATCH 09/10] add doc comment with rationale for create_cte_worktable method --- datafusion/sql/src/planner.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index aac61f446d88..64aaecc87ed1 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -61,7 +61,11 @@ pub trait ContextProvider { not_impl_err!("Table Functions are not supported") } - /// TODO: add doc + /// This provides a worktable (an intermediate table that is used to store the results of a CTE during execution) + /// We don't directly implement this in the logical plan's ['SqlToRel`] + /// because the sql code needs access to a table that contains execution-related types that can't be a direct dependency + /// of the sql crate (namely, the `CteWorktable`). + /// The ContextProvider provides a way to "hide" this dependency. fn create_cte_work_table( &self, _name: &str, From ead6b307f02267c349e0d182ba52dcf9790c8cb7 Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Thu, 18 Jan 2024 14:06:18 -0800 Subject: [PATCH 10/10] wip: tweak --- datafusion/sql/src/planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 64aaecc87ed1..d4dd42edcd39 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -65,7 +65,7 @@ pub trait ContextProvider { /// We don't directly implement this in the logical plan's ['SqlToRel`] /// because the sql code needs access to a table that contains execution-related types that can't be a direct dependency /// of the sql crate (namely, the `CteWorktable`). - /// The ContextProvider provides a way to "hide" this dependency. + /// The [`ContextProvider`] provides a way to "hide" this dependency. fn create_cte_work_table( &self, _name: &str,