improved sync algo. and packet delay estimation log.

This commit is contained in:
vivek-shankar 2018-07-28 21:03:39 -04:00
parent 145598f18c
commit 3853479d63
3 changed files with 206 additions and 121 deletions

View File

@ -34,6 +34,18 @@ struct rb_struct
}; };
typedef struct rb_struct RB_struct; typedef struct rb_struct RB_struct;
struct neiMsg
{
uint8_t* msg;
uint16_t msgid;
double time_received;
double time_sent;
neiMsg(uint8_t* m, uint16_t mid, double tr, double ts)
: msg(m), msgid(mid), time_received(tr), time_sent(ts) {};
neiMsg();
};
typedef struct pos_struct nei_msg_struct;
struct neiStatus struct neiStatus
{ {
uint gps_strenght = 0; uint gps_strenght = 0;
@ -54,7 +66,7 @@ struct neitime
int age; int age;
neitime(uint64_t nht, uint64_t nlt, uint64_t mht, uint64_t mlt, double nr, double mr) neitime(uint64_t nht, uint64_t nlt, uint64_t mht, uint64_t mlt, double nr, double mr)
: nei_hardware_time(nht), nei_logical_time(nlt), node_hardware_time(mht), node_logical_time(mlt), : nei_hardware_time(nht), nei_logical_time(nlt), node_hardware_time(mht), node_logical_time(mlt),
nei_rate(nr), relative_rate(mr) {}; nei_rate(nr), relative_rate(mr),age(0) {};
neitime() neitime()
{ {
} }

View File

@ -38,9 +38,16 @@
#include <cmath> #include <cmath>
#include "buzzuav_closures.h" #include "buzzuav_closures.h"
#define UPDATER_MESSAGE_CONSTANT 987654321 /*
#define BUZZ_MESSAGE_CONSTANT_WTO_TIME 586782343 * ROSBuzz message types
#define BUZZ_MESSAGE_CONSTANT_TIME 523534343 */
typedef enum {
ROS_BUZZ_MSG_NIL = 0, // dummy msg
UPDATER_MESSAGE, // Update msg
BUZZ_MESSAGE_WTO_TIME, // Broadcast message wihout time info
BUZZ_MESSAGE_TIME, // Broadcast message with time info
} rosbuzz_msgtype;
// Time sync algo. constants // Time sync algo. constants
#define COM_DELAY 33000000 // in nano seconds i.e 33 ms #define COM_DELAY 33000000 // in nano seconds i.e 33 ms
#define TIME_SYNC_JUMP_THR 500000000 #define TIME_SYNC_JUMP_THR 500000000
@ -55,6 +62,25 @@ using namespace std;
namespace rosbzz_node namespace rosbzz_node
{ {
+
template <typename T>
class circularBuffer {
private:
vector<T> data;
unsigned int lastEntryPos;
int size;
public:
circularBuffer(uint8_t s): data(s), lastEntryPos(0), size(s){};
~circularBuffer(){};
void push(T d){
data[lastEntryPos] = d;
if(lastEntryPos > size-1) lastEntryPos = 0;
else lastEntryPos++;
}
vector<T> get_data(){ return data;};
};
class roscontroller class roscontroller
{ {
public: public:
@ -91,6 +117,20 @@ private:
}; };
typedef struct POSE ros_pose; typedef struct POSE ros_pose;
struct MsgData
{
int msgid;
uint16_t nid;
uint16_t size;
double sent_time;
double received_time;
MsgData(int mi, uint16_t ni, uint16_t s, double st, double rt):
msgid(mi), nid(ni), size(s),sent_time(st), received_time(rt){};
MsgData(){};
};
typedef struct MsgData msg_data;
ros_pose target, home, cur_pos; ros_pose target, home, cur_pos;
uint64_t payload; uint64_t payload;
@ -103,6 +143,9 @@ private:
ros::Time previous_step_time; ros::Time previous_step_time;
double logical_time_rate; double logical_time_rate;
bool time_sync_jumped; bool time_sync_jumped;
double com_delay;
std::vector<msg_data> inmsgdata;
double out_msg_time;
std::string robot_name = ""; std::string robot_name = "";
int rc_cmd; int rc_cmd;
@ -111,7 +154,7 @@ private:
int barrier; int barrier;
int update; int update;
int statepub_active; int statepub_active;
int message_number = 0; int16_t msg_id = 0;
uint8_t no_of_robots = 0; uint8_t no_of_robots = 0;
bool rcclient; bool rcclient;
bool xbeeplugged = false; bool xbeeplugged = false;
@ -281,5 +324,7 @@ private:
void get_xbee_status(); void get_xbee_status();
void time_sync_step(); void time_sync_step();
void push_timesync_nei_msg(int nid, uint64_t nh, uint64_t nl, double nr); void push_timesync_nei_msg(int nid, uint64_t nh, uint64_t nl, double nr);
uint64_t get_logical_time();
void set_logical_time_correction(uint64_t cor);
}; };
} }

