Skip to content

Commit

Permalink
Added missing configuration. Updated akka. Added PublicApi annotation…
Browse files Browse the repository at this point in the history
…s. (#37)
  • Loading branch information
MattiasJakobsson authored Apr 17, 2024
1 parent df25b9b commit 4d42a24
Show file tree
Hide file tree
Showing 26 changed files with 212 additions and 102 deletions.
35 changes: 18 additions & 17 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
@@ -1,40 +1,41 @@
<Project>
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
<AkkaVersion>1.5.18</AkkaVersion>
<AkkaVersion>1.5.19</AkkaVersion>
<EventStoreVersion>23.2.1</EventStoreVersion>
<NBenchVersion>1.2.2</NBenchVersion>
<XunitVersion>2.4.1</XunitVersion>
<TestSdkVersion>15.9.0</TestSdkVersion>
<XunitVersion>2.7.1</XunitVersion>
<TestSdkVersion>17.9.0</TestSdkVersion>
</PropertyGroup>
<!-- Akka.NET Package Versions -->
<ItemGroup>
<PackageVersion Include="Akka.Hosting" Version="1.5.6.1" />
<PackageVersion Include="Akka.Persistence.Hosting" Version="1.5.18" />
<PackageVersion Include="NuGet.Frameworks" Version="6.9.1" />
<PackageVersion Include="Akka" Version="1.5.18" />
<PackageVersion Include="Akka.Persistence.Hosting" Version="$(AkkaVersion)" />
<PackageVersion Include="Akka" Version="$(AkkaVersion)" />
<PackageVersion Include="Akka.Persistence" Version="$(AkkaVersion)" />
<PackageVersion Include="Akka.Persistence.Query" Version="$(AkkaVersion)" />
</ItemGroup>
<!-- EventStore Package Versions -->
<ItemGroup>
<PackageVersion Include="EventStore.Client.Grpc.PersistentSubscriptions" Version="$(EventStoreVersion)"/>
<PackageVersion Include="EventStore.Client.Grpc.ProjectionManagement" Version="$(EventStoreVersion)"/>
<PackageVersion Include="EventStore.Client.Grpc.Streams" Version="$(EventStoreVersion)"/>
<PackageVersion Include="EventStore.Client.Grpc.PersistentSubscriptions" Version="$(EventStoreVersion)" />
<PackageVersion Include="EventStore.Client.Grpc.ProjectionManagement" Version="$(EventStoreVersion)" />
<PackageVersion Include="EventStore.Client.Grpc.Streams" Version="$(EventStoreVersion)" />
</ItemGroup>
<!-- Testing Utilities -->
<ItemGroup>
<PackageVersion Include="Akka.Persistence.TCK" Version="1.5.18" />
<PackageVersion Include="Akka.Hosting.TestKit" Version="1.5.18" />
<PackageVersion Include="Akka.Persistence.TCK" Version="$(AkkaVersion)" />
<PackageVersion Include="Akka.Hosting.TestKit" Version="$(AkkaVersion)" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Json" Version="8.0.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageVersion Include="xunit" Version="2.7.0" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.5.7" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="$(TestSdkVersion)" />
<PackageVersion Include="xunit" Version="$(XunitVersion)" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.5.8">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageVersion>
<PackageVersion Include="FluentAssertions" Version="6.12.0" />
<PackageVersion Include="coverlet.collector" Version="6.0.2" />
<PackageVersion Include="Docker.DotNet" Version="3.125.15" />
</ItemGroup>
<ItemGroup>
<GlobalPackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="all" />
<PackageVersion Include="JetBrains.Annotations" Version="2023.3.0" />
<PackageVersion Include="NuGet.Frameworks" Version="6.9.1" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public void DefaultOptionsTest()
actualConfig.GetString("persistence-ids-stream-name").Should().Be(defaultConfig.GetString("persistence-ids-stream-name"));
actualConfig.GetString("persisted-events-stream-name").Should().Be(defaultConfig.GetString("persisted-events-stream-name"));
actualConfig.GetString("tenant").Should().Be(defaultConfig.GetString("tenant"));
actualConfig.GetString("materializer-dispatcher").Should()
.Be(defaultConfig.GetString("materializer-dispatcher"));
}

