Skip to content

Commit

Permalink
[Bug](join) return eof when join build sink is awakened by downstream sou…
Browse files Browse the repository at this point in the history
…rce (#47380)

### What problem does this PR solve?
1. Return EOF when the join build sink is awakened by the downstream source, so that
`HashJoinBuildSinkLocalState::close` does not run into an error.

![QQ_1737641060365](https://github.com/user-attachments/assets/8b8ddc15-7616-45ca-8afa-8895df21b52c)
2. add WakeUpEarlyReason to profile
3. add debug point `Pipeline::make_all_runnable.sleep` to reproduce
the problem in a regression test
```cpp
Exception in inverted_index_p0/ssb_unique_sql_zstd/sql/q4.3.sql:
java.lang.IllegalStateException: exceptions : exception : errCode = 2, detailMessage = (127.0.0.1)[INTERNAL_ERROR]rf process meet error: [E6] bf not inited and not ignored/disabled, rf: RuntimeFilter: (id = 0, type = bloomfilter, is_broadcast: true, ignored: false, disabled: false, build_bf_cardinality: true, dependency: none, synced_size: -1, has_local_target: true, has_remote_target: false, error_msg: []
  0#  doris::Exception::Exception(int, std::basic_string_view<char, std::char_traits<char> > const&) at /root/doris/be/src/common/exception.cpp:29
  1#  doris::Exception::Exception<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >(int, std::basic_string_view<char, std::char_traits<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&&) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/basic_string.h:187
  2#  doris::IRuntimeFilter::signal() at /root/doris/be/src/exprs/runtime_filter.cpp:610
  3#  doris::IRuntimeFilter::publish(doris::RuntimeState*, bool)::$_1::operator()(std::shared_ptr<doris::RuntimePredicateWrapper>, bool, unsigned long) const at /root/doris/be/src/exprs/runtime_filter.cpp:0
  4#  doris::IRuntimeFilter::publish(doris::RuntimeState*, bool) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:701
  5#  doris::VRuntimeFilterSlots::publish(doris::RuntimeState*, bool) at /root/doris/be/src/exprs/runtime_filter_slots.h:0
  6#  doris::pipeline::HashJoinBuildSinkLocalState::close(doris::RuntimeState*, doris::Status) at /root/doris/be/src/pipeline/exec/hashjoin_build_sink.cpp:173
  7#  doris::pipeline::DataSinkOperatorXBase::close(doris::RuntimeState*, doris::Status) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:360
  8#  doris::pipeline::PipelineTask::close(doris::Status, bool) at /root/doris/be/src/common/status.h:390
  9#  doris::pipeline::_close_task(doris::pipeline::PipelineTask*, doris::Status) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:360
  10# doris::pipeline::TaskScheduler::_do_work(int) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:360
  11# doris::ThreadPool::dispatch_thread() at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/move.h:206
  12# doris::Thread::supervise_thread(void*) at /var/local/ldb-toolchain/bin/../usr/include/pthread.h:563
  13# ?
  14# __clone
```
  • Loading branch information
BiteTheDDDDt authored Jan 26, 2025
1 parent c9fb76b commit 1114a8c
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 10 deletions.
14 changes: 5 additions & 9 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,20 +172,17 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(state, !_should_build_hash_table));
} catch (Exception& e) {
bool blocked_by_complete_build_stage = p._shared_hashtable_controller &&
!p._shared_hash_table_context->complete_build_stage;
bool blocked_by_shared_hash_table_signal = !_should_build_hash_table &&
p._shared_hashtable_controller &&
!p._shared_hash_table_context->signaled;

return Status::InternalError(
"rf process meet error: {}, wake_up_early: {}, should_build_hash_table: "
"{}, _finish_dependency: {}, blocked_by_complete_build_stage: {}, "
"{}, _finish_dependency: {}, "
"blocked_by_shared_hash_table_signal: "
"{}",
e.to_string(), state->get_task()->wake_up_early(), _should_build_hash_table,
_finish_dependency->debug_string(), blocked_by_complete_build_stage,
blocked_by_shared_hash_table_signal);
_finish_dependency->debug_string(), blocked_by_shared_hash_table_signal);
}
return Base::close(state, exec_status);
}
Expand Down Expand Up @@ -557,7 +554,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state.process_build_block(state, (*local_state._shared_state->build_block)));
if (_shared_hashtable_controller) {
_shared_hash_table_context->status = Status::OK();
_shared_hash_table_context->complete_build_stage = true;
// arena will be shared with other instances.
_shared_hash_table_context->arena = local_state._shared_state->arena;
_shared_hash_table_context->hash_table_variants =
Expand All @@ -569,12 +565,12 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state._shared_state->build_indexes_null;
local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
}
} else if (!local_state._should_build_hash_table &&
_shared_hash_table_context->complete_build_stage) {
} else if (!local_state._should_build_hash_table) {
DCHECK(_shared_hashtable_controller != nullptr);
DCHECK(_shared_hash_table_context != nullptr);
// the instance which is not build hash table, it's should wait the signal of hash table build finished.
// but if it's running and signaled == false, maybe the source operator have closed caused by some short circuit,
// but if it's running and signaled == false, maybe the source operator have closed caused by some short circuit
// return eof will make task marked as wake_up_early
if (!_shared_hash_table_context->signaled) {
return Status::Error<ErrorCode::END_OF_FILE>("source have closed");
}
Expand Down
8 changes: 8 additions & 0 deletions be/src/pipeline/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
}