View File

@ -63,9 +63,12 @@ logical_clock(ros::Time()), previous_step_time(ros::Time())
} }
// time sync algo. parameter intialization // time sync algo. parameter intialization
previous_step_time.fromNSec(ros::Time::now().toNSec()); previous_step_time.fromNSec(ros::Time::now().toNSec());
logical_clock.fromNSec(ros::Time::now().toNSec()); if(robot_id==10)logical_clock.fromSec(101.0);
logical_time_rate = 0; else logical_clock.fromNSec(0);
logical_time_rate = 1;
time_sync_jumped = false; time_sync_jumped = false;
com_delay = 0;
out_msg_time=0;
// Create log dir and open log file // Create log dir and open log file
std::string path = std::string path =
bzzfile_name.substr(0, bzzfile_name.find_last_of("\\/")) + "/"; bzzfile_name.substr(0, bzzfile_name.find_last_of("\\/")) + "/";
@ -172,38 +175,38 @@ void roscontroller::RosControllerRun()
// log data // log data
// hardware time now // hardware time now
log<<ros::Time::now().toSec()<<","; log<<ros::Time::now().toSec()<<",";
get_logical_time();
// logical time now // logical time now
log<<logical_clock.toSec()<<","; log<<logical_clock.toSec()<<",";
log<<buzz_utility::get_timesync_state()<<",";
log<<cur_pos.latitude << "," << cur_pos.longitude << "," log<<cur_pos.latitude << "," << cur_pos.longitude << ","
<< cur_pos.altitude << ","; << cur_pos.altitude << ",";
log << (int)no_of_robots<<","; log << (int)no_of_robots<<",";
log << neighbours_pos_map.size()<< ","; log << neighbours_pos_map.size()<< ",";
std::vector<uint8_t*> in_msg = buzz_utility::get_inmsg_vector(); log<<(int)inmsgdata.size()<<",";
log <<(int)in_msg.size()<<","; log<< msg_id<<",";
log <<buzz_utility::getuavstate()<<","; log<< out_msg_time<<",";
log <<buzz_utility::getuavstate();
if(neighbours_pos_map.size() > 0)log<<",";
map<int, buzz_utility::Pos_struct>::iterator it = map<int, buzz_utility::Pos_struct>::iterator it =
neighbours_pos_map.begin(); neighbours_pos_map.begin();
for (; it != neighbours_pos_map.end(); ++it) for (; it != neighbours_pos_map.end(); ++it)
{ {
log<< it->first<<","; log<< it->first<<",";
log << (double)it->second.x << "," << (double)it->second.y log << (double)it->second.x << "," << (double)it->second.y
<< "," << (double)it->second.z << ","; << "," << (double)it->second.z;
} }
for (std::vector<msg_data>::iterator it = inmsgdata.begin() ; it != inmsgdata.end(); ++it){
for (std::vector<uint8_t*>::iterator it = in_msg.begin() ; it != in_msg.end(); ++it){ log<<","<<(int)it->nid <<","<<(int)it->msgid<<","<<(int)it->size<<","<<it->sent_time
uint8_t* first_INmsg = (uint8_t*)(*it); <<","<<it->received_time;
size_t tot = 0;
// Size is at first 2 bytes
uint16_t msg_size = (*(uint16_t*)first_INmsg) * sizeof(uint64_t);
tot += sizeof(uint16_t);
// Decode neighbor Id
uint16_t neigh_id = *(uint16_t*)(first_INmsg + tot);
log<<(int)neigh_id<<","<<(int)msg_size<<",";
} }
inmsgdata.clear();
log<<std::endl; log<<std::endl;
time_sync_step(); time_sync_step();
if(debug) if(debug)
ROS_INFO("[Debug : %i ] logical Time %f , har time %f",robot_id, logical_clock.toSec(), ros::Time::now().toSec()); ROS_INFO("[Debug : %i ] logical Time %f , har time %f, rate: %f",robot_id,
logical_clock.toSec(), ros::Time::now().toSec(),logical_time_rate);
// Call Step from buzz script // Call Step from buzz script
buzz_utility::buzz_script_step(); buzz_utility::buzz_script_step();
// Prepare messages and publish them // Prepare messages and publish them
@ -601,28 +604,53 @@ with size......... | /
memcpy(position, tmp, 3 * sizeof(uint64_t)); memcpy(position, tmp, 3 * sizeof(uint64_t));
mavros_msgs::Mavlink payload_out; mavros_msgs::Mavlink payload_out;
// Add Robot id and message number to the published message // Add Robot id and message number to the published message
if (message_number < 0) if (msg_id < 0)
message_number = 0; msg_id = 0;
else else
message_number++; msg_id++;
payload_out.sysid = (uint8_t)robot_id; payload_out.sysid = (uint8_t)robot_id;
payload_out.msgid = (uint32_t)message_number; payload_out.msgid = (uint32_t)msg_id;
if(buzz_utility::get_timesync_state()){ if(buzz_utility::get_timesync_state()){
payload_out.payload64.push_back(BUZZ_MESSAGE_CONSTANT_TIME); // prepare rosbuzz msg header
uint8_t mtype = (uint8_t)BUZZ_MESSAGE_TIME;
uint16_t mid = (uint16_t)msg_id;
get_logical_time();
float stime = (float)logical_clock.toSec();
uint8_t r8header[4];
uint64_t rheader = 0;
memcpy(r8header, &mtype, sizeof(uint8_t));
memcpy(r8header+1, &mid, sizeof(uint16_t));
memcpy(r8header+3, &stime, sizeof(float));
memcpy(&rheader, r8header, 7*sizeof(uint8_t));
ROS_INFO("SENT Header : mtype %u mid %u stime %f", mtype,mid,stime);
// push header into the buffer
payload_out.payload64.push_back(rheader);
payload_out.payload64.push_back(position[0]); payload_out.payload64.push_back(position[0]);
payload_out.payload64.push_back(position[1]); payload_out.payload64.push_back(position[1]);
payload_out.payload64.push_back(position[2]); payload_out.payload64.push_back(position[2]);
//payload_out.payload64.push_back((uint64_t)message_number); //payload_out.payload64.push_back((uint64_t)msg_id);
// add time sync algo data // add time sync algo data
payload_out.payload64.push_back(ros::Time::now().toNSec()+COM_DELAY); payload_out.payload64.push_back(ros::Time::now().toNSec());
payload_out.payload64.push_back(logical_clock.toNSec()+COM_DELAY); payload_out.payload64.push_back(logical_clock.toNSec());
//uint64_t ltrate64 = 0; // ROS_INFO("[%i] SENT_MSG NO : %i",robot_id, msg_id);
//memcpy((void*)&ltrate64, (void*)&logical_time_rate, sizeof(uint64_t)); ROS_INFO("SENT L CLock : %"PRIu64" , H clock : %"PRIu64" , Rate %f",
//payload_out.payload64.push_back(ltrate64); ros::Time::now().toNSec(),logical_clock.toNSec(), logical_time_rate );
} }
else{ else{
payload_out.payload64.push_back(BUZZ_MESSAGE_CONSTANT_WTO_TIME); // prepare rosbuzz msg header
uint8_t mtype = (uint8_t)BUZZ_MESSAGE_WTO_TIME;
uint16_t mid = (uint16_t)msg_id;
get_logical_time();
float stime = (float)logical_clock.toSec();
uint8_t r8header[4];
uint64_t rheader = 0;
memcpy(r8header, &mtype, sizeof(uint8_t));
memcpy(r8header+1, &mid, sizeof(uint16_t));
memcpy(r8header+3, &stime, sizeof(float));
memcpy(&rheader, r8header, 7*sizeof(uint8_t));
ROS_INFO("SENT Header : mtype %u mid %u stime %f", mtype,mid,stime);
// push header into the buffer
payload_out.payload64.push_back(rheader);
payload_out.payload64.push_back(position[0]); payload_out.payload64.push_back(position[0]);
payload_out.payload64.push_back(position[1]); payload_out.payload64.push_back(position[1]);
payload_out.payload64.push_back(position[2]); payload_out.payload64.push_back(position[2]);
@ -633,12 +661,13 @@ with size......... | /
{ {
payload_out.payload64.push_back(payload_out_ptr[i]); payload_out.payload64.push_back(payload_out_ptr[i]);
} }
get_logical_time();
out_msg_time = logical_clock.toSec();
// publish prepared messages in respective topic // publish prepared messages in respective topic
payload_pub.publish(payload_out); payload_pub.publish(payload_out);
delete[] out; delete[] out;
delete[] payload_out_ptr; delete[] payload_out_ptr;
/// Check for updater message if present send // Check for updater message if present send
if (update && buzz_update::is_msg_present()) if (update && buzz_update::is_msg_present())
{ {
uint8_t* buff_send = 0; uint8_t* buff_send = 0;
@ -646,8 +675,14 @@ with size......... | /
; ;
int tot = 0; int tot = 0;
mavros_msgs::Mavlink update_packets; mavros_msgs::Mavlink update_packets;
fprintf(stdout, "Appending code into message ...\n"); // Add Robot id and message number to the published message
fprintf(stdout, "Sent Update packet Size: %u \n", updater_msgSize); if (msg_id < 0)
msg_id = 0;
else
msg_id++;
update_packets.sysid = (uint8_t)robot_id;
update_packets.msgid = (uint32_t)msg_id;
if(debug) ROS_INFO("Broadcasted Update packet Size: %u", updater_msgSize);
// allocate mem and clear it // allocate mem and clear it
buff_send = (uint8_t*)malloc(sizeof(uint16_t) + updater_msgSize); buff_send = (uint8_t*)malloc(sizeof(uint16_t) + updater_msgSize);
memset(buff_send, 0, sizeof(uint16_t) + updater_msgSize); memset(buff_send, 0, sizeof(uint16_t) + updater_msgSize);
@ -663,19 +698,23 @@ with size......... | /
uint64_t* payload_64 = new uint64_t[total_size]; uint64_t* payload_64 = new uint64_t[total_size];
memcpy((void*)payload_64, (void*)buff_send, total_size * sizeof(uint64_t)); memcpy((void*)payload_64, (void*)buff_send, total_size * sizeof(uint64_t));
free(buff_send); free(buff_send);
// Send a constant number to differenciate updater msgs // prepare rosbuzz msg header
update_packets.payload64.push_back((uint64_t)UPDATER_MESSAGE_CONSTANT); uint8_t mtype = (uint8_t)UPDATER_MESSAGE;
uint16_t mid = (uint16_t)msg_id;
get_logical_time();
float stime = (float)logical_clock.toSec();
uint8_t r8header[4];
uint64_t rheader = 0;
memcpy(r8header, &mtype, sizeof(uint8_t));
memcpy(r8header+1, &mid, sizeof(uint16_t));
memcpy(r8header+3, &stime, sizeof(float));
memcpy(&rheader, r8header, 7*sizeof(uint8_t));
// push header into the buffer
payload_out.payload64.push_back(rheader);
for (int i = 0; i < total_size; i++) for (int i = 0; i < total_size; i++)
{ {
update_packets.payload64.push_back(payload_64[i]); update_packets.payload64.push_back(payload_64[i]);
} }
// Add Robot id and message number to the published message
if (message_number < 0)
message_number = 0;
else
message_number++;
update_packets.sysid = (uint8_t)robot_id;
update_packets.msgid = (uint32_t)message_number;
payload_pub.publish(update_packets); payload_pub.publish(update_packets);
delete[] payload_64; delete[] payload_64;
} }
@ -1089,8 +1128,20 @@ void roscontroller::payload_obt(const mavros_msgs::Mavlink::ConstPtr& msg)
-----------------------------------------------------------------------------------------------------*/ -----------------------------------------------------------------------------------------------------*/
{ {
// decode msg header
uint64_t rheader = msg->payload64[0];
uint8_t r8header[4];
uint8_t mtype;
uint16_t mid;
float stime;
memcpy(r8header,&rheader, 7*sizeof(uint8_t));
memcpy(&mtype, r8header, sizeof(uint8_t));
memcpy(&mid, r8header+1, sizeof(uint16_t));
memcpy(&stime, r8header+3, sizeof(float));
ROS_INFO("Received Header : mtype %u mid %u stime %f", mtype,mid,stime);
// Check for Updater message, if updater message push it into updater FIFO // Check for Updater message, if updater message push it into updater FIFO
if ((uint64_t)msg->payload64[0] == (uint64_t)UPDATER_MESSAGE_CONSTANT) if ((int)mtype == (int)UPDATER_MESSAGE)
{ {
uint16_t obt_msg_size = sizeof(uint64_t) * (msg->payload64.size()); uint16_t obt_msg_size = sizeof(uint64_t) * (msg->payload64.size());
uint64_t message_obt[obt_msg_size]; uint64_t message_obt[obt_msg_size];
@ -1105,7 +1156,7 @@ void roscontroller::payload_obt(const mavros_msgs::Mavlink::ConstPtr& msg)
// Copy packet into temporary buffer neglecting update constant // Copy packet into temporary buffer neglecting update constant
memcpy((void*)pl, (void*)(message_obt + 1), obt_msg_size); memcpy((void*)pl, (void*)(message_obt + 1), obt_msg_size);
uint16_t unMsgSize = *(uint16_t*)(pl); uint16_t unMsgSize = *(uint16_t*)(pl);
fprintf(stdout, "Update packet received, read msg size : %u \n", unMsgSize); if(debug) ROS_INFO("Update packet received, size : %u", unMsgSize);
if (unMsgSize > 0) if (unMsgSize > 0)
{ {
buzz_update::code_message_inqueue_append((uint8_t*)(pl + sizeof(uint16_t)), unMsgSize); buzz_update::code_message_inqueue_append((uint8_t*)(pl + sizeof(uint16_t)), unMsgSize);
@ -1114,8 +1165,8 @@ void roscontroller::payload_obt(const mavros_msgs::Mavlink::ConstPtr& msg)
free(pl); free(pl);
} }
// BVM FIFO message // BVM FIFO message
else if (msg->payload64[0] == (uint64_t)BUZZ_MESSAGE_CONSTANT_TIME || else if ((int)mtype == (int)BUZZ_MESSAGE_TIME ||
msg->payload64[0] == (uint64_t)BUZZ_MESSAGE_CONSTANT_WTO_TIME) (int)mtype == (int)BUZZ_MESSAGE_WTO_TIME)
{ {
uint64_t message_obt[msg->payload64.size() - 1]; uint64_t message_obt[msg->payload64.size() - 1];
// Go throught the obtained payload // Go throught the obtained payload
@ -1123,6 +1174,15 @@ void roscontroller::payload_obt(const mavros_msgs::Mavlink::ConstPtr& msg)
{ {
message_obt[i - 1] = (uint64_t)msg->payload64[i]; message_obt[i - 1] = (uint64_t)msg->payload64[i];
} }
// determine buzz msg index based on msg type
int index = 3;
if((int)mtype == (int)BUZZ_MESSAGE_TIME) index = 5;
// Extract robot id of the neighbour
uint16_t* out = buzz_utility::u64_cvt_u16((uint64_t) * (message_obt + index));
get_logical_time();
// store in msg data for data log
msg_data inm(mid,out[1],out[0]*sizeof(uint64_t),stime,logical_clock.toSec());
inmsgdata.push_back(inm);
// Extract neighbours position from payload // Extract neighbours position from payload
double neighbours_pos_payload[3]; double neighbours_pos_payload[3];
memcpy(neighbours_pos_payload, message_obt, 3 * sizeof(uint64_t)); memcpy(neighbours_pos_payload, message_obt, 3 * sizeof(uint64_t));
@ -1133,11 +1193,8 @@ void roscontroller::payload_obt(const mavros_msgs::Mavlink::ConstPtr& msg)
nei_pos.longitude = neighbours_pos_payload[1]; nei_pos.longitude = neighbours_pos_payload[1];
nei_pos.altitude = neighbours_pos_payload[2]; nei_pos.altitude = neighbours_pos_payload[2];
double cvt_neighbours_pos_payload[3]; double cvt_neighbours_pos_payload[3];
// Compute RB in robot body ref. frame
gps_rb(nei_pos, cvt_neighbours_pos_payload); gps_rb(nei_pos, cvt_neighbours_pos_payload);
int index = 3;
if(msg->payload64[0] == (uint64_t)BUZZ_MESSAGE_CONSTANT_TIME) index = 5;
// Extract robot id of the neighbour
uint16_t* out = buzz_utility::u64_cvt_u16((uint64_t) * (message_obt + index));
if (debug) if (debug)
ROS_WARN("RAB of %i: %f, %f", (int)out[1], cvt_neighbours_pos_payload[0], cvt_neighbours_pos_payload[1]); ROS_WARN("RAB of %i: %f, %f", (int)out[1], cvt_neighbours_pos_payload[0], cvt_neighbours_pos_payload[1]);
// Pass neighbour position to local maintaner // Pass neighbour position to local maintaner
@ -1148,17 +1205,16 @@ void roscontroller::payload_obt(const mavros_msgs::Mavlink::ConstPtr& msg)
// TODO: remove roscontroller local map array for neighbors // TODO: remove roscontroller local map array for neighbors
neighbours_pos_put((int)out[1], n_pos); neighbours_pos_put((int)out[1], n_pos);
buzzuav_closures::neighbour_pos_callback((int)out[1], n_pos.x, n_pos.y, n_pos.z); buzzuav_closures::neighbour_pos_callback((int)out[1], n_pos.x, n_pos.y, n_pos.z);
if(msg->payload64[0] == (uint64_t)BUZZ_MESSAGE_CONSTANT_TIME){ if((int)mtype == (int)BUZZ_MESSAGE_TIME){
// update time struct for sync algo // update time struct for sync algo
double nr = 0; double nr = 0;
push_timesync_nei_msg((int)out[1], message_obt[3],message_obt[4],nr); push_timesync_nei_msg((int)out[1], message_obt[3],message_obt[4],nr);
delete[] out;
buzz_utility::in_msg_append((message_obt + index)); buzz_utility::in_msg_append((message_obt + index));
} }
else{ else{
delete[] out;
buzz_utility::in_msg_append((message_obt + index)); buzz_utility::in_msg_append((message_obt + index));
} }
delete[] out;
} }
} }
@ -1427,7 +1483,7 @@ void roscontroller::time_sync_step()
* Steps the time syncronization algorithm * Steps the time syncronization algorithm
------------------------------------------------------------------ */ ------------------------------------------------------------------ */
{ {
// ROS_INFO("Stepping time sync : %f ", logical_clock.toSec()); //ROS_INFO("Stepping time sync : %f ", logical_clock.toSec());
double avgRate = logical_time_rate; double avgRate = logical_time_rate;
double avgOffset = 0; double avgOffset = 0;
int offsetCount = 0; int offsetCount = 0;
@ -1437,98 +1493,70 @@ void roscontroller::time_sync_step()
avgRate += (it->second).relative_rate; avgRate += (it->second).relative_rate;
// estimate current offset // estimate current offset
int64_t offset = (int64_t)(it->second).nei_logical_time - (int64_t)(it->second).node_logical_time; int64_t offset = (int64_t)(it->second).nei_logical_time - (int64_t)(it->second).node_logical_time;
// ROS_INFO("time sync loop offset %" PRId64 ", nei clock : %" PRId64 ", my clock :%" PRId64, offset,(int64_t)(it->second).nei_logical_time, (int64_t)(it->second).node_logical_time);
if(offset > 0){
// neighbor is ahead in time
avgOffset = avgOffset + (1 * offset);
offsetCount++;
}
else{
if(std::abs(offset)<TIME_SYNC_JUMP_THR){
avgOffset = avgOffset + offset; avgOffset = avgOffset + offset;
offsetCount++; offsetCount++;
} ROS_INFO("time sync loop offset %" PRId64 ", nei clock : %" PRId64 ", my clock :%" PRId64, offset,(int64_t)(it->second).nei_logical_time, (int64_t)(it->second).node_logical_time);
}
if((it->second).age < BUZZRATE) (it->second).age++; if((it->second).age < BUZZRATE) (it->second).age++;
else neighbours_time_map.erase(it); else neighbours_time_map.erase(it);
} }
avgRate = avgRate/(neighbours_time_map.size()+1);
if(offsetCount>0 && !time_sync_jumped){ if(offsetCount>0 && !time_sync_jumped){
int64_t correction = (int64_t)ceil(avgOffset / (offsetCount+1)); int64_t correction = (int64_t)ceil(avgOffset / (offsetCount+1));
if(std::abs(correction) < TIME_SYNC_JUMP_THR){ if(std::abs(correction) < TIME_SYNC_JUMP_THR){
// ROS_INFO("With correction before correcting time sync : %f ", logical_clock.toSec()); // ROS_INFO("With correction before correcting time sync : %f ", logical_clock.toSec());
// correct time with the diff within this step + differnce in nei + rate change set_logical_time_correction(correction);
uint64_t time_diff = ros::Time::now().toNSec() - previous_step_time.toNSec(); ROS_INFO("With correction time sync : %f ", logical_clock.toSec());
uint64_t old_time = logical_clock.toNSec();
avgRate = avgRate/(neighbours_time_map.size()+1);
// ROS_INFO(" time sync loop : avgOffset %f, offsetCount : %d , correction %" PRId64 "time diff %" PRIu64 " old_time %" PRIu64 " rate %f",
// avgOffset, offsetCount, correction, time_diff, old_time, avgRate);
uint64_t final_time = old_time + time_diff + (uint64_t)correction;// + (time_diff * avgRate);
// ROS_INFO(" Final time: %" PRIu64, final_time);
logical_clock.fromNSec(final_time);
previous_step_time.fromNSec(ros::Time::now().toNSec());
// ROS_INFO("With correction time sync : %f ", logical_clock.toSec());
}
else{
// correct time with the diff within this step + rate change
uint64_t time_diff = ros::Time::now().toNSec() - previous_step_time.toNSec();
uint64_t old_time = logical_clock.toNSec();
avgRate = avgRate/(neighbours_time_map.size()+1);
logical_clock.fromNSec(old_time + time_diff); // + time_diff * avgRate);
previous_step_time.fromNSec(ros::Time::now().toNSec());
//ROS_INFO("inside non jump without coorection time sync : %f ", logical_clock.toSec());
} }
} }
else{ get_logical_time(); // just to update clock
// correct time with the diff within this step + rate change
uint64_t time_diff = ros::Time::now().toNSec() - previous_step_time.toNSec();
uint64_t old_time = logical_clock.toNSec();
avgRate = avgRate/(neighbours_time_map.size()+1);
logical_clock.fromNSec(old_time + time_diff); // + time_diff * avgRate);
previous_step_time.fromNSec(ros::Time::now().toNSec());
//ROS_INFO("without correction time sync : %f ", logical_clock.toSec());
}
time_sync_jumped = false; time_sync_jumped = false;
//neighbours_time_map.clear(); //neighbours_time_map.clear();
logical_time_rate = avgRate; logical_time_rate = avgRate;
} }
void roscontroller::push_timesync_nei_msg(int nid, uint64_t nh, uint64_t nl, double nr) void roscontroller::push_timesync_nei_msg(int nid, uint64_t nh, uint64_t nl, double nr)
/* /*
* pushes a time syncronization msg into its data slot * pushes a time syncronization msg into its data slot
------------------------------------------------------------------ */ ------------------------------------------------------------------ */
{ {
ROS_INFO("Received from [%i] L %"PRIu64" , H: %"PRIu64" , nei rate %f",nid, nl,nh,nr);
map<int, buzz_utility::neighbor_time>::iterator it = neighbours_time_map.find(nid); map<int, buzz_utility::neighbor_time>::iterator it = neighbours_time_map.find(nid);
double relativeRate =0; double relativeRate =1;
if (it != neighbours_time_map.end()){ if (it != neighbours_time_map.end()){
uint64_t deltaLocal = ros::Time::now().toNSec() - (it->second).node_hardware_time; int64_t deltaLocal = ros::Time::now().toNSec() - (it->second).node_hardware_time;
uint64_t delatNei = round((nh - (it->second).nei_hardware_time ) int64_t delatNei = round(nh - (it->second).nei_hardware_time);
+ (nh - (it->second).nei_hardware_time ) * nr);
double currentRate = 0; double currentRate = 0;
if(deltaLocal !=0) currentRate = ((double)delatNei - (double)deltaLocal)/(double)deltaLocal; ROS_INFO("current in hand for nei : me H %"PRIu64" , nei H %"PRIu64" ", (it->second).node_hardware_time,
(it->second).nei_hardware_time );
if(deltaLocal !=0 && delatNei !=0) currentRate = ((double)delatNei - (double)deltaLocal)/(double)deltaLocal;
relativeRate = MOVING_AVERAGE_ALPHA*((it->second).relative_rate) relativeRate = MOVING_AVERAGE_ALPHA*((it->second).relative_rate)
+ (1- MOVING_AVERAGE_ALPHA)*currentRate; + (1- MOVING_AVERAGE_ALPHA)*currentRate;
// ROS_INFO("deltaLocal %" PRIu64 ", delatNei %" PRIu64 " , currentrate %f , this relative rate %f, final relativeRate %f", deltaLocal, delatNei, currentRate, ROS_INFO("SYNC MSG RECEIVED deltaLocal %" PRIu64 ", delatNei %" PRId64 " , currentrate %f , this relative rate %f, final relativeRate %f",
// (it->second).relative_rate, relativeRate); deltaLocal, delatNei, currentRate, (it->second).relative_rate, relativeRate);
neighbours_time_map.erase(it); neighbours_time_map.erase(it);
} }
int64_t offset = (int64_t)nl - (int64_t)logical_clock.toNSec(); int64_t offset = (int64_t)nl - (int64_t)logical_clock.toNSec();
// ROS_INFO("neigh msg push from %i "", my clock %" PRIu64 ", nei clock %" PRIu64 ", offset: %" PRId64", relative rate %f", nid, logical_clock.toNSec(), nl, offset, relativeRate);
if(offset > TIME_SYNC_JUMP_THR){ if(offset > TIME_SYNC_JUMP_THR){
uint64_t time_diff = ros::Time::now().toNSec() - previous_step_time.toNSec(); set_logical_time_correction(offset);// + time_diff * logical_time_rate );
uint64_t old_time = logical_clock.toNSec();
logical_clock.fromNSec(old_time + time_diff + offset );// + time_diff * logical_time_rate );
previous_step_time.fromNSec(ros::Time::now().toNSec());
time_sync_jumped = true; time_sync_jumped = true;
ROS_WARN("CLock Jumped %"PRIu64, logical_clock.toNSec());
} }
buzz_utility::neighbor_time nt(nh, nl, ros::Time::now().toNSec(), buzz_utility::neighbor_time nt(nh, nl, ros::Time::now().toNSec(),
logical_clock.toNSec(), nr, relativeRate); logical_clock.toNSec(), nr, relativeRate);
neighbours_time_map.insert(make_pair(nid, nt)); neighbours_time_map.insert(make_pair(nid, nt));
// ROS_INFO("neigh time sync : %f , nei logical %i", logical_clock.toSec(), nl);
} }
uint64_t roscontroller::get_logical_time(){
uint64_t time_diff = ros::Time::now().toNSec() - previous_step_time.toNSec();
uint64_t old_time = logical_clock.toNSec();
logical_clock.fromNSec(old_time + time_diff);// + time_diff * logical_time_rate);
previous_step_time.fromNSec(ros::Time::now().toNSec());
return logical_clock.toNSec();
}
void roscontroller::set_logical_time_correction(uint64_t cor){
uint64_t l_time_now = get_logical_time();
logical_clock.fromNSec(l_time_now + cor);
}
} }