Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
mizrael committed Jun 9, 2024
1 parent c60864d commit 351ecc7
Show file tree
Hide file tree
Showing 16 changed files with 427 additions and 244 deletions.
7 changes: 1 addition & 6 deletions src/EvenireDB/ExtentInfo.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
namespace EvenireDB;

public readonly struct ExtentInfo
{
public readonly required Guid StreamId { get; init; }
public readonly required string DataPath { get; init; }
public readonly required string HeadersPath { get; init; }
}
public record ExtentInfo(Guid StreamId, string DataPath, string HeadersPath);
17 changes: 11 additions & 6 deletions src/EvenireDB/ExtentInfoProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,22 @@ public ExtentInfoProvider(ExtentInfoProviderConfig config)
Directory.CreateDirectory(config.BasePath);
}

public ExtentInfo GetExtentInfo(Guid streamId)
public ExtentInfo? GetExtentInfo(Guid streamId)
{
// TODO: tests
var key = streamId.ToString("N");
int extentNumber = 0; // TODO: calculate

var headersPath = Path.Combine(_config.BasePath, $"{key}_{extentNumber}_headers.dat");
if(!File.Exists(headersPath))
return null;

return new ExtentInfo
{
StreamId = streamId,
DataPath = Path.Combine(_config.BasePath, $"{key}_{extentNumber}_data.dat"),
HeadersPath = Path.Combine(_config.BasePath, $"{key}_{extentNumber}_headers.dat"),
};
(
StreamId: streamId,
DataPath: Path.Combine(_config.BasePath, $"{key}_{extentNumber}_data.dat"),
HeadersPath: headersPath
);
}

public IEnumerable<ExtentInfo> GetExtentsInfo()
Expand Down
14 changes: 0 additions & 14 deletions src/EvenireDB/IEventsProvider.cs

This file was deleted.

12 changes: 12 additions & 0 deletions src/EvenireDB/IEventsReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using EvenireDB.Common;

namespace EvenireDB;

public interface IEventsReader
{
IAsyncEnumerable<Event> ReadAsync(
Guid streamId,
StreamPosition startPosition,
Direction direction = Direction.Forward,
CancellationToken cancellationToken = default);
}
2 changes: 1 addition & 1 deletion src/EvenireDB/IExtentInfoProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ namespace EvenireDB;

public interface IExtentInfoProvider
{
ExtentInfo GetExtentInfo(Guid streamId);
ExtentInfo? GetExtentInfo(Guid streamId);
IEnumerable<ExtentInfo> GetExtentsInfo();
}
3 changes: 2 additions & 1 deletion src/EvenireDB/IStreamInfoProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ namespace EvenireDB;
public interface IStreamInfoProvider
{
IEnumerable<StreamInfo> GetStreamsInfo();
StreamInfo GetStreamInfo(Guid streamId);
StreamInfo? GetStreamInfo(Guid streamId);
ValueTask DeleteStreamAsync(Guid streamId, CancellationToken cancellationToken = default);
}
1 change: 1 addition & 0 deletions src/EvenireDB/IStreamsCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ public interface IStreamsCache
void Update(Guid streamId, CachedEvents entry);
ValueTask<CachedEvents> GetEventsAsync(Guid streamId, CancellationToken cancellationToken);
bool ContainsKey(Guid streamId);
void Remove(Guid streamId);
}
104 changes: 62 additions & 42 deletions src/EvenireDB/LogMessages.cs
Original file line number Diff line number Diff line change
@@ -1,46 +1,66 @@
using Microsoft.Extensions.Logging;

namespace EvenireDB
namespace EvenireDB;

