Robustify RTPS bridge stream parsing.

This commit is contained in:
Thomas Schneider 2022-07-04 15:40:36 +02:00 committed by Daniel Agar
parent 089fbdccc9
commit cd66a262ee
4 changed files with 136 additions and 82 deletions

View File

@ -253,10 +253,14 @@ void micrortps_start_topics(const uint32_t &datarate, struct timespec &begin, ui
while (!_should_exit_task) { while (!_should_exit_task) {
@[if recv_topics]@ @[if recv_topics]@
while (0 < (read = transport_node->read(&topic_ID, data_buffer, BUFFER_SIZE))) {
read = transport_node->read();
if (read > 0) {
total_rcvd += read; total_rcvd += read;
rx_last_sec_read += read; rx_last_sec_read += read;
}
while (transport_node->parse(&topic_ID, data_buffer, BUFFER_SIZE)) {
uint64_t read_time = hrt_absolute_time(); uint64_t read_time = hrt_absolute_time();
switch (topic_ID) { switch (topic_ID) {

View File

@ -364,10 +364,14 @@ int main(int argc, char **argv)
if (!receiving) { start = std::chrono::steady_clock::now(); } if (!receiving) { start = std::chrono::steady_clock::now(); }
// Publish messages received from UART // Publish messages received from UART
if (0 < (length = transport_node->read(&topic_ID, data_buffer, BUFFER_SIZE))) { length = transport_node->read();
if (length > 0) {
total_read += length;
}
while (transport_node->parse(&topic_ID, data_buffer, BUFFER_SIZE)) {
topics->publish(topic_ID, data_buffer, sizeof(data_buffer)); topics->publish(topic_ID, data_buffer, sizeof(data_buffer));
++received; ++received;
total_read += length;
receiving = true; receiving = true;
end = std::chrono::steady_clock::now(); end = std::chrono::steady_clock::now();
} }

View File

@ -113,14 +113,8 @@ uint16_t Transport_node::crc16(uint8_t const *buffer, size_t len)
return crc; return crc;
} }
ssize_t Transport_node::read(uint8_t *topic_id, char out_buffer[], size_t buffer_len) ssize_t Transport_node::read()
{ {
if (nullptr == out_buffer || nullptr == topic_id || !fds_OK()) {
return -1;
}
*topic_id = 255;
ssize_t len = node_read((void *)(_rx_buffer + _rx_buff_pos), sizeof(_rx_buffer) - _rx_buff_pos); ssize_t len = node_read((void *)(_rx_buffer + _rx_buff_pos), sizeof(_rx_buffer) - _rx_buff_pos);
if (len < 0) { if (len < 0) {
@ -143,85 +137,141 @@ ssize_t Transport_node::read(uint8_t *topic_id, char out_buffer[], size_t buffer
_rx_buff_pos += len; _rx_buff_pos += len;
// We read some return len;
size_t header_size = sizeof(struct Header);
// but not enough
if (_rx_buff_pos < header_size) {
return 0;
} }
uint32_t msg_start_pos = 0; #ifndef PX4_DEBUG
for (msg_start_pos = 0; msg_start_pos <= _rx_buff_pos - header_size; ++msg_start_pos) { void Transport_node::print_buffer_debug()
if ('>' == _rx_buffer[msg_start_pos] && memcmp(_rx_buffer + msg_start_pos, ">>>", 3) == 0) { {
for (uint32_t i = 0; i < BUFFER_SIZE; ++i) {
if (i >= _rx_buff_pos) {
printf(".");
continue;
}
printf("%" PRIi8, _rx_buffer[i]);
if (_rx_buffer[i] == '>') {
printf("(X)");
}
printf(" ");
}
printf("\n_rx_buff_pos: %" PRIu32 "\n", _rx_buff_pos);
}
#endif /* PX4_DEBUG */
bool Transport_node::parse(
uint8_t *topic_id, char out_buffer[], size_t buffer_len)
{
if (nullptr == out_buffer || nullptr == topic_id || !fds_OK()) {
return false;
}
*topic_id = 255;
static constexpr size_t header_size = sizeof(struct Header);
// No need to look for a message if not even a header fits into the buffer.
if (_rx_buff_pos < header_size) {
return false;
}
// Try to find the start of a message using the magic start sequence of '>>>'.
int32_t msg_start_pos = -1;
for (uint32_t i = 0; i <= _rx_buff_pos; ++i) {
if ('>' == _rx_buffer[i] && memcmp(_rx_buffer + i, ">>>", 3) == 0) {
msg_start_pos = i;
break; break;
} }
} }
// Start not found auto DropDataBeforeIndex = [&](uint32_t index) {
if (msg_start_pos > (_rx_buff_pos - header_size)) { if (index == 0) {
return;
}
if (index > _rx_buff_pos) {
index = _rx_buff_pos;
#ifndef PX4_DEBUG #ifndef PX4_DEBUG
if (_debug) { printf("\033[1;33m[ micrortps_transport ]\t (↓↓ %" PRIu32 ")\033[0m\n", msg_start_pos); } if (_debug) { printf("\033[0;31m[ micrortps_transport ]\t clamped index from %" PRIu32 " to %" PRIu32 "\033[0m\n", index, _rx_buff_pos); }
#else #else
if (_debug) { PX4_DEBUG(" (↓↓ %" PRIu32 ")", msg_start_pos); } if (_debug) { PX4_ERR("ERROR: clamped index from %" PRIu32 " to %" PRIu32 "\n", index, _rx_buff_pos); }
#endif /* PX4_DEBUG */ #endif /* PX4_DEBUG */
// All we've checked so far is garbage, drop it - but save unchecked bytes
memmove(_rx_buffer, _rx_buffer + msg_start_pos, _rx_buff_pos - msg_start_pos);
_rx_buff_pos -= msg_start_pos;
return -1;
} }
// [>,>,>,topic_id,sys_id,seq,payload_length_H,payload_length_L,CRCHigh,CRCLow,payloadStart, ... ,payloadEnd] memmove(_rx_buffer, _rx_buffer + index, _rx_buff_pos - index);
struct Header *header = (struct Header *)&_rx_buffer[msg_start_pos]; _rx_buff_pos -= index;
uint32_t payload_len = ((uint32_t)header->payload_len_h << 8) | header->payload_len_l; };
// No start sequence has been found in the buffer. We can drop all data up to
// 2 characters before the end (to account for a non-complete start sequence).
if (msg_start_pos == -1) {
// Note, _rx_buff_pos should always be larger than 2 as we enforce a min.
// buffer content corresponding to a header length some lines above.
int32_t drop_before_idx = _rx_buff_pos - 2;
DropDataBeforeIndex(drop_before_idx);
return false;
}
// A start sequence has been found. We can drop everything before this index
// in the buffer. This could happen at the startup when syncing to the
// stream or with noise on the data stream.
DropDataBeforeIndex(msg_start_pos);
// We have found a start sequence. Let's check if the header is in the buffer.
// [>,>,>,topic_id,sys_id,seq,payload_length_H,payload_length_L,CRCHigh,CRCLow,payloadStart,
// ... ,payloadEnd]
if (header_size > _rx_buff_pos) {
// Header not yet in the buffer.
return false;
}
struct Header *header = (struct Header *)&_rx_buffer[0];
uint32_t payload_len =
((uint32_t)header->payload_len_h << 8) | header->payload_len_l;
// The message won't fit the output or processing buffer. This could happen
// with a corrupted header or by an actual message that is too large for our
// buffers. Let's drop it.
uint32_t message_length = header_size + payload_len;
if (message_length > buffer_len || (message_length + 3) > BUFFER_SIZE) {
DropDataBeforeIndex(3 + header_size);
return false;
}
// Let's check if all payload data according to the message length in the
// header is already in the buffer.
if ((header_size + payload_len) > _rx_buff_pos) {
// The buffer does not yet contain the full message. We need to wait for
// more data.
return false;
}
// The received message comes from this system. Discard it. // The received message comes from this system. Discard it.
// This might happen when: // This might happen when:
// 1. The same UDP port is being used to send a rcv packets or // 1. The same UDP port is being used to send a rcv packets or
// 2. The same topic on the agent is being used for outgoing and incoming data // 2. The same topic on the agent is being used for outgoing and incoming
// data
if (header->sys_id == _sys_id) { if (header->sys_id == _sys_id) {
// Drop the message and continue with the read buffer DropDataBeforeIndex(3);
memmove(_rx_buffer, _rx_buffer + msg_start_pos + 1, _rx_buff_pos - (msg_start_pos + 1)); return false;
_rx_buff_pos -= (msg_start_pos + 1);
return -1;
}
// The message won't fit the buffer.
if (buffer_len < header_size + payload_len) {
// Drop the message and continue with the read buffer
memmove(_rx_buffer, _rx_buffer + msg_start_pos + 1, _rx_buff_pos - (msg_start_pos + 1));
_rx_buff_pos -= (msg_start_pos + 1);
return -EMSGSIZE;
}
// We do not have a complete message yet
if (msg_start_pos + header_size + payload_len > _rx_buff_pos) {
// If there's garbage at the beginning, drop it
if (msg_start_pos > 0) {
#ifndef PX4_DEBUG
if (_debug) { printf("\033[1;33m[ micrortps_transport ]\t (↓ %" PRIu32 ")\033[0m\n", msg_start_pos); }
#else
if (_debug) { PX4_DEBUG(" (↓ %" PRIu32 ")", msg_start_pos); }
#endif /* PX4_DEBUG */
memmove(_rx_buffer, _rx_buffer + msg_start_pos, _rx_buff_pos - msg_start_pos);
_rx_buff_pos -= msg_start_pos;
}
return 0;
} }
// Check the CRC of the message.
uint16_t read_crc = ((uint16_t)header->crc_h << 8) | header->crc_l; uint16_t read_crc = ((uint16_t)header->crc_h << 8) | header->crc_l;
uint16_t calc_crc = crc16((uint8_t *)_rx_buffer + msg_start_pos + header_size, payload_len); uint16_t calc_crc = crc16((uint8_t *)_rx_buffer + header_size, payload_len);
if (read_crc != calc_crc) { if (read_crc != calc_crc) {
#ifndef PX4_DEBUG #ifndef PX4_DEBUG
@ -234,26 +284,16 @@ ssize_t Transport_node::read(uint8_t *topic_id, char out_buffer[], size_t buffer
#endif /* PX4_DEBUG */ #endif /* PX4_DEBUG */
// Drop garbage up just beyond the start of the message DropDataBeforeIndex(3);
memmove(_rx_buffer, _rx_buffer + (msg_start_pos + 1), _rx_buff_pos); return false;
// If there is a CRC error, the payload len cannot be trusted
_rx_buff_pos -= (msg_start_pos + 1);
len = -1;
} else {
// copy message to outbuffer and set other return values
memmove(out_buffer, _rx_buffer + msg_start_pos + header_size, payload_len);
*topic_id = header->topic_id;
len = payload_len + header_size;
// discard message from _rx_buffer
_rx_buff_pos -= msg_start_pos + header_size + payload_len;
memmove(_rx_buffer, _rx_buffer + msg_start_pos + header_size + payload_len, _rx_buff_pos);
} }
return len; // Copy message to output buffer and drop it from the buffer.
memmove(out_buffer, _rx_buffer + header_size, payload_len);
*topic_id = header->topic_id;
DropDataBeforeIndex(header_size + payload_len);
return true;
} }
size_t Transport_node::get_header_length() size_t Transport_node::get_header_length()

View File

@ -56,7 +56,9 @@ public:
virtual int init() {return 0;} virtual int init() {return 0;}
virtual uint8_t close() {return 0;} virtual uint8_t close() {return 0;}
ssize_t read(uint8_t *topic_id, char out_buffer[], size_t buffer_len);
ssize_t read();
bool parse(uint8_t *topic_id, char out_buffer[], size_t buffer_len);
/** /**
* write a buffer * write a buffer
@ -76,6 +78,10 @@ public:
size_t get_header_length(); size_t get_header_length();
private: private:
#ifndef PX4_DEBUG
void print_buffer_debug();
#endif /* PX4_DEBUG */
struct __attribute__((packed)) Header { struct __attribute__((packed)) Header {
char marker[3]; char marker[3];
uint8_t topic_id; uint8_t topic_id;