Skip to content

Commit

Permalink
Destroy the preload consumer when complete in loadAndRun
Browse files Browse the repository at this point in the history
  • Loading branch information
bcarter97 committed Dec 13, 2024
1 parent 47572c0 commit 7022fd3
Showing 1 changed file with 18 additions and 6 deletions.
24 changes: 18 additions & 6 deletions src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ trait TopicLoader {
topics: NonEmptyList[String],
consumerSettings: ConsumerSettings[F, K, V]
)(onLoad: Resource.ExitCase => F[Unit]): Stream[F, ConsumerRecord[K, V]] = {
def preLoad(
logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
)(using Logger[F]): Stream[F, ConsumerRecord[K, V]] = for {
preloadConsumer <- KafkaConsumer.stream(consumerSettings)
record <- load(logOffsets, preloadConsumer).onFinalizeCase(onLoad)
} yield record

def postLoad(
logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
)(using Logger[F]): Stream[F, ConsumerRecord[K, V]] =
Expand All @@ -84,10 +91,14 @@ trait TopicLoader {

for {
given Logger[F] <- Stream.eval(LoggerFactory[F].create)
preloadConsumer <- KafkaConsumer.stream(consumerSettings)
logOffsets <- Stream.eval(logOffsetsForTopics(topics, LoadAll, preloadConsumer)).flatMap(Stream.fromOption(_))
maybeLogOffsets <- Stream
.eval(
// Destroy the consumer after calculating the offsets
KafkaConsumer.resource(consumerSettings).use(logOffsetsForTopics(topics, LoadAll, _))
)
logOffsets <- Stream.fromOption(maybeLogOffsets)
_ <- Stream.eval(info"log offsets: ${logOffsets.show}")
record <- load(logOffsets, preloadConsumer).onFinalizeCase(onLoad) ++ postLoad(logOffsets)
record <- preLoad(logOffsets) ++ postLoad(logOffsets)
} yield record
}

Expand All @@ -97,9 +108,10 @@ trait TopicLoader {
consumer: KafkaConsumer[F, K, V]
): Stream[F, ConsumerRecord[K, V]] =
for {
logOffsets <- Stream.eval(logOffsetsForTopics(topics, strategy, consumer)).flatMap(Stream.fromOption(_))
_ <- Stream.eval(info"log offsets: ${logOffsets.show}")
record <- load(logOffsets, consumer)
maybeLogOffsets <- Stream.eval(logOffsetsForTopics(topics, strategy, consumer))
logOffsets <- Stream.fromOption(maybeLogOffsets)
_ <- Stream.eval(info"log offsets: ${logOffsets.show}")
record <- load(logOffsets, consumer)
} yield record

private def load[F[_] : Async : Logger, K, V](
Expand Down

0 comments on commit 7022fd3

Please sign in to comment.