Skip to content

Commit

Permalink
[core] System files table distributed execute remould
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 committed Sep 19, 2024
1 parent a4c3e3c commit e0c8299
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.table.system;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalArray;
Expand Down Expand Up @@ -75,6 +76,7 @@
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;

Expand Down Expand Up @@ -134,7 +136,7 @@ public List<String> primaryKeys() {

@Override
public InnerTableScan newScan() {
return new FilesScan();
return new FilesScan(storeTable.newScan().listPartitions());
}

@Override
Expand All @@ -154,6 +156,12 @@ private static class FilesScan extends ReadOnceTableScan {
@Nullable private LeafPredicate bucketPredicate;
@Nullable private LeafPredicate levelPredicate;

private final List<BinaryRow> partitions;

public FilesScan(List<BinaryRow> partitions) {
this.partitions = partitions;
}

@Override
public InnerTableScan withFilter(Predicate pushdown) {
if (pushdown == null) {
Expand All @@ -170,9 +178,24 @@ public InnerTableScan withFilter(Predicate pushdown) {

@Override
public Plan innerPlan() {
return () ->
Collections.singletonList(
new FilesSplit(partitionPredicate, bucketPredicate, levelPredicate));

if (partitionPredicate != null) {
return () ->
Collections.singletonList(
new FilesSplit(
partitionPredicate, bucketPredicate, levelPredicate, null));
} else {
return () ->
partitions.stream()
.map(
p ->
new FilesSplit(
partitionPredicate,
bucketPredicate,
levelPredicate,
p))
.collect(Collectors.toList());
}
}
}

Expand All @@ -181,14 +204,17 @@ private static class FilesSplit extends SingletonSplit {
@Nullable private final LeafPredicate partitionPredicate;
@Nullable private final LeafPredicate bucketPredicate;
@Nullable private final LeafPredicate levelPredicate;
@Nullable private final BinaryRow partition;

private FilesSplit(
@Nullable LeafPredicate partitionPredicate,
@Nullable LeafPredicate bucketPredicate,
@Nullable LeafPredicate levelPredicate) {
@Nullable LeafPredicate levelPredicate,
@Nullable BinaryRow partition) {
this.partitionPredicate = partitionPredicate;
this.bucketPredicate = bucketPredicate;
this.levelPredicate = levelPredicate;
this.partition = partition;
}

@Override
Expand Down Expand Up @@ -237,6 +263,8 @@ private TableScan.Plan tablePlan(FileStoreTable storeTable) {
scan.withPartitionFilter(partSpec);
}
// TODO support range?
} else if (partition != null) {
scan.withPartitionFilter(Collections.singletonList(partition));
}
if (bucketPredicate != null) {
scan.withBucketFilter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ public void testReadFilesFromSpecifiedSnapshot() throws Exception {
}

@Test
public void testReadFilesFromNotExistSnapshot() throws Exception {

public void testReadFilesFromNotExistSnapshot() {
filesTable =
(FilesTable)
filesTable.copy(
Expand Down

0 comments on commit e0c8299

Please sign in to comment.