Skip to content

Commit

Permalink
Add config for transactional consuming (#1433)
Browse files Browse the repository at this point in the history
Make it easier to configure read-committed.
  • Loading branch information
erikvanoosten authored Jan 11, 2025
1 parent 3155a0a commit 8ee33ac
Showing 1 changed file with 16 additions and 0 deletions.
16 changes: 16 additions & 0 deletions zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package zio.kafka.consumer

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.IsolationLevel
import zio._
import zio.kafka.consumer.Consumer.OffsetRetrieval
import zio.kafka.consumer.fetch.{ FetchStrategy, QueueSizeBasedFetchStrategy }
Expand Down Expand Up @@ -330,6 +331,21 @@ final case class ConsumerSettings(
def withAuthErrorRetrySchedule(authErrorRetrySchedule: Schedule[Any, Throwable, Any]): ConsumerSettings =
copy(authErrorRetrySchedule = authErrorRetrySchedule)

/**
* Controls how to consume records produced transactionally.
*
* @param readCommitted
* when `true`, only consume records which have been committed, when `false`, consume all records, even records
* which are part of an aborted transaction. Non-transactional records will be consumed unconditionally in either
* mode.
*
* Note that Kafka's default is to read all records (`readCommitted = false`).
*/
def withReadCommitted(readCommitted: Boolean = true): ConsumerSettings = {
val isolationLevel = if (readCommitted) IsolationLevel.READ_COMMITTED else IsolationLevel.READ_UNCOMMITTED
withProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel.toString)
}

}

object ConsumerSettings {
Expand Down

0 comments on commit 8ee33ac

Please sign in to comment.