Skip to content

Commit

Permalink
refactor(Break Change): 重构共享主题实现,使之符合 mqtt5 协议规范.
Browse files Browse the repository at this point in the history
1. 重构共享主题实现,使之符合 mqtt5 协议规范
2. springboot 2.6.9 -> 2.7.10
3. readme.md 更新
4. ClientSub 增加了 shareName 字段,导致 kryo 反序列化之前的数据失败.
  • Loading branch information
Amazingwujun committed Apr 18, 2023
1 parent d82315f commit 7fc4279
Show file tree
Hide file tree
Showing 12 changed files with 169 additions and 94 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.9</version>
<version>2.7.10</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.jun</groupId>
<artifactId>mqttx</artifactId>
<version>1.2.1</version>
<version>1.2.2</version>
<name>mqttx</name>
<description>mqtt broker</description>
<url>https://github.com/Amazingwujun/mqttx</url>
Expand Down
16 changes: 7 additions & 9 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,15 +254,14 @@ services:

#### 4.6 共享主题支持

共享订阅是 `mqtt5` 协议规定的内容,很多 mq(例如 `kafka`) 都有实现
共享订阅是协议 `mqtt5` 规定的内容,**`MQTTX`** 参考协议标准实现

1. `mqttx.share-topic.enable`: 功能开关,默认 `true`
1. 格式: `$share/{ShareName}/{filter}`, `$share` 为前缀, `ShareName` 为共享订阅名, `filter` 就是非共享订阅主题过滤器。

2. 格式: `$share/{ShareName}/{filter}`, `$share` 为前缀, `ShareName` 为共享订阅名, `filter` 就是非共享订阅主题过滤器。

3. 目前支持 `hash`, `random`, `round` 三种规则

> `hash` 选出的 **client** 会随着**订阅客户端数量**及**发送消息客户端 `clientId`** 变化而变化
2. 支持如下两种消息分发规则
1. `round`: 轮询
2. `random`: 随机
3. 详细内容请参考协议 [MQTT Version 5.0 (oasis-open.org)](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250)

下图展示了共享主题与常规主题之间的差异:

Expand Down Expand Up @@ -569,8 +568,7 @@ Content-Length: 91
| `mqttx.websocket.enable` | `false` | websocket 开关 |
| `mqttx.websocket.port` | `8083` | websocket 监听端口 |
| `mqttx.websocket.path` | `/mqtt` | websocket path |
| `mqttx.share-topic.enable` | `true` | 共享主题功能开关 |
| `mqttx.share-topic.share-sub-strategy` | `round` | 负载均衡策略, 目前支持随机、轮询、哈希 |
| `mqttx.share-topic.share-sub-strategy` | `round` | 负载均衡策略, 目前支持随机、轮询 |
| `mqttx.sys-topic.enable` | `false` | 系统主题功能开关 |
| `mqttx.sys-topic.interval` | `60s` | 定时发布间隔 |
| `mqttx.message-bridge.enable` | `false` | 消息桥接功能开关 |
Expand Down
97 changes: 54 additions & 43 deletions src/main/java/com/jun/mqttx/broker/handler/PublishHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.jun.mqttx.utils.JsonSerializer;
import com.jun.mqttx.utils.RateLimiter;
import com.jun.mqttx.utils.Serializer;
import com.jun.mqttx.utils.TopicUtils;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.*;
Expand All @@ -42,7 +41,9 @@
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Instant;
import java.util.*;
Expand All @@ -69,7 +70,7 @@ public class PublishHandler extends AbstractMqttTopicSecureHandler implements Wa
private final IPublishMessageService publishMessageService;
private final IPubRelMessageService pubRelMessageService;
private final String brokerId;
private final boolean enableTopicSubPubSecure, enableShareTopic, enableRateLimiter, ignoreClientSelfPub;
private final boolean enableTopicSubPubSecure, enableRateLimiter, ignoreClientSelfPub;
/** 共享主题轮询策略 */
private final ShareStrategy shareStrategy;
/** 消息桥接开关 */
Expand All @@ -86,20 +87,25 @@ public class PublishHandler extends AbstractMqttTopicSecureHandler implements Wa

//@formatter:on

