From 73dfe10f114a113f0213a17f786b5da4481d1a5d Mon Sep 17 00:00:00 2001 From: Sergei Politov Date: Tue, 19 Jun 2018 20:23:14 +0300 Subject: [PATCH] ENG-3474: Use thread pool to pickup status tablet when wait is disallowed Summary: Added usage of thread pool, when we cannot pickup status tablet in current thread. Test Plan: Launch local cluster and start workload: java -jar java/yb-loadtester/target/yb-sample-apps.jar \ -workload CassandraTransactionalKeyValue \ -num_threads_read 0 -num_threads_write 1 \ -nodes 127.0.0.1:9042 -num_unique_keys 4 -nouuid Reviewers: mikhail, robert Reviewed By: robert Subscribers: bharat, ybase Differential Revision: https://phabricator.dev.yugabyte.com/D4991 --- src/yb/client/transaction_manager.cc | 35 ++++++++++++- src/yb/consensus/replica_state.cc | 3 +- src/yb/rpc/messenger.cc | 1 + src/yb/util/CMakeLists.txt | 1 + src/yb/util/countdown_latch.cc | 76 ++++++++++++++++++++++++++++ src/yb/util/countdown_latch.h | 62 +++-------------------- src/yb/util/path_util.cc | 1 + src/yb/util/thread_restrictions.cc | 4 ++ src/yb/util/thread_restrictions.h | 3 ++ src/yb/yql/cql/ql/util/ql_env.cc | 1 + 10 files changed, 129 insertions(+), 58 deletions(-) create mode 100644 src/yb/util/countdown_latch.cc diff --git a/src/yb/client/transaction_manager.cc b/src/yb/client/transaction_manager.cc index 4ddf62bd11c9..43e551c1b66a 100644 --- a/src/yb/client/transaction_manager.cc +++ b/src/yb/client/transaction_manager.cc @@ -20,6 +20,7 @@ #include "yb/rpc/tasks_pool.h" #include "yb/util/random_util.h" +#include "yb/util/thread_restrictions.h" #include "yb/client/client.h" @@ -154,6 +155,29 @@ class PickStatusTabletTask { PickStatusTabletCallback callback_; }; +class InvokeCallbackTask { + public: + InvokeCallbackTask(TransactionTableState* table_state, + PickStatusTabletCallback callback) + : table_state_(table_state), callback_(std::move(callback)) { + } + + void Run() { + InvokeCallback(table_state_->local_tablet_filter, table_state_->tablets, callback_); + } + + void Done(const Status& status) { + if (!status.ok()) { + callback_(status); + } + callback_ = PickStatusTabletCallback(); + } + + private: + TransactionTableState* table_state_; + PickStatusTabletCallback callback_; +}; + constexpr size_t kQueueLimit = 150; constexpr size_t kMaxWorkers = 50; @@ -167,13 +191,19 @@ class TransactionManager::Impl { clock_(clock), table_state_{std::move(local_tablet_filter)}, thread_pool_("TransactionManager", kQueueLimit, kMaxWorkers), - tasks_pool_(kQueueLimit) { + tasks_pool_(kQueueLimit), + invoke_callback_tasks_(kQueueLimit) { CHECK(clock); } void PickStatusTablet(PickStatusTabletCallback callback) { if (table_state_.status.load(std::memory_order_acquire) == TransactionTableStatus::kResolved) { - InvokeCallback(table_state_.local_tablet_filter, table_state_.tablets, callback); + if (ThreadRestrictions::IsWaitAllowed()) { + InvokeCallback(table_state_.local_tablet_filter, table_state_.tablets, callback); + } else if (!invoke_callback_tasks_.Enqueue(&thread_pool_, &table_state_, callback)) { + callback(STATUS_FORMAT(ServiceUnavailable, "Invoke callback queue overflow, exists: $0", + invoke_callback_tasks_.size())); + } return; } if (!tasks_pool_.Enqueue(&thread_pool_, client_, &table_state_, std::move(callback))) { @@ -216,6 +246,7 @@ class TransactionManager::Impl { std::atomic closed_{false}; yb::rpc::ThreadPool thread_pool_; // TODO async operations instead of pool yb::rpc::TasksPool tasks_pool_; + yb::rpc::TasksPool invoke_callback_tasks_; yb::rpc::Rpcs rpcs_; }; diff --git a/src/yb/consensus/replica_state.cc b/src/yb/consensus/replica_state.cc index 03e98254eac9..19f588659eed 100644 --- a/src/yb/consensus/replica_state.cc +++ b/src/yb/consensus/replica_state.cc @@ -45,8 +45,9 @@ #include "yb/util/logging.h" #include "yb/util/opid.h" #include "yb/util/status.h" -#include "yb/util/trace.h" #include "yb/util/tostring.h" +#include "yb/util/trace.h" +#include "yb/util/thread_restrictions.h" #include "yb/util/enums.h" DEFINE_int32(inject_delay_commit_pre_voter_to_voter_secs, 0, diff --git a/src/yb/rpc/messenger.cc b/src/yb/rpc/messenger.cc index 251b9e09ac22..b049709a1cfb 100644 --- a/src/yb/rpc/messenger.cc +++ b/src/yb/rpc/messenger.cc @@ -70,6 +70,7 @@ #include "yb/util/size_literals.h" #include "yb/util/status.h" #include "yb/util/threadpool.h" +#include "yb/util/thread_restrictions.h" #include "yb/util/trace.h" using namespace std::literals; diff --git a/src/yb/util/CMakeLists.txt b/src/yb/util/CMakeLists.txt index 6929b8621bf5..de36e0f3fdcb 100644 --- a/src/yb/util/CMakeLists.txt +++ b/src/yb/util/CMakeLists.txt @@ -138,6 +138,7 @@ set(UTIL_SRCS coding.cc concurrent_value.cc condition_variable.cc + countdown_latch.cc crc.cc cross_thread_mutex.cc crypt.cc diff --git a/src/yb/util/countdown_latch.cc b/src/yb/util/countdown_latch.cc new file mode 100644 index 000000000000..4115f0a6e862 --- /dev/null +++ b/src/yb/util/countdown_latch.cc @@ -0,0 +1,76 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#include "yb/util/countdown_latch.h" + +#include "yb/util/thread_restrictions.h" + +namespace yb { + +CountDownLatch::CountDownLatch(int count) + : cond_(&lock_), + count_(count) { +} + +void CountDownLatch::CountDown(uint64_t amount) { + MutexLock lock(lock_); + if (count_ == 0) { + return; + } + + if (amount >= count_) { + count_ = 0; + } else { + count_ -= amount; + } + + if (count_ == 0) { + // Latch has triggered. + cond_.Broadcast(); + } +} + +void CountDownLatch::Wait() const { + ThreadRestrictions::AssertWaitAllowed(); + MutexLock lock(lock_); + while (count_ > 0) { + cond_.Wait(); + } +} + +bool CountDownLatch::WaitFor(const MonoDelta& delta) const { + ThreadRestrictions::AssertWaitAllowed(); + MutexLock lock(lock_); + while (count_ > 0) { + if (!cond_.TimedWait(delta)) { + return false; + } + } + return true; +} + +void CountDownLatch::Reset(uint64_t count) { + MutexLock lock(lock_); + count_ = count; + if (count_ == 0) { + // Awake any waiters if we reset to 0. + cond_.Broadcast(); + } +} + +uint64_t CountDownLatch::count() const { + MutexLock lock(lock_); + return count_; +} + +} // namespace yb diff --git a/src/yb/util/countdown_latch.h b/src/yb/util/countdown_latch.h index dd289ea43407..fae2bf74a31a 100644 --- a/src/yb/util/countdown_latch.h +++ b/src/yb/util/countdown_latch.h @@ -36,7 +36,6 @@ #include "yb/util/condition_variable.h" #include "yb/util/monotime.h" #include "yb/util/mutex.h" -#include "yb/util/thread_restrictions.h" namespace yb { @@ -46,31 +45,12 @@ namespace yb { class CountDownLatch { public: // Initialize the latch with the given initial count. - explicit CountDownLatch(int count) - : cond_(&lock_), - count_(count) { - } + explicit CountDownLatch(int count); // Decrement the count of this latch by 'amount' // If the new count is less than or equal to zero, then all waiting threads are woken up. // If the count is already zero, this has no effect. - void CountDown(uint64_t amount) { - MutexLock lock(lock_); - if (count_ == 0) { - return; - } - - if (amount >= count_) { - count_ = 0; - } else { - count_ -= amount; - } - - if (count_ == 0) { - // Latch has triggered. - cond_.Broadcast(); - } - } + void CountDown(uint64_t amount); // Decrement the count of this latch. // If the new count is zero, then all waiting threads are woken up. @@ -81,51 +61,23 @@ class CountDownLatch { // Wait until the count on the latch reaches zero. // If the count is already zero, this returns immediately. - void Wait() const { - ThreadRestrictions::AssertWaitAllowed(); - MutexLock lock(lock_); - while (count_ > 0) { - cond_.Wait(); - } - } + void Wait() const; // Waits for the count on the latch to reach zero, or until 'until' time is reached. // Returns true if the count became zero, false otherwise. bool WaitUntil(const MonoTime& when) const { - ThreadRestrictions::AssertWaitAllowed(); - MonoDelta relative = when.GetDeltaSince(MonoTime::Now()); - return WaitFor(relative); + return WaitFor(when - MonoTime::Now()); } // Waits for the count on the latch to reach zero, or until 'delta' time elapses. // Returns true if the count became zero, false otherwise. - bool WaitFor(const MonoDelta& delta) const { - ThreadRestrictions::AssertWaitAllowed(); - MutexLock lock(lock_); - while (count_ > 0) { - if (!cond_.TimedWait(delta)) { - return false; - } - } - return true; - } + bool WaitFor(const MonoDelta& delta) const; // Reset the latch with the given count. This is equivalent to reconstructing // the latch. If 'count' is 0, and there are currently waiters, those waiters // will be triggered as if you counted down to 0. - void Reset(uint64_t count) { - MutexLock lock(lock_); - count_ = count; - if (count_ == 0) { - // Awake any waiters if we reset to 0. - cond_.Broadcast(); - } - } - - uint64_t count() const { - MutexLock lock(lock_); - return count_; - } + void Reset(uint64_t count); + uint64_t count() const; private: mutable Mutex lock_; diff --git a/src/yb/util/path_util.cc b/src/yb/util/path_util.cc index c243b6f54d6d..83d44bb58bf6 100644 --- a/src/yb/util/path_util.cc +++ b/src/yb/util/path_util.cc @@ -48,6 +48,7 @@ #include "yb/util/env_util.h" #include "yb/util/debug/trace_event.h" +#include "yb/util/thread_restrictions.h" #include "yb/gutil/gscoped_ptr.h" using std::string; diff --git a/src/yb/util/thread_restrictions.cc b/src/yb/util/thread_restrictions.cc index 2f26ad1b4f62..10d58029095d 100644 --- a/src/yb/util/thread_restrictions.cc +++ b/src/yb/util/thread_restrictions.cc @@ -83,6 +83,10 @@ bool ThreadRestrictions::SetWaitAllowed(bool allowed) { return previous_allowed; } +bool ThreadRestrictions::IsWaitAllowed() { + return LoadTLS()->wait_allowed; +} + void ThreadRestrictions::AssertWaitAllowed() { CHECK(LoadTLS()->wait_allowed) << "Waiting is not allowed to be used on this thread to prevent " diff --git a/src/yb/util/thread_restrictions.h b/src/yb/util/thread_restrictions.h index 361991e0d187..1838afad1f0f 100644 --- a/src/yb/util/thread_restrictions.h +++ b/src/yb/util/thread_restrictions.h @@ -114,6 +114,8 @@ class ThreadRestrictions { // value. static bool SetWaitAllowed(bool allowed); + static bool IsWaitAllowed(); + // Check whether the current thread is allowed to wait/block. // FATALs if not. static void AssertWaitAllowed(); @@ -124,6 +126,7 @@ class ThreadRestrictions { static void AssertIOAllowed() {} static bool SetWaitAllowed(bool allowed) { return true; } static void AssertWaitAllowed() {} + static bool IsWaitAllowed() { return true; } #endif private: diff --git a/src/yb/yql/cql/ql/util/ql_env.cc b/src/yb/yql/cql/ql/util/ql_env.cc index 9e63e5171faf..f307415f2bec 100644 --- a/src/yb/yql/cql/ql/util/ql_env.cc +++ b/src/yb/yql/cql/ql/util/ql_env.cc @@ -23,6 +23,7 @@ #include "yb/master/catalog_manager.h" #include "yb/rpc/messenger.h" +#include "yb/util/thread_restrictions.h" #include "yb/util/trace.h" namespace yb {