microRTPS: timesync: properly apply offsets

This commit is contained in:
TSC21 2020-03-09 13:16:24 +00:00 committed by Nuno Marques
parent 67491b473c
commit 250b6b24ab
3 changed files with 57 additions and 56 deletions

View File

@ -121,13 +121,15 @@ void RtpsTopics::publish(uint8_t topic_ID, char data_buffer[], size_t len)
st.deserialize(cdr_des);
@[ if topic == 'Timesync' or topic == 'timesync']@
_timesync->processTimesyncMsg(&st);
if (st.sys_id() == 1) {
@[ end if]@
// apply timestamp offset
_timesync->applyOffset(st.timestamp());
@[ if topic == 'Timesync' or topic == 'timesync']@
if (st.sys_id() == 1)
@[ end if]@
_timesync->subtractOffset(st.timestamp());
_@(topic)_pub.publish(&st);
@[ if topic == 'Timesync' or topic == 'timesync']@
}
@[ end if]@
}
break;
@[end for]@
@ -160,11 +162,16 @@ bool RtpsTopics::getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr)
@[ else]@
@(topic) msg = _@(topic)_sub.getMsg();
@[ end if]@
@[ end if]@
@[ if topic == 'Timesync' or topic == 'timesync']@
if (msg.sys_id() == 0) {
@[ end if]@
// apply timestamp offset
_timesync->applyOffset(msg.timestamp());
_timesync->addOffset(msg.timestamp());
msg.serialize(scdr);
@[ if topic == 'Timesync' or topic == 'timesync']@
}
@[ end if]@
ret = true;
_@(topic)_sub.unlockMsg();
}

View File

