From c0e823b61e39c76be3e63a0d651f9122b5021b97 Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Fri, 25 Oct 2024 12:00:42 +0900 Subject: [PATCH 1/6] GH-3589: Refactor `toMessage` batch method and fix logging/header issues --- .../BatchMessagingMessageConverter.java | 99 +++++++++---------- 1 file changed, 47 insertions(+), 52 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index a22fcfd8b4..6487f9f257 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -23,7 +23,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.Consumer; @@ -63,6 +62,7 @@ * @author Biju Kunjummen * @author Sanghyeok An * @author Hope Kim + * @author Borahm Lee * @since 1.1 */ public class BatchMessagingMessageConverter implements BatchMessageConverter { @@ -144,7 +144,8 @@ public void setRawRecordHeader(boolean rawRecordHeader) { } @Override // NOSONAR - public Message toMessage(List> records, @Nullable Acknowledgment acknowledgment, Consumer consumer, Type type) { + public Message toMessage(List> records, @Nullable Acknowledgment acknowledgment, + Consumer consumer, Type type) { KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp); @@ -165,65 +166,39 @@ public Message toMessage(List> records, @Nullable Acknow addToRawHeaders(rawHeaders, convertedHeaders, natives, raws, conversionFailures); commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets, timestampTypes, timestamps); - records.forEach(record -> processRecord(record, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, - convertedHeaders, natives, raws, conversionFailures, rawHeaders, type)); - return MessageBuilder.createMessage(payloads, kafkaMessageHeaders); - } - private void processRecord(ConsumerRecord record, List payloads, List keys, - List topics, List partitions, List offsets, - List timestampTypes, List timestamps, List> convertedHeaders, - List natives, List> raws, List conversionFailures, - Map rawHeaders, Type type) { - payloads.add(obtainPayload(type, record, conversionFailures)); - keys.add(record.key()); - topics.add(record.topic()); - partitions.add(record.partition()); - offsets.add(record.offset()); - - if (record.timestampType() != null) { - timestampTypes.add(record.timestampType().name()); - } - timestamps.add(record.timestamp()); - - boolean logged = false; - String info = null; - - if (this.headerMapper != null && record.headers() != null) { - Map converted = new HashMap<>(); - this.headerMapper.toHeaders(record.headers(), converted); - convertedHeaders.add(converted); - Object object = converted.get(KafkaHeaders.LISTENER_INFO); - info = Optional.ofNullable(object) - .filter(String.class::isInstance) - .map(String.class::cast) - .orElse(null); - } - else { - if (!logged) { - logHeaderWarningOnce(); - logged = true; + String listenerInfo = null; + for (ConsumerRecord record : records) { + addRecordInfo(record, type, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, conversionFailures); + if (this.headerMapper != null && record.headers() != null) { + addToConvertedHeaders(record.headers(), convertedHeaders); + if (!convertedHeaders.isEmpty()) { + Object obj = convertedHeaders.get(convertedHeaders.size() - 1).get(KafkaHeaders.LISTENER_INFO); + if (obj != null && obj instanceof String) { + listenerInfo = (String) obj; + } + } + } else { + natives.add(record.headers()); + } + if (this.rawRecordHeader) { + raws.add(record); } - natives.add(record.headers()); } - if (this.rawRecordHeader) { - raws.add(record); + if (this.headerMapper == null) { + this.logger.warn(() -> + "No header mapper is available; Jackson is required for the default mapper; " + + "headers (if present) are not mapped but provided raw in " + + KafkaHeaders.NATIVE_HEADERS); } - if (info != null) { - rawHeaders.put(KafkaHeaders.LISTENER_INFO, info); + if (listenerInfo != null) { + rawHeaders.put(KafkaHeaders.LISTENER_INFO, listenerInfo); } - } - - private void logHeaderWarningOnce() { - this.logger.debug(() -> - "No header mapper is available; Jackson is required for the default mapper; " - + "headers (if present) are not mapped but provided raw in " - + KafkaHeaders.NATIVE_HEADERS); + return MessageBuilder.createMessage(payloads, kafkaMessageHeaders); } private void addToRawHeaders(Map rawHeaders, List> convertedHeaders, List natives, List> raws, List conversionFailures) { - if (this.headerMapper != null) { rawHeaders.put(KafkaHeaders.BATCH_CONVERTED_HEADERS, convertedHeaders); } @@ -236,12 +211,32 @@ private void addToRawHeaders(Map rawHeaders, List record, Type type, List payloads, List keys, + List topics, List partitions, List offsets, List timestampTypes, + List timestamps, List conversionFailures) { + payloads.add(obtainPayload(type, record, conversionFailures)); + keys.add(record.key()); + topics.add(record.topic()); + partitions.add(record.partition()); + offsets.add(record.offset()); + timestamps.add(record.timestamp()); + if (record.timestampType() != null) { + timestampTypes.add(record.timestampType().name()); + } + } + private Object obtainPayload(Type type, ConsumerRecord record, List conversionFailures) { return this.recordConverter == null || !containerType(type) ? extractAndConvertValue(record, type) : convert(record, type, conversionFailures); } + private void addToConvertedHeaders(Headers headers, List> convertedHeaders) { + Map converted = new HashMap<>(); + this.headerMapper.toHeaders(headers, converted); + convertedHeaders.add(converted); + } + @Override public List> fromMessage(Message message, String defaultTopic) { throw new UnsupportedOperationException(); From 346b52bb141c2d69c94679481800e98bd5ca1588 Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Fri, 25 Oct 2024 13:56:49 +0900 Subject: [PATCH 2/6] Remove unnecessary check --- .../support/converter/BatchMessagingMessageConverter.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index 6487f9f257..6c548ffa0d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -172,11 +172,9 @@ public Message toMessage(List> records, @Nullable Acknow addRecordInfo(record, type, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, conversionFailures); if (this.headerMapper != null && record.headers() != null) { addToConvertedHeaders(record.headers(), convertedHeaders); - if (!convertedHeaders.isEmpty()) { - Object obj = convertedHeaders.get(convertedHeaders.size() - 1).get(KafkaHeaders.LISTENER_INFO); - if (obj != null && obj instanceof String) { - listenerInfo = (String) obj; - } + Object obj = convertedHeaders.get(convertedHeaders.size() - 1).get(KafkaHeaders.LISTENER_INFO); + if (obj instanceof String) { + listenerInfo = (String) obj; } } else { natives.add(record.headers()); From 3058e77d66c584ae984fc6401fde139df3e9689a Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Fri, 25 Oct 2024 14:36:49 +0900 Subject: [PATCH 3/6] Fix checkstyle violation --- .../support/converter/BatchMessagingMessageConverter.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index 6c548ffa0d..09dd30cb4c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -176,7 +176,8 @@ public Message toMessage(List> records, @Nullable Acknow if (obj instanceof String) { listenerInfo = (String) obj; } - } else { + } + else { natives.add(record.headers()); } if (this.rawRecordHeader) { @@ -210,8 +211,8 @@ private void addToRawHeaders(Map rawHeaders, List record, Type type, List payloads, List keys, - List topics, List partitions, List offsets, List timestampTypes, - List timestamps, List conversionFailures) { + List topics, List partitions, List offsets, List timestampTypes, + List timestamps, List conversionFailures) { payloads.add(obtainPayload(type, record, conversionFailures)); keys.add(record.key()); topics.add(record.topic()); From 3163e93042daba821d512d7788f7af8cf43d2e67 Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Fri, 25 Oct 2024 14:43:48 +0900 Subject: [PATCH 4/6] Change log level from WARN to DEBUG --- .../kafka/support/converter/BatchMessagingMessageConverter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index 09dd30cb4c..6777d288e4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -185,7 +185,7 @@ public Message toMessage(List> records, @Nullable Acknow } } if (this.headerMapper == null) { - this.logger.warn(() -> + this.logger.debug(() -> "No header mapper is available; Jackson is required for the default mapper; " + "headers (if present) are not mapped but provided raw in " + KafkaHeaders.NATIVE_HEADERS); From 4e2029d6d2025f7df319e196f5614adb84e0c52e Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Mon, 28 Oct 2024 21:53:27 +0900 Subject: [PATCH 5/6] Refactor headers conversion method --- .../support/converter/BatchMessagingMessageConverter.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index 6777d288e4..d3de157007 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -171,8 +171,8 @@ public Message toMessage(List> records, @Nullable Acknow for (ConsumerRecord record : records) { addRecordInfo(record, type, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, conversionFailures); if (this.headerMapper != null && record.headers() != null) { - addToConvertedHeaders(record.headers(), convertedHeaders); - Object obj = convertedHeaders.get(convertedHeaders.size() - 1).get(KafkaHeaders.LISTENER_INFO); + Map converted = convertHeaders(record.headers(), convertedHeaders); + Object obj = converted.get(KafkaHeaders.LISTENER_INFO); if (obj instanceof String) { listenerInfo = (String) obj; } @@ -230,10 +230,11 @@ private Object obtainPayload(Type type, ConsumerRecord record, List> convertedHeaders) { + private Map convertHeaders(Headers headers, List> convertedHeaders) { Map converted = new HashMap<>(); this.headerMapper.toHeaders(headers, converted); convertedHeaders.add(converted); + return converted; } @Override From d235137d4701cc5d596969bc911f8de49d91410b Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Mon, 28 Oct 2024 22:03:52 +0900 Subject: [PATCH 6/6] Add natives condition for logging when no header mapper --- .../kafka/support/converter/BatchMessagingMessageConverter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index d3de157007..b4e5af0ef3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -184,7 +184,7 @@ public Message toMessage(List> records, @Nullable Acknow raws.add(record); } } - if (this.headerMapper == null) { + if (this.headerMapper == null && !natives.isEmpty()) { this.logger.debug(() -> "No header mapper is available; Jackson is required for the default mapper; " + "headers (if present) are not mapped but provided raw in "