From c1399be49f881327a20f0a8bc117f32bc69d9a79 Mon Sep 17 00:00:00 2001
From: Kerwin <37063904+zhuangchong@users.noreply.github.com>
Date: Thu, 21 Dec 2023 11:19:29 +0800
Subject: [PATCH] [flink] Fixed deserialization failure when Kafka CDC records
 contain nested structures (#2526)

---
 .../org/apache/paimon/utils/TypeUtils.java        | 16 ++++++++
 .../flink/action/cdc/format/RecordParser.java     | 29 ++++++++++++---
 .../kafka/KafkaDebeziumSyncTableActionITCase.java | 37 +++++++++++++++++++
 .../debezium/table/nestedtype/debezium-data-1.txt | 19 ++++++++++
 4 files changed, 96 insertions(+), 5 deletions(-)
 create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nestedtype/debezium-data-1.txt

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
index 6b3fb7000193..cd15ad383779 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -229,4 +229,20 @@ public static boolean isInteroperable(DataType t1, DataType t2) {
             return t1.copy(true).equals(t2.copy(true));
         }
     }
+
+    public static boolean isBasicType(Object obj) {
+        Class<?> clazz = obj.getClass();
+        return clazz.isPrimitive() || isWrapperType(clazz) || clazz.equals(String.class);
+    }
+
+    private static boolean isWrapperType(Class<?> clazz) {
+        return clazz.equals(Boolean.class)
+                || clazz.equals(Character.class)
+                || clazz.equals(Byte.class)
+                || clazz.equals(Short.class)
+                || clazz.equals(Integer.class)
+                || clazz.equals(Long.class)
+                || clazz.equals(Float.class)
+                || clazz.equals(Double.class);
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
index 0593fc1e9e4b..2588aad7ad31 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
@@ -27,7 +27,9 @@
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.JsonSerdeUtil;
+import org.apache.paimon.utils.TypeUtils;
 
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -42,10 +44,10 @@
 import javax.annotation.Nullable;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -141,10 +143,27 @@ public void flatMap(String value, Collector<RichCdcMultiplexRecord> out) {
     protected Map<String, String> extractRowData(
             JsonNode record, LinkedHashMap<String, DataType> paimonFieldTypes) {
         paimonFieldTypes.putAll(fillDefaultStringTypes(record));
-        Map<String, String> recordMap =
-                OBJECT_MAPPER.convertValue(record, new TypeReference<Map<String, String>>() {});
-
-        Map<String, String> rowData = new HashMap<>(recordMap);
+        Map<String, Object> recordMap =
+                OBJECT_MAPPER.convertValue(record, new TypeReference<Map<String, Object>>() {});
+        Map<String, String> rowData =
+                recordMap.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        entry -> {
+                                            if (Objects.nonNull(entry.getValue())
+                                                    && !TypeUtils.isBasicType(entry.getValue())) {
+                                                try {
+                                                    return OBJECT_MAPPER
+                                                            .writer()
+                                                            .writeValueAsString(entry.getValue());
+                                                } catch (JsonProcessingException e) {
+                                                    LOG.error("Failed to serialize record.", e);
+                                                    return Objects.toString(entry.getValue());
+                                                }
+                                            }
+                                            return Objects.toString(entry.getValue(), null);
+                                        }));
         evalComputedColumns(rowData, paimonFieldTypes);
         return rowData;
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
index d9cf2a8ea008..6dcb4b1b7807 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
@@ -181,4 +181,41 @@ public void testComputedColumn() throws Exception {
                 rowType,
                 Arrays.asList("id", "_year"));
     }
+
+    @Test
+    @Timeout(60)
+    public void testRecordWithNestedDataType() throws Exception {
+        String topic = "nested_type";
+        createTestTopic(topic, 1, 1);
+
+        List<String> lines = readLines("kafka/debezium/table/nestedtype/debezium-data-1.txt");
+        try {
+            writeRecordsToKafka(topic, lines);
+        } catch (Exception e) {
+            throw new Exception("Failed to write debezium data to Kafka.", e);
+        }
+
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put(VALUE_FORMAT.key(), "debezium-json");
+        kafkaConfig.put(TOPIC.key(), topic);
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withPrimaryKeys("id")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        FileStoreTable table = getFileStoreTable(tableName);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "row"});
+        List<String> primaryKeys = Collections.singletonList("id");
+        List<String> expected =
+                Collections.singletonList("+I[101, hammer, {\"row_key\":\"value\"}]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nestedtype/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nestedtype/debezium-data-1.txt
new file mode 100644
index 000000000000..2394d941086c
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/nestedtype/debezium-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"before": null, "after": {"id": 101, "name": "hammer", "row": {"row_key":"value"} }, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}
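
Reviewer note, not part of the patch: the fix hinges on one rule in RecordParser#extractRowData. Previously, convertValue coerced every field into a Map<String, String>, which fails as soon as a Debezium payload carries a nested object such as "row": {"row_key": "value"}. Now the record is first read as Map<String, Object>; basic values are stringified and any non-basic value is re-serialized back into a JSON string. Below is a minimal standalone sketch of that rule so it can be tried outside Paimon; it assumes plain jackson-databind on the classpath rather than Paimon's shaded copy, and the class name and the instanceof-based isBasicType are illustrative stand-ins, not the patched code.

// FlattenNestedValues.java -- illustrative sketch, not part of the patch.
// Reproduces the flattening rule of RecordParser#extractRowData: basic values
// become plain strings, nested structures become JSON strings.
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.LinkedHashMap;
import java.util.Map;

public class FlattenNestedValues {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Stand-in for TypeUtils.isBasicType: Jackson binds JSON scalars to
    // wrapper types and String, so an instanceof check covers them here.
    static boolean isBasicType(Object value) {
        return value instanceof String
                || value instanceof Number
                || value instanceof Boolean
                || value instanceof Character;
    }

    static Map<String, String> flatten(Map<String, Object> record) {
        Map<String, String> rowData = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : record.entrySet()) {
            Object value = entry.getValue();
            if (value != null && !isBasicType(value)) {
                try {
                    // Nested map or list: keep it as a JSON string instead of
                    // asking Jackson to coerce it into a String (the old bug).
                    rowData.put(entry.getKey(), MAPPER.writeValueAsString(value));
                } catch (JsonProcessingException e) {
                    rowData.put(entry.getKey(), String.valueOf(value));
                }
            } else {
                rowData.put(entry.getKey(), value == null ? null : String.valueOf(value));
            }
        }
        return rowData;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> record =
                MAPPER.readValue(
                        "{\"id\": 101, \"name\": \"hammer\", \"row\": {\"row_key\": \"value\"}}",
                        new TypeReference<Map<String, Object>>() {});
        // Prints: {id=101, name=hammer, row={"row_key":"value"}}
        System.out.println(flatten(record));
    }
}

One deliberate difference from the patch: the sketch collects into a LinkedHashMap by hand rather than via Collectors.toMap, because Collectors.toMap throws a NullPointerException when the value mapper returns null, whereas a plain map stores the null that Objects.toString(value, null) produces for null fields.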