Merge pull request #961: [proxima-direct-io-kafka] #350 check endOffsets to mark partition idle
je-ik authored Feb 6, 2025
2 parents bfdbd08 + e908729 commit 67e9745
Showing 3 changed files with 245 additions and 253 deletions.
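
The gist of the change: instead of guessing that a partition is idle after a run of consecutive empty polls, the consumer loop now tracks the last polled offset per partition and asks the broker, via KafkaConsumer.endOffsets(), whether anything is left to read. A minimal standalone sketch of that check, with illustrative names that are not the Proxima API:

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

final class IdleCheckSketch {

  /**
   * Returns true when {@code tp} has no unread data. Kafka's endOffsets()
   * reports the offset of the *next* record to be written, so a consumer
   * that has polled offset endOffset - 1 has seen everything.
   */
  static boolean isCaughtUp(
      KafkaConsumer<?, ?> consumer,
      Map<TopicPartition, Long> lastPolledOffsets,
      TopicPartition tp) {

    // assumes tp is part of the current assignment
    long endOffset = consumer.endOffsets(consumer.assignment()).get(tp);
    // -1L when nothing was polled yet, so only a completely empty partition counts
    long polled = lastPolledOffsets.getOrDefault(tp, -1L);
    return endOffset <= polled + 1;
  }
}
```

The retired empty-poll counter was a heuristic (poll at least as many times as there are assigned partitions and still see nothing); end offsets are authoritative, at the cost of an extra broker round-trip whenever the check runs.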
@@ -36,6 +36,7 @@
 import cz.o2.proxima.direct.io.kafka.ElementConsumers.BulkConsumer;
 import cz.o2.proxima.direct.io.kafka.ElementConsumers.OnlineConsumer;
 import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting;
+import cz.o2.proxima.internal.com.google.common.base.MoreObjects;
 import cz.o2.proxima.internal.com.google.common.base.Preconditions;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -49,7 +50,6 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -395,7 +395,6 @@ private void submitConsumerWithObserver(
         new AtomicReference<>();
     final AtomicReference<PartitionedWatermarkEstimator> watermarkEstimator =
         new AtomicReference<>(null);
-    final Map<TopicPartition, Integer> emptyPollCount = new ConcurrentHashMap<>();
     final Map<TopicPartition, Integer> topicPartitionToId = new HashMap<>();
     final Duration pollDuration = Duration.ofMillis(consumerPollInterval);
     final KafkaThroughputLimiter throughputLimiter =
@@ -405,13 +404,7 @@ private void submitConsumerWithObserver(
         createObserveHandle(shutdown, seekOffsets, consumer, readyLatch, completedLatch));
     consumer.onStart();
     ConsumerRebalanceListener listener =
-        listener(
-            name,
-            consumerRef,
-            consumer,
-            emptyPollCount,
-            topicPartitionToId,
-            watermarkEstimator);
+        listener(name, consumerRef, consumer, topicPartitionToId, watermarkEstimator);

     try (KafkaConsumer<Object, Object> kafka =
         createConsumer(name, offsets, name != null ? listener : null, position)) {
@@ -448,20 +441,22 @@ private void submitConsumerWithObserver(
       AtomicReference<Throwable> error = new AtomicReference<>();
       long pollTimeMs = 0L;

+      Map<TopicPartition, Long> polledOffsets = new HashMap<>(assignment.size());
       do {
         if (poll.isEmpty()) {
           Optional.ofNullable(watermarkEstimator.get()).ifPresent(consumer::onIdle);
         }
         logConsumerWatermark(name, offsets, watermarkEstimator, poll.count());
         poll =
-            seekToNewOffsetsIfNeeded(seekOffsets, consumer, watermarkEstimator, kafka, poll);
+            seekToNewOffsetsIfNeeded(
+                seekOffsets, consumer, watermarkEstimator, kafka, poll, polledOffsets);
         long bytesPolled = 0L;
-        // increase all partition's empty poll counter by 1
-        emptyPollCount.replaceAll((k, v) -> v + 1);
+        Set<TopicPartition> emptyPartitions = new HashSet<>(assignment);
         for (ConsumerRecord<Object, Object> r : poll) {
           bytesPolled += r.serializedKeySize() + r.serializedValueSize();
           TopicPartition tp = new TopicPartition(r.topic(), r.partition());
-          emptyPollCount.put(tp, 0);
+          // offsets should be increasing in polled records
+          polledOffsets.put(tp, r.offset());
           preWrite.accept(tp, r);
           StreamElement ingest = serializer.read(r, getEntityDescriptor());
           if (ingest != null) {
@@ -492,12 +487,18 @@
           }
         }
       }
-      increaseWatermarkOnEmptyPolls(emptyPollCount, topicPartitionToId, watermarkEstimator);
+      if (!emptyPartitions.isEmpty()) {
+        increaseWatermarkOnEmptyPolls(
+            kafka.endOffsets(kafka.assignment()),
+            polledOffsets,
+            topicPartitionToId,
+            watermarkEstimator);
+      }
       if (!flushCommits(kafka, consumer)) {
         handleRebalanceInOffsetCommit(kafka, listener);
       }
       rethrowErrorIfPresent(name, error);
-      terminateIfConsumed(stopAtCurrent, kafka, endOffsets, emptyPollCount, completed);
+      terminateIfConsumed(stopAtCurrent, kafka, endOffsets, polledOffsets, completed);
       throughputLimiter.sleepToLimitThroughput(bytesPolled, pollTimeMs);
       long startTime = System.currentTimeMillis();
       poll = kafka.poll(pollDuration);
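
Note the guard around the new endOffsets() call above: KafkaConsumer.endOffsets() blocks on a broker request, so it is worth skipping when no partition needs the idle check. emptyPartitions starts as the full assignment and is evidently pruned as records arrive (in loop lines the page does not show), leaving the lookup to run only when some partition returned nothing. A hedged sketch of the same guard in isolation, with illustrative names:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

final class EmptyPollGuardSketch {

  /** Fetches end offsets only if some assigned partition returned no records. */
  static <K, V> Map<TopicPartition, Long> endOffsetsIfNeeded(
      KafkaConsumer<K, V> consumer, ConsumerRecords<K, V> poll) {

    Set<TopicPartition> empty = new HashSet<>(consumer.assignment());
    // partitions that returned records in this poll are clearly not idle
    empty.removeAll(poll.partitions());
    // endOffsets() is a blocking broker round-trip; avoid it when every partition had data
    return empty.isEmpty() ? Map.of() : consumer.endOffsets(consumer.assignment());
  }
}
```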
@@ -558,7 +559,8 @@ private ConsumerRecords<Object, Object> seekToNewOffsetsIfNeeded(
       final ElementConsumer<Object, Object> consumer,
       final AtomicReference<PartitionedWatermarkEstimator> watermarkEstimator,
       final KafkaConsumer<Object, Object> kafka,
-      final ConsumerRecords<Object, Object> poll) {
+      final ConsumerRecords<Object, Object> poll,
+      final Map<TopicPartition, Long> polledOffsets) {

     synchronized (seekOffsets) {
       if (!seekOffsets.isEmpty()) {
@@ -575,6 +577,11 @@ private ConsumerRecords<Object, Object> seekToNewOffsetsIfNeeded(
                         kafka.position(tp),
                         watermarkEstimator.get().getWatermark()))
                 .collect(Collectors.toList()));
+        for (TopicOffset to : seekOffsets) {
+          polledOffsets.put(
+              new TopicPartition(to.getPartition().getTopic(), to.getPartition().getPartition()),
+              -1L);
+        }
         log.info("Seeked consumer to offsets {} as requested", seekOffsets);
         seekOffsets.clear();
         return ConsumerRecords.empty();
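
Seeking resets the tracked offset to a -1L sentinel instead of removing the entry. Under the idle condition endOffset <= polledOffset + 1 used by increaseWatermarkOnEmptyPolls below, the sentinel lets only a genuinely empty partition (end offset 0) report idle until a record is actually polled after the seek. A tiny worked illustration (values are made up):

```java
public class SeekSentinelExample {

  public static void main(String[] args) {
    long polledOffset = -1L; // sentinel: nothing polled since the seek

    long emptyPartitionEnd = 0L; // endOffsets() result for a partition with no data
    long dataPartitionEnd = 42L; // 42 records were written to this partition

    // idle condition from increaseWatermarkOnEmptyPolls: endOffset <= polledOffset + 1
    System.out.println(emptyPartitionEnd <= polledOffset + 1); // true  -> may go idle
    System.out.println(dataPartitionEnd <= polledOffset + 1); // false -> data pending
  }
}
```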
@@ -614,17 +621,13 @@ private void terminateIfConsumed(
       boolean stopAtCurrent,
       KafkaConsumer<?, ?> consumer,
       Map<TopicPartition, Long> endOffsets,
-      Map<TopicPartition, Integer> emptyPollCount,
+      Map<TopicPartition, Long> polledOffsets,
       AtomicBoolean completed) {

     if (stopAtCurrent) {
-      if (emptyPollCount.values().stream().allMatch(v -> v >= emptyPollCount.size())) {
-        // we need to re-fetch the end offsets, because the topic might have been truncated
-        // by log-roll
-        endOffsets.clear();
-        endOffsets.putAll(findNonEmptyEndOffsets(consumer));
-      }
-      if (endOffsets.isEmpty()) {
+      if (polledOffsets.entrySet().stream()
+          .allMatch(
+              e -> MoreObjects.firstNonNull(endOffsets.get(e.getKey()), -1L) <= e.getValue())) {
         log.info(
             "Assignment {} reached end of current data. Terminating consumption.",
             consumer.assignment());
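
The reworked stopAtCurrent test compares each partition's last polled offset against the end offsets snapshotted when consumption started; MoreObjects.firstNonNull substitutes -1L for partitions missing from that snapshot, so a partition that was empty at start never blocks termination. A self-contained illustration of the same condition, with String keys standing in for TopicPartition:

```java
import java.util.Map;

public class StopAtCurrentExample {

  public static void main(String[] args) {
    // end offsets snapshotted at start; an initially empty partition has no entry
    Map<String, Long> endOffsets = Map.of("topic-0", 9L);
    Map<String, Long> polledOffsets = Map.of("topic-0", 9L, "topic-1", 3L);

    boolean done =
        polledOffsets.entrySet().stream()
            // getOrDefault plays the role of MoreObjects.firstNonNull(..., -1L)
            .allMatch(e -> endOffsets.getOrDefault(e.getKey(), -1L) <= e.getValue());
    System.out.println(done); // true: every tracked partition caught up with the snapshot
  }
}
```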
@@ -655,16 +658,18 @@ private boolean flushCommits(
   }

   private void increaseWatermarkOnEmptyPolls(
-      Map<TopicPartition, Integer> emptyPollCount,
+      Map<TopicPartition, Long> endOffsets,
+      Map<TopicPartition, Long> polledOffsets,
       Map<TopicPartition, Integer> topicPartitionToId,
       AtomicReference<PartitionedWatermarkEstimator> watermarkEstimator) {

-    // we have to poll at least number of assigned partitions-times and still have empty poll
-    // on that partition to be sure that it is actually empty
-    int numEmptyPolls = emptyPollCount.size();
-    emptyPollCount.entrySet().stream()
-        .filter(e -> e.getValue() >= numEmptyPolls)
-        .forEach(e -> watermarkEstimator.get().idle(topicPartitionToId.get(e.getKey())));
+    endOffsets.forEach(
+        (tp, endOffset) -> {
+          long polledOffset = MoreObjects.firstNonNull(polledOffsets.get(tp), 0L);
+          if (endOffset <= polledOffset + 1) {
+            watermarkEstimator.get().idle(topicPartitionToId.get(tp));
+          }
+        });
   }

   private ObserveHandle createObserveHandle(
@@ -864,7 +869,6 @@ private ConsumerRebalanceListener listener(
       String name,
       AtomicReference<KafkaConsumer<Object, Object>> kafka,
       ElementConsumer<Object, Object> consumer,
-      Map<TopicPartition, Integer> emptyPollCount,
       Map<TopicPartition, Integer> topicPartitionToId,
       AtomicReference<PartitionedWatermarkEstimator> watermarkEstimator) {

@@ -881,15 +885,10 @@ public void onPartitionsRevoked(Collection<TopicPartition> parts) {
       public void onPartitionsAssigned(Collection<TopicPartition> parts) {
         currentlyAssigned.addAll(parts);
         log.info("Consumer {} has assigned partitions {}", name, currentlyAssigned);
-        emptyPollCount.clear();
         topicPartitionToId.clear();
         AtomicInteger id = new AtomicInteger();

-        currentlyAssigned.forEach(
-            p -> {
-              topicPartitionToId.put(p, id.getAndIncrement());
-              emptyPollCount.put(p, 0);
-            });
+        currentlyAssigned.forEach(p -> topicPartitionToId.put(p, id.getAndIncrement()));

         if (currentlyAssigned.isEmpty()) {
           watermarkEstimator.set(createWatermarkEstimatorForEmptyParts());
(The commit touches 3 files; only the hunks loaded above are shown here.)