Skip to content

Commit

Permalink
Use lila's scalafmt config
Browse files Browse the repository at this point in the history
  • Loading branch information
lenguyenthanh committed Nov 23, 2023
1 parent 866af67 commit 0ac713f
Show file tree
Hide file tree
Showing 12 changed files with 118 additions and 143 deletions.
46 changes: 9 additions & 37 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,43 +1,14 @@
version = 3.7.15
version = "3.7.17"
runner.dialect = scala3
maxColumn = 110

newlines.source = keep

trailingCommas = "multiple"

rewrite {
rules = [Imports]
imports.sort = ascii
}

rewrite.scala3 {
convertToNewSyntax = yes
removeOptionalBraces = yes
}

align {
allowOverflow = true
preset = more
openParenCallSite = false
stripMargin = true
}

continuationIndent {
callSite = 2
defnSite = 4
}

docstrings {
style = Asterisk
oneline = keep
wrap = no
}
align.preset = more
maxColumn = 110
spaces.inImportCurlyBraces = true
rewrite.rules = [SortModifiers]
rewrite.redundantBraces.stringInterpolation = true

spaces {
beforeContextBoundColon = Never
inImportCurlyBraces = true
}
rewrite.scala3.convertToNewSyntax = yes
rewrite.scala3.removeOptionalBraces = yes

