Skip to content

Commit

Permalink
[fix](sort) fix coredump by uncaught exception
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg committed Jan 14, 2025
1 parent 6debbc3 commit 04cf8ff
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 16 deletions.
2 changes: 1 addition & 1 deletion be/src/vec/core/sort_cursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
} else if (!status.ok()) {
// Currently, a known error status is emitted when sender
// close recei
throw std::runtime_error(status.msg());
throw Exception(status.code(), status.msg());
}
return false;
}
Expand Down
28 changes: 16 additions & 12 deletions be/src/vec/runtime/vsorted_run_merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ VSortedRunMerger::VSortedRunMerger(const VExprContextSPtrs& ordering_expr,
_batch_size(batch_size),
_limit(limit),
_offset(offset) {
init_timers(profile);
_init_timers(profile);
}

VSortedRunMerger::VSortedRunMerger(const SortDescription& desc, const size_t batch_size,
Expand All @@ -60,10 +60,10 @@ VSortedRunMerger::VSortedRunMerger(const SortDescription& desc, const size_t bat
_offset(offset),
_get_next_timer(nullptr),
_get_next_block_timer(nullptr) {
init_timers(profile);
_init_timers(profile);
}

void VSortedRunMerger::init_timers(RuntimeProfile* profile) {
void VSortedRunMerger::_init_timers(RuntimeProfile* profile) {
_get_next_timer = ADD_TIMER(profile, "MergeGetNext");
_get_next_block_timer = ADD_TIMER(profile, "MergeGetNextBlock");
}
Expand Down Expand Up @@ -97,15 +97,15 @@ Status VSortedRunMerger::prepare(const vector<BlockSupplier>& input_runs) {
return Status::OK();
}

Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
Status VSortedRunMerger::_get_next_internal(Block* output_block, bool* eos) {
ScopedTimer<MonotonicStopWatch> timer(_get_next_timer);
// Only have one receive data queue of data, no need to do merge and
// copy the data of block.
// return the data in receive data directly

if (_pending_cursor != nullptr) {
MergeSortCursor cursor(_pending_cursor);
if (has_next_block(cursor)) {
if (_has_next_block(cursor)) {
_priority_queue.push(cursor);
}
_pending_cursor = nullptr;
Expand All @@ -124,7 +124,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
_priority_queue.pop();
return Status::OK();
}
has_next_block(current);
_has_next_block(current);
} else {
current->pos += _offset;
_offset = 0;
Expand All @@ -139,7 +139,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
_priority_queue.pop();
return Status::OK();
}
*eos = !has_next_block(current);
*eos = !_has_next_block(current);
} else {
*eos = true;
}
Expand All @@ -156,7 +156,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
_priority_queue.pop();
return Status::OK();
}
*eos = !has_next_block(current);
*eos = !_has_next_block(current);
} else {
*eos = true;
}
Expand Down Expand Up @@ -190,7 +190,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
}

// In pipeline engine, needs to check if the sender is readable before the next reading.
if (!next_heap(current)) {
if (!_next_heap(current)) {
return Status::OK();
}

Expand All @@ -213,21 +213,25 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
return Status::OK();
}

bool VSortedRunMerger::next_heap(MergeSortCursor& current) {
Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
RETURN_IF_CATCH_EXCEPTION(return _get_next_internal(output_block, eos));
}

bool VSortedRunMerger::_next_heap(MergeSortCursor& current) {
if (!current->isLast()) {
current->next();
_priority_queue.push(current);
} else if (_pipeline_engine_enabled) {
// need to check sender is readable again before the next reading.
_pending_cursor = current.impl;
return false;
} else if (has_next_block(current)) {
} else if (_has_next_block(current)) {
_priority_queue.push(current);
}
return true;
}

inline bool VSortedRunMerger::has_next_block(doris::vectorized::MergeSortCursor& current) {
inline bool VSortedRunMerger::_has_next_block(doris::vectorized::MergeSortCursor& current) {
ScopedTimer<MonotonicStopWatch> timer(_get_next_block_timer);
return current->has_next_block();
}
Expand Down
8 changes: 5 additions & 3 deletions be/src/vec/runtime/vsorted_run_merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,13 @@ class VSortedRunMerger {
RuntimeProfile::Counter* _get_next_block_timer;

private:
void init_timers(RuntimeProfile* profile);
void _init_timers(RuntimeProfile* profile);

/// In pipeline engine, return false if need to read one more block from sender.
bool next_heap(MergeSortCursor& current);
bool has_next_block(MergeSortCursor& current);
bool _next_heap(MergeSortCursor& current);
bool _has_next_block(MergeSortCursor& current);

Status _get_next_internal(Block* output_block, bool* eos);
};

} // namespace vectorized
Expand Down

0 comments on commit 04cf8ff

Please sign in to comment.