Skip to content

Commit

Permalink
add jitter to progress delay (#127)
Browse files Browse the repository at this point in the history
  • Loading branch information
crm-dhu authored Mar 11, 2024
1 parent 2678ae2 commit 74661ea
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 35 deletions.
37 changes: 22 additions & 15 deletions orchard-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,32 @@ com.salesforce.mce.orchard {
// specified with HikariCP.
dataSourceClass = "org.postgresql.ds.PGSimpleDataSource"
properties = {
serverName = "db"
serverName = ${?POSTGRES_SERVER}
portNumber = "5432"
portNumber = ${?POSTGRES_PORT}
databaseName = "postgres"
databaseName = ${?POSTGRES_DBNAME}
user = "postgres"
user = ${?POSTGRES_USER}
password = ${?ORCHARD_PG_SECRET}
serverName = "db"
serverName = ${?POSTGRES_SERVER}
portNumber = "5432"
portNumber = ${?POSTGRES_PORT}
databaseName = "postgres"
databaseName = ${?POSTGRES_DBNAME}
user = "postgres"
user = ${?POSTGRES_USER}
password = ${?ORCHARD_PG_SECRET}
}
numThreads = 2
}
}

activity {
checkProgressDelay = 10 seconds
}
activity {
checkProgressDelay = 10 seconds
// An example of checkProgressDelay using jitteredDelay:
// checkProgressDelay = {
// jitteredDelay {
// minDelay = 10 seconds
// maxDelay = 20 seconds
// }
// }
}

