Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor compression filter handling #6329

Merged
merged 1 commit into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 45 additions & 49 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -934,8 +934,9 @@ row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_

if (compression_info->orderby_column_index > 0)
{
char *segment_min_col_name = compression_column_segment_min_name(compression_info);
char *segment_max_col_name = compression_column_segment_max_name(compression_info);
int16 index = compression_info->orderby_column_index;
char *segment_min_col_name = column_segment_min_name(index);
char *segment_max_col_name = column_segment_max_name(index);
AttrNumber segment_min_attr_number =
get_attnum(compressed_table->rd_id, segment_min_col_name);
AttrNumber segment_max_attr_number =
Expand Down Expand Up @@ -1970,22 +1971,23 @@ build_scankeys(int32 hypertable_id, Oid hypertable_relid, RowDecompressor *decom
}
if (COMPRESSIONCOL_IS_ORDER_BY(fd))
{
int16 index = fd->orderby_column_index;
/* Cannot optimize orderby columns with NULL values since those
* are not visible in metadata
*/
if (isnull)
continue;

key_index = create_segment_filter_scankey(decompressor,
compression_column_segment_min_name(fd),
column_segment_min_name(index),
BTLessEqualStrategyNumber,
scankeys,
key_index,
null_columns,
value,
false); /* is_null_check */
key_index = create_segment_filter_scankey(decompressor,
compression_column_segment_max_name(fd),
column_segment_max_name(index),
BTGreaterEqualStrategyNumber,
scankeys,
key_index,
Expand Down Expand Up @@ -2543,13 +2545,12 @@ ts_fuzz_compression(PG_FUNCTION_ARGS)
#endif

#if PG14_GE
static SegmentFilter *
add_filter_column_strategy(char *column_name, StrategyNumber strategy, Const *value,
bool is_null_check)
static BatchFilter *
make_batchfilter(char *column_name, StrategyNumber strategy, Const *value, bool is_null_check)
{
SegmentFilter *segment_filter = palloc0(sizeof(*segment_filter));
BatchFilter *segment_filter = palloc0(sizeof(*segment_filter));

*segment_filter = (SegmentFilter){
*segment_filter = (BatchFilter){
.strategy = strategy,
.value = value,
.is_null_check = is_null_check,
Expand Down Expand Up @@ -2643,7 +2644,7 @@ fix_and_reorder_index_filters(Relation comp_chunk_rel, Relation index_rel,
forboth (lp, segmentby_predicates, lf, index_filters)
{
Node *node = lfirst(lp);
SegmentFilter *sf = lfirst(lf);
BatchFilter *sf = lfirst(lf);

if (node == NULL)
continue;
Expand Down Expand Up @@ -2788,7 +2789,7 @@ find_matching_index(Relation comp_chunk_rel, List *index_filters)
{
AttrNumber attnum = index_rel->rd_index->indkey.values[i];
char *attname = get_attname(RelationGetRelid(comp_chunk_rel), attnum, false);
SegmentFilter *sf = lfirst(li);
BatchFilter *sf = lfirst(li);
/* ensure column exists in index relation */
if (!strcmp(attname, sf->column_name.data))
{
Expand Down Expand Up @@ -2877,59 +2878,55 @@ fill_predicate_context(Chunk *ch, List *predicates, List **filters, List **index
{
/* save segment by column name and its corresponding value specified in
* WHERE */
*index_filters =
lappend(*index_filters,
add_filter_column_strategy(column_name,
op_strategy,
arg_value,
false)); /* is_null_check */
*index_filters = lappend(*index_filters,
make_batchfilter(column_name,
op_strategy,
arg_value,
false)); /* is_null_check */
*segmentby_predicates = lappend(*segmentby_predicates, node);
}
}
}
else if (COMPRESSIONCOL_IS_ORDER_BY(fd))
{
int16 index = fd->orderby_column_index;
switch (op_strategy)
{
case BTEqualStrategyNumber:
{
/* orderby col = value implies min <= value and max >= value */
*filters = lappend(
*filters,
add_filter_column_strategy(compression_column_segment_min_name(fd),
BTLessEqualStrategyNumber,
arg_value,
false)); /* is_null_check */
*filters = lappend(
*filters,
add_filter_column_strategy(compression_column_segment_max_name(fd),
BTGreaterEqualStrategyNumber,
arg_value,
false)); /* is_null_check */
*filters = lappend(*filters,
make_batchfilter(column_segment_min_name(index),
BTLessEqualStrategyNumber,
arg_value,
false)); /* is_null_check */
*filters = lappend(*filters,
make_batchfilter(column_segment_max_name(index),
BTGreaterEqualStrategyNumber,
arg_value,
false)); /* is_null_check */
}
break;
case BTLessStrategyNumber:
case BTLessEqualStrategyNumber:
{
/* orderby col <[=] value implies min <[=] value */
*filters = lappend(
*filters,
add_filter_column_strategy(compression_column_segment_min_name(fd),
op_strategy,
arg_value,
false)); /* is_null_check */
*filters = lappend(*filters,
make_batchfilter(column_segment_min_name(index),
op_strategy,
arg_value,
false)); /* is_null_check */
}
break;
case BTGreaterStrategyNumber:
case BTGreaterEqualStrategyNumber:
{
/* orderby col >[=] value implies max >[=] value */
*filters = lappend(
*filters,
add_filter_column_strategy(compression_column_segment_max_name(fd),
op_strategy,
arg_value,
false)); /* is_null_check */
*filters = lappend(*filters,
make_batchfilter(column_segment_max_name(index),
op_strategy,
arg_value,
false)); /* is_null_check */
}
}
}
Expand All @@ -2949,12 +2946,11 @@ fill_predicate_context(Chunk *ch, List *predicates, List **filters, List **index
ts_hypertable_compression_get_by_pkey(ch->fd.hypertable_id, column_name);
if (COMPRESSIONCOL_IS_SEGMENT_BY(fd))
{
*index_filters =
lappend(*index_filters,
add_filter_column_strategy(column_name,
InvalidStrategy,
NULL,
true)); /* is_null_check */
*index_filters = lappend(*index_filters,
make_batchfilter(column_name,
InvalidStrategy,
NULL,
true)); /* is_null_check */
*segmentby_predicates = lappend(*segmentby_predicates, node);
if (ntest->nulltesttype == IS_NULL)
*is_null = lappend_int(*is_null, 1);
Expand Down Expand Up @@ -2986,7 +2982,7 @@ build_update_delete_scankeys(RowDecompressor *decompressor, List *filters, int *
Bitmapset **null_columns)
{
ListCell *lc;
SegmentFilter *filter;
BatchFilter *filter;
int key_index = 0;

ScanKeyData *scankeys = palloc0(filters->length * sizeof(ScanKeyData));
Expand Down Expand Up @@ -3288,7 +3284,7 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
Relation comp_chunk_rel;
Chunk *comp_chunk;
RowDecompressor decompressor;
SegmentFilter *filter;
BatchFilter *filter;

bool chunk_status_changed = false;
ScanKeyData *scankeys = NULL;
Expand Down
10 changes: 7 additions & 3 deletions tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,12 @@ typedef struct RowCompressor
int insert_options;
} RowCompressor;

/* SegmentFilter is used for filtering segments based on qualifiers */
typedef struct SegmentFilter
/*
* BatchFilter is used for filtering batches before decompressing.
* The columns will either be segmentby columns or the corresponding
* metadata columns of orderby columns.
*/
typedef struct BatchFilter
{
/* Column which we use for filtering */
NameData column_name;
Expand All @@ -279,7 +283,7 @@ typedef struct SegmentFilter
Const *value;
/* IS NULL or IS NOT NULL */
bool is_null_check;
} SegmentFilter;
} BatchFilter;

extern Datum tsl_compressed_data_decompress_forward(PG_FUNCTION_ARGS);
extern Datum tsl_compressed_data_decompress_reverse(PG_FUNCTION_ARGS);
Expand Down
35 changes: 11 additions & 24 deletions tsl/src/compression/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,6 @@ column_segment_max_name(int16 column_index)
COMPRESSION_COLUMN_METADATA_MAX_COLUMN_NAME);
}

char *
compression_column_segment_min_name(const FormData_hypertable_compression *fd)
{
return column_segment_min_name(fd->orderby_column_index);
}

char *
compression_column_segment_max_name(const FormData_hypertable_compression *fd)
{
return column_segment_max_name(fd->orderby_column_index);
}

static void
compresscolinfo_add_metadata_columns(CompressColInfo *cc, Relation uncompressed_rel)
{
Expand Down Expand Up @@ -178,6 +166,7 @@ compresscolinfo_add_metadata_columns(CompressColInfo *cc, Relation uncompressed_
{
if (cc->col_meta[colno].orderby_column_index > 0)
{
int16 index = cc->col_meta[colno].orderby_column_index;
FormData_hypertable_compression fd = cc->col_meta[colno];
AttrNumber col_attno = get_attnum(uncompressed_rel->rd_id, NameStr(fd.attname));
Form_pg_attribute attr = TupleDescAttr(RelationGetDescr(uncompressed_rel),
Expand All @@ -191,18 +180,16 @@ compresscolinfo_add_metadata_columns(CompressColInfo *cc, Relation uncompressed_
errdetail("Could not identify a less-than operator for the type.")));

/* segment_meta min and max columns */
cc->coldeflist =
lappend(cc->coldeflist,
makeColumnDef(compression_column_segment_min_name(&cc->col_meta[colno]),
attr->atttypid,
-1 /* typemod */,
0 /*collation*/));
cc->coldeflist =
lappend(cc->coldeflist,
makeColumnDef(compression_column_segment_max_name(&cc->col_meta[colno]),
attr->atttypid,
-1 /* typemod */,
0 /*collation*/));
cc->coldeflist = lappend(cc->coldeflist,
makeColumnDef(column_segment_min_name(index),
attr->atttypid,
-1 /* typemod */,
0 /*collation*/));
cc->coldeflist = lappend(cc->coldeflist,
makeColumnDef(column_segment_max_name(index),
attr->atttypid,
-1 /* typemod */,
0 /*collation*/));
}
}
}
Expand Down
3 changes: 0 additions & 3 deletions tsl/src/compression/create.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ void tsl_process_compress_table_drop_column(Hypertable *ht, char *name);
void tsl_process_compress_table_rename_column(Hypertable *ht, const RenameStmt *stmt);
Chunk *create_compress_chunk(Hypertable *compress_ht, Chunk *src_chunk, Oid table_id);

char *compression_column_segment_min_name(const FormData_hypertable_compression *fd);
char *compression_column_segment_max_name(const FormData_hypertable_compression *fd);

char *column_segment_min_name(int16 column_index);
char *column_segment_max_name(int16 column_index);

Expand Down
7 changes: 3 additions & 4 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -1091,15 +1091,14 @@ compressed_rel_setup_reltarget(RelOptInfo *compressed_rel, CompressionInfo *info
/* if the column is an orderby, add its metadata columns too */
if (column_info->orderby_column_index > 0)
{
int16 index = column_info->orderby_column_index;
compressed_reltarget_add_var_for_column(compressed_rel,
compressed_relid,
compression_column_segment_min_name(
column_info),
column_segment_min_name(index),
&attrs_used);
compressed_reltarget_add_var_for_column(compressed_rel,
compressed_relid,
compression_column_segment_max_name(
column_info),
column_segment_max_name(index),
&attrs_used);
}
}
Expand Down
20 changes: 10 additions & 10 deletions tsl/src/nodes/decompress_chunk/qual_pushdown.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,9 @@ make_segment_meta_opexpr(QualPushdownContext *context, Oid opno, AttrNumber meta
}

static AttrNumber
get_segment_meta_min_attr_number(FormData_hypertable_compression *compression_info,
Oid compressed_relid)
get_segment_meta_min_attr_number(int16 orderby_column_index, Oid compressed_relid)
{
char *meta_col_name = compression_column_segment_min_name(compression_info);
char *meta_col_name = column_segment_min_name(orderby_column_index);

if (meta_col_name == NULL)
elog(ERROR, "could not find meta column");
Expand All @@ -140,10 +139,9 @@ get_segment_meta_min_attr_number(FormData_hypertable_compression *compression_in
}

static AttrNumber
get_segment_meta_max_attr_number(FormData_hypertable_compression *compression_info,
Oid compressed_relid)
get_segment_meta_max_attr_number(int16 orderby_column_index, Oid compressed_relid)
{
char *meta_col_name = compression_column_segment_max_name(compression_info);
char *meta_col_name = column_segment_max_name(orderby_column_index);

if (meta_col_name == NULL)
elog(ERROR, "could not find meta column");
Expand Down Expand Up @@ -224,6 +222,8 @@ pushdown_op_to_segment_meta_min_max(QualPushdownContext *context, List *expr_arg
else
return NULL;

int16 index = compression_info->orderby_column_index;

/* May be able to allow non-strict operations as well.
* Next steps: Think through edge cases, either allow and write tests or figure out why we must
* block strict operations
Expand Down Expand Up @@ -267,15 +267,15 @@ pushdown_op_to_segment_meta_min_max(QualPushdownContext *context, List *expr_arg
return make_andclause(list_make2(
make_segment_meta_opexpr(context,
opno_le,
get_segment_meta_min_attr_number(compression_info,
get_segment_meta_min_attr_number(index,
context->compressed_rte
->relid),
var_with_segment_meta,
expr,
BTLessEqualStrategyNumber),
make_segment_meta_opexpr(context,
opno_ge,
get_segment_meta_max_attr_number(compression_info,
get_segment_meta_max_attr_number(index,
context->compressed_rte
->relid),
var_with_segment_meta,
Expand All @@ -295,7 +295,7 @@ pushdown_op_to_segment_meta_min_max(QualPushdownContext *context, List *expr_arg
return (Expr *)
make_segment_meta_opexpr(context,
opno,
get_segment_meta_min_attr_number(compression_info,
get_segment_meta_min_attr_number(index,
context
->compressed_rte
->relid),
Expand All @@ -317,7 +317,7 @@ pushdown_op_to_segment_meta_min_max(QualPushdownContext *context, List *expr_arg
return (Expr *)
make_segment_meta_opexpr(context,
opno,
get_segment_meta_max_attr_number(compression_info,
get_segment_meta_max_attr_number(index,
context
->compressed_rte
->relid),
Expand Down