From 137b3955f39b4859118c53e75d7680e4c29c69da Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 16 Jul 2020 00:54:52 +0200 Subject: [PATCH] Add unit test that proofs race condition error; credits to @Novakov --- tests/test_thread_safety.py | 91 +++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/tests/test_thread_safety.py b/tests/test_thread_safety.py index ed55a84..6e10413 100644 --- a/tests/test_thread_safety.py +++ b/tests/test_thread_safety.py @@ -1,5 +1,8 @@ +import pytest + from promise import Promise from promise.dataloader import DataLoader +from promise.utils import PY2 import threading @@ -113,3 +116,91 @@ def do(): assert assert_object['is_same_thread_1'] assert assert_object['is_same_thread_2'] + + +@pytest.mark.skipif(PY2, 'python2 does not support setswitchinterval') +@pytest.mark.parametrize('num_threads', [1]) +@pytest.mark.parametrize('count', [1000]) +@pytest.mark.parametrize('resolution', ['resolve', 'reject']) +def test_with_process_loop(num_threads, count, resolution): + """ + Start a Promise in one thread, but resolve it in another. + """ + import queue + from threading import Thread, Barrier + from traceback import print_exc, format_exc + from sys import setswitchinterval + + _force_stop = False + items = queue.Queue() + barrier = Barrier(num_threads) + + asserts = [] + timeouts = [] + + def event_loop(): + stop_count = num_threads + while True: + item = items.get() + if item[0] == 'STOP': + stop_count -= 1 + if stop_count == 0: + break + if item[0] == 'ABORT': + break + if item[0] == 'ITEM': + (_, resolve, reject, i) = item + if resolution == 'reject': + reject(ArithmeticError(i)) + elif resolution == 'resolve': + resolve(i) + else: + raise ValueError('unsupported resolution: {}'.format(resolution)) + + def worker(): + nonlocal _force_stop + barrier.wait() + # Force fast switching of threads, this is NOT used in real world case. However without this + # I was unable to reproduce the issue. + setswitchinterval(0.000001) + for i in range(0, count): + if _force_stop: + break + + def do(resolve, reject): + items.put(('ITEM', resolve, reject, i)) + + p = Promise(do) + try: + p.get(timeout=1) + except ArithmeticError as e: + if resolution != 'reject': + pytest.fail('unexpected failure on success resolution: {}}'.format(e)) + except AssertionError as e: + print("ASSERT", e) + print_exc() + _force_stop = True + items.put(('ABORT',)) + asserts.append(format_exc()) + except Exception as e: + print("Timeout", e) + print_exc() + _force_stop = True + items.put(('ABORT',)) + timeouts.append(format_exc()) + + items.put(('STOP',)) + + loop_thread = Thread(target=event_loop) + loop_thread.start() + + worker_threads = [Thread(target=worker) for i in range(0, num_threads)] + for t in worker_threads: + t.start() + + loop_thread.join() + for t in worker_threads: + t.join() + + assert asserts == [] + assert timeouts == []