AP_DDS: Added ROS 2 service support

This commit is contained in:
arshPratap 2023-06-27 12:16:06 +05:30 committed by Andrew Tridgell
parent 5f17e33b39
commit 47efaf9c62
6 changed files with 239 additions and 21 deletions

View File

@ -9,8 +9,11 @@
#include <GCS_MAVLink/GCS.h>
#include <AP_BattMonitor/AP_BattMonitor.h>
#include <AP_AHRS/AP_AHRS.h>
#include <AP_Arming/AP_Arming.h>
#include "AP_DDS_Client.h"
#include "AP_DDS_Topic_Table.h"
#include "AP_DDS_Service_Table.h"
static constexpr uint16_t DELAY_TIME_TOPIC_MS = 10;
static constexpr uint16_t DELAY_BATTERY_STATE_TOPIC_MS = 1000;
@ -19,6 +22,7 @@ static constexpr uint16_t DELAY_LOCAL_VELOCITY_TOPIC_MS = 33;
static constexpr uint16_t DELAY_GEO_POSE_TOPIC_MS = 33;
static constexpr uint16_t DELAY_CLOCK_TOPIC_MS = 10;
static char WGS_84_FRAME_ID[] = "WGS-84";
// https://www.ros.org/reps/rep-0105.html#base-link
static char BASE_LINK_FRAME_ID[] = "base_link";
@ -29,6 +33,7 @@ sensor_msgs_msg_Joy AP_DDS_Client::rx_joy_topic {};
tf2_msgs_msg_TFMessage AP_DDS_Client::rx_dynamic_transforms_topic {};
geometry_msgs_msg_TwistStamped AP_DDS_Client::rx_velocity_control_topic {};
const AP_Param::GroupInfo AP_DDS_Client::var_info[] {
// @Param: _ENABLE
@ -47,13 +52,12 @@ const AP_Param::GroupInfo AP_DDS_Client::var_info[] {
// @RebootRequired: True
// @User: Standard
AP_GROUPINFO("_PORT", 2, AP_DDS_Client, udp.port, 2019),
#endif
AP_GROUPEND
};
#include "AP_DDS_Topic_Table.h"
void AP_DDS_Client::update_topic(builtin_interfaces_msg_Time& msg)
{
uint64_t utc_usec;
@ -409,15 +413,24 @@ bool AP_DDS_Client::start(void)
}
// read function triggered at every subscription callback
void AP_DDS_Client::on_topic (uxrSession* uxr_session, uxrObjectId object_id, uint16_t request_id, uxrStreamId stream_id, struct ucdrBuffer* ub, uint16_t length, void* args)
void AP_DDS_Client::on_topic_trampoline(uxrSession* uxr_session, uxrObjectId object_id, uint16_t request_id, uxrStreamId stream_id, struct ucdrBuffer* ub, uint16_t length,
void* args)
{
AP_DDS_Client *dds = (AP_DDS_Client *)args;
dds->on_topic(uxr_session, object_id, request_id, stream_id, ub, length);
}
void AP_DDS_Client::on_topic(uxrSession* uxr_session, uxrObjectId object_id, uint16_t request_id, uxrStreamId stream_id, struct ucdrBuffer* ub, uint16_t length)
{
(void) uxr_session; (void) object_id; (void) request_id; (void) stream_id; (void) length;
/*
TEMPLATE for reading to the subscribed topics
1) Store the read contents into the ucdr buffer
2) Deserialize the said contents into the topic instance
*/
(void) uxr_session;
(void) request_id;
(void) stream_id;
(void) length;
switch (object_id.id) {
case topics[to_underlying(TopicIndex::JOY_SUB)].dr_id.id: {
const bool success = sensor_msgs_msg_Joy_deserialize_topic(ub, &rx_joy_topic);
@ -426,8 +439,7 @@ void AP_DDS_Client::on_topic (uxrSession* uxr_session, uxrObjectId object_id, ui
break;
}
uint32_t* count_ptr = (uint32_t*) args;
(*count_ptr)++;
subscribe_sample_count++;
if (rx_joy_topic.axes_size >= 4) {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"Received sensor_msgs/Joy: %f, %f, %f, %f",
rx_joy_topic.axes[0], rx_joy_topic.axes[1], rx_joy_topic.axes[2], rx_joy_topic.axes[3]);
@ -442,8 +454,8 @@ void AP_DDS_Client::on_topic (uxrSession* uxr_session, uxrObjectId object_id, ui
if (success == false) {
break;
}
uint32_t* count_ptr = (uint32_t*) args;
(*count_ptr)++;
subscribe_sample_count++;
if (rx_dynamic_transforms_topic.transforms_size > 0) {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"Received tf2_msgs/TFMessage of length: %u",
static_cast<unsigned>(rx_dynamic_transforms_topic.transforms_size));
@ -458,8 +470,8 @@ void AP_DDS_Client::on_topic (uxrSession* uxr_session, uxrObjectId object_id, ui
if (success == false) {
break;
}
uint32_t* count_ptr = (uint32_t*) args;
(*count_ptr)++;
subscribe_sample_count++;
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"Received geometry_msgs/TwistStamped");
// TODO implement the velocity control to AP
break;
@ -468,6 +480,64 @@ void AP_DDS_Client::on_topic (uxrSession* uxr_session, uxrObjectId object_id, ui
}
/*
callback on request completion
*/
void AP_DDS_Client::on_request_trampoline(uxrSession* uxr_session, uxrObjectId object_id, uint16_t request_id, SampleIdentity* sample_id, ucdrBuffer* ub, uint16_t length, void* args)
{
AP_DDS_Client *dds = (AP_DDS_Client *)args;
dds->on_request(uxr_session, object_id, request_id, sample_id, ub, length);
}
void AP_DDS_Client::on_request(uxrSession* uxr_session, uxrObjectId object_id, uint16_t request_id, SampleIdentity* sample_id, ucdrBuffer* ub, uint16_t length)
{
(void) request_id;
(void) length;
switch (object_id.id) {
case services[to_underlying(ServiceIndex::ARMING_MOTORS)].rep_id: {
bool arm;
bool result;
const bool deserialize_success = ucdr_deserialize_bool(ub,&arm);
if (deserialize_success == false) {
break;
}
if (arm) {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"Request for arming received");
result = AP::arming().arm(AP_Arming::Method::DDS);
} else {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"Request for disarming received");
result = AP::arming().disarm(AP_Arming::Method::DDS);
}
const uxrObjectId replier_id = {
.id = services[to_underlying(ServiceIndex::ARMING_MOTORS)].rep_id,
.type = UXR_REPLIER_ID
};
//Todo : Fix the size-handling of services with the help of the functions autogenerated via Micro-XRCE-DDS Gen
uint8_t reply_buffer[8] {};
ucdrBuffer reply_ub;
ucdr_init_buffer(&reply_ub, reply_buffer, sizeof(reply_buffer));
const bool serialize_success = ucdr_serialize_bool(&reply_ub,result);
if (serialize_success == false) {
break;
}
request_sample_count++;
uxr_buffer_reply(uxr_session, reliable_out, replier_id, sample_id, reply_buffer, ucdr_buffer_length(&reply_ub));
if (result) {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"DDS Client: Request for Arming/Disarming : SUCCESS");
} else {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"DDS Client: Request for Arming/Disarming : FAIL");
}
break;
}
}
}
/*
main loop for DDS thread
*/
@ -507,7 +577,10 @@ bool AP_DDS_Client::init()
}
// Register topic callbacks
uxr_set_topic_callback(&session, AP_DDS_Client::on_topic, &count);
uxr_set_topic_callback(&session, AP_DDS_Client::on_topic_trampoline, this);
// ROS-2 Service : Register service request callbacks
uxr_set_request_callback(&session, AP_DDS_Client::on_request_trampoline, this);
while (!uxr_create_session(&session)) {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"DDS Client: Initialization waiting...");
@ -559,7 +632,7 @@ bool AP_DDS_Client::create()
return false;
}
for (size_t i = 0 ; i < ARRAY_SIZE(topics); i++) {
for (uint16_t i = 0 ; i < ARRAY_SIZE(topics); i++) {
// Topic
const uxrObjectId topic_id = {
.id = topics[i].topic_id,
@ -571,7 +644,7 @@ bool AP_DDS_Client::create()
// Status requests
constexpr uint8_t nRequests = 3;
uint16_t requests[nRequests];
constexpr uint16_t requestTimeoutMs = (uint8_t) nRequests * maxTimeMsPerRequestMs;
constexpr uint16_t requestTimeoutMs = nRequests * maxTimeMsPerRequestMs;
uint8_t status[nRequests];
if (strlen(topics[i].dw_profile_label) > 0) {
@ -593,14 +666,14 @@ bool AP_DDS_Client::create()
requests[2] = dwriter_req_id;
if (!uxr_run_session_until_all_status(&session, requestTimeoutMs, requests, status, nRequests)) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Topic/Pub/Writer session request failure for index 'TODO'");
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Topic/Pub/Writer session request failure for index '%u'",i);
for (uint8_t s = 0 ; s < nRequests; s++) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Status '%d' result '%u'", s, status[s]);
}
// TODO add a failure log message sharing the status results
return false;
} else {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"XRCE Client: Topic/Pub/Writer session pass for index 'TOOO'");
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"XRCE Client: Topic/Pub/Writer session pass for index '%u'",i);
}
} else if (strlen(topics[i].dr_profile_label) > 0) {
// Subscriber
@ -621,18 +694,51 @@ bool AP_DDS_Client::create()
requests[2] = dreader_req_id;
if (!uxr_run_session_until_all_status(&session, requestTimeoutMs, requests, status, nRequests)) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Topic/Sub/Reader session request failure for index '%d'",(int)i);
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Topic/Sub/Reader session request failure for index '%u'",i);
for (uint8_t s = 0 ; s < nRequests; s++) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Status '%d' result '%u'", s, status[s]);
}
// TODO add a failure log message sharing the status results
return false;
} else {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"XRCE Client: Topic/Sub/Reader session pass for index '%d'",(int)i);
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"XRCE Client: Topic/Sub/Reader session pass for index '%u'",i);
uxr_buffer_request_data(&session, reliable_out, topics[i].dr_id, reliable_in, &delivery_control);
}
}
}
// ROS-2 Service : else case for service requests
for (uint16_t i = 0; i < ARRAY_SIZE(services); i++) {
constexpr uint16_t requestTimeoutMs = maxTimeMsPerRequestMs;
if (strlen(services[i].rep_profile_label) > 0) {
const uxrObjectId rep_id = {
.id = services[i].rep_id,
.type = UXR_REPLIER_ID
};
const char* replier_ref = services[i].rep_profile_label;
const auto replier_req_id = uxr_buffer_create_replier_ref(&session, reliable_out, rep_id, participant_id, replier_ref, UXR_REPLACE);
uint16_t request = replier_req_id;
uint8_t status;
if (!uxr_run_session_until_all_status(&session, requestTimeoutMs, &request, &status, 1)) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Service/Replier session request failure for index '%u'",i);
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Status result '%u'", status);
// TODO add a failure log message sharing the status results
return false;
} else {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"XRCE Client: Service/Replier session pass for index '%u'",i);
uxr_buffer_request_data(&session, reliable_out, rep_id, reliable_in, &delivery_control);
}
} else if (strlen(services[i].req_profile_label) > 0) {
// TODO : Add Similar Code for Requester Profile
}
}
return true;
}

