diff --git a/src/modules/mavlink/CMakeLists.txt b/src/modules/mavlink/CMakeLists.txt index 01ccb41a77..d295d43744 100644 --- a/src/modules/mavlink/CMakeLists.txt +++ b/src/modules/mavlink/CMakeLists.txt @@ -1,6 +1,6 @@ ############################################################################ # -# Copyright (c) 2015-2021 PX4 Development Team. All rights reserved. +# Copyright (c) 2015-2023 PX4 Development Team. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -141,6 +141,7 @@ px4_add_module( mavlink_c timesync tunes + variable_length_ringbuffer version UNITY_BUILD ) diff --git a/src/modules/mavlink/mavlink_main.cpp b/src/modules/mavlink/mavlink_main.cpp index 2b8b9da1a7..80bcc8f5a7 100644 --- a/src/modules/mavlink/mavlink_main.cpp +++ b/src/modules/mavlink/mavlink_main.cpp @@ -1,6 +1,6 @@ /**************************************************************************** * - * Copyright (c) 2012-2021 PX4 Development Team. All rights reserved. + * Copyright (c) 2012-2023 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -179,6 +179,7 @@ Mavlink::~Mavlink() perf_free(_loop_perf); perf_free(_loop_interval_perf); perf_free(_send_byte_error_perf); + perf_free(_forwarding_error_perf); } void @@ -1224,117 +1225,16 @@ Mavlink::configure_stream_threadsafe(const char *stream_name, const float rate) } } -int -Mavlink::message_buffer_init(int size) -{ - _message_buffer.size = size; - _message_buffer.write_ptr = 0; - _message_buffer.read_ptr = 0; - _message_buffer.data = (char *)malloc(_message_buffer.size); - - int ret; - - if (_message_buffer.data == nullptr) { - ret = PX4_ERROR; - _message_buffer.size = 0; - - } else { - ret = OK; - } - - return ret; -} - -void -Mavlink::message_buffer_destroy() -{ - _message_buffer.size = 0; - _message_buffer.write_ptr = 0; - _message_buffer.read_ptr = 0; - free(_message_buffer.data); -} - -int -Mavlink::message_buffer_count() -{ - int n = _message_buffer.write_ptr - _message_buffer.read_ptr; - - if (n < 0) { - n += _message_buffer.size; - } - - return n; -} - -bool -Mavlink::message_buffer_write(const void *ptr, int size) -{ - // bytes available to write - int available = _message_buffer.read_ptr - _message_buffer.write_ptr - 1; - - if (available < 0) { - available += _message_buffer.size; - } - - if (size > available) { - // buffer overflow - return false; - } - - char *c = (char *) ptr; - int n = _message_buffer.size - _message_buffer.write_ptr; // bytes to end of the buffer - - if (n < size) { - // message goes over end of the buffer - memcpy(&(_message_buffer.data[_message_buffer.write_ptr]), c, n); - _message_buffer.write_ptr = 0; - - } else { - n = 0; - } - - // now: n = bytes already written - int p = size - n; // number of bytes to write - memcpy(&(_message_buffer.data[_message_buffer.write_ptr]), &(c[n]), p); - _message_buffer.write_ptr = (_message_buffer.write_ptr + p) % _message_buffer.size; - return true; -} - -int -Mavlink::message_buffer_get_ptr(void **ptr, bool *is_part) -{ - // bytes available to read - int available = _message_buffer.write_ptr - _message_buffer.read_ptr; - - if (available == 0) { - return 0; // buffer is empty - } - - int n = 0; - - if (available > 0) { - // read pointer is before write pointer, all available bytes can be read - n = available; - *is_part = false; - - } else { - // read pointer is after write pointer, read bytes from read_ptr to end of the buffer - n = _message_buffer.size - _message_buffer.read_ptr; - *is_part = _message_buffer.write_ptr > 0; - } - - *ptr = &(_message_buffer.data[_message_buffer.read_ptr]); - return n; -} - void Mavlink::pass_message(const mavlink_message_t *msg) { - /* size is 8 bytes plus variable payload */ + /* size is 12 bytes plus variable payload */ int size = MAVLINK_NUM_NON_PAYLOAD_BYTES + msg->len; - pthread_mutex_lock(&_message_buffer_mutex); - message_buffer_write(msg, size); - pthread_mutex_unlock(&_message_buffer_mutex); + LockGuard lg{_message_buffer_mutex}; + + if (!_message_buffer.push_back(reinterpret_cast(msg), size)) { + perf_count(_forwarding_error_perf); + } } MavlinkShell * @@ -2217,7 +2117,7 @@ Mavlink::task_main(int argc, char *argv[]) return PX4_ERROR; } - /* initialize send mutex */ + pthread_mutex_init(&_message_buffer_mutex, nullptr); pthread_mutex_init(&_send_mutex, nullptr); pthread_mutex_init(&_radio_status_mutex, nullptr); @@ -2227,13 +2127,12 @@ Mavlink::task_main(int argc, char *argv[]) * make space for two messages plus off-by-one space as we use the empty element * marker ring buffer approach. */ - if (OK != message_buffer_init(2 * sizeof(mavlink_message_t) + 1)) { + LockGuard lg{_message_buffer_mutex}; + + if (!_message_buffer.allocate(2 * sizeof(mavlink_message_t) + 1)) { PX4_ERR("msg buf alloc fail"); return PX4_ERROR; } - - /* initialize message buffer mutex */ - pthread_mutex_init(&_message_buffer_mutex, nullptr); } /* Activate sending the data by default (for the IRIDIUM mode it will be disabled after the first round of packages is sent)*/ @@ -2575,50 +2474,21 @@ Mavlink::task_main(int argc, char *argv[]) _events.update(t); - /* pass messages from other UARTs */ + /* pass messages from other instances */ if (get_forwarding_on()) { - bool is_part; - uint8_t *read_ptr; - uint8_t *write_ptr; - - pthread_mutex_lock(&_message_buffer_mutex); - int available = message_buffer_get_ptr((void **)&read_ptr, &is_part); - pthread_mutex_unlock(&_message_buffer_mutex); - - if (available > 0) { - // Reconstruct message from buffer - - mavlink_message_t msg; - write_ptr = (uint8_t *)&msg; - - // Pull a single message from the buffer - size_t read_count = available; - - if (read_count > sizeof(mavlink_message_t)) { - read_count = sizeof(mavlink_message_t); - } - - memcpy(write_ptr, read_ptr, read_count); - - // We hold the mutex until after we complete the second part of the buffer. If we don't - // we may end up breaking the empty slot overflow detection semantics when we mark the - // possibly partial read below. - pthread_mutex_lock(&_message_buffer_mutex); - - message_buffer_mark_read(read_count); - - /* write second part of buffer if there is some */ - if (is_part && read_count < sizeof(mavlink_message_t)) { - write_ptr += read_count; - available = message_buffer_get_ptr((void **)&read_ptr, &is_part); - read_count = sizeof(mavlink_message_t) - read_count; - memcpy(write_ptr, read_ptr, read_count); - message_buffer_mark_read(available); - } - - pthread_mutex_unlock(&_message_buffer_mutex); + mavlink_message_t msg; + size_t available_bytes; + { + // We only send one message at a time, not to put too much strain on a + // link from forwarded messages. + LockGuard lg{_message_buffer_mutex}; + available_bytes = _message_buffer.pop_front(reinterpret_cast(&msg), sizeof(msg)); + // We need to make sure to release the lock here before sending the + // bytes out via IP or UART which could potentially take longer. + } + if (available_bytes > 0) { resend_message(&msg); } } @@ -2668,11 +2538,6 @@ Mavlink::task_main(int argc, char *argv[]) _socket_fd = -1; } - if (get_forwarding_on()) { - message_buffer_destroy(); - pthread_mutex_destroy(&_message_buffer_mutex); - } - if (_mavlink_ulog) { _mavlink_ulog->stop(); _mavlink_ulog = nullptr; @@ -2680,6 +2545,7 @@ Mavlink::task_main(int argc, char *argv[]) pthread_mutex_destroy(&_send_mutex); pthread_mutex_destroy(&_radio_status_mutex); + pthread_mutex_destroy(&_message_buffer_mutex); PX4_INFO("exiting channel %i", (int)_channel); diff --git a/src/modules/mavlink/mavlink_main.h b/src/modules/mavlink/mavlink_main.h index 85c0361de3..1ba71c6a2b 100644 --- a/src/modules/mavlink/mavlink_main.h +++ b/src/modules/mavlink/mavlink_main.h @@ -1,6 +1,6 @@ /**************************************************************************** * - * Copyright (c) 2012-2018 PX4 Development Team. All rights reserved. + * Copyright (c) 2012-2023 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -60,6 +60,7 @@ #include #include +#include #include #include #include @@ -419,11 +420,6 @@ public: bool get_wait_to_transmit() { return _wait_to_transmit; } bool should_transmit() { return (_transmitting_enabled && (!_wait_to_transmit || (_wait_to_transmit && _received_messages))); } - bool message_buffer_write(const void *ptr, int size); - - void lockMessageBufferMutex(void) { pthread_mutex_lock(&_message_buffer_mutex); } - void unlockMessageBufferMutex(void) { pthread_mutex_unlock(&_message_buffer_mutex); } - /** * Count transmitted bytes */ @@ -651,16 +647,9 @@ private: ping_statistics_s _ping_stats {}; - struct mavlink_message_buffer { - int write_ptr; - int read_ptr; - int size; - char *data; - }; + pthread_mutex_t _message_buffer_mutex{}; + VariableLengthRingbuffer _message_buffer{}; - mavlink_message_buffer _message_buffer {}; - - pthread_mutex_t _message_buffer_mutex {}; pthread_mutex_t _send_mutex {}; pthread_mutex_t _radio_status_mutex {}; @@ -682,6 +671,7 @@ private: perf_counter_t _loop_perf{perf_alloc(PC_ELAPSED, MODULE_NAME": tx run elapsed")}; /**< loop performance counter */ perf_counter_t _loop_interval_perf{perf_alloc(PC_INTERVAL, MODULE_NAME": tx run interval")}; /**< loop interval performance counter */ perf_counter_t _send_byte_error_perf{perf_alloc(PC_COUNT, MODULE_NAME": send_bytes error")}; /**< send bytes error count */ + perf_counter_t _forwarding_error_perf{perf_alloc(PC_COUNT, MODULE_NAME": forwarding error")}; /**< forwarding messages error count */ void mavlink_update_parameters(); @@ -711,18 +701,6 @@ private: */ int configure_streams_to_default(const char *configure_single_stream = nullptr); - int message_buffer_init(int size); - - void message_buffer_destroy(); - - int message_buffer_count(); - - int message_buffer_is_empty() const { return (_message_buffer.read_ptr == _message_buffer.write_ptr); } - - int message_buffer_get_ptr(void **ptr, bool *is_part); - - void message_buffer_mark_read(int n) { _message_buffer.read_ptr = (_message_buffer.read_ptr + n) % _message_buffer.size; } - void pass_message(const mavlink_message_t *msg); void publish_telemetry_status();