Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AsyncioDispatcher cleanup tasks atexit #138

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/code.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
python: [cp36, cp37, cp38, cp39, cp310]
python: [cp37, cp38, cp39, cp310]

include:
# Put coverage and results files in the project directory for mac
Expand Down Expand Up @@ -147,7 +147,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
python: [cp36, cp37, cp38, cp39, cp310]
python: [cp37, cp38, cp39, cp310]

runs-on: ${{ matrix.os }}

Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ Versioning <https://semver.org/spec/v2.0.0.html>`_.
Unreleased_
-----------

Changed:

- `AsyncioDispatcher cleanup tasks atexit <../../pull/138>`_

Fixed:

- `Fix conversion of ctypes pointers passed to C extension <../../pull/154>`_
Expand Down
3 changes: 1 addition & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ long_description_content_type = text/x-rst
classifiers =
Development Status :: 5 - Production/Stable
License :: OSI Approved :: Apache Software License
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Programming Language :: Python :: 3.10

[options]
packages = softioc
python_requires = >=3.6
python_requires = >=3.7

[options.entry_points]
# Include a command line script
Expand Down
53 changes: 44 additions & 9 deletions softioc/asyncio_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,62 @@
import atexit

class AsyncioDispatcher:
def __init__(self, loop=None):
def __init__(self, loop=None, debug=False):
"""A dispatcher for `asyncio` based IOCs, suitable to be passed to
`softioc.iocInit`. Means that `on_update` callback functions can be
async.

If a ``loop`` is provided it must already be running. Otherwise a new
Event Loop will be created and run in a dedicated thread.
``debug`` is passed through to ``asyncio.run()``.

For a clean exit, call ``softioc.interactive_ioc(..., call_exit=False)``
"""
if loop is None:
# will wait until worker is executing the new loop
started = threading.Event()
# Make one and run it in a background thread
self.loop = asyncio.new_event_loop()
worker = threading.Thread(target=self.loop.run_forever)
self.__worker = threading.Thread(
target=asyncio.run,
args=(self.__inloop(started),),
kwargs={'debug': debug})
# Explicitly manage worker thread as part of interpreter shutdown.
# Otherwise threading module will deadlock trying to join()
# before our atexit hook runs, while the loop is still running.
worker.daemon = True
self.__worker.daemon = True

self.__worker.start()
started.wait()

self.__atexit = atexit.register(self.__shutdown)

assert self.loop is not None and self.loop.is_running()

@atexit.register
def aioJoin(worker=worker, loop=self.loop):
loop.call_soon_threadsafe(loop.stop)
worker.join()
worker.start()
elif not loop.is_running():
raise ValueError("Provided asyncio event loop is not running")
else:
self.loop = loop

def close(self):
if self.__atexit is not None:
atexit.unregister(self.__atexit)
self.__atexit = None
AlexanderWells-diamond marked this conversation as resolved.
Show resolved Hide resolved

self.__shutdown()

async def __inloop(self, started):
self.loop = asyncio.get_running_loop()
self.__interrupt = asyncio.Event()
started.set()
del started
coretl marked this conversation as resolved.
Show resolved Hide resolved
await self.__interrupt.wait()

def __shutdown(self):
if self.__worker is not None:
self.loop.call_soon_threadsafe(self.__interrupt.set)
self.__worker.join()
self.__worker = None

def __call__(
self,
func,
Expand All @@ -48,3 +77,9 @@ async def async_wrapper():
except Exception:
logging.exception("Exception when running dispatched callback")
asyncio.run_coroutine_threadsafe(async_wrapper(), self.loop)

def __enter__(self):
return self
AlexanderWells-diamond marked this conversation as resolved.
Show resolved Hide resolved

def __exit__(self, A, B, C):
self.close()
77 changes: 76 additions & 1 deletion tests/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@

from multiprocessing.connection import Listener

from conftest import requires_cothread, ADDRESS, select_and_recv
from conftest import (
ADDRESS, select_and_recv,
log, get_multiprocessing_context, TIMEOUT,
create_random_prefix
)

from softioc.asyncio_dispatcher import AsyncioDispatcher
from softioc import builder, softioc

@pytest.mark.asyncio
async def test_asyncio_ioc(asyncio_ioc):
Expand Down Expand Up @@ -131,3 +136,73 @@ def test_asyncio_dispatcher_event_loop():
event_loop = asyncio.get_event_loop()
with pytest.raises(ValueError):
AsyncioDispatcher(loop=event_loop)

def asyncio_dispatcher_test_func(device_name, child_conn):

log("CHILD: Child started")

builder.SetDeviceName(device_name)


with AsyncioDispatcher() as dispatcher:
# Create some records
ai = builder.aIn('AI', initial_value=5)
builder.aOut('AO', initial_value=12.45, always_update=True,
on_update=lambda v: ai.set(v))

# Boilerplate get the IOC started
builder.LoadDatabase()
softioc.iocInit(dispatcher)

# Start processes required to be run after iocInit
async def update():
while True:
ai.set(ai.get() + 1)
await asyncio.sleep(0.01)

dispatcher(update)

log("CHILD: Sending Ready")
child_conn.send("R")

# Keep process alive while main thread runs CAGET
if child_conn.poll(TIMEOUT):
val = child_conn.recv()
assert val == "D", "Did not receive expected Done character"


async def test_asyncio_dispatcher_as_context_manager():
"""Test that the asyncio dispatcher can be used as a context manager"""
ctx = get_multiprocessing_context()
parent_conn, child_conn = ctx.Pipe()

device_name = create_random_prefix()

process = ctx.Process(
target=asyncio_dispatcher_test_func,
args=(device_name, child_conn),
)

process.start()

log("PARENT: Child started, waiting for R command")

from aioca import caget
try:
# Wait for message that IOC has started
select_and_recv(parent_conn, "R")

# ao_val = await caget(device_name + ":AO")
ao_val = await caget(device_name + ":AO")
assert ao_val == 12.45

# Confirm the value of the AI record is increasing
ai_val_1 = await caget(device_name + ":AI")
await asyncio.sleep(1)
ai_val_2 = await caget(device_name + ":AI")
assert ai_val_2 > ai_val_1

finally:
parent_conn.send("D") # "Done"
process.join(timeout=TIMEOUT)
assert process.exitcode == 0 # clean exit
Loading