Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Add open method to PartitionMarkDone for CustomPartitionMarkDone. #4995

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,30 +55,34 @@
/** Report partition submission information to remote http server. */
public class HttpReportMarkDoneAction implements PartitionMarkDoneAction {

private final OkHttpClient client;
private final String url;
private final ObjectMapper mapper;
private OkHttpClient client;
private String url;
private ObjectMapper mapper;
private static final MediaType MEDIA_TYPE = MediaType.parse("application/json");

private final FileStoreTable fileStoreTable;
private String tableName;
private String location;

private final String params;
private String params;

private static final String RESPONSE_SUCCESS = "SUCCESS";

private static final String THREAD_NAME = "PAIMON-HTTP-REPORT-MARK-DONE-ACTION-THREAD";

public HttpReportMarkDoneAction(FileStoreTable fileStoreTable, CoreOptions options) {
@Override
public void open(FileStoreTable fileStoreTable, CoreOptions options) {

Preconditions.checkArgument(
!StringUtils.isNullOrWhitespaceOnly(options.httpReportMarkDoneActionUrl()),
String.format(
"Parameter %s must be non-empty when you use `http-report` partition mark done action.",
PARTITION_MARK_DONE_ACTION_URL.key()));

this.fileStoreTable = fileStoreTable;
this.params = options.httpReportMarkDoneActionParams();
this.url = options.httpReportMarkDoneActionUrl();
this.tableName = fileStoreTable.fullName();
this.location = fileStoreTable.location().toString();

this.mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
Expand All @@ -102,10 +106,7 @@ public void markDone(String partition) throws Exception {
HttpReportMarkDoneResponse response =
post(
new HttpReportMarkDoneRequest(
params,
fileStoreTable.fullName(),
fileStoreTable.location().toString(),
partition),
params, this.tableName, this.location, partition),
Collections.emptyMap());
Preconditions.checkState(
reportIsSuccess(response),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
/** Action to mark partitions done. */
public interface PartitionMarkDoneAction extends Closeable {

default void open(FileStoreTable fileStoreTable, CoreOptions options) {}

void markDone(String partition) throws Exception;

static List<PartitionMarkDoneAction> createActions(
Expand All @@ -55,13 +57,14 @@ static List<PartitionMarkDoneAction> createActions(
return new MarkPartitionDoneEventAction(
createPartitionHandler(fileStoreTable, options));
case HTTP_REPORT:
return new HttpReportMarkDoneAction(fileStoreTable, options);
return new HttpReportMarkDoneAction();
case CUSTOM:
return generateCustomMarkDoneAction(cl, options);
default:
throw new UnsupportedOperationException(action.toString());
}
})
.peek(action -> action.open(fileStoreTable, options))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,15 @@ public void testPartitionMarkDoneWithMultiplePartitionKey(boolean hasPk, String
@MethodSource("testArguments")
public void testCustomPartitionMarkDoneAction(boolean hasPk, String invoker) throws Exception {

MockCustomPartitionMarkDoneAction.getMarkedDonePartitions().clear();
Map<String, String> options = new HashMap<>(2);
options.put(PARTITION_MARK_DONE_ACTION.key(), SUCCESS_FILE + "," + CUSTOM);
options.put(
PARTITION_MARK_DONE_CUSTOM_CLASS.key(),
MockCustomPartitionMarkDoneAction.class.getName());

FileStoreTable table = prepareTable(hasPk, options);
String fullTableName = table.fullName();

switch (invoker) {
case "action":
Expand Down Expand Up @@ -217,7 +219,9 @@ public void testCustomPartitionMarkDoneAction(boolean hasPk, String invoker) thr
assertThat(successFile2).isNotNull();

assertThat(MockCustomPartitionMarkDoneAction.getMarkedDonePartitions())
.containsExactlyInAnyOrder("partKey0=0/partKey1=1/", "partKey0=1/partKey1=0/");
.containsExactlyInAnyOrder(
"table=" + fullTableName + ",partition=partKey0=0/partKey1=1/",
"table=" + fullTableName + ",partition=partKey0=1/partKey1=0/");
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,6 @@ public void testCustomPartitionMarkDoneAction() throws Exception {
assertThat(table2.fileIO().exists(successFile)).isEqualTo(true);

assertThat(MockCustomPartitionMarkDoneAction.getMarkedDonePartitions().iterator().next())
.isEqualTo("a=0/");
.isEqualTo("table=default.T,partition=a=0/");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public void stopServer() throws Exception {

@Test
public void testHttpReportMarkDoneActionSuccessResponse() throws Exception {
HttpReportMarkDoneAction httpReportMarkDoneAction = createHttpReportMarkDoneAction();
HttpReportMarkDoneAction httpReportMarkDoneAction = new HttpReportMarkDoneAction();
httpReportMarkDoneAction.open(fileStoreTable, createCoreOptions());

server.enqueueResponse(successResponse, 200);

Expand All @@ -97,16 +98,19 @@ public void testHttpReportMarkDoneActionSuccessResponse() throws Exception {

// test params is null.
params = null;
HttpReportMarkDoneAction httpReportMarkDoneAction3 = createHttpReportMarkDoneAction();
HttpReportMarkDoneAction httpReportMarkDoneAction3 = new HttpReportMarkDoneAction();
httpReportMarkDoneAction3.open(fileStoreTable, createCoreOptions());

server.enqueueResponse(successResponse, 200);
httpReportMarkDoneAction3.markDone(partition);
RecordedRequest request3 = server.takeRequest(10, TimeUnit.SECONDS);
assertRequest(request3);
}

@Test
public void testHttpReportMarkDoneActionFailedResponse() throws Exception {
HttpReportMarkDoneAction markDoneAction = createHttpReportMarkDoneAction();
public void testHttpReportMarkDoneActionFailedResponse() {
HttpReportMarkDoneAction markDoneAction = new HttpReportMarkDoneAction();
markDoneAction.open(fileStoreTable, createCoreOptions());

// status failed.
server.enqueueResponse(failedResponse, 200);
Expand Down Expand Up @@ -160,10 +164,6 @@ public static CoreOptions createCoreOptions() {
return new CoreOptions(httpOptions);
}

public HttpReportMarkDoneAction createHttpReportMarkDoneAction() {
return new HttpReportMarkDoneAction(fileStoreTable, createCoreOptions());
}

public FileStoreTable createFileStoreTable() throws Exception {
org.apache.paimon.fs.Path tablePath =
new org.apache.paimon.fs.Path(folder.newFolder().toURI().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package org.apache.paimon.flink.sink.partition;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
import org.apache.paimon.table.FileStoreTable;

import java.io.IOException;
import java.util.HashSet;
Expand All @@ -29,9 +31,17 @@ public class MockCustomPartitionMarkDoneAction implements PartitionMarkDoneActio

private static final Set<String> markedDonePartitions = new HashSet<>();

private String tableName;

@Override
public void open(FileStoreTable fileStoreTable, CoreOptions options) {
this.tableName = fileStoreTable.fullName();
}

@Override
public void markDone(String partition) {
MockCustomPartitionMarkDoneAction.markedDonePartitions.add(partition);
MockCustomPartitionMarkDoneAction.markedDonePartitions.add(
String.format("table=%s,partition=%s", tableName, partition));
}

public static Set<String> getMarkedDonePartitions() {
Expand Down
Loading