From dc2fb9c1a5d49d1816c0af3de8c7e9d01a743a9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E7=9F=B3=E5=A4=B4?= Date: Wed, 11 Dec 2024 10:40:45 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BC=82=E6=AD=A5=E6=93=8D=E4=BD=9C=E5=8A=A0?= =?UTF-8?q?=E4=B8=8A=20CancellationToken?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- NewLife.MQTT/MqttClient.cs | 55 ++++++++++++-------- NewLife.MqttServer/NewLife.MqttServer.csproj | 2 +- 2 files changed, 33 insertions(+), 24 deletions(-) diff --git a/NewLife.MQTT/MqttClient.cs b/NewLife.MQTT/MqttClient.cs index c1f6124..a750eb4 100644 --- a/NewLife.MQTT/MqttClient.cs +++ b/NewLife.MQTT/MqttClient.cs @@ -196,8 +196,9 @@ private void Init() /// 发送命令 /// 消息 /// 是否等待响应 + /// /// - protected virtual async Task SendAsync(MqttMessage msg, Boolean waitForResponse = true) + protected virtual async Task SendAsync(MqttMessage msg, Boolean waitForResponse, CancellationToken cancellationToken) { if (msg is MqttIdMessage idm && idm.Id == 0 && (msg.Type != MqttType.Publish || msg.QoS > 0)) idm.Id = (UInt16)Interlocked.Increment(ref g_id); @@ -226,7 +227,7 @@ private void Init() return null; } - var rs = await client.SendMessageAsync(msg).ConfigureAwait(false); + var rs = await client.SendMessageAsync(msg, cancellationToken).ConfigureAwait(false); // 重置 _taskCanceledCount = 0; @@ -357,8 +358,9 @@ private void Client_Received(Object sender, ReceivedEventArgs e) public event EventHandler? Connected; /// 连接服务端 + /// /// - public Task ConnectAsync() + public Task ConnectAsync(CancellationToken cancellationToken = default) { if (ClientId.IsNullOrEmpty()) throw new ArgumentNullException(nameof(ClientId)); @@ -370,13 +372,14 @@ public Task ConnectAsync() CleanSession = CleanSession, }; - return ConnectAsync(message); + return ConnectAsync(message, cancellationToken); } /// 连接服务端 /// + /// /// - public async Task ConnectAsync(ConnectMessage message) + public async Task ConnectAsync(ConnectMessage message, CancellationToken cancellationToken = default) { if (message == null) throw new ArgumentNullException(nameof(message)); @@ -391,7 +394,7 @@ public async Task ConnectAsync(ConnectMessage message) // 心跳 if (KeepAlive > 0 && message.KeepAliveInSeconds == 0) message.KeepAliveInSeconds = (UInt16)KeepAlive; - var rs = (await SendAsync(message).ConfigureAwait(false)) as ConnAck; + var rs = (await SendAsync(message, true, cancellationToken).ConfigureAwait(false)) as ConnAck; // 判断响应,是否成功连接 if (rs!.ReturnCode != ConnectReturnCode.Accepted) @@ -414,7 +417,7 @@ public async Task ConnectAsync(ConnectMessage message) Requests = _subs.Values.ToArray(), }; - var rs2 = (await SendAsync(message2).ConfigureAwait(false)) as SubAck; + var rs2 = (await SendAsync(message2, true, cancellationToken).ConfigureAwait(false)) as SubAck; if (rs2 == null) _subs.Clear(); } @@ -423,11 +426,11 @@ public async Task ConnectAsync(ConnectMessage message) /// 断开连接 /// - public async Task DisconnectAsync() + public async Task DisconnectAsync(CancellationToken cancellationToken = default) { var message = new DisconnectMessage(); - await SendAsync(message, false).ConfigureAwait(false); + await SendAsync(message, true, cancellationToken).ConfigureAwait(false); var e = new EventArgs(); Disconnected?.Invoke(this, e); @@ -448,7 +451,7 @@ private void Client_Closed(Object sender, EventArgs e) if (Disposed || !Reconnect) return; WriteLog("尝试重新连接"); - ConnectAsync().GetAwaiter(); + ConnectAsync().Wait(Timeout); } #endregion @@ -476,17 +479,18 @@ private void Client_Closed(Object sender, EventArgs e) /// 发布消息 /// + /// /// - public async Task PublishAsync(PublishMessage message) + public async Task PublishAsync(PublishMessage message, CancellationToken cancellationToken = default) { if (message == null) throw new ArgumentNullException(nameof(message)); - var rs = (await SendAsync(message, message.QoS != QualityOfService.AtMostOnce).ConfigureAwait(false)) as MqttIdMessage; + var rs = (await SendAsync(message, message.QoS != QualityOfService.AtMostOnce, cancellationToken).ConfigureAwait(false)) as MqttIdMessage; if (rs is PubRec) { var rel = new PubRel(); - var cmp = (await SendAsync(rel, true).ConfigureAwait(false)) as PubComp; + var cmp = (await SendAsync(rel, true, cancellationToken).ConfigureAwait(false)) as PubComp; return cmp; } @@ -498,30 +502,33 @@ private void Client_Closed(Object sender, EventArgs e) /// 订阅主题 /// 主题过滤器 /// 收到该主题消息时的回调 + /// /// - public Task SubscribeAsync(String topicFilter, Action? callback = null) + public Task SubscribeAsync(String topicFilter, Action? callback = null, CancellationToken cancellationToken = default) { var subscription = new Subscription(topicFilter, QualityOfService.AtMostOnce); - return SubscribeAsync([subscription], callback); + return SubscribeAsync([subscription], callback, cancellationToken); } /// 订阅主题 /// 主题过滤器 /// 服务质量 + /// /// - public Task SubscribeAsync(String[] topicFilters, QualityOfService qos = QualityOfService.AtMostOnce) + public Task SubscribeAsync(String[] topicFilters, QualityOfService qos = QualityOfService.AtMostOnce, CancellationToken cancellationToken = default) { var subscriptions = topicFilters.Select(e => new Subscription(e, qos)).ToList(); - return SubscribeAsync(subscriptions); + return SubscribeAsync(subscriptions, null, cancellationToken); } /// 订阅主题 /// 订阅集合 /// 收到该主题消息时的回调 + /// /// - public async Task SubscribeAsync(IList subscriptions, Action? callback = null) + public async Task SubscribeAsync(IList subscriptions, Action? callback = null, CancellationToken cancellationToken = default) { // 已订阅,不重复 subscriptions = subscriptions.Where(e => !_subs.ContainsKey(e.TopicFilter)).ToList(); @@ -532,7 +539,7 @@ private void Client_Closed(Object sender, EventArgs e) Requests = subscriptions, }; - var rs = (await SendAsync(message).ConfigureAwait(false)) as SubAck; + var rs = (await SendAsync(message, true, cancellationToken).ConfigureAwait(false)) as SubAck; if (rs != null) { foreach (var item in subscriptions) @@ -547,15 +554,16 @@ private void Client_Closed(Object sender, EventArgs e) /// 取消订阅主题 /// 主题过滤器 + /// /// - public async Task UnsubscribeAsync(params String[] topicFilters) + public async Task UnsubscribeAsync(String[] topicFilters, CancellationToken cancellationToken = default) { var message = new UnsubscribeMessage { TopicFilters = topicFilters, }; - var rs = (await SendAsync(message).ConfigureAwait(false)) as UnsubAck; + var rs = (await SendAsync(message, true, cancellationToken).ConfigureAwait(false)) as UnsubAck; if (rs != null) { @@ -571,8 +579,9 @@ private void Client_Closed(Object sender, EventArgs e) #region 心跳 /// 心跳 + /// /// - public async Task PingAsync() + public async Task PingAsync(CancellationToken cancellationToken = default) { if (!IsConnected) { @@ -581,7 +590,7 @@ private void Client_Closed(Object sender, EventArgs e) var message = new PingRequest(); - var rs = (await SendAsync(message).ConfigureAwait(false)) as PingResponse; + var rs = (await SendAsync(message, true, cancellationToken).ConfigureAwait(false)) as PingResponse; return rs; } diff --git a/NewLife.MqttServer/NewLife.MqttServer.csproj b/NewLife.MqttServer/NewLife.MqttServer.csproj index 7a79930..b3dfb73 100644 --- a/NewLife.MqttServer/NewLife.MqttServer.csproj +++ b/NewLife.MqttServer/NewLife.MqttServer.csproj @@ -22,7 +22,7 @@ - +