From 1509a0a64fe009f4246c5e3a401ee80f2b70baa3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakob=20H=C3=B8genes?= <1014990+jakhog@users.noreply.github.com> Date: Thu, 13 Jun 2024 10:51:20 +0200 Subject: [PATCH] DataReader --- Source/DataReader.cs | 61 +++++++++++++++++++++++++++++++++++++++++ Source/IRetrieveData.cs | 2 -- Source/Program.cs | 1 + 3 files changed, 62 insertions(+), 2 deletions(-) create mode 100644 Source/DataReader.cs diff --git a/Source/DataReader.cs b/Source/DataReader.cs new file mode 100644 index 0000000..4960d43 --- /dev/null +++ b/Source/DataReader.cs @@ -0,0 +1,61 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Opc.Ua; +using Opc.Ua.Client; +using Serilog; + +namespace RaaLabs.Edge.Connectors.OPCUA; + +public class DataReader : IRetrieveData +{ + private readonly TimeSpan _publishInterval; + private readonly List<(NodeId, TimeSpan)> _subscribeNodes; + private readonly List<(NodeId, TimeSpan)> _readNodes; + private readonly ICanSubscribeToNodes _subscriber; + private readonly ICanReadNodes _reader; + private readonly ILogger _logger; + + public DataReader(ConnectorConfiguration config, ICanSubscribeToNodes subscriber, ICanReadNodes reader, ILogger logger) + { + _publishInterval = TimeSpan.FromSeconds(config.PublishIntervalSeconds); + _subscribeNodes = config.Nodes + .Where(_ => _.SubscribeIntervalSeconds is not null) + .Select(_ => (new NodeId(_.NodeId), TimeSpan.FromSeconds(_.SubscribeIntervalSeconds!.Value))) + .ToList(); + _readNodes = config.Nodes + .Where(_ => _.ReadIntervalSeconds is not null) + .Select(_ => (new NodeId(_.NodeId), TimeSpan.FromSeconds(_.ReadIntervalSeconds!.Value))) + .ToList(); + _subscriber = subscriber; + _reader = reader; + _logger = logger; + } + + public async Task ReadDataForever(ISession connection, Func handleValue, CancellationToken cancellationToken) + { + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + + var (subscription, reader) = (Task.CompletedTask, Task.CompletedTask); + try + { + subscription = Task.Run(() => _subscriber.SubscribeToChangesFor(connection, _publishInterval, _subscribeNodes, handleValue, cts.Token), cts.Token); + reader = Task.Run(() => _reader.ReadNodesForever(connection, _readNodes, handleValue, cts.Token), cts.Token); + + await Task.WhenAny(subscription, reader).ConfigureAwait(false); + _logger.Warning("Reading data completed, it should not..."); + } + catch (Exception error) + { + _logger.Error(error, "Failure occured while reading data"); + throw; + } + finally + { + cts.Cancel(); + await Task.WhenAll(subscription, reader).ConfigureAwait(false); + } + } +} diff --git a/Source/IRetrieveData.cs b/Source/IRetrieveData.cs index 4738af1..9987d2a 100644 --- a/Source/IRetrieveData.cs +++ b/Source/IRetrieveData.cs @@ -8,6 +8,4 @@ namespace RaaLabs.Edge.Connectors.OPCUA; public interface IRetrieveData { Task ReadDataForever(ISession connection, Func handleValue, CancellationToken cancellationToken); - // 1. Create a subscription with monitored item fora ll nodes with subscribeinterval set - // 2. Create one task per node with readinterval set (that reads in loop with timerasync) } diff --git a/Source/Program.cs b/Source/Program.cs index d41a004..5be359b 100644 --- a/Source/Program.cs +++ b/Source/Program.cs @@ -28,6 +28,7 @@ static void Main(string[] args) { _.RegisterInstance(DefaultSessionFactory.Instance).As(); _.RegisterType().As(); + _.RegisterType().As(); }) .Build();