#!/usr/bin/env python2.7 ### Cesar Rodriguez July 2021 ### Based off of Klausen 2017 - Smooth trajectory generation based on desired waypoints import rospy, tf import numpy as np import time import math import rosservice from scipy.integrate import odeint from oscillation_ctrl.msg import RefSignal, LoadAngles, TetheredStatus from oscillation_ctrl.srv import WaypointTrack from geometry_msgs.msg import Pose, PoseStamped, Point, TwistStamped from sensor_msgs.msg import Imu from mavros_msgs.msg import State class DesiredPoint(): def __init__(self,x,y,z): self.x = x self.y = y self.z = z class Main: def __init__(self): # rate(s) rate = 50 # rate for the publisher method, specified in Hz -- 10 Hz # initialize variables self.tstart = rospy.get_time() # Keep track of the start time while self.tstart == 0.0: # Need to make sure get_rostime works self.tstart = rospy.get_time() self.dt = 1.0/rate self.tmax = self.dt # used to get desired pos, vel, and acc for next time step (tmax) self.n = self.tmax/self.dt + 1 self.t = np.linspace(0, self.tmax, self.n) # Time array # self.t = np.linspace(0, 1.0, 10) # Time array # Message generation/ collection self.vel_data = TwistStamped() # This is needed to get drone vel from gps self.imu_data = Imu() # Needed for to get drone acc from IMU self.ref_sig = RefSignal() # Smooth Signal self.load_angles = LoadAngles() self.has_run = 0 # Bool to keep track of first run instance self.dr_pos = Pose() self.dr_vel = self.vel_data.twist.linear self.dr_acc = self.imu_data.linear_acceleration self.tetherL = self.get_tether() # --------------------------------------------------------------------------------# # SUBSCRIBERS # # --------------------------------------------------------------------------------# # Topic, msg type, and class callback method rospy.Subscriber('/status/load_angles', LoadAngles, self.loadAngles_cb) rospy.Subscriber('/mavros/local_position/pose', PoseStamped, self.dronePos_cb) # rospy.Subscriber('/status/twoBody_status', TetheredStatus, self.dronePos_cb) rospy.Subscriber('/mavros/local_position/velocity_body', TwistStamped, self.droneVel_cb) rospy.Subscriber('/mavros/imu/data', Imu, self.droneAcc_cb) # --------------------------------------------------------------------------------# # PUBLISHERS # --------------------------------------------------------------------------------# # Publish desired path to compute attitudes self.pub_path = rospy.Publisher('/reference/path',RefSignal,queue_size = 10) # Needed for geometric controller to compute thrust #self.pub_ref = rospy.Publisher('/reference/flatsetpoint',FlatTarget,queue_size = 10) # timer(s), used to control method loop freq(s) as defined by the rate(s) self.pub_tim = rospy.Timer(rospy.Duration(1.0/rate), self.publisher) # --------------------------------------------------------------------------------# # FEEDBACK AND INPUT SHAPING # --------------------------------------------------------------------------------# # Smooth path variables self.EPS_F = np.zeros(9) # Final Epsilon/ signal self.EPS_I = np.zeros(9) # Epsilon shapefilter # Constants for smooth path generation self.w_tune = 1 # 1 also works well :) 3.13 works well? ######################################################################### self.epsilon = 0.7 # Damping ratio # need exception if we do not have tether: if self.tetherL == 0.0: self.wn = self.w_tune else: self.wn = math.sqrt(9.81/self.tetherL) self.wd = self.wn*math.sqrt(1 - self.epsilon**2) self.k4 = 4*self.epsilon*self.w_tune self.k3 = ((2 + 4*self.epsilon**2)*self.w_tune**2)/self.k4 self.k2 = (4*self.epsilon*self.w_tune**3)/(self.k4*self.k3) self.k1 = (self.w_tune**4)/(self.k2*self.k3*self.k4) # For saturation: self.max = [0, 8, 1.5, 3] #[0, 10, 1.5, 3] self.get_xd = rospy.ServiceProxy('/status/waypoint_tracker',WaypointTrack) self.empty_point = Point() # Needed to query waypoint_server self.des_waypoints = True # True = changing waypoints # Initial conditions: [pos, vel, acc, jerk] self.x0 = [0, 0, 0, 0] self.y0 = [0, 0, 0, 0] self.z0 = [0, 0, 0, 0] # Constants for feedback self.Gd = 0.325 #0.325 self.td = 2*self.Gd*math.pi/self.wd # Constants for shape filter/ Input shaping self.t1 = 0 self.t2 = math.pi/self.wd self.K = math.exp(-self.epsilon*math.pi/math.sqrt(1 - self.epsilon**2)) self.A1 = 1/(1 + self.K) self.A2 = self.A1*self.K # Need to determine how large of any array needs to be stored to use delayed functions self.SF_delay_x = np.zeros([4,int(round(self.t2/self.dt))]) # Shapefilter delay; 4 - p,v,a,j self.SF_delay_y = np.zeros([4,int(round(self.t2/self.dt))]) self.phi_fb = np.zeros(int(round(self.td/self.dt))) # Feedback delay self.phi_vel_fb = np.zeros(int(round(self.td/self.dt))) self.phi_acc_fb = np.zeros(int(round(self.td/self.dt))) self.theta_fb = np.zeros(int(round(self.td/self.dt))) self.theta_vel_fb = np.zeros(int(round(self.td/self.dt))) self.theta_acc_fb = np.zeros(int(round(self.td/self.dt))) # Need index to keep track of path array of ^^^^ length self.SF_idx = 0 self.FB_idx = 0 # Constants for sigmoid self.at = 3 # acceleration theshold self.pc = 0.7 # From Klausen 2017 self.sk = self.SF_delay_x.shape[1] # from Klausen 2017 self.ak = np.zeros([self.sk]) self.s_gain = 0.0 # Gain found from sigmoid self.service_list = rosservice.get_service_list() # --------------------------------------------------------------------------------# # CALLBACK FUNCTIONS # --------------------------------------------------------------------------------# # Callback to get link names, states, and pload deflection angles def loadAngles_cb(self,msg): try: self.load_angles = msg except ValueError: pass # Callback drone pose def dronePos_cb(self,msg): try: self.dr_pos = msg.pose # self.dr_pos = msg.drone_pos except ValueError: pass # Callback for drone velocity def droneVel_cb(self,msg): try: self.dr_vel = msg.twist.linear except ValueError or TypeError: pass # Callback for drone accel from IMU data def droneAcc_cb(self,msg): try: self.dr_acc = msg.linear_acceleration except ValueError: pass def waypoints_srv_cb(self): if '/status/waypoint_tracker' in self.service_list: rospy.wait_for_service('/status/waypoint_tracker') try: resp = self.get_xd(False,self.empty_point) self.xd = resp.xd except ValueError: pass else: self.xd = DesiredPoint(0.0,0.0,2.0) def get_tether(self): """ Get tether length based on set parameters""" self.param_exists = False while self.param_exists == False: if rospy.has_param('status/tether_length'): tether_length = rospy.get_param('status/tether_length') # Tether length self.param_exists = True elif rospy.get_time() - self.tstart >= 15: tether_length = 0.0 break return tether_length # --------------------------------------------------------------------------------# # ALGORITHM # --------------------------------------------------------------------------------# # --------------------------------------------------------------------------------# # TRAJECTORY GENERATION BASED ON WAYPONTS xd # --------------------------------------------------------------------------------# def statespace(self,y,t,xd): # Update initial conditions # # Need the statespace array: pos,vel,acc,jer = y error = xd - pos if abs(error) <= 0.25: error = 0.0 # Derivative of statesape array: dydt = [vel, acc, jer, self.k4*(self.k3*(self.k2*(self.k1*(error) - vel) - acc) - jer)] return dydt # Sigmoid def sigmoid(self): for i in range(self.sk): if math.sqrt(self.SF_delay_x[2,i]**2 + self.SF_delay_y[2,i]**2) < self.at: self.ak[i] = 1 else: self.ak[i] = 0 pk = sum(self.ak)/self.sk self.s_gain = 1/(1 + math.exp(-self.sk*(pk-self.pc))) def delay(self,j,cmd): ''' Function to keep track of values in past. Needed for Shape filtering as well as feedback j -> tells if we are doing pos, vel, or acc cmd -> determines SF or FB ''' if cmd == 1: # Shape filter if self.SF_idx < len(self.SF_delay_x[0]): # First, fill up the delay array self.SF_delay_x[j,self.SF_idx] = self.x[1,j] self.SF_delay_y[j,self.SF_idx] = self.y[1,j] else: # once array is filled, we start using the first value every time # x self.SF_delay_x[j,:] = np.roll(self.SF_delay_x[j,:],-1) # makes the first value last self.SF_delay_x[j,len(self.SF_delay_x[0])-1] = self.x[1,j] # updates last value to current x # y self.SF_delay_y[j,:] = np.roll(self.SF_delay_y[j,:],-1) self.SF_delay_y[j,len(self.SF_delay_y[0])-1] = self.y[1,j] elif cmd == 2: # Feedback if self.FB_idx < len(self.theta_fb): # First, fill up the delay array self.theta_fb[self.FB_idx] = self.load_angles.theta self.theta_vel_fb[self.FB_idx] = self.load_angles.thetadot self.theta_acc_fb[self.FB_idx] = self.load_angles.thetadot - self.theta_vel_fb[self.FB_idx-1] self.phi_fb[self.FB_idx] = self.load_angles.phi self.phi_vel_fb[self.FB_idx] = self.load_angles.phidot self.phi_acc_fb[self.FB_idx] = self.load_angles.phidot - self.phi_vel_fb[self.FB_idx-1] else: # once array is filled, need to shift values w/ latest value at end self.theta_fb[:] = np.roll(self.theta_fb[:],-1) self.theta_vel_fb[:] = np.roll(self.theta_vel_fb[:],-1) self.theta_acc_fb[:] = np.roll(self.theta_acc_fb[:],-1) self.phi_fb[:] = np.roll(self.phi_fb[:],-1) self.phi_vel_fb[:] = np.roll(self.phi_vel_fb[:],-1) self.phi_acc_fb[:] = np.roll(self.phi_acc_fb[:],-1) self.theta_fb[len(self.theta_fb)-1] = self.load_angles.theta # change final value self.theta_vel_fb[len(self.theta_fb)-1] = self.load_angles.thetadot self.theta_acc_fb[len(self.theta_fb)-1] = self.load_angles.thetadot - self.theta_vel_fb[len(self.theta_fb)-1] self.phi_fb[len(self.phi_fb)-1] = self.load_angles.phi self.phi_vel_fb[len(self.theta_fb)-1] = self.load_angles.phidot self.phi_acc_fb[len(self.theta_fb)-1] = self.load_angles.phidot - self.phi_vel_fb[len(self.theta_fb)-1] else: print('No delay') # Convolution of shape filter and trajectory, and feedback def convo(self): # check if waypoints have changed self.waypoints_srv_cb() # needed for delay function # 1 = determine shapefilter array # 2 = determine theta/phi_fb SHAPEFIL = 1 FEEDBACK = 2 # SOLVE ODE (get ref pos, vel, accel) self.x = odeint(self.statespace,self.x0,self.t,args=(self.xd.x,)) self.y = odeint(self.statespace,self.y0,self.t,args=(self.xd.y,)) #self.z = odeint(self.statespace,self.z0,self.t,args=(self.xd.z,)) for i in range(1,len(self.y0)): self.x[:,i] = np.clip(self.x[:,i], a_min = -self.max[i], a_max = self.max[i]) self.y[:,i] = np.clip(self.y[:,i], a_min = -self.max[i], a_max = self.max[i]) #self.z[:,i] = np.clip(self.z[:,i], a_min = -self.max[i], a_max = self.max[i]) for j in range(3): # 3 is due to pos, vel, acc. NOT due x, y, z self.delay(j,SHAPEFIL) # Determine the delay (shapefilter) array if self.SF_idx < len(self.SF_delay_x[0]): self.EPS_I[3*j] = self.x[-1,j] self.EPS_I[3*j+1] = self.y[-1,j] #self.EPS_I[3*j+2] = self.z[1,j] else: self.EPS_I[3*j] = self.A1*self.x[-1,j] + self.A2*self.SF_delay_x[j,0] # Determine convolution (x) self.EPS_I[3*j+1] = self.A1*self.y[-1,j] + self.A2*self.SF_delay_y[j,0] # Determine convolution (y) #self.EPS_I[3*j+2] = self.z[1,j] # No need to convolute z-dim self.delay(1,FEEDBACK) # Detemine feedback array self.sigmoid() # Determine sigmoid gain EPS_D = self.fback() # Feedback Epsilon # for i in range(9): # Populate EPS_F buffer with desired change based on feedback # self.EPS_F[i] = self.EPS_I[i] + self.s_gain*EPS_D[i] #+ EPS_D[i] self.EPS_F = self.EPS_I + self.s_gain*EPS_D # Populate msg with epsilon_final self.ref_sig.header.stamp = rospy.Time.now() #self.ref_sig.type_mask = 2 # Need typemask = 2 to use correct attitude controller - Jaeyoung Lin self.ref_sig.position.x = self.EPS_F[0] self.ref_sig.position.y = self.EPS_F[1] self.ref_sig.velocity.x = self.EPS_F[3] self.ref_sig.velocity.y = self.EPS_F[4] self.ref_sig.acceleration.x = self.EPS_F[6] self.ref_sig.acceleration.y = self.EPS_F[7] # Do not need to evaluate z self.ref_sig.position.z = self.xd.z self.ref_sig.velocity.z = 0.0 self.ref_sig.acceleration.z = 0.0 #self.ref_sig.position.z = self.EPS_F[2] #self.ref_sig.velocity.z = self.EPS_F[5] #self.ref_sig.acceleration.z = self.EPS_F[8] # self.x0 = [self.dr_pos.position.x, self.x[1,1], self.x[1,2], self.x[1,3]] # self.y0 = [self.dr_pos.position.y, self.y[1,1], self.y[1,2], self.y[1,3]] # self.z0 = [self.dr_pos.position.z, self.z[1,1], self.z[1,2], self.z[1,3]] self.x0 = [self.x[1,0], self.x[1,1], self.x[1,2], self.x[1,3]] self.y0 = [self.y[1,0], self.y[1,1], self.y[1,2], self.y[1,3]] self.SF_idx += 1 self.FB_idx += 1 def fback(self): """ Uses first element in feedback array as this is what get replaced in each iteration """ xr = self.Gd*self.tetherL*math.sin(self.theta_fb[0]) xdotr = self.Gd*self.tetherL*math.cos(self.theta_fb[0])*self.theta_vel_fb[0] xddotr = -self.Gd*self.tetherL*math.sin(self.theta_fb[0])*(self.theta_vel_fb[0]**2) + math.cos(self.theta_fb[0])*self.theta_acc_fb[0] yr = -self.Gd*self.tetherL*math.sin(self.phi_fb[0]) ydotr = -self.Gd*self.tetherL*math.cos(self.phi_fb[0])*self.phi_vel_fb[0] yddotr = self.Gd*self.tetherL*math.sin(self.phi_fb[0])*self.phi_vel_fb[0]**2 + math.cos(self.phi_fb[0])*self.phi_acc_fb[0] EPS_D = np.array([xr,yr,0,xdotr,ydotr,0,xddotr,yddotr,0]) return EPS_D def screen_output(self): # Feedback to user # rospy.loginfo(' Var | x | y | z ') # rospy.loginfo('Pos: %.2f %.2f %.2f',self.EPS_F[0],self.EPS_F[1],self.EPS_F[2]) # rospy.loginfo('Vel: %.2f %.2f %.2f',self.EPS_F[3],self.EPS_F[4],self.EPS_F[5]) # rospy.loginfo('Acc: %.2f %.2f %.2f',self.EPS_F[6],self.EPS_F[7],self.EPS_F[8]) # rospy.loginfo('_______________________') rospy.loginfo_once('Tether length: %.2f',self.tetherL) # rospy.loginfo('Theta: %.2f',self.load_angles.theta) # rospy.loginfo('Phi: %.2f',self.load_angles.phi) def publisher(self,pub_tim): # Determine final ref signal self.convo() # Publish reference signal self.pub_path.publish(self.ref_sig) # Give user feedback on published message: self.screen_output() if __name__=="__main__": # Initiate ROS node rospy.init_node('desPath_node',anonymous=True) try: Main() # create class object rospy.spin() # loop until shutdown signal except rospy.ROSInterruptException: pass