diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala index 80c3a8f01..6c7a24daf 100644 --- a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala +++ b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala @@ -116,6 +116,8 @@ object KafkaConsumer { private def createKafkaConsumer[F[_], K, V]( settings: ConsumerSettings[F, K, V], + keyDes: Deserializer[F, K], + valueDes: Deserializer[F, V], actor: KafkaConsumerActor[F], fiber: FakeFiber[F], streamIdRef: Ref[F, StreamId], @@ -414,15 +416,6 @@ object KafkaConsumer { withConsumer.blocking(_.commitSync(offsets.asJava)) } - private[this] def request[A]( - request: (Either[Throwable, A] => F[Unit]) => Request[F] - ): F[A] = - Deferred[F, Either[Throwable, A]].flatMap { deferred => - requests.offer(request(deferred.complete(_).void)) >> - F.race(awaitTermination.as(ConsumerShutdownException()), deferred.get.rethrow) - }.rethrow - - override def assignment: F[SortedSet[TopicPartition]] = assignment(Option.empty) diff --git a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala index 2c302cdf8..36f6c7fdb 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala @@ -575,8 +575,7 @@ private[kafka] object KafkaConsumerActor { sealed abstract class Request[F[_]] object Request { - final case class Permit[F[_]](callback: Resource[F, Unit] => F[Unit]) - extends Request[F] + final case class Permit[F[_]](callback: Resource[F, Unit] => F[Unit]) extends Request[F] final case class Poll[F[_]]() extends Request[F] private[this] val pollInstance: Poll[Nothing] =