diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 0c1b860a2..d6c69c0bc 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -170,7 +170,7 @@ public boolean build() throws Exception { streamSource.process(buildProcessFunction()); for (Tuple2 dbTbl : dorisTables) { OutputTag recordOutputTag = - ParsingProcessFunction.createRecordOutputTag(dbTbl.f1); + ParsingProcessFunction.createRecordOutputTag(dbTbl.f0, dbTbl.f1); DataStream sideOutput = parsedStream.getSideOutput(recordOutputTag); int sinkParallel = sinkConfig.getInteger( @@ -230,7 +230,7 @@ public DorisSink buildDorisSink() { } public ParsingProcessFunction buildProcessFunction() { - return new ParsingProcessFunction(converter); + return new ParsingProcessFunction(database, converter); } /** create doris sink. */ @@ -479,7 +479,7 @@ private void tryCreateTableIfAbsent( } TableSchema dorisSchema = DorisSchemaFactory.createTableSchema( - database, + targetDb, dorisTable, schema.getFields(), schema.getPrimaryKeys(), diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java index 787d0ae1a..22e2b9bcb 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java @@ -21,7 +21,9 @@ import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; +import org.apache.flink.util.StringUtils; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -32,8 +34,10 @@ public class ParsingProcessFunction extends ProcessFunction { protected ObjectMapper objectMapper = new ObjectMapper(); private transient Map> recordOutputTags; private DatabaseSync.TableNameConverter converter; + private String database; - public ParsingProcessFunction(DatabaseSync.TableNameConverter converter) { + public ParsingProcessFunction(String database, DatabaseSync.TableNameConverter converter) { + this.database = database; this.converter = converter; } @@ -47,8 +51,17 @@ public void processElement( String record, ProcessFunction.Context context, Collector collector) throws Exception { String tableName = getRecordTableName(record); - String dorisName = converter.convert(tableName); - context.output(getRecordOutputTag(dorisName), record); + String dorisTableName = converter.convert(tableName); + String dorisDbName = database; + if (StringUtils.isNullOrWhitespaceOnly(database)) { + dorisDbName = getRecordDatabaseName(record); + } + context.output(getRecordOutputTag(dorisDbName, dorisTableName), record); + } + + private String getRecordDatabaseName(String record) throws JsonProcessingException { + JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class); + return extractJsonNode(recordRoot.get("source"), "db"); } protected String getRecordTableName(String record) throws Exception { @@ -60,12 +73,13 @@ protected String extractJsonNode(JsonNode record, String key) { return record != null && record.get(key) != null ? record.get(key).asText() : null; } - private OutputTag getRecordOutputTag(String tableName) { + private OutputTag getRecordOutputTag(String databaseName, String tableName) { + String tableIdentifier = databaseName + "." + tableName; return recordOutputTags.computeIfAbsent( - tableName, ParsingProcessFunction::createRecordOutputTag); + tableIdentifier, k -> createRecordOutputTag(databaseName, tableName)); } - public static OutputTag createRecordOutputTag(String tableName) { - return new OutputTag("record-" + tableName) {}; + public static OutputTag createRecordOutputTag(String databaseName, String tableName) { + return new OutputTag(String.format("record-%s-%s", databaseName, tableName)) {}; } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java index 79c261d0c..3526c075a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java @@ -198,7 +198,7 @@ public DataStreamSource buildCdcSource(StreamExecutionEnvironment env) { @Override public ParsingProcessFunction buildProcessFunction() { - return new MongoParsingProcessFunction(converter); + return new MongoParsingProcessFunction(database, converter); } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java index 737617a01..72c61567e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java @@ -27,8 +27,8 @@ public class MongoParsingProcessFunction extends ParsingProcessFunction { private static final Logger LOG = LoggerFactory.getLogger(MongoParsingProcessFunction.class); - public MongoParsingProcessFunction(TableNameConverter converter) { - super(converter); + public MongoParsingProcessFunction(String databaseName, TableNameConverter converter) { + super(databaseName, converter); } @Override diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java index e4c99d5a4..f87e498c6 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java @@ -104,6 +104,16 @@ public static void checkResult( List expected, String query, int columnSize) { + checkResult(connection, logger, expected, query, columnSize, true); + } + + public static void checkResult( + Connection connection, + Logger logger, + List expected, + String query, + int columnSize, + boolean ordered) { List actual = new ArrayList<>(); try (Statement statement = connection.createStatement()) { ResultSet sinkResultSet = statement.executeQuery(query); @@ -131,6 +141,13 @@ public static void checkResult( "checking test result. expected={}, actual={}", String.join(",", expected), String.join(",", actual)); - Assert.assertArrayEquals(expected.toArray(), actual.toArray()); + if (ordered) { + Assert.assertArrayEquals(expected.toArray(), actual.toArray()); + } else { + Assert.assertEquals(expected.size(), actual.size()); + Assert.assertArrayEquals( + expected.stream().sorted().toArray(Object[]::new), + actual.stream().sorted().toArray(Object[]::new)); + } } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java index fe715f62d..cb7d83ada 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java @@ -36,6 +36,7 @@ public class Mysql2DorisE2ECase extends AbstractE2EService { private static final Logger LOG = LoggerFactory.getLogger(Mysql2DorisE2ECase.class); private static final String DATABASE = "test_e2e_mysql"; private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS " + DATABASE; + private static final String DROP_DATABASE = "DROP DATABASE IF EXISTS " + DATABASE; private static final String MYSQL_CONF = "--" + DatabaseSyncConfig.MYSQL_CONF; @Before @@ -56,13 +57,8 @@ private List setMysql2DorisDefaultConfig(List argList) { argList.add(MYSQL_CONF); argList.add(PASSWORD + "=" + getMySQLPassword()); argList.add(MYSQL_CONF); - argList.add(DATABASE_NAME + "=" + DATABASE); - // argList.add(MYSQL_CONF); - // argList.add("server-time-zone=UTC"); + argList.add("server-time-zone=UTC"); - // set doris database - argList.add(DORIS_DATABASE); - argList.add(DATABASE); setSinkConfDefaultConfig(argList); return argList; } @@ -82,15 +78,7 @@ private void initMysqlEnvironment(String sourcePath) { private void initDorisEnvironment() { LOG.info("Initializing Doris environment."); - ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, CREATE_DATABASE); - ContainerUtils.executeSQLStatement( - getDorisQueryConnection(), - LOG, - "DROP TABLE IF EXISTS test_e2e_mysql.tbl1", - "DROP TABLE IF EXISTS test_e2e_mysql.tbl2", - "DROP TABLE IF EXISTS test_e2e_mysql.tbl3", - "DROP TABLE IF EXISTS test_e2e_mysql.tbl4", - "DROP TABLE IF EXISTS test_e2e_mysql.tbl5"); + ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, DROP_DATABASE); } private void initEnvironment(String jobName, String mysqlSourcePath) { @@ -436,6 +424,132 @@ private String getCreateTableSQL(String database, String table) throws Exception throw new RuntimeException("Table not exist " + table); } + @Test + public void testMySQL2DorisMultiDatabaseSync() throws Exception { + String jobName = "testMySQL2DorisMultiDatabaseSync"; + ContainerUtils.executeSQLStatement( + getDorisQueryConnection(), LOG, "DROP DATABASE IF EXISTS test_e2e_mysql_db1"); + ContainerUtils.executeSQLStatement( + getDorisQueryConnection(), LOG, "DROP DATABASE IF EXISTS test_e2e_mysql_db2"); + initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2DorisMultiDbSync_init.sql"); + startMysql2DorisJob(jobName, "container/e2e/mysql2doris/testMySQL2DorisMultiDbSync.txt"); + + // wait 3 times checkpoint + Thread.sleep(30000); + LOG.info("Start to verify init result."); + List initExpected1 = Arrays.asList("1,db1_tb1,18"); + String sql1 = "SELECT * FROM test_e2e_mysql_db1.tbl1"; + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, initExpected1, sql1, 3, false); + + List initExpected2 = Arrays.asList("1,db1_tb2,19"); + String sql2 = "SELECT * FROM test_e2e_mysql_db1.tbl2"; + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, initExpected2, sql2, 3, false); + + List initExpected3 = Arrays.asList("1,db2_tb1,20"); + String sql3 = "SELECT * FROM test_e2e_mysql_db2.tbl1"; + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, initExpected3, sql3, 3, false); + + List initExpected4 = Arrays.asList("1,db2_tb2,21"); + String sql4 = "SELECT * FROM test_e2e_mysql_db2.tbl2"; + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, initExpected4, sql4, 3, false); + + List initExpected5 = Arrays.asList("1,db2_tb3,22"); + String sql5 = "SELECT * FROM test_e2e_mysql_db2.tbl3"; + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, initExpected5, sql5, 3, false); + + // add incremental data + ContainerUtils.executeSQLStatement( + getMySQLQueryConnection(), + LOG, + "insert into test_e2e_mysql_db1.tbl1 values (2,'db1_tb1',180)", + "insert into test_e2e_mysql_db1.tbl2 values (2,'db1_tb2',190)", + "insert into test_e2e_mysql_db2.tbl1 values (2,'db2_tb1',200)", + "insert into test_e2e_mysql_db2.tbl2 values (2,'db2_tb2',210)", + "insert into test_e2e_mysql_db2.tbl3 values (2,'db2_tb3',220)"); + + Thread.sleep(20000); + List incrExpected1 = Arrays.asList("1,db1_tb1,18", "2,db1_tb1,180"); + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, incrExpected1, sql1, 3, false); + + List incrExpected2 = Arrays.asList("1,db1_tb2,19", "2,db1_tb2,190"); + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, incrExpected2, sql2, 3, false); + + List incrExpected3 = Arrays.asList("1,db2_tb1,20", "2,db2_tb1,200"); + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, incrExpected3, sql3, 3, false); + + List incrExpected4 = Arrays.asList("1,db2_tb2,21", "2,db2_tb2,210"); + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, incrExpected4, sql4, 3, false); + + List incrExpected5 = Arrays.asList("1,db2_tb3,22", "2,db2_tb3,220"); + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, incrExpected5, sql5, 3, false); + + cancelE2EJob(jobName); + } + + /** + * Separate databases and tables to write to the same database and table + * + * @throws Exception + */ + @Test + public void testMySQL2DorisMultiDatabase2OneSync() throws Exception { + String jobName = "testMySQL2DorisMultiDatabase2OneSync"; + initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2DorisMultiDb2One_init.sql"); + startMysql2DorisJob(jobName, "container/e2e/mysql2doris/testMySQL2DorisMultiDb2One.txt"); + + // wait 3 times checkpoint + Thread.sleep(30000); + LOG.info("Start to verify init result."); + List initExpected = Arrays.asList("1,db1_tb1,18", "2,db2_tb1,20"); + String sql1 = "SELECT * FROM test_e2e_mysql.tbl1"; + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, initExpected, sql1, 3, false); + + List initExpected2 = + Arrays.asList( + "1,db1_tb2_1,19", "2,db1_tb2_2,191", "3,db2_tb2_2,21", "4,db2_tbl2_2,211"); + String sql2 = "SELECT * FROM test_e2e_mysql.tbl2_merge"; + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, initExpected2, sql2, 3, false); + + List initExpected3 = Arrays.asList("1,db2_tb3,22"); + String sql3 = "SELECT * FROM test_e2e_mysql.tbl3"; + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, initExpected3, sql3, 3, false); + + // add incremental data + ContainerUtils.executeSQLStatement( + getMySQLQueryConnection(), + LOG, + "insert into test_e2e_mysql_db1.tbl1 values (3,'db1_tb1',180)", + "insert into test_e2e_mysql_db2.tbl1 values (4,'db2_tb1',200)", + "insert into test_e2e_mysql_db1.tbl2_1 values (5,'db1_tb2_1',1901)", + "insert into test_e2e_mysql_db1.tbl2_2 values (6,'db1_tb2_2',1902)", + "insert into test_e2e_mysql_db2.tbl2_1 values (7,'db2_tb2_1',2101)", + "insert into test_e2e_mysql_db2.tbl2_2 values (8,'db2_tb2_2',2102)", + "insert into test_e2e_mysql_db2.tbl3 values (3,'db2_tb3',220)"); + + Thread.sleep(20000); + + List incrExpected = + Arrays.asList("1,db1_tb1,18", "2,db2_tb1,20", "3,db1_tb1,180", "4,db2_tb1,200"); + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, incrExpected, sql1, 3, false); + + List incrExpected2 = + Arrays.asList( + "1,db1_tb2_1,19", + "2,db1_tb2_2,191", + "3,db2_tb2_2,21", + "4,db2_tbl2_2,211", + "5,db1_tb2_1,1901", + "6,db1_tb2_2,1902", + "7,db2_tb2_1,2101", + "8,db2_tb2_2,2102"); + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, incrExpected2, sql2, 3, false); + + List incrExpected3 = Arrays.asList("1,db2_tb3,22", "3,db2_tb3,220"); + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, incrExpected3, sql3, 3, false); + + cancelE2EJob(jobName); + } + @After public void close() { try { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunctionTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunctionTest.java index e0c09b0fd..3fca7b833 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunctionTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunctionTest.java @@ -28,7 +28,7 @@ public void getRecordTableName() throws Exception { String record = "{\"_id\":\"{\\\"_id\\\": {\\\"$oid\\\": \\\"66583533791a67a6f8c5a339\\\"}}\",\"operationType\":\"insert\",\"fullDocument\":\"{\\\"_id\\\": {\\\"$oid\\\": \\\"66583533791a67a6f8c5a339\\\"}, \\\"key1\\\": \\\"value1\\\"}\",\"source\":{\"ts_ms\":0,\"snapshot\":\"true\"},\"ts_ms\":1717065582062,\"ns\":{\"db\":\"test\",\"coll\":\"cdc_test\"},\"to\":null,\"documentKey\":\"{\\\"_id\\\": {\\\"$oid\\\": \\\"66583533791a67a6f8c5a339\\\"}}\",\"updateDescription\":null,\"clusterTime\":null,\"txnNumber\":null,\"lsid\":null}"; MongoParsingProcessFunction mongoParsingProcessFunction = - new MongoParsingProcessFunction(null); + new MongoParsingProcessFunction(null, null); String recordTableName = mongoParsingProcessFunction.getRecordTableName(record); assertEquals("cdc_test", recordTableName); } diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable.txt index 88ec4541a..0b9a6247d 100644 --- a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable.txt +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable.txt @@ -1,4 +1,6 @@ mysql-sync-database + --database test_e2e_mysql + --mysql-conf database-name=test_e2e_mysql --including-tables "tbl.*|auto_add" --table-conf replication_num=1 --single-sink true diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable_init.sql index ec617f30c..f10424913 100644 --- a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable_init.sql +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable_init.sql @@ -1,3 +1,4 @@ +DROP DATABASE if EXISTS test_e2e_mysql; CREATE DATABASE if NOT EXISTS test_e2e_mysql; DROP TABLE IF EXISTS test_e2e_mysql.tbl1; CREATE TABLE test_e2e_mysql.tbl1 ( diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt index 601d08311..d88b20889 100644 --- a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt @@ -1,4 +1,6 @@ mysql-sync-database + --database test_e2e_mysql + --mysql-conf database-name=test_e2e_mysql --including-tables "tbl.*" --table-conf replication_num=1 --single-sink true diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault.txt index 6f69a75b1..b8b2974e6 100644 --- a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault.txt +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault.txt @@ -1,3 +1,5 @@ mysql-sync-database + --database test_e2e_mysql + --mysql-conf database-name=test_e2e_mysql --including-tables "tbl1|tbl2|tbl3|tbl5" --table-conf replication_num=1 \ No newline at end of file diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql index ec617f30c..f10424913 100644 --- a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql @@ -1,3 +1,4 @@ +DROP DATABASE if EXISTS test_e2e_mysql; CREATE DATABASE if NOT EXISTS test_e2e_mysql; DROP TABLE IF EXISTS test_e2e_mysql.tbl1; CREATE TABLE test_e2e_mysql.tbl1 ( diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt index 053dc9efa..60511be25 100644 --- a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt @@ -1,4 +1,6 @@ mysql-sync-database + --database test_e2e_mysql + --mysql-conf database-name=test_e2e_mysql --including-tables "create_tbl_.*" --create-table-only --table-conf table-buckets=create_tbl_uniq:10,create_tbl_from_uniqindex.*:30 diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql index cc3c16a6f..4b65306d2 100644 --- a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql @@ -1,3 +1,4 @@ +DROP DATABASE if EXISTS test_e2e_mysql; CREATE DATABASE if NOT EXISTS test_e2e_mysql; DROP TABLE IF EXISTS test_e2e_mysql.create_tbl_uniq; CREATE TABLE test_e2e_mysql.create_tbl_uniq ( diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt index 1048916ca..4f71f00cc 100644 --- a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt @@ -1,4 +1,6 @@ mysql-sync-database + --database test_e2e_mysql + --mysql-conf database-name=test_e2e_mysql --including-tables "tbl1|tbl2|tbl3|tbl5" --table-conf replication_num=1 --sink-conf sink.enable-delete=false diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql index ec617f30c..f10424913 100644 --- a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql @@ -1,3 +1,4 @@ +DROP DATABASE if EXISTS test_e2e_mysql; CREATE DATABASE if NOT EXISTS test_e2e_mysql; DROP TABLE IF EXISTS test_e2e_mysql.tbl1; CREATE TABLE test_e2e_mysql.tbl1 ( diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDb2One.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDb2One.txt new file mode 100644 index 000000000..42e671e58 --- /dev/null +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDb2One.txt @@ -0,0 +1,7 @@ +mysql-sync-database + --database test_e2e_mysql + --mysql-conf database-name=test_e2e_mysql_db.* + --including-tables ".*" + --multi-to-one-origin "tbl2.*" + --multi-to-one-target "tbl2_merge" + --table-conf replication_num=1 \ No newline at end of file diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDb2One_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDb2One_init.sql new file mode 100644 index 000000000..81d659d1a --- /dev/null +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDb2One_init.sql @@ -0,0 +1,70 @@ +-- tbl1 +DROP DATABASE if EXISTS test_e2e_mysql_db1; +CREATE DATABASE if NOT EXISTS test_e2e_mysql_db1; +DROP TABLE IF EXISTS test_e2e_mysql_db1.tbl1; +CREATE TABLE test_e2e_mysql_db1.tbl1 ( +`id` int NOT NULL, +`name` varchar(255) DEFAULT NULL, +`age` bigint DEFAULT NULL, +PRIMARY KEY (`id`) USING BTREE +); +insert into test_e2e_mysql_db1.tbl1 values (1,'db1_tb1',18); + +DROP DATABASE if EXISTS test_e2e_mysql_db2; +CREATE DATABASE if NOT EXISTS test_e2e_mysql_db2; +DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl1; +CREATE TABLE test_e2e_mysql_db2.tbl1 ( +`id` int NOT NULL, +`name` varchar(255) DEFAULT NULL, +`age` bigint DEFAULT NULL, +PRIMARY KEY (`id`) USING BTREE +); +insert into test_e2e_mysql_db2.tbl1 values (2,'db2_tb1',20); + +-- tbl2_1 tbl2_2 +DROP TABLE IF EXISTS test_e2e_mysql_db1.tbl2_1; +CREATE TABLE test_e2e_mysql_db1.tbl2_1 ( +`id` int NOT NULL, +`name` varchar(255) DEFAULT NULL, +`age` bigint DEFAULT NULL, +PRIMARY KEY (`id`) USING BTREE +); +insert into test_e2e_mysql_db1.tbl2_1 values (1,'db1_tb2_1',19); + +DROP TABLE IF EXISTS test_e2e_mysql_db1.tbl2_2; +CREATE TABLE test_e2e_mysql_db1.tbl2_2 ( +`id` int NOT NULL, +`name` varchar(255) DEFAULT NULL, +`age` bigint DEFAULT NULL, +PRIMARY KEY (`id`) USING BTREE +); +insert into test_e2e_mysql_db1.tbl2_2 values (2,'db1_tb2_2',191); + +-- db2 +DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl2_1; +CREATE TABLE test_e2e_mysql_db2.tbl2_1 ( +`id` int NOT NULL, +`name` varchar(255) DEFAULT NULL, +`age` bigint DEFAULT NULL, +PRIMARY KEY (`id`) USING BTREE +); +insert into test_e2e_mysql_db2.tbl2_1 values (3,'db2_tb2_2',21); + +DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl2_2; +CREATE TABLE test_e2e_mysql_db2.tbl2_2 ( +`id` int NOT NULL, +`name` varchar(255) DEFAULT NULL, +`age` bigint DEFAULT NULL, +PRIMARY KEY (`id`) USING BTREE +); +insert into test_e2e_mysql_db2.tbl2_2 values (4,'db2_tbl2_2',211); + + +DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl3; +CREATE TABLE test_e2e_mysql_db2.tbl3 ( +`id` int NOT NULL, +`name` varchar(255) DEFAULT NULL, +`age` bigint DEFAULT NULL, +PRIMARY KEY (`id`) USING BTREE +); +insert into test_e2e_mysql_db2.tbl3 values (1,'db2_tb3',22); \ No newline at end of file diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDbSync.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDbSync.txt new file mode 100644 index 000000000..c12902e39 --- /dev/null +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDbSync.txt @@ -0,0 +1,4 @@ +mysql-sync-database + --mysql-conf database-name=test_e2e_mysql_db.* + --including-tables ".*" + --table-conf replication_num=1 \ No newline at end of file diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDbSync_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDbSync_init.sql new file mode 100644 index 000000000..60b022c5f --- /dev/null +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDbSync_init.sql @@ -0,0 +1,51 @@ +-- db1 +DROP DATABASE if EXISTS test_e2e_mysql_db1; +CREATE DATABASE if NOT EXISTS test_e2e_mysql_db1; +DROP TABLE IF EXISTS test_e2e_mysql_db1.tbl1; +CREATE TABLE test_e2e_mysql_db1.tbl1 ( +`id` int NOT NULL, +`name` varchar(255) DEFAULT NULL, +`age` bigint DEFAULT NULL, +PRIMARY KEY (`id`) USING BTREE +); +insert into test_e2e_mysql_db1.tbl1 values (1,'db1_tb1',18); + + +DROP TABLE IF EXISTS test_e2e_mysql_db1.tbl2; +CREATE TABLE test_e2e_mysql_db1.tbl2 ( +`id` int NOT NULL, +`name` varchar(255) DEFAULT NULL, +`age` bigint DEFAULT NULL, +PRIMARY KEY (`id`) USING BTREE +); +insert into test_e2e_mysql_db1.tbl2 values (1,'db1_tb2',19); + +-- db2 +DROP DATABASE if EXISTS test_e2e_mysql_db2; +CREATE DATABASE if NOT EXISTS test_e2e_mysql_db2; +DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl1; +CREATE TABLE test_e2e_mysql_db2.tbl1 ( +`id` int NOT NULL, +`name` varchar(255) DEFAULT NULL, +`age` bigint DEFAULT NULL, +PRIMARY KEY (`id`) USING BTREE +); +insert into test_e2e_mysql_db2.tbl1 values (1,'db2_tb1',20); + +DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl2; +CREATE TABLE test_e2e_mysql_db2.tbl2 ( +`id` int NOT NULL, +`name` varchar(255) DEFAULT NULL, +`age` bigint DEFAULT NULL, +PRIMARY KEY (`id`) USING BTREE +); +insert into test_e2e_mysql_db2.tbl2 values (1,'db2_tb2',21); + +DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl3; +CREATE TABLE test_e2e_mysql_db2.tbl3 ( +`id` int NOT NULL, +`name` varchar(255) DEFAULT NULL, +`age` bigint DEFAULT NULL, +PRIMARY KEY (`id`) USING BTREE +); +insert into test_e2e_mysql_db2.tbl3 values (1,'db2_tb3',22); \ No newline at end of file diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt index d863ecfae..6876afae3 100644 --- a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt @@ -1,4 +1,6 @@ mysql-sync-database + --database test_e2e_mysql + --mysql-conf database-name=test_e2e_mysql --including-tables "tbl.*|add_tbl" --table-conf replication_num=1 --schema-change-mode sql_parser diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql index ec617f30c..f10424913 100644 --- a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql @@ -1,3 +1,4 @@ +DROP DATABASE if EXISTS test_e2e_mysql; CREATE DATABASE if NOT EXISTS test_e2e_mysql; DROP TABLE IF EXISTS test_e2e_mysql.tbl1; CREATE TABLE test_e2e_mysql.tbl1 ( diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql index ec617f30c..f10424913 100644 --- a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql @@ -1,3 +1,4 @@ +DROP DATABASE if EXISTS test_e2e_mysql; CREATE DATABASE if NOT EXISTS test_e2e_mysql; DROP TABLE IF EXISTS test_e2e_mysql.tbl1; CREATE TABLE test_e2e_mysql.tbl1 (