Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify and refactor #21

Merged
merged 43 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
9db227c
Remove certificate validation
jakhog Jun 11, 2024
08c03fc
Start preparing new structure
jakhog Jun 11, 2024
8f36d65
Ready to start on new structure
jakhog Jun 12, 2024
9ba2258
More splitting
jakhog Jun 12, 2024
5c058e5
Now with samples
jakhog Jun 12, 2024
7fc4c92
Use ISession interface instead of concrete type
jakhog Jun 13, 2024
e61ccee
Make code style work with VScode
jakhog Jun 13, 2024
e62e93a
Implementation and tests for creating sessions
jakhog Jun 13, 2024
a21f32f
Forgot the container setup
jakhog Jun 13, 2024
0bfe316
Use NodeId type instead of string
jakhog Jun 13, 2024
1509a0a
DataReader
jakhog Jun 13, 2024
d7333ae
add RL copyright
katarinagud Jun 13, 2024
105d7a2
add optional source to configuration
katarinagud Jun 13, 2024
5654e51
create datapoints from datavalues
katarinagud Jun 13, 2024
b839642
merge invalid timestamps into one
katarinagud Jun 13, 2024
9148bca
add tests for parser
katarinagud Jun 13, 2024
ea3f006
Tests for DataReader
jakhog Jun 13, 2024
1425652
Fix the behaviour
jakhog Jun 13, 2024
e6050ec
Merge remote-tracking branch 'origin/simplify-n-refactor' into simpli…
jakhog Jun 13, 2024
6bcc5a3
log for bad statuscode
katarinagud Jun 13, 2024
2314abf
add nodereader
katarinagud Jun 13, 2024
4e207d1
Initial code for Subscriber
jakhog Jun 13, 2024
2255241
add all the things to program
katarinagud Jun 14, 2024
844cf8e
Add lots of logs
jakhog Jun 14, 2024
60cabb5
howboutnow?
katarinagud Jun 14, 2024
bfa12b8
WhenAny
jakhog Jun 14, 2024
7bead56
Tests for Subscriber
jakhog Jun 14, 2024
4ac947c
Subscriber metrics
jakhog Jun 14, 2024
24b335a
Time with zone?
jakhog Jun 14, 2024
1ece0b9
Fix the other tests
jakhog Jun 14, 2024
c569816
reader tests
katarinagud Jun 17, 2024
e292761
Add tests requested by Sonarcloud, and rewrite exception handling
jakhog Jun 17, 2024
d772900
Merge remote-tracking branch 'origin/simplify-n-refactor' into simpli…
jakhog Jun 17, 2024
591263e
Add tests for Connector
jakhog Jun 17, 2024
a72b9b8
Ensure license header is everywhere
jakhog Jun 17, 2024
090b5c9
reader tests
katarinagud Jun 17, 2024
78ba546
maybe this is better
katarinagud Jun 17, 2024
2ff849c
with callback, but is still cancelled
katarinagud Jun 17, 2024
0961e6f
rename to match class
katarinagud Jun 18, 2024
64a5117
catch cancelexception
katarinagud Jun 18, 2024
7cece85
metrics and cleanup
katarinagud Jun 18, 2024
7a4b780
whopsi
katarinagud Jun 18, 2024
f16c855
update config example in readme
katarinagud Jun 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@ The module is configured using a JSON file. `connector.json` represents the conn
```json
{
"serverUrl": "opc.tcp://<HOST>:<PORT>/<SERVERNAME>",
"nodeIds": [
"ns=3;i=1002",
"ns=3;i=1001"
],
"opcUaServerCertificateIssuer": "Name of certificate issuer", // used to verify untrusted server certificates
"opcUaServerCertificateSubject": "Issuer subject", // used to verify untrusted server certificates
"opcUaServerAutoAcceptUntrustedCertificates": false
"publishingIntervalSeconds": 1.0,
"nodes": [
{
"id": "ns=3;i=1002",
"subscribeIntervalSeconds": 1.0,
"readIntervalSeconds": 1.0
},
{
"id": "ns=3;i=1001",
"readIntervalSeconds": 1.0
}
]
}
```

Expand Down
File renamed without changes.
9 changes: 0 additions & 9 deletions Source/BatchConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,8 @@

namespace RaaLabs.Edge.Connectors.OPCUA;