resource {
reAttemptDelay = 10 seconds
}
resource {
reAttemptDelay = 10 seconds
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,30 @@ import scala.jdk.DurationConverters._

import com.typesafe.config.{Config, ConfigFactory}

import com.salesforce.mce.orchard.util.{FixedDelay, JitteredDelay, Policy}

class OrchardSettings private (config: Config) {

val jitteredDelayKey = "jitteredDelay"

def slickDatabaseConf = config.getConfig("jdbc")

def providerConfig(provider: String): Config = config.getConfig(s"io.$provider")

val checkProgressDelay = config.getDuration("activity.checkProgressDelay").toScala
private def delayPolicy(config: Config, path: String): Policy = {
config.getAnyRef(path) match {
case _: String => FixedDelay(config.getDuration(path).toScala)
case _ if config.getObject(path).containsKey(jitteredDelayKey) =>
val minDelay = config.getDuration(s"$path.$jitteredDelayKey.minDelay").toScala
val maxDelay = config.getDuration(s"$path.$jitteredDelayKey.maxDelay").toScala
JitteredDelay(minDelay, maxDelay)
case _ => FixedDelay() // fixed delay with a default delay value
}
}

val checkProgressDelayPolicy = delayPolicy(config, "activity.checkProgressDelay")

val resourceReattemptDelay = config.getDuration("resource.reAttemptDelay").toScala
val resourceReattemptDelayPolicy = delayPolicy(config, "resource.reAttemptDelay")

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,17 @@

package com.salesforce.mce.orchard.system.actor

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext

import org.apache.pekko.actor.typed._
import org.apache.pekko.actor.typed.scaladsl._
import play.api.libs.json.JsValue

import com.salesforce.mce.orchard.db.{ActivityAttemptQuery, OrchardDatabase}
import com.salesforce.mce.orchard.db.{ActivityAttemptQuery, OrchardDatabase, ResourceInstanceQuery}
import com.salesforce.mce.orchard.io.ActivityIO
import com.salesforce.mce.orchard.model.Status
import com.salesforce.mce.orchard.system.util.InvalidJsonException
import com.salesforce.mce.orchard.db.ResourceInstanceQuery
import com.salesforce.mce.orchard.util.Policy

object ActivityAttempt {

Expand Down Expand Up @@ -52,7 +51,7 @@ object ActivityAttempt {
activityType: String,
activitySpec: JsValue,
resourceId: String,
checkProgressDelay: FiniteDuration
checkProgressDelay: Policy
): Behavior[Msg] = Behaviors.supervise {
Behaviors.setup { ctx: ActorContext[Msg] =>
Behaviors.withTimers { timers: TimerScheduler[Msg] =>
Expand Down Expand Up @@ -147,7 +146,7 @@ object ActivityAttempt {
resourceMgr: ActorRef[ResourceMgr.Msg],
activityType: String,
activitySpec: JsValue,
checkProgressDelay: FiniteDuration
checkProgressDelay: Policy
): Behavior[Msg] =
Behaviors
.receiveMessage[Msg] {
Expand Down Expand Up @@ -189,7 +188,7 @@ object ActivityAttempt {

// Resource not ready yet
case Left(Status.Pending) =>
ps.timers.startSingleTimer(CheckProgress, checkProgressDelay)
ps.timers.startSingleTimer(CheckProgress, checkProgressDelay.delay())
Behaviors.same

// Resource down for unknown reason, cancel the activity?
Expand All @@ -214,9 +213,9 @@ object ActivityAttempt {
resourceInstance: Int,
activityIO: ActivityIO,
attemptSpec: JsValue,
checkProgressDelay: FiniteDuration
checkProgressDelay: Policy
): Behavior[Msg] = {
ps.timers.startSingleTimer(CheckProgress, checkProgressDelay)
ps.timers.startSingleTimer(CheckProgress, checkProgressDelay.delay())
Behaviors
.receiveMessage[Msg] {
case Cancel =>
Expand All @@ -242,7 +241,7 @@ object ActivityAttempt {
ps.database.sync(ps.query.setTerminated(status, ""))
terminate(ps, status)
case Right(status) =>
ps.timers.startSingleTimer(CheckProgress, checkProgressDelay)
ps.timers.startSingleTimer(CheckProgress, checkProgressDelay.delay())
Behaviors.same
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@

package com.salesforce.mce.orchard.system.actor

import scala.concurrent.duration.FiniteDuration

import org.apache.pekko.actor.typed.scaladsl._
import org.apache.pekko.actor.typed.{ActorRef, Behavior}
import play.api.libs.json.JsValue

import com.salesforce.mce.orchard.OrchardSettings
import com.salesforce.mce.orchard.db.{ActivityQuery, OrchardDatabase}
import com.salesforce.mce.orchard.model.Status
import com.salesforce.mce.orchard.util.Policy

object ActivityMgr {

Expand Down Expand Up @@ -70,13 +69,13 @@ object ActivityMgr {
.sync(activityQuery.get())
.map { r =>
params.ctx.log.info(s"${ctx.self} init with status ${r.status}")
init(params, r.status, orchardSettings.checkProgressDelay)
init(params, r.status, orchardSettings.checkProgressDelayPolicy)
}
.getOrElse(terminate(params, Status.Failed))

}

private def init(ps: Params, status: Status.Value, checkProgressDelay: FiniteDuration): Behavior[Msg] = status match {
private def init(ps: Params, status: Status.Value, checkProgressDelay: Policy): Behavior[Msg] = status match {

// this is the state if everything follows the happy path
case Status.Pending =>
Expand Down Expand Up @@ -136,7 +135,7 @@ object ActivityMgr {
Behaviors.stopped
}

private def idle(ps: Params, checkProgressDelay: FiniteDuration): Behavior[Msg] = Behaviors
private def idle(ps: Params, checkProgressDelay: Policy): Behavior[Msg] = Behaviors
.receiveMessage[Msg] {
case Start =>
ps.ctx.log.info(s"${ps.ctx.self} (idle) received Start")
Expand Down Expand Up @@ -182,7 +181,7 @@ object ActivityMgr {
ps: Params,
attemptId: Int,
attempt: ActorRef[ActivityAttempt.Msg],
checkProgressDelay: FiniteDuration
checkProgressDelay: Policy
): Behavior[Msg] = Behaviors
.receiveMessage[Msg] {
case Start =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ object ResourceMgr {

resourceR.status match {
case Status.Pending =>
idle(ps, orchardSettings.resourceReattemptDelay)
idle(ps, orchardSettings.resourceReattemptDelayPolicy.delay())

case Status.Running =>
val resourceInsts = database.sync(resourceQuery.instances())
Expand All @@ -114,7 +114,7 @@ object ResourceMgr {
ps,
instId
)
} yield running(ps, orchardSettings.resourceReattemptDelay, rscInst, instId)
} yield running(ps, orchardSettings.resourceReattemptDelayPolicy.delay(), rscInst, instId)

result.left.map { sts =>
database.sync(resourceQuery.setTerminated(sts))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2022, salesforce.com, inc.
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
* For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause
*/

package com.salesforce.mce.orchard.util

import scala.concurrent.duration
import scala.concurrent.duration.FiniteDuration

trait FixedDelay extends Policy {

def fixedDelay: FiniteDuration = FixedDelay.defaultFixedDelay

def delay(): FiniteDuration = FiniteDuration(fixedDelay.toSeconds, duration.SECONDS)

}

object FixedDelay {

val defaultFixedDelay: FiniteDuration = FiniteDuration(10, duration.SECONDS)

def apply(fixedDelayValue: FiniteDuration = defaultFixedDelay) = new FixedDelay {
override def fixedDelay = fixedDelayValue

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2022, salesforce.com, inc.
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
* For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause
*/

package com.salesforce.mce.orchard.util

import scala.concurrent.duration
import scala.concurrent.duration.FiniteDuration
import scala.util.Random

trait JitteredDelay extends Policy {

def minDelay: FiniteDuration = JitteredDelay.defaultMinDelay

def maxDelay: FiniteDuration = JitteredDelay.defaultMaxDelay

def delay(): FiniteDuration = {
val rand = new Random()
FiniteDuration(rand.between(minDelay.toSeconds, maxDelay.toSeconds + 1L), duration.SECONDS)
}
}

object JitteredDelay {

val defaultMinDelay: FiniteDuration = FiniteDuration(10, duration.SECONDS)

val defaultMaxDelay: FiniteDuration = FiniteDuration(20, duration.SECONDS)

def apply(
minDelayValue: FiniteDuration = defaultMinDelay,
maxDelayValue: FiniteDuration = defaultMaxDelay
) =
new JitteredDelay {
override def minDelay: FiniteDuration = minDelayValue

override def maxDelay: FiniteDuration = maxDelayValue
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright (c) 2022, salesforce.com, inc.
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
* For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause
*/

package com.salesforce.mce.orchard.util

import scala.concurrent.duration.FiniteDuration

trait Policy {

/**
* @return the number of seconds to wait for the next attempt
*/
def delay(): FiniteDuration

}
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ThisBuild / version := "0.22.2"
ThisBuild / version := "0.23.0"

0 comments on commit 74661ea

Please sign in to comment.