Skip to content

Commit

Permalink
[fix]Fix multi-table sink, schema change result is incorrect
Browse files Browse the repository at this point in the history
  • Loading branch information
DongLiang-0 committed Jan 31, 2024
1 parent 007d524 commit 2d67738
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
Expand All @@ -36,8 +37,10 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;

import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
Expand Down Expand Up @@ -71,6 +74,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
private String targetTableSuffix;
private JsonDebeziumDataChange dataChange;
private JsonDebeziumSchemaChange schemaChange;
private final Set<String> initTableSet = new HashSet<>();
private final Set<String> tableMappingSet = new HashSet<>();

public JsonDebeziumSchemaSerializer(
DorisOptions dorisOptions,
Expand Down Expand Up @@ -156,13 +161,25 @@ public DorisRecord serialize(String record) throws IOException {
return null;
}

if (firstLoad) {
schemaChange.init(recordRoot);
firstLoad = false;
Map<String, String> tableMapping = schemaChange.getTableMapping();
if (initSchemaChange(firstLoad, tableMapping)) {
schemaChange.init(recordRoot, initTableSet);
this.firstLoad = false;
}

return dataChange.serialize(record, recordRoot, op);
}

private boolean initSchemaChange(boolean firstLoad, Map<String, String> tableMapping) {
if (firstLoad) {
return true;
}
tableMappingSet.clear();
tableMappingSet.addAll(tableMapping.keySet());
tableMappingSet.removeAll(initTableSet);
return CollectionUtils.isNotEmpty(tableMappingSet);
}

private String extractJsonNode(JsonNode record, String key) {
return record != null && record.get(key) != null && !(record.get(key) instanceof NullNode)
? record.get(key).asText()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.doris.flink.tools.cdc.SourceSchema;

import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

/**
Expand Down Expand Up @@ -60,7 +61,7 @@ public abstract class JsonDebeziumSchemaChange implements ChangeEvent {

public abstract boolean schemaChange(JsonNode recordRoot);

public abstract void init(JsonNode recordRoot);
public abstract void init(JsonNode recordRoot, Set<String> initTableSet);

/** When cdc synchronizes multiple tables, it will capture multiple table schema changes. */
protected boolean checkTable(JsonNode recordRoot) {
Expand Down Expand Up @@ -136,6 +137,10 @@ protected JsonNode extractHistoryRecord(JsonNode record) throws JsonProcessingEx
return record;
}

public Map<String, String> getTableMapping() {
return tableMapping;
}

@VisibleForTesting
public void setSchemaChangeManager(SchemaChangeManager schemaChangeManager) {
this.schemaChangeManager = schemaChangeManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -55,7 +56,7 @@ public JsonDebeziumSchemaChangeImpl(JsonDebeziumChangeContext changeContext) {
}

@Override
public void init(JsonNode recordRoot) {
public void init(JsonNode recordRoot, Set<String> initTableSet) {
// do nothing
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -66,13 +67,15 @@ public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange {
Pattern.compile(
"ALTER\\s+TABLE\\s+(\\w+)\\s+RENAME\\s+COLUMN\\s+(\\w+)\\s+TO\\s+(\\w+)",
Pattern.CASE_INSENSITIVE);
private Map<String, FieldSchema> originFieldSchemaMap;
// schemaChange saves table names, field, and field column information
private Map<String, Map<String, FieldSchema>> originFieldSchemaMap = new LinkedHashMap<>();
private SourceConnector sourceConnector;
// create table properties
private final Map<String, String> tableProperties;
private String targetDatabase;
private String targetTablePrefix;
private String targetTableSuffix;
private final Set<String> filledTables = new HashSet<>();

public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext changeContext) {
this.addDropDDLPattern = Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE);
Expand All @@ -95,14 +98,20 @@ public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext changeContext) {
}

@Override
public void init(JsonNode recordRoot) {
originFieldSchemaMap = new LinkedHashMap<>();
public void init(JsonNode recordRoot, Set<String> initTableSet) {
String dorisTable = getDorisTableIdentifier(recordRoot);
if (Objects.isNull(dorisTable) || originFieldSchemaMap.containsKey(dorisTable)) {
return;
}
initTableSet.add(dorisTable);
Set<String> columnNameSet = extractAfterRow(recordRoot).keySet();
if (CollectionUtils.isEmpty(columnNameSet)) {
columnNameSet = extractBeforeRow(recordRoot).keySet();
}
columnNameSet.forEach(
columnName -> originFieldSchemaMap.put(columnName, new FieldSchema()));
Map<String, FieldSchema> fieldSchemaMap = new LinkedHashMap<>();
columnNameSet.forEach(columnName -> fieldSchemaMap.put(columnName, new FieldSchema()));

originFieldSchemaMap.put(dorisTable, fieldSchemaMap);
}

@Override
Expand Down Expand Up @@ -184,24 +193,28 @@ public List<String> extractDDLList(JsonNode record) throws IOException {
sourceConnector =
SourceConnector.valueOf(
record.get("source").get("connector").asText().toUpperCase());
fillOriginSchema(columns);
}
if (!filledTables.contains(dorisTable)) {
fillOriginSchema(dorisTable, columns);
filledTables.add(dorisTable);
}

Map<String, FieldSchema> fieldSchemaMap = originFieldSchemaMap.get(dorisTable);
// rename ddl
Matcher renameMatcher = renameDDLPattern.matcher(ddl);
if (renameMatcher.find()) {
String oldColumnName = renameMatcher.group(2);
String newColumnName = renameMatcher.group(3);
return SchemaChangeHelper.generateRenameDDLSql(
dorisTable, oldColumnName, newColumnName, originFieldSchemaMap);
dorisTable, oldColumnName, newColumnName, fieldSchemaMap);
}

// add/drop ddl
Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>();
for (JsonNode column : columns) {
buildFieldSchema(updateFiledSchema, column);
}
SchemaChangeHelper.compareSchema(updateFiledSchema, originFieldSchemaMap);
SchemaChangeHelper.compareSchema(updateFiledSchema, fieldSchemaMap);
// In order to avoid other source table column change operations other than add/drop/rename,
// which may lead to the accidental deletion of the doris column.
Matcher matcher = addDropDDLPattern.matcher(ddl);
Expand Down Expand Up @@ -330,16 +343,17 @@ private Map<String, Object> extractRow(JsonNode recordRow) {
}

@VisibleForTesting
public void fillOriginSchema(JsonNode columns) {
if (Objects.nonNull(originFieldSchemaMap)) {
public void fillOriginSchema(String tableName, JsonNode columns) {
Map<String, FieldSchema> fieldSchemaMap = originFieldSchemaMap.get(tableName);
if (Objects.nonNull(fieldSchemaMap)) {
for (JsonNode column : columns) {
String fieldName = column.get("name").asText();
if (originFieldSchemaMap.containsKey(fieldName)) {
if (fieldSchemaMap.containsKey(fieldName)) {
String dorisTypeName = buildDorisTypeName(column);
String defaultValue =
handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
String comment = extractJsonNode(column, "comment");
FieldSchema fieldSchema = originFieldSchemaMap.get(fieldName);
FieldSchema fieldSchema = fieldSchemaMap.get(fieldName);
fieldSchema.setName(fieldName);
fieldSchema.setTypeString(dorisTypeName);
fieldSchema.setComment(comment);
Expand All @@ -351,8 +365,10 @@ public void fillOriginSchema(JsonNode columns) {
"Current schema change failed! You need to ensure that "
+ "there is data in the table."
+ dorisOptions.getTableIdentifier());
originFieldSchemaMap = new LinkedHashMap<>();
columns.forEach(column -> buildFieldSchema(originFieldSchemaMap, column));
fieldSchemaMap = new LinkedHashMap<>();
Map<String, FieldSchema> finalFieldSchemaMap = fieldSchemaMap;
columns.forEach(column -> buildFieldSchema(finalFieldSchemaMap, column));
originFieldSchemaMap.put(tableName, fieldSchemaMap);
}
}

Expand Down Expand Up @@ -403,12 +419,13 @@ private String handleDefaultValue(String defaultValue) {
}

@VisibleForTesting
public void setOriginFieldSchemaMap(Map<String, FieldSchema> originFieldSchemaMap) {
public void setOriginFieldSchemaMap(
Map<String, Map<String, FieldSchema>> originFieldSchemaMap) {
this.originFieldSchemaMap = originFieldSchemaMap;
}

@VisibleForTesting
public Map<String, FieldSchema> getOriginFieldSchemaMap() {
public Map<String, Map<String, FieldSchema>> getOriginFieldSchemaMap() {
return originFieldSchemaMap;
}

Expand Down
Loading

0 comments on commit 2d67738

Please sign in to comment.