From debbd48cb7cbb5a82faf566462abeb41c157004a Mon Sep 17 00:00:00 2001 From: Piotr Przybylski Date: Mon, 20 Jan 2025 20:26:34 +0100 Subject: [PATCH] Ignore jobs in CANCELLING status when checking for duplicate running jobs --- .../inconsistency/InconsistentStateDetector.scala | 6 +++--- .../engine/api/deployment/simple/SimpleStateStatus.scala | 6 ++++-- .../nussknacker/engine/management/FlinkRestManager.scala | 4 ++-- .../engine/management/FlinkRestManagerSpec.scala | 3 ++- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/inconsistency/InconsistentStateDetector.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/inconsistency/InconsistentStateDetector.scala index 087c8d9f32d..c2f713a020a 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/inconsistency/InconsistentStateDetector.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/inconsistency/InconsistentStateDetector.scala @@ -41,7 +41,7 @@ class InconsistentStateDetector extends LazyLogging { private def doExtractAtMostOneStatus( statusDetails: List[StatusDetails] ): Either[StatusDetails, Option[StatusDetails]] = { - val notFinalStatuses = statusDetails.filterNot(isFinalStatus) + val notFinalStatuses = statusDetails.filterNot(isFinalOrTransitioningToFinalStatus) (statusDetails, notFinalStatuses) match { case (Nil, Nil) => Right(None) case (_, singleNotFinished :: Nil) => Right(Some(singleNotFinished)) @@ -144,8 +144,8 @@ class InconsistentStateDetector extends LazyLogging { SimpleStateStatus.DefaultFollowingDeployStatuses.contains(state.status) } - protected def isFinalStatus(state: StatusDetails): Boolean = - SimpleStateStatus.isFinalStatus(state.status) + protected def isFinalOrTransitioningToFinalStatus(state: StatusDetails): Boolean = + SimpleStateStatus.isFinalOrTransitioningToFinalStatus(state.status) protected def isFinishedStatus(state: StatusDetails): Boolean = { state.status == SimpleStateStatus.Finished diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleStateStatus.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleStateStatus.scala index f18131e4c8a..2e2c59f67dd 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleStateStatus.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleStateStatus.scala @@ -85,8 +85,10 @@ object SimpleStateStatus { val DefaultFollowingDeployStatuses: Set[StateStatus] = Set(DuringDeploy, Running) - def isFinalStatus(status: StateStatus): Boolean = - List(SimpleStateStatus.Finished, SimpleStateStatus.Canceled).contains(status) || ProblemStateStatus.isProblemStatus( + def isFinalOrTransitioningToFinalStatus(status: StateStatus): Boolean = + List(SimpleStateStatus.Finished, SimpleStateStatus.DuringCancel, SimpleStateStatus.Canceled).contains( + status + ) || ProblemStateStatus.isProblemStatus( status ) diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala index 3f22e5a8d17..8673bdc43ce 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala @@ -244,10 +244,10 @@ class FlinkRestManager( deploymentId: Option[DeploymentId], statuses: List[StatusDetails] ) = { - statuses.filterNot(details => SimpleStateStatus.isFinalStatus(details.status)) match { + statuses.filterNot(details => SimpleStateStatus.isFinalOrTransitioningToFinalStatus(details.status)) match { case Nil => logger.warn( - s"Trying to cancel $processName${deploymentId.map(" with id: " + _).getOrElse("")} which is not present or finished on Flink." + s"Trying to cancel $processName${deploymentId.map(" with id: " + _).getOrElse("")} which is not active on Flink." ) Future.successful(()) case single :: Nil => cancelFlinkJob(single) diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala index 200458a48ce..83ed7e8c791 100644 --- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala @@ -408,7 +408,8 @@ class FlinkRestManagerSpec extends AnyFunSuite with Matchers with PatientScalaFu test("return running status if cancelled job has last-modification date later then running job") { statuses = List( JobOverview("2343", "p1", 20L, 10L, JobStatus.RUNNING.name(), tasksOverview(running = 1)), - JobOverview("1111", "p1", 30L, 5L, JobStatus.CANCELED.name(), tasksOverview(canceled = 1)) + JobOverview("1111", "p1", 30L, 5L, JobStatus.CANCELED.name(), tasksOverview(canceled = 1)), + JobOverview("2222", "p1", 30L, 5L, JobStatus.CANCELLING.name(), tasksOverview(canceling = 1)) ) val manager = createManager(statuses)