public PublishHandler(IPublishMessageService publishMessageService, IRetainMessageService retainMessageService,
ISubscriptionService subscriptionService, IPubRelMessageService pubRelMessageService, ISessionService sessionService,
@Nullable IInternalMessagePublishService internalMessagePublishService, MqttxConfig config,
@Nullable KafkaTemplate<String, byte[]> kafkaTemplate, Serializer serializer) {
public PublishHandler(IPublishMessageService publishMessageService,
IRetainMessageService retainMessageService,
ISubscriptionService subscriptionService,
IPubRelMessageService pubRelMessageService,
ISessionService sessionService,
@Nullable IInternalMessagePublishService internalMessagePublishService,
MqttxConfig config,
@Nullable KafkaTemplate<String, byte[]> kafkaTemplate,
Serializer serializer) {
super(config.getCluster().getEnable());
Assert.notNull(publishMessageService, "publishMessageService can't be null");
Assert.notNull(retainMessageService, "retainMessageService can't be null");
Assert.notNull(subscriptionService, "publishMessageService can't be null");
Assert.notNull(pubRelMessageService, "publishMessageService can't be null");
Assert.notNull(config, "mqttxConfig can't be null");

MqttxConfig.ShareTopic shareTopic = config.getShareTopic();
MqttxConfig.MessageBridge messageBridge = config.getMessageBridge();
MqttxConfig.RateLimiter rateLimiter = config.getRateLimiter();
var shareTopic = config.getShareTopic();
var messageBridge = config.getMessageBridge();
var rateLimiter = config.getRateLimiter();
this.sessionService = sessionService;
this.serializer = serializer;
this.publishMessageService = publishMessageService;
Expand All @@ -109,7 +115,6 @@ public PublishHandler(IPublishMessageService publishMessageService, IRetainMessa
this.brokerId = config.getBrokerId();
this.enableTopicSubPubSecure = config.getEnableTopicSubPubSecure();
this.ignoreClientSelfPub = config.getIgnoreClientSelfPub();
this.enableShareTopic = shareTopic.getEnable();
if (!CollectionUtils.isEmpty(rateLimiter.getTopicRateLimits()) && rateLimiter.getEnable()) {
enableRateLimiter = true;
rateLimiter.getTopicRateLimits()
Expand Down Expand Up @@ -196,13 +201,16 @@ public void process(ChannelHandlerContext ctx, MqttMessage msg) {

// 响应
switch (qos) {
case AT_MOST_ONCE -> publish(pubMsg, ctx, false).doOnSuccess(unused -> {
if (retain) {
handleRetainMsg(pubMsg).subscribe();
}
}).subscribe();
case AT_MOST_ONCE -> publish(pubMsg, ctx, false)
.publishOn(Schedulers.boundedElastic())
.doOnSuccess(unused -> {
if (retain) {
handleRetainMsg(pubMsg).subscribe();
}
}).subscribe();
case AT_LEAST_ONCE -> {
publish(pubMsg, ctx, false)
.publishOn(Schedulers.boundedElastic())
.doOnSuccess(unused -> {
MqttMessage pubAck = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
Expand All @@ -223,6 +231,7 @@ public void process(ChannelHandlerContext ctx, MqttMessage msg) {
Session session = getSession(ctx);
if (!session.isDupMsg(packetId)) {
publish(pubMsg, ctx, false)
.publishOn(Schedulers.boundedElastic())
.doOnSuccess(unused -> {
// 保存 pub
session.savePubRelInMsg(packetId);
Expand Down Expand Up @@ -261,9 +270,11 @@ public void process(ChannelHandlerContext ctx, MqttMessage msg) {
return Mono.empty();
} else {
return publish(pubMsg, ctx, false)
.publishOn(Schedulers.boundedElastic())
.doOnSuccess(unused -> pubRelMessageService.saveIn(clientId(ctx), packetId).subscribe());
}
})
.publishOn(Schedulers.boundedElastic())
.doOnSuccess(unused -> {
var pubRec = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0),
Expand Down Expand Up @@ -313,7 +324,7 @@ public Mono<Void> publish(final PubMsg pubMsg, ChannelHandlerContext ctx, boolea
}

// 获取 topic 订阅者 id 列表
String topic = pubMsg.getTopic();
final var topic = pubMsg.getTopic();
Flux<ClientSub> clientSubFlux = subscriptionService.searchSubscribeClientList(topic)
.filter(clientSub -> {
if (ignoreClientSelfPub) {
Expand All @@ -325,27 +336,28 @@ public Mono<Void> publish(final PubMsg pubMsg, ChannelHandlerContext ctx, boolea
});

// 共享订阅
if (enableShareTopic && TopicUtils.isShare(topic)) {
return clientSubFlux.collectList()
.map(e -> chooseClient(e, topic))
.flatMap(clientSub -> {
pubMsg.setAppointedClientId(clientSub.getClientId());
return publish0(clientSub, pubMsg, isClusterMessage).doOnSuccess(unused -> {
// 满足如下条件,则发送消息给集群
// 1 集群模式开启
// 2 订阅的客户端连接在其它实例上
if (isClusterMode() && !ConnectHandler.CLIENT_MAP.containsKey(clientSub.getClientId())) {
internalMessagePublish(pubMsg);
}
});
})
.then();
}
var f1 = clientSubFlux.filter(ClientSub::isShareSub)
.groupBy(ClientSub::getShareName)
.flatMap(GroupedFlux::collectList)
.map(t -> chooseClient(t, topic))
.flatMap(clientSub -> {
pubMsg.setAppointedClientId(clientSub.getClientId());
return publish0(clientSub, pubMsg, isClusterMessage).doOnSuccess(unused -> {
// 满足如下条件,则发送消息给集群
// 1 集群模式开启
// 2 订阅的客户端连接在其它实例上
if (isClusterMode() && !ConnectHandler.CLIENT_MAP.containsKey(clientSub.getClientId())) {
internalMessagePublish(pubMsg);
}
});
});

return clientSubFlux
// 普通订阅
var f2 = clientSubFlux
.filter(ClientSub::notShareSub)
.collectList()
.doOnSuccess(lst -> {
// 将消息推送给集群中的broker
// 将消息推送给集群中的 broker
if (isClusterMode() && !isClusterMessage) {
// 判断是否需要进行集群消息分发
boolean flag = false;
Expand All @@ -361,8 +373,9 @@ public Mono<Void> publish(final PubMsg pubMsg, ChannelHandlerContext ctx, boolea
}
})
.flatMapIterable(Function.identity())
.flatMap(clientSub -> publish0(clientSub, pubMsg, isClusterMessage))
.then();
.flatMap(clientSub -> publish0(clientSub, pubMsg, isClusterMessage));

return Mono.when(f1, f2);
}

/**
Expand Down Expand Up @@ -544,7 +557,6 @@ public boolean support(String channel) {
* 共享订阅选择客户端, 支持的策略如下:
* <ol>
* <li>随机: {@link ShareStrategy#random}</li>
* <li>哈希: {@link ShareStrategy#hash}</li>
* <li>轮询: {@link ShareStrategy#round}</li>
* </ol>
*
Expand All @@ -554,15 +566,14 @@ public boolean support(String channel) {
private ClientSub chooseClient(List<ClientSub> clientSubList, String topic) {
// 集合排序
clientSubList.sort(ClientSub::compareTo);
final var size = clientSubList.size();

if (hash == shareStrategy) {
return clientSubList.get(topic.hashCode() % clientSubList.size());
} else if (random == shareStrategy) {
int key = ThreadLocalRandom.current().nextInt(0, clientSubList.size());
return clientSubList.get(key % clientSubList.size());
if (random == shareStrategy) {
int key = ThreadLocalRandom.current().nextInt(0, size);
return clientSubList.get(key % size);
} else if (round == shareStrategy) {
int i = roundMap.computeIfAbsent(topic, s -> new AtomicInteger(0)).getAndIncrement();
return clientSubList.get(i % clientSubList.size());
return clientSubList.get(i % size);
}

throw new IllegalArgumentException("不可能到达的代码, strategy:" + shareStrategy);
Expand Down
12 changes: 10 additions & 2 deletions src/main/java/com/jun/mqttx/broker/handler/SubscribeHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.jun.mqttx.config.MqttxConfig;
import com.jun.mqttx.entity.BrokerStatus;
import com.jun.mqttx.entity.ClientSub;
import com.jun.mqttx.entity.ShareTopic;
import com.jun.mqttx.service.IRetainMessageService;
import com.jun.mqttx.service.ISubscriptionService;
import com.jun.mqttx.utils.TopicUtils;
Expand Down Expand Up @@ -100,8 +101,15 @@ public void process(final ChannelHandlerContext ctx, MqttMessage msg) {
List<Integer> grantedQosLevels = new ArrayList<>(mqttTopicSubscriptions.size());
var needSave = new ArrayList<ClientSub>();
mqttTopicSubscriptions.forEach(mqttTopicSubscription -> {
final String topic = mqttTopicSubscription.topicName();
String topic = mqttTopicSubscription.topicName();
int qos = mqttTopicSubscription.qualityOfService().value();
final var isShareTopic = TopicUtils.isShare(topic);
String shareName = null;
if (isShareTopic) {
ShareTopic shareTopic = TopicUtils.parseFrom(topic);
topic = shareTopic.filter();
shareName = shareTopic.name();
}

if (!TopicUtils.isValid(topic)) {
// Failure
Expand All @@ -120,7 +128,7 @@ public void process(final ChannelHandlerContext ctx, MqttMessage msg) {
if (TopicUtils.isSys(topic)) {
qos = 0x80;
} else {
ClientSub clientSub = ClientSub.of(clientId, qos, topic, isCleanSession(ctx));
ClientSub clientSub = ClientSub.of(clientId, qos, topic, isCleanSession(ctx), shareName);
needSave.add(clientSub);
}
}
Expand Down
46 changes: 21 additions & 25 deletions src/main/java/com/jun/mqttx/broker/handler/UnsubscribeHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,63 +17,56 @@
package com.jun.mqttx.broker.handler;

import com.jun.mqttx.config.MqttxConfig;
import com.jun.mqttx.entity.ShareTopic;
import com.jun.mqttx.entity.Tuple2;
import com.jun.mqttx.service.ISubscriptionService;
import com.jun.mqttx.utils.TopicUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
* {@link MqttMessageType#UNSUBSCRIBE} 消息处理器
*
* @author Jun
* @since 1.0.4
*/
@Slf4j
@Handler(type = MqttMessageType.UNSUBSCRIBE)
public class UnsubscribeHandler extends AbstractMqttSessionHandler {

private final Boolean enableSysTopic;
private final ISubscriptionService subscriptionService;

public UnsubscribeHandler(MqttxConfig config, ISubscriptionService subscriptionService) {
super(config.getCluster().getEnable());
this.enableSysTopic = config.getSysTopic().getEnable();
this.subscriptionService = subscriptionService;
}

@Override
public void process(ChannelHandlerContext ctx, MqttMessage msg) {
MqttUnsubscribeMessage mqttUnsubscribeMessage = (MqttUnsubscribeMessage) msg;
int messageId = mqttUnsubscribeMessage.variableHeader().messageId();
MqttUnsubscribePayload payload = mqttUnsubscribeMessage.payload();
final var mqttUnsubscribeMessage = (MqttUnsubscribeMessage) msg;
final var messageId = mqttUnsubscribeMessage.variableHeader().messageId();
final var payload = mqttUnsubscribeMessage.payload();

// 系统主题
List<String> collect = new ArrayList<>(payload.topics());
if (enableSysTopic) {
List<String> unSubSysTopics = collect.stream().filter(TopicUtils::isSys).collect(Collectors.toList());
collect.removeAll(unSubSysTopics);
Mono.when(unsubscribeSysTopics(unSubSysTopics, ctx), subscriptionService.unsubscribe(clientId(ctx), isCleanSession(ctx), collect))
.doOnSuccess(unused -> {
// response
MqttMessage mqttMessage = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(messageId),
null
);
ctx.writeAndFlush(mqttMessage);
}).subscribe();
return;
}
var generalTopics = new ArrayList<>(payload.topics());
var unSubSysTopics = generalTopics.stream().filter(TopicUtils::isSys).toList();

// 移除系统主题
generalTopics.removeAll(unSubSysTopics);

// 非系统主题
subscriptionService.unsubscribe(clientId(ctx), isCleanSession(ctx), collect)
// 删除订阅
Mono.when(unsubscribeSysTopics(unSubSysTopics, ctx), subscriptionService.unsubscribe(clientId(ctx), isCleanSession(ctx), generalTopics))
.doOnError(throwable -> log.error(String.format("主题订阅[%s]删除失败", generalTopics), throwable))
.doOnSuccess(unused -> {
// response
MqttMessage mqttMessage = MqttMessageFactory.newMessage(
var mqttMessage = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(messageId),
null
Expand All @@ -89,6 +82,9 @@ public void process(ChannelHandlerContext ctx, MqttMessage msg) {
* @param ctx {@link ChannelHandlerContext}
*/
private Mono<Void> unsubscribeSysTopics(List<String> unSubSysTopics, ChannelHandlerContext ctx) {
if (ObjectUtils.isEmpty(unSubSysTopics)) {
return Mono.empty();
}
return subscriptionService.unsubscribeSys(clientId(ctx), unSubSysTopics);
}
}
Loading

0 comments on commit 7fc4279

Please sign in to comment.