Fix sort pushdown for partially compressed chunks
Using all sort pathkeys with the append node for partially compressed
chunks ends up creating invalid plans when the ORDER BY clause uses
joined columns. Using only the pathkey prefix that belongs to the
underlying relation fixes the issue and sets us up for a possible
incremental sort optimization.
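
For illustration, this is the query shape from the regression test added
below: the second sort key (jt.y1) comes from the joined table, so only
the t1.x1 prefix belongs to the chunk relation and can be attached to
the chunk's append path.

    SELECT * FROM test1 t1 JOIN join_table jt ON t1.x1 = jt.x1
    ORDER BY t1.x1, jt.y1;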
antekresic committed Jun 4, 2024
1 parent f34c293 commit eb548b6
Showing 6 changed files with 122 additions and 14 deletions.
2 changes: 2 additions & 0 deletions .unreleased/pr_6992
@@ -0,0 +1,2 @@
Fixes: #6975 Fix sort pushdown for partially compressed chunks
Thanks: @srieding for reporting an issue with partially compressed chunks and ordering on joined columns
46 changes: 32 additions & 14 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.c
@@ -971,6 +971,23 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
req_outer);
}
else
{
/* Check that all pathkey components can be satisfied by the current chunk */
List *pathkeys = NIL;
ListCell *lc;
foreach (lc, root->query_pathkeys)
{
PathKey *pathkey = (PathKey *) lfirst(lc);
EquivalenceClass *pathkey_ec = pathkey->pk_eclass;

Expr *em_expr = find_em_expr_for_rel(pathkey_ec, chunk_rel);

/* No em expression found for our rel */
if (!em_expr)
break;

pathkeys = lappend(pathkeys, pathkey);
}
/*
* Ideally, we would like for this to be a MergeAppend path.
* However, accumulate_append_subpath will cut out MergeAppend
@@ -982,23 +999,24 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
chunk_rel,
list_make2(chunk_path, uncompressed_path),
NIL /* partial paths */,
root->query_pathkeys /* pathkeys */,
pathkeys,
req_outer,
0,
false,
chunk_path->rows + uncompressed_path->rows);
}
}

/* Add useful sorted versions of the decompress path */
add_chunk_sorted_paths(root, chunk_rel, ht, ht_relid, chunk_path, compressed_path);

/* this has to go after the path is copied for the ordered path since path can get freed in
* add_path */
/* this has to go after the path is copied for the ordered path since path can get freed
* in add_path */
add_path(chunk_rel, chunk_path);
}

/* the chunk_rel now owns the paths, remove them from the compressed_rel so they can't be freed
* if it's planned */
/* the chunk_rel now owns the paths, remove them from the compressed_rel so they can't be
* freed if it's planned */
compressed_rel->pathlist = NIL;
/* create parallel paths */
if (compressed_rel->consider_parallel)
@@ -1047,9 +1065,9 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp

/*
* All children of an append path are required to have the same parameterization
* so we reparameterize here when we couldn't get a path with the parameterization
* we need. Reparameterization should always succeed here since uncompressed_path
* should always be a scan.
* so we reparameterize here when we couldn't get a path with the
* parameterization we need. Reparameterization should always succeed here since
* uncompressed_path should always be a scan.
*/
if (!bms_equal(req_outer, PATH_REQ_OUTER(uncompressed_path)))
{
@@ -1084,8 +1102,8 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp

add_partial_path(chunk_rel, path);
}
/* the chunk_rel now owns the paths, remove them from the compressed_rel so they can't be
* freed if it's planned */
/* the chunk_rel now owns the paths, remove them from the compressed_rel so they can't
* be freed if it's planned */
compressed_rel->partial_pathlist = NIL;
}
/* Remove the compressed_rel from the simple_rel_array to prevent it from
@@ -1796,10 +1814,10 @@ create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel, Comp
{
/* Almost the same functionality as ts_create_plain_partial_paths.
*
* However, we also create a partial path for small chunks to allow PostgreSQL to choose a
* parallel plan for decompression. If no partial path is present for a single chunk,
* PostgreSQL will not use a parallel plan and all chunks are decompressed by a non-parallel
* plan (even if there are a few bigger chunks).
* However, we also create a partial path for small chunks to allow PostgreSQL to choose
* a parallel plan for decompression. If no partial path is present for a single chunk,
* PostgreSQL will not use a parallel plan and all chunks are decompressed by a
* non-parallel plan (even if there are a few bigger chunks).
*/
int parallel_workers = compute_parallel_worker(compressed_rel,
compressed_rel->pages,
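
For reference, the pathkey-prefix logic from the first hunk above,
distilled into a self-contained sketch. The helper name
pathkeys_prefix_for_rel is hypothetical; find_em_expr_for_rel and the
PostgreSQL list primitives are the same ones the patch itself uses.

	/*
	 * Collect the longest prefix of root->query_pathkeys that the given
	 * relation can produce on its own (hypothetical helper, distilled
	 * from the hunk above).
	 */
	static List *
	pathkeys_prefix_for_rel(PlannerInfo *root, RelOptInfo *rel)
	{
		List *prefix = NIL;
		ListCell *lc;

		foreach (lc, root->query_pathkeys)
		{
			PathKey *pathkey = (PathKey *) lfirst(lc);

			/* Stop at the first pathkey this relation cannot produce,
			 * e.g. a column of a joined table. */
			if (find_em_expr_for_rel(pathkey->pk_eclass, rel) == NULL)
				break;

			prefix = lappend(prefix, pathkey);
		}
		return prefix;
	}

Passing this prefix, rather than all of root->query_pathkeys, to
create_append_path keeps the plan valid and leaves room for an
incremental sort on the remaining keys.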
25 changes: 25 additions & 0 deletions tsl/test/expected/merge_append_partially_compressed-14.out
@@ -870,6 +870,31 @@ SELECT * FROM test1 ORDER BY x1, x2, x5, time, x3 DESC LIMIT 10;
-> Seq Scan on _hyper_3_7_chunk (actual rows=1 loops=1)
(12 rows)

-- test append with join column in orderby
-- #6975
CREATE TABLE join_table (
x1 integer,
y1 float);
INSERT INTO join_table VALUES (1, 1.0), (2,2.0);
:PREFIX
SELECT * FROM test1 t1 JOIN join_table jt ON t1.x1 = jt.x1
ORDER BY t1.x1, jt.y1;
QUERY PLAN
--------------------------------------------------------------------------------------------------------
Sort (actual rows=4 loops=1)
Sort Key: t1_1.x1, jt.y1
Sort Method: quicksort
-> Hash Join (actual rows=4 loops=1)
Hash Cond: (jt.x1 = t1_1.x1)
-> Seq Scan on join_table jt (actual rows=2 loops=1)
-> Hash (actual rows=5 loops=1)
Buckets: 4096 Batches: 1
-> Append (actual rows=5 loops=1)
-> Custom Scan (DecompressChunk) on _hyper_3_7_chunk t1_1 (actual rows=4 loops=1)
-> Seq Scan on compress_hyper_4_8_chunk (actual rows=3 loops=1)
-> Seq Scan on _hyper_3_7_chunk t1_1 (actual rows=1 loops=1)
(12 rows)

---------------------------------------------------------------------------
-- test queries without ordered append, but still eligible for sort pushdown
---------------------------------------------------------------------------
25 changes: 25 additions & 0 deletions tsl/test/expected/merge_append_partially_compressed-15.out
@@ -877,6 +877,31 @@ SELECT * FROM test1 ORDER BY x1, x2, x5, time, x3 DESC LIMIT 10;
-> Seq Scan on _hyper_3_7_chunk (actual rows=1 loops=1)
(12 rows)

-- test append with join column in orderby
-- #6975
CREATE TABLE join_table (
x1 integer,
y1 float);
INSERT INTO join_table VALUES (1, 1.0), (2,2.0);
:PREFIX
SELECT * FROM test1 t1 JOIN join_table jt ON t1.x1 = jt.x1
ORDER BY t1.x1, jt.y1;
QUERY PLAN
--------------------------------------------------------------------------------------------------------
Sort (actual rows=4 loops=1)
Sort Key: t1_1.x1, jt.y1
Sort Method: quicksort
-> Hash Join (actual rows=4 loops=1)
Hash Cond: (jt.x1 = t1_1.x1)
-> Seq Scan on join_table jt (actual rows=2 loops=1)
-> Hash (actual rows=5 loops=1)
Buckets: 4096 Batches: 1
-> Append (actual rows=5 loops=1)
-> Custom Scan (DecompressChunk) on _hyper_3_7_chunk t1_1 (actual rows=4 loops=1)
-> Seq Scan on compress_hyper_4_8_chunk (actual rows=3 loops=1)
-> Seq Scan on _hyper_3_7_chunk t1_1 (actual rows=1 loops=1)
(12 rows)

---------------------------------------------------------------------------
-- test queries without ordered append, but still eligible for sort pushdown
---------------------------------------------------------------------------
25 changes: 25 additions & 0 deletions tsl/test/expected/merge_append_partially_compressed-16.out
@@ -877,6 +877,31 @@ SELECT * FROM test1 ORDER BY x1, x2, x5, time, x3 DESC LIMIT 10;
-> Seq Scan on _hyper_3_7_chunk (actual rows=1 loops=1)
(12 rows)

-- test append with join column in orderby
-- #6975
CREATE TABLE join_table (
x1 integer,
y1 float);
INSERT INTO join_table VALUES (1, 1.0), (2,2.0);
:PREFIX
SELECT * FROM test1 t1 JOIN join_table jt ON t1.x1 = jt.x1
ORDER BY t1.x1, jt.y1;
QUERY PLAN
--------------------------------------------------------------------------------------------------------
Sort (actual rows=4 loops=1)
Sort Key: t1_1.x1, jt.y1
Sort Method: quicksort
-> Hash Join (actual rows=4 loops=1)
Hash Cond: (jt.x1 = t1_1.x1)
-> Seq Scan on join_table jt (actual rows=2 loops=1)
-> Hash (actual rows=5 loops=1)
Buckets: 4096 Batches: 1
-> Append (actual rows=5 loops=1)
-> Custom Scan (DecompressChunk) on _hyper_3_7_chunk t1_1 (actual rows=4 loops=1)
-> Seq Scan on compress_hyper_4_8_chunk (actual rows=3 loops=1)
-> Seq Scan on _hyper_3_7_chunk t1_1 (actual rows=1 loops=1)
(12 rows)

---------------------------------------------------------------------------
-- test queries without ordered append, but still eligible for sort pushdown
---------------------------------------------------------------------------
13 changes: 13 additions & 0 deletions tsl/test/sql/merge_append_partially_compressed.sql.in
@@ -138,6 +138,19 @@ SELECT * FROM test1 ORDER BY x1, x2, x5, time ASC, x3 DESC, x4 DESC LIMIT 10; --
:PREFIX
SELECT * FROM test1 ORDER BY x1, x2, x5, time, x3 DESC LIMIT 10;

-- test append with join column in orderby
-- #6975

CREATE TABLE join_table (
x1 integer,
y1 float);

INSERT INTO join_table VALUES (1, 1.0), (2,2.0);

:PREFIX
SELECT * FROM test1 t1 JOIN join_table jt ON t1.x1 = jt.x1
ORDER BY t1.x1, jt.y1;

---------------------------------------------------------------------------
-- test queries without ordered append, but still eligible for sort pushdown
---------------------------------------------------------------------------