Skip to content

Commit

Permalink
[NU-1772] Scenario activities BE - phase 3 - periodic deployment acti…
Browse files Browse the repository at this point in the history
…vities (#6971)
  • Loading branch information
mgoworko authored Oct 9, 2024
1 parent f8a9fe4 commit 4607243
Show file tree
Hide file tree
Showing 42 changed files with 787 additions and 148 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package pl.touk.nussknacker.engine

import akka.actor.ActorSystem
import pl.touk.nussknacker.engine.api.deployment.{ProcessingTypeActionService, ProcessingTypeDeployedScenariosProvider}
import pl.touk.nussknacker.engine.api.deployment.{
ProcessingTypeActionService,
ProcessingTypeDeployedScenariosProvider,
ScenarioActivityManager
}
import sttp.client3.SttpBackend

import scala.concurrent.{ExecutionContext, Future}

case class DeploymentManagerDependencies(
deployedScenariosProvider: ProcessingTypeDeployedScenariosProvider,
actionService: ProcessingTypeActionService,
scenarioActivityManager: ScenarioActivityManager,
executionContext: ExecutionContext,
actorSystem: ActorSystem,
sttpBackend: SttpBackend[Future, Any],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.api.deployment

import pl.touk.nussknacker.engine.api.deployment.ScenarioActivityHandling.AllScenarioActivitiesStoredByNussknacker
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleProcessStateDefinitionManager
import pl.touk.nussknacker.engine.deployment.CustomActionDefinition

Expand All @@ -11,4 +12,6 @@ trait BaseDeploymentManager extends DeploymentManager {

override def customActionsDefinitions: List[CustomActionDefinition] = List.empty

override def scenarioActivityHandling: ScenarioActivityHandling = AllScenarioActivitiesStoredByNussknacker

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ trait DeploymentManager extends AutoCloseable {

def customActionsDefinitions: List[CustomActionDefinition]

def scenarioActivityHandling: ScenarioActivityHandling

protected final def notImplemented: Future[Nothing] =
Future.failed(new NotImplementedError())

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package pl.touk.nussknacker.engine.api.deployment

import pl.touk.nussknacker.engine.api.process.ProcessIdWithName

import scala.concurrent.Future

sealed trait ScenarioActivityHandling

object ScenarioActivityHandling {

case object AllScenarioActivitiesStoredByNussknacker extends ScenarioActivityHandling

trait ManagerSpecificScenarioActivitiesStoredByManager extends ScenarioActivityHandling {

def managerSpecificScenarioActivities(
processIdWithName: ProcessIdWithName
): Future[List[ScenarioActivity]]

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package pl.touk.nussknacker.engine.api.deployment

import pl.touk.nussknacker.engine.api.deployment.ScenarioActivityManager.ModificationResult

import scala.concurrent.Future

trait ScenarioActivityManager {

def saveActivity(
scenarioActivity: ScenarioActivity
): Future[Unit]

def modifyActivity(
scenarioActivityId: ScenarioActivityId,
modify: ScenarioActivity => ScenarioActivity,
): Future[ModificationResult]

}

object ScenarioActivityManager {
sealed trait ModificationResult

object ModificationResult {
case object Success extends ModificationResult
case object Failure extends ModificationResult
}

}

object NoOpScenarioActivityManager extends ScenarioActivityManager {

def saveActivity(
scenarioActivity: ScenarioActivity
): Future[Unit] = Future.unit

def modifyActivity(
scenarioActivityId: ScenarioActivityId,
modify: ScenarioActivity => ScenarioActivity,
): Future[ModificationResult] = Future.successful(ModificationResult.Success)

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class CachingProcessStateDeploymentManager(

override def customActionsDefinitions: List[CustomActionDefinition] = delegate.customActionsDefinitions

override def scenarioActivityHandling: ScenarioActivityHandling = delegate.scenarioActivityHandling

override def close(): Unit = delegate.close()

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package pl.touk.nussknacker.engine.testing

import cats.data.{Validated, ValidatedNel}
import com.typesafe.config.Config
import pl.touk.nussknacker.engine.api.StreamMetaData
import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig
import pl.touk.nussknacker.engine.api.definition._
import pl.touk.nussknacker.engine.api.deployment.ScenarioActivityHandling.AllScenarioActivitiesStoredByNussknacker
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus}
import pl.touk.nussknacker.engine.api.StreamMetaData
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName}
import pl.touk.nussknacker.engine.deployment.CustomActionDefinition
import pl.touk.nussknacker.engine.newdeployment
import pl.touk.nussknacker.engine.{
BaseModelData,
DeploymentManagerDependencies,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class RemoteEnvironmentResources(
scenarioActivityId = ScenarioActivityId.random,
user = user.scenarioUser,
date = clock.instant(),
scenarioVersionId = Some(ScenarioVersionId(details.processVersionId.value)),
scenarioVersionId = Some(ScenarioVersionId.from(details.processVersionId)),
destinationEnvironment = Environment(remoteEnvironment.environmentId)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ package pl.touk.nussknacker.ui.api

import cats.data.EitherT
import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.engine.api.deployment.ScenarioActivityHandling.{
AllScenarioActivitiesStoredByNussknacker,
ManagerSpecificScenarioActivitiesStoredByManager
}
import pl.touk.nussknacker.engine.api.deployment.{
ScenarioActivity,
ScenarioActivityId,
ScenarioAttachment,
ScenarioComment
}
import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName}
import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessIdWithName, ProcessName}
import pl.touk.nussknacker.security.Permission
import pl.touk.nussknacker.security.Permission.Permission
import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos.ScenarioActivityError.{
Expand All @@ -19,6 +23,7 @@ import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos.ScenarioActi
}
import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos._
import pl.touk.nussknacker.ui.api.description.scenarioActivity.{Dtos, Endpoints}
import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher
import pl.touk.nussknacker.ui.process.repository.DBIOActionRunner
import pl.touk.nussknacker.ui.process.repository.activities.ScenarioActivityRepository
import pl.touk.nussknacker.ui.process.{ProcessService, ScenarioAttachmentService}
Expand All @@ -33,6 +38,7 @@ import scala.concurrent.{ExecutionContext, Future}

class ScenarioActivityApiHttpService(
authManager: AuthManager,
deploymentManagerDispatcher: DeploymentManagerDispatcher,
scenarioActivityRepository: ScenarioActivityRepository,
scenarioService: ProcessService,
scenarioAuthorizer: AuthorizeProcess,
Expand Down Expand Up @@ -115,7 +121,7 @@ class ScenarioActivityApiHttpService(
for {
scenarioId <- getScenarioIdByName(scenarioName)
_ <- isAuthorized(scenarioId, Permission.Read)
activities <- fetchActivities(scenarioId)
activities <- fetchActivities(ProcessIdWithName(scenarioId, scenarioName))
} yield ScenarioActivities(activities)
}
}
Expand Down Expand Up @@ -210,15 +216,27 @@ class ScenarioActivityApiHttpService(
)

private def fetchActivities(
scenarioId: ProcessId
): EitherT[Future, ScenarioActivityError, List[Dtos.ScenarioActivity]] =
EitherT
.right(
dbioActionRunner.run(
scenarioActivityRepository.findActivities(scenarioId)
)
)
.map(_.map(toDto).toList)
processIdWithName: ProcessIdWithName
)(implicit loggedUser: LoggedUser): EitherT[Future, ScenarioActivityError, List[Dtos.ScenarioActivity]] =
EitherT.right {
for {
generalActivities <- dbioActionRunner.run(scenarioActivityRepository.findActivities(processIdWithName.id))
deploymentManager <- deploymentManagerDispatcher.deploymentManager(processIdWithName)
deploymentManagerSpecificActivities <- deploymentManager match {
case Some(manager) =>
manager.scenarioActivityHandling match {
case AllScenarioActivitiesStoredByNussknacker =>
Future.successful(List.empty)
case handling: ManagerSpecificScenarioActivitiesStoredByManager =>
handling.managerSpecificScenarioActivities(processIdWithName)
}
case None =>
Future.successful(List.empty)
}
combinedActivities = (generalActivities ++ deploymentManagerSpecificActivities).map(toDto)
sortedCombinedActivities = combinedActivities.toList.sortBy(_.date)
} yield sortedCombinedActivities
}

private def toDto(scenarioComment: ScenarioComment): Dtos.ScenarioActivityComment = {
scenarioComment match {
Expand Down Expand Up @@ -389,7 +407,8 @@ class ScenarioActivityApiHttpService(
scenarioVersionId,
comment,
dateFinished,
errorMessage
status,
errorMessage,
) =>
Dtos.ScenarioActivity.forPerformedSingleExecution(
id = scenarioActivityId.value,
Expand All @@ -398,6 +417,7 @@ class ScenarioActivityApiHttpService(
scenarioVersionId = scenarioVersionId.map(_.value),
comment = toDto(comment),
dateFinished = dateFinished,
status = status,
errorMessage = errorMessage,
)
case ScenarioActivity.PerformedScheduledExecution(
Expand All @@ -407,15 +427,23 @@ class ScenarioActivityApiHttpService(
date,
scenarioVersionId,
dateFinished,
errorMessage
scheduleName,
status,
createdAt,
nextRetryAt,
retriesLeft,
) =>
Dtos.ScenarioActivity.forPerformedScheduledExecution(
id = scenarioActivityId.value,
user = user.name.value,
date = date,
scenarioVersionId = scenarioVersionId.map(_.value),
dateFinished = dateFinished,
errorMessage = errorMessage,
scheduleName = scheduleName,
status = status,
createdAt = createdAt,
retriesLeft = retriesLeft,
nextRetryAt = nextRetryAt,
)
case ScenarioActivity.AutomaticUpdate(
_,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.circe
import io.circe.generic.extras
import io.circe.generic.extras.semiauto.deriveConfiguredCodec
import io.circe.{Decoder, Encoder}
import pl.touk.nussknacker.engine.api.deployment.ScheduledExecutionStatus
import pl.touk.nussknacker.engine.api.process.{ProcessName, VersionId}
import pl.touk.nussknacker.engine.requestresponse.openapi.OApiDocumentation.dropNulls
import pl.touk.nussknacker.restmodel.BaseEndpointDefinitions
Expand Down Expand Up @@ -586,6 +587,7 @@ object Dtos {
scenarioVersionId: Option[Long],
comment: ScenarioActivityComment,
dateFinished: Option[Instant],
status: Option[String],
errorMessage: Option[String],
): ScenarioActivity = ScenarioActivity(
id = id,
Expand All @@ -596,6 +598,7 @@ object Dtos {
comment = Some(comment),
attachment = None,
additionalFields = List(
status.map(AdditionalField("status", _)),
dateFinished.map(date => AdditionalField("dateFinished", date.toString)),
errorMessage.map(e => AdditionalField("errorMessage", e)),
).flatten
Expand All @@ -607,20 +610,38 @@ object Dtos {
date: Instant,
scenarioVersionId: Option[Long],
dateFinished: Option[Instant],
errorMessage: Option[String],
): ScenarioActivity = ScenarioActivity(
id = id,
`type` = ScenarioActivityType.PerformedScheduledExecution,
user = user,
date = date,
scenarioVersionId = scenarioVersionId,
comment = None,
attachment = None,
additionalFields = List(
dateFinished.map(date => AdditionalField("dateFinished", date.toString)),
errorMessage.map(error => AdditionalField("errorMessage", error)),
).flatten
)
scheduleName: String,
status: ScheduledExecutionStatus,
createdAt: Instant,
nextRetryAt: Option[Instant],
retriesLeft: Option[Int],
): ScenarioActivity = {
val humanReadableStatus = status match {
case ScheduledExecutionStatus.Scheduled => "Scheduled"
case ScheduledExecutionStatus.Deployed => "Deployed"
case ScheduledExecutionStatus.Finished => "Execution finished"
case ScheduledExecutionStatus.Failed => "Execution failed"
case ScheduledExecutionStatus.DeploymentWillBeRetried => "Deployment will be retried"
case ScheduledExecutionStatus.DeploymentFailed => "Deployment failed"
}
ScenarioActivity(
id = id,
`type` = ScenarioActivityType.PerformedScheduledExecution,
user = user,
date = date,
scenarioVersionId = scenarioVersionId,
comment = None,
attachment = None,
additionalFields = List(
Some(AdditionalField("status", humanReadableStatus)),
Some(AdditionalField("createdAt", createdAt.toString)),
dateFinished.map(date => AdditionalField("dateFinished", date.toString)),
Some(AdditionalField("scheduleName", scheduleName)),
Some(AdditionalField("retriesLeft", retriesLeft.toString)),
nextRetryAt.map(nra => AdditionalField("nextRetryAt", nra.toString)),
).flatten
)
}

// Other/technical

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.ui.api.description.scenarioActivity

import pl.touk.nussknacker.engine.api.deployment.ScheduledExecutionStatus
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos.ScenarioActivityError.{
NoActivity,
Expand Down Expand Up @@ -183,6 +184,7 @@ object Examples {
lastModifiedAt = Instant.parse("2024-01-17T14:21:17Z")
),
dateFinished = Some(Instant.parse("2024-01-17T14:21:17Z")),
status = Some("IN_PROGRESS"),
errorMessage = Some("Execution error occurred"),
),
ScenarioActivity.forPerformedSingleExecution(
Expand All @@ -196,6 +198,7 @@ object Examples {
lastModifiedAt = Instant.parse("2024-01-17T14:21:17Z")
),
dateFinished = Some(Instant.parse("2024-01-17T14:21:17Z")),
status = Some("FAILED"),
errorMessage = None,
),
ScenarioActivity.forPerformedScheduledExecution(
Expand All @@ -204,7 +207,23 @@ object Examples {
date = Instant.parse("2024-01-17T14:21:17Z"),
scenarioVersionId = Some(1),
dateFinished = Some(Instant.parse("2024-01-17T14:21:17Z")),
errorMessage = None,
scheduleName = "main-schedule",
status = ScheduledExecutionStatus.Finished,
createdAt = Instant.parse("2024-01-17T13:21:17Z"),
retriesLeft = None,
nextRetryAt = None,
),
ScenarioActivity.forPerformedScheduledExecution(
id = UUID.fromString("9b27797e-aa03-42ba-8406-d0ae8005a883"),
user = "some user",
date = Instant.parse("2024-01-17T14:21:17Z"),
scenarioVersionId = Some(1),
dateFinished = Some(Instant.parse("2024-01-17T14:21:17Z")),
scheduleName = "main-schedule",
status = ScheduledExecutionStatus.DeploymentWillBeRetried,
createdAt = Instant.parse("2024-01-17T13:21:17Z"),
retriesLeft = Some(1),
nextRetryAt = Some(Instant.parse("2024-01-17T15:21:17Z")),
),
ScenarioActivity.forAutomaticUpdate(
id = UUID.fromString("33509d37-7657-4229-940f-b5736c82fb13"),
Expand Down
Loading

0 comments on commit 4607243

Please sign in to comment.