Skip to content

Commit

Permalink
Add on partition recovered trait
Browse files Browse the repository at this point in the history
  • Loading branch information
kronolynx committed Dec 14, 2023
1 parent c9d5018 commit 31b1166
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,26 @@ object PartitionFlow {
record => f(record).pure[F]
}

trait OnPartitionRecovered[F[_]] {
def onRecovered(topicPartition: TopicPartition): F[Unit]
}

object OnPartitionRecovered {
def fromFunc[F[_]](f: TopicPartition => F[Unit]): OnPartitionRecovered[F] = tp => f(tp)
}

def resource[F[_]: Async: LogOf](
topicPartition: TopicPartition,
assignedAt: Offset,
keyStateOf: KeyStateOf[F],
config: PartitionFlowConfig,
filter: Option[FilterRecord[F]] = None,
scheduleCommit: ScheduleCommit[F],
onRecoveryFinished: Option[F[Unit]] = None
onPartitionRecovered: Option[OnPartitionRecovered[F]] = None
): Resource[F, PartitionFlow[F]] =
LogResource[F](getClass, topicPartition.toString) flatMap { implicit log =>
Cache.loading[F, String, PartitionKey[F]] flatMap { cache =>
of(topicPartition, assignedAt, keyStateOf, cache, config, filter, scheduleCommit, onRecoveryFinished)
of(topicPartition, assignedAt, keyStateOf, cache, config, filter, scheduleCommit, onPartitionRecovered)
}
}

Expand All @@ -73,25 +81,25 @@ object PartitionFlow {
config: PartitionFlowConfig,
filter: Option[FilterRecord[F]] = None,
scheduleCommit: ScheduleCommit[F],
onRecoveryFinished: Option[F[Unit]] = None
onPartitionRecovered: Option[OnPartitionRecovered[F]] = None
)(implicit log: Log[F]): Resource[F, PartitionFlow[F]] = for {
clock <- Resource.eval(Clock[F].instant)
committedOffset <- Resource.eval(Ref.of(assignedAt))
timestamp <- Resource.eval(Ref.of(Timestamp(clock, None, assignedAt)))
triggerTimersAt <- Resource.eval(Ref.of(clock))
commitOffsetsAt <- Resource.eval(Ref.of(clock))
flow <- of(
topicPartition = topicPartition,
keyStateOf = keyStateOf,
committedOffset = committedOffset,
timestamp = timestamp,
triggerTimersAt = triggerTimersAt,
commitOffsetsAt = commitOffsetsAt,
cache = cache,
config = config,
filter = filter,
scheduleCommit = scheduleCommit,
onRecoveryFinished = onRecoveryFinished
topicPartition = topicPartition,
keyStateOf = keyStateOf,
committedOffset = committedOffset,
timestamp = timestamp,
triggerTimersAt = triggerTimersAt,
commitOffsetsAt = commitOffsetsAt,
cache = cache,
config = config,
filter = filter,
scheduleCommit = scheduleCommit,
onPartitionRecovered = onPartitionRecovered
)
} yield flow

Expand All @@ -107,7 +115,7 @@ object PartitionFlow {
config: PartitionFlowConfig,
filter: Option[FilterRecord[F]],
scheduleCommit: ScheduleCommit[F],
onRecoveryFinished: Option[F[Unit]]
onPartitionRecovered: Option[OnPartitionRecovered[F]]
)(implicit log: Log[F]): Resource[F, PartitionFlow[F]] = {

def stateOf(createdAt: Timestamp, key: String): F[PartitionKey[F]] =
Expand Down Expand Up @@ -140,7 +148,7 @@ object PartitionFlow {
case Sequential =>
keys.foldM(0)((count, key) => stateOf(timestamp, key) as (count + 1))
}
_ <- onRecoveryFinished.getOrElse(().pure[F])
_ <- onPartitionRecovered.map(_.onRecovered(topicPartition)).getOrElse(().pure[F])
_ <- log.info(s"partition recovery finished, $count keys recovered")
} yield ()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.evolutiongaming.kafka.flow
import cats.effect.kernel.Async
import cats.effect.Resource
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.flow.PartitionFlow.FilterRecord
import com.evolutiongaming.kafka.flow.PartitionFlow.{FilterRecord, OnPartitionRecovered}
import com.evolutiongaming.kafka.flow.kafka.ScheduleCommit
import com.evolutiongaming.skafka.{Offset, TopicPartition}

Expand Down Expand Up @@ -31,9 +31,9 @@ object PartitionFlowOf {
*/
def apply[F[_]: Async: LogOf](
keyStateOf: KeyStateOf[F],
config: PartitionFlowConfig = PartitionFlowConfig(),
filter: Option[FilterRecord[F]] = None,
onRecoveryFinished: Option[F[Unit]] = None
config: PartitionFlowConfig = PartitionFlowConfig(),
filter: Option[FilterRecord[F]] = None,
onRecoveryFinished: Option[OnPartitionRecovered[F]] = None
): PartitionFlowOf[F] = { (topicPartition, assignedAt, scheduleCommit) =>
PartitionFlow.resource(topicPartition, assignedAt, keyStateOf, config, filter, scheduleCommit, onRecoveryFinished)
}
Expand Down

0 comments on commit 31b1166

Please sign in to comment.