From 6157c1cb54bb8f1209c4a3499fe28b53d2317a82 Mon Sep 17 00:00:00 2001 From: herefree <841043203@qq.com> Date: Mon, 2 Sep 2024 15:28:10 +0800 Subject: [PATCH] Optimize tags system table when specifying tag_name --- .../apache/paimon/table/system/TagsTable.java | 40 ++++++++++++++++--- .../org/apache/paimon/utils/TagManager.java | 10 +++++ .../paimon/flink/CatalogTableITCase.java | 5 +++ 3 files changed, 49 insertions(+), 6 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java index d92876e4c0f0..c5fcfe06b6a0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java @@ -26,6 +26,9 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.predicate.Equal; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.LeafPredicateExtractor; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.FileStoreTable; @@ -51,6 +54,8 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; +import javax.annotation.Nullable; + import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -129,16 +134,23 @@ public Table copy(Map dynamicOptions) { } private class TagsScan extends ReadOnceTableScan { + private @Nullable LeafPredicate tagName; @Override public InnerTableScan withFilter(Predicate predicate) { + if (predicate == null) { + return this; + } // TODO + Map leafPredicates = + predicate.visit(LeafPredicateExtractor.INSTANCE); + tagName = leafPredicates.get("tag_name"); return this; } @Override public Plan innerPlan() { - return () -> Collections.singletonList(new TagsSplit(location)); + return () -> Collections.singletonList(new TagsSplit(location, tagName)); } } @@ -148,8 +160,11 @@ private static class TagsSplit extends SingletonSplit { private final Path location; - private TagsSplit(Path location) { + private final @Nullable LeafPredicate tagName; + + private TagsSplit(Path location, @Nullable LeafPredicate tagName) { this.location = location; + this.tagName = tagName; } @Override @@ -161,7 +176,7 @@ public boolean equals(Object o) { return false; } TagsSplit that = (TagsSplit) o; - return Objects.equals(location, that.location); + return Objects.equals(location, that.location) && Objects.equals(tagName, that.tagName); } @Override @@ -202,10 +217,23 @@ public RecordReader createReader(Split split) { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } Path location = ((TagsSplit) split).location; - List> tags = new TagManager(fileIO, location, branch).tagObjects(); + LeafPredicate predicate = ((TagsSplit) split).tagName; + TagManager tagManager = new TagManager(fileIO, location, branch); + Map nameToSnapshot = new LinkedHashMap<>(); - for (Pair tag : tags) { - nameToSnapshot.put(tag.getValue(), tag.getKey()); + + if (predicate != null + && predicate.function() instanceof Equal + && predicate.literals().get(0) instanceof BinaryString) { + String equalValue = predicate.literals().get(0).toString(); + Tag tag = tagManager.tag(equalValue); + if (tag != null) { + nameToSnapshot.put(equalValue, tag); + } + } else { + for (Pair tag : tagManager.tagObjects()) { + nameToSnapshot.put(tag.getValue(), tag.getKey()); + } } Iterator rows = diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 2833ca33cdf1..d9ef134018a6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -35,6 +35,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.io.UncheckedIOException; import java.time.Duration; import java.time.LocalDateTime; import java.util.ArrayList; @@ -382,4 +383,13 @@ private int findIndex(Snapshot taggedSnapshot, List taggedSnapshots) { "Didn't find tag with snapshot id '%s'.This is unexpected.", taggedSnapshot.id())); } + + /** Read tag for tagName. */ + public Tag tag(String tagName) { + try { + return Tag.safelyFromPath(fileIO, tagPath(tagName)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java index e1bbcbcf2382..3785c3db867c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java @@ -812,6 +812,11 @@ public void testTagsTable() throws Exception { "SELECT tag_name, snapshot_id, schema_id, record_count FROM T$tags ORDER BY tag_name"); assertThat(result).containsExactly(Row.of("tag1", 1L, 0L, 1L), Row.of("tag2", 2L, 0L, 2L)); + + result = + sql( + "SELECT tag_name, snapshot_id, schema_id, record_count FROM T$tags where tag_name = 'tag1' "); + assertThat(result).containsExactly(Row.of("tag1", 1L, 0L, 1L)); } @Test