Skip to content

Commit

Permalink
fix: or-2595 failed messages exceptions are sent to dlq
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Koen Metsu <[email protected]>
Co-authored-by: Jan Lesage <[email protected]>
  • Loading branch information
3 people authored Dec 6, 2024
1 parent 8ee592a commit bf44b16
Show file tree
Hide file tree
Showing 17 changed files with 206 additions and 76 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pre-merge-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ jobs:
shell: bash
if: matrix.projects == 'test/AssociationRegistry.Test.Public.Api' || matrix.projects == 'test/AssociationRegistry.Test.Admin.Api' || matrix.projects == 'test/AssociationRegistry.Test.E2E'
run: |
docker run -d --name localstack -v ${{ github.workspace }}/.localstack/init:/etc/localstack/init -p 4566:4566 -p 4510-4559:4510-4559 -e SERVICES=sqs,s3 -e DOCKER_HOST=unix:///var/run/docker.sock -e DEFAULT_REGION=eu-west-1 -e DEBUG=1 -e PORT_WEB_UI=8080 localstack/localstack
docker run -d --name localstack -v ${{ github.workspace }}/.localstack/init:/etc/localstack/init -p 4566:4566 -p 4510-4559:4510-4559 -e SERVICES=sqs,s3 -e DOCKER_HOST=unix:///var/run/docker.sock -e DEFAULT_REGION=us-east-1 -e DEBUG=1 -e PORT_WEB_UI=8080 localstack/localstack
- name: Run wiremock container
shell: bash
Expand Down
3 changes: 0 additions & 3 deletions .localstack/init/ready.d/01_sqs-init.redrive.json

This file was deleted.

30 changes: 19 additions & 11 deletions .localstack/init/ready.d/01_sqs-init.sh
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
#!/bin/sh

awslocal s3api create-bucket --bucket verenigingsregister-uwp-data

# Set variables
REGION="eu-west-1"
REGION="us-east-1"
ACCOUNT_ID="000000000000" # Default account ID used by LocalStack
MAX_RECEIVE_COUNT=3
QUEUE_PART=""

# DLQ names
DLQ_NAME="verenigingsregister-addressmatch-dlq"

# Construct DLQ ARNs
DLQ_ARN="arn:aws:sqs:$REGION:$ACCOUNT_ID:$DLQ_NAME"
with_dlq()
{
QUEUE_NAME="verenigingsregister-$@"
DLQ_NAME="$QUEUE_NAME-dlq"
DLQ_ARN="arn:aws:sqs:$REGION:$ACCOUNT_ID:$DLQ_NAME"

# Create DLQs
awslocal sqs create-queue --region $REGION --queue-name $DLQ_NAME
awslocal sqs create-queue --region $REGION --queue-name $QUEUE_NAME
awslocal sqs create-queue --region $REGION --queue-name $DLQ_NAME
awslocal sqs set-queue-attributes \
--queue-url http://sqs.$REGION.localhost.localstack.cloud:4566/000000000000/$QUEUE_NAME \
--attributes "{
\"RedrivePolicy\": \"{ \\\"maxReceiveCount\\\": \\\"3\\\", \\\"deadLetterTargetArn\\\": \\\"$DLQ_ARN\\\" }\"
}"
}

awslocal sqs create-queue --region $REGION --queue-name verenigingsregister-addressmatch --attribute file:///etc/localstack/init/ready.d/01_sqs-init.redrive.json
with_dlq addressmatch
with_dlq addressmatch-e2e
with_dlq grarsync
with_dlq grarsync-e2e
2 changes: 2 additions & 0 deletions .localstack/init/ready.d/02_s3-init.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/sh
awslocal s3api create-bucket --bucket verenigingsregister-uwp-data
3 changes: 0 additions & 3 deletions .localstack/init/ready.d/02_sqs-init.redrive.json

This file was deleted.

17 changes: 0 additions & 17 deletions .localstack/init/ready.d/02_sqs-init.sh

This file was deleted.

