Skip to content

Commit

Permalink
ENG-3474: Use thread pool to pickup status tablet when wait is disall…
Browse files Browse the repository at this point in the history
…owed

Summary: Added usage of thread pool, when we cannot pickup status tablet in current thread.

Test Plan:
Launch local cluster and start workload:

java -jar java/yb-loadtester/target/yb-sample-apps.jar \
  -workload CassandraTransactionalKeyValue \
  -num_threads_read 0 -num_threads_write 1 \
  -nodes 127.0.0.1:9042 -num_unique_keys 4 -nouuid

Reviewers: mikhail, robert

Reviewed By: robert

Subscribers: bharat, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D4991
  • Loading branch information
spolitov committed Jun 19, 2018
1 parent 68bb722 commit 73dfe10
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 58 deletions.
35 changes: 33 additions & 2 deletions src/yb/client/transaction_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "yb/rpc/tasks_pool.h"

#include "yb/util/random_util.h"
#include "yb/util/thread_restrictions.h"

#include "yb/client/client.h"

Expand Down Expand Up @@ -154,6 +155,29 @@ class PickStatusTabletTask {
PickStatusTabletCallback callback_;
};

class InvokeCallbackTask {
public:
InvokeCallbackTask(TransactionTableState* table_state,
PickStatusTabletCallback callback)
: table_state_(table_state), callback_(std::move(callback)) {
}

void Run() {
InvokeCallback(table_state_->local_tablet_filter, table_state_->tablets, callback_);
}

void Done(const Status& status) {
if (!status.ok()) {
callback_(status);
}
callback_ = PickStatusTabletCallback();
}

private:
TransactionTableState* table_state_;
PickStatusTabletCallback callback_;
};

constexpr size_t kQueueLimit = 150;
constexpr size_t kMaxWorkers = 50;

