Push the runtime filter from HashJoin down to SeqScan or AM. #724

Open · wants to merge 12 commits into base: main
25 changes: 25 additions & 0 deletions src/backend/commands/explain.c
@@ -137,6 +137,9 @@ static void show_incremental_sort_info(IncrementalSortState *incrsortstate,
static void show_hash_info(HashState *hashstate, ExplainState *es);
static void show_runtime_filter_info(RuntimeFilterState *rfstate,
ExplainState *es);
static void show_pushdown_runtime_filter_info(const char *qlabel,
PlanState *planstate,
ExplainState *es);
static void show_memoize_info(MemoizeState *mstate, List *ancestors,
ExplainState *es);
static void show_hashagg_info(AggState *hashstate, ExplainState *es);
@@ -2415,6 +2418,10 @@ ExplainNode(PlanState *planstate, List *ancestors,
/* fall through to print additional fields the same as SeqScan */
/* FALLTHROUGH */
case T_SeqScan:
if (gp_enable_runtime_filter_pushdown && IsA(planstate, SeqScanState))
show_pushdown_runtime_filter_info("Rows Removed by Pushdown Runtime Filter",
planstate, es);
/* FALLTHROUGH */
case T_DynamicSeqScan:
case T_ValuesScan:
case T_CteScan:
@@ -4285,6 +4292,24 @@ show_instrumentation_count(const char *qlabel, int which,
}
}

/*
 * If it's EXPLAIN ANALYZE, show the instrumentation gathered for the
 * pushdown runtime filter.
 */
static void
show_pushdown_runtime_filter_info(const char *qlabel,
PlanState *planstate,
ExplainState *es)
{
Assert(gp_enable_runtime_filter_pushdown && IsA(planstate, SeqScanState));

if (!es->analyze || !planstate->instrument)
return;

if (planstate->instrument->prf_work)
ExplainPropertyFloat(qlabel, NULL, planstate->instrument->nfilteredPRF, 0, es);
}

/*
* Show extra information for a ForeignScan node.
*/
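With the explain.c change above, a scan that received a pushdown runtime filter reports the removed-row count under EXPLAIN ANALYZE. A hypothetical fragment of such output (table names and counts are invented for illustration, and it assumes gp_enable_runtime_filter_pushdown is on; only the "Rows Removed by Pushdown Runtime Filter" label comes from this patch):

set gp_enable_runtime_filter_pushdown = on;
explain analyze select * from fact join dim on fact.k = dim.k;
   ...
   ->  Seq Scan on fact (actual rows=1012 loops=1)
         Rows Removed by Pushdown Runtime Filter: 98988
   ...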
6 changes: 6 additions & 0 deletions src/backend/commands/explain_gp.c
@@ -57,6 +57,8 @@ typedef struct CdbExplain_StatInst
double nloops; /* # of run cycles for this node */
double nfiltered1;
double nfiltered2;
bool prf_work; /* whether the pushdown runtime filter ran */
double nfilteredPRF; /* # of tuples removed by pushdown runtime filter */
double execmemused; /* executor memory used (bytes) */
double workmemused; /* work_mem actually used (bytes) */
double workmemwanted; /* work_mem to avoid workfile i/o (bytes) */
@@ -886,6 +888,8 @@ cdbexplain_collectStatsFromNode(PlanState *planstate, CdbExplain_SendStatCtx *ct
si->nloops = instr->nloops;
si->nfiltered1 = instr->nfiltered1;
si->nfiltered2 = instr->nfiltered2;
si->prf_work = instr->prf_work;
si->nfilteredPRF = instr->nfilteredPRF;
si->workmemused = instr->workmemused;
si->workmemwanted = instr->workmemwanted;
si->workfileCreated = instr->workfileCreated;
@@ -1189,6 +1193,8 @@ cdbexplain_depositStatsToNode(PlanState *planstate, CdbExplain_RecvStatCtx *ctx)
instr->nloops = nodeAcc->nsimax->nloops;
instr->nfiltered1 = nodeAcc->nsimax->nfiltered1;
instr->nfiltered2 = nodeAcc->nsimax->nfiltered2;
instr->prf_work = nodeAcc->nsimax->prf_work;
instr->nfilteredPRF = nodeAcc->nsimax->nfilteredPRF;
instr->execmemused = nodeAcc->nsimax->execmemused;
instr->workmemused = nodeAcc->nsimax->workmemused;
instr->workmemwanted = nodeAcc->nsimax->workmemwanted;
159 changes: 158 additions & 1 deletion src/backend/executor/nodeHash.c
@@ -102,7 +102,10 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
size_t size);
static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);

static void BuildRuntimeFilter(HashState *node, TupleTableSlot *slot);
static void PushdownRuntimeFilter(HashState *node);
static void FreeRuntimeFilter(HashState *node);
static void ResetRuntimeFilter(HashState *node);

/* ----------------------------------------------------------------
* ExecHash
@@ -193,7 +196,15 @@ MultiExecPrivateHash(HashState *node)
{
slot = ExecProcNode(outerNode);
if (TupIsNull(slot))
{
if (gp_enable_runtime_filter_pushdown && node->filters)
PushdownRuntimeFilter(node);
break;
}

if (gp_enable_runtime_filter_pushdown && node->filters)
BuildRuntimeFilter(node, slot);

/* We have to compute the hash value */
econtext->ecxt_outertuple = slot;
bool hashkeys_null = false;
@@ -335,6 +346,7 @@ MultiExecParallelHash(HashState *node)
slot = ExecProcNode(outerNode);
if (TupIsNull(slot))
break;

econtext->ecxt_outertuple = slot;
if (ExecHashGetHashValue(node, hashtable, econtext, hashkeys,
false, hashtable->keepNulls,
@@ -512,6 +524,9 @@ ExecEndHash(HashState *node)
*/
outerPlan = outerPlanState(node);
ExecEndNode(outerPlan);

if (gp_enable_runtime_filter_pushdown && node->filters)
FreeRuntimeFilter(node);
}


@@ -2520,6 +2535,9 @@ ExecReScanHash(HashState *node)
*/
if (node->ps.lefttree->chgParam == NULL)
ExecReScan(node->ps.lefttree);

if (gp_enable_runtime_filter_pushdown && node->filters)
ResetRuntimeFilter(node);
}


@@ -4126,3 +4144,142 @@ get_hash_mem(void)

return (int) mem_limit;
}

/*
 * Convert each AttrFilter to ScanKeyData and send the resulting runtime
 * filters to the target node (SeqScan).
 */
void
PushdownRuntimeFilter(HashState *node)
{
ListCell *lc;
List *scankeys;
ScanKey sk;
AttrFilter *attr_filter;

foreach (lc, node->filters)
{
scankeys = NIL;

attr_filter = lfirst(lc);
if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty)
continue;

/* bloom filter */
sk = (ScanKey)palloc0(sizeof(ScanKeyData));
sk->sk_flags = SK_BLOOM_FILTER;
sk->sk_attno = attr_filter->lattno;
sk->sk_subtype = INT8OID;
sk->sk_argument = PointerGetDatum(attr_filter->blm_filter);
scankeys = lappend(scankeys, sk);

/* range filter */
sk = (ScanKey)palloc0(sizeof(ScanKeyData));
sk->sk_flags = 0;
sk->sk_attno = attr_filter->lattno;
sk->sk_strategy = BTGreaterEqualStrategyNumber;
sk->sk_subtype = INT8OID;
sk->sk_argument = attr_filter->min;
scankeys = lappend(scankeys, sk);

sk = (ScanKey)palloc0(sizeof(ScanKeyData));
sk->sk_flags = 0;
sk->sk_attno = attr_filter->lattno;
sk->sk_strategy = BTLessEqualStrategyNumber;
sk->sk_subtype = INT8OID;
sk->sk_argument = attr_filter->max;
scankeys = lappend(scankeys, sk);

/* append new runtime filters to target node */
SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
sss->filters = list_concat(sss->filters, scankeys);
Review comment on the line above (Member):
can we merge filters here on the same attno?

Reply (@zhangyue-hashdata, Contributor Author, Nov 25, 2024):
  1. Combining Bloom filters results in a higher false positive rate (FPR) than applying each individual Bloom filter separately, so it is not recommended;
  2. Combining range filters has the same problem: the merged range covers the union of the inputs and therefore filters less;
  3. In many cases there is only one Bloom filter and one range filter on the same attribute anyway;
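A rough worked illustration of point 1, using the standard approximation FPR ≈ f^k for a Bloom filter with k hash functions and fraction f of bits set: merging two filters by OR-ing their bit arrays roughly doubles f when f is small, so two filters each tuned to 1% FPR with k = 2 (f ≈ 0.1) merge into one with f ≈ 0.19 and FPR ≈ 3.6%; the merged filter also accepts the union of both key sets rather than testing each join's keys separately.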

Reply (@yjhjstz, Member, Nov 25, 2024):
create table t1(a int, b int) with(parallel_workers=2);
create table rt1(a int, b int) with(parallel_workers=2);
create table rt2(a int, b int);
create table rt3(a int, b int);
insert into t1 select i, i from generate_series(1, 100000) i;
insert into t1 select i, i+1 from generate_series(1, 10) i;
insert into rt1 select i, i+1 from generate_series(1, 10) i;
insert into rt2 select i, i+1 from generate_series(1, 10000) i;
insert into rt3 select i, i+1 from generate_series(1, 10) i;
analyze t1;
analyze rt1;
analyze rt2;
analyze rt3;

explain analyze select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b;

postgres=# explain select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b;
                                   QUERY PLAN                                   
--------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)  (cost=2.45..428.51 rows=17 width=24)
   ->  Hash Join  (cost=2.45..428.29 rows=6 width=24)
         Hash Cond: (t1.b = rt1.a)
         ->  Hash Join  (cost=1.23..427.00 rows=6 width=16)
               Hash Cond: (t1.b = rt3.a)
               ->  Seq Scan on t1  (cost=0.00..342.37 rows=33337 width=8)
               ->  Hash  (cost=1.10..1.10 rows=10 width=8)
                     ->  Seq Scan on rt3  (cost=0.00..1.10 rows=10 width=8)
         ->  Hash  (cost=1.10..1.10 rows=10 width=8)
               ->  Seq Scan on rt1  (cost=0.00..1.10 rows=10 width=8)
 Optimizer: Postgres query optimizer
(11 rows)

You can try this case; you will get two range filters.

Reply (@zhangyue-hashdata, Contributor Author):
got it

}
}
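The scan-side consumption of these keys is not part of this hunk. As a rough sketch of what the receiving SeqScan could do with them (the helper name PassPushdownRuntimeFilters is hypothetical; bloom_lacks_element is the probe function from src/include/lib/bloomfilter.h, and the int64 comparisons mirror BuildRuntimeFilter below):

/*
 * Hypothetical sketch only: test one tuple against the ScanKeys pushed
 * down by PushdownRuntimeFilter above. Returns false if the tuple can
 * safely be skipped.
 */
static bool
PassPushdownRuntimeFilters(SeqScanState *sss, TupleTableSlot *slot)
{
	ListCell   *lc;

	foreach(lc, sss->filters)
	{
		ScanKey		sk = (ScanKey) lfirst(lc);
		bool		isnull;
		Datum		val = slot_getattr(slot, sk->sk_attno, &isnull);

		/* be conservative: never filter NULLs here */
		if (isnull)
			continue;

		if (sk->sk_flags & SK_BLOOM_FILTER)
		{
			/* a "definitely absent" answer from the Bloom filter is exact */
			bloom_filter *bf = (bloom_filter *) DatumGetPointer(sk->sk_argument);

			if (bloom_lacks_element(bf, (unsigned char *) &val, sizeof(Datum)))
				return false;
		}
		else if (sk->sk_strategy == BTGreaterEqualStrategyNumber)
		{
			if ((int64) val < (int64) sk->sk_argument)
				return false;
		}
		else if (sk->sk_strategy == BTLessEqualStrategyNumber)
		{
			if ((int64) val > (int64) sk->sk_argument)
				return false;
		}
	}
	return true;
}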

static void
BuildRuntimeFilter(HashState *node, TupleTableSlot *slot)
{
Datum val;
bool isnull;
ListCell *lc;
AttrFilter *attr_filter;

foreach (lc, node->filters)
{
attr_filter = (AttrFilter *) lfirst(lc);

val = slot_getattr(slot, attr_filter->rattno, &isnull);
if (isnull)
continue;

attr_filter->empty = false;

if ((int64_t)val < (int64_t)attr_filter->min)
attr_filter->min = val;

if ((int64_t)val > (int64_t)attr_filter->max)
attr_filter->max = val;

if (attr_filter->blm_filter)
bloom_add_element(attr_filter->blm_filter, (unsigned char *)&val, sizeof(Datum));
}
}

void
FreeRuntimeFilter(HashState *node)
{
ListCell *lc;
AttrFilter *attr_filter;

if (!node->filters)
return;

foreach (lc, node->filters)
{
attr_filter = lfirst(lc);
if (attr_filter->blm_filter)
bloom_free(attr_filter->blm_filter);
}

list_free_deep(node->filters);
node->filters = NIL;
}

void
ResetRuntimeFilter(HashState *node)
{
ListCell *lc;
AttrFilter *attr_filter;
SeqScanState *sss;

if (!node->filters)
return;

foreach (lc, node->filters)
{
attr_filter = lfirst(lc);
attr_filter->empty = true;

if (IsA(attr_filter->target, SeqScanState))
{
sss = castNode(SeqScanState, attr_filter->target);
if (sss->filters)
{
list_free_deep(sss->filters);
sss->filters = NIL;
}
}

if (attr_filter->blm_filter)
bloom_free(attr_filter->blm_filter);

attr_filter->blm_filter = bloom_create_aggresive(node->ps.plan->plan_rows,
work_mem,
random());

StaticAssertDecl(sizeof(LONG_MAX) == sizeof(Datum), "sizeof(LONG_MAX) should be equal to sizeof(Datum)");
StaticAssertDecl(sizeof(LONG_MIN) == sizeof(Datum), "sizeof(LONG_MIN) should be equal to sizeof(Datum)");
attr_filter->min = LONG_MAX;
attr_filter->max = LONG_MIN;
}
}