Skip to content

Commit

Permalink
Ports moveit #3676 and #3682 (#3283)
Browse files Browse the repository at this point in the history
* Use separate callback queue + spinner for ExecuteTrajectoryAction (#3676)

This allows parallel execution + planning.

Also required modifying updateSceneWithCurrentState() to allow skipping a scene update with a new robot state (from CurrentStateMonitor), if the planning scene is currently locked (due to planning).
Otherwise, the CurrentStateMonitor would block too.

Co-authored-by: Robert Haschke <[email protected]>

* PSM: simplify state_update_pending_ (#3682)

* Move update of state_update_pending_ to updateSceneWithCurrentState()

* Revert to try_lock

While there are a few other locks except explicit user locks (getPlanningSceneServiceCallback(), collisionObjectCallback(), attachObjectCallback(), newPlanningSceneCallback(), and scenePublishingThread()), these occur rather seldom
(scenePublishingThread() publishes at 2Hz).

Hence, we might indeed balance a non-blocking CSM vs. missed PS updates in favour of CSM.

* Don't block for scene update from stateUpdateTimerCallback too

The timer callback and CSM's state update callbacks are served from the same callback queue, which would block CSM again.

* further locking adaptations

reading dt_state_update_ and last_robot_state_update_wall_time_
does not lead to logic errors, but at most to a skipped or redundant update on corrupted data.
Alternatively we could be on the safe side and turn both variables into std::atomic, but that
would effectively mean locks on every read.

Instead, only set state_update_pending_ as an atomic, which is lockfree in this case.

Co-authored-by: Michael Görner <[email protected]>

* Ports changes to ROS2.

---------

Co-authored-by: Robert Haschke <[email protected]>
Co-authored-by: Robert Haschke <[email protected]>
Co-authored-by: Michael Görner <[email protected]>
Co-authored-by: Sebastian Castro <[email protected]>
Co-authored-by: Sebastian Jahr <[email protected]>
(cherry picked from commit ba35aaa)
  • Loading branch information
rr-mark authored and mergify[bot] committed Feb 6, 2025
1 parent 969bd40 commit 5de9737
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,21 @@ MoveGroupExecuteTrajectoryAction::MoveGroupExecuteTrajectoryAction() : MoveGroup
{
}

MoveGroupExecuteTrajectoryAction::~MoveGroupExecuteTrajectoryAction()
{
callback_executor_.cancel();

if (callback_thread_.joinable())
callback_thread_.join();
}

void MoveGroupExecuteTrajectoryAction::initialize()
{
auto node = context_->moveit_cpp_->getNode();
callback_group_ = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive,
false /* don't spin with node executor */);
callback_executor_.add_callback_group(callback_group_, node->get_node_base_interface());
callback_thread_ = std::thread([this]() { callback_executor_.spin(); });
// start the move action server
execute_action_server_ = rclcpp_action::create_server<ExecTrajectory>(
node->get_node_base_interface(), node->get_node_clock_interface(), node->get_node_logging_interface(),
Expand All @@ -68,7 +80,8 @@ void MoveGroupExecuteTrajectoryAction::initialize()
return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE;
},
[](const std::shared_ptr<ExecTrajectoryGoal>& /* unused */) { return rclcpp_action::CancelResponse::ACCEPT; },
[this](const auto& goal) { executePathCallback(goal); });
[this](const auto& goal) { executePathCallback(goal); }, rcl_action_server_get_default_options(),
callback_group_);
}

