diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/DeploymentManagerDependencies.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/DeploymentManagerDependencies.scala index 0e3ef004026..27f402a0384 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/DeploymentManagerDependencies.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/DeploymentManagerDependencies.scala @@ -1,7 +1,11 @@ package pl.touk.nussknacker.engine import akka.actor.ActorSystem -import pl.touk.nussknacker.engine.api.deployment.{ProcessingTypeActionService, ProcessingTypeDeployedScenariosProvider} +import pl.touk.nussknacker.engine.api.deployment.{ + ProcessingTypeActionService, + ProcessingTypeDeployedScenariosProvider, + ScenarioActivityManager +} import sttp.client3.SttpBackend import scala.concurrent.{ExecutionContext, Future} @@ -9,6 +13,7 @@ import scala.concurrent.{ExecutionContext, Future} case class DeploymentManagerDependencies( deployedScenariosProvider: ProcessingTypeDeployedScenariosProvider, actionService: ProcessingTypeActionService, + scenarioActivityManager: ScenarioActivityManager, executionContext: ExecutionContext, actorSystem: ActorSystem, sttpBackend: SttpBackend[Future, Any], diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/BaseDeploymentManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/BaseDeploymentManager.scala index 6bcd3567380..6da947d7ffc 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/BaseDeploymentManager.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/BaseDeploymentManager.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.api.deployment +import pl.touk.nussknacker.engine.api.deployment.ScenarioActivityHandling.AllScenarioActivitiesStoredByNussknacker import pl.touk.nussknacker.engine.api.deployment.simple.SimpleProcessStateDefinitionManager import pl.touk.nussknacker.engine.deployment.CustomActionDefinition @@ -11,4 +12,6 @@ trait BaseDeploymentManager extends DeploymentManager { override def customActionsDefinitions: List[CustomActionDefinition] = List.empty + override def scenarioActivityHandling: ScenarioActivityHandling = AllScenarioActivitiesStoredByNussknacker + } diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala index fb1bc12e34a..196f97ca155 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala @@ -70,6 +70,8 @@ trait DeploymentManager extends AutoCloseable { def customActionsDefinitions: List[CustomActionDefinition] + def scenarioActivityHandling: ScenarioActivityHandling + protected final def notImplemented: Future[Nothing] = Future.failed(new NotImplementedError()) diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ScenarioActivityHandling.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ScenarioActivityHandling.scala new file mode 100644 index 00000000000..932e9c31f26 --- /dev/null +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ScenarioActivityHandling.scala @@ -0,0 +1,21 @@ +package pl.touk.nussknacker.engine.api.deployment + +import pl.touk.nussknacker.engine.api.process.ProcessIdWithName + +import scala.concurrent.Future + +sealed trait ScenarioActivityHandling + +object ScenarioActivityHandling { + + case object AllScenarioActivitiesStoredByNussknacker extends ScenarioActivityHandling + + trait ManagerSpecificScenarioActivitiesStoredByManager extends ScenarioActivityHandling { + + def managerSpecificScenarioActivities( + processIdWithName: ProcessIdWithName + ): Future[List[ScenarioActivity]] + + } + +} diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ScenarioActivityManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ScenarioActivityManager.scala new file mode 100644 index 00000000000..11a23a86823 --- /dev/null +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ScenarioActivityManager.scala @@ -0,0 +1,41 @@ +package pl.touk.nussknacker.engine.api.deployment + +import pl.touk.nussknacker.engine.api.deployment.ScenarioActivityManager.ModificationResult + +import scala.concurrent.Future + +trait ScenarioActivityManager { + + def saveActivity( + scenarioActivity: ScenarioActivity + ): Future[Unit] + + def modifyActivity( + scenarioActivityId: ScenarioActivityId, + modify: ScenarioActivity => ScenarioActivity, + ): Future[ModificationResult] + +} + +object ScenarioActivityManager { + sealed trait ModificationResult + + object ModificationResult { + case object Success extends ModificationResult + case object Failure extends ModificationResult + } + +} + +object NoOpScenarioActivityManager extends ScenarioActivityManager { + + def saveActivity( + scenarioActivity: ScenarioActivity + ): Future[Unit] = Future.unit + + def modifyActivity( + scenarioActivityId: ScenarioActivityId, + modify: ScenarioActivity => ScenarioActivity, + ): Future[ModificationResult] = Future.successful(ModificationResult.Success) + +} diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala index be4135f3cab..6c6915fe5b1 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala @@ -58,6 +58,8 @@ class CachingProcessStateDeploymentManager( override def customActionsDefinitions: List[CustomActionDefinition] = delegate.customActionsDefinitions + override def scenarioActivityHandling: ScenarioActivityHandling = delegate.scenarioActivityHandling + override def close(): Unit = delegate.close() } diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala index aff7240d144..d7f65f529ef 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala @@ -2,14 +2,14 @@ package pl.touk.nussknacker.engine.testing import cats.data.{Validated, ValidatedNel} import com.typesafe.config.Config +import pl.touk.nussknacker.engine.api.StreamMetaData import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig import pl.touk.nussknacker.engine.api.definition._ +import pl.touk.nussknacker.engine.api.deployment.ScenarioActivityHandling.AllScenarioActivitiesStoredByNussknacker import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} -import pl.touk.nussknacker.engine.api.StreamMetaData import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} import pl.touk.nussknacker.engine.deployment.CustomActionDefinition -import pl.touk.nussknacker.engine.newdeployment import pl.touk.nussknacker.engine.{ BaseModelData, DeploymentManagerDependencies, diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/RemoteEnvironmentResources.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/RemoteEnvironmentResources.scala index 5c9080d9516..a801d750773 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/RemoteEnvironmentResources.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/RemoteEnvironmentResources.scala @@ -96,7 +96,7 @@ class RemoteEnvironmentResources( scenarioActivityId = ScenarioActivityId.random, user = user.scenarioUser, date = clock.instant(), - scenarioVersionId = Some(ScenarioVersionId(details.processVersionId.value)), + scenarioVersionId = Some(ScenarioVersionId.from(details.processVersionId)), destinationEnvironment = Environment(remoteEnvironment.environmentId) ) ) diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ScenarioActivityApiHttpService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ScenarioActivityApiHttpService.scala index 0704ccf92da..a5160056ca5 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ScenarioActivityApiHttpService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ScenarioActivityApiHttpService.scala @@ -2,13 +2,17 @@ package pl.touk.nussknacker.ui.api import cats.data.EitherT import com.typesafe.scalalogging.LazyLogging +import pl.touk.nussknacker.engine.api.deployment.ScenarioActivityHandling.{ + AllScenarioActivitiesStoredByNussknacker, + ManagerSpecificScenarioActivitiesStoredByManager +} import pl.touk.nussknacker.engine.api.deployment.{ ScenarioActivity, ScenarioActivityId, ScenarioAttachment, ScenarioComment } -import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName} +import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessIdWithName, ProcessName} import pl.touk.nussknacker.security.Permission import pl.touk.nussknacker.security.Permission.Permission import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos.ScenarioActivityError.{ @@ -19,6 +23,7 @@ import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos.ScenarioActi } import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos._ import pl.touk.nussknacker.ui.api.description.scenarioActivity.{Dtos, Endpoints} +import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher import pl.touk.nussknacker.ui.process.repository.DBIOActionRunner import pl.touk.nussknacker.ui.process.repository.activities.ScenarioActivityRepository import pl.touk.nussknacker.ui.process.{ProcessService, ScenarioAttachmentService} @@ -33,6 +38,7 @@ import scala.concurrent.{ExecutionContext, Future} class ScenarioActivityApiHttpService( authManager: AuthManager, + deploymentManagerDispatcher: DeploymentManagerDispatcher, scenarioActivityRepository: ScenarioActivityRepository, scenarioService: ProcessService, scenarioAuthorizer: AuthorizeProcess, @@ -115,7 +121,7 @@ class ScenarioActivityApiHttpService( for { scenarioId <- getScenarioIdByName(scenarioName) _ <- isAuthorized(scenarioId, Permission.Read) - activities <- fetchActivities(scenarioId) + activities <- fetchActivities(ProcessIdWithName(scenarioId, scenarioName)) } yield ScenarioActivities(activities) } } @@ -210,15 +216,27 @@ class ScenarioActivityApiHttpService( ) private def fetchActivities( - scenarioId: ProcessId - ): EitherT[Future, ScenarioActivityError, List[Dtos.ScenarioActivity]] = - EitherT - .right( - dbioActionRunner.run( - scenarioActivityRepository.findActivities(scenarioId) - ) - ) - .map(_.map(toDto).toList) + processIdWithName: ProcessIdWithName + )(implicit loggedUser: LoggedUser): EitherT[Future, ScenarioActivityError, List[Dtos.ScenarioActivity]] = + EitherT.right { + for { + generalActivities <- dbioActionRunner.run(scenarioActivityRepository.findActivities(processIdWithName.id)) + deploymentManager <- deploymentManagerDispatcher.deploymentManager(processIdWithName) + deploymentManagerSpecificActivities <- deploymentManager match { + case Some(manager) => + manager.scenarioActivityHandling match { + case AllScenarioActivitiesStoredByNussknacker => + Future.successful(List.empty) + case handling: ManagerSpecificScenarioActivitiesStoredByManager => + handling.managerSpecificScenarioActivities(processIdWithName) + } + case None => + Future.successful(List.empty) + } + combinedActivities = (generalActivities ++ deploymentManagerSpecificActivities).map(toDto) + sortedCombinedActivities = combinedActivities.toList.sortBy(_.date) + } yield sortedCombinedActivities + } private def toDto(scenarioComment: ScenarioComment): Dtos.ScenarioActivityComment = { scenarioComment match { @@ -389,7 +407,8 @@ class ScenarioActivityApiHttpService( scenarioVersionId, comment, dateFinished, - errorMessage + status, + errorMessage, ) => Dtos.ScenarioActivity.forPerformedSingleExecution( id = scenarioActivityId.value, @@ -398,6 +417,7 @@ class ScenarioActivityApiHttpService( scenarioVersionId = scenarioVersionId.map(_.value), comment = toDto(comment), dateFinished = dateFinished, + status = status, errorMessage = errorMessage, ) case ScenarioActivity.PerformedScheduledExecution( @@ -407,7 +427,11 @@ class ScenarioActivityApiHttpService( date, scenarioVersionId, dateFinished, - errorMessage + scheduleName, + status, + createdAt, + nextRetryAt, + retriesLeft, ) => Dtos.ScenarioActivity.forPerformedScheduledExecution( id = scenarioActivityId.value, @@ -415,7 +439,11 @@ class ScenarioActivityApiHttpService( date = date, scenarioVersionId = scenarioVersionId.map(_.value), dateFinished = dateFinished, - errorMessage = errorMessage, + scheduleName = scheduleName, + status = status, + createdAt = createdAt, + retriesLeft = retriesLeft, + nextRetryAt = nextRetryAt, ) case ScenarioActivity.AutomaticUpdate( _, diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioActivity/Dtos.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioActivity/Dtos.scala index 86ebb366963..2e1c6165b8e 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioActivity/Dtos.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioActivity/Dtos.scala @@ -8,6 +8,7 @@ import io.circe import io.circe.generic.extras import io.circe.generic.extras.semiauto.deriveConfiguredCodec import io.circe.{Decoder, Encoder} +import pl.touk.nussknacker.engine.api.deployment.ScheduledExecutionStatus import pl.touk.nussknacker.engine.api.process.{ProcessName, VersionId} import pl.touk.nussknacker.engine.requestresponse.openapi.OApiDocumentation.dropNulls import pl.touk.nussknacker.restmodel.BaseEndpointDefinitions @@ -586,6 +587,7 @@ object Dtos { scenarioVersionId: Option[Long], comment: ScenarioActivityComment, dateFinished: Option[Instant], + status: Option[String], errorMessage: Option[String], ): ScenarioActivity = ScenarioActivity( id = id, @@ -596,6 +598,7 @@ object Dtos { comment = Some(comment), attachment = None, additionalFields = List( + status.map(AdditionalField("status", _)), dateFinished.map(date => AdditionalField("dateFinished", date.toString)), errorMessage.map(e => AdditionalField("errorMessage", e)), ).flatten @@ -607,20 +610,38 @@ object Dtos { date: Instant, scenarioVersionId: Option[Long], dateFinished: Option[Instant], - errorMessage: Option[String], - ): ScenarioActivity = ScenarioActivity( - id = id, - `type` = ScenarioActivityType.PerformedScheduledExecution, - user = user, - date = date, - scenarioVersionId = scenarioVersionId, - comment = None, - attachment = None, - additionalFields = List( - dateFinished.map(date => AdditionalField("dateFinished", date.toString)), - errorMessage.map(error => AdditionalField("errorMessage", error)), - ).flatten - ) + scheduleName: String, + status: ScheduledExecutionStatus, + createdAt: Instant, + nextRetryAt: Option[Instant], + retriesLeft: Option[Int], + ): ScenarioActivity = { + val humanReadableStatus = status match { + case ScheduledExecutionStatus.Scheduled => "Scheduled" + case ScheduledExecutionStatus.Deployed => "Deployed" + case ScheduledExecutionStatus.Finished => "Execution finished" + case ScheduledExecutionStatus.Failed => "Execution failed" + case ScheduledExecutionStatus.DeploymentWillBeRetried => "Deployment will be retried" + case ScheduledExecutionStatus.DeploymentFailed => "Deployment failed" + } + ScenarioActivity( + id = id, + `type` = ScenarioActivityType.PerformedScheduledExecution, + user = user, + date = date, + scenarioVersionId = scenarioVersionId, + comment = None, + attachment = None, + additionalFields = List( + Some(AdditionalField("status", humanReadableStatus)), + Some(AdditionalField("createdAt", createdAt.toString)), + dateFinished.map(date => AdditionalField("dateFinished", date.toString)), + Some(AdditionalField("scheduleName", scheduleName)), + Some(AdditionalField("retriesLeft", retriesLeft.toString)), + nextRetryAt.map(nra => AdditionalField("nextRetryAt", nra.toString)), + ).flatten + ) + } // Other/technical diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioActivity/Examples.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioActivity/Examples.scala index 9c4abcb7cbb..1fd356cf195 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioActivity/Examples.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioActivity/Examples.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.ui.api.description.scenarioActivity +import pl.touk.nussknacker.engine.api.deployment.ScheduledExecutionStatus import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos.ScenarioActivityError.{ NoActivity, @@ -183,6 +184,7 @@ object Examples { lastModifiedAt = Instant.parse("2024-01-17T14:21:17Z") ), dateFinished = Some(Instant.parse("2024-01-17T14:21:17Z")), + status = Some("IN_PROGRESS"), errorMessage = Some("Execution error occurred"), ), ScenarioActivity.forPerformedSingleExecution( @@ -196,6 +198,7 @@ object Examples { lastModifiedAt = Instant.parse("2024-01-17T14:21:17Z") ), dateFinished = Some(Instant.parse("2024-01-17T14:21:17Z")), + status = Some("FAILED"), errorMessage = None, ), ScenarioActivity.forPerformedScheduledExecution( @@ -204,7 +207,23 @@ object Examples { date = Instant.parse("2024-01-17T14:21:17Z"), scenarioVersionId = Some(1), dateFinished = Some(Instant.parse("2024-01-17T14:21:17Z")), - errorMessage = None, + scheduleName = "main-schedule", + status = ScheduledExecutionStatus.Finished, + createdAt = Instant.parse("2024-01-17T13:21:17Z"), + retriesLeft = None, + nextRetryAt = None, + ), + ScenarioActivity.forPerformedScheduledExecution( + id = UUID.fromString("9b27797e-aa03-42ba-8406-d0ae8005a883"), + user = "some user", + date = Instant.parse("2024-01-17T14:21:17Z"), + scenarioVersionId = Some(1), + dateFinished = Some(Instant.parse("2024-01-17T14:21:17Z")), + scheduleName = "main-schedule", + status = ScheduledExecutionStatus.DeploymentWillBeRetried, + createdAt = Instant.parse("2024-01-17T13:21:17Z"), + retriesLeft = Some(1), + nextRetryAt = Some(Instant.parse("2024-01-17T15:21:17Z")), ), ScenarioActivity.forAutomaticUpdate( id = UUID.fromString("33509d37-7657-4229-940f-b5736c82fb13"), diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentManagerDispatcher.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentManagerDispatcher.scala index f3953d63999..7dcbe0decf4 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentManagerDispatcher.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentManagerDispatcher.scala @@ -19,6 +19,15 @@ class DeploymentManagerDispatcher( processRepository.fetchProcessingType(processId).map(deploymentManagerUnsafe) } + def deploymentManager( + processId: ProcessIdWithName + )(implicit ec: ExecutionContext, user: LoggedUser): Future[Option[DeploymentManager]] = { + for { + processingType <- processRepository.fetchProcessingType(processId) + maybeDeploymentManager = deploymentManager(processingType) + } yield maybeDeploymentManager + } + def deploymentManager(processingType: ProcessingType)(implicit user: LoggedUser): Option[DeploymentManager] = { managers.forProcessingType(processingType) } diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/RepositoryBasedScenarioActivityManager.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/RepositoryBasedScenarioActivityManager.scala new file mode 100644 index 00000000000..3abf514edb0 --- /dev/null +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/RepositoryBasedScenarioActivityManager.scala @@ -0,0 +1,37 @@ +package pl.touk.nussknacker.ui.process.deployment + +import pl.touk.nussknacker.engine.api.deployment.ScenarioActivityManager.ModificationResult +import pl.touk.nussknacker.engine.api.deployment.{ScenarioActivity, ScenarioActivityId, ScenarioActivityManager} +import pl.touk.nussknacker.ui.process.repository.DBIOActionRunner +import pl.touk.nussknacker.ui.process.repository.activities.ScenarioActivityRepository +import pl.touk.nussknacker.ui.process.repository.activities.ScenarioActivityRepository.ModifyActivityError + +import scala.concurrent.{ExecutionContext, Future} + +class RepositoryBasedScenarioActivityManager( + repository: ScenarioActivityRepository, + dbioActionRunner: DBIOActionRunner +)(implicit executionContext: ExecutionContext) + extends ScenarioActivityManager { + + override def saveActivity( + scenarioActivity: ScenarioActivity + ): Future[Unit] = { + dbioActionRunner + .run(repository.addActivity(scenarioActivity)) + .map((_: ScenarioActivityId) => ()) + } + + override def modifyActivity( + scenarioActivityId: ScenarioActivityId, + modification: ScenarioActivity => ScenarioActivity + ): Future[ModificationResult] = dbioActionRunner.run( + repository + .modifyActivity(scenarioActivityId, modification) + .map { + case Right(_: Unit) => ModificationResult.Success + case Left(_: ModifyActivityError) => ModificationResult.Failure + } + ) + +} diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newactivity/ActivityService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newactivity/ActivityService.scala index 72e54bcd0df..22292bcf222 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newactivity/ActivityService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newactivity/ActivityService.scala @@ -1,10 +1,10 @@ package pl.touk.nussknacker.ui.process.newactivity import cats.data.EitherT +import pl.touk.nussknacker.engine.api.Comment import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.process.{ProcessId, VersionId} import pl.touk.nussknacker.ui.api.DeploymentCommentSettings -import pl.touk.nussknacker.engine.api.Comment import pl.touk.nussknacker.ui.process.newactivity.ActivityService._ import pl.touk.nussknacker.ui.process.newdeployment.DeploymentService.RunDeploymentError import pl.touk.nussknacker.ui.process.newdeployment.{DeploymentService, RunDeploymentCommand} @@ -13,7 +13,7 @@ import pl.touk.nussknacker.ui.process.repository.{DBIOActionRunner, DeploymentCo import pl.touk.nussknacker.ui.security.api.LoggedUser import pl.touk.nussknacker.ui.util.LoggedUserUtils.Ops -import java.time.{Clock, Instant} +import java.time.Clock import scala.concurrent.{ExecutionContext, Future} // TODO: This service in the future should handle all activities that modify anything in application. @@ -74,13 +74,13 @@ class ActivityService( scenarioActivityId = ScenarioActivityId.random, user = loggedUser.scenarioUser, date = now, - scenarioVersionId = Some(ScenarioVersionId(scenarioGraphVersionId.value)), + scenarioVersionId = Some(ScenarioVersionId.from(scenarioGraphVersionId)), comment = commentOpt match { case Some(comment) => ScenarioComment.Available(comment.content, UserName(loggedUser.username), now) case None => ScenarioComment.Deleted(UserName(loggedUser.username), now) }, ) - )(loggedUser) + ) ) .map(_ => ()) ) diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala index 501dd6ba3ca..d19fba00397 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.ui.process.processingtype +import pl.touk.nussknacker.engine.api.deployment.ScenarioActivityHandling.AllScenarioActivitiesStoredByNussknacker import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleProcessStateDefinitionManager import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus @@ -45,5 +46,8 @@ object InvalidDeploymentManagerStub extends DeploymentManager { override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport + override def scenarioActivityHandling: ScenarioActivityHandling = + AllScenarioActivitiesStoredByNussknacker + override def close(): Unit = () } diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/ProcessRepository.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/ProcessRepository.scala index ae3e1b106f1..b9678cb7b1f 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/ProcessRepository.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/ProcessRepository.scala @@ -189,7 +189,7 @@ class DBProcessRepository( scenarioActivityId = ScenarioActivityId.random, user = loggedUser.scenarioUser, date = clock.instant(), - scenarioVersionId = res.newVersion.map(v => ScenarioVersionId(v.value)) + scenarioVersionId = res.newVersion.map(ScenarioVersionId.from) ) ) } yield res.newVersion.map(ProcessCreated(res.processId, _)) @@ -208,7 +208,7 @@ class DBProcessRepository( scenarioActivityId = ScenarioActivityId.random, user = loggedUser.scenarioUser, date = Instant.now(), - scenarioVersionId = Some(ScenarioVersionId(versionId.value)), + scenarioVersionId = Some(ScenarioVersionId.from(versionId)), comment = updateProcessAction.comment match { case Some(comment) => ScenarioComment.Available( @@ -237,10 +237,10 @@ class DBProcessRepository( scenarioActivityId = ScenarioActivityId.random, user = loggedUser.scenarioUser, date = clock.instant(), - scenarioVersionId = Some(ScenarioVersionId(versionId.value)), + scenarioVersionId = Some(ScenarioVersionId.from(versionId)), sourceEnvironment = Environment(migrateProcessAction.sourceEnvironment), sourceUser = UserName(user), - sourceScenarioVersionId = migrateProcessAction.sourceScenarioVersionId.map(v => ScenarioVersionId(v.value)), + sourceScenarioVersionId = migrateProcessAction.sourceScenarioVersionId.map(ScenarioVersionId.from), targetEnvironment = Some(Environment(migrateProcessAction.targetEnvironment)), ) ) @@ -257,7 +257,7 @@ class DBProcessRepository( scenarioActivityId = ScenarioActivityId.random, user = loggedUser.scenarioUser, date = Instant.now(), - scenarioVersionId = Some(ScenarioVersionId(versionId.value)), + scenarioVersionId = Some(ScenarioVersionId.from(versionId)), changes = automaticProcessUpdateAction.migrationsApplies.map(_.description).mkString(", "), errorMessage = None, ) @@ -394,7 +394,7 @@ class DBProcessRepository( scenarioActivityId = ScenarioActivityId.random, user = loggedUser.scenarioUser, date = Instant.now(), - scenarioVersionId = Some(ScenarioVersionId(version.id.value)), + scenarioVersionId = Some(ScenarioVersionId.from(version.id)), oldName = process.name.value, newName = newName.value ) diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/activities/DbScenarioActivityRepository.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/activities/DbScenarioActivityRepository.scala index 274c2f82905..d7b69e16b79 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/activities/DbScenarioActivityRepository.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/activities/DbScenarioActivityRepository.scala @@ -18,13 +18,16 @@ import pl.touk.nussknacker.ui.db.entity.{ import pl.touk.nussknacker.ui.db.{DbRef, NuTables} import pl.touk.nussknacker.ui.process.ScenarioAttachmentService.AttachmentToAdd import pl.touk.nussknacker.ui.process.repository.DbioRepository -import pl.touk.nussknacker.ui.process.repository.activities.ScenarioActivityRepository.ModifyCommentError +import pl.touk.nussknacker.ui.process.repository.activities.ScenarioActivityRepository.{ + ModifyActivityError, + ModifyCommentError +} import pl.touk.nussknacker.ui.security.api.LoggedUser import pl.touk.nussknacker.ui.statistics.{AttachmentsTotal, CommentsTotal} import pl.touk.nussknacker.ui.util.LoggedUserUtils.Ops import java.sql.Timestamp -import java.time.Clock +import java.time.{Clock, Instant} import scala.concurrent.ExecutionContext import scala.util.Try @@ -45,10 +48,23 @@ class DbScenarioActivityRepository(override protected val dbRef: DbRef, clock: C def addActivity( scenarioActivity: ScenarioActivity, - )(implicit user: LoggedUser): DB[ScenarioActivityId] = { + ): DB[ScenarioActivityId] = { insertActivity(scenarioActivity).map(_.activityId) } + def modifyActivity( + activityId: ScenarioActivityId, + modification: ScenarioActivity => ScenarioActivity, + ): DB[Either[ModifyActivityError, Unit]] = { + modifyActivityByActivityId[ModifyActivityError, ScenarioActivity]( + activityId = activityId, + activityDoesNotExistError = ModifyActivityError.ActivityDoesNotExist, + validateCurrentValue = validateActivityExistsForScenario, + modify = originalActivity => toEntity(modification(originalActivity)), + couldNotModifyError = ModifyActivityError.CouldNotModifyActivity, + ) + } + def addComment( scenarioId: ProcessId, processVersionId: VersionId, @@ -61,7 +77,7 @@ class DbScenarioActivityRepository(override protected val dbRef: DbRef, clock: C scenarioActivityId = ScenarioActivityId.random, user = user.scenarioUser, date = now, - scenarioVersionId = Some(ScenarioVersionId(processVersionId.value)), + scenarioVersionId = Some(ScenarioVersionId.from(processVersionId)), comment = ScenarioComment.Available( comment = comment, lastModifiedByUserName = UserName(user.username), @@ -147,7 +163,7 @@ class DbScenarioActivityRepository(override protected val dbRef: DbRef, clock: C scenarioActivityId = ScenarioActivityId.random, user = user.scenarioUser, date = now, - scenarioVersionId = Some(ScenarioVersionId(attachmentToAdd.scenarioVersionId.value)), + scenarioVersionId = Some(ScenarioVersionId.from(attachmentToAdd.scenarioVersionId)), attachment = ScenarioAttachment.Available( attachmentId = AttachmentId(attachment.id), attachmentFilename = AttachmentFilename(attachmentToAdd.fileName), @@ -227,7 +243,7 @@ class DbScenarioActivityRepository(override protected val dbRef: DbRef, clock: C for { _ <- Either.cond(entity.scenarioId == scenarioId, (), ModifyCommentError.CommentDoesNotExist) _ <- entity.comment.toRight(ModifyCommentError.CommentDoesNotExist) - } yield () + } yield entity } private def toComment( @@ -325,14 +341,18 @@ class DbScenarioActivityRepository(override protected val dbRef: DbRef, clock: C private lazy val attachmentInsertQuery = attachmentsTable returning attachmentsTable.map(_.id) into ((item, id) => item.copy(id = id)) - private def modifyActivityByActivityId[ERROR]( + private def validateActivityExistsForScenario(entity: ScenarioActivityEntityData) = { + fromEntity(entity).left.map(_ => ModifyActivityError.CouldNotModifyActivity).map(_._2) + } + + private def modifyActivityByActivityId[ERROR, T]( activityId: ScenarioActivityId, activityDoesNotExistError: ERROR, - validateCurrentValue: ScenarioActivityEntityData => Either[ERROR, Unit], - modify: ScenarioActivityEntityData => ScenarioActivityEntityData, + validateCurrentValue: ScenarioActivityEntityData => Either[ERROR, T], + modify: T => ScenarioActivityEntityData, couldNotModifyError: ERROR, ): DB[Either[ERROR, Unit]] = { - modifyActivity[ScenarioActivityId, ERROR]( + doModifyActivity[ScenarioActivityId, ERROR, T]( key = activityId, fetchActivity = activityByIdCompiled(_).result.headOption, updateRow = (id: ScenarioActivityId, updatedEntity) => activityByIdCompiled(id).update(updatedEntity), @@ -346,11 +366,11 @@ class DbScenarioActivityRepository(override protected val dbRef: DbRef, clock: C private def modifyActivityByRowId[ERROR]( rowId: Long, activityDoesNotExistError: ERROR, - validateCurrentValue: ScenarioActivityEntityData => Either[ERROR, Unit], + validateCurrentValue: ScenarioActivityEntityData => Either[ERROR, ScenarioActivityEntityData], modify: ScenarioActivityEntityData => ScenarioActivityEntityData, couldNotModifyError: ERROR, ): DB[Either[ERROR, Unit]] = { - modifyActivity[Long, ERROR]( + doModifyActivity[Long, ERROR, ScenarioActivityEntityData]( key = rowId, fetchActivity = activityByRowIdCompiled(_).result.headOption, updateRow = (id: Long, updatedEntity) => activityByRowIdCompiled(id).update(updatedEntity), @@ -361,22 +381,22 @@ class DbScenarioActivityRepository(override protected val dbRef: DbRef, clock: C ) } - private def modifyActivity[KEY, ERROR]( + private def doModifyActivity[KEY, ERROR, VALIDATED]( key: KEY, fetchActivity: KEY => DB[Option[ScenarioActivityEntityData]], updateRow: (KEY, ScenarioActivityEntityData) => DB[Int], activityDoesNotExistError: ERROR, - validateCurrentValue: ScenarioActivityEntityData => Either[ERROR, Unit], - modify: ScenarioActivityEntityData => ScenarioActivityEntityData, + validateCurrentValue: ScenarioActivityEntityData => Either[ERROR, VALIDATED], + modify: VALIDATED => ScenarioActivityEntityData, couldNotModifyError: ERROR, ): DB[Either[ERROR, Unit]] = { val action = for { fetchedActivity <- fetchActivity(key) result <- { val modifiedEntity = for { - entity <- fetchedActivity.toRight(activityDoesNotExistError) - _ <- validateCurrentValue(entity) - modifiedEntity = modify(entity) + entity <- fetchedActivity.toRight(activityDoesNotExistError) + validated <- validateCurrentValue(entity) + modifiedEntity = modify(validated) } yield modifiedEntity modifiedEntity match { @@ -466,7 +486,7 @@ class DbScenarioActivityRepository(override protected val dbRef: DbRef, clock: C id = -1, activityType = activityType, scenarioId = ProcessId(scenarioActivity.scenarioId.value), - activityId = ScenarioActivityId.random, + activityId = scenarioActivity.scenarioActivityId, userId = scenarioActivity.user.id.map(_.value), userName = scenarioActivity.user.name.value, impersonatedByUserId = scenarioActivity.user.impersonatedByUserId.map(_.value), @@ -601,7 +621,15 @@ class DbScenarioActivityRepository(override protected val dbRef: DbRef, clock: C case activity: ScenarioActivity.PerformedScheduledExecution => createEntity(scenarioActivity)( finishedAt = activity.dateFinished.map(Timestamp.from), - errorMessage = activity.errorMessage, + additionalProperties = AdditionalProperties( + List( + Some("scheduleName" -> activity.scheduleName), + Some("status" -> activity.status.entryName), + Some("createdAt" -> activity.createdAt.toString), + activity.nextRetryAt.map(nra => "nextRetryAt" -> nra.toString), + activity.retriesLeft.map(rl => "retriesLeft" -> rl.toString) + ).flatten.toMap + ) ) case activity: ScenarioActivity.AutomaticUpdate => createEntity(scenarioActivity)( @@ -612,8 +640,11 @@ class DbScenarioActivityRepository(override protected val dbRef: DbRef, clock: C ) ) ) - case _: ScenarioActivity.CustomAction => - createEntity(scenarioActivity)() + case activity: ScenarioActivity.CustomAction => + createEntity(scenarioActivity)( + comment = comment(activity.comment), + lastModifiedByUserName = lastModifiedByUserName(activity.comment), + ) } } @@ -675,10 +706,6 @@ class DbScenarioActivityRepository(override protected val dbRef: DbRef, clock: C } } - private def additionalPropertyListFromEntity(entity: ScenarioActivityEntityData, namePrefix: String): List[String] = { - entity.additionalProperties.properties.filter(_._1.startsWith(namePrefix)).values.toList - } - private def additionalPropertyFromEntity(entity: ScenarioActivityEntityData, name: String): Either[String, String] = { optionalAdditionalPropertyFromEntity(entity, name).toRight(s"Missing additional property $name") } @@ -842,7 +869,7 @@ class DbScenarioActivityRepository(override protected val dbRef: DbRef, clock: C scenarioVersionId = entity.scenarioVersion, sourceEnvironment = Environment(sourceEnvironment), sourceUser = UserName(sourceUser), - sourceScenarioVersionId = sourceScenarioVersion.map(ScenarioVersionId), + sourceScenarioVersionId = sourceScenarioVersion.map(ScenarioVersionId.apply), targetEnvironment = targetEnvironment.map(Environment), )).map((entity.id, _)) case ScenarioActivityType.OutgoingMigration => @@ -867,21 +894,31 @@ class DbScenarioActivityRepository(override protected val dbRef: DbRef, clock: C scenarioVersionId = entity.scenarioVersion, comment = comment, dateFinished = entity.finishedAt.map(_.toInstant), + status = entity.state.map(_.toString), errorMessage = entity.errorMessage, )).map((entity.id, _)) case ScenarioActivityType.PerformedScheduledExecution => - ScenarioActivity - .PerformedScheduledExecution( - scenarioId = scenarioIdFromEntity(entity), - scenarioActivityId = entity.activityId, - user = userFromEntity(entity), - date = entity.createdAt.toInstant, - scenarioVersionId = entity.scenarioVersion, - dateFinished = entity.finishedAt.map(_.toInstant), - errorMessage = entity.errorMessage, + (for { + scheduleName <- additionalPropertyFromEntity(entity, "scheduleName") + status <- additionalPropertyFromEntity(entity, "status").flatMap( + ScheduledExecutionStatus.withNameEither(_).left.map(_.getMessage) ) - .asRight - .map((entity.id, _)) + createdAt <- additionalPropertyFromEntity(entity, "createdAt").map(Instant.parse) + nextRetryAt = optionalAdditionalPropertyFromEntity(entity, "nextRetryAt").map(Instant.parse) + retriesLeft = optionalAdditionalPropertyFromEntity(entity, "retriesLeft").flatMap(toIntOption) + } yield ScenarioActivity.PerformedScheduledExecution( + scenarioId = scenarioIdFromEntity(entity), + scenarioActivityId = entity.activityId, + user = userFromEntity(entity), + date = entity.createdAt.toInstant, + scenarioVersionId = entity.scenarioVersion, + dateFinished = entity.finishedAt.map(_.toInstant), + scheduleName = scheduleName, + status = status, + createdAt = createdAt, + nextRetryAt = nextRetryAt, + retriesLeft = retriesLeft, + )).map((entity.id, _)) case ScenarioActivityType.AutomaticUpdate => (for { description <- additionalPropertyFromEntity(entity, "description") @@ -923,4 +960,6 @@ class DbScenarioActivityRepository(override protected val dbRef: DbRef, clock: C private def toLongOption(str: String) = Try(str.toLong).toOption + private def toIntOption(str: String) = Try(str.toInt).toOption + } diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/activities/ScenarioActivityRepository.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/activities/ScenarioActivityRepository.scala index 5a7b8c453c4..f79b380cc11 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/activities/ScenarioActivityRepository.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/activities/ScenarioActivityRepository.scala @@ -6,7 +6,10 @@ import pl.touk.nussknacker.engine.api.process.{ProcessId, VersionId} import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos.Legacy import pl.touk.nussknacker.ui.db.entity.AttachmentEntityData import pl.touk.nussknacker.ui.process.ScenarioAttachmentService.AttachmentToAdd -import pl.touk.nussknacker.ui.process.repository.activities.ScenarioActivityRepository.ModifyCommentError +import pl.touk.nussknacker.ui.process.repository.activities.ScenarioActivityRepository.{ + ModifyActivityError, + ModifyCommentError +} import pl.touk.nussknacker.ui.security.api.LoggedUser trait ScenarioActivityRepository { @@ -17,7 +20,12 @@ trait ScenarioActivityRepository { def addActivity( scenarioActivity: ScenarioActivity, - )(implicit user: LoggedUser): DB[ScenarioActivityId] + ): DB[ScenarioActivityId] + + def modifyActivity( + activityId: ScenarioActivityId, + modification: ScenarioActivity => ScenarioActivity, + ): DB[Either[ModifyActivityError, Unit]] def addComment( scenarioId: ProcessId, @@ -78,4 +86,11 @@ object ScenarioActivityRepository { case object CouldNotModifyComment extends ModifyCommentError } + sealed trait ModifyActivityError + + object ModifyActivityError { + case object ActivityDoesNotExist extends ModifyActivityError + case object CouldNotModifyActivity extends ModifyActivityError + } + } diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala index b85f2a2c10c..b39009734ba 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala @@ -49,6 +49,7 @@ import pl.touk.nussknacker.ui.process.deployment.{ DefaultProcessingTypeDeployedScenariosProvider, DeploymentManagerDispatcher, DeploymentService => LegacyDeploymentService, + RepositoryBasedScenarioActivityManager, ScenarioResolver, ScenarioTestExecutorServiceImpl } @@ -81,15 +82,13 @@ import pl.touk.nussknacker.ui.statistics.{ } import pl.touk.nussknacker.ui.suggester.ExpressionSuggester import pl.touk.nussknacker.ui.uiresolving.UIProcessResolver -import pl.touk.nussknacker.ui.util.{CorsSupport, OptionsMethodSupport, SecurityHeadersSupport, WithDirectives} +import pl.touk.nussknacker.ui.util._ import pl.touk.nussknacker.ui.validation.{ NodeValidator, ParametersValidator, ScenarioLabelsValidator, UIProcessValidator } -import pl.touk.nussknacker.ui.util._ -import pl.touk.nussknacker.ui.validation.{NodeValidator, ParametersValidator, UIProcessValidator} import sttp.client3.SttpBackend import sttp.client3.asynchttpclient.future.AsyncHttpClientFutureBackend @@ -125,13 +124,17 @@ class AkkaHttpBasedRouteProvider( countsReporter <- createCountsReporter(featureTogglesConfig, environment, sttpBackend) actionServiceSupplier = new DelayedInitActionServiceSupplier additionalUIConfigProvider = createAdditionalUIConfigProvider(resolvedConfig, sttpBackend) + deploymentRepository = new DeploymentRepository(dbRef, Clock.systemDefaultZone()) + scenarioActivityRepository = new DbScenarioActivityRepository(dbRef, designerClock) + dbioRunner = DBIOActionRunner(dbRef) processingTypeDataProvider <- prepareProcessingTypeDataReload( additionalUIConfigProvider, actionServiceSupplier, + scenarioActivityRepository, + dbioRunner, sttpBackend, ) - deploymentRepository = new DeploymentRepository(dbRef, Clock.systemDefaultZone()) - dbioRunner = DBIOActionRunner(dbRef) + deploymentsStatusesSynchronizer = new DeploymentsStatusesSynchronizer( deploymentRepository, processingTypeDataProvider.mapValues( @@ -246,8 +249,6 @@ class AkkaHttpBasedRouteProvider( // correct classloader and that won't cause further delays during handling requests processingTypeDataProvider.reloadAll().unsafeRunSync() - val processActivityRepository = new DbScenarioActivityRepository(dbRef, designerClock) - val authenticationResources = AuthenticationResources(resolvedConfig, getClass.getClassLoader, sttpBackend) val authManager = new AuthManager(authenticationResources) @@ -256,7 +257,7 @@ class AkkaHttpBasedRouteProvider( dbRef, designerClock, processRepository, - processActivityRepository, + scenarioActivityRepository, scenarioLabelsRepository, environment ) @@ -382,6 +383,7 @@ class AkkaHttpBasedRouteProvider( val scenarioActivityApiHttpService = new ScenarioActivityApiHttpService( authManager = authManager, + deploymentManagerDispatcher = dmDispatcher, scenarioActivityRepository = scenarioActivityRepository, scenarioService = processService, scenarioAuthorizer = processAuthorizer, @@ -518,7 +520,7 @@ class AkkaHttpBasedRouteProvider( processService, processingTypeDataProvider.mapValues(_.deploymentData.deploymentManagerType), fingerprintService, - processActivityRepository, + scenarioActivityRepository, componentService, feStatisticsRepository, processingTypeDataProvider @@ -677,6 +679,8 @@ class AkkaHttpBasedRouteProvider( private def prepareProcessingTypeDataReload( additionalUIConfigProvider: AdditionalUIConfigProvider, actionServiceProvider: Supplier[ActionService], + scenarioActivityRepository: DbScenarioActivityRepository, + dbioActionRunner: DBIOActionRunner, sttpBackend: SttpBackend[Future, Any], )(implicit executionContext: ExecutionContext): Resource[IO, ReloadableProcessingTypeDataProvider] = { Resource @@ -685,7 +689,13 @@ class AkkaHttpBasedRouteProvider( new ReloadableProcessingTypeDataProvider( processingTypeDataLoader.loadProcessingTypeData( getModelDependencies(additionalUIConfigProvider, _), - getDeploymentManagerDependencies(actionServiceProvider, sttpBackend, _), + getDeploymentManagerDependencies( + actionServiceProvider, + scenarioActivityRepository, + dbioActionRunner, + sttpBackend, + _ + ), ) ) ) @@ -696,6 +706,8 @@ class AkkaHttpBasedRouteProvider( private def getDeploymentManagerDependencies( actionServiceProvider: Supplier[ActionService], + scenarioActivityRepository: DbScenarioActivityRepository, + dbioActionRunner: DBIOActionRunner, sttpBackend: SttpBackend[Future, Any], processingType: ProcessingType )(implicit executionContext: ExecutionContext) = { @@ -705,6 +717,10 @@ class AkkaHttpBasedRouteProvider( processingType, actionServiceProvider.get(), ), + new RepositoryBasedScenarioActivityManager( + scenarioActivityRepository, + dbioActionRunner, + ), system.dispatcher, system, sttpBackend diff --git a/designer/server/src/test/scala/db/migration/V1_057__MigrateActionsAndCommentsToScenarioActivities.scala b/designer/server/src/test/scala/db/migration/V1_057__MigrateActionsAndCommentsToScenarioActivities.scala index 78273ae172c..f9270c86538 100644 --- a/designer/server/src/test/scala/db/migration/V1_057__MigrateActionsAndCommentsToScenarioActivities.scala +++ b/designer/server/src/test/scala/db/migration/V1_057__MigrateActionsAndCommentsToScenarioActivities.scala @@ -237,6 +237,7 @@ class V1_057__MigrateActionsAndCommentsToScenarioActivities date = date, scenarioVersionId = sv, dateFinished = None, + status = Some("IN_PROGRESS"), errorMessage = None, comment = Available("Deployed at the request of business", user.name, date) ) diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala index 02ff2e2d45a..f98cb0c8532 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala @@ -1,34 +1,29 @@ package pl.touk.nussknacker.test.mock +import _root_.sttp.client3.testing.SttpBackendStub import akka.actor.ActorSystem import cats.data.Validated.valid import cats.data.ValidatedNel import com.google.common.collect.LinkedHashMultimap import com.typesafe.config.Config +import pl.touk.nussknacker.engine._ +import pl.touk.nussknacker.engine.api.definition.{ + NotBlankParameterValidator, + NotNullParameterValidator, + StringParameterEditor +} import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.api.{ProcessVersion, StreamMetaData} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.deployment.{ - CustomActionDefinition, - CustomActionParameter, - CustomActionResult, - DeploymentId, - ExternalDeploymentId -} +import pl.touk.nussknacker.engine.deployment._ import pl.touk.nussknacker.engine.management.{FlinkDeploymentManager, FlinkStreamingDeploymentManagerProvider} -import pl.touk.nussknacker.engine._ import pl.touk.nussknacker.test.config.ConfigWithScalaVersion import pl.touk.nussknacker.test.utils.domain.TestFactory import shapeless.syntax.typeable.typeableOps -import _root_.sttp.client3.testing.SttpBackendStub -import pl.touk.nussknacker.engine.api.definition.{ - NotBlankParameterValidator, - NotNullParameterValidator, - StringParameterEditor -} +import java.time.Instant import java.util.UUID import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue} import scala.concurrent.ExecutionContext.Implicits.global @@ -47,7 +42,8 @@ class MockDeploymentManager( defaultProcessStateStatus: StateStatus = SimpleStateStatus.NotDeployed, deployedScenariosProvider: ProcessingTypeDeployedScenariosProvider = new ProcessingTypeDeployedScenariosProviderStub(List.empty), - actionService: ProcessingTypeActionService = new ProcessingTypeActionServiceStub + actionService: ProcessingTypeActionService = new ProcessingTypeActionServiceStub, + scenarioActivityManager: ScenarioActivityManager = NoOpScenarioActivityManager, ) extends FlinkDeploymentManager( ModelData( ProcessingTypeConfig.read(ConfigWithScalaVersion.StreamingProcessTypeConfig), @@ -56,6 +52,7 @@ class MockDeploymentManager( DeploymentManagerDependencies( deployedScenariosProvider, actionService, + scenarioActivityManager, ExecutionContext.global, ActorSystem("MockDeploymentManager"), SttpBackendStub.asynchronousFuture @@ -94,13 +91,46 @@ class MockDeploymentManager( import command._ logger.debug(s"Adding deploy for ${processVersion.processName}") deploys.add(processVersion.processName) - this.synchronized { - Option(deployResult.get(processVersion.processName)) - .map(_.toArray(Array.empty[Future[Option[ExternalDeploymentId]]])) - .getOrElse(Array.empty) - .lastOption - .getOrElse(Future.successful(None)) - } + + val customActivityId = ScenarioActivityId.random + for { + _ <- scenarioActivityManager.saveActivity( + ScenarioActivity.CustomAction( + scenarioId = ScenarioId(processVersion.processId.value), + scenarioActivityId = customActivityId, + user = ScenarioUser.internalNuUser, + date = Instant.now(), + scenarioVersionId = Some(ScenarioVersionId.from(processVersion.versionId)), + actionName = "Custom action of MockDeploymentManager", + comment = ScenarioComment.Available( + comment = "???", + lastModifiedByUserName = ScenarioUser.internalNuUser.name, + lastModifiedAt = Instant.now() + ) + ) + ) + externalDeploymentId <- this.synchronized { + Option(deployResult.get(processVersion.processName)) + .map(_.toArray(Array.empty[Future[Option[ExternalDeploymentId]]])) + .getOrElse(Array.empty) + .lastOption + .getOrElse(Future.successful(None)) + } + _ <- scenarioActivityManager.modifyActivity( + customActivityId, + { + case customActionActivity: ScenarioActivity.CustomAction => + customActionActivity.copy( + comment = ScenarioComment.Available( + comment = s"With successfully updated comment", + lastModifiedByUserName = ScenarioUser.internalNuUser.name, + lastModifiedAt = Instant.now() + ) + ) + case other => other + } + ) + } yield externalDeploymentId } override protected def waitForDuringDeployFinished( diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/test/utils/domain/TestFactory.scala b/designer/server/src/test/scala/pl/touk/nussknacker/test/utils/domain/TestFactory.scala index c3be03d54a9..63783155046 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/test/utils/domain/TestFactory.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/test/utils/domain/TestFactory.scala @@ -9,6 +9,7 @@ import db.util.DBIOActionInstances._ import pl.touk.nussknacker.engine.api.component.{DesignerWideComponentId, ProcessingMode} import pl.touk.nussknacker.engine.api.definition.FixedExpressionValue import pl.touk.nussknacker.engine.api.deployment.{ + NoOpScenarioActivityManager, ProcessingTypeActionServiceStub, ProcessingTypeDeployedScenariosProviderStub } @@ -131,6 +132,7 @@ object TestFactory { DeploymentManagerDependencies( new ProcessingTypeDeployedScenariosProviderStub(List.empty), new ProcessingTypeActionServiceStub, + NoOpScenarioActivityManager, actorSystem.dispatcher, actorSystem, SttpBackendStub.asynchronousFuture diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ScenarioActivityApiHttpServiceBusinessSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ScenarioActivityApiHttpServiceBusinessSpec.scala index e53d82dde1c..207619041d2 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ScenarioActivityApiHttpServiceBusinessSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ScenarioActivityApiHttpServiceBusinessSpec.scala @@ -3,6 +3,8 @@ package pl.touk.nussknacker.ui.api import io.restassured.RestAssured.`given` import io.restassured.module.scala.RestAssuredSupport.AddThenToResponse import org.scalatest.freespec.AnyFreeSpecLike +import pl.touk.nussknacker.development.manager.MockableDeploymentManagerProvider.MockableDeploymentManager +import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.test.base.it.{NuItTest, WithSimplifiedConfigScenarioHelper} import pl.touk.nussknacker.test.config.{ @@ -326,6 +328,70 @@ class ScenarioActivityApiHttpServiceBusinessSpec ) ) } + "return SCENARIO_CREATED activity and activities returned by deployment manager" in { + given() + .applicationState { + createSavedScenario(exampleScenario) + MockableDeploymentManager.configureManagerSpecificScenarioActivities( + List( + ScenarioActivity.CustomAction( + scenarioId = ScenarioId(123), + scenarioActivityId = ScenarioActivityId.random, + user = ScenarioUser(None, UserName("custom-user"), None, None), + date = clock.instant(), + scenarioVersionId = None, + actionName = "Custom action handled by deployment manager", + comment = ScenarioComment.Available( + comment = "Executed on custom deployment manager", + lastModifiedByUserName = UserName("custom-user"), + lastModifiedAt = clock.instant() + ) + ) + ) + ) + } + .when() + .basicAuthAllPermUser() + .get(s"$nuDesignerHttpAddress/api/processes/$exampleScenarioName/activity/activities") + .Then() + .statusCode(200) + .body( + matchJsonWithRegexValues( + s""" + |{ + | "activities": [ + | { + | "id": "${regexes.looseUuidRegex}", + | "user": "admin", + | "date": "${regexes.zuluDateRegex}", + | "scenarioVersionId": 1, + | "additionalFields": [], + | "type": "SCENARIO_CREATED" + | }, + | { + | "id": "${regexes.looseUuidRegex}", + | "user": "custom-user", + | "date": "${regexes.zuluDateRegex}", + | "comment": { + | "content": { + | "value": "Executed on custom deployment manager", + | "status": "AVAILABLE" + | }, + | "lastModifiedBy": "custom-user", + | "lastModifiedAt": "${regexes.zuluDateRegex}" + | }, + | "additionalFields": [ + | {"name": "actionName", "value": "Custom action handled by deployment manager"} + | ], + | "type": "CUSTOM_ACTION" + | } + | ] + |} + |""".stripMargin + ) + ) + } + "return 404 for no existing scenario" in { given() .when() diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/ScenarioAttachmentServiceSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/ScenarioAttachmentServiceSpec.scala index 17cdc4f94e0..83f03e98b58 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/ScenarioAttachmentServiceSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/ScenarioAttachmentServiceSpec.scala @@ -11,6 +11,7 @@ import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos.Legacy.Proce import pl.touk.nussknacker.ui.config.AttachmentsConfig import pl.touk.nussknacker.ui.db.entity.AttachmentEntityData import pl.touk.nussknacker.ui.process.repository.activities.ScenarioActivityRepository +import pl.touk.nussknacker.ui.process.repository.activities.ScenarioActivityRepository.ModifyActivityError import pl.touk.nussknacker.ui.security.api.{LoggedUser, RealLoggedUser} import slick.dbio.DBIO @@ -52,41 +53,51 @@ class ScenarioAttachmentServiceSpec extends AnyFunSuite with Matchers with Scala private object TestProcessActivityRepository extends ScenarioActivityRepository { - override def findActivities(scenarioId: ProcessId): DB[Seq[ScenarioActivity]] = ??? + override def findActivities(scenarioId: ProcessId): DB[Seq[ScenarioActivity]] = notSupported("findActivities") - override def addActivity(scenarioActivity: ScenarioActivity)(implicit user: LoggedUser): DB[ScenarioActivityId] = ??? + override def addActivity(scenarioActivity: ScenarioActivity): DB[ScenarioActivityId] = notSupported("addActivity") + + override def modifyActivity( + activityId: ScenarioActivityId, + modification: ScenarioActivity => ScenarioActivity, + ): DB[Either[ModifyActivityError, Unit]] = notSupported("modifyActivity") override def addComment(scenarioId: ProcessId, processVersionId: VersionId, comment: String)( implicit user: LoggedUser - ): DB[ScenarioActivityId] = ??? + ): DB[ScenarioActivityId] = notSupported("addComment") override def addAttachment(attachmentToAdd: ScenarioAttachmentService.AttachmentToAdd)( implicit user: LoggedUser ): DB[ScenarioActivityId] = DBIO.successful(ScenarioActivityId.random) - override def findAttachments(scenarioId: ProcessId): DB[Seq[AttachmentEntityData]] = ??? + override def findAttachments(scenarioId: ProcessId): DB[Seq[AttachmentEntityData]] = notSupported("findAttachments") - override def findAttachment(scenarioId: ProcessId, attachmentId: Long): DB[Option[AttachmentEntityData]] = ??? + override def findAttachment(scenarioId: ProcessId, attachmentId: Long): DB[Option[AttachmentEntityData]] = + notSupported("findAttachment") - override def findActivity(processId: ProcessId): DB[ProcessActivity] = ??? + override def findActivity(processId: ProcessId): DB[ProcessActivity] = notSupported("findActivity") - override def getActivityStats: DB[Map[String, Int]] = ??? + override def getActivityStats: DB[Map[String, Int]] = notSupported("getActivityStats") override def editComment(scenarioId: ProcessId, scenarioActivityId: ScenarioActivityId, comment: String)( implicit user: LoggedUser - ): DB[Either[ScenarioActivityRepository.ModifyCommentError, Unit]] = ??? + ): DB[Either[ScenarioActivityRepository.ModifyCommentError, Unit]] = notSupported("editComment") override def editComment(scenarioId: ProcessId, commentId: Long, comment: String)( implicit user: LoggedUser - ): DB[Either[ScenarioActivityRepository.ModifyCommentError, Unit]] = ??? + ): DB[Either[ScenarioActivityRepository.ModifyCommentError, Unit]] = notSupported("editComment") override def deleteComment(scenarioId: ProcessId, commentId: Long)( implicit user: LoggedUser - ): DB[Either[ScenarioActivityRepository.ModifyCommentError, Unit]] = ??? + ): DB[Either[ScenarioActivityRepository.ModifyCommentError, Unit]] = notSupported("deleteComment") override def deleteComment(scenarioId: ProcessId, scenarioActivityId: ScenarioActivityId)( implicit user: LoggedUser - ): DB[Either[ScenarioActivityRepository.ModifyCommentError, Unit]] = ??? + ): DB[Either[ScenarioActivityRepository.ModifyCommentError, Unit]] = notSupported("deleteComment") + + private def notSupported(methodName: String): Nothing = throw new Exception( + s"Method $methodName not supported by TestProcessActivityRepository test implementation" + ) } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala index 538381c3f06..1faf9551618 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala @@ -8,7 +8,6 @@ import org.scalatest.LoneElement._ import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, OptionValues} -import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.component.NodesDeploymentData import pl.touk.nussknacker.engine.api.deployment.DeploymentUpdateStrategy.StateRestoringStrategy import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName.{Cancel, Deploy} @@ -16,6 +15,7 @@ import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus import pl.touk.nussknacker.engine.api.process._ +import pl.touk.nussknacker.engine.api.{Comment, ProcessVersion} import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.deployment.{CustomActionResult, DeploymentId, ExternalDeploymentId} import pl.touk.nussknacker.test.base.db.WithHsqlDbTesting @@ -27,12 +27,11 @@ import pl.touk.nussknacker.test.utils.scalas.DBIOActionValues import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, NuScalaTestAssertions, PatientScalaFutures} import pl.touk.nussknacker.ui.api.DeploymentCommentSettings import pl.touk.nussknacker.ui.listener.ProcessChangeEvent.{OnActionExecutionFinished, OnActionSuccess} -import pl.touk.nussknacker.ui.process.processingtype.provider.{ProcessingTypeDataProvider, ProcessingTypeDataState} -import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider.noCombinedDataFun import pl.touk.nussknacker.ui.process.processingtype.ValueWithRestriction +import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider.noCombinedDataFun +import pl.touk.nussknacker.ui.process.processingtype.provider.{ProcessingTypeDataProvider, ProcessingTypeDataState} import pl.touk.nussknacker.ui.process.repository.ProcessRepository.CreateProcessAction import pl.touk.nussknacker.ui.process.repository.{CommentValidationError, DBIOActionRunner} -import pl.touk.nussknacker.engine.api.Comment import pl.touk.nussknacker.ui.process.{ScenarioQuery, ScenarioWithDetailsConversions} import pl.touk.nussknacker.ui.security.api.LoggedUser import slick.dbio.DBIOAction @@ -98,7 +97,8 @@ class DeploymentServiceSpec deploymentManager = new MockDeploymentManager( SimpleStateStatus.Running, DefaultProcessingTypeDeployedScenariosProvider(testDbRef, "streaming"), - new DefaultProcessingTypeActionService("streaming", deploymentService) + new DefaultProcessingTypeActionService("streaming", deploymentService), + new RepositoryBasedScenarioActivityManager(activityRepository, dbioRunner), ) private def createDeploymentService( @@ -382,6 +382,24 @@ class DeploymentServiceSpec checkStatusAction(SimpleStateStatus.Running, Some(ScenarioActionName.Deploy)) listener.events.toArray.filter(_.isInstanceOf[OnActionSuccess]) should have length 1 } + + val activities = dbioRunner.run(activityRepository.findActivities(processIdWithName.id)).futureValue + + activities.size shouldBe 3 + activities(0) match { + case _: ScenarioActivity.ScenarioCreated => () + case _ => fail("First activity should be ScenarioCreated") + } + activities(1) match { + case _: ScenarioActivity.ScenarioDeployed => () + case _ => fail("Second activity should be ScenarioDeployed") + } + activities(2) match { + case ScenarioActivity.CustomAction(_, _, _, _, _, actionName, ScenarioComment.Available(content, _, _)) => + actionName shouldBe "Custom action of MockDeploymentManager" + content shouldBe "With successfully updated comment" + case _ => fail("Third activity should be CustomAction with comment") + } } test("Should skip notifications and deployment on validation errors") { diff --git a/docs-internal/api/nu-designer-openapi.yaml b/docs-internal/api/nu-designer-openapi.yaml index 57d1435eda7..0017fda21fc 100644 --- a/docs-internal/api/nu-designer-openapi.yaml +++ b/docs-internal/api/nu-designer-openapi.yaml @@ -3770,6 +3770,8 @@ paths: lastModifiedBy: some user lastModifiedAt: '2024-01-17T14:21:17Z' additionalFields: + - name: status + value: IN_PROGRESS - name: dateFinished value: '2024-01-17T14:21:17Z' - name: errorMessage @@ -3785,6 +3787,8 @@ paths: lastModifiedBy: some user lastModifiedAt: '2024-01-17T14:21:17Z' additionalFields: + - name: status + value: FAILED - name: dateFinished value: '2024-01-17T14:21:17Z' type: PERFORMED_SINGLE_EXECUTION @@ -3793,8 +3797,34 @@ paths: date: '2024-01-17T14:21:17Z' scenarioVersionId: 1 additionalFields: + - name: status + value: Execution finished + - name: createdAt + value: '2024-01-17T13:21:17Z' - name: dateFinished value: '2024-01-17T14:21:17Z' + - name: scheduleName + value: main-schedule + - name: retriesLeft + value: None + type: PERFORMED_SCHEDULED_EXECUTION + - id: 9b27797e-aa03-42ba-8406-d0ae8005a883 + user: some user + date: '2024-01-17T14:21:17Z' + scenarioVersionId: 1 + additionalFields: + - name: status + value: Deployment will be retried + - name: createdAt + value: '2024-01-17T13:21:17Z' + - name: dateFinished + value: '2024-01-17T14:21:17Z' + - name: scheduleName + value: main-schedule + - name: retriesLeft + value: Some(1) + - name: nextRetryAt + value: '2024-01-17T15:21:17Z' type: PERFORMED_SCHEDULED_EXECUTION - id: 33509d37-7657-4229-940f-b5736c82fb13 user: some user diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala index 967fe9c3c01..2459e9ef47c 100644 --- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala +++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala @@ -14,6 +14,7 @@ import pl.touk.nussknacker.engine.api.definition.{ MandatoryParameterValidator, StringParameterEditor } +import pl.touk.nussknacker.engine.api.deployment.ScenarioActivityHandling.AllScenarioActivitiesStoredByNussknacker import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} import pl.touk.nussknacker.engine.api.process.ProcessName @@ -182,6 +183,9 @@ class DevelopmentDeploymentManager(actorSystem: ActorSystem, modelData: BaseMode } } + override def scenarioActivityHandling: ScenarioActivityHandling = + AllScenarioActivitiesStoredByNussknacker + override def close(): Unit = {} private def changeState(name: ProcessName, stateStatus: StateStatus): Unit = diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala index e8897f9f3cc..360ea87ff55 100644 --- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala +++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala @@ -9,6 +9,7 @@ import pl.touk.nussknacker.engine.ModelData.BaseModelDataExt import pl.touk.nussknacker.engine._ import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig import pl.touk.nussknacker.engine.api.definition.{NotBlankParameterValidator, StringParameterEditor} +import pl.touk.nussknacker.engine.api.deployment.ScenarioActivityHandling.ManagerSpecificScenarioActivitiesStoredByManager import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} @@ -27,7 +28,6 @@ import scala.util.Try class MockableDeploymentManagerProvider extends DeploymentManagerProvider { import net.ceedubs.ficus.Ficus._ - import net.ceedubs.ficus.readers.ArbitraryTypeReader._ override def createDeploymentManager( modelData: BaseModelData, @@ -131,6 +131,16 @@ object MockableDeploymentManagerProvider { override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport + override def scenarioActivityHandling: ScenarioActivityHandling = + new ManagerSpecificScenarioActivitiesStoredByManager { + + override def managerSpecificScenarioActivities( + processIdWithName: ProcessIdWithName + ): Future[List[ScenarioActivity]] = + Future.successful(MockableDeploymentManager.managerSpecificScenarioActivities.get()) + + } + override def close(): Unit = {} } @@ -138,10 +148,10 @@ object MockableDeploymentManagerProvider { // improved, but there is no need to do it ATM. object MockableDeploymentManager { - private val scenarioStatuses = new AtomicReference[Map[ScenarioName, StateStatus]](Map.empty) - private val testResults = new AtomicReference[Map[ScenarioName, TestResults[Json]]](Map.empty) - private val deploymentResults = - new AtomicReference[Map[DeploymentId, Try[Option[ExternalDeploymentId]]]](Map.empty) + private val scenarioStatuses = new AtomicReference[Map[ScenarioName, StateStatus]](Map.empty) + private val testResults = new AtomicReference[Map[ScenarioName, TestResults[Json]]](Map.empty) + private val deploymentResults = new AtomicReference[Map[DeploymentId, Try[Option[ExternalDeploymentId]]]](Map.empty) + private val managerSpecificScenarioActivities = new AtomicReference[List[ScenarioActivity]](List.empty) def configureScenarioStatuses(scenarioStates: Map[ScenarioName, StateStatus]): Unit = { MockableDeploymentManager.scenarioStatuses.set(scenarioStates) @@ -155,10 +165,15 @@ object MockableDeploymentManagerProvider { MockableDeploymentManager.testResults.set(scenarioTestResults) } + def configureManagerSpecificScenarioActivities(scenarioActivities: List[ScenarioActivity]): Unit = { + MockableDeploymentManager.managerSpecificScenarioActivities.set(scenarioActivities) + } + def clean(): Unit = { MockableDeploymentManager.scenarioStatuses.set(Map.empty) MockableDeploymentManager.deploymentResults.set(Map.empty) MockableDeploymentManager.testResults.set(Map.empty) + MockableDeploymentManager.managerSpecificScenarioActivities.set(List.empty) } } diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala index d4285ca080d..0b76f2e6a74 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala @@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.management.periodic import com.typesafe.config.Config import com.typesafe.scalalogging.LazyLogging +import pl.touk.nussknacker.engine.api.deployment.ScenarioActivityHandling.ManagerSpecificScenarioActivitiesStoredByManager import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess @@ -222,4 +223,28 @@ class PeriodicDeploymentManager private[periodic] ( // We should move periodic mechanism to the core and reuse new synchronization mechanism also in this case. override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport + // todo NU-1772 + // In the current implementation: + // - PeriodicDeploymentManager is a kind of plugin, and it has its own data source (separate db) + // - PeriodicDeploymentManager returns (by implementing ManagerSpecificScenarioActivitiesStoredByManager) custom ScenarioActivities, that are associated with operations performed internally by the manager + // Why is it not the ideal solution: + // - we have different data sources for ScenarioActivities, and merging data from two sources may be problematic, e.g. when paginating results + // How can it be redesigned: + // - we could do it using the ManagerSpecificScenarioActivitiesStoredByNussknacker instead + // - that way, Nu would provide hooks, that would allow the manager to save and modify its custom activities in the Nu database + // - only the Nussknacker database would then be used, as single source of Scenario Activities + // Why not implemented that way in the first place? + // - we have to migrate information about old periodic deployments, or decide that we don't need it + // - we have to modify the logic of the PeriodicDeploymentManager + // - we may need to refactor PeriodicDeploymentManager data source first + override val scenarioActivityHandling: ManagerSpecificScenarioActivitiesStoredByManager = + new ManagerSpecificScenarioActivitiesStoredByManager { + + override def managerSpecificScenarioActivities( + processIdWithName: ProcessIdWithName + ): Future[List[ScenarioActivity]] = + service.getScenarioActivitiesSpecificToPeriodicProcess(processIdWithName) + + } + } diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala index c06e0d53dae..c37199f9fe9 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala @@ -8,7 +8,7 @@ import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus -import pl.touk.nussknacker.engine.api.process.ProcessName +import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{DeploymentData, DeploymentId} import pl.touk.nussknacker.engine.management.periodic.PeriodicProcessService.{ @@ -25,7 +25,7 @@ import pl.touk.nussknacker.engine.management.periodic.service._ import java.time.chrono.ChronoLocalDateTime import java.time.temporal.ChronoUnit -import java.time.{Clock, LocalDateTime} +import java.time.{Clock, Instant, LocalDateTime, ZoneOffset} import scala.concurrent.{ExecutionContext, Future} import scala.util.control.NonFatal @@ -58,6 +58,30 @@ class PeriodicProcessService( private implicit val localDateOrdering: Ordering[LocalDateTime] = Ordering.by(identity[ChronoLocalDateTime[_]]) + def getScenarioActivitiesSpecificToPeriodicProcess( + processIdWithName: ProcessIdWithName + ): Future[List[ScenarioActivity]] = + scheduledProcessesRepository + .getSchedulesState(processIdWithName.name) + .run + .map(_.groupedByPeriodicProcess) + .map(_.flatMap(_.deployments)) + .map(_.map { deployment => + ScenarioActivity.PerformedScheduledExecution( + scenarioId = ScenarioId(processIdWithName.id.value), + scenarioActivityId = ScenarioActivityId.random, + user = ScenarioUser.internalNuUser, + date = instantAtUTC(deployment.runAt), + scenarioVersionId = Some(ScenarioVersionId.from(deployment.periodicProcess.processVersion.versionId)), + dateFinished = deployment.state.completedAt.map(instantAtUTC), + scheduleName = deployment.scheduleName.display, + status = scheduledExecutionStatus(deployment.state.status), + createdAt = instantAtUTC(deployment.createdAt), + nextRetryAt = deployment.nextRetryAt.map(instantAtUTC), + retriesLeft = deployment.nextRetryAt.map(_ => deployment.retriesLeft), + ) + }.toList.sortBy(_.date)) + def schedule( schedule: ScheduleProperty, processVersion: ProcessVersion, @@ -496,6 +520,26 @@ class PeriodicProcessService( } + private def scheduledExecutionStatus(status: PeriodicProcessDeploymentStatus): ScheduledExecutionStatus = { + status match { + case PeriodicProcessDeploymentStatus.Scheduled => + ScheduledExecutionStatus.Scheduled + case PeriodicProcessDeploymentStatus.Deployed => + ScheduledExecutionStatus.Deployed + case PeriodicProcessDeploymentStatus.Finished => + ScheduledExecutionStatus.Finished + case PeriodicProcessDeploymentStatus.Failed => + ScheduledExecutionStatus.Failed + case PeriodicProcessDeploymentStatus.RetryingDeploy => + ScheduledExecutionStatus.DeploymentWillBeRetried + case PeriodicProcessDeploymentStatus.FailedOnDeploy => + ScheduledExecutionStatus.DeploymentFailed + } + } + + private def instantAtUTC(localDateTime: LocalDateTime): Instant = + localDateTime.toInstant(ZoneOffset.UTC) + } object PeriodicProcessService { diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/db/PeriodicProcessesRepository.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/db/PeriodicProcessesRepository.scala index a1697f3d9dd..267f6316fbf 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/db/PeriodicProcessesRepository.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/db/PeriodicProcessesRepository.scala @@ -112,6 +112,10 @@ trait PeriodicProcessesRepository { def markInactive(processId: PeriodicProcessId): Action[Unit] + def getSchedulesState( + scenarioName: ProcessName + ): Action[SchedulesState] + def create( deploymentWithJarData: DeploymentWithJarData[CanonicalProcess], scheduleProperty: ScheduleProperty, @@ -183,6 +187,17 @@ class SlickPeriodicProcessesRepository( override def run[T](action: DBIOAction[T, NoStream, Effect.All]): Future[T] = db.run(action.transactionally) + override def getSchedulesState( + scenarioName: ProcessName + ): Action[SchedulesState] = { + PeriodicProcessesWithoutJson + .filter(_.processName === scenarioName) + .join(PeriodicProcessDeployments) + .on(_.id === _.periodicProcessId) + .result + .map(toSchedulesState) + } + override def create( deploymentWithJarData: DeploymentWithJarData[CanonicalProcess], scheduleProperty: ScheduleProperty, diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala index c7289710492..e9d08636226 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala @@ -6,10 +6,9 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.prop.TableDrivenPropertyChecks import org.scalatest.{Inside, OptionValues} import pl.touk.nussknacker.engine.api.deployment.DeploymentUpdateStrategy.StateRestoringStrategy -import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName +import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus -import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.api.{MetaData, ProcessVersion, StreamMetaData} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess @@ -379,6 +378,39 @@ class PeriodicDeploymentManagerTest PeriodicProcessDeploymentStatus.Finished, PeriodicProcessDeploymentStatus.Scheduled ) + + val activities = + f.periodicDeploymentManager.scenarioActivityHandling.managerSpecificScenarioActivities(idWithName).futureValue + val firstActivity = activities.head.asInstanceOf[ScenarioActivity.PerformedScheduledExecution] + val secondActivity = activities(1).asInstanceOf[ScenarioActivity.PerformedScheduledExecution] + activities shouldBe List( + ScenarioActivity.PerformedScheduledExecution( + scenarioId = ScenarioId(1), + scenarioActivityId = firstActivity.scenarioActivityId, + user = ScenarioUser(None, UserName("Nussknacker"), None, None), + date = firstActivity.date, + scenarioVersionId = Some(ScenarioVersionId(1)), + dateFinished = firstActivity.dateFinished, + scheduleName = "[default]", + status = ScheduledExecutionStatus.Finished, + createdAt = firstActivity.createdAt, + retriesLeft = None, + nextRetryAt = None + ), + ScenarioActivity.PerformedScheduledExecution( + scenarioId = ScenarioId(1), + scenarioActivityId = secondActivity.scenarioActivityId, + user = ScenarioUser(None, UserName("Nussknacker"), None, None), + date = secondActivity.date, + scenarioVersionId = Some(ScenarioVersionId(42)), + dateFinished = secondActivity.dateFinished, + scheduleName = "[default]", + status = ScheduledExecutionStatus.Scheduled, + createdAt = secondActivity.createdAt, + retriesLeft = None, + nextRetryAt = None + ) + ) } test("should cancel failed job after RescheduleActor handles finished") { @@ -399,6 +431,26 @@ class PeriodicDeploymentManagerTest f.repository.deploymentEntities.loneElement.status shouldBe PeriodicProcessDeploymentStatus.Failed f.getMergedStatusDetails.status shouldEqual SimpleStateStatus.Canceled + + val activities = + f.periodicDeploymentManager.scenarioActivityHandling.managerSpecificScenarioActivities(idWithName).futureValue + val headActivity = activities.head.asInstanceOf[ScenarioActivity.PerformedScheduledExecution] + activities shouldBe List( + ScenarioActivity.PerformedScheduledExecution( + scenarioId = ScenarioId(1), + scenarioActivityId = headActivity.scenarioActivityId, + user = ScenarioUser(None, UserName("Nussknacker"), None, None), + date = headActivity.date, + scenarioVersionId = Some(ScenarioVersionId(1)), + dateFinished = headActivity.dateFinished, + scheduleName = "[default]", + status = ScheduledExecutionStatus.Failed, + createdAt = headActivity.createdAt, + retriesLeft = None, + nextRetryAt = None + ) + ) + } test("should reschedule failed job after RescheduleActor handles finished when configured") { diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/db/InMemPeriodicProcessesRepository.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/db/InMemPeriodicProcessesRepository.scala index abab7ff3794..d16786058ea 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/db/InMemPeriodicProcessesRepository.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/db/InMemPeriodicProcessesRepository.scala @@ -115,6 +115,15 @@ class InMemPeriodicProcessesRepository(processingType: String) extends PeriodicP id } + override def getSchedulesState( + scenarioName: ProcessName + ): Action[SchedulesState] = { + val filteredProcesses = processEntities.filter { pe => + pe.processName == scenarioName && deploymentEntities.exists(d => d.periodicProcessId == pe.id) + }.toSeq + getLatestDeploymentsForPeriodicProcesses(filteredProcesses, deploymentsPerScheduleMaxCount = Int.MaxValue) + } + override def markInactive(processId: PeriodicProcessId): Unit = processEntities.zipWithIndex .find { case (process, _) => process.id == processId } diff --git a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingDeploymentManagerProviderHelper.scala b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingDeploymentManagerProviderHelper.scala index 7fe0ffc159c..3ac061207ea 100644 --- a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingDeploymentManagerProviderHelper.scala +++ b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingDeploymentManagerProviderHelper.scala @@ -5,8 +5,10 @@ import org.asynchttpclient.DefaultAsyncHttpClientConfig import pl.touk.nussknacker.engine.api.component.DesignerWideComponentId import pl.touk.nussknacker.engine.api.deployment.{ DeploymentManager, + NoOpScenarioActivityManager, ProcessingTypeActionServiceStub, - ProcessingTypeDeployedScenariosProviderStub + ProcessingTypeDeployedScenariosProviderStub, + ScenarioActivityManager } import pl.touk.nussknacker.engine.management.FlinkStreamingDeploymentManagerProvider import pl.touk.nussknacker.engine.{ @@ -38,6 +40,7 @@ object FlinkStreamingDeploymentManagerProviderHelper { val deploymentManagerDependencies = DeploymentManagerDependencies( new ProcessingTypeDeployedScenariosProviderStub(List.empty), new ProcessingTypeActionServiceStub, + NoOpScenarioActivityManager, actorSystem.dispatcher, actorSystem, backend diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala index 9097d62fa2a..e61be722b14 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala @@ -5,9 +5,9 @@ import com.typesafe.scalalogging.LazyLogging import io.circe.syntax.EncoderOps import pl.touk.nussknacker.engine.ModelData._ import pl.touk.nussknacker.engine.api.ProcessVersion +import pl.touk.nussknacker.engine.api.deployment.ScenarioActivityHandling.AllScenarioActivitiesStoredByNussknacker import pl.touk.nussknacker.engine.api.deployment.DeploymentUpdateStrategy.StateRestoringStrategy import pl.touk.nussknacker.engine.api.deployment._ -import pl.touk.nussknacker.engine.newdeployment import pl.touk.nussknacker.engine.api.deployment.inconsistency.InconsistentStateDetector import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} @@ -19,7 +19,7 @@ import pl.touk.nussknacker.engine.deployment.{ ExternalDeploymentId } import pl.touk.nussknacker.engine.management.FlinkDeploymentManager.prepareProgramArgs -import pl.touk.nussknacker.engine.{BaseModelData, DeploymentManagerDependencies} +import pl.touk.nussknacker.engine.{BaseModelData, DeploymentManagerDependencies, newdeployment} import scala.concurrent.Future @@ -263,6 +263,10 @@ abstract class FlinkDeploymentManager( ): Future[Option[ExternalDeploymentId]] override def processStateDefinitionManager: ProcessStateDefinitionManager = FlinkProcessStateDefinitionManager + + override def scenarioActivityHandling: ScenarioActivityHandling = + AllScenarioActivitiesStoredByNussknacker + } object FlinkDeploymentManager { diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala index 0765c2927ac..fae0d547781 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala @@ -5,7 +5,7 @@ import org.apache.flink.api.common.{JobID, JobStatus} import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus -import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId} +import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{DeploymentId, ExternalDeploymentId} import pl.touk.nussknacker.engine.management.FlinkRestManager.ParsedJobConfig diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala index b3066f660eb..ea47b28fd3f 100644 --- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala @@ -545,6 +545,7 @@ class FlinkRestManagerSpec extends AnyFunSuite with Matchers with PatientScalaFu val deploymentManagerDependencies = DeploymentManagerDependencies( new ProcessingTypeDeployedScenariosProviderStub(List.empty), new ProcessingTypeActionServiceStub, + NoOpScenarioActivityManager, ExecutionContext.global, ActorSystem(getClass.getSimpleName), sttpBackend diff --git a/engine/lite/deploymentManager/src/main/scala/pl/touk/nussknacker/lite/manager/LiteDeploymentManager.scala b/engine/lite/deploymentManager/src/main/scala/pl/touk/nussknacker/lite/manager/LiteDeploymentManager.scala index cce5a79778f..ab3a035a197 100644 --- a/engine/lite/deploymentManager/src/main/scala/pl/touk/nussknacker/lite/manager/LiteDeploymentManager.scala +++ b/engine/lite/deploymentManager/src/main/scala/pl/touk/nussknacker/lite/manager/LiteDeploymentManager.scala @@ -4,7 +4,12 @@ import io.circe.Json import pl.touk.nussknacker.engine.BaseModelData import pl.touk.nussknacker.engine.ModelData.BaseModelDataExt import pl.touk.nussknacker.engine.api.JobData -import pl.touk.nussknacker.engine.api.deployment.{BaseDeploymentManager, DMTestScenarioCommand} +import pl.touk.nussknacker.engine.api.deployment.ScenarioActivityHandling.AllScenarioActivitiesStoredByNussknacker +import pl.touk.nussknacker.engine.api.deployment.{ + BaseDeploymentManager, + DMTestScenarioCommand, + ScenarioActivityHandling +} import pl.touk.nussknacker.engine.lite.kafka.KafkaTransactionalScenarioInterpreter import pl.touk.nussknacker.engine.testmode.TestProcess diff --git a/engine/lite/embeddedDeploymentManager/src/test/scala/pl/touk/nussknacker/streaming/embedded/RequestResponseEmbeddedDeploymentManagerTest.scala b/engine/lite/embeddedDeploymentManager/src/test/scala/pl/touk/nussknacker/streaming/embedded/RequestResponseEmbeddedDeploymentManagerTest.scala index 33d0df7ec14..0d28fae5775 100644 --- a/engine/lite/embeddedDeploymentManager/src/test/scala/pl/touk/nussknacker/streaming/embedded/RequestResponseEmbeddedDeploymentManagerTest.scala +++ b/engine/lite/embeddedDeploymentManager/src/test/scala/pl/touk/nussknacker/streaming/embedded/RequestResponseEmbeddedDeploymentManagerTest.scala @@ -16,8 +16,10 @@ import pl.touk.nussknacker.engine.api.deployment.{ DeployedScenarioData, DeploymentManager, DeploymentUpdateStrategy, + NoOpScenarioActivityManager, ProcessingTypeActionServiceStub, - ProcessingTypeDeployedScenariosProviderStub + ProcessingTypeDeployedScenariosProviderStub, + ScenarioActivityManager } import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.build.ScenarioBuilder @@ -53,6 +55,7 @@ class RequestResponseEmbeddedDeploymentManagerTest val dependencies = DeploymentManagerDependencies( new ProcessingTypeDeployedScenariosProviderStub(initiallyDeployedScenarios), new ProcessingTypeActionServiceStub, + NoOpScenarioActivityManager, as.dispatcher, as, SttpBackendStub.asynchronousFuture diff --git a/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/BaseK8sDeploymentManagerTest.scala b/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/BaseK8sDeploymentManagerTest.scala index 36b71ad1e69..05b006824bc 100644 --- a/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/BaseK8sDeploymentManagerTest.scala +++ b/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/BaseK8sDeploymentManagerTest.scala @@ -60,6 +60,7 @@ class BaseK8sDeploymentManagerTest val dependencies = DeploymentManagerDependencies( new ProcessingTypeDeployedScenariosProviderStub(List.empty), new ProcessingTypeActionServiceStub, + NoOpScenarioActivityManager, system.dispatcher, system, backend diff --git a/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManagerOnMocksTest.scala b/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManagerOnMocksTest.scala index 6b77ed4be0d..16b645f8582 100644 --- a/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManagerOnMocksTest.scala +++ b/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManagerOnMocksTest.scala @@ -11,9 +11,11 @@ import org.scalatest.{BeforeAndAfterAll, Inside, OptionValues} import pl.touk.nussknacker.engine.DeploymentManagerDependencies import pl.touk.nussknacker.engine.api.deployment.{ DataFreshnessPolicy, + NoOpScenarioActivityManager, ProcessingTypeActionServiceStub, ProcessingTypeDeployedScenariosProvider, - ProcessingTypeDeployedScenariosProviderStub + ProcessingTypeDeployedScenariosProviderStub, + ScenarioActivityManager } import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.testing.LocalModelData @@ -72,6 +74,7 @@ class K8sDeploymentManagerOnMocksTest DeploymentManagerDependencies( new ProcessingTypeDeployedScenariosProviderStub(List.empty), new ProcessingTypeActionServiceStub, + NoOpScenarioActivityManager, system.dispatcher, system, SttpBackendStub.asynchronousFuture diff --git a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ScenarioActivity.scala b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ScenarioActivity.scala index 592bc1272a5..19df447e84a 100644 --- a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ScenarioActivity.scala +++ b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ScenarioActivity.scala @@ -1,14 +1,22 @@ package pl.touk.nussknacker.engine.api.deployment +import enumeratum.EnumEntry.UpperSnakecase +import enumeratum.{Enum, EnumEntry} import pl.touk.nussknacker.engine.api.component.ProcessingMode +import pl.touk.nussknacker.engine.api.process.VersionId import java.time.Instant import java.util.UUID +import scala.collection.immutable final case class ScenarioId(value: Long) extends AnyVal final case class ScenarioVersionId(value: Long) extends AnyVal +object ScenarioVersionId { + def from(versionId: VersionId): ScenarioVersionId = ScenarioVersionId(versionId.value) +} + final case class ScenarioActivityId(value: UUID) extends AnyVal object ScenarioActivityId { @@ -22,6 +30,10 @@ final case class ScenarioUser( impersonatedByUserName: Option[UserName], ) +object ScenarioUser { + val internalNuUser: ScenarioUser = ScenarioUser(None, UserName("Nussknacker"), None, None) +} + final case class UserId(value: String) final case class UserName(value: String) @@ -65,6 +77,24 @@ object ScenarioAttachment { final case class Environment(name: String) extends AnyVal +sealed trait ScheduledExecutionStatus extends EnumEntry with UpperSnakecase + +object ScheduledExecutionStatus extends Enum[ScheduledExecutionStatus] { + case object Scheduled extends ScheduledExecutionStatus + + case object Deployed extends ScheduledExecutionStatus + + case object Finished extends ScheduledExecutionStatus + + case object Failed extends ScheduledExecutionStatus + + case object DeploymentWillBeRetried extends ScheduledExecutionStatus + + case object DeploymentFailed extends ScheduledExecutionStatus + + override def values: immutable.IndexedSeq[ScheduledExecutionStatus] = findValues +} + sealed trait ScenarioActivity { def scenarioId: ScenarioId def scenarioActivityId: ScenarioActivityId @@ -210,6 +240,7 @@ object ScenarioActivity { scenarioVersionId: Option[ScenarioVersionId], comment: ScenarioComment, dateFinished: Option[Instant], + status: Option[String], errorMessage: Option[String], ) extends ScenarioActivity @@ -220,7 +251,11 @@ object ScenarioActivity { date: Instant, scenarioVersionId: Option[ScenarioVersionId], dateFinished: Option[Instant], - errorMessage: Option[String], + scheduleName: String, + status: ScheduledExecutionStatus, + createdAt: Instant, + nextRetryAt: Option[Instant], + retriesLeft: Option[Int], ) extends ScenarioActivity // Other/technical