diff --git a/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.h b/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.h index feee6a4e9..8caad88f6 100644 --- a/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.h +++ b/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.h @@ -78,11 +78,11 @@ Input (status names): /Robot/Sensors/Tilt Hokuyo/Frequency /Robot/Sensors/Tilt Hokuyo/Connection \endverbatim - * The analyzer should always output a DiagnosticStatus with the name of the + * The analyzer should always output a DiagnosticStatus with the name of the * prefix. Any other data output is up to the analyzer developer. - * + * * Analyzer's are loaded by specifying the private parameters of the - * aggregator. + * aggregator. \verbatim base_path: My Robot pub_rate: 1.0 @@ -102,7 +102,7 @@ pub_rate: 1.0 * the aggregator will report the error and publish it in the aggregated output. */ class Aggregator -{ +{ public: /*! *\brief Constructor initializes with main prefix (ex: '/Robot') @@ -127,6 +127,11 @@ class Aggregator double getPubRate() const { return pub_rate_; } private: + typedef boost::shared_ptr AnalyzerPtr; + typedef boost::shared_ptr BondPtr; + typedef std::pair BondAnalyzerPair; + typedef std::vector BondAnalyzerPairs; + ros::NodeHandle n_; ros::ServiceServer add_srv_; /**< AddDiagnostics, /diagnostics_agg/add_diagnostics */ ros::Subscriber diag_sub_; /**< DiagnosticArray, /diagnostics */ @@ -153,7 +158,7 @@ class Aggregator OtherAnalyzer* other_analyzer_; - std::vector > bonds_; /**< \brief Contains all bonds for additional diagnostics. */ + BondAnalyzerPairs bonds_; /**< \brief Contains all bonds for additional diagnostics. */ /* *!\brief called when a bond between the aggregator and a node is broken @@ -161,10 +166,8 @@ class Aggregator * Modifies the contents of added_analyzers_ and analyzer_group, removing the * diagnostics that had been brought up by that bond. *!\param bond_id The bond id (namespace) from which the analyzer was created - *!\param analyzer Shared pointer to the analyzer group that was added */ - void bondBroken(std::string bond_id, - boost::shared_ptr analyzer); + void bondBroken(std::string bond_id); /* *!\brief called when a bond is formed between the aggregator and a node. @@ -193,7 +196,7 @@ class Aggregator struct BondIDMatch { BondIDMatch(const std::string s) : s(s) {} - bool operator()(const boost::shared_ptr& b){ return s == b->getId(); } + bool operator()(const std::pair< boost::shared_ptr, boost::shared_ptr > & p){ return s == p.first->getId(); } const std::string s; }; diff --git a/diagnostic_aggregator/include/diagnostic_aggregator/analyzer_group.h b/diagnostic_aggregator/include/diagnostic_aggregator/analyzer_group.h index d62ace484..b75183106 100644 --- a/diagnostic_aggregator/include/diagnostic_aggregator/analyzer_group.h +++ b/diagnostic_aggregator/include/diagnostic_aggregator/analyzer_group.h @@ -33,7 +33,7 @@ *********************************************************************/ /** - * \author Kevin Watts + * \author Kevin Watts */ #ifndef DIAGNOSTIC_ANALYZER_GROUP_H @@ -60,10 +60,10 @@ namespace diagnostic_aggregator { *\brief Allows analyzers to be grouped together, or used as sub-analyzers * * The AnalyzerGroup is used by the diagnostic aggregator internally to - * load and handle analyzers. It can be used as a normal analyzer plugin to + * load and handle analyzers. It can be used as a normal analyzer plugin to * allow analyzers to become "sub-analyzers", or move as a group. * - * The "sub-analyzers" are initialized using parameters in the "~analyzers" + * The "sub-analyzers" are initialized using parameters in the "~analyzers" * namespace of the AnalyzerGroup. The "type" parameters determines the analyzer type. * * Example initialization: @@ -92,14 +92,14 @@ namespace diagnostic_aggregator { * num_items: 3 *\endverbatim * - * Each namespace below "analyzers" describes a new Analyzer that will be loaded as a + * Each namespace below "analyzers" describes a new Analyzer that will be loaded as a * sub-analyzer. Any analyzer that fails to initialize or loads incorrectly will * generate an error in the console output, and a special diagnostic item in the output - * of the AnalyzerGroup that describes the error. + * of the AnalyzerGroup that describes the error. * - * In the above example, the AnalyzerGroup will have three sub-analyzers. The + * In the above example, the AnalyzerGroup will have three sub-analyzers. The * AnalyzerGroup will report a DiagnosticStatus message in the processed output with - * the name "Sensors" (the top-level state). The "Sensors" message will have the + * the name "Sensors" (the top-level state). The "Sensors" message will have the * level of the highest of the sub-analyzers, or the highest of "Sensors/Base Hokuyo", * "Sensors/Tilt Hokuyo" and "Sensors/IMU". The state of any other items, like * "Sensors/IMU/Connection" won't matter to the AnalyzerGroup. @@ -136,11 +136,6 @@ class AnalyzerGroup : public Analyzer */ virtual bool match(const std::string name); - /*! - *\brief Clear match arrays. Used when analyzers are added or removed - */ - void resetMatches(); - /*! *\brief Analyze returns true if any sub-analyzers will analyze an item */ @@ -156,6 +151,11 @@ class AnalyzerGroup : public Analyzer virtual std::string getName() const { return nice_name_; } private: + /*! + *\brief Clear match arrays. Used when analyzers are added or removed + */ + void resetMatches(); + std::string path_, nice_name_; /*! diff --git a/diagnostic_aggregator/scripts/add_analyzers b/diagnostic_aggregator/scripts/add_analyzers index c2d9ef061..da9dc539f 100755 --- a/diagnostic_aggregator/scripts/add_analyzers +++ b/diagnostic_aggregator/scripts/add_analyzers @@ -1,7 +1,5 @@ #!/usr/bin/env python -NAME='add_analyzers' - import sys import argparse from bondpy import bondpy @@ -12,13 +10,36 @@ import rospy class AddAnalyzers: def __init__(self, args): - rospy.on_shutdown(self.remove_group) self.bond = None + self.add_diagnostics = None + self.name = rospy.get_name() + self.namespace = None self.add_analyzers(args) + rospy.on_shutdown(self.remove_group) def remove_group(self): - if self.bond: - self.bond.shutdown() + if self.add_diagnostics is None: + return; + + # unload the analyzers + try: + # this will reverse the load if it is loaded on the other side + resp = self.add_diagnostics(load_namespace=self.namespace) + if resp.success: + rospy.loginfo('Add Analyzers: successfully removed analyzers from the diagnostic aggregator [{0}]'.format(self.name)) + else: + rospy.logerr('Add Analyzers: failed to remove analyzers on the diagnostic aggregator [{0}][{1}]'.format(self.name, resp.message)) + except rospy.service.ServiceException: + rospy.logerr('Add Analyzers: unloading service returned failure [{0}]'.format(self.name)) + except rospy.ROSException: + rospy.logerr('Add Analyzers: add timed out while waiting for diagnostics_agg service, or ROS shutdown [{0}]'.format(self.name)) + + # Cannot rely on bonds to get a message across if parts of ros are shutting down + # https://github.com/ros/bond_core/issues/14 + # + # if self.bond: + # self.bond.shutdown() + def add_analyzers(self, myargv): usage = """ @@ -32,7 +53,7 @@ class AddAnalyzers: parser.add_argument('-t', '--timeout', type=float, dest='timeout', default=None, help='time in seconds to wait for the diagnostic_agg service to come up before timing out. Default waits indefinitely') args = parser.parse_args(myargv[1:]) - namespace = rospy.resolve_name(rospy.get_name()) + self.namespace = rospy.resolve_name(rospy.get_name()) if args.analyzer_yaml is None: # nothing to do - it will assume parameters are already loaded on @@ -43,27 +64,26 @@ class AddAnalyzers: for params, ns in paramlist: rosparam.upload_params(ns, params) - self.bond = bondpy.Bond("/diagnostics_agg/bond", namespace) + self.bond = bondpy.Bond("/diagnostics_agg/bond", self.namespace) try: rospy.wait_for_service('/diagnostics_agg/add_diagnostics', timeout=args.timeout) - self.bond.start() - add_diagnostics = rospy.ServiceProxy('/diagnostics_agg/add_diagnostics', AddDiagnostics) - resp = add_diagnostics(load_namespace=namespace) + self.add_diagnostics = rospy.ServiceProxy('/diagnostics_agg/add_diagnostics', AddDiagnostics) + resp = self.add_diagnostics(load_namespace=self.namespace) if resp.success: - rospy.loginfo(NAME + ' successfully added analyzers to diagnostic aggregator') + rospy.loginfo('Add Analyzers: successfully added analyzers to diagnostic aggregator [{0}]'.format(self.name)) + self.bond.start() else: - rospy.logerr(NAME + ' did not add any analyzers to diagnostic aggregator: ' + resp.message) + rospy.logerr('Add Analyzers: did not add any analyzers to diagnostic aggregator [{0}][{1}]'.format(self.name, resp.message)) rospy.signal_shutdown('') except rospy.service.ServiceException: - rospy.logerr(NAME + ' service returned failure - missing aggregator or failed init of analyzer group?') + rospy.logerr('Add Analyzers: service returned failure - missing aggregator or failed init of analyzer group? [{0}]'.format(self.name)) rospy.signal_shutdown('') except rospy.ROSException: - rospy.logerr(NAME + ' add timed out while waiting for diagnostics_agg service, or ROS shutdown') + rospy.logerr('Add Analyzers: add timed out while waiting for diagnostics_agg service, or ROS shutdown [{0}]'.format(self.name)) rospy.signal_shutdown('') - rospy.spin() - if __name__ == '__main__': - rospy.init_node(NAME) + rospy.init_node('add_analyzers') AddAnalyzers(rospy.myargv()) + rospy.spin() diff --git a/diagnostic_aggregator/src/aggregator.cpp b/diagnostic_aggregator/src/aggregator.cpp index e48f741b5..92a33e429 100644 --- a/diagnostic_aggregator/src/aggregator.cpp +++ b/diagnostic_aggregator/src/aggregator.cpp @@ -81,7 +81,7 @@ void Aggregator::checkTimestamp(const diagnostic_msgs::DiagnosticArray::ConstPtr stamp_warn += ", "; stamp_warn += it->name; } - + if (!ros_warnings_.count(stamp_warn)) { ROS_WARN("%s", stamp_warn.c_str()); @@ -119,64 +119,80 @@ Aggregator::~Aggregator() } -void Aggregator::bondBroken(string bond_id, boost::shared_ptr analyzer) +void Aggregator::bondBroken(string bond_id) { boost::mutex::scoped_lock lock(mutex_); // Possibility of multiple bonds breaking at once ROS_DEBUG("Bond for namespace %s was broken", bond_id.c_str()); - std::vector >::iterator elem; + BondAnalyzerPairs::iterator elem; elem = std::find_if(bonds_.begin(), bonds_.end(), BondIDMatch(bond_id)); if (elem == bonds_.end()){ ROS_WARN("Broken bond tried to erase a bond which didn't exist."); } else { + if (!analyzer_group_->removeAnalyzer(elem->second)) + { + ROS_WARN("Broken bond tried to remove an analyzer which didn't exist."); + } bonds_.erase(elem); } - if (!analyzer_group_->removeAnalyzer(analyzer)) - { - ROS_WARN("Broken bond tried to remove an analyzer which didn't exist."); - } - - analyzer_group_->resetMatches(); } void Aggregator::bondFormed(boost::shared_ptr group){ ROS_DEBUG("Bond formed"); boost::mutex::scoped_lock lock(mutex_); analyzer_group_->addAnalyzer(group); - analyzer_group_->resetMatches(); } - +/* + * This will load diagnostics if they are not already loaded + * and reverse the operation (unload) them if they are. + */ bool Aggregator::addDiagnostics(diagnostic_msgs::AddDiagnostics::Request &req, diagnostic_msgs::AddDiagnostics::Response &res) { - ROS_DEBUG("Got load request for namespace %s", req.load_namespace.c_str()); // Don't currently support relative or private namespace definitions if (req.load_namespace[0] != '/') { - res.message = "Requested load from non-global namespace. Private and relative namespaces are not supported."; + res.message = "Requested (un)load from non-global namespace. Private and relative namespaces are not supported."; res.success = false; return true; } boost::shared_ptr group = boost::make_shared(); - { // lock here ensures that bonds from the same namespace aren't added twice. - // Without it, possibility of two simultaneous calls adding two objects. + { + // Without lock, possibility of two simultaneous calls in this function at the same time boost::mutex::scoped_lock lock(mutex_); - // rebuff attempts to add things from the same namespace twice - if (std::find_if(bonds_.begin(), bonds_.end(), BondIDMatch(req.load_namespace)) != bonds_.end()) + // if adding to a namespace that already has it, remove it - toggle! + BondAnalyzerPairs::iterator existing_bond_analyzer_iter = std::find_if(bonds_.begin(), bonds_.end(), BondIDMatch(req.load_namespace)); + if (existing_bond_analyzer_iter != bonds_.end()) { - res.message = "Requested load from namespace " + req.load_namespace + " which is already in use"; - res.success = false; + ROS_DEBUG("Got unload request for namespace %s", req.load_namespace.c_str()); + if (!analyzer_group_->removeAnalyzer(existing_bond_analyzer_iter->second)) + { + ROS_WARN("Tried to remove an analyzer which didn't exist."); + } + + // erase might delete the last bond reference triggering bondBroken(string bond_id) + // leading to a deadlock because of mutex_ lock, so create a temp reference + { + BondPtr temp_bond_reference = existing_bond_analyzer_iter->first; + bonds_.erase(existing_bond_analyzer_iter); + lock.unlock(); + //destroy temp_bond_reference reference + } + + res.message = "Unloaded from namespace '" + req.load_namespace + "'"; + res.success = true; return true; } + ROS_DEBUG("Got load request for namespace %s", req.load_namespace.c_str()); boost::shared_ptr req_bond = boost::make_shared( "/diagnostics_agg/bond", req.load_namespace, - boost::function(boost::bind(&Aggregator::bondBroken, this, req.load_namespace, group)), + boost::function(boost::bind(&Aggregator::bondBroken, this, req.load_namespace)), boost::function(boost::bind(&Aggregator::bondFormed, this, group)) ); req_bond->start(); - bonds_.push_back(req_bond); // bond formed, keep track of it + bonds_.push_back(std::make_pair(req_bond, group)); // bond formed, keep track of it with associated analyzer } if (group->init(base_path_, ros::NodeHandle(req.load_namespace))) @@ -201,7 +217,7 @@ void Aggregator::publishData() diag_toplevel_state.name = "toplevel_state"; diag_toplevel_state.level = -1; int min_level = 255; - + vector > processed; { boost::mutex::scoped_lock lock(mutex_); diff --git a/diagnostic_aggregator/src/aggregator_node.cpp b/diagnostic_aggregator/src/aggregator_node.cpp index f8c9d85fb..6f363b718 100644 --- a/diagnostic_aggregator/src/aggregator_node.cpp +++ b/diagnostic_aggregator/src/aggregator_node.cpp @@ -42,16 +42,16 @@ using namespace std; int main(int argc, char **argv) { ros::init(argc, argv, "diagnostic_aggregator"); - + try { diagnostic_aggregator::Aggregator agg; - + ros::Rate pub_rate(agg.getPubRate()); - while (agg.ok()) + while (ros::ok() && agg.ok()) { - ros::spinOnce(); agg.publishData(); + ros::spinOnce(); pub_rate.sleep(); } } @@ -60,8 +60,8 @@ int main(int argc, char **argv) ROS_FATAL("Diagnostic aggregator node caught exception. Aborting. %s", e.what()); ROS_BREAK(); } - + exit(0); return 0; } - + diff --git a/diagnostic_aggregator/src/analyzer_group.cpp b/diagnostic_aggregator/src/analyzer_group.cpp index 63ff393bc..c31e97c45 100644 --- a/diagnostic_aggregator/src/analyzer_group.cpp +++ b/diagnostic_aggregator/src/analyzer_group.cpp @@ -40,8 +40,8 @@ using namespace std; using namespace diagnostic_aggregator; PLUGINLIB_DECLARE_CLASS(diagnostic_aggregator, - AnalyzerGroup, - diagnostic_aggregator::AnalyzerGroup, + AnalyzerGroup, + diagnostic_aggregator::AnalyzerGroup, diagnostic_aggregator::Analyzer) AnalyzerGroup::AnalyzerGroup() : @@ -53,7 +53,7 @@ AnalyzerGroup::AnalyzerGroup() : bool AnalyzerGroup::init(const string base_path, const ros::NodeHandle &n) { n.param("path", nice_name_, string("")); - + if (base_path.size() > 0 && base_path != "/") path_ = base_path + "/" + nice_name_; else @@ -64,7 +64,7 @@ bool AnalyzerGroup::init(const string base_path, const ros::NodeHandle &n) path_ = "/" + path_; ros::NodeHandle analyzers_nh = ros::NodeHandle(n, "analyzers"); - + XmlRpc::XmlRpcValue analyzer_params; analyzers_nh.getParam("", analyzer_params); ROS_DEBUG("Analyzer params: %s.", analyzer_params.toXml().c_str()); @@ -79,7 +79,7 @@ bool AnalyzerGroup::init(const string base_path, const ros::NodeHandle &n) XmlRpc::XmlRpcValue analyzer_value = xml_it->second; string ns = analyzer_name; - + if (!analyzer_value.hasMember("type")) { ROS_ERROR("Namespace %s has no member 'type', unable to initialize analyzer for this namespace.", analyzers_nh.getNamespace().c_str()); @@ -91,7 +91,7 @@ bool AnalyzerGroup::init(const string base_path, const ros::NodeHandle &n) XmlRpc::XmlRpcValue analyzer_type = analyzer_value["type"]; string an_type = analyzer_type; - + boost::shared_ptr analyzer; try { @@ -138,7 +138,7 @@ bool AnalyzerGroup::init(const string base_path, const ros::NodeHandle &n) init_ok = false; continue; } - + if (!analyzer->init(path_, ros::NodeHandle(analyzers_nh, ns))) { ROS_ERROR("Unable to initialize analyzer NS: %s, type: %s", analyzers_nh.getNamespace().c_str(), an_type.c_str()); @@ -147,7 +147,7 @@ bool AnalyzerGroup::init(const string base_path, const ros::NodeHandle &n) init_ok = false; continue; } - + analyzers_.push_back(analyzer); } @@ -168,6 +168,7 @@ AnalyzerGroup::~AnalyzerGroup() bool AnalyzerGroup::addAnalyzer(boost::shared_ptr& analyzer) { analyzers_.push_back(analyzer); + resetMatches(); return true; } @@ -177,6 +178,7 @@ bool AnalyzerGroup::removeAnalyzer(boost::shared_ptr& analyzer) if (it != analyzers_.end()) { analyzers_.erase(it); + resetMatches(); return true; } return false; @@ -198,7 +200,7 @@ bool AnalyzerGroup::match(const string name) } return false; } - + matched_[name].resize(analyzers_.size()); for (unsigned int i = 0; i < analyzers_.size(); ++i) { @@ -227,7 +229,7 @@ bool AnalyzerGroup::analyze(const boost::shared_ptr item) if (mtch_vec[i]) analyzed = analyzers_[i]->analyze(item) || analyzed; } - + return analyzed; } @@ -245,7 +247,7 @@ vector > AnalyzerGroup::rep header_status->level = 2; header_status->message = "No analyzers"; output.push_back(header_status); - + if (header_status->name == "" || header_status->name == "/") header_status->name = "/AnalyzerGroup"; @@ -277,7 +279,7 @@ vector > AnalyzerGroup::rep diagnostic_msgs::KeyValue kv; kv.key = nice_name; kv.value = processed[i]->message; - + all_stale = all_stale && (processed[i]->level == 3); header_status->level = max(header_status->level, processed[i]->level); header_status->values.push_back(kv);