diff --git a/.scalafmt.conf b/.scalafmt.conf index afe4634..9255a9d 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,4 +1,4 @@ -version = 3.5.0 +version = 3.5.1 runner.dialect = "scala213source3" diff --git a/build.sbt b/build.sbt index f638422..6247c3a 100644 --- a/build.sbt +++ b/build.sbt @@ -7,14 +7,15 @@ val awsVersion = "2.17.+" val awsEc2 = "software.amazon.awssdk" % "ec2" % awsVersion val awsEmr = "software.amazon.awssdk" % "emr" % awsVersion val awsSsm = "software.amazon.awssdk" % "ssm" % awsVersion -val slick = "com.typesafe.slick" %% "slick" % slickVersion -val slickHikaricp = "com.typesafe.slick" %% "slick-hikaricp" % slickVersion -val postgresql = "org.postgresql" % "postgresql" % "42.3.3" -val playJson = "com.typesafe.play" %% "play-json" % playJsonVersion -val akkaActor = "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion -val akkaTestkit = "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test +val slick = "com.typesafe.slick" %% "slick" % slickVersion +val slickHikaricp = "com.typesafe.slick" %% "slick-hikaricp" % slickVersion +val postgresql = "org.postgresql" % "postgresql" % "42.3.3" +val playJson = "com.typesafe.play" %% "play-json" % playJsonVersion +val akkaActor = "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion +val akkaTestkit = "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test val scalaTestArtifact = "org.scalatest" %% "scalatest" % "3.2.11" % Test val scalaPlusPlay = "org.scalatestplus.play" %% "scalatestplus-play" % "5.1.0" % Test +val logback = "ch.qos.logback" % "logback-classic" % "1.2.11" % Test lazy val commonSettings = Seq( scalacOptions ++= Seq("-deprecation", "-feature", "-Xlint"), // , "-Xfatal-warnings"), @@ -50,7 +51,8 @@ lazy val orchardCore = (project in file("orchard-core")). postgresql, playJson, akkaActor, - akkaTestkit + akkaTestkit, + logback ) ) diff --git a/orchard-core/src/main/scala/com/salesforce/mce/orchard/system/actor/ResourceInstance.scala b/orchard-core/src/main/scala/com/salesforce/mce/orchard/system/actor/ResourceInstance.scala index 2f40057..65ec114 100644 --- a/orchard-core/src/main/scala/com/salesforce/mce/orchard/system/actor/ResourceInstance.scala +++ b/orchard-core/src/main/scala/com/salesforce/mce/orchard/system/actor/ResourceInstance.scala @@ -31,6 +31,7 @@ object ResourceInstance { database: OrchardDatabase, query: ResourceInstanceQuery, resourceIO: ResourceIO, + instanceId: Int, timers: TimerScheduler[ResourceInstance.Msg] ) @@ -56,6 +57,7 @@ object ResourceInstance { database, query, resourceIO, + instanceId, timers ) @@ -73,6 +75,7 @@ object ResourceInstance { } } + // pending is the initial state after RI is created private def pending(ps: Params): Behavior[Msg] = Behaviors.receiveMessage { case GetResourceInstSpec(replyTo) => ps.ctx.log.info(s"${ps.ctx.self} (pending) received GetResourceInstSpec($replyTo)") @@ -85,13 +88,12 @@ object ResourceInstance { case Left(exp) => ps.ctx.log.error(s"${ps.ctx.self} (pending) Exception when creating resource", exp) ps.database.sync(ps.query.setTerminated(Status.Failed, exp.getMessage())) - replyTo ! ResourceMgr.ResourceInstSpecRsp(Left(Status.Pending)) - terminate(ps.resourceMgr, Status.Failed) + terminate(ps, Status.Failed, Option(replyTo)) } case Shutdown(status) => ps.ctx.log.info(s"${ps.ctx.self} (pending) received Shutdown($status)") ps.database.sync(ps.query.setTerminated(status, "")) - terminate(ps.resourceMgr, status) + terminate(ps, status, None) } private def activating(ps: Params, instSpec: JsValue): Behavior[Msg] = Behaviors.receiveMessage { @@ -111,16 +113,14 @@ object ResourceInstance { ps.ctx.log.info( s"${ps.ctx.self} (activating) received resource status $sts, shutting down" ) - replyTo ! ResourceMgr.ResourceInstSpecRsp(Left(sts)) - shuttindDown(ps, instSpec, sts) + shuttingDown(ps, instSpec, sts, Option(replyTo)) case Left(exp) => ps.ctx.log.info(s"${ps.ctx.self} (activating) received resource exception, shutting down") - replyTo ! ResourceMgr.ResourceInstSpecRsp(Left(Status.Failed)) - shuttindDown(ps, instSpec, Status.Failed) + shuttingDown(ps, instSpec, Status.Failed, Option(replyTo)) } case Shutdown(status) => ps.ctx.log.info(s"${ps.ctx.self} (activating) received Shutdown($status)") - shuttindDown(ps, instSpec, Status.Failed) + shuttingDown(ps, instSpec, Status.Failed, None) } private def running(ps: Params, instSpec: JsValue): Behavior[Msg] = Behaviors.receiveMessage { @@ -131,17 +131,21 @@ object ResourceInstance { replyTo ! ResourceMgr.ResourceInstSpecRsp(Right(instSpec)) Behaviors.same case sts => - ps.ctx.log.error(s"${ps.ctx.self} (running) Unexpected resource status $sts") + ps.ctx.log.error(s"${ps.ctx.self} (running) UNEXPECTED resource status $sts") ps.database.sync(ps.query.setTerminated(Status.Failed, "")) - replyTo ! ResourceMgr.ResourceInstSpecRsp(Left(Status.Failed)) - terminate(ps.resourceMgr, Status.Failed) + terminate(ps, Status.Failed, Option(replyTo)) } case Shutdown(status) => ps.ctx.log.info(s"${ps.ctx.self} (running) received Shutdown($status)") - shuttindDown(ps, instSpec, status) + shuttingDown(ps, instSpec, status, None) } - private def shuttindDown(ps: Params, instSpec: JsValue, status: Status.Value): Behavior[Msg] = { + private def shuttingDown( + ps: Params, + instSpec: JsValue, + status: Status.Value, + replyTo: Option[ActorRef[ResourceMgr.ResourceInstSpecRsp]] + ): Behavior[Msg] = { val errorMsg = ps.resourceIO.terminate(instSpec) match { case Right(sts) => "" @@ -152,15 +156,33 @@ object ResourceInstance { exp.getMessage() } ps.database.sync(ps.query.setTerminated(status, errorMsg)) - terminate(ps.resourceMgr, status) + terminate(ps, status, replyTo) + } + + // We should not terminate resource instance actor as it may need to respond to pending queries + // from activity attempts in the mailbox + private def inactive(ps: Params, status: Status.Value): Behavior[Msg] = Behaviors.receiveMessage { + case GetResourceInstSpec(replyTo) => + ps.ctx.log.info(s"${ps.ctx.self} (inactive) received GetResourceInstSpec($replyTo)") + ps.resourceMgr ! ResourceMgr.InactiveResourceInstance(ps.instanceId, status, replyTo) + Behaviors.same + case Shutdown(sts) => + ps.ctx.log.info(s"${ps.ctx.self} (inactive) received Shutdown($sts)") + Behaviors.same } private def terminate( - resourceMgr: ActorRef[ResourceMgr.Msg], - status: Status.Value + ps: Params, + status: Status.Value, + replyTo: Option[ActorRef[ResourceMgr.ResourceInstSpecRsp]] ): Behavior[Msg] = { - resourceMgr ! ResourceMgr.ResourceInstanceFinished(status) - Behaviors.stopped + replyTo match { + case Some(r) => + ps.resourceMgr ! ResourceMgr.InactiveResourceInstance(ps.instanceId, status, r) + case None => + ps.resourceMgr ! ResourceMgr.ResourceInstanceFinished(status) + } + inactive(ps, status) } } diff --git a/orchard-core/src/main/scala/com/salesforce/mce/orchard/system/actor/ResourceMgr.scala b/orchard-core/src/main/scala/com/salesforce/mce/orchard/system/actor/ResourceMgr.scala index 75cec1c..0d564b3 100644 --- a/orchard-core/src/main/scala/com/salesforce/mce/orchard/system/actor/ResourceMgr.scala +++ b/orchard-core/src/main/scala/com/salesforce/mce/orchard/system/actor/ResourceMgr.scala @@ -23,6 +23,11 @@ object ResourceMgr { case class GetResourceInstSpec(replyTo: ActorRef[ResourceInstSpecRsp]) extends Msg case class ResourceInstSpecRsp(spec: Either[Status.Value, JsValue]) case class ResourceInstanceFinished(status: Status.Value) extends Msg + case class InactiveResourceInstance( + instanceId: Int, + status: Status.Value, + replyTo: ActorRef[ResourceInstSpecRsp] + ) extends Msg // the shutdown call from WorkflowMgr, ResourceMgr should never shutdown unless told by WFMgr case class Shutdown(status: Status.Value) extends Msg @@ -120,11 +125,15 @@ object ResourceMgr { running(ps, rscInst, instId) } - case ResourceInstanceFinished(status) => - ps.ctx.log.error( - s"${ps.ctx.self} (idle) received unexpected ResourceInstanceFinished($status)" - ) - Behaviors.stopped + // no resource instance should exist yet, this is unexpected + case msg: ResourceInstanceFinished => + ps.ctx.log.error(s"${ps.ctx.self} (idle) received UNEXPECTED $msg") + Behaviors.unhandled + + // no resource instance should exist yet, this is unexpected + case msg: InactiveResourceInstance => + ps.ctx.log.error(s"${ps.ctx.self} (idle) received UNEXPECTED $msg") + Behaviors.unhandled case Shutdown(status) => ps.ctx.log.info(s"${ps.ctx.self} (idle) received Shutdown($status)") @@ -136,32 +145,43 @@ object ResourceMgr { def running( ps: Params, resourceInst: ActorRef[ResourceInstance.Msg], - instId: Int + currentInstId: Int ): Behavior[Msg] = Behaviors.receiveMessage { case GetResourceInstSpec(replyTo) => ps.ctx.log.info(s"${ps.ctx.self} (running) received GetResourceInstSpec($replyTo)") resourceInst ! ResourceInstance.GetResourceInstSpec(replyTo) Behaviors.same - // resource should not receive finished status during "running" state, so we will keep the - // resource manager up. - case ResourceInstanceFinished(Status.Finished) => + // resource should not receive finished status during "running" state. + case ResourceInstanceFinished(status) => + ps.ctx.log.error( + s"${ps.ctx.self} (running) received UNEXPECTED ResourceInstanceFinished($status, None)" + ) + Behaviors.unhandled + case InactiveResourceInstance(instId, status, replyTo) => ps.ctx.log.info( - s"${ps.ctx.self} (running) received ResourceInstanceFinished(Status.Finished)" + s"${ps.ctx.self} (running) received InactiveResourceInstance($instId, $status, $replyTo)" ) - ps.database.sync(ps.resourceQuery.setTerminated(Status.Finished)) - finished(ps, Status.Finished) - case ResourceInstanceFinished(failureStatus) => - ps.ctx.log.info(s"${ps.ctx.self} (running) received ResourceInstanceFinished($failureStatus)") - if (instId >= ps.maxAttempt) { + // in case resource is terminated (normally) by external entities + val failureStatus = if (status == Status.Finished) Status.Failed else status + + // maybe the current instance is already a new one + if (instId < currentInstId) { + ps.ctx.self ! GetResourceInstSpec(replyTo) + Behaviors.same + } else if (currentInstId >= ps.maxAttempt) { ps.database.sync(ps.resourceQuery.setTerminated(failureStatus)) + replyTo ! ResourceInstSpecRsp(Left(failureStatus)) finished(ps, failureStatus) } else { - val newInstId = instId + 1 + val newInstId = currentInstId + 1 + // create a new instance upon failure and deligate the response to the new instance spawnResourceInstance(ps.ctx, ps.database, ps, newInstId) match { case Left(sts) => ps.database.sync(ps.resourceQuery.setTerminated(sts)) + replyTo ! ResourceInstSpecRsp(Left(failureStatus)) finished(ps, sts) case Right(rscInst) => + ps.ctx.self ! GetResourceInstSpec(replyTo) running(ps, rscInst, newInstId) } } @@ -171,6 +191,8 @@ object ResourceMgr { terminating(ps.ctx, ps, status) } + // finished status is when resource instance creation failed, we set resource manager in a state + // that it can still handle incoming calls, but won't create new instance anymore def finished( ps: Params, status: Status.Value @@ -181,8 +203,14 @@ object ResourceMgr { Behaviors.same case ResourceInstanceFinished(sts) => ps.ctx.log.error( - s"${ps.ctx.self} (finished) received unexpected ResourceInstanceFinished($sts)" + s"${ps.ctx.self} (finished) received UNEXPECTED ResourceInstanceFinished($sts)" + ) + Behaviors.same + case InactiveResourceInstance(instanceId, sts, replyTo) => + ps.ctx.log.info( + s"${ps.ctx.self} (finished) received InactiveResourceInstance($instanceId, $sts, $replyTo)" ) + ps.ctx.self ! GetResourceInstSpec(replyTo) Behaviors.same case Shutdown(_) => ps.ctx.log.info(s"${ps.ctx.self} (finished) received Shutdown(_)") @@ -198,10 +226,16 @@ object ResourceMgr { ps.ctx.log.info(s"${ps.ctx.self} (terminating) received GetResourceInstSpec(${replyTo})") replyTo ! ResourceInstSpecRsp(Left(status)) Behaviors.same - case ResourceInstanceFinished(_) => - ps.ctx.log.info(s"${ps.ctx.self} (terminating) received ResourceInstanceFinished(_)") + case ResourceInstanceFinished(sts) => + ps.ctx.log.info(s"${ps.ctx.self} (terminating) received ResourceInstanceFinished($sts)") ps.database.sync(ps.resourceQuery.setTerminated(status)) terminate(ps, status) + case InactiveResourceInstance(instId, sts, replyTo) => + ps.ctx.log.info( + s"${ps.ctx.self} (terminating) received InactiveResourceInstance($instId, $sts, $replyTo)" + ) + ctx.self ! GetResourceInstSpec(replyTo) + Behaviors.same case Shutdown(_) => Behaviors.same } diff --git a/version.sbt b/version.sbt index 2d03e09..eb12e8f 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -ThisBuild / version := "0.3.1" +ThisBuild / version := "0.3.2"