/// <summary>
/// Config class for batching settings.
/// </summary>
public class BatchConfiguration : IEdgeHubOutgoingEventBatchConfiguration
{
/// <summary>
/// The batch size.
/// </summary>
public int BatchSize { get; set; } = int.Parse(Environment.GetEnvironmentVariable("EDGEHUB_BATCH_SIZE") ?? "250", CultureInfo.CurrentCulture);
/// <summary>
/// The batch interval in milliseconds.
/// </summary>
public int Interval { get; set; } = int.Parse(Environment.GetEnvironmentVariable("EDGEHUB_BATCH_INTERVAL") ?? "5000", CultureInfo.CurrentCulture);
}
88 changes: 88 additions & 0 deletions Source/Client.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) RaaLabs. All rights reserved.
// Licensed under the GPLv2 License. See LICENSE file in the project root for full license information.

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Opc.Ua;
using Opc.Ua.Client;
using Serilog;

namespace RaaLabs.Edge.Connectors.OPCUA;

public class Client : ICreateSessions
{
private readonly ApplicationConfiguration _application;
private readonly ConfiguredEndpoint _endpoint;
private readonly UserIdentity _identity;
private readonly ISessionFactory _factory;
private readonly IMetricsHandler _metrics;
private readonly ILogger _logger;

public Client(ConnectorConfiguration config, ISessionFactory factory, IMetricsHandler metrics, ILogger logger)
{
_application = ConnectorDescription();
_application.ClientConfiguration = new()
{
DefaultSessionTimeout = (int)TimeSpan.FromHours(1).TotalMilliseconds,
};

(_endpoint, _identity) = ConnectAnonymouslyToAnyServerOn(config.ServerUrl);
_factory = factory;
_metrics = metrics;
_logger = logger;
}

public async Task<ISession> ConnectToServer(CancellationToken cancellationToken)
{
try
{
var sessionName = $"RaaEdge OPCUA Connector {Guid.NewGuid()}";
_logger.Information("Creating session '{SessionName}' with OPCUA server on '{EndpointUrl}'", sessionName, _endpoint.EndpointUrl);
_metrics.NumberOfSessionConnectionAttempts(1);
var timer = Stopwatch.StartNew();

var session = await _factory.CreateAsync(
_application,
_endpoint,
updateBeforeConnect: false,
checkDomain: false,
sessionName,
(uint)_application.ClientConfiguration.DefaultSessionTimeout,
_identity,
[],
cancellationToken
).ConfigureAwait(false);

_metrics.NumberOfSessionConnections(1);
_metrics.SessionConnectionTime(timer.Elapsed.TotalSeconds);
return session;
}
catch (Exception error)
{
_logger.Error(error, "Failed to connect to server");
throw;
}
}

private static (ConfiguredEndpoint, UserIdentity) ConnectAnonymouslyToAnyServerOn(string url)
{
var descriptor = new EndpointDescription(url);
descriptor.UserIdentityTokens.Add(new UserTokenPolicy(UserTokenType.Anonymous));
descriptor.Server.ApplicationUri = null;
var endpoint = new ConfiguredEndpoint(null, descriptor);

var identity = new UserIdentity(new AnonymousIdentityToken());
return (endpoint, identity);
}

private static ApplicationConfiguration ConnectorDescription() => new()
{
ApplicationType = ApplicationType.Client,

ApplicationName = "RaaEDGE OPC UA Connector",
ApplicationUri = "https://github.com/RaaLabs/Connectors.OPCUA",
ProductUri = "https://github.com/RaaLabs",
};
}
59 changes: 59 additions & 0 deletions Source/Connector.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) RaaLabs. All rights reserved.
// Licensed under the GPLv2 License. See LICENSE file in the project root for full license information.

using System;
using System.Threading;
using System.Threading.Tasks;
using RaaLabs.Edge.Connectors.OPCUA.Events;
using RaaLabs.Edge.Modules.EventHandling;
using Serilog;

namespace RaaLabs.Edge.Connectors.OPCUA;