4 changes: 1 addition & 3 deletions AssociationRegistry.sln
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "boot", "boot", "{74A2617E-B
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ready", "ready", "{24B63C4D-D0B0-41BF-9CC8-8C3C7E6D5778}"
ProjectSection(SolutionItems) = preProject
.localstack\init\ready.d\01_sqs-init.redrive.json = .localstack\init\ready.d\01_sqs-init.redrive.json
.localstack\init\ready.d\01_sqs-init.sh = .localstack\init\ready.d\01_sqs-init.sh
.localstack\init\ready.d\02_sqs-init.redrive.json = .localstack\init\ready.d\02_sqs-init.redrive.json
.localstack\init\ready.d\02_sqs-init.sh = .localstack\init\ready.d\02_sqs-init.sh
.localstack\init\ready.d\02_s3-init.sh = .localstack\init\ready.d\02_s3-init.sh
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "AdresMatch", "AdresMatch", "{CB723987-9ECB-446A-832A-18CA30D7B8FE}"
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ services:
environment:
SERVICES: sqs,s3
DOCKER_HOST: unix:///var/run/docker.sock
DEFAULT_REGION: eu-west-1
DEFAULT_REGION: us-east-1
DEBUG: 1
PORT_WEB_UI: 8080
volumes:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,21 @@
namespace AssociationRegistry.Admin.Api.Infrastructure.AWS;

using Acties.GrarConsumer.HeradresseerLocaties;
using Amazon.SQS;
using Framework;
using Grar.GrarUpdates.Fusies.TeHeradresserenLocaties;
using Hosts.Configuration;
using Hosts.Configuration.ConfigurationBindings;
using Kbo;
using System.Text.Json;


using Wolverine;

