microRTPS: allow timesync using ROS time

This commit is contained in:
TSC21 2021-07-06 13:19:14 +02:00 committed by Nuno Marques
parent e762d57222
commit 6d4f65a47a
4 changed files with 169 additions and 45 deletions

View File

@ -113,14 +113,14 @@ void RtpsTopics::publish(const uint8_t topic_ID, char data_buffer[], size_t len)
if (getMsgSysID(&st) == 1) {
@[ end if]@
// apply timestamp offset
uint64_t timestamp = getMsgTimestamp(&st);
uint64_t timestamp_sample = getMsgTimestampSample(&st);
_timesync->subtractOffset(timestamp);
setMsgTimestamp(&st, timestamp);
_timesync->subtractOffset(timestamp_sample);
setMsgTimestampSample(&st, timestamp_sample);
_@(topic)_pub.publish(&st);
// apply timestamp offset
uint64_t timestamp = getMsgTimestamp(&st);
uint64_t timestamp_sample = getMsgTimestampSample(&st);
_timesync->subtractOffset(timestamp);
setMsgTimestamp(&st, timestamp);
_timesync->subtractOffset(timestamp_sample);
setMsgTimestampSample(&st, timestamp_sample);
_@(topic)_pub.publish(&st);
@[ if topic == 'Timesync' or topic == 'timesync']@
}
@ -152,15 +152,15 @@ bool RtpsTopics::getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr)
if (getMsgSysID(&msg) == 0) {
@[ end if]@
// apply timestamps offset
uint64_t timestamp = getMsgTimestamp(&msg);
uint64_t timestamp_sample = getMsgTimestampSample(&msg);
_timesync->addOffset(timestamp);
setMsgTimestamp(&msg, timestamp);
_timesync->addOffset(timestamp_sample);
setMsgTimestampSample(&msg, timestamp_sample);
msg.serialize(scdr);
ret = true;
// apply timestamps offset
uint64_t timestamp = getMsgTimestamp(&msg);
uint64_t timestamp_sample = getMsgTimestampSample(&msg);
_timesync->addOffset(timestamp);
setMsgTimestamp(&msg, timestamp);
_timesync->addOffset(timestamp_sample);
setMsgTimestampSample(&msg, timestamp_sample);
msg.serialize(scdr);
ret = true;
@[ if topic == 'Timesync' or topic == 'timesync']@
}

View File

