Skip to content

Commit

Permalink
Fix tsan potential deadlock between StatefulWriter and `FlowControl…
Browse files Browse the repository at this point in the history
…ler` (#5432)

* Refs #22339: Add BB test

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #22339: Fix tsan deadlock report

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #22339: Take writer's mutex before rproxy->stop() and check_acked_status()

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #22339: Apply Miguels suggestion

Signed-off-by: Mario Dominguez <[email protected]>

---------

Signed-off-by: Mario Dominguez <[email protected]>
(cherry picked from commit 8fcd7ca)

# Conflicts:
#	src/cpp/rtps/writer/StatefulWriter.cpp
#	test/blackbox/common/DDSBlackboxTestsBasic.cpp
  • Loading branch information
Mario-DL authored and mergify[bot] committed Dec 13, 2024
1 parent 28552ce commit b1b3187
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 35 deletions.
75 changes: 40 additions & 35 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1233,55 +1233,58 @@ bool StatefulWriter::matched_reader_remove(
{
ReaderProxy* rproxy = nullptr;
std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
std::unique_lock<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
std::unique_lock<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_);

for (ReaderProxyIterator it = matched_local_readers_.begin();
it != matched_local_readers_.end(); ++it)
{
if ((*it)->guid() == reader_guid)
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
rproxy = std::move(*it);
it = matched_local_readers_.erase(it);
break;
}
}
std::lock_guard<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
std::lock_guard<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_);

if (rproxy == nullptr)
{
for (ReaderProxyIterator it = matched_datasharing_readers_.begin();
it != matched_datasharing_readers_.end(); ++it)
for (ReaderProxyIterator it = matched_local_readers_.begin();
it != matched_local_readers_.end(); ++it)
{
if ((*it)->guid() == reader_guid)
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
rproxy = std::move(*it);
it = matched_datasharing_readers_.erase(it);
it = matched_local_readers_.erase(it);
break;
}
}
}

if (rproxy == nullptr)
{
for (ReaderProxyIterator it = matched_remote_readers_.begin();
it != matched_remote_readers_.end(); ++it)
if (rproxy == nullptr)
{
if ((*it)->guid() == reader_guid)
for (ReaderProxyIterator it = matched_datasharing_readers_.begin();
it != matched_datasharing_readers_.end(); ++it)
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
rproxy = std::move(*it);
it = matched_remote_readers_.erase(it);
break;
if ((*it)->guid() == reader_guid)
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
rproxy = std::move(*it);
it = matched_datasharing_readers_.erase(it);
break;
}
}
}
}

locator_selector_general_.locator_selector.remove_entry(reader_guid);
locator_selector_async_.locator_selector.remove_entry(reader_guid);
update_reader_info(locator_selector_general_, false);
update_reader_info(locator_selector_async_, false);
if (rproxy == nullptr)
{
for (ReaderProxyIterator it = matched_remote_readers_.begin();
it != matched_remote_readers_.end(); ++it)
{
if ((*it)->guid() == reader_guid)
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
rproxy = std::move(*it);
it = matched_remote_readers_.erase(it);
break;
}
}
}

locator_selector_general_.locator_selector.remove_entry(reader_guid);
locator_selector_async_.locator_selector.remove_entry(reader_guid);
update_reader_info(locator_selector_general_, false);
update_reader_info(locator_selector_async_, false);
}

