@############################################### @# @# EmPy template for generating microRTPS_client.cpp file @# @############################################### @# Start of Template @# @# Context: @# - msgs (List) list of all RTPS messages @# - topics (List) list of all topic names @# - spec (msggen.MsgSpec) Parsed specification of the .msg file @############################################### @{ import os import genmsg.msgs from px_generate_uorb_topic_files import MsgScope # this is in Tools/ topic_names = [s.short_name for s in spec] send_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumerate(spec) if scope[idx] == MsgScope.SEND] send_base_types = [s.short_name for idx, s in enumerate(spec) if scope[idx] == MsgScope.SEND] recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumerate(spec) if scope[idx] == MsgScope.RECEIVE] receive_base_types = [s.short_name for idx, s in enumerate(spec) if scope[idx] == MsgScope.RECEIVE] }@ /**************************************************************************** * * Copyright (c) 2017 Proyectos y Sistemas de Mantenimiento SL (eProsima). * Copyright (c) 2018-2021 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, this * list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * 3. Neither the name of the copyright holder nor the names of its contributors * may be used to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. * ****************************************************************************/ #include #include #include #include #include #include #include #include #include #include #include @[for topic in list(set(topic_names))]@ #include #include @[end for]@ using namespace time_literals; @[if recv_topics]@ // Publishers for received messages struct RcvTopicsPubs { @[ for idx, topic in enumerate(recv_topics)]@ uORB::Publication <@(receive_base_types[idx])_s> @(topic)_pub{ORB_ID(@(topic))}; @[ end for]@ }; @[end if]@ @[if send_topics]@ // Subscribers for messages to send struct SendTopicsSubs { @[ for idx, topic in enumerate(send_topics)]@ uORB::Subscription @(topic)_sub{ORB_ID(@(topic))}; @[ end for]@ }; struct SendThreadArgs { const uint32_t &datarate; uint64_t &total_sent; uint64_t &sent_last_sec; uint64_t &sent; int &sent_loop; SendThreadArgs(const uint32_t &datarate_, uint64_t &total_sent_, uint64_t &sent_last_sec_, uint64_t &sent_, int &sent_loop_) : datarate(datarate_), total_sent(total_sent_), sent_last_sec(sent_last_sec_), sent(sent_), sent_loop(sent_loop_) {} }; void *send(void *args) { char data_buffer[BUFFER_SIZE]{}; int read{0}; uint32_t length{0}; size_t header_length{0}; uint8_t last_msg_seq{0}; uint8_t last_remote_msg_seq{0}; struct SendThreadArgs *data = reinterpret_cast(args); SendTopicsSubs *subs = new SendTopicsSubs(); float bandwidth_mult{0}; float tx_interval{1.f}; uint64_t tx_last_sec_read{0}; hrt_abstime last_stats_update{0}; // ucdrBuffer to serialize using the user defined buffer ucdrBuffer writer; header_length = transport_node->get_header_length(); ucdr_init_buffer(&writer, reinterpret_cast(&data_buffer[header_length]), BUFFER_SIZE - header_length); while (!_should_exit_task) { @[ for idx, topic in enumerate(send_topics)]@ { @(send_base_types[idx])_s @(topic)_data; if (subs->@(topic)_sub.update(&@(topic)_data)) { @[ if topic == 'Timesync' or topic == 'timesync']@ if (@(topic)_data.seq != last_remote_msg_seq && @(topic)_data.tc1 == 0) { last_remote_msg_seq = @(topic)_data.seq; @(topic)_data.timestamp = hrt_absolute_time(); @(topic)_data.seq = last_msg_seq; @(topic)_data.tc1 = hrt_absolute_time() * 1000ULL; @(topic)_data.ts1 = @(topic)_data.ts1; last_msg_seq++; @[ end if]@ // copy raw data into local buffer. Payload is shifted by header length to make room for header serialize_@(send_base_types[idx])(&writer, &@(topic)_data, &data_buffer[header_length], &length); if (0 < (read = transport_node->write(static_cast(@(msgs[0].index(topic) + 1)), data_buffer, length))) { data->total_sent += read; tx_last_sec_read += read; ++data->sent; } @[ if topic == 'Timesync' or topic == 'timesync']@ } @[ end if]@ } } @[ end for]@ if (hrt_absolute_time() - last_stats_update >= 1_s) { data->sent_last_sec = tx_last_sec_read; if (data->datarate > 0) { bandwidth_mult = static_cast(data->datarate) / static_cast(tx_last_sec_read); // Apply a low-pass filter to determine the new TX interval tx_interval += 0.5f * (tx_interval / bandwidth_mult - tx_interval); // Clamp the interval between 1 and 1000 ms tx_interval = math::constrain(tx_interval, MIN_TX_INTERVAL_US, MAX_TX_INTERVAL_US); } tx_last_sec_read = 0; last_stats_update = hrt_absolute_time(); } px4_usleep(tx_interval); ++data->sent_loop; } delete(data); delete(subs); return nullptr; } static int launch_send_thread(pthread_t &sender_thread, struct SendThreadArgs &args) { pthread_attr_t sender_thread_attr; pthread_attr_init(&sender_thread_attr); pthread_attr_setstacksize(&sender_thread_attr, PX4_STACK_ADJUSTED(2250)); struct sched_param param; (void)pthread_attr_getschedparam(&sender_thread_attr, ¶m); param.sched_priority = SCHED_PRIORITY_DEFAULT; (void)pthread_attr_setschedparam(&sender_thread_attr, ¶m); int rc = pthread_create(&sender_thread, &sender_thread_attr, &send, (void *)&args); if (rc != 0) { errno = rc; PX4_ERR("Could not create send thread (%d)", errno); return -1; } rc = pthread_setname_np(sender_thread, "urtpsclient_snd"); if (pthread_setname_np(sender_thread, "urtpsclient_snd")) { errno = rc; PX4_ERR("Could not set pthread name for the send thread (%d)", errno); } pthread_attr_destroy(&sender_thread_attr); return 0; } @[end if]@ void micrortps_start_topics(const uint32_t &datarate, struct timespec &begin, uint64_t &total_rcvd, uint64_t &total_sent, uint64_t &sent_last_sec, uint64_t &rcvd_last_sec, uint64_t &received, uint64_t &sent, int &rcvd_loop, int &sent_loop) { px4_clock_gettime(CLOCK_REALTIME, &begin); _should_exit_task = false; @[if recv_topics]@ char data_buffer[BUFFER_SIZE]{}; int read{0}; uint8_t topic_ID{255}; uint64_t rx_last_sec_read{0}; hrt_abstime last_stats_update{0}; RcvTopicsPubs *pubs = new RcvTopicsPubs(); // Set the main task name to 'urtpsclient_rcv' in case there is // data to receive px4_prctl(PR_SET_NAME, "urtpsclient_rcv", px4_getpid()); // ucdrBuffer to deserialize using the user defined buffer ucdrBuffer reader; ucdr_init_buffer(&reader, reinterpret_cast(data_buffer), BUFFER_SIZE); @[end if]@ @[if send_topics]@ // var struct to be updated on the thread SendThreadArgs *sender_thread_args = new SendThreadArgs(datarate, total_sent, sent_last_sec, sent, sent_loop); // create a thread for sending data pthread_t sender_thread; launch_send_thread(sender_thread, (*sender_thread_args)); @[end if]@ while (!_should_exit_task) { @[if recv_topics]@ read = transport_node->read(); if (read > 0) { total_rcvd += read; rx_last_sec_read += read; } while (transport_node->parse(&topic_ID, data_buffer, BUFFER_SIZE)) { uint64_t read_time = hrt_absolute_time(); switch (topic_ID) { @[ for idx, topic in enumerate(recv_topics)]@ case @(msgs[0].index(topic) + 1): { @(receive_base_types[idx])_s @(topic)_data; deserialize_@(receive_base_types[idx])(&reader, &@(topic)_data, data_buffer); if (@(topic)_data.timestamp > read_time) { // don't allow timestamps from the future @(topic)_data.timestamp = read_time; } pubs->@(topic)_pub.publish(@(topic)_data); ++received; } break; @[ end for]@ default: PX4_WARN("Unexpected topic ID '%hhu' to getMsg. Please make sure the client is capable of parsing the message associated to the topic ID '%hhu'", topic_ID, topic_ID); break; } } @[end if]@ if (hrt_absolute_time() - last_stats_update >= 1_s) { rcvd_last_sec = rx_last_sec_read; rx_last_sec_read = 0; last_stats_update = hrt_absolute_time(); } // loop forever if informed loop number is negative if (_options.loops >= 0 && rcvd_loop >= _options.loops) { break; } px4_usleep(_options.sleep_us); ++rcvd_loop; } @[if send_topics]@ _should_exit_task = true; pthread_join(sender_thread, nullptr); @[end if]@ @[if recv_topics]@ delete(pubs); @[end if]@ }