Skip to content

Commit

Permalink
Fix backwards scan on Hypercore TAM
Browse files Browse the repository at this point in the history
Fix a bug when backwards scanning a ColumnarScan and add a test case.
  • Loading branch information
erimatnor committed Feb 7, 2025
1 parent 8cff1c2 commit cde8185
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 24 deletions.
9 changes: 5 additions & 4 deletions tsl/src/hypercore/arrow_tts.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,18 +383,19 @@ arrow_slot_try_getnext(TupleTableSlot *slot, ScanDirection direction)
Assert(direction == ForwardScanDirection || direction == BackwardScanDirection);

/* If empty or not containing a compressed tuple, there is nothing to do */
if (unlikely(TTS_EMPTY(slot)) || aslot->tuple_index == InvalidTupleIndex)
if (unlikely(TTS_EMPTY(slot)) || aslot->tuple_index == InvalidTupleIndex ||
arrow_slot_is_consumed(slot))
return false;

if (direction == ForwardScanDirection)
if (likely(direction == ForwardScanDirection))
{
if (aslot->tuple_index < aslot->total_row_count)
if (!arrow_slot_is_last(slot))
{
ExecStoreNextArrowTuple(slot);
return true;
}
}
else if (aslot->tuple_index > 1)
else if (!arrow_slot_is_first(slot))
{
Assert(direction == BackwardScanDirection);
ExecStorePreviousArrowTuple(slot);
Expand Down
12 changes: 6 additions & 6 deletions tsl/src/hypercore/hypercore_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -873,26 +873,26 @@ hypercore_getnextslot_noncompressed(HypercoreScanDesc scan, ScanDirection direct
return result;
}

static bool
static inline bool
should_read_new_compressed_slot(TupleTableSlot *slot, ScanDirection direction)
{
/* Scans are never invoked with NoMovementScanDirection */
Assert(direction != NoMovementScanDirection);

/* A slot can be empty if the scan has just started (or has moved back to
* the start due to a backward scan) */
if (TTS_EMPTY(slot))
if (TTS_EMPTY(slot) || arrow_slot_is_consumed(slot))
return true;

if (direction == ForwardScanDirection)
if (likely(direction == ForwardScanDirection))
{
if (arrow_slot_is_last(slot) || arrow_slot_is_consumed(slot))
if (arrow_slot_is_last(slot))
return true;
}
else if (direction == BackwardScanDirection)
{
/* Check if the backward scan reached the start of the slot values */
if (arrow_slot_row_index(slot) <= 1)
if (arrow_slot_is_first(slot))
return true;
}

Expand All @@ -914,7 +914,7 @@ hypercore_getnextslot_compressed(HypercoreScanDesc scan, ScanDirection direction
{
ExecClearTuple(slot);

if (direction == ForwardScanDirection)
if (likely(direction == ForwardScanDirection))
{
scan->hs_scan_state = HYPERCORE_SCAN_NON_COMPRESSED;
return hypercore_getnextslot(&scan->rs_base, direction, slot);
Expand Down
31 changes: 23 additions & 8 deletions tsl/src/hypercore/vector_quals.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
#include <postgres.h>
#include "nodes/decompress_chunk/vector_quals.h"
#include <access/sdir.h>
#include <utils/memutils.h>

#include "arrow_tts.h"
Expand Down Expand Up @@ -88,12 +89,13 @@ uint16
ExecVectorQual(VectorQualState *vqstate, ExprContext *econtext)
{
TupleTableSlot *slot = econtext->ecxt_scantuple;
const uint16 rowindex = arrow_slot_row_index(slot);
ScanDirection direction = econtext->ecxt_estate->es_direction;

/* Compute the vector quals over both compressed and non-compressed
* tuples. In case a non-compressed tuple is filtered, return SomeRowsPass
* although only one row will pass. */
if (rowindex <= 1)
if ((direction == ForwardScanDirection && arrow_slot_is_first(slot)) ||
(direction == BackwardScanDirection && arrow_slot_is_last(slot)))
{
vector_qual_state_reset(vqstate);
VectorQualSummary vector_qual_summary = vqstate->vectorized_quals_constified != NIL ?
Expand All @@ -114,6 +116,8 @@ ExecVectorQual(VectorQualState *vqstate, ExprContext *econtext)
case SomeRowsPass:
break;
}

arrow_slot_set_qual_result(slot, vqstate->vector_qual_result);
}

/* Fast path when all rows have passed (i.e., no rows filtered). No need
Expand All @@ -123,16 +127,27 @@ ExecVectorQual(VectorQualState *vqstate, ExprContext *econtext)

const uint16 nrows = arrow_slot_total_row_count(slot);
const uint16 off = arrow_slot_arrow_offset(slot);
const uint64 *qual_result = arrow_slot_get_qual_result(slot);
uint16 nfiltered = 0;

for (uint16 i = off; i < nrows; i++)
if (direction == ForwardScanDirection)
{
if (arrow_row_is_valid(vqstate->vector_qual_result, i))
break;
nfiltered++;
for (uint16 i = off; i < nrows; i++)
{
if (arrow_row_is_valid(qual_result, i))
break;
nfiltered++;
}
}
else
{
for (uint16 i = off; i > 0; i--)
{
if (arrow_row_is_valid(qual_result, i))
break;
nfiltered++;
}
}

arrow_slot_set_qual_result(slot, vqstate->vector_qual_result);

return nfiltered;
}
19 changes: 13 additions & 6 deletions tsl/src/nodes/columnar_scan/columnar_scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ typedef struct ColumnarScanState
{
CustomScanState css;
VectorQualState vqstate;
ExprState *segmentby_qual;
ExprState *segmentby_exprstate;
ScanKey scankeys;
int nscankeys;
List *scankey_quals;
Expand Down Expand Up @@ -487,13 +487,13 @@ ExecSegmentbyQual(ExprState *qual, ExprContext *econtext)
TupleTableSlot *slot = econtext->ecxt_scantuple;
ScanDirection direction = econtext->ecxt_estate->es_direction;

Assert(direction == ForwardScanDirection || direction == BackwardScanDirection);
Assert(TTS_IS_ARROWTUPLE(slot));

if (qual == NULL || !should_check_segmentby_qual(direction, slot))
return true;

Assert(!arrow_slot_is_consumed(slot));

return ExecQual(qual, econtext);
}

Expand Down Expand Up @@ -540,7 +540,7 @@ columnar_scan_exec(CustomScanState *state)
* If no quals to check, do the fast path and just return the raw scan
* tuple or a projected one.
*/
if (!qual && !has_vecquals && !cstate->segmentby_qual)
if (!qual && !has_vecquals && !cstate->segmentby_exprstate)
{
bool gottuple = getnextslot(scandesc, direction, slot);

Expand Down Expand Up @@ -594,7 +594,7 @@ columnar_scan_exec(CustomScanState *state)
* since segmentby quals don't need decompression and can filter all
* values in an arrow slot in one go.
*/
if (!ExecSegmentbyQual(cstate->segmentby_qual, econtext))
if (!ExecSegmentbyQual(cstate->segmentby_exprstate, econtext))
{
/* The slot didn't pass filters so read the next slot */
const uint16 nrows = arrow_slot_total_row_count(slot);
Expand All @@ -613,7 +613,14 @@ columnar_scan_exec(CustomScanState *state)
TS_DEBUG_LOG("vectorized filtering of %u rows", nfiltered);

/* Skip ahead with the amount filtered */
ExecIncrArrowTuple(slot, nfiltered);
if (direction == ForwardScanDirection)
ExecIncrArrowTuple(slot, nfiltered);
else
{
Assert(direction == BackwardScanDirection);
ExecDecrArrowTuple(slot, nfiltered);
}

InstrCountFiltered1(state, nfiltered);

if (nfiltered == total_nrows && total_nrows > 1)
Expand Down Expand Up @@ -787,7 +794,7 @@ columnar_scan_begin(CustomScanState *state, EState *estate, int eflags)
if (cstate->css.ss.ps.ps_ProjInfo)
create_simple_projection_state_if_possible(cstate);

cstate->segmentby_qual = ExecInitQual(cstate->segmentby_quals, (PlanState *) state);
cstate->segmentby_exprstate = ExecInitQual(cstate->segmentby_quals, (PlanState *) state);

/*
* After having initialized ExprState for processing regular and segmentby
Expand Down
47 changes: 47 additions & 0 deletions tsl/test/expected/hypercore_scans.out
Original file line number Diff line number Diff line change
Expand Up @@ -1159,3 +1159,50 @@ where time <= '2022-06-02' and device = '1'::oid;
1381.63850675326
(1 row)

--
-- Test backwards scan with segmentby and vector quals
--
select count(*)-4 as myoffset from readings
where time <= '2022-06-02' and device in (1, 2)
\gset
-- Get the last four values to compare with cursor fetch backward from
-- the end
select * from readings
where time <= '2022-06-02' and device in (1, 2)
offset :myoffset;
time | location | device | temp | humidity
------------------------------+----------+--------+------------------+------------------
Wed Jun 01 19:10:00 2022 PDT | 3 | 2 | 25.6339083021175 | 85.7531443688847
Wed Jun 01 20:30:00 2022 PDT | 2 | 2 | 6.02098537951642 | 56.3153986241908
Wed Jun 01 23:15:00 2022 PDT | 2 | 2 | 35.7529756426646 | 86.0243391529811
Wed Jun 01 23:40:00 2022 PDT | 3 | 2 | 3.94232546784524 | 55.454709690509
(4 rows)

begin;
declare cur1 scroll cursor for
select * from readings
where time <= '2022-06-02' and device in (1, 2);
move last cur1;
-- move one step beyond last
fetch forward 1 from cur1;
time | location | device | temp | humidity
------+----------+--------+------+----------
(0 rows)

-- fetch the last 4 values with two fetches
fetch backward 2 from cur1;
time | location | device | temp | humidity
------------------------------+----------+--------+------------------+------------------
Wed Jun 01 23:40:00 2022 PDT | 3 | 2 | 3.94232546784524 | 55.454709690509
Wed Jun 01 23:15:00 2022 PDT | 2 | 2 | 35.7529756426646 | 86.0243391529811
(2 rows)

fetch backward 2 from cur1;
time | location | device | temp | humidity
------------------------------+----------+--------+------------------+------------------
Wed Jun 01 20:30:00 2022 PDT | 2 | 2 | 6.02098537951642 | 56.3153986241908
Wed Jun 01 19:10:00 2022 PDT | 3 | 2 | 25.6339083021175 | 85.7531443688847
(2 rows)

close cur1;
commit;
26 changes: 26 additions & 0 deletions tsl/test/sql/hypercore_scans.sql
Original file line number Diff line number Diff line change
Expand Up @@ -449,3 +449,29 @@ where time <= '2022-06-02' and device = '1'::oid;

select sum(humidity) from readings
where time <= '2022-06-02' and device = '1'::oid;

--
-- Test backwards scan with segmentby and vector quals
--
select count(*)-4 as myoffset from readings
where time <= '2022-06-02' and device in (1, 2)
\gset

-- Get the last four values to compare with cursor fetch backward from
-- the end
select * from readings
where time <= '2022-06-02' and device in (1, 2)
offset :myoffset;

begin;
declare cur1 scroll cursor for
select * from readings
where time <= '2022-06-02' and device in (1, 2);
move last cur1;
-- move one step beyond last
fetch forward 1 from cur1;
-- fetch the last 4 values with two fetches
fetch backward 2 from cur1;
fetch backward 2 from cur1;
close cur1;
commit;

0 comments on commit cde8185

Please sign in to comment.