Skip to content

Commit

Permalink
Remove unnecessary empty check
Browse files — browse the repository at this point in the history
  • Loading branch information
bky373 committed Oct 25, 2024
1 parent c0e823b commit 2033cde
Showing 1 changed file with 33 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,27 @@

package org.springframework.kafka.support.converter;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.Bytes;

import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonPresent;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.*;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* A Messaging {@link MessageConverter} implementation used with a batch
* message listener; the consumer record values are extracted into a collection in
Expand Down Expand Up @@ -90,6 +84,7 @@ public BatchMessagingMessageConverter() {
/**
* Create an instance that converts record values using the supplied
* converter.
*
* @param recordConverter the converter.
* @since 1.3.2
*/
Expand All @@ -103,6 +98,7 @@ public BatchMessagingMessageConverter(RecordMessageConverter recordConverter) {
/**
* Generate {@link Message} {@code ids} for produced messages. If set to {@code false},
* will try to use a default value. By default set to {@code false}.
*
* @param generateMessageId true if a message id should be generated
*/
public void setGenerateMessageId(boolean generateMessageId) {
Expand All @@ -112,6 +108,7 @@ public void setGenerateMessageId(boolean generateMessageId) {
/**
* Generate {@code timestamp} for produced messages. If set to {@code false}, -1 is
* used instead. By default set to {@code false}.
*
* @param generateTimestamp true if a timestamp should be generated
*/
public void setGenerateTimestamp(boolean generateTimestamp) {
Expand All @@ -120,6 +117,7 @@ public void setGenerateTimestamp(boolean generateTimestamp) {

/**
* Set the header mapper to map headers.
*
* @param headerMapper the mapper.
* @since 1.3
*/
Expand All @@ -136,6 +134,7 @@ public RecordMessageConverter getRecordMessageConverter() {
/**
* Set to true to add the raw {@code List<ConsumerRecord<?, ?>>} as a header
* {@link KafkaHeaders#RAW_DATA}.
*
* @param rawRecordHeader true to add the header.
* @since 2.7
*/
Expand All @@ -145,7 +144,7 @@ public void setRawRecordHeader(boolean rawRecordHeader) {

@Override // NOSONAR
public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type type) {
Consumer<?, ?> consumer, Type type) {

KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId,
this.generateTimestamp);
Expand All @@ -172,11 +171,9 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
addRecordInfo(record, type, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, conversionFailures);
if (this.headerMapper != null && record.headers() != null) {
addToConvertedHeaders(record.headers(), convertedHeaders);
if (!convertedHeaders.isEmpty()) {
Object obj = convertedHeaders.get(convertedHeaders.size() - 1).get(KafkaHeaders.LISTENER_INFO);
if (obj != null && obj instanceof String) {
listenerInfo = (String) obj;
}
Object obj = convertedHeaders.get(convertedHeaders.size() - 1).get(KafkaHeaders.LISTENER_INFO);
if (obj != null && obj instanceof String) {
listenerInfo = (String) obj;
}
} else {
natives.add(record.headers());
Expand All @@ -198,11 +195,10 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
}

private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Object>> convertedHeaders,
List<Headers> natives, List<ConsumerRecord<?, ?>> raws, List<ConversionException> conversionFailures) {
List<Headers> natives, List<ConsumerRecord<?, ?>> raws, List<ConversionException> conversionFailures) {
if (this.headerMapper != null) {
rawHeaders.put(KafkaHeaders.BATCH_CONVERTED_HEADERS, convertedHeaders);
}
else {
} else {
rawHeaders.put(KafkaHeaders.NATIVE_HEADERS, natives);
}
if (this.rawRecordHeader) {
Expand All @@ -212,8 +208,8 @@ private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Ob
}

private void addRecordInfo(ConsumerRecord<?, ?> record, Type type, List<Object> payloads, List<Object> keys,
List<String> topics, List<Integer> partitions, List<Long> offsets, List<String> timestampTypes,
List<Long> timestamps, List<ConversionException> conversionFailures) {
List<String> topics, List<Integer> partitions, List<Long> offsets, List<String> timestampTypes,
List<Long> timestamps, List<ConversionException> conversionFailures) {
payloads.add(obtainPayload(type, record, conversionFailures));
keys.add(record.key());
topics.add(record.topic());
Expand Down Expand Up @@ -245,8 +241,9 @@ private void addToConvertedHeaders(Headers headers, List<Map<String, Object>> co
/**
* Subclasses can convert the value; by default, it's returned as provided by Kafka
* unless a {@link RecordMessageConverter} has been provided.
*
* @param record the record.
* @param type the required type.
* @param type the required type.
* @return the value.
*/
protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type) {
Expand All @@ -255,28 +252,26 @@ protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type)

/**
* Convert the record value.
* @param record the record.
* @param type the type - must be a {@link ParameterizedType} with a single generic
* type parameter.
*
* @param record the record.
* @param type the type - must be a {@link ParameterizedType} with a single generic
* type parameter.
* @param conversionFailures Conversion failures.
* @return the converted payload.
*/
protected Object convert(ConsumerRecord<?, ?> record, Type type, List<ConversionException> conversionFailures) {
try {
Object payload = this.recordConverter
.toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0]).getPayload();
.toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0]).getPayload();
conversionFailures.add(null);
return payload;
}
catch (ConversionException ex) {
} catch (ConversionException ex) {
byte[] original = null;
if (record.value() instanceof byte[]) {
original = (byte[]) record.value();
}
else if (record.value() instanceof Bytes) {
} else if (record.value() instanceof Bytes) {
original = ((Bytes) record.value()).get();
}
else if (record.value() instanceof String) {
} else if (record.value() instanceof String) {
original = ((String) record.value()).getBytes(StandardCharsets.UTF_8);
}
if (original != null) {
Expand All @@ -292,6 +287,7 @@ else if (record.value() instanceof String) {
/**
* Return true if the type is a parameterized type with a single generic type
* parameter.
*
* @param type the type.
* @return true if the conditions are met.
*/
Expand Down

0 comments on commit 2033cde

Please sign in to comment.