Skip to content

Commit

Permalink
Release participant_stateless secure builtin writer history change …
Browse files Browse the repository at this point in the history
…when authentication has finished (#5386) (#5393)

* Release `participant_stateless` secure builtin writer history change when authentication has finished (#5386)

* TMP: REMOVE THIS COMMIT

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #22033: BB test

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #22033: Modify secure builtins initial payload size

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #22033: Fix: release stateless msg payload pool when participant cryptography succeeds

Signed-off-by: Mario Dominguez <[email protected]>

---------

Signed-off-by: Mario Dominguez <[email protected]>
(cherry picked from commit b414621)

* Update waitAuthorized in PubSub fastrtps_deprecated API

Signed-off-by: Mario Dominguez <[email protected]>

* Fix windows compilation error

Signed-off-by: Mario Dominguez <[email protected]>

---------

Signed-off-by: Mario Dominguez <[email protected]>
Co-authored-by: Mario Domínguez López <[email protected]>
Co-authored-by: Mario Dominguez <[email protected]>
  • Loading branch information
3 people authored Nov 19, 2024
1 parent f743ee1 commit 7f54166
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 30 deletions.
33 changes: 28 additions & 5 deletions src/cpp/rtps/security/SecurityManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1070,11 +1070,13 @@ void SecurityManager::delete_participant_stateless_message_entities()
void SecurityManager::create_participant_stateless_message_pool()
{
participant_stateless_message_writer_hattr_ =
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, participant_->getMaxMessageSize(), 20, 100 };
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, static_cast<uint32_t>(PARTICIPANT_STATELESS_MESSAGE_PAYLOAD_DEFAULT_SIZE),
20, 100};
participant_stateless_message_reader_hattr_ =
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, participant_->getMaxMessageSize(), 10, 5000 };
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, static_cast<uint32_t>(PARTICIPANT_STATELESS_MESSAGE_PAYLOAD_DEFAULT_SIZE),
10, 5000};

BasicPoolConfig cfg{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, participant_->getMaxMessageSize() };
BasicPoolConfig cfg{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, PARTICIPANT_STATELESS_MESSAGE_PAYLOAD_DEFAULT_SIZE};
participant_stateless_message_pool_ = TopicPayloadPoolRegistry::get("DCPSParticipantStatelessMessage", cfg);

PoolConfig writer_cfg = PoolConfig::from_history_attributes(participant_stateless_message_writer_hattr_);
Expand Down Expand Up @@ -1226,7 +1228,8 @@ void SecurityManager::delete_participant_volatile_message_secure_entities()
void SecurityManager::create_participant_volatile_message_secure_pool()
{
participant_volatile_message_secure_hattr_ =
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, participant_->getMaxMessageSize(), 10, 0 };
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, static_cast<uint32_t>(PARTICIPANT_VOLATILE_MESSAGE_PAYLOAD_DEFAULT_SIZE),
10, 0 };

PoolConfig pool_cfg = PoolConfig::from_history_attributes(participant_volatile_message_secure_hattr_);
participant_volatile_message_secure_pool_ =
Expand Down Expand Up @@ -1725,6 +1728,7 @@ void SecurityManager::process_participant_volatile_message_secure(
const GUID_t remote_participant_key(message.message_identity().source_guid().guidPrefix,
c_EntityId_RTPSParticipant);
std::shared_ptr<ParticipantCryptoHandle> remote_participant_crypto;
DiscoveredParticipantInfo::AuthUniquePtr remote_participant_info;

// Search remote participant crypto handle.
{
Expand All @@ -1740,6 +1744,7 @@ void SecurityManager::process_participant_volatile_message_secure(
}

remote_participant_crypto = dp_it->second->get_participant_crypto();
remote_participant_info = dp_it->second->get_auth();
}
else
{
Expand All @@ -1761,12 +1766,30 @@ void SecurityManager::process_participant_volatile_message_secure(
EPROSIMA_LOG_ERROR(SECURITY, "Cannot set remote participant crypto tokens ("
<< remote_participant_key << ") - (" << exception.what() << ")");
}
else
{
// Release the change from the participant_stateless_message_writer_pool_
// As both participants have already authorized each other

if (remote_participant_info &&
remote_participant_info->change_sequence_number_ != SequenceNumber_t::unknown())
{
participant_stateless_message_writer_history_->remove_change(
remote_participant_info->change_sequence_number_);
remote_participant_info->change_sequence_number_ = SequenceNumber_t::unknown();
}
}
}
else
{
std::lock_guard<shared_mutex> _(mutex_);
remote_participant_pending_messages_.emplace(remote_participant_key, std::move(message.message_data()));
}

if (remote_participant_info)
{
restore_discovered_participant_info(remote_participant_key, remote_participant_info);
}
}
else if (message.message_class_id().compare(GMCLASSID_SECURITY_READER_CRYPTO_TOKENS) == 0)
{
Expand Down Expand Up @@ -1922,7 +1945,7 @@ void SecurityManager::process_participant_volatile_message_secure(
}
else
{
EPROSIMA_LOG_INFO(SECURITY, "Discarted ParticipantGenericMessage with class id " << message.message_class_id());
EPROSIMA_LOG_INFO(SECURITY, "Discarded ParticipantGenericMessage with class id " << message.message_class_id());
}
}

