diff --git a/odc/Topology.h b/odc/Topology.h index 08f9726..d5ccdb5 100644 --- a/odc/Topology.h +++ b/odc/Topology.h @@ -311,10 +311,12 @@ class BasicTopology : public AsioBase return false; } else { // if nMin is satisfied, ignore the entire collection & shutdown the responsible agent + auto& colDetails = mSession.getCollectionDetails(colId); OLOG(info, mPartitionID, mSession.mLastRunNr.load()) << "Ignoring failed collection '" << runtimeColPath << "' (id: " << colId << ")" << " as the remaining number of '" << colPath << "' collections (" << nCurrent - << ") is greater than or equal to nMin (" << nMin << ")."; + << ") is greater than or equal to nMin (" << nMin << ")." + << " On host: " << colDetails.mHost << ", working directory: " << colDetails.mWrkDir; return true; } } @@ -498,7 +500,8 @@ class BasicTopology : public AsioBase bool expendable = false; // check if we have an unexpected exit if (device.state == DeviceState::Error || (device.state == DeviceState::Exiting && lastState != DeviceState::Idle)) { - OLOG(error, mPartitionID, mSession.mLastRunNr.load()) << "Device " << device.taskId << " unexpectedly reached " << device.state << " state"; + auto& deviceDetails = mSession.getTaskDetails(device.taskId); + OLOG(error, mPartitionID, mSession.mLastRunNr.load()) << "Device " << device.taskId << " unexpectedly reached " << device.state << " state. On host: " << deviceDetails.mHost << ", working directory: " << deviceDetails.mWrkDir; // check if the device is expendable expendable = IgnoreExpendable(device); // Update SetProperties OPs only if unexpected exit diff --git a/tests/odc-fixtures.h b/tests/odc-fixtures.h index f656c14..5baf764 100644 --- a/tests/odc-fixtures.h +++ b/tests/odc-fixtures.h @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -85,6 +86,41 @@ struct TopologyFixture auto topologyRequest = STopologyRequest::makeRequest(topologyInfo); topologyRequest->setMessageCallback([](const SMessageResponseData& message) { BOOST_TEST_MESSAGE(message.m_msg); }); + + std::mutex mtx; + // fill the task/collection details + topologyRequest->setResponseCallback([this, &mtx](const dds::tools_api::STopologyResponseData& res) { + std::cout << "DDS Activate Response: " + << "agentID: " << res.m_agentID + << "; slotID: " << res.m_slotID + << "; taskID: " << res.m_taskID + << "; collectionID: " << res.m_collectionID + << "; host: " << res.m_host + << "; path: " << res.m_path + << "; workDir: " << res.m_wrkDir + << "; activated: " << res.m_activated + << std::endl; + + // We are not interested in stopped tasks + if (res.m_activated) { + // response callbacks can be called in parallel - protect session access with a lock + std::lock_guard lock(mtx); + mSession.mTaskDetails.emplace(res.m_taskID, TaskDetails{res.m_agentID, res.m_slotID, res.m_taskID, res.m_collectionID, res.m_path, res.m_host, res.m_wrkDir}); + + if (res.m_collectionID > 0) { + if (mSession.mCollectionDetails.find(res.m_collectionID) == mSession.mCollectionDetails.end()) { + std::string path = res.m_path; + auto pos = path.rfind('/'); + if (pos != std::string::npos) { + path.erase(pos); + } + mSession.mCollectionDetails.emplace(res.m_collectionID, CollectionDetails{res.m_agentID, res.m_collectionID, path, res.m_host, res.m_wrkDir}); + mSession.mRuntimeCollectionIndex.at(res.m_collectionID)->mRuntimeCollectionAgents[res.m_collectionID] = res.m_agentID; + } + } + } + }); + topologyRequest->setDoneCallback([blocker]() mutable { blocker.Signal(); }); mSession.mDDSSession.sendRequest(topologyRequest); blocker.Wait();