Skafka

Build Status Coverage Status Codacy Badge Version License: MIT

Scala wrapper for kafka-clients v3.4.0

Motivation

Kafka provides an official Java client out of the box, which can be used from Scala code without any additional modifications.

The main disadvantage of using the official client directly is that it imposes a very specific threading model on the application: the consumer is not thread-safe, and it expects the rebalance listener to perform its operations on the same thread.

This makes wrapping the client with Cats Effect classes a bit more complicated than just calling IO { consumer.poll() }, unless poll is the only call the application ever needs to make.
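
To make the threading constraint concrete, here is a minimal sketch that uses only the Scala standard library. FakeConsumer is a hypothetical stand-in for a non-thread-safe client (it is not a kafka-clients or Skafka class); like the Java consumer, it runs its rebalance callback inside poll, on the calling thread. The sketch assumes that one dedicated thread owns the consumer and that every call is submitted to it.

```scala
import java.util.concurrent.{Callable, Executors}
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a non-thread-safe client; not a real Kafka class.
final class FakeConsumer(onRebalance: () => Unit) {
  // The rebalance callback runs inside poll, on the caller's thread.
  def poll(): List[String] = {
    onRebalance()
    List("record-1", "record-2")
  }
}

object ThreadConfinement {
  // Names of the threads that called the consumer or ran the callback.
  val seen = ListBuffer.empty[String]

  private def record(): Unit = {
    seen += Thread.currentThread().getName
    ()
  }

  def run(): List[String] = {
    // One dedicated thread owns the consumer for its whole lifetime.
    val executor = Executors.newSingleThreadExecutor()
    try {
      val consumer = new FakeConsumer(() => record())

      // Every call goes through the same executor, so it runs on the owning thread.
      def onOwnerThread[A](f: => A): A =
        executor.submit(new Callable[A] { def call(): A = f }).get()

      onOwnerThread { record(); consumer.poll() }
      onOwnerThread { record(); consumer.poll() }
    } finally executor.shutdown()
  }
}
```

After ThreadConfinement.run(), every entry in seen names the same thread: both the poll calls and the callbacks ran on the single owning thread. A plain IO { consumer.poll() } gives no such guarantee once other calls or listener logic are involved.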

Skafka does exactly that: it is a very thin wrapper over the official Kafka client that provides a ready-made Cats Effect API and handles some corner cases concerning ConsumerRebalanceListener calls.

Compared to more full-featured libraries such as FS2 Kafka, it might be a little more reliable, because there is less code and logic in which accidental bugs can hide.

To summarize:

  1. If it suits your goals (e.g. you only ever need to call consumer.poll() without acting on rebalances), then using the official Kafka client directly, optionally wrapping all the calls with cats.effect.IO, is a totally fine idea.
  2. If a more complicated integration with Cats Effect is required, e.g. ConsumerRebalanceListener is going to be used, then consider using Skafka.
  3. If streaming with FS2, or any other feature that FS2 Kafka provides, is required, then FS2 Kafka could be a good choice. Note that it is less trivial than Skafka and may contain more bugs on top of the official Kafka client.

Key features

  1. Provides null-free Scala APIs for Producer and Consumer

  2. Makes it easy to use your own effect monad with the help of cats-effect

  3. Blocking calls are executed on the provided ExecutionContext

  4. Simple case-class-based configuration

  5. Support for Typesafe Config
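
The usage examples in this README pass an ecBlocking value. As a rough sketch of the idea behind feature 3, the snippet below runs a blocking call on a dedicated pool. It deliberately uses scala.concurrent.Future from the standard library rather than cats-effect, and the names BlockingOffload, blockingCall and the thread name "blocking-1" are made up for illustration; none of them are part of Skafka.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BlockingOffload {
  // Hypothetical blocking call standing in for a client method such as poll.
  def blockingCall(): String = {
    Thread.sleep(50)
    Thread.currentThread().getName
  }

  def run(): String = {
    // A dedicated, named daemon thread reserved for blocking work.
    val factory = new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "blocking-1")
        t.setDaemon(true)
        t
      }
    }
    val ecBlocking =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1, factory))

    // The blocking call runs on ecBlocking, not on the caller's thread.
    try Await.result(Future(blockingCall())(ecBlocking), 5.seconds)
    finally ecBlocking.shutdown()
  }
}
```

BlockingOffload.run() returns the name of the thread that executed the call, which is the dedicated blocking thread and not the caller's. In a real application the pool would be created once and shared for the lifetime of the client.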

Producer usage example

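val producer = Producer.of[IO](config, ecBlocking)
// `use` acquires the producer, runs the block and releases the producer afterwards
val metadata: IO[RecordMetadata] = producer.use { producer =>
  val record = ProducerRecord(topic = "topic", key = "key", value = "value")
  // send yields a nested effect; flatten also waits for the inner one
  producer.send(record).flatten
}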

Consumer usage example

val consumer = Consumer.of[IO, String, String](config, ecBlocking)
val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer =>
  for {
    _       <- consumer.subscribe(Nel("topic"), None)
    records <- consumer.poll(100.millis)
  } yield records
}

Java client metrics example

The example below demonstrates the creation of a Consumer, but the same can be done for a Producer.

⚠️ Using ConsumerMetricsOf.withJavaClientMetrics (or its alternative, metrics.exposeJavaClientMetrics) registers a new Prometheus collector under the hood. Please use a unique prefix for each collector to avoid duplicated metrics in Prometheus, which cause a runtime exception on registration. The prefix can be set as a parameter: ConsumerMetricsOf.withJavaClientMetrics(prometheus, Some("the_prefix"))

import ConsumerMetricsOf.*

val config: ConsumerConfig = ???
val prometheus: CollectorRegistry = ???
val metrics: ConsumerMetrics[IO] = ???

for {
  // bind the result under a new name so that it matches the reference on the next line
  metrics1  <- metrics.exposeJavaClientMetrics(prometheus)
  consumerOf = ConsumerOf.apply1(metrics1.some)
  consumer  <- consumerOf(config)
} yield ???

Setup

addSbtPlugin("com.evolution" % "sbt-artifactory-plugin" % "0.0.2")

libraryDependencies += "com.evolutiongaming" %% "skafka" % "15.0.0"

Notes

While Skafka makes it possible to use ConsumerRebalanceListener functionality, not all of its method calls are supported.

See the following PRs for more details: #150, #122

To the best of our knowledge, FS2 Kafka does not support all of the methods and functionality either.

Release process

The release process is based on Git tags and makes use of sbt-dynver to automatically obtain the version from the latest Git tag. The flow is defined in .github/workflows/release.yml.
A typical release process is as follows:

  1. Create and push a new Git tag. The tag should have the format vX.Y.Z, for example: git tag v4.1.0 && git push origin v4.1.0
  2. Create a new release in GitHub. Go to the Releases page, click Draft a new release, select Choose a tag, and pick the tag you just created.
  3. Press Generate release notes. The release title will be filled in automatically with the tag name. Change the description if needed.
  4. Press Publish release.
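
As an illustration of the vX.Y.Z tag format from step 1, the following sketch checks a tag and derives the version string from it. It is not sbt-dynver's implementation; ReleaseTag and versionOf are hypothetical names used only to show the shape of a valid tag.

```scala
object ReleaseTag {
  // Matches tags of the form vX.Y.Z, where X, Y and Z are non-negative integers.
  private val TagPattern = """v(\d+)\.(\d+)\.(\d+)""".r

  // Returns the version without the leading "v", or None if the tag is malformed.
  def versionOf(tag: String): Option[String] = tag match {
    case TagPattern(major, minor, patch) => Some(s"$major.$minor.$patch")
    case _                               => None
  }
}
```

For example, ReleaseTag.versionOf("v4.1.0") yields Some("4.1.0"), while "4.1.0" (no prefix) and "v4.1" (missing patch number) yield None.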