Skip to content

Commit

Permalink
Support intermediate aggs in Orca plans (#13707)
Browse files Browse the repository at this point in the history
Orca in GPDB6 has support for intermediate aggs, which isn't used in postgres.
This is useful when we have a DQA and a regular "ride-along" agg.
However, we need to differentiate when we should run the
combine/final/trans functions when this ride-along agg is present.

This commit re-adds support for intermediate aggs. The logic here is the
same as 6X, however, instead of explicitly using the aggstage, we use
the aggsplit, which is determined from the aggstage. The logic is
defined in `AGGSPLIT_INTERNMEDIATE`.

The changes in nodeAgg.c are to allow the aggref and aggstate to differ
for an aggregate. This is necessary and expected in the case of an
intermediate agg, as the loop will iterate over each aggstate->aggs, but
the aggsplit can now be different between the aggref and the aggstate.
Thus the aggsplit references are also changed to use aggref instead of
aggstate.
  • Loading branch information
chrishajas authored and leborchuk committed Jan 9, 2025
1 parent dbc7a71 commit 7ff1872
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 83 deletions.
14 changes: 6 additions & 8 deletions src/backend/executor/nodeAgg.c
Original file line number Diff line number Diff line change
Expand Up @@ -3884,8 +3884,6 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)

/* Planner should have assigned aggregate to correct level */
Assert(aggref->agglevelsup == 0);
/* ... and the split mode should match */
Assert(aggref->aggsplit == aggstate->aggsplit);

peragg = &peraggs[aggref->aggno];

Expand Down Expand Up @@ -3917,7 +3915,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
Assert(OidIsValid(aggtranstype));

/* Final function only required if we're finalizing the aggregates */
if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
if (DO_AGGSPLIT_SKIPFINAL(aggref->aggsplit))
peragg->finalfn_oid = finalfn_oid = InvalidOid;
else
peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
Expand All @@ -3936,21 +3934,21 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
* every aggregate with an INTERNAL state has a serialization
* function. Verify that.
*/
if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
if (DO_AGGSPLIT_SERIALIZE(aggref->aggsplit))
{
/* serialization only valid when not running finalfn */
Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
Assert(DO_AGGSPLIT_SKIPFINAL(aggref->aggsplit));

if (!OidIsValid(aggform->aggserialfn))
elog(ERROR, "serialfunc not provided for serialization aggregation");
serialfn_oid = aggform->aggserialfn;
}