View File

@ -87,10 +87,16 @@ private:
static void update_topic(rosgraph_msgs_msg_Clock& msg);
// subscription callback function
static void on_topic(uxrSession* session, uxrObjectId object_id, uint16_t request_id, uxrStreamId stream_id, struct ucdrBuffer* ub, uint16_t length, void* args);
static void on_topic_trampoline(uxrSession* session, uxrObjectId object_id, uint16_t request_id, uxrStreamId stream_id, struct ucdrBuffer* ub, uint16_t length, void* args);
void on_topic(uxrSession* session, uxrObjectId object_id, uint16_t request_id, uxrStreamId stream_id, struct ucdrBuffer* ub, uint16_t length);
// count of subscribed samples
uint32_t count;
uint32_t subscribe_sample_count;
// service replier callback function
static void on_request_trampoline(uxrSession* session, uxrObjectId object_id, uint16_t request_id, SampleIdentity* sample_id, ucdrBuffer* ub, uint16_t length, void* args);
void on_request(uxrSession* session, uxrObjectId object_id, uint16_t request_id, SampleIdentity* sample_id, ucdrBuffer* ub, uint16_t length);
// count of request samples
uint32_t request_sample_count;
// delivery control parameters
uxrDeliveryControl delivery_control {
@ -195,7 +201,24 @@ public:
};
static const struct Topic_table topics[];
//! @brief Convenience grouping for a single "channel" of services
struct Service_table {
//! @brief Request ID for the service
const uint8_t req_id;
//! @brief Reply ID for the service
const uint8_t rep_id;
//! @brief Profile Label for the service
const char* srv_profile_label;
//! @brief Profile Label for the service requester
const char* req_profile_label;
//! @brief Profile Label for the service replier
const char* rep_profile_label;
};
static const struct Service_table services[];
};
#endif // AP_DDS_ENABLED