Expand Down
10 changes: 9 additions & 1 deletion src/cpp/rtps/security/SecurityManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ struct EndpointSecurityAttributes;
*/
class SecurityManager : private WriterListener
{
static constexpr std::size_t PARTICIPANT_STATELESS_MESSAGE_PAYLOAD_DEFAULT_SIZE = 8192;
static constexpr std::size_t PARTICIPANT_VOLATILE_MESSAGE_PAYLOAD_DEFAULT_SIZE = 1024;

public:

/**
Expand Down Expand Up @@ -403,14 +406,19 @@ class SecurityManager : private WriterListener
}

AuthenticationInfo(
AuthenticationInfo&& auth)
AuthenticationInfo&& auth) noexcept
: identity_handle_(std::move(auth.identity_handle_))
, handshake_handle_(std::move(auth.handshake_handle_))
, auth_status_(auth.auth_status_)
, expected_sequence_number_(auth.expected_sequence_number_)
, change_sequence_number_(std::move(auth.change_sequence_number_))
, event_(std::move(auth.event_))
{
auth.identity_handle_ = nullptr;
auth.handshake_handle_ = nullptr;
auth.auth_status_ = AUTHENTICATION_NOT_AVAILABLE;
auth.expected_sequence_number_ = 0;
auth.change_sequence_number_ = SequenceNumber_t::unknown();
}

int32_t handshake_requests_sent_;
Expand Down
31 changes: 26 additions & 5 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -847,16 +847,28 @@ class PubSubReader
}

#if HAVE_SECURITY
void waitAuthorized()
void waitAuthorized(
std::chrono::seconds timeout = std::chrono::seconds::zero(),
unsigned int expected = 1)
{
std::unique_lock<std::mutex> lock(mutexAuthentication_);

std::cout << "Reader is waiting authorization..." << std::endl;

cvAuthentication_.wait(lock, [&]() -> bool
{
return authorized_ > 0;
});
if (timeout == std::chrono::seconds::zero())
{
cvAuthentication_.wait(lock, [&]()
{
return authorized_ >= expected;
});
}
else
{
cvAuthentication_.wait_for(lock, timeout, [&]()
{
return authorized_ >= expected;
});
}

std::cout << "Reader authorization finished..." << std::endl;
}
Expand Down Expand Up @@ -1162,6 +1174,15 @@ class PubSubReader
return *this;
}

PubSubReader& participants_allocation_properties(
size_t initial,
size_t maximum)
{
participant_qos_.allocation().participants.initial = initial;
participant_qos_.allocation().participants.maximum = maximum;
return *this;
}

PubSubReader& expect_no_allocs()
{
// TODO(Mcc): Add no allocations check code when feature is completely ready
Expand Down
31 changes: 26 additions & 5 deletions test/blackbox/api/dds-pim/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -729,16 +729,28 @@ class PubSubWriter
}

#if HAVE_SECURITY
void waitAuthorized()
void waitAuthorized(
std::chrono::seconds timeout = std::chrono::seconds::zero(),
unsigned int expected = 1)
{
std::unique_lock<std::mutex> lock(mutexAuthentication_);

std::cout << "Writer is waiting authorization..." << std::endl;

cvAuthentication_.wait(lock, [&]() -> bool
{
return authorized_ > 0;
});
if (timeout == std::chrono::seconds::zero())
{
cvAuthentication_.wait(lock, [&]()
{
return authorized_ >= expected;
});
}
else
{
cvAuthentication_.wait_for(lock, timeout, [&]()
{
return authorized_ >= expected;
});
}

std::cout << "Writer authorization finished..." << std::endl;
}
Expand Down Expand Up @@ -1124,6 +1136,15 @@ class PubSubWriter
return *this;
}

PubSubWriter& participants_allocation_properties(
size_t initial,
size_t maximum)
{
participant_qos_.allocation().participants.initial = initial;
participant_qos_.allocation().participants.maximum = maximum;
return *this;
}

PubSubWriter& expect_no_allocs()
{
// TODO(Mcc): Add no allocations check code when feature is completely ready
Expand Down
22 changes: 17 additions & 5 deletions test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -599,16 +599,28 @@ class PubSubReader
}

#if HAVE_SECURITY
void waitAuthorized()
void waitAuthorized(
std::chrono::seconds timeout = std::chrono::seconds::zero(),
unsigned int expected = 1)
{
std::unique_lock<std::mutex> lock(mutexAuthentication_);

std::cout << "Reader is waiting authorization..." << std::endl;

cvAuthentication_.wait(lock, [&]() -> bool
{
return authorized_ > 0;
});
if (timeout == std::chrono::seconds::zero())
{
cvAuthentication_.wait(lock, [&]()
{
return authorized_ >= expected;
});
}
else
{
cvAuthentication_.wait_for(lock, timeout, [&]()
{
return authorized_ >= expected;
});
}

std::cout << "Reader authorization finished..." << std::endl;
}
Expand Down
22 changes: 17 additions & 5 deletions test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -561,16 +561,28 @@ class PubSubWriter
}

#if HAVE_SECURITY
void waitAuthorized()
void waitAuthorized(
std::chrono::seconds timeout = std::chrono::seconds::zero(),
unsigned int expected = 1)
{
std::unique_lock<std::mutex> lock(mutexAuthentication_);

std::cout << "Writer is waiting authorization..." << std::endl;

cvAuthentication_.wait(lock, [&]() -> bool
{
return authorized_ > 0;
});
if (timeout == std::chrono::seconds::zero())
{
cvAuthentication_.wait(lock, [&]()
{
return authorized_ >= expected;
});
}
else
{
cvAuthentication_.wait_for(lock, timeout, [&]()
{
return authorized_ >= expected;
});
}

std::cout << "Writer authorization finished..." << std::endl;
}
Expand Down
Loading

0 comments on commit 7f54166

Please sign in to comment.