Compare commits

...

1 Commits

Author SHA1 Message Date
Beniamino Pozzan 64257391c6
[uxrce_dds_client] Allow for arbitrary topic instances to be bridged
Signed-off-by: Beniamino Pozzan <beniamino.pozzan@gmail.com>
2023-11-25 21:22:15 +00:00
3 changed files with 39 additions and 9 deletions

View File

@ -41,6 +41,7 @@ static_assert(sizeof(@(pub['simple_base_type'])_s) <= max_topic_size, "topic too
struct SendSubscription { struct SendSubscription {
const struct orb_metadata *orb_meta; const struct orb_metadata *orb_meta;
const int8_t instance;
uxrObjectId data_writer; uxrObjectId data_writer;
const char* dds_type_name; const char* dds_type_name;
uint32_t topic_size; uint32_t topic_size;
@ -52,6 +53,7 @@ struct SendTopicsSubs {
SendSubscription send_subscriptions[@(len(publications))] = { SendSubscription send_subscriptions[@(len(publications))] = {
@[ for pub in publications]@ @[ for pub in publications]@
{ ORB_ID(@(pub['topic_simple'])), { ORB_ID(@(pub['topic_simple'])),
@(pub['instance']),
uxr_object_id(0, UXR_INVALID_ID), uxr_object_id(0, UXR_INVALID_ID),
"@(pub['dds_type'])", "@(pub['dds_type'])",
ucdr_topic_size_@(pub['simple_base_type'])(), ucdr_topic_size_@(pub['simple_base_type'])(),
@ -71,7 +73,12 @@ struct SendTopicsSubs {
void SendTopicsSubs::init() { void SendTopicsSubs::init() {
for (unsigned idx = 0; idx < sizeof(send_subscriptions)/sizeof(send_subscriptions[0]); ++idx) { for (unsigned idx = 0; idx < sizeof(send_subscriptions)/sizeof(send_subscriptions[0]); ++idx) {
if (send_subscriptions[idx].instance == -1){
fds[idx].fd = orb_subscribe(send_subscriptions[idx].orb_meta); fds[idx].fd = orb_subscribe(send_subscriptions[idx].orb_meta);
}
else{
fds[idx].fd = orb_subscribe_multi(send_subscriptions[idx].orb_meta, send_subscriptions[idx].instance);
}
fds[idx].events = POLLIN; fds[idx].events = POLLIN;
orb_set_interval(fds[idx].fd, UXRCE_DEFAULT_POLL_RATE); orb_set_interval(fds[idx].fd, UXRCE_DEFAULT_POLL_RATE);
} }
@ -97,7 +104,7 @@ void SendTopicsSubs::update(uxrSession *session, uxrStreamId reliable_out_stream
if (send_subscriptions[idx].data_writer.id == UXR_INVALID_ID) { if (send_subscriptions[idx].data_writer.id == UXR_INVALID_ID) {
// data writer not created yet // data writer not created yet
create_data_writer(session, reliable_out_stream_id, participant_id, static_cast<ORB_ID>(send_subscriptions[idx].orb_meta->o_id), client_namespace, send_subscriptions[idx].orb_meta->o_name, create_data_writer(session, reliable_out_stream_id, participant_id, static_cast<ORB_ID>(send_subscriptions[idx].orb_meta->o_id), client_namespace, send_subscriptions[idx].orb_meta->o_name,
send_subscriptions[idx].dds_type_name, send_subscriptions[idx].data_writer); send_subscriptions[idx].instance, send_subscriptions[idx].dds_type_name, send_subscriptions[idx].data_writer);
} }
if (send_subscriptions[idx].data_writer.id != UXR_INVALID_ID) { if (send_subscriptions[idx].data_writer.id != UXR_INVALID_ID) {

View File

@ -98,6 +98,14 @@ def process_message_type(msg_type):
msg_type['dds_type'] = msg_type['type'].replace("::msg::", "::msg::dds_::") + "_" msg_type['dds_type'] = msg_type['type'].replace("::msg::", "::msg::dds_::") + "_"
# topic_simple: eg vehicle_status # topic_simple: eg vehicle_status
msg_type['topic_simple'] = msg_type['topic'].split('/')[-1] msg_type['topic_simple'] = msg_type['topic'].split('/')[-1]
# topic instance
if 'instance' in msg_type:
# if instance is given, check if it it a non negative integer
if not (type(msg_type['instance']) is int and msg_type['instance'] >= 0) :
raise TypeError("`instance` must be a non negative integer")
else:
# if instance is not given,
msg_type['instance'] = -1
pubs_not_empty = msg_map['publications'] is not None pubs_not_empty = msg_map['publications'] is not None
if pubs_not_empty: if pubs_not_empty:

View File

@ -23,30 +23,44 @@ uxrObjectId topic_id_from_orb(ORB_ID orb_id, uint8_t instance = 0)
return uxrObjectId{}; return uxrObjectId{};
} }
static bool generate_topic_name(char *topic, const char *client_namespace, const char *direction, const char *name) static bool generate_topic_name(char *topic, const char *client_namespace, const char *direction, const char *name,
const int8_t uorb_topic_instance)
{ {
if (client_namespace != nullptr) { if (client_namespace != nullptr) {
if (uorb_topic_instance == -1) {
int ret = snprintf(topic, TOPIC_NAME_SIZE, "rt/%s/fmu/%s/%s", client_namespace, direction, name); int ret = snprintf(topic, TOPIC_NAME_SIZE, "rt/%s/fmu/%s/%s", client_namespace, direction, name);
return (ret > 0 && ret < TOPIC_NAME_SIZE); return (ret > 0 && ret < TOPIC_NAME_SIZE);
} }
int ret = snprintf(topic, TOPIC_NAME_SIZE, "rt/%s/fmu/%s/%s%d", client_namespace, direction, name, uorb_topic_instance);
return (ret > 0 && ret < TOPIC_NAME_SIZE);
}
if (uorb_topic_instance == -1) {
int ret = snprintf(topic, TOPIC_NAME_SIZE, "rt/fmu/%s/%s", direction, name); int ret = snprintf(topic, TOPIC_NAME_SIZE, "rt/fmu/%s/%s", direction, name);
return (ret > 0 && ret < TOPIC_NAME_SIZE); return (ret > 0 && ret < TOPIC_NAME_SIZE);
} }
int ret = snprintf(topic, TOPIC_NAME_SIZE, "rt/fmu/%s/%s%d", direction, name, uorb_topic_instance);
return (ret > 0 && ret < TOPIC_NAME_SIZE);
}
static bool create_data_writer(uxrSession *session, uxrStreamId reliable_out_stream_id, uxrObjectId participant_id, static bool create_data_writer(uxrSession *session, uxrStreamId reliable_out_stream_id, uxrObjectId participant_id,
ORB_ID orb_id, const char *client_namespace, const char *topic_name_simple, const char *type_name, ORB_ID orb_id, const char *client_namespace, const char *topic_name_simple, const int8_t uorb_topic_instance,
const char *type_name,
uxrObjectId &datawriter_id) uxrObjectId &datawriter_id)
{ {
// topic // topic
char topic_name[TOPIC_NAME_SIZE]; char topic_name[TOPIC_NAME_SIZE];
if (!generate_topic_name(topic_name, client_namespace, "out", topic_name_simple)) { if (!generate_topic_name(topic_name, client_namespace, "out", topic_name_simple, uorb_topic_instance)) {
PX4_ERR("topic path too long"); PX4_ERR("topic path too long");
return false; return false;
} }
uxrObjectId topic_id = topic_id_from_orb(orb_id); uxrObjectId topic_id = topic_id_from_orb(orb_id, (uorb_topic_instance == -1 ? 0 : uorb_topic_instance));
uint16_t topic_req = uxr_buffer_create_topic_bin(session, reliable_out_stream_id, topic_id, participant_id, topic_name, uint16_t topic_req = uxr_buffer_create_topic_bin(session, reliable_out_stream_id, topic_id, participant_id, topic_name,
type_name, UXR_REPLACE); type_name, UXR_REPLACE);
@ -92,8 +106,9 @@ static bool create_data_reader(uxrSession *session, uxrStreamId reliable_out_str
{ {
// topic // topic
char topic_name[TOPIC_NAME_SIZE]; char topic_name[TOPIC_NAME_SIZE];
const int8_t no_instance = -1;
if (!generate_topic_name(topic_name, client_namespace, "in", topic_name_simple)) { if (!generate_topic_name(topic_name, client_namespace, "in", topic_name_simple, no_instance)) {
PX4_ERR("topic path too long"); PX4_ERR("topic path too long");
return false; return false;
} }