Skip to content

Commit

Permalink
Pass onPartitionRecoveryFinished from kafka persistence package
Browse files Browse the repository at this point in the history
  • Loading branch information
kronolynx committed Dec 12, 2023
1 parent e00ebd4 commit 165bf7c
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ trait PartitionFlowOf[F[_]] {
topicPartition: TopicPartition,
assignedAt: Offset,
scheduleCommit: ScheduleCommit[F],
onRecoveryFinished: Option[F[Unit]] = None
): Resource[F, PartitionFlow[F]]

}
Expand All @@ -31,8 +30,9 @@ object PartitionFlowOf {
def apply[F[_]: Async: LogOf](
keyStateOf: KeyStateOf[F],
config: PartitionFlowConfig = PartitionFlowConfig(),
filter: Option[FilterRecord[F]] = None
): PartitionFlowOf[F] = { (topicPartition, assignedAt, scheduleCommit, onRecoveryFinished) =>
filter: Option[FilterRecord[F]] = None,
onRecoveryFinished: Option[F[Unit]]
): PartitionFlowOf[F] = { (topicPartition, assignedAt, scheduleCommit) =>
PartitionFlow.resource(topicPartition, assignedAt, keyStateOf, config, filter, scheduleCommit, onRecoveryFinished)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class EntityRegistryTest extends FunSuite {
fold = fold,
registry = registry
),
config = PartitionFlowConfig()
config = PartitionFlowConfig(),
onRecoveryFinished = None
)
partitionFlow <- partitionFlowOf.apply(
topicPartition = TopicPartition.empty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ object PartitionFlowMetrics {
topicPartition: TopicPartition,
assignedAt: Offset,
scheduleCommit: ScheduleCommit[F],
onRecoveryFinished: Option[F[Unit]]
) =
partitionFlowOf(topicPartition, assignedAt, scheduleCommit, onRecoveryFinished) map (_.withMetrics)
partitionFlowOf(topicPartition, assignedAt, scheduleCommit) map (_.withMetrics)

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class FlowSpec extends CassandraSpec {
config = PartitionFlowConfig(
triggerTimersInterval = 1.minute,
commitOnRevoke = true
)
),
onRecoveryFinished = None
)
topicFlowOf = TopicFlowOf(partitionFlowOf)
records = NonEmptyList.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ package object kafkapersistence {
* enhances framework with metrics
* @param filter
* optional function to pre-filter incoming events before they are processed by `fold`
* @param onPartitionRecoveryFinished
* optional effect to execute when partition recovery is completed
*/
def kafkaEagerRecovery[F[_]: Async: LogOf, S](
kafkaPersistenceModuleOf: KafkaPersistenceModuleOf[F, S],
Expand All @@ -65,21 +67,23 @@ package object kafkapersistence {
partitionFlowConfig: PartitionFlowConfig,
metrics: FlowMetrics[F] = FlowMetrics.empty[F],
filter: Option[FilterRecord[F]] = None,
registry: EntityRegistry[F, KafkaKey, S]
registry: EntityRegistry[F, KafkaKey, S],
onPartitionRecoveryFinished: Option[F[Unit]] = None
): PartitionFlowOf[F] =
kafkaEagerRecovery(
kafkaPersistenceModuleOf = kafkaPersistenceModuleOf,
applicationId = applicationId,
groupId = groupId,
timersOf = timersOf,
timerFlowOf = timerFlowOf,
fold = EnhancedFold.fromFold(fold),
tick = tick,
partitionFlowConfig = partitionFlowConfig,
metrics = metrics,
filter = filter,
additionalPersistOf = AdditionalStatePersistOf.empty[F, S],
registry = registry
kafkaPersistenceModuleOf = kafkaPersistenceModuleOf,
applicationId = applicationId,
groupId = groupId,
timersOf = timersOf,
timerFlowOf = timerFlowOf,
fold = EnhancedFold.fromFold(fold),
tick = tick,
partitionFlowConfig = partitionFlowConfig,
metrics = metrics,
filter = filter,
additionalPersistOf = AdditionalStatePersistOf.empty[F, S],
registry = registry,
onPartitionRecoveryFinished = onPartitionRecoveryFinished
)

/** Create a PartitionFlowOf with a snapshot-based persistence and recovery from a Kafka
Expand Down Expand Up @@ -119,6 +123,8 @@ package object kafkapersistence {
* @param additionalPersistOf
* a factory of `AdditionalStatePersist` that can either enable or disable additional state persisting. That part
* of functionality in `KeyFlowExtras` will work only if you pass a functional (non-empty) implementation here
* @param onPartitionRecoveryFinished
* optional effect to execute when partition recovery is completed
*/
def kafkaEagerRecovery[F[_]: Async: LogOf, S](
kafkaPersistenceModuleOf: KafkaPersistenceModuleOf[F, S],
Expand All @@ -132,14 +138,14 @@ package object kafkapersistence {
metrics: FlowMetrics[F],
filter: Option[FilterRecord[F]],
additionalPersistOf: AdditionalStatePersistOf[F, S],
registry: EntityRegistry[F, KafkaKey, S]
registry: EntityRegistry[F, KafkaKey, S],
onPartitionRecoveryFinished: Option[F[Unit]]
): PartitionFlowOf[F] =
new PartitionFlowOf[F] {
override def apply(
topicPartition: TopicPartition,
assignedAt: Offset,
scheduleCommit: ScheduleCommit[F],
onRecoveryFinished: Option[F[Unit]]
): Resource[F, PartitionFlow[F]] = {
for {
// TODO: per-partition persistence module with 'String -> ByteVector' cache or global persistence module with 'KafkaKey -> ByteVector' cache?
Expand All @@ -160,10 +166,11 @@ package object kafkapersistence {
additionalPersistOf = additionalPersistOf,
registry = registry
) withMetrics metrics.keyStateOfMetrics,
config = partitionFlowConfig,
filter = filter
config = partitionFlowConfig,
filter = filter,
onRecoveryFinished = onPartitionRecoveryFinished
)
partitionFlow <- partitionFlowOf(topicPartition, assignedAt, scheduleCommit, onRecoveryFinished)
partitionFlow <- partitionFlowOf(topicPartition, assignedAt, scheduleCommit)
} yield partitionFlow
}
}
Expand Down

0 comments on commit 165bf7c

Please sign in to comment.