Commit

open62541: fix race condition
The scope of opslock was too small in processRequests().
It would need to overlap with the scope of clientlock, but
that could lead to deadlock. Solution: use only clientlock
with a large enough scope and drop opslock.
dirk-zimoch committed Jan 28, 2025
1 parent b5db094 commit 6c4eb54
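
To make the race concrete before reading the diff: a minimal, standalone C++ sketch (ours, not the project code) of the window the old locking left open. With opslock scoped only around the map insertion, the completion callback could run after clientlock was released but before the transaction was recorded in outstandingOps. The names clientlock, opslock, outstandingOps, and readComplete mirror the session class; the rest is an assumed simplification, including the premise (implied by the fix) that callbacks are dispatched while clientlock is held.

// Standalone sketch of the race (assumed simplification, not project code).
#include <iostream>
#include <map>
#include <mutex>
#include <thread>

std::mutex clientlock;                       // serializes client I/O and callbacks
std::mutex opslock;                          // the old, separate lock for the map
std::map<unsigned, const char *> outstandingOps;

// Stands in for readComplete(): in the real driver it is dispatched from
// the client thread, which already holds clientlock.
void readComplete(unsigned id)
{
    std::lock_guard<std::mutex> c(clientlock);
    std::lock_guard<std::mutex> o(opslock);  // old code locked opslock here
    if (outstandingOps.find(id) == outstandingOps.end())
        std::cout << "callback for unknown transaction " << id << "\n";
}

int main()
{
    unsigned id = 1;
    {
        std::lock_guard<std::mutex> c(clientlock);
        // The old code sent the async request here; as soon as clientlock
        // is released below, the callback is free to run.
    }
    std::thread cb(readComplete, id);        // simulate an early callback
    cb.join();                               // the callback wins the race
    std::lock_guard<std::mutex> o(opslock);
    outstandingOps.insert({id, "items"});    // the old insert came too late
}

Widening opslock to cover the send would mean holding it together with clientlock, which is the deadlock hazard the commit message mentions; holding clientlock alone across both steps avoids both problems.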
Showing 2 changed files with 64 additions and 65 deletions.
devOpcuaSup/open62541/SessionOpen62541.cpp (128 changes: 64 additions & 64 deletions)

@@ -652,41 +652,42 @@ SessionOpen62541::processRequests (std::vector<std::shared_ptr<ReadRequest>> &batch)
 
     {
         Guard G(clientlock);
-        if (!isConnected()) return; // may have disconnected while we waited
-        status=UA_Client_sendAsyncReadRequest(client, &request,
-            [] (UA_Client *client,
-                void *userdata,
-                UA_UInt32 requestId,
-                UA_ReadResponse *response)
-            {
-                static_cast<SessionOpen62541*>(userdata)->readComplete(requestId, response);
-            },
-            this, &id);
-    }
-    UA_ReadRequest_clear(&request);
-    if (UA_STATUS_IS_BAD(status)) {
-        errlogPrintf(
-            "OPC UA session %s: (requestRead) beginRead service failed with status %s\n",
-            name.c_str(),
-            UA_StatusCode_name(status));
-        // Create readFailure events for all items of the batch
-        for (auto c : batch) {
-            c->item->setIncomingEvent(ProcessReason::readFailure);
-        }
-    } else {
-        if (debug >= 5)
-            std::cout << "Session " << name
-                      << ": (requestRead) beginRead service ok"
-                      << " (transaction id " << id
-                      << "; retrieving " << itemsToRead->size()
-                      << " nodes)"
-                      << std::endl;
-        Guard G(opslock);
-        outstandingOps.insert(
-            std::pair<UA_UInt32,
-                      std::unique_ptr<std::vector<ItemOpen62541 *>>>(id, std::move(itemsToRead)));
-    }
+        if (isConnected()) { // may have disconnected while we waited
+            status=UA_Client_sendAsyncReadRequest(client, &request,
+                [] (UA_Client *client,
+                    void *userdata,
+                    UA_UInt32 requestId,
+                    UA_ReadResponse *response)
+                {
+                    static_cast<SessionOpen62541*>(userdata)->readComplete(requestId, response);
+                },
+                this, &id);
+            if (UA_STATUS_IS_BAD(status)) {
+                errlogPrintf(
+                    "OPC UA session %s: (requestRead) beginRead service failed with status %s\n",
+                    name.c_str(),
+                    UA_StatusCode_name(status));
+                // Create readFailure events for all items of the batch
+                for (auto c : batch) {
+                    c->item->setIncomingEvent(ProcessReason::readFailure);
+                }
+            } else {
+                if (debug >= 5)
+                    std::cout << "Session " << name
+                              << ": (requestRead) beginRead service ok"
+                              << " (transaction id " << id
+                              << "; retrieving " << itemsToRead->size()
+                              << " nodes)"
+                              << std::endl;
+                outstandingOps.insert(
+                    std::pair<UA_UInt32,
+                              std::unique_ptr<std::vector<ItemOpen62541 *>>>(id, std::move(itemsToRead)));
+            }
+        }
+    }
+    UA_ReadRequest_clear(&request);
 }
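
With the read path restructured as above, the send and the bookkeeping insert happen under a single clientlock acquisition, which is why the separate opslock can go away. A condensed, self-contained sketch of the fixed ordering (same hypothetical names as the sketch above, not the project code):

// Fixed ordering, condensed sketch: one lock covers send + insert, so a
// callback that also runs under clientlock always finds its transaction.
#include <map>
#include <mutex>

std::mutex clientlock;
std::map<unsigned, const char *> outstandingOps;
bool connected = true;

void sendAsyncRead(unsigned /*id*/) {}        // stands in for the open62541 call

void requestRead(unsigned id)
{
    std::lock_guard<std::mutex> c(clientlock);
    if (connected) {                          // re-check under the lock
        sendAsyncRead(id);                    // callback cannot interleave yet
        outstandingOps.insert({id, "items"}); // entry exists before unlock
    }
}

int main() { requestRead(1); }

Note also the control-flow change visible in the hunk: the early return on disconnect becomes an if (isConnected()) block, so the function now reaches UA_ReadRequest_clear() on the disconnected path as well.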
@@ -731,38 +732,39 @@ SessionOpen62541::processRequests (std::vector<std::shared_ptr<WriteRequest>> &batch)

     {
         Guard G(clientlock);
-        if (!isConnected()) return; // may have disconnected while we waited
-        status=UA_Client_sendAsyncWriteRequest(client, &request,
-            [] (UA_Client *client,
-                void *userdata,
-                UA_UInt32 requestId,
-                UA_WriteResponse *response)
-            {
-                static_cast<SessionOpen62541*>(userdata)->writeComplete(requestId, response);
-            },
-            this, &id);
+        if (isConnected()) { // may have disconnected while we waited
+            status=UA_Client_sendAsyncWriteRequest(client, &request,
+                [] (UA_Client *client,
+                    void *userdata,
+                    UA_UInt32 requestId,
+                    UA_WriteResponse *response)
+                {
+                    static_cast<SessionOpen62541*>(userdata)->writeComplete(requestId, response);
+                },
+                this, &id);
+
+            if (UA_STATUS_IS_BAD(status)) {
+                errlogPrintf("OPC UA session %s: (requestWrite) beginWrite service failed with status %s\n",
+                             name.c_str(), UA_StatusCode_name(status));
+                // Create writeFailure events for all items of the batch
+                for (auto c : batch) {
+                    c->item->setIncomingEvent(ProcessReason::writeFailure);
+                }
+            } else {
+                if (debug >= 5)
+                    std::cout << "Session " << name
+                              << ": (requestWrite) beginWrite service ok"
+                              << " (transaction id " << id
+                              << "; writing " << itemsToWrite->size()
+                              << " nodes)"
+                              << std::endl;
+                outstandingOps.insert(std::pair<UA_UInt32,
+                    std::unique_ptr<std::vector<ItemOpen62541 *>>>(id, std::move(itemsToWrite)));
+            }
+        }
     }
 
     UA_WriteRequest_clear(&request);
-    if (UA_STATUS_IS_BAD(status)) {
-        errlogPrintf("OPC UA session %s: (requestWrite) beginWrite service failed with status %s\n",
-                     name.c_str(), UA_StatusCode_name(status));
-        // Create writeFailure events for all items of the batch
-        for (auto c : batch) {
-            c->item->setIncomingEvent(ProcessReason::writeFailure);
-        }
-    } else {
-        if (debug >= 5)
-            std::cout << "Session " << name
-                      << ": (requestWrite) beginWrite service ok"
-                      << " (transaction id " << id
-                      << "; writing " << itemsToWrite->size()
-                      << " nodes)"
-                      << std::endl;
-        Guard G(opslock);
-        outstandingOps.insert(std::pair<UA_UInt32,
-            std::unique_ptr<std::vector<ItemOpen62541 *>>>(id, std::move(itemsToWrite)));
-    }
 }

@@ -2383,7 +2385,6 @@ void
 SessionOpen62541::readComplete (UA_UInt32 transactionId,
                                 UA_ReadResponse* response)
 {
-    Guard G(opslock);
     auto it = outstandingOps.find(transactionId);
     if (it == outstandingOps.end()) {
         errlogPrintf("OPC UA session %s: (readComplete) received a callback "

@@ -2449,7 +2450,6 @@ void
 SessionOpen62541::writeComplete (UA_UInt32 transactionId,
                                  UA_WriteResponse* response)
 {
-    Guard G(opslock);
     auto it = outstandingOps.find(transactionId);
     if (it == outstandingOps.end()) {
         errlogPrintf("OPC UA session %s: (writeComplete) received a callback "

devOpcuaSup/open62541/SessionOpen62541.h (1 change: 0 additions & 1 deletion)

@@ -395,7 +395,6 @@ class SessionOpen62541
     int transactionId;                  /**< next transaction id */
     /** itemOpen62541 vectors of outstanding read or write operations, indexed by transaction id */
     std::map<UA_UInt32, std::unique_ptr<std::vector<ItemOpen62541 *>>> outstandingOps;
-    epicsMutex opslock;                 /**< lock for outstandingOps map */
 
     RequestQueueBatcher<WriteRequest> writer;  /**< batcher for write requests */
     unsigned int writeNodesMax;                /**< max number of nodes per write request */
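
On the header side, removing the epicsMutex member means outstandingOps no longer carries its own lock; its thread-safety contract now lives in the calling code. A hypothetical documenting comment (ours, not the author's) would capture the invariant, assuming, as the fix implies, that the completion callbacks are dispatched while clientlock is held:

/** itemOpen62541 vectors of outstanding read or write operations,
 *  indexed by transaction id.
 *  Guarded by clientlock: processRequests() inserts under it, and the
 *  readComplete()/writeComplete() callbacks are dispatched under it. */
std::map<UA_UInt32, std::unique_ptr<std::vector<ItemOpen62541 *>>> outstandingOps;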
