Skip to content

Commit

Permalink
Merge pull request #30 from FINTLabs/FFS-1260-FFS-1261
Browse files Browse the repository at this point in the history
FFS-1260 FFS-1261
  • Loading branch information
Battlestad authored Nov 19, 2024
2 parents 940981e + 93899be commit d4d6606
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package no.fintlabs.flyt.gateway.application.archive.dispatch;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import no.fintlabs.flyt.gateway.application.archive.dispatch.model.instance.ArchiveInstance;
import no.fintlabs.flyt.gateway.application.archive.dispatch.model.instance.JournalpostDto;
import no.fintlabs.flyt.gateway.application.archive.dispatch.sak.CaseDispatchService;
import no.fintlabs.flyt.gateway.application.archive.dispatch.sak.result.CaseDispatchResult;
import no.fintlabs.flyt.kafka.headers.InstanceFlowHeaders;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -73,46 +70,31 @@ private Mono<DispatchResult> processById(ArchiveInstance archiveInstance) {
return recordsProcessingService.processRecords(archiveInstance.getCaseId(), false, archiveInstance.getJournalpost());
}

@Getter
@AllArgsConstructor
private static class CaseInfo {
private final boolean newCase;
private final String caseId;
}

private Mono<DispatchResult> processBySearchOrNew(ArchiveInstance archiveInstance) {
Optional<List<JournalpostDto>> journalpostDtosOptional = archiveInstance.getNewCase().getJournalpost();
return caseDispatchService.findCasesBySearch(archiveInstance)
.flatMap(caseSearchResult -> switch (caseSearchResult.getStatus()) {
case ACCEPTED -> {
if (caseSearchResult.getArchiveCaseIds().size() > 1) {
String caseIds = String.join(", ", caseSearchResult.getArchiveCaseIds());

yield Mono.just(DispatchResult.declined("Found multiple cases: " + caseIds));
} else {
yield (
caseSearchResult.getArchiveCaseIds().size() == 1
? Mono.just(new CaseInfo(
false,
caseSearchResult.getArchiveCaseIds().get(0)))
: caseDispatchService.dispatch(archiveInstance.getNewCase())
.map(CaseDispatchResult::getArchiveCaseId)
.map(caseId -> new CaseInfo(true, caseId))
).flatMap(caseInfo ->
journalpostDtosOptional
.filter(journalpostDtos -> !journalpostDtos.isEmpty())
.map(
journalpostDtos -> recordsProcessingService.processRecords(
caseInfo.getCaseId(),
caseInfo.isNewCase(),
journalpostDtos
)
).orElse(Mono.just(
DispatchResult.accepted(caseInfo.getCaseId())
))
);
if (caseSearchResult.getArchiveCaseIds().isEmpty()) {
yield processNew(archiveInstance);
} else {
yield journalpostDtosOptional
.filter(journalpostDtos -> !journalpostDtos.isEmpty())
.map(
journalpostDtos -> recordsProcessingService.processRecords(
caseSearchResult.getArchiveCaseIds().get(0),
false,
journalpostDtos
)
).orElse(Mono.just(
DispatchResult.accepted(caseSearchResult.getArchiveCaseIds().get(0)))
);
}
}

}
case DECLINED -> Mono.just(DispatchResult.declined(caseSearchResult.getErrorMessage()));
case FAILED -> Mono.just(DispatchResult.failed());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,27 @@ public ConcurrentMessageListenerContainer<String, ArchiveInstance> instanceReady
ArchiveInstance.class,
instanceFlowConsumerRecord ->
dispatchService.process(
instanceFlowConsumerRecord.getInstanceFlowHeaders(),
instanceFlowConsumerRecord.getConsumerRecord().value()
).doOnNext(dispatchResult -> {
switch (dispatchResult.getStatus()) {
case ACCEPTED -> instanceDispatchedEventProducerService.publish(
instanceFlowConsumerRecord.getInstanceFlowHeaders()
.toBuilder()
.archiveInstanceId(dispatchResult.getArchiveCaseAndRecordsIds())
.build()
);
case DECLINED ->
instanceDispatchingErrorProducerService.publishInstanceDispatchDeclinedErrorEvent(
instanceFlowConsumerRecord.getInstanceFlowHeaders(),
dispatchResult.getErrorMessage()
);
case FAILED -> instanceDispatchingErrorProducerService.publishGeneralSystemErrorEvent(
instanceFlowConsumerRecord.getInstanceFlowHeaders(),
instanceFlowConsumerRecord.getConsumerRecord().value()
).doOnNext(dispatchResult -> {
switch (dispatchResult.getStatus()) {
case ACCEPTED -> instanceDispatchedEventProducerService.publish(
instanceFlowConsumerRecord.getInstanceFlowHeaders()
.toBuilder()
.archiveInstanceId(dispatchResult.getArchiveCaseAndRecordsIds())
.build()
);
case DECLINED ->
instanceDispatchingErrorProducerService.publishInstanceDispatchDeclinedErrorEvent(
instanceFlowConsumerRecord.getInstanceFlowHeaders(),
dispatchResult.getErrorMessage()
);
}
})
.subscribe(),
case FAILED -> instanceDispatchingErrorProducerService.publishGeneralSystemErrorEvent(
instanceFlowConsumerRecord.getInstanceFlowHeaders(),
dispatchResult.getErrorMessage()
);
}
}).block(),
EventConsumerConfiguration
.builder()
.maxPollIntervalMs(1800000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ public void givenCaseTypeBySearchOrNewWithJournalPostAndNoCaseFoundShouldCallRec
.verifyComplete();

verify(archiveInstance, times(1)).getType();
verify(archiveInstance, times(2)).getNewCase();
verify(archiveInstance, times(3)).getNewCase();
verifyNoMoreInteractions(archiveInstance);

verify(caseDispatchService, times(1)).findCasesBySearch(archiveInstance);
Expand Down Expand Up @@ -387,4 +387,69 @@ public void givenCaseTypeBySearchOrNewAndFailedCaseSearchShouldReturnFailedResul
verifyNoMoreInteractions(caseDispatchService);
}

}
@Test
public void givenCaseTypeBySearchOrNewAndNoCaseFoundAndNewCaseDeclinedShouldReturnDeclinedResult() {
doReturn(CaseDispatchType.BY_SEARCH_OR_NEW).when(archiveInstance).getType();

SakDto sakDto = mock(SakDto.class);
doReturn(sakDto).when(archiveInstance).getNewCase();
doReturn(Optional.of(List.of())).when(sakDto).getJournalpost();

CaseSearchResult caseSearchResult = mock(CaseSearchResult.class);
doReturn(ACCEPTED).when(caseSearchResult).getStatus();
doReturn(List.of()).when(caseSearchResult).getArchiveCaseIds();
doReturn(Mono.just(caseSearchResult)).when(caseDispatchService).findCasesBySearch(archiveInstance);

CaseDispatchResult caseDispatchResult = mock(CaseDispatchResult.class);
doReturn(DispatchStatus.DECLINED).when(caseDispatchResult).getStatus();
doReturn("testErrorMessage").when(caseDispatchResult).getErrorMessage();
doReturn(Mono.just(caseDispatchResult)).when(caseDispatchService).dispatch(sakDto);

StepVerifier.create(
dispatchService.process(instanceFlowHeaders, archiveInstance)
)
.expectNext(DispatchResult.declined("Sak was declined by the destination with message='testErrorMessage'"))
.verifyComplete();

verify(archiveInstance, times(1)).getType();
verify(archiveInstance, times(2)).getNewCase();
verifyNoMoreInteractions(archiveInstance);

verify(caseDispatchService, times(1)).findCasesBySearch(archiveInstance);
verify(caseDispatchService, times(1)).dispatch(sakDto);
verifyNoMoreInteractions(caseDispatchService);
}

@Test
public void givenCaseTypeBySearchOrNewAndNoCaseFoundAndNewCaseFailedShouldReturnFailedResult() {
doReturn(CaseDispatchType.BY_SEARCH_OR_NEW).when(archiveInstance).getType();

SakDto sakDto = mock(SakDto.class);
doReturn(sakDto).when(archiveInstance).getNewCase();
doReturn(Optional.of(List.of())).when(sakDto).getJournalpost();

CaseSearchResult caseSearchResult = mock(CaseSearchResult.class);
doReturn(ACCEPTED).when(caseSearchResult).getStatus();
doReturn(List.of()).when(caseSearchResult).getArchiveCaseIds();
doReturn(Mono.just(caseSearchResult)).when(caseDispatchService).findCasesBySearch(archiveInstance);

CaseDispatchResult caseDispatchResult = mock(CaseDispatchResult.class);
doReturn(DispatchStatus.FAILED).when(caseDispatchResult).getStatus();
doReturn(Mono.just(caseDispatchResult)).when(caseDispatchService).dispatch(sakDto);

StepVerifier.create(
dispatchService.process(instanceFlowHeaders, archiveInstance)
)
.expectNext(DispatchResult.failed("Sak dispatch failed"))
.verifyComplete();

verify(archiveInstance, times(1)).getType();
verify(archiveInstance, times(2)).getNewCase();
verifyNoMoreInteractions(archiveInstance);

verify(caseDispatchService, times(1)).findCasesBySearch(archiveInstance);
verify(caseDispatchService, times(1)).dispatch(sakDto);
verifyNoMoreInteractions(caseDispatchService);
}

}

0 comments on commit d4d6606

Please sign in to comment.