Skip to content

Commit

Permalink
修改bug
Browse files Browse the repository at this point in the history
  • Loading branch information
tocrhz committed Nov 23, 2023
1 parent 8e0f6ed commit b243a58
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 108 deletions.
14 changes: 4 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ MQTT starter for Spring Boot, easier to use.
> This document is machine translated.
## 0. 修改记录
2023-11-23 `v2.0.1`
1. 修改了bug


2023-11-06 `v2.0.0`
1. 改了spring版本3.x, 重构了一下, 发送部分改了, 发送部分不兼容以前的版本了
Expand All @@ -15,15 +18,6 @@ MQTT starter for Spring Boot, easier to use.
2023-03-16 `v1.3.0`
1. `@MqttSubscribe` 注解添加嵌入参数支持(只有topic和client生效,详见`MqttSubscriber#afterInit`#14, #15


2022-11-23 `v1.2.8.1`
1. fix bug #19

2022-10-22 `v1.2.8`
1. 修复对象与字节转换的bug, 优先使用自定义的转换类,如果无法转换,再使用Spring的转换类
2. 新增内置String与对象互转的转换类,使用Jackson, 内置转换类不再使用匿名类
3. 修复一个配置的bug

...

## 1. import
Expand All @@ -32,7 +26,7 @@ MQTT starter for Spring Boot, easier to use.
<dependency>
<groupId>com.github.tocrhz</groupId>
<artifactId>mqtt-spring-boot-starter</artifactId>
<version>2.0.0</version>
<version>2.0.1</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

<groupId>com.github.tocrhz</groupId>
<artifactId>mqtt-spring-boot-starter</artifactId>
<version>2.0.0</version>
<version>2.0.1</version>
<name>mqtt-spring-boot-starter</name>
<description>MQTT starter for Spring Boot, easier to use.</description>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.github.tocrhz.mqtt.properties.MqttConfigAdapter;
import com.github.tocrhz.mqtt.properties.MqttProperties;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.github.tocrhz.mqtt.publisher.SimpleMqttClient;
import com.github.tocrhz.mqtt.subscriber.MqttSubscriber;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.springframework.beans.factory.ListableBeanFactory;
Expand All @@ -18,6 +17,8 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;

import java.util.LinkedList;

/**
* mqtt auto configuration
*
Expand Down Expand Up @@ -46,8 +47,7 @@ public MqttAutoConfiguration(ListableBeanFactory beanFactory, ConfigurableBeanFa
@Bean
@ConditionalOnMissingBean(MqttConfigAdapter.class)
public MqttConfigAdapter mqttConfigAdapter() {
return new MqttConfigAdapter() {
};
return MqttConfigAdapter.defaultAdapter();
}

/**
Expand All @@ -60,18 +60,22 @@ public MqttConfigAdapter mqttConfigAdapter() {
* @return MqttConnector
*/
@Bean
public MqttClientManager mqttClientManager(MqttProperties properties, MqttConfigAdapter adapter) {
@Order
@ConditionalOnMissingBean(MqttClientManager.class)
public MqttClientManager mqttClientManager(MqttSubscribeProcessor processor, MqttProperties properties, MqttConfigAdapter adapter) {

LinkedList<MqttSubscriber> subscribers = processor.getSubscribers();
// init property before connected.
adapter.beforeResolveEmbeddedValue(MqttSubscriber.list());
for (MqttSubscriber subscriber : MqttSubscriber.list()) {
adapter.beforeResolveEmbeddedValue(subscribers);
for (MqttSubscriber subscriber : subscribers) {
subscriber.resolveEmbeddedValue(factory);
}
adapter.afterResolveEmbeddedValue(MqttSubscriber.list());
MqttClientManager manager = new MqttClientManager(properties, adapter);
adapter.afterResolveEmbeddedValue(subscribers);
MqttClientManager manager = new MqttClientManager(subscribers, properties, adapter);
// 将mqtt客户端添加进去
properties.forEach(manager::clientNew);
// 建立连接
SimpleMqttClient.scheduled.execute(manager::afterInit);
manager.afterInit();
return manager;
}

Expand All @@ -82,6 +86,7 @@ public MqttClientManager mqttClientManager(MqttProperties properties, MqttConfig
* @return MqttPublisher
*/
@Bean
@Order
@ConditionalOnMissingBean(MqttPublisher.class)
public MqttPublisher mqttPublisher(MqttClientManager manager) {
return new MqttPublisher(manager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,33 @@
import com.github.tocrhz.mqtt.properties.MqttProperties;
import com.github.tocrhz.mqtt.publisher.SimpleMqttClient;
import com.github.tocrhz.mqtt.subscriber.MqttSubscriber;
import com.github.tocrhz.mqtt.subscriber.TopicPair;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.util.*;
import java.util.stream.Collectors;
import java.util.LinkedHashMap;
import java.util.LinkedList;

/**
* 客户端连接管理一下
*/
@SuppressWarnings("unused")
public class MqttClientManager implements DisposableBean {
private final static Logger log = LoggerFactory.getLogger(MqttClientManager.class);
private final static LinkedHashMap<String, SimpleMqttClient> MQTT_CLIENT_MAP = new LinkedHashMap<>();
private final LinkedHashMap<String, SimpleMqttClient> clients = new LinkedHashMap<>();
private final LinkedList<MqttSubscriber> subscribers;
private final MqttProperties properties;
private final MqttConfigAdapter adapter;

private String defaultClientId = null;

public MqttClientManager(MqttProperties properties, MqttConfigAdapter adapter) {
public MqttClientManager(LinkedList<MqttSubscriber> subscribers, MqttProperties properties, MqttConfigAdapter adapter) {
this.properties = properties;
this.subscribers = subscribers;
this.adapter = adapter;
adapter.setProperties(properties);
}
Expand All @@ -41,7 +43,7 @@ public SimpleMqttClient clientNew(MqttConnectionProperties properties) {
Assert.hasText(clientId, "property clientId is required.");
Assert.notEmpty(properties.getUri(), "property uri cannot be empty.");
Assert.hasText(properties.getUri()[0], "property uri is required.");
if (MQTT_CLIENT_MAP.containsKey(clientId)) {
if (clients.containsKey(clientId)) {
clientClose(clientId);
}
// 填充默认值
Expand All @@ -56,7 +58,7 @@ void clientNew(String clientId, MqttConnectOptions options) {

public SimpleMqttClient clientNew(String clientId, MqttConnectOptions options, Integer defaultPublishQos) {
Assert.hasText(clientId, "clientId is required.");
if (MQTT_CLIENT_MAP.containsKey(clientId)) {
if (clients.containsKey(clientId)) {
clientClose(clientId);
}
// 创建客户端
Expand All @@ -69,20 +71,19 @@ public SimpleMqttClient clientNew(String clientId, MqttConnectOptions options, I
}
// 创建topic
boolean enableShared = this.properties.isEnableSharedSubscription(clientId);
Set<TopicPair> topicPairs = mergeTopics(clientId, enableShared);
int qos = defaultPublishQos != null ? defaultPublishQos : this.properties.getDefaultPublishQos(clientId);
// 创建客户端对象
SimpleMqttClient smc = new SimpleMqttClient(clientId, options, client, topicPairs, enableShared, qos, adapter);
MQTT_CLIENT_MAP.put(clientId, smc);
SimpleMqttClient smc = new SimpleMqttClient(clientId, client, options, enableShared, qos, subscribers, adapter);
clients.put(clientId, smc);
return smc;
}

public void clientClose(String clientId) {
if (MQTT_CLIENT_MAP.containsKey(clientId)) {
if (clients.containsKey(clientId)) {
if (defaultClientId != null && defaultClientId.equals(clientId)) {
String oldDefault = defaultClientId;
String newDefault = null;
for (SimpleMqttClient value : MQTT_CLIENT_MAP.values()) {
for (SimpleMqttClient value : clients.values()) {
if (!value.id().equals(clientId)) {
newDefault = value.id();
}
Expand All @@ -95,30 +96,29 @@ public void clientClose(String clientId) {
log.warn("default mqtt client '{}' closed, other client not exists. ", oldDefault);
}
}
SimpleMqttClient client = MQTT_CLIENT_MAP.remove(clientId);
SimpleMqttClient client = clients.remove(clientId);
client.close();
}
}

public SimpleMqttClient clientGetOrDefault(String clientId) {
if (StringUtils.hasText(clientId) && MQTT_CLIENT_MAP.containsKey(clientId)) {
return MQTT_CLIENT_MAP.get(clientId);
if (StringUtils.hasText(clientId) && clients.containsKey(clientId)) {
return clients.get(clientId);
}
return MQTT_CLIENT_MAP.get(defaultClientId);
return clients.get(defaultClientId);
}

public boolean setDefaultClientId(String clientId) {
if (StringUtils.hasText(clientId) && MQTT_CLIENT_MAP.containsKey(clientId)) {
if (StringUtils.hasText(clientId) && clients.containsKey(clientId)) {
defaultClientId = clientId;
return true;
}
return false;
}

void afterInit() {

// 初始化完成后,全部建立连接
MQTT_CLIENT_MAP.forEach((id, client) -> {
clients.forEach((id, client) -> {
try {
if (defaultClientId == null) {
defaultClientId = id;
Expand All @@ -130,66 +130,20 @@ void afterInit() {
});
}

/**
* 合并相似的主题(实际没啥用)
* merge the same topic
*
* @param clientId clientId
* @return TopicPairs
*/
private Set<TopicPair> mergeTopics(String clientId, boolean enableShared) {
Set<TopicPair> topicPairs = new HashSet<>();
for (MqttSubscriber subscriber : MqttSubscriber.list()) {
if (subscriber.containsClientId(clientId)) {
topicPairs.addAll(subscriber.getTopics());
}
}
if (topicPairs.isEmpty()) {
return topicPairs;
}
TopicPair[] pairs = new TopicPair[topicPairs.size()];
for (TopicPair topic : topicPairs) {
for (int i = 0; i < pairs.length; ++i) {
TopicPair pair = pairs[i];
if (pair == null) {
pairs[i] = topic;
break;
}
if (pair.getQos() != topic.getQos()) {
continue;
}
String temp = pair.getTopic(enableShared)
.replace('+', '\u0000')
.replace("#", "\u0000/\u0000");
if (MqttTopic.isMatched(topic.getTopic(enableShared), temp)) {
pairs[i] = topic;
continue;
}
temp = topic.getTopic(enableShared)
.replace('+', '\u0000')
.replace("#", "\u0000/\u0000");
if (MqttTopic.isMatched(pair.getTopic(enableShared), temp)) {
break;
}
}
}
return Arrays.stream(pairs).filter(Objects::nonNull).collect(Collectors.toSet());
}

@Override
public void destroy() {
// 卸载的时候把缓存清空
log.info("shutting down all mqtt client.");
MQTT_CLIENT_MAP.forEach((id, client) -> {
clients.forEach((id, client) -> {
try {
client.close();
} catch (Exception e) {
log.error("mqtt client '{}' close error: {}", id, e.getMessage(), e);
}
});
MQTT_CLIENT_MAP.clear();
clients.clear();
// 清空订阅处理方法缓存
MqttSubscriber.destroy();
subscribers.clear();
// 清空类型转换缓存
MqttConversionService.destroy();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.LinkedList;

/**
* When Bean is initialized, filter out the methods annotated with @MqttSubscribe, and create MqttSubscriber
Expand All @@ -21,13 +22,23 @@
@ConditionalOnProperty(prefix = "mqtt", name = "disable", havingValue = "false", matchIfMissing = true)
public class MqttSubscribeProcessor implements BeanPostProcessor {

private final LinkedList<MqttSubscriber> subscribers = new LinkedList<>();

public LinkedList<MqttSubscriber> getSubscribers() {
return subscribers;
}

public void add(MqttSubscriber subscriber) {
subscribers.add(subscriber);
}

@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
Method[] methods = bean.getClass().getMethods();
for (Method method : methods) {
if (method.isAnnotationPresent(MqttSubscribe.class)) {
SubscriberModel model = SubscriberModel.of(method.getAnnotation(MqttSubscribe.class));
MqttSubscriber.add(MqttSubscriber.of(model, bean, method));
add(MqttSubscriber.of(model, bean, method));
}
}
return bean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@
*/
@SuppressWarnings("unused")
public abstract class MqttConfigAdapter {

/**
* 默认的
*/
public static MqttConfigAdapter defaultAdapter() {
return new MqttConfigAdapter() {
};
}

protected MqttProperties mqttProperties;

public final void setProperties(MqttProperties mqttProperties) {
Expand Down
Loading

0 comments on commit b243a58

Please sign in to comment.