From bd3f836cc93a53522970a3eaaa5f929c5d4eef28 Mon Sep 17 00:00:00 2001 From: Grzegorz Bielski Date: Sat, 3 Aug 2024 09:30:55 +0200 Subject: [PATCH 1/5] Add KafkaHealthCheck --- project/Dependencies.scala | 2 +- .../skafka/KafkaHealthCheck.scala | 253 ++++++++++++++++++ 2 files changed, 254 insertions(+), 1 deletion(-) create mode 100644 skafka/src/main/scala/com/evolutiongaming/skafka/KafkaHealthCheck.scala diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 398dc0fd..7bbf9e74 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -4,7 +4,7 @@ object Dependencies { val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.5" val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.7" - val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.9.0" + val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.10.6-SNAPSHOT" val `testcontainers-kafka` = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.41.0" val `play-json-jsoniter` = "com.evolution" %% "play-json-jsoniter" % "1.0.0" val `scala-java8-compat` = "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2" diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/KafkaHealthCheck.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/KafkaHealthCheck.scala new file mode 100644 index 00000000..7393af29 --- /dev/null +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/KafkaHealthCheck.scala @@ -0,0 +1,253 @@ +package com.evolutiongaming.skafka + +import cats.effect._ +import cats.data.{NonEmptySet => Nes} +import cats.effect.syntax.all._ +import cats.syntax.all._ +import cats.{Applicative, Functor, Monad} +import com.evolutiongaming.catshelper.{FromTry, Log, LogOf, RandomIdOf} +import com.evolutiongaming.skafka.consumer.{AutoOffsetReset, ConsumerConfig, ConsumerOf, Consumer => SKafkaConsumer} +import com.evolutiongaming.skafka.producer.{ProducerConfig, ProducerRecord, ProducerOf, Producer => SKafkaProducer} + +import scala.concurrent.CancellationException +import scala.concurrent.duration._ + +trait KafkaHealthCheck[F[_]] { + + def error: F[Option[Throwable]] + + def done: F[Unit] +} + +object KafkaHealthCheck { + + def empty[F[_] : Applicative]: KafkaHealthCheck[F] = new KafkaHealthCheck[F] { + + def error = none[Throwable].pure[F] + + def done = ().pure[F] + } + + + def of[F[_] : Temporal : LogOf : ConsumerOf : ProducerOf : RandomIdOf : FromTry]( + config: Config, + consumerConfig: ConsumerConfig, + producerConfig: ProducerConfig + ): Resource[F, KafkaHealthCheck[F]] = { + + val result = for { + log <- LogOf[F].apply(KafkaHealthCheck.getClass) + randomId <- RandomIdOf[F].apply + } yield { + val key = randomId.value + + val consumer = Consumer.of[F](key, consumerConfig) + + val producer = Producer.of[F](config.topic, producerConfig) + + of( + key = key, + config = config, + stop = false.pure[F], + producer = producer, + consumer = consumer, + log = log) + } + + result + .toResource + .flatten + } + + def of[F[_] : Temporal]( + key: String, + config: Config, + stop: F[Boolean], + producer: Resource[F, Producer[F]], + consumer: Resource[F, Consumer[F]], + log: Log[F] + ): Resource[F, KafkaHealthCheck[F]] = { + + val result = for { + ref <- Ref.of[F, Option[Throwable]](None) + fiber <- (producer, consumer) + .tupled + .use { case (producer, consumer) => run(key, config, stop, producer, consumer, ref.set, log) } + .start + } yield { + val result = new KafkaHealthCheck[F] { + def error = ref.get + def done = fiber.join.flatMap { + case Outcome.Succeeded(_) => Temporal[F].unit + case Outcome.Errored(e) => Temporal[F].raiseError(e) + case Outcome.Canceled() => Temporal[F].raiseError(new CancellationException("HealthCheck cancelled")) + } + } + (result, fiber.cancel) + } + + Resource(result) + } + + def run[F[_] : Temporal]( + key: String, + config: Config, + stop: F[Boolean], + producer: Producer[F], + consumer: Consumer[F], + set: Option[Throwable] => F[Unit], + log: Log[F] + ): F[Unit] = { + + val sleep = Temporal[F].sleep(config.interval) + + def produce(value: String) = { + val record = Record(key = key.some, value = value.some) + for { + _ <- log.debug(s"$key send $value") + _ <- producer.send(record) + } yield {} + } + + def produceConsume(n: Long) = { + val value = n.toString + + def consume(retry: Long) = { + for { + records <- consumer.poll(config.pollTimeout) + found = records.find { record => record.key.contains_(key) && record.value.contains_(value) } + result <- found.fold { + for { + _ <- sleep + _ <- produce(s"$n:$retry") + } yield { + (retry + 1).asLeft[Unit] + } + } { _ => + ().asRight[Long].pure[F] + } + } yield result + } + + val produceConsume = for { + _ <- produce(value) + _ <- 0L.tailRecM(consume) + } yield {} + + produceConsume + .timeout(config.timeout) + .redeem(_.some, _ => none[Throwable]) + } + + def check(n: Long) = { + for { + error <- produceConsume(n) + _ <- error.fold(().pure[F]) { error => log.error(s"$n failed with $error") } + _ <- set(error) + _ <- sleep + stop <- stop + } yield { + if (stop) ().asRight[Long] + else (n + 1).asLeft[Unit] + } + } + + for { + _ <- Temporal[F].sleep(config.initial) + _ <- consumer.subscribe(config.topic) + _ <- consumer.poll(config.interval) + _ <- produceConsume(0L) // warmup + _ <- 1L.tailRecM(check) + } yield {} + } + + + trait Producer[F[_]] { + def send(record: Record): F[Unit] + } + + object Producer { + + def apply[F[_]](implicit F: Producer[F]): Producer[F] = F + + def apply[F[_] : Monad : FromTry](topic: Topic, producer: SKafkaProducer[F]): Producer[F] = { + new Producer[F] { + def send(record: Record) = { + val record1 = ProducerRecord[String, String](topic = topic, key = record.key, value = record.value) + producer.send(record1).void + } + } + } + + def of[F[_] : Monad : ProducerOf : FromTry](topic: Topic, config: ProducerConfig): Resource[F, Producer[F]] = { + for { + producer <- implicitly[ProducerOf[F]].apply(config) + } yield { + Producer[F](topic = topic, producer = producer) + } + } + } + + + trait Consumer[F[_]] { + + def subscribe(topic: Topic): F[Unit] + + def poll(timeout: FiniteDuration): F[Iterable[Record]] + } + + object Consumer { + + def apply[F[_]](implicit F: Consumer[F]): Consumer[F] = F + + def apply[F[_] : Functor](consumer: SKafkaConsumer[F, String, String]): Consumer[F] = { + + new Consumer[F] { + + def subscribe(topic: Topic) = { + consumer.subscribe(Nes.of(topic)) + } + + def poll(timeout: FiniteDuration) = { + for { + records <- consumer.poll(timeout) + } yield for { + record <- records.values.values.flatMap(_.toList) + } yield { + Record(key = record.key.map(_.value), value = record.value.map(_.value)) + } + } + } + } + + def of[F[_] : Monad : ConsumerOf : FromTry](key: String, config: ConsumerConfig): Resource[F, Consumer[F]] = { + val config1 = { + val groupId = config.common.clientId.fold(key) { clientId => s"$clientId-$key" } + config.copy( + groupId = groupId.some, + autoOffsetReset = AutoOffsetReset.Latest) + } + + for { + consumer <- implicitly[ConsumerOf[F]].apply[String, String](config1) + } yield { + Consumer[F](consumer) + } + } + } + + + final case class Record(key: Option[String], value: Option[String]) + + + final case class Config( + topic: Topic = "healthcheck", + initial: FiniteDuration = 10.seconds, + interval: FiniteDuration = 1.second, + timeout: FiniteDuration = 2.minutes, + pollTimeout: FiniteDuration = 10.millis) + + object Config { + val default: Config = Config() + } +} From 671fe86ed575b546a038880dc2dd437bcee521db Mon Sep 17 00:00:00 2001 From: Grzegorz Bielski Date: Mon, 5 Aug 2024 17:12:08 +0200 Subject: [PATCH 2/5] Add KafkaHealthCheckSpec --- build.sbt | 1 + project/Dependencies.scala | 27 ++-- .../skafka/KafkaHealthCheckSpec.scala | 136 ++++++++++++++++++ 3 files changed, 151 insertions(+), 13 deletions(-) create mode 100644 skafka/src/test/scala/com/evolutiongaming/skafka/KafkaHealthCheckSpec.scala diff --git a/build.sbt b/build.sbt index 90f1934f..0a87448d 100644 --- a/build.sbt +++ b/build.sbt @@ -79,6 +79,7 @@ lazy val skafka = (project in file("skafka") scalatest % Test, Cats.laws % Test, discipline % Test, + CatsEffect.effectTestKit % Test, `scala-java8-compat`, `collection-compat` ))) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7bbf9e74..39296373 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -2,20 +2,20 @@ import sbt._ object Dependencies { - val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.5" - val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.7" - val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.10.6-SNAPSHOT" - val `testcontainers-kafka` = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.41.0" - val `play-json-jsoniter` = "com.evolution" %% "play-json-jsoniter" % "1.0.0" - val `scala-java8-compat` = "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2" - val `collection-compat` = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" - val scalatest = "org.scalatest" %% "scalatest" % "3.2.17" - val `kind-projector` = "org.typelevel" % "kind-projector" % "0.13.2" - val discipline = "org.typelevel" %% "discipline-scalatest" % "2.2.0" + val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.5" + val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.7" + val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.10.6-SNAPSHOT" + val `testcontainers-kafka` = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.41.0" + val `play-json-jsoniter` = "com.evolution" %% "play-json-jsoniter" % "1.0.0" + val `scala-java8-compat` = "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2" + val `collection-compat` = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" + val scalatest = "org.scalatest" %% "scalatest" % "3.2.17" + val `kind-projector` = "org.typelevel" % "kind-projector" % "0.13.2" + val discipline = "org.typelevel" %% "discipline-scalatest" % "2.2.0" object Kafka { private val version = "3.4.0" - val clients = "org.apache.kafka" % "kafka-clients" % version + val clients = "org.apache.kafka" % "kafka-clients" % version } object Logback { @@ -38,8 +38,9 @@ object Dependencies { object CatsEffect { private val version = "3.4.8" - val effect = "org.typelevel" %% "cats-effect" % version - val effectStd = "org.typelevel" %% "cats-effect-std" % version + val effect = "org.typelevel" %% "cats-effect" % version + val effectStd = "org.typelevel" %% "cats-effect-std" % version + val effectTestKit = "org.typelevel" %% "cats-effect-testkit" % version } object Smetrics { diff --git a/skafka/src/test/scala/com/evolutiongaming/skafka/KafkaHealthCheckSpec.scala b/skafka/src/test/scala/com/evolutiongaming/skafka/KafkaHealthCheckSpec.scala new file mode 100644 index 00000000..37977b5d --- /dev/null +++ b/skafka/src/test/scala/com/evolutiongaming/skafka/KafkaHealthCheckSpec.scala @@ -0,0 +1,136 @@ +package com.evolutiongaming.skafka + +import cats.effect._ +import cats.syntax.all._ +import cats.effect.testkit.TestControl +import com.evolutiongaming.catshelper.Log +import com.evolutiongaming.skafka.IOSuite._ +import com.evolutiongaming.skafka.KafkaHealthCheck.Record +import com.evolutiongaming.skafka.Topic +import org.scalatest.funsuite.AsyncFunSuite +import org.scalatest.matchers.should.Matchers + +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace + +class KafkaHealthCheckSpec extends AsyncFunSuite with Matchers { + import KafkaHealthCheckSpec._ + + test("error") { + implicit val log = Log.empty[IO] + + val producer = new KafkaHealthCheck.Producer[IO] { + def send(record: Record) = ().pure[IO] + } + + val consumer = new KafkaHealthCheck.Consumer[IO] { + + def subscribe(topic: Topic) = ().pure[IO] + + def poll(timeout: FiniteDuration) = { + if (timeout == 1.second) List.empty[Record].pure[IO] + else Error.raiseError[IO, List[Record]] + } + } + + val healthCheck = KafkaHealthCheck.of[IO]( + key = "key", + config = KafkaHealthCheck.Config(topic = "topic", initial = 0.seconds, interval = 1.second), + stop = false.pure[IO], + producer = Resource.pure[IO, KafkaHealthCheck.Producer[IO]](producer), + consumer = Resource.pure[IO, KafkaHealthCheck.Consumer[IO]](consumer), + log = log + ) + val result = for { + error <- healthCheck.use(_.error.untilDefinedM) + } yield { + error shouldEqual error + } + result.run() + } + + test("periodic healthcheck") { + final case class State( + checks: Int = 0, + subscribed: Option[Topic] = None, + logs: List[String] = List.empty, + records: List[Record] = List.empty + ) + + // TODO: mapK ? + def logOf(ref: Ref[IO, State]): Log[IO] = { + def add(log: String): IO[Unit] = + ref.update(state => state.copy(logs = log :: state.logs)) + + new Log[IO] { + def trace(msg: => String, mdc: Log.Mdc) = add(s"trace $msg") + + def debug(msg: => String, mdc: Log.Mdc) = add(s"debug $msg") + + def info(msg: => String, mdc: Log.Mdc) = add(s"info $msg") + + def warn(msg: => String, mdc: Log.Mdc) = add(s"warn $msg") + + def warn(msg: => String, cause: Throwable, mdc: Log.Mdc) = add(s"warn $msg $cause") + + def error(msg: => String, mdc: Log.Mdc) = add(s"error $msg") + + def error(msg: => String, cause: Throwable, mdc: Log.Mdc) = add(s"error $msg $cause") + } + } + + def consumerOf(ref: Ref[IO, State]) = new KafkaHealthCheck.Consumer[IO] { + def subscribe(topic: Topic): IO[Unit] = + ref.update(_.copy(subscribed = topic.some)) + + def poll(timeout: FiniteDuration): IO[Iterable[Record]] = + ref + .modify(state => + if (state.records.size >= 2) (state.copy(records = List.empty), state.records) + else (state, List.empty) + ) + } + + def producerOf(ref: Ref[IO, State]): KafkaHealthCheck.Producer[IO] = new KafkaHealthCheck.Producer[IO] { + def send(record: Record): IO[Unit] = + ref.update(state => state.copy(records = record :: state.records)) + } + + def stopOf(ref: Ref[IO, State]): IO[Boolean] = + ref.updateAndGet(state => state.copy(checks = state.checks - 1)).map(_.checks <= 0) + + val result = for { + ref <- Ref.of[IO, State](State(checks = 2)) + healthCheck = KafkaHealthCheck.of[IO]( + key = "key", + config = + KafkaHealthCheck.Config(topic = "topic", initial = 0.millis, interval = 0.millis, timeout = 100.millis), + stop = stopOf(ref), + producer = Resource.pure[IO, KafkaHealthCheck.Producer[IO]](producerOf(ref)), + consumer = Resource.pure[IO, KafkaHealthCheck.Consumer[IO]](consumerOf(ref)), + log = logOf(ref) + ) + _ <- TestControl.executeEmbed(healthCheck.use(_.done)) + state <- ref.get + + } yield state shouldEqual State( + checks = 0, + subscribed = "topic".some, + logs = List( + "debug key send 2:0", + "debug key send 2", + "debug key send 1:0", + "debug key send 1", + "debug key send 0:0", + "debug key send 0" + ), + records = List() + ) + + result.run() + } +} + +object KafkaHealthCheckSpec { + val Error: Throwable = new RuntimeException with NoStackTrace +} From a5be56817f6d1e430d9ee97a258916e120496f85 Mon Sep 17 00:00:00 2001 From: Grzegorz Bielski Date: Tue, 6 Aug 2024 12:46:34 +0200 Subject: [PATCH 3/5] Add scaladoc and reformat --- project/Dependencies.scala | 1 + .../skafka/KafkaHealthCheck.scala | 73 ++++++++++--------- .../skafka/KafkaHealthCheckSpec.scala | 1 - 3 files changed, 38 insertions(+), 37 deletions(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 39296373..9fd06ab5 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -4,6 +4,7 @@ object Dependencies { val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.5" val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.7" + // TODO: `3.10.6-SNAPSHOT` is a local release, remote release depends on https://github.com/evolution-gaming/cats-helper/pull/288 val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.10.6-SNAPSHOT" val `testcontainers-kafka` = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.41.0" val `play-json-jsoniter` = "com.evolution" %% "play-json-jsoniter" % "1.0.0" diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/KafkaHealthCheck.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/KafkaHealthCheck.scala index 7393af29..78738ef8 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/KafkaHealthCheck.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/KafkaHealthCheck.scala @@ -12,24 +12,36 @@ import com.evolutiongaming.skafka.producer.{ProducerConfig, ProducerRecord, Prod import scala.concurrent.CancellationException import scala.concurrent.duration._ +/** + * Provides a health check mechanism that repeatedly sends and consumes messages to/from Kafka. + */ trait KafkaHealthCheck[F[_]] { + /** + * Returns the last error that occurred during the health check. + * + * @return + */ def error: F[Option[Throwable]] + /** + * Blocks a fiber until the health check is done. + * + * @return + */ def done: F[Unit] } object KafkaHealthCheck { - def empty[F[_] : Applicative]: KafkaHealthCheck[F] = new KafkaHealthCheck[F] { + def empty[F[_]: Applicative]: KafkaHealthCheck[F] = new KafkaHealthCheck[F] { def error = none[Throwable].pure[F] def done = ().pure[F] } - - def of[F[_] : Temporal : LogOf : ConsumerOf : ProducerOf : RandomIdOf : FromTry]( + def of[F[_]: Temporal: LogOf: ConsumerOf: ProducerOf: RandomIdOf: FromTry]( config: Config, consumerConfig: ConsumerConfig, producerConfig: ProducerConfig @@ -45,13 +57,7 @@ object KafkaHealthCheck { val producer = Producer.of[F](config.topic, producerConfig) - of( - key = key, - config = config, - stop = false.pure[F], - producer = producer, - consumer = consumer, - log = log) + of(key = key, config = config, stop = false.pure[F], producer = producer, consumer = consumer, log = log) } result @@ -59,7 +65,7 @@ object KafkaHealthCheck { .flatten } - def of[F[_] : Temporal]( + def of[F[_]: Temporal]( key: String, config: Config, stop: F[Boolean], @@ -69,7 +75,7 @@ object KafkaHealthCheck { ): Resource[F, KafkaHealthCheck[F]] = { val result = for { - ref <- Ref.of[F, Option[Throwable]](None) + ref <- Ref.of[F, Option[Throwable]](None) fiber <- (producer, consumer) .tupled .use { case (producer, consumer) => run(key, config, stop, producer, consumer, ref.set, log) } @@ -89,7 +95,7 @@ object KafkaHealthCheck { Resource(result) } - def run[F[_] : Temporal]( + def run[F[_]: Temporal]( key: String, config: Config, stop: F[Boolean], @@ -116,7 +122,7 @@ object KafkaHealthCheck { for { records <- consumer.poll(config.pollTimeout) found = records.find { record => record.key.contains_(key) && record.value.contains_(value) } - result <- found.fold { + result <- found.fold { for { _ <- sleep _ <- produce(s"$n:$retry") @@ -141,11 +147,11 @@ object KafkaHealthCheck { def check(n: Long) = { for { - error <- produceConsume(n) - _ <- error.fold(().pure[F]) { error => log.error(s"$n failed with $error") } - _ <- set(error) - _ <- sleep - stop <- stop + error <- produceConsume(n) + _ <- error.fold(().pure[F]) { error => log.error(s"$n failed with $error") } + _ <- set(error) + _ <- sleep + stop <- stop } yield { if (stop) ().asRight[Long] else (n + 1).asLeft[Unit] @@ -161,7 +167,6 @@ object KafkaHealthCheck { } yield {} } - trait Producer[F[_]] { def send(record: Record): F[Unit] } @@ -170,7 +175,7 @@ object KafkaHealthCheck { def apply[F[_]](implicit F: Producer[F]): Producer[F] = F - def apply[F[_] : Monad : FromTry](topic: Topic, producer: SKafkaProducer[F]): Producer[F] = { + def apply[F[_]: Monad: FromTry](topic: Topic, producer: SKafkaProducer[F]): Producer[F] = { new Producer[F] { def send(record: Record) = { val record1 = ProducerRecord[String, String](topic = topic, key = record.key, value = record.value) @@ -179,7 +184,7 @@ object KafkaHealthCheck { } } - def of[F[_] : Monad : ProducerOf : FromTry](topic: Topic, config: ProducerConfig): Resource[F, Producer[F]] = { + def of[F[_]: Monad: ProducerOf: FromTry](topic: Topic, config: ProducerConfig): Resource[F, Producer[F]] = { for { producer <- implicitly[ProducerOf[F]].apply(config) } yield { @@ -188,11 +193,10 @@ object KafkaHealthCheck { } } - trait Consumer[F[_]] { def subscribe(topic: Topic): F[Unit] - + def poll(timeout: FiniteDuration): F[Iterable[Record]] } @@ -200,7 +204,7 @@ object KafkaHealthCheck { def apply[F[_]](implicit F: Consumer[F]): Consumer[F] = F - def apply[F[_] : Functor](consumer: SKafkaConsumer[F, String, String]): Consumer[F] = { + def apply[F[_]: Functor](consumer: SKafkaConsumer[F, String, String]): Consumer[F] = { new Consumer[F] { @@ -220,12 +224,10 @@ object KafkaHealthCheck { } } - def of[F[_] : Monad : ConsumerOf : FromTry](key: String, config: ConsumerConfig): Resource[F, Consumer[F]] = { + def of[F[_]: Monad: ConsumerOf: FromTry](key: String, config: ConsumerConfig): Resource[F, Consumer[F]] = { val config1 = { val groupId = config.common.clientId.fold(key) { clientId => s"$clientId-$key" } - config.copy( - groupId = groupId.some, - autoOffsetReset = AutoOffsetReset.Latest) + config.copy(groupId = groupId.some, autoOffsetReset = AutoOffsetReset.Latest) } for { @@ -236,16 +238,15 @@ object KafkaHealthCheck { } } - final case class Record(key: Option[String], value: Option[String]) - final case class Config( - topic: Topic = "healthcheck", - initial: FiniteDuration = 10.seconds, - interval: FiniteDuration = 1.second, - timeout: FiniteDuration = 2.minutes, - pollTimeout: FiniteDuration = 10.millis) + topic: Topic = "healthcheck", + initial: FiniteDuration = 10.seconds, + interval: FiniteDuration = 1.second, + timeout: FiniteDuration = 2.minutes, + pollTimeout: FiniteDuration = 10.millis + ) object Config { val default: Config = Config() diff --git a/skafka/src/test/scala/com/evolutiongaming/skafka/KafkaHealthCheckSpec.scala b/skafka/src/test/scala/com/evolutiongaming/skafka/KafkaHealthCheckSpec.scala index 37977b5d..b24b5931 100644 --- a/skafka/src/test/scala/com/evolutiongaming/skafka/KafkaHealthCheckSpec.scala +++ b/skafka/src/test/scala/com/evolutiongaming/skafka/KafkaHealthCheckSpec.scala @@ -57,7 +57,6 @@ class KafkaHealthCheckSpec extends AsyncFunSuite with Matchers { records: List[Record] = List.empty ) - // TODO: mapK ? def logOf(ref: Ref[IO, State]): Log[IO] = { def add(log: String): IO[Unit] = ref.update(state => state.copy(logs = log :: state.logs)) From 99a1a23661c87c50c28e179dcab289a22d02fce9 Mon Sep 17 00:00:00 2001 From: Grzegorz Bielski Date: Mon, 12 Aug 2024 13:18:23 +0200 Subject: [PATCH 4/5] use newest cats-helper --- project/Dependencies.scala | 3 +-- .../scala/com/evolutiongaming/skafka/KafkaHealthCheck.scala | 4 ---- .../com/evolutiongaming/skafka/KafkaHealthCheckSpec.scala | 2 +- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 9fd06ab5..e6353af5 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -4,8 +4,7 @@ object Dependencies { val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.5" val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.7" - // TODO: `3.10.6-SNAPSHOT` is a local release, remote release depends on https://github.com/evolution-gaming/cats-helper/pull/288 - val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.10.6-SNAPSHOT" + val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.11.0" val `testcontainers-kafka` = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.41.0" val `play-json-jsoniter` = "com.evolution" %% "play-json-jsoniter" % "1.0.0" val `scala-java8-compat` = "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2" diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/KafkaHealthCheck.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/KafkaHealthCheck.scala index 78738ef8..4d9e2923 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/KafkaHealthCheck.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/KafkaHealthCheck.scala @@ -19,15 +19,11 @@ trait KafkaHealthCheck[F[_]] { /** * Returns the last error that occurred during the health check. - * - * @return */ def error: F[Option[Throwable]] /** * Blocks a fiber until the health check is done. - * - * @return */ def done: F[Unit] } diff --git a/skafka/src/test/scala/com/evolutiongaming/skafka/KafkaHealthCheckSpec.scala b/skafka/src/test/scala/com/evolutiongaming/skafka/KafkaHealthCheckSpec.scala index b24b5931..b212e56d 100644 --- a/skafka/src/test/scala/com/evolutiongaming/skafka/KafkaHealthCheckSpec.scala +++ b/skafka/src/test/scala/com/evolutiongaming/skafka/KafkaHealthCheckSpec.scala @@ -17,7 +17,7 @@ class KafkaHealthCheckSpec extends AsyncFunSuite with Matchers { import KafkaHealthCheckSpec._ test("error") { - implicit val log = Log.empty[IO] + implicit val log: Log[IO] = Log.empty[IO] val producer = new KafkaHealthCheck.Producer[IO] { def send(record: Record) = ().pure[IO] From 1d405d5a83664e1210bfb3e10d4b9ef76ae25035 Mon Sep 17 00:00:00 2001 From: Grzegorz Bielski Date: Mon, 12 Aug 2024 14:23:40 +0200 Subject: [PATCH 5/5] set version to 16.4.0 --- version.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.sbt b/version.sbt index 9c22bd02..81ffc51a 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -ThisBuild / version := "16.3.1-SNAPSHOT" +ThisBuild / version := "16.4.0"