uxrce_dds_client: add reliable_qos option topics

It's not set for any topic, as it only works for UDP but not over serial.
Over serial, all topics are created, but then no data is sent:
uxrce_dds_client status
INFO  [uxrce_dds_client] Running, connected
INFO  [uxrce_dds_client] Using transport:     serial
INFO  [uxrce_dds_client] Payload tx:          0 B/s
INFO  [uxrce_dds_client] Payload rx:          0 B/s
This commit is contained in:
Beat Küng 2024-01-18 09:44:54 +01:00
parent a73a308486
commit 39d7c5b60c
No known key found for this signature in database
GPG Key ID: 866DB5F0E24821BB
3 changed files with 13 additions and 7 deletions

View File

@ -45,6 +45,7 @@ struct SendSubscription {
const char* dds_type_name;
uint32_t topic_size;
UcdrSerializeMethod ucdr_serialize_method;
bool reliable_qos;
};
// Subscribers for messages to send
@ -56,6 +57,7 @@ struct SendTopicsSubs {
"@(pub['dds_type'])",
ucdr_topic_size_@(pub['simple_base_type'])(),
&ucdr_serialize_@(pub['simple_base_type']),
@(pub['reliable_qos']),
},
@[ end for]@
};
@ -97,14 +99,15 @@ void SendTopicsSubs::update(uxrSession *session, uxrStreamId reliable_out_stream
if (send_subscriptions[idx].data_writer.id == UXR_INVALID_ID) {
// data writer not created yet
create_data_writer(session, reliable_out_stream_id, participant_id, static_cast<ORB_ID>(send_subscriptions[idx].orb_meta->o_id), client_namespace, send_subscriptions[idx].orb_meta->o_name,
send_subscriptions[idx].dds_type_name, send_subscriptions[idx].data_writer);
send_subscriptions[idx].dds_type_name, send_subscriptions[idx].data_writer, send_subscriptions[idx].reliable_qos);
}
if (send_subscriptions[idx].data_writer.id != UXR_INVALID_ID) {
ucdrBuffer ub;
uint32_t topic_size = send_subscriptions[idx].topic_size;
if (uxr_prepare_output_stream(session, best_effort_stream_id, send_subscriptions[idx].data_writer, &ub, topic_size) != UXR_INVALID_REQUEST_ID) {
const uxrStreamId stream_id = send_subscriptions[idx].reliable_qos ? reliable_out_stream_id : best_effort_stream_id;
if (uxr_prepare_output_stream(session, stream_id, send_subscriptions[idx].data_writer, &ub, topic_size) != UXR_INVALID_REQUEST_ID) {
send_subscriptions[idx].ucdr_serialize_method(&topic_data, ub, time_offset_us);
// TODO: fill up the MTU and then flush, which reduces the packet overhead
uxr_flash_output_streams(session);
@ -169,7 +172,8 @@ bool RcvTopicsPubs::init(uxrSession *session, uxrStreamId reliable_out_stream_id
@[ for idx, sub in enumerate(subscriptions + subscriptions_multi)]@
{
uint16_t queue_depth = uORB::DefaultQueueSize<@(sub['simple_base_type'])_s>::value * 2; // use a bit larger queue size than internal
create_data_reader(session, reliable_out_stream_id, best_effort_in_stream_id, participant_id, @(idx), client_namespace, "@(sub['topic_simple'])", "@(sub['dds_type'])", queue_depth);
const uxrStreamId stream_in_id = @(sub['reliable_qos']) ? reliable_in_stream_id : best_effort_in_stream_id;
create_data_reader(session, reliable_out_stream_id, stream_in_id, participant_id, @(idx), client_namespace, "@(sub['topic_simple'])", "@(sub['dds_type'])", queue_depth, @(sub['reliable_qos']));
}
@[ end for]@

View File

@ -99,6 +99,8 @@ def process_message_type(msg_type):
# topic_simple: eg vehicle_status
msg_type['topic_simple'] = msg_type['topic'].split('/')[-1]
msg_type['reliable_qos'] = 'true' if msg_type.get('reliable_qos', False) else 'false'
pubs_not_empty = msg_map['publications'] is not None
if pubs_not_empty:
for p in msg_map['publications']:

View File

@ -36,7 +36,7 @@ static bool generate_topic_name(char *topic, const char *client_namespace, const
static bool create_data_writer(uxrSession *session, uxrStreamId reliable_out_stream_id, uxrObjectId participant_id,
ORB_ID orb_id, const char *client_namespace, const char *topic_name_simple, const char *type_name,
uxrObjectId &datawriter_id)
uxrObjectId &datawriter_id, bool reliable_qos)
{
// topic
char topic_name[TOPIC_NAME_SIZE];
@ -62,7 +62,7 @@ static bool create_data_writer(uxrSession *session, uxrStreamId reliable_out_str
uxrQoS_t qos = {
.durability = UXR_DURABILITY_TRANSIENT_LOCAL,
.reliability = UXR_RELIABILITY_BEST_EFFORT,
.reliability = reliable_qos ? UXR_RELIABILITY_RELIABLE : UXR_RELIABILITY_BEST_EFFORT,
.history = UXR_HISTORY_KEEP_LAST,
.depth = 0,
};
@ -88,7 +88,7 @@ static bool create_data_writer(uxrSession *session, uxrStreamId reliable_out_str
static bool create_data_reader(uxrSession *session, uxrStreamId reliable_out_stream_id, uxrStreamId input_stream_id,
uxrObjectId participant_id, uint16_t index, const char *client_namespace, const char *topic_name_simple,
const char *type_name, uint16_t queue_depth)
const char *type_name, uint16_t queue_depth, bool reliable_qos)
{
// topic
char topic_name[TOPIC_NAME_SIZE];
@ -118,7 +118,7 @@ static bool create_data_reader(uxrSession *session, uxrStreamId reliable_out_str
uxrQoS_t qos = {
.durability = UXR_DURABILITY_VOLATILE,
.reliability = UXR_RELIABILITY_BEST_EFFORT,
.reliability = reliable_qos ? UXR_RELIABILITY_RELIABLE : UXR_RELIABILITY_BEST_EFFORT,
.history = UXR_HISTORY_KEEP_LAST,
.depth = queue_depth,
};