Skip to content

Commit

Permalink
Merge pull request #30 from RaaLabs/healthchecks
Browse files Browse the repository at this point in the history
Healthchecks
  • Loading branch information
maikberthelsen authored Oct 22, 2024
2 parents c700a2f + 78e2509 commit b8b4f94
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 40 deletions.
16 changes: 8 additions & 8 deletions Source/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ namespace RaaLabs.Edge.Connectors.OPCUA;

public class Client : ICreateSessions
{
private readonly ApplicationConfiguration _application;
private readonly ConfiguredEndpoint _endpoint;
private readonly UserIdentity _identity;
private readonly ISessionFactory _factory;
private readonly IMetricsHandler _metrics;
private readonly ILogger _logger;
readonly ApplicationConfiguration _application;
readonly ConfiguredEndpoint _endpoint;
readonly UserIdentity _identity;
readonly ISessionFactory _factory;
readonly IMetricsHandler _metrics;
readonly ILogger _logger;

public Client(ConnectorConfiguration config, ISessionFactory factory, IMetricsHandler metrics, ILogger logger)
{
Expand Down Expand Up @@ -66,7 +66,7 @@ public async Task<ISession> ConnectToServer(CancellationToken cancellationToken)
}
}

private static (ConfiguredEndpoint, UserIdentity) ConnectAnonymouslyToAnyServerOn(string url)
static (ConfiguredEndpoint, UserIdentity) ConnectAnonymouslyToAnyServerOn(string url)
{
var descriptor = new EndpointDescription(url);
descriptor.UserIdentityTokens.Add(new UserTokenPolicy(UserTokenType.Anonymous));
Expand All @@ -77,7 +77,7 @@ private static (ConfiguredEndpoint, UserIdentity) ConnectAnonymouslyToAnyServerO
return (endpoint, identity);
}

