Skip to content

Commit

Permalink
Sort periodic process deployments not only by runAt but also by creat…
Browse files Browse the repository at this point in the history
…edAt (#4992)

* Sort periodic process deployments not only by runAt but also by createdAt

* Add changelog entry

* Add ordering for DeploymentStatus, change changelog entry, reorder variables

* Revert .reverse instead of .findLast for crossCompile purposes

* Add comment in PeriodicProcessesRepository about ordering
  • Loading branch information
philemone authored Nov 10, 2023
1 parent ab05355 commit bea0e9a
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 17 deletions.
3 changes: 3 additions & 0 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

1.12.4 (?? Oct 2023)
* [#4992](https://github.com/TouK/nussknacker/pull/4992) Fix: List of periodic deployments is now sorted not only by schedule time but also by its creation time.

1.12.3 (26 Oct 2023)
1.12.2 (25 Oct 2023)
1.12.1 (25 Oct 2023)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class PeriodicProcessService(
// We retry scenarios that failed on deployment. Failure recovery of running scenarios should be handled by Flink's restart strategy
toBeRetried <- scheduledProcessesRepository.findToBeRetried.run
// We don't block scheduled deployments by retries
} yield toBeDeployed.sortBy(_.runAt) ++ toBeRetried.sortBy(_.nextRetryAt)
} yield toBeDeployed.sortBy(d => (d.runAt, d.createdAt)) ++ toBeRetried.sortBy(d => (d.nextRetryAt, d.createdAt))
}

// Currently we don't allow simultaneous runs of one scenario - only sequential, so if other schedule kicks in, it'll have to wait
Expand Down Expand Up @@ -420,14 +420,15 @@ class PeriodicProcessService(
DeploymentStatus(
deployment.id,
scheduleId,
deployment.createdAt,
deployment.runAt,
deployment.state.status,
scheduleData.process.active,
runtimeStatuses.getStatus(deployment.id)
)
}
}
.sortBy(_.runAt)(Ordering[LocalDateTime].reverse)
.sorted(DeploymentStatus.ordering.reverse)

for {
activeSchedules <- getLatestDeploymentsForActiveSchedules(name, MaxDeploymentsStatus)
Expand Down Expand Up @@ -497,8 +498,7 @@ object PeriodicProcessService {
def limitedAndSortedDeployments: List[DeploymentStatus] =
(activeDeploymentsStatuses ++ inactiveDeploymentsStatuses.take(
MaxDeploymentsStatus - activeDeploymentsStatuses.size
))
.sortBy(_.runAt)(Ordering[LocalDateTime].reverse)
)).sorted(DeploymentStatus.ordering.reverse)

