Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
BiteTheDDDDt committed Jan 27, 2025
1 parent d6e500d commit bb15b66
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 6 deletions.
4 changes: 0 additions & 4 deletions be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ Status MultiCastDataStreamSinkOperatorX::sink(RuntimeState* state, vectorized::B
if (in_block->rows() > 0 || eos) {
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
auto st = local_state._shared_state->multi_cast_data_streamer->push(state, in_block, eos);
// TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished
if (st.template is<ErrorCode::END_OF_FILE>()) {
return Status::OK();
}
return st;
}
return Status::OK();
Expand Down
15 changes: 14 additions & 1 deletion be/src/pipeline/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,20 @@ void Pipeline::make_all_runnable() {

for (auto* task : _tasks) {
if (task) {
task->make_runnable_if_all_downstream_finished();
task->wake_up_early_if_all_downstream_finished();
}
}

if (_sink->count_down_destination()) {
for (auto* task : _tasks) {
if (task) {
task->set_wake_up_early();
}
}
for (auto* task : _tasks) {
if (task) {
task->clear_blocking_state();
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class PipelineTask {

bool wake_up_early() const { return _wake_up_early; }

void make_runnable_if_all_downstream_finished() {
void wake_up_early_if_all_downstream_finished() {
if (!_sink_shared_state) {
return;
}
Expand Down

0 comments on commit bb15b66

Please sign in to comment.