View File

@ -0,0 +1,21 @@
#include "uxr/client/client.h"
enum class ServiceIndex: uint8_t {
ARMING_MOTORS
};
static inline constexpr uint8_t to_underlying(const ServiceIndex index)
{
static_assert(sizeof(index) == sizeof(uint8_t));
return static_cast<uint8_t>(index);
}
constexpr struct AP_DDS_Client::Service_table AP_DDS_Client::services[] = {
{
.req_id = to_underlying(ServiceIndex::ARMING_MOTORS),
.rep_id = to_underlying(ServiceIndex::ARMING_MOTORS),
.srv_profile_label = "ArmMotorsService",
.req_profile_label = "",
.rep_profile_label = "ArmMotors_Replier",
}
};

View File

@ -0,0 +1,36 @@
types:
idls:
- >
struct ArmMotors_Request
{
boolean arm;
};
struct ArmMotors_Response
{
boolean result;
};
systems:
dds: { type: fastdds }
ros2: { type: ros2 }
routes:
dds_server:
server: dds
clients: ros2
services:
arm_motors: {
request_type: ArmMotors_Request,
reply_type: ArmMotors_Response,
route: dds_server,
remap: {
dds: {
topic: ArmMotorsService,
},
ros2: {
request_type: "ardupilot_msgs/ArmMotors:request",
reply_type: "ardupilot_msgs/ArmMotors:response"
}
}
}

