Skip to content

Commit

Permalink
Fix scenario json representation differences in the scheduling mechan…
Browse files Browse the repository at this point in the history
…ism (#7519)
  • Loading branch information
mgoworko authored Feb 7, 2025
1 parent caf7850 commit 85792ca
Show file tree
Hide file tree
Showing 15 changed files with 155 additions and 94 deletions.
1 change: 1 addition & 0 deletions .run/NussknackerApp-postgres.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<env name="DB_USER" value="nu" />
<env name="DB_PASSWORD" value="nupassword" />
<env name="TABLES_DEFINITION_FILE" value="../../../nussknacker-dist/src/universal/conf/dev-tables-definition.sql" />
<env name="MANAGERS_DIR" value="managers" />
</envs>
<option name="INCLUDE_PROVIDED_SCOPE" value="true" />
<option name="MAIN_CLASS_NAME" value="pl.touk.nussknacker.ui.NussknackerApp" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
ALTER TABLE "periodic_scenarios" RENAME TO "scheduled_scenarios";
ALTER TABLE "scheduled_scenarios" ALTER COLUMN "input_config_during_execution" DROP NOT NULL;
ALTER TABLE "scheduled_scenarios" ADD COLUMN "resolved_scenario_json" VARCHAR(10485760);
ALTER TABLE "scheduled_scenarios" ADD CONSTRAINT scheduled_scenarios_check_config_and_json_present_for_active
CHECK (
("active" = TRUE AND "input_config_during_execution" IS NOT NULL AND "resolved_scenario_json" IS NOT NULL)
OR
("active" = FALSE)
);
ALTER TABLE "periodic_scenario_deployments" RENAME TO "scheduled_scenario_deployments";
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ trait PeriodicProcessDeploymentsTableFactory extends PeriodicProcessesTableFacto
MappedColumnType.base[PeriodicProcessDeploymentStatus, String](_.toString, PeriodicProcessDeploymentStatus.withName)

class PeriodicProcessDeploymentsTable(tag: Tag)
extends Table[PeriodicProcessDeploymentEntity](tag, "periodic_scenario_deployments") {
extends Table[PeriodicProcessDeploymentEntity](tag, "scheduled_scenario_deployments") {

def id: Rep[PeriodicProcessDeploymentId] = column[PeriodicProcessDeploymentId]("id", O.PrimaryKey, O.AutoInc)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package pl.touk.nussknacker.ui.db.entity

import io.circe.Decoder
import io.circe.syntax.EncoderOps
import pl.touk.nussknacker.engine.api.deployment.scheduler.model.RuntimeParams
import pl.touk.nussknacker.engine.api.deployment.ProcessActionId
import pl.touk.nussknacker.engine.api.deployment.scheduler.model.RuntimeParams
import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.marshall.ProcessMarshaller
import pl.touk.nussknacker.ui.process.periodic.model.PeriodicProcessId
import slick.jdbc.JdbcProfile
import slick.lifted.ProvenShape
Expand Down Expand Up @@ -38,8 +40,14 @@ trait PeriodicProcessesTableFactory extends BaseEntityFactory {
}
)

implicit val canonicalProcessTypedType: BaseColumnType[CanonicalProcess] =
MappedColumnType.base[CanonicalProcess, String](
_.asJson.noSpaces,
ProcessMarshaller.fromJsonUnsafe
)

abstract class PeriodicProcessesTable[ENTITY <: PeriodicProcessEntity](tag: Tag)
extends Table[ENTITY](tag, "periodic_scenarios") {
extends Table[ENTITY](tag, "scheduled_scenarios") {

def id: Rep[PeriodicProcessId] = column[PeriodicProcessId]("id", O.PrimaryKey, O.AutoInc)

Expand All @@ -63,12 +71,14 @@ trait PeriodicProcessesTableFactory extends BaseEntityFactory {

}

class PeriodicProcessesWithInputConfigJsonTable(tag: Tag)
extends PeriodicProcessesTable[PeriodicProcessEntityWithInputConfigJson](tag) {
class PeriodicProcessesWithDeploymentDetailsTable(tag: Tag)
extends PeriodicProcessesTable[PeriodicProcessEntityWithDeploymentDetails](tag) {

def inputConfigDuringExecutionJson: Rep[Option[String]] = column[Option[String]]("input_config_during_execution")

def inputConfigDuringExecutionJson: Rep[String] = column[String]("input_config_during_execution", NotNull)
def resolvedScenario: Rep[Option[CanonicalProcess]] = column[Option[CanonicalProcess]]("resolved_scenario_json")

override def * : ProvenShape[PeriodicProcessEntityWithInputConfigJson] = (
override def * : ProvenShape[PeriodicProcessEntityWithDeploymentDetails] = (
id,
processId,
processName,
Expand All @@ -80,14 +90,15 @@ trait PeriodicProcessesTableFactory extends BaseEntityFactory {
createdAt,
processActionId,
inputConfigDuringExecutionJson,
) <> (PeriodicProcessEntityWithInputConfigJson.apply _ tupled, PeriodicProcessEntityWithInputConfigJson.unapply)
resolvedScenario,
) <> (PeriodicProcessEntityWithDeploymentDetails.apply _ tupled, PeriodicProcessEntityWithDeploymentDetails.unapply)

}

class PeriodicProcessesWithoutInputConfigJsonTable(tag: Tag)
extends PeriodicProcessesTable[PeriodicProcessEntityWithoutInputConfigJson](tag) {
class PeriodicProcessesWithoutDeploymentDetailsTable(tag: Tag)
extends PeriodicProcessesTable[PeriodicProcessEntityWithoutDeploymentDetails](tag) {

override def * : ProvenShape[PeriodicProcessEntityWithoutInputConfigJson] = (
override def * : ProvenShape[PeriodicProcessEntityWithoutDeploymentDetails] = (
id,
processId,
processName,
Expand All @@ -98,13 +109,14 @@ trait PeriodicProcessesTableFactory extends BaseEntityFactory {
active,
createdAt,
processActionId
) <> (PeriodicProcessEntityWithoutInputConfigJson.apply _ tupled, PeriodicProcessEntityWithoutInputConfigJson.unapply)
) <> (PeriodicProcessEntityWithoutDeploymentDetails.apply _ tupled, PeriodicProcessEntityWithoutDeploymentDetails.unapply)

}

object PeriodicProcessesWithoutInputConfig extends TableQuery(new PeriodicProcessesWithoutInputConfigJsonTable(_))
object PeriodicProcessesWithoutDeploymentDetails
extends TableQuery(new PeriodicProcessesWithoutDeploymentDetailsTable(_))

object PeriodicProcessesWithInputConfig extends TableQuery(new PeriodicProcessesWithInputConfigJsonTable(_))
object PeriodicProcessesWithDeploymentDetails extends TableQuery(new PeriodicProcessesWithDeploymentDetailsTable(_))

}

Expand Down Expand Up @@ -132,7 +144,7 @@ trait PeriodicProcessEntity {

}

case class PeriodicProcessEntityWithInputConfigJson(
case class PeriodicProcessEntityWithDeploymentDetails(
id: PeriodicProcessId,
processId: Option[ProcessId],
processName: ProcessName,
Expand All @@ -143,10 +155,11 @@ case class PeriodicProcessEntityWithInputConfigJson(
active: Boolean,
createdAt: LocalDateTime,
processActionId: Option[ProcessActionId],
inputConfigDuringExecutionJson: String,
inputConfigDuringExecutionJson: Option[String],
resolvedScenario: Option[CanonicalProcess],
) extends PeriodicProcessEntity

case class PeriodicProcessEntityWithoutInputConfigJson(
case class PeriodicProcessEntityWithoutDeploymentDetails(
id: PeriodicProcessId,
processId: Option[ProcessId],
processName: ProcessName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import pl.touk.nussknacker.engine.api.component.{
import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.scheduler.model.{ScheduleProperty => _, _}
import pl.touk.nussknacker.engine.api.deployment.scheduler.model.{ScheduleProperty => ApiScheduleProperty}
import pl.touk.nussknacker.engine.api.deployment.scheduler.services._
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus
Expand Down Expand Up @@ -392,14 +391,9 @@ class PeriodicProcessService(
): Future[Unit] = {
logger.info(s"Marking ${deployment.display} with status: ${deployment.state.status} as finished")
for {
_ <- periodicProcessesRepository.markFinished(deployment.id).run
currentState <- periodicProcessesRepository.findProcessData(deployment.id).run
canonicalProcessOpt <- periodicProcessesRepository
.fetchCanonicalProcessWithVersion(
processName,
versionId
)
.map(_.map(_._1))
_ <- periodicProcessesRepository.markFinished(deployment.id).run
currentState <- periodicProcessesRepository.findProcessData(deployment.id).run
canonicalProcessOpt <- periodicProcessesRepository.fetchCanonicalProcess(deployment.periodicProcessId).run
canonicalProcess = canonicalProcessOpt.getOrElse {
throw new PeriodicProcessException(
s"Could not fetch CanonicalProcess with ProcessVersion for processName=$processName, versionId=$versionId"
Expand Down Expand Up @@ -494,14 +488,16 @@ class PeriodicProcessService(
)
processName = deploymentWithJarData.processName
versionId = deploymentWithJarData.versionId
canonicalProcessWithVersionOpt <- periodicProcessesRepository
.fetchCanonicalProcessWithVersion(
processName,
versionId
canonicalProcessOpt <- periodicProcessesRepository.fetchCanonicalProcess(deployment.periodicProcess.id).run
canonicalProcess = canonicalProcessOpt.getOrElse {
throw new PeriodicProcessException(
s"Could not fetch CanonicalProcess for processName=$processName, versionId=$versionId"
)
canonicalProcessWithVersion = canonicalProcessWithVersionOpt.getOrElse {
}
processVersionOpt <- periodicProcessesRepository.fetchProcessVersion(processName, versionId)
processVersion = processVersionOpt.getOrElse {
throw new PeriodicProcessException(
s"Could not fetch CanonicalProcess with ProcessVersion for processName=$processName, versionId=$versionId"
s"Could not fetch ProcessVersion for processName=$processName, versionId=$versionId"
)
}
inputConfigDuringExecutionJsonOpt <- periodicProcessesRepository
Expand All @@ -514,8 +510,8 @@ class PeriodicProcessService(
}
enrichedProcessConfig <- processConfigEnricher.onDeploy(
ProcessConfigEnricher.DeployData(
canonicalProcessWithVersion._1,
canonicalProcessWithVersion._2,
canonicalProcess,
processVersion,
inputConfigDuringExecutionJson,
deployment.toDetails
)
Expand All @@ -524,8 +520,8 @@ class PeriodicProcessService(
deploymentWithJarData,
enrichedProcessConfig.inputConfigDuringExecutionJson,
deploymentData,
canonicalProcessWithVersion._1,
canonicalProcessWithVersion._2,
canonicalProcess,
processVersion,
)
} yield externalDeploymentId
deploymentAction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,11 +412,26 @@ class SlickLegacyPeriodicProcessesRepository(
update.map(_ => ())
}

override def fetchCanonicalProcessWithVersion(
override def fetchCanonicalProcess(
periodicProcessId: PeriodicProcessId,
): Action[Option[CanonicalProcess]] = {
PeriodicProcessesWithJson
.filter(p => p.id === periodicProcessId)
.result
.headOption
.map(_.map(_.processJson))
}

override def fetchProcessVersion(
processName: ProcessName,
versionId: VersionId
): Future[Option[(CanonicalProcess, ProcessVersion)]] =
fetchingProcessRepository.getCanonicalProcessWithVersion(processName, versionId)(NussknackerInternalUser.instance)
): Future[Option[ProcessVersion]] = {
fetchingProcessRepository
.getProcessVersion(
processName,
versionId
)(NussknackerInternalUser.instance)
}

def fetchInputConfigDuringExecutionJson(periodicProcessId: PeriodicProcessId): Action[Option[String]] =
PeriodicProcessesWithJson
Expand Down Expand Up @@ -462,6 +477,7 @@ class SlickLegacyPeriodicProcessesRepository(
private def scheduleDeploymentData(deployment: PeriodicProcessDeploymentEntity): ScheduleDeploymentData = {
ScheduleDeploymentData(
deployment.id,
deployment.periodicProcessId,
deployment.createdAt,
deployment.runAt,
deployment.deployedAt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ case class ScheduleId(processId: PeriodicProcessId, scheduleName: ScheduleName)

case class ScheduleDeploymentData(
id: PeriodicProcessDeploymentId,
periodicProcessId: PeriodicProcessId,
createdAt: LocalDateTime,
runAt: LocalDateTime,
deployedAt: Option[LocalDateTime],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,16 @@ abstract class DBFetchingProcessRepository[F[_]: Monad](

import api._

override def getCanonicalProcessWithVersion(
override def getProcessVersion(
processName: ProcessName,
versionId: VersionId
)(
implicit user: LoggedUser,
): F[Option[(CanonicalProcess, ProcessVersion)]] = {
): F[Option[ProcessVersion]] = {
val result = for {
processId <- OptionT(fetchProcessId(processName))
details <- OptionT(fetchProcessDetailsForId[CanonicalProcess](processId, versionId))
} yield (
details.json,
details.toEngineProcessVersion,
)
} yield details.toEngineProcessVersion
result.value
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package pl.touk.nussknacker.ui.process.repository
import cats.Monad
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.process._
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.ui.process.ScenarioQuery
import pl.touk.nussknacker.ui.security.api.LoggedUser

Expand All @@ -29,12 +28,12 @@ abstract class FetchingProcessRepository[F[_]: Monad] extends ProcessDBQueryRepo
query: ScenarioQuery
)(implicit loggedUser: LoggedUser, ec: ExecutionContext): F[List[PS]]

def getCanonicalProcessWithVersion(
def getProcessVersion(
processName: ProcessName,
versionId: VersionId
)(
implicit user: LoggedUser,
): F[Option[(CanonicalProcess, ProcessVersion)]]
): F[Option[ProcessVersion]]

def fetchProcessId(processName: ProcessName)(implicit ec: ExecutionContext): F[Option[ProcessId]]

Expand Down
Loading

0 comments on commit 85792ca

Please sign in to comment.