Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle Duplicate decision numbers #132

Merged
merged 16 commits into from
Feb 13, 2025
18 changes: 11 additions & 7 deletions Btms.Backend.Data/Mongo/MongoIndexService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,21 @@ public Task StartAsync(CancellationToken cancellationToken)
{
return Task.WhenAll(
CreateIndex("MatchReferenceIdx",
Builders<ImportNotification>.IndexKeys.Ascending(n => n._MatchReference), cancellationToken),
Builders<ImportNotification>.IndexKeys.Ascending(n => n._MatchReference), cancellationToken: cancellationToken),
CreateIndex("Created",
Builders<ImportNotification>.IndexKeys.Ascending(n => n.Created), cancellationToken),
Builders<ImportNotification>.IndexKeys.Ascending(n => n.Created), cancellationToken: cancellationToken),
CreateIndex("CreatedSource",
Builders<ImportNotification>.IndexKeys.Ascending(n => n.CreatedSource), cancellationToken),
Builders<ImportNotification>.IndexKeys.Ascending(n => n.CreatedSource), cancellationToken: cancellationToken),

CreateIndex("MatchReferenceIdx",
Builders<Movement>.IndexKeys.Ascending(m => m._MatchReferences), cancellationToken),
Builders<Movement>.IndexKeys.Ascending(m => m._MatchReferences), cancellationToken: cancellationToken),
CreateIndex("Created",
Builders<Movement>.IndexKeys.Ascending(m => m.Created), cancellationToken),
Builders<Movement>.IndexKeys.Ascending(m => m.Created), cancellationToken: cancellationToken),
CreateIndex("CreatedSource",
Builders<Movement>.IndexKeys.Ascending(m => m.CreatedSource), cancellationToken)
Builders<Movement>.IndexKeys.Ascending(m => m.CreatedSource), cancellationToken: cancellationToken),
CreateIndex("UniqueDecisionNumber",
Builders<Movement>.IndexKeys.Ascending(m => m.EntryReference)
.Ascending(new StringFieldDefinition<Movement>("decisions.header.decisionNumber")), true, cancellationToken: cancellationToken)

);

Expand All @@ -34,7 +37,7 @@ public Task StopAsync(CancellationToken cancellationToken)
return Task.CompletedTask;
}

private async Task CreateIndex<T>(string name, IndexKeysDefinition<T> keys, CancellationToken cancellationToken)
private async Task CreateIndex<T>(string name, IndexKeysDefinition<T> keys, bool unique = false, CancellationToken cancellationToken = default)
{
try
{
Expand All @@ -43,6 +46,7 @@ private async Task CreateIndex<T>(string name, IndexKeysDefinition<T> keys, Canc
{
Name = name,
Background = true,
Unique = unique,
});
await database.GetCollection<T>(typeof(T).Name).Indexes.CreateOneAsync(indexModel, cancellationToken: cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ private static DecisionContext CreateDecisionContext()
]
}
],
new MatchingResult()
new MatchingResult(),
"TestMessageId"
);
}

Expand Down
11 changes: 10 additions & 1 deletion Btms.Business.Tests/Services/Decisions/DecisionServiceTests.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
using Btms.Backend.Data;
using Btms.Backend.Data.InMemory;
using Btms.Business.Builders;
using Btms.Business.Services.Decisions;
using Btms.Business.Services.Decisions.Finders;
using Btms.Business.Services.Matching;
Expand Down Expand Up @@ -96,6 +99,10 @@ public DecisionServiceTests()
_serviceCollection.AddSingleton<IDecisionService, DecisionService>();
_serviceCollection.AddSingleton(Substitute.For<ILogger<DecisionService>>());
_serviceCollection.AddSingleton(Substitute.For<IPublishBus>());
_serviceCollection.AddSingleton<MovementBuilderFactory>();
_serviceCollection.AddSingleton<DecisionStatusFinder>();
_serviceCollection.AddLogging();
_serviceCollection.AddSingleton<IMongoDbContext, MemoryMongoDbContext>();
}

