From 5de9737d1db56934d7df13cc56c136f80c146aa1 Mon Sep 17 00:00:00 2001 From: Mark Johnson <104826595+rr-mark@users.noreply.github.com> Date: Thu, 6 Feb 2025 02:27:04 +0000 Subject: [PATCH] Ports moveit #3676 and #3682 (#3283) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Use separate callback queue + spinner for ExecuteTrajectoryAction (#3676) This allows parallel execution + planning. Also required modifying updateSceneWithCurrentState() to allow skipping a scene update with a new robot state (from CurrentStateMonitor), if the planning scene is currently locked (due to planning). Otherwise, the CurrentStateMonitor would block too. Co-authored-by: Robert Haschke * PSM: simplify state_update_pending_ (#3682) * Move update of state_update_pending_ to updateSceneWithCurrentState() * Revert to try_lock While there are a few other locks except explicit user locks (getPlanningSceneServiceCallback(), collisionObjectCallback(), attachObjectCallback(), newPlanningSceneCallback(), and scenePublishingThread()), these occur rather seldom (scenePublishingThread() publishes at 2Hz). Hence, we might indeed balance a non-blocking CSM vs. missed PS updates in favour of CSM. * Don't block for scene update from stateUpdateTimerCallback too The timer callback and CSM's state update callbacks are served from the same callback queue, which would block CSM again. * further locking adaptations reading dt_state_update_ and last_robot_state_update_wall_time_ does not lead to logic errors, but at most to a skipped or redundant update on corrupted data. Alternatively we could be on the safe side and turn both variables into std::atomic, but that would effectively mean locks on every read. Instead, only set state_update_pending_ as an atomic, which is lockfree in this case. Co-authored-by: Michael Görner * Ports changes to ROS2. --------- Co-authored-by: Robert Haschke Co-authored-by: Robert Haschke Co-authored-by: Michael Görner Co-authored-by: Sebastian Castro <4603398+sea-bass@users.noreply.github.com> Co-authored-by: Sebastian Jahr (cherry picked from commit ba35aaa58ca76762e531e932475cbfbbcd628702) --- .../execute_trajectory_action_capability.cpp | 15 ++- .../execute_trajectory_action_capability.hpp | 6 + .../planning_scene_monitor.hpp | 28 ++--- .../src/planning_scene_monitor.cpp | 103 +++++++----------- 4 files changed, 72 insertions(+), 80 deletions(-) diff --git a/moveit_ros/move_group/src/default_capabilities/execute_trajectory_action_capability.cpp b/moveit_ros/move_group/src/default_capabilities/execute_trajectory_action_capability.cpp index 602b16a6ee..3f31cb3b07 100644 --- a/moveit_ros/move_group/src/default_capabilities/execute_trajectory_action_capability.cpp +++ b/moveit_ros/move_group/src/default_capabilities/execute_trajectory_action_capability.cpp @@ -57,9 +57,21 @@ MoveGroupExecuteTrajectoryAction::MoveGroupExecuteTrajectoryAction() : MoveGroup { } +MoveGroupExecuteTrajectoryAction::~MoveGroupExecuteTrajectoryAction() +{ + callback_executor_.cancel(); + + if (callback_thread_.joinable()) + callback_thread_.join(); +} + void MoveGroupExecuteTrajectoryAction::initialize() { auto node = context_->moveit_cpp_->getNode(); + callback_group_ = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, + false /* don't spin with node executor */); + callback_executor_.add_callback_group(callback_group_, node->get_node_base_interface()); + callback_thread_ = std::thread([this]() { callback_executor_.spin(); }); // start the move action server execute_action_server_ = rclcpp_action::create_server( node->get_node_base_interface(), node->get_node_clock_interface(), node->get_node_logging_interface(), @@ -68,7 +80,8 @@ void MoveGroupExecuteTrajectoryAction::initialize() return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE; }, [](const std::shared_ptr& /* unused */) { return rclcpp_action::CancelResponse::ACCEPT; }, - [this](const auto& goal) { executePathCallback(goal); }); + [this](const auto& goal) { executePathCallback(goal); }, rcl_action_server_get_default_options(), + callback_group_); } void MoveGroupExecuteTrajectoryAction::executePathCallback(const std::shared_ptr& goal) diff --git a/moveit_ros/move_group/src/default_capabilities/execute_trajectory_action_capability.hpp b/moveit_ros/move_group/src/default_capabilities/execute_trajectory_action_capability.hpp index 174e041f16..b37bf8b316 100644 --- a/moveit_ros/move_group/src/default_capabilities/execute_trajectory_action_capability.hpp +++ b/moveit_ros/move_group/src/default_capabilities/execute_trajectory_action_capability.hpp @@ -55,6 +55,7 @@ class MoveGroupExecuteTrajectoryAction : public MoveGroupCapability { public: MoveGroupExecuteTrajectoryAction(); + ~MoveGroupExecuteTrajectoryAction() override; void initialize() override; @@ -65,6 +66,11 @@ class MoveGroupExecuteTrajectoryAction : public MoveGroupCapability void preemptExecuteTrajectoryCallback(); void setExecuteTrajectoryState(MoveGroupState state, const std::shared_ptr& goal); + // Use our own callback group and executor to allow execution parallel to other capabilities + rclcpp::CallbackGroup::SharedPtr callback_group_; + rclcpp::executors::SingleThreadedExecutor callback_executor_; + std::thread callback_thread_; + std::shared_ptr> execute_action_server_; }; diff --git a/moveit_ros/planning/planning_scene_monitor/include/moveit/planning_scene_monitor/planning_scene_monitor.hpp b/moveit_ros/planning/planning_scene_monitor/include/moveit/planning_scene_monitor/planning_scene_monitor.hpp index b634928369..73d323d0ea 100644 --- a/moveit_ros/planning/planning_scene_monitor/include/moveit/planning_scene_monitor/planning_scene_monitor.hpp +++ b/moveit_ros/planning/planning_scene_monitor/include/moveit/planning_scene_monitor/planning_scene_monitor.hpp @@ -296,8 +296,9 @@ class MOVEIT_PLANNING_SCENE_MONITOR_EXPORT PlanningSceneMonitor /** @brief Update the scene using the monitored state. This function is automatically called when an update to the current state is received (if startStateMonitor() has been called). - The updates are throttled to a maximum update frequency however, which is set by setStateUpdateFrequency(). */ - void updateSceneWithCurrentState(); + The updates are throttled to a maximum update frequency however, which is set by setStateUpdateFrequency(). + @param skip_update_if_locked causes the update to be skipped if the planning scene is locked. */ + void updateSceneWithCurrentState(bool skip_update_if_locked = false); /** @brief Update the scene using the monitored state at a specified frequency, in Hz. This function has an effect only when updates from the CurrentStateMonitor are received at a higher frequency. @@ -557,15 +558,19 @@ class MOVEIT_PLANNING_SCENE_MONITOR_EXPORT PlanningSceneMonitor void updatePublishSettings(bool publish_geom_updates, bool publish_state_updates, bool publish_transform_updates, bool publish_planning_scene, double publish_planning_scene_hz); - // Lock for state_update_pending_ and dt_state_update_ - std::mutex state_pending_mutex_; + /// True if current_state_monitor_ has a newer RobotState than scene_ + std::atomic state_update_pending_; - /// True when we need to update the RobotState from current_state_monitor_ - // This field is protected by state_pending_mutex_ - volatile bool state_update_pending_; + // Lock for writing last_robot_state_update_wall_time_ and dt_state_update_ + std::mutex state_update_mutex_; + + /// Last time the state was updated from current_state_monitor_ + // Only access this from callback functions (and constructor) + // This field is protected by state_update_mutex_ + std::chrono::system_clock::time_point last_robot_state_update_wall_time_; /// the amount of time to wait in between updates to the robot state - // This field is protected by state_pending_mutex_ + // This field is protected by state_update_mutex_ std::chrono::duration dt_state_update_; // 1hz /// the amount of time to wait when looking up transforms @@ -574,15 +579,10 @@ class MOVEIT_PLANNING_SCENE_MONITOR_EXPORT PlanningSceneMonitor rclcpp::Duration shape_transform_cache_lookup_wait_time_; /// timer for state updates. - // Check if last_state_update_ is true and if so call updateSceneWithCurrentState() + // If state_update_pending_ is true, call updateSceneWithCurrentState() // Not safe to access from callback functions. - rclcpp::TimerBase::SharedPtr state_update_timer_; - /// Last time the state was updated from current_state_monitor_ - // Only access this from callback functions (and constructor) - std::chrono::system_clock::time_point last_robot_state_update_wall_time_; - robot_model_loader::RobotModelLoaderPtr rm_loader_; moveit::core::RobotModelConstPtr robot_model_; diff --git a/moveit_ros/planning/planning_scene_monitor/src/planning_scene_monitor.cpp b/moveit_ros/planning/planning_scene_monitor/src/planning_scene_monitor.cpp index 1995c34146..d6e59c53df 100644 --- a/moveit_ros/planning/planning_scene_monitor/src/planning_scene_monitor.cpp +++ b/moveit_ros/planning/planning_scene_monitor/src/planning_scene_monitor.cpp @@ -223,7 +223,7 @@ void PlanningSceneMonitor::initialize(const planning_scene::PlanningScenePtr& sc shape_transform_cache_lookup_wait_time_ = rclcpp::Duration::from_seconds(temp_wait_time); - state_update_pending_ = false; + state_update_pending_.store(false); // Period for 0.1 sec using std::chrono::nanoseconds; state_update_timer_ = pnode_->create_wall_timer(dt_state_update_, [this]() { return stateUpdateTimerCallback(); }); @@ -1110,12 +1110,8 @@ bool PlanningSceneMonitor::waitForCurrentRobotState(const rclcpp::Time& t, doubl If waitForCurrentState failed, we didn't get any new state updates within wait_time. */ if (success) { - std::unique_lock lock(state_pending_mutex_); - if (state_update_pending_) // enforce state update + if (state_update_pending_.load()) // perform state update { - state_update_pending_ = false; - last_robot_state_update_wall_time_ = std::chrono::system_clock::now(); - lock.unlock(); updateSceneWithCurrentState(); } return true; @@ -1354,7 +1350,7 @@ void PlanningSceneMonitor::startStateMonitor(const std::string& joint_states_top current_state_monitor_->startStateMonitor(joint_states_topic); { - std::unique_lock lock(state_pending_mutex_); + std::unique_lock lock(state_update_mutex_); if (dt_state_update_.count() > 0) { // ROS original: state_update_timer_.start(); @@ -1387,69 +1383,29 @@ void PlanningSceneMonitor::stopStateMonitor() if (attached_collision_object_subscriber_) attached_collision_object_subscriber_.reset(); - // stop must be called with state_pending_mutex_ unlocked to avoid deadlock if (state_update_timer_) state_update_timer_->cancel(); - { - std::unique_lock lock(state_pending_mutex_); - state_update_pending_ = false; - } + state_update_pending_.store(false); } void PlanningSceneMonitor::onStateUpdate(const sensor_msgs::msg::JointState::ConstSharedPtr& /*joint_state */) { - const std::chrono::system_clock::time_point& n = std::chrono::system_clock::now(); - std::chrono::duration dt = n - last_robot_state_update_wall_time_; - - bool update = false; - { - std::unique_lock lock(state_pending_mutex_); + state_update_pending_.store(true); - if (dt.count() < dt_state_update_.count()) - { - state_update_pending_ = true; - } - else - { - state_update_pending_ = false; - last_robot_state_update_wall_time_ = n; - update = true; - } - } - // run the state update with state_pending_mutex_ unlocked - if (update) - updateSceneWithCurrentState(); + // Read access to last_robot_state_update_wall_time_ and dt_state_update_ is unprotected here + // as reading invalid values is not critical (just postpones the next state update) + // only update every dt_state_update_ seconds + if (std::chrono::system_clock::now() - last_robot_state_update_wall_time_ >= dt_state_update_) + updateSceneWithCurrentState(true); } void PlanningSceneMonitor::stateUpdateTimerCallback() { - if (state_update_pending_) - { - bool update = false; - - std::chrono::system_clock::time_point n = std::chrono::system_clock::now(); - std::chrono::duration dt = n - last_robot_state_update_wall_time_; - - { - // lock for access to dt_state_update_ and state_update_pending_ - std::unique_lock lock(state_pending_mutex_); - if (state_update_pending_ && dt.count() >= dt_state_update_.count()) - { - state_update_pending_ = false; - last_robot_state_update_wall_time_ = std::chrono::system_clock::now(); - auto sec = std::chrono::duration(last_robot_state_update_wall_time_.time_since_epoch()).count(); - update = true; - RCLCPP_DEBUG(logger_, "performPendingStateUpdate: %f", fmod(sec, 10)); - } - } - - // run the state update with state_pending_mutex_ unlocked - if (update) - { - updateSceneWithCurrentState(); - RCLCPP_DEBUG(logger_, "performPendingStateUpdate done"); - } - } + // Read access to last_robot_state_update_wall_time_ and dt_state_update_ is unprotected here + // as reading invalid values is not critical (just postpones the next state update) + if (state_update_pending_.load() && + std::chrono::system_clock::now() - last_robot_state_update_wall_time_ >= dt_state_update_) + updateSceneWithCurrentState(true); } void PlanningSceneMonitor::octomapUpdateCallback() @@ -1481,7 +1437,7 @@ void PlanningSceneMonitor::setStateUpdateFrequency(double hz) bool update = false; if (hz > std::numeric_limits::epsilon()) { - std::unique_lock lock(state_pending_mutex_); + std::unique_lock lock(state_update_mutex_); dt_state_update_ = std::chrono::duration(1.0 / hz); // ROS original: state_update_timer_.start(); // TODO: re-enable WallTimer start() @@ -1489,14 +1445,14 @@ void PlanningSceneMonitor::setStateUpdateFrequency(double hz) } else { - // stop must be called with state_pending_mutex_ unlocked to avoid deadlock + // stop must be called with state_update_mutex_ unlocked to avoid deadlock // ROS original: state_update_timer_.stop(); // TODO: re-enable WallTimer stop() if (state_update_timer_) state_update_timer_->cancel(); - std::unique_lock lock(state_pending_mutex_); + std::unique_lock lock(state_update_mutex_); dt_state_update_ = std::chrono::duration(0.0); - if (state_update_pending_) + if (state_update_pending_.load()) update = true; } RCLCPP_INFO(logger_, "Updating internal planning scene state at most every %lf seconds", dt_state_update_.count()); @@ -1505,7 +1461,7 @@ void PlanningSceneMonitor::setStateUpdateFrequency(double hz) updateSceneWithCurrentState(); } -void PlanningSceneMonitor::updateSceneWithCurrentState() +void PlanningSceneMonitor::updateSceneWithCurrentState(bool skip_update_if_locked) { rclcpp::Time time = node_->now(); rclcpp::Clock steady_clock = rclcpp::Clock(RCL_STEADY_TIME); @@ -1524,12 +1480,29 @@ void PlanningSceneMonitor::updateSceneWithCurrentState() } { - std::unique_lock ulock(scene_update_mutex_); + std::unique_lock ulock(scene_update_mutex_, std::defer_lock); + if (!skip_update_if_locked) + { + ulock.lock(); + } + else if (!ulock.try_lock()) + { + // Return if we can't lock scene_update_mutex, thus not blocking CurrentStateMonitor + return; + } last_update_time_ = last_robot_motion_time_ = current_state_monitor_->getCurrentStateTime(); RCLCPP_DEBUG(logger_, "robot state update %f", fmod(last_robot_motion_time_.seconds(), 10.)); current_state_monitor_->setToCurrentState(scene_->getCurrentStateNonConst()); scene_->getCurrentStateNonConst().update(); // compute all transforms } + + // Update state_update_mutex_ and last_robot_state_update_wall_time_ + { + std::unique_lock lock(state_update_mutex_); + last_robot_state_update_wall_time_ = std::chrono::system_clock::now(); + state_update_pending_.store(false); + } + triggerSceneUpdateEvent(UPDATE_STATE); } else