void Pipeline::make_all_runnable() {
DBUG_EXECUTE_IF("Pipeline::make_all_runnable.sleep", {
auto pipeline_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
"Pipeline::make_all_runnable", "pipeline_id", 0);
if (pipeline_id == id()) {
sleep(10);
}
});

if (_sink->count_down_destination()) {
for (auto* task : _tasks) {
if (task) {
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/runtime/shared_hash_table_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ struct SharedHashTableContext {
std::map<int, RuntimeFilterContextSPtr> runtime_filters;
std::atomic<bool> signaled = false;
bool short_circuit_for_null_in_probe_side = false;
std::atomic<bool> complete_build_stage = false;
};

using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
0 true

Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression suite "test_slow_close".
// Runs a chain of broadcast hash joins while the BE debug point
// "Pipeline::make_all_runnable.sleep" is enabled with pipeline_id 4, and records the
// query result under the "sql" tag via qt_sql (presumably compared against the suite's
// expected-output file — confirm against the regression framework).
suite("test_slow_close") {
// Session settings for this suite: keep the written join order, use bloom-filter
// runtime filters, run 3 parallel pipeline tasks, ignore runtime filters 1 and 2,
// and disable runtime filter pruning.
sql "set disable_join_reorder=true;"
sql "set runtime_filter_type='bloom_filter';"
sql "set parallel_pipeline_task_num=3"
sql "set ignore_runtime_filter_ids='1,2';"
sql "set enable_runtime_filter_prune=false;"

// Start from a clean slate so the suite can be re-run.
sql """ drop table if exists t1; """
sql """ drop table if exists t3; """
sql """ drop table if exists t5; """

// t1, t3 and t5 share the same schema: two nullable int columns, duplicate key on k1,
// 16 hash buckets on k1, single replica.
sql """
create table t1 (
k1 int null,
k2 int null
)
duplicate key (k1)
distributed BY hash(k1) buckets 16
properties("replication_num" = "1");
"""

sql """
create table t3 (
k1 int null,
k2 int null
)
duplicate key (k1)
distributed BY hash(k1) buckets 16
properties("replication_num" = "1");
"""

sql """
create table t5 (
k1 int null,
k2 int null
)
duplicate key (k1)
distributed BY hash(k1) buckets 16
properties("replication_num" = "1");
"""

// t1 is the large table: filled from explode_numbers(100000), with k1 == k2 on every row
// (presumably 100000 rows — confirm explode_numbers semantics).
sql """
insert into t1 select e1,e1 from (select 1 k1) as t lateral view explode_numbers(100000) tmp1 as e1;
"""

// t3 is tiny: k2 only takes the values 1..3.
sql """
insert into t3 values(1,1),(2,2),(3,3);
"""

// t5 is tiny as well: five rows with k1 == k2.
sql """
insert into t5 values(1,1),(2,2),(3,3),(4,4),(5,5);
"""

try {
// Enable the debug point on every BE, passing pipeline_id 4 as its parameter.
// NOTE(review): the BE hunk in this commit reads the parameter with
// get_debug_param_or_default using the name "Pipeline::make_all_runnable" (without the
// ".sleep" suffix used here) — confirm the pipeline_id value is actually picked up.
GetDebugPoint().enableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep",[pipeline_id: 4])
// The filter t3.k2=5 matches none of the rows inserted into t3 above, so count(*) is
// expected to be 0. sleep(2) in the select list presumably keeps the query alive long
// enough for the delayed wake-up to matter — TODO confirm.
qt_sql "select count(*),sleep(2) from (select t1.k1 from t5 join [broadcast] t1 on t1.k1=t5.k1) tmp join [broadcast] t3 join t3 t3s [broadcast] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"
} finally {
// Always turn the debug point off again, even if the query above throws, so it does not
// leak into other suites.
GetDebugPoint().disableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep")
}
}

0 comments on commit 1114a8c

Please sign in to comment.