[flink] Fixed deserialization failure when kafka cdc records contain nested structures (#2526)
zhuangchong authored Dec 21, 2023
1 parent aadfb6d commit c1399be
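
The root cause: the CDC record parser converted each record's payload straight into a Map<String, String>, and Jackson cannot coerce a nested object node into a String value, so a record such as {"row": {"row_key": "value"}} failed during conversion. The fix reads the values as Object and re-serializes anything that is not a basic type back into a JSON string. A minimal standalone sketch of that behavior (using plain Jackson rather than Paimon's shaded copy; the class and variable names here are illustrative, not from the commit):

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class NestedCdcRecordSketch {

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode record =
                mapper.readTree(
                        "{\"id\": 101, \"name\": \"hammer\", \"row\": {\"row_key\": \"value\"}}");

        // Before the fix: coercing the nested "row" object into a String value fails;
        // convertValue wraps the error in an IllegalArgumentException.
        // Map<String, String> broken =
        //         mapper.convertValue(record, new TypeReference<Map<String, String>>() {});

        // After the fix: read values as Object, then write anything that is not a basic
        // type back out as a JSON string so every field fits into Map<String, String>.
        Map<String, Object> raw =
                mapper.convertValue(record, new TypeReference<Map<String, Object>>() {});
        Map<String, String> rowData = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : raw.entrySet()) {
            Object value = entry.getValue();
            boolean basic =
                    value instanceof String
                            || value instanceof Number
                            || value instanceof Boolean
                            || value instanceof Character;
            if (value != null && !basic) {
                rowData.put(entry.getKey(), mapper.writeValueAsString(value));
            } else {
                rowData.put(entry.getKey(), Objects.toString(value, null));
            }
        }
        System.out.println(rowData); // {id=101, name=hammer, row={"row_key":"value"}}
    }
}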
Showing 4 changed files with 96 additions and 5 deletions.
16 changes: 16 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -229,4 +229,20 @@ public static boolean isInteroperable(DataType t1, DataType t2) {
return t1.copy(true).equals(t2.copy(true));
}
}

public static boolean isBasicType(Object obj) {
Class<?> clazz = obj.getClass();
return clazz.isPrimitive() || isWrapperType(clazz) || clazz.equals(String.class);
}

private static boolean isWrapperType(Class<?> clazz) {
return clazz.equals(Boolean.class)
|| clazz.equals(Character.class)
|| clazz.equals(Byte.class)
|| clazz.equals(Short.class)
|| clazz.equals(Integer.class)
|| clazz.equals(Long.class)
|| clazz.equals(Float.class)
|| clazz.equals(Double.class);
}
}
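
Because isBasicType receives an already-boxed Object, the clazz.isPrimitive() branch never fires in practice; the helper effectively answers whether a value is a primitive wrapper or a String. A tiny illustration (hypothetical values, not part of the commit):

import java.util.Collections;

import org.apache.paimon.utils.TypeUtils;

public class IsBasicTypeExample {

    public static void main(String[] args) {
        System.out.println(TypeUtils.isBasicType("hammer"));  // true: String
        System.out.println(TypeUtils.isBasicType(101));       // true: autoboxes to Integer
        System.out.println(TypeUtils.isBasicType(true));      // true: autoboxes to Boolean
        // false: a nested structure, so the parser re-serializes it to a JSON string
        System.out.println(TypeUtils.isBasicType(Collections.singletonMap("row_key", "value")));
    }
}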
@@ -27,7 +27,9 @@
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.TypeUtils;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -42,10 +44,10 @@
import javax.annotation.Nullable;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

@@ -141,10 +143,27 @@ public void flatMap(String value, Collector<RichCdcMultiplexRecord> out) {
protected Map<String, String> extractRowData(
JsonNode record, LinkedHashMap<String, DataType> paimonFieldTypes) {
paimonFieldTypes.putAll(fillDefaultStringTypes(record));
Map<String, String> recordMap =
OBJECT_MAPPER.convertValue(record, new TypeReference<Map<String, String>>() {});

Map<String, String> rowData = new HashMap<>(recordMap);
Map<String, Object> recordMap =
OBJECT_MAPPER.convertValue(record, new TypeReference<Map<String, Object>>() {});
Map<String, String> rowData =
recordMap.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> {
if (Objects.nonNull(entry.getValue())
&& !TypeUtils.isBasicType(entry.getValue())) {
try {
return OBJECT_MAPPER
.writer()
.writeValueAsString(entry.getValue());
} catch (JsonProcessingException e) {
                                                    LOG.error("Failed to serialize record value.", e);
return Objects.toString(entry.getValue());
}
}
return Objects.toString(entry.getValue(), null);
}));
evalComputedColumns(rowData, paimonFieldTypes);
return rowData;
}
@@ -181,4 +181,41 @@ public void testComputedColumn() throws Exception {
rowType,
Arrays.asList("id", "_year"));
}

@Test
@Timeout(60)
public void testRecordWithNestedDataType() throws Exception {
String topic = "nested_type";
createTestTopic(topic, 1, 1);

List<String> lines = readLines("kafka/debezium/table/nestedtype/debezium-data-1.txt");
try {
writeRecordsToKafka(topic, lines);
} catch (Exception e) {
            throw new Exception("Failed to write debezium data to Kafka.", e);
}

Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), "debezium-json");
kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPrimaryKeys("id")
.withTableConfig(getBasicTableConfig())
.build();
runActionWithDefaultEnv(action);

FileStoreTable table = getFileStoreTable(tableName);

RowType rowType =
RowType.of(
new DataType[] {
DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING()
},
new String[] {"id", "name", "row"});
List<String> primaryKeys = Collections.singletonList("id");
List<String> expected =
Collections.singletonList("+I[101, hammer, {\"row_key\":\"value\"}]");
waitForResult(expected, table, rowType, primaryKeys);
}
}
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

{"before": null, "after": {"id": 101, "name": "hammer", "row": {"row_key":"value"} }, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
