Skip to content

Commit

Permalink
Some generics optimization in the MessagingMessageListenerAdapter
Browse files Browse the repository at this point in the history
* Fix warning in the `EmbeddedKafkaContextCustomizerTests.testTransactionReplicationFactor()`
  • Loading branch information
artembilan committed Nov 11, 2024
1 parent de99cf6 commit 00775a5
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ void testMulti() {
}

@Test
@SuppressWarnings("unchecked")
void testTransactionReplicationFactor() {
EmbeddedKafka annotationWithPorts =
AnnotationUtils.findAnnotation(TestWithEmbeddedKafkaTransactionFactor.class, EmbeddedKafka.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.TypeUtils;

/**
* An abstract {@link org.springframework.kafka.listener.MessageListener} adapter
Expand Down Expand Up @@ -320,6 +321,20 @@ public void setBeanResolver(BeanResolver beanResolver) {
this.evaluationContext.addPropertyAccessor(new MapAccessor());
}

/**
* Set the retry callback for failures of both {@link CompletableFuture} and {@link Mono}.
* {@link MessagingMessageListenerAdapter#asyncFailure(Object, Acknowledgment, Consumer, Throwable, Message)}
* will invoke {@link MessagingMessageListenerAdapter#asyncRetryCallback} when
* {@link CompletableFuture} or {@link Mono} fails to complete.
* @param asyncRetryCallback the callback for async retry.
* @since 3.3
*/
public void setCallbackForAsyncFailure(
@Nullable BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback) {

this.asyncRetryCallback = asyncRetryCallback;
}

protected boolean isMessageList() {
return this.isMessageList;
}
Expand Down Expand Up @@ -392,6 +407,7 @@ public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekC

protected Message<?> toMessagingMessage(ConsumerRecord<K, V> cRecord, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {

return getMessageConverter().toMessage(cRecord, acknowledgment, consumer, getType());
}

Expand Down Expand Up @@ -875,70 +891,47 @@ else if (isAck || isKotlinContinuation || isConsumer || annotationHeaderIsGroupI
private Type extractGenericParameterTypFromMethodParameter(MethodParameter methodParameter) {
Type genericParameterType = methodParameter.getGenericParameterType();
if (genericParameterType instanceof ParameterizedType parameterizedType) {
if (parameterizedType.getRawType().equals(Message.class)) {
Type rawType = parameterizedType.getRawType();
if (rawType.equals(Message.class)) {
genericParameterType = parameterizedType.getActualTypeArguments()[0];
}
else if (parameterizedType.getRawType().equals(List.class)
&& parameterizedType.getActualTypeArguments().length == 1) {

Type paramType = getTypeFromWildCardWithUpperBound(parameterizedType.getActualTypeArguments()[0]);
this.isConsumerRecordList = parameterIsType(paramType, ConsumerRecord.class);
boolean messageWithGeneric = rawByParameterIsType(paramType, Message.class);
this.isMessageList = Message.class.equals(paramType) || messageWithGeneric;
if (messageWithGeneric) {
else if (rawType.equals(List.class) && parameterizedType.getActualTypeArguments().length == 1) {
Type paramType = parameterizedType.getActualTypeArguments()[0];
boolean messageHasGeneric = paramType instanceof ParameterizedType pType
&& pType.getRawType().equals(Message.class);
this.isMessageList = TypeUtils.isAssignable(paramType, Message.class) || messageHasGeneric;
this.isConsumerRecordList = TypeUtils.isAssignable(paramType, ConsumerRecord.class);
if (messageHasGeneric) {
genericParameterType = ((ParameterizedType) paramType).getActualTypeArguments()[0];
}
}
else {
this.isConsumerRecords = parameterizedType.getRawType().equals(ConsumerRecords.class);
this.isConsumerRecords = rawType.equals(ConsumerRecords.class);
}
}
return genericParameterType;
}

private boolean annotationHeaderIsGroupId(MethodParameter methodParameter) {
private static boolean annotationHeaderIsGroupId(MethodParameter methodParameter) {
Header header = methodParameter.getParameterAnnotation(Header.class);
return header != null && KafkaHeaders.GROUP_ID.equals(header.value());
}

private Type getTypeFromWildCardWithUpperBound(Type paramType) {
if (paramType instanceof WildcardType wcType
&& wcType.getUpperBounds() != null
&& wcType.getUpperBounds().length > 0) {
paramType = wcType.getUpperBounds()[0];
}
return paramType;
}

private boolean isMessageWithNoTypeInfo(Type parameterType) {
private static boolean isMessageWithNoTypeInfo(Type parameterType) {
if (parameterType instanceof ParameterizedType pType && pType.getRawType().equals(Message.class)) {
return pType.getActualTypeArguments()[0] instanceof WildcardType;
}
return Message.class.equals(parameterType); // could be Message without a generic type
}

private boolean parameterIsType(Type parameterType, Type type) {
private static boolean parameterIsType(Type parameterType, Type type) {
return parameterType.equals(type) || rawByParameterIsType(parameterType, type);
}

private boolean rawByParameterIsType(Type parameterType, Type type) {
private static boolean rawByParameterIsType(Type parameterType, Type type) {
return parameterType instanceof ParameterizedType pType && pType.getRawType().equals(type);
}

/**
* Set the retry callback for failures of both {@link CompletableFuture} and {@link Mono}.
* {@link MessagingMessageListenerAdapter#asyncFailure(Object, Acknowledgment, Consumer, Throwable, Message)}
* will invoke {@link MessagingMessageListenerAdapter#asyncRetryCallback} when
* {@link CompletableFuture} or {@link Mono} fails to complete.
* @param asyncRetryCallback the callback for async retry.
* @since 3.3
*/
public void setCallbackForAsyncFailure(
@Nullable BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback) {

this.asyncRetryCallback = asyncRetryCallback;
}

/**
* Root object for reply expression evaluation.
* @param request the request.
Expand All @@ -947,6 +940,7 @@ public void setCallbackForAsyncFailure(
* @since 2.0
*/
public record ReplyExpressionRoot(Object request, Object source, Object result) {

}

static class NoOpAck implements Acknowledgment {
Expand Down

0 comments on commit 00775a5

Please sign in to comment.