139 lines
3.8 KiB
Python
Executable File
139 lines
3.8 KiB
Python
Executable File
#!/usr/bin/env python2.7
|
|
|
|
### Cesar Rodriguez June 2021
|
|
### Runs either a step test or a square trajectory to assess controller performance
|
|
import rospy, tf
|
|
import time
|
|
from geometry_msgs.msg import Point
|
|
from std_msgs.msg import Bool
|
|
from oscillation_ctrl.srv import WaypointTrack
|
|
|
|
class Main:
|
|
# class method used to initialize variables once when the program starts
|
|
def __init__(self):
|
|
|
|
self.tstart = rospy.get_time() # Keep track of the start time
|
|
while self.tstart == 0.0: # Need to make sure get_rostime works
|
|
self.tstart = rospy.get_time()
|
|
|
|
# variable(s)
|
|
self.Point = Point() # Point which will actually be published
|
|
self.xd = Point() # buffer point to determine current location
|
|
|
|
# Init x, y, & z coordinates
|
|
self.Point.x = 0
|
|
self.Point.y = 0
|
|
self.Point.z = 3.5
|
|
|
|
# Test parameters
|
|
# Square
|
|
# self.xarray = [1,2,2,2,1,0,0]
|
|
# self.yarray = [0,0,1,2,2,2,1]
|
|
self.i = 0
|
|
self.j = 0
|
|
self.buffer = 10
|
|
|
|
# Step
|
|
self.step_size = 5.0 # meters
|
|
self.bool = False
|
|
|
|
self.param_exists = False # check in case param does not exist
|
|
while self.param_exists == False:
|
|
if rospy.has_param('status/test_type'):
|
|
self.test = rospy.get_param('status/test_type')
|
|
self.param_exists = True
|
|
elif rospy.get_time() - self.tstart >= 3.0 or rospy.get_param('status/test_type') == 'none':
|
|
self.test = 'none'
|
|
break
|
|
|
|
if rospy.has_param('sim/square_x'):
|
|
self.xarray = rospy.get_param('sim/square_x')
|
|
self.yarray = rospy.get_param('sim/square_y')
|
|
elif rospy.has_param('mocap/square_x'):
|
|
self.xarray = rospy.get_param('mocap/square_x')
|
|
self.yarray = rospy.get_param('mocap/square_y')
|
|
else:
|
|
self.xarray = [1,2,2,2,1,0,0]
|
|
self.yarray = [0,0,1,2,2,2,1]
|
|
|
|
# get waypoints
|
|
self.get_xd = rospy.ServiceProxy('/status/waypoint_tracker',WaypointTrack)
|
|
|
|
# subscriber(s), with specified topic, message type, and class callback method
|
|
rospy.Subscriber('/status/path_follow',Bool, self.wait_cb)
|
|
|
|
# publisher(s), with specified topic, message type, and queue_size
|
|
self.publisher = rospy.Publisher('/reference/waypoints', Point, queue_size = 5)
|
|
|
|
# rate(s)
|
|
pub_rate = 1 # rate for the publisher method, specified in Hz
|
|
|
|
# timer(s), used to control method loop frequencies as defined by the rate(s)
|
|
self.pub_timer = rospy.Timer(rospy.Duration(1.0/pub_rate), self.pub)
|
|
|
|
def wait_cb(self,data):
|
|
self.bool = data
|
|
|
|
def waypoints_client(self):
|
|
rospy.wait_for_service('/status/waypoint_tracker')
|
|
try:
|
|
resp = self.get_xd(False,self.xd)
|
|
self.Point.z = resp.xd.z
|
|
except ValueError:
|
|
pass
|
|
|
|
def square(self):
|
|
# Check if i is too large. loop back to first point
|
|
if self.i >= len(self.xarray):
|
|
self.Point.x = 0
|
|
self.Point.y = 0
|
|
else:
|
|
self.Point.x = self.xarray[self.i]
|
|
self.Point.y = self.yarray[self.i]
|
|
|
|
# self.Point = self.Square_Point
|
|
self.j += 1
|
|
self.i = self.j // self.buffer
|
|
|
|
def step(self):
|
|
# Published desired msgs
|
|
self.Point.x = self.step_size # STEP (meters)
|
|
|
|
def user_feedback(self):
|
|
rospy.loginfo("Sending [Point x] %.2f [Point y] %.2f",
|
|
self.Point.x, self.Point.y)
|
|
|
|
def determine_test(self):
|
|
if self.test == 'step':
|
|
self.step()
|
|
elif self.test == 'square':
|
|
self.square()
|
|
|
|
# Publish messages
|
|
def pub(self,pub_timer):
|
|
if self.bool == False:
|
|
rospy.loginfo('Waiting...')
|
|
self.waypoints_client() # get waypoints while waiting
|
|
elif self.bool == False and self.test == 'none':
|
|
rospy.loginfo('NO TEST WILL BE PERFORMED')
|
|
else:
|
|
# Run either step or square test
|
|
self.determine_test()
|
|
# Publish message
|
|
self.publisher.publish(self.Point)
|
|
# Give user feedback
|
|
self.user_feedback()
|
|
|
|
|
|
if __name__=="__main__":
|
|
|
|
# Initiate ROS node
|
|
rospy.init_node('test_node',anonymous=True)
|
|
try:
|
|
Main() # create class object
|
|
rospy.spin() # loop until shutdown signal
|
|
|
|
except rospy.ROSInterruptException:
|
|
pass
|
|
|