public class SqsClientWrapper : ISqsClientWrapper
{
private readonly IAmazonSQS _sqsClient;
private readonly string _kboSyncQueueUrl;
private readonly string _readdressQueueUrl;
private readonly IMessageBus _messageBus;

public SqsClientWrapper(IAmazonSQS sqsClient, AppSettings appSettings, GrarOptions grarOptions)
public SqsClientWrapper(IMessageBus messageBus)
{
_sqsClient = sqsClient;
_kboSyncQueueUrl = appSettings.KboSyncQueueUrl;
_readdressQueueUrl = grarOptions.Sqs.GrarSyncQueueUrl;
_messageBus = messageBus;
}

public async Task QueueReaddressMessage(HeradresseerLocatiesMessage message)
Expand All @@ -31,9 +25,7 @@ public async Task QueueReaddressMessage(HeradresseerLocatiesMessage message)

public async Task QueueMessage<TMessage>(TMessage message)
{
await _sqsClient.SendMessageAsync(
_readdressQueueUrl,
JsonSerializer.Serialize(message));
await _messageBus.SendAsync(message);
}

public async Task QueueKboNummerToSynchronise(string kboNummer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@
using Hosts.Configuration;
using JasperFx.CodeGeneration;
using Serilog;
using System.Text.Encodings.Web;
using System.Text.Json;
using System.Text.Json.Serialization;
using Vereniging;
using Wolverine;
using Wolverine.AmazonSqs;
using Wolverine.ErrorHandling;
using Wolverine.Runtime.Serialization;

public static class WolverineExtensions
{
Expand Down Expand Up @@ -105,14 +109,18 @@ private static void ConfigureGrarSyncListener(
sqsDeadLetterQueueName
);

options.PublishMessage<OverkoepelendeGrarConsumerMessage>()
.ToSqsQueue(sqsQueueName);

options.ListenToSqsQueue(sqsQueueName, configure: configure =>
{
configure.MaxNumberOfMessages = 1;
configure.DeadLetterQueueName = sqsDeadLetterQueueName;
})
.ConfigureDeadLetterQueue(sqsDeadLetterQueueName)
.ReceiveRawJsonMessage(typeof(OverkoepelendeGrarConsumerMessage))
.ConfigureDeadLetterQueue(sqsDeadLetterQueueName, configure: queue =>
{
queue.DeadLetterQueueName = sqsDeadLetterQueueName;
})
.MaximumParallelMessages(1);
}
}

5 changes: 4 additions & 1 deletion src/AssociationRegistry.Admin.Api/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,10 @@ private static void ConfigureServices(WebApplicationBuilder builder)
var appSettings = builder.Configuration.Get<AppSettings>();

var sqsClient = grarOptions.Sqs.UseLocalStack
? new AmazonSQSClient(new BasicAWSCredentials(accessKey: "dummy", secretKey: "dummy"), RegionEndpoint.EUWest1)
? new AmazonSQSClient(new BasicAWSCredentials("dummy", "dummy"), new AmazonSQSConfig()
{
ServiceURL = "http://localhost:4566",
})
: new AmazonSQSClient(RegionEndpoint.EUWest1);

builder.Services
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace AssociationRegistry.Test.Admin.Api.Framework.Fixtures;

using Amazon.SQS;
using AssociationRegistry.Admin.Api;
using AssociationRegistry.Admin.Api.Constants;
using AssociationRegistry.Framework;
Expand Down Expand Up @@ -32,6 +33,7 @@ public abstract class AdminApiFixture : IDisposable, IAsyncLifetime
private readonly string _identifier = "adminapifixture";
private readonly WebApplicationFactory<Program> _adminApiServer;
private readonly WebApplicationFactory<ProjectionHostProgram> _projectionHostServer;
public IConfigurationRoot Configuration { get; }

internal IElasticClient ElasticClient
=> (IElasticClient)_adminApiServer.Services.GetRequiredService(typeof(ElasticClient));
Expand All @@ -56,13 +58,15 @@ private string DuplicateDetectionIndexName

protected AdminApiFixture()
{
Configuration = GetConfiguration();

WaitFor.PostGreSQLToBecomeAvailable(
new NullLogger<AdminApiFixture>(),
GetConnectionString(GetConfiguration(), RootDatabase))
GetConnectionString(Configuration, RootDatabase))
.GetAwaiter().GetResult();

DropDatabase();
EnsureDbExists(GetConfiguration());
EnsureDbExists(Configuration);

OaktonEnvironment.AutoStartHost = true;

Expand All @@ -72,20 +76,23 @@ protected AdminApiFixture()
{
builder.UseContentRoot(Directory.GetCurrentDirectory());
builder.UseSetting(key: "PostgreSQLOptions:database", _identifier);
builder.UseConfiguration(GetConfiguration());
builder.UseConfiguration(Configuration);
builder.UseSetting(key: "ElasticClientOptions:Indices:Verenigingen", _identifier);
});

_adminApiServer.CreateClient();

SqsClientWrapper = _adminApiServer.Services.GetRequiredService<ISqsClientWrapper>();
AmazonSqs = _adminApiServer.Services.GetRequiredService<IAmazonSQS>();

AdminApiClients = new AdminApiClients(
GetConfiguration().GetSection(nameof(OAuth2IntrospectionOptions))
.Get<OAuth2IntrospectionOptions>(),
Configuration.GetSection(nameof(OAuth2IntrospectionOptions))
.Get<OAuth2IntrospectionOptions>(),
_adminApiServer.CreateClient);

WaitFor.PostGreSQLToBecomeAvailable(
new NullLogger<AdminApiFixture>(),
GetConnectionString(GetConfiguration(), GetConfiguration().GetPostgreSqlOptionsSection().Database!))
GetConnectionString(Configuration, Configuration.GetPostgreSqlOptionsSection().Database!))
.GetAwaiter().GetResult();

var postgreSqlOptionsSection = _adminApiServer.Services.GetRequiredService<PostgreSqlOptionsSection>();
Expand All @@ -102,11 +109,14 @@ protected AdminApiFixture()
{
builder.UseContentRoot(Directory.GetCurrentDirectory());
builder.UseSetting($"{PostgreSqlOptionsSection.SectionName}:{nameof(PostgreSqlOptionsSection.Database)}", _identifier);
builder.UseConfiguration(GetConfiguration());
builder.UseConfiguration(Configuration);
builder.UseSetting(key: "ElasticClientOptions:Indices:Verenigingen", _identifier);
});
}

