diff --git a/.github/workflows/code.yml b/.github/workflows/code.yml index 7f693a9c..68d590bf 100644 --- a/.github/workflows/code.yml +++ b/.github/workflows/code.yml @@ -54,7 +54,7 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest, windows-latest, macos-latest] - python: [cp36, cp37, cp38, cp39, cp310] + python: [cp37, cp38, cp39, cp310] include: # Put coverage and results files in the project directory for mac @@ -147,7 +147,7 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest, windows-latest, macos-latest] - python: [cp36, cp37, cp38, cp39, cp310] + python: [cp37, cp38, cp39, cp310] runs-on: ${{ matrix.os }} diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 0a52e959..199a4011 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -10,6 +10,10 @@ Versioning `_. Unreleased_ ----------- +Changed: + +- `AsyncioDispatcher cleanup tasks atexit <../../pull/138>`_ + Fixed: - `Fix conversion of ctypes pointers passed to C extension <../../pull/154>`_ diff --git a/setup.cfg b/setup.cfg index 6e18f1aa..5bf9577e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -10,7 +10,6 @@ long_description_content_type = text/x-rst classifiers = Development Status :: 5 - Production/Stable License :: OSI Approved :: Apache Software License - Programming Language :: Python :: 3.6 Programming Language :: Python :: 3.7 Programming Language :: Python :: 3.8 Programming Language :: Python :: 3.9 @@ -18,7 +17,7 @@ classifiers = [options] packages = softioc -python_requires = >=3.6 +python_requires = >=3.7 [options.entry_points] # Include a command line script diff --git a/softioc/asyncio_dispatcher.py b/softioc/asyncio_dispatcher.py index b09f502f..eb186771 100644 --- a/softioc/asyncio_dispatcher.py +++ b/softioc/asyncio_dispatcher.py @@ -5,33 +5,62 @@ import atexit class AsyncioDispatcher: - def __init__(self, loop=None): + def __init__(self, loop=None, debug=False): """A dispatcher for `asyncio` based IOCs, suitable to be passed to `softioc.iocInit`. Means that `on_update` callback functions can be async. If a ``loop`` is provided it must already be running. Otherwise a new Event Loop will be created and run in a dedicated thread. + ``debug`` is passed through to ``asyncio.run()``. + + For a clean exit, call ``softioc.interactive_ioc(..., call_exit=False)`` """ if loop is None: + # will wait until worker is executing the new loop + started = threading.Event() # Make one and run it in a background thread - self.loop = asyncio.new_event_loop() - worker = threading.Thread(target=self.loop.run_forever) + self.__worker = threading.Thread( + target=asyncio.run, + args=(self.__inloop(started),), + kwargs={'debug': debug}) # Explicitly manage worker thread as part of interpreter shutdown. # Otherwise threading module will deadlock trying to join() # before our atexit hook runs, while the loop is still running. - worker.daemon = True + self.__worker.daemon = True + + self.__worker.start() + started.wait() + + self.__atexit = atexit.register(self.__shutdown) + + assert self.loop is not None and self.loop.is_running() - @atexit.register - def aioJoin(worker=worker, loop=self.loop): - loop.call_soon_threadsafe(loop.stop) - worker.join() - worker.start() elif not loop.is_running(): raise ValueError("Provided asyncio event loop is not running") else: self.loop = loop + def close(self): + if self.__atexit is not None: + atexit.unregister(self.__atexit) + self.__atexit = None + + self.__shutdown() + + async def __inloop(self, started): + self.loop = asyncio.get_running_loop() + self.__interrupt = asyncio.Event() + started.set() + del started + await self.__interrupt.wait() + + def __shutdown(self): + if self.__worker is not None: + self.loop.call_soon_threadsafe(self.__interrupt.set) + self.__worker.join() + self.__worker = None + def __call__( self, func, @@ -48,3 +77,9 @@ async def async_wrapper(): except Exception: logging.exception("Exception when running dispatched callback") asyncio.run_coroutine_threadsafe(async_wrapper(), self.loop) + + def __enter__(self): + return self + + def __exit__(self, A, B, C): + self.close() diff --git a/tests/test_asyncio.py b/tests/test_asyncio.py index 326ee1ff..9de997ce 100644 --- a/tests/test_asyncio.py +++ b/tests/test_asyncio.py @@ -4,9 +4,14 @@ from multiprocessing.connection import Listener -from conftest import requires_cothread, ADDRESS, select_and_recv +from conftest import ( + ADDRESS, select_and_recv, + log, get_multiprocessing_context, TIMEOUT, + create_random_prefix + ) from softioc.asyncio_dispatcher import AsyncioDispatcher +from softioc import builder, softioc @pytest.mark.asyncio async def test_asyncio_ioc(asyncio_ioc): @@ -131,3 +136,73 @@ def test_asyncio_dispatcher_event_loop(): event_loop = asyncio.get_event_loop() with pytest.raises(ValueError): AsyncioDispatcher(loop=event_loop) + +def asyncio_dispatcher_test_func(device_name, child_conn): + + log("CHILD: Child started") + + builder.SetDeviceName(device_name) + + + with AsyncioDispatcher() as dispatcher: + # Create some records + ai = builder.aIn('AI', initial_value=5) + builder.aOut('AO', initial_value=12.45, always_update=True, + on_update=lambda v: ai.set(v)) + + # Boilerplate get the IOC started + builder.LoadDatabase() + softioc.iocInit(dispatcher) + + # Start processes required to be run after iocInit + async def update(): + while True: + ai.set(ai.get() + 1) + await asyncio.sleep(0.01) + + dispatcher(update) + + log("CHILD: Sending Ready") + child_conn.send("R") + + # Keep process alive while main thread runs CAGET + if child_conn.poll(TIMEOUT): + val = child_conn.recv() + assert val == "D", "Did not receive expected Done character" + + +async def test_asyncio_dispatcher_as_context_manager(): + """Test that the asyncio dispatcher can be used as a context manager""" + ctx = get_multiprocessing_context() + parent_conn, child_conn = ctx.Pipe() + + device_name = create_random_prefix() + + process = ctx.Process( + target=asyncio_dispatcher_test_func, + args=(device_name, child_conn), + ) + + process.start() + + log("PARENT: Child started, waiting for R command") + + from aioca import caget + try: + # Wait for message that IOC has started + select_and_recv(parent_conn, "R") + + # ao_val = await caget(device_name + ":AO") + ao_val = await caget(device_name + ":AO") + assert ao_val == 12.45 + + # Confirm the value of the AI record is increasing + ai_val_1 = await caget(device_name + ":AI") + await asyncio.sleep(1) + ai_val_2 = await caget(device_name + ":AI") + assert ai_val_2 > ai_val_1 + + finally: + parent_conn.send("D") # "Done" + process.join(timeout=TIMEOUT) + assert process.exitcode == 0 # clean exit