fileOverride {
"glob:**/build.sbt" {
Expand All @@ -47,3 +18,4 @@ fileOverride {
runner.dialect = scala213
}
}

2 changes: 1 addition & 1 deletion app/src/main/scala/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ object App extends IOApp.Simple:
server <- MkHttpServer.apply.newEmber(config.server, httpApi.httpApp)
_ <- workListenerJob.run().background
_ <- cleanJob.run().background
_ <- Resource.eval(Logger[IO].info(s"Starting server on ${config.server.host}:${config.server.port}"))
_ <- Resource.eval(Logger[IO].info(s"Starting server on ${config.server.host}:${config.server.port}"))
yield ()
3 changes: 2 additions & 1 deletion app/src/main/scala/AppResources.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ object AppResources:
// Default 1000 is good for small servers. But can easily take 100,000.
// workers: How many threads will process pipelined messages.
def instance(conf: RedisConfig): Resource[IO, AppResources] =
RedisConnection.queued[IO]
RedisConnection
.queued[IO]
.withHost(conf.host)
.withPort(conf.port)
.withMaxQueued(1000)
Expand Down
2 changes: 1 addition & 1 deletion app/src/main/scala/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ object Config:
def appConfig = (
RedisConfig.config,
HttpServerConfig.config,
KamonConfig.config,
KamonConfig.config
).parMapN(AppConfig.apply)

case class AppConfig(redis: RedisConfig, server: HttpServerConfig, kamon: KamonConfig)
Expand Down
136 changes: 68 additions & 68 deletions app/src/main/scala/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@ import lila.fishnet.Work.Move
import java.time.Instant
import lila.fishnet.Lila.Request

/**
* Executor is responsible for:
* store work in memory
* - getting work from the queue
* - sending work to lila
* - adding work to the queue
*/
/** Executor is responsible for: store work in memory
* - getting work from the queue
* - sending work to lila
* - adding work to the queue
*/
trait Executor:
// get a move from the queue return Work
def acquire(accquire: ClientKey): IO[Option[Work.RequestWithId]]
Expand All @@ -29,73 +27,75 @@ object Executor:
type State = Map[WorkId, Work.Move]

def instance(client: LilaClient, monitor: Monitor): IO[Executor] =
Ref.of[IO, State](Map.empty).map: ref =>
new Executor:
Ref
.of[IO, State](Map.empty)
.map: ref =>
new Executor:

def add(work: Request): IO[Unit] =
fromRequest(work).flatMap: move =>
ref.update: m =>
// if m.exists(_._2.similar(work)) then
// logger.info(s"Add coll exist: $move")
clearIfFull(m) + (move.id -> move)
def add(work: Request): IO[Unit] =
fromRequest(work).flatMap: move =>
ref.update: m =>
// if m.exists(_._2.similar(work)) then
// logger.info(s"Add coll exist: $move")
clearIfFull(m) + (move.id -> move)

def acquire(key: ClientKey): IO[Option[Work.RequestWithId]] =
IO.realTimeInstant.flatMap: at =>
ref.modify: coll =>
coll.values
.foldLeft[Option[Work.Move]](none):
case (found, m) if m.nonAcquired =>
Some(found.fold(m): a =>
if m.canAcquire(key) && m.createdAt.isBefore(a.createdAt) then m else a)
case (found, _) => found
.map: m =>
val move = m.assignTo(key, at)
(coll + (move.id -> move)) -> move.toRequestWithId.some
.getOrElse(coll -> none)
def acquire(key: ClientKey): IO[Option[Work.RequestWithId]] =
IO.realTimeInstant.flatMap: at =>
ref.modify: coll =>
coll.values
.foldLeft[Option[Work.Move]](none):
case (found, m) if m.nonAcquired =>
Some(found.fold(m): a =>
if m.canAcquire(key) && m.createdAt.isBefore(a.createdAt) then m else a)
case (found, _) => found
.map: m =>
val move = m.assignTo(key, at)
(coll + (move.id -> move)) -> move.toRequestWithId.some
.getOrElse(coll -> none)

def move(workId: WorkId, apikey: ClientKey, move: BestMove): IO[Unit] =
ref.flatModify: coll =>
coll get workId match
case None =>
coll -> monitor.notFound(workId, apikey)
case Some(work) if work.isAcquiredBy(apikey) =>
move.uci match
case Some(uci) =>
coll - work.id -> (monitor.success(work) >> client.send(Lila.Move(
work.request.id,
work.request.moves,
uci,
)))
case _ =>
updateOrGiveUp(coll, work.invalid) ->
monitor.failure(work, apikey, new Exception("Missing move"))
def move(workId: WorkId, apikey: ClientKey, move: BestMove): IO[Unit] =
ref.flatModify: coll =>
coll get workId match
case None =>
coll -> monitor.notFound(workId, apikey)
case Some(work) if work.isAcquiredBy(apikey) =>
move.uci match
case Some(uci) =>
coll - work.id -> (monitor.success(work) >> client.send(
Lila.Move(
work.request.id,
work.request.moves,
uci
)
))
case _ =>
updateOrGiveUp(coll, work.invalid) ->
monitor.failure(work, apikey, new Exception("Missing move"))

case Some(move) =>
coll -> monitor.notAcquired(move, apikey)
case Some(move) =>
coll -> monitor.notAcquired(move, apikey)

def clean(since: Instant): IO[Unit] =
ref.updateAndGet: coll =>
val timedOut = coll.values.filter(_.acquiredBefore(since))
// if (timedOut.nonEmpty)
// logger.debug(s"cleaning ${timedOut.size} of ${coll.size} moves")
timedOut.foldLeft(coll): (coll, m) =>
// logger.info(s"Timeout move $m")
updateOrGiveUp(coll, m.timeout)
.flatMap(monitor.updateSize)
def clean(since: Instant): IO[Unit] =
ref
.updateAndGet: coll =>
val timedOut = coll.values.filter(_.acquiredBefore(since))
// if (timedOut.nonEmpty)
// logger.debug(s"cleaning ${timedOut.size} of ${coll.size} moves")
timedOut.foldLeft(coll): (coll, m) =>
// logger.info(s"Timeout move $m")
updateOrGiveUp(coll, m.timeout)
.flatMap(monitor.updateSize)

def clearIfFull(coll: State): State =
if coll.size > maxSize then
// logger.warn(s"MoveDB collection is full! maxSize=$maxSize. Dropping all now!")
Map.empty
else
coll
def clearIfFull(coll: State): State =
if coll.size > maxSize then
// logger.warn(s"MoveDB collection is full! maxSize=$maxSize. Dropping all now!")
Map.empty
else coll

def updateOrGiveUp(state: State, move: Work.Move): State =
val newState = state - move.id
if move.isOutOfTries then
newState
else
newState + (move.id -> move)
def updateOrGiveUp(state: State, move: Work.Move): State =
val newState = state - move.id
if move.isOutOfTries then newState
else newState + (move.id -> move)

def fromRequest(req: Lila.Request): IO[Move] =
(IO.delay(Work.makeId), IO.realTimeInstant).mapN: (id, now) =>
Expand All @@ -104,5 +104,5 @@ object Executor:
request = req,
tries = 0,
acquired = None,
createdAt = now,
createdAt = now
)
3 changes: 1 addition & 2 deletions app/src/main/scala/Monitor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ object Monitor:
IO.realTimeInstant.map: now =>
if work.request.level == 8 then
work.acquiredAt.foreach(at => record(lvl8AcquiredTimeRequest, at, now))
if work.request.level == 1 then
record(lvl1FullTimeRequest, work.createdAt, now)
if work.request.level == 1 then record(lvl1FullTimeRequest, work.createdAt, now)

