Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #632 ] Fix NPE caused by using @ ExtRocketMQTemplateConfiguration annotation extension to send messages #634

Merged
merged 1 commit into from
Mar 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.OrderComparator;
import org.springframework.core.annotation.AnnotationUtils;

Expand All @@ -32,14 +33,16 @@
import java.util.function.BiFunction;
import java.util.stream.Collectors;

public class RocketMQMessageListenerBeanPostProcessor implements ApplicationContextAware, BeanPostProcessor, InitializingBean {
public class RocketMQMessageListenerBeanPostProcessor implements ApplicationContextAware, BeanPostProcessor, InitializingBean, SmartLifecycle {

private ApplicationContext applicationContext;

private AnnotationEnhancer enhancer;

private RocketMQMessageListenerContainerRegistrar listenerContainerRegistrar;

private boolean running = false;

@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
Expand All @@ -58,6 +61,34 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw
return bean;
}

@Override
public int getPhase() {
return Integer.MAX_VALUE - 2000;
}

@Override
public void start() {
if (!isRunning()) {
this.setRunning(true);
listenerContainerRegistrar.startContainer();
}
}

@Override
public void stop() {

}

public void setRunning(boolean running) {
this.running = running;
}


@Override
public boolean isRunning() {
return running;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.util.StringUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class RocketMQMessageListenerContainerRegistrar implements ApplicationContextAware {
Expand All @@ -52,6 +54,8 @@ public class RocketMQMessageListenerContainerRegistrar implements ApplicationCon

private final RocketMQMessageConverter rocketMQMessageConverter;

private final List<DefaultRocketMQListenerContainer> containers = new ArrayList<>();

public RocketMQMessageListenerContainerRegistrar(RocketMQMessageConverter rocketMQMessageConverter,
ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) {
this.rocketMQMessageConverter = rocketMQMessageConverter;
Expand Down Expand Up @@ -97,18 +101,25 @@ public void registerContainer(String beanName, Object bean, RocketMQMessageListe
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
if (!container.isRunning()) {
try {
container.start();
} catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}

containers.add(container);

log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}

public void startContainer() {
for (DefaultRocketMQListenerContainer container : containers) {
if (!container.isRunning()) {
try {
container.start();
} catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}
}
}

private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
Expand Down
Loading