Skip to content

Commit

Permalink
This commit avoids unnecessary thread sync of PQ notFull state. (#14129
Browse files Browse the repository at this point in the history
…) (#14141)

When PQ is full, workers wake up writer thread in every read.
However, without removing a fully acked page, queue is still full.
This commit changes the condition of notFull signal.
Fixed: #6801

(cherry picked from commit da68ff3)

Co-authored-by: kaisecheng <[email protected]>
  • Loading branch information
github-actions[bot] and kaisecheng authored May 24, 2022
1 parent fc9f6af commit 4371da8
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
Original file line number Diff line number Diff line change
Expand Up @@ -509,21 +509,25 @@ private void behead() throws IOException {
public boolean isFull() {
lock.lock();
try {
if (this.maxBytes > 0L && isMaxBytesReached()) {
return true;
} else {
return (this.maxUnread > 0 && this.unreadCount >= this.maxUnread);
}
return isMaxBytesReached() || isMaxUnreadReached();
} finally {
lock.unlock();
}
}

private boolean isMaxBytesReached() {
if (this.maxBytes <= 0L) {
return false;
}

final long persistedByteSize = getPersistedByteSize();
return ((persistedByteSize > this.maxBytes) || (persistedByteSize == this.maxBytes && !this.headPage.hasSpace(1)));
}

private boolean isMaxUnreadReached() {
return this.maxUnread > 0 && (this.unreadCount >= this.maxUnread);
}

/**
* Queue is considered empty if it does not contain any tail page and the headpage has no element or all
* elements are acked
Expand Down Expand Up @@ -636,7 +640,7 @@ private Batch readPageBatch(Page p, int limit, long timeout) throws IOException
}

if (! p.isFullyRead()) {
boolean wasFull = isFull();
boolean wasFull = isMaxUnreadReached();

final SequencedList<byte[]> serialized = p.read(left);
int n = serialized.getElements().size();
Expand Down

0 comments on commit 4371da8

Please sign in to comment.