if (getMatchedReadersSize() == 0)
{
Expand All @@ -1297,12 +1300,14 @@ bool StatefulWriter::matched_reader_remove(

if (nullptr != mp_listener)
{
// call the listener without locks taken
guard_locator_selector_async.unlock();
guard_locator_selector_general.unlock();
// listener is called without locks taken
lock.unlock();
<<<<<<< HEAD

mp_listener->on_reader_discovery(this, ReaderDiscoveryInfo::REMOVED_READER, reader_guid, nullptr);
=======
listener_->on_reader_discovery(this, ReaderDiscoveryStatus::REMOVED_READER, reader_guid, nullptr);
>>>>>>> 8fcd7ca48 (Fix tsan potential deadlock between `StatefulWriter` and `FlowController` (#5432))
}
return true;
}
Expand Down
147 changes: 147 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,153 @@ TEST(DDSBasic, max_output_message_size_writer)

}

<<<<<<< HEAD
=======
/**
* @test This test checks that it is possible to register two TypeSupport instances of the same type
* under the same DomainParticipant.
*/
TEST(DDSBasic, register_two_identical_typesupports)
{
// Set DomainParticipantFactory to create disabled entities
DomainParticipantFactory* factory = DomainParticipantFactory::get_instance();
ASSERT_NE(nullptr, factory);

// Create a disabled DomainParticipant, setting it to in turn create disable entities
DomainParticipant* participant = factory->create_participant((uint32_t)GET_PID() % 230, PARTICIPANT_QOS_DEFAULT);
ASSERT_NE(nullptr, participant);

// Register a type support
TypeSupport type_support_1;
type_support_1.reset(new HelloWorldPubSubType());
EXPECT_EQ(RETCODE_OK, participant->register_type(type_support_1));

// Register a second instance of the type support with the same TopicDataType
TypeSupport type_support_2;
type_support_2.reset(new HelloWorldPubSubType());
EXPECT_EQ(RETCODE_OK, participant->register_type(type_support_2));
}

/**
* @test This is a regression test for Redmine Issue 21293.
* The destruction among intra-process participants should be correctly performed.
* local_reader() has to return a valid pointer.
*
*/
TEST(DDSBasic, successful_destruction_among_intraprocess_participants)
{
namespace dds = eprosima::fastdds::dds;
auto factory = dds::DomainParticipantFactory::get_instance();

// Set intraprocess delivery to full
LibrarySettings library_settings;
factory->get_library_settings(library_settings);
auto old_library_settings = library_settings;
library_settings.intraprocess_delivery = INTRAPROCESS_FULL;
factory->set_library_settings(library_settings);

{
auto participant_1 = std::make_shared<PubSubParticipant<HelloWorldPubSubType>>(1u, 1u, 1u, 1u);

ASSERT_TRUE(participant_1->init_participant());
participant_1->pub_topic_name(TEST_TOPIC_NAME);
ASSERT_TRUE(participant_1->init_publisher(0u));
participant_1->sub_topic_name(TEST_TOPIC_NAME + "_Return");
ASSERT_TRUE(participant_1->init_subscriber(0u));

std::vector<std::shared_ptr<PubSubParticipant<HelloWorldPubSubType>>> reception_participants;

size_t num_reception_participants = 50;

for (size_t i = 0; i < num_reception_participants; i++)
{
reception_participants.push_back(std::make_shared<PubSubParticipant<HelloWorldPubSubType>>(1u, 1u, 1u, 1u));
ASSERT_TRUE(reception_participants.back()->init_participant());
reception_participants.back()->sub_topic_name(TEST_TOPIC_NAME);
ASSERT_TRUE(reception_participants.back()->init_subscriber(0u));
reception_participants.back()->pub_topic_name(TEST_TOPIC_NAME + "_Return");
ASSERT_TRUE(reception_participants.back()->init_publisher(0u));
}

participant_1->wait_discovery(std::chrono::seconds::zero(), (uint8_t)num_reception_participants, true);

participant_1->pub_wait_discovery((unsigned int)num_reception_participants);
participant_1->sub_wait_discovery((unsigned int)num_reception_participants);

auto data_12 = default_helloworld_data_generator();

std::thread p1_thread([&participant_1, &data_12]()
{
auto data_size = data_12.size();
for (size_t i = 0; i < data_size; i++)
{
participant_1->send_sample(data_12.back());
data_12.pop_back();
}
});

std::vector<std::thread> reception_threads;
reception_threads.reserve(num_reception_participants);
for (auto& reception_participant : reception_participants)
{
reception_threads.emplace_back([&reception_participant]()
{
auto data_21 = default_helloworld_data_generator();
for (auto& data : data_21)
{
reception_participant->send_sample(data);
}

reception_participant.reset();
});
}

p1_thread.join();
for (auto& rec_thread : reception_threads)
{
rec_thread.join();
}
}
}
TEST(DDSBasic, reliable_volatile_writer_secure_builtin_no_potential_deadlock)
{
// Create
PubSubWriter<HelloWorldPubSubType> writer("HelloWorldTopic_no_potential_deadlock");
PubSubReader<HelloWorldPubSubType> reader("HelloWorldTopic_no_potential_deadlock");

writer.asynchronously(eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE)
.durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS)
.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS)
.history_depth(20)
.init();

ASSERT_TRUE(writer.isInitialized());

reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS)
.history_depth(20)
.durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS)
.init();

ASSERT_TRUE(reader.isInitialized());

auto data = default_helloworld_data_generator(30);

std::thread th([&]()
{
reader.startReception(data);
reader.block_for_at_least(5);
});

writer.wait_discovery();
writer.send(data);

th.join();
reader.destroy();
writer.destroy();
}

>>>>>>> 8fcd7ca48 (Fix tsan potential deadlock between `StatefulWriter` and `FlowController` (#5432))
} // namespace dds
} // namespace fastdds
} // namespace eprosima

0 comments on commit b1b3187

Please sign in to comment.