From cbecad15da66d276295418928c8db0f9afa00225 Mon Sep 17 00:00:00 2001 From: LinMingQiang Date: Fri, 24 Jan 2025 16:33:49 +0800 Subject: [PATCH 1/2] [core] Add open method to PartitionMarkDone for CustomPartitionMarkDone. --- .../actions/HttpReportMarkDoneAction.java | 23 ++++++++++--------- .../actions/PartitionMarkDoneAction.java | 5 +++- .../CustomPartitionMarkDoneActionTest.java | 2 +- .../HttpReportMarkDoneActionTest.java | 16 ++++++------- .../MockCustomPartitionMarkDoneAction.java | 12 +++++++++- 5 files changed, 36 insertions(+), 22 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java index 39c17406b339..4b4f7b6895eb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java @@ -55,20 +55,22 @@ /** Report partition submission information to remote http server. */ public class HttpReportMarkDoneAction implements PartitionMarkDoneAction { - private final OkHttpClient client; - private final String url; - private final ObjectMapper mapper; + private OkHttpClient client; + private String url; + private ObjectMapper mapper; private static final MediaType MEDIA_TYPE = MediaType.parse("application/json"); - private final FileStoreTable fileStoreTable; + private String tableName; + private String location; - private final String params; + private String params; private static final String RESPONSE_SUCCESS = "SUCCESS"; private static final String THREAD_NAME = "PAIMON-HTTP-REPORT-MARK-DONE-ACTION-THREAD"; - public HttpReportMarkDoneAction(FileStoreTable fileStoreTable, CoreOptions options) { + @Override + public void open(FileStoreTable fileStoreTable, CoreOptions options) { Preconditions.checkArgument( !StringUtils.isNullOrWhitespaceOnly(options.httpReportMarkDoneActionUrl()), @@ -76,9 +78,11 @@ public HttpReportMarkDoneAction(FileStoreTable fileStoreTable, CoreOptions optio "Parameter %s must be non-empty when you use `http-report` partition mark done action.", PARTITION_MARK_DONE_ACTION_URL.key())); - this.fileStoreTable = fileStoreTable; this.params = options.httpReportMarkDoneActionParams(); this.url = options.httpReportMarkDoneActionUrl(); + this.tableName = fileStoreTable.fullName(); + this.location = fileStoreTable.location().toString(); + this.mapper = new ObjectMapper(); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); @@ -102,10 +106,7 @@ public void markDone(String partition) throws Exception { HttpReportMarkDoneResponse response = post( new HttpReportMarkDoneRequest( - params, - fileStoreTable.fullName(), - fileStoreTable.location().toString(), - partition), + params, this.tableName, this.location, partition), Collections.emptyMap()); Preconditions.checkState( reportIsSuccess(response), diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java index ee12fce528ca..f17967dde1c3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java @@ -37,6 +37,8 @@ /** Action to mark partitions done. */ public interface PartitionMarkDoneAction extends Closeable { + default void open(FileStoreTable fileStoreTable, CoreOptions options) {} + void markDone(String partition) throws Exception; static List createActions( @@ -55,13 +57,14 @@ static List createActions( return new MarkPartitionDoneEventAction( createPartitionHandler(fileStoreTable, options)); case HTTP_REPORT: - return new HttpReportMarkDoneAction(fileStoreTable, options); + return new HttpReportMarkDoneAction(); case CUSTOM: return generateCustomMarkDoneAction(cl, options); default: throw new UnsupportedOperationException(action.toString()); } }) + .peek(action -> action.open(fileStoreTable, options)) .collect(Collectors.toList()); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java index 73ba630b638c..55d1cabdcd98 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java @@ -99,6 +99,6 @@ public void testCustomPartitionMarkDoneAction() throws Exception { assertThat(table2.fileIO().exists(successFile)).isEqualTo(true); assertThat(MockCustomPartitionMarkDoneAction.getMarkedDonePartitions().iterator().next()) - .isEqualTo("a=0/"); + .isEqualTo("table=default.T,partition=a=0/"); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java index b79597cb810f..dea8b1d47a69 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java @@ -80,7 +80,8 @@ public void stopServer() throws Exception { @Test public void testHttpReportMarkDoneActionSuccessResponse() throws Exception { - HttpReportMarkDoneAction httpReportMarkDoneAction = createHttpReportMarkDoneAction(); + HttpReportMarkDoneAction httpReportMarkDoneAction = new HttpReportMarkDoneAction(); + httpReportMarkDoneAction.open(fileStoreTable, createCoreOptions()); server.enqueueResponse(successResponse, 200); @@ -97,7 +98,9 @@ public void testHttpReportMarkDoneActionSuccessResponse() throws Exception { // test params is null. params = null; - HttpReportMarkDoneAction httpReportMarkDoneAction3 = createHttpReportMarkDoneAction(); + HttpReportMarkDoneAction httpReportMarkDoneAction3 = new HttpReportMarkDoneAction(); + httpReportMarkDoneAction3.open(fileStoreTable, createCoreOptions()); + server.enqueueResponse(successResponse, 200); httpReportMarkDoneAction3.markDone(partition); RecordedRequest request3 = server.takeRequest(10, TimeUnit.SECONDS); @@ -105,8 +108,9 @@ public void testHttpReportMarkDoneActionSuccessResponse() throws Exception { } @Test - public void testHttpReportMarkDoneActionFailedResponse() throws Exception { - HttpReportMarkDoneAction markDoneAction = createHttpReportMarkDoneAction(); + public void testHttpReportMarkDoneActionFailedResponse() { + HttpReportMarkDoneAction markDoneAction = new HttpReportMarkDoneAction(); + markDoneAction.open(fileStoreTable, createCoreOptions()); // status failed. server.enqueueResponse(failedResponse, 200); @@ -160,10 +164,6 @@ public static CoreOptions createCoreOptions() { return new CoreOptions(httpOptions); } - public HttpReportMarkDoneAction createHttpReportMarkDoneAction() { - return new HttpReportMarkDoneAction(fileStoreTable, createCoreOptions()); - } - public FileStoreTable createFileStoreTable() throws Exception { org.apache.paimon.fs.Path tablePath = new org.apache.paimon.fs.Path(folder.newFolder().toURI().toString()); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java index f8d9b4034672..cf86b29cb66b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java @@ -18,7 +18,9 @@ package org.apache.paimon.flink.sink.partition; +import org.apache.paimon.CoreOptions; import org.apache.paimon.partition.actions.PartitionMarkDoneAction; +import org.apache.paimon.table.FileStoreTable; import java.io.IOException; import java.util.HashSet; @@ -29,9 +31,17 @@ public class MockCustomPartitionMarkDoneAction implements PartitionMarkDoneActio private static final Set markedDonePartitions = new HashSet<>(); + private String tableName; + + @Override + public void open(FileStoreTable fileStoreTable, CoreOptions options) { + this.tableName = fileStoreTable.fullName(); + } + @Override public void markDone(String partition) { - MockCustomPartitionMarkDoneAction.markedDonePartitions.add(partition); + MockCustomPartitionMarkDoneAction.markedDonePartitions.add( + String.format("table=%s,partition=%s", tableName, partition)); } public static Set getMarkedDonePartitions() { From 9c74fa11f7fc8a5a87627d1bb00cc36643ec9a91 Mon Sep 17 00:00:00 2001 From: LinMingQiang Date: Fri, 24 Jan 2025 18:10:21 +0800 Subject: [PATCH 2/2] [core] fix test. --- .../paimon/flink/action/MarkPartitionDoneActionITCase.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java index 0a29aebf22bf..e2189f4738d4 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java @@ -167,6 +167,7 @@ public void testPartitionMarkDoneWithMultiplePartitionKey(boolean hasPk, String @MethodSource("testArguments") public void testCustomPartitionMarkDoneAction(boolean hasPk, String invoker) throws Exception { + MockCustomPartitionMarkDoneAction.getMarkedDonePartitions().clear(); Map options = new HashMap<>(2); options.put(PARTITION_MARK_DONE_ACTION.key(), SUCCESS_FILE + "," + CUSTOM); options.put( @@ -174,6 +175,7 @@ public void testCustomPartitionMarkDoneAction(boolean hasPk, String invoker) thr MockCustomPartitionMarkDoneAction.class.getName()); FileStoreTable table = prepareTable(hasPk, options); + String fullTableName = table.fullName(); switch (invoker) { case "action": @@ -217,7 +219,9 @@ public void testCustomPartitionMarkDoneAction(boolean hasPk, String invoker) thr assertThat(successFile2).isNotNull(); assertThat(MockCustomPartitionMarkDoneAction.getMarkedDonePartitions()) - .containsExactlyInAnyOrder("partKey0=0/partKey1=1/", "partKey0=1/partKey1=0/"); + .containsExactlyInAnyOrder( + "table=" + fullTableName + ",partition=partKey0=0/partKey1=1/", + "table=" + fullTableName + ",partition=partKey0=1/partKey1=0/"); } @ParameterizedTest