diff --git a/pyproject.toml b/pyproject.toml index 4adf1d4c..6f6427da 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,2 +1,2 @@ [build-system] -requires = ["setuptools", "setuptools_dso>=1.3a1", "wheel", "numpy", "Cython>=0.20", "epicscorelibs>=7.0.3.99.2.0a1", "pvxslibs"] +requires = ["setuptools", "setuptools_dso>=1.3a1", "wheel", "numpy", "Cython>=0.20", "epicscorelibs>=7.0.3.99.2.0a1", "pvxslibs>=1.1.4a1"] diff --git a/setup.py b/setup.py index d2e961ff..f2360a2b 100755 --- a/setup.py +++ b/setup.py @@ -44,6 +44,7 @@ # are all c++, and MSVC doesn't allow extern "C" to # return c++ types. cppflags = get_config_var('CPPFLAGS') + [('__PYX_EXTERN_C','extern')] +cppflags += [('PVXS_ENABLE_EXPERT_API', None)] exts = cythonize([ Extension( @@ -55,6 +56,7 @@ "src/pvxs_source.cpp", "src/pvxs_type.cpp", "src/pvxs_value.cpp", + "src/notify.cpp", ], include_dirs = get_numpy_include_dirs()+[epicscorelibs.path.include_path, pvxslibs.path.include_path, 'src', 'src/p4p'], define_macros = cppflags + [ diff --git a/src/Makefile b/src/Makefile index a599ec7c..3b9520d7 100644 --- a/src/Makefile +++ b/src/Makefile @@ -6,6 +6,7 @@ include $(TOP)/configure/CONFIG_PY # are all c++, and MSVC doesn't allow extern "C" to # return c++ types. USR_CPPFLAGS += -D__PYX_EXTERN_C=extern +USR_CPPFLAGS += -DPVXS_ENABLE_EXPERT_API # place .so in subdirectory INSTALL_SHRLIB = $(PY_INSTALL_DIR)/p4p @@ -21,6 +22,7 @@ _p4p_SRCS += pvxs_value.cpp _p4p_SRCS += pvxs_sharedpv.cpp _p4p_SRCS += pvxs_source.cpp _p4p_SRCS += pvxs_client.cpp +_p4p_SRCS += notify.cpp _p4p_CPPFLAGS += -DPVXS_ENABLE_EXPERT_API diff --git a/src/notify.cpp b/src/notify.cpp new file mode 100644 index 00000000..00fd15f1 --- /dev/null +++ b/src/notify.cpp @@ -0,0 +1,280 @@ + +#include +#include +#include +#include +#include +#include + +#include + +#include "p4p.h" + +DEFINE_LOGGER(logme, "p4p.notify"); + +// special key to interrupt handle() +static +constexpr uint32_t interruptor(std::numeric_limits::max()); + +namespace p4p { + +Notifier::~Notifier() {} + +struct NotifierImpl : public Notifier, public std::enable_shared_from_this { + const std::weak_ptr weak_hub; + + // guarded by Pvt::lock + bool ready = false; + + explicit + NotifierImpl(std::weak_ptr weak_hub) + :weak_hub(weak_hub) + {} + virtual ~NotifierImpl(); + virtual void notify(); +}; + +struct NotificationHub::Pvt : public std::enable_shared_from_this { + // const after create() + const SOCKET tx; + + epicsMutex lock; + + // guarded by lock + SOCKET rx; + std::list> pending; + std::list trash; + + // keep with Hub to ensure destruction from python side. + // function may capture python objects. + std::map> actions; + + bool interrupted = false; + + Pvt(SOCKET tx, SOCKET rx); + ~Pvt(); + void poke() const noexcept; +}; + +NotifierImpl::~NotifierImpl() +{ + if(auto hub = weak_hub.lock()) { + bool wake = false; + { + Guard G(hub->lock); + wake = hub->pending.empty() && hub->trash.empty(); + hub->trash.push_back(this); + } + if(wake) + hub->poke(); + } +} + +void NotifierImpl::notify() +{ + if(auto hub = weak_hub.lock()) { + bool wake = false; + { + Guard G(hub->lock); + if(!ready) { + wake = hub->pending.empty() && hub->trash.empty(); + hub->pending.push_back(shared_from_this()); + ready = true; + } + } + if(wake) + hub->poke(); + } +} + +NotificationHub NotificationHub::create(bool blocking) +{ + SOCKET s[2]; + compat_socketpair(s); + + NotificationHub ret; + ret.pvt = std::make_shared(s[0], s[1]); + // tx side always blocking. Only need to send() when pending list becomes not empty + if(!blocking) { + compat_make_socket_nonblocking(ret.pvt->rx); + } + return ret; +} + +void NotificationHub::close() +{ + if(pvt){ + Guard G(pvt->lock); + + if(pvt->rx!=INVALID_SOCKET) { + epicsSocketDestroy(pvt->rx); + pvt->rx = INVALID_SOCKET; + } + pvt->pending.clear(); + pvt->actions.clear(); + } + pvt.reset(); +} + +SOCKET NotificationHub::fileno() const +{ + if(!pvt) + throw std::invalid_argument("NULL NotificationHub"); + Guard G(pvt->lock); + return pvt->rx; +} + +std::shared_ptr +NotificationHub::add(std::function&& fn) +{ + if(!pvt) + throw std::invalid_argument("NULL NotificationHub"); + + Guard G(pvt->lock); + + auto ret(std::make_shared(pvt->shared_from_this())); + + pvt->actions.emplace(ret.get(), std::move(fn)); + + return ret; +} + +std::shared_ptr +NotificationHub::add(PyObject *raw) +{ + auto handler(PyRef::borrow(raw)); + auto fn = [handler]() { + PyLock L; + auto ret(PyRef::allownull(PyObject_CallFunction(handler.obj, ""))); + if(!ret.obj) { + PySys_WriteStderr("Unhandled Exception %s:%d\n", __FILE__, __LINE__); + PyErr_Print(); + PyErr_Clear(); + } + }; + + return add(fn); +} + +void NotificationHub::handle() const +{ + if(!pvt) + throw std::invalid_argument("NULL NotificationHub"); + + Guard G(pvt->lock); + SOCKET rx = pvt->rx; + + while(!pvt->interrupted) { + constexpr size_t max_batch_size = 16u; + char buf[max_batch_size]; + + int ret; + { + UnGuard U(G); + ret = recv(rx, buf, sizeof(buf), 0); + } + if(ret < 0) { + auto err = SOCKERRNO; + if(err == SOCK_EWOULDBLOCK || err == EAGAIN || err == SOCK_EINTR) { + return; // try again later + + } else { + std::ostringstream msg; + msg<<__func__<<" Socket error "<interrupted = false; +} + +void NotificationHub::poll() const +{ + Guard G(pvt->lock); + + // take ownership of TODO list now so that any concurrent additions + // while unlocked will provoke a poke() + auto trash(std::move(pvt->trash)); + auto pending(std::move(pvt->pending)); + + for(auto notifee : trash) { + auto it(pvt->actions.find(notifee)); + if(it!=pvt->actions.end()) { + auto act(std::move(it->second)); + pvt->actions.erase(it); + + UnGuard U(G); + + act = nullptr; + } + } + + for(auto W : pending) { + if(auto notifee = W.lock()) { + if(!notifee->ready) + continue; + + auto it(pvt->actions.find(notifee.get())); + if(it==pvt->actions.end()) + continue; + + notifee->ready = false; + + try { + UnGuard U(G); + (it->second)(); + }catch(std::exception& e){ + log_err_printf(logme, "Unhandled exception in callback %s: %s", + it->second.target_type().name(), + e.what()); + } + } + } +} + +void +NotificationHub::interrupt() const noexcept +{ + if(pvt) { + { + Guard G(pvt->lock); + if(pvt->interrupted) + return; + pvt->interrupted = true; + } + char b = '!'; + auto ret = send(pvt->tx, &b, sizeof(b), 0); + if(ret!=sizeof(b)) + log_warn_printf(logme, "%s unable to wakeup: %d,%d", + __func__, (int)ret, SOCKERRNO); + } + +} + +NotificationHub::Pvt::Pvt(SOCKET tx, SOCKET rx) + :tx(tx) + ,rx(rx) +{} + +NotificationHub::Pvt::~Pvt() { + (void)epicsSocketDestroy(tx); + if(rx!=INVALID_SOCKET) + (void)epicsSocketDestroy(rx); +} + +void NotificationHub::Pvt::poke() const noexcept +{ + char b = '!'; + auto ret = send(tx, &b, sizeof(b), 0); + if(ret!=sizeof(b)) + log_warn_printf(logme, "%s unable to wakeup: %d,%d", + __func__, (int)ret, SOCKERRNO); +} + +} // namespace p4p diff --git a/src/p4p.h b/src/p4p.h index b48923eb..68d32e48 100644 --- a/src/p4p.h +++ b/src/p4p.h @@ -3,7 +3,9 @@ #include #include +#include +#include #include #include @@ -52,6 +54,8 @@ using namespace pvxs; typedef epicsGuard Guard; typedef epicsGuardRelease UnGuard; +struct NotificationHub; + struct SB { std::ostringstream strm; operator std::string() { return strm.str(); } @@ -193,9 +197,32 @@ void opBuilder(Builder& builder, PyObject *handler) { builder.build(opBuilder(handler)); } void opEvent(client::MonitorBuilder& builder, PyObject *handler); +void opEventHub(NotificationHub& hub, client::MonitorBuilder& builder, PyObject *handler); PyObject* monPop(const std::shared_ptr& mon); +/******* notify *******/ + +struct Notifier { + virtual ~Notifier(); + virtual void notify() =0; +}; + +struct NotificationHub { + static + NotificationHub create(bool blocking); + void close(); + SOCKET fileno() const; + std::shared_ptr add(std::function&& fn); + std::shared_ptr add(PyObject *handler); + void handle() const; + void poll() const; + void interrupt() const noexcept; + struct Pvt; +private: + std::shared_ptr pvt; +}; + /******* odometer (testing tool) *******/ std::shared_ptr makeOdometer(const std::string& name); diff --git a/src/p4p/_p4p.pyx b/src/p4p/_p4p.pyx index 42d2f9aa..37f0b3b9 100644 --- a/src/p4p/_p4p.pyx +++ b/src/p4p/_p4p.pyx @@ -26,6 +26,10 @@ from pvxs cimport sharedpv from weakref import WeakSet +cdef extern from "": + struct SOCKET: # osiSock.h + pass + cdef extern from "" namespace "p4p": # p4p.h redefines/overrides some definitions from Python.h (eg. PyMODINIT_FUNC) # it also (re)defines macros effecting numpy/arrayobject.h @@ -56,8 +60,23 @@ cdef extern from "" namespace "p4p": void opHandler[Builder](Builder& builder, object handler) void opBuilder[Builder](Builder& builder, object handler) void opEvent(client.MonitorBuilder& builder, object handler) + void opEventHub(NotificationHub& hub, client.MonitorBuilder& builder, object handler) object monPop(const shared_ptr[client.Subscription]& mon) with gil + # notify.cpp + cdef cppclass Notifier: + void notify() except+ + + cdef cppclass NotificationHub: + @staticmethod + NotificationHub create(bool blocking) nogil except+ + void close() nogil except+ + SOCKET fileno() nogil + shared_ptr[Notifier] add(object) nogil except+ + void handle() nogil except+ + void poll() nogil except+ + void interrupt() nogil const + cimport numpy # must cimport after p4p.h is included numpy.import_array() @@ -99,6 +118,38 @@ def listRefs(): def _forceLazy(): pass +cdef class Hub: + """ Mux. notification of updates from multiple Subscriptions. + """ + cdef NotificationHub nh + cdef bool blocking + def __init__(self, bool blocking=True): + self.nh = NotificationHub.create(blocking) + self.blocking = blocking + + def fileno(self): + """Handle of the underlying socket used to queue notifications. + Call handle() when readable. + """ + return self.nh.fileno() + + def handle(self): + """Process any pending notifications. + If created with blocking=True, wait for further notifications or interrupt(). + If blocking=False, returns when none remain pending. + """ + with nogil: + if self.blocking: + self.nh.handle() + else: + self.nh.poll() + + def interrupt(self): + """When created with blocking=True, cause handle() to return after issueing + any pending notifications. + """ + self.nh.interrupt() + ############### data # py object to hold ownership @@ -557,20 +608,28 @@ cdef class ClientMonitor: cdef shared_ptr[client.Subscription] sub cdef readonly object handler cdef object __weakref__ + cdef readonly bool notify_disconnect - def __init__(self, ClientProvider ctxt, basestring name, handler=None, _Value pvRequest=None): + def __init__(self, ClientProvider ctxt, basestring name, handler=None, + Hub hub=None, + _Value pvRequest=None, bool notify_disconnect=True): cdef string pvname = name.encode() cdef client.MonitorBuilder builder + cdef bool maskDiscon = not notify_disconnect if not ctxt.ctxt: raise RuntimeError("Context closed") self.handler = handler + self.notify_disconnect = notify_disconnect builder = ctxt.ctxt.monitor(pvname) \ .maskConnected(True) \ - .maskDisconnected(False) - opEvent(builder, handler) + .maskDisconnected(maskDiscon) + if hub is None: + opEvent(builder, handler) + else: + opEventHub(hub.nh, builder, handler) if pvRequest is not None: builder.rawRequest(pvRequest.val) self.sub = builder.exec_() @@ -594,6 +653,21 @@ cdef class ClientMonitor: if sub: return monPop(sub) # will unlock/relock GIL + def stats(self, reset=False): + cdef client.SubscriptionStat info + cdef shared_ptr[client.Subscription] sub = self.sub + cdef bool breset = reset + if sub: + with nogil: + sub.get().stats(info, breset) + return { + 'nQueue': info.nQueue, + 'nSrvSquash': info.nSrvSquash, + 'nCliSquash': info.nCliSquash, + 'maxQueue': info.maxQueue, + 'limitQueue': info.limitQueue, + } + all_providers = WeakSet() cdef class ClientProvider: diff --git a/src/p4p/client/Qt.py b/src/p4p/client/Qt.py index 625f900f..c5b766a9 100644 --- a/src/p4p/client/Qt.py +++ b/src/p4p/client/Qt.py @@ -102,6 +102,9 @@ def _add(self, slot, limitHz=10.0): # schedule to receive initial update later (avoids recursion) QCoreApplication.postEvent(self, CBEvent(slot)) + def stats(self): + return self._op.stats() + def _event(self): _log.debug('event1 %s', self.name) # called on PVA worker thread diff --git a/src/p4p/client/asyncio.py b/src/p4p/client/asyncio.py index e6d1d9ab..38eb1c21 100644 --- a/src/p4p/client/asyncio.py +++ b/src/p4p/client/asyncio.py @@ -3,6 +3,7 @@ _log = logging.getLogger(__name__) import asyncio +import socket from functools import partial, wraps @@ -120,6 +121,39 @@ async def exec(): def __init__(self, provider='pva', conf=None, useenv=True, nt=None, unwrap=None): super(Context, self).__init__(provider, conf=conf, useenv=useenv, nt=nt, unwrap=unwrap) + self._hub = None + self._hub_reader = None + + async def __hub_read(self): + W = None + try: + sock = socket.fromfd(self._hub.fileno(), socket.AF_INET, socket.SOCK_STREAM) # dup()s + R, W = await asyncio.open_connection(sock=sock) + while True: + await R.read(1024) # ignore what is read. Only meaning is to wakeup and poll() + self._hub.handle() + except asyncio.CancelledError: + pass + except: + _log.exception("Error while processing Notifications") + raise + finally: + if W is not None: + W.close() + + def __hub(self): + H = self._hub + if H is None: + self._hub = H = raw.Hub(blocking=False) + self._hub_reader = create_task(self.__hub_read()) + return H + + def close(self): + if self._hub is not None: + self._hub_reader.cancel() + self._hub = self._hub_reader = None + super(Context, self).close() + async def get(self, name, request=None): """Fetch current value of some number of PVs. @@ -293,9 +327,10 @@ def monitor(self, name, cb, request=None, notify_disconnect=False) -> "Subscript """ assert asyncio.iscoroutinefunction(cb), "monitor callback must be coroutine" R = Subscription(name, cb, notify_disconnect=notify_disconnect) - cb = partial(get_running_loop().call_soon_threadsafe, R._E.set) + cb = R._E.set - R._S = super(Context, self).monitor(name, cb, request) + R._S = super(Context, self).monitor(name, cb, request, notify_disconnect=notify_disconnect, + hub=self.__hub()) return R @@ -345,9 +380,12 @@ async def wait_closed(self): assert self._S is None, "Not close()'d" await self._T + def stats(self): + return self._S.stats() + async def _handle(self): if self._notify_disconnect: - await self._cb(Disconnected()) # all subscriptions are inittially disconnected + await self._cb(Disconnected()) # all subscriptions are initially disconnected E = None try: diff --git a/src/p4p/client/cothread.py b/src/p4p/client/cothread.py index 7aeeb79b..dfc4544c 100644 --- a/src/p4p/client/cothread.py +++ b/src/p4p/client/cothread.py @@ -267,6 +267,9 @@ def empty(self): 'Is data pending in event queue?' return self._S is None or self._S.empty() + def stats(self): + return self._S.stats() + def _event(self): if self._S is not None: self._E.Signal() diff --git a/src/p4p/client/raw.py b/src/p4p/client/raw.py index b3767287..328ce82f 100644 --- a/src/p4p/client/raw.py +++ b/src/p4p/client/raw.py @@ -13,7 +13,7 @@ from queue import Queue, Full, Empty from .. import _p4p -from .._p4p import Cancelled, Disconnected, Finished, RemoteError +from .._p4p import Cancelled, Disconnected, Finished, RemoteError, Hub from ..wrapper import Value, Type from ..nt import buildNT @@ -25,6 +25,7 @@ 'Subscription', 'Context', 'RemoteError', + 'Hub', ) diff --git a/src/p4p/client/thread.py b/src/p4p/client/thread.py index e70714be..2b26ef32 100644 --- a/src/p4p/client/thread.py +++ b/src/p4p/client/thread.py @@ -54,7 +54,6 @@ class Subscription(object): def __init__(self, ctxt, name, cb, notify_disconnect=False, queue=None): self.name, self._S, self._cb = name, None, cb - self._notify_disconnect = notify_disconnect self._Q = queue or ctxt._Q or _defaultWorkQueue() if notify_disconnect: # all subscriptions are inittially disconnected @@ -84,6 +83,9 @@ def empty(self): 'Is data pending in event queue?' return self._S is None or self._S.empty() + def stats(self): + return self._S.stats() + def _event(self): try: assert self._S is not None, self._S @@ -105,7 +107,7 @@ def _handle(self): elif isinstance(E, Exception): _log.debug('Subscription notify for %s with %s', self.name, E) - if self._notify_disconnect: + if S.notify_disconnect: self._cb(E) elif isinstance(E, RemoteError): @@ -173,6 +175,11 @@ def __init__(self, provider='pva', conf=None, useenv=True, nt=None, unwrap=None, self._Q = queue + self._hub = raw.Hub(blocking=True) + self._hub_worker = threading.Thread(target=self._hub.handle) + self._hub_worker.daemon=True + self._hub_worker.start() + def _channel(self, name): with self._channel_lock: return super(Context, self)._channel(name) @@ -198,6 +205,12 @@ def _queue(self): def close(self): """Force close all Channels and cancel all Operations """ + if self._hub is not None: + _log.debug('Join Hub') + self._hub.interrupt() + self._hub_worker.join() + self._hub = self._hub_worker = None + _log.debug('Joined Hub') if self._Q is not None: for T in self._T: self._Q.interrupt() @@ -432,5 +445,6 @@ def monitor(self, name, cb, request=None, notify_disconnect=False, queue=None): """ R = Subscription(self, name, cb, notify_disconnect=notify_disconnect, queue=queue) - R._S = super(Context, self).monitor(name, R._event, request) + R._S = super(Context, self).monitor(name, R._event, request, notify_disconnect=notify_disconnect, + hub=self._hub) return R diff --git a/src/p4p/pvxs/client.pxd b/src/p4p/pvxs/client.pxd index 3ee5a8c1..0e9430fb 100644 --- a/src/p4p/pvxs/client.pxd +++ b/src/p4p/pvxs/client.pxd @@ -114,11 +114,19 @@ cdef extern from "" namespace "pvxs::client" nogil: Value build() except+ + cdef struct SubscriptionStat "pvxs::client::SubscriptionStat": + size_t nQueue + size_t nSrvSquash + size_t nCliSquash + size_t maxQueue + size_t limitQueue + cdef cppclass Subscription: bool cancel() except+ void pause(bool p) except+ void resume() except+ Value pop() except+ + void stats(SubscriptionStat&, bool reset) except+ cdef cppclass Result: pass diff --git a/src/pvxs_client.cpp b/src/pvxs_client.cpp index 34da00b2..121b3e42 100644 --- a/src/pvxs_client.cpp +++ b/src/pvxs_client.cpp @@ -91,6 +91,27 @@ void opEvent(client::MonitorBuilder& builder, PyObject *handler) }); } +void opEventHub(NotificationHub& hub, client::MonitorBuilder& builder, PyObject *handler) +{ + auto notify(hub.add([handler]() mutable { + // on python worker + + PyLock L; + + auto ret(PyRef::allownull(PyObject_CallFunction(handler, ""))); + if(!ret.obj) { + PySys_WriteStderr("Unhandled Exception %s:%d\n", __FILE__, __LINE__); + PyErr_Print(); + PyErr_Clear(); + } + })); + + builder.event([notify](client::Subscription&) mutable { + // on PVA worker + notify->notify(); + }); +} + PyObject* monPop(const std::shared_ptr& mon) { PyObject* klass;