Skip to content

Commit

Permalink
feat: interceptor filter pattern for serialization/deserialization co…
Browse files Browse the repository at this point in the history
…mmon cases such as Message and KStream (springwolf#441)

* feat: added test for spring messaging consumer bean

* added implmentation to extract instance of class

* refactor(core): use 1024mb of heap for test execution

Co-authored-by: Timon Back <[email protected]>

* refactor(core): extract SpringPayloadAnnotationTypeExtractor as spring bean

Co-authored-by: Timon Back <[email protected]>

* refactor(core): WIP add ListExtractor

Co-authored-by: Timon Back <[email protected]>

* refactor(core): WIP add ListExtractor

Co-authored-by: Timon Back <[email protected]>

* refactor: extract different types of generic payloads

* refactor: handle KStream with positional generic element correctly (wip)

* feat: handle nested generic arguments

* refactor: extract FunctionalChannelBeanBuilder

* refactor(core): extract TypeToClassConverter

* refactor(core): rename SpringPayloadAnnotationTypeExtractor to PayloadClassExtractor

* chore: fix code after rebase to springwolf/master

* feat(core): configure payload classes via properties

* feat(core)!: payload of type List is not extracted by default anymore

* feat(core): add option to disable default extractable classes

Co-authored-by: Timon Back <[email protected]>

---------

Co-authored-by: Sheheryar <[email protected]>
Co-authored-by: David Müller <[email protected]>
Co-authored-by: Timon Back <[email protected]>
  • Loading branch information
4 people authored Nov 24, 2023
1 parent 5d14776 commit 6b7d4b0
Show file tree
Hide file tree
Showing 53 changed files with 894 additions and 461 deletions.
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ allprojects {
test {
dependsOn spotlessApply // Automatically fix code formatting if possible

minHeapSize = "128m" // initial heap size
maxHeapSize = "1024m" // maximum heap size

useJUnitPlatform()
testLogging {
// showStandardStreams = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.github.stavshamir.springwolf.asyncapi.DefaultChannelsService;
import io.github.stavshamir.springwolf.asyncapi.SpringwolfInitApplicationListener;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelsScanner;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.payload.PayloadClassExtractor;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocket;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.configuration.DefaultAsyncApiDocketService;
Expand Down Expand Up @@ -86,4 +87,10 @@ public AsyncApiDocketService asyncApiDocketService(
public ExampleGenerator exampleGenerator() {
return new ExampleJsonGenerator();
}

@Bean
@ConditionalOnMissingBean
public PayloadClassExtractor payloadClassExtractor(SpringwolfConfigProperties springwolfConfigProperties) {
return new PayloadClassExtractor(springwolfConfigProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.ProducerOperationDataScanner;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncListenerAnnotationScanner;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncPublisherAnnotationScanner;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.payload.PayloadClassExtractor;
import io.github.stavshamir.springwolf.asyncapi.scanners.classes.ComponentClassScanner;
import io.github.stavshamir.springwolf.asyncapi.scanners.classes.ConfigurationClassScanner;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
Expand Down Expand Up @@ -80,12 +81,14 @@ public AsyncListenerAnnotationScanner asyncListenerAnnotationScanner(
ComponentClassScanner componentClassScanner,
SchemasService schemasService,
AsyncApiDocketService asyncApiDocketService,
PayloadClassExtractor payloadClassExtractor,
List<OperationBindingProcessor> operationBindingProcessors,
List<MessageBindingProcessor> messageBindingProcessors) {
return new AsyncListenerAnnotationScanner(
componentClassScanner,
schemasService,
asyncApiDocketService,
payloadClassExtractor,
operationBindingProcessors,
messageBindingProcessors);
}
Expand All @@ -100,12 +103,14 @@ public AsyncPublisherAnnotationScanner asyncPublisherAnnotationScanner(
ComponentClassScanner componentClassScanner,
SchemasService schemasService,
AsyncApiDocketService asyncApiDocketService,
PayloadClassExtractor payloadClassExtractor,
List<OperationBindingProcessor> operationBindingProcessors,
List<MessageBindingProcessor> messageBindingProcessors) {
return new AsyncPublisherAnnotationScanner(
componentClassScanner,
schemasService,
asyncApiDocketService,
payloadClassExtractor,
operationBindingProcessors,
messageBindingProcessors);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.asyncapi.v2.binding.operation.OperationBinding;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelMerger;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelsScanner;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.payload.PayloadClassExtractor;
import io.github.stavshamir.springwolf.asyncapi.scanners.classes.ComponentClassScanner;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.PayloadReference;
Expand All @@ -29,7 +30,6 @@
import java.util.stream.Collectors;

import static io.github.stavshamir.springwolf.asyncapi.MessageHelper.toMessageObjectOrComposition;
import static io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation.SpringPayloadAnnotationTypeExtractor.getPayloadType;
import static java.util.stream.Collectors.toSet;

@Slf4j
Expand All @@ -42,6 +42,8 @@ public abstract class AbstractClassLevelListenerScanner<

private final SchemasService schemasService;

protected final PayloadClassExtractor payloadClassExtractor;

/**
* This annotation is used on class level
*
Expand Down Expand Up @@ -165,7 +167,7 @@ private Object getMessageObject(Set<Method> methods) {
}

private Message buildMessage(Method method) {
Class<?> payloadType = getPayloadType(method);
Class<?> payloadType = payloadClassExtractor.extractFrom(method);
String modelName = schemasService.register(payloadType);
String headerModelName = schemasService.register(this.buildHeaders(method));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ private List<Map.Entry<String, ChannelItem>> mapToChannels(Set<Class<?>> compone
* @param method The listener method.
* @return The class object of the payload received by the listener.
*/
// TODO: Inject SpringPayloadAnnotationTypeExtractor here with default implementation?
protected abstract Class<?> getPayloadType(Method method);

private Set<Method> getAnnotatedMethods(Class<?> type) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import com.asyncapi.v2.binding.operation.OperationBinding;
import io.github.stavshamir.springwolf.asyncapi.scanners.bindings.MessageBindingProcessor;
import io.github.stavshamir.springwolf.asyncapi.scanners.bindings.OperationBindingProcessor;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation.SpringPayloadAnnotationTypeExtractor;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.AbstractOperationDataScanner;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.payload.PayloadClassExtractor;
import io.github.stavshamir.springwolf.asyncapi.scanners.classes.ComponentClassScanner;
import io.github.stavshamir.springwolf.asyncapi.types.ConsumerData;
import io.github.stavshamir.springwolf.asyncapi.types.OperationData;
Expand Down Expand Up @@ -40,8 +40,9 @@ public class AsyncListenerAnnotationScanner extends AbstractOperationDataScanner
private final SchemasService schemasService;
private final AsyncApiDocketService asyncApiDocketService;

private final List<OperationBindingProcessor> operationBindingProcessors;
private final PayloadClassExtractor payloadClassExtractor;

private final List<OperationBindingProcessor> operationBindingProcessors;
private final List<MessageBindingProcessor> messageBindingProcessors;

@Override
Expand Down Expand Up @@ -108,10 +109,8 @@ private ConsumerData toConsumerData(
Message message,
AsyncListener annotation) {
AsyncOperation op = annotation.operation();
Class<?> payloadType = op.payloadType() != Object.class
? op.payloadType()
: SpringPayloadAnnotationTypeExtractor.getPayloadType(method);

Class<?> payloadType =
op.payloadType() != Object.class ? op.payloadType() : payloadClassExtractor.extractFrom(method);
return ConsumerData.builder()
.channelName(resolver.resolveStringValue(op.channelName()))
.description(resolver.resolveStringValue(op.description()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import com.asyncapi.v2.binding.operation.OperationBinding;
import io.github.stavshamir.springwolf.asyncapi.scanners.bindings.MessageBindingProcessor;
import io.github.stavshamir.springwolf.asyncapi.scanners.bindings.OperationBindingProcessor;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation.SpringPayloadAnnotationTypeExtractor;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.AbstractOperationDataScanner;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.payload.PayloadClassExtractor;
import io.github.stavshamir.springwolf.asyncapi.scanners.classes.ComponentClassScanner;
import io.github.stavshamir.springwolf.asyncapi.types.OperationData;
import io.github.stavshamir.springwolf.asyncapi.types.ProducerData;
Expand Down Expand Up @@ -40,8 +40,9 @@ public class AsyncPublisherAnnotationScanner extends AbstractOperationDataScanne
private final SchemasService schemasService;
private final AsyncApiDocketService asyncApiDocketService;

private final List<OperationBindingProcessor> operationBindingProcessors;
private final PayloadClassExtractor payloadClassExtractor;

private final List<OperationBindingProcessor> operationBindingProcessors;
private final List<MessageBindingProcessor> messageBindingProcessors;

@Override
Expand Down Expand Up @@ -108,9 +109,8 @@ private ProducerData toConsumerData(
Message message,
AsyncPublisher annotation) {
AsyncOperation op = annotation.operation();
Class<?> payloadType = op.payloadType() != Object.class
? op.payloadType()
: SpringPayloadAnnotationTypeExtractor.getPayloadType(method);
Class<?> payloadType =
op.payloadType() != Object.class ? op.payloadType() : payloadClassExtractor.extractFrom(method);
return ProducerData.builder()
.channelName(resolver.resolveStringValue(op.channelName()))
.description(resolver.resolveStringValue(op.description()))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.stavshamir.springwolf.asyncapi.scanners.channels.payload;

import io.github.stavshamir.springwolf.configuration.properties.SpringwolfConfigProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.handler.annotation.Payload;

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Map;

@Slf4j
public class PayloadClassExtractor {
private final TypeToClassConverter typeToClassConverter;

public PayloadClassExtractor(SpringwolfConfigProperties properties) {
Map<String, Integer> extractableClasses = Map.of();
if (properties.getPayload() != null) {
extractableClasses = properties.getPayload().getExtractableClasses();
}
typeToClassConverter = new TypeToClassConverter(extractableClasses);
}

public Class<?> extractFrom(Method method) {
String methodName = String.format("%s::%s", method.getDeclaringClass().getSimpleName(), method.getName());
log.debug("Finding payload type for {}", methodName);

int parameterPayloadIndex =
getPayloadParameterIndex(method.getParameterTypes(), method.getParameterAnnotations(), methodName);

return typeToClassConverter.extractClass(method.getGenericParameterTypes()[parameterPayloadIndex]);
}

public Class<?> typeToClass(Type type) {
return typeToClassConverter.extractClass(type);
}

private int getPayloadParameterIndex(
Class<?>[] parameterClasses, Annotation[][] parameterAnnotations, String methodName) {
switch (parameterClasses.length) {
case 0 -> throw new IllegalArgumentException("Listener methods must not have 0 parameters: " + methodName);
case 1 -> {
return 0;
}
default -> {
int payloadAnnotatedParameterIndex = getPayloadAnnotatedParameterIndex(parameterAnnotations);
if (payloadAnnotatedParameterIndex == -1) {
String msg =
"Multi-parameter AsyncListener methods must have one parameter annotated with @Payload, "
+ "but none was found: "
+ methodName;

throw new IllegalArgumentException(msg);
}
return payloadAnnotatedParameterIndex;
}
}
}

private int getPayloadAnnotatedParameterIndex(Annotation[][] parameterAnnotations) {
for (int i = 0, length = parameterAnnotations.length; i < length; i++) {
Annotation[] annotations = parameterAnnotations[i];
boolean hasPayloadAnnotation = Arrays.stream(annotations).anyMatch(Payload.class::isInstance);

if (hasPayloadAnnotation) {
return i;
}
}

return -1;
}

@RequiredArgsConstructor
private static class TypeToClassConverter {

private final Map<String, Integer> extractableClassToArgumentIndex;

private Class<?> extractClass(Type parameterType) {
try {
if (parameterType instanceof ParameterizedType) {
Type rawParameterType = ((ParameterizedType) parameterType).getRawType();
String rawParameterTypeName = rawParameterType.getTypeName();

Class<?> actualPayloadClass =
extractActualGenericClass((ParameterizedType) parameterType, rawParameterTypeName);
if (actualPayloadClass != Void.class) {
return actualPayloadClass;
}

// nested generic class - fallback to most outer container
return Class.forName(rawParameterTypeName);
}

// no generics used - just a normal type
return Class.forName(parameterType.getTypeName());
} catch (Exception ex) {
log.info("Unable to extract generic data type of %s".formatted(parameterType), ex);
}
return Void.class;
}

private Class<?> extractActualGenericClass(ParameterizedType parameterType, String rawParameterTypeName) {
Type type = parameterType;
String typeName = rawParameterTypeName;

while (type instanceof ParameterizedType && extractableClassToArgumentIndex.containsKey(typeName)) {
Integer index = extractableClassToArgumentIndex.get(rawParameterTypeName);

type = ((ParameterizedType) type).getActualTypeArguments()[index];

typeName = type.getTypeName();
if (type instanceof ParameterizedType) {
typeName = ((ParameterizedType) type).getRawType().getTypeName();
}
}

try {
return Class.forName(typeName);
} catch (ClassNotFoundException ex) {
log.debug("Unable to find class for type %s".formatted(typeName), ex);
}

return Void.class;
}
}
}
Loading

0 comments on commit 6b7d4b0

Please sign in to comment.