uxrce_dds_client: refactor init to retry indefinitely

- move init from UxrceddsClient to init() method so that retry is
   possible for both serial and UDP init
This commit is contained in:
Daniel Agar 2023-12-13 15:49:51 -05:00
parent 9f4ae0a85d
commit b115d3cd44
2 changed files with 134 additions and 65 deletions

View File

@ -49,10 +49,6 @@
#include <stdlib.h> #include <stdlib.h>
#include <unistd.h> #include <unistd.h>
#if defined(CONFIG_NET) || defined(__PX4_POSIX)
# define UXRCE_DDS_CLIENT_UDP 1
#endif
#define STREAM_HISTORY 4 #define STREAM_HISTORY 4
#define BUFFER_SIZE (UXR_CONFIG_SERIAL_TRANSPORT_MTU * STREAM_HISTORY) // MTU==512 by default #define BUFFER_SIZE (UXR_CONFIG_SERIAL_TRANSPORT_MTU * STREAM_HISTORY) // MTU==512 by default
@ -110,66 +106,115 @@ void on_request(
UxrceddsClient::UxrceddsClient(Transport transport, const char *device, int baudrate, const char *agent_ip, UxrceddsClient::UxrceddsClient(Transport transport, const char *device, int baudrate, const char *agent_ip,
const char *port, const char *client_namespace) : const char *port, const char *client_namespace) :
ModuleParams(nullptr), ModuleParams(nullptr),
_transport(transport),
_baudrate(baudrate),
_client_namespace(client_namespace) _client_namespace(client_namespace)
{ {
if (transport == Transport::Serial) { if (device) {
// store serial port name */
strncpy(_device, device, sizeof(_device) - 1);
}
int fd = -1; #if defined(UXRCE_DDS_CLIENT_UDP)
for (int attempt = 0; attempt < 3; attempt++) { if (agent_ip) {
fd = open(device, O_RDWR | O_NOCTTY | O_NONBLOCK); strncpy(_agent_ip, agent_ip, sizeof(_agent_ip) - 1);
}
if (fd < 0) { if (port) {
PX4_ERR("open %s failed (%i)", device, errno); strncpy(_port, port, sizeof(_port) - 1);
// sleep before trying again }
px4_usleep(1_s);
} else { #endif // UXRCE_DDS_CLIENT_UDP
break; }
}
bool UxrceddsClient::init()
{
deinit();
if (_transport == Transport::Serial) {
int fd = open(_device, O_RDWR | O_NOCTTY | O_NONBLOCK);
if (fd < 0) {
PX4_ERR("open %s failed (%i)", _device, errno);
return false;
} }
_transport_serial = new uxrSerialTransport(); _transport_serial = new uxrSerialTransport();
if (fd >= 0 && setBaudrate(fd, baudrate) == 0 && _transport_serial) { // TODO:
// TODO: uint8_t remote_addr = 0; // Identifier of the Agent in the connection
uint8_t remote_addr = 0; // Identifier of the Agent in the connection uint8_t local_addr = 1; // Identifier of the Client in the serial connection
uint8_t local_addr = 1; // Identifier of the Client in the serial connection
if (uxr_init_serial_transport(_transport_serial, fd, remote_addr, local_addr)) { if (_transport_serial
_comm = &_transport_serial->comm; && setBaudrate(fd, _baudrate)
_fd = fd; && uxr_init_serial_transport(_transport_serial, fd, remote_addr, local_addr)
) {
PX4_INFO("init serial %s @ %d baud", _device, _baudrate);
} else { _comm = &_transport_serial->comm;
PX4_ERR("uxr_init_serial_transport failed"); _fd = fd;
}
return true;
} }
} else if (transport == Transport::Udp) { PX4_ERR("init serial %s @ %d baud failed", _device, _baudrate);
close(fd);
#if defined(UXRCE_DDS_CLIENT_UDP) delete _transport_serial;
_transport_udp = new uxrUDPTransport(); _transport_serial = nullptr;
strncpy(_port, port, PORT_MAX_LENGTH - 1);
strncpy(_agent_ip, agent_ip, AGENT_IP_MAX_LENGTH - 1);
if (_transport_udp) { return false;
if (uxr_init_udp_transport(_transport_udp, UXR_IPv4, _agent_ip, _port)) {
_comm = &_transport_udp->comm;
_fd = _transport_udp->platform.poll_fd.fd;
} else {
PX4_ERR("uxr_init_udp_transport failed");
}
}
#else
PX4_ERR("UDP not supported");
#endif
} }
_participant_config = static_cast<ParticipantConfig>(_param_uxrce_dds_ptcfg.get()); #if defined(UXRCE_DDS_CLIENT_UDP)
_synchronize_timestamps = _param_uxrce_dds_synct.get() > 0;
if (_transport == Transport::Udp) {
_transport_udp = new uxrUDPTransport();
if (_transport_udp && uxr_init_udp_transport(_transport_udp, UXR_IPv4, _agent_ip, _port)) {
PX4_INFO("init UDP agent IP:%s, port:%s", _agent_ip, _port);
_comm = &_transport_udp->comm;
_fd = _transport_udp->platform.poll_fd.fd;
return true;
} else {
PX4_ERR("init UDP agent IP:%s, port:%s failed", _agent_ip, _port);
}
}
#endif // UXRCE_DDS_CLIENT_UDP
return false;
}
void UxrceddsClient::deinit()
{
if (_fd >= 0) {
close(_fd);
_fd = -1;
}
if (_transport_serial) {
uxr_close_serial_transport(_transport_serial);
delete _transport_serial;
_transport_serial = nullptr;
}
#if defined(UXRCE_DDS_CLIENT_UDP)
if (_transport_udp) {
uxr_close_udp_transport(_transport_udp);
delete _transport_udp;
_transport_udp = nullptr;
}
#endif // UXRCE_DDS_CLIENT_UDP
_comm = nullptr;
} }
UxrceddsClient::~UxrceddsClient() UxrceddsClient::~UxrceddsClient()
@ -184,10 +229,14 @@ UxrceddsClient::~UxrceddsClient()
delete _transport_serial; delete _transport_serial;
} }
#if defined(UXRCE_DDS_CLIENT_UDP)
if (_transport_udp) { if (_transport_udp) {
uxr_close_udp_transport(_transport_udp); uxr_close_udp_transport(_transport_udp);
delete _transport_udp; delete _transport_udp;
} }
#endif // UXRCE_DDS_CLIENT_UDP
} }
static void fillMessageFormatResponse(const message_format_request_s &message_format_request, static void fillMessageFormatResponse(const message_format_request_s &message_format_request,
@ -286,11 +335,6 @@ void UxrceddsClient::syncSystemClock(uxrSession *session)
void UxrceddsClient::run() void UxrceddsClient::run()
{ {
if (!_comm) {
PX4_ERR("init failed");
return;
}
_subs = new SendTopicsSubs(); _subs = new SendTopicsSubs();
_pubs = new RcvTopicsPubs(); _pubs = new RcvTopicsPubs();
@ -300,6 +344,17 @@ void UxrceddsClient::run()
} }
while (!should_exit()) { while (!should_exit()) {
while (!should_exit() && !_comm) {
if (!init()) {
// sleep before trying again
px4_usleep(1'000'000);
}
}
_participant_config = static_cast<ParticipantConfig>(_param_uxrce_dds_ptcfg.get());
_synchronize_timestamps = (_param_uxrce_dds_synct.get() > 0);
bool got_response = false; bool got_response = false;
while (!should_exit() && !got_response) { while (!should_exit() && !got_response) {
@ -451,7 +506,7 @@ void UxrceddsClient::run()
break; break;
} }
px4_usleep(10_ms); px4_usleep(10'000);
} }
hrt_abstime last_sync_session = 0; hrt_abstime last_sync_session = 0;
@ -567,7 +622,7 @@ void UxrceddsClient::run()
} }
} }
int UxrceddsClient::setBaudrate(int fd, unsigned baud) bool UxrceddsClient::setBaudrate(int fd, unsigned baud)
{ {
int speed; int speed;
@ -640,7 +695,7 @@ int UxrceddsClient::setBaudrate(int fd, unsigned baud)
default: default:
PX4_ERR("ERR: unknown baudrate: %d", baud); PX4_ERR("ERR: unknown baudrate: %d", baud);
return -EINVAL; return false;
} }
struct termios uart_config; struct termios uart_config;
@ -688,20 +743,20 @@ int UxrceddsClient::setBaudrate(int fd, unsigned baud)
/* set baud rate */ /* set baud rate */
if ((termios_state = cfsetispeed(&uart_config, speed)) < 0) { if ((termios_state = cfsetispeed(&uart_config, speed)) < 0) {
PX4_ERR("ERR: %d (cfsetispeed)", termios_state); PX4_ERR("ERR: %d (cfsetispeed)", termios_state);
return -1; return false;
} }
if ((termios_state = cfsetospeed(&uart_config, speed)) < 0) { if ((termios_state = cfsetospeed(&uart_config, speed)) < 0) {
PX4_ERR("ERR: %d (cfsetospeed)", termios_state); PX4_ERR("ERR: %d (cfsetospeed)", termios_state);
return -1; return false;
} }
if ((termios_state = tcsetattr(fd, TCSANOW, &uart_config)) < 0) { if ((termios_state = tcsetattr(fd, TCSANOW, &uart_config)) < 0) {
PX4_ERR("ERR: %d (tcsetattr)", termios_state); PX4_ERR("ERR: %d (tcsetattr)", termios_state);
return -1; return false;
} }
return 0; return true;
} }
bool UxrceddsClient::add_replier(SrvBase *replier) bool UxrceddsClient::add_replier(SrvBase *replier)

