Skip to content

Commit

Permalink
[flink] Fix partition filter mismatching the column index (apache#4079)
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 authored Aug 28, 2024
1 parent fb862df commit 99db1b7
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ public String[] call(
p ->
PredicateBuilder.partition(
p,
table.rowType(),
((FileStoreTable) table)
.schema()
.logicalPartitionType(),
CoreOptions.PARTITION_DEFAULT_NAME
.defaultValue()))
.toArray(Predicate[]::new));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,56 @@ public void testFileIndexProcedureSchemaEvolution() throws Exception {
Assertions.assertThat(count.get()).isEqualTo(0);
}

@Test
public void testPartitionFilter() throws Exception {
    // The partition keys are declared as (dt, hh) while the columns appear as hh, dt in the
    // row type, so partition field positions differ from row-type positions.
    sql(
            "CREATE TABLE T ("
                    + " k INT,"
                    + " v STRING,"
                    + " hh INT,"
                    + " dt STRING"
                    + ") PARTITIONED BY (dt, hh) WITH ("
                    + " 'write-only' = 'true',"
                    + " 'file.format' = 'avro',"
                    + " 'bucket' = '-1'"
                    + ")");

    // The same three rows are written twice: 4 rows land in dt=20221208, 2 in dt=20221209.
    String insertRows =
            "INSERT INTO T VALUES (1, '10', 15, '20221208'), (4, '100', 16, '20221208'), (5, '1000', 15, '20221209')";
    sql(insertRows);
    sql(insertRows);

    FileStoreTable table = paimonTable("T");

    // k = 2 AND v = '101': none of the inserted rows carries these values.
    PredicateBuilder builder = new PredicateBuilder(table.rowType());
    Predicate noMatchFilter =
            PredicateBuilder.and(
                    builder.equal(0, 2), builder.equal(1, BinaryString.fromString("101")));

    // Before any file index exists, the filter does not reduce the records read from the
    // files: all 6 written rows are returned by the reader.
    AtomicInteger rowsBeforeIndex = new AtomicInteger(0);
    RecordReader<InternalRow> reader =
            table.newRead().withFilter(noMatchFilter).createReader(table.newScan().plan());
    reader.forEachRemaining(row -> rowsBeforeIndex.incrementAndGet());
    Assertions.assertThat(rowsBeforeIndex.get()).isEqualTo(6);

    // Run the procedure synchronously so the rewritten index is in place before re-reading.
    tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
    // NOTE(review): 'order_id' is not a column of T (k, v, hh, dt) -- presumably carried over
    // from another test; confirm whether 'k,v' was intended.
    sql("ALTER TABLE T SET ('file-index.bloom-filter.columns'='order_id,v')");
    // Rewrite the file index for the dt=20221208 partition only.
    sql("CALL sys.rewrite_file_index('default.T', 'dt=20221208')");

    AtomicInteger rowsAfterIndex = new AtomicInteger(0);
    reader = table.newRead().withFilter(noMatchFilter).createReader(table.newScan().plan());
    reader.forEachRemaining(row -> rowsAfterIndex.incrementAndGet());

    // Only 2 rows are read now -- presumably the two dt=20221209 rows, since only the
    // dt=20221208 partition had its index rewritten and can be skipped by the filter.
    Assertions.assertThat(rowsAfterIndex.get()).isEqualTo(2);
}

@Test
public void testFileIndexProcedureDropIndex() throws Exception {
sql(
Expand Down

0 comments on commit 99db1b7

Please sign in to comment.