@ -77,8 +77,6 @@ TimeSync::TimeSync()
TimeSync::~TimeSync() { stop(); }
void TimeSync::setNewOffsetCB(std::function<void(int64_t)> callback) { _notifyNewOffset = callback; }
@[if ros2_distro]@
void TimeSync::start(const Timesync_Publisher* pub) {
@[else]@
@ -138,7 +136,7 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t
}
if (_num_samples >= WINDOW_SIZE) {
if (std::abs(measurement_offset - _offset_ns) > TRIGGER_RESET_THRESHOLD_NS) {
if (std::abs(measurement_offset - _offset_ns.load()) > TRIGGER_RESET_THRESHOLD_NS) {
_request_reset_counter++;
std::cout << std::endl << "Timesync offset outlier, discarding" << std::endl;
return false;
@ -148,24 +146,10 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t
}
if (_num_samples == 0) {
_offset_ns = measurement_offset;
updateOffset(measurement_offset);
_skew_ns_per_sync = 0;
}
{
int64_t local_t2 = remote_t2_ns - _offset_ns;
int64_t time_there = local_t2 - local_t1_ns;
int64_t remote_t3 = local_t3_ns + _offset_ns;
int64_t time_back = remote_t3 - remote_t2_ns;
if (std::abs(time_back - time_there) > 3ll * 1000ll * 1000ll) {
std::cout << "trip there: " << time_there / 1e6f << "ms trip back: " << time_back / 1e6f
<< "ms , discarding" << std::endl;
return false;
}
}
// ignore if rtti > 10ms
if (rtti > 15ll * 1000ll * 1000ll) {
std::cout << std::endl
@ -177,9 +161,9 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t
double alpha = ALPHA_INITIAL * (1. - schedule) + ALPHA_FINAL * schedule;
double beta = BETA_INTIIAL * (1. - schedule) + BETA_FINAL * schedule;
int64_t offset_prev = _offset_ns;
_offset_ns = static_cast<int64_t>((_skew_ns_per_sync + _offset_ns) * (1. - alpha) + measurement_offset * alpha);
_skew_ns_per_sync = static_cast<int64_t>(beta * (_offset_ns - offset_prev) + (1. - beta) * _skew_ns_per_sync);
int64_t offset_prev = _offset_ns.load();
updateOffset(static_cast<int64_t>((_skew_ns_per_sync + _offset_ns.load()) * (1. - alpha) + measurement_offset * alpha));
_skew_ns_per_sync = static_cast<int64_t>(beta * (_offset_ns.load() - offset_prev) + (1. - beta) * _skew_ns_per_sync);
_num_samples++;
@ -188,51 +172,61 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t
@[if 1.5 <= fastrtpsgen_version <= 1.7]@
@[ if ros2_distro]@
void TimeSync::processTimesyncMsg(const @(package)::msg::dds_::Timesync_* msg) {
void TimeSync::processTimesyncMsg(@(package)::msg::dds_::Timesync_* msg) {
@[ else]@
void TimeSync::processTimesyncMsg(const timesync_* msg) {
void TimeSync::processTimesyncMsg(timesync_* msg) {
@[ end if]@
@[else]@
@[ if ros2_distro]@
void TimeSync::processTimesyncMsg(const @(package)::msg::Timesync* msg) {
void TimeSync::processTimesyncMsg(@(package)::msg::Timesync* msg) {
@[ else]@
void TimeSync::processTimesyncMsg(const timesync* msg) {
void TimeSync::processTimesyncMsg(timesync* msg) {
@[ end if]@
@[end if]@
if (msg->sys_id() == 1 && msg->seq() != _last_remote_msg_seq && msg->tc1() > 0) {
_last_remote_msg_seq = msg->seq();
if (msg->sys_id() == 1 && msg->seq() != _last_remote_msg_seq) {
if (msg->tc1() > 0) {
_last_remote_msg_seq = msg->seq();
int64_t now_time = getSystemMonoNanos();
if (!addMeasurement(msg->ts1(), msg->tc1(), getMonoRawTimeNSec())) {
std::cerr << "Offset not updated" << std::endl;
}
bool added = addMeasurement(msg->ts1(), msg->tc1(), now_time);
if (added && _notifyNewOffset) _notifyNewOffset(_offset_ns);
// std::cout << "Offset: " << _offset_ns << std::endl;
} else if (msg->tc1() == 0) {
_last_remote_msg_seq = msg->seq();
msg->timestamp() = getMonoTimeUSec();
msg->sys_id() = 0;
msg->seq()++;
msg->tc1() = getMonoRawTimeNSec();
_timesync_pub.publish(msg);
}
}
}
@[if 1.5 <= fastrtpsgen_version <= 1.7]@
@[ if ros2_distro]@
@(package)::msg::dds_::Timesync_ TimeSync::newTimesyncMsg() {
@(package)::msg::dds_::Timesync_ msg{};
@(package)::msg::dds_::Timesync_ msg{};
@[ else]@
timesync_ TimeSync::newTimesyncMsg() {
timesync_ msg{};
timesync_ msg{};
@[ end if]@
@[else]@
@[ if ros2_distro]@
@(package)::msg::Timesync TimeSync::newTimesyncMsg() {
@(package)::msg::Timesync msg{};
@(package)::msg::Timesync msg{};
@[ else]@
timesync TimeSync::newTimesyncMsg() {
timesync msg{};
timesync msg{};
@[ end if]@
@[end if]@
msg.timestamp() = getMonoTime();
applyOffset(msg.timestamp());
msg.timestamp() = getMonoTimeUSec();
msg.sys_id() = 0;
msg.seq() = _last_msg_seq;
msg.tc1() = 0;
msg.ts1() = getSystemMonoNanos();
msg.ts1() = getMonoRawTimeNSec();
_last_msg_seq++;

View File

@ -99,51 +99,51 @@ public:
void reset();
void stop();
void setNewOffsetCB(std::function<void(int64_t)> callback);
/**
* Get clock monotonic time (raw) in nanoseconds
*/
inline int64_t getSystemMonoNanos() {
inline int64_t getMonoRawTimeNSec() {
timespec t;
clock_gettime(CLOCK_MONOTONIC_RAW, &t);
return (int64_t)t.tv_sec * 1000000000ll + t.tv_nsec;
return (int64_t)t.tv_sec * 1000000000LL + t.tv_nsec;
}
/**
* Get system monotonic time in microseconds
*/
inline uint64_t getMonoTime() {
inline int64_t getMonoTimeUSec() {
timespec t;
clock_gettime(CLOCK_MONOTONIC, &t);
return (t.tv_sec * 1000000000ll + t.tv_nsec) / 1000ULL;
return (int64_t)(t.tv_sec * 1000000000LL + t.tv_nsec) / 1000LL;
}
bool addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t local_t3_ns);
@[if 1.5 <= fastrtpsgen_version <= 1.7]@
@[ if ros2_distro]@
void processTimesyncMsg(const @(package)::msg::dds_::Timesync_* msg);
void processTimesyncMsg(@(package)::msg::dds_::Timesync_* msg);
@(package)::msg::dds_::Timesync_ newTimesyncMsg();
@[ else]@
void processTimesyncMsg(const timesync_* msg);
void processTimesyncMsg(timesync_* msg);
timesync_ newTimesyncMsg();
@[ end if]@
@[else]@
@[ if ros2_distro]@
void processTimesyncMsg(const @(package)::msg::Timesync* msg);
void processTimesyncMsg(@(package)::msg::Timesync* msg);
@(package)::msg::Timesync newTimesyncMsg();
@[ else]@
void processTimesyncMsg(const timesync* msg);
void processTimesyncMsg(timesync* msg);
timesync newTimesyncMsg();
@[ end if]@
@[end if]@
inline void applyOffset(uint64_t timestamp) { timestamp += _offset_ns.load(); }
inline int64_t getOffset() { return _offset_ns.load(); }
inline void addOffset(uint64_t& timestamp) { timestamp = (timestamp * 1000LL + _offset_ns.load()) / 10000ULL; }
inline void subtractOffset(uint64_t& timestamp) { timestamp = (timestamp * 1000LL - _offset_ns.load()) / 10000ULL; }
private:
std::atomic<int64_t> _offset_ns;
@ -165,5 +165,5 @@ private:
std::unique_ptr<std::thread> _send_timesync_thread;
std::atomic<bool> _request_stop{false};
std::function<void(int64_t)> _notifyNewOffset;
inline void updateOffset(const uint64_t& offset) { _offset_ns.store(offset, std::memory_order_relaxed); }
};