Expand All @@ -167,13 +191,19 @@ class TransactionManager::Impl {
clock_(clock),
table_state_{std::move(local_tablet_filter)},
thread_pool_("TransactionManager", kQueueLimit, kMaxWorkers),
tasks_pool_(kQueueLimit) {
tasks_pool_(kQueueLimit),
invoke_callback_tasks_(kQueueLimit) {
CHECK(clock);
}

void PickStatusTablet(PickStatusTabletCallback callback) {
if (table_state_.status.load(std::memory_order_acquire) == TransactionTableStatus::kResolved) {
InvokeCallback(table_state_.local_tablet_filter, table_state_.tablets, callback);
if (ThreadRestrictions::IsWaitAllowed()) {
InvokeCallback(table_state_.local_tablet_filter, table_state_.tablets, callback);
} else if (!invoke_callback_tasks_.Enqueue(&thread_pool_, &table_state_, callback)) {
callback(STATUS_FORMAT(ServiceUnavailable, "Invoke callback queue overflow, exists: $0",
invoke_callback_tasks_.size()));
}
return;
}
if (!tasks_pool_.Enqueue(&thread_pool_, client_, &table_state_, std::move(callback))) {
Expand Down Expand Up @@ -216,6 +246,7 @@ class TransactionManager::Impl {
std::atomic<bool> closed_{false};
yb::rpc::ThreadPool thread_pool_; // TODO async operations instead of pool
yb::rpc::TasksPool<PickStatusTabletTask> tasks_pool_;
yb::rpc::TasksPool<InvokeCallbackTask> invoke_callback_tasks_;
yb::rpc::Rpcs rpcs_;
};

Expand Down
3 changes: 2 additions & 1 deletion src/yb/consensus/replica_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@
#include "yb/util/logging.h"
#include "yb/util/opid.h"
#include "yb/util/status.h"
#include "yb/util/trace.h"
#include "yb/util/tostring.h"
#include "yb/util/trace.h"
#include "yb/util/thread_restrictions.h"
#include "yb/util/enums.h"

DEFINE_int32(inject_delay_commit_pre_voter_to_voter_secs, 0,
Expand Down
1 change: 1 addition & 0 deletions src/yb/rpc/messenger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
#include "yb/util/size_literals.h"
#include "yb/util/status.h"
#include "yb/util/threadpool.h"
#include "yb/util/thread_restrictions.h"
#include "yb/util/trace.h"

using namespace std::literals;
Expand Down
1 change: 1 addition & 0 deletions src/yb/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ set(UTIL_SRCS
coding.cc
concurrent_value.cc
condition_variable.cc
countdown_latch.cc
crc.cc
cross_thread_mutex.cc
crypt.cc
Expand Down
76 changes: 76 additions & 0 deletions src/yb/util/countdown_latch.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/util/countdown_latch.h"

#include "yb/util/thread_restrictions.h"

namespace yb {

CountDownLatch::CountDownLatch(int count)
: cond_(&lock_),
count_(count) {
}

void CountDownLatch::CountDown(uint64_t amount) {
MutexLock lock(lock_);
if (count_ == 0) {
return;
}

if (amount >= count_) {
count_ = 0;
} else {
count_ -= amount;
}

if (count_ == 0) {
// Latch has triggered.
cond_.Broadcast();
}
}

void CountDownLatch::Wait() const {
ThreadRestrictions::AssertWaitAllowed();
MutexLock lock(lock_);
while (count_ > 0) {
cond_.Wait();
}
}

bool CountDownLatch::WaitFor(const MonoDelta& delta) const {
ThreadRestrictions::AssertWaitAllowed();
MutexLock lock(lock_);
while (count_ > 0) {
if (!cond_.TimedWait(delta)) {
return false;
}
}
return true;
}

void CountDownLatch::Reset(uint64_t count) {
MutexLock lock(lock_);
count_ = count;
if (count_ == 0) {
// Awake any waiters if we reset to 0.
cond_.Broadcast();
}
}

uint64_t CountDownLatch::count() const {
MutexLock lock(lock_);
return count_;
}

} // namespace yb
62 changes: 7 additions & 55 deletions src/yb/util/countdown_latch.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
#include "yb/util/condition_variable.h"
#include "yb/util/monotime.h"
#include "yb/util/mutex.h"
#include "yb/util/thread_restrictions.h"

namespace yb {

Expand All @@ -46,31 +45,12 @@ namespace yb {
class CountDownLatch {
public:
// Initialize the latch with the given initial count.
explicit CountDownLatch(int count)
: cond_(&lock_),
count_(count) {
}
explicit CountDownLatch(int count);

// Decrement the count of this latch by 'amount'
// If the new count is less than or equal to zero, then all waiting threads are woken up.
// If the count is already zero, this has no effect.
void CountDown(uint64_t amount) {
MutexLock lock(lock_);
if (count_ == 0) {
return;
}

if (amount >= count_) {
count_ = 0;
} else {
count_ -= amount;
}

if (count_ == 0) {
// Latch has triggered.
cond_.Broadcast();
}
}
void CountDown(uint64_t amount);

// Decrement the count of this latch.
// If the new count is zero, then all waiting threads are woken up.
Expand All @@ -81,51 +61,23 @@ class CountDownLatch {

// Wait until the count on the latch reaches zero.
// If the count is already zero, this returns immediately.
void Wait() const {
ThreadRestrictions::AssertWaitAllowed();
MutexLock lock(lock_);
while (count_ > 0) {
cond_.Wait();
}
}
void Wait() const;

// Waits for the count on the latch to reach zero, or until 'until' time is reached.
// Returns true if the count became zero, false otherwise.
bool WaitUntil(const MonoTime& when) const {
ThreadRestrictions::AssertWaitAllowed();
MonoDelta relative = when.GetDeltaSince(MonoTime::Now());
return WaitFor(relative);
return WaitFor(when - MonoTime::Now());
}

// Waits for the count on the latch to reach zero, or until 'delta' time elapses.
// Returns true if the count became zero, false otherwise.
bool WaitFor(const MonoDelta& delta) const {
ThreadRestrictions::AssertWaitAllowed();
MutexLock lock(lock_);
while (count_ > 0) {
if (!cond_.TimedWait(delta)) {
return false;
}
}
return true;
}
bool WaitFor(const MonoDelta& delta) const;

// Reset the latch with the given count. This is equivalent to reconstructing
// the latch. If 'count' is 0, and there are currently waiters, those waiters
// will be triggered as if you counted down to 0.
void Reset(uint64_t count) {
MutexLock lock(lock_);
count_ = count;
if (count_ == 0) {
// Awake any waiters if we reset to 0.
cond_.Broadcast();
}
}

uint64_t count() const {
MutexLock lock(lock_);
return count_;
}
void Reset(uint64_t count);
uint64_t count() const;

private:
mutable Mutex lock_;
Expand Down
1 change: 1 addition & 0 deletions src/yb/util/path_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

#include "yb/util/env_util.h"
#include "yb/util/debug/trace_event.h"
#include "yb/util/thread_restrictions.h"
#include "yb/gutil/gscoped_ptr.h"

using std::string;
Expand Down
4 changes: 4 additions & 0 deletions src/yb/util/thread_restrictions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ bool ThreadRestrictions::SetWaitAllowed(bool allowed) {
return previous_allowed;
}

bool ThreadRestrictions::IsWaitAllowed() {
return LoadTLS()->wait_allowed;
}

void ThreadRestrictions::AssertWaitAllowed() {
CHECK(LoadTLS()->wait_allowed)
<< "Waiting is not allowed to be used on this thread to prevent "
Expand Down
3 changes: 3 additions & 0 deletions src/yb/util/thread_restrictions.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class ThreadRestrictions {
// value.
static bool SetWaitAllowed(bool allowed);

static bool IsWaitAllowed();

// Check whether the current thread is allowed to wait/block.
// FATALs if not.
static void AssertWaitAllowed();
Expand All @@ -124,6 +126,7 @@ class ThreadRestrictions {
static void AssertIOAllowed() {}
static bool SetWaitAllowed(bool allowed) { return true; }
static void AssertWaitAllowed() {}
static bool IsWaitAllowed() { return true; }
#endif

private:
Expand Down
1 change: 1 addition & 0 deletions src/yb/yql/cql/ql/util/ql_env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "yb/master/catalog_manager.h"
#include "yb/rpc/messenger.h"
#include "yb/util/thread_restrictions.h"
#include "yb/util/trace.h"

namespace yb {
Expand Down

0 comments on commit 73dfe10

Please sign in to comment.