From 6d5f12d2a2ab97f17070179cf3ccc94c3cbb4845 Mon Sep 17 00:00:00 2001 From: TSC21 Date: Fri, 7 May 2021 13:32:25 +0200 Subject: [PATCH] microRTPS: client: add TX rate limiter --- .../uorb_microcdr/microRTPS_client.cpp.em | 142 ++++++++---------- .../micrortps_client/microRTPS_client.h | 28 ++-- .../microRTPS_client_main.cpp | 38 +++-- 3 files changed, 104 insertions(+), 104 deletions(-) diff --git a/msg/templates/uorb_microcdr/microRTPS_client.cpp.em b/msg/templates/uorb_microcdr/microRTPS_client.cpp.em index 89cf558ff4..fc06ddf3ff 100644 --- a/msg/templates/uorb_microcdr/microRTPS_client.cpp.em +++ b/msg/templates/uorb_microcdr/microRTPS_client.cpp.em @@ -57,8 +57,8 @@ receive_base_types = [s.short_name for idx, s in enumerate(spec) if scope[idx] = * ****************************************************************************/ -#include "microRTPS_transport.h" -#include "microRTPS_client.h" +#include +#include #include #include @@ -76,14 +76,7 @@ receive_base_types = [s.short_name for idx, s in enumerate(spec) if scope[idx] = #include @[end for]@ -uint8_t last_remote_msg_seq = 0; -uint8_t last_msg_seq = 0; - -uint64_t tx_last_sec_read = 0; -uint64_t rx_last_sec_read = 0; - -pthread_mutex_t tx_lock; -pthread_mutex_t rx_lock; +using namespace time_literals; @[if recv_topics]@ // Publishers for received messages @@ -101,17 +94,17 @@ struct SendTopicsSubs { uORB::Subscription @(topic)_sub{ORB_ID(@(topic))}; @[ end for]@ }; -@[end if]@ - -@[if send_topics]@ struct SendThreadArgs { + const uint32_t &datarate; uint64_t &total_sent; uint64_t &sent_last_sec; uint64_t &sent; int &sent_loop; - SendThreadArgs(uint64_t &total_sent_, uint64_t &sent_last_sec_, uint64_t &sent_, int &sent_loop_) - : total_sent(total_sent_), + SendThreadArgs(const uint32_t &datarate_, uint64_t &total_sent_, + uint64_t &sent_last_sec_, uint64_t &sent_, int &sent_loop_) + : datarate(datarate_), + total_sent(total_sent_), sent_last_sec(sent_last_sec_), sent(sent_), sent_loop(sent_loop_) {} @@ -119,26 +112,26 @@ struct SendThreadArgs { void *send(void *args) { - char data_buffer[BUFFER_SIZE] = {}; - int read = 0; - uint32_t length = 0; - size_t header_length = 0; + char data_buffer[BUFFER_SIZE]{}; + int read{0}; + uint32_t length{0}; + size_t header_length{0}; + uint8_t last_msg_seq{0}; + uint8_t last_remote_msg_seq{0}; + struct SendThreadArgs *data = reinterpret_cast(args); SendTopicsSubs *subs = new SendTopicsSubs(); + float bandwidth_mult{0}; + float tx_interval{1.f}; + uint64_t tx_last_sec_read{0}; + hrt_abstime last_stats_update{0}; + // ucdrBuffer to serialize using the user defined buffer ucdrBuffer writer; header_length = transport_node->get_header_length(); ucdr_init_buffer(&writer, reinterpret_cast(&data_buffer[header_length]), BUFFER_SIZE - header_length); - pthread_t tx_per_second_thread; - int rc = pthread_create(&tx_per_second_thread, nullptr, &tx_per_second, (void *)&data->sent_last_sec); - if (rc != 0) { - errno = rc; - PX4_ERR("Could not create tx counter thread (%d)", errno); - return nullptr; - } - while (!_should_exit_task) { @[ for idx, topic in enumerate(send_topics)]@ { @@ -163,9 +156,7 @@ void *send(void *args) if (0 < (read = transport_node->write(static_cast(@(rtps_message_id(ids, topic))), data_buffer, length))) { data->total_sent += read; - pthread_mutex_lock(&tx_lock); tx_last_sec_read += read; - pthread_mutex_unlock(&tx_lock); ++data->sent; } @@ -176,40 +167,31 @@ void *send(void *args) } } @[ end for]@ - px4_usleep(_options.sleep_us); + + if (hrt_absolute_time() - last_stats_update >= 1_s) { + data->sent_last_sec = tx_last_sec_read; + if (data->datarate > 0) { + bandwidth_mult = static_cast(data->datarate) / static_cast(tx_last_sec_read); + // Apply a low-pass filter to determine the new TX interval + tx_interval += 0.5f * (tx_interval / bandwidth_mult - tx_interval); + // Clamp the interval between 1 and 1000 ms + tx_interval = math::constrain(tx_interval, MIN_TX_INTERVAL_US, MAX_TX_INTERVAL_US); + } + tx_last_sec_read = 0; + last_stats_update = hrt_absolute_time(); + } + + px4_usleep(tx_interval); + ++data->sent_loop; } - pthread_join(tx_per_second_thread, nullptr); - delete subs; + delete(data); + delete(subs); return nullptr; } -void *tx_per_second(void *sent_last_sec) -{ - while (!_should_exit_task) { - pthread_mutex_lock(&tx_lock); - *((uint64_t *) sent_last_sec) = tx_last_sec_read; - tx_last_sec_read = 0; - pthread_mutex_unlock(&tx_lock); - px4_sleep(1); - } - return nullptr; -} - -void *rx_per_second(void *rcvd_last_sec) -{ - while (!_should_exit_task) { - pthread_mutex_lock(&rx_lock); - *((uint64_t *) rcvd_last_sec) = rx_last_sec_read; - rx_last_sec_read = 0; - pthread_mutex_unlock(&rx_lock); - px4_sleep(1); - } - return nullptr; -} - static int launch_send_thread(pthread_t &sender_thread, struct SendThreadArgs &args) { pthread_attr_t sender_thread_attr; @@ -225,8 +207,8 @@ static int launch_send_thread(pthread_t &sender_thread, struct SendThreadArgs &a PX4_ERR("Could not create send thread (%d)", errno); return -1; } - rc = pthread_setname_np(sender_thread, "micrortps_client_send"); - if (pthread_setname_np(sender_thread, "micrortps_client_send")) { + rc = pthread_setname_np(sender_thread, "urtpsclient_snd"); + if (pthread_setname_np(sender_thread, "urtpsclient_snd")) { errno = rc; PX4_ERR("Could not set pthread name for the send thread (%d)", errno); } @@ -236,13 +218,21 @@ static int launch_send_thread(pthread_t &sender_thread, struct SendThreadArgs &a } @[end if]@ -void micrortps_start_topics(struct timespec &begin, uint64_t &total_rcvd, uint64_t &total_sent, uint64_t &sent_last_sec, +void micrortps_start_topics(const uint32_t &datarate, struct timespec &begin, uint64_t &total_rcvd, + uint64_t &total_sent, uint64_t &sent_last_sec, uint64_t &rcvd_last_sec, uint64_t &received, uint64_t &sent, int &rcvd_loop, int &sent_loop) { + px4_clock_gettime(CLOCK_REALTIME, &begin); + _should_exit_task = false; + @[if recv_topics]@ - char data_buffer[BUFFER_SIZE] = {}; - int read = 0; - uint8_t topic_ID = 255; + char data_buffer[BUFFER_SIZE]{}; + int read{0}; + uint8_t topic_ID{255}; + + uint64_t rx_last_sec_read{0}; + hrt_abstime last_stats_update{0}; + RcvTopicsPubs *pubs = new RcvTopicsPubs(); // Set the main task name to 'urtpsclient_rcv' in case there is @@ -252,21 +242,11 @@ void micrortps_start_topics(struct timespec &begin, uint64_t &total_rcvd, uint64 // ucdrBuffer to deserialize using the user defined buffer ucdrBuffer reader; ucdr_init_buffer(&reader, reinterpret_cast(data_buffer), BUFFER_SIZE); - - pthread_t rx_per_second_thread; - int rc = pthread_create(&rx_per_second_thread, nullptr, &rx_per_second, (void *)&rcvd_last_sec); - if (rc != 0) { - errno = rc; - PX4_ERR("Could not create rx counter thread (%d)", errno); - return; - } @[end if]@ - px4_clock_gettime(CLOCK_REALTIME, &begin); - _should_exit_task = false; @[if send_topics]@ // var struct to be updated on the thread - SendThreadArgs *sender_thread_args = new SendThreadArgs(total_sent, sent_last_sec, sent, sent_loop); + SendThreadArgs *sender_thread_args = new SendThreadArgs(datarate, total_sent, sent_last_sec, sent, sent_loop); // create a thread for sending data pthread_t sender_thread; @@ -275,11 +255,9 @@ void micrortps_start_topics(struct timespec &begin, uint64_t &total_rcvd, uint64 while (!_should_exit_task) { @[if recv_topics]@ - while (0 < (read = transport_node->read(&topic_ID, data_buffer, BUFFER_SIZE))) { + if (0 < (read = transport_node->read(&topic_ID, data_buffer, BUFFER_SIZE))) { total_rcvd += read; - pthread_mutex_lock(&rx_lock); - rcvd_last_sec += read; - pthread_mutex_unlock(&rx_lock); + rx_last_sec_read += read; uint64_t read_time = hrt_absolute_time(); @@ -307,6 +285,12 @@ void micrortps_start_topics(struct timespec &begin, uint64_t &total_rcvd, uint64 } @[end if]@ + if (hrt_absolute_time() - last_stats_update >= 1_s) { + rcvd_last_sec = rx_last_sec_read; + rx_last_sec_read = 0; + last_stats_update = hrt_absolute_time(); + } + // loop forever if informed loop number is negative if (_options.loops >= 0 && rcvd_loop >= _options.loops) { break; } @@ -315,12 +299,10 @@ void micrortps_start_topics(struct timespec &begin, uint64_t &total_rcvd, uint64 } @[if send_topics]@ - delete sender_thread_args; _should_exit_task = true; pthread_join(sender_thread, nullptr); @[end if]@ @[if recv_topics]@ - pthread_join(rx_per_second_thread, nullptr); - delete pubs; + delete(pubs); @[end if]@ } diff --git a/src/modules/micrortps_bridge/micrortps_client/microRTPS_client.h b/src/modules/micrortps_bridge/micrortps_client/microRTPS_client.h index 37b149d989..381b9a1603 100644 --- a/src/modules/micrortps_bridge/micrortps_client/microRTPS_client.h +++ b/src/modules/micrortps_bridge/micrortps_client/microRTPS_client.h @@ -33,7 +33,7 @@ #pragma once -#include "microRTPS_transport.h" +#include #include #include @@ -49,21 +49,22 @@ #include #include -#define LOOPS -1 -#define SLEEP_US 1 -#define BAUDRATE 460800 -#define DEVICE "/dev/ttyACM0" -#define POLL_MS 1 -#define IP "127.0.0.1" -#define DEFAULT_RECV_PORT 2019 -#define DEFAULT_SEND_PORT 2020 +#define LOOPS -1 +#define SLEEP_US 1000 +#define BAUDRATE 460800 +#define MAX_DATA_RATE 10000000 +#define DEVICE "/dev/ttyACM0" +#define POLL_MS 1 +#define IP "127.0.0.1" +#define DEFAULT_RECV_PORT 2019 +#define DEFAULT_SEND_PORT 2020 +#define MIN_TX_INTERVAL_US 1000.f +#define MAX_TX_INTERVAL_US 1000000.f void *send(void *args); -void *tx_per_second(void *sent_last_sec); -void *rx_per_second(void *rcvd_last_sec); - -void micrortps_start_topics(struct timespec &begin, uint64_t &total_rcvd, uint64_t &total_sent, uint64_t &sent_last_sec, +void micrortps_start_topics(const uint32_t &datarate, struct timespec &begin, uint64_t &total_rcvd, + uint64_t &total_sent, uint64_t &sent_last_sec, uint64_t &rcvd_last_sec, uint64_t &received, uint64_t &sent, int &rcvd_loop, int &sent_loop); struct baudtype { @@ -83,6 +84,7 @@ struct options { uint16_t send_port = DEFAULT_SEND_PORT; uint32_t sleep_us = SLEEP_US; uint32_t baudrate = BAUDRATE; + uint32_t datarate = 0; uint32_t poll_ms = POLL_MS; int loops = LOOPS; bool sw_flow_control = false; diff --git a/src/modules/micrortps_bridge/micrortps_client/microRTPS_client_main.cpp b/src/modules/micrortps_bridge/micrortps_client/microRTPS_client_main.cpp index ae81c0a103..228b3d3de9 100644 --- a/src/modules/micrortps_bridge/micrortps_client/microRTPS_client_main.cpp +++ b/src/modules/micrortps_bridge/micrortps_client/microRTPS_client_main.cpp @@ -31,8 +31,8 @@ * ****************************************************************************/ -#include "microRTPS_transport.h" -#include "microRTPS_client.h" +#include +#include #include #include @@ -75,10 +75,11 @@ static void usage(const char *name) PRINT_MODULE_USAGE_PARAM_STRING('t', "UART", "UART|UDP", "Transport protocol", true); PRINT_MODULE_USAGE_PARAM_STRING('d', "/dev/ttyACM0", "", "Select Serial Device", true); PRINT_MODULE_USAGE_PARAM_INT('b', 460800, 9600, 3000000, "Baudrate (can also be p:)", true); + PRINT_MODULE_USAGE_PARAM_INT('m', 0, 10, 10000000, "Maximum sending data rate in B/s", true); PRINT_MODULE_USAGE_PARAM_INT('p', -1, 1, 1000, "Poll timeout for UART in ms", true); PRINT_MODULE_USAGE_PARAM_INT('l', 10000, -1, 100000, "Limit number of iterations until the program exits (-1=infinite)", true); - PRINT_MODULE_USAGE_PARAM_INT('w', 1, 1, 1000000, "Time in us for which each iteration sleeps", true); + PRINT_MODULE_USAGE_PARAM_INT('w', 1, 1, 1000000, "Time in us for which each read from the link iteration sleeps", true); PRINT_MODULE_USAGE_PARAM_INT('r', 2019, 0, 65536, "Select UDP Network Port for receiving (local)", true); PRINT_MODULE_USAGE_PARAM_INT('s', 2020, 0, 65536, "Select UDP Network Port for sending (remote)", true); PRINT_MODULE_USAGE_PARAM_STRING('i', "127.0.0.1", "", "Select IP address (remote)", true); @@ -96,7 +97,7 @@ static int parse_options(int argc, char *argv[]) int myoptind = 1; const char *myoptarg = nullptr; - while ((ch = px4_getopt(argc, argv, "t:d:l:w:b:p:r:s:i:fhv", &myoptind, &myoptarg)) != EOF) { + while ((ch = px4_getopt(argc, argv, "t:d:l:w:b:m:p:r:s:i:fhv", &myoptind, &myoptarg)) != EOF) { switch (ch) { case 't': _options.transport = strcmp(myoptarg, "UDP") == 0 ? options::eTransports::UDP @@ -120,6 +121,8 @@ static int parse_options(int argc, char *argv[]) break; } + case 'm': _options.datarate = strtoul(myoptarg, nullptr, 10); break; + case 'p': _options.poll_ms = strtoul(myoptarg, nullptr, 10); break; case 'r': _options.recv_port = strtoul(myoptarg, nullptr, 10); break; @@ -140,9 +143,13 @@ static int parse_options(int argc, char *argv[]) } } + if (_options.datarate > MAX_DATA_RATE) { + _options.datarate = MAX_DATA_RATE; + PX4_WARN("Invalid data rate. Using max datarate of %ul B/s", MAX_DATA_RATE); + } + if (_options.poll_ms < 1) { - _options.poll_ms = 1; - PX4_ERR("poll timeout too low, using 1 ms"); + PX4_WARN("Poll timeout too low, using %ul ms", POLL_MS); } if (_options.hw_flow_control && _options.sw_flow_control) { @@ -193,7 +200,8 @@ static int micrortps_start(int argc, char *argv[]) return -1; } - micrortps_start_topics(begin, total_rcvd, total_sent, sent_last_sec, rcvd_last_sec, received, sent, rcv_loop, + micrortps_start_topics(_options.datarate, begin, total_rcvd, total_sent, sent_last_sec, rcvd_last_sec, received, sent, + rcv_loop, send_loop); px4_clock_gettime(CLOCK_REALTIME, &end); @@ -262,10 +270,18 @@ int micrortps_client_main(int argc, char *argv[]) printf("\ttotal data read: %" PRIu64 " bytes\n", total_rcvd); printf("\ttotal data sent: %" PRIu64 " bytes\n", total_sent); printf("\trates:\n"); - printf("\t rx: %.3f kB/s\n", rcvd_last_sec / 1E3); - printf("\t tx: %.3f kB/s\n", sent_last_sec / 1E3); - printf("\t avg rx: %.3f kB/s\n", static_cast(total_rcvd / (1e3 * elapsed_secs))); - printf("\t avg tx: %.3f kB/s\n", static_cast(total_sent / (1e3 * elapsed_secs))); + printf("\t rx: %.3f kB/s\n", rcvd_last_sec / 1e3); + printf("\t tx: %.3f kB/s\n", sent_last_sec / 1e3); + printf("\t avg rx: %.3f kB/s\n", total_rcvd / (1e3 * elapsed_secs)); + printf("\t avg tx: %.3f kB/s\n", total_sent / (1e3 * elapsed_secs)); + printf("\t tx rate max:"); + + if (_options.datarate != 0) { + printf(" %.1f kB/s\n", _options.datarate / 1e3); + + } else { + printf(" Unlimited\n"); + } } return 0;