[Fact(DisplayName = "Custom Options should modify default config")]
Expand All @@ -52,7 +54,8 @@ public void ModifiedOptionsTest()
TaggedStreamNamePattern = "custom-tagged-[[TAG]]",
PersistedEventsStreamName = "persisted-events-custom",
PersistenceIdsStreamName = "persistence-ids-custom",
Tenant = "tenant"
Tenant = "tenant",
MaterializerDispatcher = "custom-dispatcher"
};

var fullConfig = opt.ToConfig();
Expand All @@ -71,5 +74,6 @@ public void ModifiedOptionsTest()
config.PersistedEventsStreamName.Should().Be("persisted-events-custom");
config.PersistenceIdsStreamName.Should().Be("persistence-ids-custom");
config.Tenant.Should().Be("tenant");
config.MaterializerDispatcher.Should().Be("custom-dispatcher");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public void DefaultOptionsTest()
actualConfig.GetString("adapter").Should().Be(defaultConfig.GetString("adapter"));
actualConfig.GetString("prefix").Should().Be(defaultConfig.GetString("prefix"));
actualConfig.GetString("tenant").Should().Be(defaultConfig.GetString("tenant"));
actualConfig.GetString("materializer-dispatcher").Should()
.Be(defaultConfig.GetString("materializer-dispatcher"));
}

[Fact(DisplayName = "Custom Options should modify default config")]
Expand All @@ -43,7 +45,8 @@ public void ModifiedOptionsTest()
ConnectionString = "a",
Adapter = "custom",
Prefix = "custom@",
Tenant = "tenant"
Tenant = "tenant",
MaterializerDispatcher = "custom-dispatcher"
};

var fullConfig = opt.ToConfig();
Expand All @@ -57,5 +60,6 @@ public void ModifiedOptionsTest()
config.Adapter.Should().Be("custom");
config.StreamPrefix.Should().Be("custom@");
config.Tenant.Should().Be("tenant");
config.MaterializerDispatcher.Should().Be("custom-dispatcher");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@ public EventStoreJournalOptions() : this(true)
private static readonly Config Default = EventStorePersistence.DefaultJournalConfiguration;
private static readonly Config DefaultQuery = EventStorePersistence.DefaultQueryConfiguration;

public string? ConnectionString { get; set; }
public string? Adapter { get; set; }
public string? StreamPrefix { get; set; }
public string? TaggedStreamNamePattern { get; set; }
public string? PersistenceIdsStreamName { get; set; }
public string? PersistedEventsStreamName { get; set; }
public TimeSpan? QueryRefreshInterval { get; set; }
public string? Tenant { get; set; }
public string? ConnectionString { get; init; }
public string? Adapter { get; init; }
public string? StreamPrefix { get; init; }
public string? TaggedStreamNamePattern { get; init; }
public string? PersistenceIdsStreamName { get; init; }
public string? PersistedEventsStreamName { get; init; }
public TimeSpan? QueryRefreshInterval { get; init; }
public TimeSpan? QueryProjectionCatchupTimeout { get; init; }
public string? Tenant { get; init; }
public string? MaterializerDispatcher { get; init; }
public override string Identifier { get; set; } = identifier;
public Config DefaultQueryConfig => DefaultQuery.MoveTo(QueryPluginId);
protected override Config InternalDefaultConfig => Default;
public string QueryPluginId => $"akka.persistence.query.journal.{Identifier}";

private string QueryPluginId => $"akka.persistence.query.journal.{Identifier}";

