uxrce_dds_client: optimizations and instrumentation

- skip ping session if data flowing bidirectionally
 - add perf counters for loop time and interval
 - skip blocking poll if there's input data to read
This commit is contained in:
Daniel Agar 2023-12-19 09:41:19 -05:00
parent b115d3cd44
commit a1cce7e961
2 changed files with 105 additions and 71 deletions

View File

@ -56,8 +56,8 @@
using namespace time_literals;
void on_time(uxrSession *session, int64_t current_time, int64_t received_timestamp, int64_t transmit_timestamp,
int64_t originate_timestamp, void *args)
static void on_time(uxrSession *session, int64_t current_time, int64_t received_timestamp, int64_t transmit_timestamp,
int64_t originate_timestamp, void *args)
{
// latest round trip time (RTT)
int64_t rtt = current_time - originate_timestamp;
@ -76,21 +76,15 @@ void on_time(uxrSession *session, int64_t current_time, int64_t received_timesta
}
}
void on_time_no_sync(uxrSession *session, int64_t current_time, int64_t received_timestamp, int64_t transmit_timestamp,
int64_t originate_timestamp, void *args)
static void on_time_no_sync(uxrSession *session, int64_t current_time, int64_t received_timestamp,
int64_t transmit_timestamp,
int64_t originate_timestamp, void *args)
{
session->time_offset = 0;
}
void on_request(
uxrSession *session,
uxrObjectId object_id,
uint16_t request_id,
SampleIdentity *sample_id,
ucdrBuffer *ub,
uint16_t length,
void *args)
static void on_request(uxrSession *session, uxrObjectId object_id, uint16_t request_id, SampleIdentity *sample_id,
ucdrBuffer *ub, uint16_t length, void *args)
{
(void) request_id;
(void) length;
@ -229,6 +223,9 @@ UxrceddsClient::~UxrceddsClient()
delete _transport_serial;
}
perf_free(_loop_perf);
perf_free(_loop_interval_perf);
#if defined(UXRCE_DDS_CLIENT_UDP)
if (_transport_udp) {
@ -474,9 +471,9 @@ void UxrceddsClient::run()
}
// create VehicleCommand replier
if (num_of_repliers < MAX_NUM_REPLIERS) {
if (_num_of_repliers < MAX_NUM_REPLIERS) {
if (add_replier(new VehicleCommandSrv(&session, reliable_out, reliable_in, participant_id, _client_namespace,
num_of_repliers))) {
_num_of_repliers))) {
PX4_ERR("replier init failed");
return;
}
@ -522,45 +519,48 @@ void UxrceddsClient::run()
while (!should_exit() && _connected) {
/* Wait for topic updates for max 1000 ms (1sec) */
int poll = px4_poll(&_subs->fds[0], (sizeof(_subs->fds) / sizeof(_subs->fds[0])), 1000);
perf_begin(_loop_perf);
perf_count(_loop_interval_perf);
/* Handle the poll results */
if (poll == 0) {
/* Timeout, no updates in selected uorbs */
continue;
int orb_poll_timeout_ms = 10;
} else if (poll < 0) {
/* Error */
if (poll_error_counter < 10 || poll_error_counter % 50 == 0) {
/* Prevent flooding */
PX4_ERR("ERROR while polling uorbs: %d", poll);
int bytes_available = 0;
if (ioctl(_fd, FIONREAD, (unsigned long)&bytes_available) == OK) {
if (bytes_available > 10) {
orb_poll_timeout_ms = 0;
}
poll_error_counter++;
continue;
}
_subs->update(&session, reliable_out, best_effort_out, participant_id, _client_namespace);
/* Wait for topic updates for max 10 ms */
int poll = px4_poll(_subs->fds, (sizeof(_subs->fds) / sizeof(_subs->fds[0])), orb_poll_timeout_ms);
/* Handle the poll results */
if (poll > 0) {
_subs->update(&session, reliable_out, best_effort_out, participant_id, _client_namespace);
} else {
if (poll < 0) {
// poll error
if (poll_error_counter < 10 || poll_error_counter % 50 == 0) {
// prevent flooding
PX4_ERR("ERROR while polling uorbs: %d", poll);
}
poll_error_counter++;
}
}
// run session with 0 timeout (non-blocking)
uxr_run_session_timeout(&session, 0);
// check if there are available replies
process_replies();
// Run the session until we receive no more data or up to a maximum number of iterations.
// The maximum observed number of iterations was 6 (SITL). If we were to run only once, data starts to get
// delayed, causing registered flight modes to time out.
for (int i = 0; i < 10; ++i) {
const uint32_t prev_num_payload_received = _pubs->num_payload_received;
uxr_run_session_timeout(&session, 0);
if (_pubs->num_payload_received == prev_num_payload_received) {
break;
}
}
// time sync session
if (_synchronize_timestamps && hrt_elapsed_time(&last_sync_session) > 1_s) {
if (uxr_sync_session(&session, 100) && _timesync.sync_converged()) {
if (uxr_sync_session(&session, 10) && _timesync.sync_converged()) {
//PX4_INFO("synchronized with time offset %-5" PRId64 "ns", session.time_offset);
last_sync_session = hrt_absolute_time();
@ -568,6 +568,15 @@ void UxrceddsClient::run()
syncSystemClock(&session);
}
}
if (!_timesync_converged && _timesync.sync_converged()) {
PX4_INFO("time sync converged");
} else if (_timesync_converged && !_timesync.sync_converged()) {
PX4_WARN("time sync no longer converged");
}
_timesync_converged = _timesync.sync_converged();
}
handleMessageFormatRequest();
@ -589,26 +598,37 @@ void UxrceddsClient::run()
last_status_update = now;
}
// Handle ping
if (now - last_ping > 500_ms) {
// Handle ping, unless we're actively sending & receiving payloads successfully
if ((_last_payload_tx_rate > 0) && (_last_payload_rx_rate > 0)) {
_connected = true;
num_pings_missed = 0;
last_ping = now;
if (had_ping_reply) {
num_pings_missed = 0;
} else {
if (hrt_elapsed_time(&last_ping) > 1_s) {
last_ping = now;
} else {
++num_pings_missed;
if (had_ping_reply) {
num_pings_missed = 0;
} else {
++num_pings_missed;
}
int timeout_ms = 1'000; // 1 second
uint8_t attempts = 1;
uxr_ping_agent_session(&session, timeout_ms, attempts);
had_ping_reply = false;
}
uxr_ping_agent_session(&session, 0, 1);
had_ping_reply = false;
if (num_pings_missed >= 3) {
PX4_INFO("No ping response, disconnecting");
_connected = false;
}
}
if (num_pings_missed > 2) {
PX4_INFO("No ping response, disconnecting");
_connected = false;
}
perf_end(_loop_perf);
}
@ -761,10 +781,10 @@ bool UxrceddsClient::setBaudrate(int fd, unsigned baud)
bool UxrceddsClient::add_replier(SrvBase *replier)
{
if (num_of_repliers < MAX_NUM_REPLIERS) {
repliers_[num_of_repliers] = replier;
if (_num_of_repliers < MAX_NUM_REPLIERS) {
_repliers[_num_of_repliers] = replier;
num_of_repliers++;
_num_of_repliers++;
}
return false;
@ -773,11 +793,12 @@ bool UxrceddsClient::add_replier(SrvBase *replier)
void UxrceddsClient::process_requests(uxrObjectId object_id, SampleIdentity *sample_id, ucdrBuffer *ub,
const int64_t time_offset_us)
{
for (uint8_t i = 0; i < num_of_repliers; i++) {
if (object_id.id == repliers_[i]->replier_id_.id
&& object_id.type == repliers_[i]->replier_id_.type) {
repliers_[i]->process_request(ub, time_offset_us);
memcpy(&(repliers_[i]->sample_id_), sample_id, sizeof(repliers_[i]->sample_id_));
for (uint8_t i = 0; i < _num_of_repliers; i++) {
if (object_id.id == _repliers[i]->replier_id_.id
&& object_id.type == _repliers[i]->replier_id_.type) {
_repliers[i]->process_request(ub, time_offset_us);
memcpy(&(_repliers[i]->sample_id_), sample_id, sizeof(_repliers[i]->sample_id_));
break;
}
}
@ -785,18 +806,19 @@ void UxrceddsClient::process_requests(uxrObjectId object_id, SampleIdentity *sam
void UxrceddsClient::process_replies()
{
for (uint8_t i = 0; i < num_of_repliers; i++) {
repliers_[i]->process_reply();
for (uint8_t i = 0; i < _num_of_repliers; i++) {
_repliers[i]->process_reply();
}
}
void UxrceddsClient::delete_repliers()
{
for (uint8_t i = 0; i < num_of_repliers; i++) {
delete (repliers_[i]);
for (uint8_t i = 0; i < _num_of_repliers; i++) {
delete (_repliers[i]);
_repliers[i] = nullptr;
}
num_of_repliers = 0;
_num_of_repliers = 0;
}
int UxrceddsClient::custom_command(int argc, char *argv[])
@ -845,6 +867,11 @@ int UxrceddsClient::print_status()
PX4_INFO("Payload rx: %i B/s", _last_payload_rx_rate);
}
PX4_INFO("timesync converged: %s", _timesync.sync_converged() ? "true" : "false");
perf_print_counter(_loop_perf);
perf_print_counter(_loop_interval_perf);
return 0;
}

View File

@ -44,6 +44,8 @@
#include <lib/timesync/Timesync.hpp>
#include <lib/perf/perf_counter.h>
#if defined(CONFIG_NET) || defined(__PX4_POSIX)
# define UXRCE_DDS_CLIENT_UDP 1
#endif
@ -157,8 +159,8 @@ private:
SendTopicsSubs *_subs{nullptr};
RcvTopicsPubs *_pubs{nullptr};
SrvBase *repliers_[MAX_NUM_REPLIERS];
uint8_t num_of_repliers{0};
SrvBase *_repliers[MAX_NUM_REPLIERS];
uint8_t _num_of_repliers{0};
uxrCommunication *_comm{nullptr};
int _fd{-1};
@ -167,8 +169,13 @@ private:
int _last_payload_rx_rate{}; ///< in B/s
bool _connected{false};
bool _timesync_converged{false};
Timesync _timesync{timesync_status_s::SOURCE_PROTOCOL_DDS};
perf_counter_t _loop_perf{perf_alloc(PC_ELAPSED, MODULE_NAME": cycle")};
perf_counter_t _loop_interval_perf{perf_alloc(PC_INTERVAL, MODULE_NAME": cycle interval")};
DEFINE_PARAMETERS(
(ParamInt<px4::params::UXRCE_DDS_DOM_ID>) _param_uxrce_dds_dom_id,
(ParamInt<px4::params::UXRCE_DDS_KEY>) _param_uxrce_key,