forked from cesar.alejandro/oscillation_ctrl
Delete 'src/MoCap_Localization_fake.py'
This commit is contained in:
parent
302b3cf53e
commit
afc267bd16
|
@ -1,239 +0,0 @@
|
||||||
#!/usr/bin/env python2.7
|
|
||||||
|
|
||||||
### Cesar Rodriguez Mar 2022
|
|
||||||
### Script to simulate mocap readings and see how PX4 behaves
|
|
||||||
|
|
||||||
import rospy, tf
|
|
||||||
import rosservice
|
|
||||||
import time
|
|
||||||
import math
|
|
||||||
import random
|
|
||||||
from tf.transformations import *
|
|
||||||
from oscillation_ctrl.msg import TetheredStatus, LoadAngles
|
|
||||||
from geometry_msgs.msg import PoseStamped
|
|
||||||
from gazebo_msgs.srv import GetLinkState
|
|
||||||
from std_msgs.msg import Bool
|
|
||||||
|
|
||||||
class Main:
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
|
|
||||||
# rate(s)
|
|
||||||
pub_rate = 50 # rate for the publisher method, specified in Hz -- 20 Hz
|
|
||||||
loc_rate = 60 # rate we want to localize vehicle -- 60 Hz
|
|
||||||
self.dt = 1.0/loc_rate
|
|
||||||
|
|
||||||
self.user_fback = True
|
|
||||||
|
|
||||||
rospy.sleep(5) # Sleep for 5 sec. Need to give time to Gazebo to run
|
|
||||||
|
|
||||||
# Variables needed for testing start
|
|
||||||
self.tstart = rospy.get_time() # Keep track of the start time
|
|
||||||
while self.tstart == 0.0: # Need to make sure get_rostime works
|
|
||||||
self.tstart = rospy.get_time()
|
|
||||||
|
|
||||||
# initialize variables
|
|
||||||
self.tetherL = 0.0 # Tether length
|
|
||||||
self.has_run = False # Boolean to keep track of first run instance
|
|
||||||
self.phibuf = 0.0 # Need buffers to determine their rates
|
|
||||||
self.thetabuf = 0.0 #
|
|
||||||
self.pload = False # Check if payload exists
|
|
||||||
# Max dot values to prevent 'blowup'
|
|
||||||
self.angledot_max = 2.0
|
|
||||||
self.drone_eul = [0.0,0.0,0.0]
|
|
||||||
|
|
||||||
# variables for message gen
|
|
||||||
#self.buff_pose1 = PoseStamped()
|
|
||||||
self.drone_pose = PoseStamped()
|
|
||||||
self.pload_pose = PoseStamped()
|
|
||||||
self.load_angles = LoadAngles()
|
|
||||||
self.twobody_status = TetheredStatus()
|
|
||||||
self.drone_id = 'spiri_with_tether::spiri::base'
|
|
||||||
self.pload_id = 'spiri_with_tether::mass::payload'
|
|
||||||
|
|
||||||
# service(s)
|
|
||||||
self.service1 = '/gazebo/get_link_state'
|
|
||||||
self.service2 = '/gazebo/set_link_properties'
|
|
||||||
|
|
||||||
|
|
||||||
# need service list to check if models have spawned
|
|
||||||
self.service_list = rosservice.get_service_list()
|
|
||||||
|
|
||||||
# wait for service to exist
|
|
||||||
while self.service1 not in self.service_list:
|
|
||||||
print ("Waiting for models to spawn...")
|
|
||||||
self.service_list = rosservice.get_service_list()
|
|
||||||
if rospy.get_time() - self.tstart >= 10.0:
|
|
||||||
break
|
|
||||||
|
|
||||||
# publisher(s)
|
|
||||||
self.twobody_pub = rospy.Publisher('/status/twoBody_status', TetheredStatus, queue_size=1)
|
|
||||||
self.loadAng_pub = rospy.Publisher('/status/load_angles', LoadAngles, queue_size=1)
|
|
||||||
#self.pub_wd = rospy.Publisher('/status/path_follow', Bool, queue_size=1)
|
|
||||||
|
|
||||||
#self.pub_timer = rospy.Timer(rospy.Duration(1.0/rate), self.link_state)
|
|
||||||
#self.path_timer = rospy.Timer(rospy.Duration(40.0/rate), self.path_follow)
|
|
||||||
|
|
||||||
### Since there is no tether, we can publish directly to mavros
|
|
||||||
self.visionPose_pub = rospy.Publisher('/mavros/vision_pose/pose', PoseStamped, queue_size=1)
|
|
||||||
|
|
||||||
self.loc_timer = rospy.Timer(rospy.Duration(1.0/loc_rate), self.mocap_localize)
|
|
||||||
self.pub_timer = rospy.Timer(rospy.Duration(1.0/pub_rate), self.publisher)
|
|
||||||
|
|
||||||
# subscriber(s)
|
|
||||||
|
|
||||||
def euler_array(self,orientation):
|
|
||||||
"""
|
|
||||||
Takes in pose msg object and outputs array of euler angs:
|
|
||||||
eul[0] = Roll
|
|
||||||
eul[1] = Pitch
|
|
||||||
eul[2] = Yaw
|
|
||||||
"""
|
|
||||||
eul = euler_from_quaternion([orientation.x,
|
|
||||||
orientation.y,
|
|
||||||
orientation.z,
|
|
||||||
orientation.w])
|
|
||||||
return eul
|
|
||||||
|
|
||||||
def mocap_localize(self,loc_timer):
|
|
||||||
"""
|
|
||||||
Uses Gazebo to simulate MoCap
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
# State which service we are querying
|
|
||||||
get_P = rospy.ServiceProxy(self.service1,GetLinkState)
|
|
||||||
|
|
||||||
# Set reference frame
|
|
||||||
reference = '' # world ref frame
|
|
||||||
|
|
||||||
# Establish links needed --> Spiri base and payload
|
|
||||||
# P = Position vector
|
|
||||||
drone_P = get_P(self.drone_id,reference)
|
|
||||||
|
|
||||||
# Check if payload is part of simulation
|
|
||||||
if not drone_P.success:
|
|
||||||
self.drone_id = 'spiri_mocap::base'
|
|
||||||
drone_P = get_P(self.drone_id,reference) # i.e. no payload
|
|
||||||
|
|
||||||
self.drone_P = drone_P
|
|
||||||
pload_P = get_P(self.pload_id,reference)
|
|
||||||
if pload_P.success: self.pload = True
|
|
||||||
if not self.has_run:
|
|
||||||
if self.pload == True:
|
|
||||||
# Get tether length based off initial displacement
|
|
||||||
self.tetherL = math.sqrt((drone_P.link_state.pose.position.x -
|
|
||||||
pload_P.link_state.pose.position.x)**2 +
|
|
||||||
(drone_P.link_state.pose.position.y -
|
|
||||||
pload_P.link_state.pose.position.y)**2 +
|
|
||||||
(drone_P.link_state.pose.position.z -
|
|
||||||
pload_P.link_state.pose.position.z)**2)
|
|
||||||
rospy.set_param('status/tether_length',self.tetherL)
|
|
||||||
|
|
||||||
else:
|
|
||||||
self.tetherL = 0.0
|
|
||||||
self.has_run = True
|
|
||||||
|
|
||||||
# Need to detemine their location to get angle of deflection
|
|
||||||
# Drone
|
|
||||||
drone_Px = drone_P.link_state.pose.position.x
|
|
||||||
drone_Py = drone_P.link_state.pose.position.y
|
|
||||||
# Get drone orientation
|
|
||||||
|
|
||||||
if self.pload == True: # If there is payload, determine the variables
|
|
||||||
self.twobody_status.pload = True
|
|
||||||
# Pload
|
|
||||||
pload_Px = pload_P.link_state.pose.position.x
|
|
||||||
pload_Py = pload_P.link_state.pose.position.y
|
|
||||||
|
|
||||||
# Determine theta (pitch)
|
|
||||||
x_sep = pload_Px - drone_Px
|
|
||||||
|
|
||||||
if math.fabs(x_sep) >= self.tetherL or x_sep == 0:
|
|
||||||
self.load_angles.theta = 0
|
|
||||||
else:
|
|
||||||
self.load_angles.theta = math.asin(x_sep/self.tetherL)
|
|
||||||
|
|
||||||
# Determine thetadot
|
|
||||||
# self.load_angles.thetadot = min(self.angledot_max,max((self.load_angles.theta - self.thetabuf)/self.dt,-self.angledot_max))
|
|
||||||
self.load_angles.thetadot = (self.load_angles.theta - self.thetabuf)/self.dt
|
|
||||||
self.thetabuf = self.load_angles.theta
|
|
||||||
|
|
||||||
# Determine phi (roll)
|
|
||||||
y_sep = pload_Py - drone_Py
|
|
||||||
|
|
||||||
if math.fabs(y_sep) >= self.tetherL or y_sep == 0:
|
|
||||||
self.load_angles.phi = 0
|
|
||||||
else:
|
|
||||||
self.load_angles.phi = -math.asin(y_sep/self.tetherL)
|
|
||||||
|
|
||||||
# Determine phidot
|
|
||||||
# self.load_angles.phidot = min(self.angledot_max,max((self.load_angles.phi - self.phibuf)/self.dt,-self.angledot_max))
|
|
||||||
self.load_angles.phidot = (self.load_angles.phi - self.phibuf)/self.dt
|
|
||||||
self.phibuf = self.load_angles.phi # Update buffer
|
|
||||||
|
|
||||||
# save pload position
|
|
||||||
self.twobody_status.pload_pos = pload_P.link_state.pose
|
|
||||||
self.pload_pose.pose = self.twobody_status.pload_pos
|
|
||||||
else: # Otherwise, vars = 0
|
|
||||||
x_sep = self.load_angles.phi = self.load_angles.phidot = self.load_angles.theta = self.load_angles.thetadot = 0
|
|
||||||
|
|
||||||
# Populate message
|
|
||||||
#self.status.drone_pos = drone_P.link_state.pose
|
|
||||||
self.drone_pose.pose = drone_P.link_state.pose
|
|
||||||
self.twobody_status.drone_pos = drone_P.link_state.pose
|
|
||||||
|
|
||||||
except rospy.ServiceException as e:
|
|
||||||
rospy.loginfo("Get Link State call failed: {0}".format(e))
|
|
||||||
|
|
||||||
def add_noise(self):
|
|
||||||
# self.drone_pose.pose.position.x = self.drone_pose.pose.position.x
|
|
||||||
# self.drone_pose.pose.position.y = self.drone_pose.pose.position.y
|
|
||||||
# self.drone_pose.pose.position.z = self.drone_pose.pose.position.z
|
|
||||||
self.drone_pose.pose.orientation.x = self.drone_pose.pose.orientation.x + random.uniform(0,0.004)
|
|
||||||
self.drone_pose.pose.orientation.y = self.drone_pose.pose.orientation.y + random.uniform(0,0.004)
|
|
||||||
self.drone_pose.pose.orientation.z = self.drone_pose.pose.orientation.z + random.uniform(0,0.004)
|
|
||||||
self.drone_pose.pose.orientation.w = self.drone_pose.pose.orientation.w + random.uniform(0,0.004)
|
|
||||||
|
|
||||||
def publisher(self,pub_timer):
|
|
||||||
# add noise to signal
|
|
||||||
self.add_noise()
|
|
||||||
# fill out necesssary fields
|
|
||||||
self.drone_pose.header.frame_id = "/map"
|
|
||||||
self.drone_pose.header.stamp = rospy.Time.now()
|
|
||||||
self.load_angles.header.stamp = rospy.Time.now()
|
|
||||||
# publish
|
|
||||||
self.visionPose_pub.publish(self.drone_pose) # publish pose to mavros
|
|
||||||
self.loadAng_pub.publish(self.load_angles) # publish load angles to controller
|
|
||||||
self.twobody_pub.publish(self.twobody_status) # actual pose. Redundant but nice to have
|
|
||||||
# get euler array for user feedback
|
|
||||||
self.drone_eul = self.euler_array(self.drone_pose.pose.orientation)
|
|
||||||
self.user_feedback()
|
|
||||||
|
|
||||||
def user_feedback(self):
|
|
||||||
if self.user_fback:
|
|
||||||
print ("\n")
|
|
||||||
rospy.loginfo("")
|
|
||||||
print ("drone pos.x: " + str(round(self.drone_pose.pose.position.x,2)))
|
|
||||||
print ("drone pos.y: " + str(round(self.drone_pose.pose.position.y,2)))
|
|
||||||
print ("drone pos.z: " + str(round(self.drone_pose.pose.position.z,2)))
|
|
||||||
print ("Roll: " + str(round(self.drone_eul[0]*180/3.14,2)))
|
|
||||||
print ("Pitch: " + str(round(self.drone_eul[1]*180/3.14,2)))
|
|
||||||
print ("Yaw: " + str(round(self.drone_eul[2]*180/3.14,2)))
|
|
||||||
if self.pload:
|
|
||||||
print("Tether length: " + str(round(self.tetherL,2)))
|
|
||||||
print("Theta: " + str(round(self.load_angles.theta*180/3.14,2)))
|
|
||||||
print("Phi: " + str(round(self.load_angles.phi*180/3.14,2)))
|
|
||||||
else:
|
|
||||||
rospy.loginfo_once(self.tetherL)
|
|
||||||
|
|
||||||
if __name__=="__main__":
|
|
||||||
|
|
||||||
# Initiate ROS node
|
|
||||||
rospy.init_node('MoCap_node',anonymous=False)
|
|
||||||
try:
|
|
||||||
Main() # create class object
|
|
||||||
rospy.spin() # loop until shutdown signal
|
|
||||||
|
|
||||||
except rospy.ROSInterruptException:
|
|
||||||
pass
|
|
||||||
|
|
Loading…
Reference in New Issue