Skip to content

Commit

Permalink
GH-3067: Mapping of multiple headers with same key with SimpleKafkaHe…
Browse files Browse the repository at this point in the history
…aderMapper
  • Loading branch information
poznachowski committed Oct 30, 2024
1 parent de5383f commit e88b4d7
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,23 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.assertj.core.util.Streams;

import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
Expand All @@ -48,12 +51,14 @@
* @author Gary Russell
* @author Artem Bilan
* @author Soby Chacko
* @author Grzegorz Poznachowski
*
* @since 1.3
*
*/
public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper {

private static final String ITERABLE_HEADER_TYPE_PATTERN = "%s#%s";

private static final String JAVA_LANG_STRING = "java.lang.String";

private static final Set<String> TRUSTED_ARRAY_TYPES = Set.of(
Expand Down Expand Up @@ -96,6 +101,7 @@ public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper {
* {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
* {@link KafkaHeaders} are never mapped as headers since they represent data in
* consumer/producer records.
*
* @see #DefaultKafkaHeaderMapper(ObjectMapper)
*/
public DefaultKafkaHeaderMapper() {
Expand All @@ -110,6 +116,7 @@ public DefaultKafkaHeaderMapper() {
* {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
* {@link KafkaHeaders} are never mapped as headers since they represent data in
* consumer/producer records.
*
* @param objectMapper the object mapper.
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
*/
Expand All @@ -128,6 +135,7 @@ public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) {
* generally should not map the {@code "id" and "timestamp"} headers. Note:
* most of the headers in {@link KafkaHeaders} are ever mapped as headers since they
* represent data in consumer/producer records.
*
* @param patterns the patterns.
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
*/
Expand All @@ -143,8 +151,9 @@ public DefaultKafkaHeaderMapper(String... patterns) {
* you generally should not map the {@code "id" and "timestamp"} headers. Note: most
* of the headers in {@link KafkaHeaders} are never mapped as headers since they
* represent data in consumer/producer records.
*
* @param objectMapper the object mapper.
* @param patterns the patterns.
* @param patterns the patterns.
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
*/
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) {
Expand All @@ -160,6 +169,7 @@ private DefaultKafkaHeaderMapper(boolean outbound, ObjectMapper objectMapper, St

/**
* Create an instance for inbound mapping only with pattern matching.
*
* @param patterns the patterns to match.
* @return the header mapper.
* @since 2.8.8
Expand All @@ -170,8 +180,9 @@ public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patt

/**
* Create an instance for inbound mapping only with pattern matching.
*
* @param objectMapper the object mapper.
* @param patterns the patterns to match.
* @param patterns the patterns to match.
* @return the header mapper.
* @since 2.8.8
*/
Expand All @@ -181,6 +192,7 @@ public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper o

/**
* Return the object mapper.
*
* @return the mapper.
*/
protected ObjectMapper getObjectMapper() {
Expand All @@ -189,6 +201,7 @@ protected ObjectMapper getObjectMapper() {

/**
* Provide direct access to the trusted packages set for subclasses.
*
* @return the trusted packages.
* @since 2.2
*/
Expand All @@ -198,6 +211,7 @@ protected Set<String> getTrustedPackages() {

/**
* Provide direct access to the toString() classes by subclasses.
*
* @return the toString() classes.
* @since 2.2
*/
Expand All @@ -214,6 +228,7 @@ protected boolean isEncodeStrings() {
* raw String value is converted to a byte array using the configured charset. Set to
* true if a consumer of the outbound record is using Spring for Apache Kafka version
* less than 2.3
*
* @param encodeStrings true to encode (default false).
* @since 2.3
*/
Expand All @@ -234,6 +249,7 @@ public void setEncodeStrings(boolean encodeStrings) {
* If any of the supplied packages is {@code "*"}, all packages are trusted.
* If a class for a non-trusted package is encountered, the header is returned to the
* application with value of type {@link NonTrustedHeaderType}.
*
* @param packagesToTrust the packages to trust.
*/
public void addTrustedPackages(String... packagesToTrust) {
Expand All @@ -253,6 +269,7 @@ public void addTrustedPackages(String... packagesToTrust) {
/**
* Add class names that the outbound mapper should perform toString() operations on
* before mapping.
*
* @param classNames the class names.
* @since 2.2
*/
Expand All @@ -264,32 +281,15 @@ public void addToStringClasses(String... classNames) {
public void fromHeaders(MessageHeaders headers, Headers target) {
final Map<String, String> jsonHeaders = new HashMap<>();
final ObjectMapper headerObjectMapper = getObjectMapper();
headers.forEach((key, rawValue) -> {
if (matches(key, rawValue)) {
Object valueToAdd = headerValueToAddOut(key, rawValue);
if (valueToAdd instanceof byte[]) {
target.add(new RecordHeader(key, (byte[]) valueToAdd));
headers.forEach((key, value) -> {
if (matches(key, value)) {
if (value instanceof List<?> values) {
for (int i = 0; i < values.size(); i++) {
resolveHeader(key, values.get(i), target, jsonHeaders, i);
}
}
else {
try {
String className = valueToAdd.getClass().getName();
boolean encodeToJson = this.encodeStrings;
if (this.toStringClasses.contains(className)) {
valueToAdd = valueToAdd.toString();
className = JAVA_LANG_STRING;
encodeToJson = true;
}
if (!encodeToJson && valueToAdd instanceof String) {
target.add(new RecordHeader(key, ((String) valueToAdd).getBytes(getCharset())));
}
else {
target.add(new RecordHeader(key, headerObjectMapper.writeValueAsBytes(valueToAdd)));
}
jsonHeaders.put(key, className);
}
catch (Exception e) {
logger.error(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName());
}
resolveHeader(key, value, target, jsonHeaders, null);
}
}
});
Expand All @@ -303,34 +303,84 @@ public void fromHeaders(MessageHeaders headers, Headers target) {
}
}

@Override
public void toHeaders(Headers source, final Map<String, Object> headers) {
final Map<String, String> jsonTypes = decodeJsonTypes(source);
source.forEach(header -> {
String headerName = header.key();
if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT) && matchesForInbound(headerName)) {
headers.put(headerName, ByteBuffer.wrap(header.value()).getInt());
}
else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(headerName)) {
headers.put(headerName, new String(header.value(), getCharset()));
}
else if (headerName.equals(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER) ||
headerName.equals(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)) {
headers.put(headerName, header);
}
else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) {
if (jsonTypes.containsKey(headerName)) {
String requestedType = jsonTypes.get(headerName);
populateJsonValueHeader(header, requestedType, headers);
private void resolveHeader(String headerName, Object value, Headers target, Map<String, String> jsonHeaders, Integer headerIndex) {
Object valueToAdd = headerValueToAddOut(headerName, value);
if (valueToAdd instanceof byte[] byteArray) {
target.add(new RecordHeader(headerName, byteArray));
}
else {
try {
String className = valueToAdd.getClass().getName();
boolean encodeToJson = this.encodeStrings;
if (this.toStringClasses.contains(className)) {
valueToAdd = valueToAdd.toString();
className = JAVA_LANG_STRING;
encodeToJson = true;
}
if (!encodeToJson && valueToAdd instanceof String stringValue) {
target.add(new RecordHeader(headerName, stringValue.getBytes(getCharset())));
}
else {
headers.put(headerName, headerValueToAddIn(header));
target.add(new RecordHeader(headerName, this.objectMapper.writeValueAsBytes(valueToAdd)));
}
jsonHeaders.put(headerIndex == null ?
headerName :
ITERABLE_HEADER_TYPE_PATTERN.formatted(headerName, headerIndex), className);
}
});
catch (Exception e) {
logger.error(e, () -> "Could not map " + headerName + " with type " + value.getClass().getName());
}
}
}

@Override
public void toHeaders(Headers source, final Map<String, Object> target) {
final Map<String, String> jsonTypes = decodeJsonTypes(source);

Streams.stream(source)
.collect(Collectors.groupingBy(Header::key))
.forEach((headerName, headers) -> {
Header lastHeader = headers.get(headers.size() - 1);
if (headerName.equals(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER) ||
headerName.equals(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)) {
target.put(headerName, lastHeader);
}
else if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT) && matchesForInbound(headerName)) {
target.put(headerName, ByteBuffer.wrap(lastHeader.value()).getInt());
}
else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(headerName)) {
target.put(headerName, new String(lastHeader.value(), getCharset()));
}
else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) {
if (headers.size() == 1) {
if (jsonTypes.containsKey(headerName)) {
String requestedType = jsonTypes.get(headerName);
target.put(headerName, resolveJsonValueHeader(headers.get(0), requestedType));
}
else {
target.put(headerName, headerValueToAddIn(headers.get(0)));
}
}
else {
List<Object> valueList = new ArrayList<>();
for (int i = 0; i < headers.size(); i++) {
var jsonTypeIterableHeader = ITERABLE_HEADER_TYPE_PATTERN.formatted(headerName, i);
if (jsonTypes.containsKey(jsonTypeIterableHeader)) {
String requestedType = jsonTypes.get(jsonTypeIterableHeader);
valueList.add(resolveJsonValueHeader(headers.get(i), requestedType));
}
else {
valueList.add(headerValueToAddIn(headers.get(i)));
}
}
Collections.reverse(valueList);
target.put(headerName, valueList);
}
}
});
}

private void populateJsonValueHeader(Header header, String requestedType, Map<String, Object> headers) {
private Object resolveJsonValueHeader(Header header, String requestedType) {
Class<?> type = Object.class;
boolean trusted = false;
try {
Expand All @@ -343,22 +393,21 @@ private void populateJsonValueHeader(Header header, String requestedType, Map<St
logger.error(e, () -> "Could not load class for header: " + header.key());
}
if (String.class.equals(type) && (header.value().length == 0 || header.value()[0] != '"')) {
headers.put(header.key(), new String(header.value(), getCharset()));
return new String(header.value(), getCharset());
}
else {
if (trusted) {
try {
Object value = decodeValue(header, type);
headers.put(header.key(), value);
return decodeValue(header, type);
}
catch (IOException e) {
logger.error(e, () ->
"Could not decode json type: " + requestedType + " for key: " + header.key());
headers.put(header.key(), header.value());
return header.value();
}
}
else {
headers.put(header.key(), new NonTrustedHeaderType(header.value(), requestedType));
return new NonTrustedHeaderType(header.value(), requestedType);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.kafka.support;

import java.nio.ByteBuffer;
import java.util.List;

import org.springframework.kafka.retrytopic.RetryTopicHeaders;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -64,10 +65,27 @@ public int getNonBlockingRetryDeliveryAttempt() {
}

private int fromBytes(String headerName) {
byte[] header = getHeader(headerName, byte[].class);
byte[] header = getFirstHeaderIfIterable(headerName, byte[].class);
return header == null ? 1 : ByteBuffer.wrap(header).getInt();
}

@SuppressWarnings("unchecked")
@Nullable
public <T> T getFirstHeaderIfIterable(String key, Class<T> type) {
Object value = getHeader(key);
if (value == null) {
return null;
}
if (value instanceof List<?> iterable) {
value = iterable.get(0);
}
if (!type.isAssignableFrom(value.getClass())) {
throw new IllegalArgumentException("Incorrect type specified for header '" + key + "'. Expected [" + type
+ "] but actual type is [" + value.getClass() + "]");
}
return (T) value;
}

/**
* Get a header value with a specific type.
* @param <T> the type.
Expand Down
Loading

0 comments on commit e88b4d7

Please sign in to comment.