Skip to content

Commit

Permalink
Fix merge
Browse files Browse the repository at this point in the history
  • Loading branch information
bplommer committed Mar 15, 2022
1 parent fe05a52 commit 0631075
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 11 deletions.
11 changes: 2 additions & 9 deletions modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ object KafkaConsumer {

private def createKafkaConsumer[F[_], K, V](
settings: ConsumerSettings[F, K, V],
keyDes: Deserializer[F, K],
valueDes: Deserializer[F, V],
actor: KafkaConsumerActor[F],
fiber: FakeFiber[F],
streamIdRef: Ref[F, StreamId],
Expand Down Expand Up @@ -414,15 +416,6 @@ object KafkaConsumer {
withConsumer.blocking(_.commitSync(offsets.asJava))
}

private[this] def request[A](
request: (Either[Throwable, A] => F[Unit]) => Request[F]
): F[A] =
Deferred[F, Either[Throwable, A]].flatMap { deferred =>
requests.offer(request(deferred.complete(_).void)) >>
F.race(awaitTermination.as(ConsumerShutdownException()), deferred.get.rethrow)
}.rethrow


override def assignment: F[SortedSet[TopicPartition]] =
assignment(Option.empty)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,8 +575,7 @@ private[kafka] object KafkaConsumerActor {
sealed abstract class Request[F[_]]

object Request {
final case class Permit[F[_]](callback: Resource[F, Unit] => F[Unit])
extends Request[F]
final case class Permit[F[_]](callback: Resource[F, Unit] => F[Unit]) extends Request[F]
final case class Poll[F[_]]() extends Request[F]

private[this] val pollInstance: Poll[Nothing] =
Expand Down

0 comments on commit 0631075

Please sign in to comment.