Skip to content

Commit

Permalink
[core] Add open method to PartitionMarkDone for CustomPartitionMarkDone.
Browse files Browse the repository at this point in the history
  • Loading branch information
LinMingQiang committed Jan 24, 2025
1 parent f564400 commit cbecad1
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,30 +55,34 @@
/** Report partition submission information to remote http server. */
public class HttpReportMarkDoneAction implements PartitionMarkDoneAction {

private final OkHttpClient client;
private final String url;
private final ObjectMapper mapper;
private OkHttpClient client;
private String url;
private ObjectMapper mapper;
private static final MediaType MEDIA_TYPE = MediaType.parse("application/json");

private final FileStoreTable fileStoreTable;
private String tableName;
private String location;

private final String params;
private String params;

private static final String RESPONSE_SUCCESS = "SUCCESS";

private static final String THREAD_NAME = "PAIMON-HTTP-REPORT-MARK-DONE-ACTION-THREAD";

public HttpReportMarkDoneAction(FileStoreTable fileStoreTable, CoreOptions options) {
@Override
public void open(FileStoreTable fileStoreTable, CoreOptions options) {

Preconditions.checkArgument(
!StringUtils.isNullOrWhitespaceOnly(options.httpReportMarkDoneActionUrl()),
String.format(
"Parameter %s must be non-empty when you use `http-report` partition mark done action.",
PARTITION_MARK_DONE_ACTION_URL.key()));

this.fileStoreTable = fileStoreTable;
this.params = options.httpReportMarkDoneActionParams();
this.url = options.httpReportMarkDoneActionUrl();
this.tableName = fileStoreTable.fullName();
this.location = fileStoreTable.location().toString();

this.mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
Expand All @@ -102,10 +106,7 @@ public void markDone(String partition) throws Exception {
HttpReportMarkDoneResponse response =
post(
new HttpReportMarkDoneRequest(
params,
fileStoreTable.fullName(),
fileStoreTable.location().toString(),
partition),
params, this.tableName, this.location, partition),
Collections.emptyMap());
Preconditions.checkState(
reportIsSuccess(response),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
/** Action to mark partitions done. */
public interface PartitionMarkDoneAction extends Closeable {

default void open(FileStoreTable fileStoreTable, CoreOptions options) {}

void markDone(String partition) throws Exception;

static List<PartitionMarkDoneAction> createActions(
Expand All @@ -55,13 +57,14 @@ static List<PartitionMarkDoneAction> createActions(
return new MarkPartitionDoneEventAction(
createPartitionHandler(fileStoreTable, options));
case HTTP_REPORT:
return new HttpReportMarkDoneAction(fileStoreTable, options);
return new HttpReportMarkDoneAction();
case CUSTOM:
return generateCustomMarkDoneAction(cl, options);
default:
throw new UnsupportedOperationException(action.toString());
}
})
.peek(action -> action.open(fileStoreTable, options))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,6 @@ public void testCustomPartitionMarkDoneAction() throws Exception {
assertThat(table2.fileIO().exists(successFile)).isEqualTo(true);

assertThat(MockCustomPartitionMarkDoneAction.getMarkedDonePartitions().iterator().next())
.isEqualTo("a=0/");
.isEqualTo("table=default.T,partition=a=0/");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public void stopServer() throws Exception {

@Test
public void testHttpReportMarkDoneActionSuccessResponse() throws Exception {
HttpReportMarkDoneAction httpReportMarkDoneAction = createHttpReportMarkDoneAction();
HttpReportMarkDoneAction httpReportMarkDoneAction = new HttpReportMarkDoneAction();
httpReportMarkDoneAction.open(fileStoreTable, createCoreOptions());

server.enqueueResponse(successResponse, 200);

Expand All @@ -97,16 +98,19 @@ public void testHttpReportMarkDoneActionSuccessResponse() throws Exception {

// test params is null.
params = null;
HttpReportMarkDoneAction httpReportMarkDoneAction3 = createHttpReportMarkDoneAction();
HttpReportMarkDoneAction httpReportMarkDoneAction3 = new HttpReportMarkDoneAction();
httpReportMarkDoneAction3.open(fileStoreTable, createCoreOptions());

server.enqueueResponse(successResponse, 200);
httpReportMarkDoneAction3.markDone(partition);
RecordedRequest request3 = server.takeRequest(10, TimeUnit.SECONDS);
assertRequest(request3);
}

@Test
public void testHttpReportMarkDoneActionFailedResponse() throws Exception {
HttpReportMarkDoneAction markDoneAction = createHttpReportMarkDoneAction();
public void testHttpReportMarkDoneActionFailedResponse() {
HttpReportMarkDoneAction markDoneAction = new HttpReportMarkDoneAction();
markDoneAction.open(fileStoreTable, createCoreOptions());

// status failed.
server.enqueueResponse(failedResponse, 200);
Expand Down Expand Up @@ -160,10 +164,6 @@ public static CoreOptions createCoreOptions() {
return new CoreOptions(httpOptions);
}

public HttpReportMarkDoneAction createHttpReportMarkDoneAction() {
return new HttpReportMarkDoneAction(fileStoreTable, createCoreOptions());
}

public FileStoreTable createFileStoreTable() throws Exception {
org.apache.paimon.fs.Path tablePath =
new org.apache.paimon.fs.Path(folder.newFolder().toURI().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package org.apache.paimon.flink.sink.partition;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
import org.apache.paimon.table.FileStoreTable;

import java.io.IOException;
import java.util.HashSet;
Expand All @@ -29,9 +31,17 @@ public class MockCustomPartitionMarkDoneAction implements PartitionMarkDoneActio

private static final Set<String> markedDonePartitions = new HashSet<>();

private String tableName;

@Override
public void open(FileStoreTable fileStoreTable, CoreOptions options) {
this.tableName = fileStoreTable.fullName();
}

@Override
public void markDone(String partition) {
MockCustomPartitionMarkDoneAction.markedDonePartitions.add(partition);
MockCustomPartitionMarkDoneAction.markedDonePartitions.add(
String.format("table=%s,partition=%s", tableName, partition));
}

public static Set<String> getMarkedDonePartitions() {
Expand Down

0 comments on commit cbecad1

Please sign in to comment.