-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy paththread_pool_manager.hpp
165 lines (151 loc) · 4.41 KB
/
thread_pool_manager.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#ifndef THREAD_POOL_MANAGER_HPP
#define THREAD_POOL_MANAGER_HPP
#include <atomic>
#include <condition_variable>
#include <exception>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
/**
 * @brief Exception reporting that a thread pool was requested with fewer than one thread.
 *
 * Derives from std::exception so it can be caught through the standard base class.
 */
struct MultithreadLevelException : public std::exception
{
    /**
     * @brief Returns the fixed, statically allocated description of the error.
     *
     * Declared `noexcept override` instead of the dynamic exception specification
     * `throw()`, which was deprecated in C++11 and removed in C++20.
     *
     * @return const char* Null-terminated message; never null.
     */
    const char *what() const noexcept override
    {
        return "MultithreadLevelException: Cannot start a thread pool with less then 1 thread!";
    }
};
/**
 * @brief Persistent pool of worker threads that execute queued tasks.
 *
 * The workers are created once, at construction, and then sleep on a condition variable
 * until a task is queued through executeTask() or shutdown() is requested. This allows new
 * tasks to be assigned without manually creating and joining a thread for each of them.
 *
 * shutdown() lets the workers finish whatever is still queued and then joins them; the
 * destructor calls shutdown(), so a pool that goes out of scope does not leave joinable
 * threads behind.
 *
 * @tparam ProtectionMethod Type of the public `protection` member. It must be default
 *         constructible; the pool itself never reads or writes that member.
 */
template <class ProtectionMethod>
class thread_pool_manager
{
private:
    std::vector<std::thread> threadsPool;
    std::queue<std::function<void()>> tasksQueue;
    // Serialises the join loop in shutdown() so concurrent callers do not join the same thread.
    std::mutex poolMutex;
    // Guards tasksQueue and terminatePool, i.e. everything the wait predicate reads.
    std::mutex queueMutex;
    std::condition_variable poolCondition;
    bool terminatePool = false;

    /**
     * @brief Creates as many worker threads as specified by the numOfThreads param.
     *
     * If starting a thread fails part-way through, the workers that were already started
     * are stopped and joined before the exception is rethrown; otherwise destroying the
     * still-joinable std::thread objects would call std::terminate.
     *
     * @param numOfThreads Number of workers to start; must be at least 1.
     * @throws MultithreadLevelException if numOfThreads is less than 1.
     */
    void createThreads(int numOfThreads)
    {
        if (numOfThreads < 1)
        {
            throw MultithreadLevelException();
        }
        try
        {
            for (int i = 0; i < numOfThreads; ++i)
            {
                threadsPool.emplace_back(&thread_pool_manager::waitForTasks, this);
            }
        }
        catch (...)
        {
            shutdown();
            throw;
        }
    }

    /**
     * @brief Worker loop: sleeps until a task is available or the pool is terminating.
     *
     * A worker only returns when it is woken and the queue is empty, which (given the wait
     * predicate) means termination was requested; tasks still queued at shutdown time are
     * therefore executed first. Tasks are invoked without a try/catch here, so an exception
     * escaping a task propagates out of the thread function.
     */
    void waitForTasks()
    {
        while (true)
        {
            std::function<void()> taskToPerform;
            {
                std::unique_lock<std::mutex> lock(queueMutex);
                poolCondition.wait(lock, [this]
                                   { return terminatePool || !tasksQueue.empty(); });
                if (tasksQueue.empty())
                {
                    return;
                }
                // Move instead of copy: the queue entry is discarded right after.
                taskToPerform = std::move(tasksQueue.front());
                tasksQueue.pop();
            }
            // Run outside the lock so other workers can dequeue meanwhile.
            // An empty std::function that was queued is silently skipped.
            if (taskToPerform)
            {
                taskToPerform();
            }
        }
    }

public:
    ProtectionMethod protection;

    thread_pool_manager(const thread_pool_manager &) = delete;
    thread_pool_manager &operator=(const thread_pool_manager &) = delete;

    /**
     * @brief Creates as many threads as the hardware concurrency of the running machine.
     *
     * std::thread::hardware_concurrency() is allowed to return 0 when the value cannot be
     * determined; in that case a single worker is started instead of failing.
     */
    thread_pool_manager()
    {
        const unsigned int detected = std::thread::hardware_concurrency();
        createThreads(detected > 0 ? static_cast<int>(detected) : 1);
    }

    /**
     * @brief Creates as many threads as specified by the argument.
     *
     * @param numOfThreads Number of workers to start; must be at least 1.
     * @throws MultithreadLevelException if numOfThreads is less than 1.
     */
    thread_pool_manager(int numOfThreads)
    {
        createThreads(numOfThreads);
    }

    /**
     * @brief Stops and joins the workers if shutdown() was not called explicitly.
     */
    ~thread_pool_manager()
    {
        shutdown();
    }

    /**
     * @brief Returns the number of created threads.
     *
     * The count is not reduced by shutdown(), which joins the threads but keeps them stored.
     *
     * @return int Number of thread objects held by the pool.
     */
    int getPoolSize() const
    {
        return static_cast<int>(threadsPool.size());
    }

    /**
     * @brief Pushes the new task into the queue and wakes one idle worker to run it.
     *
     * Tasks queued after shutdown() has completed are stored but never executed, because
     * no worker is left to pick them up.
     *
     * @param newTask Callable to execute; it is moved into the queue.
     */
    void executeTask(std::function<void()> &&newTask)
    {
        {
            std::lock_guard<std::mutex> lock(queueMutex);
            tasksQueue.push(std::move(newTask));
        }
        poolCondition.notify_one();
    }

    /**
     * @brief Shuts the pool down: workers drain the queue, exit, and are joined.
     *
     * Safe to call more than once; threads that were already joined are skipped.
     * It must not be called from a task running on this pool, since a thread
     * cannot join itself.
     */
    void shutdown()
    {
        {
            // The flag must be changed under the same mutex the workers wait with,
            // otherwise a worker can miss the notification and sleep forever.
            std::lock_guard<std::mutex> lock(queueMutex);
            terminatePool = true;
        }
        poolCondition.notify_all();

        std::lock_guard<std::mutex> joinLock(poolMutex);
        for (std::thread &actThread : threadsPool)
        {
            if (actThread.joinable())
            {
                actThread.join();
            }
        }
    }
};
#endif // THREAD_POOL_MANAGER_HPP