From 3702c60e0622a3e0867776527d86d1340b4ed029 Mon Sep 17 00:00:00 2001 From: LinMingQiang Date: Wed, 12 Feb 2025 16:22:51 +0800 Subject: [PATCH] [core] remove minor_compact action. --- .../generated/core_configuration.html | 2 +- .../java/org/apache/paimon/CoreOptions.java | 66 +++++++------------ .../AppendOnlyFixedBucketFileStoreWrite.java | 2 +- .../operation/KeyValueFileStoreWrite.java | 4 +- .../paimon/table/AbstractFileStoreTable.java | 6 +- .../apache/paimon/flink/sink/FlinkSink.java | 2 +- .../sink/MultiTablesStoreCompactOperator.java | 7 +- .../flink/sink/StoreCompactOperator.java | 5 +- .../paimon/flink/sink/UnawareBucketSink.java | 4 +- .../paimon/flink/sink/WriteActionsITCase.java | 63 ++++++++++-------- 10 files changed, 77 insertions(+), 84 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index d99f279368c8..82c1ae0effbe 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -1024,7 +1024,7 @@
write-actions
"all" String - This parameter is used to specify which actions will be performed during the writing process. This parameter is effective only when write-only is false and has no effect on action or procedure.
1. 'all': By default, all actions will be performed.
2. 'partition-expire': Perform partition expiration action.
3. 'snapshot-expire': Perform snapshot expiration action.
4. 'tag-automatic-creation': Perform automatic creation tag action.
5. 'full-compact': Perform full compaction action.
6. 'minor-compact': Perform minor compaction action.
Both can be configured at the same time: 'partition-expire,snapshot-expire,tag-automatic-creation', if you want to skip all actions you can set this to ''. + This parameter is used to specify which actions will be performed during the writing process. This parameter is effective only when write-only is false and has no effect on action or procedure.
1. 'all': By default, all actions will be performed.
2. 'partition-expire': Perform partition expiration action.
3. 'snapshot-expire': Perform snapshot expiration action.
4. 'tag-automatic-creation': Perform automatic creation tag action.
5. 'full-compact': Perform full compaction action.
Both can be configured at the same time: 'partition-expire,snapshot-expire,tag-automatic-creation', if you want to skip all actions you can set this to ''.
write-buffer-for-append
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 9cabcebd5e11..dd690779f378 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -465,8 +465,6 @@ public class CoreOptions implements Serializable { .linebreak() .text("5. 'full-compact': Perform full compaction action.") .linebreak() - .text("6. 'minor-compact': Perform minor compaction action.") - .linebreak() .text( "Both can be configured at the same time: 'partition-expire," + "snapshot-expire,tag-automatic-creation', " @@ -2287,45 +2285,6 @@ public static Set writeActions(String str) { .collect(Collectors.toCollection(HashSet::new)); } - public boolean doPartitionExpireAction(Set doWriteActions) { - return doAllWriteActions(doWriteActions) - || doWriteActions.contains(WriteAction.PARTITION_EXPIRE); - } - - public boolean doSnapshotExpireAction(Set doWriteActions) { - return doAllWriteActions(doWriteActions) - || doWriteActions.contains(WriteAction.SNAPSHOT_EXPIRE); - } - - public boolean doAutoCreateTagAction(Set doWriteActions) { - return doAllWriteActions(doWriteActions) - || doWriteActions.contains(WriteAction.TAG_AUTOMATIC_CREATION); - } - - public boolean doFullCompactionAction(Set doWriteActions) { - return doAllWriteActions(doWriteActions) - || doWriteActions.contains(WriteAction.FULL_COMPACT); - } - - public boolean doMinorCompactionAction(Set doWriteActions) { - return doAllWriteActions(doWriteActions) - || doWriteActions.contains(WriteAction.MINOR_COMPACT); - } - - public boolean doCompact() { - return doCompact(writeActions()); - } - - public static boolean doCompact(Set doWriteActions) { - return doWriteActions.contains(WriteAction.ALL) - || doWriteActions.contains(WriteAction.FULL_COMPACT) - || doWriteActions.contains(WriteAction.MINOR_COMPACT); - } - - public boolean doAllWriteActions(Set doWriteActions) { - return doWriteActions.contains(WriteAction.ALL); - } - public boolean streamingReadOverwrite() { return options.get(STREAMING_READ_OVERWRITE); } @@ -3324,7 +3283,6 @@ public enum WriteAction { TAG_AUTOMATIC_CREATION("tag-automatic-creation"), // Actions during writing. - MINOR_COMPACT("minor-compact"), FULL_COMPACT("full-compact"); private final String value; @@ -3337,6 +3295,30 @@ public enum WriteAction { public String toString() { return value; } + + public static boolean doPartitionExpireAction(Set doWriteActions) { + return doAllWriteActions(doWriteActions) + || doWriteActions.contains(WriteAction.PARTITION_EXPIRE); + } + + public static boolean doSnapshotExpireAction(Set doWriteActions) { + return doAllWriteActions(doWriteActions) + || doWriteActions.contains(WriteAction.SNAPSHOT_EXPIRE); + } + + public static boolean doAutoCreateTagAction(Set doWriteActions) { + return doAllWriteActions(doWriteActions) + || doWriteActions.contains(WriteAction.TAG_AUTOMATIC_CREATION); + } + + public static boolean doFullCompactionAction(Set doWriteActions) { + return doAllWriteActions(doWriteActions) + || doWriteActions.contains(WriteAction.FULL_COMPACT); + } + + public static boolean doAllWriteActions(Set doWriteActions) { + return doWriteActions.contains(WriteAction.ALL); + } } /** The order type of table sort. */ diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java index d0afdd381c8a..1d22ee3ca118 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java @@ -78,7 +78,7 @@ protected CompactManager getCompactManager( List restoredFiles, ExecutorService compactExecutor, @Nullable DeletionVectorsMaintainer dvMaintainer) { - if (!options.doCompact()) { + if (!CoreOptions.WriteAction.doFullCompactionAction(options.writeActions())) { return new NoopCompactManager(); } else { Function dvFactory = diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index e40238d066f4..aeb6eef2daf5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -239,7 +239,7 @@ private CompactManager createCompactManager( ExecutorService compactExecutor, Levels levels, @Nullable DeletionVectorsMaintainer dvMaintainer) { - if (!options.doCompact()) { + if (!CoreOptions.WriteAction.doFullCompactionAction(options.writeActions())) { return new NoopCompactManager(); } else { Comparator keyComparator = keyComparatorSupplier.get(); @@ -289,7 +289,7 @@ private MergeTreeCompactRewriter createRewriter( ChangelogProducer changelogProducer = options.changelogProducer(); LookupStrategy lookupStrategy = options.lookupStrategy(); if (changelogProducer.equals(FULL_COMPACTION) - && options.doFullCompactionAction(options.writeActions())) { + && CoreOptions.WriteAction.doFullCompactionAction(options.writeActions())) { return new FullChangelogMergeTreeCompactRewriter( maxLevel, mergeEngine, diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index d6c7fe532679..6c1837eb03c1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -435,7 +435,7 @@ public TableCommitImpl newCommit(String commitUser) { CoreOptions options = coreOptions(); Runnable snapshotExpire = null; Set skippingActions = options.writeActions(); - if (options.doSnapshotExpireAction(skippingActions)) { + if (WriteAction.doSnapshotExpireAction(skippingActions)) { boolean changelogDecoupled = options.changelogLifecycleDecoupled(); ExpireConfig expireConfig = options.expireConfig(); ExpireSnapshots expireChangelog = newExpireChangelog().config(expireConfig); @@ -452,10 +452,10 @@ public TableCommitImpl newCommit(String commitUser) { return new TableCommitImpl( store().newCommit(commitUser, createCommitCallbacks(commitUser)), snapshotExpire, - options.doPartitionExpireAction(skippingActions) + WriteAction.doPartitionExpireAction(skippingActions) ? store().newPartitionExpire(commitUser) : null, - options.doAutoCreateTagAction(skippingActions) + WriteAction.doAutoCreateTagAction(skippingActions) ? store().newTagCreationManager() : null, CoreOptions.fromMap(options()).consumerExpireTime(), diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 4bc3ea5f941f..02fb95d4ef12 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -114,7 +114,7 @@ private StoreSinkWrite.Provider createWriteProvider( Set writeActions = coreOptions.writeActions(); waitCompaction = coreOptions.prepareCommitWaitCompaction(); int deltaCommits = -1; - if (coreOptions.doFullCompactionAction(writeActions)) { + if (CoreOptions.WriteAction.doFullCompactionAction(writeActions)) { if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) { deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS); } else if (options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java index aaadcacaf90d..10144f6ca6c5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java @@ -150,13 +150,12 @@ public void processElement(StreamRecord element) throws Exception { FileStoreTable table = getTable(tableId); Preconditions.checkArgument( - table.coreOptions().doCompact(), + CoreOptions.WriteAction.doFullCompactionAction(table.coreOptions().writeActions()), String.format( - "%s should not be true or %s should be %s or contains %s/%s for MultiTablesStoreCompactOperator.", + "%s should not be true or %s should be %s or contains %s for MultiTablesStoreCompactOperator.", CoreOptions.WRITE_ONLY.key(), CoreOptions.WRITE_ACTIONS.key(), CoreOptions.WriteAction.ALL, - CoreOptions.WriteAction.MINOR_COMPACT, CoreOptions.WriteAction.FULL_COMPACT)); storeSinkWriteProvider = @@ -270,7 +269,7 @@ private StoreSinkWrite.Provider createWriteProvider( Set writeActions = coreOptions.writeActions(); int deltaCommits = -1; - if (coreOptions.doFullCompactionAction(writeActions)) { + if (CoreOptions.WriteAction.doFullCompactionAction(writeActions)) { if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) { deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS); } else if (options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java index b2bd132d5f3a..c1b6c8adb69a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java @@ -170,13 +170,12 @@ public void close() throws Exception { public static void checkWriteActions(CoreOptions coreOptions) { Preconditions.checkArgument( - coreOptions.doCompact(), + CoreOptions.WriteAction.doFullCompactionAction(coreOptions.writeActions()), String.format( - "%s should not be true or %s should be %s or contains %s/%s for StoreCompactOperator.", + "%s should not be true or %s should be %s or contains %s for StoreCompactOperator.", CoreOptions.WRITE_ONLY.key(), CoreOptions.WRITE_ACTIONS.key(), CoreOptions.WriteAction.ALL, - CoreOptions.WriteAction.MINOR_COMPACT, CoreOptions.WriteAction.FULL_COMPACT)); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java index 2fea587837e3..53098e56683f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.sink; +import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.compact.UnawareBucketNewFilesCompactionCoordinatorOperator; import org.apache.paimon.flink.compact.UnawareBucketNewFilesCompactionWorkerOperator; @@ -87,7 +88,8 @@ public DataStream doWrite( .setParallelism(written.getParallelism()); } - boolean enableCompaction = table.coreOptions().doCompact(); + boolean enableCompaction = + CoreOptions.WriteAction.doFullCompactionAction(table.coreOptions().writeActions()); boolean isStreamingMode = input.getExecutionEnvironment() .getConfiguration() diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriteActionsITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriteActionsITCase.java index 50505a821ef2..91cb348ad81d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriteActionsITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriteActionsITCase.java @@ -43,7 +43,13 @@ public class WriteActionsITCase extends CatalogITCaseBase { @Timeout(value = TIMEOUT) @ParameterizedTest - @ValueSource(strings = {"PARTITION-EXPIRE", "SNAPSHOT-EXPIRE", "TAG-AUTOMATIC-CREATION"}) + @ValueSource( + strings = { + "PARTITION-EXPIRE", + "SNAPSHOT-EXPIRE", + "TAG-AUTOMATIC-CREATION", + "FULL-COMPACT" + }) public void testWriteActionsWhichExecutedDuringCommit(String val) throws Exception { CoreOptions.WriteAction writeAction = @@ -51,11 +57,7 @@ public void testWriteActionsWhichExecutedDuringCommit(String val) throws Excepti HashMap writeActionOptions = createOptions( - String.format( - "%s,%s,%s", - writeAction, - CoreOptions.WriteAction.FULL_COMPACT, - CoreOptions.WriteAction.MINOR_COMPACT)); + String.format("%s,%s", writeAction, CoreOptions.WriteAction.FULL_COMPACT)); createPrimaryKeyTable("T", writeActionOptions); sql("INSERT INTO T VALUES ('HXH', '20250101')"); @@ -95,6 +97,17 @@ public void testWriteActionsWhichExecutedDuringCommit(String val) throws Excepti // Snapshot 2 is Compact, full compact. assertThat(snapshotManager.snapshot(2).commitKind()) .isEqualTo(Snapshot.CommitKind.COMPACT); + break; + case FULL_COMPACT: + // A single write will trigger full compact. + expectTable(table, snapshotManager, 2, 2, 0, "20250101"); + // Snapshot 1 is APPEND. + assertThat(snapshotManager.snapshot(1).commitKind()) + .isEqualTo(Snapshot.CommitKind.APPEND); + // Snapshot 2 is COMPACT. + assertThat(snapshotManager.snapshot(2).commitKind()) + .isEqualTo(Snapshot.CommitKind.COMPACT); + break; } } @@ -150,13 +163,11 @@ public void testSkipOrDoAllWriteActions(String action) throws Exception { @Timeout(value = TIMEOUT) @ParameterizedTest - @ValueSource(strings = {"FULL-COMPACT", "MINOR-COMPACT"}) + @ValueSource(strings = {"DO", "SKIP"}) public void testAppendOnlyCompactActions(String val) throws Exception { - CoreOptions.WriteAction writeAction = - CoreOptions.WriteAction.valueOf(val.replace("-", "_")); - - HashMap writeActionOptions = createOptions(writeAction.toString()); + HashMap writeActionOptions = + createOptions(val.equals("DO") ? "FULL_COMPACT" : ""); writeActionOptions.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "4"); createAppendOnlyTable("T", writeActionOptions); @@ -167,29 +178,29 @@ public void testAppendOnlyCompactActions(String val) throws Exception { sql("INSERT INTO T VALUES ('HXH', '20250101')"); sql("INSERT INTO T VALUES ('HXH', '20250101')"); - switch (writeAction) { - case FULL_COMPACT: + switch (val) { + case "DO": // Trigger full compaction. expectTable(table, snapshotManager, 4, 4, 0, "20250101"); + // Snapshot 3 is APPEND. + assertThat(snapshotManager.snapshot(3).commitKind()) + .isEqualTo(Snapshot.CommitKind.APPEND); + // Snapshot 4 is COMPACT. + assertThat(snapshotManager.snapshot(4).commitKind()) + .isEqualTo(Snapshot.CommitKind.COMPACT); break; - case MINOR_COMPACT: - // Will not trigger full compact because we skip it. + case "SKIP": expectTable(table, snapshotManager, 3, 3, 0, "20250101"); - // Trigger minor compact. - sql("INSERT INTO T VALUES ('HXH', '20250101')"); - expectTable(table, snapshotManager, 5, 5, 0, "20250101"); } } @Timeout(value = TIMEOUT) @ParameterizedTest - @ValueSource(strings = {"FULL-COMPACT", "MINOR-COMPACT"}) + @ValueSource(strings = {"DO", "SKIP"}) public void testPrimaryKeyTableCompactActions(String val) throws Exception { - CoreOptions.WriteAction writeAction = - CoreOptions.WriteAction.valueOf(val.replace("-", "_")); - - HashMap writeActionOptions = createOptions(writeAction.toString()); + HashMap writeActionOptions = + createOptions(val.equals("DO") ? "FULL_COMPACT" : ""); // Ensure that a single data write does not trigger a minor compaction. writeActionOptions.put(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER.key(), "2"); @@ -200,8 +211,8 @@ public void testPrimaryKeyTableCompactActions(String val) throws Exception { FileStoreTable table = paimonTable("T"); SnapshotManager snapshotManager = table.snapshotManager(); - switch (writeAction) { - case FULL_COMPACT: + switch (val) { + case "DO": // A single write will trigger full compact. expectTable(table, snapshotManager, 2, 2, 0, "20250101"); // Snapshot 1 is APPEND. @@ -211,7 +222,7 @@ public void testPrimaryKeyTableCompactActions(String val) throws Exception { assertThat(snapshotManager.snapshot(2).commitKind()) .isEqualTo(Snapshot.CommitKind.COMPACT); break; - case MINOR_COMPACT: + case "SKIP": // A single write will not trigger a minor compaction. expectTable(table, snapshotManager, 1, 1, 0, "20250101"); // Snapshot 1 is APPEND.