From c4183905300367bccd0113145e1c5ced73ed45b2 Mon Sep 17 00:00:00 2001 From: Mryange Date: Thu, 23 Jan 2025 19:39:53 +0800 Subject: [PATCH] upd --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 9869ea4b3d2692..9cedfaa683555f 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -431,12 +431,15 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { std::queue>& q = _instance_to_package_queue[id]; for (; !q.empty(); q.pop()) { + // Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF, + // ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked _total_queue_size--; if (q.front().block) { COUNTER_UPDATE(_parent->memory_used_counter(), -q.front().block->ByteSizeLong()); } } + // Try to wake up pipeline after clearing the queue if (_queue_dependency && _total_queue_size <= _queue_capacity) { _queue_dependency->set_ready(); }