From 23770d6fa4775f85383bac217762d032905b303d Mon Sep 17 00:00:00 2001 From: Andrew Johnson Date: Thu, 8 Oct 2020 00:13:24 -0500 Subject: [PATCH] Collect ca_client_context operations Each instance of the caContext class represents a separate CA context, so each CAChannelProvider creates one and keeps a shared_ptr to it, making that available to its channels and channel operations. These also take their own shared_ptr to it as well so the context cannot be destroyed while it might be needed. A related caContext Attach object is intended to be short-lived, and to be allocated on the stack. When created it saves the current CA context for the thread, replacing it from the caContext given to its constructor. CA operations will now use the attached context. When the Attach destructor runs it detaches the thread from the current context (checking still has the expected value) and re-attaches the thread to any context that was saved by the constructor. --- src/ca/Makefile | 3 +- src/ca/caChannel.cpp | 51 ++++++++++++++------------------- src/ca/caChannel.h | 23 +++++++++------ src/ca/caContext.cpp | 65 ++++++++++++++++++++++++++++++++++++++++++ src/ca/caContext.h | 51 +++++++++++++++++++++++++++++++++ src/ca/caProvider.cpp | 38 ++---------------------- src/ca/caProviderPvt.h | 9 +++--- src/ca/dbdToPv.cpp | 26 +++++++++-------- 8 files changed, 175 insertions(+), 91 deletions(-) create mode 100644 src/ca/caContext.cpp create mode 100644 src/ca/caContext.h diff --git a/src/ca/Makefile b/src/ca/Makefile index bdc8d490..90710701 100644 --- a/src/ca/Makefile +++ b/src/ca/Makefile @@ -11,9 +11,10 @@ LIB_SYS_LIBS_WIN32 += netapi32 ws2_32 INC += pv/caProvider.h -pvAccessCA_SRCS += notifierConveyor.cpp pvAccessCA_SRCS += caProvider.cpp +pvAccessCA_SRCS += caContext.cpp pvAccessCA_SRCS += caChannel.cpp pvAccessCA_SRCS += dbdToPv.cpp +pvAccessCA_SRCS += notifierConveyor.cpp include $(TOP)/configure/RULES diff --git a/src/ca/caChannel.cpp b/src/ca/caChannel.cpp index ff40b91d..38b8f0cf 100644 --- a/src/ca/caChannel.cpp +++ b/src/ca/caChannel.cpp @@ -111,7 +111,8 @@ CAChannel::CAChannel(std::string const & channelName, channelID(0), channelCreated(false), channelConnected(false), - connectNotification(new Notification()) + connectNotification(new Notification()), + ca_context(channelProvider->caContext()) { } @@ -120,17 +121,17 @@ void CAChannel::activate(short priority) ChannelRequester::shared_pointer req(channelRequester.lock()); if (!req) return; connectNotification->setClient(shared_from_this()); - attachContext(); + Attach to(ca_context); int result = ca_create_channel(channelName.c_str(), - ca_connection_handler, - this, - priority, // TODO mapping - &channelID); + ca_connection_handler, this, + priority, // TODO mapping + &channelID); if (result == ECA_NORMAL) { - channelCreated = true; - CAChannelProviderPtr provider(channelProvider.lock()); - if(provider) provider->addChannel(shared_from_this()); - EXCEPTION_GUARD(req->channelCreated(Status::Ok, shared_from_this())); + channelCreated = true; + CAChannelProviderPtr provider(channelProvider.lock()); + if (provider) + provider->addChannel(shared_from_this()); + EXCEPTION_GUARD(req->channelCreated(Status::Ok, shared_from_this())); } else { Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result))); @@ -162,7 +163,7 @@ void CAChannel::disconnectChannel() } monitorlist.resize(0); /* Clear CA Channel */ - attachContext(); + Attach to(ca_context); int result = ca_clear_channel(channelID); if (result == ECA_NORMAL) return; string mess("CAChannel::disconnectChannel() "); @@ -361,18 +362,6 @@ void CAChannelGetField::callRequester(CAChannelPtr const & caChannel) /* ---------------------------------------------------------- */ -void CAChannel::attachContext() -{ - CAChannelProviderPtr provider(channelProvider.lock()); - if (provider) { - provider->attachContext(); - return; - } - string mess("CAChannel::attachContext provider does not exist "); - mess += getChannelName(); - throw std::runtime_error(mess); -} - CAChannelGetPtr CAChannelGet::create( CAChannel::shared_pointer const & channel, ChannelGetRequester::shared_pointer const & channelGetRequester, @@ -389,7 +378,8 @@ CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & channel, channelGetRequester(channelGetRequester), pvRequest(pvRequest), getStatus(Status::Ok), - getNotification(new Notification()) + getNotification(new Notification()), + ca_context(channel->caContext()) {} CAChannelGet::~CAChannelGet() @@ -445,8 +435,8 @@ void CAChannelGet::get() { ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock()); if (!getRequester) return; - channel->attachContext(); bitSet->clear(); + Attach to(ca_context); int result = ca_array_get_callback(dbdToPv->getRequestType(), 0, channel->getChannelID(), ca_get_handler, this); @@ -492,7 +482,8 @@ CAChannelPut::CAChannelPut(CAChannel::shared_pointer const & channel, isPut(false), getStatus(Status::Ok), putStatus(Status::Ok), - putNotification(new Notification()) + putNotification(new Notification()), + ca_context(channel->caContext()) {} CAChannelPut::~CAChannelPut() @@ -603,8 +594,8 @@ void CAChannelPut::get() isPut = false; } - channel->attachContext(); bitSet->clear(); + Attach to(ca_context); int result = ca_array_get_callback(dbdToPv->getRequestType(), 0, channel->getChannelID(), ca_put_get_handler, this); @@ -737,7 +728,8 @@ CAChannelMonitor::CAChannelMonitor( isStarted(false), pevid(NULL), eventMask(DBE_VALUE | DBE_ALARM), - eventNotification(new Notification()) + eventNotification(new Notification()), + ca_context(channel->caContext()) {} CAChannelMonitor::~CAChannelMonitor() @@ -833,7 +825,7 @@ Status CAChannelMonitor::start() isStarted = true; monitorQueue->start(); } - channel->attachContext(); + Attach to(ca_context); int result = ca_create_subscription(dbdToPv->getRequestType(), 0, channel->getChannelID(), eventMask, @@ -859,6 +851,7 @@ Status CAChannelMonitor::stop() isStarted = false; } monitorQueue->stop(); + // Attach to(ca_context); -- Not required! int result = ca_clear_subscription(pevid); if (result==ECA_NORMAL) return Status::Ok; diff --git a/src/ca/caChannel.h b/src/ca/caChannel.h index ec0993bc..d8e3c76c 100644 --- a/src/ca/caChannel.h +++ b/src/ca/caChannel.h @@ -96,14 +96,16 @@ class CAChannel : epics::pvData::PVStructurePtr const & pvRequest); virtual void printInfo(std::ostream& out); - void attachContext(); void disconnectChannel(); void connect(bool isConnected); - virtual void notifyClient(); void notifyResult(NotificationPtr const ¬ificationPtr); + virtual void notifyClient(); + + CAContextPtr caContext() { + return ca_context; + } private: - virtual void destroy() {} CAChannel(std::string const & channelName, CAChannelProvider::shared_pointer const & channelProvider, ChannelRequester::shared_pointer const & channelRequester); @@ -117,6 +119,7 @@ class CAChannel : bool channelCreated; bool channelConnected; NotificationPtr connectNotification; + CAContextPtr ca_context; epics::pvData::Mutex requestsMutex; std::queue getFieldQueue; @@ -148,16 +151,17 @@ class CAChannelGet : void activate(); virtual void notifyClient(); private: - virtual void destroy() {} CAChannelGet(CAChannel::shared_pointer const & _channel, ChannelGetRequester::shared_pointer const & _channelGetRequester, epics::pvData::PVStructurePtr const & pvRequest); - + CAChannelPtr channel; ChannelGetRequester::weak_pointer channelGetRequester; const epics::pvData::PVStructure::shared_pointer pvRequest; epics::pvData::Status getStatus; NotificationPtr getNotification; + CAContextPtr ca_context; + DbdToPvPtr dbdToPv; epics::pvData::Mutex mutex; epics::pvData::PVStructure::shared_pointer pvStructure; @@ -190,7 +194,6 @@ class CAChannelPut : void activate(); virtual void notifyClient(); private: - virtual void destroy() {} CAChannelPut(CAChannel::shared_pointer const & _channel, ChannelPutRequester::shared_pointer const & _channelPutRequester, epics::pvData::PVStructurePtr const & pvRequest); @@ -202,6 +205,8 @@ class CAChannelPut : epics::pvData::Status getStatus; epics::pvData::Status putStatus; NotificationPtr putNotification; + CAContextPtr ca_context; + DbdToPvPtr dbdToPv; epics::pvData::Mutex mutex; epics::pvData::PVStructure::shared_pointer pvStructure; @@ -233,9 +238,8 @@ class CAChannelMonitor : void activate(); virtual void notifyClient(); private: - virtual void destroy() {} - CAChannelMonitor(CAChannel::shared_pointer const & _channel, - MonitorRequester::shared_pointer const & _monitorRequester, + CAChannelMonitor(CAChannel::shared_pointer const & channel, + MonitorRequester::shared_pointer const & monitorRequester, epics::pvData::PVStructurePtr const & pvRequest); CAChannelPtr channel; MonitorRequester::weak_pointer monitorRequester; @@ -244,6 +248,7 @@ class CAChannelMonitor : evid pevid; unsigned long eventMask; NotificationPtr eventNotification; + CAContextPtr ca_context; DbdToPvPtr dbdToPv; epics::pvData::Mutex mutex; diff --git a/src/ca/caContext.cpp b/src/ca/caContext.cpp new file mode 100644 index 00000000..afaec2ad --- /dev/null +++ b/src/ca/caContext.cpp @@ -0,0 +1,65 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include + +#define epicsExportSharedSymbols +#include "caContext.h" + +namespace epics { +namespace pvAccess { +namespace ca { + +CAContext::CAContext() +{ + int result = ca_context_create(ca_enable_preemptive_callback); + if (result != ECA_NORMAL) + throw std::runtime_error("Can't create CA context"); + + ca_context = ca_current_context(); +} + +ca_client_context* CAContext::attach() +{ + ca_client_context *thread_context = ca_current_context(); + if (thread_context != ca_context) { + if (thread_context) + ca_detach_context(); + + int result = ca_attach_context(ca_context); + if (result != ECA_NORMAL) + throw std::runtime_error("Can't attach to CA context"); + } + return thread_context; +} + +void CAContext::detach(ca_client_context* restore) \ +{ + ca_client_context *thread_context = ca_current_context(); + if (thread_context != ca_context) + std::cerr << "CA context was changed!" << std::endl; + + ca_detach_context(); + + if (restore) { + int result = ca_attach_context(restore); + if (result != ECA_NORMAL) + std::cerr << "Can't re-attach to CA context" << std::endl; + } +} + +CAContext::~CAContext() +{ + ca_client_context *thread_context = attach(); + ca_context_destroy(); + if (thread_context != ca_context) { + int result = ca_attach_context(ca_context); + if (result != ECA_NORMAL) + std::cerr << "Can't re-attach to CA context" << std::endl; + } +} + +}}} diff --git a/src/ca/caContext.h b/src/ca/caContext.h new file mode 100644 index 00000000..620cb20f --- /dev/null +++ b/src/ca/caContext.h @@ -0,0 +1,51 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#ifndef INC_caContext_H +#define INC_caContext_H + +#include +#include + +namespace epics { +namespace pvAccess { +namespace ca { + + +class Attach; +class CAContext; +typedef std::tr1::shared_ptr CAContextPtr; + +class CAContext +{ +public: + CAContext(); + ~CAContext(); +private: + ca_client_context* ca_context; + +private: // Internal API + friend class Attach; + ca_client_context* attach(); + void detach(ca_client_context* restore); +}; + +class Attach +{ +public: + Attach(const CAContextPtr & to) : + context(to), saved_context(to->attach()) {} + ~Attach() { + context->detach(saved_context); + } +private: + CAContextPtr context; + ca_client_context* saved_context; +}; + +}}} + +#endif // INC_caContext_H diff --git a/src/ca/caProvider.cpp b/src/ca/caProvider.cpp index 26af4dd3..28f14f01 100644 --- a/src/ca/caProvider.cpp +++ b/src/ca/caProvider.cpp @@ -20,18 +20,11 @@ namespace ca { using namespace epics::pvData; -CAChannelProvider::CAChannelProvider() - : current_context(0) -{ - initialize(); -} - CAChannelProvider::CAChannelProvider(const std::tr1::shared_ptr &) - : current_context(0) + : ca_context(CAContextPtr(new CAContext())) { connectNotifier.start(); resultNotifier.start(); - initialize(); } CAChannelProvider::~CAChannelProvider() @@ -52,7 +45,6 @@ CAChannelProvider::~CAChannelProvider() channelQ.front()->disconnectChannel(); channelQ.pop(); } - ca_context_destroy(); } std::string CAChannelProvider::getProviderName() @@ -137,33 +129,7 @@ void CAChannelProvider::poll() { } -void CAChannelProvider::attachContext() -{ - ca_client_context *thread_context = ca_current_context(); - if (thread_context == current_context) - return; - int result = ca_attach_context(current_context); - if (result == ECA_ISATTACHED) - return; - if (result != ECA_NORMAL) - { - std::string mess("CAChannelProvider::attachContext error calling ca_attach_context "); - mess += ca_message(result); - throw std::runtime_error(mess); - } -} - -void CAChannelProvider::initialize() -{ - int result = ca_context_create(ca_enable_preemptive_callback); - if (result != ECA_NORMAL) - { - std::string mess("CAChannelProvider::initialize error calling ca_context_create "); - mess += ca_message(result); - throw std::runtime_error(mess); - } - current_context = ca_current_context(); -} +// ---------------- CAClientFactory ---------------- void CAClientFactory::start() { diff --git a/src/ca/caProviderPvt.h b/src/ca/caProviderPvt.h index 9eb48b3f..7e31f73d 100644 --- a/src/ca/caProviderPvt.h +++ b/src/ca/caProviderPvt.h @@ -18,6 +18,7 @@ #include #include +#include "caContext.h" #include "notifierConveyor.h" @@ -49,7 +50,6 @@ class CAChannelProvider : { public: POINTER_DEFINITIONS(CAChannelProvider); - CAChannelProvider(); CAChannelProvider(const std::tr1::shared_ptr&); virtual ~CAChannelProvider(); @@ -79,9 +79,11 @@ class CAChannelProvider : virtual void flush(); virtual void poll(); - void attachContext(); void addChannel(const CAChannelPtr & channel); + CAContextPtr caContext() { + return ca_context; + } void notifyConnection(NotificationPtr const ¬ificationPtr) { connectNotifier.notifyClient(notificationPtr); } @@ -89,8 +91,7 @@ class CAChannelProvider : resultNotifier.notifyClient(notificationPtr); } private: - void initialize(); - ca_client_context* current_context; + CAContextPtr ca_context; epics::pvData::Mutex channelListMutex; std::vector caChannelList; diff --git a/src/ca/dbdToPv.cpp b/src/ca/dbdToPv.cpp index 64da47fa..33ccf512 100644 --- a/src/ca/dbdToPv.cpp +++ b/src/ca/dbdToPv.cpp @@ -357,11 +357,10 @@ void DbdToPv::getChoices(CAChannelPtr const & caChannel) { if(caRequestType==DBR_ENUM||caRequestType==DBR_TIME_ENUM) { - caChannel->attachContext(); chid channelID = caChannel->getChannelID(); - int result = ca_array_get_callback(DBR_GR_ENUM, - 1, - channelID, enumChoicesHandler, this); + Attach to(caChannel->caContext()); + int result = ca_array_get_callback(DBR_GR_ENUM, 1, + channelID, enumChoicesHandler, this); if (result == ECA_NORMAL) { result = ca_flush_io(); choicesEvent.wait(); @@ -969,18 +968,21 @@ Status DbdToPv::putToDBD( } Status status = Status::Ok; int result = 0; - caChannel->attachContext(); - if(block) { + Attach to(caChannel->caContext()); + if (block) { result = ca_array_put_callback(caValueType,count,channelID,pValue,putHandler,userarg); - } else { + } + else { result = ca_array_put(caValueType,count,channelID,pValue); } - if(result==ECA_NORMAL) { - ca_flush_io(); - } else { - status = Status(Status::STATUSTYPE_ERROR, string(ca_message(result))); + if (result == ECA_NORMAL) { + ca_flush_io(); + } + else { + status = Status(Status::STATUSTYPE_ERROR, string(ca_message(result))); } - if(ca_stringBuffer!=NULL) delete[] ca_stringBuffer; + if (ca_stringBuffer != NULL) + delete[] ca_stringBuffer; return status; }