Skip to content

Commit

Permalink
[core] remove minor_compact action.
Browse files Browse the repository at this point in the history
  • Loading branch information
LinMingQiang committed Feb 12, 2025
1 parent 1deb7fe commit 3702c60
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 84 deletions.
2 changes: 1 addition & 1 deletion docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,7 @@
<td><h5>write-actions</h5></td>
<td style="word-wrap: break-word;">"all"</td>
<td>String</td>
<td>This parameter is used to specify which actions will be performed during the writing process. This parameter is effective only when write-only is false and has no effect on action or procedure.<br />1. 'all': By default, all actions will be performed.<br />2. 'partition-expire': Perform partition expiration action.<br />3. 'snapshot-expire': Perform snapshot expiration action.<br />4. 'tag-automatic-creation': Perform automatic creation tag action.<br />5. 'full-compact': Perform full compaction action.<br />6. 'minor-compact': Perform minor compaction action.<br />Both can be configured at the same time: 'partition-expire,snapshot-expire,tag-automatic-creation', if you want to skip all actions you can set this to ''.</td>
<td>This parameter is used to specify which actions will be performed during the writing process. This parameter is effective only when write-only is false and has no effect on action or procedure.<br />1. 'all': By default, all actions will be performed.<br />2. 'partition-expire': Perform partition expiration action.<br />3. 'snapshot-expire': Perform snapshot expiration action.<br />4. 'tag-automatic-creation': Perform automatic creation tag action.<br />5. 'full-compact': Perform full compaction action.<br />Both can be configured at the same time: 'partition-expire,snapshot-expire,tag-automatic-creation', if you want to skip all actions you can set this to ''.</td>
</tr>
<tr>
<td><h5>write-buffer-for-append</h5></td>
Expand Down
66 changes: 24 additions & 42 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,6 @@ public class CoreOptions implements Serializable {
.linebreak()
.text("5. 'full-compact': Perform full compaction action.")
.linebreak()
.text("6. 'minor-compact': Perform minor compaction action.")
.linebreak()
.text(
"Both can be configured at the same time: 'partition-expire,"
+ "snapshot-expire,tag-automatic-creation', "
Expand Down Expand Up @@ -2287,45 +2285,6 @@ public static Set<WriteAction> writeActions(String str) {
.collect(Collectors.toCollection(HashSet::new));
}

public boolean doPartitionExpireAction(Set<WriteAction> doWriteActions) {
return doAllWriteActions(doWriteActions)
|| doWriteActions.contains(WriteAction.PARTITION_EXPIRE);
}

public boolean doSnapshotExpireAction(Set<WriteAction> doWriteActions) {
return doAllWriteActions(doWriteActions)
|| doWriteActions.contains(WriteAction.SNAPSHOT_EXPIRE);
}

public boolean doAutoCreateTagAction(Set<WriteAction> doWriteActions) {
return doAllWriteActions(doWriteActions)
|| doWriteActions.contains(WriteAction.TAG_AUTOMATIC_CREATION);
}

public boolean doFullCompactionAction(Set<WriteAction> doWriteActions) {
return doAllWriteActions(doWriteActions)
|| doWriteActions.contains(WriteAction.FULL_COMPACT);
}

public boolean doMinorCompactionAction(Set<WriteAction> doWriteActions) {
return doAllWriteActions(doWriteActions)
|| doWriteActions.contains(WriteAction.MINOR_COMPACT);
}

public boolean doCompact() {
return doCompact(writeActions());
}

public static boolean doCompact(Set<WriteAction> doWriteActions) {
return doWriteActions.contains(WriteAction.ALL)
|| doWriteActions.contains(WriteAction.FULL_COMPACT)
|| doWriteActions.contains(WriteAction.MINOR_COMPACT);
}

public boolean doAllWriteActions(Set<WriteAction> doWriteActions) {
return doWriteActions.contains(WriteAction.ALL);
}

public boolean streamingReadOverwrite() {
return options.get(STREAMING_READ_OVERWRITE);
}
Expand Down Expand Up @@ -3324,7 +3283,6 @@ public enum WriteAction {
TAG_AUTOMATIC_CREATION("tag-automatic-creation"),

// Actions during writing.
MINOR_COMPACT("minor-compact"),
FULL_COMPACT("full-compact");

private final String value;
Expand All @@ -3337,6 +3295,30 @@ public enum WriteAction {
public String toString() {
return value;
}

public static boolean doPartitionExpireAction(Set<WriteAction> doWriteActions) {
return doAllWriteActions(doWriteActions)
|| doWriteActions.contains(WriteAction.PARTITION_EXPIRE);
}

public static boolean doSnapshotExpireAction(Set<WriteAction> doWriteActions) {
return doAllWriteActions(doWriteActions)
|| doWriteActions.contains(WriteAction.SNAPSHOT_EXPIRE);
}

public static boolean doAutoCreateTagAction(Set<WriteAction> doWriteActions) {
return doAllWriteActions(doWriteActions)
|| doWriteActions.contains(WriteAction.TAG_AUTOMATIC_CREATION);
}

public static boolean doFullCompactionAction(Set<WriteAction> doWriteActions) {
return doAllWriteActions(doWriteActions)
|| doWriteActions.contains(WriteAction.FULL_COMPACT);
}

public static boolean doAllWriteActions(Set<WriteAction> doWriteActions) {
return doWriteActions.contains(WriteAction.ALL);
}
}

/** The order type of table sort. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ protected CompactManager getCompactManager(
List<DataFileMeta> restoredFiles,
ExecutorService compactExecutor,
@Nullable DeletionVectorsMaintainer dvMaintainer) {
if (!options.doCompact()) {
if (!CoreOptions.WriteAction.doFullCompactionAction(options.writeActions())) {
return new NoopCompactManager();
} else {
Function<String, DeletionVector> dvFactory =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ private CompactManager createCompactManager(
ExecutorService compactExecutor,
Levels levels,
@Nullable DeletionVectorsMaintainer dvMaintainer) {
if (!options.doCompact()) {
if (!CoreOptions.WriteAction.doFullCompactionAction(options.writeActions())) {
return new NoopCompactManager();
} else {
Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
Expand Down Expand Up @@ -289,7 +289,7 @@ private MergeTreeCompactRewriter createRewriter(
ChangelogProducer changelogProducer = options.changelogProducer();
LookupStrategy lookupStrategy = options.lookupStrategy();
if (changelogProducer.equals(FULL_COMPACTION)
&& options.doFullCompactionAction(options.writeActions())) {
&& CoreOptions.WriteAction.doFullCompactionAction(options.writeActions())) {
return new FullChangelogMergeTreeCompactRewriter(
maxLevel,
mergeEngine,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ public TableCommitImpl newCommit(String commitUser) {
CoreOptions options = coreOptions();
Runnable snapshotExpire = null;
Set<WriteAction> skippingActions = options.writeActions();
if (options.doSnapshotExpireAction(skippingActions)) {
if (WriteAction.doSnapshotExpireAction(skippingActions)) {
boolean changelogDecoupled = options.changelogLifecycleDecoupled();
ExpireConfig expireConfig = options.expireConfig();
ExpireSnapshots expireChangelog = newExpireChangelog().config(expireConfig);
Expand All @@ -452,10 +452,10 @@ public TableCommitImpl newCommit(String commitUser) {
return new TableCommitImpl(
store().newCommit(commitUser, createCommitCallbacks(commitUser)),
snapshotExpire,
options.doPartitionExpireAction(skippingActions)
WriteAction.doPartitionExpireAction(skippingActions)
? store().newPartitionExpire(commitUser)
: null,
options.doAutoCreateTagAction(skippingActions)
WriteAction.doAutoCreateTagAction(skippingActions)
? store().newTagCreationManager()
: null,
CoreOptions.fromMap(options()).consumerExpireTime(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private StoreSinkWrite.Provider createWriteProvider(
Set<CoreOptions.WriteAction> writeActions = coreOptions.writeActions();
waitCompaction = coreOptions.prepareCommitWaitCompaction();
int deltaCommits = -1;
if (coreOptions.doFullCompactionAction(writeActions)) {
if (CoreOptions.WriteAction.doFullCompactionAction(writeActions)) {
if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
} else if (options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,12 @@ public void processElement(StreamRecord<RowData> element) throws Exception {
FileStoreTable table = getTable(tableId);

Preconditions.checkArgument(
table.coreOptions().doCompact(),
CoreOptions.WriteAction.doFullCompactionAction(table.coreOptions().writeActions()),
String.format(
"%s should not be true or %s should be %s or contains %s/%s for MultiTablesStoreCompactOperator.",
"%s should not be true or %s should be %s or contains %s for MultiTablesStoreCompactOperator.",
CoreOptions.WRITE_ONLY.key(),
CoreOptions.WRITE_ACTIONS.key(),
CoreOptions.WriteAction.ALL,
CoreOptions.WriteAction.MINOR_COMPACT,
CoreOptions.WriteAction.FULL_COMPACT));

storeSinkWriteProvider =
Expand Down Expand Up @@ -270,7 +269,7 @@ private StoreSinkWrite.Provider createWriteProvider(
Set<CoreOptions.WriteAction> writeActions = coreOptions.writeActions();
int deltaCommits = -1;

if (coreOptions.doFullCompactionAction(writeActions)) {
if (CoreOptions.WriteAction.doFullCompactionAction(writeActions)) {
if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
} else if (options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,12 @@ public void close() throws Exception {

public static void checkWriteActions(CoreOptions coreOptions) {
Preconditions.checkArgument(
coreOptions.doCompact(),
CoreOptions.WriteAction.doFullCompactionAction(coreOptions.writeActions()),
String.format(
"%s should not be true or %s should be %s or contains %s/%s for StoreCompactOperator.",
"%s should not be true or %s should be %s or contains %s for StoreCompactOperator.",
CoreOptions.WRITE_ONLY.key(),
CoreOptions.WRITE_ACTIONS.key(),
CoreOptions.WriteAction.ALL,
CoreOptions.WriteAction.MINOR_COMPACT,
CoreOptions.WriteAction.FULL_COMPACT));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink.sink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.compact.UnawareBucketNewFilesCompactionCoordinatorOperator;
import org.apache.paimon.flink.compact.UnawareBucketNewFilesCompactionWorkerOperator;
Expand Down Expand Up @@ -87,7 +88,8 @@ public DataStream<Committable> doWrite(
.setParallelism(written.getParallelism());
}

boolean enableCompaction = table.coreOptions().doCompact();
boolean enableCompaction =
CoreOptions.WriteAction.doFullCompactionAction(table.coreOptions().writeActions());
boolean isStreamingMode =
input.getExecutionEnvironment()
.getConfiguration()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,21 @@ public class WriteActionsITCase extends CatalogITCaseBase {

@Timeout(value = TIMEOUT)
@ParameterizedTest
@ValueSource(strings = {"PARTITION-EXPIRE", "SNAPSHOT-EXPIRE", "TAG-AUTOMATIC-CREATION"})
@ValueSource(
strings = {
"PARTITION-EXPIRE",
"SNAPSHOT-EXPIRE",
"TAG-AUTOMATIC-CREATION",
"FULL-COMPACT"
})
public void testWriteActionsWhichExecutedDuringCommit(String val) throws Exception {

CoreOptions.WriteAction writeAction =
CoreOptions.WriteAction.valueOf(val.replace("-", "_"));

HashMap<String, String> writeActionOptions =
createOptions(
String.format(
"%s,%s,%s",
writeAction,
CoreOptions.WriteAction.FULL_COMPACT,
CoreOptions.WriteAction.MINOR_COMPACT));
String.format("%s,%s", writeAction, CoreOptions.WriteAction.FULL_COMPACT));

createPrimaryKeyTable("T", writeActionOptions);
sql("INSERT INTO T VALUES ('HXH', '20250101')");
Expand Down Expand Up @@ -95,6 +97,17 @@ public void testWriteActionsWhichExecutedDuringCommit(String val) throws Excepti
// Snapshot 2 is Compact, full compact.
assertThat(snapshotManager.snapshot(2).commitKind())
.isEqualTo(Snapshot.CommitKind.COMPACT);
break;
case FULL_COMPACT:
// A single write will trigger full compact.
expectTable(table, snapshotManager, 2, 2, 0, "20250101");
// Snapshot 1 is APPEND.
assertThat(snapshotManager.snapshot(1).commitKind())
.isEqualTo(Snapshot.CommitKind.APPEND);
// Snapshot 2 is COMPACT.
assertThat(snapshotManager.snapshot(2).commitKind())
.isEqualTo(Snapshot.CommitKind.COMPACT);
break;
}
}

Expand Down Expand Up @@ -150,13 +163,11 @@ public void testSkipOrDoAllWriteActions(String action) throws Exception {

@Timeout(value = TIMEOUT)
@ParameterizedTest
@ValueSource(strings = {"FULL-COMPACT", "MINOR-COMPACT"})
@ValueSource(strings = {"DO", "SKIP"})
public void testAppendOnlyCompactActions(String val) throws Exception {

CoreOptions.WriteAction writeAction =
CoreOptions.WriteAction.valueOf(val.replace("-", "_"));

HashMap<String, String> writeActionOptions = createOptions(writeAction.toString());
HashMap<String, String> writeActionOptions =
createOptions(val.equals("DO") ? "FULL_COMPACT" : "");
writeActionOptions.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "4");

createAppendOnlyTable("T", writeActionOptions);
Expand All @@ -167,29 +178,29 @@ public void testAppendOnlyCompactActions(String val) throws Exception {
sql("INSERT INTO T VALUES ('HXH', '20250101')");
sql("INSERT INTO T VALUES ('HXH', '20250101')");

switch (writeAction) {
case FULL_COMPACT:
switch (val) {
case "DO":
// Trigger full compaction.
expectTable(table, snapshotManager, 4, 4, 0, "20250101");
// Snapshot 3 is APPEND.
assertThat(snapshotManager.snapshot(3).commitKind())
.isEqualTo(Snapshot.CommitKind.APPEND);
// Snapshot 4 is COMPACT.
assertThat(snapshotManager.snapshot(4).commitKind())
.isEqualTo(Snapshot.CommitKind.COMPACT);
break;
case MINOR_COMPACT:
// Will not trigger full compact because we skip it.
case "SKIP":
expectTable(table, snapshotManager, 3, 3, 0, "20250101");
// Trigger minor compact.
sql("INSERT INTO T VALUES ('HXH', '20250101')");
expectTable(table, snapshotManager, 5, 5, 0, "20250101");
}
}

@Timeout(value = TIMEOUT)
@ParameterizedTest
@ValueSource(strings = {"FULL-COMPACT", "MINOR-COMPACT"})
@ValueSource(strings = {"DO", "SKIP"})
public void testPrimaryKeyTableCompactActions(String val) throws Exception {

CoreOptions.WriteAction writeAction =
CoreOptions.WriteAction.valueOf(val.replace("-", "_"));

HashMap<String, String> writeActionOptions = createOptions(writeAction.toString());
HashMap<String, String> writeActionOptions =
createOptions(val.equals("DO") ? "FULL_COMPACT" : "");

// Ensure that a single data write does not trigger a minor compaction.
writeActionOptions.put(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER.key(), "2");
Expand All @@ -200,8 +211,8 @@ public void testPrimaryKeyTableCompactActions(String val) throws Exception {
FileStoreTable table = paimonTable("T");
SnapshotManager snapshotManager = table.snapshotManager();

switch (writeAction) {
case FULL_COMPACT:
switch (val) {
case "DO":
// A single write will trigger full compact.
expectTable(table, snapshotManager, 2, 2, 0, "20250101");
// Snapshot 1 is APPEND.
Expand All @@ -211,7 +222,7 @@ public void testPrimaryKeyTableCompactActions(String val) throws Exception {
assertThat(snapshotManager.snapshot(2).commitKind())
.isEqualTo(Snapshot.CommitKind.COMPACT);
break;
case MINOR_COMPACT:
case "SKIP":
// A single write will not trigger a minor compaction.
expectTable(table, snapshotManager, 1, 1, 0, "20250101");
// Snapshot 1 is APPEND.
Expand Down

0 comments on commit 3702c60

Please sign in to comment.