diff --git a/samples/EvenireDB.Samples.TemperatureSensors/Program.cs b/samples/EvenireDB.Samples.TemperatureSensors/Program.cs index 36be3f9..f301da1 100644 --- a/samples/EvenireDB.Samples.TemperatureSensors/Program.cs +++ b/samples/EvenireDB.Samples.TemperatureSensors/Program.cs @@ -1,7 +1,8 @@ using EvenireDB.Client; using EvenireDB.Common; +using EvenireDB.Samples.TemperatureSensors; using Microsoft.AspNetCore.Mvc; -using System.Text.Json; + var builder = WebApplication.CreateBuilder(args); @@ -32,7 +33,8 @@ app.MapGet("/sensors/{sensorId}", async ([FromServices] IEventsClient client, Guid sensorId) => { var events = new List(); - await foreach(var item in client.ReadAsync(sensorId, StreamPosition.End, Direction.Backward).ConfigureAwait(false)) + var streamId = new StreamId(sensorId, nameof(Sensor)); + await foreach(var item in client.ReadAsync(streamId, StreamPosition.End, Direction.Backward).ConfigureAwait(false)) events.Add(item); if (events.Count() == 0) @@ -43,33 +45,3 @@ }); app.Run(); - -public record Sensor -{ - private Sensor(Guid id, Reading[]? readings) - { - this.Id = id; - this.Readings = readings ?? []; - - this.Average = this.Readings.Select(r => r.Temperature).Average(); - } - - public Guid Id { get; } - - public Reading[] Readings { get; } - - public double Average { get; } - - public static Sensor Create(Guid id, IEnumerable events) - { - var readings = events.Where(evt => evt.Type == "ReadingReceived") - .Select(evt => JsonSerializer.Deserialize(evt.Data.Span)) - .ToArray(); - - return new Sensor(id, readings); - } -} - -public record Reading(double Temperature, DateTimeOffset When); - -public record ReadingReceived(double Temperature, DateTimeOffset When); \ No newline at end of file diff --git a/samples/EvenireDB.Samples.TemperatureSensors/Reading.cs b/samples/EvenireDB.Samples.TemperatureSensors/Reading.cs new file mode 100644 index 0000000..2776a93 --- /dev/null +++ b/samples/EvenireDB.Samples.TemperatureSensors/Reading.cs @@ -0,0 +1,3 @@ +namespace EvenireDB.Samples.TemperatureSensors; + +public record Reading(double Temperature, DateTimeOffset When); diff --git a/samples/EvenireDB.Samples.TemperatureSensors/ReadingReceived.cs b/samples/EvenireDB.Samples.TemperatureSensors/ReadingReceived.cs new file mode 100644 index 0000000..4ee8ad2 --- /dev/null +++ b/samples/EvenireDB.Samples.TemperatureSensors/ReadingReceived.cs @@ -0,0 +1,3 @@ +namespace EvenireDB.Samples.TemperatureSensors; + +public record ReadingReceived(double Temperature, DateTimeOffset When); \ No newline at end of file diff --git a/samples/EvenireDB.Samples.TemperatureSensors/Sensor.cs b/samples/EvenireDB.Samples.TemperatureSensors/Sensor.cs new file mode 100644 index 0000000..e543afb --- /dev/null +++ b/samples/EvenireDB.Samples.TemperatureSensors/Sensor.cs @@ -0,0 +1,30 @@ +using EvenireDB.Client; +using System.Text.Json; + +namespace EvenireDB.Samples.TemperatureSensors; + +public record Sensor +{ + private Sensor(Guid id, Reading[]? readings) + { + this.Id = id; + this.Readings = readings ?? []; + + this.Average = this.Readings.Select(r => r.Temperature).Average(); + } + + public Guid Id { get; } + + public Reading[] Readings { get; } + + public double Average { get; } + + public static Sensor Create(Guid id, IEnumerable events) + { + var readings = events.Where(evt => evt.Type == "ReadingReceived") + .Select(evt => JsonSerializer.Deserialize(evt.Data.Span)) + .ToArray(); + + return new Sensor(id, readings); + } +} diff --git a/samples/EvenireDB.Samples.TemperatureSensors/SensorsFakeProducer.cs b/samples/EvenireDB.Samples.TemperatureSensors/SensorsFakeProducer.cs index 8411e3c..007b308 100644 --- a/samples/EvenireDB.Samples.TemperatureSensors/SensorsFakeProducer.cs +++ b/samples/EvenireDB.Samples.TemperatureSensors/SensorsFakeProducer.cs @@ -1,4 +1,7 @@ using EvenireDB.Client; +using EvenireDB.Common; + +namespace EvenireDB.Samples.TemperatureSensors; public class SensorsFakeProducer : BackgroundService { @@ -19,7 +22,10 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) foreach (var sensorId in _sensorConfig.SensorIds) { var reading = new ReadingReceived(Random.Shared.NextDouble() * 100, DateTimeOffset.UtcNow); - await _eventsClient.AppendAsync(sensorId, new[] + + var streamId = new StreamId(sensorId, nameof(Sensor)); + + await _eventsClient.AppendAsync(streamId, new[] { EventData.Create(reading), }, stoppingToken); diff --git a/samples/EvenireDB.Samples.TemperatureSensors/Settings.cs b/samples/EvenireDB.Samples.TemperatureSensors/Settings.cs index acd4289..a1eb919 100644 --- a/samples/EvenireDB.Samples.TemperatureSensors/Settings.cs +++ b/samples/EvenireDB.Samples.TemperatureSensors/Settings.cs @@ -1,4 +1,6 @@ -public record Settings +namespace EvenireDB.Samples.TemperatureSensors; + +public record Settings { public Guid[] SensorIds { get; init; } -} \ No newline at end of file +} diff --git a/src/EvenireDB.AdminUI/AlertTypes.cs b/src/EvenireDB.AdminUI/AlertTypes.cs new file mode 100644 index 0000000..df589f2 --- /dev/null +++ b/src/EvenireDB.AdminUI/AlertTypes.cs @@ -0,0 +1,7 @@ +public enum AlertTypes +{ + Info, + Success, + Warning, + Danger +} \ No newline at end of file diff --git a/src/EvenireDB.AdminUI/Extensions/StreamIdExtensions.cs b/src/EvenireDB.AdminUI/Extensions/StreamIdExtensions.cs new file mode 100644 index 0000000..4ec4cdd --- /dev/null +++ b/src/EvenireDB.AdminUI/Extensions/StreamIdExtensions.cs @@ -0,0 +1,7 @@ +using EvenireDB.Common; + +public static class StreamIdExtensions +{ + public static string ToFriendlyString(this StreamId streamId) + => $"{streamId.Type}/{streamId.Key}"; +} \ No newline at end of file diff --git a/src/EvenireDB.AdminUI/OperationStatus.cs b/src/EvenireDB.AdminUI/OperationStatus.cs index 6be882e..358b76f 100644 --- a/src/EvenireDB.AdminUI/OperationStatus.cs +++ b/src/EvenireDB.AdminUI/OperationStatus.cs @@ -4,4 +4,4 @@ Processing, Success, Failure -} \ No newline at end of file +} diff --git a/src/EvenireDB.AdminUI/Pages/NewStream.razor b/src/EvenireDB.AdminUI/Pages/NewStream.razor index 03c5a76..9837581 100644 --- a/src/EvenireDB.AdminUI/Pages/NewStream.razor +++ b/src/EvenireDB.AdminUI/Pages/NewStream.razor @@ -5,19 +5,37 @@

Create Stream

- +
+
+ +
+ +
+
+
+ +@if (!string.IsNullOrWhiteSpace(_streamType)) +{ + +}else{ + Set the Stream Type to continue +} @code{ - private Guid _streamId; + private Guid _streamKey; + private string _streamType; + + private StreamId _streamId => new StreamId(_streamKey, _streamType); protected override void OnInitialized() { - _streamId = Guid.NewGuid(); + _streamKey = Guid.NewGuid(); base.OnInitialized(); } private void OnStreamCreated() { - NavigationManager.NavigateTo($"/streams/{_streamId}"); + var url = RoutingUtils.StreamDetails(_streamId); + NavigationManager.NavigateTo(url); } } \ No newline at end of file diff --git a/src/EvenireDB.AdminUI/Pages/StreamDetails.razor b/src/EvenireDB.AdminUI/Pages/StreamDetails.razor index 7766afc..741db42 100644 --- a/src/EvenireDB.AdminUI/Pages/StreamDetails.razor +++ b/src/EvenireDB.AdminUI/Pages/StreamDetails.razor @@ -1,10 +1,21 @@ -@page "/streams/{Id:guid}" +@page "/streams/{Type}/{Key:guid}" @inject EvenireDB.Client.IStreamsClient streamsClient @inject EvenireDB.Client.IEventsClient eventsClient Stream Details

Stream Details

