Skip to content

Commit

Permalink
Using threadpool2 as real thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
COM8 committed Jan 26, 2025
1 parent 46bf22f commit e9a7882
Show file tree
Hide file tree
Showing 11 changed files with 292 additions and 577 deletions.
1 change: 0 additions & 1 deletion cpr/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ add_library(cpr
proxyauth.cpp
session.cpp
threadpool.cpp
threadpool2.cpp
timeout.cpp
unix_socket.cpp
util.cpp
Expand Down
224 changes: 117 additions & 107 deletions cpr/threadpool.cpp
Original file line number Diff line number Diff line change
@@ -1,160 +1,170 @@
#include "cpr/threadpool.h"
#include <algorithm>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <ctime>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

namespace cpr {
size_t ThreadPool::DEFAULT_MAX_THREAD_COUNT = std::thread::hardware_concurrency();

ThreadPool::ThreadPool(size_t min_threads, size_t max_threads, std::chrono::milliseconds max_idle_ms) : min_thread_num(min_threads), max_thread_num(max_threads), max_idle_time(max_idle_ms) {}
ThreadPool::ThreadPool(size_t minThreadCount, size_t maxThreadCount) : minThreadCount(minThreadCount), maxThreadCount(maxThreadCount) {
assert(minThreadCount <= maxThreadCount);
Start();
}

ThreadPool::~ThreadPool() {
Stop();
}

int ThreadPool::Start(size_t start_threads) {
if (status != STOP) {
return -1;
}
status = RUNNING;
start_threads = std::clamp(start_threads, min_thread_num, max_thread_num);
for (size_t i = 0; i < start_threads; ++i) {
CreateThread();
}
return 0;
ThreadPool::State ThreadPool::GetState() const {
return state.load();
}

int ThreadPool::Stop() {
const std::unique_lock status_lock(status_wait_mutex);
if (status == STOP) {
return -1;
}
size_t ThreadPool::GetMaxThreadCount() const {
return maxThreadCount.load();
}

status = STOP;
status_wait_cond.notify_all();
task_cond.notify_all();
size_t ThreadPool::GetCurThreadCount() const {
return curThreadCount.load();
}

for (auto& i : threads) {
if (i.thread->joinable()) {
i.thread->join();
}
}
size_t ThreadPool::GetIdleThreadCount() const {
return idleThreadCount.load();
}

threads.clear();
cur_thread_num = 0;
idle_thread_num = 0;
return 0;
size_t ThreadPool::GetMinThreadCount() const {
return minThreadCount.load();
}

int ThreadPool::Pause() {
if (status == RUNNING) {
status = PAUSE;
void ThreadPool::SetMinThreadCount(size_t minThreadCount) {
assert(minThreadCount <= maxThreadCount);
this->minThreadCount = minThreadCount;
}

void ThreadPool::SetMaxThreadCount(size_t maxThreadCount) {
assert(minThreadCount <= maxThreadCount);
this->maxThreadCount = maxThreadCount;
}

void ThreadPool::Start() {
const std::unique_lock lock(controlMutex);
if (setState(State::RUNNING)) {
for (size_t i = 0; i < std::max(minThreadCount.load(), tasks.size()); i++) {
addThread();
}
}
return 0;
}

int ThreadPool::Resume() {
const std::unique_lock status_lock(status_wait_mutex);
if (status == PAUSE) {
status = RUNNING;
status_wait_cond.notify_all();
void ThreadPool::Stop() {
const std::unique_lock controlLock(controlMutex);
setState(State::STOP);
taskQueueCondVar.notify_all();

// Join all workers
const std::unique_lock workersLock{workerMutex};
auto iter = workers.begin();
while (iter != workers.end()) {
if (iter->thread->joinable()) {
iter->thread->join();
}
iter = workers.erase(iter);
}
return 0;
}

int ThreadPool::Wait() {
void ThreadPool::Wait() {
while (true) {
if (status == STOP || (tasks.empty() && idle_thread_num == cur_thread_num)) {
if ((state != State::RUNNING && curThreadCount <= 0) || (tasks.empty() && curThreadCount <= idleThreadCount)) {
break;
}
std::this_thread::yield();
}
return 0;
}

bool ThreadPool::CreateThread() {
if (cur_thread_num >= max_thread_num) {
bool ThreadPool::setState(State state) {
const std::unique_lock lock(controlMutex);
if (this->state == state) {
return false;
}
std::thread* thread = new std::thread([this] {
bool initialRun = true;
while (status != STOP) {
{
std::unique_lock status_lock(status_wait_mutex);
status_wait_cond.wait(status_lock, [this]() { return status != Status::PAUSE; });
this->state = state;
return true;
}

void ThreadPool::addThread() {
assert(state != State::STOP);

const std::unique_lock lock{workerMutex};
workers.emplace_back();
workers.back().thread = std::make_unique<std::thread>(&ThreadPool::threadFunc, this, std::ref(workers.back()));
curThreadCount++;
idleThreadCount++;
}

void ThreadPool::threadFunc(WorkerThread& workerThread) {
while (true) {
std::cv_status result{std::cv_status::no_timeout};
{
std::unique_lock lock(taskQueueMutex);
if (tasks.empty()) {
result = taskQueueCondVar.wait_for(lock, std::chrono::milliseconds(250));
}
}

if (state == State::STOP) {
curThreadCount--;
break;
}

// A timeout has been reached check if we should cleanup the thread
if (result == std::cv_status::timeout) {
const std::unique_lock lock(controlMutex);
if (curThreadCount > minThreadCount) {
curThreadCount--;
break;
}
}

Task task;
{
std::unique_lock<std::mutex> locker(task_mutex);
task_cond.wait_for(locker, std::chrono::milliseconds(max_idle_time), [this]() { return status == STOP || !tasks.empty(); });
if (status == STOP) {
return;
}
if (tasks.empty()) {
if (cur_thread_num > min_thread_num) {
DelThread(std::this_thread::get_id());
return;
}
continue;
}
if (!initialRun) {
--idle_thread_num;
}
// Check for tasks and execute one
std::function<void()> task;
{
const std::unique_lock lock(taskQueueMutex);
if (!tasks.empty()) {
idleThreadCount--;
task = std::move(tasks.front());
tasks.pop();
}
if (task) {
task();
++idle_thread_num;
if (initialRun) {
initialRun = false;
}
}
}
});
AddThread(thread);
return true;

// Execute the task
if (task) {
task();
idleThreadCount++;
}
}

workerThread.state = State::STOP;

// Mark worker thread to be removed
workerJoinReadyCount++;
idleThreadCount--;
}

void ThreadPool::AddThread(std::thread* thread) {
thread_mutex.lock();
++cur_thread_num;
ThreadData data;
data.thread = std::shared_ptr<std::thread>(thread);
data.id = thread->get_id();
data.status = RUNNING;
data.start_time = std::chrono::steady_clock::now();
data.stop_time = std::chrono::steady_clock::time_point::max();
threads.emplace_back(data);
thread_mutex.unlock();
}

void ThreadPool::DelThread(std::thread::id id) {
const std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();

thread_mutex.lock();
--cur_thread_num;
--idle_thread_num;
auto iter = threads.begin();
while (iter != threads.end()) {
if (iter->status == STOP && now > iter->stop_time) {
void ThreadPool::joinStoppedThreads() {
const std::unique_lock lock{workerMutex};
auto iter = workers.begin();
while (iter != workers.end()) {
if (iter->state == State::STOP) {
if (iter->thread->joinable()) {
iter->thread->join();
iter = threads.erase(iter);
continue;
}
} else if (iter->id == id) {
iter->status = STOP;
iter->stop_time = std::chrono::steady_clock::now();
iter = workers.erase(iter);
workerJoinReadyCount--;
}
++iter;
}
thread_mutex.unlock();
}

} // namespace cpr
Loading

0 comments on commit e9a7882

Please sign in to comment.