Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick commit Support intermediate aggs in Orca plans (#13707) #741

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions src/backend/executor/nodeAgg.c
Original file line number Diff line number Diff line change
Expand Up @@ -3884,8 +3884,6 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)

/* Planner should have assigned aggregate to correct level */
Assert(aggref->agglevelsup == 0);
/* ... and the split mode should match */
Assert(aggref->aggsplit == aggstate->aggsplit);

peragg = &peraggs[aggref->aggno];

Expand Down Expand Up @@ -3917,7 +3915,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
Assert(OidIsValid(aggtranstype));

/* Final function only required if we're finalizing the aggregates */
if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
if (DO_AGGSPLIT_SKIPFINAL(aggref->aggsplit))
peragg->finalfn_oid = finalfn_oid = InvalidOid;
else
peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
Expand All @@ -3936,21 +3934,21 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
* every aggregate with an INTERNAL state has a serialization
* function. Verify that.
*/
if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
if (DO_AGGSPLIT_SERIALIZE(aggref->aggsplit))
{
/* serialization only valid when not running finalfn */
Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
Assert(DO_AGGSPLIT_SKIPFINAL(aggref->aggsplit));

if (!OidIsValid(aggform->aggserialfn))
elog(ERROR, "serialfunc not provided for serialization aggregation");
serialfn_oid = aggform->aggserialfn;
}

