Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GetAync method to IRequiredActor<TActor> to resolve issues where actor is not available upon injection (i.e. BackgroundServices) #264

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ namespace Akka.Hosting
public interface IRequiredActor<TActor>
{
Akka.Actor.IActorRef ActorRef { get; }
System.Threading.Tasks.Task<Akka.Actor.IActorRef> GetAsync(System.Threading.CancellationToken cancellationToken = default);
}
public sealed class LoggerConfigBuilder
{
Expand Down Expand Up @@ -152,6 +153,7 @@ namespace Akka.Hosting
{
public RequiredActor(Akka.Hosting.IReadOnlyActorRegistry registry) { }
public Akka.Actor.IActorRef ActorRef { get; }
public System.Threading.Tasks.Task<Akka.Actor.IActorRef> GetAsync(System.Threading.CancellationToken cancellationToken = default) { }
}
public delegate System.Threading.Tasks.Task StartupTask(Akka.Actor.ActorSystem system, Akka.Hosting.IActorRegistry registry);
public enum TriStateValue
Expand Down
1 change: 1 addition & 0 deletions src/Akka.Hosting.Tests/Akka.Hosting.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Akka.Hosting.TestKit\Akka.Hosting.TestKit.csproj" />
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed to add the Hosting.TestKit in order to test the BackgroundService issue with #208

<ProjectReference Include="..\Akka.Hosting\Akka.Hosting.csproj" />
</ItemGroup>

Expand Down
108 changes: 108 additions & 0 deletions src/Akka.Hosting.Tests/Bugfix208Specs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Dsl;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Xunit;

namespace Akka.Hosting.Tests;

public class Bugfix208Specs : TestKit.TestKit
{
private class MyTestActor : ReceiveActor
{
public record SetData(string Data);

public record GetData();

private string _data = string.Empty;

public MyTestActor()
{
Receive<SetData>(s =>
{
_data = s.Data;
});

Receive<GetData>(g =>
{
Sender.Tell(_data);
});
}
}

private class TestActorKey{}

private class MyBackgroundService : BackgroundService
{
private readonly IRequiredActor<TestActorKey> _testActor;

public MyBackgroundService(IRequiredActor<TestActorKey> requiredActor)
{
_testActor = requiredActor;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we tried to resolve the IRequiredActor<TActor>.ActorRef here, like users did in #208, but it won't work - if we block the constructor of a BackgroundService here it'll stop the AkkaService and another services from starting. It's too fragile of a fix for end-users, so instead we're going with throwing a clearer exception when an ActorRegistry miss occurs and offer an additional API on the IRequiredActor<TActor> interface to address it.

}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var myRef = await _testActor.GetAsync(stoppingToken);
myRef.Tell("BackgroundService started");
}
}

protected override void ConfigureServices(HostBuilderContext context, IServiceCollection services)
{
services.AddHostedService<MyBackgroundService>();
base.ConfigureServices(context, services);
}

protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider)
{
builder.WithActors((system, registry, arg3) =>
{
registry.Register<TestActorKey>(system.ActorOf(Props.Create(() => new MyTestActor()), "test-actor"));
});
}