private static ApplicationConfiguration ConnectorDescription() => new()
static ApplicationConfiguration ConnectorDescription() => new()
{
ApplicationType = ApplicationType.Client,

Expand Down
12 changes: 6 additions & 6 deletions Source/Connector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ namespace RaaLabs.Edge.Connectors.OPCUA;

public class Connector : IRunAsync, IProduceEvent<OpcuaDatapointOutput>
{
private readonly ICreateSessions _sessions;
private readonly IRetrieveData _retriever;
private readonly ICreateDatapointsFromDataValues _datapoints;
private readonly ILogger _logger;
private readonly IMetricsHandler _metrics;
readonly ICreateSessions _sessions;
readonly IRetrieveData _retriever;
readonly ICreateDatapointsFromDataValues _datapoints;
readonly ILogger _logger;
readonly IMetricsHandler _metrics;

public Connector(ICreateSessions sessions, IRetrieveData retriever, ICreateDatapointsFromDataValues datapoints, ILogger logger, IMetricsHandler metrics)
{
Expand Down Expand Up @@ -51,7 +51,7 @@ public async Task Run()
}
}

private Task ConvertAndSendDataValue(NodeValue value)
Task ConvertAndSendDataValue(NodeValue value)
{
_metrics.NumberOfMessagesSent(1);
return SendDatapoint!(_datapoints.CreateDatapointFrom(value));
Expand Down
18 changes: 9 additions & 9 deletions Source/DataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ namespace RaaLabs.Edge.Connectors.OPCUA;

public class DataReader : IRetrieveData
{
private readonly TimeSpan _publishInterval;
private readonly List<(NodeId, TimeSpan)> _subscribeNodes;
private readonly List<(NodeId, TimeSpan)> _readNodes;
private readonly ICanSubscribeToNodes _subscriber;
private readonly ICanReadNodes _reader;
private readonly ILogger _logger;
readonly TimeSpan _publishInterval;
readonly List<(NodeId, TimeSpan)> _subscribeNodes;
readonly List<(NodeId, TimeSpan)> _readNodes;
readonly ICanSubscribeToNodes _subscriber;
readonly ICanReadNodes _reader;
readonly ILogger _logger;

public DataReader(ConnectorConfiguration config, ICanSubscribeToNodes subscriber, ICanReadNodes reader, ILogger logger)
{
Expand Down Expand Up @@ -74,21 +74,21 @@ public async Task ReadDataForever(ISession connection, Func<NodeValue, Task> han
}
}

private Task SubscribeOrSleep(ISession connection, Func<NodeValue, Task> handleValue, CancellationToken cancellationToken) =>
Task SubscribeOrSleep(ISession connection, Func<NodeValue, Task> handleValue, CancellationToken cancellationToken) =>
_subscribeNodes.Count switch
{
0 => Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken),
_ => Task.Run(() => _subscriber.SubscribeToChangesFor(connection, _publishInterval, _subscribeNodes, handleValue, cancellationToken), cancellationToken)
};

private Task ReadOrSleep(ISession connection, Func<NodeValue, Task> handleValue, CancellationToken cancellationToken) =>
Task ReadOrSleep(ISession connection, Func<NodeValue, Task> handleValue, CancellationToken cancellationToken) =>
_readNodes.Count switch
{
0 => Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken),
_ => Task.Run(() => _reader.ReadNodesForever(connection, _readNodes, handleValue, cancellationToken), cancellationToken)
};

private static void ThrowIfFailedWithError(Task task)
static void ThrowIfFailedWithError(Task task)
{
if (task.Exception?.GetBaseException() is {} error and not OperationCanceledException)
{
Expand Down
46 changes: 46 additions & 0 deletions Source/HealthCheck.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System;
using System.Globalization;
using RaaLabs.Edge.Connectors.OPCUA.Events;
using RaaLabs.Edge.Modules.Diagnostics.Health;
using RaaLabs.Edge.Modules.EventHandling;
using Serilog;

namespace RaaLabs.Edge.Connectors.OPCUA;

public class HealthCheck : IConsumeEvent<OpcuaDatapointOutput>, IExposeHealthStatus
{
readonly TimeSpan _maxTimeBetweenDatapoints;
readonly TimeProvider _clock;
readonly ILogger _logger;
DateTimeOffset _lastTimestampReceived;

public HealthCheck(TimeProvider clock, ILogger logger)
{
_maxTimeBetweenDatapoints = TimeSpan.FromSeconds(double.Parse(Environment.GetEnvironmentVariable("HEALTHCHECK_MAX_SECONDS_BETWEEN_DATAPOINTS") ?? "120", CultureInfo.CurrentCulture));
_clock = clock;
_logger = logger;
_lastTimestampReceived = _clock.GetUtcNow();
}

public void Handle(OpcuaDatapointOutput @event)
{
_lastTimestampReceived = _clock.GetUtcNow();
_logger.Debug("Sent data, updating last timestamp received in healthcheck with {LastTimestampReceived}", _lastTimestampReceived);
}

public bool IsReady => true;
public bool IsHealthy
{
get
{
var timeSinceLastDatapoint = _clock.GetUtcNow() - _lastTimestampReceived;
if (timeSinceLastDatapoint > _maxTimeBetweenDatapoints)
{
_logger.Warning("Time since data was sent is {Since}, returning unhealthy", timeSinceLastDatapoint);
return false;
}
_logger.Debug("Time since data was sent is {Since}, returning healthy", timeSinceLastDatapoint);
return true;
}
}
}
8 changes: 4 additions & 4 deletions Source/OPCUA.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
<PackageReference Include="RaaLabs.Edge.Modules.Configuration" Version="1.16.3" />
<PackageReference Include="RaaLabs.Edge.Modules.EdgeHub" Version="1.16.3" />
<PackageReference Include="RaaLabs.Edge.Modules.Diagnostics" Version="1.16.3" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua" Version="1.5.374.70" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Client" Version="1.5.374.70" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Configuration" Version="1.5.374.70" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Core" Version="1.5.374.70" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Server" Version="1.5.374.118" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Client" Version="1.5.374.118" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Configuration" Version="1.5.374.118" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Core" Version="1.5.374.118" />
</ItemGroup>
</Project>
1 change: 1 addition & 0 deletions Source/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ static void Main(string[] args)
.WithModule<Configuration>()
.WithModule<EdgeHub>()
.WithModule<RaaLabsDiagnostics>()
.WithHandler<HealthCheck>()
.WithTask<Connector>()
.WithManualRegistration(_ =>
{
Expand Down
6 changes: 3 additions & 3 deletions Source/Reader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ namespace RaaLabs.Edge.Connectors.OPCUA;

public class Reader : ICanReadNodes
{
private readonly ILogger _logger;
private readonly IMetricsHandler _metrics;
readonly ILogger _logger;
readonly IMetricsHandler _metrics;

public Reader(ILogger logger, IMetricsHandler metrics)
{
Expand Down Expand Up @@ -50,7 +50,7 @@ public async Task ReadNodesForever(ISession connection, IEnumerable<(NodeId node
_logger.Debug("Read operation completed...");
}

private async Task ReadNodeForever(ISession connection, NodeId node, TimeSpan readInterval, Func<NodeValue, Task> handleValue, CancellationToken cancellationToken)
static async Task ReadNodeForever(ISession connection, NodeId node, TimeSpan readInterval, Func<NodeValue, Task> handleValue, CancellationToken cancellationToken)
{
using var timer = new PeriodicTimer(readInterval);
while (!cancellationToken.IsCancellationRequested)
Expand Down
12 changes: 6 additions & 6 deletions Source/Subscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ namespace RaaLabs.Edge.Connectors.OPCUA;

public class Subscriber : ICanSubscribeToNodes
{
private readonly IMetricsHandler _metrics;
private readonly ILogger _logger;
readonly IMetricsHandler _metrics;
readonly ILogger _logger;

public Subscriber(IMetricsHandler metrics, ILogger logger)
{
Expand Down Expand Up @@ -58,7 +58,7 @@ public async Task SubscribeToChangesFor(ISession connection, TimeSpan publishInt
}
}

private MonitoredItem MonitoringFor(NodeId nodeId, TimeSpan samplingInterval, ChannelWriter<NodeValue> writer)
MonitoredItem MonitoringFor(NodeId nodeId, TimeSpan samplingInterval, ChannelWriter<NodeValue> writer)
{
var monitored = new MonitoredItem()
{
Expand All @@ -83,14 +83,14 @@ private MonitoredItem MonitoringFor(NodeId nodeId, TimeSpan samplingInterval, Ch
return monitored;
}

private void DeleteSubscriptionWhenCancelled(Subscription subscription, CancellationToken cancellationToken) =>
void DeleteSubscriptionWhenCancelled(Subscription subscription, CancellationToken cancellationToken) =>
cancellationToken.Register(() =>
{
_logger.Debug("CancellationToken cancelled, deleting subscription");
subscription.Delete(true);
});

private void CompleteChannelWhenSubscriptionCompletes(Subscription subscription, ChannelWriter<NodeValue> writer)
void CompleteChannelWhenSubscriptionCompletes(Subscription subscription, ChannelWriter<NodeValue> writer)
{
subscription.PublishStatusChanged += (_, changed) =>
{
Expand All @@ -112,7 +112,7 @@ private void CompleteChannelWhenSubscriptionCompletes(Subscription subscription,
};
}

private static Subscription CreateEmptySubscription(TimeSpan publishInterval) =>
static Subscription CreateEmptySubscription(TimeSpan publishInterval) =>
new()
{
PublishingInterval = (int)publishInterval.TotalMilliseconds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class and_datavalue_received_then_cancelled : given.a_reader
static IEnumerable<(NodeId node, TimeSpan readInterval)> nodes;
static List<NodeValue> handled_values;

Establish context = async () =>
Establish context = () =>
{
nodes = [(new NodeId(321), TimeSpan.FromSeconds(1))];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class and_exception_thrown_from_handlevalue : given.a_reader
static IEnumerable<(NodeId node, TimeSpan readInterval)> nodes;
static CancellationToken ct;

Establish context = async () =>
Establish context = () =>
{
nodes = [
(new NodeId(321), TimeSpan.FromSeconds(1)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class and_exception_thrown_from_readvalueasync : given.a_reader
static IEnumerable<(NodeId node, TimeSpan readInterval)> nodes;
static CancellationToken ct;

Establish context = async () =>
Establish context = () =>
{
nodes = [
(new NodeId(321), TimeSpan.FromSeconds(1)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class from_multiple_nodes_with_different_readinterval : given.a_reader
static IEnumerable<(NodeId node, TimeSpan readInterval)> nodes;
static List<NodeValue> handled_values;

Establish context = async () =>
Establish context = () =>
{
nodes = [
(new NodeId(321), TimeSpan.FromSeconds(1)),
Expand Down

0 comments on commit b8b4f94

Please sign in to comment.