oscillation_ctrl/src/ref_signalGen.py

446 lines
16 KiB
Python
Executable File

#!/usr/bin/env python2.7
### Cesar Rodriguez July 2021
### Based off of Klausen 2017 - Smooth trajectory generation based on desired waypoints
import rospy, tf
import numpy as np
import time
import math
from pymavlink import mavutil
from scipy import signal
from scipy.integrate import odeint
from oscillation_ctrl.msg import tethered_status, RefSignal
from oscillation_ctrl.srv import WaypointTrack, WaypointTrackRequest
from controller_msgs.msg import FlatTarget
from geometry_msgs.msg import Pose, Vector3, PoseStamped, Point, TwistStamped
from sensor_msgs.msg import Imu
from mavros_msgs.msg import State
class Main:
def __init__(self):
# rate(s)
rate = 30 # rate for the publisher method, specified in Hz -- 10 Hz
# initialize variables
self.tstart = rospy.get_time() # Keep track of the start time
while self.tstart == 0.0: # Need to make sure get_rostime works
self.tstart = rospy.get_time()
rospy.loginfo(self.tstart)
self.dt = 1.0/rate
# self.dt = 0.5
# self.tmax = 100
self.tmax = self.dt
self.n = self.tmax/self.dt + 1
self.t = np.linspace(0, self.tmax, self.n) # Time array
# Message generation/ collection
self.state = State()
self.mode = ''
self.vel_data = TwistStamped() # This is needed to get drone vel from gps
self.imu_data = Imu() # Needed for to get drone acc from IMU
self.ref_sig = FlatTarget() # Smooth Signal
self.ref_sig.position.z = 5.0 # This does not need to be determined
self.phi = 0.0 # Payload angle of deflection from x-axis
self.phidot = 0.0
self.theta = 0.0 # Payload angle of deflection from y-axis
self.thetadot = 0.0
self.has_run = 0 # Bool to keep track of first run instance
self.drone_id = ''
self.pload_id = ''
self.pl_pos = Pose()
self.dr_pos = Pose()
self.dr_vel = self.vel_data.twist.linear
self.dr_acc = self.imu_data.linear_acceleration
self.dr_acc_p = self.imu_data.linear_acceleration
# Get tether length
self.param_exists = False
while self.param_exists == False:
if rospy.has_param('sim/tether_length'):
self.tetherL = rospy.get_param('sim/tether_length') # Tether length
self.param_exists = True
elif rospy.get_time() - self.tstart >= 3.0:
self.tetherL = 0.0
break
# --------------------------------------------------------------------------------#
# SUBSCRIBERS #
# --------------------------------------------------------------------------------#
# Topic, msg type, and class callback method
rospy.Subscriber('/status/twoBody_status', tethered_status, self.linkState_cb)
rospy.Subscriber('/mavros/local_position/velocity_body', TwistStamped, self.droneVel_cb)
rospy.Subscriber('/mavros/imu/data', Imu, self.droneAcc_cb)
rospy.Subscriber('/mavros/state', State, self.state_cb)
# --------------------------------------------------------------------------------#
# PUBLISHERS
# --------------------------------------------------------------------------------#
# Publish desired path to compute attitudes
self.pub_path = rospy.Publisher('/reference/path',FlatTarget,queue_size = 10)
# Needed for geometric controller to compute thrust
self.pub_ref = rospy.Publisher('/reference/flatsetpoint',FlatTarget,queue_size = 10)
# timer(s), used to control method loop freq(s) as defined by the rate(s)
self.pub_tim = rospy.Timer(rospy.Duration(1.0/rate), self.publisher)
# --------------------------------------------------------------------------------#
# FEEDBACK AND INPUT SHAPING
# --------------------------------------------------------------------------------#
# Smooth path variables
self.EPS_F = np.zeros(9) # Final Epsilon/ signal
self.EPS_I = np.zeros(9) # Epsilon shapefilter
# Constants for smooth path generation
self.w_tune = 3.13 # 3.13 works well? #########################################################################
self.epsilon = 0.7 # Damping ratio
# need exception if we do not have tether:
if self.tetherL == 0.0:
self.wn = self.w_tune
else:
self.wn = math.sqrt(9.81/self.tetherL)
self.wd = self.wn*math.sqrt(1 - self.epsilon**2)
self.k4 = 4*self.epsilon*self.w_tune
self.k3 = ((2 + 4*self.epsilon**2)*self.w_tune**2)/self.k4
self.k2 = (4*self.epsilon*self.w_tune**3)/(self.k4*self.k3)
self.k1 = (self.w_tune**4)/(self.k2*self.k3*self.k4)
# For saturation:
self.jmax = [3, 3, 1]
self.amax = [1.5, 1.5, 1]
self.vmax = [3, 3, 1]
self.max = [0, 3, 1.5, 3] #[0, 3, 1.5, 3]
# create the array: [vmax; amax; jmax]
self.max_array = np.array([[self.vmax],[self.amax],[self.jmax]]).T
# Desired position array
#if rospy.has_param('sim/waypoints'):
# self.xd = rospy.get_param('sim/waypoints') # waypoints
#elif rospy.get_time() - self.tstart >= 3.0:
# self.xd = np.array([[0],[0],[5.0]]) # make our own if there are no waypoints
#self.xd = Point()
self.get_xd = rospy.ServiceProxy('/status/waypoint_tracker',WaypointTrack)
self.empty_point = Point() # Needed to query waypoint_server
self.des_waypoints = True # True = changing waypoints
# Initial conditions: [pos, vel, acc, jerk]
self.x0 = [0, 0, 0, 0]
self.y0 = [0, 0, 0, 0]
self.z0 = [0, 0, 0, 0]
# Constants for feedback
self.Gd = 0.325
self.td = 2*self.Gd*math.pi/self.wd
# Constants for shape filter/ Input shaping
self.t1 = 0
self.t2 = math.pi/self.wd
self.K = math.exp(-self.epsilon*math.pi/math.sqrt(1 - self.epsilon**2))
self.A1 = 1/(1 + self.K)
self.A2 = self.A1*self.K
# Need to determine how large of any array needs to be stored to use delayed functions
self.SF_delay_x = np.zeros([4,int(round(self.t2/self.dt))]) # Shapefilter delay; 4 - p,v,a,j
self.SF_delay_y = np.zeros([4,int(round(self.t2/self.dt))])
self.phi_fb = np.zeros(int(round(self.td/self.dt))) # Feedback delay
self.phi_vel_fb = np.zeros(int(round(self.td/self.dt)))
self.phi_acc_fb = np.zeros(int(round(self.td/self.dt)))
self.theta_fb = np.zeros(int(round(self.td/self.dt)))
self.theta_vel_fb = np.zeros(int(round(self.td/self.dt)))
self.theta_acc_fb = np.zeros(int(round(self.td/self.dt)))
# Need index to keep track of path array of ^^^^ length
self.SF_idx = 0
self.FB_idx = 0
# Constants for sigmoid
self.at = 3 # acceleration theshold
self.pc = 0.7 # From Klausen 2017
self.sk = len(self.SF_delay_x[0]) # from Klausen 2017
self.ak = np.zeros(len(self.SF_delay_x[0]))
self.s_gain = 0.0 # Gain found from sigmoid
# --------------------------------------------------------------------------------#
# CALLBACK FUNCTIONS
# --------------------------------------------------------------------------------#
def state_cb(self, data):
if self.state.armed != data.armed:
rospy.loginfo("armed state changed from {0} to {1}".format(
self.state.armed, data.armed))
if self.state.connected != data.connected:
rospy.loginfo("connected changed from {0} to {1}".format(
self.state.connected, data.connected))
if self.state.mode != data.mode:
rospy.loginfo("mode changed from {0} to {1}".format(
self.state.mode, data.mode))
if self.state.system_status != data.system_status:
rospy.loginfo("system_status changed from {0} to {1}".format(
mavutil.mavlink.enums['MAV_STATE'][
self.state.system_status].name, mavutil.mavlink.enums[
'MAV_STATE'][data.system_status].name))
self.state = data
# mavros publishes a disconnected state message on init
# Callback to get link names, states, and pload deflection angles
def linkState_cb(self,msg):
try:
self.drone_id = msg.drone_id
self.pload_id = msg.pload_id
self.dr_pos = msg.drone_pos
self.pl_pos = msg.pload_pos
self.phi = -msg.phi
self.phidot = -msg.phidot
self.theta = msg.theta
self.thetadot = msg.thetadot
# self.tetherL = msg.length
except ValueError:
pass
# Callback for drone velocity
def droneVel_cb(self,msg):
try:
self.dr_vel = msg.twist.linear
except ValueError or TypeError:
pass
# Callback for drone accel from IMU data
def droneAcc_cb(self,msg):
try:
self.dr_acc = msg.linear_acceleration
except ValueError:
pass
def waypoints_srv_cb(self):
rospy.wait_for_service('/status/waypoint_tracker')
try:
resp = self.get_xd(False,self.empty_point)
self.xd = resp.xd
except ValueError:
pass
#################################################################
#TODO Will need to add a function that gets a message from #
# a node which lets refsignal_gen.py know there has been a #
# change in xd and therefore runs waypoints_srv_cb again #
#################################################################
# --------------------------------------------------------------------------------#
# ALGORITHM
# --------------------------------------------------------------------------------#
# --------------------------------------------------------------------------------#
# TRAJECTORY GENERATION BASED ON WAYPONTS xd
# --------------------------------------------------------------------------------#
def statespace(self,y,t,xd):
# Update initial conditions #
# Need the statespace array:
pos,vel,acc,jer = y
# Derivative of statesape array:
dydt = [vel, acc, jer,
self.k4*(self.k3*(self.k2*(self.k1*(xd - pos) - vel) - acc) - jer)]
return dydt
# Sigmoid
def sigmoid(self):
for i in range(self.sk):
if math.sqrt(self.SF_delay_x[2,i]**2 + self.SF_delay_y[2,i]**2) < self.at:
self.ak[i] = 1
else:
self.ak[i] = 0
pk = sum(self.ak)/self.sk
self.s_gain = 1/(1 + math.exp(-self.sk*(pk-self.pc)))
def delay(self,j,cmd):
'''
Function to keep track of values in past. Needed for
Shape filtering as well as feedback
j -> tells if we are doing pos, vel, or acc
cmd -> determines SF or FB
'''
if cmd == 1: # Shape filter
if self.SF_idx < len(self.SF_delay_x[0]):
# First, fill up the delay array
self.SF_delay_x[j,self.SF_idx] = self.x[1,j]
self.SF_delay_y[j,self.SF_idx] = self.y[1,j]
else:
# once array is filled, we start using the first value every time
# x
self.SF_delay_x[j,:] = np.roll(self.SF_delay_x[j,:],-1) # makes the first value last
self.SF_delay_x[j,len(self.SF_delay_x[0])-1] = self.x[1,j] # updates last value to current x
# y
self.SF_delay_y[j,:] = np.roll(self.SF_delay_y[j,:],-1)
self.SF_delay_y[j,len(self.SF_delay_y[0])-1] = self.y[1,j]
elif cmd == 2: # Feedback
if self.FB_idx < len(self.theta_fb):
# First, fill up the delay array
self.theta_fb[self.FB_idx] = self.theta
self.theta_vel_fb[self.FB_idx] = self.thetadot
self.theta_acc_fb[self.FB_idx] = self.thetadot - self.theta_vel_fb[self.FB_idx-1]
self.phi_fb[self.FB_idx] = self.phi
self.phi_vel_fb[self.FB_idx] = self.phidot
self.phi_acc_fb[self.FB_idx] = self.phidot - self.phi_vel_fb[self.FB_idx-1]
else:
# once array is filled, need to shift values w/ latest value at end
self.theta_fb[:] = np.roll(self.theta_fb[:],-1)
self.theta_vel_fb[:] = np.roll(self.theta_vel_fb[:],-1)
self.theta_acc_fb[:] = np.roll(self.theta_acc_fb[:],-1)
self.phi_fb[:] = np.roll(self.phi_fb[:],-1)
self.phi_vel_fb[:] = np.roll(self.phi_vel_fb[:],-1)
self.phi_acc_fb[:] = np.roll(self.phi_acc_fb[:],-1)
self.theta_fb[len(self.theta_fb)-1] = self.theta # change final value
self.theta_vel_fb[len(self.theta_fb)-1] = self.thetadot
self.theta_acc_fb[len(self.theta_fb)-1] = self.thetadot - self.theta_vel_fb[len(self.theta_fb)-1]
self.phi_fb[len(self.phi_fb)-1] = self.phi
self.phi_vel_fb[len(self.theta_fb)-1] = self.phidot
self.phi_acc_fb[len(self.theta_fb)-1] = self.phidot - self.phi_vel_fb[len(self.theta_fb)-1]
else:
print('No delay')
# Convolution of shape filter and trajectory, and feedback
def convo(self):
self.waypoints_srv_cb()
# needed for delay function
# 1 = determine shapefilter array
# 2 = determine theta/phi_fb
shapeFil = 1
feedBack = 2
# SOLVE ODE (get ref pos, vel, accel)
self.x = odeint(self.statespace,self.x0,self.t,args=(self.xd.x,))
self.y = odeint(self.statespace,self.y0,self.t,args=(self.xd.y,))
self.z = odeint(self.statespace,self.z0,self.t,args=(self.xd.z,))
for i in range(1,len(self.y0)):
self.x[:,i] = np.clip(self.x[:,i], a_min = -self.max[i], a_max = self.max[i])
self.y[:,i] = np.clip(self.y[:,i], a_min = -self.max[i], a_max = self.max[i])
self.z[:,i] = np.clip(self.z[:,i], a_min = -self.max[i], a_max = self.max[i])
for j in range(3): # 3 is due to pos, vel, acc. NOT due x, y, z
self.delay(j,shapeFil) # Determine the delay (shapefilter) array
if self.SF_idx < len(self.SF_delay_x[0]):
self.EPS_I[3*j] = self.x[1,j]
self.EPS_I[3*j+1] = self.y[1,j]
self.EPS_I[3*j+2] = self.z[1,j]
else:
self.EPS_I[3*j] = self.A1*self.x[1,j] + self.A2*self.SF_delay_x[j,0] # Determine convolution (x)
self.EPS_I[3*j+1] = self.A1*self.y[1,j] + self.A2*self.SF_delay_y[j,0] # Determine convolution (y)
self.EPS_I[3*j+2] = self.z[1,j] # No need to convolute z-dim
self.delay(1,feedBack) # Detemine feedback array
self.sigmoid() # Determine sigmoid gain
EPS_D = self.fback() # Feedback Epsilon
for i in range(9):
# Populate EPS_F buffer with desired change based on feedback
self.EPS_F[i] = self.EPS_I[i] + EPS_D[i] #+ EPS_D[i]
# Populate msg with epsilon_final
self.ref_sig.header.stamp = rospy.Time.now()
self.ref_sig.type_mask = 2 # Need typemask = 2 to use correct attitude controller - Jaeyoung Lin
self.ref_sig.position.x = self.EPS_F[0]
self.ref_sig.position.y = self.EPS_F[1]
self.ref_sig.position.z = self.EPS_F[2] + 0.5
self.ref_sig.velocity.x = self.EPS_F[3]
self.ref_sig.velocity.y = self.EPS_F[4]
self.ref_sig.velocity.z = self.EPS_F[5]
self.ref_sig.acceleration.x = self.EPS_F[6]
self.ref_sig.acceleration.y = self.EPS_F[7]
self.ref_sig.acceleration.z = self.EPS_F[8]
self.x0 = [self.dr_pos.position.x, self.x[1,1], self.x[1,2], self.x[1,3]]
self.y0 = [self.dr_pos.position.y, self.y[1,1], self.y[1,2], self.y[1,3]]
self.z0 = [self.dr_pos.position.z, self.z[1,1], self.z[1,2], self.z[1,3]]
self.SF_idx += 1
self.FB_idx += 1
def fback(self):
xr = self.Gd*self.tetherL*math.sin(self.theta_fb[0])
xdotr = self.Gd*self.tetherL*math.cos(self.theta_fb[0])*self.theta_vel_fb[0]
xddotr = -self.Gd*self.tetherL*math.sin(self.theta_fb[0])*(self.theta_vel_fb[0]**2) + math.cos(self.theta_fb[0])*self.theta_acc_fb[0]
yr = -self.Gd*self.tetherL*math.sin(self.phi_fb[0])
ydotr = -self.Gd*self.tetherL*math.cos(self.phi_fb[0])*self.phi_vel_fb[0]
yddotr = self.Gd*self.tetherL*math.sin(self.phi_fb[0])*self.phi_vel_fb[0]**2 + math.cos(self.phi_fb[0])*self.phi_acc_fb[0]
EPS_D = np.array([xr,yr,0,xdotr,ydotr,0,xddotr,yddotr,0])
return EPS_D
def screen_output(self):
# Feedback to user
rospy.loginfo(' Var | x | y | z ')
rospy.loginfo('Pos: %.2f %.2f %.2f',self.EPS_F[0],self.EPS_F[1],self.EPS_F[2])
rospy.loginfo('Vel: %.2f %.2f %.2f',self.EPS_F[3],self.EPS_F[4],self.EPS_F[5])
rospy.loginfo('Acc: %.2f %.2f %.2f',self.EPS_F[6],self.EPS_F[7],self.EPS_F[8])
rospy.loginfo('_______________________')
def publisher(self,pub_tim):
# Determine final ref signal
self.convo()
# Publish reference signal
self.pub_path.publish(self.ref_sig)
self.pub_ref.publish(self.ref_sig)
# Give user feedback on published message:
rospy.loginfo(' Var | x | y | z ')
rospy.loginfo('Pos: %.2f %.2f %.2f',self.EPS_F[0],self.EPS_F[1],self.EPS_F[2])
rospy.loginfo('Vel: %.2f %.2f %.2f',self.EPS_F[3],self.EPS_F[4],self.EPS_F[5])
rospy.loginfo('Acc: %.2f %.2f %.2f',self.EPS_F[6],self.EPS_F[7],self.EPS_F[8])
rospy.loginfo('_______________________')
if __name__=="__main__":
# Initiate ROS node
rospy.init_node('desPath_node',anonymous=True)
try:
Main() # create class object
rospy.spin() # loop until shutdown signal
except rospy.ROSInterruptException:
pass