Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix different json representation in scheduling mechanism #7519

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .run/NussknackerApp-postgres.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<env name="DB_USER" value="nu" />
<env name="DB_PASSWORD" value="nupassword" />
<env name="TABLES_DEFINITION_FILE" value="../../../nussknacker-dist/src/universal/conf/dev-tables-definition.sql" />
<env name="MANAGERS_DIR" value="managers" />
</envs>
<option name="INCLUDE_PROVIDED_SCOPE" value="true" />
<option name="MAIN_CLASS_NAME" value="pl.touk.nussknacker.ui.NussknackerApp" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package db.migration

import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.ui.db.migration.SlickMigration

import scala.concurrent.ExecutionContext.Implicits.global

trait V1_062__ScenarioSchedulingTablesModificationsDefinition extends SlickMigration with LazyLogging {
piotrp marked this conversation as resolved.
Show resolved Hide resolved

import profile.api._

override def migrateActions: DBIOAction[Any, NoStream, Effect.All] = {
logger.info("Starting migration V1_062__ScenarioSchedulingTablesModifications")
for {
_ <- sqlu"""ALTER TABLE "periodic_scenarios" RENAME TO "scheduled_scenarios";"""
_ <- sqlu"""ALTER TABLE "scheduled_scenarios" ALTER COLUMN "input_config_during_execution" DROP NOT NULL;"""
_ <- sqlu"""ALTER TABLE "scheduled_scenarios" ADD COLUMN "resolved_scenario_json" VARCHAR(10485760);"""
mgoworko marked this conversation as resolved.
Show resolved Hide resolved
_ <- sqlu"""ALTER TABLE "periodic_scenario_deployments" RENAME TO "scheduled_scenario_deployments";"""
} yield logger.info("Execution finished for migration V1_062__ScenarioSchedulingTablesModifications")
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package db.migration.hsql

import db.migration.V1_062__ScenarioSchedulingTablesModificationsDefinition
import slick.jdbc.{HsqldbProfile, JdbcProfile}

class V1_062__ScenarioSchedulingTablesModifications extends V1_062__ScenarioSchedulingTablesModificationsDefinition {
override protected lazy val profile: JdbcProfile = HsqldbProfile
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package db.migration.postgres

import db.migration.V1_062__ScenarioSchedulingTablesModificationsDefinition
import slick.jdbc.{JdbcProfile, PostgresProfile}

class V1_062__ScenarioSchedulingTablesModifications extends V1_062__ScenarioSchedulingTablesModificationsDefinition {
override protected lazy val profile: JdbcProfile = PostgresProfile
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ trait PeriodicProcessDeploymentsTableFactory extends PeriodicProcessesTableFacto
MappedColumnType.base[PeriodicProcessDeploymentStatus, String](_.toString, PeriodicProcessDeploymentStatus.withName)

class PeriodicProcessDeploymentsTable(tag: Tag)
extends Table[PeriodicProcessDeploymentEntity](tag, "periodic_scenario_deployments") {
extends Table[PeriodicProcessDeploymentEntity](tag, "scheduled_scenario_deployments") {

def id: Rep[PeriodicProcessDeploymentId] = column[PeriodicProcessDeploymentId]("id", O.PrimaryKey, O.AutoInc)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package pl.touk.nussknacker.ui.db.entity

import io.circe.Decoder
import io.circe.syntax.EncoderOps
import pl.touk.nussknacker.engine.api.deployment.scheduler.model.RuntimeParams
import pl.touk.nussknacker.engine.api.deployment.ProcessActionId
import pl.touk.nussknacker.engine.api.deployment.scheduler.model.RuntimeParams
import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.marshall.ProcessMarshaller
import pl.touk.nussknacker.ui.process.periodic.model.PeriodicProcessId
import slick.jdbc.JdbcProfile
import slick.lifted.ProvenShape
Expand Down Expand Up @@ -38,8 +40,14 @@ trait PeriodicProcessesTableFactory extends BaseEntityFactory {
}
)

implicit val canonicalProcessTypedType: BaseColumnType[CanonicalProcess] =
MappedColumnType.base[CanonicalProcess, String](
_.asJson.noSpaces,
ProcessMarshaller.fromJsonUnsafe
)

abstract class PeriodicProcessesTable[ENTITY <: PeriodicProcessEntity](tag: Tag)
extends Table[ENTITY](tag, "periodic_scenarios") {
extends Table[ENTITY](tag, "scheduled_scenarios") {

def id: Rep[PeriodicProcessId] = column[PeriodicProcessId]("id", O.PrimaryKey, O.AutoInc)

Expand All @@ -63,12 +71,14 @@ trait PeriodicProcessesTableFactory extends BaseEntityFactory {

}

class PeriodicProcessesWithInputConfigJsonTable(tag: Tag)
extends PeriodicProcessesTable[PeriodicProcessEntityWithInputConfigJson](tag) {
class PeriodicProcessesWithDeploymentDetailsTable(tag: Tag)
extends PeriodicProcessesTable[PeriodicProcessEntityWithDeploymentDetails](tag) {

def inputConfigDuringExecutionJson: Rep[Option[String]] = column[Option[String]]("input_config_during_execution")

def inputConfigDuringExecutionJson: Rep[String] = column[String]("input_config_during_execution", NotNull)
def resolvedScenario: Rep[Option[CanonicalProcess]] = column[Option[CanonicalProcess]]("resolved_scenario_json")

override def * : ProvenShape[PeriodicProcessEntityWithInputConfigJson] = (
override def * : ProvenShape[PeriodicProcessEntityWithDeploymentDetails] = (
id,
processId,
processName,
Expand All @@ -80,14 +90,15 @@ trait PeriodicProcessesTableFactory extends BaseEntityFactory {
createdAt,
processActionId,
inputConfigDuringExecutionJson,
) <> (PeriodicProcessEntityWithInputConfigJson.apply _ tupled, PeriodicProcessEntityWithInputConfigJson.unapply)
resolvedScenario,
) <> (PeriodicProcessEntityWithDeploymentDetails.apply _ tupled, PeriodicProcessEntityWithDeploymentDetails.unapply)

}

class PeriodicProcessesWithoutInputConfigJsonTable(tag: Tag)
extends PeriodicProcessesTable[PeriodicProcessEntityWithoutInputConfigJson](tag) {
class PeriodicProcessesWithoutDeploymentDetailsTable(tag: Tag)
extends PeriodicProcessesTable[PeriodicProcessEntityWithoutDeploymentDetails](tag) {

override def * : ProvenShape[PeriodicProcessEntityWithoutInputConfigJson] = (
override def * : ProvenShape[PeriodicProcessEntityWithoutDeploymentDetails] = (
id,
processId,
processName,
Expand All @@ -98,13 +109,14 @@ trait PeriodicProcessesTableFactory extends BaseEntityFactory {
active,
createdAt,
processActionId
) <> (PeriodicProcessEntityWithoutInputConfigJson.apply _ tupled, PeriodicProcessEntityWithoutInputConfigJson.unapply)
) <> (PeriodicProcessEntityWithoutDeploymentDetails.apply _ tupled, PeriodicProcessEntityWithoutDeploymentDetails.unapply)

}

object PeriodicProcessesWithoutInputConfig extends TableQuery(new PeriodicProcessesWithoutInputConfigJsonTable(_))
object PeriodicProcessesWithoutDeploymentDetails
extends TableQuery(new PeriodicProcessesWithoutDeploymentDetailsTable(_))

object PeriodicProcessesWithInputConfig extends TableQuery(new PeriodicProcessesWithInputConfigJsonTable(_))
object PeriodicProcessesWithDeploymentDetails extends TableQuery(new PeriodicProcessesWithDeploymentDetailsTable(_))

}

Expand Down Expand Up @@ -132,7 +144,7 @@ trait PeriodicProcessEntity {

}

case class PeriodicProcessEntityWithInputConfigJson(
case class PeriodicProcessEntityWithDeploymentDetails(
id: PeriodicProcessId,
processId: Option[ProcessId],
processName: ProcessName,
Expand All @@ -143,10 +155,11 @@ case class PeriodicProcessEntityWithInputConfigJson(
active: Boolean,
createdAt: LocalDateTime,
processActionId: Option[ProcessActionId],
inputConfigDuringExecutionJson: String,
inputConfigDuringExecutionJson: Option[String],
resolvedScenario: Option[CanonicalProcess],
) extends PeriodicProcessEntity

case class PeriodicProcessEntityWithoutInputConfigJson(
case class PeriodicProcessEntityWithoutDeploymentDetails(
id: PeriodicProcessId,
processId: Option[ProcessId],
processName: ProcessName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import pl.touk.nussknacker.engine.api.component.{
import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.scheduler.model.{ScheduleProperty => _, _}
import pl.touk.nussknacker.engine.api.deployment.scheduler.model.{ScheduleProperty => ApiScheduleProperty}
import pl.touk.nussknacker.engine.api.deployment.scheduler.services._
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus
Expand Down Expand Up @@ -395,11 +394,8 @@ class PeriodicProcessService(
_ <- periodicProcessesRepository.markFinished(deployment.id).run
currentState <- periodicProcessesRepository.findProcessData(deployment.id).run
canonicalProcessOpt <- periodicProcessesRepository
.fetchCanonicalProcessWithVersion(
processName,
versionId
)
.map(_.map(_._1))
.fetchCanonicalProcess(deployment.periodicProcessId, processName, versionId)
.run
canonicalProcess = canonicalProcessOpt.getOrElse {
throw new PeriodicProcessException(
s"Could not fetch CanonicalProcess with ProcessVersion for processName=$processName, versionId=$versionId"
Expand Down Expand Up @@ -494,14 +490,18 @@ class PeriodicProcessService(
)
processName = deploymentWithJarData.processName
versionId = deploymentWithJarData.versionId
canonicalProcessWithVersionOpt <- periodicProcessesRepository
.fetchCanonicalProcessWithVersion(
processName,
versionId
canonicalProcessOpt <- periodicProcessesRepository
.fetchCanonicalProcess(deployment.periodicProcess.id, processName, versionId)
.run
canonicalProcess = canonicalProcessOpt.getOrElse {
throw new PeriodicProcessException(
s"Could not fetch CanonicalProcess for processName=$processName, versionId=$versionId"
)
canonicalProcessWithVersion = canonicalProcessWithVersionOpt.getOrElse {
}
processVersionOpt <- periodicProcessesRepository.fetchProcessVersion(processName, versionId)
processVersion = processVersionOpt.getOrElse {
throw new PeriodicProcessException(
s"Could not fetch CanonicalProcess with ProcessVersion for processName=$processName, versionId=$versionId"
s"Could not fetch ProcessVersion for processName=$processName, versionId=$versionId"
)
}
inputConfigDuringExecutionJsonOpt <- periodicProcessesRepository
Expand All @@ -514,8 +514,8 @@ class PeriodicProcessService(
}
enrichedProcessConfig <- processConfigEnricher.onDeploy(
ProcessConfigEnricher.DeployData(
canonicalProcessWithVersion._1,
canonicalProcessWithVersion._2,
canonicalProcess,
processVersion,
inputConfigDuringExecutionJson,
deployment.toDetails
)
Expand All @@ -524,8 +524,8 @@ class PeriodicProcessService(
deploymentWithJarData,
enrichedProcessConfig.inputConfigDuringExecutionJson,
deploymentData,
canonicalProcessWithVersion._1,
canonicalProcessWithVersion._2,
canonicalProcess,
processVersion,
)
} yield externalDeploymentId
deploymentAction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,11 +412,28 @@ class SlickLegacyPeriodicProcessesRepository(
update.map(_ => ())
}

override def fetchCanonicalProcessWithVersion(
override def fetchCanonicalProcess(
periodicProcessId: PeriodicProcessId,
processName: ProcessName,
versionId: VersionId,
): Action[Option[CanonicalProcess]] = {
PeriodicProcessesWithJson
.filter(p => p.id === periodicProcessId)
.result
.headOption
.map(_.map(_.processJson))
}

override def fetchProcessVersion(
processName: ProcessName,
versionId: VersionId
): Future[Option[(CanonicalProcess, ProcessVersion)]] =
fetchingProcessRepository.getCanonicalProcessWithVersion(processName, versionId)(NussknackerInternalUser.instance)
): Future[Option[ProcessVersion]] = {
fetchingProcessRepository
.getProcessVersion(
processName,
versionId
)(NussknackerInternalUser.instance)
}

def fetchInputConfigDuringExecutionJson(periodicProcessId: PeriodicProcessId): Action[Option[String]] =
PeriodicProcessesWithJson
Expand Down Expand Up @@ -462,6 +479,7 @@ class SlickLegacyPeriodicProcessesRepository(
private def scheduleDeploymentData(deployment: PeriodicProcessDeploymentEntity): ScheduleDeploymentData = {
ScheduleDeploymentData(
deployment.id,
deployment.periodicProcessId,
deployment.createdAt,
deployment.runAt,
deployment.deployedAt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ case class ScheduleId(processId: PeriodicProcessId, scheduleName: ScheduleName)

case class ScheduleDeploymentData(
id: PeriodicProcessDeploymentId,
periodicProcessId: PeriodicProcessId,
createdAt: LocalDateTime,
runAt: LocalDateTime,
deployedAt: Option[LocalDateTime],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,16 @@ abstract class DBFetchingProcessRepository[F[_]: Monad](

import api._

override def getCanonicalProcessWithVersion(
override def getProcessVersion(
processName: ProcessName,
versionId: VersionId
)(
implicit user: LoggedUser,
): F[Option[(CanonicalProcess, ProcessVersion)]] = {
): F[Option[ProcessVersion]] = {
val result = for {
processId <- OptionT(fetchProcessId(processName))
details <- OptionT(fetchProcessDetailsForId[CanonicalProcess](processId, versionId))
} yield (
details.json,
details.toEngineProcessVersion,
)
} yield details.toEngineProcessVersion
result.value
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ abstract class FetchingProcessRepository[F[_]: Monad] extends ProcessDBQueryRepo
query: ScenarioQuery
)(implicit loggedUser: LoggedUser, ec: ExecutionContext): F[List[ScenarioWithDetailsEntity[PS]]]

def getCanonicalProcessWithVersion(
def getProcessVersion(
processName: ProcessName,
versionId: VersionId
)(
implicit user: LoggedUser,
): F[Option[(CanonicalProcess, ProcessVersion)]]
): F[Option[ProcessVersion]]

def fetchProcessId(processName: ProcessName)(implicit ec: ExecutionContext): F[Option[ProcessId]]

Expand Down
Loading