diff --git a/integrationtests/python_src/px4_it/mavros/mavros_offboard_attctl_test.py b/integrationtests/python_src/px4_it/mavros/mavros_offboard_attctl_test.py index 7dbe7ed5bd..6c6fa2f3c0 100755 --- a/integrationtests/python_src/px4_it/mavros/mavros_offboard_attctl_test.py +++ b/integrationtests/python_src/px4_it/mavros/mavros_offboard_attctl_test.py @@ -42,16 +42,13 @@ PKG = 'px4' import unittest import rospy -import rosbag -import time - -from std_msgs.msg import Header -from std_msgs.msg import Float64 -from geometry_msgs.msg import PoseStamped, Quaternion from tf.transformations import quaternion_from_euler -from mavros_msgs.srv import CommandLong +from geometry_msgs.msg import PoseStamped, Quaternion, Vector3 +from mavros_msgs.msg import AttitudeTarget, State +from mavros_msgs.srv import CommandBool, SetMode from sensor_msgs.msg import NavSatFix -#from px4_test_helper import PX4TestHelper +from std_msgs.msg import Header + class MavrosOffboardAttctlTest(unittest.TestCase): """ @@ -62,23 +59,26 @@ class MavrosOffboardAttctlTest(unittest.TestCase): """ def setUp(self): - rospy.init_node('test_node', anonymous=True) - rospy.wait_for_service('mavros/cmd/arming', 30) - #self.helper = PX4TestHelper("mavros_offboard_attctl_test") - #self.helper.setUp() - - rospy.Subscriber("mavros/local_position/pose", PoseStamped, self.position_callback) - rospy.Subscriber("mavros/global_position/global", NavSatFix, self.global_position_callback) - self.pub_att = rospy.Publisher('mavros/setpoint_attitude/attitude', PoseStamped, queue_size=10) - self.pub_thr = rospy.Publisher('mavros/setpoint_attitude/att_throttle', Float64, queue_size=10) - rospy.wait_for_service('mavros/cmd/command', 30) - self._srv_cmd_long = rospy.ServiceProxy('mavros/cmd/command', CommandLong, persistent=True) - self.rate = rospy.Rate(10) # 10hz + self.rate = rospy.Rate(10) # 10hz self.has_global_pos = False self.local_position = PoseStamped() + self.state = State() + + # setup ROS topics and services + rospy.wait_for_service('mavros/cmd/arming', 30) + rospy.wait_for_service('mavros/set_mode', 30) + self.set_arming_srv = rospy.ServiceProxy('mavros/cmd/arming', + CommandBool) + self.set_mode_srv = rospy.ServiceProxy('mavros/set_mode', SetMode) + rospy.Subscriber('mavros/local_position/pose', PoseStamped, + self.position_callback) + rospy.Subscriber('mavros/global_position/global', NavSatFix, + self.global_position_callback) + rospy.Subscriber('mavros/state', State, self.state_callback) + self.att_setpoint_pub = rospy.Publisher( + 'mavros/setpoint_raw/attitude', AttitudeTarget, queue_size=10) def tearDown(self): - #self.helper.tearDown() pass # @@ -90,62 +90,90 @@ class MavrosOffboardAttctlTest(unittest.TestCase): def global_position_callback(self, data): self.has_global_pos = True - def test_attctl(self): - """Test offboard attitude control""" + def state_callback(self, data): + self.state = data - # FIXME: hack to wait for simulation to be ready + # + # Helper methods + # + def wait_until_ready(self): + """FIXME: hack to wait for simulation to be ready""" + rospy.loginfo("waiting for global position") while not self.has_global_pos: self.rate.sleep() + # + # Test method + # + def test_attctl(self): + """Test offboard attitude control""" + + self.wait_until_ready() + # set some attitude and thrust - att = PoseStamped() + att = AttitudeTarget() att.header = Header() att.header.frame_id = "base_footprint" att.header.stamp = rospy.Time.now() - quaternion = quaternion_from_euler(0.25, 0.25, 0) - att.pose.orientation = Quaternion(*quaternion) + att.orientation = Quaternion(*quaternion_from_euler(0.25, 0.25, 0)) + att.body_rate = Vector3() + att.thrust = 0.7 + att.type_mask = 7 - throttle = Float64() - throttle.data = 0.7 - armed = False - - # does it cross expected boundaries in X seconds? - count = 0 - timeout = 120 - while count < timeout: - # update timestamp for each published SP + # send some setpoints before starting + for i in xrange(20): att.header.stamp = rospy.Time.now() - - self.pub_att.publish(att) - #self.helper.bag_write('mavros/setpoint_attitude/attitude', att) - self.pub_thr.publish(throttle) - #self.helper.bag_write('mavros/setpoint_attitude/att_throttle', throttle) + self.att_setpoint_pub.publish(att) self.rate.sleep() - # FIXME: arm and switch to offboard - # (need to wait the first few rounds until PX4 has the offboard stream) - if not armed and count > 5: - self._srv_cmd_long(False, 176, False, - 1, 6, 0, 0, 0, 0, 0) - # make sure the first command doesn't get lost - time.sleep(1) + rospy.loginfo("set mission mode and arm") + while self.state.mode != "OFFBOARD" or not self.state.armed: + if self.state.mode != "OFFBOARD": + try: + self.set_mode_srv(0, 'OFFBOARD') + except rospy.ServiceException as e: + rospy.logerr(e) + if not self.state.armed: + try: + self.set_arming_srv(True) + except rospy.ServiceException as e: + rospy.logerr(e) + rospy.sleep(2) - self._srv_cmd_long(False, 400, False, - # arm - 1, 0, 0, 0, 0, 0, 0) + rospy.loginfo("run mission") + # does it cross expected boundaries in X seconds? + timeout = 120 + crossed = False + for count in xrange(timeout): + # update timestamp for each published SP + att.header.stamp = rospy.Time.now() + self.att_setpoint_pub.publish(att) - armed = True - - if (self.local_position.pose.position.x > 5 - and self.local_position.pose.position.z > 5 - and self.local_position.pose.position.y < -5): + if (self.local_position.pose.position.x > 5 and + self.local_position.pose.position.z > 5 and + self.local_position.pose.position.y < -5): + rospy.loginfo("boundary crossed | count {0}".format(count)) + crossed = True break - count = count + 1 - self.assertTrue(count < timeout, "took too long to cross boundaries") + self.rate.sleep() + + self.assertTrue(crossed, ( + "took too long to cross boundaries | x: {0}, y: {1}, z: {2}, timeout: {3}". + format(self.local_position.pose.position.x, + self.local_position.pose.position.y, + self.local_position.pose.position.z, timeout))) + + if self.state.armed: + try: + self.set_arming_srv(False) + except rospy.ServiceException as e: + rospy.logerr(e) if __name__ == '__main__': import rostest - rostest.rosrun(PKG, 'mavros_offboard_attctl_test', MavrosOffboardAttctlTest) - #unittest.main() + rospy.init_node('test_node', anonymous=True) + + rostest.rosrun(PKG, 'mavros_offboard_attctl_test', + MavrosOffboardAttctlTest) diff --git a/integrationtests/python_src/px4_it/mavros/mavros_offboard_posctl_test.py b/integrationtests/python_src/px4_it/mavros/mavros_offboard_posctl_test.py index 188110c03f..b0facae7ce 100755 --- a/integrationtests/python_src/px4_it/mavros/mavros_offboard_posctl_test.py +++ b/integrationtests/python_src/px4_it/mavros/mavros_offboard_posctl_test.py @@ -43,18 +43,14 @@ PKG = 'px4' import unittest import rospy import math -import rosbag -import time - -from numpy import linalg import numpy as np - -from std_msgs.msg import Header -from geometry_msgs.msg import PoseStamped, Quaternion from tf.transformations import quaternion_from_euler -from mavros_msgs.srv import CommandLong +from geometry_msgs.msg import PoseStamped, Quaternion +from mavros_msgs.msg import State +from mavros_msgs.srv import CommandBool, SetMode from sensor_msgs.msg import NavSatFix -#from px4_test_helper import PX4TestHelper +from std_msgs.msg import Header + class MavrosOffboardPosctlTest(unittest.TestCase): """ @@ -66,22 +62,26 @@ class MavrosOffboardPosctlTest(unittest.TestCase): """ def setUp(self): - rospy.init_node('test_node', anonymous=True) - #self.helper = PX4TestHelper("mavros_offboard_posctl_test") - #self.helper.setUp() - - rospy.Subscriber("mavros/local_position/pose", PoseStamped, self.position_callback) - rospy.Subscriber("mavros/global_position/global", NavSatFix, self.global_position_callback) - self.pub_spt = rospy.Publisher('mavros/setpoint_position/local', PoseStamped, queue_size=10) - rospy.wait_for_service('mavros/cmd/command', 30) - self._srv_cmd_long = rospy.ServiceProxy('mavros/cmd/command', CommandLong, persistent=True) - self.rate = rospy.Rate(10) # 10hz + self.rate = rospy.Rate(10) # 10hz self.has_global_pos = False self.local_position = PoseStamped() - self.armed = False + self.state = State() + + # setup ROS topics and services + rospy.wait_for_service('mavros/cmd/arming', 30) + rospy.wait_for_service('mavros/set_mode', 30) + self.set_arming_srv = rospy.ServiceProxy('/mavros/cmd/arming', + CommandBool) + self.set_mode_srv = rospy.ServiceProxy('/mavros/set_mode', SetMode) + rospy.Subscriber('mavros/local_position/pose', PoseStamped, + self.position_callback) + rospy.Subscriber('mavros/global_position/global', NavSatFix, + self.global_position_callback) + rospy.Subscriber('mavros/state', State, self.state_callback) + self.pos_setpoint_pub = rospy.Publisher( + 'mavros/setpoint_position/local', PoseStamped, queue_size=10) def tearDown(self): - #self.helper.tearDown() pass # @@ -93,20 +93,28 @@ class MavrosOffboardPosctlTest(unittest.TestCase): def global_position_callback(self, data): self.has_global_pos = True + def state_callback(self, data): + self.state = data + # # Helper methods # + def wait_until_ready(self): + """FIXME: hack to wait for simulation to be ready""" + rospy.loginfo("waiting for global position") + while not self.has_global_pos: + self.rate.sleep() + def is_at_position(self, x, y, z, offset): - rospy.logdebug("current position %f, %f, %f" % - (self.local_position.pose.position.x, - self.local_position.pose.position.y, - self.local_position.pose.position.z)) + rospy.logdebug("current position | x:{0}, y:{1}, z:{2}".format( + self.local_position.pose.position.x, self.local_position.pose. + position.y, self.local_position.pose.position.z)) desired = np.array((x, y, z)) pos = np.array((self.local_position.pose.position.x, self.local_position.pose.position.y, self.local_position.pose.position.z)) - return linalg.norm(desired - pos) < offset + return np.linalg.norm(desired - pos) < offset def reach_position(self, x, y, z, timeout): # set a position setpoint @@ -123,54 +131,75 @@ class MavrosOffboardPosctlTest(unittest.TestCase): quaternion = quaternion_from_euler(0, 0, yaw) pos.pose.orientation = Quaternion(*quaternion) - # does it reach the position in X seconds? - count = 0 - while count < timeout: - # update timestamp for each published SP + # send some setpoints before starting + for i in xrange(20): pos.header.stamp = rospy.Time.now() - self.pub_spt.publish(pos) - #self.helper.bag_write('mavros/setpoint_position/local', pos) - - # FIXME: arm and switch to offboard - # (need to wait the first few rounds until PX4 has the offboard stream) - if not self.armed and count > 5: - self._srv_cmd_long(False, 176, False, - 1, 6, 0, 0, 0, 0, 0) - # make sure the first command doesn't get lost - time.sleep(1) - - self._srv_cmd_long(False, 400, False, - # arm - 1, 0, 0, 0, 0, 0, 0) - - self.armed = True - - if self.is_at_position(pos.pose.position.x, pos.pose.position.y, pos.pose.position.z, 1): - break - count = count + 1 + self.pos_setpoint_pub.publish(pos) self.rate.sleep() - self.assertTrue(count < timeout, "took too long to get to position") + rospy.loginfo("set mission mode and arm") + while self.state.mode != "OFFBOARD" or not self.state.armed: + if self.state.mode != "OFFBOARD": + try: + self.set_mode_srv(0, 'OFFBOARD') + except rospy.ServiceException as e: + rospy.logerr(e) + if not self.state.armed: + try: + self.set_arming_srv(True) + except rospy.ServiceException as e: + rospy.logerr(e) + rospy.sleep(2) + rospy.loginfo("run mission") + # does it reach the position in X seconds? + reached = False + for count in xrange(timeout): + # update timestamp for each published SP + pos.header.stamp = rospy.Time.now() + self.pos_setpoint_pub.publish(pos) + + if self.is_at_position(pos.pose.position.x, pos.pose.position.y, + pos.pose.position.z, 1): + rospy.loginfo( + "position reached | count: {0}, x: {0}, y: {1}, z: {2}". + format(count, pos.pose.position.x, pos.pose.position.y, + pos.pose.position.z)) + reached = True + break + + self.rate.sleep() + + self.assertTrue(reached, ( + "took too long to get to position | x: {0}, y: {1}, z: {2}, timeout: {3}". + format(self.local_position.pose.position.x, + self.local_position.pose.position.y, + self.local_position.pose.position.z, timeout))) + + # + # Test method + # def test_posctl(self): """Test offboard position control""" - # FIXME: hack to wait for simulation to be ready - while not self.has_global_pos: - self.rate.sleep() + self.wait_until_ready() - positions = ( - (0, 0, 0), - (2, 2, 2), - (2, -2, 2), - (-2, -2, 2), - (2, 2, 2)) + positions = ((0, 0, 0), (2, 2, 2), (2, -2, 2), (-2, -2, 2), (2, 2, 2)) - for i in range(0, len(positions)): - self.reach_position(positions[i][0], positions[i][1], positions[i][2], 180) + for i in xrange(len(positions)): + self.reach_position(positions[i][0], positions[i][1], + positions[i][2], 180) + + if self.state.armed: + try: + self.set_arming_srv(False) + except rospy.ServiceException as e: + rospy.logerr(e) if __name__ == '__main__': import rostest - rostest.rosrun(PKG, 'mavros_offboard_posctl_test', MavrosOffboardPosctlTest) - #unittest.main() + rospy.init_node('test_node', anonymous=True) + + rostest.rosrun(PKG, 'mavros_offboard_posctl_test', + MavrosOffboardPosctlTest) diff --git a/integrationtests/python_src/px4_it/mavros/mission_test.py b/integrationtests/python_src/px4_it/mavros/mission_test.py index bf228e671a..ad2eff002b 100755 --- a/integrationtests/python_src/px4_it/mavros/mission_test.py +++ b/integrationtests/python_src/px4_it/mavros/mission_test.py @@ -43,91 +43,102 @@ PKG = 'px4' import unittest import rospy -import math -import rosbag -import sys -import os -import time import glob import json - -import mavros -from pymavlink import mavutil -from mavros import mavlink - +import math +import os import px4tools - -from geometry_msgs.msg import PoseStamped -from mavros_msgs.srv import CommandLong, WaypointPush -from mavros_msgs.msg import Mavlink, Waypoint, ExtendedState -from sensor_msgs.msg import NavSatFix +import sys +from mavros import mavlink from mavros.mission import QGroundControlWP -#from px4_test_helper import PX4TestHelper +from pymavlink import mavutil +from threading import Thread +from geometry_msgs.msg import PoseStamped +from mavros_msgs.msg import Altitude, ExtendedState, Mavlink, State, Waypoint +from mavros_msgs.srv import CommandBool, SetMode, WaypointPush +from sensor_msgs.msg import NavSatFix + def get_last_log(): try: log_path = os.environ['PX4_LOG_DIR'] except KeyError: - log_path = os.path.join(os.environ['HOME'], 'ros/rootfs/fs/microsd/log') - last_log_dir = sorted( - glob.glob(os.path.join(log_path, '*')))[-1] + log_path = os.path.join(os.environ['HOME'], + '.ros/rootfs/fs/microsd/log') + last_log_dir = sorted(glob.glob(os.path.join(log_path, '*')))[-1] last_log = sorted(glob.glob(os.path.join(last_log_dir, '*.ulg')))[-1] return last_log + def read_new_mission(f): d = json.load(f) current = True for wp in d['items']: yield Waypoint( - is_current = current, - frame = int(wp['frame']), - command = int(wp['command']), - param1 = float(wp['param1']), - param2 = float(wp['param2']), - param3 = float(wp['param3']), - param4 = float(wp['param4']), - x_lat = float(wp['coordinate'][0]), - y_long = float(wp['coordinate'][1]), - z_alt = float(wp['coordinate'][2]), - autocontinue = bool(wp['autoContinue'])) + is_current=current, + frame=int(wp['frame']), + command=int(wp['command']), + param1=float(wp['param1']), + param2=float(wp['param2']), + param3=float(wp['param3']), + param4=float(wp['param4']), + x_lat=float(wp['coordinate'][0]), + y_long=float(wp['coordinate'][1]), + z_alt=float(wp['coordinate'][2]), + autocontinue=bool(wp['autoContinue'])) if current: current = False + class MavrosMissionTest(unittest.TestCase): """ Run a mission """ def setUp(self): - rospy.init_node('test_node', anonymous=True) - - self.rate = rospy.Rate(10) # 10hz + self.rate = rospy.Rate(10) # 10hz self.has_global_pos = False self.local_position = PoseStamped() self.global_position = NavSatFix() self.extended_state = ExtendedState() - self.home_alt = 0 + self.altitude = Altitude() + self.state = State() self.mc_rad = 5 self.fw_rad = 60 self.fw_alt_rad = 10 - self.last_alt_d = 9999 - self.last_pos_d = 9999 + self.last_alt_d = None + self.last_pos_d = None self.mission_name = "" - # need to simulate heartbeat for datalink loss detection - rospy.Timer(rospy.Duration(0.5), self.send_heartbeat) + # setup ROS topics and services + rospy.wait_for_service('mavros/mission/push', 30) + rospy.wait_for_service('mavros/cmd/arming', 30) + rospy.wait_for_service('mavros/set_mode', 30) + self.wp_push_srv = rospy.ServiceProxy('mavros/mission/push', + WaypointPush) + self.set_arming_srv = rospy.ServiceProxy('/mavros/cmd/arming', + CommandBool) + self.set_mode_srv = rospy.ServiceProxy('/mavros/set_mode', SetMode) + rospy.Subscriber('mavros/local_position/pose', PoseStamped, + self.position_callback) + rospy.Subscriber('mavros/global_position/global', NavSatFix, + self.global_position_callback) + rospy.Subscriber('mavros/extended_state', ExtendedState, + self.extended_state_callback) + rospy.Subscriber('mavros/altitude', Altitude, self.altitude_callback) + rospy.Subscriber('mavros/state', State, self.state_callback) + self.mavlink_pub = rospy.Publisher('mavlink/to', Mavlink, queue_size=1) - rospy.wait_for_service('mavros/cmd/command', 30) - self.pub_mavlink = rospy.Publisher('mavlink/to', Mavlink, queue_size=1) - self._srv_cmd_long = rospy.ServiceProxy('mavros/cmd/command', CommandLong, persistent=True) - self._srv_wp_push = rospy.ServiceProxy('mavros/mission/push', WaypointPush, persistent=True) - - rospy.Subscriber("mavros/local_position/pose", PoseStamped, self.position_callback) - rospy.Subscriber("mavros/global_position/global", NavSatFix, self.global_position_callback) - rospy.Subscriber("mavros/extended_state", ExtendedState, self.extended_state_callback) + # need to simulate heartbeat to prevent datalink loss detection + self.hb_mav_msg = mavutil.mavlink.MAVLink_heartbeat_message( + mavutil.mavlink.MAV_TYPE_GCS, 0, 0, 0, 0, 0) + self.hb_mav_msg.pack(mavutil.mavlink.MAVLink('', 2, 1)) + self.hb_ros_msg = mavlink.convert_to_rosmsg(self.hb_mav_msg) + self.hb_thread = Thread(target=self.send_heartbeat, args=()) + self.hb_thread.daemon = True + self.hb_thread.start() def tearDown(self): - #self.helper.tearDown() pass # @@ -140,77 +151,91 @@ class MavrosMissionTest(unittest.TestCase): self.global_position = data if not self.has_global_pos: - if data.altitude != 0: - self.home_alt = data.altitude - self.has_global_pos = True + self.has_global_pos = True def extended_state_callback(self, data): - - prev_state = self.extended_state.vtol_state; - + prev_state = self.extended_state.vtol_state self.extended_state = data + if (prev_state != self.extended_state.vtol_state): - print("VTOL state change: %d" % self.extended_state.vtol_state); + rospy.loginfo("VTOL state change: {0}".format( + self.extended_state.vtol_state)) + + def state_callback(self, data): + self.state = data + + def altitude_callback(self, data): + self.altitude = data # # Helper methods # + def send_heartbeat(self): + while not rospy.is_shutdown(): + self.mavlink_pub.publish(self.hb_ros_msg) + try: + rospy.sleep(0.5) + except rospy.ROSInterruptException: + pass + def is_at_position(self, lat, lon, alt, xy_offset, z_offset): - R = 6371000 # metres + R = 6371000 # metres rlat1 = math.radians(lat) rlat2 = math.radians(self.global_position.latitude) rlat_d = math.radians(self.global_position.latitude - lat) rlon_d = math.radians(self.global_position.longitude - lon) - a = (math.sin(rlat_d / 2) * math.sin(rlat_d / 2) + - math.cos(rlat1) * math.cos(rlat2) * - math.sin(rlon_d / 2) * math.sin(rlon_d / 2)) - c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a)) + a = (math.sin(rlat_d / 2) * math.sin(rlat_d / 2) + math.cos(rlat1) * + math.cos(rlat2) * math.sin(rlon_d / 2) * math.sin(rlon_d / 2)) + c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) d = R * c - alt_d = abs(alt - self.global_position.altitude) - - #rospy.loginfo("d: %f, alt_d: %f", d, alt_d) + alt_d = abs(alt - self.altitude.amsl) # remember best distances - if self.last_pos_d > d: + if not self.last_pos_d or self.last_pos_d > d: self.last_pos_d = d - if self.last_alt_d > alt_d: + if not self.last_alt_d or self.last_alt_d > alt_d: self.last_alt_d = alt_d + rospy.logdebug("d: {0}, alt_d: {1}".format(d, alt_d)) return d < xy_offset and alt_d < z_offset def reach_position(self, lat, lon, alt, timeout, index): # reset best distances - self.last_alt_d = 9999 - self.last_pos_d = 9999 + self.last_alt_d = None + self.last_pos_d = None - rospy.loginfo("trying to reach waypoint " + - "lat: %13.9f, lon: %13.9f, alt: %6.2f, timeout: %d, index: %d" % - (lat, lon, alt, timeout, index)) + rospy.loginfo( + "trying to reach waypoint | " + + "lat: {0:13.9f}, lon: {1:13.9f}, alt: {2:6.2f}, timeout: {3}, index: {4}". + format(lat, lon, alt, timeout, index)) # does it reach the position in X seconds? - count = 0 - while count < timeout: + reached = False + for count in xrange(timeout): # use MC radius by default # FIXME: also check MAV_TYPE from system status, otherwise pure fixed-wing won't work xy_radius = self.mc_rad z_radius = self.mc_rad # use FW radius if in FW or in transition - if (self.extended_state.vtol_state == ExtendedState.VTOL_STATE_FW or - self.extended_state.vtol_state == ExtendedState.VTOL_STATE_TRANSITION_TO_MC or - self.extended_state.vtol_state == ExtendedState.VTOL_STATE_TRANSITION_TO_FW): + if (self.extended_state.vtol_state == ExtendedState.VTOL_STATE_FW + or self.extended_state.vtol_state == + ExtendedState.VTOL_STATE_TRANSITION_TO_MC or + self.extended_state.vtol_state == + ExtendedState.VTOL_STATE_TRANSITION_TO_FW): xy_radius = self.fw_rad z_radius = self.fw_alt_rad if self.is_at_position(lat, lon, alt, xy_radius, z_radius): - rospy.loginfo("position reached, index: %d, count: %d, pos_d: %f, alt_d: %f" % - (index, count, self.last_pos_d, self.last_alt_d)) + reached = True + rospy.loginfo( + "position reached | index: {0}, count: {1}, pos_d: {2}, alt_d: {3}". + format(index, count, self.last_pos_d, self.last_alt_d)) break - count = count + 1 self.rate.sleep() vtol_state_string = "VTOL undefined" @@ -219,88 +244,79 @@ class MavrosMissionTest(unittest.TestCase): vtol_state_string = "VTOL MC" if (self.extended_state.vtol_state == ExtendedState.VTOL_STATE_FW): vtol_state_string = "VTOL FW" - if (self.extended_state.vtol_state == ExtendedState.VTOL_STATE_TRANSITION_TO_MC): + if (self.extended_state.vtol_state == + ExtendedState.VTOL_STATE_TRANSITION_TO_MC): vtol_state_string = "VTOL FW->MC" - if (self.extended_state.vtol_state == ExtendedState.VTOL_STATE_TRANSITION_TO_FW): + if (self.extended_state.vtol_state == + ExtendedState.VTOL_STATE_TRANSITION_TO_FW): vtol_state_string = "VTOL MC->FW" - self.assertTrue(count < timeout, (("(%s) took too long to get to position " + - "lat: %13.9f, lon: %13.9f, alt: %6.2f, xy off: %f, z off: %f, timeout: %d, index: %d, pos_d: %f, alt_d: %f, VTOL state: %s") % - (self.mission_name, lat, lon, alt, xy_radius, z_radius, timeout, index, self.last_pos_d, self.last_alt_d, vtol_state_string))) - - def run_mission(self): - # Hack to wait until vehicle is ready - # TODO better integration with pre-flight status reporting - time.sleep(5) - """switch mode: auto and arm""" - self._srv_cmd_long(False, 176, False, - # custom, auto, mission - 1, 4, 4, 0, 0, 0, 0) - # make sure the first command doesn't get lost - time.sleep(1) - - self._srv_cmd_long(False, 400, False, - # arm - 1, 0, 0, 0, 0, 0, 0) + self.assertTrue(reached, ( + "({0}) took too long to get to position | lat: {1:13.9f}, lon: {2:13.9f}, alt: {3:6.2f}, xy off: {4}, z off: {5}, timeout: {6}, index: {7}, pos_d: {8}, alt_d: {9}, VTOL state: {10}". + format(self.mission_name, lat, lon, alt, xy_radius, z_radius, + timeout, index, self.last_pos_d, self.last_alt_d, + vtol_state_string))) def wait_until_ready(self): """FIXME: hack to wait for simulation to be ready""" - while not self.has_global_pos: + rospy.loginfo("waiting for global position") + while not self.has_global_pos or math.isnan( + self.altitude.amsl) or math.isnan(self.altitude.relative): self.rate.sleep() def wait_on_landing(self, timeout, index): """Wait for landed state""" - - rospy.loginfo("waiting for landing " + - "timeout: %d, index: %d" % - (timeout, index)) - - count = 0 - while count < timeout: + rospy.loginfo("waiting for landing | timeout: {0}, index: {1}".format( + timeout, index)) + landed = False + for count in xrange(timeout): if self.extended_state.landed_state == ExtendedState.LANDED_STATE_ON_GROUND: + rospy.loginfo( + "landed | index: {0}, count: {1}".format(index, count)) + landed = True break - count = count + 1 self.rate.sleep() - self.assertTrue(count < timeout, ("(%s) landing not detected after landing WP " + - "timeout: %d, index: %d") % - (self.mission_name, timeout, index)) + self.assertTrue(landed, ( + "({0}) landing not detected after landing WP | timeout: {1}, index: {2}". + format(self.mission_name, timeout, index))) def wait_on_transition(self, transition, timeout, index): """Wait for VTOL transition""" - rospy.loginfo("waiting for VTOL transition " + - "transition: %d, timeout: %d, index: %d" % - (transition, timeout, index)) - - count = 0 - while count < timeout: + rospy.loginfo( + "waiting for VTOL transition | transition: {0}, timeout: {1}, index: {2}". + format(transition, timeout, index)) + transitioned = False + for count in xrange(timeout): # transition to MC if (transition == ExtendedState.VTOL_STATE_MC and - self.extended_state.vtol_state == ExtendedState.VTOL_STATE_MC): + self.extended_state.vtol_state == + ExtendedState.VTOL_STATE_MC): + rospy.loginfo("transitioned | index: {0}, count: {1}".format( + index, count)) + transitioned = True break # transition to FW if (transition == ExtendedState.VTOL_STATE_FW and - self.extended_state.vtol_state == ExtendedState.VTOL_STATE_FW): + self.extended_state.vtol_state == + ExtendedState.VTOL_STATE_FW): + rospy.loginfo("transitioned | index: {0}, count: {1}".format( + index, count)) + transitioned = True break - count = count + 1 self.rate.sleep() - self.assertTrue(count < timeout, ("(%s) transition not detected " + - "timeout: %d, index: %d") % - (self.mission_name, timeout, index)) - - def send_heartbeat(self, event=None): - # mav type gcs - mavmsg = mavutil.mavlink.MAVLink_heartbeat_message(6, 0, 0, 0, 0, 0) - # XXX: hack: using header object to set mav properties - mavmsg.pack(mavutil.mavlink.MAVLink_header(0, 0, 0, 2, 1)) - rosmsg = mavlink.convert_to_rosmsg(mavmsg) - self.pub_mavlink.publish(rosmsg) + self.assertTrue(transitioned, ( + "({0}) transition not detected | timeout: {1}, index: {2}".format( + self.mission_name, timeout, index))) + # + # Test method + # def test_mission(self): """Test mission""" @@ -309,11 +325,11 @@ class MavrosMissionTest(unittest.TestCase): return self.mission_name = sys.argv[1] - mission_file = os.path.dirname(os.path.realpath(__file__)) + "/" + sys.argv[1] + mission_file = os.path.dirname( + os.path.realpath(__file__)) + "/" + sys.argv[1] - rospy.loginfo("reading mission %s", mission_file) + rospy.loginfo("reading mission {0}".format(mission_file)) wps = [] - with open(mission_file, 'r') as f: mission_ext = os.path.splitext(mission_file)[1] if mission_ext == '.mission': @@ -330,35 +346,52 @@ class MavrosMissionTest(unittest.TestCase): else: raise IOError('unknown mission file extension', mission_ext) - rospy.loginfo("wait until ready") self.wait_until_ready() rospy.loginfo("send mission") - res = self._srv_wp_push(wps) - rospy.loginfo(res) - self.assertTrue(res.success, "(%s) mission could not be transfered" % self.mission_name) + result = False + try: + res = self.wp_push_srv(start_index=0, waypoints=wps) + result = res.success + except rospy.ServiceException as e: + rospy.logerr(e) + self.assertTrue( + result, + "({0}) mission could not be transfered".format(self.mission_name)) + + rospy.loginfo("set mission mode and arm") + while self.state.mode != "AUTO.MISSION" or not self.state.armed: + if self.state.mode != "AUTO.MISSION": + try: + res = self.set_mode_srv(0, 'AUTO.MISSION') + except rospy.ServiceException as e: + rospy.logerr(e) + if not self.state.armed: + try: + self.set_arming_srv(True) + except rospy.ServiceException as e: + rospy.logerr(e) + rospy.sleep(2) rospy.loginfo("run mission") - self.run_mission() - - index = 0 - for waypoint in wps: + for index, waypoint in enumerate(wps): # only check position for waypoints where this makes sense if waypoint.frame == Waypoint.FRAME_GLOBAL_REL_ALT or waypoint.frame == Waypoint.FRAME_GLOBAL: alt = waypoint.z_alt if waypoint.frame == Waypoint.FRAME_GLOBAL_REL_ALT: - alt += self.home_alt + alt += self.altitude.amsl - self.altitude.relative - self.reach_position(waypoint.x_lat, waypoint.y_long, alt, 600, index) + self.reach_position(waypoint.x_lat, waypoint.y_long, alt, 600, + index) # check if VTOL transition happens if applicable if waypoint.command == 84 or waypoint.command == 85 or waypoint.command == 3000: transition = waypoint.param1 - if waypoint.command == 84: # VTOL takeoff implies transition to FW + if waypoint.command == 84: # VTOL takeoff implies transition to FW transition = ExtendedState.VTOL_STATE_FW - if waypoint.command == 85: # VTOL takeoff implies transition to MC + if waypoint.command == 85: # VTOL takeoff implies transition to MC transition = ExtendedState.VTOL_STATE_MC self.wait_on_transition(transition, 600, index) @@ -367,25 +400,32 @@ class MavrosMissionTest(unittest.TestCase): if waypoint.command == 85 or waypoint.command == 21: self.wait_on_landing(600, index) - index += 1 + if self.state.armed: + try: + self.set_arming_srv(False) + except rospy.ServiceException as e: + rospy.logerr(e) rospy.loginfo("mission done, calculating performance metrics") last_log = get_last_log() - rospy.loginfo("log file %s", last_log) + rospy.loginfo("log file {0}".format(last_log)) data = px4tools.ulog.read_ulog(last_log).concat(dt=0.1) data = px4tools.ulog.compute_data(data) res = px4tools.estimator_analysis(data, False) # enforce performance - self.assertTrue(abs(res['roll_error_mean']) < 5.0, str(res)) + self.assertTrue(abs(res['roll_error_mean']) < 5.0, str(res)) self.assertTrue(abs(res['pitch_error_mean']) < 5.0, str(res)) self.assertTrue(abs(res['yaw_error_mean']) < 5.0, str(res)) self.assertTrue(res['roll_error_std'] < 5.0, str(res)) self.assertTrue(res['pitch_error_std'] < 5.0, str(res)) self.assertTrue(res['yaw_error_std'] < 5.0, str(res)) + if __name__ == '__main__': import rostest + rospy.init_node('test_node', anonymous=True) + name = "mavros_mission_test" if len(sys.argv) > 1: name += "-%s" % sys.argv[1] diff --git a/launch/mavros_posix_tests_iris.launch b/launch/mavros_posix_tests_iris.launch index 04a79f814e..7fae242b7e 100644 --- a/launch/mavros_posix_tests_iris.launch +++ b/launch/mavros_posix_tests_iris.launch @@ -15,9 +15,9 @@ - - - + + +