diff --git a/src/EvenireDB.Benchmark/EventsProviderBenckmarks.cs b/src/EvenireDB.Benchmark/EventsProviderBenckmarks.cs index 77764a8..0fcd3a2 100644 --- a/src/EvenireDB.Benchmark/EventsProviderBenckmarks.cs +++ b/src/EvenireDB.Benchmark/EventsProviderBenckmarks.cs @@ -2,6 +2,7 @@ using BenchmarkDotNet.Engines; using EvenireDB; using EvenireDB.Common; +using EvenireDB.Extents; using EvenireDB.Utils; using Microsoft.Extensions.Logging.Abstractions; using System.Threading.Channels; @@ -26,8 +27,9 @@ public void GlobalSetup() if (!Directory.Exists(dataPath)) Directory.CreateDirectory(dataPath); - var repoConfig = new FileEventsRepositoryConfig(dataPath); - var repo = new FileEventsRepository(repoConfig); + var repoConfig = new FileEventsRepositoryConfig(); + var extentInfoProvider = new ExtentInfoProvider(new ExtentInfoProviderConfig(dataPath)); + var repo = new FileEventsRepository(repoConfig, extentInfoProvider); var cache = new EventsCache( new NullLogger(), diff --git a/src/EvenireDB.Benchmark/FileEventsRepositoryWriteBenckmarks.cs b/src/EvenireDB.Benchmark/FileEventsRepositoryWriteBenckmarks.cs index 6ac2973..aad3b06 100644 --- a/src/EvenireDB.Benchmark/FileEventsRepositoryWriteBenckmarks.cs +++ b/src/EvenireDB.Benchmark/FileEventsRepositoryWriteBenckmarks.cs @@ -1,14 +1,14 @@ using BenchmarkDotNet.Attributes; using EvenireDB; -using Microsoft.Diagnostics.Tracing.Parsers.Kernel; +using EvenireDB.Extents; using System.Buffers; -using System.Runtime.CompilerServices; public class FileEventsRepositoryWriteBenckmarks { private readonly static byte[] _data = Enumerable.Repeat((byte)42, 100).ToArray(); private FileEventsRepositoryConfig _repoConfig; + private IExtentInfoProvider _extentInfoProvider; private IEventDataValidator _factory; private Event[] BuildEvents(int count) @@ -24,14 +24,15 @@ public void Setup() Directory.CreateDirectory(dataPath); _factory = new EventDataValidator(500_000); - _repoConfig = new FileEventsRepositoryConfig(dataPath); + _extentInfoProvider = new ExtentInfoProvider(new ExtentInfoProviderConfig(dataPath)); + _repoConfig = new FileEventsRepositoryConfig(); } [Benchmark(Baseline = true)] [ArgumentsSource(nameof(Data))] public async Task WriteAsync_Baseline(Event[] events) { - var sut = new FileEventsRepository(_repoConfig); + var sut = new FileEventsRepository(_repoConfig, _extentInfoProvider); await sut.AppendAsync(Guid.NewGuid(), events); } diff --git a/src/EvenireDB/EvenireDB.csproj b/src/EvenireDB/EvenireDB.csproj index d4ebf85..d07fa28 100644 --- a/src/EvenireDB/EvenireDB.csproj +++ b/src/EvenireDB/EvenireDB.csproj @@ -4,8 +4,7 @@ net8.0 enable enable - - true + true @@ -20,7 +19,6 @@ - diff --git a/src/EvenireDB/ExtentInfoProviderConfig.cs b/src/EvenireDB/ExtentInfoProviderConfig.cs new file mode 100644 index 0000000..308cb0c --- /dev/null +++ b/src/EvenireDB/ExtentInfoProviderConfig.cs @@ -0,0 +1,5 @@ + +namespace EvenireDB +{ + internal record ExtentInfoProviderConfig(string BasePath); +} \ No newline at end of file diff --git a/src/EvenireDB/Extents/ExtentInfo.cs b/src/EvenireDB/Extents/ExtentInfo.cs new file mode 100644 index 0000000..7781005 --- /dev/null +++ b/src/EvenireDB/Extents/ExtentInfo.cs @@ -0,0 +1,8 @@ +namespace EvenireDB.Extents +{ + public readonly struct ExtentInfo + { + public readonly required string DataPath { get; init; } + public readonly required string HeadersPath { get; init; } + } +} \ No newline at end of file diff --git a/src/EvenireDB/Extents/ExtentInfoProvider.cs b/src/EvenireDB/Extents/ExtentInfoProvider.cs new file mode 100644 index 0000000..d778c18 --- /dev/null +++ b/src/EvenireDB/Extents/ExtentInfoProvider.cs @@ -0,0 +1,26 @@ +namespace EvenireDB.Extents +{ + internal class ExtentInfoProvider : IExtentInfoProvider + { + private readonly ExtentInfoProviderConfig _config; + + public ExtentInfoProvider(ExtentInfoProviderConfig config) + { + _config = config ?? throw new ArgumentNullException(nameof(config)); + if (!Directory.Exists(_config.BasePath)) + Directory.CreateDirectory(config.BasePath); + } + + public ExtentInfo GetLatest(Guid streamId) + { + // TODO: tests + var key = streamId.ToString("N"); + int extentNumber = 0; // TODO: calculate + return new ExtentInfo + { + DataPath = Path.Combine(_config.BasePath, $"{key}_{extentNumber}_data.dat"), + HeadersPath = Path.Combine(_config.BasePath, $"{key}_{extentNumber}_headers.dat"), + }; + } + } +} \ No newline at end of file diff --git a/src/EvenireDB/Extents/IExtentInfoProvider.cs b/src/EvenireDB/Extents/IExtentInfoProvider.cs new file mode 100644 index 0000000..1337006 --- /dev/null +++ b/src/EvenireDB/Extents/IExtentInfoProvider.cs @@ -0,0 +1,7 @@ +namespace EvenireDB.Extents +{ + public interface IExtentInfoProvider + { + ExtentInfo GetLatest(Guid streamId); + } +} \ No newline at end of file diff --git a/src/EvenireDB/FileEventsRepository.cs b/src/EvenireDB/FileEventsRepository.cs index 4137e0f..983d8a3 100644 --- a/src/EvenireDB/FileEventsRepository.cs +++ b/src/EvenireDB/FileEventsRepository.cs @@ -1,4 +1,5 @@ using EvenireDB.Common; +using EvenireDB.Extents; using System.Buffers; using System.Collections.Concurrent; using System.Runtime.CompilerServices; @@ -6,35 +7,29 @@ namespace EvenireDB { - public class FileEventsRepository : IEventsRepository + internal class FileEventsRepository : IEventsRepository { - private readonly ConcurrentDictionary _eventTypes = new(); private readonly FileEventsRepositoryConfig _config; + private readonly ConcurrentDictionary _eventTypes = new(); + private readonly IExtentInfoProvider _extentInfoProvider; - private const string DataFileSuffix = "_data"; - private const string HeadersFileSuffix = "_headers"; - - public FileEventsRepository(FileEventsRepositoryConfig config) + public FileEventsRepository( + FileEventsRepositoryConfig config, + IExtentInfoProvider extentInfoProvider) { + _extentInfoProvider = extentInfoProvider ?? throw new ArgumentNullException(nameof(extentInfoProvider)); _config = config ?? throw new ArgumentNullException(nameof(config)); - - if (!Directory.Exists(_config.BasePath)) - Directory.CreateDirectory(config.BasePath); } - private string GetStreamPath(Guid streamId, string type) - => Path.Combine(_config.BasePath, $"{streamId}{type}.dat"); - public async IAsyncEnumerable ReadAsync( Guid streamId, [EnumeratorCancellation] CancellationToken cancellationToken = default) -{ - string headersPath = GetStreamPath(streamId, HeadersFileSuffix); - string dataPath = GetStreamPath(streamId, DataFileSuffix); - if (!File.Exists(headersPath) || !File.Exists(dataPath)) + { + var extentInfo = _extentInfoProvider.GetLatest(streamId); + if (!File.Exists(extentInfo.DataPath) || !File.Exists(extentInfo.HeadersPath)) yield break; - using var headersStream = new FileStream(headersPath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); + using var headersStream = new FileStream(extentInfo.HeadersPath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); var headers = new List(); int dataBufferSize = 0; @@ -74,12 +69,12 @@ public async IAsyncEnumerable ReadAsync( // now we read the data for all the events in a single buffer // so that we can parse it directly, avoiding accessing the file any longer - using var dataStream = new FileStream(dataPath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); + using var dataStream = new FileStream(extentInfo.DataPath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); dataStream.Position = headers[0].DataPosition; var dataBuffer = ArrayPool.Shared.Rent(dataBufferSize); var dataBufferMem = dataBuffer.AsMemory(0, dataBufferSize); - + try { await dataStream.ReadAsync(dataBufferMem, cancellationToken) @@ -96,7 +91,7 @@ await dataStream.ReadAsync(dataBufferMem, cancellationToken) // if skip is specified, when calculating the source offset we need to subtract the position of the first block of data // because the stream position was already set after opening it long srcOffset = headers[i].DataPosition - headers[0].DataPosition; - + // need to copy the memory here as the source array is rented // AND we need to give the event data to the calling client dataBufferMem.Slice((int)srcOffset, headers[i].EventDataLength) @@ -110,16 +105,14 @@ await dataStream.ReadAsync(dataBufferMem, cancellationToken) finally { ArrayPool.Shared.Return(dataBuffer); - } + } } public async ValueTask AppendAsync(Guid streamId, IEnumerable events, CancellationToken cancellationToken = default) { - string dataPath = GetStreamPath(streamId, DataFileSuffix); - using var dataStream = new FileStream(dataPath, FileMode.Append, FileAccess.Write, FileShare.Read); - - string headersPath = GetStreamPath(streamId, HeadersFileSuffix); - using var headersStream = new FileStream(headersPath, FileMode.Append, FileAccess.Write, FileShare.Read); + var extentInfo = _extentInfoProvider.GetLatest(streamId); + using var dataStream = new FileStream(extentInfo.DataPath, FileMode.Append, FileAccess.Write, FileShare.Read); + using var headersStream = new FileStream(extentInfo.HeadersPath, FileMode.Append, FileAccess.Write, FileShare.Read); var eventsCount = events.Count(); @@ -133,7 +126,7 @@ public async ValueTask AppendAsync(Guid streamId, IEnumerable events, Can var eventType = _eventTypes.GetOrAdd(@event.Type, static type => { - var dest = new byte[Constants.MAX_EVENT_TYPE_LENGTH]; + var dest = new byte[Constants.MAX_EVENT_TYPE_LENGTH]; Encoding.UTF8.GetBytes(type, dest); return dest; }); @@ -155,7 +148,7 @@ await headersStream.WriteAsync(headerBuffer, 0, RawEventHeader.SIZE, cancellatio finally { ArrayPool.Shared.Return(headerBuffer); - } + } await dataStream.FlushAsync().ConfigureAwait(false); await headersStream.FlushAsync().ConfigureAwait(false); diff --git a/src/EvenireDB/FileEventsRepositoryConfig.cs b/src/EvenireDB/FileEventsRepositoryConfig.cs index 7913bac..b2a8adc 100644 --- a/src/EvenireDB/FileEventsRepositoryConfig.cs +++ b/src/EvenireDB/FileEventsRepositoryConfig.cs @@ -1,5 +1,5 @@ namespace EvenireDB { - public record FileEventsRepositoryConfig(string BasePath, uint MaxPageSize = 100); + public record FileEventsRepositoryConfig(uint MaxPageSize = 100); } \ No newline at end of file diff --git a/src/EvenireDB/IServiceCollectionExtensions.cs b/src/EvenireDB/IServiceCollectionExtensions.cs index d862afc..d035e0e 100644 --- a/src/EvenireDB/IServiceCollectionExtensions.cs +++ b/src/EvenireDB/IServiceCollectionExtensions.cs @@ -1,4 +1,5 @@ -using EvenireDB.Server; +using EvenireDB.Extents; +using EvenireDB.Server; using EvenireDB.Utils; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -38,6 +39,7 @@ public static IServiceCollection AddEvenire(this IServiceCollection services, Ev { return new EventDataValidator(settings.MaxEventDataSize); }) + .AddSingleton(new FileEventsRepositoryConfig(settings.MaxEventsPageSizeFromDisk)) .AddSingleton(ctx => { var dataPath = settings.DataFolder; @@ -49,8 +51,9 @@ public static IServiceCollection AddEvenire(this IServiceCollection services, Ev dataPath = Path.Combine(AppContext.BaseDirectory, dataPath); } - return new FileEventsRepositoryConfig(dataPath, settings.MaxEventsPageSizeFromDisk); + return new ExtentInfoProviderConfig(dataPath); }) + .AddSingleton() .AddSingleton() .AddHostedService() .AddSingleton(ctx => diff --git a/src/EvenireDB/RawEventHeader.cs b/src/EvenireDB/RawEventHeader.cs index d8894bb..f6f3e9f 100644 --- a/src/EvenireDB/RawEventHeader.cs +++ b/src/EvenireDB/RawEventHeader.cs @@ -3,7 +3,7 @@ namespace EvenireDB { - //TODO: evaluate https://github.com/MessagePack-CSharp/MessagePack-CSharp + //TODO: evaluate https://github.com/Cysharp/MemoryPack or https://github.com/MessagePack-CSharp/MessagePack-CSharp internal readonly struct RawEventHeader { public readonly long EventIdTimestamp; diff --git a/tests/EvenireDB.Tests/DataFixture.cs b/tests/EvenireDB.Tests/DataFixture.cs index 12ce8c3..d8ab2be 100644 --- a/tests/EvenireDB.Tests/DataFixture.cs +++ b/tests/EvenireDB.Tests/DataFixture.cs @@ -3,14 +3,13 @@ public class DataFixture : IAsyncLifetime { private const string BaseDataPath = "./data/"; - private readonly List _configs = new(); - private readonly IEventDataValidator _factory = new EventDataValidator(1000); + private readonly List _configs = new(); - public FileEventsRepositoryConfig CreateRepoConfig(Guid aggregateId) + internal ExtentInfoProviderConfig CreateExtentsConfig(Guid aggregateId) { var path = Path.Combine(BaseDataPath, aggregateId.ToString()); Directory.CreateDirectory(path); - var config = new FileEventsRepositoryConfig(path); + var config = new ExtentInfoProviderConfig(path); _configs.Add(config); return config; } diff --git a/tests/EvenireDB.Tests/FileEventsRepositoryTests.cs b/tests/EvenireDB.Tests/FileEventsRepositoryTests.cs index a7c6e6a..fbb310d 100644 --- a/tests/EvenireDB.Tests/FileEventsRepositoryTests.cs +++ b/tests/EvenireDB.Tests/FileEventsRepositoryTests.cs @@ -1,3 +1,5 @@ +using EvenireDB.Extents; + namespace EvenireDB.Tests { public class FileEventsRepositoryTests : IClassFixture @@ -17,12 +19,13 @@ public async Task AppendAsync_should_write_events(int eventsCount, int expectedF var events = _fixture.BuildEvents(eventsCount, new byte[] { 0x42 }); var streamId = Guid.NewGuid(); - var config = _fixture.CreateRepoConfig(streamId); - var sut = new FileEventsRepository(config); + var config = _fixture.CreateExtentsConfig(streamId); + var extentInfoProvider = new ExtentInfoProvider(config); + var sut = new FileEventsRepository(new FileEventsRepositoryConfig(), extentInfoProvider); await sut.AppendAsync(streamId, events).ConfigureAwait(false); - var eventsFilePath = Path.Combine(config.BasePath, streamId + "_data.dat"); - var bytes = File.ReadAllBytes(eventsFilePath); + var extentInfo = extentInfoProvider.GetLatest(streamId); + var bytes = File.ReadAllBytes(extentInfo.DataPath); Assert.Equal(expectedFileSize, bytes.Length); } @@ -35,13 +38,14 @@ public async Task AppendAsync_should_append_events(int batchesCount, int eventsP .ToArray(); var streamId = Guid.NewGuid(); - var config = _fixture.CreateRepoConfig(streamId); - var sut = new FileEventsRepository(config); + var config = _fixture.CreateExtentsConfig(streamId); + var extentInfoProvider = new ExtentInfoProvider(config); + var sut = new FileEventsRepository(new FileEventsRepositoryConfig(), extentInfoProvider); foreach (var events in batches) await sut.AppendAsync(streamId, events).ConfigureAwait(false); - var eventsFilePath = Path.Combine(config.BasePath, streamId + "_data.dat"); - var bytes = File.ReadAllBytes(eventsFilePath); + var extentInfo = extentInfoProvider.GetLatest(streamId); + var bytes = File.ReadAllBytes(extentInfo.DataPath); Assert.Equal(expectedFileSize, bytes.Length); } @@ -53,8 +57,8 @@ public async Task ReadAsync_should_read_entire_stream(int eventsCount) var expectedEvents = _fixture.BuildEvents(eventsCount); var streamId = Guid.NewGuid(); - var config = _fixture.CreateRepoConfig(streamId); - var sut = new FileEventsRepository(config); + var config = _fixture.CreateExtentsConfig(streamId); + var sut = new FileEventsRepository(new FileEventsRepositoryConfig(), new ExtentInfoProvider(config)); await sut.AppendAsync(streamId, expectedEvents).ConfigureAwait(false); var events = await sut.ReadAsync(streamId).ToArrayAsync().ConfigureAwait(false); @@ -78,8 +82,8 @@ public async Task ReadAsync_should_read_events_appended_in_batches(int batchesCo .ToArray(); var streamId = Guid.NewGuid(); - var config = _fixture.CreateRepoConfig(streamId); - var sut = new FileEventsRepository(config); + var config = _fixture.CreateExtentsConfig(streamId); + var sut = new FileEventsRepository(new FileEventsRepositoryConfig(), new ExtentInfoProvider(config)); foreach (var events in batches) await sut.AppendAsync(streamId, events).ConfigureAwait(false);