Skip to content

Commit

Permalink
[core] Add late-arrival threshold in ForceUpLevel0Compaction for diso…
Browse files Browse the repository at this point in the history
…rdered updates
  • Loading branch information
xiangyuf committed Feb 12, 2025
1 parent 797522c commit 478b0d5
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 92 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@
<td>String</td>
<td>Specifies the commit user prefix.</td>
</tr>
<tr>
<td><h5>compaction.late-arrival-threshold</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>The threshold of partitioned pk table to differentiate late arrived data. Late arrived data can be configured with a less frequent compaction strategy to sacrifice timeliness for overall resource usage saving. This option is only valid for a partitioned pk table when needLookup() is true.</td>
</tr>
<tr>
<td><h5>compaction.max-size-amplification-percent</h5></td>
<td style="word-wrap: break-word;">200</td>
Expand Down
14 changes: 9 additions & 5 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.paimon.utils.StringUtils;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.lang.reflect.Field;
import java.time.Duration;
Expand Down Expand Up @@ -624,12 +625,15 @@ public class CoreOptions implements Serializable {
text("Default value of Bucketed Append Table is '5'."))
.build());

public static final ConfigOption<Duration> LATE_ARRIVED_THRESHOLD =
key("compaction.late-arrived-threshold")
public static final ConfigOption<Duration> LATE_ARRIVAL_THRESHOLD =
key("compaction.late-arrival-threshold")
.durationType()
.noDefaultValue()
.withDescription(
"The threshold of partitioned pk table to differentiate late arrived data. Late arrived data can be configured with a less frequent compaction strategy to sacrifice timeliness for overall resource usage saving. This option is only valid for a partitioned pk table when needLookup() is true.");
"The threshold of partitioned pk table to differentiate late arrived data. "
+ "Late arrived data can be configured with a less frequent compaction "
+ "strategy to sacrifice timeliness for overall resource usage saving. "
+ "This option is only valid for a partitioned pk table when needLookup() is true.");

public static final ConfigOption<ChangelogProducer> CHANGELOG_PRODUCER =
key("changelog-producer")
Expand Down Expand Up @@ -2080,8 +2084,8 @@ public Optional<Integer> compactionMaxFileNum() {
return options.getOptional(COMPACTION_MAX_FILE_NUM);
}