/* Likewise for deserialization functions */
if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
if (DO_AGGSPLIT_DESERIALIZE(aggref->aggsplit))
{
/* deserialization only valid when combining states */
Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
Assert(DO_AGGSPLIT_COMBINE(aggref->aggsplit));

if (!OidIsValid(aggform->aggdeserialfn))
elog(ERROR, "deserialfunc not provided for deserialization aggregation");
Expand Down Expand Up @@ -4058,7 +4056,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
* of using the transition function, we'll use the combine
* function
*/
if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
if (DO_AGGSPLIT_COMBINE(aggref->aggsplit))
{
transfn_oid = aggform->aggcombinefn;

Expand Down
5 changes: 1 addition & 4 deletions src/backend/gpopt/translate/CTranslatorDXLToScalar.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -591,10 +591,7 @@ CTranslatorDXLToScalar::TranslateDXLScalarAggrefToScalar(
aggref->aggsplit = AGGSPLIT_INITIAL_SERIAL;
break;
case EdxlaggstageIntermediate:
GPOS_RAISE(
gpdxl::ExmaDXL, gpdxl::ExmiPlStmt2DXLConversion,
GPOS_WSZ_LIT(
"GPDB_96_MERGE_FIXME: Intermediate aggregate stage not implemented"));
aggref->aggsplit = AGGSPLIT_INTERNMEDIATE;
break;
case EdxlaggstageFinal:
aggref->aggsplit = AGGSPLIT_FINAL_DESERIAL;
Expand Down
2 changes: 1 addition & 1 deletion src/backend/gporca/data/dxl/minidump/DQA-1-RegularAgg.mdp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@
</dxl:LogicalGet>
</dxl:LogicalGroupBy>
</dxl:Query>
<dxl:Plan Id="0" SpaceSize="30">
<dxl:Plan Id="0" SpaceSize="42">
<dxl:GatherMotion InputSegments="0,1" OutputSegments="-1">
<dxl:Properties>
<dxl:Cost StartupCost="0" TotalCost="577.928905" Rows="4.000000" Width="16"/>
Expand Down
2 changes: 1 addition & 1 deletion src/backend/gporca/data/dxl/minidump/DQA-2-RegularAgg.mdp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@
</dxl:LogicalGet>
</dxl:LogicalGroupBy>
</dxl:Query>
<dxl:Plan Id="0" SpaceSize="30">
<dxl:Plan Id="0" SpaceSize="42">
<dxl:GatherMotion InputSegments="0,1" OutputSegments="-1">
<dxl:Properties>
<dxl:Cost StartupCost="0" TotalCost="579.950342" Rows="4.000000" Width="24"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@
</dxl:LogicalGet>
</dxl:LogicalGroupBy>
</dxl:Query>
<dxl:Plan Id="0" SpaceSize="9">
<dxl:Plan Id="0" SpaceSize="13">
<dxl:Aggregate AggregationStrategy="Plain" StreamSafe="false">
<dxl:Properties>
<dxl:Cost StartupCost="0" TotalCost="438.791035" Rows="1.000000" Width="16"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@
</dxl:LogicalGroupBy>
</dxl:LogicalLimit>
</dxl:Query>
<dxl:Plan Id="0" SpaceSize="101">
<dxl:Plan Id="0" SpaceSize="131">
<dxl:GatherMotion InputSegments="0,1" OutputSegments="-1">
<dxl:Properties>
<dxl:Cost StartupCost="0" TotalCost="585.939124" Rows="5.000000" Width="28"/>
Expand Down
86 changes: 21 additions & 65 deletions src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,40 +79,6 @@ CXformSplitDQA::Exfp(CExpressionHandle &exprhdl) const
return CXform::ExfpHigh;
}

// Checks whether or not the project list contains at least one DQA and one
// non-DQA.
static bool
FContainsRideAlongAggregate(CExpression *pexprProjectList)
{
bool hasDQA = false;
bool hasNonDQA = false;

const ULONG size = pexprProjectList->PdrgPexpr()->Size();
for (ULONG ul = 0; ul < size; ul++)
{
CExpression *pexpr = (*pexprProjectList->PdrgPexpr())[ul];

const ULONG sizeInner = pexpr->PdrgPexpr()->Size();
CScalarAggFunc *paggfunc;
if (sizeInner != 1 || (paggfunc = CScalarAggFunc::PopConvert(
(*pexpr->PdrgPexpr())[0]->Pop())) == nullptr)
{
continue;
}

if (paggfunc->IsDistinct())
{
hasDQA = true;
}
else
{
hasNonDQA = true;
}
}

return hasDQA && hasNonDQA;
}

//---------------------------------------------------------------------------
// @function:
// CXformSplitDQA::Transform
Expand Down Expand Up @@ -184,37 +150,27 @@ CXformSplitDQA::Transform(CXformContext *pxfctxt, CXformResult *pxfres,

pxfres->Add(pexprThreeStageDQA);

// GPDB_96_MERGE_FIXME: Postgres 9.6 merge commit 38d881555207 replaced
// Greenplum multi-stage aggregate executor code with upstream. In the
// process, we lost the intermediate aggregate stage which is useful when
// we have a 'ride-along' aggregate. For example,
//
// SELECT SUM(a), COUNT(DISTINCT b) FROM foo;
//
// After we re-implement intermediate aggregate stage in executor we should
// be able to re-enable the following transform optimization.
if (!FContainsRideAlongAggregate(pexprProjectList))
{
// generate two-stage agg
// this transform is useful for cases where distinct column is same as distributed column.
// for a query like "select count(distinct a) from bar;"
// we generate a two stage agg where the aggregate operator gives us the distinct values.
// CScalarProjectList for the Local agg below is empty on purpose.

// +--CLogicalGbAgg( Global ) Grp Cols: [][Global]
// |--CLogicalGbAgg( Local ) Grp Cols: ["a" (0)][Local],
// | |--CLogicalGet "bar" ("bar"),
// | +--CScalarProjectList
// +--CScalarProjectList
// +--CScalarProjectElement "count" (9)
// +--CScalarAggFunc (count , Distinct: false , Aggregate Stage: Global)
// +--CScalarIdent "a" (0)

CExpression *pexprTwoStageScalarDQA = PexprSplitHelper(
mp, col_factory, md_accessor, pexpr, pexprRelational, phmexprcr,
pdrgpcrArgDQA, CLogicalGbAgg::EasTwoStageScalarDQA);
pxfres->Add(pexprTwoStageScalarDQA);
}

// generate two-stage agg
// this transform is useful for cases where distinct column is same as distributed column.
// for a query like "select count(distinct a) from bar;"
// we generate a two stage agg where the aggregate operator gives us the distinct values.
// CScalarProjectList for the Local agg below is empty on purpose.

// +--CLogicalGbAgg( Global ) Grp Cols: [][Global]
// |--CLogicalGbAgg( Local ) Grp Cols: ["a" (0)][Local],
// | |--CLogicalGet "bar" ("bar"),
// | +--CScalarProjectList
// +--CScalarProjectList
// +--CScalarProjectElement "count" (9)
// +--CScalarAggFunc (count , Distinct: false , Aggregate Stage: Global)
// +--CScalarIdent "a" (0)

CExpression *pexprTwoStageScalarDQA = PexprSplitHelper(
mp, col_factory, md_accessor, pexpr, pexprRelational, phmexprcr,
pdrgpcrArgDQA, CLogicalGbAgg::EasTwoStageScalarDQA);
pxfres->Add(pexprTwoStageScalarDQA);


// generate local DQA, global agg for both scalar and non-scalar agg cases.
// for a query like "select count(distinct a) from bar;"
Expand Down
2 changes: 2 additions & 0 deletions src/include/nodes/nodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,8 @@ typedef enum AggSplit
* stripped away from Aggs in setrefs.c.
*/
AGGSPLIT_DEDUPLICATED = AGGSPLITOP_DEDUPLICATED,

AGGSPLIT_INTERNMEDIATE = AGGSPLITOP_SKIPFINAL | AGGSPLITOP_SERIALIZE | AGGSPLITOP_COMBINE | AGGSPLITOP_DESERIALIZE,
} AggSplit;

/* Test whether an AggSplit value selects each primitive option: */
Expand Down
2 changes: 0 additions & 2 deletions src/test/regress/expected/aggregates_optimizer.out
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,6 @@ group by ten order by ten;

select ten, count(four), sum(DISTINCT four) from onek
group by ten order by ten;
INFO: GPORCA failed to produce a plan, falling back to planner
DETAIL: GPDB Expression type: GPDB_96_MERGE_FIXME: Intermediate aggregate stage not implemented not supported in DXL
ten | count | sum
-----+-------+-----
0 | 100 | 2
Expand Down

0 comments on commit 7ff1872

Please sign in to comment.