/* Likewise for deserialization functions */
if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
if (DO_AGGSPLIT_DESERIALIZE(aggref->aggsplit))
{
/* deserialization only valid when combining states */
Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
Assert(DO_AGGSPLIT_COMBINE(aggref->aggsplit));

if (!OidIsValid(aggform->aggdeserialfn))
elog(ERROR, "deserialfunc not provided for deserialization aggregation");
Expand Down Expand Up @@ -4058,7 +4056,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
* of using the transition function, we'll use the combine
* function
*/
if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
if (DO_AGGSPLIT_COMBINE(aggref->aggsplit))
{
transfn_oid = aggform->aggcombinefn;

Expand Down
5 changes: 1 addition & 4 deletions src/backend/gpopt/translate/CTranslatorDXLToScalar.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -591,10 +591,7 @@ CTranslatorDXLToScalar::TranslateDXLScalarAggrefToScalar(
aggref->aggsplit = AGGSPLIT_INITIAL_SERIAL;
break;
case EdxlaggstageIntermediate:
GPOS_RAISE(
gpdxl::ExmaDXL, gpdxl::ExmiPlStmt2DXLConversion,
GPOS_WSZ_LIT(
"GPDB_96_MERGE_FIXME: Intermediate aggregate stage not implemented"));
aggref->aggsplit = AGGSPLIT_INTERNMEDIATE;
break;
case EdxlaggstageFinal:
aggref->aggsplit = AGGSPLIT_FINAL_DESERIAL;
Expand Down
2 changes: 1 addition & 1 deletion src/backend/gporca/data/dxl/minidump/DQA-1-RegularAgg.mdp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@
</dxl:LogicalGet>
</dxl:LogicalGroupBy>
</dxl:Query>
<dxl:Plan Id="0" SpaceSize="30">
<dxl:Plan Id="0" SpaceSize="42">
<dxl:GatherMotion InputSegments="0,1" OutputSegments="-1">
<dxl:Properties>
<dxl:Cost StartupCost="0" TotalCost="577.928905" Rows="4.000000" Width="16"/>
Expand Down
2 changes: 1 addition & 1 deletion src/backend/gporca/data/dxl/minidump/DQA-2-RegularAgg.mdp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@
</dxl:LogicalGet>
</dxl:LogicalGroupBy>
</dxl:Query>
<dxl:Plan Id="0" SpaceSize="30">
<dxl:Plan Id="0" SpaceSize="42">
<dxl:GatherMotion InputSegments="0,1" OutputSegments="-1">
<dxl:Properties>
<dxl:Cost StartupCost="0" TotalCost="579.950342" Rows="4.000000" Width="24"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@
</dxl:LogicalGet>
</dxl:LogicalGroupBy>
</dxl:Query>
<dxl:Plan Id="0" SpaceSize="9">
<dxl:Plan Id="0" SpaceSize="13">
<dxl:Aggregate AggregationStrategy="Plain" StreamSafe="false">
<dxl:Properties>
<dxl:Cost StartupCost="0" TotalCost="438.791035" Rows="1.000000" Width="16"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@
</dxl:LogicalGroupBy>
</dxl:LogicalLimit>
</dxl:Query>
<dxl:Plan Id="0" SpaceSize="101">
<dxl:Plan Id="0" SpaceSize="131">
<dxl:GatherMotion InputSegments="0,1" OutputSegments="-1">
<dxl:Properties>
<dxl:Cost StartupCost="0" TotalCost="585.939124" Rows="5.000000" Width="28"/>
Expand Down
86 changes: 21 additions & 65 deletions src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,40 +79,6 @@ CXformSplitDQA::Exfp(CExpressionHandle &exprhdl) const
return CXform::ExfpHigh;
}

// Reports whether the given project list mixes both kinds of aggregate:
// returns true only when at least one inspected aggregate has IsDistinct()
// set (a DQA) and at least one inspected aggregate does not.
//
// An entry of the project list is inspected only when its own expression
// array holds exactly one element and PopConvert() on that element's
// operator yields a non-null CScalarAggFunc; every other entry is ignored.
static bool
FContainsRideAlongAggregate(CExpression *pexprProjectList)
{
	bool fSeenDistinct = false;
	bool fSeenPlain = false;

	const ULONG ulEntries = pexprProjectList->PdrgPexpr()->Size();
	for (ULONG ulIdx = 0; ulIdx < ulEntries; ulIdx++)
	{
		CExpression *pexprEntry = (*pexprProjectList->PdrgPexpr())[ulIdx];

		// entries that do not wrap exactly one expression are not candidates
		if (1 != pexprEntry->PdrgPexpr()->Size())
		{
			continue;
		}

		// the conversion is attempted only after the size check has passed
		CScalarAggFunc *popAggFunc =
			CScalarAggFunc::PopConvert((*pexprEntry->PdrgPexpr())[0]->Pop());
		if (nullptr == popAggFunc)
		{
			continue;
		}

		// record which flavour of aggregate this entry contributes
		if (popAggFunc->IsDistinct())
		{
			fSeenDistinct = true;
		}
		else
		{
			fSeenPlain = true;
		}
	}

	// a "ride-along" situation needs one of each
	return fSeenDistinct && fSeenPlain;
}

//---------------------------------------------------------------------------
// @function:
// CXformSplitDQA::Transform
Expand Down Expand Up @@ -184,37 +150,27 @@ CXformSplitDQA::Transform(CXformContext *pxfctxt, CXformResult *pxfres,

pxfres->Add(pexprThreeStageDQA);

// GPDB_96_MERGE_FIXME: Postgres 9.6 merge commit 38d881555207 replaced
// Greenplum multi-stage aggregate executor code with upstream. In the
// process, we lost the intermediate aggregate stage which is useful when
// we have a 'ride-along' aggregate. For example,
//
// SELECT SUM(a), COUNT(DISTINCT b) FROM foo;
//
// After we re-implement intermediate aggregate stage in executor we should
// be able to re-enable the following transform optimization.
if (!FContainsRideAlongAggregate(pexprProjectList))
{
// generate two-stage agg
// this transform is useful for cases where distinct column is same as distributed column.
// for a query like "select count(distinct a) from bar;"
// we generate a two stage agg where the aggregate operator gives us the distinct values.
// CScalarProjectList for the Local agg below is empty on purpose.

// +--CLogicalGbAgg( Global ) Grp Cols: [][Global]
// |--CLogicalGbAgg( Local ) Grp Cols: ["a" (0)][Local],
// | |--CLogicalGet "bar" ("bar"),
// | +--CScalarProjectList
// +--CScalarProjectList
// +--CScalarProjectElement "count" (9)
// +--CScalarAggFunc (count , Distinct: false , Aggregate Stage: Global)
// +--CScalarIdent "a" (0)

CExpression *pexprTwoStageScalarDQA = PexprSplitHelper(
mp, col_factory, md_accessor, pexpr, pexprRelational, phmexprcr,
pdrgpcrArgDQA, CLogicalGbAgg::EasTwoStageScalarDQA);
pxfres->Add(pexprTwoStageScalarDQA);
}

// generate two-stage agg
// This transform is useful for cases where the distinct column is the same as the distributed column.
// For a query like "select count(distinct a) from bar;"
// we generate a two-stage agg where the aggregate operator gives us the distinct values.
// The CScalarProjectList for the Local agg below is empty on purpose.

// +--CLogicalGbAgg( Global ) Grp Cols: [][Global]
// |--CLogicalGbAgg( Local ) Grp Cols: ["a" (0)][Local],
// | |--CLogicalGet "bar" ("bar"),
// | +--CScalarProjectList
// +--CScalarProjectList
// +--CScalarProjectElement "count" (9)
// +--CScalarAggFunc (count , Distinct: false , Aggregate Stage: Global)
// +--CScalarIdent "a" (0)

CExpression *pexprTwoStageScalarDQA = PexprSplitHelper(
mp, col_factory, md_accessor, pexpr, pexprRelational, phmexprcr,
pdrgpcrArgDQA, CLogicalGbAgg::EasTwoStageScalarDQA);
pxfres->Add(pexprTwoStageScalarDQA);


// generate local DQA, global agg for both scalar and non-scalar agg cases.
// for a query like "select count(distinct a) from bar;"
Expand Down
2 changes: 2 additions & 0 deletions src/include/nodes/nodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,8 @@ typedef enum AggSplit
* stripped away from Aggs in setrefs.c.
*/
AGGSPLIT_DEDUPLICATED = AGGSPLITOP_DEDUPLICATED,

AGGSPLIT_INTERNMEDIATE = AGGSPLITOP_SKIPFINAL | AGGSPLITOP_SERIALIZE | AGGSPLITOP_COMBINE | AGGSPLITOP_DESERIALIZE,
} AggSplit;

/* Test whether an AggSplit value selects each primitive option: */
Expand Down
2 changes: 0 additions & 2 deletions src/test/regress/expected/aggregates_optimizer.out
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,6 @@ group by ten order by ten;

select ten, count(four), sum(DISTINCT four) from onek
group by ten order by ten;
INFO: GPORCA failed to produce a plan, falling back to planner
DETAIL: GPDB Expression type: GPDB_96_MERGE_FIXME: Intermediate aggregate stage not implemented not supported in DXL
ten | count | sum
-----+-------+-----
0 | 100 | 2
Expand Down
Loading