def failure(work: Work.Move, clientKey: ClientKey, e: Exception) =
Logger[IO].warn(e)(s"Received invalid move ${work.id} for ${work.request.id} by $clientKey")
Expand Down
8 changes: 5 additions & 3 deletions app/src/main/scala/RedisSubscriberJob.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ object RedisSubscriberJob:
"fishnet-out",
msg =>
Logger[IO].info(s"Received message: $msg") *>
Lila.readMoveReq(msg.message).match
case Some(request) => executor.add(request)
case None => Logger[IO].error(s"Failed to parse message: $msg"),
Lila
.readMoveReq(msg.message)
.match
case Some(request) => executor.add(request)
case None => Logger[IO].error(s"Failed to parse message: $msg"),
) *> pubsub.runMessages

trait CleanJob:
Expand Down
4 changes: 2 additions & 2 deletions app/src/main/scala/Work.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ object Work:
game_id = request.id.value,
position = request.initialFen,
moves = request.moves,
variant = request.variant,
variant = request.variant
)

case class Acquired(clientKey: ClientKey, date: Instant):
Expand All @@ -22,7 +22,7 @@ object Work:
request: Lila.Request,
tries: Int,
acquired: Option[Acquired],
createdAt: Instant,
createdAt: Instant
):

def toRequestWithId =
Expand Down
6 changes: 2 additions & 4 deletions app/src/main/scala/http/KamonInitiator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,5 @@ trait KamonInitiator:
object KamonInitiator:
def apply: KamonInitiator = new KamonInitiator:
def init(config: KamonConfig): IO[Unit] =
if config.enabled then
IO(Kamon.init())
else
IO.unit
if config.enabled then IO(Kamon.init())
else IO.unit
24 changes: 11 additions & 13 deletions app/src/main/scala/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,17 @@ object BestMove:

opaque type WorkId = String
object WorkId:
def apply(value: String): WorkId = value
given Encoder[WorkId] = encodeString
given Decoder[WorkId] = decodeString
extension (bm: WorkId)
def value: String = bm
def apply(value: String): WorkId = value
given Encoder[WorkId] = encodeString
given Decoder[WorkId] = decodeString
extension (bm: WorkId) def value: String = bm

opaque type GameId = String
object GameId:
def apply(value: String): GameId = value
given Encoder[GameId] = encodeString
given Decoder[GameId] = decodeString
extension (bm: GameId)
def value: String = bm
def apply(value: String): GameId = value
given Encoder[GameId] = encodeString
given Decoder[GameId] = decodeString
extension (bm: GameId) def value: String = bm

object Fishnet:

Expand All @@ -58,7 +56,7 @@ object Fishnet:
game_id: String,
position: Fen.Epd,
moves: String,
variant: Variant,
variant: Variant
) derives Encoder.AsObject

object Lila:
Expand All @@ -75,7 +73,7 @@ object Lila:
variant: Variant,
moves: String,
level: Int,
clock: Option[Clock],
clock: Option[Clock]
)

def readMoveReq(msg: String): Option[Request] =
Expand All @@ -91,7 +89,7 @@ object Lila:
variant = variant,
moves = moves,
level = level,
clock = clock,
clock = clock
)
case _ => None

Expand Down
9 changes: 5 additions & 4 deletions app/src/test/scala/ExecutorTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ object ExecutorTest extends SimpleIOSuite:
variant = chess.variant.Standard,
moves = "",
level = 1,
clock = None,
clock = None
)

val key = ClientKey("key")
Expand Down Expand Up @@ -109,7 +109,7 @@ object ExecutorTest extends SimpleIOSuite:
client = createLilaClient(ref)
executor <- Executor.instance(client, noopMonitor)
_ <- executor.add(request)
_ <- (executor.acquire(key).flatMap(x => executor.move(x.get.id, key, invalidMove))).replicateA_(2)
_ <- (executor.acquire(key).flatMap(x => executor.move(x.get.id, key, invalidMove))).replicateA_(2)
acquired <- executor.acquire(key)
yield assert(acquired.isDefined)

Expand All @@ -119,15 +119,16 @@ object ExecutorTest extends SimpleIOSuite:
client = createLilaClient(ref)
executor <- Executor.instance(client, noopMonitor)
_ <- executor.add(request)
_ <- (executor.acquire(key).flatMap(x => executor.move(x.get.id, key, invalidMove))).replicateA_(3)
_ <- (executor.acquire(key).flatMap(x => executor.move(x.get.id, key, invalidMove))).replicateA_(3)
acquired <- executor.acquire(key)
yield assert(acquired.isEmpty)

def createExecutor(): IO[Executor] =
createLilaClient.flatMap(Executor.instance(_, noopMonitor))

def createLilaClient: IO[LilaClient] =
Ref.of[IO, List[Lila.Move]](Nil)
Ref
.of[IO, List[Lila.Move]](Nil)
.map(createLilaClient)

def createLilaClient(ref: Ref[IO, List[Lila.Move]]): LilaClient =
Expand Down
Loading

0 comments on commit 0ac713f

Please sign in to comment.