Skip to content

Commit

Permalink
fix splitStatistics in source too
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Sep 23, 2024
1 parent 3e03a03 commit efbe9f0
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Projection;

Expand Down Expand Up @@ -198,7 +198,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
}

private ScanRuntimeProvider createCountStarScan() {
long count = ((FileStoreTable) table).newSnapshotReader().withFilter(predicate).rowCount();
long count = ((DataTable) table).newSnapshotReader().withFilter(predicate).rowCount();
NumberSequenceRowSource source = new NumberSequenceRowSource(count, count);
return new SourceProvider() {
@Override
Expand Down Expand Up @@ -254,16 +254,16 @@ public boolean applyAggregates(
return false;
}

if (!table.primaryKeys().isEmpty()) {
if (!(table instanceof DataTable)) {
return false;
}

CoreOptions options = CoreOptions.fromMap(table.options());
if (options.deletionVectorsEnabled()) {
if (!table.primaryKeys().isEmpty()) {
return false;
}

if (!(table instanceof FileStoreTable)) {
CoreOptions options = ((DataTable) table).coreOptions();
if (options.deletionVectorsEnabled()) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.PredicateConverter;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.PartitionPredicateVisitor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.PredicateVisitor;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;

Expand Down Expand Up @@ -169,9 +171,28 @@ protected Integer inferSourceParallelism(StreamExecutionEnvironment env) {

/**
 * Lazily populates {@code splitStatistics} used to infer source parallelism.
 *
 * <p>For a {@link DataTable}, the statistics are estimated from partition-level
 * metadata (total file size and record count) instead of planning real splits,
 * which avoids a potentially expensive full scan plan. The split number is
 * approximated as {@code totalSize / splitTargetSize + 1}; the {@code + 1}
 * guards against a zero split count when {@code totalSize < splitTargetSize}.
 * For any other table implementation, falls back to planning actual splits.
 *
 * <p>NOTE(review): diff residue removed — the superseded eager
 * plan-based statements preceded this conditional and would have computed the
 * statistics via a full scan unconditionally, defeating the optimization.
 */
protected void scanSplitsForInference() {
    if (splitStatistics == null) {
        if (table instanceof DataTable) {
            // Cheap estimate from partition metadata; no split planning needed.
            List<PartitionEntry> partitionEntries =
                    ((DataTable) table)
                            .newSnapshotReader()
                            .withFilter(predicate)
                            .partitionEntries();
            long totalSize = 0;
            long rowCount = 0;
            for (PartitionEntry entry : partitionEntries) {
                totalSize += entry.fileSizeInBytes();
                rowCount += entry.recordCount();
            }
            long splitTargetSize = ((DataTable) table).coreOptions().splitTargetSize();
            splitStatistics =
                    new SplitStatistics((int) (totalSize / splitTargetSize + 1), rowCount);
        } else {
            // Fallback: plan real splits and derive exact statistics from them.
            List<Split> splits =
                    table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
            splitStatistics =
                    new SplitStatistics(
                            splits.size(), splits.stream().mapToLong(Split::rowCount).sum());
        }
    }
}

Expand All @@ -181,9 +202,9 @@ protected static class SplitStatistics {
private final int splitNumber;
private final long totalRowCount;

/**
 * Creates split statistics from precomputed values.
 *
 * <p>Replaces the former {@code SplitStatistics(List<Split>)} constructor so
 * callers can supply values estimated from partition metadata without planning
 * real splits (diff residue of the old overload removed here).
 *
 * @param splitNumber number of splits, or an estimate derived from metadata
 * @param totalRowCount total row count across all splits
 */
protected SplitStatistics(int splitNumber, long totalRowCount) {
    this.splitNumber = splitNumber;
    this.totalRowCount = totalRowCount;
}
}

public int splitNumber() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,11 +552,11 @@ public void testCountStarAppend() {
@Test
public void testCountStarPartAppend() {
    // Verifies COUNT(*) push-down on a partitioned append-only table: two rows
    // land in partition dt='1' and one in dt='2'; the partition filter must
    // restrict the count to 2. Diff residue removed — the old single-row
    // insert, the old Row.of(1L) assertion, and the commented-out push-down
    // check conflicted with the post-fix expectations below.
    sql("CREATE TABLE count_part_append (f0 INT, f1 STRING, dt STRING) PARTITIONED BY (dt)");
    sql("INSERT INTO count_part_append VALUES (1, 'a', '1'), (1, 'a', '1'), (2, 'b', '2')");
    String sql = "SELECT COUNT(*) FROM count_part_append WHERE dt = '1'";

    assertThat(sql(sql)).containsOnly(Row.of(2L));
    validateCount1PushDown(sql);
}

@Test
Expand Down

0 comments on commit efbe9f0

Please sign in to comment.