public class Connector : IRunAsync, IProduceEvent<OpcuaDatapointOutput>
{
private readonly ICreateSessions _sessions;
private readonly IRetrieveData _retriever;
private readonly ICreateDatapointsFromDataValues _datapoints;
private readonly ILogger _logger;
private readonly IMetricsHandler _metrics;

public Connector(ICreateSessions sessions, IRetrieveData retriever, ICreateDatapointsFromDataValues datapoints, ILogger logger, IMetricsHandler metrics)
{
_sessions = sessions;
_retriever = retriever;
_datapoints = datapoints;
_logger = logger;
_metrics = metrics;
}

public event AsyncEventEmitter<OpcuaDatapointOutput>? SendDatapoint;

public async Task Run()
{
while (true)
{
try
{
_logger.Information("Initiating connection to server");
var connection = await _sessions.ConnectToServer(CancellationToken.None).ConfigureAwait(false);

_logger.Information("Starting data reader");
await _retriever.ReadDataForever(connection, ConvertAndSendDataValue, CancellationToken.None).ConfigureAwait(false);

_logger.Warning("Data reader stopped");
}
catch (Exception error)
{
_logger.Error(error, "Failure occured while connecting or reading data");
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
}
}
}

private Task ConvertAndSendDataValue(NodeValue value)
{
_metrics.NumberOfMessagesSent(1);
return SendDatapoint!(_datapoints.CreateDatapointFrom(value));
}
}
24 changes: 24 additions & 0 deletions Source/ConnectorConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) RaaLabs. All rights reserved.
// Licensed under the GPLv2 License. See LICENSE file in the project root for full license information.

using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using RaaLabs.Edge.Modules.Configuration;

namespace RaaLabs.Edge.Connectors.OPCUA;

[Name("configuration.json"), RestartOnChange, ExcludeFromCodeCoverage]
public class ConnectorConfiguration : IConfiguration
{
public required string ServerUrl { get; init; }
public double PublishIntervalSeconds { get; init; } = 1.0;
public IList<NodeConfiguration> Nodes { get; init; } = [];
public string? Source { get; set; }
}

public class NodeConfiguration
{
public required string NodeId { get; init; }
public double? SubscribeIntervalSeconds { get; init; }
public double? ReadIntervalSeconds { get; init; }
}
65 changes: 65 additions & 0 deletions Source/DataPointParser.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) RaaLabs. All rights reserved.
// Licensed under the GPLv2 License. See LICENSE file in the project root for full license information.

using System;
using RaaLabs.Edge.Connectors.OPCUA.Events;
using Serilog;
using StatusCode = Opc.Ua.StatusCode;

namespace RaaLabs.Edge.Connectors.OPCUA;

public class DataPointParser : ICreateDatapointsFromDataValues
{
private readonly ConnectorConfiguration _configuration;
private readonly ILogger _logger;
private readonly IMetricsHandler _metrics;
private readonly TimeProvider _clock;

public DataPointParser(ConnectorConfiguration configuration, ILogger logger, IMetricsHandler metrics, TimeProvider clock)
{
_configuration = configuration;
_logger = logger;
_metrics = metrics;
_clock = clock;
}

public OpcuaDatapointOutput CreateDatapointFrom(NodeValue nodeValue)
{
LogWarningIfStatusCodeIsNotGood(nodeValue);
LogWarningIfFaultyTimestamp(nodeValue, nodeValue.Value.ServerTimestamp, "Server");
LogWarningIfFaultyTimestamp(nodeValue, nodeValue.Value.SourceTimestamp, "Source");

return new()
{
Source = _configuration.Source ?? "OPCUA",
Tag = nodeValue.Node.ToString(),
Value = nodeValue.Value.Value,
Timestamp = _clock.GetUtcNow().ToUnixTimeMilliseconds()
};
}

private void LogWarningIfStatusCodeIsNotGood(NodeValue nodeValue)
{
if (StatusCode.IsGood(nodeValue.Value.StatusCode) && StatusCode.IsNotBad(nodeValue.Value.StatusCode)) return;

_logger.Warning("Bad status code for node {NodeId} - {StatusCode}", nodeValue.Node, nodeValue.Value.StatusCode);
_metrics.NumberOfBadStatusCodesFor(1, nodeValue.Node.ToString()!);
}

private void LogWarningIfFaultyTimestamp(NodeValue nodeValue, DateTime timestamp, string timestampType)
{
var dateTimeOffset = (DateTimeOffset) timestamp;
var utcNow = _clock.GetUtcNow();

if (dateTimeOffset > utcNow + TimeSpan.FromMinutes(15))
{
_logger.Warning("Timestamp more than 15 minutes from the future for node {NodeValueNode} - {Timestamp}. Timestamp from {Source} is never used as property for OpcuaDatapointOutput", nodeValue.Node, timestamp, timestampType);
_metrics.NumberOfFutureTimestampsFor(1, nodeValue.Node.ToString());
}
else if (dateTimeOffset < utcNow - TimeSpan.FromMinutes(15))
{
_logger.Warning("Timestamp older than 15 minutes for node {NodeValueNode} - {Timestamp}. Timestamp from {Source} is never used as property for OpcuaDatapointOutput", nodeValue.Node, timestamp, timestampType);
_metrics.NumberOfOldTimestampsFor(1, nodeValue.Node.ToString());
}
}
}
98 changes: 98 additions & 0 deletions Source/DataReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) RaaLabs. All rights reserved.
// Licensed under the GPLv2 License. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Opc.Ua;
using Opc.Ua.Client;
using Serilog;