public static partial class LogMessages
{
public static partial class LogMessages
public enum EventIds
{
public enum EventIds
{
ReadingStreamFromRepository = 0,
HighMemoryUsageDetected,
MemoryUsageBelowTreshold,
EventsGroupPersistenceError,
AppendingEventsToStream
}

[LoggerMessage(
EventId = (int)EventIds.ReadingStreamFromRepository,
Level = LogLevel.Warning,
Message = "Reading stream '{StreamId}' from repository")]
public static partial void ReadingStreamFromRepository(this ILogger logger, Guid streamId);

[LoggerMessage(
EventId = (int)EventIds.AppendingEventsToStream,
Level = LogLevel.Debug,
Message = "Appending {EventsCount} events to stream '{StreamId}'...")]
public static partial void AppendingEventsToStream(this ILogger logger, int eventsCount, Guid streamId);

[LoggerMessage(
EventId = (int)EventIds.HighMemoryUsageDetected,
Level = LogLevel.Warning,
Message = "Memory usage is {MemoryUsage} bytes, more than the allowed value: {MaxAllowedAllocatedBytes}. Dropping some cached streams.")]
public static partial void HighMemoryUsageDetected(this ILogger logger, long memoryUsage, long maxAllowedAllocatedBytes);

[LoggerMessage(
EventId = (int)EventIds.MemoryUsageBelowTreshold,
Level = LogLevel.Debug,
Message = "Memory usage is {MemoryUsage} / {MaxAllowedAllocatedBytes} bytes")]
public static partial void MemoryUsageBelowTreshold(this ILogger logger, long memoryUsage, long maxAllowedAllocatedBytes);

[LoggerMessage(
EventId = (int)EventIds.EventsGroupPersistenceError,
Level = LogLevel.Error,
Message = "an error has occurred while persisting events group for stream {StreamId}: {Error}")]
public static partial void EventsGroupPersistenceError(this ILogger logger, Guid streamId, string error);
}
}
ReadingStreamFromRepository = 0,
HighMemoryUsageDetected,
MemoryUsageBelowTreshold,
EventsGroupPersistenceError,
AppendingEventsToStream,
StreamDeletionAttempt,
StreamDeleted,
StreamDeletionFailed
}

[LoggerMessage(
EventId = (int)EventIds.ReadingStreamFromRepository,
Level = LogLevel.Warning,
Message = "Reading stream '{StreamId}' from repository")]
public static partial void ReadingStreamFromRepository(this ILogger logger, Guid streamId);

[LoggerMessage(
EventId = (int)EventIds.AppendingEventsToStream,
Level = LogLevel.Debug,
Message = "Appending {EventsCount} events to stream '{StreamId}'...")]
public static partial void AppendingEventsToStream(this ILogger logger, int eventsCount, Guid streamId);

[LoggerMessage(
EventId = (int)EventIds.HighMemoryUsageDetected,
Level = LogLevel.Warning,
Message = "Memory usage is {MemoryUsage} bytes, more than the allowed value: {MaxAllowedAllocatedBytes}. Dropping some cached streams.")]
public static partial void HighMemoryUsageDetected(this ILogger logger, long memoryUsage, long maxAllowedAllocatedBytes);

[LoggerMessage(
EventId = (int)EventIds.MemoryUsageBelowTreshold,
Level = LogLevel.Debug,
Message = "Memory usage is {MemoryUsage} / {MaxAllowedAllocatedBytes} bytes")]
public static partial void MemoryUsageBelowTreshold(this ILogger logger, long memoryUsage, long maxAllowedAllocatedBytes);

[LoggerMessage(
EventId = (int)EventIds.EventsGroupPersistenceError,
Level = LogLevel.Error,
Message = "an error has occurred while persisting events group for stream {StreamId}: {Error}")]
public static partial void EventsGroupPersistenceError(this ILogger logger, Guid streamId, string error);

[LoggerMessage(
EventId = (int)EventIds.StreamDeletionAttempt,
Level = LogLevel.Information,
Message = "Trying to delete stream '{StreamId}', attempt #{Attempt} ...")]
public static partial void StreamDeletionAttempt(this ILogger logger, Guid streamId, int attempt);

[LoggerMessage(
EventId = (int)EventIds.StreamDeleted,
Level = LogLevel.Information,
Message = "Stream '{StreamId}' deleted.")]
public static partial void StreamDeleted(this ILogger logger, Guid streamId);

[LoggerMessage(
EventId = (int)EventIds.StreamDeletionFailed,
Level = LogLevel.Critical,
Message = "Stream '{StreamId}' deletion failed: {Error}")]
public static partial void StreamDeletionFailed(this ILogger logger, Guid streamId, string error);
}
6 changes: 3 additions & 3 deletions src/EvenireDB/StreamInfo.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
namespace EvenireDB;

