diff --git a/msg/templates/urtps/RtpsTopics.cpp.em b/msg/templates/urtps/RtpsTopics.cpp.em index 1763d8979e..d643bcd6a0 100644 --- a/msg/templates/urtps/RtpsTopics.cpp.em +++ b/msg/templates/urtps/RtpsTopics.cpp.em @@ -121,13 +121,15 @@ void RtpsTopics::publish(uint8_t topic_ID, char data_buffer[], size_t len) st.deserialize(cdr_des); @[ if topic == 'Timesync' or topic == 'timesync']@ _timesync->processTimesyncMsg(&st); + + if (st.sys_id() == 1) { @[ end if]@ // apply timestamp offset - _timesync->applyOffset(st.timestamp()); -@[ if topic == 'Timesync' or topic == 'timesync']@ - if (st.sys_id() == 1) -@[ end if]@ + _timesync->subtractOffset(st.timestamp()); _@(topic)_pub.publish(&st); +@[ if topic == 'Timesync' or topic == 'timesync']@ + } +@[ end if]@ } break; @[end for]@ @@ -160,11 +162,16 @@ bool RtpsTopics::getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr) @[ else]@ @(topic) msg = _@(topic)_sub.getMsg(); @[ end if]@ +@[ end if]@ +@[ if topic == 'Timesync' or topic == 'timesync']@ + if (msg.sys_id() == 0) { @[ end if]@ // apply timestamp offset - _timesync->applyOffset(msg.timestamp()); - + _timesync->addOffset(msg.timestamp()); msg.serialize(scdr); +@[ if topic == 'Timesync' or topic == 'timesync']@ + } +@[ end if]@ ret = true; _@(topic)_sub.unlockMsg(); } diff --git a/msg/templates/urtps/microRTPS_timesync.cpp.em b/msg/templates/urtps/microRTPS_timesync.cpp.em index 0e10bd37ea..a013b1ced3 100644 --- a/msg/templates/urtps/microRTPS_timesync.cpp.em +++ b/msg/templates/urtps/microRTPS_timesync.cpp.em @@ -77,8 +77,6 @@ TimeSync::TimeSync() TimeSync::~TimeSync() { stop(); } -void TimeSync::setNewOffsetCB(std::function callback) { _notifyNewOffset = callback; } - @[if ros2_distro]@ void TimeSync::start(const Timesync_Publisher* pub) { @[else]@ @@ -138,7 +136,7 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t } if (_num_samples >= WINDOW_SIZE) { - if (std::abs(measurement_offset - _offset_ns) > TRIGGER_RESET_THRESHOLD_NS) { + if (std::abs(measurement_offset - _offset_ns.load()) > TRIGGER_RESET_THRESHOLD_NS) { _request_reset_counter++; std::cout << std::endl << "Timesync offset outlier, discarding" << std::endl; return false; @@ -148,24 +146,10 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t } if (_num_samples == 0) { - _offset_ns = measurement_offset; + updateOffset(measurement_offset); _skew_ns_per_sync = 0; } - { - int64_t local_t2 = remote_t2_ns - _offset_ns; - int64_t time_there = local_t2 - local_t1_ns; - - int64_t remote_t3 = local_t3_ns + _offset_ns; - int64_t time_back = remote_t3 - remote_t2_ns; - - if (std::abs(time_back - time_there) > 3ll * 1000ll * 1000ll) { - std::cout << "trip there: " << time_there / 1e6f << "ms trip back: " << time_back / 1e6f - << "ms , discarding" << std::endl; - return false; - } - } - // ignore if rtti > 10ms if (rtti > 15ll * 1000ll * 1000ll) { std::cout << std::endl @@ -177,9 +161,9 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t double alpha = ALPHA_INITIAL * (1. - schedule) + ALPHA_FINAL * schedule; double beta = BETA_INTIIAL * (1. - schedule) + BETA_FINAL * schedule; - int64_t offset_prev = _offset_ns; - _offset_ns = static_cast((_skew_ns_per_sync + _offset_ns) * (1. - alpha) + measurement_offset * alpha); - _skew_ns_per_sync = static_cast(beta * (_offset_ns - offset_prev) + (1. - beta) * _skew_ns_per_sync); + int64_t offset_prev = _offset_ns.load(); + updateOffset(static_cast((_skew_ns_per_sync + _offset_ns.load()) * (1. - alpha) + measurement_offset * alpha)); + _skew_ns_per_sync = static_cast(beta * (_offset_ns.load() - offset_prev) + (1. - beta) * _skew_ns_per_sync); _num_samples++; @@ -188,51 +172,61 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t @[if 1.5 <= fastrtpsgen_version <= 1.7]@ @[ if ros2_distro]@ -void TimeSync::processTimesyncMsg(const @(package)::msg::dds_::Timesync_* msg) { +void TimeSync::processTimesyncMsg(@(package)::msg::dds_::Timesync_* msg) { @[ else]@ -void TimeSync::processTimesyncMsg(const timesync_* msg) { +void TimeSync::processTimesyncMsg(timesync_* msg) { @[ end if]@ @[else]@ @[ if ros2_distro]@ -void TimeSync::processTimesyncMsg(const @(package)::msg::Timesync* msg) { +void TimeSync::processTimesyncMsg(@(package)::msg::Timesync* msg) { @[ else]@ -void TimeSync::processTimesyncMsg(const timesync* msg) { +void TimeSync::processTimesyncMsg(timesync* msg) { @[ end if]@ @[end if]@ - if (msg->sys_id() == 1 && msg->seq() != _last_remote_msg_seq && msg->tc1() > 0) { - _last_remote_msg_seq = msg->seq(); + if (msg->sys_id() == 1 && msg->seq() != _last_remote_msg_seq) { + if (msg->tc1() > 0) { + _last_remote_msg_seq = msg->seq(); - int64_t now_time = getSystemMonoNanos(); + if (!addMeasurement(msg->ts1(), msg->tc1(), getMonoRawTimeNSec())) { + std::cerr << "Offset not updated" << std::endl; + } - bool added = addMeasurement(msg->ts1(), msg->tc1(), now_time); - if (added && _notifyNewOffset) _notifyNewOffset(_offset_ns); - // std::cout << "Offset: " << _offset_ns << std::endl; + } else if (msg->tc1() == 0) { + _last_remote_msg_seq = msg->seq(); + + msg->timestamp() = getMonoTimeUSec(); + msg->sys_id() = 0; + msg->seq()++; + msg->tc1() = getMonoRawTimeNSec(); + + _timesync_pub.publish(msg); + } } } @[if 1.5 <= fastrtpsgen_version <= 1.7]@ @[ if ros2_distro]@ @(package)::msg::dds_::Timesync_ TimeSync::newTimesyncMsg() { - @(package)::msg::dds_::Timesync_ msg{}; + @(package)::msg::dds_::Timesync_ msg{}; @[ else]@ timesync_ TimeSync::newTimesyncMsg() { - timesync_ msg{}; + timesync_ msg{}; @[ end if]@ @[else]@ @[ if ros2_distro]@ @(package)::msg::Timesync TimeSync::newTimesyncMsg() { - @(package)::msg::Timesync msg{}; + @(package)::msg::Timesync msg{}; @[ else]@ timesync TimeSync::newTimesyncMsg() { - timesync msg{}; + timesync msg{}; @[ end if]@ @[end if]@ - msg.timestamp() = getMonoTime(); - applyOffset(msg.timestamp()); + + msg.timestamp() = getMonoTimeUSec(); msg.sys_id() = 0; msg.seq() = _last_msg_seq; msg.tc1() = 0; - msg.ts1() = getSystemMonoNanos(); + msg.ts1() = getMonoRawTimeNSec(); _last_msg_seq++; diff --git a/msg/templates/urtps/microRTPS_timesync.h.em b/msg/templates/urtps/microRTPS_timesync.h.em index a6896b5c1c..e97041eb71 100644 --- a/msg/templates/urtps/microRTPS_timesync.h.em +++ b/msg/templates/urtps/microRTPS_timesync.h.em @@ -99,51 +99,51 @@ public: void reset(); void stop(); - void setNewOffsetCB(std::function callback); - /** * Get clock monotonic time (raw) in nanoseconds */ - inline int64_t getSystemMonoNanos() { + inline int64_t getMonoRawTimeNSec() { timespec t; clock_gettime(CLOCK_MONOTONIC_RAW, &t); - return (int64_t)t.tv_sec * 1000000000ll + t.tv_nsec; + return (int64_t)t.tv_sec * 1000000000LL + t.tv_nsec; } /** * Get system monotonic time in microseconds */ - inline uint64_t getMonoTime() { + inline int64_t getMonoTimeUSec() { timespec t; clock_gettime(CLOCK_MONOTONIC, &t); - return (t.tv_sec * 1000000000ll + t.tv_nsec) / 1000ULL; + return (int64_t)(t.tv_sec * 1000000000LL + t.tv_nsec) / 1000LL; } bool addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t local_t3_ns); @[if 1.5 <= fastrtpsgen_version <= 1.7]@ @[ if ros2_distro]@ - void processTimesyncMsg(const @(package)::msg::dds_::Timesync_* msg); + void processTimesyncMsg(@(package)::msg::dds_::Timesync_* msg); @(package)::msg::dds_::Timesync_ newTimesyncMsg(); @[ else]@ - void processTimesyncMsg(const timesync_* msg); + void processTimesyncMsg(timesync_* msg); timesync_ newTimesyncMsg(); @[ end if]@ @[else]@ @[ if ros2_distro]@ - void processTimesyncMsg(const @(package)::msg::Timesync* msg); + void processTimesyncMsg(@(package)::msg::Timesync* msg); @(package)::msg::Timesync newTimesyncMsg(); @[ else]@ - void processTimesyncMsg(const timesync* msg); + void processTimesyncMsg(timesync* msg); timesync newTimesyncMsg(); @[ end if]@ @[end if]@ - inline void applyOffset(uint64_t timestamp) { timestamp += _offset_ns.load(); } + inline int64_t getOffset() { return _offset_ns.load(); } + inline void addOffset(uint64_t& timestamp) { timestamp = (timestamp * 1000LL + _offset_ns.load()) / 10000ULL; } + inline void subtractOffset(uint64_t& timestamp) { timestamp = (timestamp * 1000LL - _offset_ns.load()) / 10000ULL; } private: std::atomic _offset_ns; @@ -165,5 +165,5 @@ private: std::unique_ptr _send_timesync_thread; std::atomic _request_stop{false}; - std::function _notifyNewOffset; + inline void updateOffset(const uint64_t& offset) { _offset_ns.store(offset, std::memory_order_relaxed); } };