Skip to content

Commit

Permalink
Merge pull request #14 from mizrael/memory
Browse files Browse the repository at this point in the history
updated memory watcher
  • Loading branch information
mizrael authored Jun 7, 2024
2 parents f3cc6ae + b1d2328 commit ef5093e
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 22 deletions.
3 changes: 3 additions & 0 deletions src/EvenireDB.Server/DTO/ApiError.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace EvenireDB.Server.DTO;

public record ApiError(int Code, string Message);
6 changes: 2 additions & 4 deletions src/EvenireDB.Server/DTO/EventDataDTO.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
namespace EvenireDB.Server.DTO
{
public record EventDataDTO(string Type, ReadOnlyMemory<byte> Data);
}
namespace EvenireDB.Server.DTO;
public record EventDataDTO(string Type, ReadOnlyMemory<byte> Data);
17 changes: 9 additions & 8 deletions src/EvenireDB.Server/Routes/StreamsRoutes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ private static async IAsyncEnumerable<EventDTO> GetEvents(
[FromQuery(Name = "pos")] uint startPosition = 0,
[FromQuery(Name = "dir")] Direction direction = Direction.Forward)
{
await foreach (var @event in reader.ReadAsync(streamId, direction: direction, startPosition: startPosition))
await foreach (var @event in reader.ReadAsync(streamId, direction: direction, startPosition: startPosition).ConfigureAwait(false))
yield return EventDTO.FromModel(@event);
}

