mavlink: fix MAVLink message forwarding

This switches from the horribly intertwined ringbuffer implementation to
the new VariableLengthRingbuffer implementation.

By ditching the previous implementation, we fix MAVLink message
forwarding, which didn't work reliably. The reason it didn't work is
that multiple mavlink messages could be added but only one of them was
sent out because the buffer didn't keep track of the messages properly
and only read the first one.

Signed-off-by: Julian Oes <julian@oes.ch>
This commit is contained in:
Julian Oes 2023-10-06 08:12:26 +13:00 committed by Daniel Agar
parent da34e5e2c8
commit fefdad83bf
3 changed files with 32 additions and 187 deletions

View File

@ -1,6 +1,6 @@
############################################################################ ############################################################################
# #
# Copyright (c) 2015-2021 PX4 Development Team. All rights reserved. # Copyright (c) 2015-2023 PX4 Development Team. All rights reserved.
# #
# Redistribution and use in source and binary forms, with or without # Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions # modification, are permitted provided that the following conditions
@ -141,6 +141,7 @@ px4_add_module(
mavlink_c mavlink_c
timesync timesync
tunes tunes
variable_length_ringbuffer
version version
UNITY_BUILD UNITY_BUILD
) )

View File

@ -1,6 +1,6 @@
/**************************************************************************** /****************************************************************************
* *
* Copyright (c) 2012-2021 PX4 Development Team. All rights reserved. * Copyright (c) 2012-2023 PX4 Development Team. All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions * modification, are permitted provided that the following conditions
@ -179,6 +179,7 @@ Mavlink::~Mavlink()
perf_free(_loop_perf); perf_free(_loop_perf);
perf_free(_loop_interval_perf); perf_free(_loop_interval_perf);
perf_free(_send_byte_error_perf); perf_free(_send_byte_error_perf);
perf_free(_forwarding_error_perf);
} }
void void
@ -1224,117 +1225,16 @@ Mavlink::configure_stream_threadsafe(const char *stream_name, const float rate)
} }
} }
int
Mavlink::message_buffer_init(int size)
{
_message_buffer.size = size;
_message_buffer.write_ptr = 0;
_message_buffer.read_ptr = 0;
_message_buffer.data = (char *)malloc(_message_buffer.size);
int ret;
if (_message_buffer.data == nullptr) {
ret = PX4_ERROR;
_message_buffer.size = 0;
} else {
ret = OK;
}
return ret;
}
void
Mavlink::message_buffer_destroy()
{
_message_buffer.size = 0;
_message_buffer.write_ptr = 0;
_message_buffer.read_ptr = 0;
free(_message_buffer.data);
}
int
Mavlink::message_buffer_count()
{
int n = _message_buffer.write_ptr - _message_buffer.read_ptr;
if (n < 0) {
n += _message_buffer.size;
}
return n;
}
bool
Mavlink::message_buffer_write(const void *ptr, int size)
{
// bytes available to write
int available = _message_buffer.read_ptr - _message_buffer.write_ptr - 1;
if (available < 0) {
available += _message_buffer.size;
}
if (size > available) {
// buffer overflow
return false;
}
char *c = (char *) ptr;
int n = _message_buffer.size - _message_buffer.write_ptr; // bytes to end of the buffer
if (n < size) {
// message goes over end of the buffer
memcpy(&(_message_buffer.data[_message_buffer.write_ptr]), c, n);
_message_buffer.write_ptr = 0;
} else {
n = 0;
}
// now: n = bytes already written
int p = size - n; // number of bytes to write
memcpy(&(_message_buffer.data[_message_buffer.write_ptr]), &(c[n]), p);
_message_buffer.write_ptr = (_message_buffer.write_ptr + p) % _message_buffer.size;
return true;
}
int
Mavlink::message_buffer_get_ptr(void **ptr, bool *is_part)
{
// bytes available to read
int available = _message_buffer.write_ptr - _message_buffer.read_ptr;
if (available == 0) {
return 0; // buffer is empty
}
int n = 0;
if (available > 0) {
// read pointer is before write pointer, all available bytes can be read
n = available;
*is_part = false;
} else {
// read pointer is after write pointer, read bytes from read_ptr to end of the buffer
n = _message_buffer.size - _message_buffer.read_ptr;
*is_part = _message_buffer.write_ptr > 0;
}
*ptr = &(_message_buffer.data[_message_buffer.read_ptr]);
return n;
}
void void
Mavlink::pass_message(const mavlink_message_t *msg) Mavlink::pass_message(const mavlink_message_t *msg)
{ {
/* size is 8 bytes plus variable payload */ /* size is 12 bytes plus variable payload */
int size = MAVLINK_NUM_NON_PAYLOAD_BYTES + msg->len; int size = MAVLINK_NUM_NON_PAYLOAD_BYTES + msg->len;
pthread_mutex_lock(&_message_buffer_mutex); LockGuard lg{_message_buffer_mutex};
message_buffer_write(msg, size);
pthread_mutex_unlock(&_message_buffer_mutex); if (!_message_buffer.push_back(reinterpret_cast<const uint8_t *>(msg), size)) {
perf_count(_forwarding_error_perf);
}
} }
MavlinkShell * MavlinkShell *
@ -2217,7 +2117,7 @@ Mavlink::task_main(int argc, char *argv[])
return PX4_ERROR; return PX4_ERROR;
} }
/* initialize send mutex */ pthread_mutex_init(&_message_buffer_mutex, nullptr);
pthread_mutex_init(&_send_mutex, nullptr); pthread_mutex_init(&_send_mutex, nullptr);
pthread_mutex_init(&_radio_status_mutex, nullptr); pthread_mutex_init(&_radio_status_mutex, nullptr);
@ -2227,13 +2127,12 @@ Mavlink::task_main(int argc, char *argv[])
* make space for two messages plus off-by-one space as we use the empty element * make space for two messages plus off-by-one space as we use the empty element
* marker ring buffer approach. * marker ring buffer approach.
*/ */
if (OK != message_buffer_init(2 * sizeof(mavlink_message_t) + 1)) { LockGuard lg{_message_buffer_mutex};
if (!_message_buffer.allocate(2 * sizeof(mavlink_message_t) + 1)) {
PX4_ERR("msg buf alloc fail"); PX4_ERR("msg buf alloc fail");
return PX4_ERROR; return PX4_ERROR;
} }
/* initialize message buffer mutex */
pthread_mutex_init(&_message_buffer_mutex, nullptr);
} }
/* Activate sending the data by default (for the IRIDIUM mode it will be disabled after the first round of packages is sent)*/ /* Activate sending the data by default (for the IRIDIUM mode it will be disabled after the first round of packages is sent)*/
@ -2575,50 +2474,21 @@ Mavlink::task_main(int argc, char *argv[])
_events.update(t); _events.update(t);
/* pass messages from other UARTs */ /* pass messages from other instances */
if (get_forwarding_on()) { if (get_forwarding_on()) {
bool is_part;
uint8_t *read_ptr;
uint8_t *write_ptr;
pthread_mutex_lock(&_message_buffer_mutex);
int available = message_buffer_get_ptr((void **)&read_ptr, &is_part);
pthread_mutex_unlock(&_message_buffer_mutex);
if (available > 0) {
// Reconstruct message from buffer
mavlink_message_t msg; mavlink_message_t msg;
write_ptr = (uint8_t *)&msg; size_t available_bytes;
{
// Pull a single message from the buffer // We only send one message at a time, not to put too much strain on a
size_t read_count = available; // link from forwarded messages.
LockGuard lg{_message_buffer_mutex};
if (read_count > sizeof(mavlink_message_t)) { available_bytes = _message_buffer.pop_front(reinterpret_cast<uint8_t *>(&msg), sizeof(msg));
read_count = sizeof(mavlink_message_t); // We need to make sure to release the lock here before sending the
// bytes out via IP or UART which could potentially take longer.
} }
memcpy(write_ptr, read_ptr, read_count); if (available_bytes > 0) {
// We hold the mutex until after we complete the second part of the buffer. If we don't
// we may end up breaking the empty slot overflow detection semantics when we mark the
// possibly partial read below.
pthread_mutex_lock(&_message_buffer_mutex);
message_buffer_mark_read(read_count);
/* write second part of buffer if there is some */
if (is_part && read_count < sizeof(mavlink_message_t)) {
write_ptr += read_count;
available = message_buffer_get_ptr((void **)&read_ptr, &is_part);
read_count = sizeof(mavlink_message_t) - read_count;
memcpy(write_ptr, read_ptr, read_count);
message_buffer_mark_read(available);
}
pthread_mutex_unlock(&_message_buffer_mutex);
resend_message(&msg); resend_message(&msg);
} }
} }
@ -2668,11 +2538,6 @@ Mavlink::task_main(int argc, char *argv[])
_socket_fd = -1; _socket_fd = -1;
} }
if (get_forwarding_on()) {
message_buffer_destroy();
pthread_mutex_destroy(&_message_buffer_mutex);
}
if (_mavlink_ulog) { if (_mavlink_ulog) {
_mavlink_ulog->stop(); _mavlink_ulog->stop();
_mavlink_ulog = nullptr; _mavlink_ulog = nullptr;
@ -2680,6 +2545,7 @@ Mavlink::task_main(int argc, char *argv[])
pthread_mutex_destroy(&_send_mutex); pthread_mutex_destroy(&_send_mutex);
pthread_mutex_destroy(&_radio_status_mutex); pthread_mutex_destroy(&_radio_status_mutex);
pthread_mutex_destroy(&_message_buffer_mutex);
PX4_INFO("exiting channel %i", (int)_channel); PX4_INFO("exiting channel %i", (int)_channel);

View File

@ -1,6 +1,6 @@
/**************************************************************************** /****************************************************************************
* *
* Copyright (c) 2012-2018 PX4 Development Team. All rights reserved. * Copyright (c) 2012-2023 PX4 Development Team. All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions * modification, are permitted provided that the following conditions
@ -60,6 +60,7 @@
#include <containers/List.hpp> #include <containers/List.hpp>
#include <parameters/param.h> #include <parameters/param.h>
#include <lib/variable_length_ringbuffer/VariableLengthRingbuffer.hpp>
#include <perf/perf_counter.h> #include <perf/perf_counter.h>
#include <px4_platform_common/cli.h> #include <px4_platform_common/cli.h>
#include <px4_platform_common/px4_config.h> #include <px4_platform_common/px4_config.h>
@ -419,11 +420,6 @@ public:
bool get_wait_to_transmit() { return _wait_to_transmit; } bool get_wait_to_transmit() { return _wait_to_transmit; }
bool should_transmit() { return (_transmitting_enabled && (!_wait_to_transmit || (_wait_to_transmit && _received_messages))); } bool should_transmit() { return (_transmitting_enabled && (!_wait_to_transmit || (_wait_to_transmit && _received_messages))); }
bool message_buffer_write(const void *ptr, int size);
void lockMessageBufferMutex(void) { pthread_mutex_lock(&_message_buffer_mutex); }
void unlockMessageBufferMutex(void) { pthread_mutex_unlock(&_message_buffer_mutex); }
/** /**
* Count transmitted bytes * Count transmitted bytes
*/ */
@ -651,16 +647,9 @@ private:
ping_statistics_s _ping_stats {}; ping_statistics_s _ping_stats {};
struct mavlink_message_buffer { pthread_mutex_t _message_buffer_mutex{};
int write_ptr; VariableLengthRingbuffer _message_buffer{};
int read_ptr;
int size;
char *data;
};
mavlink_message_buffer _message_buffer {};
pthread_mutex_t _message_buffer_mutex {};
pthread_mutex_t _send_mutex {}; pthread_mutex_t _send_mutex {};
pthread_mutex_t _radio_status_mutex {}; pthread_mutex_t _radio_status_mutex {};
@ -682,6 +671,7 @@ private:
perf_counter_t _loop_perf{perf_alloc(PC_ELAPSED, MODULE_NAME": tx run elapsed")}; /**< loop performance counter */ perf_counter_t _loop_perf{perf_alloc(PC_ELAPSED, MODULE_NAME": tx run elapsed")}; /**< loop performance counter */
perf_counter_t _loop_interval_perf{perf_alloc(PC_INTERVAL, MODULE_NAME": tx run interval")}; /**< loop interval performance counter */ perf_counter_t _loop_interval_perf{perf_alloc(PC_INTERVAL, MODULE_NAME": tx run interval")}; /**< loop interval performance counter */
perf_counter_t _send_byte_error_perf{perf_alloc(PC_COUNT, MODULE_NAME": send_bytes error")}; /**< send bytes error count */ perf_counter_t _send_byte_error_perf{perf_alloc(PC_COUNT, MODULE_NAME": send_bytes error")}; /**< send bytes error count */
perf_counter_t _forwarding_error_perf{perf_alloc(PC_COUNT, MODULE_NAME": forwarding error")}; /**< forwarding messages error count */
void mavlink_update_parameters(); void mavlink_update_parameters();
@ -711,18 +701,6 @@ private:
*/ */
int configure_streams_to_default(const char *configure_single_stream = nullptr); int configure_streams_to_default(const char *configure_single_stream = nullptr);
int message_buffer_init(int size);
void message_buffer_destroy();
int message_buffer_count();
int message_buffer_is_empty() const { return (_message_buffer.read_ptr == _message_buffer.write_ptr); }
int message_buffer_get_ptr(void **ptr, bool *is_part);
void message_buffer_mark_read(int n) { _message_buffer.read_ptr = (_message_buffer.read_ptr + n) % _message_buffer.size; }
void pass_message(const mavlink_message_t *msg); void pass_message(const mavlink_message_t *msg);
void publish_telemetry_status(); void publish_telemetry_status();