Skip to content

Commit

Permalink
GH-3040: Add virtual threads customizer for RabbitMQ binder
Browse files Browse the repository at this point in the history
Fixes: #3040

This change adds out-of-the-box `ListenerContainerCustomizer<AbstractMessageListenerContainer>`
to set `VirtualThreadTaskExecutor` into an `AbstractMessageListenerContainer` created by the binder
when `Threading.VIRTUAL` condition is met
  • Loading branch information
artembilan committed Nov 14, 2024
1 parent 1b083c9 commit 31ef9e9
Showing 1 changed file with 16 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2023 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,12 +24,15 @@
import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionNameStrategy;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.support.postprocessor.DelegatingDecompressingPostProcessor;
import org.springframework.amqp.support.postprocessor.GZipPostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnThreading;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.thread.Threading;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitBinderConfigurationProperties;
Expand All @@ -42,6 +45,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.task.VirtualThreadTaskExecutor;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.amqp.inbound.AmqpMessageSource;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -150,4 +154,15 @@ MessagePostProcessor gZipPostProcessor() {
RabbitExchangeQueueProvisioner provisioningProvider(List<DeclarableCustomizer> customizers) {
return new RabbitExchangeQueueProvisioner(this.rabbitConnectionFactory, customizers);
}

@Bean
@ConditionalOnThreading(Threading.VIRTUAL)
ListenerContainerCustomizer<MessageListenerContainer> listenerContainerVirtualThreadExecutorCustomizer() {
return (container, destinationName, group) -> {
if (container instanceof AbstractMessageListenerContainer listenerContainer) {
listenerContainer.setTaskExecutor(new VirtualThreadTaskExecutor(destinationName + "-"));
}
};
}

}

0 comments on commit 31ef9e9

Please sign in to comment.