diff --git a/src/modules/uxrce_dds_client/dds_topics.h.em b/src/modules/uxrce_dds_client/dds_topics.h.em index 0addb6b903..9e4be9832e 100644 --- a/src/modules/uxrce_dds_client/dds_topics.h.em +++ b/src/modules/uxrce_dds_client/dds_topics.h.em @@ -21,12 +21,16 @@ import os #include #include -#include +#include #include +#include @[for include in type_includes]@ #include +#include @[end for]@ +#define UXRCE_DEFAULT_POLL_RATE 10 + typedef bool (*UcdrSerializeMethod)(const void* data, ucdrBuffer& buf, int64_t time_offset); static constexpr int max_topic_size = 512; @@ -35,7 +39,7 @@ static_assert(sizeof(@(pub['simple_base_type'])_s) <= max_topic_size, "topic too @[ end for]@ struct SendSubscription { - uORB::Subscription subscription; + const struct orb_metadata *orb_meta; uxrObjectId data_writer; const char* dds_type_name; uint32_t topic_size; @@ -46,7 +50,7 @@ struct SendSubscription { struct SendTopicsSubs { SendSubscription send_subscriptions[@(len(publications))] = { @[ for pub in publications]@ - { uORB::Subscription(ORB_ID(@(pub['topic_simple']))), + { ORB_ID(@(pub['topic_simple'])), uxr_object_id(0, UXR_INVALID_ID), "@(pub['dds_type'])", ucdr_topic_size_@(pub['simple_base_type'])(), @@ -55,12 +59,23 @@ struct SendTopicsSubs { @[ end for]@ }; + px4_pollfd_struct_t fds[@(len(publications))] {}; + uint32_t num_payload_sent{}; + void init(); void update(uxrSession *session, uxrStreamId reliable_out_stream_id, uxrStreamId best_effort_stream_id, uxrObjectId participant_id, const char *client_namespace); void reset(); }; +void SendTopicsSubs::init() { + for (unsigned idx = 0; idx < sizeof(send_subscriptions)/sizeof(send_subscriptions[0]); ++idx) { + fds[idx].fd = orb_subscribe(send_subscriptions[idx].orb_meta); + fds[idx].events = POLLIN; + orb_set_interval(fds[idx].fd, UXRCE_DEFAULT_POLL_RATE); + } +} + void SendTopicsSubs::reset() { num_payload_sent = 0; for (unsigned idx = 0; idx < sizeof(send_subscriptions)/sizeof(send_subscriptions[0]); ++idx) { @@ -75,10 +90,12 @@ void SendTopicsSubs::update(uxrSession *session, uxrStreamId reliable_out_stream alignas(sizeof(uint64_t)) char topic_data[max_topic_size]; for (unsigned idx = 0; idx < sizeof(send_subscriptions)/sizeof(send_subscriptions[0]); ++idx) { - if (send_subscriptions[idx].subscription.update(&topic_data)) { + if (fds[idx].revents & POLLIN) { + // Topic updated, copy data and send + orb_copy(send_subscriptions[idx].orb_meta, fds[idx].fd, &topic_data); if (send_subscriptions[idx].data_writer.id == UXR_INVALID_ID) { // data writer not created yet - create_data_writer(session, reliable_out_stream_id, participant_id, send_subscriptions[idx].subscription.orb_id(), client_namespace, send_subscriptions[idx].subscription.get_topic()->o_name, + create_data_writer(session, reliable_out_stream_id, participant_id, static_cast(send_subscriptions[idx].orb_meta->o_id), client_namespace, send_subscriptions[idx].orb_meta->o_name, send_subscriptions[idx].dds_type_name, send_subscriptions[idx].data_writer); } diff --git a/src/modules/uxrce_dds_client/uxrce_dds_client.cpp b/src/modules/uxrce_dds_client/uxrce_dds_client.cpp index 91d7792d36..f4c9cfd0ce 100644 --- a/src/modules/uxrce_dds_client/uxrce_dds_client.cpp +++ b/src/modules/uxrce_dds_client/uxrce_dds_client.cpp @@ -335,9 +335,31 @@ void UxrceddsClient::run() bool had_ping_reply = false; uint32_t last_num_payload_sent{}; uint32_t last_num_payload_received{}; + int poll_error_counter = 0; + + _subs->init(); while (!should_exit() && _connected) { + /* Wait for topic updates for max 1000 ms (1sec) */ + int poll = px4_poll(&_subs->fds[0], (sizeof(_subs->fds) / sizeof(_subs->fds[0])), 1000); + + /* Handle the poll results */ + if (poll == 0) { + /* Timeout, no updates in selected uorbs */ + continue; + + } else if (poll < 0) { + /* Error */ + if (poll_error_counter < 10 || poll_error_counter % 50 == 0) { + /* Prevent flooding */ + PX4_ERR("ERROR while polling uorbs: %d", poll); + } + + poll_error_counter++; + continue; + } + _subs->update(&session, reliable_out, best_effort_out, participant_id, _client_namespace); uxr_run_session_timeout(&session, 0); @@ -388,7 +410,6 @@ void UxrceddsClient::run() _connected = false; } - px4_usleep(1000); } uxr_delete_session_retries(&session, _connected ? 1 : 0);