diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PullConsumer.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PullConsumer.java index fae7c20d0..5af653d64 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PullConsumer.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PullConsumer.java @@ -37,7 +37,7 @@ public interface PullConsumer extends Closeable { * @param topic the topic that needs to be monitored. * @param listener the callback to detect the message queue changes. */ - void registerMessageQueueChangeListenerByTopic(String topic, TopicMessageQueueChangeListener listener); + void registerMessageQueueChangeListenerByTopic(String topic, TopicMessageQueueChangeListener listener) throws ClientException; /** * Fetch message queues of the topic. diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PullConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PullConsumerImpl.java index 4b926f3fd..d08400dbb 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PullConsumerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PullConsumerImpl.java @@ -164,8 +164,9 @@ public String getConsumerGroup() { } @Override - public synchronized void registerMessageQueueChangeListenerByTopic(String topic, - TopicMessageQueueChangeListener listener) { + public void registerMessageQueueChangeListenerByTopic(String topic, TopicMessageQueueChangeListener listener) + throws ClientException { + // TODO: add lock? checkNotNull(topic, "topic should not be null"); checkNotNull(listener, "listener should not be null"); if (!this.isRunning()) { @@ -178,6 +179,7 @@ public synchronized void registerMessageQueueChangeListenerByTopic(String topic, topic); } topicMessageQueueChangeListenerMap.put(topic, listener); + fetchMessageQueues(topic); } public int getMaxCacheMessageCountEachQueue() { @@ -537,7 +539,8 @@ private List transformTopicRouteData(TopicRouteData topicRoute .collect(Collectors.toList()); } - public synchronized void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) { + public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) { + // TODO: add lock? final List newMqs = transformTopicRouteData(topicRouteData); Set newMqSet = new HashSet<>(newMqs); final List oldMqs = topicMessageQueuesCache.get(topic);