This repository has been archived by the owner on Feb 1, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathThreadpool.py
257 lines (215 loc) · 7.55 KB
/
Threadpool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
import logging
import threading
import queue
"""
Simple task sharing threadpool. Handles fully generic tasks of any kind.
No way to get return values so tasks that returns None are the only functions
supported!
Author: Jeff Chen
Last modified: 5/23/2022
"""
tname = threading.local() # TLV for thread name
class Kill_Queue():
    """
    An unbounded task queue paired with a kill switch and a counting
    semaphore whose value mirrors the number of available items.
    Intended for coordinating worker threads.
    """

    def __init__(self) -> None:
        """
        Build an unbounded queue, clear the kill switch, and start the
        item semaphore at zero.
        """
        self.__backing = queue.Queue(-1)
        self.__dead = False
        self.__available = threading.Semaphore(0)

    def kill(self) -> None:
        """
        Flip the kill switch on.
        """
        self.__dead = True

    def revive(self) -> None:
        """
        Flip the kill switch off.
        """
        self.__dead = False

    def status(self) -> bool:
        """
        Report whether the queue has been killed.

        Return: True if dead, False if alive
        """
        return self.__dead

    def enqueue(self, task:any) -> None:
        """
        Add an item to the queue and bump the item semaphore.
        """
        self.__backing.put(task)
        self.__available.release()

    def acquire_resource(self) -> None:
        """
        Block until an item is available, decrementing the item semaphore.
        """
        self.__available.acquire()

    def release_resource(self) -> None:
        """
        Increment the item semaphore. Not needed after enqueue, which
        already increments it; used to wake blocked consumers (e.g. on kill).
        """
        self.__available.release()

    def dequeue(self) -> any:
        """
        Remove and return the item at the front of the queue.

        Pre: acquire_resource was called first
        Return: item in front of the queue
        """
        return self.__backing.get()

    def task_done(self) -> None:
        """
        Mark one dequeued item as fully processed.

        Pre: dequeue was called, thread task was completed
        """
        self.__backing.task_done()

    def join_queue(self) -> None:
        """
        Block until every enqueued item has been marked done.
        """
        self.__backing.join()

    def get_qsize(self) -> int:
        """
        Approximate number of items currently queued (unreliable).

        Return: queue size
        """
        return self.__backing.qsize()
class ThreadPool():
    """
    Very basic task sharing threadpool, does not support futures.
    Thread local variables:
        tname.name: Thread name
        tname.id: thread id
    """
    # Task queue; contains tuples in the structure: (func, (args1, args2, ...))
    __task_queue:Kill_Queue
    __threads:list # List of worker threads in the threadpool
    __tcount:int   # Number of worker threads to spawn
    __alive:bool   # True while worker threads are running

    def __init__(self, tcount:int) -> None:
        """
        Initializes a threadpool.
        Param:
            tcount: Number of threads for the threadpool
        """
        self.__task_queue = Kill_Queue()
        self.__tcount = tcount
        self.__alive = False
        # Initialized here so kill_threads() is safe to call even if
        # start_threads() was never invoked (the original left __threads
        # unset, raising AttributeError in that case).
        self.__threads = []

    def start_threads(self) -> None:
        """
        Creates tcount daemon TaskThreads and starts them.
        Workers run until kill_threads() is called.
        """
        self.__threads = []
        # Spawn one worker per configured thread slot.
        for i in range(self.__tcount):
            worker = ThreadPool.TaskThread(i, self.__task_queue)
            self.__threads.append(worker)
            worker.start()
        self.__alive = True
        logging.debug("%s threads have been started", self.__tcount)

    def kill_threads(self) -> None:
        """
        Kills all threads in threadpool. Threads are restarted and killed using a
        switch, deadlocked or infinitely running threads cannot be killed using
        this function.
        """
        self.__task_queue.kill()
        # Release the semaphore once per worker so each blocked worker
        # wakes up, observes the kill switch, and exits.
        for _ in range(len(self.__threads)):
            self.__task_queue.release_resource()
        for worker in self.__threads:
            worker.join()
        self.__alive = False
        # Reset the kill switch so the pool can be restarted later.
        self.__task_queue.revive()
        logging.debug("%s threads have been terminated", len(self.__threads))

    def enqueue(self, task:tuple) -> None:
        """
        Put an item in the task queue.
        Param:
            task: tuple in the structure (func, (args1, args2, ...))
        """
        logging.debug("Enqueued into task queue: %s", task)
        self.__task_queue.enqueue(task)

    def enqueue_queue(self, task_list:queue.Queue) -> None:
        """
        Put a queue in the task queue. Each queue element will be 'get()' and then
        task_done() by a worker.
        Param:
            task_list: queue of task tuples following the structure (func, (args1, args2, ...))
        """
        logging.debug("Enqueued into task queue: %s", task_list)
        size = task_list.qsize()
        # The queue object itself is enqueued once per element; each worker
        # that dequeues it pulls a single task out of the sub-queue.
        for _ in range(size):
            self.__task_queue.enqueue(task_list)

    def join_queue(self) -> None:
        """
        Blocks until all task queue items have been processed.
        """
        logging.debug("Blocking until all tasks are complete")
        self.__task_queue.join_queue()

    def get_qsize(self) -> int:
        """
        Get queue size (unreliable).
        Return: task queue size
        """
        return self.__task_queue.get_qsize()

    def get_status(self) -> bool:
        """
        Check if the threadpool is alive.
        Return: True if alive, False if not
        """
        return self.__alive

    class TaskThread(threading.Thread):
        """
        Fully generic worker thread: tasks of any kind are stored and retrieved
        via task_queue. Threads are daemon threads and stop when the queue's
        kill switch is turned on.
        """
        __id: int                   # Numeric identifier for this worker
        __task_queue:Kill_Queue     # Shared queue tasks are pulled from

        def __init__(self, id: int, task_queue:Kill_Queue) -> None:
            """
            Initializes thread with a thread name.
            Param:
                id: thread identifier
                task_queue: Queue to get tasks from
            """
            self.__id = id
            self.__task_queue = task_queue
            super(ThreadPool.TaskThread, self).__init__(daemon=True)

        def run(self) -> None:
            """
            Worker thread job: blocks until a task is available, then
            retrieves and executes it. Exits when the kill switch is set.
            """
            tname.name = "Thread #" + str(self.__id)
            tname.id = self.__id
            while True:
                # Wait until a task (or a kill wake-up) is available.
                self.__task_queue.acquire_resource()

                # Check kill signal before touching the queue.
                if self.__task_queue.status():
                    logging.debug("%s has terminated", tname.name)
                    return

                todo = self.__task_queue.dequeue()
                # If the dequeued element is itself a queue (see
                # enqueue_queue), pull exactly one task out of it.
                if isinstance(todo, queue.Queue):
                    monitored_todo = todo.get()
                    logging.debug("%s (From SubQueue) Processing: %s", tname.name, monitored_todo)
                    monitored_todo[0](*monitored_todo[1])
                    todo.task_done()
                # Else, process the task directly.
                else:
                    logging.debug("%s Processing: %s", tname.name, todo)
                    todo[0](*todo[1])
                self.__task_queue.task_done()