Skip to content

Commit

Permalink
Ignore jobs in CANCELLING status when checking for duplicate running …
Browse files Browse the repository at this point in the history
…jobs
  • Loading branch information
piotrp committed Jan 20, 2025
1 parent 53ff3f7 commit debbd48
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class InconsistentStateDetector extends LazyLogging {
private def doExtractAtMostOneStatus(
statusDetails: List[StatusDetails]
): Either[StatusDetails, Option[StatusDetails]] = {
val notFinalStatuses = statusDetails.filterNot(isFinalStatus)
val notFinalStatuses = statusDetails.filterNot(isFinalOrTransitioningToFinalStatus)
(statusDetails, notFinalStatuses) match {
case (Nil, Nil) => Right(None)
case (_, singleNotFinished :: Nil) => Right(Some(singleNotFinished))
Expand Down Expand Up @@ -144,8 +144,8 @@ class InconsistentStateDetector extends LazyLogging {
SimpleStateStatus.DefaultFollowingDeployStatuses.contains(state.status)
}

protected def isFinalStatus(state: StatusDetails): Boolean =
SimpleStateStatus.isFinalStatus(state.status)
protected def isFinalOrTransitioningToFinalStatus(state: StatusDetails): Boolean =
SimpleStateStatus.isFinalOrTransitioningToFinalStatus(state.status)

protected def isFinishedStatus(state: StatusDetails): Boolean = {
state.status == SimpleStateStatus.Finished
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,10 @@ object SimpleStateStatus {

val DefaultFollowingDeployStatuses: Set[StateStatus] = Set(DuringDeploy, Running)

def isFinalStatus(status: StateStatus): Boolean =
List(SimpleStateStatus.Finished, SimpleStateStatus.Canceled).contains(status) || ProblemStateStatus.isProblemStatus(
def isFinalOrTransitioningToFinalStatus(status: StateStatus): Boolean =
List(SimpleStateStatus.Finished, SimpleStateStatus.DuringCancel, SimpleStateStatus.Canceled).contains(
status
) || ProblemStateStatus.isProblemStatus(
status
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,10 @@ class FlinkRestManager(
deploymentId: Option[DeploymentId],
statuses: List[StatusDetails]
) = {
statuses.filterNot(details => SimpleStateStatus.isFinalStatus(details.status)) match {
statuses.filterNot(details => SimpleStateStatus.isFinalOrTransitioningToFinalStatus(details.status)) match {
case Nil =>
logger.warn(
s"Trying to cancel $processName${deploymentId.map(" with id: " + _).getOrElse("")} which is not present or finished on Flink."
s"Trying to cancel $processName${deploymentId.map(" with id: " + _).getOrElse("")} which is not active on Flink."
)
Future.successful(())
case single :: Nil => cancelFlinkJob(single)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,8 @@ class FlinkRestManagerSpec extends AnyFunSuite with Matchers with PatientScalaFu
test("return running status if cancelled job has last-modification date later then running job") {
statuses = List(
JobOverview("2343", "p1", 20L, 10L, JobStatus.RUNNING.name(), tasksOverview(running = 1)),
JobOverview("1111", "p1", 30L, 5L, JobStatus.CANCELED.name(), tasksOverview(canceled = 1))
JobOverview("1111", "p1", 30L, 5L, JobStatus.CANCELED.name(), tasksOverview(canceled = 1)),
JobOverview("2222", "p1", 30L, 5L, JobStatus.CANCELLING.name(), tasksOverview(canceling = 1))
)

val manager = createManager(statuses)
Expand Down

0 comments on commit debbd48

Please sign in to comment.