Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add payload tx/rx timeouts to DDS #24309

Merged
merged 4 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/modules/uxrce_dds_client/module.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,27 @@ parameters:
category: System
reboot_required: true
default: 0

UXRCE_DDS_TX_TO:
description:
short: TX rate timeout configuration
long: |
Specifies after how many seconds without sending data the DDS connection is reestablished.
A value less than one disables the TX rate timeout.
type: int32
category: System
reboot_required: true
default: 3
unit: s

UXRCE_DDS_RX_TO:
description:
short: RX rate timeout configuration
long: |
Specifies after how many seconds without receiving data the DDS connection is reestablished.
A value less than one disables the RX rate timeout.
type: int32
category: System
reboot_required: true
default: -1
unit: s
158 changes: 106 additions & 52 deletions src/modules/uxrce_dds_client/uxrce_dds_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ void UxrceddsClient::deinit()
_comm = nullptr;
}

bool UxrceddsClient::setup_session(uxrSession *session)
bool UxrceddsClient::setupSession(uxrSession *session)
{
_participant_config = static_cast<ParticipantConfig>(_param_uxrce_dds_ptcfg.get());
_synchronize_timestamps = (_param_uxrce_dds_synct.get() > 0);
Expand Down Expand Up @@ -379,7 +379,7 @@ bool UxrceddsClient::setup_session(uxrSession *session)
return true;
}

void UxrceddsClient::delete_session(uxrSession *session)
void UxrceddsClient::deleteSession(uxrSession *session)
{
delete_repliers();

Expand Down Expand Up @@ -472,6 +472,20 @@ static void fillMessageFormatResponse(const message_format_request_s &message_fo
message_format_response.timestamp = hrt_absolute_time();
}

void UxrceddsClient::calculateTxRxRate()
{
const hrt_abstime now = hrt_absolute_time();

if (now - _last_status_update > 1_s) {
float dt = (now - _last_status_update) / 1e6f;
_last_payload_tx_rate = (_subs->num_payload_sent - _last_num_payload_sent) / dt;
_last_payload_rx_rate = (_pubs->num_payload_received - _last_num_payload_received) / dt;
_last_num_payload_sent = _subs->num_payload_sent;
_last_num_payload_received = _pubs->num_payload_received;
_last_status_update = now;
}
}

void UxrceddsClient::handleMessageFormatRequest()
{
message_format_request_s message_format_request;
Expand All @@ -483,6 +497,87 @@ void UxrceddsClient::handleMessageFormatRequest()
}
}

void UxrceddsClient::checkConnectivity(uxrSession *session)
{
// Reset TX zero counter, when data is sent
if (_last_payload_tx_rate > 0) {
_num_tx_rate_zero = 0;
}

// Reset RX zero counter, when data is received
if (_last_payload_rx_rate > 0) {
_num_rx_rate_zero = 0;
}

const hrt_abstime now = hrt_absolute_time();

// Start ping and tx/rx rate monitoring, unless we're actively sending & receiving payloads successfully
if ((_last_payload_tx_rate > 0) && (_last_payload_rx_rate > 0)) {
_connected = true;
_num_pings_missed = 0;
_last_ping = now;

} else {
if (hrt_elapsed_time(&_last_ping) > 1_s) {
// Check payload tx rate
if (_last_payload_tx_rate == 0) {
_num_tx_rate_zero++;
}

// Check payload rx rate
if (_last_payload_rx_rate == 0) {
_num_rx_rate_zero++;
}

// Check ping
_last_ping = now;

if (_had_ping_reply) {
_num_pings_missed = 0;

} else {
++_num_pings_missed;
}

int timeout_ms = 1'000; // 1 second
uint8_t attempts = 1;
uxr_ping_agent_session(session, timeout_ms, attempts);

_had_ping_reply = false;
}

if (_num_pings_missed >= 3) {
PX4_ERR("No ping response, disconnecting");
_connected = false;
}

int32_t tx_timeout = _param_uxrce_dds_tx_to.get();
int32_t rx_timeout = _param_uxrce_dds_rx_to.get();

if (tx_timeout > 0 && _num_tx_rate_zero >= tx_timeout) {
PX4_ERR("Payload TX rate zero for too long, disconnecting");
_connected = false;
}

if (rx_timeout > 0 && _num_rx_rate_zero >= rx_timeout) {
alexcekay marked this conversation as resolved.
Show resolved Hide resolved
PX4_ERR("Payload RX rate zero for too long, disconnecting");
_connected = false;
}
}
}

void UxrceddsClient::resetConnectivityCounters()
{
_last_status_update = hrt_absolute_time();
_last_ping = hrt_absolute_time();
_had_ping_reply = false;
_num_pings_missed = 0;
_last_num_payload_sent = 0;
_last_num_payload_received = 0;
_num_tx_rate_zero = 0;
_num_rx_rate_zero = 0;
}

