Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caput with callback #98

Merged
merged 9 commits into from
Jul 29, 2022
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ Versioning <https://semver.org/spec/v2.0.0.html>`_.
Unreleased_
-----------

Added:

- `Caput with callback <../../pull/98>`_

Fixed:

- `Passing a custom asyncio event loop into the AsyncioDispatcher causes methods to never run <../../pull/96>`_
Expand Down
34 changes: 34 additions & 0 deletions docs/reference/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,23 @@ Test Facilities`_ documentation for more details of each function.
which don't change its value will be discarded. In particular this means
that such updates don't call `validate` or `on_update`.

.. _blocking:

`blocking`
~~~~~~~~~~

Only available on OUT records. When set to `True` the record will set the
``PACT`` field when processing is ongoing. This means that ``caput`` and
similar tools can correctly wait for processing to complete.

This flag defaults to `False`, to retain compatibility with previous
versions.

.. seealso::
`SetBlocking` for configuring a global default blocking value



For all of these functions any EPICS database field can be assigned a value by
passing it as a keyword argument for the corresponding field name (in upper
case) or by assigning to the corresponding field of the returned record object.
Expand Down Expand Up @@ -358,6 +375,23 @@ record creation function.
prevent the accidential creation of records with the currently set device
name.

.. function:: SetBlocking(blocking)

This can be used to globally set the default of the `blocking` flag, which
will apply to all records created after this point. This allows blocking to be
easily set/unset when creating groups of records.

Returns the previous value of the `blocking` flag, which enables code like this::

old_blocking = SetBlocking(new_blocking)
create_records()
SetBlocking(old_blocking)

This does not change the blocking value for any already created records.

.. seealso::
`blocking` for description of the flag


The following helper functions are useful when constructing links between
records.
Expand Down
11 changes: 9 additions & 2 deletions softioc/asyncio_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,19 @@ def aioJoin(worker=worker, loop=self.loop):
else:
self.loop = loop

def __call__(self, func, *args):
def __call__(
self,
func,
func_args=(),
completion = None,
completion_args=()):
async def async_wrapper():
try:
ret = func(*args)
ret = func(*func_args)
if inspect.isawaitable(ret):
await ret
if completion:
completion(*completion_args)
except Exception:
logging.exception("Exception when awaiting callback")
asyncio.run_coroutine_threadsafe(async_wrapper(), self.loop)
6 changes: 5 additions & 1 deletion softioc/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
LoadDbdFile(os.path.join(os.path.dirname(__file__), 'device.dbd'))

from . import device, pythonSoftIoc # noqa
# Re-export this so users only have to import the builder
from .device import SetBlocking # noqa

PythonDevice = pythonSoftIoc.PythonDevice()

Expand Down Expand Up @@ -301,5 +303,7 @@ def UnsetDevice():
'Action',
# Other builder support functions
'LoadDatabase',
'SetDeviceName', 'UnsetDevice'
'SetDeviceName', 'UnsetDevice',
# Device support functions
'SetBlocking'
]
23 changes: 23 additions & 0 deletions softioc/cothread_dispatcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@

class CothreadDispatcher:
def __init__(self):
"""A dispatcher for `cothread` based IOCs, suitable to be passed to
`softioc.iocInit`. """
# Import here to ensure we don't instantiate any of cothread's global
# state unless we have to
import cothread
# Create our own cothread callback queue so that our callbacks
# processing doesn't interfere with other callback processing.
self.__dispatcher = cothread.cothread._Callback()

def __call__(
self,
func,
func_args=(),
completion = None,
completion_args=()):
def wrapper():
func(*func_args)
if completion:
completion(*completion_args)
self.__dispatcher(wrapper)
42 changes: 39 additions & 3 deletions softioc/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,31 @@

from . import alarm
from . import fields
from .imports import dbLoadDatabase, recGblResetAlarms, db_put_field
from .imports import (
create_callback_capsule,
dbLoadDatabase,
signal_processing_complete,
recGblResetAlarms,
db_put_field,
)
from .device_core import DeviceSupportCore, RecordLookup


# This is set from softioc.iocInit
# dispatcher(func, *args) will queue a callback to happen
dispatcher = None

# Global blocking flag, used to mark asynchronous (False) or synchronous (True)
# processing modes for Out records.
# Default False to maintain behaviour from previous versions.
blocking = False

# Set the current global blocking flag, and return the previous value.
def SetBlocking(new_val):
global blocking
old_val = blocking
blocking = new_val
return old_val


# EPICS processing return codes
EPICS_OK = 0
Expand Down Expand Up @@ -142,6 +159,10 @@ def __init__(self, name, **kargs):
else:
self._value = None

self._blocking = kargs.pop('blocking', blocking)
if self._blocking:
self._callback = create_callback_capsule()

self.__super.__init__(name, **kargs)

def init_record(self, record):
Expand All @@ -162,9 +183,18 @@ def init_record(self, record):
recGblResetAlarms(record)
return self._epics_rc_

def __completion(self, record):
'''Signals that all on_update processing is finished'''
if self._blocking:
signal_processing_complete(record, self._callback)

def _process(self, record):
'''Processing suitable for output records. Performs immediate value
validation and asynchronous update notification.'''

if record.PACT:
return EPICS_OK

value = self._read_value(record)
if not self.__always_update and \
self._compare_values(value, self._value):
Expand All @@ -183,7 +213,13 @@ def _process(self, record):
self._value = value
record.UDF = 0
if self.__on_update and self.__enable_write:
dispatcher(self.__on_update, python_value)
record.PACT = self._blocking
dispatcher(
self.__on_update,
func_args=(python_value,),
completion = self.__completion,
completion_args=(record,))

return EPICS_OK


Expand Down
48 changes: 47 additions & 1 deletion softioc/extension.c
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@

/* Provide EPICS functions in Python format */
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <string.h>

#define db_accessHFORdb_accessC // Needed to get correct DBF_ values
#include <dbAccess.h>
#include <dbFldTypes.h>
#include <callback.h>
#include <dbStaticLib.h>
#include <asTrapWrite.h>
#include <epicsVersion.h>
#include <dbChannel.h>
#include <asTrapWrite.h>
#include <asDbLib.h>


/* Reference stealing version of PyDict_SetItemString */
static void set_dict_item_steal(
PyObject *dict, const char *name, PyObject *py_value)
Expand Down Expand Up @@ -209,6 +211,46 @@ static PyObject *install_pv_logging(PyObject *self, PyObject *args)
Py_RETURN_NONE;
}

#define CAPSULE_NAME "ProcessDeviceSupportOut.callback"

static void capsule_destructor(PyObject *obj)
{
free(PyCapsule_GetPointer(obj, CAPSULE_NAME));
}


static PyObject *create_callback_capsule(PyObject *self, PyObject *args)
{
void *callback = malloc(sizeof(CALLBACK));
return PyCapsule_New(callback, CAPSULE_NAME, &capsule_destructor);
}

static PyObject *signal_processing_complete(PyObject *self, PyObject *args)
{
int priority;
dbCommon *record;
PyObject *callback_capsule;

if (!PyArg_ParseTuple(args, "inO", &priority, &record, &callback_capsule))
{
return NULL;
}

if (!PyCapsule_IsValid(callback_capsule, CAPSULE_NAME))
{
return PyErr_Format(
PyExc_TypeError,
"Given object was not a capsule with name \"%s\"",
CAPSULE_NAME);
}

CALLBACK *callback = PyCapsule_GetPointer(callback_capsule, CAPSULE_NAME);

callbackRequestProcessCallback(callback, priority, record);

Py_RETURN_NONE;
}

static struct PyMethodDef softioc_methods[] = {
{"get_DBF_values", get_DBF_values, METH_VARARGS,
"Get a map of DBF names to values"},
Expand All @@ -218,6 +260,10 @@ static struct PyMethodDef softioc_methods[] = {
"Put a database field to a value"},
{"install_pv_logging", install_pv_logging, METH_VARARGS,
"Install caput logging to stdout"},
{"signal_processing_complete", signal_processing_complete, METH_VARARGS,
"Inform EPICS that asynchronous record processing has completed"},
{"create_callback_capsule", create_callback_capsule, METH_VARARGS,
"Create a CALLBACK structure inside a PyCapsule"},
{NULL, NULL, 0, NULL} /* Sentinel */
};

Expand Down
12 changes: 12 additions & 0 deletions softioc/imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ def install_pv_logging(acf_file):
'''Install pv logging'''
_extension.install_pv_logging(acf_file)

def create_callback_capsule():
return _extension.create_callback_capsule()

def signal_processing_complete(record, callback):
'''Signal that asynchronous record processing has completed'''
_extension.signal_processing_complete(
record.PRIO,
record.record.value,
callback)

def expect_success(status, function, args):
assert status == 0, 'Expected success'

Expand Down Expand Up @@ -94,6 +104,8 @@ def from_param(cls, value):

__all__ = [
'get_field_offsets',
'create_callback_capsule',
'signal_processing_complete',
'registryDeviceSupportAdd',
'IOSCANPVT', 'scanIoRequest', 'scanIoInit',
'dbLoadDatabase',
Expand Down
2 changes: 1 addition & 1 deletion softioc/pythonSoftIoc.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(self, builder, device, name, **fields):
# have to maintain this separately from the corresponding device list.
DeviceKeywords = [
'on_update', 'on_update_name', 'validate', 'always_update',
'initial_value', '_wf_nelm', '_wf_dtype']
'initial_value', '_wf_nelm', '_wf_dtype', 'blocking']
device_kargs = {}
for keyword in DeviceKeywords:
if keyword in fields:
Expand Down
6 changes: 2 additions & 4 deletions softioc/softioc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from epicsdbbuilder.recordset import recordset

from . import imports, device
from . import cothread_dispatcher

__all__ = ['dbLoadDatabase', 'iocInit', 'interactive_ioc']

Expand All @@ -31,10 +32,7 @@ def iocInit(dispatcher=None):
'''
if dispatcher is None:
# Fallback to cothread
import cothread
# Create our own cothread callback queue so that our callbacks
# processing doesn't interfere with other callback processing.
dispatcher = cothread.cothread._Callback()
dispatcher = cothread_dispatcher.CothreadDispatcher()
# Set the dispatcher for record processing callbacks
device.dispatcher = dispatcher
imports.iocInit()
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def enable_code_coverage():
def select_and_recv(conn, expected_char = None):
"""Wait for the given Connection to have data to receive, and return it.
If a character is provided check its correct before returning it."""
# Must use cothread's select if cothread is prsent, otherwise we'd block
# Must use cothread's select if cothread is present, otherwise we'd block
# processing on all cothread processing. But we don't want to use it
# unless we have to, as importing cothread can cause issues with forking.
if "cothread" in sys.modules:
Expand Down
11 changes: 11 additions & 0 deletions tests/sim_asyncio_ioc.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ async def callback(value):
# Create a record to set the alarm
t_ao = builder.aOut('ALARM', on_update=callback)

async def on_update_name_callback(value, name):
print(name, "value", value)

builder.longOut(
"NAME-CALLBACK",
initial_value = 3,
always_update=True,
on_update_name=on_update_name_callback,
blocking=True
)

# Run the IOC
builder.LoadDatabase()
softioc.iocInit(asyncio_dispatcher.AsyncioDispatcher())
Expand Down
3 changes: 3 additions & 0 deletions tests/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ async def test_asyncio_ioc(asyncio_ioc):

await caput(pre + ":ALARM", 3, wait=True)

await caput(pre + ":NAME-CALLBACK", 12, wait=True)

# Confirm the ALARM callback has completed
select_and_recv(conn, "C") # "Complete"

Expand Down Expand Up @@ -68,6 +70,7 @@ async def test_asyncio_ioc(asyncio_ioc):
assert "%s:ALARM.VAL 0 -> 3" % pre in out
assert 'on_update %s:AO : 3.0' % pre in out
assert 'async update 3.0 (23.45)' in out
assert "%s:NAME-CALLBACK value 12" % pre in out
assert 'Starting iocInit' in err
assert 'iocRun: All initialization complete' in err
except Exception:
Expand Down
Loading