private ServiceProvider ConfigureDecisionFinders(ImportNotification notification, string[] checkCodes)
Expand Down Expand Up @@ -146,6 +153,7 @@ private static DecisionContext CreateDecisionContext(ImportNotificationTypeEnum?
[
new Movement
{
EntryReference = "movement-1",
Id = "movement-1",
BtmsStatus = MovementStatus.Default(),
Items =
Expand All @@ -158,7 +166,8 @@ private static DecisionContext CreateDecisionContext(ImportNotificationTypeEnum?
]
}
],
matchingResult
matchingResult,
"TestMessageId"
);
}
}
22 changes: 16 additions & 6 deletions Btms.Business.Tests/Services/Decisions/NoMatchDecisionsTest.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Btms.Backend.Data.InMemory;
using Btms.Business.Builders;
using Btms.Business.Services.Decisions;
using Btms.Business.Services.Decisions.Finders;
Expand Down Expand Up @@ -26,13 +27,16 @@ public async Task WhenClearanceRequest_HasNotMatch_AndH220Checks_ThenNoDecisionS
// Arrange
var movement = GenerateMovementWithH220Checks();

var sut = new DecisionService(NullLogger<DecisionService>.Instance, Array.Empty<IDecisionFinder>());
var sut = new DecisionService(NullLogger<DecisionService>.Instance,
Array.Empty<IDecisionFinder>(),
new MovementBuilderFactory(new DecisionStatusFinder(), NullLogger<MovementBuilder>.Instance),
new MemoryMongoDbContext());

var matchingResult = new MatchingResult();
matchingResult.AddDocumentNoMatch(movement.Id!, movement.Items[0].ItemNumber!.Value, movement.Items[0].Documents?[0].DocumentReference!);

// Act
var decisionResult = await sut.Process(new DecisionContext(new List<ImportNotification>(), [movement], matchingResult), CancellationToken.None);
var decisionResult = await sut.Process(new DecisionContext(new List<ImportNotification>(), [movement], matchingResult, "TestMessageId"), CancellationToken.None);

// Assert
decisionResult.Should().NotBeNull();
Expand All @@ -50,13 +54,16 @@ public async Task WhenClearanceRequest_HasNotMatch_AndNoChecks_ThenNoDecisionSho
// Arrange
var movements = GenerateMovements(false);

var sut = new DecisionService(NullLogger<DecisionService>.Instance, Array.Empty<IDecisionFinder>());
var sut = new DecisionService(NullLogger<DecisionService>.Instance,
Array.Empty<IDecisionFinder>(),
new MovementBuilderFactory(new DecisionStatusFinder(), NullLogger<MovementBuilder>.Instance),
new MemoryMongoDbContext());

var matchingResult = new MatchingResult();
matchingResult.AddDocumentNoMatch(movements[0].Id!, movements[0].Items[0].ItemNumber!.Value, movements[0].Items[0].Documents?[0].DocumentReference!);

// Act
var decisionResult = await sut.Process(new DecisionContext(new List<ImportNotification>(), movements, matchingResult), CancellationToken.None);
var decisionResult = await sut.Process(new DecisionContext(new List<ImportNotification>(), movements, matchingResult, "TestMessageId"), CancellationToken.None);

// Assert
decisionResult.Should().NotBeNull();
Expand All @@ -72,13 +79,16 @@ public async Task WhenClearanceRequest_HasNotMatch_ThenDecisionCodeShouldBeNoMat
var movements = GenerateMovements(true);
movements[0].Items[0].Checks = [new Check() { CheckCode = "TEST" }];

var sut = new DecisionService(NullLogger<DecisionService>.Instance, Array.Empty<IDecisionFinder>());
var sut = new DecisionService(NullLogger<DecisionService>.Instance,
Array.Empty<IDecisionFinder>(),
new MovementBuilderFactory(new DecisionStatusFinder(), NullLogger<MovementBuilder>.Instance),
new MemoryMongoDbContext());

var matchingResult = new MatchingResult();
matchingResult.AddDocumentNoMatch(movements[0].Id!, movements[0].Items[0].ItemNumber!.Value, movements[0].Items[0].Documents?[0].DocumentReference!);

// Act
var decisionResult = await sut.Process(new DecisionContext(new List<ImportNotification>(), movements, matchingResult), CancellationToken.None);
var decisionResult = await sut.Process(new DecisionContext(new List<ImportNotification>(), movements, matchingResult, "TestMessageId"), CancellationToken.None);

// Assert
decisionResult.Should().NotBeNull();
Expand Down
16 changes: 14 additions & 2 deletions Btms.Business/Builders/MovementBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
using System.Diagnostics.CodeAnalysis;
using Btms.Business.Extensions;
using Btms.Common.Extensions;
using Btms.Model;
using Btms.Model.Auditing;
using Btms.Model.Cds;
using Btms.Model.ChangeLog;
using Btms.Model.Ipaffs;
using Microsoft.Extensions.Logging;
using System.Diagnostics.CodeAnalysis;

namespace Btms.Business.Builders;

