Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Lame Duck Mode Event Handler #716

Merged
merged 6 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/NATS.Client.Core/INatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ public interface INatsConnection : INatsClient
/// </summary>
event AsyncEventHandler<NatsMessageDroppedEventArgs>? MessageDropped;

/// <summary>
/// Event that is raised when server goes into Lame Duck Mode.
/// </summary>
public event AsyncEventHandler<NatsLameDuckModeActivatedEventArgs>? LameDuckModeActivated;

/// <summary>
/// Server information received from the NATS server.
/// </summary>
Expand Down
23 changes: 21 additions & 2 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ internal enum NatsEvent
ConnectionDisconnected,
ReconnectFailed,
MessageDropped,
LameDuckModeActivated,
}

public partial class NatsConnection : INatsConnection
{
#pragma warning disable SA1401
internal readonly ConnectionStatsCounter Counter; // allow to call from external sources
internal volatile ServerInfo? WritableServerInfo;

#pragma warning restore SA1401
private readonly object _gate = new object();
private readonly ILogger<NatsConnection> _logger;
Expand All @@ -44,6 +43,7 @@ public partial class NatsConnection : INatsConnection
private readonly ClientOpts _clientOpts;
private readonly SubscriptionManager _subscriptionManager;

private ServerInfo? _writableServerInfo;
private int _pongCount;
private int _connectionState;
private int _isDisposed;
Expand Down Expand Up @@ -109,6 +109,8 @@ public NatsConnection(NatsOpts opts)

public event AsyncEventHandler<NatsMessageDroppedEventArgs>? MessageDropped;

public event AsyncEventHandler<NatsLameDuckModeActivatedEventArgs>? LameDuckModeActivated;

public INatsConnection Connection => this;

public NatsOpts Opts { get; }
Expand All @@ -134,6 +136,20 @@ private set

public Func<ISocketConnection, ValueTask<ISocketConnection>>? OnSocketAvailableAsync { get; set; }

internal ServerInfo? WritableServerInfo
{
get => Interlocked.CompareExchange(ref _writableServerInfo, null, null);
set
{
if (value?.LameDuckMode == true)
{
_eventChannel.Writer.TryWrite((NatsEvent.LameDuckModeActivated, new NatsLameDuckModeActivatedEventArgs(_currentConnectUri!.Uri)));
}

Interlocked.Exchange(ref _writableServerInfo, value);
}
}

internal bool IsDisposed
{
get => Interlocked.CompareExchange(ref _isDisposed, 0, 0) == 1;
Expand Down Expand Up @@ -762,6 +778,9 @@ private async Task PublishEventsAsync()
case NatsEvent.MessageDropped when MessageDropped != null && args is NatsMessageDroppedEventArgs error:
await MessageDropped.InvokeAsync(this, error).ConfigureAwait(false);
break;
case NatsEvent.LameDuckModeActivated when LameDuckModeActivated != null && args is NatsLameDuckModeActivatedEventArgs uri:
await LameDuckModeActivated.InvokeAsync(this, uri).ConfigureAwait(false);
break;
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/NATS.Client.Core/NatsEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,11 @@ public NatsMessageDroppedEventArgs(NatsSubBase subscription, int pending, string

public object? Data { get; }
}

public class NatsLameDuckModeActivatedEventArgs : NatsEventArgs
{
public NatsLameDuckModeActivatedEventArgs(Uri uri)
: base("Lame duck mode activated") => Uri = uri;

public Uri Uri { get; }
}
55 changes: 55 additions & 0 deletions tests/NATS.Client.Core.Tests/NatsConnectionTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,61 @@ public async Task ReconnectOnOpenConnection_ShouldDisconnectAndOpenNewConnection
openedCount.ShouldBe(1);
disconnectedCount.ShouldBe(1);
}

[SkipIfNatsServer(versionEarlierThan: "2.10")]
public async Task LameDuckModeActivated_EventHandlerShouldBeInvokedWhenInfoWithLDMReceived()
{
await using var natsServer = NatsServer.Start(
_output,
new NatsServerOptsBuilder()
.AddServerConfigText("""
accounts: {
SYS: {
users: [
{user: "sys", password: "password"}
]
},
}

system_account: SYS
""")
.UseTransport(_transportType)
.Build());

var natsOpts = new NatsOpts
{
Url = natsServer.ClientUrl,
AuthOpts = new NatsAuthOpts
{
Username = "sys",
Password = "password",
},
};

await using var connection = natsServer.CreateClientConnection(natsOpts);
await connection.ConnectAsync();

var invocationCount = 0;
var ldmSignal = new WaitSignal();

connection.LameDuckModeActivated += (_, _) =>
{
Interlocked.Increment(ref invocationCount);
ldmSignal.Pulse();
return default;
};

var subject = $"$SYS.REQ.SERVER.{connection.ServerInfo!.Id}.LDM";

// Act
await connection.RequestAsync<string, string>(
subject: subject,
data: $$"""{"cid":{{connection.ServerInfo!.ClientId}}}""");
await ldmSignal;

// Assert
invocationCount.ShouldBe(1);
}
}

[JsonSerializable(typeof(SampleClass))]
Expand Down
2 changes: 2 additions & 0 deletions tests/NATS.Client.JetStream.Tests/NatsJsContextFactoryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public class MockConnection : INatsConnection
public event AsyncEventHandler<NatsEventArgs>? ReconnectFailed;

public event AsyncEventHandler<NatsMessageDroppedEventArgs>? MessageDropped;

public event AsyncEventHandler<NatsLameDuckModeActivatedEventArgs>? LameDuckModeActivated;
#pragma warning restore CS0067

public INatsServerInfo? ServerInfo { get; } = null;
Expand Down
Loading