From 125c8fa1fac81fc7d3bb78592eac786e4e53f978 Mon Sep 17 00:00:00 2001 From: Rhys Mainwaring Date: Tue, 10 Oct 2023 20:14:05 +0100 Subject: [PATCH] AP_DDS: support automatic reconnect to micro-ROS agent - Add ping test and attempt reconnect if connection dropped. - Retry ping test max_attempts before exiting. - Move `uxr_init_session` from transport init to session init for reconnect - Tidy handling of transport.comm - Fix codestyle Signed-off-by: Rhys Mainwaring AP_DDS: use PONG_IN_SESSION_STATUS in status check Signed-off-by: Rhys Mainwaring AP_DDS: add local variables to clarify arguments to uxr_ping_agent_session Signed-off-by: Rhys Mainwaring --- libraries/AP_DDS/AP_DDS_Client.cpp | 104 ++++++++++++++++++++++++++--- libraries/AP_DDS/AP_DDS_Client.h | 17 +++-- libraries/AP_DDS/AP_DDS_Serial.cpp | 2 +- libraries/AP_DDS/AP_DDS_UDP.cpp | 2 +- 4 files changed, 109 insertions(+), 16 deletions(-) diff --git a/libraries/AP_DDS/AP_DDS_Client.cpp b/libraries/AP_DDS/AP_DDS_Client.cpp index eecac3fedf..29560fcc79 100644 --- a/libraries/AP_DDS/AP_DDS_Client.cpp +++ b/libraries/AP_DDS/AP_DDS_Client.cpp @@ -2,6 +2,8 @@ #if AP_DDS_ENABLED +#include + #include #include #include @@ -34,6 +36,7 @@ static constexpr uint16_t DELAY_LOCAL_POSE_TOPIC_MS = 33; static constexpr uint16_t DELAY_LOCAL_VELOCITY_TOPIC_MS = 33; static constexpr uint16_t DELAY_GEO_POSE_TOPIC_MS = 33; static constexpr uint16_t DELAY_CLOCK_TOPIC_MS = 10; +static constexpr uint16_t DELAY_PING_MS = 500; // Define the subscriber data members, which are static class scope. // If these are created on the stack in the subscriber, @@ -67,6 +70,18 @@ const AP_Param::GroupInfo AP_DDS_Client::var_info[] { AP_GROUPEND }; +AP_DDS_Client::~AP_DDS_Client() +{ + // close transport + if (is_using_serial) { + uxr_close_custom_transport(&serial.transport); + } else { +#if AP_DDS_UDP_ENABLED + uxr_close_custom_transport(&udp.transport); +#endif + } +} + void AP_DDS_Client::update_topic(builtin_interfaces_msg_Time& msg) { uint64_t utc_usec; @@ -569,22 +584,83 @@ void AP_DDS_Client::on_request(uxrSession* uxr_session, uxrObjectId object_id, u */ void AP_DDS_Client::main_loop(void) { - if (!init() || !create()) { - GCS_SEND_TEXT(MAV_SEVERITY_ERROR, "%s Creation Requests failed", msg_prefix); + if (!init_transport()) { return; } - GCS_SEND_TEXT(MAV_SEVERITY_INFO, "%s Initialization passed", msg_prefix); - - populate_static_transforms(tx_static_transforms_topic); - write_static_transforms(); + //! @todo check for request to stop task while (true) { - hal.scheduler->delay(1); - update(); + if (comm == nullptr) { + GCS_SEND_TEXT(MAV_SEVERITY_ERROR, "DDS Client: transport invalid, exiting"); + return; + } + + // check ping + const uint64_t ping_timeout_ms{1000}; + const uint8_t ping_max_attempts{10}; + if (!uxr_ping_agent_attempts(comm, ping_timeout_ms, ping_max_attempts)) { + GCS_SEND_TEXT(MAV_SEVERITY_ERROR, "DDS Client: No ping response, exiting"); + return; + } + + // create session + if (!init_session() || !create()) { + GCS_SEND_TEXT(MAV_SEVERITY_ERROR, "DDS Client: Creation Requests failed"); + return; + } + connected = true; + GCS_SEND_TEXT(MAV_SEVERITY_INFO, "DDS Client: Initialization passed"); + + populate_static_transforms(tx_static_transforms_topic); + write_static_transforms(); + + uint64_t last_ping_ms{0}; + uint8_t num_pings_missed{0}; + bool had_ping_reply{false}; + while (connected) { + hal.scheduler->delay(1); + + // publish topics + update(); + + // check ping response + if (session.on_pong_flag == PONG_IN_SESSION_STATUS) { + had_ping_reply = true; + } + + const auto cur_time_ms = AP_HAL::millis64(); + if (cur_time_ms - last_ping_ms > DELAY_PING_MS) { + last_ping_ms = cur_time_ms; + + if (had_ping_reply) { + num_pings_missed = 0; + + } else { + ++num_pings_missed; + } + + const int ping_agent_timeout_ms{0}; + const uint8_t ping_agent_attempts{1}; + uxr_ping_agent_session(&session, ping_agent_timeout_ms, ping_agent_attempts); + + had_ping_reply = false; + } + + if (num_pings_missed > 2) { + GCS_SEND_TEXT(MAV_SEVERITY_ERROR, + "DDS Client: No ping response, disconnecting"); + connected = false; + } + } + + // delete session if connected + if (connected) { + uxr_delete_session(&session); + } } } -bool AP_DDS_Client::init() +bool AP_DDS_Client::init_transport() { // serial init will fail if the SERIALn_PROTOCOL is not setup bool initTransportStatus = ddsSerialInit(); @@ -602,6 +678,14 @@ bool AP_DDS_Client::init() return false; } + return true; +} + +bool AP_DDS_Client::init_session() +{ + // init session + uxr_init_session(&session, comm, key); + // Register topic callbacks uxr_set_topic_callback(&session, AP_DDS_Client::on_topic_trampoline, this); @@ -931,7 +1015,7 @@ void AP_DDS_Client::update() write_clock_topic(); } - connected = uxr_run_session_time(&session, 1); + status_ok = uxr_run_session_time(&session, 1); } #if CONFIG_HAL_BOARD != HAL_BOARD_SITL diff --git a/libraries/AP_DDS/AP_DDS_Client.h b/libraries/AP_DDS/AP_DDS_Client.h index ac6a7c0d56..acdf4c7ddc 100644 --- a/libraries/AP_DDS/AP_DDS_Client.h +++ b/libraries/AP_DDS/AP_DDS_Client.h @@ -73,7 +73,8 @@ private: HAL_Semaphore csem; // connection parametrics - bool connected = true; + bool status_ok{false}; + bool connected{false}; static void update_topic(builtin_interfaces_msg_Time& msg); bool update_topic(sensor_msgs_msg_NavSatFix& msg, const uint8_t instance) WARN_IF_UNUSED; @@ -143,17 +144,25 @@ private: SocketAPM *socket; } udp; #endif + // pointer to transport's communication structure + uxrCommunication *comm{nullptr}; // client key we present - static constexpr uint32_t uniqueClientKey = 0xAAAABBBB; + static constexpr uint32_t key = 0xAAAABBBB; public: + ~AP_DDS_Client(); + bool start(void); void main_loop(void); - //! @brief Initialize the client's transport, uxr session, and IO stream(s) + //! @brief Initialize the client's transport //! @return True on successful initialization, false on failure - bool init() WARN_IF_UNUSED; + bool init_transport() WARN_IF_UNUSED; + + //! @brief Initialize the client's uxr session and IO stream(s) + //! @return True on successful initialization, false on failure + bool init_session() WARN_IF_UNUSED; //! @brief Set up the client's participants, data read/writes, // publishers, subscribers diff --git a/libraries/AP_DDS/AP_DDS_Serial.cpp b/libraries/AP_DDS/AP_DDS_Serial.cpp index 8ce7cc3b0e..3906e16521 100644 --- a/libraries/AP_DDS/AP_DDS_Serial.cpp +++ b/libraries/AP_DDS/AP_DDS_Serial.cpp @@ -90,6 +90,6 @@ bool AP_DDS_Client::ddsSerialInit() if (!uxr_init_custom_transport(&serial.transport, (void*)this)) { return false; } - uxr_init_session(&session, &serial.transport.comm, uniqueClientKey); + comm = &serial.transport.comm; return true; } diff --git a/libraries/AP_DDS/AP_DDS_UDP.cpp b/libraries/AP_DDS/AP_DDS_UDP.cpp index 0569e0ecda..2610291231 100644 --- a/libraries/AP_DDS/AP_DDS_UDP.cpp +++ b/libraries/AP_DDS/AP_DDS_UDP.cpp @@ -83,7 +83,7 @@ bool AP_DDS_Client::ddsUdpInit() if (!uxr_init_custom_transport(&udp.transport, (void*)this)) { return false; } - uxr_init_session(&session, &udp.transport.comm, uniqueClientKey); + comm = &udp.transport.comm; return true; } #endif // AP_DDS_UDP_ENABLED