diff --git a/Akka.Persistence.EventStore.sln b/Akka.Persistence.EventStore.sln index e457ed2..41e2bf7 100644 --- a/Akka.Persistence.EventStore.sln +++ b/Akka.Persistence.EventStore.sln @@ -43,8 +43,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Benchmarks", "Benchmarks", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.EventStore.Benchmarks", "src\Akka.Persistence.EventStore.Benchmarks\Akka.Persistence.EventStore.Benchmarks.csproj", "{EF1D827E-2B2B-4BA0-8733-D54CACDEE69F}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.EventStore.Benchmark.Tests", "src\Akka.Persistence.EventStore.Benchmark.Tests\Akka.Persistence.EventStore.Benchmark.Tests.csproj", "{277FF4C2-DEB0-42E9-8A84-ADAB131F814A}" -EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -115,18 +113,6 @@ Global {EF1D827E-2B2B-4BA0-8733-D54CACDEE69F}.Release|x64.Build.0 = Release|Any CPU {EF1D827E-2B2B-4BA0-8733-D54CACDEE69F}.Release|x86.ActiveCfg = Release|Any CPU {EF1D827E-2B2B-4BA0-8733-D54CACDEE69F}.Release|x86.Build.0 = Release|Any CPU - {277FF4C2-DEB0-42E9-8A84-ADAB131F814A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {277FF4C2-DEB0-42E9-8A84-ADAB131F814A}.Debug|Any CPU.Build.0 = Debug|Any CPU - {277FF4C2-DEB0-42E9-8A84-ADAB131F814A}.Debug|x64.ActiveCfg = Debug|Any CPU - {277FF4C2-DEB0-42E9-8A84-ADAB131F814A}.Debug|x64.Build.0 = Debug|Any CPU - {277FF4C2-DEB0-42E9-8A84-ADAB131F814A}.Debug|x86.ActiveCfg = Debug|Any CPU - {277FF4C2-DEB0-42E9-8A84-ADAB131F814A}.Debug|x86.Build.0 = Debug|Any CPU - {277FF4C2-DEB0-42E9-8A84-ADAB131F814A}.Release|Any CPU.ActiveCfg = Release|Any CPU - {277FF4C2-DEB0-42E9-8A84-ADAB131F814A}.Release|Any CPU.Build.0 = Release|Any CPU - {277FF4C2-DEB0-42E9-8A84-ADAB131F814A}.Release|x64.ActiveCfg = Release|Any CPU - {277FF4C2-DEB0-42E9-8A84-ADAB131F814A}.Release|x64.Build.0 = Release|Any CPU - {277FF4C2-DEB0-42E9-8A84-ADAB131F814A}.Release|x86.ActiveCfg = Release|Any CPU - {277FF4C2-DEB0-42E9-8A84-ADAB131F814A}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -139,6 +125,5 @@ Global {DF2C9C02-9F0D-4FC8-8F72-234FD68FC918} = {F4AC94E7-D5F3-4B85-9810-A8BF02441883} {6017AE31-4718-413B-983E-EAF9D4B465C9} = {DF2C9C02-9F0D-4FC8-8F72-234FD68FC918} {EF1D827E-2B2B-4BA0-8733-D54CACDEE69F} = {6DCB3F60-66B1-44BE-AA32-C7CA4B11563D} - {277FF4C2-DEB0-42E9-8A84-ADAB131F814A} = {6DCB3F60-66B1-44BE-AA32-C7CA4B11563D} EndGlobalSection EndGlobal diff --git a/Directory.Packages.props b/Directory.Packages.props index bdd4231..1edb110 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -12,6 +12,7 @@ + diff --git a/src/Akka.Persistence.EventStore.Benchmark.Tests/Akka.Persistence.EventStore.Benchmark.Tests.csproj b/src/Akka.Persistence.EventStore.Benchmark.Tests/Akka.Persistence.EventStore.Benchmark.Tests.csproj deleted file mode 100644 index 9dc58c8..0000000 --- a/src/Akka.Persistence.EventStore.Benchmark.Tests/Akka.Persistence.EventStore.Benchmark.Tests.csproj +++ /dev/null @@ -1,18 +0,0 @@ - - - - net8.0 - enable - enable - - - - - - - - - - - - diff --git a/src/Akka.Persistence.EventStore.Benchmark.Tests/BenchActor.cs b/src/Akka.Persistence.EventStore.Benchmark.Tests/BenchActor.cs deleted file mode 100644 index b65f414..0000000 --- a/src/Akka.Persistence.EventStore.Benchmark.Tests/BenchActor.cs +++ /dev/null @@ -1,128 +0,0 @@ -using Akka.Actor; -using Akka.Util; - -namespace Akka.Persistence.EventStore.Benchmark.Tests; - -internal class BenchActor : UntypedPersistentActor -{ - private const int BatchSize = 50; - private List _batch = new(BatchSize); - private int _counter; - - public BenchActor(string persistenceId, IActorRef replyTo, int replyAfter, bool groupName) - { - PersistenceId = persistenceId + MurmurHash.StringHash(Context.Parent.Path.Name + Context.Self.Path.Name); - ReplyTo = replyTo; - ReplyAfter = replyAfter; - } - - public BenchActor(string persistenceId, IActorRef replyTo, int replyAfter) - { - PersistenceId = persistenceId; - ReplyTo = replyTo; - ReplyAfter = replyAfter; - } - - public override string PersistenceId { get; } - - public IActorRef ReplyTo { get; } - - public int ReplyAfter { get; } - - protected override void OnRecover(object message) - { - switch (message) - { - case Cmd c: - _counter++; - - if (c.Payload != _counter) - throw new ArgumentException($"Expected to receive [{_counter}] yet got: [{c.Payload}]"); - - if (_counter == ReplyAfter) - ReplyTo.Tell(c.Payload); - - break; - } - } - - protected override void OnCommand(object message) - { - switch (message) - { - case Cmd { Mode: "p" } c: - Persist( - c, - d => - { - _counter += 1; - if (d.Payload != _counter) - throw new ArgumentException($"Expected to receive [{_counter}] yet got: [{d.Payload}]"); - if (_counter == ReplyAfter) - ReplyTo.Tell(d.Payload); - }); - - break; - - case Cmd { Mode: "pb" } c: - _batch.Add(c); - - if (_batch.Count % BatchSize == 0) - { - PersistAll( - _batch, - d => - { - _counter += 1; - if (d.Payload != _counter) - throw new ArgumentException( - $"Expected to receive [{_counter}] yet got: [{d.Payload}]"); - if (_counter == ReplyAfter) - ReplyTo.Tell(d.Payload); - }); - _batch = new List(BatchSize); - } - - break; - - case Cmd { Mode: "pa" } c: - PersistAsync( - c, - d => - { - _counter += 1; - if (d.Payload != _counter) - throw new ArgumentException($"Expected to receive [{_counter}] yet got: [{d.Payload}]"); - if (_counter == ReplyAfter) - ReplyTo.Tell(d.Payload); - }); - - break; - - case Cmd { Mode: "pba" } c: - _batch.Add(c); - - if (_batch.Count % BatchSize == 0) - { - PersistAllAsync( - _batch, - d => - { - _counter += 1; - if (d.Payload != _counter) - throw new ArgumentException( - $"Expected to receive [{_counter}] yet got: [{d.Payload}]"); - if (_counter == ReplyAfter) - ReplyTo.Tell(d.Payload); - }); - _batch = new List(BatchSize); - } - - break; - - case ResetCounter: - _counter = 0; - break; - } - } -} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmark.Tests/Cmd.cs b/src/Akka.Persistence.EventStore.Benchmark.Tests/Cmd.cs deleted file mode 100644 index 50553fa..0000000 --- a/src/Akka.Persistence.EventStore.Benchmark.Tests/Cmd.cs +++ /dev/null @@ -1,14 +0,0 @@ -namespace Akka.Persistence.EventStore.Benchmark.Tests; - -public class Cmd -{ - public Cmd(string mode, int payload) - { - Mode = mode; - Payload = payload; - } - - public string Mode { get; } - - public int Payload { get; } -} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmark.Tests/EventStoreJournalPerfSpec.cs b/src/Akka.Persistence.EventStore.Benchmark.Tests/EventStoreJournalPerfSpec.cs deleted file mode 100644 index 0ced68e..0000000 --- a/src/Akka.Persistence.EventStore.Benchmark.Tests/EventStoreJournalPerfSpec.cs +++ /dev/null @@ -1,520 +0,0 @@ -using System.Collections.Immutable; -using System.Diagnostics; -using Akka.Actor; -using Akka.Configuration; -using Akka.Persistence.EventStore.Tests; -using Akka.Routing; -using Akka.TestKit; -using Akka.Util.Internal; -using JetBrains.dotMemoryUnit; -using JetBrains.dotMemoryUnit.Kernel; -using Xunit; -using Xunit.Abstractions; - -namespace Akka.Persistence.EventStore.Benchmark.Tests; - -[Collection(nameof(EventStorePersistenceBenchmark))] -public sealed class EventStoreJournalPerfSpec : Akka.TestKit.Xunit2.TestKit -{ - // Number of measurement iterations each test will be run. - private const int MeasurementIterations = 10; - - // Number of messages sent to the PersistentActor under test for each test iteration - private readonly int _eventsCount; - - private readonly TimeSpan _expectDuration; - private readonly TestProbe _testProbe; - - public EventStoreJournalPerfSpec(ITestOutputHelper output, EventStoreContainer fixture) - : base(Configuration(fixture), nameof(EventStoreJournalPerfSpec), output) - { - ThreadPool.SetMinThreads(12, 12); - _eventsCount = TestConstants.DockerNumMessages; - _expectDuration = TimeSpan.FromSeconds(40); - _testProbe = CreateTestProbe(); - } - - private static Config Configuration(EventStoreContainer fixture) - { - return ConfigurationFactory.ParseString( - $$""" - - akka.persistence { - publish-plugin-commands = on - journal { - plugin = "akka.persistence.journal.eventstore" - eventstore { - auto-initialize = true - connection-string = "{{fixture.ConnectionString}}" - } - } - } - """) - .WithFallback(Persistence.DefaultConfig()) - .WithFallback(EventStorePersistence.DefaultConfiguration); - } - - private IReadOnlyList Commands => Enumerable.Range(1, _eventsCount).ToList(); - - private IActorRef BenchActor(string pid) - => Sys.ActorOf(Props.Create(() => new BenchActor(pid, _testProbe, _eventsCount))); - - private (IActorRef aut, TestProbe probe) BenchActorNewProbe(string pid) - { - var tp = CreateTestProbe(); - return (Sys.ActorOf(Props.Create(() => new BenchActor(pid, tp, _eventsCount))), tp); - } - - private (IActorRef aut, TestProbe probe) BenchActorNewProbeGroup(string pid, int numActors, int numMessages) - { - var tp = CreateTestProbe(); - return (Sys.ActorOf( - Props - .Create(() => new BenchActor(pid, tp, numMessages, false)) - .WithRouter(new RoundRobinPool(numActors))), tp); - } - - private async Task FeedAndExpectLastRouterSetAsync( - (IActorRef actor, TestProbe probe) autSet, - string mode, - IReadOnlyList commands, - int numExpect) - { - commands.ForEach(c => autSet.actor.Tell(new Broadcast(new Cmd(mode, c)))); - - for (var i = 0; i < numExpect; i++) - await autSet.probe.ExpectMsgAsync(commands[^1], _expectDuration); - } - - private async Task FeedAndExpectLastAsync(IActorRef actor, string mode, IReadOnlyList commands) - { - commands.ForEach(c => actor.Tell(new Cmd(mode, c))); - await _testProbe.ExpectMsgAsync(commands[^1], _expectDuration); - } - - private async Task FeedAndExpectLastSpecificAsync( - (IActorRef actor, TestProbe probe) aut, - string mode, - IReadOnlyList commands) - { - commands.ForEach(c => aut.actor.Tell(new Cmd(mode, c))); - - await aut.probe.ExpectMsgAsync(commands[^1], _expectDuration); - } - - private async Task MeasureAsync(Func msg, Func block) - { - var measurements = new List(MeasurementIterations); - - await block(); // warm-up - - var i = 0; - while (i < MeasurementIterations) - { - var sw = Stopwatch.StartNew(); - await block(); - sw.Stop(); - measurements.Add(sw.Elapsed); - Output.WriteLine(msg(sw.Elapsed)); - i++; - } - - var avgTime = measurements.Select(c => c.TotalMilliseconds).Sum() / MeasurementIterations; - var msgPerSec = _eventsCount / avgTime * 1000; - - Output.WriteLine($"Average time: {avgTime} ms, {msgPerSec} msg/sec"); - } - - private async Task MeasureGroupAsync(Func msg, Func block, int numMsg, int numGroup) - { - var measurements = new List(MeasurementIterations); - - await block(); - await block(); // warm-up - - var i = 0; - while (i < MeasurementIterations) - { - var sw = Stopwatch.StartNew(); - await block(); - sw.Stop(); - measurements.Add(sw.Elapsed); - Output.WriteLine(msg(sw.Elapsed)); - i++; - } - - var avgTime = measurements.Select(c => c.TotalMilliseconds).Sum() / MeasurementIterations; - var msgPerSec = numMsg / avgTime * 1000; - var msgPerSecTotal = numMsg * numGroup / avgTime * 1000; - - Output.WriteLine( - $"Workers: {numGroup} , Average time: {avgTime} ms, {msgPerSec} msg/sec/actor, {msgPerSecTotal} total msg/sec."); - } - - [DotMemoryUnit(CollectAllocations = true, FailIfRunWithoutSupport = false)] - [Fact] - public void DotMemory_PersistenceActor_performance_must_measure_Persist() - { - dotMemory.Check(); - - var p1 = BenchActor("DotMemoryPersistPid"); - - dotMemory.Check( - _ => - { -#pragma warning disable xUnit1031 - MeasureAsync( - d => $"Persist()-ing {_eventsCount} took {d.TotalMilliseconds} ms", - async () => - { - await FeedAndExpectLastAsync(p1, "p", Commands); - p1.Tell(ResetCounter.Instance); - }).GetAwaiter().GetResult(); -#pragma warning restore xUnit1031 - } - ); - - dotMemory.Check( - _ => - { -#pragma warning disable xUnit1031 - MeasureAsync( - d => $"Persist()-ing {_eventsCount} took {d.TotalMilliseconds} ms", - async () => - { - await FeedAndExpectLastAsync(p1, "p", Commands); - p1.Tell(ResetCounter.Instance); - }).GetAwaiter().GetResult(); -#pragma warning restore xUnit1031 - } - ); - - dotMemoryApi.SaveCollectedData(@"c:\temp\dotmemory"); - } - - [DotMemoryUnit(CollectAllocations = true, FailIfRunWithoutSupport = false)] - [Fact] - public void DotMemory_PersistenceActor_performance_must_measure_PersistGroup400() - { - dotMemory.Check(); - - const int numGroup = 400; - var numCommands = Math.Min(_eventsCount / 100, 500); - - dotMemory.Check( - _ => - { -#pragma warning disable xUnit1031 - RunGroupBenchmarkAsync(numGroup, numCommands).GetAwaiter().GetResult(); -#pragma warning restore xUnit1031 - } - ); - - dotMemory.Check( - _ => - { -#pragma warning disable xUnit1031 - RunGroupBenchmarkAsync(numGroup, numCommands).GetAwaiter().GetResult(); -#pragma warning restore xUnit1031 - } - ); - - dotMemoryApi.SaveCollectedData(@"c:\temp\dotmemory"); - } - - [Fact] - public async Task PersistenceActor_performance_must_measure_Persist() - { - var p1 = BenchActor("PersistPid"); - - await MeasureAsync( - d => - $"Persist()-ing {_eventsCount} took {d.TotalMilliseconds} ms", - async () => - { - await FeedAndExpectLastAsync(p1, "p", Commands); - p1.Tell(ResetCounter.Instance); - }); - } - - [Fact] - public async Task PersistenceActor_performance_must_measure_PersistGroup10() - { - const int numGroup = 10; - var numCommands = Math.Min(_eventsCount / 10, 1000); - await RunGroupBenchmarkAsync(numGroup, numCommands); - } - - [Fact] - public async Task PersistenceActor_performance_must_measure_PersistGroup25() - { - const int numGroup = 25; - var numCommands = Math.Min(_eventsCount / 25, 1000); - await RunGroupBenchmarkAsync(numGroup, numCommands); - } - - [Fact] - public async Task PersistenceActor_performance_must_measure_PersistGroup50() - { - const int numGroup = 50; - var numCommands = Math.Min(_eventsCount / 50, 1000); - await RunGroupBenchmarkAsync(numGroup, numCommands); - } - - [Fact] - public async Task PersistenceActor_performance_must_measure_PersistGroup100() - { - const int numGroup = 100; - var numCommands = Math.Min(_eventsCount / 100, 1000); - await RunGroupBenchmarkAsync(numGroup, numCommands); - } - - [Fact] - public async Task PersistenceActor_performance_must_measure_PersistGroup200() - { - const int numGroup = 200; - var numCommands = Math.Min(_eventsCount / 100, 500); - await RunGroupBenchmarkAsync(numGroup, numCommands); - } - - [Fact] - public async Task PersistenceActor_performance_must_measure_PersistGroup400() - { - const int numGroup = 400; - var numCommands = Math.Min(_eventsCount / 100, 500); - await RunGroupBenchmarkAsync(numGroup, numCommands); - } - - private async Task RunGroupBenchmarkAsync(int numGroup, int numCommands) - { - var p1 = BenchActorNewProbeGroup("GroupPersistPid" + numGroup, numGroup, numCommands); - await MeasureGroupAsync( - d => $"Persist()-ing {numCommands} * {numGroup} took {d.TotalMilliseconds} ms", - async () => - { - await FeedAndExpectLastRouterSetAsync( - p1, - "p", - Commands.Take(numCommands).ToImmutableList(), - numGroup); - - p1.aut.Tell(new Broadcast(ResetCounter.Instance)); - }, - numCommands, - numGroup - ); - } - - [Fact] - public async Task PersistenceActor_performance_must_measure_PersistAll() - { - var p1 = BenchActor("PersistAllPid"); - await MeasureAsync( - d => $"PersistAll()-ing {_eventsCount} took {d.TotalMilliseconds} ms", - async () => - { - await FeedAndExpectLastAsync(p1, "pb", Commands); - p1.Tell(ResetCounter.Instance); - }); - } - - [Fact] - public async Task PersistenceActor_performance_must_measure_PersistAsync() - { - var p1 = BenchActor("PersistAsyncPid"); - await MeasureAsync( - d => $"PersistAsync()-ing {_eventsCount} took {d.TotalMilliseconds} ms", - async () => - { - await FeedAndExpectLastAsync(p1, "pa", Commands); - p1.Tell(ResetCounter.Instance); - }); - } - - [Fact] - public async Task PersistenceActor_performance_must_measure_PersistAllAsync() - { - var p1 = BenchActor("PersistAllAsyncPid"); - await MeasureAsync( - d => $"PersistAllAsync()-ing {_eventsCount} took {d.TotalMilliseconds} ms", - async () => - { - await FeedAndExpectLastAsync(p1, "pba", Commands); - p1.Tell(ResetCounter.Instance); - }); - } - - [Fact] - public async Task PersistenceActor_performance_must_measure_Recovering() - { - var p1 = BenchActor("PersistRecoverPid"); - - await FeedAndExpectLastAsync(p1, "p", Commands); - - await MeasureAsync( - d => $"Recovering {_eventsCount} took {d.TotalMilliseconds} ms", - async () => - { - BenchActor("PersistRecoverPid"); - await _testProbe.ExpectMsgAsync(Commands[^1], _expectDuration); - }); - } - - [Fact] - public async Task PersistenceActor_performance_must_measure_RecoveringTwo() - { - var p1 = BenchActorNewProbe("DoublePersistRecoverPid1"); - var p2 = BenchActorNewProbe("DoublePersistRecoverPid2"); - - await FeedAndExpectLastSpecificAsync(p1, "p", Commands); - await FeedAndExpectLastSpecificAsync(p2, "p", Commands); - - await MeasureGroupAsync( - d => $"Recovering {_eventsCount} took {d.TotalMilliseconds} ms", - async () => - { - async Task Task1() - { - var (_, probe) = BenchActorNewProbe("DoublePersistRecoverPid1"); - await probe.ExpectMsgAsync(Commands[^1], _expectDuration); - } - - async Task Task2() - { - var (_, probe) = BenchActorNewProbe("DoublePersistRecoverPid2"); - await probe.ExpectMsgAsync(Commands[^1], _expectDuration); - } - - await Task.WhenAll(Task1(), Task2()); - }, - _eventsCount, - 2); - } - - [Fact] - public async Task PersistenceActor_performance_must_measure_RecoveringFour() - { - var p1 = BenchActorNewProbe("QuadPersistRecoverPid1"); - var p2 = BenchActorNewProbe("QuadPersistRecoverPid2"); - var p3 = BenchActorNewProbe("QuadPersistRecoverPid3"); - var p4 = BenchActorNewProbe("QuadPersistRecoverPid4"); - - await FeedAndExpectLastSpecificAsync(p1, "p", Commands); - await FeedAndExpectLastSpecificAsync(p2, "p", Commands); - await FeedAndExpectLastSpecificAsync(p3, "p", Commands); - await FeedAndExpectLastSpecificAsync(p4, "p", Commands); - - await MeasureGroupAsync( - d => $"Recovering {_eventsCount} took {d.TotalMilliseconds} ms", - async () => - { - async Task Task1() - { - var (_, probe) = BenchActorNewProbe("QuadPersistRecoverPid1"); - await probe.ExpectMsgAsync(Commands[^1], _expectDuration); - } - - async Task Task2() - { - var (_, probe) = BenchActorNewProbe("QuadPersistRecoverPid2"); - await probe.ExpectMsgAsync(Commands[^1], _expectDuration); - } - - async Task Task3() - { - var (_, probe) = BenchActorNewProbe("QuadPersistRecoverPid3"); - await probe.ExpectMsgAsync(Commands[^1], _expectDuration); - } - - async Task Task4() - { - var (_, probe) = BenchActorNewProbe("QuadPersistRecoverPid4"); - await probe.ExpectMsgAsync(Commands[^1], _expectDuration); - } - - await Task.WhenAll(Task1(), Task2(), Task3(), Task4()); - }, - _eventsCount, - 4); - } - - [Fact] - public async Task PersistenceActor_performance_must_measure_Recovering8() - { - var p1 = BenchActorNewProbe("OctPersistRecoverPid1"); - var p2 = BenchActorNewProbe("OctPersistRecoverPid2"); - var p3 = BenchActorNewProbe("OctPersistRecoverPid3"); - var p4 = BenchActorNewProbe("OctPersistRecoverPid4"); - var p5 = BenchActorNewProbe("OctPersistRecoverPid5"); - var p6 = BenchActorNewProbe("OctPersistRecoverPid6"); - var p7 = BenchActorNewProbe("OctPersistRecoverPid7"); - var p8 = BenchActorNewProbe("OctPersistRecoverPid8"); - - await FeedAndExpectLastSpecificAsync(p1, "p", Commands); - await FeedAndExpectLastSpecificAsync(p2, "p", Commands); - await FeedAndExpectLastSpecificAsync(p3, "p", Commands); - await FeedAndExpectLastSpecificAsync(p4, "p", Commands); - await FeedAndExpectLastSpecificAsync(p5, "p", Commands); - await FeedAndExpectLastSpecificAsync(p6, "p", Commands); - await FeedAndExpectLastSpecificAsync(p7, "p", Commands); - await FeedAndExpectLastSpecificAsync(p8, "p", Commands); - - await MeasureGroupAsync( - d => - $"Recovering {_eventsCount} took {d.TotalMilliseconds} ms , {_eventsCount * 8 / d.TotalMilliseconds * 1000} total msg/sec", - async () => - { - async Task Task1() - { - var (_, probe) = BenchActorNewProbe("OctPersistRecoverPid1"); - await probe.ExpectMsgAsync(Commands[^1], _expectDuration); - } - - async Task Task2() - { - var (_, probe) = BenchActorNewProbe("OctPersistRecoverPid2"); - await probe.ExpectMsgAsync(Commands[^1], _expectDuration); - } - - async Task Task3() - { - var (_, probe) = BenchActorNewProbe("OctPersistRecoverPid3"); - await probe.ExpectMsgAsync(Commands[^1], _expectDuration); - } - - async Task Task4() - { - var (_, probe) = BenchActorNewProbe("OctPersistRecoverPid4"); - await probe.ExpectMsgAsync(Commands[^1], _expectDuration); - } - - async Task Task5() - { - var (_, probe) = BenchActorNewProbe("OctPersistRecoverPid5"); - await probe.ExpectMsgAsync(Commands[^1], _expectDuration); - } - - async Task Task6() - { - var (_, probe) = BenchActorNewProbe("OctPersistRecoverPid6"); - await probe.ExpectMsgAsync(Commands[^1], _expectDuration); - } - - async Task Task7() - { - var (_, probe) = BenchActorNewProbe("OctPersistRecoverPid7"); - await probe.ExpectMsgAsync(Commands[^1], _expectDuration); - } - - async Task Task8() - { - var (_, probe) = BenchActorNewProbe("OctPersistRecoverPid8"); - await probe.ExpectMsgAsync(Commands[^1], _expectDuration); - } - - await Task.WhenAll(Task1(), Task2(), Task3(), Task4(), Task5(), Task6(), Task7(), Task8()); - }, - _eventsCount, - 8); - } -} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmark.Tests/EventStorePersistenceBenchmark.cs b/src/Akka.Persistence.EventStore.Benchmark.Tests/EventStorePersistenceBenchmark.cs deleted file mode 100644 index 0ad1358..0000000 --- a/src/Akka.Persistence.EventStore.Benchmark.Tests/EventStorePersistenceBenchmark.cs +++ /dev/null @@ -1,7 +0,0 @@ -using Akka.Persistence.EventStore.Tests; -using Xunit; - -namespace Akka.Persistence.EventStore.Benchmark.Tests; - -[CollectionDefinition(nameof(EventStorePersistenceBenchmark), DisableParallelization = true)] -public sealed class EventStorePersistenceBenchmark : ICollectionFixture; \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmark.Tests/ResetCounter.cs b/src/Akka.Persistence.EventStore.Benchmark.Tests/ResetCounter.cs deleted file mode 100644 index c11ce38..0000000 --- a/src/Akka.Persistence.EventStore.Benchmark.Tests/ResetCounter.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace Akka.Persistence.EventStore.Benchmark.Tests; - -internal class ResetCounter -{ - private ResetCounter() { } - public static ResetCounter Instance { get; } = new(); -} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmark.Tests/TestConstants.cs b/src/Akka.Persistence.EventStore.Benchmark.Tests/TestConstants.cs deleted file mode 100644 index a032bca..0000000 --- a/src/Akka.Persistence.EventStore.Benchmark.Tests/TestConstants.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace Akka.Persistence.EventStore.Benchmark.Tests; - -public static class TestConstants -{ - public const int NumMessages = 1000; - public const int DockerNumMessages = 1000; -} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmarks/Akka.Persistence.EventStore.Benchmarks.csproj b/src/Akka.Persistence.EventStore.Benchmarks/Akka.Persistence.EventStore.Benchmarks.csproj index 3de3db0..7a4c5ba 100644 --- a/src/Akka.Persistence.EventStore.Benchmarks/Akka.Persistence.EventStore.Benchmarks.csproj +++ b/src/Akka.Persistence.EventStore.Benchmarks/Akka.Persistence.EventStore.Benchmarks.csproj @@ -13,6 +13,7 @@ + diff --git a/src/Akka.Persistence.EventStore.Benchmarks/BasePersistBenchmarks.cs b/src/Akka.Persistence.EventStore.Benchmarks/BasePersistBenchmarks.cs new file mode 100644 index 0000000..65312c8 --- /dev/null +++ b/src/Akka.Persistence.EventStore.Benchmarks/BasePersistBenchmarks.cs @@ -0,0 +1,135 @@ +using System.Collections.Immutable; +using Akka.Actor; +using Akka.Persistence.EventStore.Benchmarks.BenchmarkActors; +using Akka.Persistence.EventStore.Benchmarks.Columns; +using Akka.Routing; +using Akka.TestKit; +using Akka.TestKit.Xunit2; +using Akka.Util.Internal; +using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Configs; +using BenchmarkDotNet.Diagnosers; +using BenchmarkDotNet.Loggers; + +namespace Akka.Persistence.EventStore.Benchmarks; + +[Config(typeof(Config))] +public abstract class BasePersistBenchmarks +{ + private class Config : ManualConfig + { + public Config() + { + AddDiagnoser(MemoryDiagnoser.Default); + AddLogger(ConsoleLogger.Default); + AddColumn(new TotalMessagesPerSecondColumn()); + AddColumn(new MessagesPerHandlerPerSecondColumn()); + } + } + + private static readonly TimeSpan ExpectDuration = TimeSpan.FromSeconds(5); + + private EventStoreBenchmarkFixture.CleanActorSystem? _sys; + + private IBenchmarkProxy _benchmarkProxy = null!; + + [GlobalSetup] + public async Task Setup() + { + _sys = await EventStoreBenchmarkFixture.CreateActorSystemWithCleanDb("system"); + } + + [GlobalCleanup] + public async Task Cleanup() + { + if (_sys is not null) + await _sys.DisposeAsync(); + } + + [IterationSetup] + public void SetupActors() + { + var testProbe = new TestProbe( + _sys!.System, + new XunitAssertions()); + + var isGrouped = Configuration.NumberOfHandlers > 1; + + var benchActorProps = Props.Create(() => new BenchActor( + $"persist-{Guid.NewGuid()}", + testProbe, + Configuration.NumberOfMessagesPerIteration, + isGrouped)); + + if (isGrouped) + benchActorProps = benchActorProps.WithRouter(new RoundRobinPool(Configuration.NumberOfHandlers)); + + var benchActor = _sys.System.ActorOf(benchActorProps); + + _benchmarkProxy = isGrouped + ? new RoundRobinBenchmarkProxy(benchActor, testProbe, Configuration.Commands[^1], + Configuration.NumberOfHandlers) + : new SingleActorBenchmarkProxy(benchActor, testProbe, Configuration.Commands[^1]); + } + + [ParamsSource(nameof(GetNumberOfEventsConfiguration))] + public MessagesPerSecondConfiguration Configuration { get; set; } = null!; + + public static IImmutableList GetNumberOfEventsConfiguration() + { + const int numberOfEvents = 1000; + var configurationActors = ImmutableList.Create(1, 10, 25, 100, 200, 400); + + return configurationActors + .Select(x => new MessagesPerSecondConfiguration( + numberOfEvents / x, + x)) + .ToImmutableList(); + } + + protected async Task RunBenchmark(string mode) + { + Configuration.Commands.ForEach(cmd => _benchmarkProxy.Send(mode, cmd)); + + await _benchmarkProxy.ExpectDone(); + } + + private interface IBenchmarkProxy + { + void Send(string mode, int cmd); + + Task ExpectDone(); + } + + private class SingleActorBenchmarkProxy(IActorRef benchActor, TestProbe testProbe, int lastCommand) + : IBenchmarkProxy + { + public void Send(string mode, int cmd) + { + benchActor.Tell(new BenchActor.Commands.Cmd(mode, cmd)); + } + + public async Task ExpectDone() + { + await testProbe.ExpectMsgAsync(lastCommand, ExpectDuration); + } + } + + private class RoundRobinBenchmarkProxy( + IActorRef broadcaster, + TestProbe testProbe, + int lastCommand, + int numberOfActors) : IBenchmarkProxy + { + public void Send(string mode, int cmd) + { + broadcaster.Tell(new Broadcast(new BenchActor.Commands.Cmd(mode, cmd))); + } + + public async Task ExpectDone() + { + for (var i = 0; i < numberOfActors; i++) + await testProbe.ExpectMsgAsync(lastCommand, ExpectDuration); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmarks/BenchmarkActors/BenchActor.cs b/src/Akka.Persistence.EventStore.Benchmarks/BenchmarkActors/BenchActor.cs new file mode 100644 index 0000000..55d9d5d --- /dev/null +++ b/src/Akka.Persistence.EventStore.Benchmarks/BenchmarkActors/BenchActor.cs @@ -0,0 +1,93 @@ +using Akka.Actor; + +namespace Akka.Persistence.EventStore.Benchmarks.BenchmarkActors; + +internal class BenchActor(string persistenceId, IActorRef replyTo, int replyAfter, bool grouped) + : UntypedPersistentActor +{ + public static class Commands + { + public record Cmd(string Mode, int Payload); + } + + private const int BatchSize = 50; + private List _batch = new(BatchSize); + private int _counter; + + public override string PersistenceId { get; } = grouped + ? $"{persistenceId}{Guid.NewGuid()}" + : persistenceId; + + protected override void OnRecover(object message) + { + switch (message) + { + case Commands.Cmd c: + HandleCounter(c.Payload); + + break; + } + } + + protected override void OnCommand(object message) + { + switch (message) + { + case Commands.Cmd { Mode: "p" } c: + Persist( + c, + d => { HandleCounter(d.Payload); }); + + break; + + case Commands.Cmd { Mode: "pb" } c: + _batch.Add(c); + + if (_batch.Count % BatchSize == 0 || c.Payload == replyAfter) + { + PersistAll( + _batch, + d => { HandleCounter(d.Payload); }); + + _batch = new List(BatchSize); + } + + break; + + case Commands.Cmd { Mode: "pa" } c: + PersistAsync( + c, + d => { HandleCounter(d.Payload); }); + + break; + + case Commands.Cmd { Mode: "pba" } c: + _batch.Add(c); + + if (_batch.Count % BatchSize == 0 || c.Payload == replyAfter) + { + PersistAllAsync( + _batch, + d => { HandleCounter(d.Payload); }); + + _batch = new List(BatchSize); + } + + break; + } + } + + private void HandleCounter(int payload) + { + _counter++; + + if (payload != _counter) + { + throw new ArgumentException( + $"Expected to receive [{_counter}] yet got: [{payload}]"); + } + + if (_counter == replyAfter) + replyTo.Tell(payload); + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmarks/EventTagger.cs b/src/Akka.Persistence.EventStore.Benchmarks/BenchmarkActors/EventTagger.cs similarity index 92% rename from src/Akka.Persistence.EventStore.Benchmarks/EventTagger.cs rename to src/Akka.Persistence.EventStore.Benchmarks/BenchmarkActors/EventTagger.cs index 60baac3..94838de 100644 --- a/src/Akka.Persistence.EventStore.Benchmarks/EventTagger.cs +++ b/src/Akka.Persistence.EventStore.Benchmarks/BenchmarkActors/EventTagger.cs @@ -1,6 +1,6 @@ using Akka.Persistence.Journal; -namespace Akka.Persistence.EventStore.Benchmarks; +namespace Akka.Persistence.EventStore.Benchmarks.BenchmarkActors; public sealed class EventTagger : IWriteEventAdapter { diff --git a/src/Akka.Persistence.EventStore.Benchmarks/InitializeDbActor.cs b/src/Akka.Persistence.EventStore.Benchmarks/BenchmarkActors/InitializeDbActor.cs similarity index 96% rename from src/Akka.Persistence.EventStore.Benchmarks/InitializeDbActor.cs rename to src/Akka.Persistence.EventStore.Benchmarks/BenchmarkActors/InitializeDbActor.cs index ce6ea65..b7b9287 100644 --- a/src/Akka.Persistence.EventStore.Benchmarks/InitializeDbActor.cs +++ b/src/Akka.Persistence.EventStore.Benchmarks/BenchmarkActors/InitializeDbActor.cs @@ -1,7 +1,7 @@ using Akka.Actor; using Akka.Event; -namespace Akka.Persistence.EventStore.Benchmarks; +namespace Akka.Persistence.EventStore.Benchmarks.BenchmarkActors; public class InitializeDbActor : ReceivePersistentActor { diff --git a/src/Akka.Persistence.EventStore.Benchmarks/Columns/MessagesPerHandlerPerSecondColumn.cs b/src/Akka.Persistence.EventStore.Benchmarks/Columns/MessagesPerHandlerPerSecondColumn.cs new file mode 100644 index 0000000..3174705 --- /dev/null +++ b/src/Akka.Persistence.EventStore.Benchmarks/Columns/MessagesPerHandlerPerSecondColumn.cs @@ -0,0 +1,16 @@ +using BenchmarkDotNet.Running; + +namespace Akka.Persistence.EventStore.Benchmarks.Columns; + +public class MessagesPerHandlerPerSecondColumn : MessagesPerSecondColumn +{ + public override string Id => "msg/handler/sec"; + public override string ColumnName => "Msg/sec/handler"; + public override int PriorityInCategory => 1; + public override string Legend => "Number of messages per handler per second"; + + protected override double GetWorkersMultiplier(BenchmarkCase benchmark, MessagesPerSecondAttribute? config) + { + return config?.GetNumberOfMessagesPerIteration(benchmark) ?? 1; + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmarks/Columns/MessagesPerSecondAttribute.cs b/src/Akka.Persistence.EventStore.Benchmarks/Columns/MessagesPerSecondAttribute.cs new file mode 100644 index 0000000..3306311 --- /dev/null +++ b/src/Akka.Persistence.EventStore.Benchmarks/Columns/MessagesPerSecondAttribute.cs @@ -0,0 +1,67 @@ +using BenchmarkDotNet.Running; + +namespace Akka.Persistence.EventStore.Benchmarks.Columns; + +public class MessagesPerSecondAttribute : Attribute +{ + private readonly Func _getNumberOfMessagesPerIteration; + private readonly Func _getNumberOfHandlers; + + public MessagesPerSecondAttribute(int numberOfMessagesPerIteration, string getNumberOfHandlersFromParameter) + { + _getNumberOfMessagesPerIteration = _ => numberOfMessagesPerIteration; + + _getNumberOfHandlers = benchmark => GetParameterValue(benchmark, getNumberOfHandlersFromParameter, ParameterAsInt); + } + + public MessagesPerSecondAttribute(string configurationParameter) + { + _getNumberOfMessagesPerIteration = benchmark => + { + var configuration = GetParameterValue(benchmark, configurationParameter, ParameterAsConfiguration); + + return configuration.NumberOfMessagesPerIteration; + }; + + _getNumberOfHandlers = benchmark => + { + var configuration = GetParameterValue(benchmark, configurationParameter, ParameterAsConfiguration); + + return configuration.NumberOfHandlers; + }; + } + + public int GetNumberOfMessagesPerIteration(BenchmarkCase benchmark) + { + return _getNumberOfMessagesPerIteration(benchmark); + } + + public int GetNumberOfHandlers(BenchmarkCase benchmark) + { + return _getNumberOfHandlers(benchmark); + } + + private static T GetParameterValue( + BenchmarkCase benchmark, + string parameterName, + Func parse) + { + var parameterValue = benchmark + .Parameters + .Items + .FirstOrDefault(x => x.Name == parameterName) + ?.Value; + + return parse(parameterValue); + } + + private static int ParameterAsInt(object? parameterValue) + { + return parameterValue is int value ? value : 1; + } + + private static MessagesPerSecondConfiguration ParameterAsConfiguration(object? parameterValue) + { + return parameterValue is MessagesPerSecondConfiguration value ? value : new MessagesPerSecondConfiguration(1, 1); + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmarks/Columns/MessagesPerSecondColumn.cs b/src/Akka.Persistence.EventStore.Benchmarks/Columns/MessagesPerSecondColumn.cs new file mode 100644 index 0000000..cfe7440 --- /dev/null +++ b/src/Akka.Persistence.EventStore.Benchmarks/Columns/MessagesPerSecondColumn.cs @@ -0,0 +1,57 @@ +using System.Collections.Immutable; +using System.Reflection; +using BenchmarkDotNet.Columns; +using BenchmarkDotNet.Engines; +using BenchmarkDotNet.Reports; +using BenchmarkDotNet.Running; + +namespace Akka.Persistence.EventStore.Benchmarks.Columns; + +public abstract class MessagesPerSecondColumn : IColumn +{ + public abstract string Id { get; } + public abstract string ColumnName { get; } + public bool AlwaysShow => true; + public ColumnCategory Category => ColumnCategory.Custom; + public abstract int PriorityInCategory { get; } + public bool IsNumeric => true; + public UnitType UnitType => UnitType.Dimensionless; + public abstract string Legend { get; } + + public string GetValue(Summary summary, BenchmarkCase benchmarkCase) => + GetValue(summary, benchmarkCase, SummaryStyle.Default); + + public string GetValue(Summary summary, BenchmarkCase benchmarkCase, SummaryStyle style) + { + if (!summary.HasReport(benchmarkCase)) + return ""; + + var configuration = benchmarkCase + .Descriptor + .WorkloadMethod + .GetCustomAttribute(); + + var measurements = summary[benchmarkCase]! + .AllMeasurements + .Where(x => x.IterationMode == IterationMode.Workload) + .ToImmutableList(); + + var totalNanoSeconds = measurements + .Sum(x => x.Nanoseconds); + + var totalOperations = measurements + .Sum(x => x.Operations); + + var nanosecondsPerOperation = totalNanoSeconds / totalOperations; + + var msgPerSecond = GetWorkersMultiplier(benchmarkCase, configuration) / (nanosecondsPerOperation / 1_000_000_000); + + return msgPerSecond.ToString("N0"); + } + + public bool IsDefault(Summary summary, BenchmarkCase benchmarkCase) => false; + + public bool IsAvailable(Summary summary) => true; + + protected abstract double GetWorkersMultiplier(BenchmarkCase benchmark, MessagesPerSecondAttribute? config); +} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmarks/Columns/MessagesPerSecondConfiguration.cs b/src/Akka.Persistence.EventStore.Benchmarks/Columns/MessagesPerSecondConfiguration.cs new file mode 100644 index 0000000..ef95c1d --- /dev/null +++ b/src/Akka.Persistence.EventStore.Benchmarks/Columns/MessagesPerSecondConfiguration.cs @@ -0,0 +1,26 @@ +using System.Collections.Immutable; + +namespace Akka.Persistence.EventStore.Benchmarks.Columns; + +public class MessagesPerSecondConfiguration +{ + public MessagesPerSecondConfiguration(int numberOfMessagesPerIteration, int numberOfHandlers) + { + NumberOfMessagesPerIteration = numberOfMessagesPerIteration; + NumberOfHandlers = numberOfHandlers; + + Commands = Enumerable + .Range(1, numberOfMessagesPerIteration) + .Select(x => x) + .ToImmutableList(); + } + + public int NumberOfMessagesPerIteration { get; } + public int NumberOfHandlers { get; } + public IImmutableList Commands { get; } + + public override string ToString() + { + return $"m/i: {NumberOfMessagesPerIteration}, h: {NumberOfHandlers}"; + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmarks/Columns/TotalMessagesPerSecondColumn.cs b/src/Akka.Persistence.EventStore.Benchmarks/Columns/TotalMessagesPerSecondColumn.cs new file mode 100644 index 0000000..26d18ea --- /dev/null +++ b/src/Akka.Persistence.EventStore.Benchmarks/Columns/TotalMessagesPerSecondColumn.cs @@ -0,0 +1,16 @@ +using BenchmarkDotNet.Running; + +namespace Akka.Persistence.EventStore.Benchmarks.Columns; + +public class TotalMessagesPerSecondColumn : MessagesPerSecondColumn +{ + public override string Id => "total_msg/sec"; + public override string ColumnName => "Total msg/sec"; + public override int PriorityInCategory => 0; + public override string Legend => "Total number of messages handled per second"; + + protected override double GetWorkersMultiplier(BenchmarkCase benchmark, MessagesPerSecondAttribute? config) + { + return (config?.GetNumberOfMessagesPerIteration(benchmark) ?? 1) * (config?.GetNumberOfHandlers(benchmark) ?? 1); + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmarks/EventStoreBenchmarkFixture.cs b/src/Akka.Persistence.EventStore.Benchmarks/EventStoreBenchmarkFixture.cs index b687072..50bf0c1 100644 --- a/src/Akka.Persistence.EventStore.Benchmarks/EventStoreBenchmarkFixture.cs +++ b/src/Akka.Persistence.EventStore.Benchmarks/EventStoreBenchmarkFixture.cs @@ -1,5 +1,6 @@ using Akka.Actor; using Akka.Configuration; +using Akka.Persistence.EventStore.Benchmarks.BenchmarkActors; using Akka.Persistence.EventStore.Tests; using FluentAssertions.Extensions; @@ -7,9 +8,7 @@ namespace Akka.Persistence.EventStore.Benchmarks; public static class EventStoreBenchmarkFixture { - private static EventStoreContainer? _eventStoreContainer; - - public static async Task CreateActorSystem(string name, Config? extraConfig = null) + public static async Task CreateActorSystemFromSeededData(string name, Config? extraConfig = null) { var config = ConfigurationFactory.ParseString(await File.ReadAllTextAsync("benchmark.conf")) .WithFallback(extraConfig ?? "") @@ -18,23 +17,52 @@ public static async Task CreateActorSystem(string name, Config? ext return ActorSystem.Create(name, config); } + + public static async Task CreateActorSystemWithCleanDb(string name, Config? extraConfig = null) + { + var eventStoreContainer = new EventStoreContainer(); + await eventStoreContainer.InitializeAsync(); + + var config = ConfigurationFactory.ParseString($$""" + akka.persistence.journal { + plugin = akka.persistence.journal.eventstore + eventstore { + connection-string = "{{eventStoreContainer.ConnectionString}}" + } + } + + akka.persistence.query.journal.eventstore { + write-plugin = akka.persistence.journal.eventstore + } + """) + .WithFallback(extraConfig ?? "") + .WithFallback(Persistence.DefaultConfig()) + .WithFallback(EventStorePersistence.DefaultConfiguration); + + var actorSystem = ActorSystem.Create(name, config); + + return new CleanActorSystem(actorSystem, eventStoreContainer.EventStoreContainerName ?? ""); + } public static async Task Initialize() { - _eventStoreContainer = new EventStoreContainer(); - await _eventStoreContainer.InitializeAsync(); + var eventStoreContainer = new EventStoreContainer(); + await eventStoreContainer.InitializeAsync(); await File.WriteAllTextAsync( "benchmark.conf", $$""" + container-name = "{{eventStoreContainer.EventStoreContainerName}}" + akka.persistence.journal { plugin = akka.persistence.journal.eventstore eventstore { - connection-string = "{{_eventStoreContainer.ConnectionString}}" + connection-string = "{{eventStoreContainer.ConnectionString}}" event-adapters { event-tagger = "{{typeof(EventTagger).AssemblyQualifiedName}}" } + event-adapter-bindings { "System.Int32" = event-tagger } @@ -46,7 +74,7 @@ await File.WriteAllTextAsync( } """); - var sys = await CreateActorSystem("Initializer", """ + var sys = await CreateActorSystemFromSeededData("Initializer", """ akka.persistence.journal { eventstore { auto-initialize = true @@ -61,9 +89,28 @@ await File.WriteAllTextAsync( 20.Minutes()); } - public static async Task Dispose() + public static async Task Cleanup() + { + if (!File.Exists("benchmark.conf")) + return; + + var config = ConfigurationFactory.ParseString(await File.ReadAllTextAsync("benchmark.conf")); + + var eventStoreContainerName = config.GetString("container-name"); + + if (!string.IsNullOrEmpty(eventStoreContainerName)) + await EventStoreDockerContainer.Stop(eventStoreContainerName); + + await File.WriteAllTextAsync("benchmark.conf", ""); + } + + public class CleanActorSystem(ActorSystem system, string containerName) : IAsyncDisposable { - if (_eventStoreContainer is not null) - await _eventStoreContainer.DisposeAsync(); + public ActorSystem System { get; } = system; + + public async ValueTask DisposeAsync() + { + await EventStoreDockerContainer.Stop(containerName); + } } } \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmarks/MicroBenchmarkConfig.cs b/src/Akka.Persistence.EventStore.Benchmarks/MicroBenchmarkConfig.cs deleted file mode 100644 index 2ffb1a0..0000000 --- a/src/Akka.Persistence.EventStore.Benchmarks/MicroBenchmarkConfig.cs +++ /dev/null @@ -1,17 +0,0 @@ -using BenchmarkDotNet.Configs; -using BenchmarkDotNet.Diagnosers; -using BenchmarkDotNet.Loggers; - -namespace Akka.Persistence.EventStore.Benchmarks; - -/// -/// Basic BenchmarkDotNet configuration used for micro benchmarks. -/// -public class MicroBenchmarkConfig : ManualConfig -{ - public MicroBenchmarkConfig() - { - AddDiagnoser(MemoryDiagnoser.Default); - AddLogger(ConsoleLogger.Default); - } -} diff --git a/src/Akka.Persistence.EventStore.Benchmarks/PersistAllAsyncBenchmarks.cs b/src/Akka.Persistence.EventStore.Benchmarks/PersistAllAsyncBenchmarks.cs new file mode 100644 index 0000000..7420b64 --- /dev/null +++ b/src/Akka.Persistence.EventStore.Benchmarks/PersistAllAsyncBenchmarks.cs @@ -0,0 +1,13 @@ +using Akka.Persistence.EventStore.Benchmarks.Columns; +using BenchmarkDotNet.Attributes; + +namespace Akka.Persistence.EventStore.Benchmarks; + +public class PersistAllAsyncBenchmarks : BasePersistBenchmarks +{ + [Benchmark, MessagesPerSecond(nameof(Configuration))] + public async Task PersistAllAsync() + { + await RunBenchmark("pba"); + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmarks/PersistAllBenchmarks.cs b/src/Akka.Persistence.EventStore.Benchmarks/PersistAllBenchmarks.cs new file mode 100644 index 0000000..bf30cd3 --- /dev/null +++ b/src/Akka.Persistence.EventStore.Benchmarks/PersistAllBenchmarks.cs @@ -0,0 +1,13 @@ +using Akka.Persistence.EventStore.Benchmarks.Columns; +using BenchmarkDotNet.Attributes; + +namespace Akka.Persistence.EventStore.Benchmarks; + +public class PersistAllBenchmarks : BasePersistBenchmarks +{ + [Benchmark, MessagesPerSecond(nameof(Configuration))] + public async Task PersistAll() + { + await RunBenchmark("pb"); + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmarks/PersistAsyncBenchmarks.cs b/src/Akka.Persistence.EventStore.Benchmarks/PersistAsyncBenchmarks.cs new file mode 100644 index 0000000..7219064 --- /dev/null +++ b/src/Akka.Persistence.EventStore.Benchmarks/PersistAsyncBenchmarks.cs @@ -0,0 +1,13 @@ +using Akka.Persistence.EventStore.Benchmarks.Columns; +using BenchmarkDotNet.Attributes; + +namespace Akka.Persistence.EventStore.Benchmarks; + +public class PersistAsyncBenchmarks : BasePersistBenchmarks +{ + [Benchmark, MessagesPerSecond(nameof(Configuration))] + public async Task PersistAsync() + { + await RunBenchmark("pa"); + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmarks/PersistBenchmarks.cs b/src/Akka.Persistence.EventStore.Benchmarks/PersistBenchmarks.cs new file mode 100644 index 0000000..8e4e665 --- /dev/null +++ b/src/Akka.Persistence.EventStore.Benchmarks/PersistBenchmarks.cs @@ -0,0 +1,13 @@ +using Akka.Persistence.EventStore.Benchmarks.Columns; +using BenchmarkDotNet.Attributes; + +namespace Akka.Persistence.EventStore.Benchmarks; + +public class PersistBenchmarks : BasePersistBenchmarks +{ + [Benchmark, MessagesPerSecond(nameof(Configuration))] + public async Task Persist() + { + await RunBenchmark("p"); + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmarks/Program.cs b/src/Akka.Persistence.EventStore.Benchmarks/Program.cs index fd5664a..37aee21 100644 --- a/src/Akka.Persistence.EventStore.Benchmarks/Program.cs +++ b/src/Akka.Persistence.EventStore.Benchmarks/Program.cs @@ -2,13 +2,17 @@ using Akka.Persistence.EventStore.Benchmarks; using BenchmarkDotNet.Running; -try +var firstArg = args.FirstOrDefault(); + +switch (firstArg) { - await EventStoreBenchmarkFixture.Initialize(); - - BenchmarkSwitcher.FromAssembly(Assembly.GetExecutingAssembly()).Run(args); + case "seed": + await EventStoreBenchmarkFixture.Initialize(); + break; + case "cleanup": + await EventStoreBenchmarkFixture.Cleanup(); + break; + default: + BenchmarkSwitcher.FromAssembly(Assembly.GetExecutingAssembly()).Run(args); + break; } -finally -{ - await EventStoreBenchmarkFixture.Dispose(); -} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmarks/EventStoreTagBenchmark.cs b/src/Akka.Persistence.EventStore.Benchmarks/QueryByTagBenchmarks.cs similarity index 86% rename from src/Akka.Persistence.EventStore.Benchmarks/EventStoreTagBenchmark.cs rename to src/Akka.Persistence.EventStore.Benchmarks/QueryByTagBenchmarks.cs index a718649..6fbb7b8 100644 --- a/src/Akka.Persistence.EventStore.Benchmarks/EventStoreTagBenchmark.cs +++ b/src/Akka.Persistence.EventStore.Benchmarks/QueryByTagBenchmarks.cs @@ -3,13 +3,25 @@ using Akka.Persistence.Query; using Akka.Streams; using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Configs; +using BenchmarkDotNet.Diagnosers; +using BenchmarkDotNet.Loggers; using FluentAssertions; namespace Akka.Persistence.EventStore.Benchmarks; -[Config(typeof(MicroBenchmarkConfig))] -public class EventStoreTagBenchmark +[Config(typeof(Config))] +public class QueryByTagBenchmarks { + private class Config : ManualConfig + { + public Config() + { + AddDiagnoser(MemoryDiagnoser.Default); + AddLogger(ConsoleLogger.Default); + } + } + private IMaterializer? _materializer; private IReadJournal? _readJournal; @@ -18,7 +30,7 @@ public class EventStoreTagBenchmark [GlobalSetup] public async Task Setup() { - _sys = await EventStoreBenchmarkFixture.CreateActorSystem("system"); + _sys = await EventStoreBenchmarkFixture.CreateActorSystemFromSeededData("system"); _materializer = _sys.Materializer(); _readJournal = _sys.ReadJournalFor("akka.persistence.query.journal.eventstore"); } diff --git a/src/Akka.Persistence.EventStore.Benchmarks/README.md b/src/Akka.Persistence.EventStore.Benchmarks/README.md index 4b423d2..5b88afe 100644 --- a/src/Akka.Persistence.EventStore.Benchmarks/README.md +++ b/src/Akka.Persistence.EventStore.Benchmarks/README.md @@ -5,4 +5,6 @@ This benchmark uses BenchmarkDotNet to benchmark the performance of `CurrentEven How to run this benchmark: 1. You have to have docker installed on your machine. 2. Go to the project directory. -3. Run the benchmark by running `dotnet run -c Release` \ No newline at end of file +3. Seed data by running `dotnet run seed -c Release` +4. Run the benchmark by running `dotnet run -c Release` +5. Cleanup data by running `dotnet run cleanup -c Release` \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmarks/RecoverBenchmarks.cs b/src/Akka.Persistence.EventStore.Benchmarks/RecoverBenchmarks.cs new file mode 100644 index 0000000..ba02660 --- /dev/null +++ b/src/Akka.Persistence.EventStore.Benchmarks/RecoverBenchmarks.cs @@ -0,0 +1,95 @@ +using System.Collections.Immutable; +using Akka.Actor; +using Akka.Persistence.EventStore.Benchmarks.BenchmarkActors; +using Akka.Persistence.EventStore.Benchmarks.Columns; +using Akka.TestKit; +using Akka.TestKit.Xunit2; +using Akka.Util.Internal; +using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Configs; +using BenchmarkDotNet.Diagnosers; +using BenchmarkDotNet.Loggers; + +namespace Akka.Persistence.EventStore.Benchmarks; + +[Config(typeof(Config))] +public class RecoverBenchmarks +{ + private class Config : ManualConfig + { + public Config() + { + AddDiagnoser(MemoryDiagnoser.Default); + AddLogger(ConsoleLogger.Default); + AddColumn(new TotalMessagesPerSecondColumn()); + AddColumn(new MessagesPerHandlerPerSecondColumn()); + } + } + + private const int EventsCount = 1000; + + private static readonly IImmutableList Commands = Enumerable.Range(1, EventsCount).ToImmutableList(); + private static readonly TimeSpan ExpectDuration = TimeSpan.FromSeconds(40); + + private EventStoreBenchmarkFixture.CleanActorSystem? _sys; + private IImmutableList _persistenceIds = ImmutableList.Empty; + + [GlobalSetup] + public async Task Setup() + { + _sys = await EventStoreBenchmarkFixture.CreateActorSystemWithCleanDb("system"); + + _persistenceIds = Enumerable + .Range(1, NumberOfActors) + .Select(x => $"recover-{x}") + .ToImmutableList(); + + await Task.WhenAll(_persistenceIds + .Select(async x => + { + var testProbe = new TestProbe( + _sys!.System, + new XunitAssertions()); + + var benchActor = _sys.System.ActorOf(Props.Create(() => new BenchActor( + x, + testProbe, + EventsCount, + false))); + + Commands.ForEach(cmd => benchActor.Tell(new BenchActor.Commands.Cmd("p", cmd))); + + await testProbe.ExpectMsgAsync(Commands[^1], ExpectDuration); + })); + } + + [GlobalCleanup] + public async Task Cleanup() + { + if (_sys is not null) + await _sys.DisposeAsync(); + } + + [Params(1, 2, 4, 8)] + public int NumberOfActors { get; set; } + + [Benchmark, MessagesPerSecond(EventsCount, nameof(NumberOfActors))] + public async Task Recover() + { + await Task.WhenAll(_persistenceIds + .Select(async x => + { + var testProbe = new TestProbe( + _sys!.System, + new XunitAssertions()); + + _sys.System.ActorOf(Props.Create(() => new BenchActor( + x, + testProbe, + EventsCount, + false))); + + await testProbe.ExpectMsgAsync(Commands[^1], ExpectDuration); + })); + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Benchmarks/SerializationBenchmarks.cs b/src/Akka.Persistence.EventStore.Benchmarks/SerializationBenchmarks.cs new file mode 100644 index 0000000..05e710e --- /dev/null +++ b/src/Akka.Persistence.EventStore.Benchmarks/SerializationBenchmarks.cs @@ -0,0 +1,130 @@ +using System.Collections.Immutable; +using Akka.Persistence.EventStore.Configuration; +using Akka.Persistence.EventStore.Serialization; +using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Configs; +using BenchmarkDotNet.Diagnosers; +using BenchmarkDotNet.Loggers; +using EventStore.Client; + +namespace Akka.Persistence.EventStore.Benchmarks; + +[Config(typeof(Config))] +public class SerializationBenchmarks +{ + private class Config : ManualConfig + { + public Config() + { + AddDiagnoser(MemoryDiagnoser.Default); + AddLogger(ConsoleLogger.Default); + } + } + + private readonly ComplexEvent _complexEvent = ComplexEvent.Create(); + + private DefaultMessageAdapter _adapter = null!; + private EventStoreBenchmarkFixture.CleanActorSystem? _sys; + + private ResolvedEvent _serializedStringEvent; + private ResolvedEvent _serializedComplexEvent; + + [GlobalSetup] + public async Task Setup() + { + _sys = await EventStoreBenchmarkFixture.CreateActorSystemWithCleanDb("system"); + + _adapter = new DefaultMessageAdapter( + _sys.System.Serialization, + new EventStoreJournalSettings(_sys.System.Settings.Config.GetConfig("akka.persistence.journal.eventstore"))); + + var serializedStringEvent = await _adapter.Adapt(new Persistent("a")); + var serializedComplexEvent = await _adapter.Adapt(new Persistent(_complexEvent)); + + _serializedStringEvent = new ResolvedEvent( + new EventRecord( + "string", + Uuid.NewUuid(), + StreamPosition.FromInt64(1), + Position.Start, + new Dictionary + { + ["type"] = serializedStringEvent.Type, + ["created"] = DateTime.Now.Ticks.ToString(), + ["content-type"] = serializedStringEvent.ContentType + }, + serializedStringEvent.Data, + serializedStringEvent.Metadata), + null, + null); + + _serializedComplexEvent = new ResolvedEvent( + new EventRecord( + "string", + Uuid.NewUuid(), + StreamPosition.FromInt64(1), + Position.Start, + new Dictionary + { + ["type"] = serializedComplexEvent.Type, + ["created"] = DateTime.Now.Ticks.ToString(), + ["content-type"] = serializedComplexEvent.ContentType + }, + serializedComplexEvent.Data, + serializedComplexEvent.Metadata), + null, + null); + } + + [GlobalCleanup] + public async Task Cleanup() + { + if (_sys is not null) + await _sys.DisposeAsync(); + } + + [Benchmark] + public async Task SerializeStringEvent() + { + await _adapter.Adapt(new Persistent("a")); + } + + [Benchmark] + public async Task SerializeComplexEvent() + { + await _adapter.Adapt(new Persistent(_complexEvent)); + } + + [Benchmark] + public async Task DeSerializeStringEvent() + { + await _adapter.AdaptEvent(_serializedStringEvent); + } + + [Benchmark] + public async Task DeSerializeComplexEvent() + { + await _adapter.AdaptEvent(_serializedComplexEvent); + } + + public record ComplexEvent( + string Name, + int Number, + ComplexEvent.SubData SubItem, + IImmutableList SubItemList) + { + public static ComplexEvent Create() + { + return new ComplexEvent( + "Name", + 100, + new SubData("123"), + ImmutableList.Create( + new SubData("1"), + new SubData("2"), + new SubData("3"))); + } + + public record SubData(string Value); + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Tests/EventStoreContainer.cs b/src/Akka.Persistence.EventStore.Tests/EventStoreContainer.cs index 55b88ef..d25939b 100644 --- a/src/Akka.Persistence.EventStore.Tests/EventStoreContainer.cs +++ b/src/Akka.Persistence.EventStore.Tests/EventStoreContainer.cs @@ -1,154 +1,23 @@ -using System.Diagnostics; -using Docker.DotNet; -using Docker.DotNet.Models; -using System.Runtime.InteropServices; -using Xunit; +using Xunit; namespace Akka.Persistence.EventStore.Tests; public class EventStoreContainer : IAsyncLifetime { - private DockerClient? _client; - private readonly string _eventStoreContainerName = $"es-{Guid.NewGuid():N}"; - private static readonly Random Random; - private const string ImageName = "eventstore/eventstore"; - private const string Tag = "23.10.0-jammy"; - private const string EventStoreImage = ImageName + ":" + Tag; - private int _httpPort; - - static EventStoreContainer() - { - Random = new Random(); - } - + public string? EventStoreContainerName { get; private set; } public string? ConnectionString { get; private set; } public async Task InitializeAsync() { - DockerClientConfiguration config; - - if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) || - RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) - { - config = new DockerClientConfiguration(new Uri("unix:///var/run/docker.sock")); - } - else if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - { - config = new DockerClientConfiguration(new Uri("npipe://./pipe/docker_engine")); - } - else - { - throw new Exception("Unsupported OS"); - } - - _client = config.CreateClient(); - - var images = await _client.Images.ListImagesAsync(new ImagesListParameters - { - Filters = new Dictionary> - { - { - "reference", - new Dictionary - { - { EventStoreImage, true } - } - } - } - }); - - if (images.Count == 0) - { - await _client.Images.CreateImageAsync( - new ImagesCreateParameters { FromImage = ImageName, Tag = Tag }, null, - new Progress(message => - { - Console.WriteLine(!string.IsNullOrEmpty(message.ErrorMessage) - ? message.ErrorMessage - : $"{message.ID} {message.Status} {message.ProgressMessage}"); - })); - } - - _httpPort = Random.Next(2100, 2399); - - await _client.Containers.CreateContainerAsync( - new CreateContainerParameters - { - Image = EventStoreImage, - Name = _eventStoreContainerName, - Tty = true, - ExposedPorts = new Dictionary - { - { "2113/tcp", new EmptyStruct() } - }, - Env = new List - { - "EVENTSTORE_RUN_PROJECTIONS=All", - "EVENTSTORE_MEM_DB=True", - "EVENTSTORE_INSECURE=True" - }, - HostConfig = new HostConfig - { - PortBindings = new Dictionary> - { - { - "2113/tcp", - new List - { - new() - { - HostPort = $"{_httpPort}" - } - } - } - } - } - }); - - // Starting the container ... - await _client.Containers.StartContainerAsync( - _eventStoreContainerName, - new ContainerStartParameters()); - - ConnectionString = $"esdb://admin:changeit@localhost:{_httpPort}?tls=false&tlsVerifyCert=false"; - - await WaitForEventStoreToStart(TimeSpan.FromSeconds(5), _client); - - async Task WaitForEventStoreToStart(TimeSpan timeout, IDockerClient dockerClient) - { - var logStream = await dockerClient.Containers.GetContainerLogsAsync(_eventStoreContainerName, - new ContainerLogsParameters - { - Follow = true, - ShowStdout = true, - ShowStderr = true - }); - - using (var reader = new StreamReader(logStream)) - { - var stopwatch = Stopwatch.StartNew(); - - while (stopwatch.Elapsed < timeout && await reader.ReadLineAsync() is { } line) - { - if (line.Contains("IS LEADER... SPARTA!")) break; - } - - stopwatch.Stop(); - } - - await logStream.DisposeAsync(); - } + var startResponse = await EventStoreDockerContainer.Start(); + + ConnectionString = $"esdb://admin:changeit@localhost:{startResponse.HttpPort}?tls=false&tlsVerifyCert=false"; + EventStoreContainerName = startResponse.ContainerName; } public async Task DisposeAsync() { - if (_client != null) - { - await _client.Containers.StopContainerAsync(_eventStoreContainerName, - new ContainerStopParameters { WaitBeforeKillSeconds = 0 }); - await _client.Containers.RemoveContainerAsync(_eventStoreContainerName, - new ContainerRemoveParameters { Force = true }); - _client.Dispose(); - } + if (!string.IsNullOrEmpty(EventStoreContainerName)) + await EventStoreDockerContainer.Stop(EventStoreContainerName); } } \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Tests/EventStoreDockerContainer.cs b/src/Akka.Persistence.EventStore.Tests/EventStoreDockerContainer.cs new file mode 100644 index 0000000..e7f7ae2 --- /dev/null +++ b/src/Akka.Persistence.EventStore.Tests/EventStoreDockerContainer.cs @@ -0,0 +1,148 @@ +using System.Diagnostics; +using System.Runtime.InteropServices; +using Docker.DotNet; +using Docker.DotNet.Models; + +namespace Akka.Persistence.EventStore.Tests; + +public static class EventStoreDockerContainer +{ + private static readonly Random Random; + private static readonly DockerClient Client; + + private const string ImageName = "eventstore/eventstore"; + private const string Tag = "23.10.0-jammy"; + private const string EventStoreImage = ImageName + ":" + Tag; + + static EventStoreDockerContainer() + { + Random = new Random(); + + DockerClientConfiguration config; + + if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) || + RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) + { + config = new DockerClientConfiguration(new Uri("unix:///var/run/docker.sock")); + } + else if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + config = new DockerClientConfiguration(new Uri("npipe://./pipe/docker_engine")); + } + else + { + throw new Exception("Unsupported OS"); + } + + Client = config.CreateClient(); + } + + public static async Task Start() + { + var images = await Client.Images.ListImagesAsync(new ImagesListParameters + { + Filters = new Dictionary> + { + { + "reference", + new Dictionary + { + { EventStoreImage, true } + } + } + } + }); + + if (images.Count == 0) + { + await Client.Images.CreateImageAsync( + new ImagesCreateParameters { FromImage = ImageName, Tag = Tag }, null, + new Progress(message => + { + Console.WriteLine(!string.IsNullOrEmpty(message.ErrorMessage) + ? message.ErrorMessage + : $"{message.ID} {message.Status} {message.ProgressMessage}"); + })); + } + + var httpPort = Random.Next(2100, 2399); + var containerName = $"es-{Guid.NewGuid():N}"; + + await Client.Containers.CreateContainerAsync( + new CreateContainerParameters + { + Image = EventStoreImage, + Name = containerName, + Tty = true, + ExposedPorts = new Dictionary + { + { "2113/tcp", new EmptyStruct() } + }, + Env = new List + { + "EVENTSTORE_RUN_PROJECTIONS=All", + "EVENTSTORE_MEM_DB=True", + "EVENTSTORE_INSECURE=True" + }, + HostConfig = new HostConfig + { + PortBindings = new Dictionary> + { + { + "2113/tcp", + new List + { + new() + { + HostPort = $"{httpPort}" + } + } + } + } + } + }); + + await Client.Containers.StartContainerAsync( + containerName, + new ContainerStartParameters()); + + await WaitForEventStoreToStart(TimeSpan.FromSeconds(5), Client); + + return new StartContainerResponse(containerName, httpPort); + + async Task WaitForEventStoreToStart(TimeSpan timeout, IDockerClient dockerClient) + { + var logStream = await dockerClient.Containers.GetContainerLogsAsync(containerName, + new ContainerLogsParameters + { + Follow = true, + ShowStdout = true, + ShowStderr = true + }); + + using (var reader = new StreamReader(logStream)) + { + var stopwatch = Stopwatch.StartNew(); + + while (stopwatch.Elapsed < timeout && await reader.ReadLineAsync() is { } line) + { + if (line.Contains("IS LEADER... SPARTA!")) break; + } + + stopwatch.Stop(); + } + + await logStream.DisposeAsync(); + } + } + + public static async Task Stop(string containerName) + { + await Client.Containers.StopContainerAsync(containerName, + new ContainerStopParameters { WaitBeforeKillSeconds = 0 }); + await Client.Containers.RemoveContainerAsync(containerName, + new ContainerRemoveParameters { Force = true }); + } + + public record StartContainerResponse(string ContainerName, int HttpPort); +} \ No newline at end of file