Skip to content

Commit

Permalink
Subscriber metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
jakhog committed Jun 14, 2024
1 parent 7bead56 commit 4ac947c
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 0 deletions.
18 changes: 18 additions & 0 deletions Source/IMetricsHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ public interface IMetricsHandler : IMetricsClient, IWithStateFrom<MetricsHandler
[Counter(Name = "messages_sent_total", Unit = "count", Description = "The total number of messages sent", Exported = true)]
public void NumberOfMessagesSent(int value);

#region DataPointParser.cs
[Counter(Name = "opcua_bad_statuscode_received", Unit = "count", Description = "The total number of bad status code received from OPCUA server", Exported = true)]
public void NumberOfBadStatusCodesFor(int value, string nodeId);

[Counter(Name = "opcua_invalid_timestamp_received", Unit = "count", Description = "The total number of invalid timestamps received from OPCUA server or source", Exported = true)]
public void InvalidTimestampReceived(int value);
#endregion

#region Client.cs
[Counter(Name = "opcua_session_connection_attempts_total", Unit = "count", Description = "The total number of connection attempts to the OPCUA server", Exported = true)]
public void NumberOfSessionConnectionAttempts(long value);

Expand All @@ -34,6 +37,21 @@ public interface IMetricsHandler : IMetricsClient, IWithStateFrom<MetricsHandler

[Counter(Name = "opcua_session_connection_time_seconds_total", Unit = "count", Description = "The total time spent waiting for connections to the OPCUA server to open", Exported = true)]
public void SessionConnectionTime(double value);
#endregion

#region Subscriber.cs
[Counter(Name = "opcua_subscription_attempts_total", Unit = "count", Description = "The total number of subscription attempts to the OPCUA server", Exported = true)]
public void NumberOfSubscriptionAttempts(long value);

[Counter(Name = "opcua_subscriptions_total", Unit = "count", Description = "The total number of successful subscriptions to the OPCUA server", Exported = true)]
public void NumberOfSubscriptions(long value);

[Counter(Name = "opcua_subscription_setup_time_seconds_total", Unit = "count", Description = "The total time spent setting up subscriptions to the OPCUA server", Exported = true)]
public void SubscriptionSetupTime(double value);

[Counter(Name = "opcua_subscription_notifications_received_total", Unit = "count", Description = "The total number of notifications received from the OPCUA server", Exported = true)]
public void NumberOfReceivedMonitorNotifications(long value);
#endregion
}

[ExcludeFromCodeCoverage]
Expand Down
7 changes: 7 additions & 0 deletions Source/Subscriber.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
Expand All @@ -23,6 +24,8 @@ public Subscriber(IMetricsHandler metrics, ILogger logger)
public async Task SubscribeToChangesFor(ISession connection, TimeSpan publishInterval, IEnumerable<(NodeId node, TimeSpan samplingInterval)> nodes, Func<NodeValue, Task> handleValue, CancellationToken cancellationToken)
{
_logger.Debug("Creating subscription with publish interval {PublishInterval}", publishInterval);
_metrics.NumberOfSubscriptionAttempts(1);
var timer = Stopwatch.StartNew();

using var subscription = CreateEmptySubscription(publishInterval);
var channel = Channel.CreateUnbounded<NodeValue>(new(){ SingleReader = true });
Expand All @@ -40,10 +43,14 @@ public async Task SubscribeToChangesFor(ISession connection, TimeSpan publishInt
DeleteSubscriptionWhenCancelled(subscription, cancellationToken);
CompleteChannelWhenSubscriptionCompletes(subscription, channel);

_metrics.NumberOfSubscriptions(1);
_metrics.SubscriptionSetupTime(timer.Elapsed.TotalSeconds);

_logger.Debug("Starting to read values from subscription");
await foreach (var value in channel.Reader.ReadAllAsync(CancellationToken.None))
{
_logger.Verbose("Received value {Value} from subscription", value);
_metrics.NumberOfReceivedMonitorNotifications(1);
await handleValue(value).ConfigureAwait(false);
}
}
Expand Down

0 comments on commit 4ac947c

Please sign in to comment.