Skip to content

Commit

Permalink
Collect ca_client_context operations
Browse files Browse the repository at this point in the history
Each instance of the caContext class represents a separate CA context,
so each CAChannelProvider creates one and keeps a shared_ptr to it,
making that available to its channels and channel operations. These
also take their own shared_ptr to it as well so the context cannot be
destroyed while it might be needed.

A related caContext Attach object is intended to be short-lived, and
to be allocated on the stack. When created it saves the current CA
context for the thread, replacing it from the caContext given to its
constructor. CA operations will now use the attached context. When the
Attach destructor runs it detaches the thread from the current context
(checking still has the expected value) and re-attaches the thread to
any context that was saved by the constructor.
  • Loading branch information
anjohnson committed Nov 17, 2020
1 parent 849f003 commit 23770d6
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 91 deletions.
3 changes: 2 additions & 1 deletion src/ca/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ LIB_SYS_LIBS_WIN32 += netapi32 ws2_32

INC += pv/caProvider.h

pvAccessCA_SRCS += notifierConveyor.cpp
pvAccessCA_SRCS += caProvider.cpp
pvAccessCA_SRCS += caContext.cpp
pvAccessCA_SRCS += caChannel.cpp
pvAccessCA_SRCS += dbdToPv.cpp
pvAccessCA_SRCS += notifierConveyor.cpp

include $(TOP)/configure/RULES
51 changes: 22 additions & 29 deletions src/ca/caChannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ CAChannel::CAChannel(std::string const & channelName,
channelID(0),
channelCreated(false),
channelConnected(false),
connectNotification(new Notification())
connectNotification(new Notification()),
ca_context(channelProvider->caContext())
{
}

Expand All @@ -120,17 +121,17 @@ void CAChannel::activate(short priority)
ChannelRequester::shared_pointer req(channelRequester.lock());
if (!req) return;
connectNotification->setClient(shared_from_this());
attachContext();
Attach to(ca_context);
int result = ca_create_channel(channelName.c_str(),
ca_connection_handler,
this,
priority, // TODO mapping
&channelID);
ca_connection_handler, this,
priority, // TODO mapping
&channelID);
if (result == ECA_NORMAL) {
channelCreated = true;
CAChannelProviderPtr provider(channelProvider.lock());
if(provider) provider->addChannel(shared_from_this());
EXCEPTION_GUARD(req->channelCreated(Status::Ok, shared_from_this()));
channelCreated = true;
CAChannelProviderPtr provider(channelProvider.lock());
if (provider)
provider->addChannel(shared_from_this());
EXCEPTION_GUARD(req->channelCreated(Status::Ok, shared_from_this()));
}
else {
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result)));
Expand Down Expand Up @@ -162,7 +163,7 @@ void CAChannel::disconnectChannel()
}
monitorlist.resize(0);
/* Clear CA Channel */
attachContext();
Attach to(ca_context);
int result = ca_clear_channel(channelID);
if (result == ECA_NORMAL) return;
string mess("CAChannel::disconnectChannel() ");
Expand Down Expand Up @@ -361,18 +362,6 @@ void CAChannelGetField::callRequester(CAChannelPtr const & caChannel)

/* ---------------------------------------------------------- */

void CAChannel::attachContext()
{
CAChannelProviderPtr provider(channelProvider.lock());
if (provider) {
provider->attachContext();
return;
}
string mess("CAChannel::attachContext provider does not exist ");
mess += getChannelName();
throw std::runtime_error(mess);
}

