From efbe9f0046d0f3b2cca3ddf1b13d1c033c846644 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Mon, 23 Sep 2024 18:38:00 +0800 Subject: [PATCH] fix splitStatistics in source too --- .../flink/source/BaseDataTableSource.java | 12 +++---- .../paimon/flink/source/FlinkTableSource.java | 33 +++++++++++++++---- .../paimon/flink/BatchFileStoreITCase.java | 8 ++--- 3 files changed, 37 insertions(+), 16 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java index 3a5c5baec643d..e2613aaee8939 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java @@ -30,7 +30,7 @@ import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.DataTable; import org.apache.paimon.table.Table; import org.apache.paimon.utils.Projection; @@ -198,7 +198,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { } private ScanRuntimeProvider createCountStarScan() { - long count = ((FileStoreTable) table).newSnapshotReader().withFilter(predicate).rowCount(); + long count = ((DataTable) table).newSnapshotReader().withFilter(predicate).rowCount(); NumberSequenceRowSource source = new NumberSequenceRowSource(count, count); return new SourceProvider() { @Override @@ -254,16 +254,16 @@ public boolean applyAggregates( return false; } - if (!table.primaryKeys().isEmpty()) { + if (!(table instanceof DataTable)) { return false; } - CoreOptions options = CoreOptions.fromMap(table.options()); - if (options.deletionVectorsEnabled()) { + if (!table.primaryKeys().isEmpty()) { return false; } - if (!(table instanceof FileStoreTable)) { + CoreOptions options = ((DataTable) table).coreOptions(); + if (options.deletionVectorsEnabled()) { return false; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java index a804591dc526d..9e89a2e99f347 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java @@ -22,11 +22,13 @@ import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.LogicalTypeConversion; import org.apache.paimon.flink.PredicateConverter; +import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.PartitionPredicateVisitor; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.predicate.PredicateVisitor; +import org.apache.paimon.table.DataTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.Split; @@ -169,9 +171,28 @@ protected Integer inferSourceParallelism(StreamExecutionEnvironment env) { protected void scanSplitsForInference() { if (splitStatistics == null) { - List splits = - table.newReadBuilder().withFilter(predicate).newScan().plan().splits(); - splitStatistics = new SplitStatistics(splits); + if (table instanceof DataTable) { + List partitionEntries = + ((DataTable) table) + .newSnapshotReader() + .withFilter(predicate) + .partitionEntries(); + long totalSize = 0; + long rowCount = 0; + for (PartitionEntry entry : partitionEntries) { + totalSize += entry.fileSizeInBytes(); + rowCount += entry.recordCount(); + } + long splitTargetSize = ((DataTable) table).coreOptions().splitTargetSize(); + splitStatistics = + new SplitStatistics((int) (totalSize / splitTargetSize + 1), rowCount); + } else { + List splits = + table.newReadBuilder().withFilter(predicate).newScan().plan().splits(); + splitStatistics = + new SplitStatistics( + splits.size(), splits.stream().mapToLong(Split::rowCount).sum()); + } } } @@ -181,9 +202,9 @@ protected static class SplitStatistics { private final int splitNumber; private final long totalRowCount; - protected SplitStatistics(List splits) { - this.splitNumber = splits.size(); - this.totalRowCount = splits.stream().mapToLong(Split::rowCount).sum(); + protected SplitStatistics(int splitNumber, long totalRowCount) { + this.splitNumber = splitNumber; + this.totalRowCount = totalRowCount; } public int splitNumber() { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 605aa72b79b2c..ef0ec6dc15a2d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -552,11 +552,11 @@ public void testCountStarAppend() { @Test public void testCountStarPartAppend() { sql("CREATE TABLE count_part_append (f0 INT, f1 STRING, dt STRING) PARTITIONED BY (dt)"); - sql("INSERT INTO count_part_append VALUES (1, 'a', '1'), (2, 'b', '2')"); + sql("INSERT INTO count_part_append VALUES (1, 'a', '1'), (1, 'a', '1'), (2, 'b', '2')"); String sql = "SELECT COUNT(*) FROM count_part_append WHERE dt = '1'"; - assertThat(sql(sql)).containsOnly(Row.of(1L)); - // TODO wait Flink SQL fixing this bug - // validateCount1PushDown(sql); + + assertThat(sql(sql)).containsOnly(Row.of(2L)); + validateCount1PushDown(sql); } @Test