Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: create snapshot until last message #1602

Merged
merged 3 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/RoadRegistry.AdminHost/Infrastructure/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ protected Program()
public static async Task Main(string[] args)
{
var roadRegistryHost = new RoadRegistryHostBuilder<Program>(args)
.ConfigureServices((hostContext, services) => services
.ConfigureServices((_, services) => services
.AddTicketing()
.AddEmailClient()
.AddSingleton(sp => Dispatch.Using(Resolve.WhenEqualToMessage(
Expand Down Expand Up @@ -58,11 +58,10 @@ public static async Task Main(string[] args)
.AddRoadNetworkCommandQueue()
.AddRoadNetworkEventWriter()
.AddRoadRegistrySnapshot()
.AddRoadNetworkSnapshotStrategyOptions()
.AddEditorContext()
.AddProductContext()
)
.ConfigureContainer((hostContext, builder) =>
.ConfigureContainer((_, builder) =>
{
builder.RegisterModule<MediatorModule>();
builder.RegisterModule<BackOffice.Handlers.MediatorModule>();
Expand Down
17 changes: 3 additions & 14 deletions src/RoadRegistry.BackOffice.Api/Infrastructure/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ namespace RoadRegistry.BackOffice.Api.Infrastructure;
using Be.Vlaanderen.Basisregisters.Api;
using Be.Vlaanderen.Basisregisters.Api.Exceptions;
using Be.Vlaanderen.Basisregisters.Auth.AcmIdm;
using Be.Vlaanderen.Basisregisters.BlobStore;
using Be.Vlaanderen.Basisregisters.BlobStore.Sql;
using Be.Vlaanderen.Basisregisters.Shaperon.Geometries;
using Behaviors;
using Configuration;
using Controllers.Attributes;
using Core;
using Editor.Schema;
using Extensions;
using FeatureCompare.Readers;
using FeatureToggles;
using FluentValidation;
using Framework;
Expand All @@ -38,7 +37,6 @@ namespace RoadRegistry.BackOffice.Api.Infrastructure;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Mvc.Formatters;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
Expand All @@ -57,14 +55,8 @@ namespace RoadRegistry.BackOffice.Api.Infrastructure;
using Serilog.Extensions.Logging;
using Snapshot.Handlers.Sqs;
using SqlStreamStore;
using SystemHealthCheck;
using System;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using FeatureCompare.Readers;
using Sync.MunicipalityRegistry;
using SystemHealthCheck;
using SystemHealthCheck.HealthChecks;
using ZipArchiveWriters.Cleaning;
using DomainAssemblyMarker = BackOffice.Handlers.Sqs.DomainAssemblyMarker;
Expand All @@ -75,12 +67,10 @@ public class Startup
{
private const string DatabaseTag = "db";
private readonly IConfiguration _configuration;
private readonly IWebHostEnvironment _webHostEnvironment;

public Startup(IConfiguration configuration, IWebHostEnvironment webHostEnvironment)
public Startup(IConfiguration configuration)
{
_configuration = configuration;
_webHostEnvironment = webHostEnvironment;
}

public void Configure(
Expand Down Expand Up @@ -329,7 +319,6 @@ public void ConfigureServices(IServiceCollection services)
.AddHealthCommandQueue()
.AddRoadNetworkCommandQueue()
.AddOrganizationCommandQueue()
.AddRoadNetworkSnapshotStrategyOptions()
.AddSingleton(apiOptions)
.Configure<ResponseOptions>(_configuration)

Expand Down

This file was deleted.

3 changes: 0 additions & 3 deletions src/RoadRegistry.BackOffice/Core/IRoadNetworks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ public interface IRoadNetworks
{
Task<RoadNetwork> Get(CancellationToken cancellationToken);
Task<RoadNetwork> Get(StreamName streamName, CancellationToken cancellationToken);
Task<RoadNetwork> Get(bool restoreSnapshot, ProcessMessageHandler cancelMessageProcessing, CancellationToken cancellationToken);
Task<(RoadNetwork, int)> GetWithVersion(CancellationToken cancellationToken);
Task<(RoadNetwork, int)> GetWithVersion(bool restoreSnapshot, ProcessMessageHandler cancelMessageProcessing, CancellationToken cancellationToken);

Task<RoadNetwork> ForOutlinedRoadSegment(RoadSegmentId roadSegmentId, CancellationToken cancellationToken);
}
25 changes: 8 additions & 17 deletions src/RoadRegistry.BackOffice/Core/RoadNetworks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,27 +64,18 @@ public ProcessSnapshotContext(int? version)
Version = version;
}
}

public async Task<RoadNetwork> Get(CancellationToken cancellationToken)
{
var (roadNetwork, _) = await GetWithVersion(cancellationToken);
var (roadNetwork, _) = await GetWithVersion(true, null, cancellationToken);
return roadNetwork;
}

public async Task<RoadNetwork> Get(StreamName streamName, CancellationToken cancellationToken)
{
var (roadNetwork, _) = await GetWithVersion(streamName, true, null, cancellationToken);
return roadNetwork;
}
public async Task<RoadNetwork> Get(bool restoreSnapshot, ProcessMessageHandler cancelMessageProcessing, CancellationToken cancellationToken)
{
var (roadNetwork, _) = await GetWithVersion(restoreSnapshot, cancelMessageProcessing, cancellationToken);
return roadNetwork;
}

public Task<(RoadNetwork, int)> GetWithVersion(CancellationToken cancellationToken)
{
return GetWithVersion(true, null, cancellationToken);
}

public Task<(RoadNetwork, int)> GetWithVersion(bool restoreSnapshot, ProcessMessageHandler cancelMessageProcessing, CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -124,7 +115,7 @@ public async Task<RoadNetwork> ForOutlinedRoadSegment(RoadSegmentId roadSegmentI
var roadNetwork = RoadNetwork.Factory(view.ToImmutable());
_map.Attach(new EventSourcedEntityMapEntry(roadNetwork, streamName, ExpectedVersion.Any));

return (roadNetwork, snapshotContext.Version.Value);
return (roadNetwork, snapshotContext.Version!.Value);
}

private (RoadNetwork, int) EmptyRoadNetwork(StreamName streamName)
Expand All @@ -137,7 +128,7 @@ public async Task<RoadNetwork> ForOutlinedRoadSegment(RoadSegmentId roadSegmentI
private async Task<IRoadNetworkView> ProcessPages(StreamName streamName, IRoadNetworkView view, ProcessSnapshotContext snapshotContext, ReadStreamPage page, ProcessMessageHandler cancelMessageProcessing, CancellationToken cancellationToken)
{
view = await ProcessPage(view, snapshotContext, page, cancelMessageProcessing, cancellationToken);

var sw = Stopwatch.StartNew();
while (!page.IsEnd && !snapshotContext.ProcessingCancelled)
{
Expand All @@ -160,7 +151,7 @@ private async Task<IRoadNetworkView> ProcessPages(StreamName streamName, IRoadNe
private async Task<IRoadNetworkView> ProcessPage(IRoadNetworkView view, ProcessSnapshotContext snapshotContext, ReadStreamPage page, ProcessMessageHandler cancelMessageProcessing, CancellationToken cancellationToken)
{
var messages = new List<object>(page.Messages.Length);

foreach (var message in page.Messages)
{
cancellationToken.ThrowIfCancellationRequested();
Expand Down Expand Up @@ -204,12 +195,12 @@ await message.GetJsonData(cancellationToken),
var sw = Stopwatch.StartNew();

int version;

if (restoreSnapshot && streamName.SupportsSnapshot)
{
_logger.LogInformation("Read started for RoadNetwork snapshot");
var (snapshot, snapshotVersion) = await _snapshotReader.ReadSnapshotAsync(cancellationToken);

_logger.LogInformation("Read finished for RoadNetwork snapshot version {SnapshotVersion} in {StopwatchElapsedMilliseconds}ms", snapshotVersion, sw.ElapsedMilliseconds);

if (snapshot != null && snapshotVersion != ExpectedVersion.NoStream)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
namespace RoadRegistry.BackOffice.Extensions;

using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using Amazon.SimpleEmailV2;
using Be.Vlaanderen.Basisregisters.Aws.DistributedS3Cache;
using Be.Vlaanderen.Basisregisters.BlobStore.Sql;
using Configuration;
using Core;
using FeatureCompare;
Expand All @@ -11,16 +14,11 @@ namespace RoadRegistry.BackOffice.Extensions;
using FeatureCompare.Validation;
using FeatureToggle;
using Framework;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.IO;
using NodaTime;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using Uploads;

public static class ServiceCollectionExtensions
Expand Down Expand Up @@ -111,7 +109,7 @@ public static IServiceCollection AddOrganizationCommandQueue(this IServiceCollec
public static IServiceCollection AddEventEnricher(this IServiceCollection services)
{
return services
.AddSingleton<EventEnricher>(sp => EnrichEvent.WithTime(sp.GetRequiredService<IClock>()));
.AddSingleton(sp => EnrichEvent.WithTime(sp.GetRequiredService<IClock>()));
}

public static IServiceCollection AddRoadNetworkEventWriter(this IServiceCollection services)
Expand Down Expand Up @@ -176,18 +174,6 @@ public static IServiceCollection RegisterOptions<TOptions>(this IServiceCollecti
return services.AddSingleton(sp => sp.GetRequiredService<IConfiguration>().GetOptions<TOptions>(configurationSectionKey));
}

public static IServiceCollection AddRoadNetworkSnapshotStrategyOptions(this IServiceCollection services)
{
return services
.RegisterOptions<RoadNetworkSnapshotStrategyOptions>(options =>
{
if (options.EventCount <= 0)
{
throw new ConfigurationErrorsException($"{nameof(options.EventCount)} must be greater than zero");
}
});
}

public static IServiceCollection AddFeatureCompare(this IServiceCollection services)
{
return services
Expand Down
1 change: 0 additions & 1 deletion src/RoadRegistry.Hosts/RoadRegistrySqsLambdaHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ protected sealed override TicketError MapDomainException(DomainException excepti
}
}


return ticketError;
}
}
16 changes: 3 additions & 13 deletions src/RoadRegistry.Snapshot.Handlers.Sqs.Lambda/Function.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,22 @@ namespace RoadRegistry.Snapshot.Handlers.Sqs.Lambda;

using Autofac;
using BackOffice;
using BackOffice.Extensions;
using Be.Vlaanderen.Basisregisters.EventHandling.Autofac;
using Hosts;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

public class Function : RoadRegistryLambdaFunction<MessageHandler>
{
protected override string ApplicationName => "RoadRegistry.Snapshot.Handlers.Sqs.Lambda";

public Function() : base(new[] { typeof(Sqs.DomainAssemblyMarker).Assembly })
public Function() : base([typeof(SnapshotHandlersSqsAssemblyMarker).Assembly])
{
}

protected override void ConfigureServices(HostBuilderContext context, IServiceCollection services)
{
services
.AddRoadNetworkSnapshotStrategyOptions()
;
}

protected override void ConfigureContainer(HostBuilderContext context, ContainerBuilder builder)
{
builder
.RegisterModule(new EventHandlingModule(typeof(Sqs.DomainAssemblyMarker).Assembly, EventSerializerSettings))
.RegisterModule<ContextModule>()
;
.RegisterModule(new EventHandlingModule(typeof(SnapshotHandlersSqsAssemblyMarker).Assembly, EventSerializerSettings))
.RegisterModule<ContextModule>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ namespace RoadRegistry.Snapshot.Handlers.Sqs.Lambda.Handlers;
using System.Diagnostics;
using BackOffice;
using BackOffice.Abstractions.RoadNetworks;
using BackOffice.Configuration;
using BackOffice.Core;
using Be.Vlaanderen.Basisregisters.Sqs.Lambda.Infrastructure;
using Hosts;
Expand All @@ -16,7 +15,6 @@ public sealed class CreateRoadNetworkSnapshotSqsLambdaRequestHandler : SqsLambda
{
private readonly IRoadNetworkSnapshotReader _snapshotReader;
private readonly IRoadNetworkSnapshotWriter _snapshotWriter;
private readonly RoadNetworkSnapshotStrategyOptions _snapshotStrategyOptions;
private readonly Stopwatch _stopwatch;

public CreateRoadNetworkSnapshotSqsLambdaRequestHandler(
Expand All @@ -26,14 +24,12 @@ public CreateRoadNetworkSnapshotSqsLambdaRequestHandler(
IRoadRegistryContext context,
IRoadNetworkSnapshotReader snapshotReader,
IRoadNetworkSnapshotWriter snapshotWriter,
RoadNetworkSnapshotStrategyOptions snapshotStrategyOptions,
ILogger<CreateRoadNetworkSnapshotSqsLambdaRequestHandler> logger)
: base(options, retryPolicy, ticketing, null, context, logger)
ILoggerFactory loggerFactory)
: base(options, retryPolicy, ticketing, context, loggerFactory.CreateLogger<CreateRoadNetworkSnapshotSqsLambdaRequestHandler>())
{
_stopwatch = new Stopwatch();
_snapshotReader = snapshotReader;
_snapshotWriter = snapshotWriter;
_snapshotStrategyOptions = snapshotStrategyOptions;
}

protected override async Task<object> InnerHandle(CreateRoadNetworkSnapshotSqsLambdaRequest request, CancellationToken cancellationToken)
Expand All @@ -42,38 +38,26 @@ protected override async Task<object> InnerHandle(CreateRoadNetworkSnapshotSqsLa

try
{
var streamVersion = request.Request.StreamVersion;
var streamMaxVersion = _snapshotStrategyOptions.GetLastAllowedStreamVersionToTakeSnapshot(streamVersion);
var requestStreamVersion = request.Request.StreamVersion;
var snapshotStreamVersion = await _snapshotReader.ReadSnapshotVersionAsync(cancellationToken);

// Check if snapshot should be taken
if (streamVersion.Equals(streamMaxVersion))
if (snapshotStreamVersion >= requestStreamVersion)
{
var snapshotVersion = await _snapshotReader.ReadSnapshotVersionAsync(cancellationToken);

// Check if current snapshot is already further that stream version
if (streamMaxVersion > 0 && snapshotVersion >= streamMaxVersion)
{
Logger.LogWarning("Create snapshot skipped for new message received from SQS with snapshot version {SnapshotVersion} and stream version {StreamVersion}", snapshotVersion, streamVersion);
}
else
{
var (roadnetwork, roadnetworkVersion) = await RoadRegistryContext.RoadNetworks.GetWithVersion(true,
(messageStreamVersion, _) => streamMaxVersion > 0 && messageStreamVersion > streamMaxVersion, cancellationToken);
var snapshot = roadnetwork.TakeSnapshot();
Logger.LogWarning("Create snapshot skipped for new message received from SQS with snapshot version {SnapshotVersion} and stream version {StreamVersion}", snapshotStreamVersion, requestStreamVersion);
return new CreateRoadNetworkSnapshotResponse(null);
}

Logger.LogInformation("Create snapshot started for new message received from SQS with snapshot version {SnapshotVersion}", roadnetworkVersion);
await _snapshotWriter.WriteSnapshot(snapshot, roadnetworkVersion, cancellationToken);
Logger.LogInformation("Create snapshot completed for version {SnapshotVersion} in {TotalElapsedTimespan}", roadnetworkVersion, _stopwatch.Elapsed);
var (roadnetwork, roadnetworkVersion) = await RoadRegistryContext.RoadNetworks.GetWithVersion(
true,
cancelMessageProcessing: (_, _) => _stopwatch.Elapsed.Minutes >= 10,
cancellationToken);
var snapshot = roadnetwork.TakeSnapshot();

return new CreateRoadNetworkSnapshotResponse(roadnetworkVersion);
}
}
else
{
Logger.LogInformation("Snapshot strategy determined that the strategy limit had not been reached");
}
Logger.LogInformation("Create snapshot started for new message received from SQS with snapshot version {SnapshotVersion}", roadnetworkVersion);
await _snapshotWriter.WriteSnapshot(snapshot, roadnetworkVersion, cancellationToken);
Logger.LogInformation("Create snapshot completed for version {SnapshotVersion} in {TotalElapsedTimespan}", roadnetworkVersion, _stopwatch.Elapsed);

return new CreateRoadNetworkSnapshotResponse(null);
return new CreateRoadNetworkSnapshotResponse(roadnetworkVersion);
}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public SnapshotLambdaHealthCheckSqsLambdaRequestHandler(
IRoadNetworkSnapshotReader snapshotReader,
IRoadNetworkSnapshotWriter snapshotWriter,
ILogger<SnapshotLambdaHealthCheckSqsLambdaRequestHandler> logger)
: base(options, retryPolicy, ticketing, null, context, logger)
: base(options, retryPolicy, ticketing, context, logger)
{
_snapshotReader = snapshotReader;
_snapshotWriter = snapshotWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ protected SqsLambdaHandler(
SqsLambdaHandlerOptions options,
ICustomRetryPolicy retryPolicy,
ITicketing ticketing,
IIdempotentCommandHandler idempotentCommandHandler,
IRoadRegistryContext roadRegistryContext,
ILogger logger)
: base(options, retryPolicy, ticketing, idempotentCommandHandler, roadRegistryContext, logger)
: base(options, retryPolicy, ticketing, null, roadRegistryContext, logger)
{
}

Expand Down
Loading
Loading