Skip to content

Commit

Permalink
Simplify fetchPartition code
Browse files Browse the repository at this point in the history
  • Loading branch information
bplommer committed Mar 15, 2022
1 parent 0631075 commit 187329e
Showing 1 changed file with 19 additions and 21 deletions.
40 changes: 19 additions & 21 deletions modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -177,34 +177,33 @@ object KafkaConsumer {
)
)

def fetchPartition(deferred: Deferred[F, PartitionRequest]): F[Unit] = {
val fetch = permit.surround {
val callback = deferred.complete(_: PartitionRequest).void

def storeFetch =
actor.ref
.modify { state =>
val (newState, oldFetch) =
def fetchPartition: F[Unit] = {
val fetch: F[PartitionRequest] = permit.surround {

def storeFetch: F[PartitionRequest] = Deferred[F, PartitionRequest].flatMap {
deferred =>
val callback = deferred.complete(_: PartitionRequest).void
actor.ref.modify { state =>
val (newState, oldFetches) =
state.withFetch(partition, streamId, callback)
(newState, (newState, oldFetch))
}
.flatMap {
case (newState, oldFetches) =>
logging.log(LogEntry.StoredFetch(partition, callback, newState)) >>

newState ->
(logging.log(LogEntry.StoredFetch(partition, callback, newState)) >>
oldFetches.traverse_ { fetch =>
fetch.completeRevoked(Chunk.empty) >>
logging.log(LogEntry.RevokedPreviousFetch(partition, streamId))
}
}
})
}.flatten >> deferred.get
}

def completeRevoked =
callback((Chunk.empty, FetchCompletedReason.TopicPartitionRevoked))
def revoked: F[PartitionRequest] =
F.pure(Chunk.empty -> FetchCompletedReason.TopicPartitionRevoked)

withConsumer
.blocking { _.assignment.contains(partition) }
.ifM(storeFetch, completeRevoked) >>
deferred.get
.ifM(storeFetch, revoked)
}

F.race(shutdown, fetch).flatMap {
case Left(()) =>
stopReqs.complete(()).void
Expand All @@ -230,8 +229,7 @@ object KafkaConsumer {
Stream
.repeatEval {
stopReqs.tryGet.flatMap {
case None =>
Deferred[F, PartitionRequest] >>= fetchPartition
case None => fetchPartition

case Some(()) =>
// Prevent issuing additional requests after partition is
Expand Down

0 comments on commit 187329e

Please sign in to comment.