From 2d67738bb95ecaf57337a51e6bb92c0251b3aaea Mon Sep 17 00:00:00 2001
From: DongLiang-0 <1747644936@qq.com>
Date: Wed, 31 Jan 2024 18:44:50 +0800
Subject: [PATCH] [fix] Fix incorrect schema change results with the multi-table sink

---
 .../JsonDebeziumSchemaSerializer.java         |  23 ++-
 .../JsonDebeziumSchemaChange.java             |   7 +-
 .../JsonDebeziumSchemaChangeImpl.java         |   3 +-
 .../JsonDebeziumSchemaChangeImplV2.java       |  49 ++++--
 .../TestJsonDebeziumSchemaChangeImplV2.java   | 146 +++++++++++++++---
 .../tools/cdc/CdcMysqlSyncDatabaseCase.java   |   2 +
 6 files changed, 189 insertions(+), 41 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 5bea2d931..43714f0b1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -25,6 +25,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.NullNode;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
@@ -36,8 +37,10 @@
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.regex.Pattern;

 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
@@ -71,6 +74,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<String> {
+    private final Set<String> initTableSet = new HashSet<>();
+    private final Set<String> tableMappingSet = new HashSet<>();

     public JsonDebeziumSchemaSerializer(
             DorisOptions dorisOptions,
@@ -156,13 +161,25 @@ public DorisRecord serialize(String record) throws IOException {
             return null;
         }

-        if (firstLoad) {
-            schemaChange.init(recordRoot);
-            firstLoad = false;
+        Map<String, String> tableMapping = schemaChange.getTableMapping();
+        if (initSchemaChange(firstLoad, tableMapping)) {
+            schemaChange.init(recordRoot, initTableSet);
+            this.firstLoad = false;
         }
+
         return dataChange.serialize(record, recordRoot, op);
     }

+    private boolean initSchemaChange(boolean firstLoad, Map<String, String> tableMapping) {
+        if (firstLoad) {
+            return true;
+        }
+        tableMappingSet.clear();
+        tableMappingSet.addAll(tableMapping.keySet());
+        tableMappingSet.removeAll(initTableSet);
+        return CollectionUtils.isNotEmpty(tableMappingSet);
+    }
+
     private String extractJsonNode(JsonNode record, String key) {
         return record != null && record.get(key) != null && !(record.get(key) instanceof NullNode)
                 ? record.get(key).asText()
                 : null;
     }
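Note on the serializer change above: gating init() on firstLoad alone breaks multi-table sinks, because a table whose first record arrives after the very first load never gets its schema state initialized. initSchemaChange() therefore re-triggers init() whenever the table mapping contains a table not yet in initTableSet. A minimal, self-contained sketch of that gating idea (illustrative names, not the connector's actual API):

// Sketch: decide when schema state must be (re-)initialized in a multi-table sink.
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class SchemaInitGate {
    private final Set<String> initTableSet = new HashSet<>();

    // True on the very first record, or whenever the current table mapping
    // contains a table that has not been initialized yet.
    boolean needsInit(boolean firstLoad, Map<String, String> tableMapping) {
        if (firstLoad) {
            return true;
        }
        Set<String> pending = new HashSet<>(tableMapping.keySet());
        pending.removeAll(initTableSet);
        return !pending.isEmpty();
    }

    // Called by the schema-change handler once a table's columns are captured.
    void markInitialized(String dorisTable) {
        initTableSet.add(dorisTable);
    }
}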
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
index 4c6716441..0ea2bce1d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
@@ -32,6 +32,7 @@
 import org.apache.doris.flink.tools.cdc.SourceSchema;

 import java.util.Map;
+import java.util.Set;
 import java.util.regex.Pattern;

 /**
@@ -60,7 +61,7 @@ public abstract class JsonDebeziumSchemaChange implements ChangeEvent {

     public abstract boolean schemaChange(JsonNode recordRoot);

-    public abstract void init(JsonNode recordRoot);
+    public abstract void init(JsonNode recordRoot, Set<String> initTableSet);

     /** When cdc synchronizes multiple tables, it will capture multiple table schema changes. */
     protected boolean checkTable(JsonNode recordRoot) {
@@ -136,6 +137,10 @@ protected JsonNode extractHistoryRecord(JsonNode record) throws JsonProcessingEx
         return record;
     }

+    public Map<String, String> getTableMapping() {
+        return tableMapping;
+    }
+
     @VisibleForTesting
     public void setSchemaChangeManager(SchemaChangeManager schemaChangeManager) {
         this.schemaChangeManager = schemaChangeManager;

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
index 4cf097053..e119c355d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
@@ -32,6 +32,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -55,7 +56,7 @@ public JsonDebeziumSchemaChangeImpl(JsonDebeziumChangeContext changeContext) {
     }

     @Override
-    public void init(JsonNode recordRoot) {
+    public void init(JsonNode recordRoot, Set<String> initTableSet) {
         // do nothing
     }
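With the widened signature, every implementation receives the set of already-initialized tables and is expected to add each table it initializes, so the serializer can decide when another init pass is needed. A hypothetical conforming implementation, sketched under the assumption that the table identifier is derived from the Debezium source.db/source.table fields (as the test records in this patch are shaped):

import com.fasterxml.jackson.databind.JsonNode;
import java.util.Set;

// Hypothetical minimal implementation of the new init contract.
abstract class SchemaChangeSketch {
    public abstract void init(JsonNode recordRoot, Set<String> initTableSet);
}

class PerTableSchemaChangeSketch extends SchemaChangeSketch {
    @Override
    public void init(JsonNode recordRoot, Set<String> initTableSet) {
        JsonNode source = recordRoot.path("source");
        String table = source.path("db").asText() + "." + source.path("table").asText();
        // ... capture the column set for `table` into per-table state here ...
        initTableSet.add(table); // lets the serializer skip this table next time
    }
}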
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index e5f699433..3057860b0 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -48,6 +48,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -66,13 +67,15 @@ public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange {
             Pattern.compile(
                     "ALTER\\s+TABLE\\s+(\\w+)\\s+RENAME\\s+COLUMN\\s+(\\w+)\\s+TO\\s+(\\w+)",
                     Pattern.CASE_INSENSITIVE);
-    private Map<String, FieldSchema> originFieldSchemaMap;
+    // Saves, per table, each field name and its column information
+    private Map<String, Map<String, FieldSchema>> originFieldSchemaMap = new LinkedHashMap<>();
     private SourceConnector sourceConnector;
     // create table properties
     private final Map<String, String> tableProperties;
     private String targetDatabase;
     private String targetTablePrefix;
     private String targetTableSuffix;
+    private final Set<String> filledTables = new HashSet<>();

     public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext changeContext) {
         this.addDropDDLPattern = Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE);
@@ -95,14 +98,20 @@ public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext changeContext) {
     }

     @Override
-    public void init(JsonNode recordRoot) {
-        originFieldSchemaMap = new LinkedHashMap<>();
+    public void init(JsonNode recordRoot, Set<String> initTableSet) {
+        String dorisTable = getDorisTableIdentifier(recordRoot);
+        if (Objects.isNull(dorisTable) || originFieldSchemaMap.containsKey(dorisTable)) {
+            return;
+        }
+        initTableSet.add(dorisTable);
         Set<String> columnNameSet = extractAfterRow(recordRoot).keySet();
         if (CollectionUtils.isEmpty(columnNameSet)) {
             columnNameSet = extractBeforeRow(recordRoot).keySet();
         }
-        columnNameSet.forEach(
-                columnName -> originFieldSchemaMap.put(columnName, new FieldSchema()));
+        Map<String, FieldSchema> fieldSchemaMap = new LinkedHashMap<>();
+        columnNameSet.forEach(columnName -> fieldSchemaMap.put(columnName, new FieldSchema()));
+
+        originFieldSchemaMap.put(dorisTable, fieldSchemaMap);
     }

     @Override
@@ -184,16 +193,20 @@ public List<String> extractDDLList(JsonNode record) throws IOException {
             sourceConnector =
                     SourceConnector.valueOf(
                             record.get("source").get("connector").asText().toUpperCase());
-            fillOriginSchema(columns);
+        }
+        if (!filledTables.contains(dorisTable)) {
+            fillOriginSchema(dorisTable, columns);
+            filledTables.add(dorisTable);
         }

+        Map<String, FieldSchema> fieldSchemaMap = originFieldSchemaMap.get(dorisTable);
         // rename ddl
         Matcher renameMatcher = renameDDLPattern.matcher(ddl);
         if (renameMatcher.find()) {
             String oldColumnName = renameMatcher.group(2);
             String newColumnName = renameMatcher.group(3);
             return SchemaChangeHelper.generateRenameDDLSql(
-                    dorisTable, oldColumnName, newColumnName, originFieldSchemaMap);
+                    dorisTable, oldColumnName, newColumnName, fieldSchemaMap);
         }

         // add/drop ddl
@@ -201,7 +214,7 @@ public List<String> extractDDLList(JsonNode record) throws IOException {
         for (JsonNode column : columns) {
             buildFieldSchema(updateFiledSchema, column);
         }
-        SchemaChangeHelper.compareSchema(updateFiledSchema, originFieldSchemaMap);
+        SchemaChangeHelper.compareSchema(updateFiledSchema, fieldSchemaMap);
         // In order to avoid other source table column change operations other than add/drop/rename,
         // which may lead to the accidental deletion of the doris column.
         Matcher matcher = addDropDDLPattern.matcher(ddl);
@@ -330,16 +343,17 @@ private Map<String, Object> extractRow(JsonNode recordRow) {
     }

     @VisibleForTesting
-    public void fillOriginSchema(JsonNode columns) {
-        if (Objects.nonNull(originFieldSchemaMap)) {
+    public void fillOriginSchema(String tableName, JsonNode columns) {
+        Map<String, FieldSchema> fieldSchemaMap = originFieldSchemaMap.get(tableName);
+        if (Objects.nonNull(fieldSchemaMap)) {
             for (JsonNode column : columns) {
                 String fieldName = column.get("name").asText();
-                if (originFieldSchemaMap.containsKey(fieldName)) {
+                if (fieldSchemaMap.containsKey(fieldName)) {
                     String dorisTypeName = buildDorisTypeName(column);
                     String defaultValue =
                             handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
                     String comment = extractJsonNode(column, "comment");
-                    FieldSchema fieldSchema = originFieldSchemaMap.get(fieldName);
+                    FieldSchema fieldSchema = fieldSchemaMap.get(fieldName);
                     fieldSchema.setName(fieldName);
                     fieldSchema.setTypeString(dorisTypeName);
                     fieldSchema.setComment(comment);
@@ -351,8 +365,10 @@
                     "Current schema change failed! You need to ensure that "
                             + "there is data in the table."
                             + dorisOptions.getTableIdentifier());
-            originFieldSchemaMap = new LinkedHashMap<>();
-            columns.forEach(column -> buildFieldSchema(originFieldSchemaMap, column));
+            fieldSchemaMap = new LinkedHashMap<>();
+            Map<String, FieldSchema> finalFieldSchemaMap = fieldSchemaMap;
+            columns.forEach(column -> buildFieldSchema(finalFieldSchemaMap, column));
+            originFieldSchemaMap.put(tableName, fieldSchemaMap);
         }
     }

@@ -403,12 +419,13 @@ private String handleDefaultValue(String defaultValue) {
     }

     @VisibleForTesting
-    public void setOriginFieldSchemaMap(Map<String, FieldSchema> originFieldSchemaMap) {
+    public void setOriginFieldSchemaMap(
+            Map<String, Map<String, FieldSchema>> originFieldSchemaMap) {
         this.originFieldSchemaMap = originFieldSchemaMap;
     }

     @VisibleForTesting
-    public Map<String, FieldSchema> getOriginFieldSchemaMap() {
+    public Map<String, Map<String, FieldSchema>> getOriginFieldSchemaMap() {
         return originFieldSchemaMap;
     }
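The key structural change in JsonDebeziumSchemaChangeImplV2: originFieldSchemaMap is no longer a single column-to-FieldSchema map shared by every table, but a per-table map keyed by the Doris table identifier, and fillOriginSchema() fills exactly one table's inner map (guarded by filledTables so each table is filled once). A toy, runnable illustration of that nesting, with plain Strings standing in for FieldSchema to keep the sketch self-contained:

import java.util.LinkedHashMap;
import java.util.Map;

// Toy model: table identifier ("db.table") -> (column name -> column info).
class PerTableSchemaDemo {
    private final Map<String, Map<String, String>> originFieldSchemaMap = new LinkedHashMap<>();

    void fillOriginSchema(String table, Map<String, String> columns) {
        // Each table owns an insertion-ordered inner map, so one table's DDL
        // can no longer overwrite another table's column state.
        originFieldSchemaMap.computeIfAbsent(table, t -> new LinkedHashMap<>()).putAll(columns);
    }

    public static void main(String[] args) {
        PerTableSchemaDemo demo = new PerTableSchemaDemo();
        demo.fillOriginSchema("test.t1", Map.of("id", "INT", "c2", "INT"));
        demo.fillOriginSchema("db.test_fill", Map.of("id", "INT", "name", "VARCHAR(150)"));
        System.out.println(demo.originFieldSchemaMap); // two independent inner maps
    }
}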
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1691033764,\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23464,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 drop c11, drop column c3, add c12 int default 100\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c555\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c666\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c4\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"555\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c199\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c12\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":7,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}"; - JsonNode recordRoot = objectMapper.readTree(record); + JsonNode recordRoot = objectMapper.readTree(record1); schemaChange.setOriginFieldSchemaMap(originFiledSchemaMap); List ddlSQLList = schemaChange.extractDDLList(recordRoot); for (int i = 0; i < ddlSQLList.size(); i++) { 
@@ -157,15 +159,18 @@ public void testFillOriginSchema() throws IOException {
                 "test_time", new FieldSchema("test_time", "DATETIMEV2(0)", null, null));
         srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "100", null));

+        String tableName = "db.test_fill";
         schemaChange.setSourceConnector("mysql");
         String columnsString =
                 "[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_time\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"c1\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"100\",\"enumValues\":[]},{\"name\":\"cc\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":5,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"100\",\"enumValues\":[]}]";
         JsonNode columns = objectMapper.readTree(columnsString);
-        schemaChange.fillOriginSchema(columns);
-        Map<String, FieldSchema> originFieldSchemaMap = schemaChange.getOriginFieldSchemaMap();
+        schemaChange.fillOriginSchema(tableName, columns);
+        Map<String, Map<String, FieldSchema>> originFieldSchemaMap =
+                schemaChange.getOriginFieldSchemaMap();
+        Map<String, FieldSchema> fieldSchemaMap = originFieldSchemaMap.get(tableName);
         Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
-                originFieldSchemaMap.entrySet().iterator();
+                fieldSchemaMap.entrySet().iterator();
         for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) {
             FieldSchema srcFiledSchema = entry.getValue();
             Entry<String, FieldSchema> originField = originFieldSchemaIterator.next();
@@ -180,6 +185,102 @@
         }
     }

+    @Test
+    public void testMultipleFillOriginSchema() throws IOException {
+        Map<String, Map<String, FieldSchema>> originFiledSchema = buildOriginFiledSchema();
+        String tableName1 = "db.test_fill";
+        String tableName2 = "test.t1";
+
+        schemaChange.setOriginFieldSchemaMap(originFiledSchema);
+        schemaChange.setSourceConnector("mysql");
+        String columnsString1 =
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_time\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"c1\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"100\",\"enumValues\":[]},{\"name\":\"cc\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":5,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"100\",\"enumValues\":[]}]"; + JsonNode columns1 = objectMapper.readTree(columnsString1); + + String columnsString2 = + "[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"10000\",\"enumValues\":[]},{\"name\":\"c2\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"c555\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":100,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"c666\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"100\",\"enumValues\":[]},{\"name\":\"c4\",\"jdbcType\":-5,\"typeName\":\"BIGINT\",\"typeExpression\":\"BIGINT\",\"charsetName\":null,\"position\":5,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"555\",\"enumValues\":[]},{\"name\":\"c199\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":6,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"c12\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":7,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"100\",\"enumValues\":[]}]"; + JsonNode columns2 = objectMapper.readTree(columnsString2); + + schemaChange.fillOriginSchema(tableName1, columns1); + schemaChange.fillOriginSchema(tableName2, columns2); + Map> originFieldSchemaMap = + schemaChange.getOriginFieldSchemaMap(); + + 
+        Map<String, FieldSchema> originSchema1 = originFieldSchemaMap.get(tableName1);
+        Map<String, FieldSchema> originSchema2 = originFieldSchemaMap.get(tableName2);
+
+        Map<String, Map<String, FieldSchema>> scrFiledSchema = buildSrcFiledSchema();
+        Map<String, FieldSchema> scrSchema1 = scrFiledSchema.get(tableName1);
+        Map<String, FieldSchema> scrSchema2 = scrFiledSchema.get(tableName2);
+
+        compareResults(originSchema1, scrSchema1);
+        compareResults(originSchema2, scrSchema2);
+    }
+
+    private void compareResults(
+            Map<String, FieldSchema> originSchema, Map<String, FieldSchema> scrSchema) {
+        Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
+                originSchema.entrySet().iterator();
+        for (Entry<String, FieldSchema> srcEntry : scrSchema.entrySet()) {
+            FieldSchema srcFiledSchema = srcEntry.getValue();
+            Entry<String, FieldSchema> originField = originFieldSchemaIterator.next();
+
+            Assert.assertEquals(srcEntry.getKey(), originField.getKey());
+            Assert.assertEquals(srcFiledSchema.getName(), originField.getValue().getName());
+            Assert.assertEquals(
+                    srcFiledSchema.getTypeString(), originField.getValue().getTypeString());
+            Assert.assertEquals(
+                    srcFiledSchema.getDefaultValue(), originField.getValue().getDefaultValue());
+            Assert.assertEquals(srcFiledSchema.getComment(), originField.getValue().getComment());
+        }
+    }
+
+    private Map<String, Map<String, FieldSchema>> buildSrcFiledSchema() {
+        String tab1 = "db.test_fill";
+        String tab2 = "test.t1";
+        Map<String, Map<String, FieldSchema>> scrFiledSchema = new LinkedHashMap<>();
+        Map<String, FieldSchema> filedSchemaMap1 = new LinkedHashMap<>();
+        filedSchemaMap1.put("id", new FieldSchema("id", "INT", null, null));
+        filedSchemaMap1.put("name", new FieldSchema("name", "VARCHAR(150)", null, null));
+        filedSchemaMap1.put("test_time", new FieldSchema("test_time", "DATETIMEV2(0)", null, null));
+        filedSchemaMap1.put("c1", new FieldSchema("c1", "INT", "100", null));
+        scrFiledSchema.put(tab1, filedSchemaMap1);
+
+        Map<String, FieldSchema> filedSchemaMap2 = new LinkedHashMap<>();
+        filedSchemaMap2.put("id", new FieldSchema("id", "INT", "10000", null));
+        filedSchemaMap2.put("c2", new FieldSchema("c2", "INT", null, null));
+        filedSchemaMap2.put("c555", new FieldSchema("c555", "VARCHAR(300)", null, null));
+        filedSchemaMap2.put("c666", new FieldSchema("c666", "INT", "100", null));
+        filedSchemaMap2.put("c4", new FieldSchema("c4", "BIGINT", "555", null));
+        filedSchemaMap2.put("c199", new FieldSchema("c199", "INT", null, null));
+        filedSchemaMap2.put("c12", new FieldSchema("c12", "INT", "100", null));
+        scrFiledSchema.put(tab2, filedSchemaMap2);
+        return scrFiledSchema;
+    }
+
+    private Map<String, Map<String, FieldSchema>> buildOriginFiledSchema() {
+        String tab1 = "db.test_fill";
+        String tab2 = "test.t1";
+        Map<String, Map<String, FieldSchema>> originFiledSchema = new LinkedHashMap<>();
+        Map<String, FieldSchema> filedSchemaMap1 = new LinkedHashMap<>();
+        filedSchemaMap1.put("id", new FieldSchema());
+        filedSchemaMap1.put("name", new FieldSchema());
+        filedSchemaMap1.put("test_time", new FieldSchema());
+        filedSchemaMap1.put("c1", new FieldSchema());
+        originFiledSchema.put(tab1, filedSchemaMap1);
+
+        Map<String, FieldSchema> filedSchemaMap2 = new LinkedHashMap<>();
+        filedSchemaMap2.put("id", new FieldSchema());
+        filedSchemaMap2.put("c2", new FieldSchema());
+        filedSchemaMap2.put("c555", new FieldSchema());
+        filedSchemaMap2.put("c666", new FieldSchema());
+        filedSchemaMap2.put("c4", new FieldSchema());
+        filedSchemaMap2.put("c199", new FieldSchema());
+        filedSchemaMap2.put("c12", new FieldSchema());
+        originFiledSchema.put(tab2, filedSchemaMap2);
+        return originFiledSchema;
+    }
+
     @Test
     public void testBuildMysql2DorisTypeName() throws IOException {
         String columnInfo =
@@ -204,14 +305,16 @@ public void testBuildOracle2DorisTypeName() throws IOException {
     public void testExtractDDLListRename() throws IOException {
         String columnInfo =
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1698314781975,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000046\",\"pos\":5197,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000046\\\",\\\"pos\\\":5197,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1698314781,\\\"file\\\":\\\"binlog.000046\\\",\\\"pos\\\":5331,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 rename column c3 to c333\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c333\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":10,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}"; - Map originFieldSchemaMap = Maps.newHashMap(); + Map> originFieldSchemaHashMap = new LinkedHashMap<>(); + Map fieldSchemaHashMap = Maps.newHashMap(); JsonNode record = objectMapper.readTree(columnInfo); schemaChange.setSourceConnector("mysql"); - originFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", "")); - originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", "")); - originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", "")); - schemaChange.setOriginFieldSchemaMap(originFieldSchemaMap); + fieldSchemaHashMap.put("id", new FieldSchema("id", "INT", "", "")); + fieldSchemaHashMap.put("c2", new FieldSchema("c2", "INT", "", "")); + fieldSchemaHashMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", "")); + originFieldSchemaHashMap.put("test.t1", fieldSchemaHashMap); + schemaChange.setOriginFieldSchemaMap(originFieldSchemaHashMap); List ddlList = schemaChange.extractDDLList(record); Assert.assertEquals("ALTER TABLE `test`.`t1` RENAME COLUMN `c3` `c333`", ddlList.get(0)); @@ -293,15 +396,18 @@ public void testDateTimeFullOrigin() throws JsonProcessingException { "test_ts_6", new FieldSchema("test_ts_6", "DATETIMEV2(6)", "current_timestamp", null)); + String tableName = "db.test_fill"; schemaChange.setSourceConnector("mysql"); String columnsString = 
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"test_dt_0\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_dt_1\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"length\":1,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_dt_3\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"length\":3,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_dt_6\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"length\":6,\"position\":5,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_ts_0\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"position\":6,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_ts_1\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"length\":1,\"position\":7,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"1970-01-01 00:00:00\",\"enumValues\":[]},{\"name\":\"test_ts_3\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"length\":3,\"position\":8,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"1970-01-01 00:00:00\",\"enumValues\":[]},{\"name\":\"test_ts_6\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"length\":6,\"position\":9,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"1970-01-01 00:00:00\",\"enumValues\":[]}]},\"comment\":null}]}"; JsonNode columns = objectMapper.readTree(columnsString); - schemaChange.fillOriginSchema(columns); - Map originFieldSchemaMap = schemaChange.getOriginFieldSchemaMap(); + schemaChange.fillOriginSchema(tableName, columns); + Map> originFieldSchemaMap = + schemaChange.getOriginFieldSchemaMap(); + Map fieldSchemaMap = originFieldSchemaMap.get(tableName); Iterator> originFieldSchemaIterator = - originFieldSchemaMap.entrySet().iterator(); + fieldSchemaMap.entrySet().iterator(); for (Entry entry : srcFiledSchemaMap.entrySet()) { FieldSchema srcFiledSchema = entry.getValue(); Entry originField = originFieldSchemaIterator.next(); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java index 3cc9db8da..88176e842 100644 --- 
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -72,6 +72,7 @@ public static void main(String[] args) throws Exception {
         String multiToOneTarget = "a|b";
         boolean ignoreDefaultValue = false;
         boolean useNewSchemaChange = false;
+        boolean singleSink = false;
         DatabaseSync databaseSync = new MysqlDatabaseSync();
         databaseSync
                 .setEnv(env)
@@ -88,6 +89,7 @@ public static void main(String[] args) throws Exception {
                 .setTableConfig(tableConfig)
                 .setCreateTableOnly(false)
                 .setNewSchemaChange(useNewSchemaChange)
+                .setSingleSink(singleSink)
                 .create();
         databaseSync.build();
         env.execute(String.format("MySQL-Doris Database Sync: %s", database));
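For completeness: the example now also exercises the single-sink path, which routes every captured table through one Doris sink and is exactly the multi-table code path this patch fixes. A hedged excerpt with the flag flipped on (builder calls taken from the example above; the surrounding setup is elided):

// Excerpt only, not a complete program; all other settings as in CdcMysqlSyncDatabaseCase.
boolean singleSink = true; // route all captured tables through one Doris sink
DatabaseSync databaseSync = new MysqlDatabaseSync();
databaseSync
        .setEnv(env)
        .setTableConfig(tableConfig)
        .setCreateTableOnly(false)
        .setNewSchemaChange(true) // exercise JsonDebeziumSchemaChangeImplV2
        .setSingleSink(singleSink)
        .create();
databaseSync.build();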