Add 'src/klausen_control.py'

This commit is contained in:
cesar.alejandro 2022-03-01 18:08:27 +00:00
parent 4b12a3ea48
commit bb148364c1
1 changed files with 338 additions and 0 deletions

338
src/klausen_control.py Normal file
View File

@ -0,0 +1,338 @@
#!/usr/bin/env python2.7
### Cesar Rodriguez Aug 2021
### Trajectory controller
### Based off of Klausen 2017
#~~~ Uses generated trajectory and feedback to minimize oscillations
#~~~ Controller determines desired forces on drone to dampen oscillations
#~~~ which are converted into attitude commands
import rospy, tf
import numpy as np
import time
import math
from tf.transformations import *
from scipy.integrate import odeint
from offboard_ex.msg import tethered_status, RefSignal
from controller_msgs.msg import FlatTarget
from geometry_msgs.msg import Point, Pose
from geometry_msgs.msg import TwistStamped, Vector3, Vector3Stamped, PoseStamped, Quaternion
from gazebo_msgs.srv import GetLinkState
from sensor_msgs.msg import Imu
from pymavlink import mavutil
class Main:
def __init__(self):
# rate(s)
rate = 200 # rate for the publisher method, specified in Hz -- 20 Hz
# initialize variables
# time variables
self.dt = 1.0/rate
self.tmax = self.dt
self.n = self.tmax/self.dt + 1
self.t = np.linspace(0, self.tmax, self.n) # Time array
# Msgs types
self.vel_data = TwistStamped() # This is needed to get drone vel from gps
self.imu_data = Imu() # Needed for to get drone acc from IMU
self.path_pos = np.zeros([3,1])
self.path_vel = np.zeros([3,1])
self.path_acc = np.zeros([3,1])
self.pl_pos = Pose()
self.dr_pos = Pose()
self.quaternion = PoseStamped()
self.EulerAng = [0,0,0] # Will find the euler angles, and then convert to q
# Drone var
self.phi = 0.0 # Payload angle of deflection from x-axis
self.phidot = 0.0
self.theta = 0.0 # Payload angle of deflection from y-axis
self.thetadot = 0.0
self.has_run = 0 # Bool to keep track of first run instance
self.drone_id = ''
self.pload_id = ''
# Col1 = theta, theta dot; Col2 = phi, phidot for self.PHI
self.PHI = np.array([[0,0],[0,0]])
self.dr_vel = np.zeros([3,1])
self.dr_angVel = np.zeros([3,1])
self.dr_acc = np.zeros([3,1]) #self.imu_data.linear_acceleration
# Tether var - Check if current method is used
# Get tether length
self.param_exists = False
while self.param_exists == False: # Need to wait until param is ready
if rospy.has_param('sim/tether_length'):
self.tetherL = rospy.get_param('sim/tether_length') # Tether length
self.param_exists = True
self.tether = True # Assume we have a tether
# Tuning gains
self.K1 = np.identity(3)
self.K2 = np.identity(5)
# Values which require updates *_p = prior
self.z1_p = np.zeros([3,1])
self.a45_0 = np.zeros(2)
self.alpha = np.zeros([5,1])
self.alphadot = np.zeros([5,1])
self.a45 = self.alpha[3:5]
self.a45dot = np.array([[0],[0]])
# Error terms
self.z1 = np.zeros([3,1]) # dr_pos - ref_sig_pos
self.z2 = np.zeros([5,1]) # [vx;vy;vz;thetadot;phidot] - alpha
# Constants
self.drone_m = 1.437
self.pl_m = 0.5
self.tot_m = self.drone_m + self.pl_m
self.tune = 1 # Tuning parameter
self.dist = np.array([0,0,0,0.01,0.01]) # Wind disturbance
# --------------------------------------------------------------------------------#
# SUBSCRIBERS
# --------------------------------------------------------------------------------#
# Topic, msg type, and class callback method
rospy.Subscriber('/status/twoBody_status', tethered_status, self.linkState_cb)
rospy.Subscriber('/reference/path', FlatTarget, self.refsig_cb)
rospy.Subscriber('/mavros/local_position/velocity_body', TwistStamped, self.droneVel_cb)
rospy.Subscriber('/mavros/imu/data', Imu, self.droneAcc_cb)
# --------------------------------------------------------------------------------#
# PUBLISHERS
# --------------------------------------------------------------------------------#
self.pub_q = rospy.Publisher('/command/quaternions',PoseStamped,queue_size = 10)
# timer(s), used to control method loop freq(s) as defined by the rate(s)
self.pub_time = rospy.Timer(rospy.Duration(1.0/rate), self.publisher) #f this was 5.0 rate before
# ------------------------------------ _init_ ends ------------------------------ #
# --------------------------------------------------------------------------------#
# CALLBACK FUNCTIONS
# --------------------------------------------------------------------------------#
# Callback to get link names, states, and pload deflection angles
def linkState_cb(self,msg):
try:
self.drone_id = msg.drone_id
self.pload_id = msg.pload_id
self.dr_pos = msg.drone_pos
self.pl_pos = msg.pload_pos
#self.tetherL = msg.length
self.phi = msg.phi
self.phidot = msg.phidot
self.theta = msg.theta
self.thetadot = msg.thetadot
# Populate self.PHI
self.PHI = np.array([[self.theta,self.thetadot],[self.phi,self.phidot]])
except ValueError:
pass
# Callback for drone velocity ####### IS THIS ON? ##########
def droneVel_cb(self,msg):
try:
self.dr_vel = np.array([[msg.twist.linear.x],[msg.twist.linear.y],[msg.twist.linear.z]])
self.dr_angVel = np.array([[msg.twist.angular.x],[msg.twist.angular.y],[msg.twist.angular.z]])
except ValueError or TypeError:
pass
# Callback for drone accel from IMU data
def droneAcc_cb(self,msg):
try:
self.dr_acc = np.array([[msg.linear_acceleration.x],[msg.linear_acceleration.y],[msg.linear_acceleration.z]])
except ValueError:
pass
# Callback reference signal
def refsig_cb(self,msg):
try:
self.path_pos = np.array([[msg.position.x],[msg.position.y],[msg.position.z]])
self.path_vel = np.array([[msg.velocity.x],[msg.velocity.y],[msg.velocity.z]])
self.path_acc = np.array([[msg.acceleration.x],[msg.acceleration.y],[msg.acceleration.z + 9.81]])
except ValueError:
pass
# Check if vehicle has tether
def tether_check(self):
if self.tether == True:
rospy.loginfo_once('USING TETHER MODEL')
else:
rospy.loginfo_once('NO TETHER DETECTED')
# ---------------------------------ODE SOLVER-------------------------------------#
def statespace(self,y,t,Ka,Kb,Kc):
# Need the statespace array:
a1,a2 = y
K = np.dot(Ka,[[a1],[a2]]) + Kb
# Derivative of statespace array:
dydt = np.dot(Kc,K)
dydt = [dydt[0][0], dydt[1][0]] # og dydt is list of arrays, need list of floats
return dydt
# --------------------------------------------------------------------------------#
def control(self):
# Populate position vector and gamma (g). g is state space vector: [px,py,pz,theta,phi]
p = np.array([[self.dr_pos.position.x],[self.dr_pos.position.y],[self.dr_pos.position.z]])
g = np.array([[self.dr_pos.position.x],[self.dr_pos.position.y],[self.dr_pos.position.z],[self.theta],[self.phi]])
# State some variables for shorthand
L = self.tetherL
c_theta = math.cos(self.theta)
c_phi = math.cos(self.phi)
s_theta = math.sin(self.theta)
s_phi = math.sin(self.phi)
# Check for tether
if L <= 0.01:
self.tether = False
else:
self.tether = True
# Check if tether was correctly detected
self.tether_check()
# Control matrices - this may be better in _init_
M = [[self.tot_m, 0, 0, 0, L*self.pl_m*c_theta],
[0, self.tot_m, 0, -L*self.pl_m*c_phi*c_theta, L*self.pl_m*s_phi*s_theta],
[0, 0, self.tot_m, -L*self.pl_m*c_theta*s_phi, -L*self.pl_m*c_phi*s_theta],
[0, -L*self.pl_m*c_phi*c_theta,-L*self.pl_m*c_theta*s_phi, (L**2)*self.pl_m*c_theta**2 + 0.01*s_theta**2, 0],
[L*self.pl_m*c_theta, L*self.pl_m*s_phi*s_theta, -L*self.pl_m*c_phi*s_theta, 0, L**2*self.pl_m]]
C = [[0,0,0,0,-L*self.thetadot*self.pl_m*s_theta],
[0,0,0,L*self.pl_m*(self.phidot*c_theta*s_phi + self.thetadot*c_phi*s_theta), L*self.pl_m*(self.phidot*c_phi*s_theta + self.thetadot*c_theta*s_phi)],
[0,0,0,-L*self.pl_m*(self.phidot*c_phi*c_theta - self.thetadot*s_phi*s_theta),-L*self.pl_m*(self.thetadot*c_phi*c_theta - self.phidot*s_phi*s_theta)],
[0,0,0,-0.5*(L**2)*self.pl_m*self.thetadot*math.sin(2*self.theta) + 0.5*0.01*self.thetadot*math.sin(2*self.theta), -0.5*(L**2)*self.pl_m*self.phidot*math.sin(2*self.theta)],
[0,0,0,0.5*(L**2)*self.pl_m*self.phidot*math.sin(2*self.theta),0]]
G = [[0],[0],[-9.81*self.tot_m],[L*9.81*self.pl_m*c_theta*s_phi],[L*9.81*self.pl_m*c_phi*s_theta]]
H = [[1,0,0,0,0],[0,1,0,0,0],[0,0,1,0,0]]
D = np.diag(self.dist) # Already in array format
# Need to convert to useful format -> np.array
M = np.array(M)
C = np.array(C)
G = np.array(G)
H = np.array(H)
# Shorthand for matrices which will be used
M_a = M[3:5,3:5] # M_alpha - rows 4 to 5 and columns 4 to 5 ... looks wrong its not
C_a = C[3:5,3:5]
D_a = D[3:5,3:5]
G_a = G[3:5]
M_b = M[3:5,:3] # M_4:5,1:3 - rows 4 to 5 and columns 1 to 3
M_c = M[:3,3:5] # M_1:3,4:5 - rows 1 to 3 and columns 4 to 5
C_c = C[:3,3:5]
Ka = -(D_a + C_a + self.K2[3:5,3:5])
Kb = -G_a + np.dot(self.K2[3:5,3:5],self.PHI[:,1]) - np.dot(M_b,self.path_acc - np.dot(self.K1,self.dr_vel - self.path_vel))
# Determine alpha
if self.tether:
M_aI = np.linalg.inv(M_a) # Inverse of M_a
# SOLVE ODE (get alpha)
# Populate buffer
self.a45_buff = odeint(self.statespace,self.a45_0,self.t,args=(Ka,Kb,M_aI))
# Update a45_0 to new a45. Need to transpose to column vector
self.a45_0 = self.a45_buff[1,:]
self.a45 = np.array([[self.a45_0[0]],[self.a45_0[1]]])
# Get alphadot_4:5
self.a45dot = self.statespace(self.a45_0,1,Ka,Kb,M_aI) # Do not need 't' and that's why it is a 1
self.a45dot = np.array([[self.a45dot[0]],[self.a45dot[1]]])
else: # if no tether, alpha_4:5 = 0
self.a45 = np.array([[0],[0]])
self.a45dot = np.array([[0],[0]])
# Determine a_1:3
self.alpha[:3] = self.path_vel - np.dot(self.K1,p - self.path_pos)
# populate error terms
self.z1 = p - self.path_pos
z1_dot = self.dr_vel - self.path_vel
self.z2 = g - self.alpha
B = np.dot(C_c,self.a45) + np.dot(M_c,self.a45dot)
# Gain terms
Kp = np.identity(3) + np.dot(self.K2[:3,:3],self.K1) + self.tune*np.identity(3)
Kd = self.tot_m*self.K1 + self.K2[:3,:3]
Ki = self.tune*self.K1
# Desired body-oriented forces
Fd = B + G[:3] + self.tot_m*self.dr_acc - np.dot(Kd,z1_dot) - np.dot(Kp,self.z1) - np.dot(Ki,self.dt*(self.z1 - self.z1_p))
# Update self.z1_p for "integration"
self.z1_p = self.z1
# Covert Fd into drone frame
dr_orientation = [self.dr_pos.orientation.x, self.dr_pos.orientation.y, self.dr_pos.orientation.z, self.dr_pos.orientation.w]
dr_orientation_inv = quaternion_inverse(dr_orientation)
Fd_tf = quaternion_multiply(dr_orientation,quaternion_multiply([Fd[0],Fd[1],Fd[2],0],dr_orientation_inv)) # Real part of Fd needs = 0
# Convert forces to attiude *EulerAng[2] = yaw = 0
self.EulerAng[1] = math.atan(-Fd_tf[0]/(self.drone_m*9.81)) # Pitch -- maybe
self.EulerAng[0] = math.atan(Fd_tf[1]*math.cos(self.EulerAng[1])/(self.drone_m*9.81)) # Roll -- maybe
# Convert Euler angles to quaternions
q = quaternion_from_euler(self.EulerAng[0],self.EulerAng[1],self.EulerAng[2])
# Populate msg variable
# Attitude control
self.quaternion.header.stamp = rospy.Time.now()
self.quaternion.pose.position.x = 0
self.quaternion.pose.position.y = 0
self.quaternion.pose.position.z = 5.0
self.quaternion.pose.orientation.x = q[0]
self.quaternion.pose.orientation.y = q[1]
self.quaternion.pose.orientation.z = q[2]
self.quaternion.pose.orientation.w = q[3]
def publisher(self,pub_time):
self.control()
self.pub_q.publish(self.quaternion)
# rospy.loginfo('roll: %.2f\npitch: %.2f\n',self.EulerAng[0],self.EulerAng[1])
# --------------------------------------------------------------------------------#
# ALGORITHM
# --------------------------------------------------------------------------------#
if __name__=="__main__":
# Initiate ROS node
rospy.init_node('trajectoryCtrl',anonymous=True)
try:
Main() # create class object
rospy.spin() # loop until shutdown signal
except rospy.ROSInterruptException:
pass