protected override StringBuilder Build(StringBuilder sb)
{
Expand All @@ -50,6 +53,9 @@ protected override StringBuilder Build(StringBuilder sb)

if (!string.IsNullOrEmpty(PersistedEventsStreamName))
sb.AppendLine($"persisted-events-stream-name = {PersistedEventsStreamName.ToHocon()}");

if (!string.IsNullOrEmpty(MaterializerDispatcher))
sb.AppendLine($"materializer-dispatcher = {MaterializerDispatcher.ToHocon()}");

if (!string.IsNullOrEmpty(Tenant))
sb.AppendLine($"tenant = {Tenant.ToHocon()}");
Expand All @@ -62,6 +68,9 @@ protected override StringBuilder Build(StringBuilder sb)

if (QueryRefreshInterval != null)
sb.AppendLine($"refresh-interval = {QueryRefreshInterval.ToHocon()}");

if (QueryProjectionCatchupTimeout != null)
sb.AppendLine($"projection-catchup-timeout = {QueryProjectionCatchupTimeout.ToHocon()}");

sb.AppendLine("}");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public EventStoreSnapshotOptions() : this(true)
public string? Adapter { get; set; }
public string? Prefix { get; set; }
public string? Tenant { get; set; }
public string? MaterializerDispatcher { get; set; }
public override string Identifier { get; set; } = identifier;
protected override Config InternalDefaultConfig => Default;

Expand All @@ -34,6 +35,9 @@ protected override StringBuilder Build(StringBuilder sb)
if (!string.IsNullOrEmpty(Prefix))
sb.AppendLine($"prefix = {Prefix.ToHocon()}");

if (!string.IsNullOrEmpty(MaterializerDispatcher))
sb.AppendLine($"materializer-dispatcher = {MaterializerDispatcher.ToHocon()}");

if (!string.IsNullOrEmpty(Tenant))
sb.AppendLine($"tenant = {Tenant.ToHocon()}");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ namespace Akka.Persistence.EventStore.Hosting;

public class EventStoreTenantOptions(string? tenantStreamNamePattern)
{
public string? TenantStreamNamePattern => tenantStreamNamePattern;

public StringBuilder Build(StringBuilder sb)
private StringBuilder Build(StringBuilder sb)
{
sb.AppendLine($"{EventStorePersistence.TenantConfigPath} {{");

if (!string.IsNullOrEmpty(TenantStreamNamePattern))
sb.AppendLine($"tenant-stream-name-pattern = {TenantStreamNamePattern.ToHocon()}");
if (!string.IsNullOrEmpty(tenantStreamNamePattern))
sb.AppendLine($"tenant-stream-name-pattern = {tenantStreamNamePattern.ToHocon()}");

sb.AppendLine("}");

Expand Down
15 changes: 12 additions & 3 deletions src/Akka.Persistence.EventStore.Hosting/HostingExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using Akka.Hosting;
using Akka.Persistence.Hosting;
using JetBrains.Annotations;

namespace Akka.Persistence.EventStore.Hosting;

[PublicAPI]
public static class HostingExtensions
{
public static AkkaConfigurationBuilder WithEventStorePersistence(
Expand All @@ -20,7 +22,10 @@ public static AkkaConfigurationBuilder WithEventStorePersistence(
string? taggedJournalStreamPattern = null,
string? persistenceIdsStreamName = null,
string? persistedEventsStreamName = null,
string? tenantStreamNamePattern = null)
string? tenantStreamNamePattern = null,
string? materializerDispatcher = null,
TimeSpan? queryRefreshInterval = null,
TimeSpan? queryProjectionCatchupTimeout = null)
{
if (mode == PersistenceMode.SnapshotStore && journalBuilder is not null)
throw new Exception($"{nameof(journalBuilder)} can only be set when {nameof(mode)} is set to either {PersistenceMode.Both} or {PersistenceMode.Journal}");
Expand All @@ -37,7 +42,10 @@ public static AkkaConfigurationBuilder WithEventStorePersistence(
TaggedStreamNamePattern = taggedJournalStreamPattern,
PersistedEventsStreamName = persistedEventsStreamName,
PersistenceIdsStreamName = persistenceIdsStreamName,
Tenant = tenant
Tenant = tenant,
MaterializerDispatcher = materializerDispatcher,
QueryRefreshInterval = queryRefreshInterval,
QueryProjectionCatchupTimeout = queryProjectionCatchupTimeout
};

var adapters = new AkkaPersistenceJournalBuilder(journalOptions.Identifier, builder);
Expand All @@ -52,7 +60,8 @@ public static AkkaConfigurationBuilder WithEventStorePersistence(
AutoInitialize = autoInitialize,
Adapter = adapter,
Prefix = snapshotStreamPrefix,
Tenant = tenant
Tenant = tenant,
MaterializerDispatcher = materializerDispatcher
};

var tenantOptions = !string.IsNullOrEmpty(tenantStreamNamePattern)
Expand Down
34 changes: 29 additions & 5 deletions src/Akka.Persistence.EventStore.Tests/DatabaseFixture.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Docker.DotNet;
using System.Diagnostics;
using Docker.DotNet;
using Docker.DotNet.Models;
using Microsoft.Extensions.Configuration;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -93,8 +94,7 @@ await _client.Containers.CreateContainerAsync(
{
"EVENTSTORE_RUN_PROJECTIONS=All",
"EVENTSTORE_MEM_DB=True",
"EVENTSTORE_INSECURE=True",
"EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=True"
"EVENTSTORE_INSECURE=True"
},
HostConfig = new HostConfig
{
Expand All @@ -120,8 +120,32 @@ await _client.Containers.StartContainerAsync(
new ContainerStartParameters());

ConnectionString = $"esdb://admin:changeit@localhost:{_httpPort}?tls=false&tlsVerifyCert=false";

await Task.Delay(5000);

await WaitForEventStoreToStart(TimeSpan.FromSeconds(5), _client);

async Task WaitForEventStoreToStart(TimeSpan timeout, IDockerClient dockerClient)
{
var logStream = await dockerClient.Containers.GetContainerLogsAsync(_eventStoreContainerName, new ContainerLogsParameters

Check warning on line 128 in src/Akka.Persistence.EventStore.Tests/DatabaseFixture.cs

View workflow job for this annotation

GitHub Actions / Test-ubuntu-latest

'IContainerOperations.GetContainerLogsAsync(string, ContainerLogsParameters, CancellationToken)' is obsolete: 'The stream returned by this method won't be demultiplexed properly if the container was created without a TTY. Use GetContainerLogsAsync(string, bool, ContainerLogsParameters, CancellationToken) instead'

Check warning on line 128 in src/Akka.Persistence.EventStore.Tests/DatabaseFixture.cs

View workflow job for this annotation

GitHub Actions / Test-ubuntu-latest

'IContainerOperations.GetContainerLogsAsync(string, ContainerLogsParameters, CancellationToken)' is obsolete: 'The stream returned by this method won't be demultiplexed properly if the container was created without a TTY. Use GetContainerLogsAsync(string, bool, ContainerLogsParameters, CancellationToken) instead'
{
Follow = true,
ShowStdout = true,
ShowStderr = true
});

using (var reader = new StreamReader(logStream))
{
var stopwatch = Stopwatch.StartNew();

while (stopwatch.Elapsed < timeout && await reader.ReadLineAsync() is { } line)
{
if (line.Contains("IS LEADER... SPARTA!")) break;
}

stopwatch.Stop();
}

await logStream.DisposeAsync();
}
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class = ""Akka.Persistence.EventStore.Snapshot.EventStoreSnapshotStore, Akka.Per
class = ""Akka.Persistence.EventStore.Query.EventStoreReadJournalProvider, Akka.Persistence.EventStore""
write-plugin = ""akka.persistence.journal.eventstore""
refresh-interval = 1s
projection-catchup-timeout = 1s
}}
akka.test.single-expect-default = 10s");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<PackageReference Include="EventStore.Client.Grpc.PersistentSubscriptions"/>
<PackageReference Include="EventStore.Client.Grpc.ProjectionManagement"/>
<PackageReference Include="EventStore.Client.Grpc.Streams"/>
<PackageReference Include="JetBrains.Annotations" />
</ItemGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
Expand Down
18 changes: 0 additions & 18 deletions src/Akka.Persistence.EventStore/Akka.Persistence.EventStore.nuspec

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ public EventStoreReadJournalSettings(Config config)

WritePlugin = config.GetString("write-plugin");
QueryRefreshInterval = config.GetTimeSpan("refresh-interval", TimeSpan.FromSeconds(5));
ProjectionCatchupTimeout = config.GetTimeSpan("projection-catchup-timeout", TimeSpan.FromMilliseconds(500));
}

public string WritePlugin { get; }
public TimeSpan QueryRefreshInterval { get; }
public TimeSpan ProjectionCatchupTimeout { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,4 @@ public interface ISettingsWithAdapter
string Adapter { get; }
string DefaultSerializer { get; }
public string Tenant { get; }
string StreamPrefix { get; }

string GetStreamName(string persistenceId, EventStoreTenantSettings tenantSettings);
}
2 changes: 2 additions & 0 deletions src/Akka.Persistence.EventStore/EventStorePersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
using Akka.Configuration;
using Akka.Persistence.EventStore.Journal;
using Akka.Persistence.EventStore.Snapshot;
using JetBrains.Annotations;

namespace Akka.Persistence.EventStore;

[PublicAPI]
public class EventStorePersistence : IExtension
{
public const string JournalConfigPath = "akka.persistence.journal.eventstore";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using Akka.Actor;
using JetBrains.Annotations;

namespace Akka.Persistence.EventStore;

/// <summary>
/// Extension Id provider for the EventStore Persistence extension.
/// </summary>
[PublicAPI]
public class EventStorePersistenceProvider : ExtensionIdProvider<EventStorePersistence>
{
/// <summary>
Expand Down
Loading

0 comments on commit 4d42a24

Please sign in to comment.