// We present merged name to be possible to filter scenario by status
override def name: StatusName = mergedStatusDetails.status.name
Expand Down Expand Up @@ -565,8 +565,8 @@ object PeriodicProcessService {
*/
def pickMostImportantActiveDeployment: Option[DeploymentStatus] = {
val lastActiveDeploymentStatusForEachSchedule =
latestDeploymentForEachSchedule(activeDeploymentsStatuses)
.sortBy(_.runAt)(Ordering[LocalDateTime])
latestDeploymentForEachSchedule(activeDeploymentsStatuses).sorted

def first(status: PeriodicProcessDeploymentStatus) =
lastActiveDeploymentStatusForEachSchedule.find(_.status == status)

Expand All @@ -586,7 +586,7 @@ object PeriodicProcessService {
.groupBy(_.scheduleId)
.values
.toList
.map(_.sortBy(_.runAt)(Ordering[LocalDateTime].reverse).head)
.map(_.min(DeploymentStatus.ordering.reverse))
}

}
Expand All @@ -595,6 +595,7 @@ object PeriodicProcessService {
// to present to users is scheduleName+runAt
deploymentId: PeriodicProcessDeploymentId,
scheduleId: ScheduleId,
createdAt: LocalDateTime,
runAt: LocalDateTime,
// This status is almost fine but:
// - we don't have cancel status - we have to check processActive as well (isCanceled)
Expand All @@ -621,4 +622,15 @@ object PeriodicProcessService {

}

object DeploymentStatus {

implicit val ordering: Ordering[DeploymentStatus] = (self: DeploymentStatus, that: DeploymentStatus) => {
self.runAt.compareTo(that.runAt) match {
case 0 => self.createdAt.compareTo(that.createdAt)
case a => a
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object PeriodicProcessStateDefinitionManager {

def statusTooltip(processStatus: PeriodicProcessStatus): String = {
processStatus.limitedAndSortedDeployments
.map { case d @ DeploymentStatus(_, scheduleId, runAt, status, _, _) =>
.map { case d @ DeploymentStatus(_, scheduleId, _, runAt, status, _, _) =>
val refinedStatus = {
if (d.isCanceled) {
"Canceled"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ object PeriodicProcessesRepository {
PeriodicProcessDeployment(
processDeploymentEntity.id,
process,
processDeploymentEntity.createdAt,
processDeploymentEntity.runAt,
ScheduleName(processDeploymentEntity.scheduleName),
processDeploymentEntity.retriesLeft,
Expand Down Expand Up @@ -309,7 +310,10 @@ class SlickPeriodicProcessesRepository(
(
rowNumber() :: Over
.partitionBy((deployment.periodicProcessId, deployment.scheduleName))
.sortBy(deployment.runAt.desc),
.sortBy(
deployment.runAt.desc,
deployment.createdAt.desc
), // Remember to change DeploymentStatus.ordering accordingly
process,
deployment
)
Expand Down Expand Up @@ -354,7 +358,7 @@ class SlickPeriodicProcessesRepository(
.filter(deployment =>
deployment.periodicProcessId === process.id && (deployment.scheduleName === scheduleName || deployment.scheduleName.isEmpty && scheduleName.isEmpty)
)
.sortBy(_.runAt.desc)
.sortBy(a => (a.runAt.desc, a.createdAt.desc)) // Remember to change DeploymentStatus.ordering accordingly
.take(deploymentsPerScheduleMaxCount)
.result
.map(_.map((process, _)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import java.time.{Clock, LocalDateTime}
case class PeriodicProcessDeployment(
id: PeriodicProcessDeploymentId,
periodicProcess: PeriodicProcess,
createdAt: LocalDateTime,
runAt: LocalDateTime,
scheduleName: ScheduleName,
retriesLeft: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ case class ScheduleId(processId: PeriodicProcessId, scheduleName: ScheduleName)

case class ScheduleDeploymentData(
id: PeriodicProcessDeploymentId,
createdAt: LocalDateTime,
runAt: LocalDateTime,
retriesLeft: Int,
nextRetryAt: Option[LocalDateTime],
state: PeriodicProcessDeploymentState
) {
def toFullDeploymentData(process: PeriodicProcess, scheduleName: ScheduleName): PeriodicProcessDeployment =
PeriodicProcessDeployment(id, process, runAt, scheduleName, retriesLeft, nextRetryAt, state)
PeriodicProcessDeployment(id, process, createdAt, runAt, scheduleName, retriesLeft, nextRetryAt, state)

def display = s"deploymentId=$id"

Expand All @@ -61,6 +62,7 @@ object ScheduleDeploymentData {
def apply(deployment: PeriodicProcessDeploymentEntity): ScheduleDeploymentData = {
ScheduleDeploymentData(
deployment.id,
deployment.createdAt,
deployment.runAt,
deployment.retriesLeft,
deployment.nextRetryAt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ import java.time.LocalDateTime

object PeriodicProcessDeploymentGen {

val now: LocalDateTime = LocalDateTime.now()

def apply(): PeriodicProcessDeployment = {
PeriodicProcessDeployment(
id = PeriodicProcessDeploymentId(42),
periodicProcess = PeriodicProcessGen(),
runAt = LocalDateTime.now(),
createdAt = now.minusMinutes(10),
runAt = now,
scheduleName = ScheduleName(None),
retriesLeft = 0,
nextRetryAt = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class PeriodicProcessStateDefinitionManagerTest extends AnyFunSuite with Matcher

private val fooRunAt = LocalDateTime.of(2023, 1, 1, 10, 0)

private val fooCreatedAt = fooRunAt.minusMinutes(5)

private val notNamedScheduleId = ScheduleId(fooProcessId, ScheduleName(None))

private val nextDeploymentId = new AtomicLong()
Expand All @@ -32,6 +34,7 @@ class PeriodicProcessStateDefinitionManagerTest extends AnyFunSuite with Matcher
val deploymentStatus = DeploymentStatus(
generateDeploymentId,
notNamedScheduleId,
fooCreatedAt,
fooRunAt,
PeriodicProcessDeploymentStatus.Scheduled,
processActive = true,
Expand All @@ -41,29 +44,31 @@ class PeriodicProcessStateDefinitionManagerTest extends AnyFunSuite with Matcher
statusTooltip(status) shouldEqual "Scheduled at: 2023-01-01 10:00 status: Scheduled"
}

test("display periodic deployment status for named schedules") {
test("display sorted periodic deployment status for named schedules") {
val firstScheduleId = generateScheduleId
val firstDeploymentStatus = DeploymentStatus(
generateDeploymentId,
firstScheduleId,
fooCreatedAt.minusMinutes(1),
fooRunAt,
PeriodicProcessDeploymentStatus.Scheduled,
PeriodicProcessDeploymentStatus.Deployed,
processActive = true,
None
)
val secScheduleId = generateScheduleId
val secDeploymentStatus = DeploymentStatus(
generateDeploymentId,
secScheduleId,
fooCreatedAt,
fooRunAt,
PeriodicProcessDeploymentStatus.Deployed,
PeriodicProcessDeploymentStatus.Scheduled,
processActive = true,
None
)
val status = PeriodicProcessStatus(List(firstDeploymentStatus, secDeploymentStatus), List.empty)
statusTooltip(status) shouldEqual
s"""Schedule ${firstScheduleId.scheduleName.display} scheduled at: 2023-01-01 10:00 status: Scheduled,
|Schedule ${secScheduleId.scheduleName.display} scheduled at: 2023-01-01 10:00 status: Deployed""".stripMargin
s"""Schedule ${secScheduleId.scheduleName.display} scheduled at: 2023-01-01 10:00 status: Scheduled,
|Schedule ${firstScheduleId.scheduleName.display} scheduled at: 2023-01-01 10:00 status: Deployed""".stripMargin
}

private def generateDeploymentId = PeriodicProcessDeploymentId(nextDeploymentId.getAndIncrement())
Expand Down

0 comments on commit bea0e9a

Please sign in to comment.