public readonly record struct StreamInfo(
Guid StreamId,
long EventsCount,
public record StreamInfo(
Guid StreamId,
long EventsCount,
bool IsCached,
DateTimeOffset CreatedAt,
DateTimeOffset LastAccessedAt);
58 changes: 54 additions & 4 deletions src/EvenireDB/StreamInfoProvider.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Runtime.InteropServices;
using Microsoft.Extensions.Logging;

namespace EvenireDB;

Expand All @@ -7,18 +8,28 @@ internal class StreamInfoProvider : IStreamInfoProvider
private readonly int _headerSize = Marshal.SizeOf<RawHeader>();
private readonly IExtentInfoProvider _extentInfoProvider;
private readonly IStreamsCache _cache;

public StreamInfoProvider(IExtentInfoProvider extentInfoProvider, IStreamsCache cache)
private readonly ILogger<StreamInfoProvider> _logger;

public StreamInfoProvider(
IExtentInfoProvider extentInfoProvider,
IStreamsCache cache,
ILogger<StreamInfoProvider> logger)
{
_extentInfoProvider = extentInfoProvider ?? throw new ArgumentNullException(nameof(extentInfoProvider));
_cache = cache ?? throw new ArgumentNullException(nameof(cache));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public StreamInfo GetStreamInfo(Guid streamId)
public StreamInfo? GetStreamInfo(Guid streamId)
{
var extent = _extentInfoProvider.GetExtentInfo(streamId);
if (extent is null)
return null;

var fileInfo = new FileInfo(extent.HeadersPath);
if (!fileInfo.Exists)
return null;

var headersCount = fileInfo.Length / _headerSize;
return new StreamInfo(
streamId,
Expand All @@ -31,9 +42,48 @@ public StreamInfo GetStreamInfo(Guid streamId)
public IEnumerable<StreamInfo> GetStreamsInfo()
{
var allExtents = _extentInfoProvider.GetExtentsInfo();
List<StreamInfo> results = new();
foreach (var extent in allExtents)
{
yield return GetStreamInfo(extent.StreamId);
var info = GetStreamInfo(extent.StreamId);
if (info is not null)
results.Add(info!);
}

return results;
}

public async ValueTask DeleteStreamAsync(Guid streamId, CancellationToken cancellationToken = default)
{
var extent = _extentInfoProvider.GetExtentInfo(streamId);
var deleted = false;
int attempt = 0;

while (!deleted && attempt < 10 && !cancellationToken.IsCancellationRequested)
{
_logger.StreamDeletionAttempt(streamId, attempt);

try
{
if (File.Exists(extent.HeadersPath))

Check warning on line 68 in src/EvenireDB/StreamInfoProvider.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.
File.Delete(extent.HeadersPath);

if (File.Exists(extent.DataPath))
File.Delete(extent.DataPath);

_cache.Remove(streamId);

deleted = true;
}
catch (Exception ex)
{
_logger.StreamDeletionFailed(streamId, ex.Message);
attempt++;
await Task.Delay(1000);
}
}

if (deleted)
_logger.StreamDeleted(streamId);
}
}
3 changes: 3 additions & 0 deletions src/EvenireDB/StreamsCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,7 @@ public void Update(Guid streamId, CachedEvents entry)

public bool ContainsKey(Guid streamId)
=> _cache.ContainsKey(streamId);

public void Remove(Guid streamId)
=> _cache.Remove(streamId);
}
2 changes: 2 additions & 0 deletions src/EvenireDB/Utils/ICache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ public interface ICache<TKey, TValue> where TKey : notnull
ValueTask<TValue> GetOrAddAsync(TKey key, Func<TKey, CancellationToken, ValueTask<TValue>> valueFactory, CancellationToken cancellationToken = default);
void AddOrUpdate(TKey key, TValue value);
void DropOldest(uint maxCount);
void Remove(TKey key);

public uint Count { get; }
IEnumerable<TKey> Keys { get; }
}
35 changes: 34 additions & 1 deletion src/EvenireDB/Utils/LRUCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,42 @@ public void DropOldest(uint maxCount)
countToRemove--;
}

if(curr is not null)
if (curr is not null)
curr.Next = null;
}
}

public void Remove(TKey key)
{
if (!_cache.TryGetValue(key, out var node))
return;

SemaphoreSlim semaphore = GetSemaphore(key);
semaphore.Wait();

try
{
if (_head == node)
_head = node.Next;

if (_tail == node)
_tail = node.Previous;

if (node.Previous != null)
node.Previous.Next = node.Next;

if (node.Next != null)
node.Next.Previous = node.Previous;

_cache.Remove(key);
_semaphores.Remove(key);
}
finally
{
semaphore.Release();
}
}

public bool ContainsKey(TKey key)
=> _cache.ContainsKey(key);

Expand Down Expand Up @@ -185,6 +216,8 @@ private void MoveToHead(Node node)

public uint Count => (uint)_cache.Count;

public IEnumerable<TKey> Keys => _cache.Keys;

protected virtual void Dispose(bool disposing)
{
if (!_disposed)
Expand Down
3 changes: 2 additions & 1 deletion tests/EvenireDB.Tests/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
global using Xunit;
global using FluentAssertions;
global using NSubstitute;
global using EvenireDB.Utils;
global using EvenireDB.Utils;
global using Microsoft.Extensions.Logging;
Loading

0 comments on commit 351ecc7

Please sign in to comment.