void MoveGroupExecuteTrajectoryAction::executePathCallback(const std::shared_ptr<ExecTrajectoryGoal>& goal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class MoveGroupExecuteTrajectoryAction : public MoveGroupCapability
{
public:
MoveGroupExecuteTrajectoryAction();
~MoveGroupExecuteTrajectoryAction() override;

void initialize() override;

Expand All @@ -65,6 +66,11 @@ class MoveGroupExecuteTrajectoryAction : public MoveGroupCapability
void preemptExecuteTrajectoryCallback();
void setExecuteTrajectoryState(MoveGroupState state, const std::shared_ptr<ExecTrajectoryGoal>& goal);

// Use our own callback group and executor to allow execution parallel to other capabilities
rclcpp::CallbackGroup::SharedPtr callback_group_;
rclcpp::executors::SingleThreadedExecutor callback_executor_;
std::thread callback_thread_;

std::shared_ptr<rclcpp_action::Server<ExecTrajectory>> execute_action_server_;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,9 @@ class MOVEIT_PLANNING_SCENE_MONITOR_EXPORT PlanningSceneMonitor

/** @brief Update the scene using the monitored state. This function is automatically called when an update to the
current state is received (if startStateMonitor() has been called).
The updates are throttled to a maximum update frequency however, which is set by setStateUpdateFrequency(). */
void updateSceneWithCurrentState();
The updates are throttled to a maximum update frequency however, which is set by setStateUpdateFrequency().
@param skip_update_if_locked causes the update to be skipped if the planning scene is locked. */
void updateSceneWithCurrentState(bool skip_update_if_locked = false);

/** @brief Update the scene using the monitored state at a specified frequency, in Hz. This function has an effect
only when updates from the CurrentStateMonitor are received at a higher frequency.
Expand Down Expand Up @@ -557,15 +558,19 @@ class MOVEIT_PLANNING_SCENE_MONITOR_EXPORT PlanningSceneMonitor
void updatePublishSettings(bool publish_geom_updates, bool publish_state_updates, bool publish_transform_updates,
bool publish_planning_scene, double publish_planning_scene_hz);

// Lock for state_update_pending_ and dt_state_update_
std::mutex state_pending_mutex_;
/// True if current_state_monitor_ has a newer RobotState than scene_
std::atomic<bool> state_update_pending_;

/// True when we need to update the RobotState from current_state_monitor_
// This field is protected by state_pending_mutex_
volatile bool state_update_pending_;
// Lock for writing last_robot_state_update_wall_time_ and dt_state_update_
std::mutex state_update_mutex_;

/// Last time the state was updated from current_state_monitor_
// Only access this from callback functions (and constructor)
// This field is protected by state_update_mutex_
std::chrono::system_clock::time_point last_robot_state_update_wall_time_;

/// the amount of time to wait in between updates to the robot state
// This field is protected by state_pending_mutex_
// This field is protected by state_update_mutex_
std::chrono::duration<double> dt_state_update_; // 1hz

/// the amount of time to wait when looking up transforms
Expand All @@ -574,15 +579,10 @@ class MOVEIT_PLANNING_SCENE_MONITOR_EXPORT PlanningSceneMonitor
rclcpp::Duration shape_transform_cache_lookup_wait_time_;

/// timer for state updates.
// Check if last_state_update_ is true and if so call updateSceneWithCurrentState()
// If state_update_pending_ is true, call updateSceneWithCurrentState()
// Not safe to access from callback functions.

rclcpp::TimerBase::SharedPtr state_update_timer_;

/// Last time the state was updated from current_state_monitor_
// Only access this from callback functions (and constructor)
std::chrono::system_clock::time_point last_robot_state_update_wall_time_;

robot_model_loader::RobotModelLoaderPtr rm_loader_;
moveit::core::RobotModelConstPtr robot_model_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ void PlanningSceneMonitor::initialize(const planning_scene::PlanningScenePtr& sc

shape_transform_cache_lookup_wait_time_ = rclcpp::Duration::from_seconds(temp_wait_time);

state_update_pending_ = false;
state_update_pending_.store(false);
// Period for 0.1 sec
using std::chrono::nanoseconds;
state_update_timer_ = pnode_->create_wall_timer(dt_state_update_, [this]() { return stateUpdateTimerCallback(); });
Expand Down Expand Up @@ -1110,12 +1110,8 @@ bool PlanningSceneMonitor::waitForCurrentRobotState(const rclcpp::Time& t, doubl
If waitForCurrentState failed, we didn't get any new state updates within wait_time. */
if (success)
{
std::unique_lock<std::mutex> lock(state_pending_mutex_);
if (state_update_pending_) // enforce state update
if (state_update_pending_.load()) // perform state update
{
state_update_pending_ = false;
last_robot_state_update_wall_time_ = std::chrono::system_clock::now();
lock.unlock();
updateSceneWithCurrentState();
}
return true;
Expand Down Expand Up @@ -1354,7 +1350,7 @@ void PlanningSceneMonitor::startStateMonitor(const std::string& joint_states_top
current_state_monitor_->startStateMonitor(joint_states_topic);

{
std::unique_lock<std::mutex> lock(state_pending_mutex_);
std::unique_lock<std::mutex> lock(state_update_mutex_);
if (dt_state_update_.count() > 0)
{
// ROS original: state_update_timer_.start();
Expand Down Expand Up @@ -1387,69 +1383,29 @@ void PlanningSceneMonitor::stopStateMonitor()
if (attached_collision_object_subscriber_)
attached_collision_object_subscriber_.reset();

// stop must be called with state_pending_mutex_ unlocked to avoid deadlock
if (state_update_timer_)
state_update_timer_->cancel();
{
std::unique_lock<std::mutex> lock(state_pending_mutex_);
state_update_pending_ = false;
}
state_update_pending_.store(false);
}

void PlanningSceneMonitor::onStateUpdate(const sensor_msgs::msg::JointState::ConstSharedPtr& /*joint_state */)
{
const std::chrono::system_clock::time_point& n = std::chrono::system_clock::now();
std::chrono::duration<double> dt = n - last_robot_state_update_wall_time_;

bool update = false;
{
std::unique_lock<std::mutex> lock(state_pending_mutex_);
state_update_pending_.store(true);

if (dt.count() < dt_state_update_.count())
{
state_update_pending_ = true;
}
else
{
state_update_pending_ = false;
last_robot_state_update_wall_time_ = n;
update = true;
}
}
// run the state update with state_pending_mutex_ unlocked
if (update)
updateSceneWithCurrentState();
// Read access to last_robot_state_update_wall_time_ and dt_state_update_ is unprotected here
// as reading invalid values is not critical (just postpones the next state update)
// only update every dt_state_update_ seconds
if (std::chrono::system_clock::now() - last_robot_state_update_wall_time_ >= dt_state_update_)
updateSceneWithCurrentState(true);
}

void PlanningSceneMonitor::stateUpdateTimerCallback()
{
if (state_update_pending_)
{
bool update = false;

std::chrono::system_clock::time_point n = std::chrono::system_clock::now();
std::chrono::duration<double> dt = n - last_robot_state_update_wall_time_;

{
// lock for access to dt_state_update_ and state_update_pending_
std::unique_lock<std::mutex> lock(state_pending_mutex_);
if (state_update_pending_ && dt.count() >= dt_state_update_.count())
{
state_update_pending_ = false;
last_robot_state_update_wall_time_ = std::chrono::system_clock::now();
auto sec = std::chrono::duration<double>(last_robot_state_update_wall_time_.time_since_epoch()).count();
update = true;
RCLCPP_DEBUG(logger_, "performPendingStateUpdate: %f", fmod(sec, 10));
}
}

// run the state update with state_pending_mutex_ unlocked
if (update)
{
updateSceneWithCurrentState();
RCLCPP_DEBUG(logger_, "performPendingStateUpdate done");
}
}
// Read access to last_robot_state_update_wall_time_ and dt_state_update_ is unprotected here
// as reading invalid values is not critical (just postpones the next state update)
if (state_update_pending_.load() &&
std::chrono::system_clock::now() - last_robot_state_update_wall_time_ >= dt_state_update_)
updateSceneWithCurrentState(true);
}

void PlanningSceneMonitor::octomapUpdateCallback()
Expand Down Expand Up @@ -1481,22 +1437,22 @@ void PlanningSceneMonitor::setStateUpdateFrequency(double hz)
bool update = false;
if (hz > std::numeric_limits<double>::epsilon())
{
std::unique_lock<std::mutex> lock(state_pending_mutex_);
std::unique_lock<std::mutex> lock(state_update_mutex_);
dt_state_update_ = std::chrono::duration<double>(1.0 / hz);
// ROS original: state_update_timer_.start();
// TODO: re-enable WallTimer start()
state_update_timer_ = pnode_->create_wall_timer(dt_state_update_, [this]() { return stateUpdateTimerCallback(); });
}
else
{
// stop must be called with state_pending_mutex_ unlocked to avoid deadlock
// stop must be called with state_update_mutex_ unlocked to avoid deadlock
// ROS original: state_update_timer_.stop();
// TODO: re-enable WallTimer stop()
if (state_update_timer_)
state_update_timer_->cancel();
std::unique_lock<std::mutex> lock(state_pending_mutex_);
std::unique_lock<std::mutex> lock(state_update_mutex_);
dt_state_update_ = std::chrono::duration<double>(0.0);
if (state_update_pending_)
if (state_update_pending_.load())
update = true;
}
RCLCPP_INFO(logger_, "Updating internal planning scene state at most every %lf seconds", dt_state_update_.count());
Expand All @@ -1505,7 +1461,7 @@ void PlanningSceneMonitor::setStateUpdateFrequency(double hz)
updateSceneWithCurrentState();
}

void PlanningSceneMonitor::updateSceneWithCurrentState()
void PlanningSceneMonitor::updateSceneWithCurrentState(bool skip_update_if_locked)
{
rclcpp::Time time = node_->now();
rclcpp::Clock steady_clock = rclcpp::Clock(RCL_STEADY_TIME);
Expand All @@ -1524,12 +1480,29 @@ void PlanningSceneMonitor::updateSceneWithCurrentState()
}

{
std::unique_lock<std::shared_mutex> ulock(scene_update_mutex_);
std::unique_lock<std::shared_mutex> ulock(scene_update_mutex_, std::defer_lock);
if (!skip_update_if_locked)
{
ulock.lock();
}
else if (!ulock.try_lock())
{
// Return if we can't lock scene_update_mutex, thus not blocking CurrentStateMonitor
return;
}
last_update_time_ = last_robot_motion_time_ = current_state_monitor_->getCurrentStateTime();
RCLCPP_DEBUG(logger_, "robot state update %f", fmod(last_robot_motion_time_.seconds(), 10.));
current_state_monitor_->setToCurrentState(scene_->getCurrentStateNonConst());
scene_->getCurrentStateNonConst().update(); // compute all transforms
}

// Update state_update_mutex_ and last_robot_state_update_wall_time_
{
std::unique_lock<std::mutex> lock(state_update_mutex_);
last_robot_state_update_wall_time_ = std::chrono::system_clock::now();
state_update_pending_.store(false);
}

triggerSceneUpdateEvent(UPDATE_STATE);
}
else
Expand Down

0 comments on commit 5de9737

Please sign in to comment.