public IAmazonSQS AmazonSqs { get; set; }
public ISqsClientWrapper SqsClientWrapper { get; set; }

public IDocumentStore ApiDocumentStore
=> _adminApiServer.Services.GetRequiredService<IDocumentStore>();

Expand Down
2 changes: 1 addition & 1 deletion test/AssociationRegistry.Test.Admin.Api/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
"Offset": 0
},
"Sqs": {
"GrarSyncQueueUrl": "https://127.0.0.1.localstack.cloud:4566/000000000000/grar-sync",
"GrarSyncQueueUrl": "http://127.0.0.1:4566/000000000000/verenigingsregister-grarsync",
"GrarSyncQueueName": "verenigingsregister-grarsync",
"GrarSyncDeadLetterQueueName": "verenigingsregister-grarsync-dlq",
"AddressMatchQueueName": "verenigingsregister-addressmatch",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Admin.Api;
using Alba;
using AlbaHost;
using Amazon.SQS;
using AssociationRegistry.Framework;
using Hosts.Configuration.ConfigurationBindings;
using JasperFx.RuntimeCompiler.Scenarios;
Expand Down Expand Up @@ -37,6 +38,8 @@ public FullBlownApiSetup()

public async Task InitializeAsync()
{
SetUpAdminApiConfiguration();

OaktonEnvironment.AutoStartHost = true;

AdminApiHost = (await AlbaHost.For<Program>(ConfigureForTesting("adminapi")))
Expand All @@ -55,23 +58,30 @@ public async Task InitializeAsync()
ConfigureForTesting("publicapi"));

AcmApiHost = (await AlbaHost.For<Acm.Api.Program>(
ConfigureForTesting("acmapi")))
.EnsureEachCallIsAuthenticatedForAcmApi();
ConfigureForTesting("acmapi")))
.EnsureEachCallIsAuthenticatedForAcmApi();

SqsClientWrapper = AdminApiHost.Services.GetRequiredService<ISqsClientWrapper>();
AmazonSqs = AdminApiHost.Services.GetRequiredService<IAmazonSQS>();

await AdminApiHost.DocumentStore().Storage.ApplyAllConfiguredChangesToDatabaseAsync();
await AdminProjectionHost.DocumentStore().Storage.ApplyAllConfiguredChangesToDatabaseAsync();

await PublicProjectionHost.DocumentStore().Storage.ApplyAllConfiguredChangesToDatabaseAsync();
await PublicApiHost.DocumentStore().Storage.ApplyAllConfiguredChangesToDatabaseAsync();
await AdminApiHost.DocumentStore().Storage.ApplyAllConfiguredChangesToDatabaseAsync();
}

await AcmApiHost.DocumentStore().Storage.ApplyAllConfiguredChangesToDatabaseAsync();
private void SetUpAdminApiConfiguration()
{
var configuration = new ConfigurationBuilder()
.AddJsonFile("appsettings.development.json", false)
.AddJsonFile("appsettings.e2e.json", false)
.AddJsonFile($"appsettings.e2e.adminapi.json", false)
.Build();

await PublicProjectionHost.ResumeAllDaemonsAsync();
await AdminProjectionHost.ResumeAllDaemonsAsync();
await AcmApiHost.ResumeAllDaemonsAsync();
AdminApiConfiguration = configuration;
}

public IAmazonSQS AmazonSqs { get; set; }
public ISqsClientWrapper SqsClientWrapper { get; set; }

private Action<IWebHostBuilder> ConfigureForTesting(string name)
{
var configuration = new ConfigurationBuilder()
Expand All @@ -96,6 +106,8 @@ private Action<IWebHostBuilder> ConfigureForTesting(string name)
};
}

public IConfigurationRoot AdminApiConfiguration { get; set; }

public async Task DisposeAsync()
{
await AdminApiHost.StopAsync();
Expand Down
Loading

0 comments on commit bf44b16

Please sign in to comment.