Skip to content

Commit

Permalink
Collect consumed metric message by message
Browse files Browse the repository at this point in the history
Instead of collecting after the whole chunk is dispatched. This makes
the metric more accurate when message processing takes some time.
  • Loading branch information
acogoluegnes committed Jan 31, 2025
1 parent deedc79 commit 59fe87c
Showing 1 changed file with 2 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,6 @@ static int handleDeliver(
}

metricsCollector.chunk(numEntries);
long messagesRead = 0;
MutableBoolean messageIgnored = new MutableBoolean(false);

while (numRecords != 0) {
Expand Down Expand Up @@ -482,7 +481,7 @@ static int handleDeliver(
subscriptionId, offset, chunkTimestamp, committedOffset, chunkContext);
messageIgnored.set(false);
} else {
messagesRead++;
metricsCollector.consume(1);
}
numRecords--;
offset++; // works even for unsigned long
Expand Down Expand Up @@ -551,7 +550,7 @@ static int handleDeliver(
subscriptionId, offset, chunkTimestamp, committedOffset, chunkContext);
messageIgnored.set(false);
} else {
messagesRead++;
metricsCollector.consume(1);
}
numRecordsInBatch--;
offset++; // works even for unsigned long
Expand All @@ -564,7 +563,6 @@ static int handleDeliver(
}
}
}
metricsCollector.consume(messagesRead);
return read;
}

Expand Down

0 comments on commit 59fe87c

Please sign in to comment.