From 307b1a436555a93a6b1341e606684189e63d2dff Mon Sep 17 00:00:00 2001 From: Xiangyu Feng Date: Wed, 12 Feb 2025 19:42:55 +0800 Subject: [PATCH] [core] Add late-arrival threshold in ForceUpLevel0Compaction for disordered updates --- .../java/org/apache/paimon/CoreOptions.java | 14 ++- .../compact/ForceUpLevel0Compaction.java | 54 ++++++++- .../compact/TimeAwareCompaction.java | 76 ------------- .../operation/KeyValueFileStoreWrite.java | 17 ++- .../partition/PartitionTimeExtractor.java | 1 + .../compact/ForceUpLevel0CompactionTest.java | 52 +++++++++ .../operation/KeyValueFileStoreWriteTest.java | 107 ++++++++++++++++++ 7 files changed, 229 insertions(+), 92 deletions(-) delete mode 100644 paimon-core/src/main/java/org/apache/paimon/mergetree/compact/TimeAwareCompaction.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 33a8552dc5e4..21ff57609ec7 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -41,6 +41,7 @@ import org.apache.paimon.utils.StringUtils; import javax.annotation.Nullable; + import java.io.Serializable; import java.lang.reflect.Field; import java.time.Duration; @@ -624,12 +625,15 @@ public class CoreOptions implements Serializable { text("Default value of Bucketed Append Table is '5'.")) .build()); - public static final ConfigOption LATE_ARRIVED_THRESHOLD = - key("compaction.late-arrived-threshold") + public static final ConfigOption LATE_ARRIVAL_THRESHOLD = + key("compaction.late-arrival-threshold") .durationType() .noDefaultValue() .withDescription( - "The threshold of partitioned pk table to differentiate late arrived data. Late arrived data can be configured with a less frequent compaction strategy to sacrifice timeliness for overall resource usage saving. This option is only valid for a partitioned pk table when needLookup() is true."); + "The threshold of partitioned pk table to differentiate late arrived data. " + + "Late arrived data can be configured with a less frequent compaction " + + "strategy to sacrifice timeliness for overall resource usage saving. " + + "This option is only valid for a partitioned pk table when needLookup() is true."); public static final ConfigOption CHANGELOG_PRODUCER = key("changelog-producer") @@ -2080,8 +2084,8 @@ public Optional compactionMaxFileNum() { return options.getOptional(COMPACTION_MAX_FILE_NUM); } - public Optional lateArrivedThreshold() { - return options.getOptional(LATE_ARRIVED_THRESHOLD); + public Optional lateArrivalThreshold() { + return options.getOptional(LATE_ARRIVAL_THRESHOLD); } public long dynamicBucketTargetRowNum() { diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ForceUpLevel0Compaction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ForceUpLevel0Compaction.java index d3ec39cb67db..ceedf131976e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ForceUpLevel0Compaction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ForceUpLevel0Compaction.java @@ -18,25 +18,58 @@ package org.apache.paimon.mergetree.compact; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.compact.CompactUnit; import org.apache.paimon.mergetree.LevelSortedRun; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Optional; -/** A {@link CompactStrategy} to force compacting level 0 files. */ +/** + * A {@link CompactStrategy} to force compacting level 0 files. + * + *

Note: This strategy will increase the compaction frequency drastically when updates are + * seriously disordered for partitioned PK table. + * + *

In certain cases, users can actually accept worse data freshness for the late arrived data in + * the historical partitions for lower compaction frequency. This can significantly reduce the + * compaction resource usage. {@link UniversalCompaction} can be used in this case by configuring + * CoreOptions.LATE_ARRIVAL_THRESHOLD. + * + *

Notice: Enabling CoreOptions.LATE_ARRIVAL_THRESHOLD may result in several data files in L0 + * cannot be seen for a long time. This can be fixed with offline Historical Partition Compact. + */ public class ForceUpLevel0Compaction implements CompactStrategy { + private static final Logger LOG = LoggerFactory.getLogger(ForceUpLevel0Compaction.class); + + private final Duration lateArrivalThreshold; + private final LocalDateTime currentPartitionDate; private final UniversalCompaction universal; public ForceUpLevel0Compaction(UniversalCompaction universal) { + this(universal, null, null); + } + + public ForceUpLevel0Compaction( + UniversalCompaction universal, + Duration lateArrivalThreshold, + LocalDateTime currentPartitionDate) { this.universal = universal; + this.lateArrivalThreshold = lateArrivalThreshold; + this.currentPartitionDate = currentPartitionDate; } @Override public Optional pick(int numLevels, List runs) { Optional pick = universal.pick(numLevels, runs); - if (pick.isPresent()) { + if (pick.isPresent() || isLateArrival()) { return pick; } @@ -54,4 +87,21 @@ public Optional pick(int numLevels, List runs) { : Optional.of( universal.pickForSizeRatio(numLevels - 1, runs, candidateCount, true)); } + + @VisibleForTesting + public boolean isLateArrival() { + if (lateArrivalThreshold == null) { + return false; + } + LocalDateTime lateArrivalDate = LocalDateTime.now().minus(lateArrivalThreshold); + // For lateArrivedDate=20250120, any data insert into partitions<=20250120 will be + // considered as late arrived data. + boolean result = !currentPartitionDate.isAfter(lateArrivalDate); + LOG.debug( + "Current partition Date: {}, late arrival Date: {}, late arrival result: {}.", + currentPartitionDate.format(DateTimeFormatter.BASIC_ISO_DATE), + lateArrivalDate.format(DateTimeFormatter.BASIC_ISO_DATE), + result); + return result; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/TimeAwareCompaction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/TimeAwareCompaction.java deleted file mode 100644 index 7b0850cc8ecc..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/TimeAwareCompaction.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.mergetree.compact; - -import org.apache.paimon.compact.CompactUnit; -import org.apache.paimon.mergetree.LevelSortedRun; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Duration; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.List; -import java.util.Optional; - -/** - * A combined {@link CompactStrategy} that uses different compact strategies for recent partitions - * and historical partitions. - */ -public class TimeAwareCompaction implements CompactStrategy { - - private static final Logger LOG = LoggerFactory.getLogger(TimeAwareCompaction.class); - private final Duration lateArrivedThreshold; - private final CompactStrategy currentCompactStrategy; - private final CompactStrategy lateArrivedCompactStrategy; - private final LocalDateTime currentPartitionDate; - - public TimeAwareCompaction( - Duration lateArrivedThreshold, - CompactStrategy currentCompactStrategy, - CompactStrategy historicalCompactStrategy, - LocalDateTime partitionDate) { - this.lateArrivedThreshold = lateArrivedThreshold; - this.currentCompactStrategy = currentCompactStrategy; - this.lateArrivedCompactStrategy = historicalCompactStrategy; - this.currentPartitionDate = partitionDate; - } - - @Override - public Optional pick(int numLevels, List runs) { - if (isLateArrived()) { - return lateArrivedCompactStrategy.pick(numLevels, runs); - } else { - return currentCompactStrategy.pick(numLevels, runs); - } - } - - private boolean isLateArrived() { - LocalDateTime lateArrivedDate = LocalDateTime.now().minus(lateArrivedThreshold); - // For lateArrivedDate=20250120, any data insert into partitions<=20250120 will be - // considered as late arrived data. - boolean result = !currentPartitionDate.isAfter(lateArrivedDate); - LOG.debug( - "Current partition Date: {}, late arrived Date: {}, late arrived result: {}.", - currentPartitionDate.format(DateTimeFormatter.BASIC_ISO_DATE), - lateArrivedDate.format(DateTimeFormatter.BASIC_ISO_DATE), - result); - return result; - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index c5cdcf81789a..bfac2019a7f6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -60,24 +60,26 @@ import org.apache.paimon.mergetree.compact.MergeFunctionFactory; import org.apache.paimon.mergetree.compact.MergeTreeCompactManager; import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter; -import org.apache.paimon.mergetree.compact.TimeAwareCompaction; import org.apache.paimon.mergetree.compact.UniversalCompaction; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionTimeExtractor; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.FieldsComparator; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.UserDefinedSeqComparator; + +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; + import java.time.Duration; import java.util.Comparator; import java.util.List; @@ -229,18 +231,15 @@ public CompactStrategy createCompactStrategy(CoreOptions options, BinaryRow part options.sortedRunSizeRatio(), options.numSortedRunCompactionTrigger(), options.optimizedCompactionInterval()); - ForceUpLevel0Compaction forceUpLevel0Compaction = - new ForceUpLevel0Compaction(universalCompaction); if (options.needLookup()) { - Optional lateArrivedThreshold = options.lateArrivedThreshold(); + Optional lateArrivedThreshold = options.lateArrivalThreshold(); if (partition.getFieldCount() > 0 && lateArrivedThreshold.isPresent()) { - return new TimeAwareCompaction( - lateArrivedThreshold.get(), - forceUpLevel0Compaction, + return new ForceUpLevel0Compaction( universalCompaction, + lateArrivedThreshold.get(), partitionTimeExtractor.extract(partition, partitionType)); } else { - return forceUpLevel0Compaction; + return new ForceUpLevel0Compaction(universalCompaction); } } else { return universalCompaction; diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java index 1fc30d6a6baa..9af1789131b7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java @@ -24,6 +24,7 @@ import org.apache.paimon.utils.RowDataToObjectArrayConverter; import javax.annotation.Nullable; + import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ForceUpLevel0CompactionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ForceUpLevel0CompactionTest.java index 07eedc921491..cec99566f538 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ForceUpLevel0CompactionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ForceUpLevel0CompactionTest.java @@ -24,6 +24,8 @@ import org.junit.jupiter.api.Test; +import java.time.Duration; +import java.time.LocalDateTime; import java.util.Arrays; import java.util.Collections; import java.util.Optional; @@ -59,6 +61,56 @@ public void testForceCompaction0() { assertThat(result.get().outputLevel()).isEqualTo(2); } + @Test + public void testForceCompaction0WithLateArrivalThreshold() { + int lateArrivalThresholdDays = 3; + Duration lateArrivalThreshold = Duration.ofDays(lateArrivalThresholdDays); + for (int i = 0; i < 30; i++) { + LocalDateTime testPartitionDate = LocalDateTime.now().minusDays(i); + ForceUpLevel0Compaction compaction = + new ForceUpLevel0Compaction( + new UniversalCompaction(200, 1, 5), + lateArrivalThreshold, + testPartitionDate); + + if (i >= lateArrivalThresholdDays) { + assertThat(compaction.isLateArrival()).isTrue(); + + Optional result = + compaction.pick(3, Arrays.asList(run(0, 1), run(0, 1))); + assertThat(result).isEmpty(); + + result = compaction.pick(3, Arrays.asList(run(0, 1), run(0, 1), run(1, 10))); + assertThat(result).isEmpty(); + + result = + compaction.pick( + 4, + Arrays.asList( + run(0, 1), run(0, 1), run(0, 1), run(1, 10), run(2, 30))); + assertThat(result).isPresent(); + assertThat(result.get().outputLevel()).isEqualTo(1); + assertThat(result.get().files().size()).isEqualTo(4); + + } else { + assertThat(compaction.isLateArrival()).isFalse(); + + Optional result = + compaction.pick(3, Arrays.asList(run(0, 1), run(0, 1))); + assertThat(result).isPresent(); + assertThat(result.get().outputLevel()).isEqualTo(2); + + result = compaction.pick(3, Arrays.asList(run(0, 1), run(1, 10))); + assertThat(result).isPresent(); + assertThat(result.get().outputLevel()).isEqualTo(2); + + result = compaction.pick(3, Arrays.asList(run(0, 1), run(0, 5), run(2, 10))); + assertThat(result).isPresent(); + assertThat(result.get().outputLevel()).isEqualTo(1); + } + } + } + private LevelSortedRun run(int level, int size) { return new LevelSortedRun(level, SortedRun.fromSingle(file(size))); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java new file mode 100644 index 000000000000..06286f2faf41 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.KeyValue; +import org.apache.paimon.TestFileStore; +import org.apache.paimon.TestKeyValueGenerator; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.mergetree.compact.CompactStrategy; +import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; +import org.apache.paimon.mergetree.compact.ForceUpLevel0Compaction; +import org.apache.paimon.partition.PartitionTimeExtractor; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link KeyValueFileStoreWrite}. */ +public class KeyValueFileStoreWriteTest { + + private static final int NUM_BUCKETS = 10; + + @TempDir java.nio.file.Path tempDir; + + @Test + public void testLateArrival() throws Exception { + SchemaManager schemaManager = + new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri())); + Map options = new HashMap<>(); + Duration lateArrivalThreshold = Duration.ofDays(1L); + options.put(CoreOptions.LATE_ARRIVAL_THRESHOLD.key(), "1 d"); + options.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true"); + options.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyyMMdd H"); + options.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), "$dt $hr"); + + TableSchema schema = + schemaManager.createTable( + new Schema( + TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(), + TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), + TestKeyValueGenerator.getPrimaryKeys( + TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED), + options, + null)); + TestFileStore store = + new TestFileStore.Builder( + "avro", + tempDir.toString(), + NUM_BUCKETS, + TestKeyValueGenerator.DEFAULT_PART_TYPE, + TestKeyValueGenerator.KEY_TYPE, + TestKeyValueGenerator.DEFAULT_ROW_TYPE, + TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, + DeduplicateMergeFunction.factory(), + schema) + .build(); + + KeyValueFileStoreWrite write = (KeyValueFileStoreWrite) store.newWrite(); + TestKeyValueGenerator gen = new TestKeyValueGenerator(); + PartitionTimeExtractor partitionTimeExtractor = new PartitionTimeExtractor(store.options()); + + for (int i = 0; i < 30; i++) { + KeyValue keyValue = gen.next(); + BinaryRow partition = gen.getPartition(keyValue); + LocalDateTime partitionTime = + partitionTimeExtractor.extract( + partition, TestKeyValueGenerator.DEFAULT_PART_TYPE); + CompactStrategy compactStrategy = + write.createCompactStrategy(store.options(), gen.getPartition(keyValue)); + assertThat(compactStrategy).isInstanceOf(ForceUpLevel0Compaction.class); + ForceUpLevel0Compaction forceUpLevel0Compaction = + (ForceUpLevel0Compaction) compactStrategy; + assertThat(forceUpLevel0Compaction.isLateArrival()) + .isEqualTo( + !partitionTime.isAfter( + LocalDateTime.now().minus(lateArrivalThreshold))); + } + } +}