Expand All @@ -36,25 +36,26 @@ private static async ValueTask<IResult> SaveEvents(
[FromBody] EventDataDTO[]? dtos)
{
if(dtos is null)
return Results.BadRequest();
return Results.BadRequest(new ApiError(ErrorCodes.BadRequest, "No events provided"));

EventData[] events;

try
{
events = mapper.ToModels(dtos);
}
catch
catch(Exception ex)
{
// TODO: build proper response
return Results.BadRequest();
return Results.BadRequest(new ApiError(ErrorCodes.BadRequest, ex.Message));
}

var result = await writer.AppendAsync(streamId, events, expectedVersion);
var result = await writer.AppendAsync(streamId, events, expectedVersion)
.ConfigureAwait(false);
return result switch
{
FailureResult { Code: ErrorCodes.DuplicateEvent } d => Results.Conflict(d.Message),
FailureResult { Code: ErrorCodes.VersionMismatch } d => Results.BadRequest(d.Message),
FailureResult { Code: ErrorCodes.DuplicateEvent } d => Results.Conflict(new ApiError(ErrorCodes.DuplicateEvent, d.Message)),
FailureResult { Code: ErrorCodes.VersionMismatch } d => Results.BadRequest(new ApiError(ErrorCodes.VersionMismatch, d.Message)),
FailureResult { Code: ErrorCodes.BadRequest } d => Results.BadRequest(new ApiError(ErrorCodes.BadRequest, d.Message)),
FailureResult => Results.StatusCode(500),
_ => Results.AcceptedAtRoute(nameof(GetEvents), new { streamId })
};
Expand Down
4 changes: 2 additions & 2 deletions src/EvenireDB.Server/appsettings.local.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
}
},
"Server": {
"MemoryWatcherInterval": "0:00:10",
"MaxAllowedAllocatedBytes": 50000000,
"MemoryWatcherInterval": "0:00:5",
"MaxAllowedAllocatedBytes": 500000000,
"HttpSettings": {
"Port": 5001
},
Expand Down
11 changes: 10 additions & 1 deletion src/EvenireDB.sln
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Protos", "Protos", "{1CC979
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EvenireDB.Grpc", "EvenireDB.Grpc\EvenireDB.Grpc.csproj", "{DF4DDF36-7203-467C-898C-C5224A9A6B41}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EvenireDB.Common", "EvenireDB.Common\EvenireDB.Common.csproj", "{CA95FFB9-A44C-4DF0-B4C7-D03C0F9A660D}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EvenireDB.Common", "EvenireDB.Common\EvenireDB.Common.csproj", "{CA95FFB9-A44C-4DF0-B4C7-D03C0F9A660D}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tools", "tools", "{0BFC47FA-4EFF-41D4-A7F1-EEBAEB44F461}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EvenireDB.Tools.EventsGenerator", "tools\EvenireDB.Tools.EventsGenerator\EvenireDB.Tools.EventsGenerator.csproj", "{90FD3F30-39AF-4F30-8403-C4F799A0C67A}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -78,6 +82,10 @@ Global
{CA95FFB9-A44C-4DF0-B4C7-D03C0F9A660D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CA95FFB9-A44C-4DF0-B4C7-D03C0F9A660D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CA95FFB9-A44C-4DF0-B4C7-D03C0F9A660D}.Release|Any CPU.Build.0 = Release|Any CPU
{90FD3F30-39AF-4F30-8403-C4F799A0C67A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{90FD3F30-39AF-4F30-8403-C4F799A0C67A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{90FD3F30-39AF-4F30-8403-C4F799A0C67A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{90FD3F30-39AF-4F30-8403-C4F799A0C67A}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -87,6 +95,7 @@ Global
{57498174-AE43-4872-8DD1-D1AC649DFD00} = {CDD60004-8C27-46E1-9235-9376B86E9003}
{FC54BD0B-1471-4252-8B3F-97BE64B90B1A} = {CDD60004-8C27-46E1-9235-9376B86E9003}
{B0B002EB-6E66-4D86-B03B-7CB2AABD865E} = {967C1ED1-EA6E-4C72-BB85-38D8D3057A5F}
{90FD3F30-39AF-4F30-8403-C4F799A0C67A} = {0BFC47FA-4EFF-41D4-A7F1-EEBAEB44F461}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {8D2FA660-798C-4C70-9A70-59435A090136}
Expand Down
3 changes: 3 additions & 0 deletions src/EvenireDB/EventsWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public async ValueTask<IOperationResult> AppendAsync(
int? expectedVersion = null,
CancellationToken cancellationToken = default)
{
if(streamId == Guid.Empty)
return FailureResult.InvalidStream(streamId);

ArgumentNullException.ThrowIfNull(incomingEvents, nameof(incomingEvents));

if (!incomingEvents.Any())
Expand Down
5 changes: 5 additions & 0 deletions src/EvenireDB/FailureResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ public FailureResult(int code, string message)
public string Message { get; } = string.Empty;
public int Code { get; } = ErrorCodes.Unknown;

public static FailureResult InvalidStream(Guid streamId)
=> new FailureResult(
ErrorCodes.BadRequest,
$"invalid stream id: {streamId}.");

public static FailureResult DuplicateEvent(Event? @event)
=> new FailureResult(
ErrorCodes.DuplicateEvent,
Expand Down
2 changes: 1 addition & 1 deletion src/EvenireDB/LogMessages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public enum EventIds
[LoggerMessage(
EventId = (int)EventIds.HighMemoryUsageDetected,
Level = LogLevel.Warning,
Message = "Memory usage is {MemoryUsage} bytes, more than the allowed value: {MaxAllowedAllocatedBytes}. Dropping some cached streams")]
Message = "Memory usage is {MemoryUsage} bytes, more than the allowed value: {MaxAllowedAllocatedBytes}. Dropping some cached streams.")]
public static partial void HighMemoryUsageDetected(this ILogger logger, long memoryUsage, long maxAllowedAllocatedBytes);

[LoggerMessage(
Expand Down
18 changes: 12 additions & 6 deletions src/EvenireDB/Workers/MemoryWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class MemoryWatcher : BackgroundService
private readonly MemoryWatcherSettings _settings;
private readonly ILogger<MemoryWatcher> _logger;
private readonly IServiceProvider _sp;
private Process? process;

public MemoryWatcher(MemoryWatcherSettings settings, ILogger<MemoryWatcher> logger, IServiceProvider sp)
{
Expand All @@ -23,12 +24,21 @@ public MemoryWatcher(MemoryWatcherSettings settings, ILogger<MemoryWatcher> logg
_sp = sp;
}

public override void Dispose()
{
process?.Dispose();
base.Dispose();
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
using var process = Process.GetCurrentProcess();

process ??= Process.GetCurrentProcess();
process.Refresh();

_logger.MemoryUsageBelowTreshold(process.PrivateMemorySize64, _settings.MaxAllowedAllocatedBytes);

bool needDrop = process.PrivateMemorySize64 > _settings.MaxAllowedAllocatedBytes;
if (needDrop)
{
Expand All @@ -42,10 +52,6 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)

GC.Collect();
}
else
{
_logger.MemoryUsageBelowTreshold(process.PrivateMemorySize64, _settings.MaxAllowedAllocatedBytes);
}

await Task.Delay(_settings.Interval, stoppingToken);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\EvenireDB.Client\EvenireDB.Client.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
</ItemGroup>

</Project>
61 changes: 61 additions & 0 deletions src/tools/EvenireDB.Tools.EventsGenerator/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using EvenireDB.Client;
using Microsoft.Extensions.DependencyInjection;
using System.CommandLine;
using System.Text;

var defaultServerUri = new Uri("http://localhost:5001");
var defaultEventsCount = 100;

var serverOption = new Option<Uri>(
name: "--server",
getDefaultValue: () => defaultServerUri,
description: $"The server uri. Defaults to {defaultServerUri}.");
var useGrpc = new Option<bool>(
name: "--grpc",
description: "Use gRPC instead of HTTP. Defaults to false.");
var eventsCount = new Option<int>(
name: "--count",
getDefaultValue: () => defaultEventsCount,
description: $"Count of events to send. Defaults to {defaultEventsCount}.");

var streamIdOption = new Option<Guid>(
name: "--stream",
description: "The stream to use.");


var rootCommand = new RootCommand
{
serverOption,
useGrpc,
eventsCount,
streamIdOption
};
rootCommand.SetHandler(async (streamId, uri, useGrpc, eventsCount) => {
var services = new ServiceCollection();
services.AddEvenireDB(new EvenireConfig(uri, useGrpc));

var provider = services.BuildServiceProvider();
var client = provider.GetRequiredService<IEventsClient>();

Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine($"Sending {eventsCount} events to stream '{streamId}' on server '{uri}'...");

var events = Enumerable.Range(0, eventsCount).Select(i => new EventData($"event-{i}", Encoding.UTF8.GetBytes($"event-{i}"))).ToArray();

try
{
await client.AppendAsync(streamId, events);
}
catch (ClientException cliEx)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("an error has occurred while sending events: " + cliEx.Message);
}

Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("Done.");

Console.ResetColor();
}, streamIdOption, serverOption, useGrpc, eventsCount);

await rootCommand.InvokeAsync(args);

0 comments on commit ef5093e

Please sign in to comment.