From 7ff187227ab230e26b7c2abd64ebcbd280f200cd Mon Sep 17 00:00:00 2001 From: Chris Hajas Date: Mon, 27 Jun 2022 14:57:33 -0700 Subject: [PATCH] Support intermediate aggs in Orca plans (#13707) Orca in GPDB6 has support for intermediate aggs, which isn't used in postgres. This is useful when we have a DQA and a regular "ride-along" agg. However, we need to differentiate when we should run the combine/final/trans functions when this ride-along agg is present. This commit re-adds support for intermediate aggs. The logic here is the same as in 6X; however, instead of explicitly using the aggstage, we use the aggsplit, which is determined from the aggstage. The logic is defined in `AGGSPLIT_INTERNMEDIATE`. The changes in nodeAgg.c allow the aggsplit of an aggref to differ from the aggsplit of its aggstate. This is necessary and expected in the case of an intermediate agg, as the loop iterates over each entry of aggstate->aggs, but the aggsplit can now be different between the aggref and the aggstate. Thus the aggsplit references are also changed to use aggref instead of aggstate. --- src/backend/executor/nodeAgg.c | 14 ++- .../translate/CTranslatorDXLToScalar.cpp | 5 +- .../data/dxl/minidump/DQA-1-RegularAgg.mdp | 2 +- .../data/dxl/minidump/DQA-2-RegularAgg.mdp | 2 +- .../minidump/DQA-SplitScalarWithAggAndGuc.mdp | 2 +- .../data/dxl/minidump/MDQA-SameDQAColumn.mdp | 2 +- .../libgpopt/src/xforms/CXformSplitDQA.cpp | 86 +++++-------------- src/include/nodes/nodes.h | 2 + .../regress/expected/aggregates_optimizer.out | 2 - 9 files changed, 34 insertions(+), 83 deletions(-) diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index a19c442d9d4..75f160856c4 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -3884,8 +3884,6 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) /* Planner should have assigned aggregate to correct level */ Assert(aggref->agglevelsup == 0); - /* ... 
and the split mode should match */ - Assert(aggref->aggsplit == aggstate->aggsplit); peragg = &peraggs[aggref->aggno]; @@ -3917,7 +3915,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) Assert(OidIsValid(aggtranstype)); /* Final function only required if we're finalizing the aggregates */ - if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)) + if (DO_AGGSPLIT_SKIPFINAL(aggref->aggsplit)) peragg->finalfn_oid = finalfn_oid = InvalidOid; else peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn; @@ -3936,10 +3934,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * every aggregate with an INTERNAL state has a serialization * function. Verify that. */ - if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit)) + if (DO_AGGSPLIT_SERIALIZE(aggref->aggsplit)) { /* serialization only valid when not running finalfn */ - Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)); + Assert(DO_AGGSPLIT_SKIPFINAL(aggref->aggsplit)); if (!OidIsValid(aggform->aggserialfn)) elog(ERROR, "serialfunc not provided for serialization aggregation"); @@ -3947,10 +3945,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) } /* Likewise for deserialization functions */ - if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit)) + if (DO_AGGSPLIT_DESERIALIZE(aggref->aggsplit)) { /* deserialization only valid when combining states */ - Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit)); + Assert(DO_AGGSPLIT_COMBINE(aggref->aggsplit)); if (!OidIsValid(aggform->aggdeserialfn)) elog(ERROR, "deserialfunc not provided for deserialization aggregation"); @@ -4058,7 +4056,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * of using the transition function, we'll use the combine * function */ - if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) + if (DO_AGGSPLIT_COMBINE(aggref->aggsplit)) { transfn_oid = aggform->aggcombinefn; diff --git a/src/backend/gpopt/translate/CTranslatorDXLToScalar.cpp b/src/backend/gpopt/translate/CTranslatorDXLToScalar.cpp index 9720525e517..cdfeb69f710 100644 --- 
a/src/backend/gpopt/translate/CTranslatorDXLToScalar.cpp +++ b/src/backend/gpopt/translate/CTranslatorDXLToScalar.cpp @@ -591,10 +591,7 @@ CTranslatorDXLToScalar::TranslateDXLScalarAggrefToScalar( aggref->aggsplit = AGGSPLIT_INITIAL_SERIAL; break; case EdxlaggstageIntermediate: - GPOS_RAISE( - gpdxl::ExmaDXL, gpdxl::ExmiPlStmt2DXLConversion, - GPOS_WSZ_LIT( - "GPDB_96_MERGE_FIXME: Intermediate aggregate stage not implemented")); + aggref->aggsplit = AGGSPLIT_INTERNMEDIATE; break; case EdxlaggstageFinal: aggref->aggsplit = AGGSPLIT_FINAL_DESERIAL; diff --git a/src/backend/gporca/data/dxl/minidump/DQA-1-RegularAgg.mdp b/src/backend/gporca/data/dxl/minidump/DQA-1-RegularAgg.mdp index c6f1155860e..c1eaf49b793 100644 --- a/src/backend/gporca/data/dxl/minidump/DQA-1-RegularAgg.mdp +++ b/src/backend/gporca/data/dxl/minidump/DQA-1-RegularAgg.mdp @@ -275,7 +275,7 @@ - + diff --git a/src/backend/gporca/data/dxl/minidump/DQA-2-RegularAgg.mdp b/src/backend/gporca/data/dxl/minidump/DQA-2-RegularAgg.mdp index 12327bbf392..b39c08475b7 100644 --- a/src/backend/gporca/data/dxl/minidump/DQA-2-RegularAgg.mdp +++ b/src/backend/gporca/data/dxl/minidump/DQA-2-RegularAgg.mdp @@ -320,7 +320,7 @@ - + diff --git a/src/backend/gporca/data/dxl/minidump/DQA-SplitScalarWithAggAndGuc.mdp b/src/backend/gporca/data/dxl/minidump/DQA-SplitScalarWithAggAndGuc.mdp index a17981cbd55..adb33a993bd 100644 --- a/src/backend/gporca/data/dxl/minidump/DQA-SplitScalarWithAggAndGuc.mdp +++ b/src/backend/gporca/data/dxl/minidump/DQA-SplitScalarWithAggAndGuc.mdp @@ -350,7 +350,7 @@ - + diff --git a/src/backend/gporca/data/dxl/minidump/MDQA-SameDQAColumn.mdp b/src/backend/gporca/data/dxl/minidump/MDQA-SameDQAColumn.mdp index a259e819c01..52b5267a191 100644 --- a/src/backend/gporca/data/dxl/minidump/MDQA-SameDQAColumn.mdp +++ b/src/backend/gporca/data/dxl/minidump/MDQA-SameDQAColumn.mdp @@ -344,7 +344,7 @@ - + diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp 
b/src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp index a9af5f9b10b..be45591b97b 100644 --- a/src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp +++ b/src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp @@ -79,40 +79,6 @@ CXformSplitDQA::Exfp(CExpressionHandle &exprhdl) const return CXform::ExfpHigh; } -// Checks whether or not the project list contains at least one DQA and one -// non-DQA. -static bool -FContainsRideAlongAggregate(CExpression *pexprProjectList) -{ - bool hasDQA = false; - bool hasNonDQA = false; - - const ULONG size = pexprProjectList->PdrgPexpr()->Size(); - for (ULONG ul = 0; ul < size; ul++) - { - CExpression *pexpr = (*pexprProjectList->PdrgPexpr())[ul]; - - const ULONG sizeInner = pexpr->PdrgPexpr()->Size(); - CScalarAggFunc *paggfunc; - if (sizeInner != 1 || (paggfunc = CScalarAggFunc::PopConvert( - (*pexpr->PdrgPexpr())[0]->Pop())) == nullptr) - { - continue; - } - - if (paggfunc->IsDistinct()) - { - hasDQA = true; - } - else - { - hasNonDQA = true; - } - } - - return hasDQA && hasNonDQA; -} - //--------------------------------------------------------------------------- // @function: // CXformSplitDQA::Transform @@ -184,37 +150,27 @@ CXformSplitDQA::Transform(CXformContext *pxfctxt, CXformResult *pxfres, pxfres->Add(pexprThreeStageDQA); - // GPDB_96_MERGE_FIXME: Postgres 9.6 merge commit 38d881555207 replaced - // Greenplum multi-stage aggregate executor code with upstream. In the - // process, we lost the intermediate aggregate stage which is useful when - // we have a 'ride-along' aggregate. For example, - // - // SELECT SUM(a), COUNT(DISTINCT b) FROM foo; - // - // After we re-implement intermediate aggregate stage in executor we should - // be able to re-enable the following transform optimization. - if (!FContainsRideAlongAggregate(pexprProjectList)) - { - // generate two-stage agg - // this transform is useful for cases where distinct column is same as distributed column. 
- // for a query like "select count(distinct a) from bar;" - // we generate a two stage agg where the aggregate operator gives us the distinct values. - // CScalarProjectList for the Local agg below is empty on purpose. - - // +--CLogicalGbAgg( Global ) Grp Cols: [][Global] - // |--CLogicalGbAgg( Local ) Grp Cols: ["a" (0)][Local], - // | |--CLogicalGet "bar" ("bar"), - // | +--CScalarProjectList - // +--CScalarProjectList - // +--CScalarProjectElement "count" (9) - // +--CScalarAggFunc (count , Distinct: false , Aggregate Stage: Global) - // +--CScalarIdent "a" (0) - - CExpression *pexprTwoStageScalarDQA = PexprSplitHelper( - mp, col_factory, md_accessor, pexpr, pexprRelational, phmexprcr, - pdrgpcrArgDQA, CLogicalGbAgg::EasTwoStageScalarDQA); - pxfres->Add(pexprTwoStageScalarDQA); - } + + // generate two-stage agg + // this transform is useful for cases where distinct column is same as distributed column. + // for a query like "select count(distinct a) from bar;" + // we generate a two stage agg where the aggregate operator gives us the distinct values. + // CScalarProjectList for the Local agg below is empty on purpose. + + // +--CLogicalGbAgg( Global ) Grp Cols: [][Global] + // |--CLogicalGbAgg( Local ) Grp Cols: ["a" (0)][Local], + // | |--CLogicalGet "bar" ("bar"), + // | +--CScalarProjectList + // +--CScalarProjectList + // +--CScalarProjectElement "count" (9) + // +--CScalarAggFunc (count , Distinct: false , Aggregate Stage: Global) + // +--CScalarIdent "a" (0) + + CExpression *pexprTwoStageScalarDQA = PexprSplitHelper( + mp, col_factory, md_accessor, pexpr, pexprRelational, phmexprcr, + pdrgpcrArgDQA, CLogicalGbAgg::EasTwoStageScalarDQA); + pxfres->Add(pexprTwoStageScalarDQA); + // generate local DQA, global agg for both scalar and non-scalar agg cases. 
// for a query like "select count(distinct a) from bar;" diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index f0fe4ca3060..2c7c027c875 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -985,6 +985,8 @@ typedef enum AggSplit * stripped away from Aggs in setrefs.c. */ AGGSPLIT_DEDUPLICATED = AGGSPLITOP_DEDUPLICATED, + + AGGSPLIT_INTERNMEDIATE = AGGSPLITOP_SKIPFINAL | AGGSPLITOP_SERIALIZE | AGGSPLITOP_COMBINE | AGGSPLITOP_DESERIALIZE, } AggSplit; /* Test whether an AggSplit value selects each primitive option: */ diff --git a/src/test/regress/expected/aggregates_optimizer.out b/src/test/regress/expected/aggregates_optimizer.out index b3cdac99b47..0c44dcfd641 100644 --- a/src/test/regress/expected/aggregates_optimizer.out +++ b/src/test/regress/expected/aggregates_optimizer.out @@ -579,8 +579,6 @@ group by ten order by ten; select ten, count(four), sum(DISTINCT four) from onek group by ten order by ten; -INFO: GPORCA failed to produce a plan, falling back to planner -DETAIL: GPDB Expression type: GPDB_96_MERGE_FIXME: Intermediate aggregate stage not implemented not supported in DXL ten | count | sum -----+-------+----- 0 | 100 | 2