diff --git a/msg/templates/uorb_microcdr/microRTPS_client.cpp.em b/msg/templates/uorb_microcdr/microRTPS_client.cpp.em index 1ab5e56e0c..ba49bfbcb8 100644 --- a/msg/templates/uorb_microcdr/microRTPS_client.cpp.em +++ b/msg/templates/uorb_microcdr/microRTPS_client.cpp.em @@ -253,10 +253,14 @@ void micrortps_start_topics(const uint32_t &datarate, struct timespec &begin, ui while (!_should_exit_task) { @[if recv_topics]@ - while (0 < (read = transport_node->read(&topic_ID, data_buffer, BUFFER_SIZE))) { + + read = transport_node->read(); + if (read > 0) { total_rcvd += read; rx_last_sec_read += read; + } + while (transport_node->parse(&topic_ID, data_buffer, BUFFER_SIZE)) { uint64_t read_time = hrt_absolute_time(); switch (topic_ID) { diff --git a/msg/templates/urtps/microRTPS_agent.cpp.em b/msg/templates/urtps/microRTPS_agent.cpp.em index 07510f9174..224bf3dcd7 100644 --- a/msg/templates/urtps/microRTPS_agent.cpp.em +++ b/msg/templates/urtps/microRTPS_agent.cpp.em @@ -364,10 +364,14 @@ int main(int argc, char **argv) if (!receiving) { start = std::chrono::steady_clock::now(); } // Publish messages received from UART - if (0 < (length = transport_node->read(&topic_ID, data_buffer, BUFFER_SIZE))) { + length = transport_node->read(); + if (length > 0) { + total_read += length; + } + + while (transport_node->parse(&topic_ID, data_buffer, BUFFER_SIZE)) { topics->publish(topic_ID, data_buffer, sizeof(data_buffer)); ++received; - total_read += length; receiving = true; end = std::chrono::steady_clock::now(); } diff --git a/msg/templates/urtps/microRTPS_transport.cpp b/msg/templates/urtps/microRTPS_transport.cpp index 9252d26eb9..51bddadbac 100644 --- a/msg/templates/urtps/microRTPS_transport.cpp +++ b/msg/templates/urtps/microRTPS_transport.cpp @@ -113,14 +113,8 @@ uint16_t Transport_node::crc16(uint8_t const *buffer, size_t len) return crc; } -ssize_t Transport_node::read(uint8_t *topic_id, char out_buffer[], size_t buffer_len) +ssize_t Transport_node::read() { - if (nullptr == out_buffer || nullptr == topic_id || !fds_OK()) { - return -1; - } - - *topic_id = 255; - ssize_t len = node_read((void *)(_rx_buffer + _rx_buff_pos), sizeof(_rx_buffer) - _rx_buff_pos); if (len < 0) { @@ -143,85 +137,141 @@ ssize_t Transport_node::read(uint8_t *topic_id, char out_buffer[], size_t buffer _rx_buff_pos += len; - // We read some - size_t header_size = sizeof(struct Header); + return len; +} - // but not enough - if (_rx_buff_pos < header_size) { - return 0; +#ifndef PX4_DEBUG + +void Transport_node::print_buffer_debug() +{ + for (uint32_t i = 0; i < BUFFER_SIZE; ++i) { + if (i >= _rx_buff_pos) { + printf("."); + continue; + } + + printf("%" PRIi8, _rx_buffer[i]); + + if (_rx_buffer[i] == '>') { + printf("(X)"); + } + + printf(" "); } - uint32_t msg_start_pos = 0; + printf("\n_rx_buff_pos: %" PRIu32 "\n", _rx_buff_pos); +} - for (msg_start_pos = 0; msg_start_pos <= _rx_buff_pos - header_size; ++msg_start_pos) { - if ('>' == _rx_buffer[msg_start_pos] && memcmp(_rx_buffer + msg_start_pos, ">>>", 3) == 0) { +#endif /* PX4_DEBUG */ + +bool Transport_node::parse( + uint8_t *topic_id, char out_buffer[], size_t buffer_len) +{ + if (nullptr == out_buffer || nullptr == topic_id || !fds_OK()) { + return false; + } + + *topic_id = 255; + + static constexpr size_t header_size = sizeof(struct Header); + + // No need to look for a message if not even a header fits into the buffer. + if (_rx_buff_pos < header_size) { + return false; + } + + // Try to find the start of a message using the magic start sequence of '>>>'. + int32_t msg_start_pos = -1; + + for (uint32_t i = 0; i <= _rx_buff_pos; ++i) { + if ('>' == _rx_buffer[i] && memcmp(_rx_buffer + i, ">>>", 3) == 0) { + msg_start_pos = i; break; } } - // Start not found - if (msg_start_pos > (_rx_buff_pos - header_size)) { + auto DropDataBeforeIndex = [&](uint32_t index) { + if (index == 0) { + return; + } + + if (index > _rx_buff_pos) { + index = _rx_buff_pos; + #ifndef PX4_DEBUG - if (_debug) { printf("\033[1;33m[ micrortps_transport ]\t (↓↓ %" PRIu32 ")\033[0m\n", msg_start_pos); } + if (_debug) { printf("\033[0;31m[ micrortps_transport ]\t clamped index from %" PRIu32 " to %" PRIu32 "\033[0m\n", index, _rx_buff_pos); } #else - if (_debug) { PX4_DEBUG(" (↓↓ %" PRIu32 ")", msg_start_pos); } + if (_debug) { PX4_ERR("ERROR: clamped index from %" PRIu32 " to %" PRIu32 "\n", index, _rx_buff_pos); } #endif /* PX4_DEBUG */ + } - // All we've checked so far is garbage, drop it - but save unchecked bytes - memmove(_rx_buffer, _rx_buffer + msg_start_pos, _rx_buff_pos - msg_start_pos); - _rx_buff_pos -= msg_start_pos; - return -1; + memmove(_rx_buffer, _rx_buffer + index, _rx_buff_pos - index); + _rx_buff_pos -= index; + }; + + // No start sequence has been found in the buffer. We can drop all data up to + // 2 characters before the end (to account for a non-complete start sequence). + if (msg_start_pos == -1) { + // Note, _rx_buff_pos should always be larger than 2 as we enforce a min. + // buffer content corresponding to a header length some lines above. + int32_t drop_before_idx = _rx_buff_pos - 2; + DropDataBeforeIndex(drop_before_idx); + return false; } - // [>,>,>,topic_id,sys_id,seq,payload_length_H,payload_length_L,CRCHigh,CRCLow,payloadStart, ... ,payloadEnd] - struct Header *header = (struct Header *)&_rx_buffer[msg_start_pos]; - uint32_t payload_len = ((uint32_t)header->payload_len_h << 8) | header->payload_len_l; + // A start sequence has been found. We can drop everything before this index + // in the buffer. This could happen at the startup when syncing to the + // stream or with noise on the data stream. + DropDataBeforeIndex(msg_start_pos); + + // We have found a start sequence. Let's check if the header is in the buffer. + // [>,>,>,topic_id,sys_id,seq,payload_length_H,payload_length_L,CRCHigh,CRCLow,payloadStart, + // ... ,payloadEnd] + if (header_size > _rx_buff_pos) { + // Header not yet in the buffer. + return false; + } + + struct Header *header = (struct Header *)&_rx_buffer[0]; + + uint32_t payload_len = + ((uint32_t)header->payload_len_h << 8) | header->payload_len_l; + + // The message won't fit the output or processing buffer. This could happen + // with a corrupted header or by an actual message that is too large for our + // buffers. Let's drop it. + uint32_t message_length = header_size + payload_len; + + if (message_length > buffer_len || (message_length + 3) > BUFFER_SIZE) { + DropDataBeforeIndex(3 + header_size); + return false; + } + + // Let's check if all payload data according to the message length in the + // header is already in the buffer. + if ((header_size + payload_len) > _rx_buff_pos) { + // The buffer does not yet contain the full message. We need to wait for + // more data. + return false; + } // The received message comes from this system. Discard it. // This might happen when: // 1. The same UDP port is being used to send a rcv packets or - // 2. The same topic on the agent is being used for outgoing and incoming data + // 2. The same topic on the agent is being used for outgoing and incoming + // data if (header->sys_id == _sys_id) { - // Drop the message and continue with the read buffer - memmove(_rx_buffer, _rx_buffer + msg_start_pos + 1, _rx_buff_pos - (msg_start_pos + 1)); - _rx_buff_pos -= (msg_start_pos + 1); - return -1; - } - - // The message won't fit the buffer. - if (buffer_len < header_size + payload_len) { - // Drop the message and continue with the read buffer - memmove(_rx_buffer, _rx_buffer + msg_start_pos + 1, _rx_buff_pos - (msg_start_pos + 1)); - _rx_buff_pos -= (msg_start_pos + 1); - return -EMSGSIZE; - } - - // We do not have a complete message yet - if (msg_start_pos + header_size + payload_len > _rx_buff_pos) { - // If there's garbage at the beginning, drop it - if (msg_start_pos > 0) { -#ifndef PX4_DEBUG - - if (_debug) { printf("\033[1;33m[ micrortps_transport ]\t (↓ %" PRIu32 ")\033[0m\n", msg_start_pos); } - -#else - - if (_debug) { PX4_DEBUG(" (↓ %" PRIu32 ")", msg_start_pos); } - -#endif /* PX4_DEBUG */ - memmove(_rx_buffer, _rx_buffer + msg_start_pos, _rx_buff_pos - msg_start_pos); - _rx_buff_pos -= msg_start_pos; - } - - return 0; + DropDataBeforeIndex(3); + return false; } + // Check the CRC of the message. uint16_t read_crc = ((uint16_t)header->crc_h << 8) | header->crc_l; - uint16_t calc_crc = crc16((uint8_t *)_rx_buffer + msg_start_pos + header_size, payload_len); + uint16_t calc_crc = crc16((uint8_t *)_rx_buffer + header_size, payload_len); if (read_crc != calc_crc) { #ifndef PX4_DEBUG @@ -234,26 +284,16 @@ ssize_t Transport_node::read(uint8_t *topic_id, char out_buffer[], size_t buffer #endif /* PX4_DEBUG */ - // Drop garbage up just beyond the start of the message - memmove(_rx_buffer, _rx_buffer + (msg_start_pos + 1), _rx_buff_pos); - - // If there is a CRC error, the payload len cannot be trusted - _rx_buff_pos -= (msg_start_pos + 1); - - len = -1; - - } else { - // copy message to outbuffer and set other return values - memmove(out_buffer, _rx_buffer + msg_start_pos + header_size, payload_len); - *topic_id = header->topic_id; - len = payload_len + header_size; - - // discard message from _rx_buffer - _rx_buff_pos -= msg_start_pos + header_size + payload_len; - memmove(_rx_buffer, _rx_buffer + msg_start_pos + header_size + payload_len, _rx_buff_pos); + DropDataBeforeIndex(3); + return false; } - return len; + // Copy message to output buffer and drop it from the buffer. + memmove(out_buffer, _rx_buffer + header_size, payload_len); + *topic_id = header->topic_id; + + DropDataBeforeIndex(header_size + payload_len); + return true; } size_t Transport_node::get_header_length() diff --git a/msg/templates/urtps/microRTPS_transport.h b/msg/templates/urtps/microRTPS_transport.h index 3605b63f8c..a9c3567e6d 100644 --- a/msg/templates/urtps/microRTPS_transport.h +++ b/msg/templates/urtps/microRTPS_transport.h @@ -56,7 +56,9 @@ public: virtual int init() {return 0;} virtual uint8_t close() {return 0;} - ssize_t read(uint8_t *topic_id, char out_buffer[], size_t buffer_len); + + ssize_t read(); + bool parse(uint8_t *topic_id, char out_buffer[], size_t buffer_len); /** * write a buffer @@ -76,6 +78,10 @@ public: size_t get_header_length(); private: +#ifndef PX4_DEBUG + void print_buffer_debug(); +#endif /* PX4_DEBUG */ + struct __attribute__((packed)) Header { char marker[3]; uint8_t topic_id;