KafkaConsumer is the concrete Kafka client that Kafka developers use to consume records from Kafka topics.
KafkaConsumer is created with properties and (key and value) deserializers.
Note: The bootstrap.servers and group.id properties are mandatory. In your source code, use the ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG and ConsumerConfig.GROUP_ID_CONFIG constants, respectively.
val bootstrapServers = ":9092,localhost:9092"
val groupId = "kafka-sandbox"
import org.apache.kafka.clients.consumer.ConsumerConfig
val requiredConfigsOnly = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
  ConsumerConfig.GROUP_ID_CONFIG -> groupId
)
import org.apache.kafka.common.serialization.StringDeserializer
val keyDeserializer = new StringDeserializer
val valueDeserializer = new StringDeserializer
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
val consumer = new KafkaConsumer[String, String](
  requiredConfigsOnly.asJava,
  keyDeserializer,
  valueDeserializer)
Once created, KafkaConsumer has to either subscribe to topics or be assigned partitions.
scala> :type consumer
org.apache.kafka.clients.consumer.KafkaConsumer[String,String]
import scala.collection.JavaConverters._
val topics = Seq("input").asJava
consumer.subscribe(topics)
KafkaConsumer is then requested to poll for records (in a loop).
scala> :type consumer
org.apache.kafka.clients.consumer.KafkaConsumer[String,String]
import java.time.Duration.ZERO
val records = consumer.poll(ZERO)
records.asScala.foreach(println)
KafkaConsumer registers itself in JMX with the kafka.consumer prefix.
Name | Description |
---|---|
 | Zero or more PartitionAssignors. Configured using ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG. Used exclusively to create the ConsumerCoordinator. |
 | Used mainly (?) to create the Fetcher and ConsumerCoordinator. Used also in poll, pollOnce and wakeup (but I think the usage should be limited to creating the Fetcher and ConsumerCoordinator). |
 | Created right when… Used when…FIXME |
 | Used when…FIXME |
 | Created for a KafkaConsumer. Used to create a NetworkClient (for the ConsumerNetworkClient), the ConsumerNetworkClient itself, the ConsumerCoordinator and Fetcher. Used in…FIXME |
retryBackoffMs | retry.backoff.ms property or a user-defined value |
requestTimeoutMs | Corresponds to request.timeout.ms property |
 | Created when… |
Tip: Enable logging for KafkaConsumer to see what happens inside. Refer to Logging.
void assign(Collection<TopicPartition> partitions)
Note: assign is part of the Consumer Contract to…FIXME.
assign…FIXME
void subscribe(Collection<String> topics) // (1)
void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
(1) A short-hand for the other subscribe with NoOpConsumerRebalanceListener as the ConsumerRebalanceListener
subscribe subscribes KafkaConsumer to the given topics.
Note: subscribe is part of the Consumer Contract to…FIXME
val topics = Seq("topic1")
println(s"Subscribing to ${topics.mkString(", ")}")
import scala.collection.JavaConverters._
consumer.subscribe(topics.asJava)
Internally, subscribe prints out the following DEBUG message to the logs:
DEBUG Subscribed to topic(s): [comma-separated topics]
subscribe then requests SubscriptionState to subscribe for the topics and listener.
In the end, subscribe requests SubscriptionState for groupSubscription, which it then passes along to Metadata to set the topics to track.
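The two-argument subscribe variant can be tried without a broker. The following is a minimal sketch that assumes MockConsumer (the test double shipped with kafka-clients) is an acceptable stand-in for KafkaConsumer; the SubscribeSketch object and the logging listener are illustrative names, not part of Kafka.

```scala
import java.util.{Arrays, Collection => JCollection, HashSet => JHashSet, Set => JSet}
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.TopicPartition

// Hypothetical helper object, used for illustration only
object SubscribeSketch {

  // A listener that merely reports rebalance events
  val loggingListener: ConsumerRebalanceListener = new ConsumerRebalanceListener {
    override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit =
      println(s"Revoked: $partitions")
    override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
      println(s"Assigned: $partitions")
  }

  // Subscribes a mock consumer and returns a copy of the topics it is subscribed to
  def subscribedTopics(): JSet[String] = {
    // Assumption: MockConsumer stands in for a real KafkaConsumer
    val consumer = new MockConsumer[String, String](OffsetResetStrategy.EARLIEST)
    try {
      consumer.subscribe(Arrays.asList("topic1"), loggingListener)
      new JHashSet[String](consumer.subscription())
    } finally consumer.close()
  }
}
```

With a real KafkaConsumer the listener callbacks would fire during a group rebalance (inside poll); the sketch only shows how the listener is passed in.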
ConsumerRecords<K, V> poll(
Duration timeout)
Note: poll is part of the Consumer Contract to poll for ConsumerRecords.
poll polls for new records until timeout expires.
scala> :type consumer
org.apache.kafka.clients.consumer.KafkaConsumer[String,String]
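import java.time.Duration
import scala.collection.JavaConverters._
// A non-zero timeout avoids busy-spinning when no records are available
val timeout = Duration.ofMillis(100)
while (true) {
  println(s"Polling for records for up to ${timeout.toMillis} ms")
  val records = consumer.poll(timeout)
  // do something with the records
  // e.g. print them out to the console
  records.asScala.foreach(println)
}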
Note: KafkaConsumer has to be subscribed to topics or assigned partitions before calling poll.
scala> :type consumer
org.apache.kafka.clients.consumer.KafkaConsumer[String,String]
import java.time.Duration
scala> val records = consumer.poll(Duration.ZERO)
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1171)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
... 36 elided
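To experiment with poll without a running broker, one option is MockConsumer, the test double shipped with kafka-clients. The following is a minimal sketch under that assumption; the PollSketch object, the input topic and the record contents are made up for illustration, and MockConsumer only approximates what a real KafkaConsumer does.

```scala
import java.time.Duration
import java.util.{ArrayList => JArrayList, Collections, List => JList}
import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.TopicPartition

// Hypothetical helper object, used for illustration only
object PollSketch {

  // Assigns one partition, adds one record to it and returns the values that poll gives back
  def pollOnce(): JList[String] = {
    val partition = new TopicPartition("input", 0)
    val consumer = new MockConsumer[String, String](OffsetResetStrategy.EARLIEST)
    try {
      // poll needs a subscription or an assignment first
      consumer.assign(Collections.singletonList(partition))
      // MockConsumer has no broker to ask, so the start offset is provided by hand
      consumer.updateBeginningOffsets(
        Collections.singletonMap(partition, java.lang.Long.valueOf(0L)))
      consumer.addRecord(new ConsumerRecord[String, String]("input", 0, 0L, "key", "hello"))

      val records = consumer.poll(Duration.ofMillis(100))
      val values = new JArrayList[String]()
      val it = records.iterator()
      while (it.hasNext) values.add(it.next().value())
      values
    } finally consumer.close()
  }
}
```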
void seek(TopicPartition partition, long offset)
Note: seek is part of the Consumer Contract to…FIXME.
seek…FIXME
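While the internals of seek are not described here, its observable effect can be sketched: after seek, position reports the requested offset for an assigned partition. The example below assumes MockConsumer (from kafka-clients) as a stand-in for KafkaConsumer; SeekSketch and the input topic are illustrative names.

```scala
import java.util.Collections
import org.apache.kafka.clients.consumer.{MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.TopicPartition

// Hypothetical helper object, used for illustration only
object SeekSketch {

  // Seeks an assigned partition to the given offset and returns the position reported afterwards
  def positionAfterSeek(offset: Long): Long = {
    val partition = new TopicPartition("input", 0)
    // Assumption: MockConsumer stands in for a real KafkaConsumer
    val consumer = new MockConsumer[String, String](OffsetResetStrategy.EARLIEST)
    try {
      // seek only makes sense for partitions the consumer currently owns
      consumer.assign(Collections.singletonList(partition))
      consumer.seek(partition, offset)
      consumer.position(partition)
    } finally consumer.close()
  }
}
```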
boolean updateFetchPositions(final Timer timer)
updateFetchPositions…FIXME
Note: updateFetchPositions is used when…FIXME
Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout)
pollOnce…FIXME
Note: pollOnce is used exclusively when KafkaConsumer is requested to poll.
Map<String, List<PartitionInfo>> listTopics()
Internally, listTopics simply requests Fetcher for metadata for all topics and returns it.
consumer.listTopics().asScala.foreach { case (name, partitions) =>
  println(s"topic: $name (partitions: ${partitions.size()})")
}
Note: listTopics uses requestTimeoutMs that corresponds to request.timeout.ms property.
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions)
beginningOffsets requests Fetcher for beginningOffsets and returns it.
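From the caller's side, beginningOffsets maps each requested partition to its first available offset. Here is a minimal sketch, assuming MockConsumer (from kafka-clients) in place of a real consumer, so the offset is seeded by hand rather than fetched from a broker; BeginningOffsetsSketch and the input topic are illustrative names.

```scala
import java.util.Collections
import org.apache.kafka.clients.consumer.{MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.TopicPartition

// Hypothetical helper object, used for illustration only
object BeginningOffsetsSketch {

  // Seeds a beginning offset in the mock and reads it back through beginningOffsets
  def firstOffset(seeded: Long): Long = {
    val partition = new TopicPartition("input", 0)
    val consumer = new MockConsumer[String, String](OffsetResetStrategy.EARLIEST)
    try {
      // With a real KafkaConsumer this value would come from the cluster
      consumer.updateBeginningOffsets(
        Collections.singletonMap(partition, java.lang.Long.valueOf(seeded)))
      val offsets = consumer.beginningOffsets(Collections.singletonList(partition))
      offsets.get(partition).longValue()
    } finally consumer.close()
  }
}
```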
KafkaConsumer takes the following when created:
- Consumer configuration (that is converted internally to ConsumerConfig)
- Deserializer for keys
- Deserializer for values
KafkaConsumer initializes the internal registries and counters.
Note: KafkaConsumer API offers other constructors that all end up calling the public 3-argument constructor, which in turn passes the call on to the private internal constructor.
// Public API
KafkaConsumer(
Map<String, Object> configs,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer)
When created, KafkaConsumer adds the keyDeserializer and valueDeserializer to configs (as key.deserializer and value.deserializer properties respectively) and creates a ConsumerConfig.
KafkaConsumer passes the call on to the internal constructor.
KafkaConsumer(
ConsumerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer)
When called, the internal KafkaConsumer constructor prints out the following DEBUG message to the logs:
DEBUG Starting the Kafka consumer
KafkaConsumer sets the internal requestTimeoutMs to request.timeout.ms property.
KafkaConsumer sets the internal clientId to client.id or, if not set, generates one with the consumer- prefix followed by a sequence number (starting from 1).
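The client ID fallback can be illustrated with a small sketch. It is not the Kafka source code: ClientIdSketch and resolve are hypothetical names, and the sketch assumes that an empty or missing client.id counts as "not set" and that the sequence is shared by all consumers in the JVM.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper object, used for illustration only (not a Kafka API)
object ClientIdSketch {

  // Assumption: a JVM-wide sequence that starts from 1
  private val sequence = new AtomicInteger(1)

  // Returns the configured client.id or generates consumer-N when none is given
  def resolve(configured: String): String =
    if (configured == null || configured.isEmpty) s"consumer-${sequence.getAndIncrement()}"
    else configured
}
```

A user-provided client.id wins and does not consume a sequence number; only generated IDs advance the counter.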
KafkaConsumer sets the internal Metrics (and JmxReporter with kafka.consumer prefix).
KafkaConsumer sets the internal retryBackoffMs to retry.backoff.ms property.
Caution: FIXME Finish me!
KafkaConsumer creates the internal Metadata with the following arguments:
- allowAutoTopicCreation enabled
- topicExpiryEnabled disabled
- ClusterResourceListeners with the user-defined list of ConsumerInterceptors in interceptor.classes property
KafkaConsumer updates metadata with bootstrap.servers.
Caution: FIXME Finish me!
KafkaConsumer creates a NetworkClient with…FIXME
Caution: FIXME Finish me!
KafkaConsumer creates Fetcher with the following properties:
In the end, KafkaConsumer prints out the following DEBUG message to the logs:
DEBUG Kafka consumer created
Any issues while creating a KafkaConsumer are reported as KafkaException.
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
void wakeup()
Note: wakeup is part of the Consumer Contract.
wakeup simply requests ConsumerNetworkClient to wakeup.
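From the caller's point of view, a wakeup makes the current (or next) poll fail with a WakeupException, which is a common way to break out of a poll loop. The following is a minimal sketch, assuming MockConsumer (from kafka-clients) as a stand-in; with a real KafkaConsumer, wakeup would typically be called from another thread while poll is blocked. WakeupSketch and the input topic are illustrative names.

```scala
import java.time.Duration
import java.util.Collections
import org.apache.kafka.clients.consumer.{MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.WakeupException

// Hypothetical helper object, used for illustration only
object WakeupSketch {

  // Returns true when poll was interrupted by an earlier wakeup
  def pollInterruptedByWakeup(): Boolean = {
    val partition = new TopicPartition("input", 0)
    val consumer = new MockConsumer[String, String](OffsetResetStrategy.EARLIEST)
    try {
      consumer.assign(Collections.singletonList(partition))
      consumer.updateBeginningOffsets(
        Collections.singletonMap(partition, java.lang.Long.valueOf(0L)))
      // Called on the same thread here only to keep the sketch short
      consumer.wakeup()
      consumer.poll(Duration.ofMillis(100))
      false
    } catch {
      case _: WakeupException => true
    } finally consumer.close()
  }
}
```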
Note: Read about Selection in java.nio.channels.Selector's javadoc.
Note: wakeup is used when…FIXME
ClusterResourceListeners configureClusterResourceListeners(
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
List<?>... candidateLists)
configureClusterResourceListeners creates a ClusterResourceListeners and registers ClusterResourceListener instances from the input candidateLists, keyDeserializer and valueDeserializer.
void throwIfNoAssignorsConfigured()
throwIfNoAssignorsConfigured…FIXME
Note: throwIfNoAssignorsConfigured is used exclusively when KafkaConsumer is requested to subscribe to topics.
boolean updateAssignmentMetadataIfNeeded(
Timer timer)
updateAssignmentMetadataIfNeeded requests the ConsumerCoordinator to poll until the Timer expires.
updateAssignmentMetadataIfNeeded returns false if the poll was unsuccessful, i.e. FIXME
If the poll was successful, updateAssignmentMetadataIfNeeded calls updateFetchPositions.
Note: updateAssignmentMetadataIfNeeded is used exclusively when KafkaConsumer is requested to poll for records.
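The control flow described above can be sketched as follows. This is not the Kafka source code: the coordinator poll and updateFetchPositions are modelled as hypothetical function parameters, and the sketch assumes the result of updateFetchPositions becomes the return value when the coordinator poll succeeds.

```scala
// Hypothetical helper object, used for illustration only (not a Kafka API)
object AssignmentMetadataSketch {

  // coordinatorPoll and updateFetchPositions are stand-ins for the real collaborators
  def updateAssignmentMetadataIfNeeded(
      coordinatorPoll: () => Boolean,
      updateFetchPositions: () => Boolean): Boolean =
    if (!coordinatorPoll()) false   // unsuccessful poll: give up early
    else updateFetchPositions()     // successful poll: go on to update fetch positions
}
```

The point of the early return is that fetch positions are left untouched when the coordinator poll does not succeed.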
void commitAsync()
void commitAsync(
OffsetCommitCallback callback)
void commitAsync(
Map<TopicPartition, OffsetAndMetadata> offsets,
OffsetCommitCallback callback)
Note: commitAsync is part of the Consumer Contract to…FIXME.
commitAsync…FIXME
OffsetAndMetadata committed(
TopicPartition partition)
OffsetAndMetadata committed(
TopicPartition partition,
Duration timeout)
Note: committed is part of the Consumer Contract to…FIXME.
committed…FIXME
void close()
void close(
Duration timeout)
Note: close is part of the Consumer Contract to…FIXME.
close…FIXME
long position(
TopicPartition partition)
long position(
TopicPartition partition,
Duration timeout)
Note: position is part of the Consumer Contract.
position…FIXME
ConsumerRecords<K, V> poll(
Timer timer,
boolean includeMetadataInTimeout)
poll first calls acquireAndEnsureOpen.
poll requests the ConsumerNetworkClient to maybeTriggerWakeup.
poll…FIXME
Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(
Timer timer)
pollForFetches…FIXME
Note: pollForFetches is used exclusively when KafkaConsumer is requested to poll for records.