View File

@ -223,6 +223,9 @@ Next, follow the associated section for your chosen transport, and finally you c
$ ros2 topic echo /ap/time
sec: 1678668735
nanosec: 729410000
$ ros2 service list
/arm_motors
---
```
@ -232,7 +235,34 @@ Next, follow the associated section for your chosen transport, and finally you c
```
In order to consume the transforms, it's highly recommended to [create and run a transform broadcaster in ROS 2](https://docs.ros.org/en/humble/Concepts/About-Tf2.html#tutorials).
## Using ROS 2 services (with Integration Services)
### Prerequisites
- Install and setup [Micro-XRCE Agent](https://micro-xrce-dds.docs.eprosima.com/en/latest/installation.html#installing-the-agent-standalone)
- Install and setup [Integration Services](https://integration-service.docs.eprosima.com/en/latest/installation_manual/installation.html) (it would be good to have a separate workspace for this)
- Get System Handles for [ROS 2](https://github.com/eProsima/ROS2-SH)
- Get System Handles for [Fast-DDS](https://github.com/eProsima/FastDDS-SH)
- Once the above-mentioned System Handles have been cloned, build the Integration Services with the following command :
`colcon build --cmake-args -DMIX_ROS_PACKAGES="example_interfaces ardupilot_msgs"`
### Setup
- The necessary ROS 2 messages and service defintions (especially for the Arming/Disarming Services) are already defined in the `ardupilot_msgs` folder in the `Tools` directory.
### Terminal 1 (XRCE Agent)
- Move to the **AP_DDS** folder and run the XRCE Agent as follows `MicroXRCEAgent udp4 -p 2019 -r dds_xrce_profile.xml`
### Terminal 2 (Integration Service)
- Source ROS 2 installation
- Source Integration Service installation
- Move to the **AP_DDS** folder and run the following command `integration-service Is-Config/Arm_Motors_DDS_IS_config.yaml`
### Terminal 3 (Ardupilot)
- Make sure you have successfully setup Ardupilot and the `DDS_ENABLE` param is set to 1
- Run SITL with the following command `sim_vehicle.py -v ArduPlane -DG --console --enable-dds`
### Terminal 4 (ROS 2 Client)
- Run the following command : `ros2 service call /arm_motors ardupilot_msgs/srv/ArmMotors "{arm: True}"`
## Contributing to AP_DDS library
### Adding DDS messages to Ardupilot

View File

@ -268,4 +268,6 @@
<dataType>geometry_msgs::msg::dds_::TwistStamped_</dataType>
</topic>
</data_reader>
<replier profile_name="ArmMotors_Replier" service_name="ArmMotorsService" request_type="ArmMotors_Request" reply_type="ArmMotors_Response">
</replier>
</profiles>