oscillation_ctrl/src/perform_test.py

122 lines
3.2 KiB
Python
Raw Normal View History

2022-04-05 11:00:59 -03:00
#!/usr/bin/env python2.7
### Cesar Rodriguez June 2021
### Script to generate set points which form a square with 2m side lengths
import rospy, tf
import time
from geometry_msgs.msg import Point
from std_msgs.msg import Bool
class Main:
# class method used to initialize variables once when the program starts
def __init__(self):
self.tstart = rospy.get_time() # Keep track of the start time
while self.tstart == 0.0: # Need to make sure get_rostime works
self.tstart = rospy.get_time()
# variable(s)
self.Point = Point() # Point which will actually be published
self.xd = Point() # buffer point to determine current location
# Init x, y, & z coordinates
self.Point.x = 0
self.Point.y = 0
self.Point.z = 3.5
self.xarray = [1,2,2,2,1,0,0]
self.yarray = [0,0,1,2,2,2,1]
self.i = 0
self.j = 0
self.buffer = 10
self.bool = False
self.param_exists = False # check in case param does not exist
while self.param_exists == False:
if rospy.has_param('status/test_type'):
self.test = rospy.get_param('status/test_type')
self.param_exists = True
elif rospy.get_time() - self.tstart >= 3.0 or rospy.get_param('status/test_type') == 'none':
self.test = 'none'
break
# subscriber(s), with specified topic, message type, and class callback method
rospy.Subscriber('/status/path_follow',Bool, self.wait_cb)
# publisher(s), with specified topic, message type, and queue_size
self.publisher = rospy.Publisher('/reference/waypoints', Point, queue_size = 5)
# rate(s)
pub_rate = 1 # rate for the publisher method, specified in Hz
# timer(s), used to control method loop frequencies as defined by the rate(s)
self.pub_timer = rospy.Timer(rospy.Duration(1.0/pub_rate), self.pub)
def wait_cb(self,data):
self.bool = data
def waypoints_srv_cb(self):
rospy.wait_for_service('/status/waypoint_tracker')
try:
resp = self.get_xd(False,self.xd)
self.Point.z = resp.xd.z
except ValueError:
pass
def square(self):
# Check if i is too large. loop back to first point
if self.i >= len(self.xarray):
self.Point.x = 0
self.Point.y = 0
else:
self.Square_Point.x = self.xarray[self.i]
self.Square_Point.y = self.yarray[self.i]
self.Point = self.Square_Point
self.j += 1
self.i = self.j // self.buffer
def step(self):
# Published desired msgs
self.Point.x = 5 # STEP (meters)
def user_feedback(self):
rospy.loginfo("Sending [Point x] %d [Point y] %d",
self.Point.x, self.Point.y)
def determine_test(self):
if self.test == 'step':
self.step()
elif self.test == 'square':
self.square()
# Publish messages
def pub(self,pub_timer):
if self.bool == False:
rospy.loginfo('Waiting...')
elif self.bool == False and self.test == 'none':
rospy.loginfo('NO TEST WILL BE PERFORMED')
else:
# Run either step or square test
self.determine_test()
# Publish message
self.publisher.publish(self.Point)
# Give user feedback
self.user_feedback()
if __name__=="__main__":
# Initiate ROS node
rospy.init_node('test_node',anonymous=True)
try:
Main() # create class object
rospy.spin() # loop until shutdown signal
except rospy.ROSInterruptException:
pass