Skip to content

Commit

Permalink
Initial code for Subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
jakhog committed Jun 13, 2024
1 parent 2314abf commit 4e207d1
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 39 deletions.
39 changes: 0 additions & 39 deletions Source/ICanSubscribeToNodes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,42 +11,3 @@ public interface ICanSubscribeToNodes
{
Task SubscribeToChangesFor(ISession connection, TimeSpan publishInterval, IEnumerable<(NodeId node, TimeSpan samplingInterval)> nodes, Func<NodeValue,Task> handleValue, CancellationToken cancellationToken);
}

// var subscription = new Subscription()
// {
// PublishingInterval = 1000,
// PublishingEnabled = true,
// TimestampsToReturn = TimestampsToReturn.Both
// };

// var nodeId = new NodeId("ns=2;s=ismclient.MEI037.MonCh");

// var monitored = new MonitoredItem()
// {
// StartNodeId = nodeId,
// SamplingInterval = 10000,
// };
// monitored.Notification += (sender, e) =>
// {
// if (e.NotificationValue is MonitoredItemNotification not)
// {
// Console.WriteLine($"Notification: {not.Value.Value} - {not.Value.ServerTimestamp} and {not.Value.SourceTimestamp}");
// }
// };
// var other = new MonitoredItem()
// {
// StartNodeId = new NodeId("ns=2;s=ismclient.MEI038.MonCh"),
// SamplingInterval = 5000,
// };
// other.Notification += (sender, e) =>
// {
// if (e.NotificationValue is MonitoredItemNotification not)
// {
// Console.WriteLine($"Notification other: {not.Value.Value} - {not.Value.ServerTimestamp} and {not.Value.SourceTimestamp}");
// }
// };
// subscription.AddItem(monitored);
// subscription.AddItem(other);

// var added = session.AddSubscription(subscription);
// await subscription.CreateAsync();
98 changes: 98 additions & 0 deletions Source/Subscriber.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Opc.Ua;
using Opc.Ua.Client;
using Serilog;

namespace RaaLabs.Edge.Connectors.OPCUA;

public class Subscriber : ICanSubscribeToNodes
{
private readonly ILogger _logger;

Check warning on line 14 in Source/Subscriber.cs

View workflow job for this annotation

GitHub Actions / sonarcloud / Sonarcloud

Remove this unread private field '_logger' or refactor the code to use its value. (https://rules.sonarsource.com/csharp/RSPEC-4487)

Check warning on line 14 in Source/Subscriber.cs

View workflow job for this annotation

GitHub Actions / sonarcloud / Sonarcloud

Remove this unread private field '_logger' or refactor the code to use its value. (https://rules.sonarsource.com/csharp/RSPEC-4487)

Check warning on line 14 in Source/Subscriber.cs

View workflow job for this annotation

GitHub Actions / sonarcloud / Sonarcloud

Remove this unread private field '_logger' or refactor the code to use its value. (https://rules.sonarsource.com/csharp/RSPEC-4487)

public Subscriber(ILogger logger)
{
_logger = logger;
}

public async Task SubscribeToChangesFor(ISession connection, TimeSpan publishInterval, IEnumerable<(NodeId node, TimeSpan samplingInterval)> nodes, Func<NodeValue, Task> handleValue, CancellationToken cancellationToken)
{
using var subscription = CreateEmptySubscription(publishInterval);
var channel = Channel.CreateUnbounded<NodeValue>(new(){ SingleReader = true });

foreach (var (node, samplingInterval) in nodes)
{
subscription.AddItem(MonitoringFor(node, samplingInterval, channel));
}

connection.AddSubscription(subscription);
await subscription.CreateAsync(cancellationToken).ConfigureAwait(false);

DeleteSubscriptionWhenCancelled(subscription, cancellationToken);
CompleteChannelWhenSubscriptionCompletes(subscription, channel);

await foreach (var value in channel.Reader.ReadAllAsync(CancellationToken.None))
{
await handleValue(value).ConfigureAwait(false);
}
}

private MonitoredItem MonitoringFor(NodeId nodeId, TimeSpan samplingInterval, ChannelWriter<NodeValue> writer)
{
var monitored = new MonitoredItem()
{
StartNodeId = nodeId,
SamplingInterval = (int)samplingInterval.TotalMilliseconds,
};

monitored.Notification += (_, notification) =>
{
if (notification.NotificationValue is not MonitoredItemNotification monitored)
{
return;
}

if (!writer.TryWrite(new(nodeId, monitored.Value)))
{

Check warning on line 59 in Source/Subscriber.cs

View workflow job for this annotation

GitHub Actions / sonarcloud / Sonarcloud

Either remove or fill this block of code. (https://rules.sonarsource.com/csharp/RSPEC-108)

Check warning on line 59 in Source/Subscriber.cs

View workflow job for this annotation

GitHub Actions / sonarcloud / Sonarcloud

Either remove or fill this block of code. (https://rules.sonarsource.com/csharp/RSPEC-108)

Check warning on line 59 in Source/Subscriber.cs

View workflow job for this annotation

GitHub Actions / sonarcloud / Sonarcloud

Either remove or fill this block of code. (https://rules.sonarsource.com/csharp/RSPEC-108)

}
};

return monitored;
}

private void DeleteSubscriptionWhenCancelled(Subscription subscription, CancellationToken cancellationToken) =>
cancellationToken.Register(() =>
{
subscription.Delete(true);
});

private void CompleteChannelWhenSubscriptionCompletes(Subscription subscription, ChannelWriter<NodeValue> writer)
{
subscription.PublishStatusChanged += (_, changed) =>
{
if ((changed.Status & PublishStateChangedMask.Stopped) != 0)
{
writer.Complete();
}
};
subscription.StateChanged += (_, changed) =>
{
if ((changed.Status & SubscriptionChangeMask.Deleted) != 0)
{
writer.Complete();
}
};
}

private static Subscription CreateEmptySubscription(TimeSpan publishInterval) =>
new()
{
PublishingInterval = (int)publishInterval.TotalMilliseconds,
PublishingEnabled = true,
TimestampsToReturn = TimestampsToReturn.Both,
};
}

0 comments on commit 4e207d1

Please sign in to comment.