/// <summary>
/// Reproduction for https://github.com/akkadotnet/Akka.Hosting/issues/208
/// </summary>
[Fact]
public async Task ShouldStartHostedServiceThatDependsOnActor()
{
// arrange
var testActorRef = ActorRegistry.Get<TestActorKey>();

// act

// assert

// workaround for https://github.com/akkadotnet/Akka.Hosting/issues/265
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the reproduction for #265

var attempts = 5;
do
{
attempts--;
try
{
var r = await testActorRef.Ask<string>(new MyTestActor.GetData(), TimeSpan.FromMilliseconds(100));
r.Should().Be("BackgroundService started");
}
catch (Exception e)
{
attempts--;
if (attempts == 0)
{
throw;
}
}
} while (attempts > 0);

// await AwaitAssertAsync(async () =>
// {
// var r = await testActorRef.Ask<string>(new MyTestActor.GetData(), TimeSpan.FromMilliseconds(100));
// r.Should().Be("BackgroundService started");
// }, RemainingOrDefault, TimeSpan.FromMilliseconds(150));
}
}
2 changes: 1 addition & 1 deletion src/Akka.Hosting.Tests/Logging/LogMessageFormatterSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public async Task TransformMessagesTest()
try
{
var sys = host.Services.GetRequiredService<ActorSystem>();
var testKit = new TestKit.Xunit2.TestKit(sys);
var testKit = new Akka.TestKit.Xunit2.TestKit(sys);

var probe = testKit.CreateTestProbe();
sys.EventStream.Subscribe(probe, typeof(Error));
Expand Down
2 changes: 1 addition & 1 deletion src/Akka.Hosting.Tests/Logging/LoggerConfigEnd2EndSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

namespace Akka.Hosting.Tests.Logging;

public class LoggerConfigEnd2EndSpecs : TestKit.Xunit2.TestKit
public class LoggerConfigEnd2EndSpecs : Akka.TestKit.Xunit2.TestKit
{
private class CustomLoggingProvider : ILoggerProvider
{
Expand Down
63 changes: 57 additions & 6 deletions src/Akka.Hosting/ActorRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Util;
using Microsoft.Extensions.Hosting;

namespace Akka.Hosting
{
Expand All @@ -23,21 +24,72 @@ public interface IRequiredActor<TActor>
/// The underlying actor resolved via <see cref="ActorRegistry"/> using the given <see cref="TActor"/> key.
/// </summary>
IActorRef ActorRef { get; }
}

/// <summary>
/// When calling from inside another <see cref="IHostedService"/>, actor registrations may not be
/// available at startup (due to Akka.NET itself being started asynchronously in another hosted service).
///
/// Instead - you should call the GetAsync method to wait for that actor to be populated into the <see cref="ActorRegistry"/>
/// by the AkkaService at startup.
/// </summary>
/// <param name="cancellationToken">Optional cancellation token.</param>
/// <returns>A Task that will return the <see cref="IActorRef"/> using the given key.</returns>
Task<IActorRef> GetAsync(CancellationToken cancellationToken = default);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the correct fix - offer a method on the IRequiredActor<TActor> interface that uses the ActorRegistry.GetAsync under the covers. Users will need to call this if they fall under the #208 edge case.

}

/// <summary>
/// INTERNAL API
/// </summary>
/// <typeparam name="TActor">The type key of the actor - corresponds to a matching entry inside the <see cref="IActorRegistry"/>.</typeparam>
public sealed class RequiredActor<TActor> : IRequiredActor<TActor>
{
private readonly IReadOnlyActorRegistry _registry;

public RequiredActor(IReadOnlyActorRegistry registry)
{
ActorRef = registry.Get<TActor>();
_registry = registry;
}

private IActorRef? _internalRef = null;

/// <inheritdoc cref="IRequiredActor{TActor}.ActorRef"/>
public IActorRef ActorRef { get; }
public IActorRef ActorRef
{
get
{
// attempt 1 - used cached value
if (_internalRef != null)
return _internalRef;

// attempt 2 - synchronously check the registry (fast path)
if (_registry.TryGet<TActor>(out _internalRef))
{
return _internalRef;
}


throw new MissingActorRegistryEntryException(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try our best to give the users a clear signal as to why this failed and what they should do instead. We can't really solve the problem for them and the normal, sync ActorRef property works for 99.9% of cases so we don't want to remove that just to support an edge case.

$"Unable to resolve actor type [{typeof(TActor)})] - if you're using IRequiredActor<T> inside the constructor" +
$"of an IHostedService, consider using the GetAsync method instead so you can wait for the actor to be populated by the AkkaService (which runs in parallel.)");
}
}

/// <inheritdoc cref="IRequiredActor{TActor}.GetAsync"/>
public async Task<IActorRef> GetAsync(CancellationToken cancellationToken = default)
{
// attempt 1 - used cached value
if (_internalRef != null)
return _internalRef;

// attempt 2 - synchronously check the registry (fast path)
if (_registry.TryGet<TActor>(out _internalRef))
{
return _internalRef;
}

// attempt 3 - wait for the actor to be registered
return await _registry.GetAsync<TActor>(cancellationToken).ConfigureAwait(false);
}
}

/// <summary>
Expand Down Expand Up @@ -176,8 +228,7 @@ public void Register<TKey>(IActorRef actor, bool overwrite = false)
/// <remarks>
/// Have to store a collection of <see cref="WaitForActorRegistration"/>s here so each waiter gets its own cancellation token.
/// </remarks>
private readonly ConcurrentDictionary<Type, ImmutableHashSet<WaitForActorRegistration>> _actorWaiters =
new ConcurrentDictionary<Type, ImmutableHashSet<WaitForActorRegistration>>();
private readonly ConcurrentDictionary<Type, ImmutableHashSet<WaitForActorRegistration>> _actorWaiters = new();

/// <summary>
/// Attempts to register an actor with the registry.
Expand Down Expand Up @@ -245,7 +296,7 @@ public bool TryGet(Type key, out IActorRef actor)
/// <inheritdoc cref="IReadOnlyActorRegistry.GetAsync{TKey}"/>
public async Task<IActorRef> GetAsync<TKey>(CancellationToken ct = default)
{
return await GetAsync(typeof(TKey), ct);
return await GetAsync(typeof(TKey), ct).ConfigureAwait(false);
}

/// <inheritdoc cref="IReadOnlyActorRegistry.GetAsync"/>
Expand Down