Expand Down Expand Up @@ -198,6 +197,19 @@ public AuditEntry UpdateAuditEntry(string messageId, CreatedBySystem source, Cha
return auditEntry;
}

public AuditEntry SkippedAuditEntry(string messageId, CreatedBySystem source)
{
GuardNullMovement();

var auditEntry = AuditEntry.CreateSkippedVersion(
messageId,
_movement.ClearanceRequests[0].Header!.EntryVersionNumber.GetValueOrDefault(),
_movement.UpdatedSource,
source);

return auditEntry;
}

private static string BuildNormalizedDecisionPath(string fullPath)
{
return fullPath.Replace("RAW/DECISIONS/", "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,20 @@ public class ImportNotificationPreProcessor(IMongoDbContext dbContext, ILogger<I
return PreProcessResult.Changed(internalNotification, changeSet);
}

PreProcessingResult<Model.Ipaffs.ImportNotification> result;
if (internalNotification.UpdatedSource.TrimMicroseconds() ==
existingNotification.UpdatedSource.TrimMicroseconds())
{
return PreProcessResult.AlreadyProcessed(existingNotification);
result = PreProcessResult.AlreadyProcessed(existingNotification);
}
else
{
logger.MessageSkipped(preProcessingContext.MessageId, preProcessingContext.Message.ReferenceNumber!);
result = PreProcessResult.Skipped(existingNotification);
}

logger.MessageSkipped(preProcessingContext.MessageId, preProcessingContext.Message.ReferenceNumber!);
return PreProcessResult.Skipped(existingNotification);
internalNotification.Skipped(preProcessingContext.MessageId, internalNotification.Version.GetValueOrDefault());
return result;

}
}
34 changes: 17 additions & 17 deletions Btms.Business/Pipelines/PreProcessing/MovementPreProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using Btms.Business.Builders;
using Btms.Model;
using Btms.Model.Auditing;
using Btms.Model.ChangeLog;
using Btms.Types.Alvs;
using Btms.Types.Alvs.Mapping;
using Microsoft.Extensions.Logging;
Expand All @@ -13,15 +12,12 @@ public class MovementPreProcessor(IMongoDbContext dbContext, ILogger<MovementPre
{
public async Task<PreProcessingResult<Movement>> Process(PreProcessingContext<AlvsClearanceRequest> preProcessingContext)
{

var internalClearanceRequest = AlvsClearanceRequestMapper.Map(preProcessingContext.Message);
var mb = movementBuilderFactory.From(internalClearanceRequest);
var existingMovement = await dbContext.Movements.Find(mb.Id);

if (existingMovement is null)
{
// ArgumentNullException.ThrowIfNull(movement);

var auditEntry = mb.CreateAuditEntry(
preProcessingContext.MessageId,
CreatedBySystem.Cds
Expand All @@ -33,11 +29,10 @@ public async Task<PreProcessingResult<Movement>> Process(PreProcessingContext<Al
return PreProcessResult.New(movement);
}

// if (movement.ClearanceRequests[^1].Header?.EntryVersionNumber > existingMovement.ClearanceRequests[0].Header?.EntryVersionNumber)
var existingBuilder = movementBuilderFactory.From(existingMovement);

if (mb.IsEntryVersionNumberGreaterThan(existingMovement.ClearanceRequests[0].Header?.EntryVersionNumber))
{
var existingBuilder = movementBuilderFactory.From(existingMovement);
// var changeSet = movement.ClearanceRequests[^1].GenerateChangeSet(existingMovement.ClearanceRequests[0]);
var changeSet = mb.GenerateChangeSet(existingBuilder);

var auditEntry = mb.UpdateAuditEntry(
Expand All @@ -50,25 +45,30 @@ public async Task<PreProcessingResult<Movement>> Process(PreProcessingContext<Al

existingBuilder.ReplaceClearanceRequests(mb);

// existingMovement.ClearanceRequests.RemoveAll(x =>
// x.Header?.EntryReference ==
// movement.ClearanceRequests[0].Header?.EntryReference);
// existingMovement.ClearanceRequests.AddRange(movement.ClearanceRequests);
//
// existingMovement.Items.AddRange(movement.Items);

await dbContext.Movements.Update(existingMovement);

return PreProcessResult.Changed(existingMovement, changeSet);
}

PreProcessingResult<Movement> result;

if (mb.IsEntryVersionNumberEqualTo(existingMovement.ClearanceRequests[0].Header?.EntryVersionNumber))
{
return PreProcessResult.AlreadyProcessed(existingMovement);
result = PreProcessResult.AlreadyProcessed(existingMovement);
}
else
{
logger.MessageSkipped(preProcessingContext.MessageId, preProcessingContext.Message.Header?.EntryReference!);
result = PreProcessResult.Skipped(existingMovement);
}

var skippedAuditEntry = existingBuilder.SkippedAuditEntry(
preProcessingContext.MessageId,
CreatedBySystem.Cds);

existingBuilder.Update(skippedAuditEntry);

logger.MessageSkipped(preProcessingContext.MessageId, preProcessingContext.Message.Header?.EntryReference!);
return PreProcessResult.Skipped(existingMovement);
return result;

}
}
5 changes: 4 additions & 1 deletion Btms.Business/Services/Decisions/DecisionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ namespace Btms.Business.Services.Decisions;
public class DecisionContext(
List<ImportNotification> notifications,
List<Movement> movements,
MatchingResult matchingResult)
MatchingResult matchingResult,
string messageId)
{
public List<ImportNotification> Notifications { get; } = notifications;
public List<Movement> Movements { get; } = movements;
public MatchingResult MatchingResult { get; } = matchingResult;

public string MessageId { get; } = messageId;
}
31 changes: 30 additions & 1 deletion Btms.Business/Services/Decisions/DecisionService.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
using System.Diagnostics.CodeAnalysis;
using Btms.Backend.Data;
using Btms.Business.Builders;
using Btms.Business.Services.Decisions.Finders;
using Btms.Business.Services.Matching;
using Btms.Model.Cds;
using Btms.Model.Ipaffs;
using Btms.Types.Alvs.Mapping;
using Microsoft.Extensions.Logging;

namespace Btms.Business.Services.Decisions;

public class DecisionService(ILogger<DecisionService> logger, IEnumerable<IDecisionFinder> decisionFinders) : IDecisionService
public class DecisionService(ILogger<DecisionService> logger, IEnumerable<IDecisionFinder> decisionFinders,
MovementBuilderFactory movementBuilderFactory, IMongoDbContext dbContext) : IDecisionService
{
public async Task<DecisionResult> Process(DecisionContext decisionContext, CancellationToken cancellationToken)
{
Expand All @@ -19,6 +24,30 @@ public async Task<DecisionResult> Process(DecisionContext decisionContext, Cance
decisionResult.AddDecisionMessage(message);
}

var notificationContext = decisionContext.Notifications
.Select(n => new DecisionImportNotifications
{
Id = n.Id!,
Version = n.Version,
Created = n.Created,
Updated = n.Updated,
UpdatedEntity = n.UpdatedEntity,
CreatedSource = n.CreatedSource!.Value,
UpdatedSource = n.UpdatedSource!.Value
})
.ToList();

foreach (var decisionResultDecisionsMessage in decisionResult.DecisionsMessages)
{
var internalDecision = DecisionMapper.Map(decisionResultDecisionsMessage);
var m = decisionContext.Movements.First(m => m.Id!.Equals(decisionResultDecisionsMessage.Header?.EntryReference));
var existingMovementBuilder = movementBuilderFactory
.From(m)
.MergeDecision(decisionContext.MessageId, internalDecision, notificationContext);
m = existingMovementBuilder.Build();
await dbContext.Movements.Update(m, cancellationToken);
}

return decisionResult;
}

Expand Down
6 changes: 3 additions & 3 deletions Btms.Consumers/AlvsClearanceRequestConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,12 @@ public async Task OnHandle(AlvsClearanceRequest message, CancellationToken cance
new MatchingContext(linkResult.Notifications, linkResult.Movements), Context.CancellationToken);

var decisionContext =
new DecisionContext(linkResult.Notifications, linkResult.Movements, matchResult);
new DecisionContext(linkResult.Notifications, linkResult.Movements, matchResult, messageId);
var decisionResult = await decisionService.Process(decisionContext, Context.CancellationToken);

await validationService.PostDecision(linkResult, decisionResult, Context.CancellationToken);

await dbContext.SaveChangesAsync(Context.CancellationToken);

await Context.Bus.PublishDecisions(messageId, decisionResult, decisionContext, cancellationToken: cancellationToken);
}
else
{
Expand All @@ -87,6 +85,8 @@ public async Task OnHandle(AlvsClearanceRequest message, CancellationToken cance
message.Header?.EntryReference, messageId, preProcessingResult.Outcome.ToString(),
preProcessingResult.Record.GetLatestAuditEntry().Status);
}

await dbContext.SaveChangesAsync(Context.CancellationToken);
}
}

Expand Down
Loading
Loading