oscillation_ctrl/src/perform_test.py

130 lines
3.5 KiB
Python
Raw Normal View History

2022-04-05 11:00:59 -03:00
#!/usr/bin/env python2.7
### Cesar Rodriguez June 2021
2022-04-18 10:10:17 -03:00
### Runs either a step test or a square trajectory to assess controller performance
2022-04-05 11:00:59 -03:00
import rospy, tf
import time
from geometry_msgs.msg import Point
from std_msgs.msg import Bool
2022-04-18 10:10:17 -03:00
from oscillation_ctrl.srv import WaypointTrack
2022-04-05 11:00:59 -03:00
class Main:
# class method used to initialize variables once when the program starts
def __init__(self):
self.tstart = rospy.get_time() # Keep track of the start time
while self.tstart == 0.0: # Need to make sure get_rostime works
self.tstart = rospy.get_time()
# variable(s)
self.Point = Point() # Point which will actually be published
2022-04-18 10:10:17 -03:00
self.xd = Point() # buffer point to determine current location
2022-04-05 11:00:59 -03:00
# Init x, y, & z coordinates
self.Point.x = 0
self.Point.y = 0
self.Point.z = 3.5
2022-04-18 10:10:17 -03:00
# Test parameters
# Square
2022-04-05 11:00:59 -03:00
self.xarray = [1,2,2,2,1,0,0]
self.yarray = [0,0,1,2,2,2,1]
self.i = 0
self.j = 0
self.buffer = 10
2022-04-18 10:10:17 -03:00
# Step
self.step_size = 5.0 # meters
2022-04-05 11:00:59 -03:00
self.bool = False
self.param_exists = False # check in case param does not exist
while self.param_exists == False:
if rospy.has_param('status/test_type'):
self.test = rospy.get_param('status/test_type')
self.param_exists = True
elif rospy.get_time() - self.tstart >= 3.0 or rospy.get_param('status/test_type') == 'none':
self.test = 'none'
break
2022-04-18 10:10:17 -03:00
# get waypoints
self.get_xd = rospy.ServiceProxy('/status/waypoint_tracker',WaypointTrack)
2022-04-05 11:00:59 -03:00
# subscriber(s), with specified topic, message type, and class callback method
rospy.Subscriber('/status/path_follow',Bool, self.wait_cb)
# publisher(s), with specified topic, message type, and queue_size
self.publisher = rospy.Publisher('/reference/waypoints', Point, queue_size = 5)
# rate(s)
pub_rate = 1 # rate for the publisher method, specified in Hz
# timer(s), used to control method loop frequencies as defined by the rate(s)
self.pub_timer = rospy.Timer(rospy.Duration(1.0/pub_rate), self.pub)
def wait_cb(self,data):
self.bool = data
2022-04-18 10:10:17 -03:00
def waypoints_client(self):
2022-04-05 11:00:59 -03:00
rospy.wait_for_service('/status/waypoint_tracker')
try:
resp = self.get_xd(False,self.xd)
self.Point.z = resp.xd.z
except ValueError:
pass
def square(self):
# Check if i is too large. loop back to first point
if self.i >= len(self.xarray):
self.Point.x = 0
self.Point.y = 0
else:
self.Square_Point.x = self.xarray[self.i]
self.Square_Point.y = self.yarray[self.i]
self.Point = self.Square_Point
self.j += 1
self.i = self.j // self.buffer
def step(self):
# Published desired msgs
2022-04-18 10:10:17 -03:00
self.Point.x = self.step_size # STEP (meters)
2022-04-05 11:00:59 -03:00
def user_feedback(self):
rospy.loginfo("Sending [Point x] %d [Point y] %d",
self.Point.x, self.Point.y)
def determine_test(self):
if self.test == 'step':
self.step()
elif self.test == 'square':
self.square()
# Publish messages
def pub(self,pub_timer):
if self.bool == False:
rospy.loginfo('Waiting...')
2022-04-18 10:10:17 -03:00
self.waypoints_client() # get waypoints while waiting
2022-04-05 11:00:59 -03:00
elif self.bool == False and self.test == 'none':
rospy.loginfo('NO TEST WILL BE PERFORMED')
else:
# Run either step or square test
self.determine_test()
# Publish message
self.publisher.publish(self.Point)
# Give user feedback
self.user_feedback()
if __name__=="__main__":
# Initiate ROS node
rospy.init_node('test_node',anonymous=True)
try:
Main() # create class object
rospy.spin() # loop until shutdown signal
except rospy.ROSInterruptException:
pass