microRTPS: add timesync for the agent side

This commit is contained in:
TSC21 2020-03-04 17:43:39 +00:00 committed by Nuno Marques
parent 152427ecdc
commit 170835f3f8
11 changed files with 331 additions and 7 deletions

View File

@ -128,6 +128,7 @@ set(msg_files
tecs_status.msg
telemetry_status.msg
test_motor.msg
timesync.msg
timesync_status.msg
trajectory_bezier.msg
trajectory_waypoint.msg

View File

@ -73,7 +73,6 @@ bool RtpsTopics::init(std::condition_variable* t_send_queue_cv, std::mutex* t_se
std::cerr << "Failed starting @(topic) subscriber" << std::endl;
return false;
}
@[end for]@
std::cout << "--------------------" << std::endl << std::endl;
@[end if]@
@ -83,11 +82,13 @@ bool RtpsTopics::init(std::condition_variable* t_send_queue_cv, std::mutex* t_se
@[for topic in send_topics]@
if (_@(topic)_pub.init()) {
std::cout << "- @(topic) publisher started" << std::endl;
@[ if topic == 'Timesync']@
_timesync->start(&_@(topic)_pub);
@[ end if]@
} else {
std::cerr << "ERROR starting @(topic) publisher" << std::endl;
return false;
}
@[end for]@
std::cout << "--------------------" << std::endl;
@[end if]@
@ -118,6 +119,9 @@ void RtpsTopics::publish(uint8_t topic_ID, char data_buffer[], size_t len)
eprosima::fastcdr::FastBuffer cdrbuffer(data_buffer, len);
eprosima::fastcdr::Cdr cdr_des(cdrbuffer);
st.deserialize(cdr_des);
@[ if topic == 'Timesync']@
_timesync->processTimesyncMsg(&st);
@[ end if]@
_@(topic)_pub.publish(&st);
}
break;

View File

@ -57,6 +57,8 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer
#include <condition_variable>
#include <queue>
#include "microRTPS_timesync.h"
@[for topic in send_topics]@
#include "@(topic)_Publisher.h"
@[end for]@
@ -67,6 +69,7 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer
class RtpsTopics {
public:
bool init(std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue<uint8_t>* t_send_queue);
void set_timesync(const std::shared_ptr<TimeSync>& timesync) { _timesync = timesync; };
@[if send_topics]@
void publish(uint8_t topic_ID, char data_buffer[], size_t len);
@[end if]@
@ -88,5 +91,7 @@ private:
@(topic)_Subscriber _@(topic)_sub;
@[end for]@
std::shared_ptr<TimeSync> _timesync;
@[end if]@
};

View File

@ -70,6 +70,7 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer
#include <fastrtps/Domain.h>
#include "microRTPS_transport.h"
#include "microRTPS_timesync.h"
#include "RtpsTopics.h"
#define BUFFER_SIZE 1024
@ -92,6 +93,9 @@ Transport_node *transport_node = nullptr;
RtpsTopics topics;
uint32_t total_sent = 0, sent = 0;
// Init timesync
std::shared_ptr<TimeSync> timeSync = std::make_shared<TimeSync>();
struct options {
enum class eTransports
{
@ -160,6 +164,7 @@ void signal_handler(int signum)
printf("Interrupt signal (%d) received.\n", signum);
running = 0;
transport_node->close();
timeSync->stop();
}
@[if recv_topics]@
@ -188,6 +193,7 @@ void t_send(void*)
/* make room for the header to fill in later */
eprosima::fastcdr::FastBuffer cdrbuffer(&data_buffer[header_length], sizeof(data_buffer)-header_length);
eprosima::fastcdr::Cdr scdr(cdrbuffer);
if (topics.getMsg(topic_ID, scdr))
{
length = scdr.getSerializedDataLength();
@ -253,6 +259,8 @@ int main(int argc, char** argv)
std::chrono::time_point<std::chrono::steady_clock> start, end;
@[end if]@
topics.set_timesync(timeSync);
@[if recv_topics]@
topics.init(&t_send_queue_cv, &t_send_queue_mutex, &t_send_queue);
@[end if]@
@ -300,5 +308,7 @@ int main(int argc, char** argv)
delete transport_node;
transport_node = nullptr;
timeSync->reset();
return 0;
}

View File

@ -0,0 +1,180 @@
/****************************************************************************
*
* Copyright (c) 2020 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* 3. Neither the name of the copyright holder nor the names of its contributors
* may be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
****************************************************************************/
/*!
* @file microRTPS_timesync.cpp
* @brief source code for time sync implementation
*/
#include <time.h>
#include <cmath>
#include <iostream>
#include "microRTPS_timesync.h"
#include "Timesync_Publisher.h"
#include "Timesync_Subscriber.h"
TimeSync::TimeSync()
: _offset_ns(-1),
_skew_ns_per_sync(0.0),
_num_samples(0),
_request_reset_counter(0),
_last_msg_seq(0)
{}
TimeSync::~TimeSync() { stop(); }
void TimeSync::setNewOffsetCB(std::function<void(int64_t)> callback) { _notifyNewOffset = callback; }
void TimeSync::start(const Timesync_Publisher* pub) {
stop();
_Timesync_pub = (*pub);
auto run = [this]() {
while (!_request_stop) {
px4_msgs::msg::Timesync msg = newTimesyncMsg();
_Timesync_pub.publish(&msg);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
};
_request_stop = false;
_send_timesync_thread.reset(new std::thread(run));
}
void TimeSync::stop() {
_request_stop = true;
if (_send_timesync_thread && _send_timesync_thread->joinable()) _send_timesync_thread->join();
_send_timesync_thread.reset();
}
void TimeSync::reset() {
_num_samples = 0;
_request_reset_counter = 0;
}
bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t local_t3_ns) {
int64_t rtti = local_t3_ns - local_t1_ns;
// assume rtti is evenly split both directions
int64_t remote_t3_ns = remote_t2_ns + rtti / 2ll;
int64_t measurement_offset = remote_t3_ns - local_t3_ns;
if (_request_reset_counter > REQUEST_RESET_COUNTER_THRESHOLD) {
reset();
std::cout << std::endl << "Timesync clock changed, resetting" << std::endl;
}
if (_num_samples >= WINDOW_SIZE) {
if (std::abs(measurement_offset - _offset_ns) > TRIGGER_RESET_THRESHOLD_NS) {
_request_reset_counter++;
std::cout << std::endl << "Timesync offset outlier, discarding" << std::endl;
return false;
} else {
_request_reset_counter = 0;
}
}
if (_num_samples == 0) {
_offset_ns = measurement_offset;
_skew_ns_per_sync = 0;
}
{
int64_t local_t2 = remote_t2_ns - _offset_ns;
int64_t time_there = local_t2 - local_t1_ns;
int64_t remote_t3 = local_t3_ns + _offset_ns;
int64_t time_back = remote_t3 - remote_t2_ns;
if (std::abs(time_back - time_there) > 3ll * 1000ll * 1000ll) {
std::cout << "trip there: " << time_there / 1e6f << "ms trip back: " << time_back / 1e6f
<< "ms , discarding" << std::endl;
return false;
}
}
// ignore if rtti > 10ms
if (rtti > 15ll * 1000ll * 1000ll) {
std::cout << std::endl
<< "RTTI too high for timesync: " << rtti / (1000ll * 1000ll) << "ms" << std::endl;
return false;
}
double schedule = std::min((double)_num_samples / WINDOW_SIZE, 1.);
double alpha = ALPHA_INITIAL * (1. - schedule) + ALPHA_FINAL * schedule;
double beta = BETA_INTIIAL * (1. - schedule) + BETA_FINAL * schedule;
int64_t offset_prev = _offset_ns;
_offset_ns = static_cast<int64_t>((_skew_ns_per_sync + _offset_ns) * (1. - alpha) + measurement_offset * alpha);
_skew_ns_per_sync = static_cast<int64_t>(beta * (_offset_ns - offset_prev) + (1. - beta) * _skew_ns_per_sync);
_num_samples++;
return true;
}
void TimeSync::processTimesyncMsg(const px4_msgs::msg::Timesync* msg) {
if (msg->seq() == _last_msg_seq) return;
_last_msg_seq = msg->seq();
int64_t now_time = getSystemMonoNanos();
if (msg->tc1() == 0) {
px4_msgs::msg::Timesync reply = (*msg);
reply.timestamp() = getSystemTime();
reply.seq() = _last_msg_seq;
reply.tc1() = now_time;
_Timesync_pub.publish(&reply);
} else if (msg->tc1() > 0) {
bool added = addMeasurement(msg->ts1(), msg->tc1(), now_time);
if (added && _notifyNewOffset) _notifyNewOffset(_offset_ns);
}
}
px4_msgs::msg::Timesync TimeSync::newTimesyncMsg() {
px4_msgs::msg::Timesync msg{};
msg.timestamp() = getSystemTime();
msg.seq() = _last_msg_seq;
msg.tc1() = 0;
msg.ts1() = getSystemMonoNanos();
_last_msg_seq++;
return msg;
}

View File

@ -0,0 +1,108 @@
/****************************************************************************
*
* Copyright (c) 2020 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* 3. Neither the name of the copyright holder nor the names of its contributors
* may be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
****************************************************************************/
/*!
* @file microRTPS_timesync.h
* @brief Adds time sync for the microRTPS bridge
* @author Nuno Marques <nuno.marques@dronesolutions.io>
* @author Julian Kent <julian@auterion.com>
*/
#pragma once
#include <atomic>
#include <functional>
#include <thread>
#include "Timesync_Publisher.h"
#include "Timesync_Subscriber.h"
static constexpr double ALPHA_INITIAL = 0.05;
static constexpr double ALPHA_FINAL = 0.003;
static constexpr double BETA_INTIIAL = 0.05;
static constexpr double BETA_FINAL = 0.003;
static constexpr int WINDOW_SIZE = 500;
static constexpr int64_t UNKNOWN = 0;
static constexpr int64_t TRIGGER_RESET_THRESHOLD_NS = 100ll * 1000ll * 1000ll;
static constexpr int REQUEST_RESET_COUNTER_THRESHOLD = 5;
class TimeSync {
public:
TimeSync();
virtual ~TimeSync();
void start(const Timesync_Publisher* pub);
void reset();
void stop();
void setNewOffsetCB(std::function<void(int64_t)> callback);
/**
* Returns clock monotonic time in nanoseconds
*/
inline int64_t getSystemMonoNanos() {
timespec t;
clock_gettime(CLOCK_MONOTONIC_RAW, &t);
return (int64_t)t.tv_sec * 1000000000ll + t.tv_nsec;
}
/**
* Returns system realtime in seconds
*/
inline uint64_t getSystemTime() {
timespec t;
clock_gettime(CLOCK_REALTIME, &t);
return (uint64_t)t.tv_sec + t.tv_nsec;
}
bool addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t local_t3_ns);
void processTimesyncMsg(const px4_msgs::msg::Timesync* msg);
px4_msgs::msg::Timesync newTimesyncMsg();
private:
int64_t _offset_ns;
int64_t _skew_ns_per_sync;
int64_t _num_samples;
int32_t _request_reset_counter;
uint8_t _last_msg_seq;
Timesync_Publisher _Timesync_pub;
Timesync_Subscriber _Timesync_sub;
std::unique_ptr<std::thread> _send_timesync_thread;
std::atomic<bool> _request_stop{false};
std::function<void(int64_t)> _notifyNewOffset;
};

View File

@ -1,4 +1,8 @@
uint64 timestamp # time since system start (microseconds)
uint8 seq # timesync msg sequence
int64 tc1 # time sync timestamp 1
int64 ts1 # time sync timestamp 2
uint64 remote_timestamp # remote system timestamp (microseconds)
int64 observed_offset # raw time offset directly observed from this timesync packet (microseconds)
int64 estimated_offset # smoothed time offset between companion system and PX4 (microseconds)

View File

@ -416,6 +416,8 @@ def generate_agent(out_dir):
shutil.rmtree(os.path.join(out_dir, "fastrtpsgen"))
cp_wildcard(os.path.join(urtps_templates_dir,
"microRTPS_transport.*"), agent_out_dir)
cp_wildcard(os.path.join(urtps_templates_dir,
"microRTPS_timesync.*"), agent_out_dir)
if cmakelists:
os.rename(os.path.join(out_dir, "microRTPS_agent_CMakeLists.txt"),
os.path.join(out_dir, "CMakeLists.txt"))
@ -457,6 +459,12 @@ def generate_client(out_dir):
if os.path.isfile(def_file):
os.rename(def_file, def_file.replace(".cpp", ".cpp_"))
def_file = os.path.join(default_client_out, "microRTPS_transport.h")
if os.path.isfile(def_file):
os.rename(def_file, def_file.replace(".cpp", ".cpp_"))
def_file = os.path.join(default_client_out, "microRTPS_timesync.cpp")
if os.path.isfile(def_file):
os.rename(def_file, def_file.replace(".cpp", ".cpp_"))
def_file = os.path.join(default_client_out, "microRTPS_timesync.h")
if os.path.isfile(def_file):
os.rename(def_file, def_file.replace(".h", ".h_"))
@ -466,6 +474,8 @@ def generate_client(out_dir):
# Final steps to install client
cp_wildcard(os.path.join(urtps_templates_dir,
"microRTPS_transport.*"), out_dir)
cp_wildcard(os.path.join(urtps_templates_dir,
"microRTPS_timesync.*"), out_dir)
return 0

View File

@ -177,8 +177,10 @@ rtps:
id: 76
- msg: test_motor
id: 77
- msg: timesync_status
- msg: timesync
id: 78
receive: true
send: true
- msg: trajectory_waypoint
id: 79
receive: true

View File

@ -138,7 +138,7 @@ MavlinkTimesync::handle_message(const mavlink_message_t *msg)
}
// Publish status message
timesync_status_s tsync_status{};
timesync_s tsync_status{};
tsync_status.timestamp = hrt_absolute_time();
tsync_status.remote_timestamp = tsync.tc1 / 1000ULL;
@ -146,7 +146,7 @@ MavlinkTimesync::handle_message(const mavlink_message_t *msg)
tsync_status.estimated_offset = (int64_t)_time_offset;
tsync_status.round_trip_time = rtt_us;
_timesync_status_pub.publish(tsync_status);
_timesync_pub.publish(tsync_status);
}
break;

View File

@ -43,7 +43,7 @@
#include "mavlink_bridge_header.h"
#include <uORB/PublicationMulti.hpp>
#include <uORB/topics/timesync_status.h>
#include <uORB/topics/timesync.h>
#include <drivers/drv_hrt.h>
@ -132,7 +132,7 @@ protected:
*/
void reset_filter();
uORB::PublicationMulti<timesync_status_s> _timesync_status_pub{ORB_ID(timesync_status)};
uORB::PublicationMulti<timesync_s> _timesync_pub{ORB_ID(timesync)};
uint32_t _sequence{0};