Skip to content

Commit

Permalink
Fix KafkaBuffer isEmpty
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Feb 29, 2024
1 parent 337eb71 commit 0ad52c9
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,14 @@ public Map.Entry<Collection<T>, CheckpointState> read(int timeoutInMillis) {
@Override
public void checkpoint(final CheckpointState checkpointState) {
checkpointTimer.record(() -> doCheckpoint(checkpointState));
final int numRecordsToBeChecked = checkpointState.getNumRecordsToBeChecked();
recordsInFlight.addAndGet(-numRecordsToBeChecked);
recordsProcessedCounter.increment(numRecordsToBeChecked);
if (!isByteBuffer()) {
final int numRecordsToBeChecked = checkpointState.getNumRecordsToBeChecked();
recordsInFlight.addAndGet(-numRecordsToBeChecked);
recordsProcessedCounter.increment(numRecordsToBeChecked);
}
}

protected int getRecordsInFlight() {
public int getRecordsInFlight() {
return recordsInFlight.intValue();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ void write_and_read() throws TimeoutException {
// TODO: The metadata is not included. It needs to be included in the Buffer, though not in the Sink. This may be something we make configurable in the consumer/producer - whether to serialize the metadata or not.
//assertThat(onlyResult.getData().getMetadata(), equalTo(record.getData().getMetadata()));
assertThat(onlyResult.getData().toMap(), equalTo(record.getData().toMap()));
assertThat(objectUnderTest.getRecordsInFlight(), equalTo(0));
assertThat(objectUnderTest.getInnerBufferRecordsInFlight(), equalTo(1));
objectUnderTest.checkpoint(readResult.getValue());
assertThat(objectUnderTest.getRecordsInFlight(), equalTo(0));
assertThat(objectUnderTest.getInnerBufferRecordsInFlight(), equalTo(0));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void postProcess(final Long recordsInBuffer) {
public void doCheckpoint(CheckpointState checkpointState) {
try {
setMdc();
innerBuffer.doCheckpoint(checkpointState);
innerBuffer.checkpoint(checkpointState);
} finally {
resetMdc();
}
Expand All @@ -188,6 +188,10 @@ public boolean isWrittenOffHeapOnly() {
return true;
}

int getInnerBufferRecordsInFlight() {
return innerBuffer.getRecordsInFlight();
}

@Override
public void shutdown() {
try {
Expand Down

0 comments on commit 0ad52c9

Please sign in to comment.