View File

@ -44,6 +44,10 @@
#include <lib/timesync/Timesync.hpp> #include <lib/timesync/Timesync.hpp>
#if defined(CONFIG_NET) || defined(__PX4_POSIX)
# define UXRCE_DDS_CLIENT_UDP 1
#endif
#include "srv_base.h" #include "srv_base.h"
#define MAX_NUM_REPLIERS 5 #define MAX_NUM_REPLIERS 5
@ -108,7 +112,11 @@ public:
void delete_repliers(); void delete_repliers();
private: private:
int setBaudrate(int fd, unsigned baud);
bool init();
void deinit();
bool setBaudrate(int fd, unsigned baud);
void handleMessageFormatRequest(); void handleMessageFormatRequest();
@ -118,6 +126,12 @@ private:
/** Synchronizes the system clock if the time is off by more than 5 seconds */ /** Synchronizes the system clock if the time is off by more than 5 seconds */
void syncSystemClock(uxrSession *session); void syncSystemClock(uxrSession *session);
Transport _transport{};
uxrSerialTransport *_transport_serial{nullptr};
char _device[32] {};
int _baudrate{};
const char *_client_namespace; const char *_client_namespace;
enum class ParticipantConfig { enum class ParticipantConfig {
@ -130,13 +144,15 @@ private:
// max port characters (5+'\0') // max port characters (5+'\0')
static const uint8_t PORT_MAX_LENGTH = 6; static const uint8_t PORT_MAX_LENGTH = 6;
// max agent ip characters (15+'\0') // max agent ip characters (15+'\0')
static const uint8_t AGENT_IP_MAX_LENGTH = 16; static const uint8_t AGENT_IP_MAX_LENGTH = 16;
#if defined(CONFIG_NET) || defined(__PX4_POSIX) #if defined(UXRCE_DDS_CLIENT_UDP)
char _port[PORT_MAX_LENGTH]; char _port[PORT_MAX_LENGTH] {};
char _agent_ip[AGENT_IP_MAX_LENGTH]; char _agent_ip[AGENT_IP_MAX_LENGTH] {};
#endif uxrUDPTransport *_transport_udp{nullptr};
#endif // UXRCE_DDS_CLIENT_UDP
SendTopicsSubs *_subs{nullptr}; SendTopicsSubs *_subs{nullptr};
RcvTopicsPubs *_pubs{nullptr}; RcvTopicsPubs *_pubs{nullptr};
@ -144,8 +160,6 @@ private:
SrvBase *repliers_[MAX_NUM_REPLIERS]; SrvBase *repliers_[MAX_NUM_REPLIERS];
uint8_t num_of_repliers{0}; uint8_t num_of_repliers{0};
uxrSerialTransport *_transport_serial{nullptr};
uxrUDPTransport *_transport_udp{nullptr};
uxrCommunication *_comm{nullptr}; uxrCommunication *_comm{nullptr};
int _fd{-1}; int _fd{-1};