+@if (_stream is not null) +{ + if (_stream.IsCached) + { + cached + } + else + { + not cached + } +} @if (_status == OperationStatus.Processing) { @@ -14,36 +25,36 @@ else if (_status == OperationStatus.Success && _stream != null) {
- -
- - @if (_stream.IsCached) - { - cached - } - else - { - not cached - } + +
+
-
-
- -
- + + +
+
+
- -
- + +
+
- -
- + +
+
+ +
+ +
+ +
+
+
@@ -60,7 +71,7 @@ else if (_status == OperationStatus.Success && _stream != null)
- +
@@ -77,7 +88,7 @@ else if (_status == OperationStatus.Success && _stream != null)
- +
@@ -99,7 +110,12 @@ else if (_status == OperationStatus.Failure) private StreamEvents _streamEvents; [Parameter] - public Guid Id { get; set; } + public Guid Key { get; set; } + + [Parameter] + public string Type { get; set; } + + private StreamId StreamId => new StreamId(Key, Type); protected override async Task OnParametersSetAsync() { @@ -113,9 +129,12 @@ else if (_status == OperationStatus.Failure) _stream = null; _streamEvents?.Reset(); + if (string.IsNullOrWhiteSpace(this.Type)) + return; + try { - _stream = await streamsClient.GetStreamInfoAsync(this.Id); + _stream = await streamsClient.GetStreamInfoAsync(this.StreamId); _status = OperationStatus.Success; } @@ -124,7 +143,7 @@ else if (_status == OperationStatus.Failure) _status = OperationStatus.Failure; } - if(_streamEvents is not null) + if (_streamEvents is not null) await _streamEvents!.FetchEventsAsync(); } } diff --git a/src/EvenireDB.AdminUI/Pages/Streams.razor b/src/EvenireDB.AdminUI/Pages/Streams.razor index 333128b..e78f2a9 100644 --- a/src/EvenireDB.AdminUI/Pages/Streams.razor +++ b/src/EvenireDB.AdminUI/Pages/Streams.razor @@ -32,6 +32,7 @@ Id + Type Events # Is Cached Created on diff --git a/src/EvenireDB.AdminUI/RoutingUtils.cs b/src/EvenireDB.AdminUI/RoutingUtils.cs new file mode 100644 index 0000000..e4e13c6 --- /dev/null +++ b/src/EvenireDB.AdminUI/RoutingUtils.cs @@ -0,0 +1,7 @@ +using EvenireDB.Common; + +public static class RoutingUtils +{ + public static string StreamDetails(StreamId streamId) + => $"streams/{streamId.Type}/{streamId.Key}"; +} diff --git a/src/EvenireDB.AdminUI/Shared/Alert.razor b/src/EvenireDB.AdminUI/Shared/Alert.razor new file mode 100644 index 0000000..81b9401 --- /dev/null +++ b/src/EvenireDB.AdminUI/Shared/Alert.razor @@ -0,0 +1,11 @@ + + +@code{ + [Parameter] + public RenderFragment? ChildContent { get; set; } + + [Parameter] + public AlertTypes Type { get; set; } = AlertTypes.Success; +} \ No newline at end of file diff --git a/src/EvenireDB.AdminUI/Shared/SendEvents.razor b/src/EvenireDB.AdminUI/Shared/SendEvents.razor index 9e2f62f..215db4d 100644 --- a/src/EvenireDB.AdminUI/Shared/SendEvents.razor +++ b/src/EvenireDB.AdminUI/Shared/SendEvents.razor @@ -9,8 +9,8 @@ @if (StreamId is null) {
- -
+ +
@@ -18,16 +18,16 @@ }
- -
+ +
- -
+ +
@@ -68,14 +68,14 @@ private SendEventsModel? Model { get; set; } [Parameter] - public Guid? StreamId { get; set; } = null; + public StreamId? StreamId { get; set; } = null; protected override void OnInitialized() => Model ??= new(); protected override void OnParametersSet() { Model ??= new(); - Model.Id = StreamId ?? Guid.Empty; + Model.Id = StreamId; } private async ValueTask OnSubmitAsync() @@ -86,7 +86,8 @@ { var payload = System.Text.Encoding.UTF8.GetBytes(Model.Payload); var @event = new EventData(Model.EventType, payload); - await eventsClient.AppendAsync(Model.Id, [@event]); + StreamId streamId = Model.Id.Value; + await eventsClient.AppendAsync(streamId, [@event]); _status = OperationStatus.Success; @@ -104,7 +105,7 @@ public class SendEventsModel { [Required] - public Guid Id { get; set; } + public StreamId? Id { get; set; } [Required] public string EventType { get; set; } = string.Empty; diff --git a/src/EvenireDB.AdminUI/Shared/StreamEvents.razor b/src/EvenireDB.AdminUI/Shared/StreamEvents.razor index c287f5c..ed13ab9 100644 --- a/src/EvenireDB.AdminUI/Shared/StreamEvents.razor +++ b/src/EvenireDB.AdminUI/Shared/StreamEvents.razor @@ -6,7 +6,7 @@ Id - Type + Type Created on Payload size @@ -57,7 +57,7 @@ private Event? _selectedEvent; [Parameter] - public Guid? StreamId { get; set; } = null; + public StreamId? StreamId { get; set; } = null; protected override async Task OnParametersSetAsync() { diff --git a/src/EvenireDB.AdminUI/Shared/StreamRow.razor b/src/EvenireDB.AdminUI/Shared/StreamRow.razor index 62acaed..872d6a1 100644 --- a/src/EvenireDB.AdminUI/Shared/StreamRow.razor +++ b/src/EvenireDB.AdminUI/Shared/StreamRow.razor @@ -3,7 +3,8 @@ @if (Stream is not null) { - @Stream.StreamId + @Stream.Id.Key + @Stream.Id.Type @Stream.EventsCount @Stream.IsCached @Stream.CreatedAt @@ -11,7 +12,7 @@ @if (_status != OperationStatus.Processing) { - + @@ -55,7 +56,7 @@ try { - await streamsClient.DeleteStreamAsync(Stream.StreamId); + await streamsClient.DeleteStreamAsync(Stream.Id); _status = OperationStatus.Success; } catch (Exception) diff --git a/src/EvenireDB.AdminUI/_Imports.razor b/src/EvenireDB.AdminUI/_Imports.razor index df65403..8917d48 100644 --- a/src/EvenireDB.AdminUI/_Imports.razor +++ b/src/EvenireDB.AdminUI/_Imports.razor @@ -8,4 +8,5 @@ @using Microsoft.JSInterop @using EvenireDB.AdminUI @using EvenireDB.AdminUI.Shared -@using EvenireDB.Client \ No newline at end of file +@using EvenireDB.Client +@using EvenireDB.Common \ No newline at end of file diff --git a/src/EvenireDB.Benchmark/EvenireDB.Benchmark.csproj b/src/EvenireDB.Benchmark/EvenireDB.Benchmark.csproj deleted file mode 100644 index 236a1ac..0000000 --- a/src/EvenireDB.Benchmark/EvenireDB.Benchmark.csproj +++ /dev/null @@ -1,21 +0,0 @@ - - - - Exe - net8.0 - enable - enable - - - - - - - - - - - - - - diff --git a/src/EvenireDB.Benchmark/EventsProviderBenckmarks.cs b/src/EvenireDB.Benchmark/EventsProviderBenckmarks.cs deleted file mode 100644 index 77db58e..0000000 --- a/src/EvenireDB.Benchmark/EventsProviderBenckmarks.cs +++ /dev/null @@ -1,56 +0,0 @@ -using BenchmarkDotNet.Attributes; -using BenchmarkDotNet.Engines; -using EvenireDB; -using EvenireDB.Common; -using EvenireDB.Persistence; -using EvenireDB.Utils; -using Microsoft.Extensions.Logging.Abstractions; - -public class EventsProviderBenckmarks -{ - private readonly static byte[] _data = Enumerable.Repeat((byte)42, 100).ToArray(); - private readonly static Guid _streamId = Guid.NewGuid(); - private readonly Consumer _consumer = new Consumer(); - - private EventsReader _sut; - - [Params(10, 100, 1000)] - public uint EventsCount; - - [GlobalSetup] - public void GlobalSetup() - { - var basePath = AppContext.BaseDirectory; - var dataPath = Path.Combine(basePath, "data"); - - if (!Directory.Exists(dataPath)) - Directory.CreateDirectory(dataPath); - - var repoConfig = new FileEventsRepositoryConfig(); - var extentInfoProvider = new ExtentInfoProvider(new ExtentInfoProviderConfig(dataPath)); - var dataRepo = new DataRepository(); - var headersRepo = new HeadersRepository(); - var repo = new EventsProvider(headersRepo, dataRepo, extentInfoProvider); - - var cache = new StreamsCache( - new NullLogger(), - new LRUCache(this.EventsCount), - repo); - - var logger = new NullLogger(); - - _sut = new EventsReader(EventsReaderConfig.Default, cache); - - var events = Enumerable.Range(0, (int)this.EventsCount).Select(i => new Event(new EventId(i, 0), "lorem", _data)).ToArray(); - Task.WaitAll(repo.AppendAsync(_streamId, events).AsTask()); - } - - [Benchmark(Baseline = true)] - public async Task ReadAsync_Baseline() - { - int count = 0; - var events = await _sut.ReadAsync(_streamId, StreamPosition.Start).ToListAsync().ConfigureAwait(false); - foreach (var @evt in events) - count++; - } -} \ No newline at end of file diff --git a/src/EvenireDB.Benchmark/GetBytesBenchmark.cs b/src/EvenireDB.Benchmark/GetBytesBenchmark.cs deleted file mode 100644 index 470b8bf..0000000 --- a/src/EvenireDB.Benchmark/GetBytesBenchmark.cs +++ /dev/null @@ -1,51 +0,0 @@ -using BenchmarkDotNet.Attributes; -using System.Text; - -[MemoryDiagnoser] -public class GetBytesBenchmark -{ - private string _input; - - [Params(10, 100, 1000, 10000)] - public int Length { get; set; } - - [GlobalSetup] - public void Setup() - { - _input = new string( Enumerable.Repeat('c', this.Length).ToArray() ); - } - - [Benchmark(Baseline = true)] - public byte[] GetBytesAndCopy() - { - var encoding = Encoding.UTF8; - - var src = encoding.GetBytes(_input); - var dest = new byte[src.Length]; - Array.Copy(src, dest, src.Length); - return dest; - } - - [Benchmark()] - public byte[] GetBytesToDest() - { - var encoding = Encoding.UTF8; - int maxChars = encoding.GetMaxByteCount(_input.Length); - var dest = new byte[maxChars]; - encoding.GetBytes(_input, dest); - return dest; - } - - [Benchmark()] - public Span GetBytesToDestAsSpan() - { - var encoding = Encoding.UTF8; - int maxChars = encoding.GetMaxByteCount(_input.Length); - - var srcSpan = _input.AsSpan(); - Span dest = new byte[maxChars]; - - var written = encoding.GetBytes(srcSpan, dest); - return dest.Slice(0, written); - } -} diff --git a/src/EvenireDB.Benchmark/Program.cs b/src/EvenireDB.Benchmark/Program.cs deleted file mode 100644 index 52de6b3..0000000 --- a/src/EvenireDB.Benchmark/Program.cs +++ /dev/null @@ -1,19 +0,0 @@ -using BenchmarkDotNet.Running; - -// BenchmarkRunner.Run(); - -//TypeLayout.PrintLayout(); - -using var proc = System.Diagnostics.Process.GetCurrentProcess(); - -Console.WriteLine(proc.PrivateMemorySize64); -Console.WriteLine(proc.VirtualMemorySize64); - -var array = new byte[1_000_000_000]; -for(int i = 0; i < array.Length; i++) -{ - array[i] = 1; -} - -Console.WriteLine(proc.PrivateMemorySize64); -Console.WriteLine(proc.VirtualMemorySize64); diff --git a/src/EvenireDB.Client/Exceptions/DuplicatedEventException.cs b/src/EvenireDB.Client/Exceptions/DuplicatedEventException.cs index c7ff998..b8b90c7 100644 --- a/src/EvenireDB.Client/Exceptions/DuplicatedEventException.cs +++ b/src/EvenireDB.Client/Exceptions/DuplicatedEventException.cs @@ -4,10 +4,10 @@ namespace EvenireDB.Client.Exceptions; public class DuplicatedEventException : ClientException { - public DuplicatedEventException(Guid streamId, string message) : base(ErrorCodes.DuplicateEvent, message) + public DuplicatedEventException(StreamId streamId, string message) : base(ErrorCodes.DuplicateEvent, message) { StreamId = streamId; } - public Guid StreamId { get; } + public StreamId StreamId { get; } } diff --git a/src/EvenireDB.Client/Exceptions/StreamNotFoundException.cs b/src/EvenireDB.Client/Exceptions/StreamNotFoundException.cs index be0221c..ffa3c34 100644 --- a/src/EvenireDB.Client/Exceptions/StreamNotFoundException.cs +++ b/src/EvenireDB.Client/Exceptions/StreamNotFoundException.cs @@ -4,10 +4,10 @@ namespace EvenireDB.Client.Exceptions; public class StreamNotFoundException : ClientException { - public StreamNotFoundException(Guid streamId) : base(ErrorCodes.BadRequest, $"stream '{streamId}' does not exist.") + public StreamNotFoundException(StreamId streamId) : base(ErrorCodes.BadRequest, $"stream '{streamId}' does not exist.") { StreamId = streamId; } - public Guid StreamId { get; } + public StreamId StreamId { get; } } \ No newline at end of file diff --git a/src/EvenireDB.Client/GrpcEventsClient.cs b/src/EvenireDB.Client/GrpcEventsClient.cs index 2434ecb..37b9ae0 100644 --- a/src/EvenireDB.Client/GrpcEventsClient.cs +++ b/src/EvenireDB.Client/GrpcEventsClient.cs @@ -4,73 +4,76 @@ using GrpcEvents; using System.Runtime.CompilerServices; -namespace EvenireDB.Client +namespace EvenireDB.Client; + +internal class GrpcEventsClient : IEventsClient { - internal class GrpcEventsClient : IEventsClient + private readonly EventsGrpcService.EventsGrpcServiceClient _client; + + public GrpcEventsClient(EventsGrpcService.EventsGrpcServiceClient client) + { + _client = client ?? throw new ArgumentNullException(nameof(client)); + } + + public async ValueTask AppendAsync( + StreamId streamId, + IEnumerable events, CancellationToken cancellationToken = default) { - private readonly EventsGrpcService.EventsGrpcServiceClient _client; - - public GrpcEventsClient(EventsGrpcService.EventsGrpcServiceClient client) + var request = new AppendRequest() + { + StreamId = streamId.Key.ToString(), + StreamType = streamId.Type + }; + + foreach (var @event in events) { - _client = client ?? throw new ArgumentNullException(nameof(client)); + request.Events.Add(new GrpcEvents.EventData() + { + Type = @event.Type, + Data = Google.Protobuf.UnsafeByteOperations.UnsafeWrap(@event.Data) + }); } - public async ValueTask AppendAsync(Guid streamId, IEnumerable events, CancellationToken cancellationToken = default) + AppendResponse response = null; + try { - var request = new AppendRequest() - { - StreamId = streamId.ToString() - }; + response = await _client.AppendAsync(request, cancellationToken: cancellationToken); + } + catch(Exception ex) + { + throw new ClientException(ErrorCodes.Unknown, $"an error has occurred while appending events to stream '{streamId}': {ex.Message}"); + } - foreach (var @event in events) - { - request.Events.Add(new GrpcEvents.EventData() - { - Type = @event.Type, - Data = Google.Protobuf.UnsafeByteOperations.UnsafeWrap(@event.Data) - }); - } + if (response is null) + throw new ClientException(ErrorCodes.Unknown, "something, somewhere, went terribly, terribly wrong."); - AppendResponse response = null; - try + if (response.Error is not null) + throw response.Error.Code switch { - response = await _client.AppendAsync(request, cancellationToken: cancellationToken); - } - catch(Exception ex) - { - throw new ClientException(ErrorCodes.Unknown, $"an error has occurred while appending events to stream '{streamId}': {ex.Message}"); - } - - if (response is null) - throw new ClientException(ErrorCodes.Unknown, "something, somewhere, went terribly, terribly wrong."); - - if (response.Error is not null) - throw response.Error.Code switch - { - ErrorCodes.DuplicateEvent => new DuplicatedEventException(streamId, response.Error.Message), - _ => new ClientException(ErrorCodes.Unknown, response.Error.Message), - }; - } + ErrorCodes.DuplicateEvent => new DuplicatedEventException(streamId, response.Error.Message), + _ => new ClientException(ErrorCodes.Unknown, response.Error.Message), + }; + } - public async IAsyncEnumerable ReadAsync( - Guid streamId, - StreamPosition position, - Direction direction = Direction.Forward, - [EnumeratorCancellation] CancellationToken cancellationToken = default) + public async IAsyncEnumerable ReadAsync( + StreamId streamId, + StreamPosition position, + Direction direction = Direction.Forward, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var request = new ReadRequest() { - var request = new ReadRequest() - { - Direction = (uint)direction, - StartPosition = position, - StreamId = streamId.ToString(), - }; - using var response = _client.Read(request, cancellationToken: cancellationToken); - await foreach(var item in response.ResponseStream.ReadAllAsync().ConfigureAwait(false)) - { - var eventId = new EventId(item.Id.Timestamp, (ushort)item.Id.Sequence); + Direction = (uint)direction, + StartPosition = position, + StreamId = streamId.Key.ToString(), + StreamType = streamId.Type + }; + using var response = _client.Read(request, cancellationToken: cancellationToken); + await foreach(var item in response.ResponseStream.ReadAllAsync().ConfigureAwait(false)) + { + var eventId = new EventId(item.Id.Timestamp, (ushort)item.Id.Sequence); - yield return new Event(eventId, item.Type, item.Data.Memory); - } + yield return new Event(eventId, item.Type, item.Data.Memory); } } } \ No newline at end of file diff --git a/src/EvenireDB.Client/HttpEventsClient.cs b/src/EvenireDB.Client/HttpEventsClient.cs index 2be0644..25912e9 100644 --- a/src/EvenireDB.Client/HttpEventsClient.cs +++ b/src/EvenireDB.Client/HttpEventsClient.cs @@ -17,12 +17,12 @@ public HttpEventsClient(HttpClient httpClient) } public async IAsyncEnumerable ReadAsync( - Guid streamId, + StreamId streamId, StreamPosition position, Direction direction = Direction.Forward, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - var endpoint = $"/api/v1/streams/{streamId}/events?pos={position}&dir={(int)direction}"; + var endpoint = $"/api/v1/streams/{streamId.Type}/{streamId.Key}/events?pos={position}&dir={(int)direction}"; var response = await _httpClient.GetAsync(endpoint, HttpCompletionOption.ResponseHeadersRead, cancellationToken) .ConfigureAwait(false); response.EnsureSuccessStatusCode(); @@ -32,9 +32,12 @@ public async IAsyncEnumerable ReadAsync( yield return item; } - public async ValueTask AppendAsync(Guid streamId, IEnumerable events, CancellationToken cancellationToken = default) + public async ValueTask AppendAsync( + StreamId streamId, + IEnumerable events, + CancellationToken cancellationToken = default) { - var response = await _httpClient.PostAsJsonAsync($"/api/v1/streams/{streamId}/events", events, cancellationToken) + var response = await _httpClient.PostAsJsonAsync($"/api/v1/streams/{streamId.Type}/{streamId.Key}/events", events, cancellationToken) .ConfigureAwait(false); if (response.IsSuccessStatusCode) return; diff --git a/src/EvenireDB.Client/HttpStreamsClient.cs b/src/EvenireDB.Client/HttpStreamsClient.cs index 66a1758..3adabc7 100644 --- a/src/EvenireDB.Client/HttpStreamsClient.cs +++ b/src/EvenireDB.Client/HttpStreamsClient.cs @@ -13,9 +13,9 @@ public HttpStreamsClient(HttpClient httpClient) _httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient)); } - public async ValueTask DeleteStreamAsync(Guid streamId, CancellationToken cancellationToken = default) + public async ValueTask DeleteStreamAsync(StreamId streamId, CancellationToken cancellationToken = default) { - var response = await _httpClient.DeleteAsync($"/api/v1/streams/{streamId}", cancellationToken) + var response = await _httpClient.DeleteAsync($"/api/v1/streams/{streamId.Type}/{streamId.Key}", cancellationToken) .ConfigureAwait(false); if (response.IsSuccessStatusCode) return; @@ -27,9 +27,10 @@ public async ValueTask DeleteStreamAsync(Guid streamId, CancellationToken cancel }; } - public async ValueTask> GetStreamInfosAsync(CancellationToken cancellationToken = default) + public async ValueTask> GetStreamInfosAsync(StreamType? streamsType, CancellationToken cancellationToken = default) { - var response = await _httpClient.GetAsync("/api/v1/streams", cancellationToken) + var typeParam = streamsType is not null ? $"?streamsType={streamsType}" : string.Empty; + var response = await _httpClient.GetAsync($"/api/v1/streams{typeParam}", cancellationToken) .ConfigureAwait(false); response.EnsureSuccessStatusCode(); @@ -37,9 +38,9 @@ public async ValueTask> GetStreamInfosAsync(Cancellation return results; } - public async ValueTask GetStreamInfoAsync(Guid streamId, CancellationToken cancellationToken = default) + public async ValueTask GetStreamInfoAsync(StreamId streamId, CancellationToken cancellationToken = default) { - var response = await _httpClient.GetAsync($"/api/v1/streams/{streamId}", cancellationToken) + var response = await _httpClient.GetAsync($"/api/v1/streams/{streamId.Type}/{streamId.Key}", cancellationToken) .ConfigureAwait(false); if (!response.IsSuccessStatusCode) throw response.StatusCode switch diff --git a/src/EvenireDB.Client/IEventsClient.cs b/src/EvenireDB.Client/IEventsClient.cs index dc3e022..75bab6d 100644 --- a/src/EvenireDB.Client/IEventsClient.cs +++ b/src/EvenireDB.Client/IEventsClient.cs @@ -5,7 +5,7 @@ namespace EvenireDB.Client; public interface IEventsClient { //TODO: add response - ValueTask AppendAsync(Guid streamId, IEnumerable events, CancellationToken cancellationToken = default); + ValueTask AppendAsync(StreamId streamId, IEnumerable events, CancellationToken cancellationToken = default); - IAsyncEnumerable ReadAsync(Guid streamId, StreamPosition position, Direction direction = Direction.Forward, CancellationToken cancellationToken = default); + IAsyncEnumerable ReadAsync(StreamId streamId, StreamPosition position, Direction direction = Direction.Forward, CancellationToken cancellationToken = default); } diff --git a/src/EvenireDB.Client/IEventsClientExtensions.cs b/src/EvenireDB.Client/IEventsClientExtensions.cs index 0da1c02..bea68e7 100644 --- a/src/EvenireDB.Client/IEventsClientExtensions.cs +++ b/src/EvenireDB.Client/IEventsClientExtensions.cs @@ -1,33 +1,36 @@ using EvenireDB.Common; using System.Runtime.CompilerServices; -namespace EvenireDB.Client +namespace EvenireDB.Client; + +public static class IEventsClientExtensions { - public static class IEventsClientExtensions + public static IAsyncEnumerable ReadAsync( + this IEventsClient client, + StreamId streamId, + Direction direction = Direction.Forward, + CancellationToken cancellationToken = default) + => client.ReadAsync(streamId, StreamPosition.Start, direction, cancellationToken); + + public static async IAsyncEnumerable ReadAllAsync( + this IEventsClient client, + StreamId streamId, + [EnumeratorCancellation] CancellationToken cancellationToken = default) { - public static IAsyncEnumerable ReadAsync(this IEventsClient client, Guid streamId, Direction direction = Direction.Forward, CancellationToken cancellationToken = default) - => client.ReadAsync(streamId, StreamPosition.Start, direction, cancellationToken); + uint position = 0; - public static async IAsyncEnumerable ReadAllAsync( - this IEventsClient client, - Guid streamId, - [EnumeratorCancellation] CancellationToken cancellationToken = default) + while (true) { - uint position = 0; - - while (true) + bool hasItems = false; + await foreach (var item in client.ReadAsync(streamId, position: position, direction: Direction.Forward, cancellationToken: cancellationToken).ConfigureAwait(false)) { - bool hasItems = false; - await foreach (var item in client.ReadAsync(streamId, position: position, direction: Direction.Forward, cancellationToken: cancellationToken).ConfigureAwait(false)) - { - position++; - hasItems = true; - yield return item; - } - - if (!hasItems) - break; + position++; + hasItems = true; + yield return item; } + + if (!hasItems) + break; } } } \ No newline at end of file diff --git a/src/EvenireDB.Client/IStreamsClient.cs b/src/EvenireDB.Client/IStreamsClient.cs index 6d8b503..ddf15a9 100644 --- a/src/EvenireDB.Client/IStreamsClient.cs +++ b/src/EvenireDB.Client/IStreamsClient.cs @@ -1,8 +1,10 @@ -namespace EvenireDB.Client; +using EvenireDB.Common; + +namespace EvenireDB.Client; public interface IStreamsClient { - ValueTask> GetStreamInfosAsync(CancellationToken cancellationToken = default); - ValueTask GetStreamInfoAsync(Guid streamId, CancellationToken cancellationToken = default); - ValueTask DeleteStreamAsync(Guid streamId, CancellationToken cancellationToken = default); + ValueTask> GetStreamInfosAsync(StreamType? streamsType = null, CancellationToken cancellationToken = default); + ValueTask GetStreamInfoAsync(StreamId streamId, CancellationToken cancellationToken = default); + ValueTask DeleteStreamAsync(StreamId streamId, CancellationToken cancellationToken = default); } \ No newline at end of file diff --git a/src/EvenireDB.Client/StreamInfo.cs b/src/EvenireDB.Client/StreamInfo.cs deleted file mode 100644 index 623cea4..0000000 --- a/src/EvenireDB.Client/StreamInfo.cs +++ /dev/null @@ -1,8 +0,0 @@ -namespace EvenireDB.Client; - -public record StreamInfo( - Guid StreamId, - long EventsCount, - bool IsCached, - DateTimeOffset CreatedAt, - DateTimeOffset LastAccessedAt); \ No newline at end of file diff --git a/src/EvenireDB.Common/StreamId.cs b/src/EvenireDB.Common/StreamId.cs new file mode 100644 index 0000000..dc56530 --- /dev/null +++ b/src/EvenireDB.Common/StreamId.cs @@ -0,0 +1,24 @@ +using System.Diagnostics.CodeAnalysis; + +namespace EvenireDB.Common; + +public readonly record struct StreamId +{ + public StreamId() { } + + [SetsRequiredMembers] + public StreamId(Guid key, StreamType type) + { + Key = key; + Type = type; + } + + public required readonly Guid Key { get; init; } + public required readonly StreamType Type { get; init; } + + public override int GetHashCode() + => HashCode.Combine(Key, Type); + + public override string ToString() + => $"{Type}/{Key}"; +} diff --git a/src/EvenireDB/StreamInfo.cs b/src/EvenireDB.Common/StreamInfo.cs similarity index 54% rename from src/EvenireDB/StreamInfo.cs rename to src/EvenireDB.Common/StreamInfo.cs index 1ffbb77..26c2183 100644 --- a/src/EvenireDB/StreamInfo.cs +++ b/src/EvenireDB.Common/StreamInfo.cs @@ -1,8 +1,8 @@ -namespace EvenireDB; +namespace EvenireDB.Common; public record StreamInfo( - Guid StreamId, + StreamId Id, long EventsCount, bool IsCached, DateTimeOffset CreatedAt, - DateTimeOffset LastAccessedAt); \ No newline at end of file + DateTimeOffset LastAccessedAt); diff --git a/src/EvenireDB.Common/StreamPosition.cs b/src/EvenireDB.Common/StreamPosition.cs index 8ded0d6..41b4363 100644 --- a/src/EvenireDB.Common/StreamPosition.cs +++ b/src/EvenireDB.Common/StreamPosition.cs @@ -1,26 +1,25 @@ -namespace EvenireDB.Common +namespace EvenireDB.Common; + +public readonly record struct StreamPosition { - public readonly record struct StreamPosition - { - private readonly uint _value; + private readonly uint _value; - public StreamPosition(uint value) - { - _value = value; - } + public StreamPosition(uint value) + { + _value = value; + } - public static readonly StreamPosition Start = new(0); + public static readonly StreamPosition Start = new(0); - public static readonly StreamPosition End = new (uint.MaxValue); + public static readonly StreamPosition End = new (uint.MaxValue); - public static implicit operator uint(StreamPosition streamPosition) => streamPosition._value; + public static implicit operator uint(StreamPosition streamPosition) => streamPosition._value; - public static implicit operator StreamPosition(uint value) => new StreamPosition(value); + public static implicit operator StreamPosition(uint value) => new StreamPosition(value); - public override string ToString() - => _value.ToString(); + public override string ToString() + => _value.ToString(); - public override int GetHashCode() - => _value.GetHashCode(); - } + public override int GetHashCode() + => _value.GetHashCode(); } \ No newline at end of file diff --git a/src/EvenireDB.Common/StreamType.cs b/src/EvenireDB.Common/StreamType.cs new file mode 100644 index 0000000..0cfd48a --- /dev/null +++ b/src/EvenireDB.Common/StreamType.cs @@ -0,0 +1,57 @@ +using System.Diagnostics.CodeAnalysis; +using System.Text.Json.Serialization; + +namespace EvenireDB.Common; + +[JsonConverter(typeof(StreamTypeJsonConverter))] +public readonly record struct StreamType : + IParsable, + IEquatable +{ + private readonly string _value; + + public StreamType(string value) + { + //TODO: add proper validation + ArgumentException.ThrowIfNullOrWhiteSpace(value, nameof(value)); + + _value = value; + } + + public override string ToString() + => _value; + + public override int GetHashCode() + => _value.GetHashCode(); + + public static implicit operator string(StreamType streamType) => streamType._value; + public static implicit operator StreamType(string value) => new StreamType(value); + + public static readonly StreamType Empty = new StreamType("[empty]"); + + public static StreamType Parse(string s, IFormatProvider? provider) + => (StreamType)s; + + public static bool TryParse([NotNullWhen(true)] string? s, IFormatProvider? provider, [MaybeNullWhen(false)] out StreamType result) + { + result = Empty; + + if (string.IsNullOrWhiteSpace(s)) + return false; + + try + { + result = (StreamType)s; + return true; + } + catch + { + return false; + } + } + + public bool Equals(string? other) + { + return _value.Equals(other); + } +} diff --git a/src/EvenireDB.Common/StreamTypeJsonConverter.cs b/src/EvenireDB.Common/StreamTypeJsonConverter.cs new file mode 100644 index 0000000..fc0acd8 --- /dev/null +++ b/src/EvenireDB.Common/StreamTypeJsonConverter.cs @@ -0,0 +1,18 @@ +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace EvenireDB.Common; + +internal class StreamTypeJsonConverter : JsonConverter +{ + public override StreamType Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var value = reader.GetString(); + return new StreamType(value); + } + + public override void Write(Utf8JsonWriter writer, StreamType value, JsonSerializerOptions options) + { + writer.WriteStringValue(value.ToString()); + } +} \ No newline at end of file diff --git a/src/EvenireDB.Server/Grpc/EventsGrcpServiceImpl.cs b/src/EvenireDB.Server/Grpc/EventsGrcpServiceImpl.cs index f250b1f..319d58e 100644 --- a/src/EvenireDB.Server/Grpc/EventsGrcpServiceImpl.cs +++ b/src/EvenireDB.Server/Grpc/EventsGrcpServiceImpl.cs @@ -1,84 +1,89 @@ using EvenireDB.Common; using Grpc.Core; using GrpcEvents; +using Microsoft.AspNetCore.DataProtection.KeyManagement; -namespace EvenireDB.Server.Grpc +namespace EvenireDB.Server.Grpc; + +public class EventsGrcpServiceImpl : EventsGrpcService.EventsGrpcServiceBase { - public class EventsGrcpServiceImpl : EventsGrpcService.EventsGrpcServiceBase + private readonly IEventsReader _reader; + private readonly IEventsWriter _writer; + private readonly IEventDataValidator _validator; + + public EventsGrcpServiceImpl(IEventsReader reader, IEventDataValidator validator, IEventsWriter writer) { - private readonly IEventsReader _reader; - private readonly IEventsWriter _writer; - private readonly IEventDataValidator _validator; + _reader = reader ?? throw new ArgumentNullException(nameof(reader)); + _validator = validator ?? throw new ArgumentNullException(nameof(validator)); + _writer = writer ?? throw new ArgumentNullException(nameof(writer)); + } - public EventsGrcpServiceImpl(IEventsReader reader, IEventDataValidator validator, IEventsWriter writer) - { - _reader = reader ?? throw new ArgumentNullException(nameof(reader)); - _validator = validator ?? throw new ArgumentNullException(nameof(validator)); - _writer = writer ?? throw new ArgumentNullException(nameof(writer)); - } + public override async Task Read(ReadRequest request, IServerStreamWriter responseStream, ServerCallContext context) + { + if (!Guid.TryParse(request.StreamId, out var key)) + throw new ArgumentOutOfRangeException(nameof(request.StreamId)); //TODO: is this ok? - public override async Task Read(ReadRequest request, IServerStreamWriter responseStream, ServerCallContext context) - { - if (!Guid.TryParse(request.StreamId, out var streamId)) - throw new ArgumentOutOfRangeException(nameof(request.StreamId)); //TODO: is this ok? + ArgumentNullException.ThrowIfNullOrWhiteSpace(request.StreamType, nameof(request.StreamType)); + + var streamId = new StreamId { Key = key, Type = request.StreamType }; - await foreach(var @event in _reader.ReadAsync( - streamId, - direction: (Direction)request.Direction, - startPosition: request.StartPosition).ConfigureAwait(false)) + await foreach(var @event in _reader.ReadAsync( + streamId, + direction: (Direction)request.Direction, + startPosition: request.StartPosition).ConfigureAwait(false)) + { + var dto = new GrpcEvents.Event() { - var dto = new GrpcEvents.Event() + Data = Google.Protobuf.UnsafeByteOperations.UnsafeWrap(@event.Data), + Type = @event.Type, + Id = new GrpcEvents.EventId() { - Data = Google.Protobuf.UnsafeByteOperations.UnsafeWrap(@event.Data), - Type = @event.Type, - Id = new GrpcEvents.EventId() - { - Timestamp = @event.Id.Timestamp, - Sequence = @event.Id.Sequence - } - }; - await responseStream.WriteAsync(dto).ConfigureAwait(false); - } + Timestamp = @event.Id.Timestamp, + Sequence = @event.Id.Sequence + } + }; + await responseStream.WriteAsync(dto).ConfigureAwait(false); } + } - public override async Task Append(AppendRequest request, ServerCallContext context) - { - var events = new List(request.Events.Count); - var response = new AppendResponse() { StreamId = request.StreamId }; - - try - { - var streamId = Guid.Parse(request.StreamId); + public override async Task Append(AppendRequest request, ServerCallContext context) + { + var events = new List(request.Events.Count); + var response = new AppendResponse() { StreamId = request.StreamId, StreamType = request.StreamType }; - foreach (var incoming in request.Events) - { - _validator.Validate(incoming.Type, incoming.Data.Memory); - var @event = new EventData(incoming.Type, incoming.Data.Memory); - events.Add(@event); - } - - var result = await _writer.AppendAsync(streamId, events, request.ExpectedVersion) - .ConfigureAwait(false); + try + { + var key = Guid.Parse(request.StreamId); + var streamId = new StreamId { Key = key, Type = request.StreamType }; - if (result is FailureResult failure) - { - response.Error = new FailureResponse() - { - Code = failure.Code, - Message = failure.Message, - }; - } - } - catch(ArgumentException ex) + foreach (var incoming in request.Events) { - response.Error = new FailureResponse() { Code = ErrorCodes.BadRequest, Message = ex.Message }; + _validator.Validate(incoming.Type, incoming.Data.Memory); + var @event = new EventData(incoming.Type, incoming.Data.Memory); + events.Add(@event); } - catch (Exception ex) + + var result = await _writer.AppendAsync(streamId, events, request.ExpectedVersion) + .ConfigureAwait(false); + + if (result is FailureResult failure) { - response.Error = new FailureResponse() { Code = ErrorCodes.Unknown, Message = ex.Message }; + response.Error = new FailureResponse() + { + Code = failure.Code, + Message = failure.Message, + }; } - - return response; } + catch(ArgumentException ex) + { + response.Error = new FailureResponse() { Code = ErrorCodes.BadRequest, Message = ex.Message }; + } + catch (Exception ex) + { + response.Error = new FailureResponse() { Code = ErrorCodes.Unknown, Message = ex.Message }; + } + + return response; } } diff --git a/src/EvenireDB.Server/Routes/StreamsRoutes.cs b/src/EvenireDB.Server/Routes/StreamsRoutes.cs index 18ec203..a2a3584 100644 --- a/src/EvenireDB.Server/Routes/StreamsRoutes.cs +++ b/src/EvenireDB.Server/Routes/StreamsRoutes.cs @@ -1,8 +1,10 @@ using EvenireDB.Common; using EvenireDB.Server.DTO; +using Microsoft.AspNetCore.DataProtection.KeyManagement; using Microsoft.AspNetCore.Mvc; namespace EvenireDB.Server.Routes; + public static class StreamsRoutes { public static WebApplication MapEventsRoutes(this WebApplication app) @@ -11,52 +13,68 @@ public static WebApplication MapEventsRoutes(this WebApplication app) var v1 = api.MapGroup("/api/v{version:apiVersion}/streams") .HasApiVersion(1.0); v1.MapGet("", GetStreams).WithName(nameof(GetStreams)); - v1.MapGet("/{streamId:guid}", GetStreamInfo).WithName(nameof(GetStreamInfo)); - v1.MapDelete("/{streamId:guid}", DeleteSteamAsync).WithName(nameof(DeleteSteamAsync)); - v1.MapGet("/{streamId:guid}/events", GetEventsAsync).WithName(nameof(GetEventsAsync)); - v1.MapPost("/{streamId:guid}/events", AppendEventsAsync).WithName(nameof(AppendEventsAsync)); + v1.MapGet("/{streamType}/{streamKey:guid}", GetStreamInfo).WithName(nameof(GetStreamInfo)); + v1.MapDelete("/{streamType}/{streamKey:guid}", DeleteSteamAsync).WithName(nameof(DeleteSteamAsync)); + v1.MapGet("/{streamType}/{streamKey:guid}/events", GetEventsAsync).WithName(nameof(GetEventsAsync)); + v1.MapPost("/{streamType}/{streamKey:guid}/events", AppendEventsAsync).WithName(nameof(AppendEventsAsync)); return app; } private static async ValueTask DeleteSteamAsync( [FromServices] IStreamInfoProvider provider, - Guid streamId) + string streamType, + Guid streamKey) { try { - await provider.DeleteStreamAsync(streamId); - return Results.NoContent(); + var streamId = new StreamId { Key = streamKey, Type = streamType }; + var result = await provider.DeleteStreamAsync(streamId); + return result ? Results.NoContent() : Results.NotFound(); } catch (ArgumentException) { - return Results.NotFound(); + return Results.BadRequest(); } } private static IResult GetStreams( - [FromServices] IStreamInfoProvider provider) + [FromServices] IStreamInfoProvider provider, + [FromQuery] StreamType? streamsType = null) //TODO: test types filter { - var streams = provider.GetStreamsInfo(); + var streams = provider.GetStreamsInfo(streamsType); return Results.Ok(streams); } private static IResult GetStreamInfo( [FromServices] IStreamInfoProvider provider, - Guid streamId) + string streamType, + Guid streamKey) { - var result = provider.GetStreamInfo(streamId); - return (result is null) ? - Results.NotFound() : - Results.Ok(result); + try + { + var streamId = new StreamId { Key = streamKey, Type = streamType }; + var result = provider.GetStreamInfo(streamId); + return (result is null) ? + Results.NotFound() : + Results.Ok(result); + } + catch (ArgumentException) + { + return Results.BadRequest(); + } } private static async IAsyncEnumerable GetEventsAsync( [FromServices] IEventsReader reader, - Guid streamId, + string streamType, + Guid streamKey, [FromQuery(Name = "pos")] uint startPosition = 0, [FromQuery(Name = "dir")] Direction direction = Direction.Forward) { + //TODO: handle malformed stream id + + var streamId = new StreamId { Key = streamKey, Type = streamType }; await foreach (var @event in reader.ReadAsync(streamId, direction: direction, startPosition: startPosition).ConfigureAwait(false)) yield return EventDTO.FromModel(@event); } @@ -64,7 +82,8 @@ private static async IAsyncEnumerable GetEventsAsync( private static async ValueTask AppendEventsAsync( [FromServices] EventMapper mapper, [FromServices] IEventsWriter writer, - Guid streamId, + string streamType, + Guid streamKey, [FromQuery(Name = "version")] int? expectedVersion, [FromBody] EventDataDTO[]? dtos) { @@ -82,6 +101,9 @@ private static async ValueTask AppendEventsAsync( return Results.BadRequest(new ApiError(ErrorCodes.BadRequest, ex.Message)); } + //TODO: handle malformed stream id + + var streamId = new StreamId { Key = streamKey, Type = streamType }; var result = await writer.AppendAsync(streamId, events, expectedVersion) .ConfigureAwait(false); return result switch @@ -90,7 +112,7 @@ private static async ValueTask AppendEventsAsync( FailureResult { Code: ErrorCodes.VersionMismatch } d => Results.BadRequest(new ApiError(ErrorCodes.VersionMismatch, d.Message)), FailureResult { Code: ErrorCodes.BadRequest } d => Results.BadRequest(new ApiError(ErrorCodes.BadRequest, d.Message)), FailureResult => Results.StatusCode(500), - _ => Results.AcceptedAtRoute(nameof(GetEventsAsync), new { streamId }) + _ => Results.AcceptedAtRoute(nameof(GetEventsAsync), new { streamKey }) }; } } \ No newline at end of file diff --git a/src/EvenireDB.sln b/src/EvenireDB.sln index c9dc703..9237b8c 100644 --- a/src/EvenireDB.sln +++ b/src/EvenireDB.sln @@ -7,8 +7,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EvenireDB", "EvenireDB\Even EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EvenireDB.Tests", "..\tests\EvenireDB.Tests\EvenireDB.Tests.csproj", "{0CE5609B-4935-46F0-8EDD-90AB69BFD58D}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EvenireDB.Benchmark", "EvenireDB.Benchmark\EvenireDB.Benchmark.csproj", "{3C02C130-1AA5-41FD-A55E-598CD2748CDE}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EvenireDB.Server", "EvenireDB.Server\EvenireDB.Server.csproj", "{0437DD06-3EE1-44B2-9867-D11570658B67}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EvenireDB.Server.Tests", "..\tests\EvenireDB.Server.Tests\EvenireDB.Server.Tests.csproj", "{57498174-AE43-4872-8DD1-D1AC649DFD00}" @@ -36,7 +34,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tools", "tools", "{0BFC47FA EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EvenireDB.Tools.EventsGenerator", "tools\EvenireDB.Tools.EventsGenerator\EvenireDB.Tools.EventsGenerator.csproj", "{90FD3F30-39AF-4F30-8403-C4F799A0C67A}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EvenireDB.AdminUI", "EvenireDB.AdminUI\EvenireDB.AdminUI.csproj", "{314152BF-D751-463B-86C4-E2D586279592}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EvenireDB.AdminUI", "EvenireDB.AdminUI\EvenireDB.AdminUI.csproj", "{314152BF-D751-463B-86C4-E2D586279592}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -52,10 +50,6 @@ Global {0CE5609B-4935-46F0-8EDD-90AB69BFD58D}.Debug|Any CPU.Build.0 = Debug|Any CPU {0CE5609B-4935-46F0-8EDD-90AB69BFD58D}.Release|Any CPU.ActiveCfg = Release|Any CPU {0CE5609B-4935-46F0-8EDD-90AB69BFD58D}.Release|Any CPU.Build.0 = Release|Any CPU - {3C02C130-1AA5-41FD-A55E-598CD2748CDE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {3C02C130-1AA5-41FD-A55E-598CD2748CDE}.Debug|Any CPU.Build.0 = Debug|Any CPU - {3C02C130-1AA5-41FD-A55E-598CD2748CDE}.Release|Any CPU.ActiveCfg = Release|Any CPU - {3C02C130-1AA5-41FD-A55E-598CD2748CDE}.Release|Any CPU.Build.0 = Release|Any CPU {0437DD06-3EE1-44B2-9867-D11570658B67}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {0437DD06-3EE1-44B2-9867-D11570658B67}.Debug|Any CPU.Build.0 = Debug|Any CPU {0437DD06-3EE1-44B2-9867-D11570658B67}.Release|Any CPU.ActiveCfg = Release|Any CPU diff --git a/src/EvenireDB/EventsReader.cs b/src/EvenireDB/EventsReader.cs index 0193ea8..ee5312a 100644 --- a/src/EvenireDB/EventsReader.cs +++ b/src/EvenireDB/EventsReader.cs @@ -1,68 +1,65 @@ using EvenireDB.Common; -using EvenireDB.Persistence; -using Microsoft.Extensions.Logging; using System.Runtime.CompilerServices; -namespace EvenireDB +namespace EvenireDB; + +internal class EventsReader : IEventsReader { - internal class EventsReader : IEventsReader + private readonly IStreamsCache _cache; + private readonly EventsReaderConfig _config; + + public EventsReader( + EventsReaderConfig config, + IStreamsCache cache) { - private readonly IStreamsCache _cache; - private readonly EventsReaderConfig _config; - - public EventsReader( - EventsReaderConfig config, - IStreamsCache cache) - { - _cache = cache ?? throw new ArgumentNullException(nameof(cache)); - _config = config ?? throw new ArgumentNullException(nameof(config)); - } + _cache = cache ?? throw new ArgumentNullException(nameof(cache)); + _config = config ?? throw new ArgumentNullException(nameof(config)); + } - public async IAsyncEnumerable ReadAsync( - Guid streamId, - StreamPosition startPosition, - Direction direction = Direction.Forward, - [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - if (startPosition < 0) - throw new ArgumentOutOfRangeException(nameof(startPosition)); + public async IAsyncEnumerable ReadAsync( + StreamId streamId, + StreamPosition startPosition, + Direction direction = Direction.Forward, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + if (startPosition < 0) + throw new ArgumentOutOfRangeException(nameof(startPosition)); - CachedEvents entry = await _cache.GetEventsAsync(streamId, cancellationToken).ConfigureAwait(false); + CachedEvents entry = await _cache.GetEventsAsync(streamId, cancellationToken).ConfigureAwait(false); - if (entry?.Events == null || entry.Events.Count == 0) - yield break; + if (entry?.Events == null || entry.Events.Count == 0) + yield break; - uint totalCount = (uint)entry.Events.Count; - uint pos = startPosition; + uint totalCount = (uint)entry.Events.Count; + uint pos = startPosition; - if (direction == Direction.Forward) - { - if (totalCount < startPosition) - yield break; + if (direction == Direction.Forward) + { + if (totalCount < startPosition) + yield break; - uint j = 0, i = pos, - finalCount = Math.Min(_config.MaxPageSize, totalCount - i); + uint j = 0, i = pos, + finalCount = Math.Min(_config.MaxPageSize, totalCount - i); - while (j++ != finalCount) - { - yield return entry.Events[(int)i++]; - } - } - else + while (j++ != finalCount) { - if (startPosition == StreamPosition.End) - pos = totalCount - 1; + yield return entry.Events[(int)i++]; + } + } + else + { + if (startPosition == StreamPosition.End) + pos = totalCount - 1; - if (pos >= totalCount) - yield break; + if (pos >= totalCount) + yield break; - uint j = 0, i = pos, - finalCount = Math.Min(_config.MaxPageSize, i + 1); + uint j = 0, i = pos, + finalCount = Math.Min(_config.MaxPageSize, i + 1); - while (j++ != finalCount) - { - yield return entry.Events[(int)i--]; - } + while (j++ != finalCount) + { + yield return entry.Events[(int)i--]; } } } diff --git a/src/EvenireDB/EventsWriter.cs b/src/EvenireDB/EventsWriter.cs index 836ac49..6abc55b 100644 --- a/src/EvenireDB/EventsWriter.cs +++ b/src/EvenireDB/EventsWriter.cs @@ -1,80 +1,78 @@ -using Microsoft.Extensions.Logging; +using EvenireDB.Common; +using Microsoft.Extensions.Logging; using System.Threading.Channels; -namespace EvenireDB +namespace EvenireDB; + +// TODO: append to a transaction log +public class EventsWriter : IEventsWriter { - // TODO: append to a transaction log - public class EventsWriter : IEventsWriter + private readonly IStreamsCache _cache; + private readonly ChannelWriter _writer; + private readonly IEventIdGenerator _idGenerator; + private readonly ILogger _logger; + + public EventsWriter(IStreamsCache cache, ChannelWriter writer, IEventIdGenerator idGenerator, ILogger logger) { - private readonly IStreamsCache _cache; - private readonly ChannelWriter _writer; - private readonly IEventIdGenerator _idGenerator; - private readonly ILogger _logger; + _cache = cache; + _writer = writer; + _idGenerator = idGenerator; + _logger = logger; + } + + public async ValueTask AppendAsync( + StreamId streamId, + IEnumerable incomingEvents, + int? expectedVersion = null, + CancellationToken cancellationToken = default) + { + if(incomingEvents is null) + return FailureResult.NullEvents(streamId); - public EventsWriter(IStreamsCache cache, ChannelWriter writer, IEventIdGenerator idGenerator, ILogger logger) - { - _cache = cache; - _writer = writer; - _idGenerator = idGenerator; - _logger = logger; - } - - public async ValueTask AppendAsync( - Guid streamId, - IEnumerable incomingEvents, - int? expectedVersion = null, - CancellationToken cancellationToken = default) - { - if(streamId == Guid.Empty) - return FailureResult.InvalidStream(streamId); + if (!incomingEvents.Any()) + return new SuccessResult(); - ArgumentNullException.ThrowIfNull(incomingEvents, nameof(incomingEvents)); + CachedEvents entry = await _cache.GetEventsAsync(streamId, cancellationToken).ConfigureAwait(false); - if (!incomingEvents.Any()) - return new SuccessResult(); + if (expectedVersion.HasValue && entry.Events.Count != expectedVersion) + return FailureResult.VersionMismatch(streamId, expectedVersion.Value, entry.Events.Count); - CachedEvents entry = await _cache.GetEventsAsync(streamId, cancellationToken).ConfigureAwait(false); + entry.Semaphore.Wait(cancellationToken); + try + { + // TODO: add a metadata field on the event data, use it to allow check for duplicate events + //if (entry.Events.Count > 0 && + // HasDuplicateEvent(incomingEvents, entry, out var duplicate)) + // return FailureResult.DuplicateEvent(duplicate); - if (expectedVersion.HasValue && entry.Events.Count != expectedVersion) - return FailureResult.VersionMismatch(streamId, expectedVersion.Value, entry.Events.Count); + _logger.AppendingEventsToStream(incomingEvents.Count(), streamId); - entry.Semaphore.Wait(cancellationToken); - try + var events = new List(); + EventId? previousEventId = null; + for (int i = 0; i < incomingEvents.Count(); i++) { - // TODO: add a metadata field on the event data, use it to allow check for duplicate events - //if (entry.Events.Count > 0 && - // HasDuplicateEvent(incomingEvents, entry, out var duplicate)) - // return FailureResult.DuplicateEvent(duplicate); - - _logger.AppendingEventsToStream(incomingEvents.Count(), streamId); + var eventData = incomingEvents.ElementAt(i); - var events = new List(); - EventId? previousEventId = null; - for (int i = 0; i < incomingEvents.Count(); i++) - { - var eventData = incomingEvents.ElementAt(i); + var eventId = _idGenerator.Generate(previousEventId); + var @event = new Event(eventId, eventData.Type, eventData.Data); - var eventId = _idGenerator.Generate(previousEventId); - var @event = new Event(eventId, eventData.Type, eventData.Data); + events.Add(@event); - events.Add(@event); - - previousEventId = eventId; - } - - var group = new IncomingEventsGroup(streamId, events); - if (!_writer.TryWrite(group)) - return FailureResult.CannotInitiateWrite(streamId); - - entry.Events.AddRange(events); - _cache.Update(streamId, entry); - } - finally - { - entry.Semaphore.Release(); + previousEventId = eventId; } - return new SuccessResult(); + var group = new IncomingEventsBatch(streamId, events); + if (!_writer.TryWrite(group)) + return FailureResult.CannotInitiateWrite(streamId); + + entry.Events.AddRange(events); + _cache.Update(streamId, entry); } + finally + { + entry.Semaphore.Release(); + } + + return new SuccessResult(); } } \ No newline at end of file diff --git a/src/EvenireDB/Exceptions/StreamException.cs b/src/EvenireDB/Exceptions/StreamException.cs index 8f2f339..c979e9d 100644 --- a/src/EvenireDB/Exceptions/StreamException.cs +++ b/src/EvenireDB/Exceptions/StreamException.cs @@ -1,11 +1,13 @@ +using EvenireDB.Common; + namespace EvenireDB.Exceptions; public class StreamException : Exception { - public StreamException(Guid streamId, string message) : base(message) + public StreamException(StreamId streamId, string message) : base(message) { StreamId = streamId; } - public Guid StreamId { get; } + public StreamId StreamId { get; } } \ No newline at end of file diff --git a/src/EvenireDB/ExtentInfo.cs b/src/EvenireDB/ExtentInfo.cs index 8bc2792..e7e420d 100644 --- a/src/EvenireDB/ExtentInfo.cs +++ b/src/EvenireDB/ExtentInfo.cs @@ -1,3 +1,5 @@ +using EvenireDB.Common; + namespace EvenireDB; -public record ExtentInfo(Guid StreamId, string DataPath, string HeadersPath); \ No newline at end of file +public record ExtentInfo(StreamId StreamId, string DataPath, string HeadersPath); \ No newline at end of file diff --git a/src/EvenireDB/ExtentInfoProvider.cs b/src/EvenireDB/ExtentInfoProvider.cs deleted file mode 100644 index fcd3899..0000000 --- a/src/EvenireDB/ExtentInfoProvider.cs +++ /dev/null @@ -1,48 +0,0 @@ -namespace EvenireDB; - -internal record ExtentInfoProviderConfig(string BasePath); - -internal class ExtentInfoProvider : IExtentInfoProvider -{ - private readonly ExtentInfoProviderConfig _config; - - public ExtentInfoProvider(ExtentInfoProviderConfig config) - { - _config = config ?? throw new ArgumentNullException(nameof(config)); - if (!Directory.Exists(_config.BasePath)) - Directory.CreateDirectory(config.BasePath); - } - - public ExtentInfo? GetExtentInfo(Guid streamId, bool skipCheck = false) - { - // TODO: tests - var key = streamId.ToString("N"); - int extentNumber = 0; // TODO: calculate - - var headersPath = Path.Combine(_config.BasePath, $"{key}_{extentNumber}_headers.dat"); - if(!skipCheck && !File.Exists(headersPath)) - return null; - - return new ExtentInfo - ( - StreamId: streamId, - DataPath: Path.Combine(_config.BasePath, $"{key}_{extentNumber}_data.dat"), - HeadersPath: headersPath - ); - } - - public IEnumerable GetExtentsInfo() - { - // taking the first extent for each stream - // just to be sure that the stream exists - // so that we can parse the stream id from the file name - var headersFiles = Directory.GetFiles(_config.BasePath, "*_0_headers.dat"); - foreach (var headerFile in headersFiles) - { - var filename = Path.GetFileNameWithoutExtension(headerFile); - var key = filename.Substring(0, 32); - - yield return GetExtentInfo(Guid.Parse(key)); - } - } -} diff --git a/src/EvenireDB/ExtentsProvider.cs b/src/EvenireDB/ExtentsProvider.cs new file mode 100644 index 0000000..4d54ea9 --- /dev/null +++ b/src/EvenireDB/ExtentsProvider.cs @@ -0,0 +1,86 @@ +using EvenireDB.Common; +using EvenireDB.Utils; +using System.IO; + +namespace EvenireDB; + +internal record ExtentsProviderConfig(string BasePath); + +// TODO: tests +internal class ExtentsProvider : IExtentsProvider +{ + private readonly ExtentsProviderConfig _config; + + public ExtentsProvider(ExtentsProviderConfig config) + { + _config = config ?? throw new ArgumentNullException(nameof(config)); + if (!Directory.Exists(_config.BasePath)) + Directory.CreateDirectory(config.BasePath); + } + + public ExtentInfo? GetExtentInfo(StreamId streamId, bool createIfMissing = false) + { + var key = streamId.Key.ToString("N"); + int extentNumber = 0; // TODO: calculate + + var typedExtentsPath = Path.Combine(_config.BasePath, streamId.Type); + + lock (this) + { + if (!Directory.Exists(typedExtentsPath)) + Directory.CreateDirectory(typedExtentsPath); + } + + var headersPath = Path.Combine(typedExtentsPath, $"{key}_{extentNumber}_headers.dat"); + if(!createIfMissing && !File.Exists(headersPath)) + return null; + + return new ExtentInfo + ( + StreamId: streamId, + DataPath: Path.Combine(_config.BasePath, $"{key}_{extentNumber}_data.dat"), + HeadersPath: headersPath + ); + } + + public IEnumerable GetAllExtentsInfo(StreamType? streamType = null) + { + var pattern = streamType?.ToString() ?? string.Empty; + var types = Directory.GetDirectories(_config.BasePath, pattern) ?? Array.Empty(); + foreach (var typeFolder in types) + { + string type = Path.GetFileNameWithoutExtension(typeFolder); + + // taking the first extent for each stream + // just to be sure that the stream exists + // so that we can parse the stream id from the file name + var headersFiles = Directory.GetFiles(typeFolder, "*_0_headers.dat"); + foreach (var headerFile in headersFiles) + { + var filename = Path.GetFileNameWithoutExtension(headerFile); + var key = filename.Substring(0, 32); + var streamId = new StreamId { Key = Guid.Parse(key), Type = type }; + + yield return GetExtentInfo(streamId)!; + } + } + } + + public async ValueTask DeleteExtentsAsync(StreamId streamId, CancellationToken cancellationToken = default) + { + // TODO: this should eventually retrieve all the extents for the stream + var extent = GetExtentInfo(streamId); + if (extent is null) + return; + + var result = await FileUtils.TryDeleteFileAsync(extent.HeadersPath, cancellationToken: cancellationToken); + if (!result) + throw new InvalidOperationException($"Failed to delete stream '{streamId}'."); + + result = await FileUtils.TryDeleteFileAsync(extent.DataPath, cancellationToken: cancellationToken); + if (!result) + throw new InvalidOperationException($"Failed to delete stream '{streamId}'."); + } +} + + diff --git a/src/EvenireDB/FailureResult.cs b/src/EvenireDB/FailureResult.cs index a969fa7..bae48c0 100644 --- a/src/EvenireDB/FailureResult.cs +++ b/src/EvenireDB/FailureResult.cs @@ -13,10 +13,11 @@ public FailureResult(int code, string message) public string Message { get; } = string.Empty; public int Code { get; } = ErrorCodes.Unknown; - public static FailureResult InvalidStream(Guid streamId) + + public static FailureResult NullEvents(StreamId streamId) => new FailureResult( ErrorCodes.BadRequest, - $"invalid stream id: {streamId}."); + $"received null events collection for stream {streamId}."); public static FailureResult DuplicateEvent(Event? @event) => new FailureResult( @@ -24,12 +25,12 @@ public static FailureResult DuplicateEvent(Event? @event) (@event is null) ? "one of the incoming events is duplicate." : $"event '{@event.Id}' is already in the stream."); - public static IOperationResult CannotInitiateWrite(Guid streamId) + public static IOperationResult CannotInitiateWrite(StreamId streamId) => new FailureResult( ErrorCodes.CannotInitiateWrite, $"unable to write events for stream '{streamId}'."); - internal static IOperationResult VersionMismatch(Guid streamId, int expected, int actual) + internal static IOperationResult VersionMismatch(StreamId streamId, int expected, int actual) => new FailureResult( ErrorCodes.VersionMismatch, $"stream '{streamId}' is at version {actual} instead of {expected}."); diff --git a/src/EvenireDB/IEventsReader.cs b/src/EvenireDB/IEventsReader.cs index 02b3e3f..e1a418a 100644 --- a/src/EvenireDB/IEventsReader.cs +++ b/src/EvenireDB/IEventsReader.cs @@ -5,7 +5,7 @@ namespace EvenireDB; public interface IEventsReader { IAsyncEnumerable ReadAsync( - Guid streamId, + StreamId streamId, StreamPosition startPosition, Direction direction = Direction.Forward, CancellationToken cancellationToken = default); diff --git a/src/EvenireDB/IEventsWriter.cs b/src/EvenireDB/IEventsWriter.cs index f1908e5..72348b1 100644 --- a/src/EvenireDB/IEventsWriter.cs +++ b/src/EvenireDB/IEventsWriter.cs @@ -1,11 +1,12 @@ -namespace EvenireDB +using EvenireDB.Common; + +namespace EvenireDB; + +public interface IEventsWriter { - public interface IEventsWriter - { - ValueTask AppendAsync( - Guid streamId, - IEnumerable events, - int? expectedVersion = null, - CancellationToken cancellationToken = default); - } + ValueTask AppendAsync( + StreamId streamId, + IEnumerable events, + int? expectedVersion = null, + CancellationToken cancellationToken = default); } \ No newline at end of file diff --git a/src/EvenireDB/IExtentInfoProvider.cs b/src/EvenireDB/IExtentInfoProvider.cs deleted file mode 100644 index 8b22e8e..0000000 --- a/src/EvenireDB/IExtentInfoProvider.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace EvenireDB; - -public interface IExtentInfoProvider -{ - ExtentInfo? GetExtentInfo(Guid streamId, bool skipCheck = false); - IEnumerable GetExtentsInfo(); -} \ No newline at end of file diff --git a/src/EvenireDB/IExtentsProvider.cs b/src/EvenireDB/IExtentsProvider.cs new file mode 100644 index 0000000..14d3b22 --- /dev/null +++ b/src/EvenireDB/IExtentsProvider.cs @@ -0,0 +1,19 @@ +using EvenireDB.Common; + +namespace EvenireDB; + +public interface IExtentsProvider +{ + /// + /// returns the extent details for the specified stream. + /// + /// + /// the stream type. + /// when true, will return the first extent for the stream. + /// + ExtentInfo? GetExtentInfo(StreamId streamId, bool createIfMissing = false); + + IEnumerable GetAllExtentsInfo(StreamType? streamType = null); + + ValueTask DeleteExtentsAsync(StreamId streamId, CancellationToken cancellationToken = default); +} \ No newline at end of file diff --git a/src/EvenireDB/IServiceCollectionExtensions.cs b/src/EvenireDB/IServiceCollectionExtensions.cs index c147087..89b8a17 100644 --- a/src/EvenireDB/IServiceCollectionExtensions.cs +++ b/src/EvenireDB/IServiceCollectionExtensions.cs @@ -1,4 +1,5 @@ -using EvenireDB.Persistence; +using EvenireDB.Common; +using EvenireDB.Persistence; using EvenireDB.Server; using EvenireDB.Utils; using Microsoft.Extensions.DependencyInjection; @@ -16,7 +17,7 @@ public static EvenireServerSettings GetServerSettings(this IServiceProvider sp) public static IServiceCollection AddEvenire(this IServiceCollection services) { - var channel = Channel.CreateUnbounded(new UnboundedChannelOptions + var channel = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleWriter = false, SingleReader = false, @@ -24,10 +25,10 @@ public static IServiceCollection AddEvenire(this IServiceCollection services) }); services - .AddSingleton>(ctx => + .AddSingleton>(ctx => { var settings = ctx.GetServerSettings(); - return new LRUCache(settings.MaxInMemoryStreamsCount); + return new LRUCache(settings.MaxInMemoryStreamsCount); }) .AddSingleton() .AddSingleton(ctx => @@ -63,9 +64,9 @@ public static IServiceCollection AddEvenire(this IServiceCollection services) dataPath = Path.Combine(AppContext.BaseDirectory, dataPath); } - return new ExtentInfoProviderConfig(dataPath); + return new ExtentsProviderConfig(dataPath); }) - .AddSingleton() + .AddSingleton() .AddSingleton() .AddSingleton() .AddSingleton() diff --git a/src/EvenireDB/IStreamInfoProvider.cs b/src/EvenireDB/IStreamInfoProvider.cs index 2fee86b..004d8c2 100644 --- a/src/EvenireDB/IStreamInfoProvider.cs +++ b/src/EvenireDB/IStreamInfoProvider.cs @@ -1,8 +1,11 @@ +using EvenireDB.Common; + namespace EvenireDB; public interface IStreamInfoProvider { - IEnumerable GetStreamsInfo(); - StreamInfo? GetStreamInfo(Guid streamId); - ValueTask DeleteStreamAsync(Guid streamId, CancellationToken cancellationToken = default); + IEnumerable GetStreamsInfo(StreamType? streamsType = null); + StreamInfo? GetStreamInfo(StreamId streamId); + + ValueTask DeleteStreamAsync(StreamId streamId, CancellationToken cancellationToken = default); } diff --git a/src/EvenireDB/IStreamsCache.cs b/src/EvenireDB/IStreamsCache.cs index 9a9bbfe..cb48273 100644 --- a/src/EvenireDB/IStreamsCache.cs +++ b/src/EvenireDB/IStreamsCache.cs @@ -1,9 +1,11 @@ -namespace EvenireDB; +using EvenireDB.Common; + +namespace EvenireDB; public interface IStreamsCache { - void Update(Guid streamId, CachedEvents entry); - ValueTask GetEventsAsync(Guid streamId, CancellationToken cancellationToken); - bool ContainsKey(Guid streamId); - void Remove(Guid streamId); + void Update(StreamId streamId, CachedEvents entry); + ValueTask GetEventsAsync(StreamId streamId, CancellationToken cancellationToken = default); + bool Contains(StreamId streamId); + void Remove(StreamId streamId); } \ No newline at end of file diff --git a/src/EvenireDB/IncomingEventsBatch.cs b/src/EvenireDB/IncomingEventsBatch.cs new file mode 100644 index 0000000..08c26a9 --- /dev/null +++ b/src/EvenireDB/IncomingEventsBatch.cs @@ -0,0 +1,5 @@ +using EvenireDB.Common; + +namespace EvenireDB; + +public record IncomingEventsBatch(StreamId StreamId, IEnumerable Events); \ No newline at end of file diff --git a/src/EvenireDB/IncomingEventsGroup.cs b/src/EvenireDB/IncomingEventsGroup.cs deleted file mode 100644 index cbafc89..0000000 --- a/src/EvenireDB/IncomingEventsGroup.cs +++ /dev/null @@ -1,4 +0,0 @@ -namespace EvenireDB -{ - public record IncomingEventsGroup(Guid AggregateId, IEnumerable Events); -} \ No newline at end of file diff --git a/src/EvenireDB/LogMessages.cs b/src/EvenireDB/LogMessages.cs index 7d922a2..3aea65c 100644 --- a/src/EvenireDB/LogMessages.cs +++ b/src/EvenireDB/LogMessages.cs @@ -1,4 +1,5 @@ -using Microsoft.Extensions.Logging; +using EvenireDB.Common; +using Microsoft.Extensions.Logging; namespace EvenireDB; @@ -20,13 +21,13 @@ public enum EventIds EventId = (int)EventIds.ReadingStreamFromRepository, Level = LogLevel.Warning, Message = "Reading stream '{StreamId}' from repository")] - public static partial void ReadingStreamFromRepository(this ILogger logger, Guid streamId); + public static partial void ReadingStreamFromRepository(this ILogger logger, StreamId streamId); [LoggerMessage( EventId = (int)EventIds.AppendingEventsToStream, Level = LogLevel.Debug, Message = "Appending {EventsCount} events to stream '{StreamId}'...")] - public static partial void AppendingEventsToStream(this ILogger logger, int eventsCount, Guid streamId); + public static partial void AppendingEventsToStream(this ILogger logger, int eventsCount, StreamId streamId); [LoggerMessage( EventId = (int)EventIds.HighMemoryUsageDetected, @@ -44,23 +45,23 @@ public enum EventIds EventId = (int)EventIds.EventsGroupPersistenceError, Level = LogLevel.Error, Message = "an error has occurred while persisting events group for stream {StreamId}: {Error}")] - public static partial void EventsGroupPersistenceError(this ILogger logger, Guid streamId, string error); + public static partial void EventsGroupPersistenceError(this ILogger logger, StreamId streamId, string error); [LoggerMessage( EventId = (int)EventIds.StreamDeletionAttempt, Level = LogLevel.Information, - Message = "Trying to delete stream '{StreamId}', attempt #{Attempt} ...")] - public static partial void StreamDeletionAttempt(this ILogger logger, Guid streamId, int attempt); + Message = "Trying to delete stream '{StreamId}' ...")] + public static partial void StreamDeletionStarted(this ILogger logger, StreamId streamId); [LoggerMessage( EventId = (int)EventIds.StreamDeleted, Level = LogLevel.Information, Message = "Stream '{StreamId}' deleted.")] - public static partial void StreamDeleted(this ILogger logger, Guid streamId); + public static partial void StreamDeleted(this ILogger logger, StreamId streamId); [LoggerMessage( EventId = (int)EventIds.StreamDeletionFailed, Level = LogLevel.Critical, Message = "Stream '{StreamId}' deletion failed: {Error}")] - public static partial void StreamDeletionFailed(this ILogger logger, Guid streamId, string error); + public static partial void StreamDeletionFailed(this ILogger logger, StreamId streamId, string error); } diff --git a/src/EvenireDB/Persistence/EventsProvider.cs b/src/EvenireDB/Persistence/EventsProvider.cs index 4ab56a3..ff34c2a 100644 --- a/src/EvenireDB/Persistence/EventsProvider.cs +++ b/src/EvenireDB/Persistence/EventsProvider.cs @@ -1,16 +1,18 @@ -using System.Collections.Concurrent; +using EvenireDB.Common; +using EvenireDB.Exceptions; +using System.Collections.Concurrent; using System.Runtime.CompilerServices; namespace EvenireDB.Persistence; internal class EventsProvider : IEventsProvider { - private readonly IExtentInfoProvider _extentInfoProvider; + private readonly IExtentsProvider _extentInfoProvider; private readonly IHeadersRepository _headersRepo; private readonly IDataRepository _dataRepo; - private readonly ConcurrentDictionary _streamLocks = new(); + private readonly ConcurrentDictionary _streamLocks = new(); //TODO: this should be moved to a locks provider - public EventsProvider(IHeadersRepository headersRepo, IDataRepository dataRepo, IExtentInfoProvider extentInfoProvider) + public EventsProvider(IHeadersRepository headersRepo, IDataRepository dataRepo, IExtentsProvider extentInfoProvider) { _headersRepo = headersRepo; _dataRepo = dataRepo; @@ -18,7 +20,7 @@ public EventsProvider(IHeadersRepository headersRepo, IDataRepository dataRepo, } public async IAsyncEnumerable ReadAsync( - Guid streamId, + StreamId streamId, int? skip = null, int? take = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) @@ -35,10 +37,13 @@ public async IAsyncEnumerable ReadAsync( } public async ValueTask AppendAsync( - Guid streamId, - IEnumerable events, CancellationToken cancellationToken = default) + StreamId streamId, + IEnumerable events, + CancellationToken cancellationToken = default) { var extentInfo = _extentInfoProvider.GetExtentInfo(streamId, true); + if (extentInfo is null) + throw new StreamException(streamId, $"Unable to build extent for stream '{streamId}'."); var semaphore = _streamLocks.GetOrAdd(streamId, _ => new SemaphoreSlim(1, 1)); await semaphore.WaitAsync(cancellationToken); diff --git a/src/EvenireDB/Persistence/IEventsProvider.cs b/src/EvenireDB/Persistence/IEventsProvider.cs index 9140288..a808e1e 100644 --- a/src/EvenireDB/Persistence/IEventsProvider.cs +++ b/src/EvenireDB/Persistence/IEventsProvider.cs @@ -1,7 +1,9 @@ -namespace EvenireDB.Persistence; +using EvenireDB.Common; + +namespace EvenireDB.Persistence; public interface IEventsProvider { - ValueTask AppendAsync(Guid streamId, IEnumerable events, CancellationToken cancellationToken = default); - IAsyncEnumerable ReadAsync(Guid streamId, int? skip = null, int? take = null, CancellationToken cancellationToken = default); + ValueTask AppendAsync(StreamId streamId, IEnumerable events, CancellationToken cancellationToken = default); + IAsyncEnumerable ReadAsync(StreamId streamId, int? skip = null, int? take = null, CancellationToken cancellationToken = default); } \ No newline at end of file diff --git a/src/EvenireDB/StreamInfoProvider.cs b/src/EvenireDB/StreamInfoProvider.cs index 3a5044b..5a2cf3e 100644 --- a/src/EvenireDB/StreamInfoProvider.cs +++ b/src/EvenireDB/StreamInfoProvider.cs @@ -1,18 +1,18 @@ -using System.Runtime.InteropServices; -using EvenireDB.Exceptions; +using EvenireDB.Common; using Microsoft.Extensions.Logging; +using System.Runtime.InteropServices; namespace EvenireDB; internal class StreamInfoProvider : IStreamInfoProvider { private readonly int _headerSize = Marshal.SizeOf(); - private readonly IExtentInfoProvider _extentInfoProvider; + private readonly IExtentsProvider _extentInfoProvider; private readonly IStreamsCache _cache; private readonly ILogger _logger; public StreamInfoProvider( - IExtentInfoProvider extentInfoProvider, + IExtentsProvider extentInfoProvider, IStreamsCache cache, ILogger logger) { @@ -21,7 +21,7 @@ public StreamInfoProvider( _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } - public StreamInfo? GetStreamInfo(Guid streamId) + public StreamInfo? GetStreamInfo(StreamId streamId) { var extent = _extentInfoProvider.GetExtentInfo(streamId); if (extent is null) @@ -35,61 +35,45 @@ public StreamInfoProvider( return new StreamInfo( streamId, headersCount, - _cache.ContainsKey(streamId), + _cache.Contains(streamId), fileInfo.CreationTimeUtc, fileInfo.LastWriteTimeUtc); } - public IEnumerable GetStreamsInfo() + public IEnumerable GetStreamsInfo(StreamType? streamsType = null) { - var allExtents = _extentInfoProvider.GetExtentsInfo(); - List results = new(); + var allExtents = _extentInfoProvider.GetAllExtentsInfo(streamsType); foreach (var extent in allExtents) { var info = GetStreamInfo(extent.StreamId); if (info is not null) - results.Add(info!); + yield return info; } - - return results; } - public async ValueTask DeleteStreamAsync(Guid streamId, CancellationToken cancellationToken = default) + //TODO: I don't like this here + public async ValueTask DeleteStreamAsync(StreamId streamId, CancellationToken cancellationToken = default) { - var extent = _extentInfoProvider.GetExtentInfo(streamId); + var extent = _extentInfoProvider.GetExtentInfo(streamId, createIfMissing: false); if (extent is null) - throw new ArgumentException($"Stream '{streamId}' does not exist."); + return false; - var deleted = false; - int attempt = 0; + _logger.StreamDeletionStarted(streamId); - while (!deleted && attempt < 10 && !cancellationToken.IsCancellationRequested) + try { - _logger.StreamDeletionAttempt(streamId, attempt); - - try - { - if (File.Exists(extent.HeadersPath)) - File.Delete(extent.HeadersPath); - - if (File.Exists(extent.DataPath)) - File.Delete(extent.DataPath); + await _extentInfoProvider.DeleteExtentsAsync(streamId, cancellationToken); - _cache.Remove(streamId); - - deleted = true; - } - catch (Exception ex) - { - _logger.StreamDeletionFailed(streamId, ex.Message); - attempt++; - await Task.Delay(1000); - } + _cache.Remove(streamId); + } + catch (Exception ex) + { + _logger.StreamDeletionFailed(streamId, ex.Message); + return false; } - - if (!deleted) - throw new StreamException(streamId, $"Failed to delete stream '{streamId}'."); _logger.StreamDeleted(streamId); + + return true; } } \ No newline at end of file diff --git a/src/EvenireDB/StreamsCache.cs b/src/EvenireDB/StreamsCache.cs index 71a9a05..adb4f06 100644 --- a/src/EvenireDB/StreamsCache.cs +++ b/src/EvenireDB/StreamsCache.cs @@ -1,4 +1,5 @@ -using EvenireDB.Persistence; +using EvenireDB.Common; +using EvenireDB.Persistence; using EvenireDB.Utils; using Microsoft.Extensions.Logging; @@ -6,13 +7,13 @@ namespace EvenireDB; internal class StreamsCache : IStreamsCache { - private readonly ICache _cache; + private readonly ICache _cache; private readonly ILogger _logger; private readonly IEventsProvider _repo; public StreamsCache( ILogger logger, - ICache cache, + ICache cache, IEventsProvider repo) { _logger = logger; @@ -20,7 +21,7 @@ public StreamsCache( _repo = repo; } - private async ValueTask Factory(Guid streamId, CancellationToken cancellationToken) + private async ValueTask Factory(StreamId streamId, CancellationToken cancellationToken) { _logger.ReadingStreamFromRepository(streamId); @@ -30,15 +31,15 @@ private async ValueTask Factory(Guid streamId, CancellationToken c return new CachedEvents(persistedEvents, new SemaphoreSlim(1)); } - public ValueTask GetEventsAsync(Guid streamId, CancellationToken cancellationToken) - => _cache.GetOrAddAsync(streamId, this.Factory, cancellationToken); + public ValueTask GetEventsAsync(StreamId streamId, CancellationToken cancellationToken = default) + => _cache.GetOrAddAsync(streamId, (_,_) => this.Factory(streamId, cancellationToken), cancellationToken); - public void Update(Guid streamId, CachedEvents entry) + public void Update(StreamId streamId, CachedEvents entry) => _cache.AddOrUpdate(streamId, entry); - public bool ContainsKey(Guid streamId) + public bool Contains(StreamId streamId) => _cache.ContainsKey(streamId); - public void Remove(Guid streamId) + public void Remove(StreamId streamId) => _cache.Remove(streamId); } diff --git a/src/EvenireDB/Utils/FileUtils.cs b/src/EvenireDB/Utils/FileUtils.cs new file mode 100644 index 0000000..3c61fbd --- /dev/null +++ b/src/EvenireDB/Utils/FileUtils.cs @@ -0,0 +1,56 @@ +namespace EvenireDB.Utils; + +internal static class FileUtils +{ + public static async ValueTask TryDeleteDirectoryAsync( + string path, + int maxAttempts = 3, + double delayInSeconds = 0.5, + CancellationToken cancellationToken = default) + { + int currAttempt = 0; + var delay = TimeSpan.FromSeconds(delayInSeconds); + while (currAttempt++ < maxAttempts) + { + if (!Directory.Exists(path)) + return true; + + try + { + Directory.Delete(path, true); + } + catch + { + await Task.Delay(delay, cancellationToken).ConfigureAwait(false); + } + } + + return !Directory.Exists(path); + } + + public static async ValueTask TryDeleteFileAsync( + string path, + int maxAttempts = 3, + double delayInSeconds = 0.5, + CancellationToken cancellationToken = default) + { + int currAttempt = 0; + var delay = TimeSpan.FromSeconds(delayInSeconds); + while (currAttempt++ < maxAttempts) + { + if (!File.Exists(path)) + return true; + + try + { + File.Delete(path); + } + catch + { + await Task.Delay(delay, cancellationToken).ConfigureAwait(false); + } + } + + return !File.Exists(path); + } +} \ No newline at end of file diff --git a/src/EvenireDB/Utils/LRUCache.cs b/src/EvenireDB/Utils/LRUCache.cs index 89f7307..158d25f 100644 --- a/src/EvenireDB/Utils/LRUCache.cs +++ b/src/EvenireDB/Utils/LRUCache.cs @@ -238,4 +238,4 @@ public void Dispose() Dispose(disposing: true); GC.SuppressFinalize(this); } -} \ No newline at end of file +} diff --git a/src/EvenireDB/Workers/IncomingEventsPersistenceWorker.cs b/src/EvenireDB/Workers/IncomingEventsPersistenceWorker.cs index ae29f34..64cf5a9 100644 --- a/src/EvenireDB/Workers/IncomingEventsPersistenceWorker.cs +++ b/src/EvenireDB/Workers/IncomingEventsPersistenceWorker.cs @@ -3,46 +3,49 @@ using Microsoft.Extensions.Logging; using System.Threading.Channels; -namespace EvenireDB.Server +namespace EvenireDB.Server; + +public class IncomingEventsPersistenceWorker : BackgroundService { - public class IncomingEventsPersistenceWorker : BackgroundService - { - private readonly ChannelReader _reader; - private readonly IEventsProvider _repo; - private readonly ILogger _logger; + private readonly ChannelReader _reader; + private readonly IEventsProvider _repo; + private readonly ILogger _logger; - public IncomingEventsPersistenceWorker(ChannelReader reader, IEventsProvider repo, ILogger logger) - { - _reader = reader ?? throw new ArgumentNullException(nameof(reader)); - _repo = repo ?? throw new ArgumentNullException(nameof(repo)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - } + public IncomingEventsPersistenceWorker( + ChannelReader reader, + IEventsProvider repo, + ILogger logger) + { + _reader = reader ?? throw new ArgumentNullException(nameof(reader)); + _repo = repo ?? throw new ArgumentNullException(nameof(repo)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } - protected override async Task ExecuteAsync(CancellationToken cancellationToken) + protected override async Task ExecuteAsync(CancellationToken cancellationToken) + { + await Task.Run(async () => { - await Task.Factory.StartNew(async () => - { - await ExecuteAsyncCore(cancellationToken).ConfigureAwait(false); - }, cancellationToken); - } + await ExecuteAsyncCore(cancellationToken).ConfigureAwait(false); + }, cancellationToken).ConfigureAwait(false); + } - private async Task ExecuteAsyncCore(CancellationToken cancellationToken) - { - while (!cancellationToken.IsCancellationRequested || await _reader.WaitToReadAsync(cancellationToken)) - { - while (_reader.TryRead(out IncomingEventsGroup? group) && group is not null) - { - try - { - await _repo.AppendAsync(group.AggregateId, group.Events, cancellationToken) - .ConfigureAwait(false); - } - catch (Exception ex) - { - _logger.EventsGroupPersistenceError(group.AggregateId, ex.Message); - } - } - } - } + private async Task ExecuteAsyncCore(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + await _reader.ReadAllAsync(cancellationToken) + .ForEachAsync(async batch => + { + if (batch is null) + return; + try + { + await _repo.AppendAsync(batch.StreamId, batch.Events, cancellationToken) + .ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.EventsGroupPersistenceError(batch.StreamId, ex.Message); + } + }, cancellationToken); } } \ No newline at end of file diff --git a/src/EvenireDB/Workers/MemoryWatcher.cs b/src/EvenireDB/Workers/MemoryWatcher.cs index fa849d2..011fa37 100644 --- a/src/EvenireDB/Workers/MemoryWatcher.cs +++ b/src/EvenireDB/Workers/MemoryWatcher.cs @@ -1,60 +1,60 @@ -using EvenireDB.Utils; +using EvenireDB.Common; +using EvenireDB.Utils; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System.Diagnostics; -namespace EvenireDB.Server +namespace EvenireDB.Server; + +public record MemoryWatcherSettings( + TimeSpan Interval, + long MaxAllowedAllocatedBytes); + +public class MemoryWatcher : BackgroundService { - public record MemoryWatcherSettings( - TimeSpan Interval, - long MaxAllowedAllocatedBytes); + private readonly MemoryWatcherSettings _settings; + private readonly ILogger _logger; + private readonly IServiceProvider _sp; + private Process? process; - public class MemoryWatcher : BackgroundService + public MemoryWatcher(MemoryWatcherSettings settings, ILogger logger, IServiceProvider sp) { - private readonly MemoryWatcherSettings _settings; - private readonly ILogger _logger; - private readonly IServiceProvider _sp; - private Process? process; - - public MemoryWatcher(MemoryWatcherSettings settings, ILogger logger, IServiceProvider sp) - { - _settings = settings; - _logger = logger; - _sp = sp; - } + _settings = settings; + _logger = logger; + _sp = sp; + } - public override void Dispose() - { - process?.Dispose(); - base.Dispose(); - } + public override void Dispose() + { + process?.Dispose(); + base.Dispose(); + } - protected override async Task ExecuteAsync(CancellationToken stoppingToken) + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) { - while (!stoppingToken.IsCancellationRequested) - { - process ??= Process.GetCurrentProcess(); - process.Refresh(); + process ??= Process.GetCurrentProcess(); + process.Refresh(); - _logger.MemoryUsageBelowTreshold(process.PrivateMemorySize64, _settings.MaxAllowedAllocatedBytes); + _logger.MemoryUsageBelowTreshold(process.PrivateMemorySize64, _settings.MaxAllowedAllocatedBytes); - bool needDrop = process.PrivateMemorySize64 > _settings.MaxAllowedAllocatedBytes; - if (needDrop) - { - _logger.HighMemoryUsageDetected(process.PrivateMemorySize64, _settings.MaxAllowedAllocatedBytes); - - using var scope = _sp.CreateScope(); - var cache = scope.ServiceProvider.GetRequiredService>(); - - var dropCount = cache.Count / 3; - cache.DropOldest(dropCount); + bool needDrop = process.PrivateMemorySize64 > _settings.MaxAllowedAllocatedBytes; + if (needDrop) + { + _logger.HighMemoryUsageDetected(process.PrivateMemorySize64, _settings.MaxAllowedAllocatedBytes); + + using var scope = _sp.CreateScope(); + var cache = scope.ServiceProvider.GetRequiredService>(); - GC.Collect(); - } + var dropCount = cache.Count / 3; + cache.DropOldest(dropCount); - await Task.Delay(_settings.Interval, stoppingToken); + GC.Collect(); } + + await Task.Delay(_settings.Interval, stoppingToken); } } } \ No newline at end of file diff --git a/src/Protos/events.proto b/src/Protos/events.proto index bbc5892..5e35794 100644 --- a/src/Protos/events.proto +++ b/src/Protos/events.proto @@ -13,19 +13,22 @@ service EventsGrpcService { message ReadRequest { string streamId = 1; - uint32 startPosition = 2; - uint32 direction = 3; + string streamType = 2; + uint32 startPosition = 3; + uint32 direction = 4; } message AppendRequest { string streamId = 1; - repeated EventData events = 2; - google.protobuf.Int32Value expectedVersion = 3; + string streamType = 2; + repeated EventData events = 3; + google.protobuf.Int32Value expectedVersion = 4; } message AppendResponse { string streamId = 1; - FailureResponse error = 2; + string streamType = 2; + FailureResponse error = 3; } message FailureResponse { diff --git a/src/tools/EvenireDB.Tools.EventsGenerator/Program.cs b/src/tools/EvenireDB.Tools.EventsGenerator/Program.cs index e07528d..90ae6c9 100644 --- a/src/tools/EvenireDB.Tools.EventsGenerator/Program.cs +++ b/src/tools/EvenireDB.Tools.EventsGenerator/Program.cs @@ -1,5 +1,6 @@ using EvenireDB.Client; using EvenireDB.Client.Exceptions; +using EvenireDB.Common; using Microsoft.Extensions.DependencyInjection; using System.CommandLine; using System.Text; @@ -32,7 +33,9 @@ var streamIdOption = new Option( name: "--stream", description: "The stream to use."); - +var streamTypeOption = new Option( + name: "--type", + description: "The stream type to use."); var rootCommand = new RootCommand { @@ -41,9 +44,10 @@ grpcPort, httpPort, eventsCount, - streamIdOption + streamIdOption, + streamTypeOption }; -rootCommand.SetHandler(async (streamId, uri, useGrpc, grpcPort, httpPort, eventsCount) => { +rootCommand.SetHandler(async (streamKey, streamType, uri, useGrpc, grpcPort, httpPort, eventsCount) => { var clientConfig = new EvenireClientConfig() { ServerUri = uri, @@ -58,6 +62,8 @@ var provider = services.BuildServiceProvider(); var client = provider.GetRequiredService(); + var streamId = new StreamId(streamKey, streamType); + Console.ForegroundColor = ConsoleColor.Yellow; Console.WriteLine($"Sending {eventsCount} events to stream '{streamId}' on server '{uri}'..."); @@ -77,6 +83,6 @@ Console.WriteLine("Done."); Console.ResetColor(); -}, streamIdOption, serverOption, useGrpc, grpcPort, httpPort, eventsCount); +}, streamIdOption, streamTypeOption, serverOption, useGrpc, grpcPort, httpPort, eventsCount); await rootCommand.InvokeAsync(args); \ No newline at end of file diff --git a/tests/EvenireDB.Client.Tests/GrpcEventsClientTests.cs b/tests/EvenireDB.Client.Tests/GrpcEventsClientTests.cs index d923a57..7f64212 100644 --- a/tests/EvenireDB.Client.Tests/GrpcEventsClientTests.cs +++ b/tests/EvenireDB.Client.Tests/GrpcEventsClientTests.cs @@ -2,76 +2,77 @@ using EvenireDB.Common; using EvenireDB.Server.Tests; -namespace EvenireDB.Client.Tests +namespace EvenireDB.Client.Tests; + +public class GrpcEventsClientTests : IClassFixture { - public class GrpcEventsClientTests : IClassFixture + private readonly ServerFixture _serverFixture; + private const string _defaultStreamsType = "lorem"; + + public GrpcEventsClientTests(ServerFixture serverFixture) { - private readonly ServerFixture _serverFixture; + _serverFixture = serverFixture; + } - public GrpcEventsClientTests(ServerFixture serverFixture) - { - _serverFixture = serverFixture; - } + private GrpcEventsClient CreateSut() + { + var channel = _serverFixture.CreateGrpcChannel(); + var client = new GrpcEvents.EventsGrpcService.EventsGrpcServiceClient(channel); + return new GrpcEventsClient(client); + } - private GrpcEventsClient CreateSut() - { - var channel = _serverFixture.CreateGrpcChannel(); - var client = new GrpcEvents.EventsGrpcService.EventsGrpcServiceClient(channel); - return new GrpcEventsClient(client); - } + [Fact] + public async Task ReadAsync_should_return_empty_collection_when_no_events_available() + { + var sut = CreateSut(); - [Fact] - public async Task ReadAsync_should_return_empty_collection_when_no_events_available() - { - var sut = CreateSut(); + var events = await sut.ReadAsync(new StreamId(Guid.NewGuid(), _defaultStreamsType)).ToListAsync(); + events.Should().NotBeNull().And.BeEmpty(); + } - var events = await sut.ReadAsync(Guid.NewGuid()).ToListAsync(); - events.Should().NotBeNull().And.BeEmpty(); - } + [Fact] + public async Task ReadAsync_should_be_able_to_read_backwards() + { + var inputEvents = TestUtils.BuildEvents(242); - [Fact] - public async Task ReadAsync_should_be_able_to_read_backwards() - { - var streamId = Guid.NewGuid(); - var inputEvents = TestUtils.BuildEvents(242); + var sut = CreateSut(); - var sut = CreateSut(); + var streamId = new StreamId(Guid.NewGuid(), _defaultStreamsType); - await sut.AppendAsync(streamId, inputEvents); + await sut.AppendAsync(streamId, inputEvents); - var expectedEvents = inputEvents.Reverse().Take(100).ToArray(); + var expectedEvents = inputEvents.Reverse().Take(100).ToArray(); - var receivedEvents = await sut.ReadAsync(streamId, position: StreamPosition.End, direction: Direction.Backward).ToArrayAsync(); - receivedEvents.Should().NotBeNullOrEmpty() - .And.HaveCount(100); + var receivedEvents = await sut.ReadAsync(streamId, position: StreamPosition.End, direction: Direction.Backward).ToArrayAsync(); + receivedEvents.Should().NotBeNullOrEmpty() + .And.HaveCount(100); - TestUtils.IsEquivalent(receivedEvents, expectedEvents); - } + TestUtils.IsEquivalent(receivedEvents, expectedEvents); + } - [Fact] - public async Task AppendAsync_should_append_events() - { - var streamId = Guid.NewGuid(); - var expectedEvents = TestUtils.BuildEvents(10); + [Fact] + public async Task AppendAsync_should_append_events() + { + var streamId = new StreamId(Guid.NewGuid(), _defaultStreamsType); + var expectedEvents = TestUtils.BuildEvents(10); - var sut = CreateSut(); + var sut = CreateSut(); - await sut.AppendAsync(streamId, expectedEvents); - var receivedEvents = await sut.ReadAsync(streamId).ToArrayAsync(); + await sut.AppendAsync(streamId, expectedEvents); + var receivedEvents = await sut.ReadAsync(streamId).ToArrayAsync(); - TestUtils.IsEquivalent(receivedEvents, expectedEvents); - } + TestUtils.IsEquivalent(receivedEvents, expectedEvents); + } - [Fact(Skip = "TBD")] - public async Task AppendAsync_should_fail_when_events_already_appended() - { - var streamId = Guid.NewGuid(); - var expectedEvents = TestUtils.BuildEvents(10); + [Fact(Skip = "TBD")] + public async Task AppendAsync_should_fail_when_events_already_appended() + { + var streamId = new StreamId(Guid.NewGuid(), _defaultStreamsType); + var expectedEvents = TestUtils.BuildEvents(10); - var sut = CreateSut(); + var sut = CreateSut(); - await sut.AppendAsync(streamId, expectedEvents); - await Assert.ThrowsAsync(async () => await sut.AppendAsync(streamId, expectedEvents)); - } + await sut.AppendAsync(streamId, expectedEvents); + await Assert.ThrowsAsync(async () => await sut.AppendAsync(streamId, expectedEvents)); } } \ No newline at end of file diff --git a/tests/EvenireDB.Client.Tests/HttpEventsClientTests.cs b/tests/EvenireDB.Client.Tests/HttpEventsClientTests.cs index 0f3348a..a91dc91 100644 --- a/tests/EvenireDB.Client.Tests/HttpEventsClientTests.cs +++ b/tests/EvenireDB.Client.Tests/HttpEventsClientTests.cs @@ -7,6 +7,7 @@ namespace EvenireDB.Client.Tests; public class HttpEventsClientTests : IClassFixture { private readonly ServerFixture _serverFixture; + private const string _defaultStreamsType = "lorem"; public HttpEventsClientTests(ServerFixture serverFixture) { @@ -20,14 +21,15 @@ public async Task ReadAsync_should_return_empty_collection_when_no_events_availa using var client = application.CreateClient(); var sut = new HttpEventsClient(client); - var events = await sut.ReadAsync(Guid.NewGuid()).ToListAsync(); + var streamId = new StreamId(Guid.NewGuid(), _defaultStreamsType); + var events = await sut.ReadAsync(streamId).ToListAsync(); events.Should().NotBeNull().And.BeEmpty(); } [Fact] public async Task ReadAsync_should_be_able_to_read_backwards() { - var streamId = Guid.NewGuid(); + var streamId = new StreamId(Guid.NewGuid(), _defaultStreamsType); var inputEvents = TestUtils.BuildEventsData(242); await using var application = _serverFixture.CreateServer(); @@ -45,7 +47,7 @@ public async Task ReadAsync_should_be_able_to_read_backwards() [Fact] public async Task AppendAsync_should_append_events() { - var streamId = Guid.NewGuid(); + var streamId = new StreamId(Guid.NewGuid(), _defaultStreamsType); var expectedEvents = TestUtils.BuildEvents(10); await using var application = _serverFixture.CreateServer(); @@ -61,7 +63,7 @@ public async Task AppendAsync_should_append_events() [Fact(Skip = "TBD")] public async Task AppendAsync_should_fail_when_events_already_appended() { - var streamId = Guid.NewGuid(); + var streamId = new StreamId(Guid.NewGuid(), _defaultStreamsType); var expectedEvents = TestUtils.BuildEvents(10); await using var application = _serverFixture.CreateServer(); diff --git a/tests/EvenireDB.Client.Tests/HttpStreamsClientTests.cs b/tests/EvenireDB.Client.Tests/HttpStreamsClientTests.cs index 23c5880..f5c4c71 100644 --- a/tests/EvenireDB.Client.Tests/HttpStreamsClientTests.cs +++ b/tests/EvenireDB.Client.Tests/HttpStreamsClientTests.cs @@ -1,11 +1,13 @@ using EvenireDB.Client.Exceptions; +using EvenireDB.Common; using EvenireDB.Server.Tests; -using System.IO; namespace EvenireDB.Client.Tests; public class HttpStreamsClientTests : IClassFixture { + private const string _defaultStreamsType = "lorem"; + [Fact] public async Task GetStreamInfosAsync_should_return_nothing_when_no_streams_available() { @@ -13,7 +15,7 @@ public async Task GetStreamInfosAsync_should_return_nothing_when_no_streams_avai using var client = application.CreateClient(); var sut = new HttpStreamsClient(client); - var results = await sut.GetStreamInfosAsync(); + var results = await sut.GetStreamInfosAsync(_defaultStreamsType); results.Should().NotBeNull().And.BeEmpty(); } @@ -25,7 +27,7 @@ public async Task GetStreamInfosAsync_should_return_ok() using var client = application.CreateClient(); var sut = new HttpStreamsClient(client); - var results = await sut.GetStreamInfosAsync(); + var results = await sut.GetStreamInfosAsync(_defaultStreamsType); results.Should().NotBeNull(); } @@ -35,15 +37,15 @@ public async Task GetStreamInfoAsync_should_throw_when_stream_id_invalid() await using var application = new TestServerWebApplicationFactory(); using var client = application.CreateClient(); - + var streamId = new StreamId(Guid.NewGuid(), _defaultStreamsType); var sut = new HttpStreamsClient(client); - await Assert.ThrowsAsync< StreamNotFoundException >(async () => await sut.GetStreamInfoAsync(Guid.NewGuid())); + await Assert.ThrowsAsync< StreamNotFoundException >(async () => await sut.GetStreamInfoAsync(streamId)); } [Fact] public async Task GetStreamInfoAsync_should_return_stream_details_when_existing() { - var streamId = Guid.NewGuid(); + var streamId = new StreamId(Guid.NewGuid(), _defaultStreamsType); await using var application = new TestServerWebApplicationFactory(); @@ -55,14 +57,14 @@ public async Task GetStreamInfoAsync_should_return_stream_details_when_existing( var sut = new HttpStreamsClient(client); var result = await sut.GetStreamInfoAsync(streamId); result.Should().NotBeNull(); - result.StreamId.Should().Be(streamId); + result.Id.Should().Be(streamId); result.EventsCount.Should().Be(42); } [Fact] public async Task DeleteStreamAsync_should_delete_stream_when_existing() { - var streamId = Guid.NewGuid(); + var streamId = new StreamId(Guid.NewGuid(), _defaultStreamsType); await using var application = new TestServerWebApplicationFactory(); @@ -81,7 +83,7 @@ public async Task DeleteStreamAsync_should_delete_stream_when_existing() [Fact] public async Task DeleteStreamAsync_should_throw_when_stream_not_existing() { - var streamId = Guid.NewGuid(); + var streamId = new StreamId(Guid.NewGuid(), _defaultStreamsType); await using var application = new TestServerWebApplicationFactory(); diff --git a/tests/EvenireDB.Client.Tests/IEventsClientExtensionsTests.cs b/tests/EvenireDB.Client.Tests/IEventsClientExtensionsTests.cs index 8804e98..86adf9a 100644 --- a/tests/EvenireDB.Client.Tests/IEventsClientExtensionsTests.cs +++ b/tests/EvenireDB.Client.Tests/IEventsClientExtensionsTests.cs @@ -1,34 +1,35 @@ -using EvenireDB.Server.Tests; +using EvenireDB.Common; +using EvenireDB.Server.Tests; -namespace EvenireDB.Client.Tests +namespace EvenireDB.Client.Tests; + +public class IEventsClientExtensionsTests : IClassFixture { - public class IEventsClientExtensionsTests : IClassFixture - { - private readonly ServerFixture _serverFixture; + private readonly ServerFixture _serverFixture; + private const string _defaultStreamsType = "lorem"; - public IEventsClientExtensionsTests(ServerFixture serverFixture) - { - _serverFixture = serverFixture; - } + public IEventsClientExtensionsTests(ServerFixture serverFixture) + { + _serverFixture = serverFixture; + } - private GrpcEventsClient CreateSut() - { - var channel = _serverFixture.CreateGrpcChannel(); - var client = new GrpcEvents.EventsGrpcService.EventsGrpcServiceClient(channel); - return new GrpcEventsClient(client); - } + private GrpcEventsClient CreateSut() + { + var channel = _serverFixture.CreateGrpcChannel(); + var client = new GrpcEvents.EventsGrpcService.EventsGrpcServiceClient(channel); + return new GrpcEventsClient(client); + } - [Fact] - public async Task ReadAllAsync_should_read_entire_stream() - { - var streamId = Guid.NewGuid(); - var expectedEvents = TestUtils.BuildEvents(242); + [Fact] + public async Task ReadAllAsync_should_read_entire_stream() + { + var streamId = new StreamId(Guid.NewGuid(), _defaultStreamsType); + var expectedEvents = TestUtils.BuildEvents(242); - var sut = CreateSut(); - await sut.AppendAsync(streamId, expectedEvents); + var sut = CreateSut(); + await sut.AppendAsync(streamId, expectedEvents); - var receivedEvents = await sut.ReadAllAsync(streamId).ToArrayAsync(); - TestUtils.IsEquivalent(receivedEvents, expectedEvents); - } + var receivedEvents = await sut.ReadAllAsync(streamId).ToArrayAsync(); + TestUtils.IsEquivalent(receivedEvents, expectedEvents); } } \ No newline at end of file diff --git a/tests/EvenireDB.Server.Tests/GrpcTests.cs b/tests/EvenireDB.Server.Tests/GrpcTests.cs index 3dd5656..c3e63fd 100644 --- a/tests/EvenireDB.Server.Tests/GrpcTests.cs +++ b/tests/EvenireDB.Server.Tests/GrpcTests.cs @@ -24,6 +24,7 @@ public async Task Get_Archive_should_be_empty_when_no_events_available_for_strea var req = new ReadRequest() { StreamId = Guid.NewGuid().ToString(), + StreamType = "lorem" }; var response = client.Read(req); var loadedEvents = await response.ResponseStream.ReadAllAsync().ToListAsync(); @@ -31,7 +32,7 @@ public async Task Get_Archive_should_be_empty_when_no_events_available_for_strea } [Fact] - public async Task Append_should_return_bad_request_when_input_invalid() + public async Task Append_should_return_bad_request_when_events_invalid() { var streamId = Guid.NewGuid(); var dtos = BuildEventDataDTOs(10, null); @@ -41,7 +42,31 @@ public async Task Append_should_return_bad_request_when_input_invalid() var req = new AppendRequest() { - StreamId = streamId.ToString() + StreamId = streamId.ToString(), + StreamType = "lorem" + }; + req.Events.AddRange(dtos); + var response = await client.AppendAsync(req); + response.Should().NotBeNull(); + response.Error.Should().NotBeNull(); + response.Error.Code.Should().Be(ErrorCodes.BadRequest); + } + + [Theory] + [InlineData("")] + [InlineData(" ")] + public async Task Append_should_return_bad_request_when_stream_type_invalid(string streamType) + { + var streamId = Guid.NewGuid(); + var dtos = BuildEventDataDTOs(10, _defaultEventData); + + var channel = _serverFixture.CreateGrpcChannel(); + var client = new EventsGrpcService.EventsGrpcServiceClient(channel); + + var req = new AppendRequest() + { + StreamId = streamId.ToString(), + StreamType = streamType }; req.Events.AddRange(dtos); var response = await client.AppendAsync(req); @@ -61,7 +86,8 @@ public async Task Post_should_return_bad_request_when_input_too_big() var req = new AppendRequest() { - StreamId = streamId.ToString() + StreamId = streamId.ToString(), + StreamType = "lorem" }; req.Events.AddRange(dtos); var response = await client.AppendAsync(req); @@ -81,7 +107,8 @@ public async Task Post_should_return_version_mismatch_when_stream_version_mismat var req = new AppendRequest() { - StreamId = streamId.ToString() + StreamId = streamId.ToString(), + StreamType = "lorem" }; req.Events.AddRange(dtos); await client.AppendAsync(req); @@ -89,6 +116,7 @@ public async Task Post_should_return_version_mismatch_when_stream_version_mismat var req2 = new AppendRequest() { StreamId = streamId.ToString(), + StreamType = "lorem", ExpectedVersion = 42 }; req2.Events.AddRange(dtos); @@ -109,7 +137,8 @@ public async Task Post_should_return_conflict_when_input_already_in_stream() var req = new AppendRequest() { - StreamId = streamId.ToString() + StreamId = streamId.ToString(), + StreamType = "lorem" }; req.Events.AddRange(dtos); var response = await client.AppendAsync(req); @@ -133,7 +162,8 @@ public async Task Post_should_succeed_when_input_valid() var req = new AppendRequest() { - StreamId = streamId.ToString() + StreamId = streamId.ToString(), + StreamType = "lorem" }; req.Events.AddRange(dtos); var response = await client.AppendAsync(req); @@ -143,6 +173,7 @@ public async Task Post_should_succeed_when_input_valid() var readReq = new ReadRequest() { StreamId = streamId.ToString(), + StreamType = "lorem" }; var readResponse = client.Read(readReq); var loadedEvents = await readResponse.ResponseStream.ReadAllAsync().ToListAsync(); diff --git a/tests/EvenireDB.Server.Tests/IncomingEventsSubscriberTests.cs b/tests/EvenireDB.Server.Tests/IncomingEventsSubscriberTests.cs index 85daeb7..fe4c55c 100644 --- a/tests/EvenireDB.Server.Tests/IncomingEventsSubscriberTests.cs +++ b/tests/EvenireDB.Server.Tests/IncomingEventsSubscriberTests.cs @@ -1,36 +1,53 @@ -using EvenireDB.Persistence; +using EvenireDB.Common; +using EvenireDB.Persistence; using Microsoft.Extensions.Logging; using System.Threading.Channels; -namespace EvenireDB.Server.Tests +namespace EvenireDB.Server.Tests; + +public class IncomingEventsSubscriberTests { - public class IncomingEventsSubscriberTests + [Fact] + public async Task Service_should_handle_exceptions_gracefully() { - [Fact] - public async Task Service_should_handle_exceptions_gracefully() - { - var channel = Channel.CreateBounded(10); - var repo = Substitute.For(); - repo.WhenForAnyArgs(r => r.AppendAsync(Arg.Any(), null, default)) - .Throw(); + var events = Array.Empty(); + var streamId = new StreamId(Guid.NewGuid(), "lorem"); + var expectedGroups = Enumerable.Range(0, 10) + .Select(i => new IncomingEventsBatch(streamId, events)) + .ToArray(); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + int batchIndex = 0; - var logger = Substitute.For>(); + var reader = Substitute.ForPartsOf>(); + reader.WaitToReadAsync(Arg.Any()) + .ReturnsForAnyArgs(ValueTask.FromResult(batchIndex < expectedGroups.Length)); + reader.TryRead(out Arg.Any()) + .Returns(x => + { + if (batchIndex >= expectedGroups.Length) + { + cts.Cancel(); + return false; + } - var sut = new IncomingEventsPersistenceWorker(channel.Reader, repo, logger); + x[0] = expectedGroups[batchIndex++]; + return true; + }); - await sut.StartAsync(default); + var repo = Substitute.For(); + repo.WhenForAnyArgs(r => r.AppendAsync(streamId, events, Arg.Any())) + .Throw(); - var groups = Enumerable.Range(0, 10) - .Select(i => new IncomingEventsGroup(Guid.NewGuid(), null)) - .ToArray(); - foreach (var group in groups) - await channel.Writer.WriteAsync(group); + var logger = Substitute.For>(); - await Task.Delay(2000); - await sut.StopAsync(default); + var sut = new IncomingEventsPersistenceWorker(reader, repo, logger); + + await sut.StartAsync(cts.Token); - await repo.ReceivedWithAnyArgs(groups.Length) - .AppendAsync(Arg.Any(), null, default); - } + await Task.Delay(TimeSpan.FromSeconds(2)); + + await repo.ReceivedWithAnyArgs(expectedGroups.Length) + .AppendAsync(Arg.Any(), Arg.Any>(), default); } } \ No newline at end of file diff --git a/tests/EvenireDB.Server.Tests/Routes/EventsV1EndpointTests.cs b/tests/EvenireDB.Server.Tests/Routes/EventsV1EndpointTests.cs index 24e03d6..cd5ae2f 100644 --- a/tests/EvenireDB.Server.Tests/Routes/EventsV1EndpointTests.cs +++ b/tests/EvenireDB.Server.Tests/Routes/EventsV1EndpointTests.cs @@ -6,6 +6,7 @@ namespace EvenireDB.Server.Tests.Routes; public class EventsV1EndpointTests : IClassFixture { private readonly ServerFixture _serverFixture; + private const string _defaultStreamsType = "lorem"; public EventsV1EndpointTests(ServerFixture serverFixture) { @@ -18,7 +19,7 @@ public async Task Get_Archive_should_be_empty_when_no_events_available_for_strea await using var application = _serverFixture.CreateServer(); using var client = application.CreateClient(); - var response = await client.GetAsync($"/api/v1/streams/{Guid.NewGuid()}/events"); + var response = await client.GetAsync($"/api/v1/streams/{_defaultStreamsType}/{Guid.NewGuid()}/events"); response.StatusCode.Should().Be(System.Net.HttpStatusCode.OK); var events = await response.Content.ReadFromJsonAsync(); @@ -32,7 +33,7 @@ public async Task Post_should_return_bad_request_when_input_null() await using var application = _serverFixture.CreateServer(); using var client = application.CreateClient(); - var nullPayloadResponse = await client.PostAsJsonAsync($"/api/v1/streams/{streamId}/events", null); + var nullPayloadResponse = await client.PostAsJsonAsync($"/api/v1/streams/{_defaultStreamsType}/{streamId}/events", null); nullPayloadResponse.StatusCode.Should().Be(System.Net.HttpStatusCode.BadRequest); } @@ -45,7 +46,7 @@ public async Task Post_should_return_bad_request_when_input_invalid() using var client = application.CreateClient(); var dtos = HttpRoutesUtils.BuildEventsDTOs(10, null); - var nullDataResponse = await client.PostAsJsonAsync($"/api/v1/streams/{streamId}/events", dtos); + var nullDataResponse = await client.PostAsJsonAsync($"/api/v1/streams/{_defaultStreamsType}/{streamId}/events", dtos); nullDataResponse.StatusCode.Should().Be(System.Net.HttpStatusCode.BadRequest); } @@ -57,7 +58,7 @@ public async Task Post_should_return_bad_request_when_input_too_big() await using var application = _serverFixture.CreateServer(); using var client = application.CreateClient(); var dtos = HttpRoutesUtils.BuildEventsDTOs(1, new byte[500_001]); //TODO: from config - var response = await client.PostAsJsonAsync($"/api/v1/streams/{streamId}/events", dtos); + var response = await client.PostAsJsonAsync($"/api/v1/streams/{_defaultStreamsType}/{streamId}/events", dtos); response.StatusCode.Should().Be(System.Net.HttpStatusCode.BadRequest); } @@ -70,10 +71,10 @@ public async Task Post_should_return_conflict_when_input_already_in_stream() using var client = application.CreateClient(); var dtos = HttpRoutesUtils.BuildEventsDTOs(10, HttpRoutesUtils.DefaultEventData); - var firstResponse = await client.PostAsJsonAsync($"/api/v1/streams/{streamId}/events", dtos); + var firstResponse = await client.PostAsJsonAsync($"/api/v1/streams/{_defaultStreamsType}/{streamId}/events", dtos); firstResponse.StatusCode.Should().Be(System.Net.HttpStatusCode.Accepted); - var errorResponse = await client.PostAsJsonAsync($"/api/v1/streams/{streamId}/events", dtos); + var errorResponse = await client.PostAsJsonAsync($"/api/v1/streams/{_defaultStreamsType}/{streamId}/events", dtos); errorResponse.StatusCode.Should().Be(System.Net.HttpStatusCode.Conflict); } @@ -86,7 +87,7 @@ public async Task Post_should_return_accepted_when_input_valid() await using var application = _serverFixture.CreateServer(); using var client = application.CreateClient(); - var response = await client.PostAsJsonAsync($"/api/v1/streams/{streamId}/events", dtos); + var response = await client.PostAsJsonAsync($"/api/v1/streams/{_defaultStreamsType}/{streamId}/events", dtos); response.StatusCode.Should().Be(System.Net.HttpStatusCode.Accepted); } @@ -99,10 +100,10 @@ public async Task Post_should_return_bad_request_when_stream_version_mismatch() await using var application = _serverFixture.CreateServer(); using var client = application.CreateClient(); - var response = await client.PostAsJsonAsync($"/api/v1/streams/{streamId}/events", dtos); + var response = await client.PostAsJsonAsync($"/api/v1/streams/{_defaultStreamsType}/{streamId}/events", dtos); response.StatusCode.Should().Be(System.Net.HttpStatusCode.Accepted); - var response2 = await client.PostAsJsonAsync($"/api/v1/streams/{streamId}/events?version=2", dtos); + var response2 = await client.PostAsJsonAsync($"/api/v1/streams/{_defaultStreamsType}/{streamId}/events?version=2", dtos); response2.StatusCode.Should().Be(System.Net.HttpStatusCode.BadRequest); } @@ -117,10 +118,10 @@ public async Task Post_should_create_events() await using var application = _serverFixture.CreateServer(); using var client = application.CreateClient(); - var createResp = await client.PostAsJsonAsync($"/api/v1/streams/{streamId}/events", dtos); + var createResp = await client.PostAsJsonAsync($"/api/v1/streams/{_defaultStreamsType}/{streamId}/events", dtos); createResp.EnsureSuccessStatusCode(); - var response = await client.GetAsync($"/api/v1/streams/{streamId}/events"); + var response = await client.GetAsync($"/api/v1/streams/{_defaultStreamsType}/{streamId}/events"); var fetchedEvents = await response.Content.ReadFromJsonAsync(); fetchedEvents.Should().NotBeNull() .And.HaveCount(dtos.Length); diff --git a/tests/EvenireDB.Server.Tests/Routes/StreamsV1EndpointTests.cs b/tests/EvenireDB.Server.Tests/Routes/StreamsV1EndpointTests.cs index 338506c..6aa0e0b 100644 --- a/tests/EvenireDB.Server.Tests/Routes/StreamsV1EndpointTests.cs +++ b/tests/EvenireDB.Server.Tests/Routes/StreamsV1EndpointTests.cs @@ -1,10 +1,12 @@ -using System.Net.Http.Json; +using EvenireDB.Common; +using System.Net.Http.Json; namespace EvenireDB.Server.Tests.Routes; public class StreamsV1EndpointTests : IClassFixture { private readonly ServerFixture _serverFixture; + private const string _defaultStreamsType = "lorem"; public StreamsV1EndpointTests(ServerFixture serverFixture) { @@ -17,12 +19,12 @@ public async Task GetStreams_should_return_not_found_when_stream_id_invalid() await using var application = _serverFixture.CreateServer(); using var client = application.CreateClient(); - var response = await client.GetAsync($"/api/v1/streams/{Guid.NewGuid()}"); + var response = await client.GetAsync($"/api/v1/streams/{_defaultStreamsType}/{Guid.NewGuid()}"); response.StatusCode.Should().Be(System.Net.HttpStatusCode.NotFound); } [Fact] - public async Task GetStreams_should_return_ok_when_stream_id_valid() + public async Task GetStreamInfo_should_return_ok_when_stream_id_valid() { await using var application = _serverFixture.CreateServer(); @@ -31,17 +33,34 @@ public async Task GetStreams_should_return_ok_when_stream_id_valid() var streamId = Guid.NewGuid(); var dtos = HttpRoutesUtils.BuildEventsDTOs(10, HttpRoutesUtils.DefaultEventData); - await client.PostAsJsonAsync($"/api/v1/streams/{streamId}/events", dtos); + await client.PostAsJsonAsync($"/api/v1/streams/{_defaultStreamsType}/{streamId}/events", dtos); - var response = await client.GetAsync($"/api/v1/streams/{streamId}"); + var response = await client.GetAsync($"/api/v1/streams/{_defaultStreamsType}/{streamId}"); response.StatusCode.Should().Be(System.Net.HttpStatusCode.OK); var stream = await response.Content.ReadFromJsonAsync(); stream.Should().NotBeNull(); - stream.StreamId.Should().Be(streamId); + stream.Id.Key.Should().Be(streamId); + stream.Id.Type.ToString().Should().Be(_defaultStreamsType); stream.EventsCount.Should().Be(10); } + [Fact] + public async Task GetStreamInfo_should_return_not_found_when_stream_id_valid_but_type_invalid() + { + await using var application = _serverFixture.CreateServer(); + + using var client = application.CreateClient(); + + var streamId = Guid.NewGuid(); + + var dtos = HttpRoutesUtils.BuildEventsDTOs(10, HttpRoutesUtils.DefaultEventData); + await client.PostAsJsonAsync($"/api/v1/streams/{_defaultStreamsType}/{streamId}/events", dtos); + + var response = await client.GetAsync($"/api/v1/streams/invalid_type/{streamId}"); + response.StatusCode.Should().Be(System.Net.HttpStatusCode.NotFound); + } + [Fact] public async Task GetStreams_should_return_empty_when_no_streams_available() { @@ -65,7 +84,7 @@ public async Task GetStreams_should_return_available_streams() var streamId = Guid.NewGuid(); var dtos = HttpRoutesUtils.BuildEventsDTOs(10, HttpRoutesUtils.DefaultEventData); - await client.PostAsJsonAsync($"/api/v1/streams/{streamId}/events", dtos); + await client.PostAsJsonAsync($"/api/v1/streams/{_defaultStreamsType}/{streamId}/events", dtos); var response = await client.GetAsync("/api/v1/streams"); response.StatusCode.Should().Be(System.Net.HttpStatusCode.OK); @@ -74,7 +93,27 @@ public async Task GetStreams_should_return_available_streams() streams.Should().NotBeNull() .And.NotBeEmpty() .And.HaveCount(1) - .And.Contain(s => s.StreamId == streamId); + .And.Contain(s => s.Id.Key == streamId && s.Id.Type == _defaultStreamsType); + } + + [Fact] + public async Task GetStreams_should_empty_when_type_invalid() + { + await using var application = _serverFixture.CreateServer(); + + using var client = application.CreateClient(); + + var streamId = Guid.NewGuid(); + + var dtos = HttpRoutesUtils.BuildEventsDTOs(10, HttpRoutesUtils.DefaultEventData); + await client.PostAsJsonAsync($"/api/v1/streams/{_defaultStreamsType}/{streamId}/events", dtos); + + var response = await client.GetAsync("/api/v1/streams?streamsType=invalid"); + response.StatusCode.Should().Be(System.Net.HttpStatusCode.OK); + + var streams = await response.Content.ReadFromJsonAsync(); + streams.Should().NotBeNull() + .And.BeEmpty(); } [Fact] @@ -87,12 +126,12 @@ public async Task DeleteSteamAsync_should_delete_existing_stream() var streamId = Guid.NewGuid(); var dtos = HttpRoutesUtils.BuildEventsDTOs(10, HttpRoutesUtils.DefaultEventData); - await client.PostAsJsonAsync($"/api/v1/streams/{streamId}/events", dtos); + await client.PostAsJsonAsync($"/api/v1/streams/{_defaultStreamsType}/{streamId}/events", dtos); - var response = await client.DeleteAsync($"/api/v1/streams/{streamId}"); + var response = await client.DeleteAsync($"/api/v1/streams/{_defaultStreamsType}/{streamId}"); response.StatusCode.Should().Be(System.Net.HttpStatusCode.NoContent); - var secondResp = await client.DeleteAsync($"/api/v1/streams/{streamId}"); + var secondResp = await client.DeleteAsync($"/api/v1/streams/{_defaultStreamsType}/{streamId}"); secondResp.StatusCode.Should().Be(System.Net.HttpStatusCode.NotFound); } @@ -105,7 +144,7 @@ public async Task DeleteSteamAsync_should_return_not_found_when_stream_not_exist var streamId = Guid.NewGuid(); - var response = await client.DeleteAsync($"/api/v1/streams/{streamId}"); + var response = await client.DeleteAsync($"/api/v1/streams/{_defaultStreamsType}/{streamId}"); response.StatusCode.Should().Be(System.Net.HttpStatusCode.NotFound); } } diff --git a/tests/EvenireDB.Server.Tests/TestServerWebApplicationFactory.cs b/tests/EvenireDB.Server.Tests/TestServerWebApplicationFactory.cs index b74f1e0..fb036c4 100644 --- a/tests/EvenireDB.Server.Tests/TestServerWebApplicationFactory.cs +++ b/tests/EvenireDB.Server.Tests/TestServerWebApplicationFactory.cs @@ -9,7 +9,7 @@ public class TestServerWebApplicationFactory : WebApplicationFactory public TestServerWebApplicationFactory() { - _dataFolder = Directory.CreateTempSubdirectory("eveniredb-tests"); + _dataFolder = Directory.CreateTempSubdirectory("eveniredb-server-tests"); } protected override void ConfigureWebHost(IWebHostBuilder builder) @@ -28,17 +28,16 @@ protected override void ConfigureWebHost(IWebHostBuilder builder) protected override void Dispose(bool disposing) { - try + lock (this) { - lock (this) - { + try { if (_dataFolder.Exists) _dataFolder.Delete(true); } - } - catch - { - // best effort - } + catch + { + // best effort + } + } } } diff --git a/tests/EvenireDB.Tests/DataFixture.cs b/tests/EvenireDB.Tests/DataFixture.cs index 83093db..3e551bb 100644 --- a/tests/EvenireDB.Tests/DataFixture.cs +++ b/tests/EvenireDB.Tests/DataFixture.cs @@ -2,29 +2,38 @@ public class DataFixture : IAsyncLifetime { - private const string BaseDataPath = "./data/"; - private readonly List _configs = new(); + private DirectoryInfo _baseDataPath; - internal ExtentInfoProviderConfig CreateExtentsConfig(Guid aggregateId) - { - var path = Path.Combine(BaseDataPath, aggregateId.ToString()); + internal ExtentsProviderConfig CreateExtentsConfig() + { + var path = Path.Combine(_baseDataPath.FullName, Guid.NewGuid().ToString()); Directory.CreateDirectory(path); - var config = new ExtentInfoProviderConfig(path); - _configs.Add(config); + + var config = new ExtentsProviderConfig(path); return config; } public Task DisposeAsync() { - foreach (var config in _configs) - Directory.Delete(config.BasePath, true); + lock (this) + { + try + { + if (_baseDataPath.Exists) + _baseDataPath.Delete(true); + } + catch + { + // best effort + } + } + return Task.CompletedTask; } public Task InitializeAsync() { - if (!Directory.Exists(BaseDataPath)) - Directory.CreateDirectory(BaseDataPath); + _baseDataPath = Directory.CreateTempSubdirectory("eveniredb-tests"); return Task.CompletedTask; } diff --git a/tests/EvenireDB.Tests/EvenireDB.Tests.csproj b/tests/EvenireDB.Tests/EvenireDB.Tests.csproj index 883c1a6..e8abb9e 100644 --- a/tests/EvenireDB.Tests/EvenireDB.Tests.csproj +++ b/tests/EvenireDB.Tests/EvenireDB.Tests.csproj @@ -26,7 +26,6 @@ - diff --git a/tests/EvenireDB.Tests/EventsProviderTests.cs b/tests/EvenireDB.Tests/EventsProviderTests.cs index af3457b..4e0f371 100644 --- a/tests/EvenireDB.Tests/EventsProviderTests.cs +++ b/tests/EvenireDB.Tests/EventsProviderTests.cs @@ -1,3 +1,4 @@ +using EvenireDB.Common; using EvenireDB.Persistence; namespace EvenireDB.Tests; @@ -11,10 +12,10 @@ public EventsProviderTests(DataFixture fixture) _fixture = fixture; } - private EventsProvider CreateSut(Guid streamId, out IExtentInfoProvider extentInfoProvider) + private EventsProvider CreateSut(out IExtentsProvider extentInfoProvider) { - var config = _fixture.CreateExtentsConfig(streamId); - extentInfoProvider = new ExtentInfoProvider(config); + var config = _fixture.CreateExtentsConfig(); + extentInfoProvider = new ExtentsProvider(config); var dataRepo = new DataRepository(); var headersRepo = new HeadersRepository(); return new EventsProvider(headersRepo, dataRepo, extentInfoProvider); @@ -27,9 +28,9 @@ public async Task AppendAsync_should_write_events(int eventsCount, int expectedF { var events = _fixture.BuildEvents(eventsCount, new byte[] { 0x42 }); - var streamId = Guid.NewGuid(); - - var sut = CreateSut(streamId, out var extentInfoProvider); + var streamId = new StreamId { Key = Guid.NewGuid(), Type = "lorem" }; + + var sut = CreateSut(out var extentInfoProvider); await sut.AppendAsync(streamId, events); @@ -46,9 +47,9 @@ public async Task AppendAsync_should_append_events(int batchesCount, int eventsP .Select(b => _fixture.BuildEvents(eventsPerBatch, new byte[] { 0x42 })) .ToArray(); - var streamId = Guid.NewGuid(); - - var sut = CreateSut(streamId, out var extentInfoProvider); + var streamId = new StreamId { Key = Guid.NewGuid(), Type = "lorem" }; + + var sut = CreateSut(out var extentInfoProvider); foreach (var events in batches) await sut.AppendAsync(streamId, events); @@ -65,9 +66,9 @@ public async Task ReadAsync_should_read_entire_stream(int eventsCount) { var expectedEvents = _fixture.BuildEvents(eventsCount); - var streamId = Guid.NewGuid(); + var streamId = new StreamId { Key = Guid.NewGuid(), Type = "lorem" }; - var sut = CreateSut(streamId, out var _); + var sut = CreateSut(out var _); await sut.AppendAsync(streamId, expectedEvents); @@ -91,9 +92,9 @@ public async Task ReadAsync_should_read_events_appended_in_batches(int batchesCo .Select(b => _fixture.BuildEvents(eventsPerBatch, new byte[] { 0x42 })) .ToArray(); - var streamId = Guid.NewGuid(); + var streamId = new StreamId { Key = Guid.NewGuid(), Type = "lorem" }; - var sut = CreateSut(streamId, out var _); + var sut = CreateSut(out var _); foreach (var events in batches) await sut.AppendAsync(streamId, events); @@ -110,9 +111,9 @@ public async Task ReadAsync_should_read_paged_events() { var expectedEvents = _fixture.BuildEvents(42); - var streamId = Guid.NewGuid(); + var streamId = new StreamId { Key = Guid.NewGuid(), Type = "lorem" }; - var sut = CreateSut(streamId, out var _); + var sut = CreateSut(out var _); await sut.AppendAsync(streamId, expectedEvents); @@ -126,15 +127,14 @@ public async Task ReadAsync_should_read_paged_events() } } - [Fact] public async Task ReadAsync_should_return_no_data_when_paging_invalid() { var expectedEvents = _fixture.BuildEvents(42); - var streamId = Guid.NewGuid(); + var streamId = new StreamId { Key = Guid.NewGuid(), Type = "lorem" }; - var sut = CreateSut(streamId, out var _); + var sut = CreateSut(out var _); await sut.AppendAsync(streamId, expectedEvents); diff --git a/tests/EvenireDB.Tests/EventsReaderTests.cs b/tests/EvenireDB.Tests/EventsReaderTests.cs index aa88c0a..ec73cce 100644 --- a/tests/EvenireDB.Tests/EventsReaderTests.cs +++ b/tests/EvenireDB.Tests/EventsReaderTests.cs @@ -11,8 +11,8 @@ public async Task ReadAsync_should_return_empty_collection_when_data_not_availab { var cache = Substitute.For(); var sut = new EventsReader(EventsReaderConfig.Default, cache); - - var events = await sut.ReadAsync(Guid.NewGuid(), StreamPosition.Start) + var streamId = new StreamId { Key = Guid.NewGuid(), Type = "lorem" }; + var events = await sut.ReadAsync(streamId, StreamPosition.Start) .ToListAsync(); events.Should().NotBeNull().And.BeEmpty(); } @@ -20,7 +20,7 @@ public async Task ReadAsync_should_return_empty_collection_when_data_not_availab [Fact] public async Task ReadAsync_should_pull_data_from_repo_on_cache_miss() { - var streamId = Guid.NewGuid(); + var streamId = new StreamId { Key = Guid.NewGuid(), Type = "lorem" }; var sourceEvents = Enumerable.Range(0, 242) .Select(i => new Event(new EventId(i, 0), "lorem", _defaultData)) @@ -42,7 +42,7 @@ public async Task ReadAsync_should_pull_data_from_repo_on_cache_miss() [Fact] public async Task ReadAsync_should_be_able_to_read_backwards() { - var streamId = Guid.NewGuid(); + var streamId = new StreamId { Key = Guid.NewGuid(), Type = "lorem" }; var sourceEvents = Enumerable.Range(0, 242) .Select(i => new Event(new EventId(i, 0), "lorem", _defaultData)) @@ -66,7 +66,7 @@ public async Task ReadAsync_should_be_able_to_read_backwards() [Fact] public async Task ReadAsync_should_be_able_to_read_backwards_from_position() { - var streamId = Guid.NewGuid(); + var streamId = new StreamId { Key = Guid.NewGuid(), Type = "lorem" }; var sourceEvents = Enumerable.Range(0, 242) .Select(i => new Event(new EventId(i, 0), "lorem", _defaultData)) @@ -94,7 +94,7 @@ public async Task ReadAsync_should_be_able_to_read_backwards_from_position() [Fact] public async Task ReadAsync_should_be_able_to_read_last_page_backwards_from_position() { - var streamId = Guid.NewGuid(); + var streamId = new StreamId { Key = Guid.NewGuid(), Type = "lorem" }; var sourceEvents = Enumerable.Range(0, 242) .Select(i => new Event(new EventId(i, 0), "lorem", _defaultData)) @@ -120,7 +120,7 @@ public async Task ReadAsync_should_be_able_to_read_last_page_backwards_from_posi [Fact] public async Task ReadAsync_should_be_able_to_read_forward() { - var streamId = Guid.NewGuid(); + var streamId = new StreamId { Key = Guid.NewGuid(), Type = "lorem" }; var sourceEvents = Enumerable.Range(0, 242) .Select(i => new Event(new EventId(i, 0), "lorem", _defaultData)) @@ -144,7 +144,7 @@ public async Task ReadAsync_should_be_able_to_read_forward() [Fact] public async Task ReadAsync_should_be_able_to_read_forward_from_position() { - var streamId = Guid.NewGuid(); + var streamId = new StreamId { Key = Guid.NewGuid(), Type = "lorem" }; var sourceEvents = Enumerable.Range(0, 242) .Select(i => new Event(new EventId(i, 0), "lorem", _defaultData)) diff --git a/tests/EvenireDB.Tests/EventsWriterTests.cs b/tests/EvenireDB.Tests/EventsWriterTests.cs index 2c3280a..1cc2b50 100644 --- a/tests/EvenireDB.Tests/EventsWriterTests.cs +++ b/tests/EvenireDB.Tests/EventsWriterTests.cs @@ -1,87 +1,85 @@ using EvenireDB.Common; -using Microsoft.Extensions.Logging; using System.Threading.Channels; -namespace EvenireDB.Tests +namespace EvenireDB.Tests; + +public class EventsWriterTests { - public class EventsWriterTests + private readonly static byte[] _defaultData = new byte[] { 0x42 }; + + [Fact] + public async Task AppendAsync_should_fail_when_stream_version_mismatch() + { + var streamId = new StreamId { Key = Guid.NewGuid(), Type = "lorem" }; + + var inputEvents = Enumerable.Range(0, 10) + .Select(i => new Event(new EventId(i, 0), "lorem", _defaultData)) + .ToArray(); + + var cache = Substitute.For(); + cache.GetEventsAsync(streamId, Arg.Any()).Returns(new CachedEvents(new List(), new SemaphoreSlim(1, 1))); + + var channelWriter = NSubstitute.Substitute.ForPartsOf>(); + channelWriter.TryWrite(Arg.Any()).Returns(true); + + var idGenerator = Substitute.For(); + var logger = Substitute.For>(); + var sut = new EventsWriter(cache, channelWriter, idGenerator, logger); + + var result = await sut.AppendAsync(streamId, inputEvents, expectedVersion: inputEvents.Count() - 1); + result.Should().BeOfType(); + + var failure = (FailureResult)result; + failure.Code.Should().Be(ErrorCodes.VersionMismatch); + } + + [Fact] + public async Task AppendAsync_should_fail_when_channel_rejects_message() + { + var streamId = new StreamId { Key = Guid.NewGuid(), Type = "lorem" }; + + var inputEvents = Enumerable.Range(0, 10) + .Select(i => new Event(new EventId(i, 0), "lorem", _defaultData)) + .ToArray(); + + var cache = Substitute.For(); + cache.GetEventsAsync(streamId, Arg.Any()).Returns(new CachedEvents(new List(), new SemaphoreSlim(1,1))); + + var channelWriter = NSubstitute.Substitute.ForPartsOf>(); + channelWriter.TryWrite(Arg.Any()).Returns(false); + + var idGenerator = Substitute.For(); + var logger = Substitute.For>(); + var sut = new EventsWriter(cache, channelWriter, idGenerator, logger); + + await sut.AppendAsync(streamId, inputEvents); + var result = await sut.AppendAsync(streamId, inputEvents); + result.Should().BeOfType(); + + var failure = (FailureResult)result; + failure.Code.Should().Be(ErrorCodes.CannotInitiateWrite); + } + + [Fact] + public async Task AppendAsync_should_succeed_when_events_valid() { - private readonly static byte[] _defaultData = new byte[] { 0x42 }; - - [Fact] - public async Task AppendAsync_should_fail_when_stream_version_mismatch() - { - var streamId = Guid.NewGuid(); - - var inputEvents = Enumerable.Range(0, 10) - .Select(i => new Event(new EventId(i, 0), "lorem", _defaultData)) - .ToArray(); - - var cache = Substitute.For(); - cache.GetEventsAsync(streamId, Arg.Any()).Returns(new CachedEvents(new List(), new SemaphoreSlim(1, 1))); - - var channelWriter = NSubstitute.Substitute.ForPartsOf>(); - channelWriter.TryWrite(Arg.Any()).Returns(true); - - var idGenerator = Substitute.For(); - var logger = Substitute.For>(); - var sut = new EventsWriter(cache, channelWriter, idGenerator, logger); - - var result = await sut.AppendAsync(streamId, inputEvents, expectedVersion: inputEvents.Count() - 1); - result.Should().BeOfType(); - - var failure = (FailureResult)result; - failure.Code.Should().Be(ErrorCodes.VersionMismatch); - } - - [Fact] - public async Task AppendAsync_should_fail_when_channel_rejects_message() - { - var streamId = Guid.NewGuid(); - - var inputEvents = Enumerable.Range(0, 10) - .Select(i => new Event(new EventId(i, 0), "lorem", _defaultData)) - .ToArray(); - - var cache = Substitute.For(); - cache.GetEventsAsync(streamId, Arg.Any()).Returns(new CachedEvents(new List(), new SemaphoreSlim(1,1))); - - var channelWriter = NSubstitute.Substitute.ForPartsOf>(); - channelWriter.TryWrite(Arg.Any()).Returns(false); - - var idGenerator = Substitute.For(); - var logger = Substitute.For>(); - var sut = new EventsWriter(cache, channelWriter, idGenerator, logger); - - await sut.AppendAsync(streamId, inputEvents); - var result = await sut.AppendAsync(streamId, inputEvents); - result.Should().BeOfType(); - - var failure = (FailureResult)result; - failure.Code.Should().Be(ErrorCodes.CannotInitiateWrite); - } - - [Fact] - public async Task AppendAsync_should_succeed_when_events_valid() - { - var streamId = Guid.NewGuid(); - - var expectedEvents = Enumerable.Range(0, 242) - .Select(i => new Event(new EventId(i, 0), "lorem", _defaultData)) - .ToArray(); - - var cache = Substitute.For(); - cache.GetEventsAsync(streamId, Arg.Any()).Returns(new CachedEvents(new List(), new SemaphoreSlim(1, 1))); - - var channelWriter = NSubstitute.Substitute.ForPartsOf>(); - channelWriter.TryWrite(Arg.Any()).Returns(true); - - var idGenerator = Substitute.For(); - var logger = Substitute.For>(); - var sut = new EventsWriter(cache, channelWriter, idGenerator, logger); - - var result = await sut.AppendAsync(streamId, expectedEvents); - result.Should().BeOfType(); - } + var streamId = new StreamId { Key = Guid.NewGuid(), Type = "lorem" }; + + var expectedEvents = Enumerable.Range(0, 242) + .Select(i => new Event(new EventId(i, 0), "lorem", _defaultData)) + .ToArray(); + + var cache = Substitute.For(); + cache.GetEventsAsync(streamId, Arg.Any()).Returns(new CachedEvents(new List(), new SemaphoreSlim(1, 1))); + + var channelWriter = NSubstitute.Substitute.ForPartsOf>(); + channelWriter.TryWrite(Arg.Any()).Returns(true); + + var idGenerator = Substitute.For(); + var logger = Substitute.For>(); + var sut = new EventsWriter(cache, channelWriter, idGenerator, logger); + + var result = await sut.AppendAsync(streamId, expectedEvents); + result.Should().BeOfType(); } } diff --git a/tests/EvenireDB.Tests/StreamInfoProviderTests.cs b/tests/EvenireDB.Tests/StreamInfoProviderTests.cs index a35513b..a510013 100644 --- a/tests/EvenireDB.Tests/StreamInfoProviderTests.cs +++ b/tests/EvenireDB.Tests/StreamInfoProviderTests.cs @@ -1,30 +1,44 @@ +using EvenireDB.Common; + namespace EvenireDB.Tests; public class StreamInfoProviderTests { [Fact] - public async Task DeleteStreamAsync_should_throw_when_stream_does_not_exist() + public async Task DeleteStreamAsync_should_return_false_when_stream_does_not_exist() { - var streamId = Guid.NewGuid(); + var streamId = new StreamId { Key = Guid.NewGuid(), Type = "lorem" }; - var extentsProvider = Substitute.For(); + var extentsProvider = Substitute.For(); var cache = Substitute.For(); var logger = Substitute.For>(); var sut = new StreamInfoProvider(extentsProvider, cache, logger); - await Assert.ThrowsAsync(async () => - await sut.DeleteStreamAsync(streamId)); + var result = await sut.DeleteStreamAsync(streamId); + result.Should().BeFalse(); var exists = sut.GetStreamInfo(streamId) is not null; exists.Should().BeFalse(); } + [Fact] + public async Task GetStreamsInfo_should_return_empty_when_no_streams_available() + { + var extentsProvider = Substitute.For(); + var cache = Substitute.For(); + var logger = Substitute.For>(); + var sut = new StreamInfoProvider(extentsProvider, cache, logger); + + var results = sut.GetStreamsInfo(); + results.Should().BeEmpty(); + } + [Fact] public void GetStreamInfo_should_return_null_when_stream_does_not_exist() { - var streamId = Guid.NewGuid(); + var streamId = new StreamId { Key = Guid.NewGuid(), Type = "lorem" }; - var extentsProvider = Substitute.For(); + var extentsProvider = Substitute.For(); var cache = Substitute.For(); var logger = Substitute.For>(); var sut = new StreamInfoProvider(extentsProvider, cache, logger); diff --git a/tests/EvenireDB.Tests/StreamTypeTests.cs b/tests/EvenireDB.Tests/StreamTypeTests.cs new file mode 100644 index 0000000..4b760c2 --- /dev/null +++ b/tests/EvenireDB.Tests/StreamTypeTests.cs @@ -0,0 +1,16 @@ +using EvenireDB.Common; + +namespace EvenireDB.Tests; + +public class StreamTypeTests +{ + [Theory] + [InlineData("")] + [InlineData(" ")] + [InlineData(null)] + public void StreamType_should_throw_when_value_invalid(string value) + { + Action act = () => new StreamType(value); + act.Should().Throw(); + } +}