@ -16,6 +16,11 @@ import genmsg.msgs
from px_generate_uorb_topic_helper import * # this is in Tools/
from px_generate_uorb_topic_files import MsgScope # this is in Tools/
try:
ros2_distro = ros2_distro[0].decode("utf-8")
except AttributeError:
ros2_distro = ros2_distro[0]
send_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumerate(spec) if scope[idx] == MsgScope.SEND]
recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumerate(spec) if scope[idx] == MsgScope.RECEIVE]
}@
@ -54,6 +59,7 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer
#include <thread>
#include <atomic>
#include <getopt.h>
#include <unistd.h>
#include <poll.h>
#include <chrono>
@ -72,6 +78,10 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer
#include "microRTPS_timesync.h"
#include "RtpsTopics.h"
@[if ros2_distro]@
#include <rclcpp/rclcpp.hpp>
@[end if]@
// Default values
#define DEVICE "/dev/ttyACM0"
#define SLEEP_US 1
@ -86,8 +96,8 @@ using namespace eprosima;
using namespace eprosima::fastrtps;
volatile sig_atomic_t running = 1;
Transport_node *transport_node = nullptr;
RtpsTopics topics;
std::unique_ptr<Transport_node> transport_node;
std::unique_ptr<RtpsTopics> topics;
uint32_t total_sent = 0, sent = 0;
struct options {
@ -129,13 +139,30 @@ static void usage(const char *name)
static int parse_options(int argc, char **argv)
{
static const struct option options[] = {
{"baudrate", required_argument, NULL, 'b'},
{"device", required_argument, NULL, 'd'},
{"sw-flow-control", no_argument, NULL, 'f'},
{"hw-flow-control", no_argument, NULL, 'g'},
{"ip-address", required_argument, NULL, 'i'},
{"namespace", required_argument, NULL, 'n'},
{"poll-ms", required_argument, NULL, 'p'},
{"reception-port", required_argument, NULL, 'r'},
{"sending-port", required_argument, NULL, 's'},
{"transport", required_argument, NULL, 't'},
{"increase-verbosity", no_argument, NULL, 'v'},
{"sleep-time-us", required_argument, NULL, 'w'},
{"ros-args", required_argument, NULL, 0},
{"help", no_argument, NULL, 'h'},
{NULL, 0, NULL, 0}};
int ch;
while ((ch = getopt(argc, argv, "t:d:w:b:p:r:s:i:fhvn:")) != EOF) {
while ((ch = getopt_long(argc, argv, "t:d:w:b:o:r:s:i:fhvn:", options, nullptr)) >= 0) {
switch (ch) {
case 't': _options.transport = strcmp(optarg, "UDP") == 0 ?
options::eTransports::UDP
: options::eTransports::UART; break;
options::eTransports::UDP
: options::eTransports::UART; break;
case 'd': if (nullptr != optarg) strcpy(_options.device, optarg); break;
@ -143,7 +170,7 @@ static int parse_options(int argc, char **argv)
case 'b': _options.baudrate = strtoul(optarg, nullptr, 10); break;
case 'p': _options.poll_ms = strtol(optarg, nullptr, 10); break;
case 'o': _options.poll_ms = strtol(optarg, nullptr, 10); break;
case 'r': _options.recv_port = strtoul(optarg, nullptr, 10); break;
@ -153,15 +180,21 @@ static int parse_options(int argc, char **argv)
case 'f': _options.sw_flow_control = true; break;
case 'h': _options.hw_flow_control = true; break;
case 'g': _options.hw_flow_control = true; break;
case 'h': usage(argv[0]); exit(0); break;
case 'v': _options.verbose_debug = true; break;
case 'n': if (nullptr != optarg) _options.ns = std::string(optarg) + "/"; break;
default:
@[if ros2_distro]@
break;
@[else]@
usage(argv[0]);
return -1;
@[end if]@
}
}
@ -207,7 +240,7 @@ void t_send(void *)
eprosima::fastcdr::Cdr scdr(cdrbuffer);
if (!exit_sender_thread) {
if (topics.getMsg(topic_ID, scdr)) {
if (topics->getMsg(topic_ID, scdr)) {
length = scdr.getSerializedDataLength();
if (0 < (length = transport_node->write(topic_ID, data_buffer, length))) {
@ -234,6 +267,13 @@ int main(int argc, char **argv)
return -1;
}
topics = std::make_unique<RtpsTopics>();
@[if ros2_distro]@
// Initialize communications via the rmw implementation and set up a global signal handler.
rclcpp::init(argc, argv, rclcpp::InitOptions());
@[end if]@
// register signal SIGINT and signal handler
signal(SIGINT, signal_handler);
@ -248,7 +288,7 @@ int main(int argc, char **argv)
switch (_options.transport) {
case options::eTransports::UART: {
transport_node = new UART_node(_options.device, _options.baudrate, _options.poll_ms,
transport_node = std::make_unique<UART_node>(_options.device, _options.baudrate, _options.poll_ms,
_options.sw_flow_control, _options.hw_flow_control, _options.verbose_debug);
printf("[ micrortps_agent ]\tUART transport: device: %s; baudrate: %d; sleep: %dus; poll: %dms; flow_control: %s\n",
_options.device, _options.baudrate, _options.sleep_us, _options.poll_ms,
@ -257,7 +297,7 @@ int main(int argc, char **argv)
break;
case options::eTransports::UDP: {
transport_node = new UDP_node(_options.ip, _options.recv_port, _options.send_port, _options.verbose_debug);
transport_node = std::make_unique<UDP_node>(_options.ip, _options.recv_port, _options.send_port, _options.verbose_debug);
printf("[ micrortps_agent ]\tUDP transport: ip address: %s; recv port: %u; send port: %u; sleep: %dus\n",
_options.ip, _options.recv_port, _options.send_port, _options.sleep_us);
}
@ -285,10 +325,10 @@ int main(int argc, char **argv)
@[end if]@
// Init timesync
topics.set_timesync(std::make_shared<TimeSync>(_options.verbose_debug));
topics->set_timesync(std::make_shared<TimeSync>(_options.verbose_debug));
@[if recv_topics]@
topics.init(&t_send_queue_cv, &t_send_queue_mutex, &t_send_queue, _options.ns);
topics->init(&t_send_queue_cv, &t_send_queue_mutex, &t_send_queue, _options.ns);
@[end if]@
running = true;
@ -303,7 +343,7 @@ int main(int argc, char **argv)
// Publish messages received from UART
if (0 < (length = transport_node->read(&topic_ID, data_buffer, BUFFER_SIZE))) {
topics.publish(topic_ID, data_buffer, sizeof(data_buffer));
topics->publish(topic_ID, data_buffer, sizeof(data_buffer));
++received;
total_read += length;
receiving = true;
@ -332,8 +372,11 @@ int main(int argc, char **argv)
}
@[end if]@
delete transport_node;
transport_node = nullptr;
@[if ros2_distro]@
rclcpp::shutdown();
@[end if]@
transport_node.reset();
topics.reset();
return 0;
}

View File

@ -68,6 +68,9 @@ except AttributeError:
TimeSync::TimeSync(bool debug)
: _offset_ns(-1),
@[if ros2_distro]@
_timesync_node(std::make_shared<rclcpp::Node>("timesync_node")),
@[end if]@
_skew_ns_per_sync(0.0),
_num_samples(0),
_request_reset_counter(0),
@ -82,6 +85,12 @@ void TimeSync::start(TimesyncPublisher *pub)
{
stop();
@[if ros2_distro]@
auto spin_node = [this]() {
rclcpp::spin(_timesync_node);
};
@[end if]@
auto run = [this, pub]() {
while (!_request_stop) {
timesync_msg_t msg = newTimesyncMsg();
@ -92,6 +101,9 @@ void TimeSync::start(TimesyncPublisher *pub)
}
};
_request_stop = false;
@[if ros2_distro]@
_timesync_node_thread.reset(new std::thread(spin_node));
@[end if]@
_send_timesync_thread.reset(new std::thread(run));
}
@ -99,6 +111,9 @@ void TimeSync::stop()
{
_request_stop = true;
@[if ros2_distro]@
if (_timesync_node_thread && _timesync_node_thread->joinable()) { _timesync_node_thread->join(); }
@[end if]@
if (_send_timesync_thread && _send_timesync_thread->joinable()) { _send_timesync_thread->join(); }
}
@ -108,17 +123,29 @@ void TimeSync::reset()
_request_reset_counter = 0;
}
int64_t TimeSync::getTimeNSec()
@[if ros2_distro]@
int64_t TimeSync::getROSTimeNSec()
{
return _timesync_node->now().nanoseconds();
}
int64_t TimeSync::getROSTimeUSec()
{
return RCL_NS_TO_US(getROSTimeNSec());
}
@[else]@
int64_t TimeSync::getSteadyTimeNSec()
{
auto time = std::chrono::steady_clock::now();
return std::chrono::time_point_cast<std::chrono::nanoseconds>(time).time_since_epoch().count();
}
int64_t TimeSync::getTimeUSec()
int64_t TimeSync::getSteadyTimeUSec()
{
auto time = std::chrono::steady_clock::now();
return std::chrono::time_point_cast<std::chrono::microseconds>(time).time_since_epoch().count();
}
@[end if]@
bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t local_t3_ns)
{
@ -187,15 +214,27 @@ void TimeSync::processTimesyncMsg(timesync_msg_t *msg, TimesyncPublisher *pub)
_last_remote_msg_seq = getMsgSeq(msg);
if (getMsgTC1(msg) > 0) {
if (!addMeasurement(getMsgTS1(msg), getMsgTC1(msg), getTimeNSec())) {
@[if ros2_distro]@
if (!addMeasurement(getMsgTS1(msg), getMsgTC1(msg), getROSTimeNSec())) {
@[else]@
if (!addMeasurement(getMsgTS1(msg), getMsgTC1(msg), getSteadyTimeNSec())) {
@[end if]@
if (_debug) { std::cerr << "\033[1;33m[ micrortps__timesync ]\tOffset not updated\033[0m" << std::endl; }
}
} else if (getMsgTC1(msg) == 0) {
setMsgTimestamp(msg, getTimeUSec());
@[if ros2_distro]@
setMsgTimestamp(msg, getROSTimeUSec());
@[else]@
setMsgTimestamp(msg, getSteadyTimeUSec());
@[end if]@
setMsgSysID(msg, 0);
setMsgSeq(msg, getMsgSeq(msg) + 1);
setMsgTC1(msg, getTimeNSec());
@[if ros2_distro]@
setMsgTC1(msg, getROSTimeNSec());
@[else]@
setMsgTC1(msg, getSteadyTimeNSec());
@[end if]@
pub->publish(msg);
}
@ -206,11 +245,19 @@ timesync_msg_t TimeSync::newTimesyncMsg()
{
timesync_msg_t msg{};
setMsgTimestamp(&msg, getTimeUSec());
@[if ros2_distro]@
setMsgTimestamp(&msg, getROSTimeUSec());
@[else]@
setMsgTimestamp(&msg, getSteadyTimeUSec());
@[end if]@
setMsgSysID(&msg, 0);
setMsgSeq(&msg, _last_msg_seq);
setMsgTC1(&msg, 0);
setMsgTS1(&msg, getTimeNSec());
@[if ros2_distro]@
setMsgTS1(&msg, getROSTimeNSec());
@[else]@
setMsgTS1(&msg, getSteadyTimeNSec());
@[end if]@
_last_msg_seq++;

View File

@ -71,6 +71,10 @@ except AttributeError:
@[if ros2_distro]@
#include "Timesync_Publisher.h"
#include <rcl/time.h>
#include <rclcpp/clock.hpp>
#include <rclcpp/rclcpp.hpp>
@[else]@
#include "timesync_Publisher.h"
@[end if]@
@ -127,17 +131,36 @@ public:
*/
void stop();
@[if ros2_distro]@
/**
* @@brief Get clock monotonic time (raw) in nanoseconds
* @@return System CLOCK_MONOTONIC time in nanoseconds
* @@brief Get ROS time in nanoseconds. This will match the system time, which
* corresponds to the system-wide real time since epoch. If use_sim_time
* is set, the simulation time is grabbed by the node and used instead
* More info about ROS2 clock and time in:
* https://design.ros2.org/articles/clock_and_time.html
* @@return ROS time in nanoseconds
*/
static int64_t getTimeNSec();
int64_t getROSTimeNSec();
/**
* @@brief Get system monotonic time in microseconds
* @@return System CLOCK_MONOTONIC time in microseconds
* @@brief Get ROS time in microseconds. Fetches the time from getROSTimeNSec()
* and converts it to microseconds
* @@return ROS time in microseconds
*/
static int64_t getTimeUSec();
int64_t getROSTimeUSec();
@[else]@
/**
* @@brief Get clock monotonic time (raw) in nanoseconds
* @@return Steady CLOCK_MONOTONIC time in nanoseconds
*/
int64_t getSteadyTimeNSec();
/**
* @@brief Get clock monotonic time (raw) in microseconds
* @@return Steady CLOCK_MONOTONIC time in microseconds
*/
int64_t getSteadyTimeUSec();
@[end if]@
/**
* @@brief Adds a time offset measurement to be filtered
@ -180,6 +203,14 @@ public:
private:
std::atomic<int64_t> _offset_ns;
@[if ros2_distro]@
/**
* @@brief A ROS2 node to fetch the ROS time to be used for timesync
*/
std::shared_ptr<rclcpp::Node> _timesync_node;
@[end if]@
int64_t _skew_ns_per_sync;
int64_t _num_samples;
@ -190,6 +221,9 @@ private:
bool _debug;
std::unique_ptr<std::thread> _send_timesync_thread;
@[if ros2_distro]@
std::unique_ptr<std::thread> _timesync_node_thread;
@[end if]@
std::atomic<bool> _request_stop{false};
/**