Skip to content

Commit

Permalink
Removed unrelated changes
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Nov 27, 2023
1 parent 8782155 commit cfe0262
Showing 1 changed file with 3 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,20 +224,14 @@ private void addAcknowledgedOffsets(final TopicPartition topicPartition, final R

if (Objects.isNull(commitTracker) && errLogRateLimiter.isAllowed(System.currentTimeMillis())) {
LOG.error("Commit tracker not found for TopicPartition: {}", topicPartition);
return;
}

final OffsetAndMetadata offsetAndMetadata = commitTracker.addCompletedOffsets(offsetRange);
final OffsetAndMetadata offsetAndMetadata =
partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange);
updateOffsetsToCommit(topicPartition, offsetAndMetadata);
}

private void resetOffsets() {
// resetting offsets is similar to committing acknowledged offsets. Throttle the frequency of resets by
// checking current time with last commit time. Same "lastCommitTime" and commit interval are used in both cases
long currentTimeMillis = System.currentTimeMillis();
if ((currentTimeMillis - lastCommitTime) < topicConfig.getCommitInterval().toMillis()) {
return;
}
if (partitionsToReset.size() > 0) {
partitionsToReset.forEach(partition -> {
try {
Expand All @@ -250,8 +244,6 @@ private void resetOffsets() {
consumer.seek(partition, offsetAndMetadata);
}
partitionCommitTrackerMap.remove(partition.partition());
final long epoch = getCurrentTimeNanos();
ownedPartitionsEpoch.put(partition, epoch);
} catch (Exception e) {
LOG.error("Failed to seek to last committed offset upon negative acknowledgement {}", partition, e);
}
Expand Down Expand Up @@ -501,6 +493,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
continue;
}
LOG.info("Assigned partition {}", topicPartition);
partitionCommitTrackerMap.remove(topicPartition.partition());
ownedPartitionsEpoch.put(topicPartition, epoch);
}
}
Expand Down

0 comments on commit cfe0262

Please sign in to comment.