From 9b4a5420067ebfb49bef28073c75aa0274ba3a60 Mon Sep 17 00:00:00 2001 From: mgoworko <37329559+mgoworko@users.noreply.github.com> Date: Fri, 3 Jan 2025 15:47:10 +0100 Subject: [PATCH] Periodic scenario status query improvement (#7386) --- .../ui/process/deployment/DeploymentService.scala | 14 ++++++++------ docs/Changelog.md | 1 + .../periodic/PeriodicDeploymentManager.scala | 8 +++++++- .../periodic/PeriodicProcessService.scala | 12 +++++------- 4 files changed, 21 insertions(+), 14 deletions(-) diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala index 24193ea0534..e9476b5f101 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala @@ -447,11 +447,14 @@ class DeploymentService( .map { case process if process.isFragment => DBIO.successful(process) case process => - val prefetchedStatusDetails = for { - prefetchedStatusDetailsForProcessingTypes <- prefetchedStates.get(process.processingType) - prefetchedStatusDetails <- prefetchedStatusDetailsForProcessingTypes.get(process.name) - } yield prefetchedStatusDetails - prefetchedStatusDetails match { + val prefetchedState = for { + prefetchedStatesForProcessingType <- prefetchedStates.get(process.processingType) + // State is prefetched for all scenarios for the given processing type. + // If there is no information available for a specific scenario name, + // then it means that DM is not aware of this scenario, and we should default to List.empty[StatusDetails]. + prefetchedState = prefetchedStatesForProcessingType.getOrElse(process.name, List.empty) + } yield prefetchedState + prefetchedState match { case Some(prefetchedStatusDetails) => getProcessStateUsingPrefetchedStatus( process.toEntity, @@ -466,7 +469,6 @@ class DeploymentService( None, ).map(state => process.copy(state = Some(state))) } - } .sequence[DB, ScenarioWithDetails] } yield processesWithState diff --git a/docs/Changelog.md b/docs/Changelog.md index 80fbd463b6d..47a500638d8 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -39,6 +39,7 @@ * [#7376](https://github.com/TouK/nussknacker/pull/7376) Previously, when count was > 1, the value was evaluated once and emitted times count. For example: if the value was evaluated to be a random UUID and count was 5, one UUID was generated and emitted 5 times. Now in one count batch each value is evaluated separately. +* [#7386](https://github.com/TouK/nussknacker/pull/7386) Improve Periodic DeploymentManager db queries, continuation of [#7323](https://github.com/TouK/nussknacker/pull/7323) ## 1.18 diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala index d5869d1682b..d5bac21c6a1 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala @@ -210,7 +210,13 @@ class PeriodicDeploymentManager private[periodic] ( deployedVersionId: Option[VersionId], currentlyPresentedVersionId: Option[VersionId], ): Future[ProcessState] = { - val statusDetails = statusDetailsList.head + val statusDetails = statusDetailsList match { + case head :: _ => + head + case Nil => + val status = PeriodicProcessStatus(List.empty, List.empty) + status.mergedStatusDetails.copy(status = status) + } // TODO: add "real" presentation of deployments in GUI val mergedStatus = processStateDefinitionManager .processState( diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala index ac4ab19cbd5..43c41264a8f 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala @@ -503,13 +503,11 @@ class PeriodicProcessService( override def getAllProcessesStates()( implicit freshnessPolicy: DataFreshnessPolicy ): Future[WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]] = { - supported.getAllProcessesStates().flatMap { statusesWithFreshness => - mergeStatusWithDeployments(statusesWithFreshness.value).map { statusDetails => - statusesWithFreshness.map(_.flatMap { case (name, _) => - statusDetails.get(name).map(statusDetails => (name, List(statusDetails))) - }) - } - } + for { + allStatusDetailsInDelegate <- supported.getAllProcessesStates() + allStatusDetailsInPeriodic <- mergeStatusWithDeployments(allStatusDetailsInDelegate.value) + result = allStatusDetailsInPeriodic.map { case (name, status) => (name, List(status)) } + } yield allStatusDetailsInDelegate.map(_ => result) } }