AP_DDS: support UDP transport

and switch serial transport to use custom transport
This commit is contained in:
arshPratap 2023-04-24 15:13:32 +10:00 committed by Andrew Tridgell
parent aa25461bbe
commit fbc7a6dd9b
7 changed files with 309 additions and 126 deletions

View File

@ -5,7 +5,6 @@
#include <AP_GPS/AP_GPS.h>
#include <AP_HAL/AP_HAL.h>
#include <AP_RTC/AP_RTC.h>
#include <AP_SerialManager/AP_SerialManager.h>
#include <AP_Math/AP_Math.h>
#include <GCS_MAVLink/GCS.h>
#include <AP_BattMonitor/AP_BattMonitor.h>
@ -21,11 +20,25 @@ static char WGS_84_FRAME_ID[] = "WGS-84";
// https://www.ros.org/reps/rep-0105.html#base-link
static char BASE_LINK_FRAME_ID[] = "base_link";
AP_HAL::UARTDriver *dds_port;
const AP_Param::GroupInfo AP_DDS_Client::var_info[] {
// @Param: _ENABLE
// @DisplayName: DDS enable
// @Description: Enable DDS subsystem
// @Values: 0:Disabled,1:Enabled
// @RebootRequired: True
// @User: Advanced
AP_GROUPINFO_FLAGS("_ENABLE", 1, AP_DDS_Client, enabled, 0, AP_PARAM_FLAG_ENABLE),
const AP_Param::GroupInfo AP_DDS_Client::var_info[]= {
//! @todo Params go here
#if AP_DDS_UDP_ENABLED
// @Param: _UDP_PORT
// @DisplayName: DDS UDP port
// @Description: UDP port number for DDS
// @Range: 1 65535
// @RebootRequired: True
// @User: Standard
AP_GROUPINFO("_PORT", 2, AP_DDS_Client, udp.port, 2019),
#endif
AP_GROUPEND
};
@ -183,8 +196,7 @@ void AP_DDS_Client::update_topic(sensor_msgs_msg_BatteryState& msg, const uint8_
update_topic(msg.header.stamp);
auto &battery = AP::battery();
if (!battery.healthy(instance))
{
if (!battery.healthy(instance)) {
msg.power_supply_status = 3; //POWER_SUPPLY_HEALTH_DEAD
msg.present = false;
return;
@ -203,36 +215,27 @@ void AP_DDS_Client::update_topic(sensor_msgs_msg_BatteryState& msg, const uint8_
msg.design_capacity = design_capacity;
uint8_t percentage;
if (battery.capacity_remaining_pct(percentage, instance))
{
if (battery.capacity_remaining_pct(percentage, instance)) {
msg.percentage = percentage/100.0;
msg.charge = (percentage * design_capacity)/100.0;
}
else
{
} else {
msg.percentage = NAN;
msg.charge = NAN;
}
msg.capacity = NAN;
if (battery.current_amps(current, instance))
{
if (battery.current_amps(current, instance)) {
if (percentage == 100) {
msg.power_supply_status = 4; //POWER_SUPPLY_STATUS_FULL
}
else if (current < 0.0) {
} else if (current < 0.0) {
msg.power_supply_status = 1; //POWER_SUPPLY_STATUS_CHARGING
}
else if (current > 0.0) {
} else if (current > 0.0) {
msg.power_supply_status = 2; //POWER_SUPPLY_STATUS_DISCHARGING
}
else {
} else {
msg.power_supply_status = 3; //POWER_SUPPLY_STATUS_NOT_CHARGING
}
}
else
{
} else {
msg.power_supply_status = 0; //POWER_SUPPLY_STATUS_UNKNOWN
}
@ -240,8 +243,7 @@ void AP_DDS_Client::update_topic(sensor_msgs_msg_BatteryState& msg, const uint8_
msg.power_supply_technology = 0; //POWER_SUPPLY_TECHNOLOGY_UNKNOWN
if (battery.has_cell_voltages(instance))
{
if (battery.has_cell_voltages(instance)) {
const uint16_t* cellVoltages = battery.get_cell_voltages(instance).cells;
std::copy(cellVoltages, cellVoltages + AP_BATT_MONITOR_CELLS_MAX, msg.cell_voltage);
}
@ -268,8 +270,7 @@ void AP_DDS_Client::update_topic(geometry_msgs_msg_PoseStamped& msg)
// as well as invert Z
Vector3f position;
if (ahrs.get_relative_position_NED_home(position))
{
if (ahrs.get_relative_position_NED_home(position)) {
msg.pose.position.x = position[1];
msg.pose.position.y = position[0];
msg.pose.position.z = -position[2];
@ -284,8 +285,7 @@ void AP_DDS_Client::update_topic(geometry_msgs_msg_PoseStamped& msg)
// as well as invert Z (NED to ENU convertion) as well as a 90 degree rotation in the Z axis
// for x to point forward
Quaternion orientation;
if (ahrs.get_quaternion(orientation))
{
if (ahrs.get_quaternion(orientation)) {
Quaternion aux(orientation[0], orientation[2], orientation[1], -orientation[3]); //NED to ENU transformation
Quaternion transformation (sqrt(2)/2,0,0,sqrt(2)/2); // Z axis 90 degree rotation
orientation = aux * transformation;
@ -316,8 +316,7 @@ void AP_DDS_Client::update_topic(geometry_msgs_msg_TwistStamped& msg)
// As a consequence, to follow ROS REP 103, it is necessary to switch X and Y,
// as well as invert Z
Vector3f velocity;
if (ahrs.get_velocity_NED(velocity))
{
if (ahrs.get_velocity_NED(velocity)) {
msg.twist.linear.x = velocity[1];
msg.twist.linear.y = velocity[0];
msg.twist.linear.z = -velocity[2];
@ -340,15 +339,24 @@ void AP_DDS_Client::update_topic(geometry_msgs_msg_TwistStamped& msg)
}
/*
class constructor
start the DDS thread
*/
AP_DDS_Client::AP_DDS_Client(void)
bool AP_DDS_Client::start(void)
{
AP_Param::setup_object_defaults(this, var_info);
AP_Param::load_object_from_eeprom(this, var_info);
if (enabled == 0) {
return true;
}
if (!hal.scheduler->thread_create(FUNCTOR_BIND_MEMBER(&AP_DDS_Client::main_loop, void),
"DDS",
8192, AP_HAL::Scheduler::PRIORITY_IO, 1)) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"DDS Client: thread create failed");
return false;
}
return true;
}
/*
@ -371,35 +379,42 @@ void AP_DDS_Client::main_loop(void)
}
}
bool AP_DDS_Client::init()
{
AP_SerialManager *serial_manager = AP_SerialManager::get_singleton();
dds_port = serial_manager->find_serial(AP_SerialManager::SerialProtocol_DDS_XRCE, 0);
if (dds_port == nullptr) {
// serial init will fail if the SERIALn_PROTOCOL is not setup
bool initTransportStatus = ddsSerialInit();
is_using_serial = initTransportStatus;
#if AP_DDS_UDP_ENABLED
// fallback to UDP if available
if (!initTransportStatus) {
initTransportStatus = ddsUdpInit();
}
#endif
if (!initTransportStatus) {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"DDS Client: Transport Initialization failed");
return false;
}
// ensure we own the UART
dds_port->begin(0);
constexpr uint8_t fd = 0;
constexpr uint8_t relativeSerialAgentAddr = 0;
constexpr uint8_t relativeSerialClientAddr = 1;
if (!uxr_init_serial_transport(&serial_transport,fd,relativeSerialAgentAddr,relativeSerialClientAddr)) {
return false;
}
constexpr uint32_t uniqueClientKey = 0xAAAABBBB;
//TODO does this need to be inside the loop to handle reconnect?
uxr_init_session(&session, &serial_transport.comm, uniqueClientKey);
while (!uxr_create_session(&session)) {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"DDS Client: Initialization waiting...");
hal.scheduler->delay(1000);
}
reliable_in = uxr_create_input_reliable_stream(&session,input_reliable_stream,BUFFER_SIZE_SERIAL,STREAM_HISTORY);
reliable_out = uxr_create_output_reliable_stream(&session,output_reliable_stream,BUFFER_SIZE_SERIAL,STREAM_HISTORY);
// setup reliable stream buffers
input_reliable_stream = new uint8_t[DDS_STREAM_HISTORY*DDS_BUFFER_SIZE];
if (input_reliable_stream == nullptr) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"DDS Client: allocation failed");
return false;
}
output_reliable_stream = new uint8_t[DDS_STREAM_HISTORY*DDS_BUFFER_SIZE];
if (output_reliable_stream == nullptr) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"DDS Client: allocation failed");
return false;
}
reliable_in = uxr_create_input_reliable_stream(&session, input_reliable_stream, DDS_BUFFER_SIZE, DDS_STREAM_HISTORY);
reliable_out = uxr_create_output_reliable_stream(&session, output_reliable_stream, DDS_BUFFER_SIZE, DDS_STREAM_HISTORY);
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"DDS Client: Init Complete");
@ -601,61 +616,6 @@ void AP_DDS_Client::update()
connected = uxr_run_session_time(&session, 1);
}
/*
implement C functions for serial transport
*/
extern "C" {
#include <uxr/client/profile/transport/serial/serial_transport_platform.h>
}
bool uxr_init_serial_platform(void* args, int fd, uint8_t remote_addr, uint8_t local_addr)
{
//! @todo Add error reporting
return true;
}
bool uxr_close_serial_platform(void* args)
{
//! @todo Add error reporting
return true;
}
size_t uxr_write_serial_data_platform(void* args, const uint8_t* buf, size_t len, uint8_t* errcode)
{
if (dds_port == nullptr) {
*errcode = 1;
return 0;
}
ssize_t bytes_written = dds_port->write(buf, len);
if (bytes_written <= 0) {
*errcode = 1;
return 0;
}
//! @todo Add populate the error code correctly
*errcode = 0;
return bytes_written;
}
size_t uxr_read_serial_data_platform(void* args, uint8_t* buf, size_t len, int timeout, uint8_t* errcode)
{
if (dds_port == nullptr) {
*errcode = 1;
return 0;
}
while (timeout > 0 && dds_port->available() < len) {
hal.scheduler->delay(1); // TODO select or poll this is limiting speed (1mS)
timeout--;
}
ssize_t bytes_read = dds_port->read(buf, len);
if (bytes_read <= 0) {
*errcode = 1;
return 0;
}
//! @todo Add error reporting
*errcode = 0;
return bytes_read;
}
#if CONFIG_HAL_BOARD != HAL_BOARD_SITL
extern "C" {
int clock_gettime(clockid_t clockid, struct timespec *ts);

View File

@ -21,8 +21,16 @@
#include "fcntl.h"
#include <AP_Param/AP_Param.h>
#define STREAM_HISTORY 8
#define BUFFER_SIZE_SERIAL UXR_CONFIG_SERIAL_TRANSPORT_MTU * STREAM_HISTORY
#define DDS_STREAM_HISTORY 8
#define DDS_BUFFER_SIZE 512
// UDP only on SITL for now
#define AP_DDS_UDP_ENABLED (CONFIG_HAL_BOARD == HAL_BOARD_SITL)
#if AP_DDS_UDP_ENABLED
#include <AP_HAL/utility/Socket.h>
#endif
extern const AP_HAL::HAL& hal;
@ -31,16 +39,16 @@ class AP_DDS_Client
private:
AP_Int8 enabled;
// Serial Allocation
uxrSerialTransport serial_transport; // client uxr serial transport
uxrSession session; //Session
bool is_using_serial; // true when using serial transport
// Input Stream
uint8_t input_reliable_stream[BUFFER_SIZE_SERIAL];
// input and output stream
uint8_t *input_reliable_stream;
uint8_t *output_reliable_stream;
uxrStreamId reliable_in;
// Output Stream
uint8_t output_reliable_stream[BUFFER_SIZE_SERIAL];
uxrStreamId reliable_out;
// Topic
@ -74,11 +82,40 @@ private:
// The last ms timestamp AP_DDS wrote a Local Velocity message
uint64_t last_local_velocity_time_ms;
// functions for serial transport
bool ddsSerialInit();
static bool serial_transport_open(uxrCustomTransport* args);
static bool serial_transport_close(uxrCustomTransport* transport);
static size_t serial_transport_write(uxrCustomTransport* transport, const uint8_t* buf, size_t len, uint8_t* error);
static size_t serial_transport_read(uxrCustomTransport* transport, uint8_t* buf, size_t len, int timeout, uint8_t* error);
struct {
AP_HAL::UARTDriver *port;
uxrCustomTransport transport;
} serial;
#if AP_DDS_UDP_ENABLED
// functions for udp transport
bool ddsUdpInit();
static bool udp_transport_open(uxrCustomTransport* args);
static bool udp_transport_close(uxrCustomTransport* transport);
static size_t udp_transport_write(uxrCustomTransport* transport, const uint8_t* buf, size_t len, uint8_t* error);
static size_t udp_transport_read(uxrCustomTransport* transport, uint8_t* buf, size_t len, int timeout, uint8_t* error);
struct {
AP_Int32 port;
// UDP endpoint
const char* ip = "127.0.0.1";
// UDP Allocation
uxrCustomTransport transport;
SocketAPM *socket;
} udp;
#endif
// client key we present
static constexpr uint32_t uniqueClientKey = 0xAAAABBBB;
public:
// Constructor
AP_DDS_Client();
bool start(void);
void main_loop(void);
//! @brief Initialize the client's transport, uxr session, and IO stream(s)

View File

@ -0,0 +1,95 @@
#include "AP_DDS_Client.h"
#include <AP_SerialManager/AP_SerialManager.h>
#include <errno.h>
/*
open connection on a serial port
*/
bool AP_DDS_Client::serial_transport_open(uxrCustomTransport *t)
{
AP_DDS_Client *dds = (AP_DDS_Client *)t->args;
AP_SerialManager *serial_manager = AP_SerialManager::get_singleton();
auto *dds_port = serial_manager->find_serial(AP_SerialManager::SerialProtocol_DDS_XRCE, 0);
if (dds_port == nullptr) {
return false;
}
// ensure we own the UART
dds_port->begin(0);
dds->serial.port = dds_port;
return true;
}
/*
close serial transport
*/
bool AP_DDS_Client::serial_transport_close(uxrCustomTransport *t)
{
// we don't actually close the UART
return true;
}
/*
write on serial transport
*/
size_t AP_DDS_Client::serial_transport_write(uxrCustomTransport *t, const uint8_t* buf, size_t len, uint8_t* error)
{
AP_DDS_Client *dds = (AP_DDS_Client *)t->args;
if (dds->serial.port == nullptr) {
*error = EINVAL;
return 0;
}
ssize_t bytes_written = dds->serial.port->write(buf, len);
if (bytes_written <= 0) {
*error = 1;
return 0;
}
//! @todo populate the error code correctly
*error = 0;
return bytes_written;
}
/*
read from a serial transport
*/
size_t AP_DDS_Client::serial_transport_read(uxrCustomTransport *t, uint8_t* buf, size_t len, int timeout_ms, uint8_t* error)
{
AP_DDS_Client *dds = (AP_DDS_Client *)t->args;
if (dds->serial.port == nullptr) {
*error = EINVAL;
return 0;
}
const uint32_t tstart = AP_HAL::millis();
while (AP_HAL::millis() - tstart < uint32_t(timeout_ms) &&
dds->serial.port->available() < len) {
hal.scheduler->delay_microseconds(100); // TODO select or poll this is limiting speed (100us)
}
ssize_t bytes_read = dds->serial.port->read(buf, len);
if (bytes_read <= 0) {
*error = 1;
return 0;
}
//! @todo Add error reporting
*error = 0;
return bytes_read;
}
/*
initialise serial connection
*/
bool AP_DDS_Client::ddsSerialInit()
{
// setup a framed transport for serial
uxr_set_custom_transport_callbacks(&serial.transport, true,
serial_transport_open,
serial_transport_close,
serial_transport_write,
serial_transport_read);
if (!uxr_init_custom_transport(&serial.transport, (void*)this)) {
return false;
}
uxr_init_session(&session, &serial.transport.comm, uniqueClientKey);
return true;
}

View File

@ -0,0 +1,89 @@
#include "AP_DDS_Client.h"
#if AP_DDS_UDP_ENABLED
#include <errno.h>
/*
open connection on UDP
*/
bool AP_DDS_Client::udp_transport_open(uxrCustomTransport *t)
{
AP_DDS_Client *dds = (AP_DDS_Client *)t->args;
auto *sock = new SocketAPM(true);
if (sock == nullptr) {
return false;
}
if (!sock->connect(dds->udp.ip, dds->udp.port.get())) {
return false;
}
dds->udp.socket = sock;
return true;
}
/*
close UDP connection
*/
bool AP_DDS_Client::udp_transport_close(uxrCustomTransport *t)
{
AP_DDS_Client *dds = (AP_DDS_Client *)t->args;
delete dds->udp.socket;
dds->udp.socket = nullptr;
return true;
}
/*
write on UDP
*/
size_t AP_DDS_Client::udp_transport_write(uxrCustomTransport *t, const uint8_t* buf, size_t len, uint8_t* error)
{
AP_DDS_Client *dds = (AP_DDS_Client *)t->args;
if (dds->udp.socket == nullptr) {
*error = EINVAL;
return 0;
}
const ssize_t ret = dds->udp.socket->send(buf, len);
if (ret <= 0) {
*error = errno;
return 0;
}
return ret;
}
/*
read from UDP
*/
size_t AP_DDS_Client::udp_transport_read(uxrCustomTransport *t, uint8_t* buf, size_t len, int timeout_ms, uint8_t* error)
{
AP_DDS_Client *dds = (AP_DDS_Client *)t->args;
if (dds->udp.socket == nullptr) {
*error = EINVAL;
return 0;
}
const ssize_t ret = dds->udp.socket->recv(buf, len, timeout_ms);
if (ret <= 0) {
*error = errno;
return 0;
}
return ret;
}
/*
initialise UDP connection
*/
bool AP_DDS_Client::ddsUdpInit()
{
// setup a non-framed transport for UDP
uxr_set_custom_transport_callbacks(&udp.transport, false,
udp_transport_open,
udp_transport_close,
udp_transport_write,
udp_transport_read);
if (!uxr_init_custom_transport(&udp.transport, (void*)this)) {
return false;
}
uxr_init_session(&session, &udp.transport.comm, uniqueClientKey);
return true;
}
#endif // AP_DDS_UDP_ENABLED

View File

@ -80,17 +80,19 @@ sudo apt-get install socat
## Setup ardupilot for SITL with DDS
Set up your [SITL](https://ardupilot.org/dev/docs/setting-up-sitl-on-linux.html).
Run the simulator with the following command. Take note how two parameters need adjusting from default to use DDS.
Run the simulator with the following command. Take note how three parameters need adjusting from default to use DDS.
| Name | Description |
| - | - |
| DDS_ENABLE | Set to 1 to enable DDS |
| SERIAL1_BAUD | The serial baud rate for DDS |
| SERIAL1_PROTOCOL | Set this to 45 to use DDS on the serial port |
```bash
# Wipe params till you see "AP: ArduPilot Ready"
# Select your favorite vehicle type
sim_vehicle.py -w -v ArduPlane
sim_vehicle.py -w -v ArduPlane --enable-dds
# Set params
param set DDS_ENABLE 1
param set SERIAL1_BAUD 115
# See libraries/AP_SerialManager/AP_SerialManager.h AP_SerialManager SerialProtocol_DDS_XRCE
param set SERIAL1_PROTOCOL 45

View File

@ -23,7 +23,7 @@ config = {
'UCLIENT_UDP_TRANSPORT_MTU': 300,
'UCLIENT_TCP_TRANSPORT_MTU': 350,
'UCLIENT_SERIAL_TRANSPORT_MTU': 1024,
'UCLIENT_CUSTOM_TRANSPORT_MTU': 400,
'UCLIENT_CUSTOM_TRANSPORT_MTU': 512,
'CONFIG_MACHINE_ENDIANNESS': 1, # little endian
'UCLIENT_SHARED_MEMORY_MAX_ENTITIES': 0,
'UCLIENT_SHARED_MEMORY_STATIC_MEM_SIZE': 0,
@ -34,10 +34,10 @@ defines = {
"UCLIENT_PROFILE_UDP": 0,
"UCLIENT_PROFILE_TCP": 0,
"UCLIENT_PROFILE_CAN": 0,
"UCLIENT_PROFILE_SERIAL": 1,
"UCLIENT_PROFILE_CUSTOM_TRANSPORT": 0,
"UCLIENT_PROFILE_SERIAL": 0,
"UCLIENT_PROFILE_CUSTOM_TRANSPORT": 1,
"UCLIENT_PROFILE_DISCOVERY": 0,
"UCLIENT_PLATFORM_POSIX": 1,
"UCLIENT_PLATFORM_POSIX": 0,
"UCLIENT_PLATFORM_POSIX_NOPOLL": 0,
"UCLIENT_PLATFORM_WINDOWS": 0,
"UCLIENT_PLATFORM_FREERTOS_PLUS_TCP": 0,
@ -47,7 +47,7 @@ defines = {
"UCLIENT_EXTERNAL_SERIAL": 0,
"UCLIENT_PROFILE_MULTITHREAD": 0,
"UCLIENT_PROFILE_SHARED_MEMORY": 0,
"UCLIENT_PROFILE_STREAM_FRAMING": 0,
"UCLIENT_PROFILE_STREAM_FRAMING": 1,
"UCLIENT_PLATFORM_RTEMS_BSD_NET": 0,
"UCLIENT_TWEAK_XRCE_WRITE_LIMIT": 0,
"UCLIENT_HARD_LIVELINESS_CHECK": 0,

View File

@ -10,7 +10,7 @@ def configure(cfg):
'modules/Micro-XRCE-DDS-Client/src/c/core/session/*.c',
'modules/Micro-XRCE-DDS-Client/src/c/core/serialization/*.c',
'modules/Micro-XRCE-DDS-Client/src/c/util/*.c',
'modules/Micro-XRCE-DDS-Client/src/c/profile/transport/serial/serial_transport.c',
'modules/Micro-XRCE-DDS-Client/src/c/profile/transport/custom/custom_transport.c',
'modules/Micro-XRCE-DDS-Client/src/c/profile/transport/stream_framing/stream_framing_protocol.c',
'modules/Micro-CDR/src/c/types/*.c',
'modules/Micro-CDR/src/c/common.c',