From e0c82990b1112c02d59b23f061f90c8b70a81a5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Wed, 18 Sep 2024 11:43:20 +0800 Subject: [PATCH] [core] System files table distributed execute remould --- .../paimon/table/system/FilesTable.java | 38 ++++++++++++++++--- .../paimon/table/system/FilesTableTest.java | 3 +- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index 99a9298d3324..7d61f5d5641f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -18,6 +18,7 @@ package org.apache.paimon.table.system; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalArray; @@ -75,6 +76,7 @@ import java.util.TreeMap; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER; @@ -134,7 +136,7 @@ public List primaryKeys() { @Override public InnerTableScan newScan() { - return new FilesScan(); + return new FilesScan(storeTable.newScan().listPartitions()); } @Override @@ -154,6 +156,12 @@ private static class FilesScan extends ReadOnceTableScan { @Nullable private LeafPredicate bucketPredicate; @Nullable private LeafPredicate levelPredicate; + private final List partitions; + + public FilesScan(List partitions) { + this.partitions = partitions; + } + @Override public InnerTableScan withFilter(Predicate pushdown) { if (pushdown == null) { @@ -170,9 +178,24 @@ public InnerTableScan withFilter(Predicate pushdown) { @Override public Plan innerPlan() { - return () -> - Collections.singletonList( - new FilesSplit(partitionPredicate, bucketPredicate, levelPredicate)); + + if (partitionPredicate != null) { + return () -> + Collections.singletonList( + new FilesSplit( + partitionPredicate, bucketPredicate, levelPredicate, null)); + } else { + return () -> + partitions.stream() + .map( + p -> + new FilesSplit( + partitionPredicate, + bucketPredicate, + levelPredicate, + p)) + .collect(Collectors.toList()); + } } } @@ -181,14 +204,17 @@ private static class FilesSplit extends SingletonSplit { @Nullable private final LeafPredicate partitionPredicate; @Nullable private final LeafPredicate bucketPredicate; @Nullable private final LeafPredicate levelPredicate; + @Nullable private final BinaryRow partition; private FilesSplit( @Nullable LeafPredicate partitionPredicate, @Nullable LeafPredicate bucketPredicate, - @Nullable LeafPredicate levelPredicate) { + @Nullable LeafPredicate levelPredicate, + @Nullable BinaryRow partition) { this.partitionPredicate = partitionPredicate; this.bucketPredicate = bucketPredicate; this.levelPredicate = levelPredicate; + this.partition = partition; } @Override @@ -237,6 +263,8 @@ private TableScan.Plan tablePlan(FileStoreTable storeTable) { scan.withPartitionFilter(partSpec); } // TODO support range? + } else if (partition != null) { + scan.withPartitionFilter(Collections.singletonList(partition)); } if (bucketPredicate != null) { scan.withBucketFilter( diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java index 0d7a8b497c57..89fb201faba2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java @@ -159,8 +159,7 @@ public void testReadFilesFromSpecifiedSnapshot() throws Exception { } @Test - public void testReadFilesFromNotExistSnapshot() throws Exception { - + public void testReadFilesFromNotExistSnapshot() { filesTable = (FilesTable) filesTable.copy(