From 1114a8cd8822f7c212857d5541346bed461bbbe3 Mon Sep 17 00:00:00 2001
From: Pxl
Date: Sun, 26 Jan 2025 15:05:07 +0800
Subject: [PATCH] [Bug](join) return eof when join build sink awakened by
 downstream source (#47380)

### What problem does this PR solve?

1. Return EOF when the join build sink is awakened by the downstream source, so that
   `HashJoinBuildSinkLocalState::close` does not run into an error.

   ![QQ_1737641060365](https://github.com/user-attachments/assets/8b8ddc15-7616-45ca-8afa-8895df21b52c)

2. Add `WakeUpEarlyReason` to the profile.
3. Add debug point `Pipeline::make_all_runnable.sleep` to reproduce the problem in a regression test.

```cpp
Exception in inverted_index_p0/ssb_unique_sql_zstd/sql/q4.3.sql:
java.lang.IllegalStateException: exceptions : exception : errCode = 2, detailMessage = (127.0.0.1)[INTERNAL_ERROR]rf process meet error: [E6] bf not inited and not ignored/disabled, rf: RuntimeFilter: (id = 0, type = bloomfilter, is_broadcast: true, ignored: false, disabled: false, build_bf_cardinality: true, dependency: none, synced_size: -1, has_local_target: true, has_remote_target: false, error_msg: []
0# doris::Exception::Exception(int, std::basic_string_view > const&) at /root/doris/be/src/common/exception.cpp:29
1# doris::Exception::Exception, std::allocator > >(int, std::basic_string_view > const&, std::__cxx11::basic_string, std::allocator >&&) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/basic_string.h:187
2# doris::IRuntimeFilter::signal() at /root/doris/be/src/exprs/runtime_filter.cpp:610
3# doris::IRuntimeFilter::publish(doris::RuntimeState*, bool)::$_1::operator()(std::shared_ptr, bool, unsigned long) const at /root/doris/be/src/exprs/runtime_filter.cpp:0
4# doris::IRuntimeFilter::publish(doris::RuntimeState*, bool) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:701
5# doris::VRuntimeFilterSlots::publish(doris::RuntimeState*, bool) at /root/doris/be/src/exprs/runtime_filter_slots.h:0
6# doris::pipeline::HashJoinBuildSinkLocalState::close(doris::RuntimeState*, doris::Status) at /root/doris/be/src/pipeline/exec/hashjoin_build_sink.cpp:173
7# doris::pipeline::DataSinkOperatorXBase::close(doris::RuntimeState*, doris::Status) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:360
8# doris::pipeline::PipelineTask::close(doris::Status, bool) at /root/doris/be/src/common/status.h:390
9# doris::pipeline::_close_task(doris::pipeline::PipelineTask*, doris::Status) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:360
10# doris::pipeline::TaskScheduler::_do_work(int) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:360
11# doris::ThreadPool::dispatch_thread() at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/move.h:206
12# doris::Thread::supervise_thread(void*) at /var/local/ldb-toolchain/bin/../usr/include/pthread.h:563
13# ?
14# __clone
```
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp  | 14 ++--
 be/src/pipeline/pipeline.cpp                  |  8 ++
 .../runtime/shared_hash_table_controller.h    |  1 -
 .../join/test_slow_close/test_slow_close.out  |  4 +
 .../test_slow_close/test_slow_close.groovy    | 78 +++++++++++++++++++
 5 files changed, 95 insertions(+), 10 deletions(-)
 create mode 100644 regression-test/data/query_p0/join/test_slow_close/test_slow_close.out
 create mode 100644 regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 47025aa62f77cd..e53ebca2304b2b 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -172,20 +172,17 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
         SCOPED_TIMER(_publish_runtime_filter_timer);
         RETURN_IF_ERROR(_runtime_filter_slots->publish(state, !_should_build_hash_table));
     } catch (Exception& e) {
-        bool blocked_by_complete_build_stage = p._shared_hashtable_controller &&
-                                               !p._shared_hash_table_context->complete_build_stage;
         bool blocked_by_shared_hash_table_signal = !_should_build_hash_table &&
                                                    p._shared_hashtable_controller &&
                                                    !p._shared_hash_table_context->signaled;
         return Status::InternalError(
                 "rf process meet error: {}, wake_up_early: {}, should_build_hash_table: "
-                "{}, _finish_dependency: {}, blocked_by_complete_build_stage: {}, "
+                "{}, _finish_dependency: {}, "
                 "blocked_by_shared_hash_table_signal: "
                 "{}",
                 e.to_string(), state->get_task()->wake_up_early(), _should_build_hash_table,
-                _finish_dependency->debug_string(), blocked_by_complete_build_stage,
-                blocked_by_shared_hash_table_signal);
+                _finish_dependency->debug_string(), blocked_by_shared_hash_table_signal);
     }
     return Base::close(state, exec_status);
 }
@@ -557,7 +554,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
                 local_state.process_build_block(state, (*local_state._shared_state->build_block)));
         if (_shared_hashtable_controller) {
             _shared_hash_table_context->status = Status::OK();
-            _shared_hash_table_context->complete_build_stage = true;
             // arena will be shared with other instances.
             _shared_hash_table_context->arena = local_state._shared_state->arena;
             _shared_hash_table_context->hash_table_variants =
@@ -569,12 +565,12 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
                     local_state._shared_state->build_indexes_null;
             local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
         }
-    } else if (!local_state._should_build_hash_table &&
-               _shared_hash_table_context->complete_build_stage) {
+    } else if (!local_state._should_build_hash_table) {
         DCHECK(_shared_hashtable_controller != nullptr);
         DCHECK(_shared_hash_table_context != nullptr);
         // the instance which is not build hash table, it's should wait the signal of hash table build finished.
-        // but if it's running and signaled == false, maybe the source operator have closed caused by some short circuit,
+        // but if it's running and signaled == false, the source operator may have been closed by some short circuit
+        // returning eof marks the task as wake_up_early
         if (!_shared_hash_table_context->signaled) {
             return Status::Error<ErrorCode::END_OF_FILE>("source have closed");
         }
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 1292edea9ee82f..2dd0394d2aeeec 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -112,6 +112,14 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
 }
 
 void Pipeline::make_all_runnable() {
+    DBUG_EXECUTE_IF("Pipeline::make_all_runnable.sleep", {
+        auto pipeline_id = DebugPoints::instance()->get_debug_param_or_default(
+                "Pipeline::make_all_runnable", "pipeline_id", 0);
+        if (pipeline_id == id()) {
+            sleep(10);
+        }
+    });
+
     if (_sink->count_down_destination()) {
         for (auto* task : _tasks) {
             if (task) {
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h
index ff9ad4d0ef48c2..ad4c734112038c 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -69,7 +69,6 @@ struct SharedHashTableContext {
     std::map runtime_filters;
     std::atomic<bool> signaled = false;
     bool short_circuit_for_null_in_probe_side = false;
-    std::atomic<bool> complete_build_stage = false;
 };
 
 using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;
diff --git a/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out b/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out
new file mode 100644
index 00000000000000..cb92be84e47e0b
--- /dev/null
+++ b/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+0	true
+
diff --git a/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
new file mode 100644
index 00000000000000..8d1c33ff9231ce
--- /dev/null
+++ b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_slow_close") {
+    sql "set disable_join_reorder=true;"
+    sql "set runtime_filter_type='bloom_filter';"
+    sql "set parallel_pipeline_task_num=3"
+    sql "set ignore_runtime_filter_ids='1,2';"
+    sql "set enable_runtime_filter_prune=false;"
+
+    sql """ drop table if exists t1; """
+    sql """ drop table if exists t3; """
+    sql """ drop table if exists t5; """
+
+    sql """
+        create table t1 (
+            k1 int null,
+            k2 int null
+        )
+        duplicate key (k1)
+        distributed BY hash(k1) buckets 16
+        properties("replication_num" = "1");
+    """
+
+    sql """
+        create table t3 (
+            k1 int null,
+            k2 int null
+        )
+        duplicate key (k1)
+        distributed BY hash(k1) buckets 16
+        properties("replication_num" = "1");
+
+    """
+
+    sql """
+        create table t5 (
+            k1 int null,
+            k2 int null
+        )
+        duplicate key (k1)
+        distributed BY hash(k1) buckets 16
+        properties("replication_num" = "1");
+    """
+
+    sql """
+        insert into t1 select e1,e1 from (select 1 k1) as t lateral view explode_numbers(100000) tmp1 as e1;
+    """
+
+    sql """
+        insert into t3 values(1,1),(2,2),(3,3);
+    """
+
+    sql """
+        insert into t5 values(1,1),(2,2),(3,3),(4,4),(5,5);
+    """
+
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep",[pipeline_id: 4])
+        qt_sql "select count(*),sleep(2) from (select t1.k1 from t5 join [broadcast] t1 on t1.k1=t5.k1) tmp join [broadcast] t3 join t3 t3s [broadcast] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep")
+    }
+}
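
For readers skimming the patch, the sketch below is a minimal, self-contained illustration of the control flow the fix relies on; it is not Doris code, and `FakeSharedHashTableContext`, `Status::Eof`, and the free-standing `sink()` function are made-up names. It only shows the idea stated in the added comment: a build-sink instance that does not build the hash table and wakes up before the shared hash table is `signaled` returns EOF, and an EOF status is what lets the task be treated as woken up early instead of surfacing an internal error from `HashJoinBuildSinkLocalState::close`.

```cpp
#include <iostream>
#include <string>
#include <utility>

// Hypothetical, simplified stand-ins for Doris's Status/ErrorCode; not the real API.
enum class ErrorCode { OK, END_OF_FILE, INTERNAL_ERROR };

struct Status {
    ErrorCode code = ErrorCode::OK;
    std::string msg;

    static Status OK() { return Status{}; }
    static Status Eof(std::string m) {
        Status s;
        s.code = ErrorCode::END_OF_FILE;
        s.msg = std::move(m);
        return s;
    }
    bool ok() const { return code == ErrorCode::OK; }
    bool is_eof() const { return code == ErrorCode::END_OF_FILE; }
};

// Stand-in for the shared hash table context: `signaled` is flipped by the
// instance that actually builds the hash table.
struct FakeSharedHashTableContext {
    bool signaled = false;
};

// Mirrors the patched branch of the sink: a non-building instance that reaches
// sink() while `signaled` is still false means the downstream source already
// closed (short circuit), so it reports EOF instead of an error.
Status sink(const FakeSharedHashTableContext& ctx, bool should_build_hash_table) {
    if (!should_build_hash_table && !ctx.signaled) {
        return Status::Eof("source have closed");
    }
    return Status::OK();
}

int main() {
    FakeSharedHashTableContext ctx; // signaled == false: build side never published
    Status s = sink(ctx, /*should_build_hash_table=*/false);

    // In this simplified model, an EOF status from the sink is what flags the
    // task as woken up early, so close() can skip the error path.
    bool wake_up_early = s.is_eof();
    std::cout << "wake_up_early=" << std::boolalpha << wake_up_early
              << ", msg=" << s.msg << std::endl;
    return 0;
}
```

Compiled with `g++ -std=c++17`, the sketch prints `wake_up_early=true, msg=source have closed`, matching the behavior the regression test provokes by delaying `Pipeline::make_all_runnable` with the new debug point.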