From fbc7a6dd9bbac7ba589c14a40c622120df016e0c Mon Sep 17 00:00:00 2001 From: arshPratap Date: Mon, 24 Apr 2023 15:13:32 +1000 Subject: [PATCH] AP_DDS: support UDP transport and switch serial transport to use custom transport --- libraries/AP_DDS/AP_DDS_Client.cpp | 174 +++++++++++------------------ libraries/AP_DDS/AP_DDS_Client.h | 59 ++++++++-- libraries/AP_DDS/AP_DDS_Serial.cpp | 95 ++++++++++++++++ libraries/AP_DDS/AP_DDS_UDP.cpp | 89 +++++++++++++++ libraries/AP_DDS/README.md | 6 +- libraries/AP_DDS/gen_config_h.py | 10 +- libraries/AP_DDS/wscript | 2 +- 7 files changed, 309 insertions(+), 126 deletions(-) create mode 100644 libraries/AP_DDS/AP_DDS_Serial.cpp create mode 100644 libraries/AP_DDS/AP_DDS_UDP.cpp diff --git a/libraries/AP_DDS/AP_DDS_Client.cpp b/libraries/AP_DDS/AP_DDS_Client.cpp index 7833c7b3b6..39618cb095 100644 --- a/libraries/AP_DDS/AP_DDS_Client.cpp +++ b/libraries/AP_DDS/AP_DDS_Client.cpp @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include @@ -21,11 +20,25 @@ static char WGS_84_FRAME_ID[] = "WGS-84"; // https://www.ros.org/reps/rep-0105.html#base-link static char BASE_LINK_FRAME_ID[] = "base_link"; -AP_HAL::UARTDriver *dds_port; +const AP_Param::GroupInfo AP_DDS_Client::var_info[] { + // @Param: _ENABLE + // @DisplayName: DDS enable + // @Description: Enable DDS subsystem + // @Values: 0:Disabled,1:Enabled + // @RebootRequired: True + // @User: Advanced + AP_GROUPINFO_FLAGS("_ENABLE", 1, AP_DDS_Client, enabled, 0, AP_PARAM_FLAG_ENABLE), -const AP_Param::GroupInfo AP_DDS_Client::var_info[]= { - //! @todo Params go here +#if AP_DDS_UDP_ENABLED + // @Param: _UDP_PORT + // @DisplayName: DDS UDP port + // @Description: UDP port number for DDS + // @Range: 1 65535 + // @RebootRequired: True + // @User: Standard + AP_GROUPINFO("_PORT", 2, AP_DDS_Client, udp.port, 2019), +#endif AP_GROUPEND }; @@ -183,8 +196,7 @@ void AP_DDS_Client::update_topic(sensor_msgs_msg_BatteryState& msg, const uint8_ update_topic(msg.header.stamp); auto &battery = AP::battery(); - if (!battery.healthy(instance)) - { + if (!battery.healthy(instance)) { msg.power_supply_status = 3; //POWER_SUPPLY_HEALTH_DEAD msg.present = false; return; @@ -203,36 +215,27 @@ void AP_DDS_Client::update_topic(sensor_msgs_msg_BatteryState& msg, const uint8_ msg.design_capacity = design_capacity; uint8_t percentage; - if (battery.capacity_remaining_pct(percentage, instance)) - { + if (battery.capacity_remaining_pct(percentage, instance)) { msg.percentage = percentage/100.0; msg.charge = (percentage * design_capacity)/100.0; - } - else - { + } else { msg.percentage = NAN; msg.charge = NAN; } msg.capacity = NAN; - if (battery.current_amps(current, instance)) - { + if (battery.current_amps(current, instance)) { if (percentage == 100) { msg.power_supply_status = 4; //POWER_SUPPLY_STATUS_FULL - } - else if (current < 0.0) { + } else if (current < 0.0) { msg.power_supply_status = 1; //POWER_SUPPLY_STATUS_CHARGING - } - else if (current > 0.0) { + } else if (current > 0.0) { msg.power_supply_status = 2; //POWER_SUPPLY_STATUS_DISCHARGING - } - else { + } else { msg.power_supply_status = 3; //POWER_SUPPLY_STATUS_NOT_CHARGING } - } - else - { + } else { msg.power_supply_status = 0; //POWER_SUPPLY_STATUS_UNKNOWN } @@ -240,8 +243,7 @@ void AP_DDS_Client::update_topic(sensor_msgs_msg_BatteryState& msg, const uint8_ msg.power_supply_technology = 0; //POWER_SUPPLY_TECHNOLOGY_UNKNOWN - if (battery.has_cell_voltages(instance)) - { + if (battery.has_cell_voltages(instance)) { const uint16_t* cellVoltages = battery.get_cell_voltages(instance).cells; std::copy(cellVoltages, cellVoltages + AP_BATT_MONITOR_CELLS_MAX, msg.cell_voltage); } @@ -268,8 +270,7 @@ void AP_DDS_Client::update_topic(geometry_msgs_msg_PoseStamped& msg) // as well as invert Z Vector3f position; - if (ahrs.get_relative_position_NED_home(position)) - { + if (ahrs.get_relative_position_NED_home(position)) { msg.pose.position.x = position[1]; msg.pose.position.y = position[0]; msg.pose.position.z = -position[2]; @@ -284,8 +285,7 @@ void AP_DDS_Client::update_topic(geometry_msgs_msg_PoseStamped& msg) // as well as invert Z (NED to ENU convertion) as well as a 90 degree rotation in the Z axis // for x to point forward Quaternion orientation; - if (ahrs.get_quaternion(orientation)) - { + if (ahrs.get_quaternion(orientation)) { Quaternion aux(orientation[0], orientation[2], orientation[1], -orientation[3]); //NED to ENU transformation Quaternion transformation (sqrt(2)/2,0,0,sqrt(2)/2); // Z axis 90 degree rotation orientation = aux * transformation; @@ -316,8 +316,7 @@ void AP_DDS_Client::update_topic(geometry_msgs_msg_TwistStamped& msg) // As a consequence, to follow ROS REP 103, it is necessary to switch X and Y, // as well as invert Z Vector3f velocity; - if (ahrs.get_velocity_NED(velocity)) - { + if (ahrs.get_velocity_NED(velocity)) { msg.twist.linear.x = velocity[1]; msg.twist.linear.y = velocity[0]; msg.twist.linear.z = -velocity[2]; @@ -340,15 +339,24 @@ void AP_DDS_Client::update_topic(geometry_msgs_msg_TwistStamped& msg) } /* - class constructor + start the DDS thread */ -AP_DDS_Client::AP_DDS_Client(void) +bool AP_DDS_Client::start(void) { + AP_Param::setup_object_defaults(this, var_info); + AP_Param::load_object_from_eeprom(this, var_info); + + if (enabled == 0) { + return true; + } + if (!hal.scheduler->thread_create(FUNCTOR_BIND_MEMBER(&AP_DDS_Client::main_loop, void), "DDS", 8192, AP_HAL::Scheduler::PRIORITY_IO, 1)) { GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"DDS Client: thread create failed"); + return false; } + return true; } /* @@ -371,35 +379,42 @@ void AP_DDS_Client::main_loop(void) } } - bool AP_DDS_Client::init() { - AP_SerialManager *serial_manager = AP_SerialManager::get_singleton(); - dds_port = serial_manager->find_serial(AP_SerialManager::SerialProtocol_DDS_XRCE, 0); - if (dds_port == nullptr) { + // serial init will fail if the SERIALn_PROTOCOL is not setup + bool initTransportStatus = ddsSerialInit(); + is_using_serial = initTransportStatus; + +#if AP_DDS_UDP_ENABLED + // fallback to UDP if available + if (!initTransportStatus) { + initTransportStatus = ddsUdpInit(); + } +#endif + + if (!initTransportStatus) { + GCS_SEND_TEXT(MAV_SEVERITY_INFO,"DDS Client: Transport Initialization failed"); return false; } - - // ensure we own the UART - dds_port->begin(0); - - constexpr uint8_t fd = 0; - constexpr uint8_t relativeSerialAgentAddr = 0; - constexpr uint8_t relativeSerialClientAddr = 1; - if (!uxr_init_serial_transport(&serial_transport,fd,relativeSerialAgentAddr,relativeSerialClientAddr)) { - return false; - } - - constexpr uint32_t uniqueClientKey = 0xAAAABBBB; - //TODO does this need to be inside the loop to handle reconnect? - uxr_init_session(&session, &serial_transport.comm, uniqueClientKey); while (!uxr_create_session(&session)) { GCS_SEND_TEXT(MAV_SEVERITY_INFO,"DDS Client: Initialization waiting..."); hal.scheduler->delay(1000); } - reliable_in = uxr_create_input_reliable_stream(&session,input_reliable_stream,BUFFER_SIZE_SERIAL,STREAM_HISTORY); - reliable_out = uxr_create_output_reliable_stream(&session,output_reliable_stream,BUFFER_SIZE_SERIAL,STREAM_HISTORY); + // setup reliable stream buffers + input_reliable_stream = new uint8_t[DDS_STREAM_HISTORY*DDS_BUFFER_SIZE]; + if (input_reliable_stream == nullptr) { + GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"DDS Client: allocation failed"); + return false; + } + output_reliable_stream = new uint8_t[DDS_STREAM_HISTORY*DDS_BUFFER_SIZE]; + if (output_reliable_stream == nullptr) { + GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"DDS Client: allocation failed"); + return false; + } + + reliable_in = uxr_create_input_reliable_stream(&session, input_reliable_stream, DDS_BUFFER_SIZE, DDS_STREAM_HISTORY); + reliable_out = uxr_create_output_reliable_stream(&session, output_reliable_stream, DDS_BUFFER_SIZE, DDS_STREAM_HISTORY); GCS_SEND_TEXT(MAV_SEVERITY_INFO,"DDS Client: Init Complete"); @@ -601,61 +616,6 @@ void AP_DDS_Client::update() connected = uxr_run_session_time(&session, 1); } -/* - implement C functions for serial transport - */ -extern "C" { -#include -} - -bool uxr_init_serial_platform(void* args, int fd, uint8_t remote_addr, uint8_t local_addr) -{ - //! @todo Add error reporting - return true; -} - -bool uxr_close_serial_platform(void* args) -{ - //! @todo Add error reporting - return true; -} - -size_t uxr_write_serial_data_platform(void* args, const uint8_t* buf, size_t len, uint8_t* errcode) -{ - if (dds_port == nullptr) { - *errcode = 1; - return 0; - } - ssize_t bytes_written = dds_port->write(buf, len); - if (bytes_written <= 0) { - *errcode = 1; - return 0; - } - //! @todo Add populate the error code correctly - *errcode = 0; - return bytes_written; -} - -size_t uxr_read_serial_data_platform(void* args, uint8_t* buf, size_t len, int timeout, uint8_t* errcode) -{ - if (dds_port == nullptr) { - *errcode = 1; - return 0; - } - while (timeout > 0 && dds_port->available() < len) { - hal.scheduler->delay(1); // TODO select or poll this is limiting speed (1mS) - timeout--; - } - ssize_t bytes_read = dds_port->read(buf, len); - if (bytes_read <= 0) { - *errcode = 1; - return 0; - } - //! @todo Add error reporting - *errcode = 0; - return bytes_read; -} - #if CONFIG_HAL_BOARD != HAL_BOARD_SITL extern "C" { int clock_gettime(clockid_t clockid, struct timespec *ts); diff --git a/libraries/AP_DDS/AP_DDS_Client.h b/libraries/AP_DDS/AP_DDS_Client.h index 459cc73ed5..ecce3df1d2 100644 --- a/libraries/AP_DDS/AP_DDS_Client.h +++ b/libraries/AP_DDS/AP_DDS_Client.h @@ -21,8 +21,16 @@ #include "fcntl.h" #include -#define STREAM_HISTORY 8 -#define BUFFER_SIZE_SERIAL UXR_CONFIG_SERIAL_TRANSPORT_MTU * STREAM_HISTORY + +#define DDS_STREAM_HISTORY 8 +#define DDS_BUFFER_SIZE 512 + +// UDP only on SITL for now +#define AP_DDS_UDP_ENABLED (CONFIG_HAL_BOARD == HAL_BOARD_SITL) + +#if AP_DDS_UDP_ENABLED +#include +#endif extern const AP_HAL::HAL& hal; @@ -31,16 +39,16 @@ class AP_DDS_Client private: + AP_Int8 enabled; + // Serial Allocation - uxrSerialTransport serial_transport; // client uxr serial transport uxrSession session; //Session + bool is_using_serial; // true when using serial transport - // Input Stream - uint8_t input_reliable_stream[BUFFER_SIZE_SERIAL]; + // input and output stream + uint8_t *input_reliable_stream; + uint8_t *output_reliable_stream; uxrStreamId reliable_in; - - // Output Stream - uint8_t output_reliable_stream[BUFFER_SIZE_SERIAL]; uxrStreamId reliable_out; // Topic @@ -74,11 +82,40 @@ private: // The last ms timestamp AP_DDS wrote a Local Velocity message uint64_t last_local_velocity_time_ms; + // functions for serial transport + bool ddsSerialInit(); + static bool serial_transport_open(uxrCustomTransport* args); + static bool serial_transport_close(uxrCustomTransport* transport); + static size_t serial_transport_write(uxrCustomTransport* transport, const uint8_t* buf, size_t len, uint8_t* error); + static size_t serial_transport_read(uxrCustomTransport* transport, uint8_t* buf, size_t len, int timeout, uint8_t* error); + struct { + AP_HAL::UARTDriver *port; + uxrCustomTransport transport; + } serial; + +#if AP_DDS_UDP_ENABLED + // functions for udp transport + bool ddsUdpInit(); + static bool udp_transport_open(uxrCustomTransport* args); + static bool udp_transport_close(uxrCustomTransport* transport); + static size_t udp_transport_write(uxrCustomTransport* transport, const uint8_t* buf, size_t len, uint8_t* error); + static size_t udp_transport_read(uxrCustomTransport* transport, uint8_t* buf, size_t len, int timeout, uint8_t* error); + + struct { + AP_Int32 port; + // UDP endpoint + const char* ip = "127.0.0.1"; + // UDP Allocation + uxrCustomTransport transport; + SocketAPM *socket; + } udp; +#endif + + // client key we present + static constexpr uint32_t uniqueClientKey = 0xAAAABBBB; public: - // Constructor - AP_DDS_Client(); - + bool start(void); void main_loop(void); //! @brief Initialize the client's transport, uxr session, and IO stream(s) diff --git a/libraries/AP_DDS/AP_DDS_Serial.cpp b/libraries/AP_DDS/AP_DDS_Serial.cpp new file mode 100644 index 0000000000..8ce7cc3b0e --- /dev/null +++ b/libraries/AP_DDS/AP_DDS_Serial.cpp @@ -0,0 +1,95 @@ +#include "AP_DDS_Client.h" + +#include + +#include + +/* + open connection on a serial port + */ +bool AP_DDS_Client::serial_transport_open(uxrCustomTransport *t) +{ + AP_DDS_Client *dds = (AP_DDS_Client *)t->args; + AP_SerialManager *serial_manager = AP_SerialManager::get_singleton(); + auto *dds_port = serial_manager->find_serial(AP_SerialManager::SerialProtocol_DDS_XRCE, 0); + if (dds_port == nullptr) { + return false; + } + // ensure we own the UART + dds_port->begin(0); + dds->serial.port = dds_port; + return true; +} + +/* + close serial transport + */ +bool AP_DDS_Client::serial_transport_close(uxrCustomTransport *t) +{ + // we don't actually close the UART + return true; +} + +/* + write on serial transport + */ +size_t AP_DDS_Client::serial_transport_write(uxrCustomTransport *t, const uint8_t* buf, size_t len, uint8_t* error) +{ + AP_DDS_Client *dds = (AP_DDS_Client *)t->args; + if (dds->serial.port == nullptr) { + *error = EINVAL; + return 0; + } + ssize_t bytes_written = dds->serial.port->write(buf, len); + if (bytes_written <= 0) { + *error = 1; + return 0; + } + //! @todo populate the error code correctly + *error = 0; + return bytes_written; +} + +/* + read from a serial transport + */ +size_t AP_DDS_Client::serial_transport_read(uxrCustomTransport *t, uint8_t* buf, size_t len, int timeout_ms, uint8_t* error) +{ + AP_DDS_Client *dds = (AP_DDS_Client *)t->args; + if (dds->serial.port == nullptr) { + *error = EINVAL; + return 0; + } + const uint32_t tstart = AP_HAL::millis(); + while (AP_HAL::millis() - tstart < uint32_t(timeout_ms) && + dds->serial.port->available() < len) { + hal.scheduler->delay_microseconds(100); // TODO select or poll this is limiting speed (100us) + } + ssize_t bytes_read = dds->serial.port->read(buf, len); + if (bytes_read <= 0) { + *error = 1; + return 0; + } + //! @todo Add error reporting + *error = 0; + return bytes_read; +} + +/* + initialise serial connection + */ +bool AP_DDS_Client::ddsSerialInit() +{ + // setup a framed transport for serial + uxr_set_custom_transport_callbacks(&serial.transport, true, + serial_transport_open, + serial_transport_close, + serial_transport_write, + serial_transport_read); + + if (!uxr_init_custom_transport(&serial.transport, (void*)this)) { + return false; + } + uxr_init_session(&session, &serial.transport.comm, uniqueClientKey); + return true; +} diff --git a/libraries/AP_DDS/AP_DDS_UDP.cpp b/libraries/AP_DDS/AP_DDS_UDP.cpp new file mode 100644 index 0000000000..0569e0ecda --- /dev/null +++ b/libraries/AP_DDS/AP_DDS_UDP.cpp @@ -0,0 +1,89 @@ +#include "AP_DDS_Client.h" + +#if AP_DDS_UDP_ENABLED + +#include + +/* + open connection on UDP + */ +bool AP_DDS_Client::udp_transport_open(uxrCustomTransport *t) +{ + AP_DDS_Client *dds = (AP_DDS_Client *)t->args; + auto *sock = new SocketAPM(true); + if (sock == nullptr) { + return false; + } + if (!sock->connect(dds->udp.ip, dds->udp.port.get())) { + return false; + } + dds->udp.socket = sock; + return true; +} + +/* + close UDP connection + */ +bool AP_DDS_Client::udp_transport_close(uxrCustomTransport *t) +{ + AP_DDS_Client *dds = (AP_DDS_Client *)t->args; + delete dds->udp.socket; + dds->udp.socket = nullptr; + return true; +} + +/* + write on UDP + */ +size_t AP_DDS_Client::udp_transport_write(uxrCustomTransport *t, const uint8_t* buf, size_t len, uint8_t* error) +{ + AP_DDS_Client *dds = (AP_DDS_Client *)t->args; + if (dds->udp.socket == nullptr) { + *error = EINVAL; + return 0; + } + const ssize_t ret = dds->udp.socket->send(buf, len); + if (ret <= 0) { + *error = errno; + return 0; + } + return ret; +} + +/* + read from UDP + */ +size_t AP_DDS_Client::udp_transport_read(uxrCustomTransport *t, uint8_t* buf, size_t len, int timeout_ms, uint8_t* error) +{ + AP_DDS_Client *dds = (AP_DDS_Client *)t->args; + if (dds->udp.socket == nullptr) { + *error = EINVAL; + return 0; + } + const ssize_t ret = dds->udp.socket->recv(buf, len, timeout_ms); + if (ret <= 0) { + *error = errno; + return 0; + } + return ret; +} + +/* + initialise UDP connection + */ +bool AP_DDS_Client::ddsUdpInit() +{ + // setup a non-framed transport for UDP + uxr_set_custom_transport_callbacks(&udp.transport, false, + udp_transport_open, + udp_transport_close, + udp_transport_write, + udp_transport_read); + + if (!uxr_init_custom_transport(&udp.transport, (void*)this)) { + return false; + } + uxr_init_session(&session, &udp.transport.comm, uniqueClientKey); + return true; +} +#endif // AP_DDS_UDP_ENABLED diff --git a/libraries/AP_DDS/README.md b/libraries/AP_DDS/README.md index 0c835c0cf8..3752760914 100644 --- a/libraries/AP_DDS/README.md +++ b/libraries/AP_DDS/README.md @@ -80,17 +80,19 @@ sudo apt-get install socat ## Setup ardupilot for SITL with DDS Set up your [SITL](https://ardupilot.org/dev/docs/setting-up-sitl-on-linux.html). -Run the simulator with the following command. Take note how two parameters need adjusting from default to use DDS. +Run the simulator with the following command. Take note how three parameters need adjusting from default to use DDS. | Name | Description | | - | - | +| DDS_ENABLE | Set to 1 to enable DDS | | SERIAL1_BAUD | The serial baud rate for DDS | | SERIAL1_PROTOCOL | Set this to 45 to use DDS on the serial port | ```bash # Wipe params till you see "AP: ArduPilot Ready" # Select your favorite vehicle type -sim_vehicle.py -w -v ArduPlane +sim_vehicle.py -w -v ArduPlane --enable-dds # Set params +param set DDS_ENABLE 1 param set SERIAL1_BAUD 115 # See libraries/AP_SerialManager/AP_SerialManager.h AP_SerialManager SerialProtocol_DDS_XRCE param set SERIAL1_PROTOCOL 45 diff --git a/libraries/AP_DDS/gen_config_h.py b/libraries/AP_DDS/gen_config_h.py index f35af8f7b8..618a9f84f7 100755 --- a/libraries/AP_DDS/gen_config_h.py +++ b/libraries/AP_DDS/gen_config_h.py @@ -23,7 +23,7 @@ config = { 'UCLIENT_UDP_TRANSPORT_MTU': 300, 'UCLIENT_TCP_TRANSPORT_MTU': 350, 'UCLIENT_SERIAL_TRANSPORT_MTU': 1024, - 'UCLIENT_CUSTOM_TRANSPORT_MTU': 400, + 'UCLIENT_CUSTOM_TRANSPORT_MTU': 512, 'CONFIG_MACHINE_ENDIANNESS': 1, # little endian 'UCLIENT_SHARED_MEMORY_MAX_ENTITIES': 0, 'UCLIENT_SHARED_MEMORY_STATIC_MEM_SIZE': 0, @@ -34,10 +34,10 @@ defines = { "UCLIENT_PROFILE_UDP": 0, "UCLIENT_PROFILE_TCP": 0, "UCLIENT_PROFILE_CAN": 0, - "UCLIENT_PROFILE_SERIAL": 1, - "UCLIENT_PROFILE_CUSTOM_TRANSPORT": 0, + "UCLIENT_PROFILE_SERIAL": 0, + "UCLIENT_PROFILE_CUSTOM_TRANSPORT": 1, "UCLIENT_PROFILE_DISCOVERY": 0, - "UCLIENT_PLATFORM_POSIX": 1, + "UCLIENT_PLATFORM_POSIX": 0, "UCLIENT_PLATFORM_POSIX_NOPOLL": 0, "UCLIENT_PLATFORM_WINDOWS": 0, "UCLIENT_PLATFORM_FREERTOS_PLUS_TCP": 0, @@ -47,7 +47,7 @@ defines = { "UCLIENT_EXTERNAL_SERIAL": 0, "UCLIENT_PROFILE_MULTITHREAD": 0, "UCLIENT_PROFILE_SHARED_MEMORY": 0, - "UCLIENT_PROFILE_STREAM_FRAMING": 0, + "UCLIENT_PROFILE_STREAM_FRAMING": 1, "UCLIENT_PLATFORM_RTEMS_BSD_NET": 0, "UCLIENT_TWEAK_XRCE_WRITE_LIMIT": 0, "UCLIENT_HARD_LIVELINESS_CHECK": 0, diff --git a/libraries/AP_DDS/wscript b/libraries/AP_DDS/wscript index d4e62852fe..b181f9ab21 100644 --- a/libraries/AP_DDS/wscript +++ b/libraries/AP_DDS/wscript @@ -10,7 +10,7 @@ def configure(cfg): 'modules/Micro-XRCE-DDS-Client/src/c/core/session/*.c', 'modules/Micro-XRCE-DDS-Client/src/c/core/serialization/*.c', 'modules/Micro-XRCE-DDS-Client/src/c/util/*.c', - 'modules/Micro-XRCE-DDS-Client/src/c/profile/transport/serial/serial_transport.c', + 'modules/Micro-XRCE-DDS-Client/src/c/profile/transport/custom/custom_transport.c', 'modules/Micro-XRCE-DDS-Client/src/c/profile/transport/stream_framing/stream_framing_protocol.c', 'modules/Micro-CDR/src/c/types/*.c', 'modules/Micro-CDR/src/c/common.c',