namespace RaaLabs.Edge.Connectors.OPCUA;

public class DataReader : IRetrieveData
{
private readonly TimeSpan _publishInterval;
private readonly List<(NodeId, TimeSpan)> _subscribeNodes;
private readonly List<(NodeId, TimeSpan)> _readNodes;
private readonly ICanSubscribeToNodes _subscriber;
private readonly ICanReadNodes _reader;
private readonly ILogger _logger;

public DataReader(ConnectorConfiguration config, ICanSubscribeToNodes subscriber, ICanReadNodes reader, ILogger logger)
{
_publishInterval = TimeSpan.FromSeconds(config.PublishIntervalSeconds);
_subscribeNodes = config.Nodes
.Where(_ => _.SubscribeIntervalSeconds is not null)
.Select(_ => (new NodeId(_.NodeId), TimeSpan.FromSeconds(_.SubscribeIntervalSeconds!.Value)))
.ToList();
_readNodes = config.Nodes
.Where(_ => _.ReadIntervalSeconds is not null)
.Select(_ => (new NodeId(_.NodeId), TimeSpan.FromSeconds(_.ReadIntervalSeconds!.Value)))
.ToList();
_subscriber = subscriber;
_reader = reader;
_logger = logger;
}

public async Task ReadDataForever(ISession connection, Func<NodeValue, Task> handleValue, CancellationToken cancellationToken)
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

var (subscription, reader) = (Task.CompletedTask, Task.CompletedTask);
try
{
_logger.Information("Starting data reader for {SubscriptionCount} subscriptions and {ReadCount} reads", _subscribeNodes.Count, _readNodes.Count);

subscription = SubscribeOrSleep(connection, handleValue, cts.Token);
reader = ReadOrSleep(connection, handleValue, cts.Token);

await Task.WhenAny(subscription, reader).ConfigureAwait(false);

ThrowIfFailedWithError(subscription);
ThrowIfFailedWithError(reader);

_logger.Warning("Reading data completed, it should not...");
}
catch (Exception error) when (error is not OperationCanceledException)
{
_logger.Error(error, "Failure occured while reading data");
}

cts.Cancel();
try
{
_logger.Information("Waiting for subscription and reader to complete");
await subscription.ConfigureAwait(false);
await reader.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Ignore
}
}

private Task SubscribeOrSleep(ISession connection, Func<NodeValue, Task> handleValue, CancellationToken cancellationToken) =>
_subscribeNodes.Count switch
{
0 => Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken),
_ => Task.Run(() => _subscriber.SubscribeToChangesFor(connection, _publishInterval, _subscribeNodes, handleValue, cancellationToken), cancellationToken)
};

private Task ReadOrSleep(ISession connection, Func<NodeValue, Task> handleValue, CancellationToken cancellationToken) =>
_readNodes.Count switch
{
0 => Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken),
_ => Task.Run(() => _reader.ReadNodesForever(connection, _readNodes, handleValue, cancellationToken), cancellationToken)
};

private static void ThrowIfFailedWithError(Task task)
{
if (task.Exception?.GetBaseException() is {} error and not OperationCanceledException)
{
throw error;
}
}
}
16 changes: 16 additions & 0 deletions Source/ICanReadNodes.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) RaaLabs. All rights reserved.
// Licensed under the GPLv2 License. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Opc.Ua;
using Opc.Ua.Client;

namespace RaaLabs.Edge.Connectors.OPCUA;

public interface ICanReadNodes
{
Task ReadNodesForever(ISession connection, IEnumerable<(NodeId node, TimeSpan readInterval)> nodes, Func<NodeValue,Task> handleValue, CancellationToken cancellationToken);
}
Loading
Loading