void UxrceddsClient::syncSystemClock(uxrSession *session)
{
struct timespec ts = {};
Expand Down Expand Up @@ -535,8 +630,8 @@ void UxrceddsClient::run()
continue;
}

if (!setup_session(&session)) {
delete_session(&session);
if (!setupSession(&session)) {
deleteSession(&session);
px4_usleep(1'000'000);
PX4_ERR("session setup failed, will retry now");
continue;
Expand All @@ -552,13 +647,8 @@ void UxrceddsClient::run()
}

hrt_abstime last_sync_session = 0;
hrt_abstime last_status_update = hrt_absolute_time();
hrt_abstime last_ping = hrt_absolute_time();
int num_pings_missed = 0;
bool had_ping_reply = false;
uint32_t last_num_payload_sent{};
uint32_t last_num_payload_received{};
int poll_error_counter = 0;
resetConnectivityCounters();

_subs->init();
_subs_initialized = true;
Expand Down Expand Up @@ -629,55 +719,19 @@ void UxrceddsClient::run()
// Check for a ping response
/* PONG_IN_SESSION_STATUS */
if (session.on_pong_flag == 1) {
had_ping_reply = true;
_had_ping_reply = true;
}

const hrt_abstime now = hrt_absolute_time();

if (now - last_status_update > 1_s) {
float dt = (now - last_status_update) / 1e6f;
_last_payload_tx_rate = (_subs->num_payload_sent - last_num_payload_sent) / dt;
_last_payload_rx_rate = (_pubs->num_payload_received - last_num_payload_received) / dt;
last_num_payload_sent = _subs->num_payload_sent;
last_num_payload_received = _pubs->num_payload_received;
last_status_update = now;
}
// Calculate the payload tx/rx rate for connectivity monitoring
calculateTxRxRate();

// Handle ping, unless we're actively sending & receiving payloads successfully
if ((_last_payload_tx_rate > 0) && (_last_payload_rx_rate > 0)) {
_connected = true;
num_pings_missed = 0;
last_ping = now;

} else {
if (hrt_elapsed_time(&last_ping) > 1_s) {
last_ping = now;

if (had_ping_reply) {
num_pings_missed = 0;

} else {
++num_pings_missed;
}

int timeout_ms = 1'000; // 1 second
uint8_t attempts = 1;
uxr_ping_agent_session(&session, timeout_ms, attempts);

had_ping_reply = false;
}

if (num_pings_missed >= 3) {
PX4_INFO("No ping response, disconnecting");
_connected = false;
}
}
// Check if there is still connectivity with the agent
checkConnectivity(&session);

perf_end(_loop_perf);

}

delete_session(&session);
deleteSession(&session);
}
}

Expand Down
20 changes: 17 additions & 3 deletions src/modules/uxrce_dds_client/uxrce_dds_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,17 @@ class UxrceddsClient : public ModuleBase<UxrceddsClient>, public ModuleParams
bool init();
void deinit();

bool setup_session(uxrSession *session);
void delete_session(uxrSession *session);
bool setupSession(uxrSession *session);
void deleteSession(uxrSession *session);

bool setBaudrate(int fd, unsigned baud);

void handleMessageFormatRequest();

void calculateTxRxRate();
void checkConnectivity(uxrSession *session);
void resetConnectivityCounters();

uORB::Publication<message_format_response_s> _message_format_response_pub{ORB_ID(message_format_response)};
uORB::Subscription _message_format_request_sub{ORB_ID(message_format_request)};

Expand Down Expand Up @@ -179,6 +183,14 @@ class UxrceddsClient : public ModuleBase<UxrceddsClient>, public ModuleParams
uxrCommunication *_comm{nullptr};
int _fd{-1};

hrt_abstime _last_status_update;
hrt_abstime _last_ping;
bool _had_ping_reply{false};
int _num_pings_missed{0};
int32_t _num_tx_rate_zero{0};
int32_t _num_rx_rate_zero{0};
uint32_t _last_num_payload_sent{0};
uint32_t _last_num_payload_received{0};
int _last_payload_tx_rate{}; ///< in B/s
int _last_payload_rx_rate{}; ///< in B/s

Expand All @@ -197,6 +209,8 @@ class UxrceddsClient : public ModuleBase<UxrceddsClient>, public ModuleParams
(ParamInt<px4::params::UXRCE_DDS_KEY>) _param_uxrce_key,
(ParamInt<px4::params::UXRCE_DDS_PTCFG>) _param_uxrce_dds_ptcfg,
(ParamInt<px4::params::UXRCE_DDS_SYNCC>) _param_uxrce_dds_syncc,
(ParamInt<px4::params::UXRCE_DDS_SYNCT>) _param_uxrce_dds_synct
(ParamInt<px4::params::UXRCE_DDS_SYNCT>) _param_uxrce_dds_synct,
(ParamInt<px4::params::UXRCE_DDS_TX_TO>) _param_uxrce_dds_tx_to,
(ParamInt<px4::params::UXRCE_DDS_RX_TO>) _param_uxrce_dds_rx_to
)
};
Loading