Skip to content

Commit

Permalink
Do not create thread if tests_per_worker==1
Browse files Browse the repository at this point in the history
Avoid creating a thread when there is one worker. This allows
benefitting from signal-based communication with the test workers when
threads are not used (e.g., for timeouts as in solves
kevlened#96)
  • Loading branch information
tovmeod authored and andrea-segalini committed Feb 13, 2024
1 parent e2bd6ee commit c6f5d05
Showing 1 changed file with 37 additions and 30 deletions.
67 changes: 37 additions & 30 deletions pytest_parallel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import threading
import multiprocessing
from tblib import pickling_support
from multiprocessing import Manager, Process
from multiprocessing import current_process, Manager, Process

# In Python 3.8 and later, the default on macOS is spawn.
# We force forking behavior at the expense of safety.
Expand Down Expand Up @@ -62,12 +62,41 @@ def process_with_threads(config, queue, session, tests_per_worker, errors):
# so we know we are running as a worker.
config.parallel_worker = True

threads = []
for _ in range(tests_per_worker):
thread = ThreadWorker(queue, session, errors)
thread.start()
threads.append(thread)
[t.join() for t in threads]
if tests_per_worker == 1:
worker_run(current_process().name, queue, session, errors)
else:
threads = []
for _ in range(tests_per_worker):
thread = ThreadWorker(queue, session, errors)
thread.start()
threads.append(thread)
[t.join() for t in threads]


def worker_run(name, queue, session, errors):
pickling_support.install()
while True:
try:
index = queue.get()
if index == 'stop':
queue.task_done()
break
except ConnectionRefusedError:
time.sleep(.1)
continue
item = session.items[index]
try:
run_test(session, item, None)
except BaseException:
import pickle
import sys

errors.put((name, pickle.dumps(sys.exc_info())))
finally:
try:
queue.task_done()
except ConnectionRefusedError:
pass


class ThreadWorker(threading.Thread):
Expand All @@ -78,29 +107,7 @@ def __init__(self, queue, session, errors):
self.errors = errors

def run(self):
pickling_support.install()
while True:
try:
index = self.queue.get()
if index == 'stop':
self.queue.task_done()
break
except ConnectionRefusedError:
time.sleep(.1)
continue
item = self.session.items[index]
try:
run_test(self.session, item, None)
except BaseException:
import pickle
import sys

self.errors.put((self.name, pickle.dumps(sys.exc_info())))
finally:
try:
self.queue.task_done()
except ConnectionRefusedError:
pass
worker_run(self.name, self.queue, self.session, self.errors)


@pytest.mark.trylast
Expand Down

0 comments on commit c6f5d05

Please sign in to comment.