AP_DDS: support automatic reconnect to micro-ROS agent

- Add ping test and attempt reconnect if connection dropped.
- Retry ping test max_attempts before exiting.
- Move `uxr_init_session` from transport init to session init for reconnect
- Tidy handling of transport.comm
- Fix codestyle

Signed-off-by: Rhys Mainwaring <rhys.mainwaring@me.com>

AP_DDS: use PONG_IN_SESSION_STATUS in status check

Signed-off-by: Rhys Mainwaring <rhys.mainwaring@me.com>

AP_DDS: add local variables to clarify arguments to uxr_ping_agent_session

Signed-off-by: Rhys Mainwaring <rhys.mainwaring@me.com>
This commit is contained in:
Rhys Mainwaring 2023-10-10 20:14:05 +01:00 committed by Peter Barker
parent 4f00dfa69a
commit 125c8fa1fa
4 changed files with 109 additions and 16 deletions

View File

@ -2,6 +2,8 @@
#if AP_DDS_ENABLED
#include <uxr/client/util/ping.h>
#include <AP_GPS/AP_GPS.h>
#include <AP_HAL/AP_HAL.h>
#include <AP_RTC/AP_RTC.h>
@ -34,6 +36,7 @@ static constexpr uint16_t DELAY_LOCAL_POSE_TOPIC_MS = 33;
static constexpr uint16_t DELAY_LOCAL_VELOCITY_TOPIC_MS = 33;
static constexpr uint16_t DELAY_GEO_POSE_TOPIC_MS = 33;
static constexpr uint16_t DELAY_CLOCK_TOPIC_MS = 10;
static constexpr uint16_t DELAY_PING_MS = 500;
// Define the subscriber data members, which are static class scope.
// If these are created on the stack in the subscriber,
@ -67,6 +70,18 @@ const AP_Param::GroupInfo AP_DDS_Client::var_info[] {
AP_GROUPEND
};
AP_DDS_Client::~AP_DDS_Client()
{
// close transport
if (is_using_serial) {
uxr_close_custom_transport(&serial.transport);
} else {
#if AP_DDS_UDP_ENABLED
uxr_close_custom_transport(&udp.transport);
#endif
}
}
void AP_DDS_Client::update_topic(builtin_interfaces_msg_Time& msg)
{
uint64_t utc_usec;
@ -569,22 +584,83 @@ void AP_DDS_Client::on_request(uxrSession* uxr_session, uxrObjectId object_id, u
*/
void AP_DDS_Client::main_loop(void)
{
if (!init() || !create()) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR, "%s Creation Requests failed", msg_prefix);
if (!init_transport()) {
return;
}
GCS_SEND_TEXT(MAV_SEVERITY_INFO, "%s Initialization passed", msg_prefix);
populate_static_transforms(tx_static_transforms_topic);
write_static_transforms();
//! @todo check for request to stop task
while (true) {
hal.scheduler->delay(1);
update();
if (comm == nullptr) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR, "DDS Client: transport invalid, exiting");
return;
}
// check ping
const uint64_t ping_timeout_ms{1000};
const uint8_t ping_max_attempts{10};
if (!uxr_ping_agent_attempts(comm, ping_timeout_ms, ping_max_attempts)) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR, "DDS Client: No ping response, exiting");
return;
}
// create session
if (!init_session() || !create()) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR, "DDS Client: Creation Requests failed");
return;
}
connected = true;
GCS_SEND_TEXT(MAV_SEVERITY_INFO, "DDS Client: Initialization passed");
populate_static_transforms(tx_static_transforms_topic);
write_static_transforms();
uint64_t last_ping_ms{0};
uint8_t num_pings_missed{0};
bool had_ping_reply{false};
while (connected) {
hal.scheduler->delay(1);
// publish topics
update();
// check ping response
if (session.on_pong_flag == PONG_IN_SESSION_STATUS) {
had_ping_reply = true;
}
const auto cur_time_ms = AP_HAL::millis64();
if (cur_time_ms - last_ping_ms > DELAY_PING_MS) {
last_ping_ms = cur_time_ms;
if (had_ping_reply) {
num_pings_missed = 0;
} else {
++num_pings_missed;
}
const int ping_agent_timeout_ms{0};
const uint8_t ping_agent_attempts{1};
uxr_ping_agent_session(&session, ping_agent_timeout_ms, ping_agent_attempts);
had_ping_reply = false;
}
if (num_pings_missed > 2) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,
"DDS Client: No ping response, disconnecting");
connected = false;
}
}
// delete session if connected
if (connected) {
uxr_delete_session(&session);
}
}
}
bool AP_DDS_Client::init()
bool AP_DDS_Client::init_transport()
{
// serial init will fail if the SERIALn_PROTOCOL is not setup
bool initTransportStatus = ddsSerialInit();
@ -602,6 +678,14 @@ bool AP_DDS_Client::init()
return false;
}
return true;
}
bool AP_DDS_Client::init_session()
{
// init session
uxr_init_session(&session, comm, key);
// Register topic callbacks
uxr_set_topic_callback(&session, AP_DDS_Client::on_topic_trampoline, this);
@ -931,7 +1015,7 @@ void AP_DDS_Client::update()
write_clock_topic();
}
connected = uxr_run_session_time(&session, 1);
status_ok = uxr_run_session_time(&session, 1);
}
#if CONFIG_HAL_BOARD != HAL_BOARD_SITL

View File

@ -73,7 +73,8 @@ private:
HAL_Semaphore csem;
// connection parametrics
bool connected = true;
bool status_ok{false};
bool connected{false};
static void update_topic(builtin_interfaces_msg_Time& msg);
bool update_topic(sensor_msgs_msg_NavSatFix& msg, const uint8_t instance) WARN_IF_UNUSED;
@ -143,17 +144,25 @@ private:
SocketAPM *socket;
} udp;
#endif
// pointer to transport's communication structure
uxrCommunication *comm{nullptr};
// client key we present
static constexpr uint32_t uniqueClientKey = 0xAAAABBBB;
static constexpr uint32_t key = 0xAAAABBBB;
public:
~AP_DDS_Client();
bool start(void);
void main_loop(void);
//! @brief Initialize the client's transport, uxr session, and IO stream(s)
//! @brief Initialize the client's transport
//! @return True on successful initialization, false on failure
bool init() WARN_IF_UNUSED;
bool init_transport() WARN_IF_UNUSED;
//! @brief Initialize the client's uxr session and IO stream(s)
//! @return True on successful initialization, false on failure
bool init_session() WARN_IF_UNUSED;
//! @brief Set up the client's participants, data read/writes,
// publishers, subscribers

View File

@ -90,6 +90,6 @@ bool AP_DDS_Client::ddsSerialInit()
if (!uxr_init_custom_transport(&serial.transport, (void*)this)) {
return false;
}
uxr_init_session(&session, &serial.transport.comm, uniqueClientKey);
comm = &serial.transport.comm;
return true;
}

View File

@ -83,7 +83,7 @@ bool AP_DDS_Client::ddsUdpInit()
if (!uxr_init_custom_transport(&udp.transport, (void*)this)) {
return false;
}
uxr_init_session(&session, &udp.transport.comm, uniqueClientKey);
comm = &udp.transport.comm;
return true;
}
#endif // AP_DDS_UDP_ENABLED