diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index d48d4a7b57fdfd2..84f4483a56d4951 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -251,7 +251,7 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { } else if (!status.ok()) { // Currently, a known error status is emitted when sender // close recei - throw std::runtime_error(status.msg()); + throw Exception(status.code(), status.msg()); } return false; } diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp index a9c0c53e4874994..3eb8d5f63820d6b 100644 --- a/be/src/vec/runtime/vsorted_run_merger.cpp +++ b/be/src/vec/runtime/vsorted_run_merger.cpp @@ -48,7 +48,7 @@ VSortedRunMerger::VSortedRunMerger(const VExprContextSPtrs& ordering_expr, _batch_size(batch_size), _limit(limit), _offset(offset) { - init_timers(profile); + _init_timers(profile); } VSortedRunMerger::VSortedRunMerger(const SortDescription& desc, const size_t batch_size, @@ -60,10 +60,10 @@ VSortedRunMerger::VSortedRunMerger(const SortDescription& desc, const size_t bat _offset(offset), _get_next_timer(nullptr), _get_next_block_timer(nullptr) { - init_timers(profile); + _init_timers(profile); } -void VSortedRunMerger::init_timers(RuntimeProfile* profile) { +void VSortedRunMerger::_init_timers(RuntimeProfile* profile) { _get_next_timer = ADD_TIMER(profile, "MergeGetNext"); _get_next_block_timer = ADD_TIMER(profile, "MergeGetNextBlock"); } @@ -97,7 +97,7 @@ Status VSortedRunMerger::prepare(const vector& input_runs) { return Status::OK(); } -Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { +Status VSortedRunMerger::_get_next_internal(Block* output_block, bool* eos) { ScopedTimer timer(_get_next_timer); // Only have one receive data queue of data, no need to do merge and // copy the data of block. @@ -105,7 +105,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { if (_pending_cursor != nullptr) { MergeSortCursor cursor(_pending_cursor); - if (has_next_block(cursor)) { + if (_has_next_block(cursor)) { _priority_queue.push(cursor); } _pending_cursor = nullptr; @@ -124,7 +124,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { _priority_queue.pop(); return Status::OK(); } - has_next_block(current); + _has_next_block(current); } else { current->pos += _offset; _offset = 0; @@ -139,7 +139,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { _priority_queue.pop(); return Status::OK(); } - *eos = !has_next_block(current); + *eos = !_has_next_block(current); } else { *eos = true; } @@ -156,7 +156,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { _priority_queue.pop(); return Status::OK(); } - *eos = !has_next_block(current); + *eos = !_has_next_block(current); } else { *eos = true; } @@ -190,7 +190,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { } // In pipeline engine, needs to check if the sender is readable before the next reading. - if (!next_heap(current)) { + if (!_next_heap(current)) { return Status::OK(); } @@ -213,7 +213,11 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { return Status::OK(); } -bool VSortedRunMerger::next_heap(MergeSortCursor& current) { +Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { + RETURN_IF_CATCH_EXCEPTION(return _get_next_internal(output_block, eos)); +} + +bool VSortedRunMerger::_next_heap(MergeSortCursor& current) { if (!current->isLast()) { current->next(); _priority_queue.push(current); @@ -221,13 +225,13 @@ bool VSortedRunMerger::next_heap(MergeSortCursor& current) { // need to check sender is readable again before the next reading. _pending_cursor = current.impl; return false; - } else if (has_next_block(current)) { + } else if (_has_next_block(current)) { _priority_queue.push(current); } return true; } -inline bool VSortedRunMerger::has_next_block(doris::vectorized::MergeSortCursor& current) { +inline bool VSortedRunMerger::_has_next_block(doris::vectorized::MergeSortCursor& current) { ScopedTimer timer(_get_next_block_timer); return current->has_next_block(); } diff --git a/be/src/vec/runtime/vsorted_run_merger.h b/be/src/vec/runtime/vsorted_run_merger.h index 2f9ebe04a68b381..902d46f8153f66f 100644 --- a/be/src/vec/runtime/vsorted_run_merger.h +++ b/be/src/vec/runtime/vsorted_run_merger.h @@ -94,11 +94,13 @@ class VSortedRunMerger { RuntimeProfile::Counter* _get_next_block_timer; private: - void init_timers(RuntimeProfile* profile); + void _init_timers(RuntimeProfile* profile); /// In pipeline engine, return false if need to read one more block from sender. - bool next_heap(MergeSortCursor& current); - bool has_next_block(MergeSortCursor& current); + bool _next_heap(MergeSortCursor& current); + bool _has_next_block(MergeSortCursor& current); + + Status _get_next_internal(Block* output_block, bool* eos); }; } // namespace vectorized