Skip to content

Commit

Permalink
DataReader
Browse files Browse the repository at this point in the history
  • Loading branch information
jakhog committed Jun 13, 2024
1 parent 0bfe316 commit 1509a0a
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 2 deletions.
61 changes: 61 additions & 0 deletions Source/DataReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Opc.Ua;
using Opc.Ua.Client;
using Serilog;

namespace RaaLabs.Edge.Connectors.OPCUA;

public class DataReader : IRetrieveData
{
private readonly TimeSpan _publishInterval;
private readonly List<(NodeId, TimeSpan)> _subscribeNodes;
private readonly List<(NodeId, TimeSpan)> _readNodes;
private readonly ICanSubscribeToNodes _subscriber;
private readonly ICanReadNodes _reader;
private readonly ILogger _logger;

public DataReader(ConnectorConfiguration config, ICanSubscribeToNodes subscriber, ICanReadNodes reader, ILogger logger)
{
_publishInterval = TimeSpan.FromSeconds(config.PublishIntervalSeconds);
_subscribeNodes = config.Nodes
.Where(_ => _.SubscribeIntervalSeconds is not null)
.Select(_ => (new NodeId(_.NodeId), TimeSpan.FromSeconds(_.SubscribeIntervalSeconds!.Value)))
.ToList();
_readNodes = config.Nodes
.Where(_ => _.ReadIntervalSeconds is not null)
.Select(_ => (new NodeId(_.NodeId), TimeSpan.FromSeconds(_.ReadIntervalSeconds!.Value)))
.ToList();
_subscriber = subscriber;
_reader = reader;
_logger = logger;
}

public async Task ReadDataForever(ISession connection, Func<NodeValue, Task> handleValue, CancellationToken cancellationToken)
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

var (subscription, reader) = (Task.CompletedTask, Task.CompletedTask);
try
{
subscription = Task.Run(() => _subscriber.SubscribeToChangesFor(connection, _publishInterval, _subscribeNodes, handleValue, cts.Token), cts.Token);
reader = Task.Run(() => _reader.ReadNodesForever(connection, _readNodes, handleValue, cts.Token), cts.Token);

await Task.WhenAny(subscription, reader).ConfigureAwait(false);
_logger.Warning("Reading data completed, it should not...");
}
catch (Exception error)
{
_logger.Error(error, "Failure occured while reading data");
throw;
}
finally
{
cts.Cancel();
await Task.WhenAll(subscription, reader).ConfigureAwait(false);
}
}
}
2 changes: 0 additions & 2 deletions Source/IRetrieveData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,4 @@ namespace RaaLabs.Edge.Connectors.OPCUA;
public interface IRetrieveData
{
Task ReadDataForever(ISession connection, Func<NodeValue,Task> handleValue, CancellationToken cancellationToken);
// 1. Create a subscription with monitored item fora ll nodes with subscribeinterval set
// 2. Create one task per node with readinterval set (that reads in loop with timerasync)
}
1 change: 1 addition & 0 deletions Source/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ static void Main(string[] args)
{
_.RegisterInstance(DefaultSessionFactory.Instance).As<ISessionFactory>();
_.RegisterType<Connector>().As<ICreateSessions>();
_.RegisterType<DataReader>().As<IRetrieveData>();
})
.Build();

Expand Down

0 comments on commit 1509a0a

Please sign in to comment.