From 85792cac0d9d2013bb5e84f5fb71a87592e82bc2 Mon Sep 17 00:00:00 2001
From: mgoworko <37329559+mgoworko@users.noreply.github.com>
Date: Fri, 7 Feb 2025 14:21:23 +0100
Subject: [PATCH] Fix scenario json representation differences in the
scheduling mechanism (#7519)
---
.run/NussknackerApp-postgres.run.xml | 1 +
..._ScenarioSchedulingTablesModifications.sql | 10 +++
.../PeriodicProcessDeploymentsTable.scala | 2 +-
.../ui/db/entity/PeriodicProcessesTable.scala | 45 +++++++----
.../periodic/PeriodicProcessService.scala | 34 ++++----
.../LegacyPeriodicProcessesRepository.scala | 22 ++++-
.../periodic/model/SchedulesState.scala | 1 +
.../DBFetchingProcessRepository.scala | 9 +--
.../FetchingProcessRepository.scala | 5 +-
.../PeriodicProcessesRepository.scala | 80 ++++++++++++-------
.../nussknacker/test/base/db/DbTesting.scala | 4 +-
.../mock/MockFetchingProcessRepository.scala | 12 ++-
.../db/InMemPeriodicProcessesRepository.scala | 22 +++--
docs/Changelog.md | 1 +
docs/MigrationGuide.md | 1 +
15 files changed, 155 insertions(+), 94 deletions(-)
create mode 100644 designer/server/src/main/resources/db/migration/common/V1_062__ScenarioSchedulingTablesModifications.sql
diff --git a/.run/NussknackerApp-postgres.run.xml b/.run/NussknackerApp-postgres.run.xml
index e32b1c3aba8..775850a6641 100644
--- a/.run/NussknackerApp-postgres.run.xml
+++ b/.run/NussknackerApp-postgres.run.xml
@@ -22,6 +22,7 @@
+
diff --git a/designer/server/src/main/resources/db/migration/common/V1_062__ScenarioSchedulingTablesModifications.sql b/designer/server/src/main/resources/db/migration/common/V1_062__ScenarioSchedulingTablesModifications.sql
new file mode 100644
index 00000000000..ddee2b0f942
--- /dev/null
+++ b/designer/server/src/main/resources/db/migration/common/V1_062__ScenarioSchedulingTablesModifications.sql
@@ -0,0 +1,10 @@
+ALTER TABLE "periodic_scenarios" RENAME TO "scheduled_scenarios";
+ALTER TABLE "scheduled_scenarios" ALTER COLUMN "input_config_during_execution" DROP NOT NULL;
+ALTER TABLE "scheduled_scenarios" ADD COLUMN "resolved_scenario_json" VARCHAR(10485760);
+ALTER TABLE "scheduled_scenarios" ADD CONSTRAINT scheduled_scenarios_check_config_and_json_present_for_active
+CHECK (
+ ("active" = TRUE AND "input_config_during_execution" IS NOT NULL AND "resolved_scenario_json" IS NOT NULL)
+ OR
+ ("active" = FALSE)
+);
+ALTER TABLE "periodic_scenario_deployments" RENAME TO "scheduled_scenario_deployments";
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/db/entity/PeriodicProcessDeploymentsTable.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/db/entity/PeriodicProcessDeploymentsTable.scala
index 8c82157e36d..c36e85b578e 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/db/entity/PeriodicProcessDeploymentsTable.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/db/entity/PeriodicProcessDeploymentsTable.scala
@@ -23,7 +23,7 @@ trait PeriodicProcessDeploymentsTableFactory extends PeriodicProcessesTableFacto
MappedColumnType.base[PeriodicProcessDeploymentStatus, String](_.toString, PeriodicProcessDeploymentStatus.withName)
class PeriodicProcessDeploymentsTable(tag: Tag)
- extends Table[PeriodicProcessDeploymentEntity](tag, "periodic_scenario_deployments") {
+ extends Table[PeriodicProcessDeploymentEntity](tag, "scheduled_scenario_deployments") {
def id: Rep[PeriodicProcessDeploymentId] = column[PeriodicProcessDeploymentId]("id", O.PrimaryKey, O.AutoInc)
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/db/entity/PeriodicProcessesTable.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/db/entity/PeriodicProcessesTable.scala
index f4d7c968a50..4a8463772cb 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/db/entity/PeriodicProcessesTable.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/db/entity/PeriodicProcessesTable.scala
@@ -2,9 +2,11 @@ package pl.touk.nussknacker.ui.db.entity
import io.circe.Decoder
import io.circe.syntax.EncoderOps
-import pl.touk.nussknacker.engine.api.deployment.scheduler.model.RuntimeParams
import pl.touk.nussknacker.engine.api.deployment.ProcessActionId
+import pl.touk.nussknacker.engine.api.deployment.scheduler.model.RuntimeParams
import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId}
+import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
+import pl.touk.nussknacker.engine.marshall.ProcessMarshaller
import pl.touk.nussknacker.ui.process.periodic.model.PeriodicProcessId
import slick.jdbc.JdbcProfile
import slick.lifted.ProvenShape
@@ -38,8 +40,14 @@ trait PeriodicProcessesTableFactory extends BaseEntityFactory {
}
)
+ implicit val canonicalProcessTypedType: BaseColumnType[CanonicalProcess] =
+ MappedColumnType.base[CanonicalProcess, String](
+ _.asJson.noSpaces,
+ ProcessMarshaller.fromJsonUnsafe
+ )
+
abstract class PeriodicProcessesTable[ENTITY <: PeriodicProcessEntity](tag: Tag)
- extends Table[ENTITY](tag, "periodic_scenarios") {
+ extends Table[ENTITY](tag, "scheduled_scenarios") {
def id: Rep[PeriodicProcessId] = column[PeriodicProcessId]("id", O.PrimaryKey, O.AutoInc)
@@ -63,12 +71,14 @@ trait PeriodicProcessesTableFactory extends BaseEntityFactory {
}
- class PeriodicProcessesWithInputConfigJsonTable(tag: Tag)
- extends PeriodicProcessesTable[PeriodicProcessEntityWithInputConfigJson](tag) {
+ class PeriodicProcessesWithDeploymentDetailsTable(tag: Tag)
+ extends PeriodicProcessesTable[PeriodicProcessEntityWithDeploymentDetails](tag) {
+
+ def inputConfigDuringExecutionJson: Rep[Option[String]] = column[Option[String]]("input_config_during_execution")
- def inputConfigDuringExecutionJson: Rep[String] = column[String]("input_config_during_execution", NotNull)
+ def resolvedScenario: Rep[Option[CanonicalProcess]] = column[Option[CanonicalProcess]]("resolved_scenario_json")
- override def * : ProvenShape[PeriodicProcessEntityWithInputConfigJson] = (
+ override def * : ProvenShape[PeriodicProcessEntityWithDeploymentDetails] = (
id,
processId,
processName,
@@ -80,14 +90,15 @@ trait PeriodicProcessesTableFactory extends BaseEntityFactory {
createdAt,
processActionId,
inputConfigDuringExecutionJson,
- ) <> (PeriodicProcessEntityWithInputConfigJson.apply _ tupled, PeriodicProcessEntityWithInputConfigJson.unapply)
+ resolvedScenario,
+ ) <> (PeriodicProcessEntityWithDeploymentDetails.apply _ tupled, PeriodicProcessEntityWithDeploymentDetails.unapply)
}
- class PeriodicProcessesWithoutInputConfigJsonTable(tag: Tag)
- extends PeriodicProcessesTable[PeriodicProcessEntityWithoutInputConfigJson](tag) {
+ class PeriodicProcessesWithoutDeploymentDetailsTable(tag: Tag)
+ extends PeriodicProcessesTable[PeriodicProcessEntityWithoutDeploymentDetails](tag) {
- override def * : ProvenShape[PeriodicProcessEntityWithoutInputConfigJson] = (
+ override def * : ProvenShape[PeriodicProcessEntityWithoutDeploymentDetails] = (
id,
processId,
processName,
@@ -98,13 +109,14 @@ trait PeriodicProcessesTableFactory extends BaseEntityFactory {
active,
createdAt,
processActionId
- ) <> (PeriodicProcessEntityWithoutInputConfigJson.apply _ tupled, PeriodicProcessEntityWithoutInputConfigJson.unapply)
+ ) <> (PeriodicProcessEntityWithoutDeploymentDetails.apply _ tupled, PeriodicProcessEntityWithoutDeploymentDetails.unapply)
}
- object PeriodicProcessesWithoutInputConfig extends TableQuery(new PeriodicProcessesWithoutInputConfigJsonTable(_))
+ object PeriodicProcessesWithoutDeploymentDetails
+ extends TableQuery(new PeriodicProcessesWithoutDeploymentDetailsTable(_))
- object PeriodicProcessesWithInputConfig extends TableQuery(new PeriodicProcessesWithInputConfigJsonTable(_))
+ object PeriodicProcessesWithDeploymentDetails extends TableQuery(new PeriodicProcessesWithDeploymentDetailsTable(_))
}
@@ -132,7 +144,7 @@ trait PeriodicProcessEntity {
}
-case class PeriodicProcessEntityWithInputConfigJson(
+case class PeriodicProcessEntityWithDeploymentDetails(
id: PeriodicProcessId,
processId: Option[ProcessId],
processName: ProcessName,
@@ -143,10 +155,11 @@ case class PeriodicProcessEntityWithInputConfigJson(
active: Boolean,
createdAt: LocalDateTime,
processActionId: Option[ProcessActionId],
- inputConfigDuringExecutionJson: String,
+ inputConfigDuringExecutionJson: Option[String],
+ resolvedScenario: Option[CanonicalProcess],
) extends PeriodicProcessEntity
-case class PeriodicProcessEntityWithoutInputConfigJson(
+case class PeriodicProcessEntityWithoutDeploymentDetails(
id: PeriodicProcessId,
processId: Option[ProcessId],
processName: ProcessName,
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala
index 669dbb6e832..93512bb1a2f 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala
@@ -11,7 +11,6 @@ import pl.touk.nussknacker.engine.api.component.{
import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.scheduler.model.{ScheduleProperty => _, _}
-import pl.touk.nussknacker.engine.api.deployment.scheduler.model.{ScheduleProperty => ApiScheduleProperty}
import pl.touk.nussknacker.engine.api.deployment.scheduler.services._
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus
@@ -392,14 +391,9 @@ class PeriodicProcessService(
): Future[Unit] = {
logger.info(s"Marking ${deployment.display} with status: ${deployment.state.status} as finished")
for {
- _ <- periodicProcessesRepository.markFinished(deployment.id).run
- currentState <- periodicProcessesRepository.findProcessData(deployment.id).run
- canonicalProcessOpt <- periodicProcessesRepository
- .fetchCanonicalProcessWithVersion(
- processName,
- versionId
- )
- .map(_.map(_._1))
+ _ <- periodicProcessesRepository.markFinished(deployment.id).run
+ currentState <- periodicProcessesRepository.findProcessData(deployment.id).run
+ canonicalProcessOpt <- periodicProcessesRepository.fetchCanonicalProcess(deployment.periodicProcessId).run
canonicalProcess = canonicalProcessOpt.getOrElse {
throw new PeriodicProcessException(
s"Could not fetch CanonicalProcess with ProcessVersion for processName=$processName, versionId=$versionId"
@@ -494,14 +488,16 @@ class PeriodicProcessService(
)
processName = deploymentWithJarData.processName
versionId = deploymentWithJarData.versionId
- canonicalProcessWithVersionOpt <- periodicProcessesRepository
- .fetchCanonicalProcessWithVersion(
- processName,
- versionId
+ canonicalProcessOpt <- periodicProcessesRepository.fetchCanonicalProcess(deployment.periodicProcess.id).run
+ canonicalProcess = canonicalProcessOpt.getOrElse {
+ throw new PeriodicProcessException(
+ s"Could not fetch CanonicalProcess for processName=$processName, versionId=$versionId"
)
- canonicalProcessWithVersion = canonicalProcessWithVersionOpt.getOrElse {
+ }
+ processVersionOpt <- periodicProcessesRepository.fetchProcessVersion(processName, versionId)
+ processVersion = processVersionOpt.getOrElse {
throw new PeriodicProcessException(
- s"Could not fetch CanonicalProcess with ProcessVersion for processName=$processName, versionId=$versionId"
+ s"Could not fetch ProcessVersion for processName=$processName, versionId=$versionId"
)
}
inputConfigDuringExecutionJsonOpt <- periodicProcessesRepository
@@ -514,8 +510,8 @@ class PeriodicProcessService(
}
enrichedProcessConfig <- processConfigEnricher.onDeploy(
ProcessConfigEnricher.DeployData(
- canonicalProcessWithVersion._1,
- canonicalProcessWithVersion._2,
+ canonicalProcess,
+ processVersion,
inputConfigDuringExecutionJson,
deployment.toDetails
)
@@ -524,8 +520,8 @@ class PeriodicProcessService(
deploymentWithJarData,
enrichedProcessConfig.inputConfigDuringExecutionJson,
deploymentData,
- canonicalProcessWithVersion._1,
- canonicalProcessWithVersion._2,
+ canonicalProcess,
+ processVersion,
)
} yield externalDeploymentId
deploymentAction
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/legacy/db/LegacyPeriodicProcessesRepository.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/legacy/db/LegacyPeriodicProcessesRepository.scala
index a74e5e29ce3..269656257f2 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/legacy/db/LegacyPeriodicProcessesRepository.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/legacy/db/LegacyPeriodicProcessesRepository.scala
@@ -412,11 +412,26 @@ class SlickLegacyPeriodicProcessesRepository(
update.map(_ => ())
}
- override def fetchCanonicalProcessWithVersion(
+ override def fetchCanonicalProcess(
+ periodicProcessId: PeriodicProcessId,
+ ): Action[Option[CanonicalProcess]] = {
+ PeriodicProcessesWithJson
+ .filter(p => p.id === periodicProcessId)
+ .result
+ .headOption
+ .map(_.map(_.processJson))
+ }
+
+ override def fetchProcessVersion(
processName: ProcessName,
versionId: VersionId
- ): Future[Option[(CanonicalProcess, ProcessVersion)]] =
- fetchingProcessRepository.getCanonicalProcessWithVersion(processName, versionId)(NussknackerInternalUser.instance)
+ ): Future[Option[ProcessVersion]] = {
+ fetchingProcessRepository
+ .getProcessVersion(
+ processName,
+ versionId
+ )(NussknackerInternalUser.instance)
+ }
def fetchInputConfigDuringExecutionJson(periodicProcessId: PeriodicProcessId): Action[Option[String]] =
PeriodicProcessesWithJson
@@ -462,6 +477,7 @@ class SlickLegacyPeriodicProcessesRepository(
private def scheduleDeploymentData(deployment: PeriodicProcessDeploymentEntity): ScheduleDeploymentData = {
ScheduleDeploymentData(
deployment.id,
+ deployment.periodicProcessId,
deployment.createdAt,
deployment.runAt,
deployment.deployedAt,
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/model/SchedulesState.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/model/SchedulesState.scala
index b4161cffe46..27a6bb02d77 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/model/SchedulesState.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/model/SchedulesState.scala
@@ -43,6 +43,7 @@ case class ScheduleId(processId: PeriodicProcessId, scheduleName: ScheduleName)
case class ScheduleDeploymentData(
id: PeriodicProcessDeploymentId,
+ periodicProcessId: PeriodicProcessId,
createdAt: LocalDateTime,
runAt: LocalDateTime,
deployedAt: Option[LocalDateTime],
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/DBFetchingProcessRepository.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/DBFetchingProcessRepository.scala
index 57bc3b84ea9..7ff3cb06441 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/DBFetchingProcessRepository.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/DBFetchingProcessRepository.scala
@@ -54,19 +54,16 @@ abstract class DBFetchingProcessRepository[F[_]: Monad](
import api._
- override def getCanonicalProcessWithVersion(
+ override def getProcessVersion(
processName: ProcessName,
versionId: VersionId
)(
implicit user: LoggedUser,
- ): F[Option[(CanonicalProcess, ProcessVersion)]] = {
+ ): F[Option[ProcessVersion]] = {
val result = for {
processId <- OptionT(fetchProcessId(processName))
details <- OptionT(fetchProcessDetailsForId[CanonicalProcess](processId, versionId))
- } yield (
- details.json,
- details.toEngineProcessVersion,
- )
+ } yield details.toEngineProcessVersion
result.value
}
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/FetchingProcessRepository.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/FetchingProcessRepository.scala
index 92da4cff33c..f92a9b9fb62 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/FetchingProcessRepository.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/FetchingProcessRepository.scala
@@ -3,7 +3,6 @@ package pl.touk.nussknacker.ui.process.repository
import cats.Monad
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.process._
-import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.ui.process.ScenarioQuery
import pl.touk.nussknacker.ui.security.api.LoggedUser
@@ -29,12 +28,12 @@ abstract class FetchingProcessRepository[F[_]: Monad] extends ProcessDBQueryRepo
query: ScenarioQuery
)(implicit loggedUser: LoggedUser, ec: ExecutionContext): F[List[PS]]
- def getCanonicalProcessWithVersion(
+ def getProcessVersion(
processName: ProcessName,
versionId: VersionId
)(
implicit user: LoggedUser,
- ): F[Option[(CanonicalProcess, ProcessVersion)]]
+ ): F[Option[ProcessVersion]]
def fetchProcessId(processName: ProcessName)(implicit ec: ExecutionContext): F[Option[ProcessId]]
diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/PeriodicProcessesRepository.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/PeriodicProcessesRepository.scala
index bd653a5a53a..b23eceeb86f 100644
--- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/PeriodicProcessesRepository.scala
+++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/PeriodicProcessesRepository.scala
@@ -3,7 +3,6 @@ package pl.touk.nussknacker.ui.process.repository
import com.github.tminglei.slickpg.ExPostgresProfile
import com.typesafe.scalalogging.LazyLogging
import db.util.DBIOActionInstances
-import db.util.DBIOActionInstances.DB
import io.circe.parser.decode
import io.circe.syntax.EncoderOps
import pl.touk.nussknacker.engine.api.ProcessVersion
@@ -160,10 +159,14 @@ trait PeriodicProcessesRepository {
periodicProcessId: PeriodicProcessId,
): Action[Option[String]]
- def fetchCanonicalProcessWithVersion(
+ def fetchCanonicalProcess(
+ periodicProcessId: PeriodicProcessId,
+ ): Action[Option[CanonicalProcess]]
+
+ def fetchProcessVersion(
processName: ProcessName,
versionId: VersionId
- ): Future[Option[(CanonicalProcess, ProcessVersion)]]
+ ): Future[Option[ProcessVersion]]
}
@@ -189,7 +192,7 @@ class SlickPeriodicProcessesRepository(
scenarioName: ProcessName,
afterOpt: Option[LocalDateTime],
): Action[SchedulesState] = {
- PeriodicProcessesWithoutInputConfig
+ PeriodicProcessesWithoutDeploymentDetails
.filter(_.processName === scenarioName)
.join(PeriodicProcessDeployments)
.on(_.id === _.periodicProcessId)
@@ -205,7 +208,7 @@ class SlickPeriodicProcessesRepository(
scheduleProperty: ScheduleProperty,
processActionId: ProcessActionId,
): Action[PeriodicProcess] = {
- val processEntity = PeriodicProcessEntityWithInputConfigJson(
+ val processEntity = PeriodicProcessEntityWithDeploymentDetails(
id = PeriodicProcessId(-1),
processId = deploymentWithRuntimeParams.processId,
processName = deploymentWithRuntimeParams.processName,
@@ -216,9 +219,10 @@ class SlickPeriodicProcessesRepository(
active = true,
createdAt = now(),
Some(processActionId),
- inputConfigDuringExecutionJson = inputConfigDuringExecutionJson,
+ inputConfigDuringExecutionJson = Some(inputConfigDuringExecutionJson),
+ resolvedScenario = Some(canonicalProcess)
)
- ((PeriodicProcessesWithInputConfig returning PeriodicProcessesWithInputConfig into ((_, id) =>
+ ((PeriodicProcessesWithDeploymentDetails returning PeriodicProcessesWithDeploymentDetails into ((_, id) =>
id
)) += processEntity)
.map(PeriodicProcessesRepository.createPeriodicProcess)
@@ -246,8 +250,8 @@ class SlickPeriodicProcessesRepository(
private def findProcesses(
query: Query[
- (PeriodicProcessesWithoutInputConfigJsonTable, PeriodicProcessDeploymentsTable),
- (PeriodicProcessEntityWithoutInputConfigJson, PeriodicProcessDeploymentEntity),
+ (PeriodicProcessesWithoutDeploymentDetailsTable, PeriodicProcessDeploymentsTable),
+ (PeriodicProcessEntityWithoutDeploymentDetails, PeriodicProcessDeploymentEntity),
Seq
]
) = {
@@ -262,7 +266,7 @@ class SlickPeriodicProcessesRepository(
override def findProcessData(id: PeriodicProcessDeploymentId): Action[PeriodicProcessDeployment] =
findProcesses(
- (PeriodicProcessesWithoutInputConfig join PeriodicProcessDeployments on (_.id === _.periodicProcessId))
+ (PeriodicProcessesWithoutDeploymentDetails join PeriodicProcessDeployments on (_.id === _.periodicProcessId))
.filter { case (_, deployment) => deployment.id === id }
).map(_.head)
@@ -309,7 +313,7 @@ class SlickPeriodicProcessesRepository(
override def findActiveSchedulesForProcessesHavingDeploymentWithMatchingStatus(
expectedDeploymentStatuses: Set[PeriodicProcessDeploymentStatus],
): Action[SchedulesState] = {
- val processesHavingDeploymentsWithMatchingStatus = PeriodicProcessesWithoutInputConfig.filter(p =>
+ val processesHavingDeploymentsWithMatchingStatus = PeriodicProcessesWithoutDeploymentDetails.filter(p =>
p.active &&
PeriodicProcessDeployments
.filter(d => d.periodicProcessId === p.id && d.status.inSet(expectedDeploymentStatuses))
@@ -330,7 +334,7 @@ class SlickPeriodicProcessesRepository(
deploymentsPerScheduleMaxCount: Int,
): Action[SchedulesState] = {
val activeProcessesQuery =
- PeriodicProcessesWithoutInputConfig.filter(p => p.processName === processName && p.active)
+ PeriodicProcessesWithoutDeploymentDetails.filter(p => p.processName === processName && p.active)
getLatestDeploymentsForEachSchedule(activeProcessesQuery, deploymentsPerScheduleMaxCount)
.map(_.getOrElse(processName, SchedulesState(Map.empty)))
}
@@ -338,7 +342,7 @@ class SlickPeriodicProcessesRepository(
override def getLatestDeploymentsForActiveSchedules(
deploymentsPerScheduleMaxCount: Int,
): Action[Map[ProcessName, SchedulesState]] = {
- val activeProcessesQuery = PeriodicProcessesWithoutInputConfig.filter(_.active)
+ val activeProcessesQuery = PeriodicProcessesWithoutDeploymentDetails.filter(_.active)
getLatestDeploymentsForEachSchedule(activeProcessesQuery, deploymentsPerScheduleMaxCount)
}
@@ -347,7 +351,7 @@ class SlickPeriodicProcessesRepository(
inactiveProcessesMaxCount: Int,
deploymentsPerScheduleMaxCount: Int,
): Action[SchedulesState] = {
- val filteredProcessesQuery = PeriodicProcessesWithoutInputConfig
+ val filteredProcessesQuery = PeriodicProcessesWithoutDeploymentDetails
.filter(p => p.processName === processName && !p.active)
.sortBy(_.createdAt.desc)
.take(inactiveProcessesMaxCount)
@@ -359,7 +363,7 @@ class SlickPeriodicProcessesRepository(
inactiveProcessesMaxCount: Int,
deploymentsPerScheduleMaxCount: Int,
): Action[Map[ProcessName, SchedulesState]] = {
- val filteredProcessesQuery = PeriodicProcessesWithoutInputConfig
+ val filteredProcessesQuery = PeriodicProcessesWithoutDeploymentDetails
.filter(!_.active)
.sortBy(_.createdAt.desc)
.take(inactiveProcessesMaxCount)
@@ -368,8 +372,8 @@ class SlickPeriodicProcessesRepository(
private def getLatestDeploymentsForEachSchedule(
periodicProcessesQuery: Query[
- PeriodicProcessesWithoutInputConfigJsonTable,
- PeriodicProcessEntityWithoutInputConfigJson,
+ PeriodicProcessesWithoutDeploymentDetailsTable,
+ PeriodicProcessEntityWithoutDeploymentDetails,
Seq
],
deploymentsPerScheduleMaxCount: Int,
@@ -386,8 +390,8 @@ class SlickPeriodicProcessesRepository(
private def getLatestDeploymentsForEachSchedulePostgres(
periodicProcessesQuery: Query[
- PeriodicProcessesWithoutInputConfigJsonTable,
- PeriodicProcessEntityWithoutInputConfigJson,
+ PeriodicProcessesWithoutDeploymentDetailsTable,
+ PeriodicProcessEntityWithoutDeploymentDetails,
Seq
],
deploymentsPerScheduleMaxCount: Int
@@ -424,8 +428,8 @@ class SlickPeriodicProcessesRepository(
// with foreign key to periodic_process and with schedule_name column - it would reduce number of queries
private def getLatestDeploymentsForEachScheduleJdbcGeneric(
periodicProcessesQuery: Query[
- PeriodicProcessesWithoutInputConfigJsonTable,
- PeriodicProcessEntityWithoutInputConfigJson,
+ PeriodicProcessesWithoutDeploymentDetailsTable,
+ PeriodicProcessEntityWithoutDeploymentDetails,
Seq
],
deploymentsPerScheduleMaxCount: Int
@@ -489,21 +493,22 @@ class SlickPeriodicProcessesRepository(
override def markInactive(processId: PeriodicProcessId): Action[Unit] = {
val q = for {
- p <- PeriodicProcessesWithoutInputConfig if p.id === processId
- } yield p.active
- val update = q.update(false)
+ p <- PeriodicProcessesWithDeploymentDetails if p.id === processId
+ } yield (p.active, p.inputConfigDuringExecutionJson, p.resolvedScenario)
+ val update = q.update(false, None, None)
update.map(_ => ())
}
def fetchInputConfigDuringExecutionJson(periodicProcessId: PeriodicProcessId): Action[Option[String]] =
- PeriodicProcessesWithInputConfig
+ PeriodicProcessesWithDeploymentDetails
.filter(p => p.id === periodicProcessId)
.map(_.inputConfigDuringExecutionJson)
.result
.headOption
+ .map(_.flatten)
private def activePeriodicProcessWithDeploymentQuery(processingType: String) = {
- (PeriodicProcessesWithoutInputConfig.filter(p => p.active === true && p.processingType === processingType)
+ (PeriodicProcessesWithoutDeploymentDetails.filter(p => p.active === true && p.processingType === processingType)
join PeriodicProcessDeployments on (_.id === _.periodicProcessId))
}
@@ -539,6 +544,7 @@ class SlickPeriodicProcessesRepository(
private def scheduleDeploymentData(deployment: PeriodicProcessDeploymentEntity): ScheduleDeploymentData = {
ScheduleDeploymentData(
deployment.id,
+ deployment.periodicProcessId,
deployment.createdAt,
deployment.runAt,
deployment.deployedAt,
@@ -548,10 +554,26 @@ class SlickPeriodicProcessesRepository(
)
}
- override def fetchCanonicalProcessWithVersion(
+ override def fetchCanonicalProcess(
+ periodicProcessId: PeriodicProcessId,
+ ): Action[Option[CanonicalProcess]] = {
+ PeriodicProcessesWithDeploymentDetails
+ .filter(p => p.id === periodicProcessId)
+ .map(_.resolvedScenario)
+ .result
+ .headOption
+ .map(_.flatten)
+ }
+
+ override def fetchProcessVersion(
processName: ProcessName,
versionId: VersionId
- ): Future[Option[(CanonicalProcess, ProcessVersion)]] =
- fetchingProcessRepository.getCanonicalProcessWithVersion(processName, versionId)(NussknackerInternalUser.instance)
+ ): Future[Option[ProcessVersion]] = {
+ fetchingProcessRepository
+ .getProcessVersion(
+ processName,
+ versionId
+ )(NussknackerInternalUser.instance)
+ }
}
diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/test/base/db/DbTesting.scala b/designer/server/src/test/scala/pl/touk/nussknacker/test/base/db/DbTesting.scala
index ce16e649949..dc025503e88 100644
--- a/designer/server/src/test/scala/pl/touk/nussknacker/test/base/db/DbTesting.scala
+++ b/designer/server/src/test/scala/pl/touk/nussknacker/test/base/db/DbTesting.scala
@@ -93,8 +93,8 @@ trait DbTesting extends BeforeAndAfterEach with BeforeAndAfterAll {
session.prepareStatement("""delete from "environments"""").execute()
session.prepareStatement("""delete from "processes"""").execute()
session.prepareStatement("""delete from "fingerprints"""").execute()
- session.prepareStatement("""delete from "periodic_scenarios"""").execute()
- session.prepareStatement("""delete from "periodic_scenario_deployments"""").execute()
+ session.prepareStatement("""delete from "scheduled_scenarios"""").execute()
+ session.prepareStatement("""delete from "scheduled_scenario_deployments"""").execute()
}
}
diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockFetchingProcessRepository.scala b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockFetchingProcessRepository.scala
index f49136199df..6bc0ea06d1a 100644
--- a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockFetchingProcessRepository.scala
+++ b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockFetchingProcessRepository.scala
@@ -45,16 +45,14 @@ class MockFetchingProcessRepository private (
extends FetchingProcessRepository[Future]
with BasicRepository {
- override def getCanonicalProcessWithVersion(processName: ProcessName, versionId: VersionId)(
- implicit user: LoggedUser
- ): Future[Option[(CanonicalProcess, ProcessVersion)]] = {
+ override def getProcessVersion(
+ processName: ProcessName,
+ versionId: VersionId
+ )(implicit user: LoggedUser): Future[Option[ProcessVersion]] = {
val result = for {
processId <- OptionT(fetchProcessId(processName))
details <- OptionT(fetchProcessDetailsForId[CanonicalProcess](processId, versionId))
- } yield (
- details.json,
- details.toEngineProcessVersion,
- )
+ } yield details.toEngineProcessVersion
result.value
}
diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/db/InMemPeriodicProcessesRepository.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/db/InMemPeriodicProcessesRepository.scala
index fff0dba9f50..9ed5d7d3bae 100644
--- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/db/InMemPeriodicProcessesRepository.scala
+++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/db/InMemPeriodicProcessesRepository.scala
@@ -2,14 +2,13 @@ package pl.touk.nussknacker.ui.process.periodic.flink.db
import io.circe.syntax.EncoderOps
import pl.touk.nussknacker.engine.api.ProcessVersion
-import pl.touk.nussknacker.engine.api.deployment.scheduler.model.{DeploymentWithRuntimeParams, RuntimeParams}
import pl.touk.nussknacker.engine.api.deployment.ProcessActionId
+import pl.touk.nussknacker.engine.api.deployment.scheduler.model.{DeploymentWithRuntimeParams, RuntimeParams}
import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId}
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.ui.process.periodic._
import pl.touk.nussknacker.ui.process.periodic.flink.db.InMemPeriodicProcessesRepository._
-import pl.touk.nussknacker.ui.process.periodic.flink.db.InMemPeriodicProcessesRepository.getLatestDeploymentQueryCount
import pl.touk.nussknacker.ui.process.periodic.model.PeriodicProcessDeploymentStatus.PeriodicProcessDeploymentStatus
import pl.touk.nussknacker.ui.process.periodic.model._
import pl.touk.nussknacker.ui.process.repository.PeriodicProcessesRepository
@@ -31,9 +30,9 @@ class InMemPeriodicProcessesRepository(processingType: String) extends PeriodicP
var processEntities: mutable.ListBuffer[TestPeriodicProcessEntity] = ListBuffer.empty
var deploymentEntities: mutable.ListBuffer[TestPeriodicProcessDeploymentEntity] = ListBuffer.empty
- private def canonicalProcess(processName: ProcessName) = {
+ private val canonicalProcess = {
ScenarioBuilder
- .streaming(processName.value)
+ .streaming("test")
.source("start", "source")
.emptySink("end", "KafkaSink")
}
@@ -346,11 +345,17 @@ class InMemPeriodicProcessesRepository(processingType: String) extends PeriodicP
Future.successful(deployments.filter(d => d.runAt.isBefore(now) || d.runAt.isEqual(now)))
}
- override def fetchCanonicalProcessWithVersion(
+ override def fetchCanonicalProcess(
+ periodicProcessId: PeriodicProcessId,
+ ): Future[Option[CanonicalProcess]] = Future.successful {
+ Some(canonicalProcess)
+ }
+
+ override def fetchProcessVersion(
processName: ProcessName,
- versionId: VersionId
- ): Future[Option[(CanonicalProcess, ProcessVersion)]] = Future.successful {
- Some(canonicalProcess(processName), ProcessVersion.empty)
+ versionId: VersionId,
+ ): Future[Option[ProcessVersion]] = Future.successful {
+ Some(ProcessVersion.empty)
}
override def fetchInputConfigDuringExecutionJson(
@@ -469,6 +474,7 @@ object InMemPeriodicProcessesRepository {
private def scheduleDeploymentData(deployment: TestPeriodicProcessDeploymentEntity): ScheduleDeploymentData = {
ScheduleDeploymentData(
deployment.id,
+ deployment.periodicProcessId,
deployment.createdAt,
deployment.runAt,
deployment.deployedAt,
diff --git a/docs/Changelog.md b/docs/Changelog.md
index dce85b70557..c3904f1ea4f 100644
--- a/docs/Changelog.md
+++ b/docs/Changelog.md
@@ -59,6 +59,7 @@
* [#7364](https://github.com/TouK/nussknacker/pull/7364) PeriodicDeploymentManger is no longer a separate DM, but instead is an optional functionality and decorator for all DMs
* in order to use it, DM must implement interface `schedulingSupported`, that handles deployments on a specific engine
* implementation provided for Flink DM
+ * additional, necessary, db schema changes concerning the periodic/scheduling mechanism introduced in [#7519](https://github.com/TouK/nussknacker/pull/7519)
* [#7443](https://github.com/TouK/nussknacker/pull/7443) Indexing on record is more similar to indexing on map. The change lets us access record values dynamically. For example now spel expression "{a: 5, b: 10}[#input.field]" compiles and has type "Integer" inferred from types of values of the record. This lets us access record value based on user input, for instance if user passes "{"field": "b"}" to scenario we will get value "10", whereas input {"field": "c"} would result in "null". Expression "{a: 5}["b"]" still does not compile because it is known at compile time that record does not have property "b".
* [#7324](https://github.com/TouK/nussknacker/pull/7324) Fix: Passing Flink Job Global Params
* [#7335](https://github.com/TouK/nussknacker/pull/7335) introduced `managersDirs` config to configure deployment managers directory paths (you can use `MANAGERS_DIR` env in case of docker-based deployments). The default is `./managers`.
diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md
index 1821c9a9a3a..9cb2ae11a27 100644
--- a/docs/MigrationGuide.md
+++ b/docs/MigrationGuide.md
@@ -39,6 +39,7 @@ To see the biggest differences please consult the [changelog](Changelog.md).
If there were any custom actions defined in some custom DeploymentManager implementation,
they should be modified to use the predefined set of actions or otherwise replaced by custom links and handled outside Nussknacker.
* [#7364](https://github.com/TouK/nussknacker/pull/7364)
+ * additional, necessary, db schema changes concerning the periodic/scheduling mechanism introduced in [#7519](https://github.com/TouK/nussknacker/pull/7519)
* the PeriodicDeploymentManager is no longer a separate DM type
* in `scenarioTypes` config section, the `deploymentConfig` of a periodic scenario type (only Flink was supported so far) may have looked like that:
```hocon