diff --git a/Directory.Packages.props b/Directory.Packages.props index d7c0d35..e19d247 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -1,40 +1,41 @@ true - 1.5.18 + 1.5.19 23.2.1 - 1.2.2 - 2.4.1 - 15.9.0 + 2.7.1 + 17.9.0 - - - - + + - - - + + + - - + + - - - + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + - + + \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Hosting.Tests/JournalSettingsSpec.cs b/src/Akka.Persistence.EventStore.Hosting.Tests/JournalSettingsSpec.cs index d37c4f0..e93c63c 100644 --- a/src/Akka.Persistence.EventStore.Hosting.Tests/JournalSettingsSpec.cs +++ b/src/Akka.Persistence.EventStore.Hosting.Tests/JournalSettingsSpec.cs @@ -36,6 +36,8 @@ public void DefaultOptionsTest() actualConfig.GetString("persistence-ids-stream-name").Should().Be(defaultConfig.GetString("persistence-ids-stream-name")); actualConfig.GetString("persisted-events-stream-name").Should().Be(defaultConfig.GetString("persisted-events-stream-name")); actualConfig.GetString("tenant").Should().Be(defaultConfig.GetString("tenant")); + actualConfig.GetString("materializer-dispatcher").Should() + .Be(defaultConfig.GetString("materializer-dispatcher")); } [Fact(DisplayName = "Custom Options should modify default config")] @@ -52,7 +54,8 @@ public void ModifiedOptionsTest() TaggedStreamNamePattern = "custom-tagged-[[TAG]]", PersistedEventsStreamName = "persisted-events-custom", PersistenceIdsStreamName = "persistence-ids-custom", - Tenant = "tenant" + Tenant = "tenant", + MaterializerDispatcher = "custom-dispatcher" }; var fullConfig = opt.ToConfig(); @@ -71,5 +74,6 @@ public void ModifiedOptionsTest() config.PersistedEventsStreamName.Should().Be("persisted-events-custom"); config.PersistenceIdsStreamName.Should().Be("persistence-ids-custom"); config.Tenant.Should().Be("tenant"); + config.MaterializerDispatcher.Should().Be("custom-dispatcher"); } } \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Hosting.Tests/SnapshotSettingsSpec.cs b/src/Akka.Persistence.EventStore.Hosting.Tests/SnapshotSettingsSpec.cs index f37ed1e..95b0402 100644 --- a/src/Akka.Persistence.EventStore.Hosting.Tests/SnapshotSettingsSpec.cs +++ b/src/Akka.Persistence.EventStore.Hosting.Tests/SnapshotSettingsSpec.cs @@ -32,6 +32,8 @@ public void DefaultOptionsTest() actualConfig.GetString("adapter").Should().Be(defaultConfig.GetString("adapter")); actualConfig.GetString("prefix").Should().Be(defaultConfig.GetString("prefix")); actualConfig.GetString("tenant").Should().Be(defaultConfig.GetString("tenant")); + actualConfig.GetString("materializer-dispatcher").Should() + .Be(defaultConfig.GetString("materializer-dispatcher")); } [Fact(DisplayName = "Custom Options should modify default config")] @@ -43,7 +45,8 @@ public void ModifiedOptionsTest() ConnectionString = "a", Adapter = "custom", Prefix = "custom@", - Tenant = "tenant" + Tenant = "tenant", + MaterializerDispatcher = "custom-dispatcher" }; var fullConfig = opt.ToConfig(); @@ -57,5 +60,6 @@ public void ModifiedOptionsTest() config.Adapter.Should().Be("custom"); config.StreamPrefix.Should().Be("custom@"); config.Tenant.Should().Be("tenant"); + config.MaterializerDispatcher.Should().Be("custom-dispatcher"); } } \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Hosting/EventStoreJournalOptions.cs b/src/Akka.Persistence.EventStore.Hosting/EventStoreJournalOptions.cs index 8cc454f..078c707 100644 --- a/src/Akka.Persistence.EventStore.Hosting/EventStoreJournalOptions.cs +++ b/src/Akka.Persistence.EventStore.Hosting/EventStoreJournalOptions.cs @@ -16,18 +16,21 @@ public EventStoreJournalOptions() : this(true) private static readonly Config Default = EventStorePersistence.DefaultJournalConfiguration; private static readonly Config DefaultQuery = EventStorePersistence.DefaultQueryConfiguration; - public string? ConnectionString { get; set; } - public string? Adapter { get; set; } - public string? StreamPrefix { get; set; } - public string? TaggedStreamNamePattern { get; set; } - public string? PersistenceIdsStreamName { get; set; } - public string? PersistedEventsStreamName { get; set; } - public TimeSpan? QueryRefreshInterval { get; set; } - public string? Tenant { get; set; } + public string? ConnectionString { get; init; } + public string? Adapter { get; init; } + public string? StreamPrefix { get; init; } + public string? TaggedStreamNamePattern { get; init; } + public string? PersistenceIdsStreamName { get; init; } + public string? PersistedEventsStreamName { get; init; } + public TimeSpan? QueryRefreshInterval { get; init; } + public TimeSpan? QueryProjectionCatchupTimeout { get; init; } + public string? Tenant { get; init; } + public string? MaterializerDispatcher { get; init; } public override string Identifier { get; set; } = identifier; public Config DefaultQueryConfig => DefaultQuery.MoveTo(QueryPluginId); protected override Config InternalDefaultConfig => Default; - public string QueryPluginId => $"akka.persistence.query.journal.{Identifier}"; + + private string QueryPluginId => $"akka.persistence.query.journal.{Identifier}"; protected override StringBuilder Build(StringBuilder sb) { @@ -50,6 +53,9 @@ protected override StringBuilder Build(StringBuilder sb) if (!string.IsNullOrEmpty(PersistedEventsStreamName)) sb.AppendLine($"persisted-events-stream-name = {PersistedEventsStreamName.ToHocon()}"); + + if (!string.IsNullOrEmpty(MaterializerDispatcher)) + sb.AppendLine($"materializer-dispatcher = {MaterializerDispatcher.ToHocon()}"); if (!string.IsNullOrEmpty(Tenant)) sb.AppendLine($"tenant = {Tenant.ToHocon()}"); @@ -62,6 +68,9 @@ protected override StringBuilder Build(StringBuilder sb) if (QueryRefreshInterval != null) sb.AppendLine($"refresh-interval = {QueryRefreshInterval.ToHocon()}"); + + if (QueryProjectionCatchupTimeout != null) + sb.AppendLine($"projection-catchup-timeout = {QueryProjectionCatchupTimeout.ToHocon()}"); sb.AppendLine("}"); diff --git a/src/Akka.Persistence.EventStore.Hosting/EventStoreSnapshotOptions.cs b/src/Akka.Persistence.EventStore.Hosting/EventStoreSnapshotOptions.cs index 1c731df..41e6b04 100644 --- a/src/Akka.Persistence.EventStore.Hosting/EventStoreSnapshotOptions.cs +++ b/src/Akka.Persistence.EventStore.Hosting/EventStoreSnapshotOptions.cs @@ -18,6 +18,7 @@ public EventStoreSnapshotOptions() : this(true) public string? Adapter { get; set; } public string? Prefix { get; set; } public string? Tenant { get; set; } + public string? MaterializerDispatcher { get; set; } public override string Identifier { get; set; } = identifier; protected override Config InternalDefaultConfig => Default; @@ -34,6 +35,9 @@ protected override StringBuilder Build(StringBuilder sb) if (!string.IsNullOrEmpty(Prefix)) sb.AppendLine($"prefix = {Prefix.ToHocon()}"); + if (!string.IsNullOrEmpty(MaterializerDispatcher)) + sb.AppendLine($"materializer-dispatcher = {MaterializerDispatcher.ToHocon()}"); + if (!string.IsNullOrEmpty(Tenant)) sb.AppendLine($"tenant = {Tenant.ToHocon()}"); diff --git a/src/Akka.Persistence.EventStore.Hosting/EventStoreTenantOptions.cs b/src/Akka.Persistence.EventStore.Hosting/EventStoreTenantOptions.cs index 380f725..c844f7c 100644 --- a/src/Akka.Persistence.EventStore.Hosting/EventStoreTenantOptions.cs +++ b/src/Akka.Persistence.EventStore.Hosting/EventStoreTenantOptions.cs @@ -6,14 +6,12 @@ namespace Akka.Persistence.EventStore.Hosting; public class EventStoreTenantOptions(string? tenantStreamNamePattern) { - public string? TenantStreamNamePattern => tenantStreamNamePattern; - - public StringBuilder Build(StringBuilder sb) + private StringBuilder Build(StringBuilder sb) { sb.AppendLine($"{EventStorePersistence.TenantConfigPath} {{"); - if (!string.IsNullOrEmpty(TenantStreamNamePattern)) - sb.AppendLine($"tenant-stream-name-pattern = {TenantStreamNamePattern.ToHocon()}"); + if (!string.IsNullOrEmpty(tenantStreamNamePattern)) + sb.AppendLine($"tenant-stream-name-pattern = {tenantStreamNamePattern.ToHocon()}"); sb.AppendLine("}"); diff --git a/src/Akka.Persistence.EventStore.Hosting/HostingExtensions.cs b/src/Akka.Persistence.EventStore.Hosting/HostingExtensions.cs index ce6422d..5caa5fc 100644 --- a/src/Akka.Persistence.EventStore.Hosting/HostingExtensions.cs +++ b/src/Akka.Persistence.EventStore.Hosting/HostingExtensions.cs @@ -1,8 +1,10 @@ using Akka.Hosting; using Akka.Persistence.Hosting; +using JetBrains.Annotations; namespace Akka.Persistence.EventStore.Hosting; +[PublicAPI] public static class HostingExtensions { public static AkkaConfigurationBuilder WithEventStorePersistence( @@ -20,7 +22,10 @@ public static AkkaConfigurationBuilder WithEventStorePersistence( string? taggedJournalStreamPattern = null, string? persistenceIdsStreamName = null, string? persistedEventsStreamName = null, - string? tenantStreamNamePattern = null) + string? tenantStreamNamePattern = null, + string? materializerDispatcher = null, + TimeSpan? queryRefreshInterval = null, + TimeSpan? queryProjectionCatchupTimeout = null) { if (mode == PersistenceMode.SnapshotStore && journalBuilder is not null) throw new Exception($"{nameof(journalBuilder)} can only be set when {nameof(mode)} is set to either {PersistenceMode.Both} or {PersistenceMode.Journal}"); @@ -37,7 +42,10 @@ public static AkkaConfigurationBuilder WithEventStorePersistence( TaggedStreamNamePattern = taggedJournalStreamPattern, PersistedEventsStreamName = persistedEventsStreamName, PersistenceIdsStreamName = persistenceIdsStreamName, - Tenant = tenant + Tenant = tenant, + MaterializerDispatcher = materializerDispatcher, + QueryRefreshInterval = queryRefreshInterval, + QueryProjectionCatchupTimeout = queryProjectionCatchupTimeout }; var adapters = new AkkaPersistenceJournalBuilder(journalOptions.Identifier, builder); @@ -52,7 +60,8 @@ public static AkkaConfigurationBuilder WithEventStorePersistence( AutoInitialize = autoInitialize, Adapter = adapter, Prefix = snapshotStreamPrefix, - Tenant = tenant + Tenant = tenant, + MaterializerDispatcher = materializerDispatcher }; var tenantOptions = !string.IsNullOrEmpty(tenantStreamNamePattern) diff --git a/src/Akka.Persistence.EventStore.Tests/DatabaseFixture.cs b/src/Akka.Persistence.EventStore.Tests/DatabaseFixture.cs index fe0a87d..bebd23c 100644 --- a/src/Akka.Persistence.EventStore.Tests/DatabaseFixture.cs +++ b/src/Akka.Persistence.EventStore.Tests/DatabaseFixture.cs @@ -1,4 +1,5 @@ -using Docker.DotNet; +using System.Diagnostics; +using Docker.DotNet; using Docker.DotNet.Models; using Microsoft.Extensions.Configuration; using System.Runtime.InteropServices; @@ -93,8 +94,7 @@ await _client.Containers.CreateContainerAsync( { "EVENTSTORE_RUN_PROJECTIONS=All", "EVENTSTORE_MEM_DB=True", - "EVENTSTORE_INSECURE=True", - "EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=True" + "EVENTSTORE_INSECURE=True" }, HostConfig = new HostConfig { @@ -120,8 +120,32 @@ await _client.Containers.StartContainerAsync( new ContainerStartParameters()); ConnectionString = $"esdb://admin:changeit@localhost:{_httpPort}?tls=false&tlsVerifyCert=false"; - - await Task.Delay(5000); + + await WaitForEventStoreToStart(TimeSpan.FromSeconds(5), _client); + + async Task WaitForEventStoreToStart(TimeSpan timeout, IDockerClient dockerClient) + { + var logStream = await dockerClient.Containers.GetContainerLogsAsync(_eventStoreContainerName, new ContainerLogsParameters + { + Follow = true, + ShowStdout = true, + ShowStderr = true + }); + + using (var reader = new StreamReader(logStream)) + { + var stopwatch = Stopwatch.StartNew(); + + while (stopwatch.Elapsed < timeout && await reader.ReadLineAsync() is { } line) + { + if (line.Contains("IS LEADER... SPARTA!")) break; + } + + stopwatch.Stop(); + } + + await logStream.DisposeAsync(); + } } else { diff --git a/src/Akka.Persistence.EventStore.Tests/EventStoreConfiguration.cs b/src/Akka.Persistence.EventStore.Tests/EventStoreConfiguration.cs index 401a794..8e4fd9a 100644 --- a/src/Akka.Persistence.EventStore.Tests/EventStoreConfiguration.cs +++ b/src/Akka.Persistence.EventStore.Tests/EventStoreConfiguration.cs @@ -34,6 +34,7 @@ class = ""Akka.Persistence.EventStore.Snapshot.EventStoreSnapshotStore, Akka.Per class = ""Akka.Persistence.EventStore.Query.EventStoreReadJournalProvider, Akka.Persistence.EventStore"" write-plugin = ""akka.persistence.journal.eventstore"" refresh-interval = 1s + projection-catchup-timeout = 1s }} akka.test.single-expect-default = 10s"); diff --git a/src/Akka.Persistence.EventStore/Akka.Persistence.EventStore.csproj b/src/Akka.Persistence.EventStore/Akka.Persistence.EventStore.csproj index ab58b2d..b2c1f5e 100644 --- a/src/Akka.Persistence.EventStore/Akka.Persistence.EventStore.csproj +++ b/src/Akka.Persistence.EventStore/Akka.Persistence.EventStore.csproj @@ -26,6 +26,7 @@ + diff --git a/src/Akka.Persistence.EventStore/Akka.Persistence.EventStore.nuspec b/src/Akka.Persistence.EventStore/Akka.Persistence.EventStore.nuspec deleted file mode 100644 index 18b1969..0000000 --- a/src/Akka.Persistence.EventStore/Akka.Persistence.EventStore.nuspec +++ /dev/null @@ -1,18 +0,0 @@ - - - - $id$ - $version$ - $title$ - $author$ - $author$ - https://github.com/AkkaNetContrib/Akka.Persistence.EventStore/blob/dev/LICENSE - https://github.com/AkkaNetContrib/Akka.Persistence.EventStore - http://getakka.net/images/AkkaNetLogo.Normal.png - false - $description$ - Initial release - Copyright 2018 - akka.net persistence eventstore - - \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore/Configuration/EventStoreReadJournalSettings.cs b/src/Akka.Persistence.EventStore/Configuration/EventStoreReadJournalSettings.cs index 96dc7ec..0973325 100644 --- a/src/Akka.Persistence.EventStore/Configuration/EventStoreReadJournalSettings.cs +++ b/src/Akka.Persistence.EventStore/Configuration/EventStoreReadJournalSettings.cs @@ -14,8 +14,10 @@ public EventStoreReadJournalSettings(Config config) WritePlugin = config.GetString("write-plugin"); QueryRefreshInterval = config.GetTimeSpan("refresh-interval", TimeSpan.FromSeconds(5)); + ProjectionCatchupTimeout = config.GetTimeSpan("projection-catchup-timeout", TimeSpan.FromMilliseconds(500)); } public string WritePlugin { get; } public TimeSpan QueryRefreshInterval { get; } + public TimeSpan ProjectionCatchupTimeout { get; } } \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore/Configuration/ISettingsWithAdapter.cs b/src/Akka.Persistence.EventStore/Configuration/ISettingsWithAdapter.cs index 3d04086..676502a 100644 --- a/src/Akka.Persistence.EventStore/Configuration/ISettingsWithAdapter.cs +++ b/src/Akka.Persistence.EventStore/Configuration/ISettingsWithAdapter.cs @@ -5,7 +5,4 @@ public interface ISettingsWithAdapter string Adapter { get; } string DefaultSerializer { get; } public string Tenant { get; } - string StreamPrefix { get; } - - string GetStreamName(string persistenceId, EventStoreTenantSettings tenantSettings); } \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore/EventStorePersistence.cs b/src/Akka.Persistence.EventStore/EventStorePersistence.cs index da22d1a..3c9cf1e 100644 --- a/src/Akka.Persistence.EventStore/EventStorePersistence.cs +++ b/src/Akka.Persistence.EventStore/EventStorePersistence.cs @@ -2,9 +2,11 @@ using Akka.Configuration; using Akka.Persistence.EventStore.Journal; using Akka.Persistence.EventStore.Snapshot; +using JetBrains.Annotations; namespace Akka.Persistence.EventStore; +[PublicAPI] public class EventStorePersistence : IExtension { public const string JournalConfigPath = "akka.persistence.journal.eventstore"; diff --git a/src/Akka.Persistence.EventStore/EventStorePersistenceProvider.cs b/src/Akka.Persistence.EventStore/EventStorePersistenceProvider.cs index bb550e8..efac438 100644 --- a/src/Akka.Persistence.EventStore/EventStorePersistenceProvider.cs +++ b/src/Akka.Persistence.EventStore/EventStorePersistenceProvider.cs @@ -1,10 +1,12 @@ using Akka.Actor; +using JetBrains.Annotations; namespace Akka.Persistence.EventStore; /// /// Extension Id provider for the EventStore Persistence extension. /// +[PublicAPI] public class EventStorePersistenceProvider : ExtensionIdProvider { /// diff --git a/src/Akka.Persistence.EventStore/Journal/EventStoreJournal.cs b/src/Akka.Persistence.EventStore/Journal/EventStoreJournal.cs index 06621c0..ec1ee1e 100644 --- a/src/Akka.Persistence.EventStore/Journal/EventStoreJournal.cs +++ b/src/Akka.Persistence.EventStore/Journal/EventStoreJournal.cs @@ -13,9 +13,11 @@ using Akka.Streams.Dsl; using Akka.Streams.Implementation.Stages; using EventStore.Client; +using JetBrains.Annotations; namespace Akka.Persistence.EventStore.Journal; +[PublicAPI] public class EventStoreJournal : AsyncWriteJournal, IWithUnboundedStash { private const string LastSequenceNumberMetaDataKey = "lastSeq"; @@ -28,6 +30,7 @@ public class EventStoreJournal : AsyncWriteJournal, IWithUnboundedStash private IMessageAdapter _adapter = null!; private ActorMaterializer _mat = null!; + // ReSharper disable once ConvertToPrimaryConstructor public EventStoreJournal(Config journalConfig) { _log = Context.GetLogger(); @@ -106,7 +109,7 @@ await EventStoreSource : StreamRevision.FromInt64(lowSequenceId); await Source.From(persistentMessages - .Select(x => x.WithTimestamp(DateTime.UtcNow.Ticks))) + .Select(x => x.Timestamp > 0 ? x : x.WithTimestamp(DateTime.UtcNow.Ticks))) .SerializeWith(_adapter) .Grouped(persistentMessages.Count) .Select(x => new EventStoreWrite( diff --git a/src/Akka.Persistence.EventStore/Messages/NewPersistenceIdFound.cs b/src/Akka.Persistence.EventStore/Messages/NewPersistenceIdFound.cs index 6c37e06..f93be2b 100644 --- a/src/Akka.Persistence.EventStore/Messages/NewPersistenceIdFound.cs +++ b/src/Akka.Persistence.EventStore/Messages/NewPersistenceIdFound.cs @@ -1,3 +1,5 @@ +using JetBrains.Annotations; + namespace Akka.Persistence.EventStore.Messages; -public record NewPersistenceIdFound(string PersistenceId); \ No newline at end of file +public record NewPersistenceIdFound([PublicAPI]string PersistenceId); \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore/Query/EventStoreReadJournal.cs b/src/Akka.Persistence.EventStore/Query/EventStoreReadJournal.cs index 139be62..1995dbe 100644 --- a/src/Akka.Persistence.EventStore/Query/EventStoreReadJournal.cs +++ b/src/Akka.Persistence.EventStore/Query/EventStoreReadJournal.cs @@ -76,7 +76,8 @@ public Source PersistenceIds() _settings.QueryRefreshInterval) .DeSerializeEventWith(_adapter) .Filter(filter) - .Select(r => r.Data.PersistenceId); + .Select(r => r.Data.PersistenceId) + .MapMaterializedValue(_ => NotUsed.Instance); } public Source CurrentPersistenceIds() @@ -87,10 +88,11 @@ public Source CurrentPersistenceIds() .FromStream( _eventStoreClient, filter, - noEventGracePeriod: TimeSpan.FromMilliseconds(300)) + noEventGracePeriod: _settings.ProjectionCatchupTimeout) .DeSerializeEventWith(_adapter) .Filter(filter) - .Select(r => r.Data.PersistenceId); + .Select(r => r.Data.PersistenceId) + .MapMaterializedValue(_ => NotUsed.Instance); } public Source EventsByTag(string tag, Offset offset) => EventsFromStreamSource( @@ -130,7 +132,7 @@ private Source EventsFromStreamSource( filter, refreshInterval, resolveLinkTos, - TimeSpan.FromMilliseconds(500)) + _settings.ProjectionCatchupTimeout) .DeSerializeEventWith(_adapter) .Filter(filter) .SelectMany(r => @@ -144,7 +146,8 @@ private Source EventsFromStreamSource( sequenceNr: r.representation.SequenceNr, @event: r.representation.Payload, timestamp: r.representation.Timestamp, - Array.Empty())); + [])) + .MapMaterializedValue(_ => NotUsed.Instance); private ImmutableList AdaptEvents( IPersistentRepresentation persistentRepresentation) diff --git a/src/Akka.Persistence.EventStore/Query/EventStoreReadJournalProvider.cs b/src/Akka.Persistence.EventStore/Query/EventStoreReadJournalProvider.cs index c28c26d..87d0d60 100644 --- a/src/Akka.Persistence.EventStore/Query/EventStoreReadJournalProvider.cs +++ b/src/Akka.Persistence.EventStore/Query/EventStoreReadJournalProvider.cs @@ -1,9 +1,11 @@ using Akka.Actor; using Akka.Configuration; using Akka.Persistence.Query; +using JetBrains.Annotations; namespace Akka.Persistence.EventStore.Query; +[PublicAPI] public class EventStoreReadJournalProvider(ExtendedActorSystem system, Config config) : IReadJournalProvider { public IReadJournal GetReadJournal() diff --git a/src/Akka.Persistence.EventStore/Serialization/DefaultMessageAdapter.cs b/src/Akka.Persistence.EventStore/Serialization/DefaultMessageAdapter.cs index 354818c..021cabf 100644 --- a/src/Akka.Persistence.EventStore/Serialization/DefaultMessageAdapter.cs +++ b/src/Akka.Persistence.EventStore/Serialization/DefaultMessageAdapter.cs @@ -3,6 +3,7 @@ using Akka.Persistence.EventStore.Configuration; using Akka.Persistence.Journal; using EventStore.Client; +using JetBrains.Annotations; namespace Akka.Persistence.EventStore.Serialization; @@ -53,7 +54,7 @@ public async Task Adapt(SnapshotMetadata snapshotMetadata, object sna if (payloadType == null) return null; - var payload = await DeSerialize(evnt.Event.Data.ToArray(), payloadType); + var payload = await DeSerialize(evnt.Event.Data, payloadType); if (payload == null) return null; @@ -84,7 +85,7 @@ public async Task Adapt(SnapshotMetadata snapshotMetadata, object sna if (payloadType == null) return null; - var payload = await DeSerialize(evnt.Event.Data.ToArray(), payloadType); + var payload = await DeSerialize(evnt.Event.Data, payloadType); if (payload == null) return null; @@ -99,6 +100,7 @@ public virtual string GetManifest(Type type) return type.ToClrTypeName(); } + [PublicAPI] protected virtual Task> Serialize(object data) { var serializer = serialization.FindSerializerForType(data.GetType(), settings.DefaultSerializer); @@ -106,6 +108,7 @@ protected virtual Task> Serialize(object data) return Task.FromResult(new ReadOnlyMemory(serializer.ToBinary(data))); } + [PublicAPI] protected virtual Task DeSerialize(ReadOnlyMemory data, Type type) { var serializer = serialization.FindSerializerForType(type, settings.DefaultSerializer); @@ -113,16 +116,19 @@ protected virtual Task> Serialize(object data) return Task.FromResult(serializer.FromBinary(data.ToArray(), type)); } + [PublicAPI] protected virtual string GetEventType(object data) { return data.GetType().Name.ToEventCase(); } + [PublicAPI] protected virtual Type? GetTypeFromManifest(string manifest) { return Type.GetType(manifest, false); } + [PublicAPI] protected virtual IStoredEventMetadata GetEventMetadata( IPersistentRepresentation message, IImmutableSet tags) @@ -130,6 +136,7 @@ protected virtual IStoredEventMetadata GetEventMetadata( return new StoredEventMetadata(message, tags, settings.Tenant); } + [PublicAPI] protected virtual async Task GetEventMetadataFrom(ResolvedEvent evnt) { var metadata = await DeSerialize(evnt.Event.Metadata, typeof(StoredEventMetadata)); @@ -137,6 +144,7 @@ protected virtual IStoredEventMetadata GetEventMetadata( return metadata as IStoredEventMetadata; } + [PublicAPI] protected virtual IStoredSnapshotMetadata GetSnapshotMetadata( SnapshotMetadata snapshotMetadata, string manifest) @@ -144,6 +152,7 @@ protected virtual IStoredSnapshotMetadata GetSnapshotMetadata( return new StoredSnapshotMetadata(snapshotMetadata, manifest, settings.Tenant); } + [PublicAPI] protected virtual async Task GetSnapshotMetadataFrom(ResolvedEvent evnt) { var metadata = await DeSerialize(evnt.Event.Metadata, typeof(StoredSnapshotMetadata)); @@ -151,6 +160,7 @@ protected virtual IStoredSnapshotMetadata GetSnapshotMetadata( return metadata as IStoredSnapshotMetadata; } + [PublicAPI] public interface IStoredEventMetadata { // ReSharper disable once InconsistentNaming @@ -171,6 +181,7 @@ public interface IStoredEventMetadata public class StoredEventMetadata : IStoredEventMetadata { + [PublicAPI] public StoredEventMetadata() { @@ -195,6 +206,7 @@ public StoredEventMetadata( public string persistenceId { get; set; } = null!; // ReSharper disable once InconsistentNaming + [PublicAPI] public DateTimeOffset occurredOn { get; set; } public string manifest { get; set; } = null!; public long sequenceNr { get; set; } @@ -202,12 +214,15 @@ public StoredEventMetadata( public string journalType { get; set; } = null!; public long? timestamp { get; set; } // ReSharper disable once InconsistentNaming + [PublicAPI] public string tenant { get; set; } = null!; // ReSharper disable once InconsistentNaming + [PublicAPI] public IImmutableSet tags { get; set; } = ImmutableHashSet.Empty; public IActorRef? sender { get; set; } } + [PublicAPI] public interface IStoredSnapshotMetadata { // ReSharper disable once InconsistentNaming @@ -222,6 +237,7 @@ public interface IStoredSnapshotMetadata DateTime occurredOn { get; } } + [PublicAPI] public class StoredSnapshotMetadata : IStoredSnapshotMetadata { public StoredSnapshotMetadata() diff --git a/src/Akka.Persistence.EventStore/Snapshot/EventStoreSnapshotStore.cs b/src/Akka.Persistence.EventStore/Snapshot/EventStoreSnapshotStore.cs index 1625461..0c89cd5 100644 --- a/src/Akka.Persistence.EventStore/Snapshot/EventStoreSnapshotStore.cs +++ b/src/Akka.Persistence.EventStore/Snapshot/EventStoreSnapshotStore.cs @@ -10,9 +10,11 @@ using Akka.Streams.Dsl; using Akka.Streams.Implementation.Stages; using EventStore.Client; +using JetBrains.Annotations; namespace Akka.Persistence.EventStore.Snapshot; +[PublicAPI] public class EventStoreSnapshotStore : SnapshotStore { private readonly EventStoreClient _eventStoreClient; diff --git a/src/Akka.Persistence.EventStore/Streams/EventStoreSource.cs b/src/Akka.Persistence.EventStore/Streams/EventStoreSource.cs index c170bf2..3d3ae1f 100644 --- a/src/Akka.Persistence.EventStore/Streams/EventStoreSource.cs +++ b/src/Akka.Persistence.EventStore/Streams/EventStoreSource.cs @@ -7,26 +7,43 @@ namespace Akka.Persistence.EventStore.Streams; public static class EventStoreSource { - public static Source FromStream( + public static Source FromStream( EventStoreClient client, IEventStoreStreamOrigin from, TimeSpan? refreshInterval = null, bool resolveLinkTos = false, - TimeSpan? noEventGracePeriod = null) + TimeSpan? noEventGracePeriod = null, + RestartSettings? restartWith = null) { - return Source.From(StartIterator); + ICancelable cancelable = new Cancelable(); + + if (restartWith != null) + { + return RestartSource + .OnFailuresWithBackoff(Create, restartWith) + .MapMaterializedValue(_ => cancelable); + } + + return Create() + .MapMaterializedValue(_ => cancelable); + + Source Create() + { + return Source.From(StartIterator); + } async IAsyncEnumerable StartIterator() { var startPosition = from.From; - while (true) + while (!cancelable.IsCancellationRequested) { var readResult = client.ReadStreamAsync( from.Direction, from.StreamName, startPosition, - resolveLinkTos: resolveLinkTos); + resolveLinkTos: resolveLinkTos, + cancellationToken: cancelable.Token); var readState = await readResult.ReadState; @@ -34,8 +51,22 @@ async IAsyncEnumerable StartIterator() if (readState == ReadState.Ok) { - await foreach (var evnt in readResult) + await using var enumerator = readResult.GetAsyncEnumerator(cancelable.Token); + + while (!cancelable.IsCancellationRequested) { + try + { + if (!await enumerator.MoveNextAsync(cancelable.Token)) + break; + } + catch (OperationCanceledException) + { + yield break; + } + + var evnt = enumerator.Current; + startPosition = (evnt.Link?.EventNumber ?? evnt.OriginalEventNumber) + 1; foundEvents = true; @@ -46,7 +77,14 @@ async IAsyncEnumerable StartIterator() if (refreshInterval == null && !foundEvents && noEventGracePeriod != null) { - await Task.Delay(noEventGracePeriod.Value); + try + { + await Task.Delay(noEventGracePeriod.Value, cancelable.Token); + } + catch (OperationCanceledException) + { + yield break; + } noEventGracePeriod = null; @@ -56,7 +94,14 @@ async IAsyncEnumerable StartIterator() if (refreshInterval == null) yield break; - await Task.Delay(refreshInterval.Value); + try + { + await Task.Delay(refreshInterval.Value, cancelable.Token); + } + catch (OperationCanceledException) + { + yield break; + } } } } diff --git a/src/Akka.Persistence.EventStore/Streams/EventStoreStreamSourceExtensions.cs b/src/Akka.Persistence.EventStore/Streams/EventStoreStreamSourceExtensions.cs index 439e474..98348bc 100644 --- a/src/Akka.Persistence.EventStore/Streams/EventStoreStreamSourceExtensions.cs +++ b/src/Akka.Persistence.EventStore/Streams/EventStoreStreamSourceExtensions.cs @@ -1,11 +1,12 @@ -using Akka.Actor; using Akka.Persistence.EventStore.Query; using Akka.Persistence.EventStore.Serialization; using Akka.Streams.Dsl; using EventStore.Client; +using JetBrains.Annotations; namespace Akka.Persistence.EventStore.Streams; +[PublicAPI] public static class EventStoreStreamSourceExtensions { public static Source SerializeWith( @@ -40,8 +41,8 @@ public static Source SerializeWith( .SerializeWith(msg => adapter.Adapt(msg.Metadata, msg.Snapshot)); } - public static Source DeSerializeWith( - this Source source, + public static Source DeSerializeWith( + this Source source, Func> deserializer) { return source @@ -50,16 +51,16 @@ public static Source DeSerializeWith( .Select(x => x!); } - public static Source DeSerializeWith( - this Source source, + public static Source DeSerializeWith( + this Source source, Func deserializer) { return source .DeSerializeWith(msg => Task.FromResult(deserializer(msg))); } - public static Source, NotUsed> DeSerializeEventWith( - this Source source, + public static Source, TMat> DeSerializeEventWith( + this Source source, IMessageAdapter adapter) { return source @@ -74,8 +75,8 @@ public static Source, NotUsed> DeSer }); } - public static Source, NotUsed> DeSerializeSnapshotWith( - this Source source, + public static Source, TMat> DeSerializeSnapshotWith( + this Source source, IMessageAdapter adapter) { return source @@ -90,8 +91,8 @@ public static Source, NotUsed> DeSerializeSna }); } - public static Source, ICancelable> DeserializeWith( - this Source source, + public static Source, TMat> DeserializeWith( + this Source source, Func> deserializer) { return source @@ -103,23 +104,23 @@ public static Source, ICancelable> DeserializeWith, ICancelable> DeserializeWith( - this Source source, + public static Source, TMat> DeserializeWith( + this Source source, IMessageAdapter adapter) { return source .DeserializeWith(adapter.AdaptEvent); } - public static Source Filter( - this Source source, + public static Source Filter( + this Source source, IEventStoreStreamFilter filter) { return source.Via(new FilterStreamStage(filter)); } - public static Source, NotUsed> Filter( - this Source, NotUsed> source, + public static Source, TMat> Filter( + this Source, TMat> source, IEventStoreStreamFilter filter) { return source.Filter(new ReplayCompletionFilter(filter)); diff --git a/src/Akka.Persistence.EventStore/Streams/EventStoreSubscriptionDisconnectedException.cs b/src/Akka.Persistence.EventStore/Streams/EventStoreSubscriptionDisconnectedException.cs deleted file mode 100644 index 4354a69..0000000 --- a/src/Akka.Persistence.EventStore/Streams/EventStoreSubscriptionDisconnectedException.cs +++ /dev/null @@ -1,4 +0,0 @@ -namespace Akka.Persistence.EventStore.Streams; - -public class EventStoreSubscriptionDisconnectedException(string streamName, string groupName) - : Exception($"Subscription to stream {streamName} in group {groupName} disconnected"); \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore/persistence.conf b/src/Akka.Persistence.EventStore/persistence.conf index 745554a..8abca95 100644 --- a/src/Akka.Persistence.EventStore/persistence.conf +++ b/src/Akka.Persistence.EventStore/persistence.conf @@ -8,7 +8,7 @@ connection-string = "" # dispatcher used to drive journal actor - plugin-dispatcher = "akka.actor.default-dispatcher" + materializer-dispatcher = "akka.actor.default-dispatcher" # Adapter to use to map Akka message to/from EventStore (see README) # (default | assembly qualified class name) diff --git a/src/Akka.Persistence.EventStore/snapshot.conf b/src/Akka.Persistence.EventStore/snapshot.conf index 1d0dbbb..bef348a 100644 --- a/src/Akka.Persistence.EventStore/snapshot.conf +++ b/src/Akka.Persistence.EventStore/snapshot.conf @@ -7,7 +7,7 @@ connection-string = "" # dispatcher used to drive journal actor - plugin-dispatcher = "akka.actor.default-dispatcher" + materializer-dispatcher = "akka.actor.default-dispatcher" # Adapter to use to map Akka snapshots to/from EventStore (see README) # (default | assembly qualified class name)