Skip to content

Commit

Permalink
[test] Fix unstable tests in MySqlSyncDatabaseTableListITCase (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
tsreaper authored Sep 26, 2024
1 parent 703a2a2 commit f394501
Showing 1 changed file with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.flink.action.cdc.mysql;

import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
Expand All @@ -28,7 +29,9 @@
import org.junit.jupiter.api.Timeout;

import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

Expand All @@ -45,7 +48,7 @@ public static void startContainers() {
}

@Test
@Timeout(180)
@Timeout(60)
public void testActionRunResult() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put(
Expand Down Expand Up @@ -80,14 +83,18 @@ public void testActionRunResult() throws Exception {
// test newly created tables
if (mode == COMBINED) {
try (Statement statement = getStatement()) {
// ensure the job steps into incremental phase
waitForResult(
Collections.singletonList("+I[1, A]"),
getFileStoreTable("shard_1_t2"),
FileStoreTable t2 = getFileStoreTable("shard_1_t2");
RowType rowTypeT2 =
RowType.of(
new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(100)},
new String[] {"k", "name"}),
Collections.singletonList("k"));
new String[] {"k", "name"});
List<String> primaryKeysT2 = Collections.singletonList("k");

// ensure the job steps into incremental phase
waitForResult(Collections.singletonList("+I[1, A]"), t2, rowTypeT2, primaryKeysT2);
statement.executeUpdate("USE shard_1");
statement.executeUpdate("INSERT INTO t2 VALUES (2, 'B')");
waitForResult(Arrays.asList("+I[1, A]", "+I[2, B]"), t2, rowTypeT2, primaryKeysT2);

// case 1: new tables in existed database
statement.executeUpdate("USE shard_2");
Expand Down

0 comments on commit f394501

Please sign in to comment.