mavlink ulog streaming: add rate limiting

This limits the maximum bandwidth of ulog streaming to 70% of the specified
mavlink datarate. If less is used, the leftover is assigned to the mavlink
streams, if more is used, it starts to drop.

mavlink status outputs the currently used rate, to check if a link is
saturated.
This commit is contained in:
Beat Küng 2016-10-17 11:36:47 +02:00 committed by Lorenz Meier
parent 8f5656f033
commit 318c970477
4 changed files with 54 additions and 9 deletions

View File

@ -1601,8 +1601,13 @@ Mavlink::update_rate_mult()
}
}
float mavlink_ulog_streaming_rate_inv = 1.0f;
if (_mavlink_ulog) {
mavlink_ulog_streaming_rate_inv = 1.f - _mavlink_ulog->current_data_rate();
}
/* scale up and down as the link permits */
float bandwidth_mult = (float)(_datarate - const_rate) / rate;
float bandwidth_mult = (float)(_datarate * mavlink_ulog_streaming_rate_inv - const_rate) / rate;
/* if we do not have flow control, limit to the set data rate */
if (!get_flow_control_enabled()) {
@ -2480,6 +2485,10 @@ Mavlink::display_status()
printf("\ttxerr: %.3f kB/s\n", (double)_rate_txerr);
printf("\trx: %.3f kB/s\n", (double)_rate_rx);
printf("\trate mult: %.3f\n", (double)_rate_mult);
if (_mavlink_ulog) {
printf("\tULog rate: %.1f%% of max %.1f%%\n", (double)_mavlink_ulog->current_data_rate()*100.,
(double)_mavlink_ulog->maximum_data_rate()*100.);
}
printf("\taccepting commands: %s\n", (accepting_commands()) ? "YES" : "NO");
printf("\tMAVLink version: %i\n", _protocol_version);

View File

@ -440,7 +440,7 @@ public:
MavlinkULog *get_ulog_streaming() { return _mavlink_ulog; }
void try_start_ulog_streaming(uint8_t target_system, uint8_t target_component) {
if (_mavlink_ulog) { return; }
_mavlink_ulog = MavlinkULog::try_start(target_system, target_component);
_mavlink_ulog = MavlinkULog::try_start(_datarate, 0.7f, target_system, target_component);
}
void request_stop_ulog_streaming() {
if (_mavlink_ulog) { _mavlink_ulog_stop_requested = true; }

View File

@ -41,14 +41,20 @@
#include "mavlink_ulog.h"
#include <px4_log.h>
#include <errno.h>
#include <mathlib/mathlib.h>
bool MavlinkULog::_init = false;
MavlinkULog *MavlinkULog::_instance = nullptr;
sem_t MavlinkULog::_lock;
const float MavlinkULog::_rate_calculation_delta_t = 0.1f;
MavlinkULog::MavlinkULog(uint8_t target_system, uint8_t target_component)
: _target_system(target_system), _target_component(target_component)
MavlinkULog::MavlinkULog(int datarate, float max_rate_factor, uint8_t target_system, uint8_t target_component)
: _target_system(target_system), _target_component(target_component),
_max_rate_factor(max_rate_factor),
_max_num_messages(math::max(1, (int)ceilf(_rate_calculation_delta_t * _max_rate_factor * datarate /
(MAVLINK_MSG_ID_LOGGING_DATA_LEN + MAVLINK_NUM_NON_PAYLOAD_BYTES)))),
_current_rate_factor(max_rate_factor)
{
_ulog_stream_sub = orb_subscribe(ORB_ID(ulog_stream));
@ -57,6 +63,7 @@ MavlinkULog::MavlinkULog(uint8_t target_system, uint8_t target_component)
}
_waiting_for_initial_ack = true;
_last_sent_time = hrt_absolute_time(); //(ab)use this timestamp during initialization
_next_rate_check = _last_sent_time + _rate_calculation_delta_t * 1.e6f;
}
MavlinkULog::~MavlinkULog()
@ -123,7 +130,7 @@ int MavlinkULog::handle_update(mavlink_channel_t channel)
bool updated = false;
int ret = orb_check(_ulog_stream_sub, &updated);
while (updated && !ret) {
while (updated && !ret && _current_num_msgs < _max_num_messages) {
orb_copy(ORB_ID(ulog_stream), _ulog_stream_sub, &_ulog_data);
if (_ulog_data.flags & ulog_stream_s::FLAGS_NEED_ACK) {
_sent_tries = 1;
@ -152,9 +159,24 @@ int MavlinkULog::handle_update(mavlink_channel_t channel)
memcpy(msg.data, _ulog_data.data, sizeof(msg.data));
mavlink_msg_logging_data_send_struct(channel, &msg);
}
++_current_num_msgs;
ret = orb_check(_ulog_stream_sub, &updated);
}
//need to update the rate?
hrt_abstime t = hrt_absolute_time();
if (t > _next_rate_check) {
if (_current_num_msgs < _max_num_messages) {
_current_rate_factor = _max_rate_factor * (float)_current_num_msgs / _max_num_messages;
} else {
_current_rate_factor = _max_rate_factor;
}
_current_num_msgs = 0;
_next_rate_check = t + _rate_calculation_delta_t * 1.e6f;
PX4_DEBUG("current rate=%.3f (max=%i msgs in %.3fs)", (double)_current_rate_factor, _max_num_messages,
(double)_rate_calculation_delta_t);
}
return 0;
}
@ -167,13 +189,13 @@ void MavlinkULog::initialize()
_init = true;
}
MavlinkULog* MavlinkULog::try_start(uint8_t target_system, uint8_t target_component)
MavlinkULog* MavlinkULog::try_start(int datarate, float max_rate_factor, uint8_t target_system, uint8_t target_component)
{
MavlinkULog *ret = nullptr;
bool failed = false;
lock();
if (!_instance) {
ret = _instance = new MavlinkULog(target_system, target_component);
ret = _instance = new MavlinkULog(datarate, max_rate_factor, target_system, target_component);
if (!_instance) {
failed = true;
}

View File

@ -66,9 +66,13 @@ public:
/**
* try to start a new stream. This fails if a stream is already running.
* thread-safe
* @param datarate maximum link data rate in B/s
* @param max_rate_factor let ulog streaming use a maximum of max_rate_factor * datarate
* @param target_system ID for mavlink message
* @param target_component ID for mavlink message
* @return instance, or nullptr
*/
static MavlinkULog *try_start(uint8_t target_system, uint8_t target_component);
static MavlinkULog *try_start(int datarate, float max_rate_factor, uint8_t target_system, uint8_t target_component);
/**
* stop the stream. It also deletes the singleton object, so make sure cleanup
@ -89,9 +93,13 @@ public:
/** this is called when we got an vehicle_command_ack from the logger */
void start_ack_received();
float current_data_rate() const { return _current_rate_factor; }
float maximum_data_rate() const { return _max_rate_factor; }
int get_ulog_stream_fd() const { return _ulog_stream_sub; }
private:
MavlinkULog(uint8_t target_system, uint8_t target_component);
MavlinkULog(int datarate, float max_rate_factor, uint8_t target_system, uint8_t target_component);
~MavlinkULog();
@ -110,6 +118,7 @@ private:
static sem_t _lock;
static bool _init;
static MavlinkULog *_instance;
static const float _rate_calculation_delta_t; ///< rate update interval
int _ulog_stream_sub = -1;
orb_advert_t _ulog_stream_ack_pub = nullptr;
@ -122,6 +131,11 @@ private:
const uint8_t _target_system;
const uint8_t _target_component;
const float _max_rate_factor; ///< maximum rate percentage at which we're allowed to push data
const int _max_num_messages; ///< maximum number of messages we can send within _rate_calculation_delta_t
float _current_rate_factor; ///< currently used rate percentage
int _current_num_msgs = 0; ///< number of messages sent within the current time interval
hrt_abstime _next_rate_check; ///< next timestamp at which to update the rate
/* do not allow copying this class */
MavlinkULog(const MavlinkULog &) = delete;