forked from progschj/ThreadPool
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy paththread_pool.h
149 lines (130 loc) · 3.55 KB
/
thread_pool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#ifndef _PTHREAD_POOL_
#define _PTHREAD_POOL_
#include "locker.h"
#include <list>
#include <stdio.h>
#include <exception>
#include <errno.h>
#include <pthread.h>
#include <iostream>
#include <stdlib.h>
#include <vector>
template<class T>
class threadpool
{
private:
int thread_number;
int max_task_number;
std::list<T *> task_queue;
mutex_locker queue_mutex_locker;
sem_locker queue_sem_locker;
bool is_stop;
public:
threadpool(int thread_num = 20, int max_task_num = 30);
~threadpool();
bool append_task(T *task);
void start();
void stop();
std::vector<pthread_t> all_threads;
private:
static void *worker(void *arg);
void run();
};
template <class T>
threadpool<T>::threadpool(int thread_num, int max_task_num):
thread_number(thread_num),
max_task_number(max_task_num),
is_stop(false)
{
all_threads.clear();
if((thread_num <= 0) || max_task_num <= 0) {
std::cout << "thread pool can't init because thread_number = 0 or max_task_number = 0.\n";
exit(-1);
}
for(size_t i = 0; i < thread_num; ++i) {
pthread_t _thread;
all_threads.push_back(_thread);
}
if(all_threads.empty()) {
std::cout << "Can't initial thread pool because thread array can't new.";
exit(-1);
}
}
template <class T>
threadpool<T>::~threadpool()
{
std::cout << __FUNCTION__ << " start.\n";
all_threads.clear();
is_stop = true;
}
template <class T>
void threadpool<T>::stop()
{
is_stop = true;
queue_sem_locker.add();
}
template <class T>
void threadpool<T>::start() {
std::cout << __FUNCTION__ << " start.\n";
for(int i = 0; i < thread_number; ++i) {
std::cout << "create the " << i << "th thread.\n";
if(pthread_create(&(all_threads[i]), NULL, worker, this) != 0) {
all_threads.clear();
throw std::exception();
}
// if(pthread_detach(all_threads[i])) {
// all_threads.clear();
// throw std::exception();
// }
}
}
template <class T>
bool threadpool<T>::append_task(T *task) {
std::cout << __FUNCTION__ << " start.\n";
queue_mutex_locker.mutex_lock();
std::cout << "task_queue.size(): " << task_queue.size() << "\n";
if(task_queue.size() >= max_task_number) {
queue_mutex_locker.mutex_unlock();
return false;
}
task_queue.push_back(task);
queue_mutex_locker.mutex_unlock();
queue_sem_locker.add();
std::cout << "queue_sem_locker.add()\n";
return true;
}
template <class T>
void *threadpool<T>::worker(void *arg) {
std::cout << __FUNCTION__ << " start, pthreadId: " << (unsigned long)pthread_self() << ".\n";
threadpool *pool = (threadpool *)arg;
pool->run();
return pool;
}
template <class T>
void threadpool<T>::run() {
std::cout << __FUNCTION__ << " start.\n";
while(!is_stop) {
std::cout << "Waiting 4 semaphore.\n";
queue_sem_locker.wait();
std::cout << "Got a semaphore.\n";
if(errno == EINTR) {
std::cout << "error.\n";
continue;
}
queue_mutex_locker.mutex_lock();
if(task_queue.empty()) {
queue_mutex_locker.mutex_unlock();
continue;
}
T *task = task_queue.front();
task_queue.pop_front();
queue_mutex_locker.mutex_unlock();
if(!task) {
continue;
}
std::cout << "pthreadId = " << (unsigned long)pthread_self() << "\n";
task->doit();
}
std::cout << "Close " << (unsigned long)pthread_self() << "\n";
}
#endif