diff --git a/Btms.Backend.Data/Mongo/MongoIndexService.cs b/Btms.Backend.Data/Mongo/MongoIndexService.cs index 05fa10dc..b340fd78 100644 --- a/Btms.Backend.Data/Mongo/MongoIndexService.cs +++ b/Btms.Backend.Data/Mongo/MongoIndexService.cs @@ -12,18 +12,21 @@ public Task StartAsync(CancellationToken cancellationToken) { return Task.WhenAll( CreateIndex("MatchReferenceIdx", - Builders.IndexKeys.Ascending(n => n._MatchReference), cancellationToken), + Builders.IndexKeys.Ascending(n => n._MatchReference), cancellationToken: cancellationToken), CreateIndex("Created", - Builders.IndexKeys.Ascending(n => n.Created), cancellationToken), + Builders.IndexKeys.Ascending(n => n.Created), cancellationToken: cancellationToken), CreateIndex("CreatedSource", - Builders.IndexKeys.Ascending(n => n.CreatedSource), cancellationToken), + Builders.IndexKeys.Ascending(n => n.CreatedSource), cancellationToken: cancellationToken), CreateIndex("MatchReferenceIdx", - Builders.IndexKeys.Ascending(m => m._MatchReferences), cancellationToken), + Builders.IndexKeys.Ascending(m => m._MatchReferences), cancellationToken: cancellationToken), CreateIndex("Created", - Builders.IndexKeys.Ascending(m => m.Created), cancellationToken), + Builders.IndexKeys.Ascending(m => m.Created), cancellationToken: cancellationToken), CreateIndex("CreatedSource", - Builders.IndexKeys.Ascending(m => m.CreatedSource), cancellationToken) + Builders.IndexKeys.Ascending(m => m.CreatedSource), cancellationToken: cancellationToken), + CreateIndex("UniqueDecisionNumber", + Builders.IndexKeys.Ascending(m => m.EntryReference) + .Ascending(new StringFieldDefinition("decisions.header.decisionNumber")), true, cancellationToken: cancellationToken) ); @@ -34,7 +37,7 @@ public Task StopAsync(CancellationToken cancellationToken) return Task.CompletedTask; } - private async Task CreateIndex(string name, IndexKeysDefinition keys, CancellationToken cancellationToken) + private async Task CreateIndex(string name, IndexKeysDefinition keys, bool unique = false, CancellationToken cancellationToken = default) { try { @@ -43,6 +46,7 @@ private async Task CreateIndex(string name, IndexKeysDefinition keys, Canc { Name = name, Background = true, + Unique = unique, }); await database.GetCollection(typeof(T).Name).Indexes.CreateOneAsync(indexModel, cancellationToken: cancellationToken); } diff --git a/Btms.Business.Tests/Services/Decisions/DecisionMessageBuilderTests.cs b/Btms.Business.Tests/Services/Decisions/DecisionMessageBuilderTests.cs index 3eb81e2f..37e42532 100644 --- a/Btms.Business.Tests/Services/Decisions/DecisionMessageBuilderTests.cs +++ b/Btms.Business.Tests/Services/Decisions/DecisionMessageBuilderTests.cs @@ -100,7 +100,8 @@ private static DecisionContext CreateDecisionContext() ] } ], - new MatchingResult() + new MatchingResult(), + "TestMessageId" ); } diff --git a/Btms.Business.Tests/Services/Decisions/DecisionServiceTests.cs b/Btms.Business.Tests/Services/Decisions/DecisionServiceTests.cs index 7dd2c42e..7f0fbd13 100644 --- a/Btms.Business.Tests/Services/Decisions/DecisionServiceTests.cs +++ b/Btms.Business.Tests/Services/Decisions/DecisionServiceTests.cs @@ -1,3 +1,6 @@ +using Btms.Backend.Data; +using Btms.Backend.Data.InMemory; +using Btms.Business.Builders; using Btms.Business.Services.Decisions; using Btms.Business.Services.Decisions.Finders; using Btms.Business.Services.Matching; @@ -96,6 +99,10 @@ public DecisionServiceTests() _serviceCollection.AddSingleton(); _serviceCollection.AddSingleton(Substitute.For>()); _serviceCollection.AddSingleton(Substitute.For()); + _serviceCollection.AddSingleton(); + _serviceCollection.AddSingleton(); + _serviceCollection.AddLogging(); + _serviceCollection.AddSingleton(); } private ServiceProvider ConfigureDecisionFinders(ImportNotification notification, string[] checkCodes) @@ -146,6 +153,7 @@ private static DecisionContext CreateDecisionContext(ImportNotificationTypeEnum? [ new Movement { + EntryReference = "movement-1", Id = "movement-1", BtmsStatus = MovementStatus.Default(), Items = @@ -158,7 +166,8 @@ private static DecisionContext CreateDecisionContext(ImportNotificationTypeEnum? ] } ], - matchingResult + matchingResult, + "TestMessageId" ); } } \ No newline at end of file diff --git a/Btms.Business.Tests/Services/Decisions/NoMatchDecisionsTest.cs b/Btms.Business.Tests/Services/Decisions/NoMatchDecisionsTest.cs index d4109664..c8d12d6c 100644 --- a/Btms.Business.Tests/Services/Decisions/NoMatchDecisionsTest.cs +++ b/Btms.Business.Tests/Services/Decisions/NoMatchDecisionsTest.cs @@ -1,3 +1,4 @@ +using Btms.Backend.Data.InMemory; using Btms.Business.Builders; using Btms.Business.Services.Decisions; using Btms.Business.Services.Decisions.Finders; @@ -26,13 +27,16 @@ public async Task WhenClearanceRequest_HasNotMatch_AndH220Checks_ThenNoDecisionS // Arrange var movement = GenerateMovementWithH220Checks(); - var sut = new DecisionService(NullLogger.Instance, Array.Empty()); + var sut = new DecisionService(NullLogger.Instance, + Array.Empty(), + new MovementBuilderFactory(new DecisionStatusFinder(), NullLogger.Instance), + new MemoryMongoDbContext()); var matchingResult = new MatchingResult(); matchingResult.AddDocumentNoMatch(movement.Id!, movement.Items[0].ItemNumber!.Value, movement.Items[0].Documents?[0].DocumentReference!); // Act - var decisionResult = await sut.Process(new DecisionContext(new List(), [movement], matchingResult), CancellationToken.None); + var decisionResult = await sut.Process(new DecisionContext(new List(), [movement], matchingResult, "TestMessageId"), CancellationToken.None); // Assert decisionResult.Should().NotBeNull(); @@ -50,13 +54,16 @@ public async Task WhenClearanceRequest_HasNotMatch_AndNoChecks_ThenNoDecisionSho // Arrange var movements = GenerateMovements(false); - var sut = new DecisionService(NullLogger.Instance, Array.Empty()); + var sut = new DecisionService(NullLogger.Instance, + Array.Empty(), + new MovementBuilderFactory(new DecisionStatusFinder(), NullLogger.Instance), + new MemoryMongoDbContext()); var matchingResult = new MatchingResult(); matchingResult.AddDocumentNoMatch(movements[0].Id!, movements[0].Items[0].ItemNumber!.Value, movements[0].Items[0].Documents?[0].DocumentReference!); // Act - var decisionResult = await sut.Process(new DecisionContext(new List(), movements, matchingResult), CancellationToken.None); + var decisionResult = await sut.Process(new DecisionContext(new List(), movements, matchingResult, "TestMessageId"), CancellationToken.None); // Assert decisionResult.Should().NotBeNull(); @@ -72,13 +79,16 @@ public async Task WhenClearanceRequest_HasNotMatch_ThenDecisionCodeShouldBeNoMat var movements = GenerateMovements(true); movements[0].Items[0].Checks = [new Check() { CheckCode = "TEST" }]; - var sut = new DecisionService(NullLogger.Instance, Array.Empty()); + var sut = new DecisionService(NullLogger.Instance, + Array.Empty(), + new MovementBuilderFactory(new DecisionStatusFinder(), NullLogger.Instance), + new MemoryMongoDbContext()); var matchingResult = new MatchingResult(); matchingResult.AddDocumentNoMatch(movements[0].Id!, movements[0].Items[0].ItemNumber!.Value, movements[0].Items[0].Documents?[0].DocumentReference!); // Act - var decisionResult = await sut.Process(new DecisionContext(new List(), movements, matchingResult), CancellationToken.None); + var decisionResult = await sut.Process(new DecisionContext(new List(), movements, matchingResult, "TestMessageId"), CancellationToken.None); // Assert decisionResult.Should().NotBeNull(); diff --git a/Btms.Business/Builders/MovementBuilder.cs b/Btms.Business/Builders/MovementBuilder.cs index debf0e72..d99cd23b 100644 --- a/Btms.Business/Builders/MovementBuilder.cs +++ b/Btms.Business/Builders/MovementBuilder.cs @@ -1,12 +1,11 @@ -using System.Diagnostics.CodeAnalysis; using Btms.Business.Extensions; using Btms.Common.Extensions; using Btms.Model; using Btms.Model.Auditing; using Btms.Model.Cds; using Btms.Model.ChangeLog; -using Btms.Model.Ipaffs; using Microsoft.Extensions.Logging; +using System.Diagnostics.CodeAnalysis; namespace Btms.Business.Builders; @@ -198,6 +197,19 @@ public AuditEntry UpdateAuditEntry(string messageId, CreatedBySystem source, Cha return auditEntry; } + public AuditEntry SkippedAuditEntry(string messageId, CreatedBySystem source) + { + GuardNullMovement(); + + var auditEntry = AuditEntry.CreateSkippedVersion( + messageId, + _movement.ClearanceRequests[0].Header!.EntryVersionNumber.GetValueOrDefault(), + _movement.UpdatedSource, + source); + + return auditEntry; + } + private static string BuildNormalizedDecisionPath(string fullPath) { return fullPath.Replace("RAW/DECISIONS/", ""); diff --git a/Btms.Business/Pipelines/PreProcessing/ImportNotificationPreProcessor.cs b/Btms.Business/Pipelines/PreProcessing/ImportNotificationPreProcessor.cs index 1c866290..2f233787 100644 --- a/Btms.Business/Pipelines/PreProcessing/ImportNotificationPreProcessor.cs +++ b/Btms.Business/Pipelines/PreProcessing/ImportNotificationPreProcessor.cs @@ -59,14 +59,20 @@ public class ImportNotificationPreProcessor(IMongoDbContext dbContext, ILogger result; if (internalNotification.UpdatedSource.TrimMicroseconds() == existingNotification.UpdatedSource.TrimMicroseconds()) { - return PreProcessResult.AlreadyProcessed(existingNotification); + result = PreProcessResult.AlreadyProcessed(existingNotification); + } + else + { + logger.MessageSkipped(preProcessingContext.MessageId, preProcessingContext.Message.ReferenceNumber!); + result = PreProcessResult.Skipped(existingNotification); } - logger.MessageSkipped(preProcessingContext.MessageId, preProcessingContext.Message.ReferenceNumber!); - return PreProcessResult.Skipped(existingNotification); + internalNotification.Skipped(preProcessingContext.MessageId, internalNotification.Version.GetValueOrDefault()); + return result; } } \ No newline at end of file diff --git a/Btms.Business/Pipelines/PreProcessing/MovementPreProcessor.cs b/Btms.Business/Pipelines/PreProcessing/MovementPreProcessor.cs index 69d1f29a..9d79b3aa 100644 --- a/Btms.Business/Pipelines/PreProcessing/MovementPreProcessor.cs +++ b/Btms.Business/Pipelines/PreProcessing/MovementPreProcessor.cs @@ -2,7 +2,6 @@ using Btms.Business.Builders; using Btms.Model; using Btms.Model.Auditing; -using Btms.Model.ChangeLog; using Btms.Types.Alvs; using Btms.Types.Alvs.Mapping; using Microsoft.Extensions.Logging; @@ -13,15 +12,12 @@ public class MovementPreProcessor(IMongoDbContext dbContext, ILogger> Process(PreProcessingContext preProcessingContext) { - var internalClearanceRequest = AlvsClearanceRequestMapper.Map(preProcessingContext.Message); var mb = movementBuilderFactory.From(internalClearanceRequest); var existingMovement = await dbContext.Movements.Find(mb.Id); if (existingMovement is null) { - // ArgumentNullException.ThrowIfNull(movement); - var auditEntry = mb.CreateAuditEntry( preProcessingContext.MessageId, CreatedBySystem.Cds @@ -33,11 +29,10 @@ public async Task> Process(PreProcessingContext existingMovement.ClearanceRequests[0].Header?.EntryVersionNumber) + var existingBuilder = movementBuilderFactory.From(existingMovement); + if (mb.IsEntryVersionNumberGreaterThan(existingMovement.ClearanceRequests[0].Header?.EntryVersionNumber)) { - var existingBuilder = movementBuilderFactory.From(existingMovement); - // var changeSet = movement.ClearanceRequests[^1].GenerateChangeSet(existingMovement.ClearanceRequests[0]); var changeSet = mb.GenerateChangeSet(existingBuilder); var auditEntry = mb.UpdateAuditEntry( @@ -50,25 +45,30 @@ public async Task> Process(PreProcessingContext - // x.Header?.EntryReference == - // movement.ClearanceRequests[0].Header?.EntryReference); - // existingMovement.ClearanceRequests.AddRange(movement.ClearanceRequests); - // - // existingMovement.Items.AddRange(movement.Items); - await dbContext.Movements.Update(existingMovement); return PreProcessResult.Changed(existingMovement, changeSet); } + PreProcessingResult result; + if (mb.IsEntryVersionNumberEqualTo(existingMovement.ClearanceRequests[0].Header?.EntryVersionNumber)) { - return PreProcessResult.AlreadyProcessed(existingMovement); + result = PreProcessResult.AlreadyProcessed(existingMovement); } + else + { + logger.MessageSkipped(preProcessingContext.MessageId, preProcessingContext.Message.Header?.EntryReference!); + result = PreProcessResult.Skipped(existingMovement); + } + + var skippedAuditEntry = existingBuilder.SkippedAuditEntry( + preProcessingContext.MessageId, + CreatedBySystem.Cds); + + existingBuilder.Update(skippedAuditEntry); - logger.MessageSkipped(preProcessingContext.MessageId, preProcessingContext.Message.Header?.EntryReference!); - return PreProcessResult.Skipped(existingMovement); + return result; } } \ No newline at end of file diff --git a/Btms.Business/Services/Decisions/DecisionContext.cs b/Btms.Business/Services/Decisions/DecisionContext.cs index c18011f4..fe3cb4b4 100644 --- a/Btms.Business/Services/Decisions/DecisionContext.cs +++ b/Btms.Business/Services/Decisions/DecisionContext.cs @@ -7,9 +7,12 @@ namespace Btms.Business.Services.Decisions; public class DecisionContext( List notifications, List movements, - MatchingResult matchingResult) + MatchingResult matchingResult, + string messageId) { public List Notifications { get; } = notifications; public List Movements { get; } = movements; public MatchingResult MatchingResult { get; } = matchingResult; + + public string MessageId { get; } = messageId; } \ No newline at end of file diff --git a/Btms.Business/Services/Decisions/DecisionService.cs b/Btms.Business/Services/Decisions/DecisionService.cs index 06a901d1..547f6528 100644 --- a/Btms.Business/Services/Decisions/DecisionService.cs +++ b/Btms.Business/Services/Decisions/DecisionService.cs @@ -1,12 +1,17 @@ using System.Diagnostics.CodeAnalysis; +using Btms.Backend.Data; +using Btms.Business.Builders; using Btms.Business.Services.Decisions.Finders; using Btms.Business.Services.Matching; +using Btms.Model.Cds; using Btms.Model.Ipaffs; +using Btms.Types.Alvs.Mapping; using Microsoft.Extensions.Logging; namespace Btms.Business.Services.Decisions; -public class DecisionService(ILogger logger, IEnumerable decisionFinders) : IDecisionService +public class DecisionService(ILogger logger, IEnumerable decisionFinders, + MovementBuilderFactory movementBuilderFactory, IMongoDbContext dbContext) : IDecisionService { public async Task Process(DecisionContext decisionContext, CancellationToken cancellationToken) { @@ -19,6 +24,30 @@ public async Task Process(DecisionContext decisionContext, Cance decisionResult.AddDecisionMessage(message); } + var notificationContext = decisionContext.Notifications + .Select(n => new DecisionImportNotifications + { + Id = n.Id!, + Version = n.Version, + Created = n.Created, + Updated = n.Updated, + UpdatedEntity = n.UpdatedEntity, + CreatedSource = n.CreatedSource!.Value, + UpdatedSource = n.UpdatedSource!.Value + }) + .ToList(); + + foreach (var decisionResultDecisionsMessage in decisionResult.DecisionsMessages) + { + var internalDecision = DecisionMapper.Map(decisionResultDecisionsMessage); + var m = decisionContext.Movements.First(m => m.Id!.Equals(decisionResultDecisionsMessage.Header?.EntryReference)); + var existingMovementBuilder = movementBuilderFactory + .From(m) + .MergeDecision(decisionContext.MessageId, internalDecision, notificationContext); + m = existingMovementBuilder.Build(); + await dbContext.Movements.Update(m, cancellationToken); + } + return decisionResult; } diff --git a/Btms.Consumers/AlvsClearanceRequestConsumer.cs b/Btms.Consumers/AlvsClearanceRequestConsumer.cs index 6d647865..1de1a567 100644 --- a/Btms.Consumers/AlvsClearanceRequestConsumer.cs +++ b/Btms.Consumers/AlvsClearanceRequestConsumer.cs @@ -71,14 +71,12 @@ public async Task OnHandle(AlvsClearanceRequest message, CancellationToken cance new MatchingContext(linkResult.Notifications, linkResult.Movements), Context.CancellationToken); var decisionContext = - new DecisionContext(linkResult.Notifications, linkResult.Movements, matchResult); + new DecisionContext(linkResult.Notifications, linkResult.Movements, matchResult, messageId); var decisionResult = await decisionService.Process(decisionContext, Context.CancellationToken); await validationService.PostDecision(linkResult, decisionResult, Context.CancellationToken); - await dbContext.SaveChangesAsync(Context.CancellationToken); - await Context.Bus.PublishDecisions(messageId, decisionResult, decisionContext, cancellationToken: cancellationToken); } else { @@ -87,6 +85,8 @@ public async Task OnHandle(AlvsClearanceRequest message, CancellationToken cance message.Header?.EntryReference, messageId, preProcessingResult.Outcome.ToString(), preProcessingResult.Record.GetLatestAuditEntry().Status); } + + await dbContext.SaveChangesAsync(Context.CancellationToken); } } diff --git a/Btms.Consumers/NotificationConsumer.cs b/Btms.Consumers/NotificationConsumer.cs index d8d73ac8..6d5f55f8 100644 --- a/Btms.Consumers/NotificationConsumer.cs +++ b/Btms.Consumers/NotificationConsumer.cs @@ -11,6 +11,8 @@ using Btms.Business.Services.Validating; using Btms.Model.Cds; using DecisionContext = Btms.Business.Services.Decisions.DecisionContext; +using Btms.Business.Builders; +using Btms.Types.Alvs.Mapping; namespace Btms.Consumers; @@ -75,35 +77,26 @@ public async Task OnHandle(ImportNotification message, CancellationToken cancell var matchResult = await matchingService.Process( new MatchingContext(notifications, linkResult.Movements), Context.CancellationToken); - var decisionContext = new DecisionContext(notifications, linkResult.Movements, matchResult); + var decisionContext = new DecisionContext(notifications, linkResult.Movements, matchResult, messageId); var decisionResult = await decisionService.Process(decisionContext, Context.CancellationToken); await validationService.PostDecision(linkResult, decisionResult, Context.CancellationToken); - - await dbContext.SaveChangesAsync(Context.CancellationToken); - - await Context.Bus.PublishDecisions(messageId, decisionResult, decisionContext, cancellationToken: cancellationToken); } else if (preProcessingResult.IsDeleted()) { var linkContext = new ImportNotificationLinkContext(preProcessingResult.Record, preProcessingResult.ChangeSet); await linkingService.UnLink(linkContext, Context.CancellationToken); - await dbContext.SaveChangesAsync(Context.CancellationToken); } else { - if (preProcessingResult.Outcome != PreProcessingOutcome.Skipped || - preProcessingResult.Outcome != PreProcessingOutcome.AlreadyProcessed) - { - await dbContext.SaveChangesAsync(Context.CancellationToken); - } - logger.LogWarning("Skipping Linking/Matching/Decisions for {Id} with MessageId {MessageId} with Pre-Processing Outcome {PreProcessingOutcome} Because Last AuditState was {AuditState}", message.ReferenceNumber, messageId, preProcessingResult.Outcome.ToString(), preProcessingResult.Record.GetLatestAuditEntry().Status); LogStatus("IsCreatedOrChanged=false", message); } + await dbContext.SaveChangesAsync(Context.CancellationToken); + } } diff --git a/Btms.Model/ChangeLog/ChangeSet.cs b/Btms.Model/ChangeLog/ChangeSet.cs index ba06f0a3..6c920053 100644 --- a/Btms.Model/ChangeLog/ChangeSet.cs +++ b/Btms.Model/ChangeLog/ChangeSet.cs @@ -13,7 +13,8 @@ public class ChangeSet(JsonPatch jsonPatch, JsonNode jsonNodePrevious) { TypeInfoResolver = new ChangeSetTypeInfoResolver(), PropertyNameCaseInsensitive = true, - NumberHandling = JsonNumberHandling.AllowReadingFromString + NumberHandling = JsonNumberHandling.AllowReadingFromString, + Converters = { new JsonStringEnumConverter(JsonNamingPolicy.CamelCase) } }; public JsonPatch JsonPatch { get; } = jsonPatch;