forked from pyqt/examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththreadutil_blocking.py
112 lines (99 loc) · 3.42 KB
/
threadutil_blocking.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""
A more powerful, synchronous implementation of run_in_main_thread(...).
It allows you to receive results from the function invocation:
@run_in_main_thread
def return_2():
return 2
# Runs the above function in the main thread and prints '2':
print(return_2())
"""
from functools import wraps
from PyQt5.QtCore import pyqtSignal, QObject, QThread
from PyQt5.QtWidgets import QApplication
from threading import Event, get_ident
def run_in_thread(thread_fn):
def decorator(f):
@wraps(f)
def result(*args, **kwargs):
thread = thread_fn()
return Executor.instance().run_in_thread(thread, f, args, kwargs)
return result
return decorator
def _main_thread():
app = QApplication.instance()
if app:
return app.thread()
# We reach here in tests that don't (want to) create a QApplication.
if int(QThread.currentThreadId()) == get_ident():
return QThread.currentThread()
raise RuntimeError('Could not determine main thread')
run_in_main_thread = run_in_thread(_main_thread)
def is_in_main_thread():
return QThread.currentThread() == _main_thread()
class Executor:
_INSTANCE = None
@classmethod
def instance(cls):
if cls._INSTANCE is None:
cls._INSTANCE = cls(QApplication.instance())
return cls._INSTANCE
def __init__(self, app):
self._pending_tasks = []
self._app_is_about_to_quit = False
app.aboutToQuit.connect(self._about_to_quit)
def _about_to_quit(self):
self._app_is_about_to_quit = True
for task in self._pending_tasks:
task.set_exception(SystemExit())
task.has_run.set()
def run_in_thread(self, thread, f, args, kwargs):
if QThread.currentThread() == thread:
return f(*args, **kwargs)
elif self._app_is_about_to_quit:
# In this case, the target thread's event loop most likely is not
# running any more. This would mean that our task (which is
# submitted to the event loop via signals/slots) is never run.
raise SystemExit()
task = Task(f, args, kwargs)
self._pending_tasks.append(task)
try:
receiver = Receiver(task)
receiver.moveToThread(thread)
sender = Sender()
sender.signal.connect(receiver.slot)
sender.signal.emit()
task.has_run.wait()
return task.result
finally:
self._pending_tasks.remove(task)
class Task:
def __init__(self, fn, args, kwargs):
self._fn = fn
self._args = args
self._kwargs = kwargs
self.has_run = Event()
self._result = self._exception = None
def __call__(self):
try:
self._result = self._fn(*self._args, **self._kwargs)
except Exception as e:
self._exception = e
finally:
self.has_run.set()
def set_exception(self, exception):
self._exception = exception
@property
def result(self):
if not self.has_run.is_set():
raise ValueError("Hasn't run.")
if self._exception:
raise self._exception
return self._result
class Sender(QObject):
signal = pyqtSignal()
class Receiver(QObject):
def __init__(self, callback, parent=None):
super().__init__(parent)
self.callback = callback
def slot(self):
self.callback()