CAChannelGetPtr CAChannelGet::create(
CAChannel::shared_pointer const & channel,
ChannelGetRequester::shared_pointer const & channelGetRequester,
Expand All @@ -389,7 +378,8 @@ CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & channel,
channelGetRequester(channelGetRequester),
pvRequest(pvRequest),
getStatus(Status::Ok),
getNotification(new Notification())
getNotification(new Notification()),
ca_context(channel->caContext())
{}

CAChannelGet::~CAChannelGet()
Expand Down Expand Up @@ -445,8 +435,8 @@ void CAChannelGet::get()
{
ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
if (!getRequester) return;
channel->attachContext();
bitSet->clear();
Attach to(ca_context);
int result = ca_array_get_callback(dbdToPv->getRequestType(),
0,
channel->getChannelID(), ca_get_handler, this);
Expand Down Expand Up @@ -492,7 +482,8 @@ CAChannelPut::CAChannelPut(CAChannel::shared_pointer const & channel,
isPut(false),
getStatus(Status::Ok),
putStatus(Status::Ok),
putNotification(new Notification())
putNotification(new Notification()),
ca_context(channel->caContext())
{}

CAChannelPut::~CAChannelPut()
Expand Down Expand Up @@ -603,8 +594,8 @@ void CAChannelPut::get()
isPut = false;
}

channel->attachContext();
bitSet->clear();
Attach to(ca_context);
int result = ca_array_get_callback(dbdToPv->getRequestType(),
0,
channel->getChannelID(), ca_put_get_handler, this);
Expand Down Expand Up @@ -737,7 +728,8 @@ CAChannelMonitor::CAChannelMonitor(
isStarted(false),
pevid(NULL),
eventMask(DBE_VALUE | DBE_ALARM),
eventNotification(new Notification())
eventNotification(new Notification()),
ca_context(channel->caContext())
{}

CAChannelMonitor::~CAChannelMonitor()
Expand Down Expand Up @@ -833,7 +825,7 @@ Status CAChannelMonitor::start()
isStarted = true;
monitorQueue->start();
}
channel->attachContext();
Attach to(ca_context);
int result = ca_create_subscription(dbdToPv->getRequestType(),
0,
channel->getChannelID(), eventMask,
Expand All @@ -859,6 +851,7 @@ Status CAChannelMonitor::stop()
isStarted = false;
}
monitorQueue->stop();
// Attach to(ca_context); -- Not required!
int result = ca_clear_subscription(pevid);
if (result==ECA_NORMAL)
return Status::Ok;
Expand Down
23 changes: 14 additions & 9 deletions src/ca/caChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,16 @@ class CAChannel :
epics::pvData::PVStructurePtr const & pvRequest);
virtual void printInfo(std::ostream& out);

void attachContext();
void disconnectChannel();
void connect(bool isConnected);
virtual void notifyClient();

void notifyResult(NotificationPtr const &notificationPtr);
virtual void notifyClient();

CAContextPtr caContext() {
return ca_context;
}
private:
virtual void destroy() {}
CAChannel(std::string const & channelName,
CAChannelProvider::shared_pointer const & channelProvider,
ChannelRequester::shared_pointer const & channelRequester);
Expand All @@ -117,6 +119,7 @@ class CAChannel :
bool channelCreated;
bool channelConnected;
NotificationPtr connectNotification;
CAContextPtr ca_context;

epics::pvData::Mutex requestsMutex;
std::queue<CAChannelGetFieldPtr> getFieldQueue;
Expand Down Expand Up @@ -148,16 +151,17 @@ class CAChannelGet :
void activate();
virtual void notifyClient();
private:
virtual void destroy() {}
CAChannelGet(CAChannel::shared_pointer const & _channel,
ChannelGetRequester::shared_pointer const & _channelGetRequester,
epics::pvData::PVStructurePtr const & pvRequest);

CAChannelPtr channel;
ChannelGetRequester::weak_pointer channelGetRequester;
const epics::pvData::PVStructure::shared_pointer pvRequest;
epics::pvData::Status getStatus;
NotificationPtr getNotification;
CAContextPtr ca_context;

DbdToPvPtr dbdToPv;
epics::pvData::Mutex mutex;
epics::pvData::PVStructure::shared_pointer pvStructure;
Expand Down Expand Up @@ -190,7 +194,6 @@ class CAChannelPut :
void activate();
virtual void notifyClient();
private:
virtual void destroy() {}
CAChannelPut(CAChannel::shared_pointer const & _channel,
ChannelPutRequester::shared_pointer const & _channelPutRequester,
epics::pvData::PVStructurePtr const & pvRequest);
Expand All @@ -202,6 +205,8 @@ class CAChannelPut :
epics::pvData::Status getStatus;
epics::pvData::Status putStatus;
NotificationPtr putNotification;
CAContextPtr ca_context;

DbdToPvPtr dbdToPv;
epics::pvData::Mutex mutex;
epics::pvData::PVStructure::shared_pointer pvStructure;
Expand Down Expand Up @@ -233,9 +238,8 @@ class CAChannelMonitor :
void activate();
virtual void notifyClient();
private:
virtual void destroy() {}
CAChannelMonitor(CAChannel::shared_pointer const & _channel,
MonitorRequester::shared_pointer const & _monitorRequester,
CAChannelMonitor(CAChannel::shared_pointer const & channel,
MonitorRequester::shared_pointer const & monitorRequester,
epics::pvData::PVStructurePtr const & pvRequest);
CAChannelPtr channel;
MonitorRequester::weak_pointer monitorRequester;
Expand All @@ -244,6 +248,7 @@ class CAChannelMonitor :
evid pevid;
unsigned long eventMask;
NotificationPtr eventNotification;
CAContextPtr ca_context;

DbdToPvPtr dbdToPv;
epics::pvData::Mutex mutex;
Expand Down
65 changes: 65 additions & 0 deletions src/ca/caContext.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvAccessCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/

#include <cadef.h>

#define epicsExportSharedSymbols
#include "caContext.h"

namespace epics {
namespace pvAccess {
namespace ca {

CAContext::CAContext()
{
int result = ca_context_create(ca_enable_preemptive_callback);
if (result != ECA_NORMAL)
throw std::runtime_error("Can't create CA context");

ca_context = ca_current_context();
}

ca_client_context* CAContext::attach()
{
ca_client_context *thread_context = ca_current_context();
if (thread_context != ca_context) {
if (thread_context)
ca_detach_context();

int result = ca_attach_context(ca_context);
if (result != ECA_NORMAL)
throw std::runtime_error("Can't attach to CA context");
}
return thread_context;
}

void CAContext::detach(ca_client_context* restore) \
{
ca_client_context *thread_context = ca_current_context();
if (thread_context != ca_context)
std::cerr << "CA context was changed!" << std::endl;

ca_detach_context();

if (restore) {
int result = ca_attach_context(restore);
if (result != ECA_NORMAL)
std::cerr << "Can't re-attach to CA context" << std::endl;
}
}

CAContext::~CAContext()
{
ca_client_context *thread_context = attach();
ca_context_destroy();
if (thread_context != ca_context) {
int result = ca_attach_context(ca_context);
if (result != ECA_NORMAL)
std::cerr << "Can't re-attach to CA context" << std::endl;
}
}

}}}
51 changes: 51 additions & 0 deletions src/ca/caContext.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvAccessCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/

#ifndef INC_caContext_H
#define INC_caContext_H

#include <cadef.h>
#include <pv/pvAccess.h>

namespace epics {
namespace pvAccess {
namespace ca {


class Attach;
class CAContext;
typedef std::tr1::shared_ptr<CAContext> CAContextPtr;

class CAContext
{
public:
CAContext();
~CAContext();
private:
ca_client_context* ca_context;

private: // Internal API
friend class Attach;
ca_client_context* attach();
void detach(ca_client_context* restore);
};

class Attach
{
public:
Attach(const CAContextPtr & to) :
context(to), saved_context(to->attach()) {}
~Attach() {
context->detach(saved_context);
}
private:
CAContextPtr context;
ca_client_context* saved_context;
};

}}}

#endif // INC_caContext_H
Loading

0 comments on commit 23770d6

Please sign in to comment.