diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/db/PeriodicProcessesRepository.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/db/PeriodicProcessesRepository.scala index 9906c609bc4..24cf6c47dac 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/db/PeriodicProcessesRepository.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/db/PeriodicProcessesRepository.scala @@ -325,7 +325,11 @@ class SlickPeriodicProcessesRepository( getLatestDeploymentsForEachSchedule( processesHavingDeploymentsWithMatchingStatus, deploymentsPerScheduleMaxCount = 1 - ).map(_.values.headOption.getOrElse(SchedulesState(Map.empty))) + ).map(schedulesForProcessNames => + SchedulesState( + schedulesForProcessNames.values.map(_.schedules).foldLeft(Map.empty[ScheduleId, ScheduleData])(_ ++ _) + ) + ) } override def getLatestDeploymentsForActiveSchedules( diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala index 892fba32f48..12b2daa5c41 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala @@ -235,6 +235,7 @@ class PeriodicProcessServiceIntegrationTest ) service.handleFinished.futureValue + // here we check that scenarios that not fired are still on the "toDeploy" list and finished are not on the list val toDeployAfterFinish = service.findToBeDeployed.futureValue toDeployAfterFinish.map(_.periodicProcess.processVersion.processName) should contain only every30MinutesProcessName service.deactivate(processName).futureValue @@ -270,6 +271,58 @@ class PeriodicProcessServiceIntegrationTest ) } + it should "handleFinished for all finished periodic scenarios waiting for reschedule" in withFixture() { f => + val timeToTriggerCheck = startTime.plus(2, ChronoUnit.HOURS) + var currentTime = startTime + def service = f.periodicProcessService(currentTime) + + service + .schedule( + cronEveryHour, + ProcessVersion.empty.copy(processName = ProcessName("first")), + sampleProcess, + randomProcessActionId + ) + .futureValue + service + .schedule( + cronEveryHour, + ProcessVersion.empty.copy(processName = ProcessName("second")), + sampleProcess, + randomProcessActionId + ) + .futureValue + + currentTime = timeToTriggerCheck + + // deploy all + service.findToBeDeployed.futureValue + .foreach(pp => service.deploy(pp).futureValue) + + val stateAfterDeploy = service.getLatestDeploymentsForActiveSchedules(1).futureValue + stateAfterDeploy should have size 2 + + // finish all + stateAfterDeploy.values.foreach(schedulesState => { + val deployment = schedulesState.firstScheduleData.latestDeployments.head + f.delegateDeploymentManagerStub.setStateStatus( + processName, + SimpleStateStatus.Finished, + Some(deployment.id) + ) + }) + service.handleFinished.futureValue + + // check all are rescheduled for next run + val stateAfterFinish = service.getLatestDeploymentsForActiveSchedules(1).futureValue + stateAfterFinish.values.foreach(schedulesState => { + val deployment = schedulesState.firstScheduleData.latestDeployments.head + deployment.state should matchPattern { + case PeriodicProcessDeploymentState(_, _, PeriodicProcessDeploymentStatus.Scheduled) => + } + }) + } + it should "redeploy scenarios that failed on deploy" in withFixture(deploymentRetryConfig = DeploymentRetryConfig(deployMaxRetries = 1) ) { f =>