Skip to content

Commit

Permalink
refactor: create snapshot until last message
Browse files Browse the repository at this point in the history
  • Loading branch information
jvandaal authored Jan 23, 2025
1 parent ab4efb1 commit 675fe0d
Show file tree
Hide file tree
Showing 28 changed files with 241 additions and 723 deletions.
5 changes: 2 additions & 3 deletions src/RoadRegistry.AdminHost/Infrastructure/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ protected Program()
public static async Task Main(string[] args)
{
var roadRegistryHost = new RoadRegistryHostBuilder<Program>(args)
.ConfigureServices((hostContext, services) => services
.ConfigureServices((_, services) => services
.AddTicketing()
.AddEmailClient()
.AddSingleton(sp => Dispatch.Using(Resolve.WhenEqualToMessage(
Expand Down Expand Up @@ -58,11 +58,10 @@ public static async Task Main(string[] args)
.AddRoadNetworkCommandQueue()
.AddRoadNetworkEventWriter()
.AddRoadRegistrySnapshot()
.AddRoadNetworkSnapshotStrategyOptions()
.AddEditorContext()
.AddProductContext()
)
.ConfigureContainer((hostContext, builder) =>
.ConfigureContainer((_, builder) =>
{
builder.RegisterModule<MediatorModule>();
builder.RegisterModule<BackOffice.Handlers.MediatorModule>();
Expand Down
17 changes: 3 additions & 14 deletions src/RoadRegistry.BackOffice.Api/Infrastructure/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ namespace RoadRegistry.BackOffice.Api.Infrastructure;
using Be.Vlaanderen.Basisregisters.Api;
using Be.Vlaanderen.Basisregisters.Api.Exceptions;
using Be.Vlaanderen.Basisregisters.Auth.AcmIdm;
using Be.Vlaanderen.Basisregisters.BlobStore;
using Be.Vlaanderen.Basisregisters.BlobStore.Sql;
using Be.Vlaanderen.Basisregisters.Shaperon.Geometries;
using Behaviors;
using Configuration;
using Controllers.Attributes;
using Core;
using Editor.Schema;
using Extensions;
using FeatureCompare.Readers;
using FeatureToggles;
using FluentValidation;
using Framework;
Expand All @@ -38,7 +37,6 @@ namespace RoadRegistry.BackOffice.Api.Infrastructure;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Mvc.Formatters;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
Expand All @@ -57,14 +55,8 @@ namespace RoadRegistry.BackOffice.Api.Infrastructure;
using Serilog.Extensions.Logging;
using Snapshot.Handlers.Sqs;
using SqlStreamStore;
using SystemHealthCheck;
using System;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using FeatureCompare.Readers;
using Sync.MunicipalityRegistry;
using SystemHealthCheck;
using SystemHealthCheck.HealthChecks;
using ZipArchiveWriters.Cleaning;
using DomainAssemblyMarker = BackOffice.Handlers.Sqs.DomainAssemblyMarker;
Expand All @@ -75,12 +67,10 @@ public class Startup
{
private const string DatabaseTag = "db";
private readonly IConfiguration _configuration;
private readonly IWebHostEnvironment _webHostEnvironment;

public Startup(IConfiguration configuration, IWebHostEnvironment webHostEnvironment)
public Startup(IConfiguration configuration)
{
_configuration = configuration;
_webHostEnvironment = webHostEnvironment;
}

public void Configure(
Expand Down Expand Up @@ -329,7 +319,6 @@ public void ConfigureServices(IServiceCollection services)
.AddHealthCommandQueue()
.AddRoadNetworkCommandQueue()
.AddOrganizationCommandQueue()
.AddRoadNetworkSnapshotStrategyOptions()
.AddSingleton(apiOptions)
.Configure<ResponseOptions>(_configuration)

Expand Down

This file was deleted.

3 changes: 0 additions & 3 deletions src/RoadRegistry.BackOffice/Core/IRoadNetworks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ public interface IRoadNetworks
{
Task<RoadNetwork> Get(CancellationToken cancellationToken);
Task<RoadNetwork> Get(StreamName streamName, CancellationToken cancellationToken);
Task<RoadNetwork> Get(bool restoreSnapshot, ProcessMessageHandler cancelMessageProcessing, CancellationToken cancellationToken);
Task<(RoadNetwork, int)> GetWithVersion(CancellationToken cancellationToken);
Task<(RoadNetwork, int)> GetWithVersion(bool restoreSnapshot, ProcessMessageHandler cancelMessageProcessing, CancellationToken cancellationToken);

Task<RoadNetwork> ForOutlinedRoadSegment(RoadSegmentId roadSegmentId, CancellationToken cancellationToken);
}
25 changes: 8 additions & 17 deletions src/RoadRegistry.BackOffice/Core/RoadNetworks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,27 +64,18 @@ public ProcessSnapshotContext(int? version)
Version = version;
}
}

public async Task<RoadNetwork> Get(CancellationToken cancellationToken)
{
var (roadNetwork, _) = await GetWithVersion(cancellationToken);
var (roadNetwork, _) = await GetWithVersion(true, null, cancellationToken);
return roadNetwork;
}

public async Task<RoadNetwork> Get(StreamName streamName, CancellationToken cancellationToken)
{
var (roadNetwork, _) = await GetWithVersion(streamName, true, null, cancellationToken);
return roadNetwork;
}
public async Task<RoadNetwork> Get(bool restoreSnapshot, ProcessMessageHandler cancelMessageProcessing, CancellationToken cancellationToken)
{
var (roadNetwork, _) = await GetWithVersion(restoreSnapshot, cancelMessageProcessing, cancellationToken);
return roadNetwork;
}

public Task<(RoadNetwork, int)> GetWithVersion(CancellationToken cancellationToken)
{
return GetWithVersion(true, null, cancellationToken);
}

public Task<(RoadNetwork, int)> GetWithVersion(bool restoreSnapshot, ProcessMessageHandler cancelMessageProcessing, CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -124,7 +115,7 @@ public async Task<RoadNetwork> ForOutlinedRoadSegment(RoadSegmentId roadSegmentI
var roadNetwork = RoadNetwork.Factory(view.ToImmutable());
_map.Attach(new EventSourcedEntityMapEntry(roadNetwork, streamName, ExpectedVersion.Any));

return (roadNetwork, snapshotContext.Version.Value);
return (roadNetwork, snapshotContext.Version!.Value);
}

private (RoadNetwork, int) EmptyRoadNetwork(StreamName streamName)
Expand All @@ -137,7 +128,7 @@ public async Task<RoadNetwork> ForOutlinedRoadSegment(RoadSegmentId roadSegmentI
private async Task<IRoadNetworkView> ProcessPages(StreamName streamName, IRoadNetworkView view, ProcessSnapshotContext snapshotContext, ReadStreamPage page, ProcessMessageHandler cancelMessageProcessing, CancellationToken cancellationToken)
{
view = await ProcessPage(view, snapshotContext, page, cancelMessageProcessing, cancellationToken);

var sw = Stopwatch.StartNew();
while (!page.IsEnd && !snapshotContext.ProcessingCancelled)
{
Expand All @@ -160,7 +151,7 @@ private async Task<IRoadNetworkView> ProcessPages(StreamName streamName, IRoadNe
private async Task<IRoadNetworkView> ProcessPage(IRoadNetworkView view, ProcessSnapshotContext snapshotContext, ReadStreamPage page, ProcessMessageHandler cancelMessageProcessing, CancellationToken cancellationToken)
{
var messages = new List<object>(page.Messages.Length);

foreach (var message in page.Messages)
{
cancellationToken.ThrowIfCancellationRequested();
Expand Down Expand Up @@ -204,12 +195,12 @@ await message.GetJsonData(cancellationToken),
var sw = Stopwatch.StartNew();

int version;

if (restoreSnapshot && streamName.SupportsSnapshot)
{
_logger.LogInformation("Read started for RoadNetwork snapshot");
var (snapshot, snapshotVersion) = await _snapshotReader.ReadSnapshotAsync(cancellationToken);

_logger.LogInformation("Read finished for RoadNetwork snapshot version {SnapshotVersion} in {StopwatchElapsedMilliseconds}ms", snapshotVersion, sw.ElapsedMilliseconds);

if (snapshot != null && snapshotVersion != ExpectedVersion.NoStream)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
namespace RoadRegistry.BackOffice.Extensions;

using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using Amazon.SimpleEmailV2;
using Be.Vlaanderen.Basisregisters.Aws.DistributedS3Cache;
using Be.Vlaanderen.Basisregisters.BlobStore.Sql;
using Configuration;
using Core;
using FeatureCompare;
Expand All @@ -11,16 +14,11 @@ namespace RoadRegistry.BackOffice.Extensions;
using FeatureCompare.Validation;
using FeatureToggle;
using Framework;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.IO;
using NodaTime;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using Uploads;

public static class ServiceCollectionExtensions
Expand Down Expand Up @@ -111,7 +109,7 @@ public static IServiceCollection AddOrganizationCommandQueue(this IServiceCollec
public static IServiceCollection AddEventEnricher(this IServiceCollection services)
{
return services
.AddSingleton<EventEnricher>(sp => EnrichEvent.WithTime(sp.GetRequiredService<IClock>()));
.AddSingleton(sp => EnrichEvent.WithTime(sp.GetRequiredService<IClock>()));
}

public static IServiceCollection AddRoadNetworkEventWriter(this IServiceCollection services)
Expand Down Expand Up @@ -176,18 +174,6 @@ public static IServiceCollection RegisterOptions<TOptions>(this IServiceCollecti
return services.AddSingleton(sp => sp.GetRequiredService<IConfiguration>().GetOptions<TOptions>(configurationSectionKey));
}

public static IServiceCollection AddRoadNetworkSnapshotStrategyOptions(this IServiceCollection services)
{
return services
.RegisterOptions<RoadNetworkSnapshotStrategyOptions>(options =>
{
if (options.EventCount <= 0)
{
throw new ConfigurationErrorsException($"{nameof(options.EventCount)} must be greater than zero");
}
});
}

public static IServiceCollection AddFeatureCompare(this IServiceCollection services)
{
return services
Expand Down
1 change: 0 additions & 1 deletion src/RoadRegistry.Hosts/RoadRegistrySqsLambdaHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ protected sealed override TicketError MapDomainException(DomainException excepti
}
}


return ticketError;
}
}
16 changes: 3 additions & 13 deletions src/RoadRegistry.Snapshot.Handlers.Sqs.Lambda/Function.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,22 @@ namespace RoadRegistry.Snapshot.Handlers.Sqs.Lambda;

using Autofac;
using BackOffice;
using BackOffice.Extensions;
using Be.Vlaanderen.Basisregisters.EventHandling.Autofac;
using Hosts;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

public class Function : RoadRegistryLambdaFunction<MessageHandler>
{
protected override string ApplicationName => "RoadRegistry.Snapshot.Handlers.Sqs.Lambda";

public Function() : base(new[] { typeof(Sqs.DomainAssemblyMarker).Assembly })
public Function() : base([typeof(SnapshotHandlersSqsAssemblyMarker).Assembly])
{
}

protected override void ConfigureServices(HostBuilderContext context, IServiceCollection services)
{
services
.AddRoadNetworkSnapshotStrategyOptions()
;
}

protected override void ConfigureContainer(HostBuilderContext context, ContainerBuilder builder)
{
builder
.RegisterModule(new EventHandlingModule(typeof(Sqs.DomainAssemblyMarker).Assembly, EventSerializerSettings))
.RegisterModule<ContextModule>()
;
.RegisterModule(new EventHandlingModule(typeof(SnapshotHandlersSqsAssemblyMarker).Assembly, EventSerializerSettings))
.RegisterModule<ContextModule>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ namespace RoadRegistry.Snapshot.Handlers.Sqs.Lambda.Handlers;
using System.Diagnostics;
using BackOffice;
using BackOffice.Abstractions.RoadNetworks;
using BackOffice.Configuration;
using BackOffice.Core;
using Be.Vlaanderen.Basisregisters.Sqs.Lambda.Infrastructure;
using Hosts;
Expand All @@ -16,7 +15,6 @@ public sealed class CreateRoadNetworkSnapshotSqsLambdaRequestHandler : SqsLambda
{
private readonly IRoadNetworkSnapshotReader _snapshotReader;
private readonly IRoadNetworkSnapshotWriter _snapshotWriter;
private readonly RoadNetworkSnapshotStrategyOptions _snapshotStrategyOptions;
private readonly Stopwatch _stopwatch;

public CreateRoadNetworkSnapshotSqsLambdaRequestHandler(
Expand All @@ -26,14 +24,12 @@ public CreateRoadNetworkSnapshotSqsLambdaRequestHandler(
IRoadRegistryContext context,
IRoadNetworkSnapshotReader snapshotReader,
IRoadNetworkSnapshotWriter snapshotWriter,
RoadNetworkSnapshotStrategyOptions snapshotStrategyOptions,
ILogger<CreateRoadNetworkSnapshotSqsLambdaRequestHandler> logger)
: base(options, retryPolicy, ticketing, null, context, logger)
ILoggerFactory loggerFactory)
: base(options, retryPolicy, ticketing, context, loggerFactory.CreateLogger<CreateRoadNetworkSnapshotSqsLambdaRequestHandler>())
{
_stopwatch = new Stopwatch();
_snapshotReader = snapshotReader;
_snapshotWriter = snapshotWriter;
_snapshotStrategyOptions = snapshotStrategyOptions;
}

protected override async Task<object> InnerHandle(CreateRoadNetworkSnapshotSqsLambdaRequest request, CancellationToken cancellationToken)
Expand All @@ -42,38 +38,26 @@ protected override async Task<object> InnerHandle(CreateRoadNetworkSnapshotSqsLa

try
{
var streamVersion = request.Request.StreamVersion;
var streamMaxVersion = _snapshotStrategyOptions.GetLastAllowedStreamVersionToTakeSnapshot(streamVersion);
var requestStreamVersion = request.Request.StreamVersion;
var snapshotStreamVersion = await _snapshotReader.ReadSnapshotVersionAsync(cancellationToken);

// Check if snapshot should be taken
if (streamVersion.Equals(streamMaxVersion))
if (snapshotStreamVersion >= requestStreamVersion)
{
var snapshotVersion = await _snapshotReader.ReadSnapshotVersionAsync(cancellationToken);

// Check if current snapshot is already further that stream version
if (streamMaxVersion > 0 && snapshotVersion >= streamMaxVersion)
{
Logger.LogWarning("Create snapshot skipped for new message received from SQS with snapshot version {SnapshotVersion} and stream version {StreamVersion}", snapshotVersion, streamVersion);
}
else
{
var (roadnetwork, roadnetworkVersion) = await RoadRegistryContext.RoadNetworks.GetWithVersion(true,
(messageStreamVersion, _) => streamMaxVersion > 0 && messageStreamVersion > streamMaxVersion, cancellationToken);
var snapshot = roadnetwork.TakeSnapshot();
Logger.LogWarning("Create snapshot skipped for new message received from SQS with snapshot version {SnapshotVersion} and stream version {StreamVersion}", snapshotStreamVersion, requestStreamVersion);
return new CreateRoadNetworkSnapshotResponse(null);
}

Logger.LogInformation("Create snapshot started for new message received from SQS with snapshot version {SnapshotVersion}", roadnetworkVersion);
await _snapshotWriter.WriteSnapshot(snapshot, roadnetworkVersion, cancellationToken);
Logger.LogInformation("Create snapshot completed for version {SnapshotVersion} in {TotalElapsedTimespan}", roadnetworkVersion, _stopwatch.Elapsed);
var (roadnetwork, roadnetworkVersion) = await RoadRegistryContext.RoadNetworks.GetWithVersion(
true,
cancelMessageProcessing: (_, _) => _stopwatch.Elapsed.Minutes >= 10,
cancellationToken);
var snapshot = roadnetwork.TakeSnapshot();

return new CreateRoadNetworkSnapshotResponse(roadnetworkVersion);
}
}
else
{
Logger.LogInformation("Snapshot strategy determined that the strategy limit had not been reached");
}
Logger.LogInformation("Create snapshot started for new message received from SQS with snapshot version {SnapshotVersion}", roadnetworkVersion);
await _snapshotWriter.WriteSnapshot(snapshot, roadnetworkVersion, cancellationToken);
Logger.LogInformation("Create snapshot completed for version {SnapshotVersion} in {TotalElapsedTimespan}", roadnetworkVersion, _stopwatch.Elapsed);

return new CreateRoadNetworkSnapshotResponse(null);
return new CreateRoadNetworkSnapshotResponse(roadnetworkVersion);
}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public SnapshotLambdaHealthCheckSqsLambdaRequestHandler(
IRoadNetworkSnapshotReader snapshotReader,
IRoadNetworkSnapshotWriter snapshotWriter,
ILogger<SnapshotLambdaHealthCheckSqsLambdaRequestHandler> logger)
: base(options, retryPolicy, ticketing, null, context, logger)
: base(options, retryPolicy, ticketing, context, logger)
{
_snapshotReader = snapshotReader;
_snapshotWriter = snapshotWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ protected SqsLambdaHandler(
SqsLambdaHandlerOptions options,
ICustomRetryPolicy retryPolicy,
ITicketing ticketing,
IIdempotentCommandHandler idempotentCommandHandler,
IRoadRegistryContext roadRegistryContext,
ILogger logger)
: base(options, retryPolicy, ticketing, idempotentCommandHandler, roadRegistryContext, logger)
: base(options, retryPolicy, ticketing, null, roadRegistryContext, logger)
{
}

Expand Down
Loading

0 comments on commit 675fe0d

Please sign in to comment.