Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange authored and yiguolei committed Jan 25, 2025
1 parent 1e730e2 commit c418390
Showing 1 changed file with 3 additions and 0 deletions.
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,12 +431,15 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {

std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
for (; !q.empty(); q.pop()) {
// Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF,
// ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked
_total_queue_size--;
if (q.front().block) {
COUNTER_UPDATE(_parent->memory_used_counter(), -q.front().block->ByteSizeLong());
}
}

// Try to wake up pipeline after clearing the queue
if (_queue_dependency && _total_queue_size <= _queue_capacity) {
_queue_dependency->set_ready();
}
Expand Down

0 comments on commit c418390

Please sign in to comment.