public Optional<Duration> lateArrivedThreshold() {
return options.getOptional(LATE_ARRIVED_THRESHOLD);
public Optional<Duration> lateArrivalThreshold() {
return options.getOptional(LATE_ARRIVAL_THRESHOLD);
}

public long dynamicBucketTargetRowNum() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,58 @@

package org.apache.paimon.mergetree.compact;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.mergetree.LevelSortedRun;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Optional;

/** A {@link CompactStrategy} to force compacting level 0 files. */
/**
* A {@link CompactStrategy} to force compacting level 0 files.
*
* <p>Note: This strategy drastically increases the compaction frequency when updates to a
* partitioned PK table arrive seriously out of order.
*
* <p>In certain cases, users can actually accept worse data freshness for the late arrived data in
* the historical partitions for lower compaction frequency. This can significantly reduce the
* compaction resource usage. {@link UniversalCompaction} can be used in this case by configuring
* CoreOptions.LATE_ARRIVAL_THRESHOLD.
*
* <p>Notice: Enabling CoreOptions.LATE_ARRIVAL_THRESHOLD may leave several data files in L0
* that cannot be seen for a long time. This can be fixed with offline Historical Partition Compact.
*/
public class ForceUpLevel0Compaction implements CompactStrategy {
private static final Logger LOG = LoggerFactory.getLogger(ForceUpLevel0Compaction.class);

private final Duration lateArrivalThreshold;
private final LocalDateTime currentPartitionDate;

private final UniversalCompaction universal;

/**
 * Creates a strategy without a late-arrival threshold.
 *
 * <p>Delegates to the three-argument constructor with a {@code null} threshold and a
 * {@code null} partition date.
 *
 * @param universal the universal compaction strategy this strategy delegates to
 */
public ForceUpLevel0Compaction(UniversalCompaction universal) {
this(universal, null, null);
}

/**
 * Creates a strategy that can optionally treat the partition as late-arrival data.
 *
 * @param universal the universal compaction strategy this strategy delegates to
 * @param lateArrivalThreshold how far back from the current time a partition date may lie
 *     before it is treated as late arrival; {@code null} disables the late-arrival check
 * @param currentPartitionDate the date of the partition this strategy is created for; it is
 *     compared against the current time minus {@code lateArrivalThreshold}
 */
public ForceUpLevel0Compaction(
UniversalCompaction universal,
Duration lateArrivalThreshold,
LocalDateTime currentPartitionDate) {
this.universal = universal;
this.lateArrivalThreshold = lateArrivalThreshold;
this.currentPartitionDate = currentPartitionDate;
}

@Override
public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
Optional<CompactUnit> pick = universal.pick(numLevels, runs);
if (pick.isPresent()) {
if (pick.isPresent() || isLateArrival()) {
return pick;
}

Expand All @@ -54,4 +87,21 @@ public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
: Optional.of(
universal.pickForSizeRatio(numLevels - 1, runs, candidateCount, true));
}

/**
 * Tells whether the partition handled by this strategy counts as late-arrival data.
 *
 * <p>The partition is late when its date is not after the current time minus
 * {@code lateArrivalThreshold}.
 *
 * @return {@code true} if the partition date is at or before the late-arrival cut-off;
 *     {@code false} otherwise, or when no threshold or no partition date is configured
 */
@VisibleForTesting
public boolean isLateArrival() {
    // Both values are needed for the comparison. If either is missing, fall back to the
    // regular (non-late) behaviour instead of failing with a NullPointerException.
    if (lateArrivalThreshold == null || currentPartitionDate == null) {
        return false;
    }
    LocalDateTime lateArrivalDate = LocalDateTime.now().minus(lateArrivalThreshold);
    // For lateArrivalDate=20250120, any data inserted into partitions<=20250120 is
    // considered late-arrival data.
    boolean result = !currentPartitionDate.isAfter(lateArrivalDate);
    // The format() calls are evaluated eagerly, so only pay for them when debug is enabled.
    if (LOG.isDebugEnabled()) {
        LOG.debug(
                "Current partition Date: {}, late arrival Date: {}, late arrival result: {}.",
                currentPartitionDate.format(DateTimeFormatter.BASIC_ISO_DATE),
                lateArrivalDate.format(DateTimeFormatter.BASIC_ISO_DATE),
                result);
    }
    return result;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -60,24 +60,26 @@
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.TimeAwareCompaction;
import org.apache.paimon.mergetree.compact.UniversalCompaction;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionTimeExtractor;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.UserDefinedSeqComparator;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -229,18 +231,15 @@ public CompactStrategy createCompactStrategy(CoreOptions options, BinaryRow part
options.sortedRunSizeRatio(),
options.numSortedRunCompactionTrigger(),
options.optimizedCompactionInterval());
ForceUpLevel0Compaction forceUpLevel0Compaction =
new ForceUpLevel0Compaction(universalCompaction);
if (options.needLookup()) {
Optional<Duration> lateArrivedThreshold = options.lateArrivedThreshold();
Optional<Duration> lateArrivedThreshold = options.lateArrivalThreshold();
if (partition.getFieldCount() > 0 && lateArrivedThreshold.isPresent()) {
return new TimeAwareCompaction(
lateArrivedThreshold.get(),
forceUpLevel0Compaction,
return new ForceUpLevel0Compaction(
universalCompaction,
lateArrivedThreshold.get(),
partitionTimeExtractor.extract(partition, partitionType));
} else {
return forceUpLevel0Compaction;
return new ForceUpLevel0Compaction(universalCompaction);
}
} else {
return universalCompaction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.utils.RowDataToObjectArrayConverter;

import javax.annotation.Nullable;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
Expand Down Expand Up @@ -59,6 +61,56 @@ public void testForceCompaction0() {
assertThat(result.get().outputLevel()).isEqualTo(2);
}

@Test
public void testForceCompaction0WithLateArrivalThreshold() {
    final int thresholdDays = 3;
    final Duration threshold = Duration.ofDays(thresholdDays);

    // Walk partition dates from today back to 29 days ago and check both sides of the
    // late-arrival cut-off.
    for (int daysAgo = 0; daysAgo < 30; daysAgo++) {
        LocalDateTime partitionDate = LocalDateTime.now().minusDays(daysAgo);
        ForceUpLevel0Compaction strategy =
                new ForceUpLevel0Compaction(
                        new UniversalCompaction(200, 1, 5), threshold, partitionDate);

        if (daysAgo < thresholdDays) {
            // Recent partition: level 0 files are still force-compacted.
            assertThat(strategy.isLateArrival()).isFalse();

            Optional<CompactUnit> twoLevel0 =
                    strategy.pick(3, Arrays.asList(run(0, 1), run(0, 1)));
            assertThat(twoLevel0).isPresent();
            assertThat(twoLevel0.get().outputLevel()).isEqualTo(2);

            Optional<CompactUnit> level0And1 =
                    strategy.pick(3, Arrays.asList(run(0, 1), run(1, 10)));
            assertThat(level0And1).isPresent();
            assertThat(level0And1.get().outputLevel()).isEqualTo(2);

            Optional<CompactUnit> level0And2 =
                    strategy.pick(3, Arrays.asList(run(0, 1), run(0, 5), run(2, 10)));
            assertThat(level0And2).isPresent();
            assertThat(level0And2.get().outputLevel()).isEqualTo(1);
            continue;
        }

        // Late-arrival partition: a unit is only expected for the five-run input below.
        assertThat(strategy.isLateArrival()).isTrue();

        Optional<CompactUnit> twoLevel0 = strategy.pick(3, Arrays.asList(run(0, 1), run(0, 1)));
        assertThat(twoLevel0).isEmpty();

        Optional<CompactUnit> threeRuns =
                strategy.pick(3, Arrays.asList(run(0, 1), run(0, 1), run(1, 10)));
        assertThat(threeRuns).isEmpty();

        Optional<CompactUnit> fiveRuns =
                strategy.pick(
                        4,
                        Arrays.asList(
                                run(0, 1), run(0, 1), run(0, 1), run(1, 10), run(2, 30)));
        assertThat(fiveRuns).isPresent();
        assertThat(fiveRuns.get().outputLevel()).isEqualTo(1);
        assertThat(fiveRuns.get().files().size()).isEqualTo(4);
    }
}

/** Builds a {@link LevelSortedRun} at {@code level} from the single file made by {@code file(size)}. */
private LevelSortedRun run(int level, int size) {
    SortedRun singleFileRun = SortedRun.fromSingle(file(size));
    return new LevelSortedRun(level, singleFileRun);
}
Expand Down
Loading

0 comments on commit 478b0d5

Please sign in to comment.