From 9dcf9272473c80c5113dceb9dfabd4599bfa925e Mon Sep 17 00:00:00 2001 From: Krzysztof Laskowski Date: Tue, 27 Sep 2022 14:31:03 +0000 Subject: [PATCH 1/4] CallbackQueue removeByID blocks unless callback self-removes (#2283) Making sure currently executing callback finishes before removing thread returns from removeByID allows to avoid race condition in a case when e.g. ros::Timer held in a class and capturing `this` is stopped just before destruction of the class and its data members: https://github.com/aurzenligl/study/blob/master/ros-timer/src/race.cpp --- clients/roscpp/src/libros/callback_queue.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/clients/roscpp/src/libros/callback_queue.cpp b/clients/roscpp/src/libros/callback_queue.cpp index 165b3357ab..9a3cfa3656 100644 --- a/clients/roscpp/src/libros/callback_queue.cpp +++ b/clients/roscpp/src/libros/callback_queue.cpp @@ -166,9 +166,10 @@ void CallbackQueue::removeByID(uint64_t removal_id) } { - boost::unique_lock rw_lock(id_info->calling_rw_mutex, boost::defer_lock); - if (rw_lock.try_lock()) + // Unless we're removing from callback, we lock the calling mutex to ensure callback is not being executed. + if (tls_->calling_in_this_thread != id_info->id) { + boost::unique_lock rw_lock(id_info->calling_rw_mutex); boost::mutex::scoped_lock lock(mutex_); D_CallbackInfo::iterator it = callbacks_.begin(); for (; it != callbacks_.end();) @@ -186,8 +187,8 @@ void CallbackQueue::removeByID(uint64_t removal_id) } else { - // We failed to acquire the lock, it can be that we are trying to remove something from the callback queue - // while it is being executed. Mark it for removal and let it be cleaned up later. + // Since we're removing from callback, locking twice would deadlock. + // Instead, mark callback for removal and let it be cleaned up later. boost::mutex::scoped_lock lock(mutex_); for (D_CallbackInfo::iterator it = callbacks_.begin(); it != callbacks_.end(); it++) { From 35aab20d37393f3f03024f507aeb24a3819f2c6f Mon Sep 17 00:00:00 2001 From: Krzysztof Laskowski Date: Tue, 27 Sep 2022 15:14:37 +0000 Subject: [PATCH 2/4] Remove trailing whitespace (#2283) --- test/test_roscpp/test/test_callback_queue.cpp | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/test/test_roscpp/test/test_callback_queue.cpp b/test/test_roscpp/test/test_callback_queue.cpp index a642e60a7e..fc4a97dc1c 100644 --- a/test/test_roscpp/test/test_callback_queue.cpp +++ b/test/test_roscpp/test/test_callback_queue.cpp @@ -177,7 +177,7 @@ TEST(CallbackQueue, removeSelf) queue.callOne(); queue.addCallback(cb2, 1); - + queue.callAvailable(); EXPECT_EQ(cb1->count, 1U); @@ -549,6 +549,3 @@ int main(int argc, char** argv) testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } - - - From 25fb5a86e8e256c3abec1efc2cb1b981bf079d2a Mon Sep 17 00:00:00 2001 From: Krzysztof Laskowski Date: Tue, 27 Sep 2022 15:17:04 +0000 Subject: [PATCH 3/4] Fix for testcase assuming non-blocking removeByID (#2283) Rationale for the testcase was the following deadlock scenario: https://gist.github.com/iwanders/ede48fb649fd47f9b1f9a52c527b463c Changed testcase presents how the same scenario can be carried out with blocking removeByID (with exception for self-removal). The external mutex from the scenario must be unlocked for the call of ros::Timer::stop, otherwise scenario stays as it is. External thread returns from removeByID once cb call finishes, spinner thread returns immediately. --- test/test_roscpp/test/test_callback_queue.cpp | 90 +++++++++---------- 1 file changed, 45 insertions(+), 45 deletions(-) diff --git a/test/test_roscpp/test/test_callback_queue.cpp b/test/test_roscpp/test/test_callback_queue.cpp index fc4a97dc1c..d0666bc077 100644 --- a/test/test_roscpp/test/test_callback_queue.cpp +++ b/test/test_roscpp/test/test_callback_queue.cpp @@ -42,6 +42,7 @@ #include #include #include +#include #include using namespace ros; @@ -66,6 +67,26 @@ class CountingCallback : public CallbackInterface }; typedef boost::shared_ptr CountingCallbackPtr; +struct CustomCallback : public CallbackInterface +{ + template + CustomCallback(Func fun) + : function(fun), count(0) + {} + + virtual CallResult call() + { + function(); + ++count; + + return Success; + } + + boost::function function; + size_t count; +}; +typedef boost::shared_ptr CustomCallbackPtr; + void callAvailableThread(CallbackQueue* queue, bool& done) { while (!done) @@ -184,73 +205,52 @@ TEST(CallbackQueue, removeSelf) EXPECT_EQ(cb2->count, 1U); } -class BlockingCallback : public CallbackInterface -{ -public: - BlockingCallback() - : count(0) - {} - - virtual CallResult call() - { - mutex_.lock(); - ++count; - - return Success; - } - - boost::mutex mutex_; - size_t count; -}; -typedef boost::shared_ptr BlockingCallbackPtr; - - -// This test checks whether removing callbacks by an id doesn't block if one of those callback is being executed. -TEST(CallbackQueue, removeCallbackWhileExecuting) +// This test checks whether self-removing callbacks by an id doesn't block if one of those callback is being executed. +TEST(CallbackQueue, selfRemoveCallbackWhileExecuting) { const uint64_t owner_id = 1; const uint64_t unrelated_id = 2; + boost::mutex external_mtx; + CallbackQueue queue; - BlockingCallbackPtr cb1(boost::make_shared()); + CustomCallbackPtr cb1(boost::make_shared([&]() { + boost::unique_lock external_lock(external_mtx); + boost::this_thread::sleep_for(boost::chrono::milliseconds(300)); + + { + boost::reverse_lock> unlocker(external_lock); + queue.removeByID(owner_id); // external thread blocks here, spinner doesn't + } + })); CountingCallbackPtr cb2(boost::make_shared()); CountingCallbackPtr cb3(boost::make_shared()); - cb1->mutex_.lock(); // lock the mutex to ensure the blocking callback will stall processing of callback queue. - - queue.addCallback(cb1, owner_id); // Add the blocking callback. - - // Now, we need to serve the callback queue from another thread. - bool done = false; - boost::thread t = boost::thread(boost::bind(&callAvailableThread, &queue, boost::ref(done))); - - ros::WallDuration(1.0).sleep(); // Callback 1 should be being served now. - + queue.addCallback(cb1, owner_id); // Add the self-removing callback. queue.addCallback(cb2, unrelated_id); // Add a second callback with different owner. queue.addCallback(cb3, owner_id); // Add a third with same owner, this one should never get executed. - // Now try to remove the callback that's being executed. - queue.removeByID(owner_id); // This should not block because cb1 is being served, it should prevent cb3 from running. - - ros::WallDuration(1.0).sleep(); + // Let's use an external thread to execute cb function and hold its external lock + boost::thread t1([&]() { cb1->call(); }); + boost::this_thread::sleep_for(boost::chrono::milliseconds(100)); - // The removeByID should not block, so now we can unblock the blocking callback. - cb1->mutex_.unlock(); // This allows processing of cb1 to continue. + // Now, we need to serve the callback queue from another thread. + bool done = false; + boost::thread t2([&]() { callAvailableThread(&queue, done); }); while (!queue.isEmpty()) // Wait until the queue is empty. { ros::WallDuration(0.01).sleep(); } - // Properly shut down our callback serving thread. + // Properly shut down our threads. done = true; - t.join(); + t2.join(); + t1.join(); - EXPECT_EQ(cb1->count, 1U); + EXPECT_EQ(cb1->count, 2U); EXPECT_EQ(cb2->count, 1U); EXPECT_EQ(cb3->count, 0U); - - cb1->mutex_.unlock(); // Ensure the mutex is unlocked before destruction. } class RecursiveCallback : public CallbackInterface From 6f31e6284712bb023b7755ed0b5d3f727e6dac21 Mon Sep 17 00:00:00 2001 From: Krzysztof Laskowski Date: Tue, 27 Sep 2022 15:14:45 +0000 Subject: [PATCH 4/4] New testcase checks removeByID blocks until cb call finishes (#2283) --- test/test_roscpp/test/test_callback_queue.cpp | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/test/test_roscpp/test/test_callback_queue.cpp b/test/test_roscpp/test/test_callback_queue.cpp index d0666bc077..2f6607446f 100644 --- a/test/test_roscpp/test/test_callback_queue.cpp +++ b/test/test_roscpp/test/test_callback_queue.cpp @@ -253,6 +253,37 @@ TEST(CallbackQueue, selfRemoveCallbackWhileExecuting) EXPECT_EQ(cb3->count, 0U); } +// This test checks whether non-spinner thread blocks on removeByID until currently executing callback finishes +TEST(CallbackQueue, removeCallbackWhileExecuting) +{ + const uint64_t cb_id = 1; + boost::barrier barrier(2); + + CallbackQueue queue; + CustomCallbackPtr cb(boost::make_shared([&]() { + barrier.wait(); + barrier.wait(); + })); + queue.addCallback(cb, cb_id); + + // Let's ensure spinner thread executes callback now + bool done = false; + boost::thread t1([&]() { callAvailableThread(&queue, done); }); + barrier.wait(); + + // External removing thread blocks on removeByID + boost::thread t2([&]() { queue.removeByID(cb_id); }); + EXPECT_FALSE(t2.try_join_for(boost::chrono::milliseconds(200))); // removebyID blocks until cb finishes + + // When callback finishes, external thread proceeds + barrier.wait(); + t2.join(); + EXPECT_EQ(cb->count, 1U); + + done = true; + t1.join(); +} + class RecursiveCallback : public CallbackInterface { public: