Skip to content

Commit

Permalink
(fixes #2) make sure resources are retried on external failures (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
realstraw authored Apr 8, 2022
1 parent c8dd26e commit e695bde
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 46 deletions.
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = 3.5.0
version = 3.5.1

runner.dialect = "scala213source3"

Expand Down
16 changes: 9 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ val awsVersion = "2.17.+"
val awsEc2 = "software.amazon.awssdk" % "ec2" % awsVersion
val awsEmr = "software.amazon.awssdk" % "emr" % awsVersion
val awsSsm = "software.amazon.awssdk" % "ssm" % awsVersion
val slick = "com.typesafe.slick" %% "slick" % slickVersion
val slickHikaricp = "com.typesafe.slick" %% "slick-hikaricp" % slickVersion
val postgresql = "org.postgresql" % "postgresql" % "42.3.3"
val playJson = "com.typesafe.play" %% "play-json" % playJsonVersion
val akkaActor = "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion
val akkaTestkit = "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test
val slick = "com.typesafe.slick" %% "slick" % slickVersion
val slickHikaricp = "com.typesafe.slick" %% "slick-hikaricp" % slickVersion
val postgresql = "org.postgresql" % "postgresql" % "42.3.3"
val playJson = "com.typesafe.play" %% "play-json" % playJsonVersion
val akkaActor = "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion
val akkaTestkit = "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test
val scalaTestArtifact = "org.scalatest" %% "scalatest" % "3.2.11" % Test
val scalaPlusPlay = "org.scalatestplus.play" %% "scalatestplus-play" % "5.1.0" % Test
val logback = "ch.qos.logback" % "logback-classic" % "1.2.11" % Test

lazy val commonSettings = Seq(
scalacOptions ++= Seq("-deprecation", "-feature", "-Xlint"), // , "-Xfatal-warnings"),
Expand Down Expand Up @@ -50,7 +51,8 @@ lazy val orchardCore = (project in file("orchard-core")).
postgresql,
playJson,
akkaActor,
akkaTestkit
akkaTestkit,
logback
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ object ResourceInstance {
database: OrchardDatabase,
query: ResourceInstanceQuery,
resourceIO: ResourceIO,
instanceId: Int,
timers: TimerScheduler[ResourceInstance.Msg]
)

Expand All @@ -56,6 +57,7 @@ object ResourceInstance {
database,
query,
resourceIO,
instanceId,
timers
)

Expand All @@ -73,6 +75,7 @@ object ResourceInstance {
}
}

// pending is the initial state after RI is created
private def pending(ps: Params): Behavior[Msg] = Behaviors.receiveMessage {
case GetResourceInstSpec(replyTo) =>
ps.ctx.log.info(s"${ps.ctx.self} (pending) received GetResourceInstSpec($replyTo)")
Expand All @@ -85,13 +88,12 @@ object ResourceInstance {
case Left(exp) =>
ps.ctx.log.error(s"${ps.ctx.self} (pending) Exception when creating resource", exp)
ps.database.sync(ps.query.setTerminated(Status.Failed, exp.getMessage()))
replyTo ! ResourceMgr.ResourceInstSpecRsp(Left(Status.Pending))
terminate(ps.resourceMgr, Status.Failed)
terminate(ps, Status.Failed, Option(replyTo))
}
case Shutdown(status) =>
ps.ctx.log.info(s"${ps.ctx.self} (pending) received Shutdown($status)")
ps.database.sync(ps.query.setTerminated(status, ""))
terminate(ps.resourceMgr, status)
terminate(ps, status, None)
}

private def activating(ps: Params, instSpec: JsValue): Behavior[Msg] = Behaviors.receiveMessage {
Expand All @@ -111,16 +113,14 @@ object ResourceInstance {
ps.ctx.log.info(
s"${ps.ctx.self} (activating) received resource status $sts, shutting down"
)
replyTo ! ResourceMgr.ResourceInstSpecRsp(Left(sts))
shuttindDown(ps, instSpec, sts)
shuttingDown(ps, instSpec, sts, Option(replyTo))
case Left(exp) =>
ps.ctx.log.info(s"${ps.ctx.self} (activating) received resource exception, shutting down")
replyTo ! ResourceMgr.ResourceInstSpecRsp(Left(Status.Failed))
shuttindDown(ps, instSpec, Status.Failed)
shuttingDown(ps, instSpec, Status.Failed, Option(replyTo))
}
case Shutdown(status) =>
ps.ctx.log.info(s"${ps.ctx.self} (activating) received Shutdown($status)")
shuttindDown(ps, instSpec, Status.Failed)
shuttingDown(ps, instSpec, Status.Failed, None)
}

private def running(ps: Params, instSpec: JsValue): Behavior[Msg] = Behaviors.receiveMessage {
Expand All @@ -131,17 +131,21 @@ object ResourceInstance {
replyTo ! ResourceMgr.ResourceInstSpecRsp(Right(instSpec))
Behaviors.same
case sts =>
ps.ctx.log.error(s"${ps.ctx.self} (running) Unexpected resource status $sts")
ps.ctx.log.error(s"${ps.ctx.self} (running) UNEXPECTED resource status $sts")
ps.database.sync(ps.query.setTerminated(Status.Failed, ""))
replyTo ! ResourceMgr.ResourceInstSpecRsp(Left(Status.Failed))
terminate(ps.resourceMgr, Status.Failed)
terminate(ps, Status.Failed, Option(replyTo))
}
case Shutdown(status) =>
ps.ctx.log.info(s"${ps.ctx.self} (running) received Shutdown($status)")
shuttindDown(ps, instSpec, status)
shuttingDown(ps, instSpec, status, None)
}

private def shuttindDown(ps: Params, instSpec: JsValue, status: Status.Value): Behavior[Msg] = {
private def shuttingDown(
ps: Params,
instSpec: JsValue,
status: Status.Value,
replyTo: Option[ActorRef[ResourceMgr.ResourceInstSpecRsp]]
): Behavior[Msg] = {
val errorMsg = ps.resourceIO.terminate(instSpec) match {
case Right(sts) =>
""
Expand All @@ -152,15 +156,33 @@ object ResourceInstance {
exp.getMessage()
}
ps.database.sync(ps.query.setTerminated(status, errorMsg))
terminate(ps.resourceMgr, status)
terminate(ps, status, replyTo)
}

// Behavior returned by terminate once this resource instance is done. The actor is deliberately
// kept alive instead of being stopped, as it may still need to respond to pending queries from
// activity attempts that are waiting in its mailbox.
private def inactive(ps: Params, status: Status.Value): Behavior[Msg] = Behaviors.receiveMessage {
case GetResourceInstSpec(replyTo) =>
ps.ctx.log.info(s"${ps.ctx.self} (inactive) received GetResourceInstSpec($replyTo)")
// No direct reply from here: the requester is handed to the resource manager together with
// this instance's id and the status it became inactive with.
ps.resourceMgr ! ResourceMgr.InactiveResourceInstance(ps.instanceId, status, replyTo)
Behaviors.same
case Shutdown(sts) =>
// Shutdown is only logged in this state; the behavior does not change.
ps.ctx.log.info(s"${ps.ctx.self} (inactive) received Shutdown($sts)")
Behaviors.same
}

private def terminate(
resourceMgr: ActorRef[ResourceMgr.Msg],
status: Status.Value
ps: Params,
status: Status.Value,
replyTo: Option[ActorRef[ResourceMgr.ResourceInstSpecRsp]]
): Behavior[Msg] = {
resourceMgr ! ResourceMgr.ResourceInstanceFinished(status)
Behaviors.stopped
replyTo match {
case Some(r) =>
ps.resourceMgr ! ResourceMgr.InactiveResourceInstance(ps.instanceId, status, r)
case None =>
ps.resourceMgr ! ResourceMgr.ResourceInstanceFinished(status)
}
inactive(ps, status)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ object ResourceMgr {
case class GetResourceInstSpec(replyTo: ActorRef[ResourceInstSpecRsp]) extends Msg
case class ResourceInstSpecRsp(spec: Either[Status.Value, JsValue])
case class ResourceInstanceFinished(status: Status.Value) extends Msg
// Sent to the resource manager by a resource instance that is no longer active but still has a
// GetResourceInstSpec request to be answered. Carries the id of that instance, the status it
// ended with, and the requester that is waiting for a ResourceInstSpecRsp.
case class InactiveResourceInstance(
instanceId: Int,
status: Status.Value,
replyTo: ActorRef[ResourceInstSpecRsp]
) extends Msg
// the shutdown call from WorkflowMgr, ResourceMgr should never shutdown unless told by WFMgr
case class Shutdown(status: Status.Value) extends Msg

Expand Down Expand Up @@ -120,11 +125,15 @@ object ResourceMgr {
running(ps, rscInst, instId)
}

case ResourceInstanceFinished(status) =>
ps.ctx.log.error(
s"${ps.ctx.self} (idle) received unexpected ResourceInstanceFinished($status)"
)
Behaviors.stopped
// no resource instance should exist yet, this is unexpected
case msg: ResourceInstanceFinished =>
ps.ctx.log.error(s"${ps.ctx.self} (idle) received UNEXPECTED $msg")
Behaviors.unhandled

// no resource instance should exist yet, this is unexpected
case msg: InactiveResourceInstance =>
ps.ctx.log.error(s"${ps.ctx.self} (idle) received UNEXPECTED $msg")
Behaviors.unhandled

case Shutdown(status) =>
ps.ctx.log.info(s"${ps.ctx.self} (idle) received Shutdown($status)")
Expand All @@ -136,32 +145,43 @@ object ResourceMgr {
def running(
ps: Params,
resourceInst: ActorRef[ResourceInstance.Msg],
instId: Int
currentInstId: Int
): Behavior[Msg] = Behaviors.receiveMessage {
case GetResourceInstSpec(replyTo) =>
ps.ctx.log.info(s"${ps.ctx.self} (running) received GetResourceInstSpec($replyTo)")
resourceInst ! ResourceInstance.GetResourceInstSpec(replyTo)
Behaviors.same
// resource should not receive finished status during "running" state, so we will keep the
// resource manager up.
case ResourceInstanceFinished(Status.Finished) =>
// resource should not receive finished status during "running" state.
case ResourceInstanceFinished(status) =>
ps.ctx.log.error(
s"${ps.ctx.self} (running) received UNEXPECTED ResourceInstanceFinished($status, None)"
)
Behaviors.unhandled
case InactiveResourceInstance(instId, status, replyTo) =>
ps.ctx.log.info(
s"${ps.ctx.self} (running) received ResourceInstanceFinished(Status.Finished)"
s"${ps.ctx.self} (running) received InactiveResourceInstance($instId, $status, $replyTo)"
)
ps.database.sync(ps.resourceQuery.setTerminated(Status.Finished))
finished(ps, Status.Finished)
case ResourceInstanceFinished(failureStatus) =>
ps.ctx.log.info(s"${ps.ctx.self} (running) received ResourceInstanceFinished($failureStatus)")
if (instId >= ps.maxAttempt) {
// in case resource is terminated (normally) by external entities
val failureStatus = if (status == Status.Finished) Status.Failed else status

// maybe the current instance is already a new one
if (instId < currentInstId) {
ps.ctx.self ! GetResourceInstSpec(replyTo)
Behaviors.same
} else if (currentInstId >= ps.maxAttempt) {
ps.database.sync(ps.resourceQuery.setTerminated(failureStatus))
replyTo ! ResourceInstSpecRsp(Left(failureStatus))
finished(ps, failureStatus)
} else {
val newInstId = instId + 1
val newInstId = currentInstId + 1
// create a new instance upon failure and delegate the response to the new instance
spawnResourceInstance(ps.ctx, ps.database, ps, newInstId) match {
case Left(sts) =>
ps.database.sync(ps.resourceQuery.setTerminated(sts))
replyTo ! ResourceInstSpecRsp(Left(failureStatus))
finished(ps, sts)
case Right(rscInst) =>
ps.ctx.self ! GetResourceInstSpec(replyTo)
running(ps, rscInst, newInstId)
}
}
Expand All @@ -171,6 +191,8 @@ object ResourceMgr {
terminating(ps.ctx, ps, status)
}

// The finished state is entered when resource instance creation has failed. The resource manager
// stays in a state where it can still handle incoming calls, but it won't create new instances.
def finished(
ps: Params,
status: Status.Value
Expand All @@ -181,8 +203,14 @@ object ResourceMgr {
Behaviors.same
case ResourceInstanceFinished(sts) =>
ps.ctx.log.error(
s"${ps.ctx.self} (finished) received unexpected ResourceInstanceFinished($sts)"
s"${ps.ctx.self} (finished) received UNEXPECTED ResourceInstanceFinished($sts)"
)
Behaviors.same
case InactiveResourceInstance(instanceId, sts, replyTo) =>
ps.ctx.log.info(
s"${ps.ctx.self} (finished) received InactiveResourceInstance($instanceId, $sts, $replyTo)"
)
ps.ctx.self ! GetResourceInstSpec(replyTo)
Behaviors.same
case Shutdown(_) =>
ps.ctx.log.info(s"${ps.ctx.self} (finished) received Shutdown(_)")
Expand All @@ -198,10 +226,16 @@ object ResourceMgr {
ps.ctx.log.info(s"${ps.ctx.self} (terminating) received GetResourceInstSpec(${replyTo})")
replyTo ! ResourceInstSpecRsp(Left(status))
Behaviors.same
case ResourceInstanceFinished(_) =>
ps.ctx.log.info(s"${ps.ctx.self} (terminating) received ResourceInstanceFinished(_)")
case ResourceInstanceFinished(sts) =>
ps.ctx.log.info(s"${ps.ctx.self} (terminating) received ResourceInstanceFinished($sts)")
ps.database.sync(ps.resourceQuery.setTerminated(status))
terminate(ps, status)
case InactiveResourceInstance(instId, sts, replyTo) =>
ps.ctx.log.info(
s"${ps.ctx.self} (terminating) received InactiveResourceInstance($instId, $sts, $replyTo)"
)
ctx.self ! GetResourceInstSpec(replyTo)
Behaviors.same
case Shutdown(_) =>
Behaviors.same
}
Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ThisBuild / version := "0.3.1"
ThisBuild / version := "0.3.2"

0 comments on commit e695bde

Please sign in to comment.