From d76dc386ffd6ff4f86a78d8ac1cc265aaa2a1e62 Mon Sep 17 00:00:00 2001 From: Ankita Gojiya Date: Wed, 24 Jul 2024 17:12:10 +0530 Subject: [PATCH 1/9] Remove metric filteration --- .../VersionB/SparkplugApplication.cs | 22 +++++++++---------- src/SparkplugNet/VersionB/SparkplugNode.cs | 10 ++++----- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/SparkplugNet/VersionB/SparkplugApplication.cs b/src/SparkplugNet/VersionB/SparkplugApplication.cs index 76f896b..f4f6d8e 100644 --- a/src/SparkplugNet/VersionB/SparkplugApplication.cs +++ b/src/SparkplugNet/VersionB/SparkplugApplication.cs @@ -149,12 +149,12 @@ private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Payloa { // Filter out session number metric. var sessionNumberMetric = payload.Metrics.FirstOrDefault(m => m.Name == Constants.SessionNumberMetricName); - var metricsWithoutSequenceMetric = payload.Metrics.Where(m => m.Name != Constants.SessionNumberMetricName); - var filteredMetrics = this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType).ToList(); + var metrics = payload.Metrics.Where(m => m.Name != Constants.SessionNumberMetricName).ToList(); + // var filteredMetrics = this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType).ToList(); if (sessionNumberMetric is not null) { - filteredMetrics.Add(sessionNumberMetric); + metrics.Add(sessionNumberMetric); } // Handle messages. @@ -162,19 +162,19 @@ private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Payloa { case SparkplugMessageType.NodeBirth: await this.FireNodeBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, - this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Online)); + this.ProcessPayload(topic, metrics, SparkplugMetricStatus.Online)); break; case SparkplugMessageType.DeviceBirth: if (string.IsNullOrWhiteSpace(topic.DeviceIdentifier)) { throw new InvalidOperationException($"The device identifier is invalid!"); } - + await this.FireDeviceBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, topic.DeviceIdentifier, - this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Online)); + this.ProcessPayload(topic, metrics, SparkplugMetricStatus.Online)); break; case SparkplugMessageType.NodeData: - var nodeDataMetrics = this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Online); + var nodeDataMetrics = this.ProcessPayload(topic, metrics, SparkplugMetricStatus.Online); await this.FireNodeDataReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, nodeDataMetrics); break; case SparkplugMessageType.DeviceData: @@ -183,20 +183,20 @@ await this.FireDeviceBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifi throw new InvalidOperationException($"Topic {topic} is invalid!"); } - var deviceDataMetrics = this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Online); + var deviceDataMetrics = this.ProcessPayload(topic, metrics, SparkplugMetricStatus.Online); await this.FireDeviceDataReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, topic.DeviceIdentifier, deviceDataMetrics); break; case SparkplugMessageType.NodeDeath: - this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Offline); + this.ProcessPayload(topic, metrics, SparkplugMetricStatus.Offline); await this.FireNodeDeathReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, sessionNumberMetric); - break; + break; case SparkplugMessageType.DeviceDeath: if (string.IsNullOrWhiteSpace(topic.DeviceIdentifier)) { throw new InvalidOperationException($"Topic {topic} is invalid!"); } - this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Offline); + this.ProcessPayload(topic, metrics, SparkplugMetricStatus.Offline); await this.FireDeviceDeathReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, topic.DeviceIdentifier); break; } diff --git a/src/SparkplugNet/VersionB/SparkplugNode.cs b/src/SparkplugNet/VersionB/SparkplugNode.cs index 58ed39f..b67defb 100644 --- a/src/SparkplugNet/VersionB/SparkplugNode.cs +++ b/src/SparkplugNet/VersionB/SparkplugNode.cs @@ -114,12 +114,12 @@ private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Payloa { // Filter out session number metric. var sessionNumberMetric = payload.Metrics.FirstOrDefault(m => m.Name == Constants.SessionNumberMetricName); - var metricsWithoutSequenceMetric = payload.Metrics.Where(m => m.Name != Constants.SessionNumberMetricName); - var filteredMetrics = this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType).ToList(); + var metrics = payload.Metrics.ToList(); + // var filteredMetrics = this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType).ToList(); if (sessionNumberMetric is not null) { - filteredMetrics.Add(sessionNumberMetric); + metrics.Add(sessionNumberMetric); } // Handle messages. @@ -131,11 +131,11 @@ private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Payloa throw new InvalidOperationException($"Topic {topic} is invalid!"); } - await this.FireDeviceCommandReceived(topic.DeviceIdentifier, filteredMetrics); + await this.FireDeviceCommandReceived(topic.DeviceIdentifier, metrics); break; case SparkplugMessageType.NodeCommand: - await this.FireNodeCommandReceived(filteredMetrics); + await this.FireNodeCommandReceived(metrics); break; } } From eaba1fa8f4808a30a30ec29f2cba8a546b8adf22 Mon Sep 17 00:00:00 2001 From: Ankita Gojiya Date: Wed, 24 Jul 2024 17:46:41 +0530 Subject: [PATCH 2/9] Code fixes --- src/SparkplugNet/VersionB/SparkplugNode.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/SparkplugNet/VersionB/SparkplugNode.cs b/src/SparkplugNet/VersionB/SparkplugNode.cs index b67defb..3ad52cb 100644 --- a/src/SparkplugNet/VersionB/SparkplugNode.cs +++ b/src/SparkplugNet/VersionB/SparkplugNode.cs @@ -114,7 +114,7 @@ private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Payloa { // Filter out session number metric. var sessionNumberMetric = payload.Metrics.FirstOrDefault(m => m.Name == Constants.SessionNumberMetricName); - var metrics = payload.Metrics.ToList(); + var metrics = payload.Metrics.Where(m => m.Name != Constants.SessionNumberMetricName).ToList(); // var filteredMetrics = this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType).ToList(); if (sessionNumberMetric is not null) From caa896a83e0567e63c5a696fb99130e6d34f6f7c Mon Sep 17 00:00:00 2001 From: Nicholas Parmentier Date: Wed, 31 Jul 2024 12:05:20 +0200 Subject: [PATCH 3/9] GetSparkplugStateMessageTopic now used to generate the Node stateSubscribeTopic --- .../Core/Messages/SparkplugTopicGenerator.cs | 10 ---------- src/SparkplugNet/Core/Node/SparkplugNodeBase.cs | 3 ++- src/SparkplugNet/Core/SparkplugBase.cs | 6 ++++++ 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/src/SparkplugNet/Core/Messages/SparkplugTopicGenerator.cs b/src/SparkplugNet/Core/Messages/SparkplugTopicGenerator.cs index 0621bd2..b4a7485 100644 --- a/src/SparkplugNet/Core/Messages/SparkplugTopicGenerator.cs +++ b/src/SparkplugNet/Core/Messages/SparkplugTopicGenerator.cs @@ -61,16 +61,6 @@ internal static string GetDeviceCommandSubscribeTopic(SparkplugNamespace nameSpa return $"{nameSpace.GetDescription()}/{groupIdentifier}/{SparkplugMessageType.DeviceCommand.GetDescription()}/{edgeNodeIdentifier}/{deviceIdentifier}"; } - /// - /// Gets state subscription topic. - /// - /// The SCADA host identifier. - /// The state subscription topic . - internal static string GetStateSubscribeTopic(string scadaHostIdentifier) - { - return $"{SparkplugMessageType.StateMessage.GetDescription()}/{scadaHostIdentifier}"; - } - /// /// Gets the topic (Except STATE messages). /// diff --git a/src/SparkplugNet/Core/Node/SparkplugNodeBase.cs b/src/SparkplugNet/Core/Node/SparkplugNodeBase.cs index 27df22d..5847708 100644 --- a/src/SparkplugNet/Core/Node/SparkplugNodeBase.cs +++ b/src/SparkplugNet/Core/Node/SparkplugNodeBase.cs @@ -403,7 +403,8 @@ private async Task SubscribeInternal() await this.client.SubscribeAsync(deviceCommandSubscribeTopic, (MqttQualityOfServiceLevel)SparkplugQualityOfServiceLevel.AtLeastOnce); // Subscribe to the state topic. - var stateSubscribeTopic = SparkplugTopicGenerator.GetStateSubscribeTopic(this.Options.ScadaHostIdentifier); + //var stateSubscribeTopic = SparkplugTopicGenerator.GetStateSubscribeTopic(this.Options.ScadaHostIdentifier); + var stateSubscribeTopic = SparkplugTopicGenerator.GetSparkplugStateMessageTopic(this.Options.ScadaHostIdentifier, this.specificationVersion); await this.client.SubscribeAsync(stateSubscribeTopic, (MqttQualityOfServiceLevel)SparkplugQualityOfServiceLevel.AtLeastOnce); } diff --git a/src/SparkplugNet/Core/SparkplugBase.cs b/src/SparkplugNet/Core/SparkplugBase.cs index 7e0b7da..a3aa994 100644 --- a/src/SparkplugNet/Core/SparkplugBase.cs +++ b/src/SparkplugNet/Core/SparkplugBase.cs @@ -17,6 +17,11 @@ namespace SparkplugNet.Core; /// public partial class SparkplugBase : ISparkplugConnection where T : IMetric, new() { + /// + /// The sparkplug specification version. + /// + internal readonly SparkplugSpecificationVersion specificationVersion; + /// /// The message generator. /// @@ -54,6 +59,7 @@ public SparkplugBase(IEnumerable knownMetrics, SparkplugSpecificationVersion /// public SparkplugBase(KnownMetricStorage knownMetricsStorage, SparkplugSpecificationVersion specificationVersion) { + this.specificationVersion = specificationVersion; this.knownMetrics = knownMetricsStorage; if (typeof(T).IsAssignableFrom(typeof(VersionAData.KuraMetric))) From 427473791e105bcd3501052417ba4a55e0990d35 Mon Sep 17 00:00:00 2001 From: Nicholas Parmentier Date: Mon, 5 Aug 2024 08:35:08 +0200 Subject: [PATCH 4/9] - --- .../Application/SparkplugApplicationBase.cs | 23 ++++---- src/SparkplugNet/Core/GroupState .cs | 22 ++++++++ src/SparkplugNet/Core/MetricState.cs | 2 +- src/SparkplugNet/Core/NodeState .cs | 22 ++++++++ .../VersionA/SparkplugApplication.cs | 33 +++++++++--- .../VersionB/SparkplugApplication.cs | 54 ++++++++++++++++--- 6 files changed, 127 insertions(+), 29 deletions(-) create mode 100644 src/SparkplugNet/Core/GroupState .cs create mode 100644 src/SparkplugNet/Core/NodeState .cs diff --git a/src/SparkplugNet/Core/Application/SparkplugApplicationBase.cs b/src/SparkplugNet/Core/Application/SparkplugApplicationBase.cs index c140274..2ad7f4a 100644 --- a/src/SparkplugNet/Core/Application/SparkplugApplicationBase.cs +++ b/src/SparkplugNet/Core/Application/SparkplugApplicationBase.cs @@ -47,14 +47,9 @@ public SparkplugApplicationBase( } /// - /// Gets the node states. + /// Get the group states. /// - public ConcurrentDictionary> NodeStates { get; } = new(); - - /// - /// Gets the device states. - /// - public ConcurrentDictionary> DeviceStates { get; } = new(); + public ConcurrentDictionary> GroupStates { get; } = new(); /// /// Gets the options. @@ -85,8 +80,7 @@ public async Task Start(SparkplugApplicationOptions applicationOptions) } // Clear states. - this.NodeStates.Clear(); - this.DeviceStates.Clear(); + this.GroupStates.Clear(); // Add handlers. this.AddEventHandlers(); @@ -364,7 +358,7 @@ private async Task ConnectInternal() } else { - builder.WithWebSocketServer(options => + builder.WithWebSocketServer(options => options.WithCookieContainer(this.Options.MqttWebSocketOptions.CookieContainer) .WithCookieContainer(this.Options.MqttWebSocketOptions.Credentials) .WithProxyOptions(this.Options.MqttWebSocketOptions.ProxyOptions) @@ -450,11 +444,12 @@ private async Task SubscribeInternal() /// The metric state. private void UpdateMetricState(SparkplugMetricStatus metricState) { - var keys = new List(this.NodeStates.Keys.ToList()); - - foreach (string key in keys) + foreach (var group in this.GroupStates) { - this.NodeStates[key].MetricStatus = metricState; + foreach (var node in group.Value.NodeStates) + { + node.Value.MetricStatus = metricState; + } } } diff --git a/src/SparkplugNet/Core/GroupState .cs b/src/SparkplugNet/Core/GroupState .cs new file mode 100644 index 0000000..5088007 --- /dev/null +++ b/src/SparkplugNet/Core/GroupState .cs @@ -0,0 +1,22 @@ +// -------------------------------------------------------------------------------------------------------------------- +// +// The project is licensed under the MIT license. +// +// +// A state class for the metrics. +// +// -------------------------------------------------------------------------------------------------------------------- + +namespace SparkplugNet.Core; + +/// +/// The group state class. +/// +/// The type parameter. +public sealed class GroupState where T : IMetric, new() +{ + /// + /// Get the device states. + /// + public ConcurrentDictionary> NodeStates { get; } = new(); +} diff --git a/src/SparkplugNet/Core/MetricState.cs b/src/SparkplugNet/Core/MetricState.cs index 104919d..b2722d5 100644 --- a/src/SparkplugNet/Core/MetricState.cs +++ b/src/SparkplugNet/Core/MetricState.cs @@ -13,7 +13,7 @@ namespace SparkplugNet.Core; /// The metric state class. /// /// The type parameter. -public sealed class MetricState where T : IMetric, new() +public class MetricState where T : IMetric, new() { /// /// Gets or sets the metric status. diff --git a/src/SparkplugNet/Core/NodeState .cs b/src/SparkplugNet/Core/NodeState .cs new file mode 100644 index 0000000..e0816b9 --- /dev/null +++ b/src/SparkplugNet/Core/NodeState .cs @@ -0,0 +1,22 @@ +// -------------------------------------------------------------------------------------------------------------------- +// +// The project is licensed under the MIT license. +// +// +// A state class for the metrics. +// +// -------------------------------------------------------------------------------------------------------------------- + +namespace SparkplugNet.Core; + +/// +/// The node state class. +/// +/// The type parameter. +public sealed class NodeState : MetricState where T : IMetric, new() +{ + /// + /// Get the device states. + /// + public ConcurrentDictionary> DeviceStates { get; set; } = new(); +} diff --git a/src/SparkplugNet/VersionA/SparkplugApplication.cs b/src/SparkplugNet/VersionA/SparkplugApplication.cs index 1f76637..df6a564 100644 --- a/src/SparkplugNet/VersionA/SparkplugApplication.cs +++ b/src/SparkplugNet/VersionA/SparkplugApplication.cs @@ -222,30 +222,51 @@ await this.FireDeviceBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifi /// The topic. /// The metrics. /// The metric status. - /// Thrown if the edge node identifier is invalid. + /// Thrown if any identifier is invalid. /// Thrown if the metric cast is invalid. private IEnumerable ProcessPayload( SparkplugMessageTopic topic, List metrics, SparkplugMetricStatus metricStatus) { - var metricState = new MetricState + // Check group id. + if (string.IsNullOrWhiteSpace(topic.GroupIdentifier)) + { + throw new InvalidOperationException($"The group identifier is invalid {topic.GroupIdentifier}."); + } + + if (!this.GroupStates.ContainsKey(topic.GroupIdentifier)) + { + this.GroupStates[topic.GroupIdentifier] = new GroupState(); + } + + NodeState metricState = new() { MetricStatus = metricStatus }; + // Check node id. + if (string.IsNullOrWhiteSpace(topic.EdgeNodeIdentifier)) + { + throw new InvalidOperationException($"The edge node identifier is invalid {topic.EdgeNodeIdentifier}."); + } + if (!string.IsNullOrWhiteSpace(topic.DeviceIdentifier)) { - if (string.IsNullOrWhiteSpace(topic.EdgeNodeIdentifier)) + if (!this.GroupStates[topic.GroupIdentifier].NodeStates.ContainsKey(topic.EdgeNodeIdentifier)) { - throw new InvalidOperationException($"The edge node identifier is invalid for device {topic.DeviceIdentifier}."); + this.GroupStates[topic.GroupIdentifier] + .NodeStates[topic.EdgeNodeIdentifier] = metricState; } - this.DeviceStates[$"{topic.EdgeNodeIdentifier}/{topic.DeviceIdentifier}"] = metricState; + this.GroupStates[topic.GroupIdentifier] + .NodeStates[topic.EdgeNodeIdentifier] + .DeviceStates[topic.DeviceIdentifier] = metricState; } else { - this.NodeStates[topic.EdgeNodeIdentifier] = metricState; + this.GroupStates[topic.GroupIdentifier] + .NodeStates[topic.EdgeNodeIdentifier] = metricState; } foreach (var payloadMetric in metrics) diff --git a/src/SparkplugNet/VersionB/SparkplugApplication.cs b/src/SparkplugNet/VersionB/SparkplugApplication.cs index f4f6d8e..adeef53 100644 --- a/src/SparkplugNet/VersionB/SparkplugApplication.cs +++ b/src/SparkplugNet/VersionB/SparkplugApplication.cs @@ -9,6 +9,8 @@ namespace SparkplugNet.VersionB; +using System.Diagnostics.Metrics; + /// /// /// A class that handles a Sparkplug application. @@ -169,7 +171,7 @@ await this.FireNodeBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier { throw new InvalidOperationException($"The device identifier is invalid!"); } - + await this.FireDeviceBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, topic.DeviceIdentifier, this.ProcessPayload(topic, metrics, SparkplugMetricStatus.Online)); break; @@ -189,7 +191,7 @@ await this.FireDeviceBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifi case SparkplugMessageType.NodeDeath: this.ProcessPayload(topic, metrics, SparkplugMetricStatus.Offline); await this.FireNodeDeathReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, sessionNumberMetric); - break; + break; case SparkplugMessageType.DeviceDeath: if (string.IsNullOrWhiteSpace(topic.DeviceIdentifier)) { @@ -208,27 +210,63 @@ await this.FireDeviceBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifi /// The topic. /// The metrics. /// The metric status. - /// Thrown if the edge node identifier is invalid. + /// Thrown if any identifier is invalid. /// Thrown if the metric cast is invalid. private IEnumerable ProcessPayload(SparkplugMessageTopic topic, List metrics, SparkplugMetricStatus metricStatus) { - var metricState = new MetricState + // Check group id. + if (string.IsNullOrWhiteSpace(topic.GroupIdentifier)) + { + throw new InvalidOperationException($"The group identifier is invalid {topic.GroupIdentifier}."); + } + + if (!this.GroupStates.ContainsKey(topic.GroupIdentifier)) + { + this.GroupStates[topic.GroupIdentifier] = new GroupState(); + } + + NodeState metricState = new() { MetricStatus = metricStatus }; + // Check node id. + if (string.IsNullOrWhiteSpace(topic.EdgeNodeIdentifier)) + { + throw new InvalidOperationException($"The edge node identifier is invalid {topic.EdgeNodeIdentifier}."); + } + if (!string.IsNullOrWhiteSpace(topic.DeviceIdentifier)) { - if (string.IsNullOrWhiteSpace(topic.EdgeNodeIdentifier)) + // If the group doesn't contain the node, create a new node. + if (!this.GroupStates[topic.GroupIdentifier].NodeStates.ContainsKey(topic.EdgeNodeIdentifier)) { - throw new InvalidOperationException($"The edge node identifier is invalid for device {topic.DeviceIdentifier}."); + this.GroupStates[topic.GroupIdentifier] + .NodeStates[topic.EdgeNodeIdentifier] = metricState; } - this.DeviceStates[$"{topic.EdgeNodeIdentifier}/{topic.DeviceIdentifier}"] = metricState; + if (this.GroupStates[topic.GroupIdentifier] + .NodeStates[topic.EdgeNodeIdentifier] + .DeviceStates.TryGetValue(topic.DeviceIdentifier, out var metric)) + { + metricState.Metrics = metric.Metrics; + } + + this.GroupStates[topic.GroupIdentifier] + .NodeStates[topic.EdgeNodeIdentifier] + .DeviceStates[topic.DeviceIdentifier] = metricState; } else { - this.NodeStates[topic.EdgeNodeIdentifier] = metricState; + if (this.GroupStates[topic.GroupIdentifier] + .NodeStates.TryGetValue(topic.EdgeNodeIdentifier, out var metric)) + { + metricState.Metrics = metric.Metrics; + metricState.DeviceStates = metric.DeviceStates; + } + + this.GroupStates[topic.GroupIdentifier] + .NodeStates[topic.EdgeNodeIdentifier] = metricState; } foreach (var payloadMetric in metrics) From e7f4b5d8e2a02c3f8223d47a521758f37ef06597 Mon Sep 17 00:00:00 2001 From: Nicholas Parmentier Date: Fri, 9 Aug 2024 10:43:08 +0200 Subject: [PATCH 5/9] tck-id-payloads-metric-datatype-not-req --- .../Messages/SparkplugMessageGenerator.cs | 16 ++--- .../Core/SparkplugBase.KnownMetricStorage.cs | 6 ++ src/SparkplugNet/VersionB/PayloadConverter.cs | 68 ++++++++++++++++--- .../VersionB/SparkplugApplication.cs | 32 +++++++-- src/SparkplugNet/VersionB/SparkplugNode.cs | 31 +++++++-- 5 files changed, 124 insertions(+), 29 deletions(-) diff --git a/src/SparkplugNet/Core/Messages/SparkplugMessageGenerator.cs b/src/SparkplugNet/Core/Messages/SparkplugMessageGenerator.cs index d96c248..18ca17f 100644 --- a/src/SparkplugNet/Core/Messages/SparkplugMessageGenerator.cs +++ b/src/SparkplugNet/Core/Messages/SparkplugMessageGenerator.cs @@ -672,7 +672,7 @@ private MqttApplicationMessage GetSparkplugNodeBirthB( Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds() }; - var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload); + var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.NodeBirth); var serialized = PayloadHelper.Serialize(convertedPayload); return new MqttApplicationMessageBuilder() @@ -756,7 +756,7 @@ private MqttApplicationMessage GetSparkplugDeviceBirthB( Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds() }; - var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload); + var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.DeviceBirth); var serialized = PayloadHelper.Serialize(convertedPayload); return new MqttApplicationMessageBuilder() @@ -827,7 +827,7 @@ private MqttApplicationMessage GetSparkplugNodeDeathB( Metrics = metrics.ToList() }; - var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload); + var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.NodeDeath); var serialized = PayloadHelper.Serialize(convertedPayload); return new MqttApplicationMessageBuilder() @@ -911,7 +911,7 @@ private MqttApplicationMessage GetSparkplugDeviceDeathB( Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds() }; - var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload); + var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.DeviceDeath); var serialized = PayloadHelper.Serialize(convertedPayload); return new MqttApplicationMessageBuilder() @@ -991,7 +991,7 @@ private MqttApplicationMessage GetSparkplugNodeDataB( Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds() }; - var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload); + var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.NodeData); var serialized = PayloadHelper.Serialize(convertedPayload); return new MqttApplicationMessageBuilder() @@ -1075,7 +1075,7 @@ private MqttApplicationMessage GetSparkplugDeviceDataB( Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds() }; - var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload); + var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.DeviceData); var serialized = PayloadHelper.Serialize(convertedPayload); return new MqttApplicationMessageBuilder() @@ -1154,7 +1154,7 @@ private static MqttApplicationMessage GetSparkplugNodeCommandB( Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds() }; - var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload); + var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.NodeCommand); var serialized = PayloadHelper.Serialize(convertedPayload); return new MqttApplicationMessageBuilder() @@ -1236,7 +1236,7 @@ private static MqttApplicationMessage GetSparkplugDeviceCommandB( Timestamp = (ulong)dateTime.ToUnixTimeMilliseconds() }; - var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload); + var convertedPayload = VersionB.PayloadConverter.ConvertVersionBPayload(payload, SparkplugMessageType.DeviceCommand); var serialized = PayloadHelper.Serialize(convertedPayload); return new MqttApplicationMessageBuilder() diff --git a/src/SparkplugNet/Core/SparkplugBase.KnownMetricStorage.cs b/src/SparkplugNet/Core/SparkplugBase.KnownMetricStorage.cs index 83590ed..9244f49 100644 --- a/src/SparkplugNet/Core/SparkplugBase.KnownMetricStorage.cs +++ b/src/SparkplugNet/Core/SparkplugBase.KnownMetricStorage.cs @@ -313,5 +313,11 @@ private void AddVersionBMetric(T metric, Metric versionBMetric) this.knownMetricsByName[metric.Name] = metric; } } + + /// + /// Return the known metrics by name. + /// + /// + public ConcurrentDictionary GetKnownMetricsByName() { return this.knownMetricsByName; } } } diff --git a/src/SparkplugNet/VersionB/PayloadConverter.cs b/src/SparkplugNet/VersionB/PayloadConverter.cs index fcc2242..63ea63a 100644 --- a/src/SparkplugNet/VersionB/PayloadConverter.cs +++ b/src/SparkplugNet/VersionB/PayloadConverter.cs @@ -18,39 +18,56 @@ internal static class PayloadConverter /// Gets the version B payload converted from the ProtoBuf payload. /// /// The . + /// /// The . - public static Payload ConvertVersionBPayload(VersionBProtoBuf.ProtoBufPayload protoPayload) + public static Payload ConvertVersionBPayload(VersionBProtoBuf.ProtoBufPayload protoPayload, ConcurrentDictionary? metrics) => new() { Body = protoPayload.Body, - Metrics = protoPayload.Metrics.Select(ConvertVersionBMetric).ToList(), + Metrics = protoPayload.Metrics.Select(m => ConvertVersionBMetric(m, metrics)).ToList(), Seq = protoPayload.Seq, Timestamp = protoPayload.Timestamp, Uuid = protoPayload.Uuid ?? string.Empty }; + + public static VersionBProtoBuf.ProtoBufPayload ConvertVersionBPayload(Payload payload) + => ConvertVersionBPayload(payload, null); + /// /// Gets the ProtoBuf payload converted from the version B payload. /// /// The . + /// The . /// The . - public static VersionBProtoBuf.ProtoBufPayload ConvertVersionBPayload(Payload payload) + public static VersionBProtoBuf.ProtoBufPayload ConvertVersionBPayload(Payload payload, SparkplugMessageType? sparkplugMessageType) => new() { Body = payload.Body, - Metrics = payload.Metrics.Select(ConvertVersionBMetric).ToList(), + Metrics = payload.Metrics.Select(m => ConvertVersionBMetric(m, sparkplugMessageType)).ToList(), Seq = payload.Seq, Timestamp = payload.Timestamp, Uuid = payload.Uuid }; + /// + /// + /// + /// + /// + public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metric protoMetric) + { + return ConvertVersionBMetric(protoMetric, null); + } + /// /// Gets the version B metric from the version B ProtoBuf metric. /// /// The . + /// The . /// Thrown if the metric data type is unknown. /// The . - public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metric protoMetric) + public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metric protoMetric, ConcurrentDictionary? metrics) { var metric = new Metric() { @@ -62,7 +79,18 @@ public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metr Timestamp = protoMetric.Timestamp }; - var dataType = ConvertVersionBDataType((VersionBProtoBuf.DataType?)protoMetric.DataType); + // [tck-id-payloads-metric-datatype-not-req] + // The datatype SHOULD NOT be included with metric definitions in NDATA, NCMD, DDATA, and DCMD messages. + VersionBDataTypeEnum dataType; + + if (metrics is null) + { + dataType = ConvertVersionBDataType((VersionBProtoBuf.DataType?)protoMetric.DataType); + } + else + { + dataType = metrics.Where(o => o.Key == metric.Name).Select(o => o.Value.DataType).FirstOrDefault(); + } switch (dataType) { @@ -139,7 +167,7 @@ public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metr metric.SetValue(VersionBDataTypeEnum.Int32Array, int32Array); break; case VersionBDataTypeEnum.Int64Array: - var int64Array = PayloadHelper.GetArrayOfTFromBytes(protoMetric.BytesValue, BinaryPrimitives.ReadInt64LittleEndian); + var int64Array = PayloadHelper.GetArrayOfTFromBytes(protoMetric.BytesValue, BinaryPrimitives.ReadInt64LittleEndian); metric.SetValue(VersionBDataTypeEnum.Int64Array, int64Array); break; case VersionBDataTypeEnum.UInt8Array: @@ -177,7 +205,7 @@ public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metr var dateTimeArray = PayloadHelper.GetArrayOfTFromBytes(protoMetric.BytesValue, BinaryPrimitives.ReadUInt64LittleEndian); metric.SetValue(VersionBDataTypeEnum.DateTimeArray, dateTimeArray.Select(x => DateTimeOffset.FromUnixTimeMilliseconds((long)x)).ToArray()); break; - // Todo: What to do here? + // Todo: What to do here? case VersionBDataTypeEnum.PropertySetList: case VersionBDataTypeEnum.Unknown: default: @@ -191,15 +219,35 @@ public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metr /// Gets the version B ProtoBuf metric from the version B metric. /// /// The . + /// + public static VersionBProtoBuf.ProtoBufPayload.Metric ConvertVersionBMetric(Metric metric) + { + return ConvertVersionBMetric(metric, null); + } + + /// + /// Gets the version B ProtoBuf metric from the version B metric. + /// + /// The . + /// The . /// Thrown if the property set data type is set for a metric. /// Thrown if the metric data type is unknown. /// The . - public static VersionBProtoBuf.ProtoBufPayload.Metric ConvertVersionBMetric(Metric metric) + public static VersionBProtoBuf.ProtoBufPayload.Metric ConvertVersionBMetric(Metric metric, SparkplugMessageType? sparkplugMessageType) { + // [tck-id-payloads-metric-datatype-not-req] + // The datatype SHOULD NOT be included with metric definitions in NDATA, NCMD, DDATA, and DCMD messages. + uint? dataType = null; + + if (sparkplugMessageType == null || sparkplugMessageType == SparkplugMessageType.NodeBirth || sparkplugMessageType == SparkplugMessageType.DeviceBirth) + { + dataType = (uint?)ConvertVersionBDataType(metric.DataType); + } + var protoMetric = new VersionBProtoBuf.ProtoBufPayload.Metric() { Alias = metric.Alias, - DataType = (uint?)ConvertVersionBDataType(metric.DataType), + DataType = dataType, IsHistorical = metric.IsHistorical, IsNull = metric.IsNull, IsTransient = metric.IsTransient, diff --git a/src/SparkplugNet/VersionB/SparkplugApplication.cs b/src/SparkplugNet/VersionB/SparkplugApplication.cs index adeef53..38a8c9e 100644 --- a/src/SparkplugNet/VersionB/SparkplugApplication.cs +++ b/src/SparkplugNet/VersionB/SparkplugApplication.cs @@ -128,17 +128,39 @@ protected override async Task OnMessageReceived(SparkplugMessageTopic topic, byt { var payloadVersionB = PayloadHelper.Deserialize(payload); - if (payloadVersionB is not null) + if (payloadVersionB == null) { return; } + + ConcurrentDictionary? metrics = null; + + if (!(topic.MessageType == SparkplugMessageType.NodeBirth || topic.MessageType == SparkplugMessageType.DeviceBirth)) { - var convertedPayload = PayloadConverter.ConvertVersionBPayload(payloadVersionB); + // Get known metrics + if (!this.GroupStates.TryGetValue(topic.GroupIdentifier, out var groupState)) { return; } + + if (!groupState.NodeStates.TryGetValue(topic.EdgeNodeIdentifier, out var nodeState)) { return; } - if (convertedPayload is not Payload _) + if (topic.DeviceIdentifier is null) { - throw new InvalidCastException("The metric cast didn't work properly."); + metrics = nodeState.Metrics; + } + else if (nodeState.DeviceStates.TryGetValue(topic.DeviceIdentifier, out var metricState)) + { + metrics = metricState.Metrics; } + else + { + return; + } + } - await this.HandleMessagesForVersionB(topic, convertedPayload); + var convertedPayload = PayloadConverter.ConvertVersionBPayload(payloadVersionB, metrics); + + if (convertedPayload is not Payload _) + { + throw new InvalidCastException("The metric cast didn't work properly."); } + + await this.HandleMessagesForVersionB(topic, convertedPayload); } /// diff --git a/src/SparkplugNet/VersionB/SparkplugNode.cs b/src/SparkplugNet/VersionB/SparkplugNode.cs index 3ad52cb..de76e06 100644 --- a/src/SparkplugNet/VersionB/SparkplugNode.cs +++ b/src/SparkplugNet/VersionB/SparkplugNode.cs @@ -91,17 +91,36 @@ protected override async Task OnMessageReceived(SparkplugMessageTopic topic, byt { var payloadVersionB = PayloadHelper.Deserialize(payload); - if (payloadVersionB is not null) - { - var convertedPayload = PayloadConverter.ConvertVersionBPayload(payloadVersionB); + if (payloadVersionB == null) { return; } + + ConcurrentDictionary? metrics = null; - if (convertedPayload is not Payload _) + if (!(topic.MessageType == SparkplugMessageType.NodeBirth || topic.MessageType == SparkplugMessageType.DeviceBirth)) + { + // Get known metrics + if (topic.DeviceIdentifier is null) + { + metrics = this.knownMetrics.GetKnownMetricsByName(); + } + else if (this.KnownDevices.TryGetValue(topic.DeviceIdentifier, out var knownMetricStorage)) + { + metrics = knownMetricStorage.GetKnownMetricsByName(); + } + else { - throw new InvalidCastException("The metric cast didn't work properly."); + return; } + } + + var convertedPayload = PayloadConverter.ConvertVersionBPayload(payloadVersionB, metrics); - await this.HandleMessagesForVersionB(topic, convertedPayload); + if (convertedPayload is not Payload _) + { + throw new InvalidCastException("The metric cast didn't work properly."); } + + await this.HandleMessagesForVersionB(topic, convertedPayload); + } /// From 46d18356ec45deabc29b81c722fb8131965ccba8 Mon Sep 17 00:00:00 2001 From: Nicholas Parmentier Date: Mon, 14 Oct 2024 13:17:01 +0200 Subject: [PATCH 6/9] - --- .../Core/Node/SparkplugNodeBase.cs | 25 ++++++++++++++----- src/SparkplugNet/SparkplugNet.csproj | 1 + src/SparkplugNet/VersionB/PayloadConverter.cs | 16 ++++++++++++ 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/src/SparkplugNet/Core/Node/SparkplugNodeBase.cs b/src/SparkplugNet/Core/Node/SparkplugNodeBase.cs index 5847708..76b345e 100644 --- a/src/SparkplugNet/Core/Node/SparkplugNodeBase.cs +++ b/src/SparkplugNet/Core/Node/SparkplugNodeBase.cs @@ -86,7 +86,11 @@ public async Task Start(SparkplugNodeOptions nodeOptions, KnownMetricStorage? kn // Connect, subscribe to incoming messages and send a state message. await this.ConnectInternal(); await this.SubscribeInternal(); - await this.PublishNodeAndDeviceBirthsInternal(); + + if (string.IsNullOrEmpty(this.Options.ScadaHostIdentifier)) + { + await this.PublishNodeAndDeviceBirthsInternal(); + } } /// @@ -124,14 +128,11 @@ public async Task PublishMetrics(IEnumerable metrics } /// - /// Does a node rebirth. + /// Does a node birth. /// /// The new metrics. - public async Task Rebirth(IEnumerable metrics) + public async Task Birth(IEnumerable metrics) { - // Send node death first. - await this.SendNodeDeathMessage(); - // Reset the known metrics. this.knownMetrics = new KnownMetricStorage(metrics); @@ -139,6 +140,18 @@ public async Task Rebirth(IEnumerable metrics) await this.PublishNodeAndDeviceBirthsInternal(); } + /// + /// Does a node rebirth. + /// + /// The new metrics. + public async Task Rebirth(IEnumerable metrics) + { + // Send node death first. + await this.SendNodeDeathMessage(); + + await this.Birth(metrics); + } + /// /// Publishes metrics for a node. /// diff --git a/src/SparkplugNet/SparkplugNet.csproj b/src/SparkplugNet/SparkplugNet.csproj index 4774d34..6d69a8c 100644 --- a/src/SparkplugNet/SparkplugNet.csproj +++ b/src/SparkplugNet/SparkplugNet.csproj @@ -28,6 +28,7 @@ NU1803,CS0618,CS0809,NU1901,NU1902 true all + Debug;Release;p1600sedac diff --git a/src/SparkplugNet/VersionB/PayloadConverter.cs b/src/SparkplugNet/VersionB/PayloadConverter.cs index 63ea63a..0229d7b 100644 --- a/src/SparkplugNet/VersionB/PayloadConverter.cs +++ b/src/SparkplugNet/VersionB/PayloadConverter.cs @@ -79,6 +79,22 @@ public static Metric ConvertVersionBMetric(VersionBProtoBuf.ProtoBufPayload.Metr Timestamp = protoMetric.Timestamp }; + // Get properties + if (protoMetric.PropertySetValue is not null) + { + PropertySet propertySet = new(); + propertySet.Keys = protoMetric.PropertySetValue.Keys; + + propertySet.Values = []; + + foreach (var item in protoMetric.PropertySetValue.Values) + { + propertySet.Values.Add(ConvertVersionBPropertyValue(item)); + } + + metric.Properties = propertySet; + } + // [tck-id-payloads-metric-datatype-not-req] // The datatype SHOULD NOT be included with metric definitions in NDATA, NCMD, DDATA, and DCMD messages. VersionBDataTypeEnum dataType; From 0a0112418764fb91466b16900fdad40c61955b50 Mon Sep 17 00:00:00 2001 From: Nicholas Parmentier Date: Wed, 16 Oct 2024 11:12:06 +0200 Subject: [PATCH 7/9] - --- src/SparkplugNet/SparkplugNet.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/SparkplugNet/SparkplugNet.csproj b/src/SparkplugNet/SparkplugNet.csproj index 6d69a8c..deaa5ef 100644 --- a/src/SparkplugNet/SparkplugNet.csproj +++ b/src/SparkplugNet/SparkplugNet.csproj @@ -28,7 +28,7 @@ NU1803,CS0618,CS0809,NU1901,NU1902 true all - Debug;Release;p1600sedac + Debug;Release;p1600sedac;p1000edge From a394886c81c74f5b0d1980534f3b5b59ace2670f Mon Sep 17 00:00:00 2001 From: Nicholas Parmentier Date: Mon, 2 Dec 2024 09:32:41 +0100 Subject: [PATCH 8/9] - --- src/SparkplugNet/SparkplugNet.csproj | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/SparkplugNet/SparkplugNet.csproj b/src/SparkplugNet/SparkplugNet.csproj index deaa5ef..a0a6a77 100644 --- a/src/SparkplugNet/SparkplugNet.csproj +++ b/src/SparkplugNet/SparkplugNet.csproj @@ -1,7 +1,7 @@ - net6.0;net8.0 + net8.0 SparkplugNet SparkplugNet true @@ -28,14 +28,10 @@ NU1803,CS0618,CS0809,NU1901,NU1902 true all - Debug;Release;p1600sedac;p1000edge + Debug;Release;p1600sedac;p1000edge;p1600sedacedge;p1800epsi - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - all From c019b18ce085f1ef4be2aab5f01f36e0b39bf47c Mon Sep 17 00:00:00 2001 From: Nicholas Parmentier Date: Wed, 8 Jan 2025 08:11:28 +0100 Subject: [PATCH 9/9] - --- src/SparkplugNet/SparkplugNet.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/SparkplugNet/SparkplugNet.csproj b/src/SparkplugNet/SparkplugNet.csproj index a0a6a77..6220201 100644 --- a/src/SparkplugNet/SparkplugNet.csproj +++ b/src/SparkplugNet/SparkplugNet.csproj @@ -28,7 +28,7 @@ NU1803,CS0618,CS0809,NU1901,NU1902 true all - Debug;Release;p1600sedac;p1000edge;p1600sedacedge;p1800epsi + Debug;Release;p1600sedac;p1000edge;p1600sedacedge;p1800epsi;p1000master