forked from Archive/PX4-Autopilot
418 lines
17 KiB
Python
418 lines
17 KiB
Python
#!/usr/bin/env python2
|
|
from __future__ import division
|
|
|
|
import unittest
|
|
import rospy
|
|
import math
|
|
from geometry_msgs.msg import PoseStamped
|
|
from mavros_msgs.msg import Altitude, ExtendedState, HomePosition, State, \
|
|
WaypointList
|
|
from mavros_msgs.srv import CommandBool, ParamGet, SetMode, WaypointClear, \
|
|
WaypointPush
|
|
from pymavlink import mavutil
|
|
from sensor_msgs.msg import NavSatFix
|
|
|
|
|
|
class MavrosTestCommon(unittest.TestCase):
|
|
def __init__(self, *args):
|
|
super(MavrosTestCommon, self).__init__(*args)
|
|
|
|
def setUp(self):
|
|
self.altitude = Altitude()
|
|
self.extended_state = ExtendedState()
|
|
self.global_position = NavSatFix()
|
|
self.home_position = HomePosition()
|
|
self.local_position = PoseStamped()
|
|
self.mission_wp = WaypointList()
|
|
self.state = State()
|
|
self.mav_type = None
|
|
|
|
self.sub_topics_ready = {
|
|
key: False
|
|
for key in [
|
|
'alt', 'ext_state', 'global_pos', 'home_pos', 'local_pos',
|
|
'mission_wp', 'state'
|
|
]
|
|
}
|
|
|
|
# ROS services
|
|
service_timeout = 30
|
|
rospy.loginfo("waiting for ROS services")
|
|
try:
|
|
rospy.wait_for_service('mavros/param/get', service_timeout)
|
|
rospy.wait_for_service('mavros/cmd/arming', service_timeout)
|
|
rospy.wait_for_service('mavros/mission/push', service_timeout)
|
|
rospy.wait_for_service('mavros/mission/clear', service_timeout)
|
|
rospy.wait_for_service('mavros/set_mode', service_timeout)
|
|
rospy.loginfo("ROS services are up")
|
|
except rospy.ROSException:
|
|
self.fail("failed to connect to services")
|
|
self.get_param_srv = rospy.ServiceProxy('mavros/param/get', ParamGet)
|
|
self.set_arming_srv = rospy.ServiceProxy('mavros/cmd/arming',
|
|
CommandBool)
|
|
self.set_mode_srv = rospy.ServiceProxy('mavros/set_mode', SetMode)
|
|
self.wp_clear_srv = rospy.ServiceProxy('mavros/mission/clear',
|
|
WaypointClear)
|
|
self.wp_push_srv = rospy.ServiceProxy('mavros/mission/push',
|
|
WaypointPush)
|
|
|
|
# ROS subscribers
|
|
self.alt_sub = rospy.Subscriber('mavros/altitude', Altitude,
|
|
self.altitude_callback)
|
|
self.ext_state_sub = rospy.Subscriber('mavros/extended_state',
|
|
ExtendedState,
|
|
self.extended_state_callback)
|
|
self.global_pos_sub = rospy.Subscriber('mavros/global_position/global',
|
|
NavSatFix,
|
|
self.global_position_callback)
|
|
self.home_pos_sub = rospy.Subscriber('mavros/home_position/home',
|
|
HomePosition,
|
|
self.home_position_callback)
|
|
self.local_pos_sub = rospy.Subscriber('mavros/local_position/pose',
|
|
PoseStamped,
|
|
self.local_position_callback)
|
|
self.mission_wp_sub = rospy.Subscriber(
|
|
'mavros/mission/waypoints', WaypointList, self.mission_wp_callback)
|
|
self.state_sub = rospy.Subscriber('mavros/state', State,
|
|
self.state_callback)
|
|
|
|
def tearDown(self):
|
|
self.log_topic_vars()
|
|
|
|
#
|
|
# Callback functions
|
|
#
|
|
def altitude_callback(self, data):
|
|
self.altitude = data
|
|
|
|
# amsl has been observed to be nan while other fields are valid
|
|
if not self.sub_topics_ready['alt'] and not math.isnan(data.amsl):
|
|
self.sub_topics_ready['alt'] = True
|
|
|
|
def extended_state_callback(self, data):
|
|
if self.extended_state.vtol_state != data.vtol_state:
|
|
rospy.loginfo("VTOL state changed from {0} to {1}".format(
|
|
mavutil.mavlink.enums['MAV_VTOL_STATE']
|
|
[self.extended_state.vtol_state].name, mavutil.mavlink.enums[
|
|
'MAV_VTOL_STATE'][data.vtol_state].name))
|
|
|
|
if self.extended_state.landed_state != data.landed_state:
|
|
rospy.loginfo("landed state changed from {0} to {1}".format(
|
|
mavutil.mavlink.enums['MAV_LANDED_STATE']
|
|
[self.extended_state.landed_state].name, mavutil.mavlink.enums[
|
|
'MAV_LANDED_STATE'][data.landed_state].name))
|
|
|
|
self.extended_state = data
|
|
|
|
if not self.sub_topics_ready['ext_state']:
|
|
self.sub_topics_ready['ext_state'] = True
|
|
|
|
def global_position_callback(self, data):
|
|
self.global_position = data
|
|
|
|
if not self.sub_topics_ready['global_pos']:
|
|
self.sub_topics_ready['global_pos'] = True
|
|
|
|
def home_position_callback(self, data):
|
|
self.home_position = data
|
|
|
|
if not self.sub_topics_ready['home_pos']:
|
|
self.sub_topics_ready['home_pos'] = True
|
|
|
|
def local_position_callback(self, data):
|
|
self.local_position = data
|
|
|
|
if not self.sub_topics_ready['local_pos']:
|
|
self.sub_topics_ready['local_pos'] = True
|
|
|
|
def mission_wp_callback(self, data):
|
|
if self.mission_wp.current_seq != data.current_seq:
|
|
rospy.loginfo("current mission waypoint sequence updated: {0}".
|
|
format(data.current_seq))
|
|
|
|
self.mission_wp = data
|
|
|
|
if not self.sub_topics_ready['mission_wp']:
|
|
self.sub_topics_ready['mission_wp'] = True
|
|
|
|
def state_callback(self, data):
|
|
if self.state.armed != data.armed:
|
|
rospy.loginfo("armed state changed from {0} to {1}".format(
|
|
self.state.armed, data.armed))
|
|
|
|
if self.state.connected != data.connected:
|
|
rospy.loginfo("connected changed from {0} to {1}".format(
|
|
self.state.connected, data.connected))
|
|
|
|
if self.state.mode != data.mode:
|
|
rospy.loginfo("mode changed from {0} to {1}".format(
|
|
self.state.mode, data.mode))
|
|
|
|
if self.state.system_status != data.system_status:
|
|
rospy.loginfo("system_status changed from {0} to {1}".format(
|
|
mavutil.mavlink.enums['MAV_STATE'][
|
|
self.state.system_status].name, mavutil.mavlink.enums[
|
|
'MAV_STATE'][data.system_status].name))
|
|
|
|
self.state = data
|
|
|
|
# mavros publishes a disconnected state message on init
|
|
if not self.sub_topics_ready['state'] and data.connected:
|
|
self.sub_topics_ready['state'] = True
|
|
|
|
#
|
|
# Helper methods
|
|
#
|
|
def set_arm(self, arm, timeout):
|
|
"""arm: True to arm or False to disarm, timeout(int): seconds"""
|
|
rospy.loginfo("setting FCU arm: {0}".format(arm))
|
|
old_arm = self.state.armed
|
|
loop_freq = 1 # Hz
|
|
rate = rospy.Rate(loop_freq)
|
|
arm_set = False
|
|
for i in xrange(timeout * loop_freq):
|
|
if self.state.armed == arm:
|
|
arm_set = True
|
|
rospy.loginfo("set arm success | seconds: {0} of {1}".format(
|
|
i / loop_freq, timeout))
|
|
break
|
|
else:
|
|
try:
|
|
res = self.set_arming_srv(arm)
|
|
if not res.success:
|
|
rospy.logerr("failed to send arm command")
|
|
except rospy.ServiceException as e:
|
|
rospy.logerr(e)
|
|
|
|
try:
|
|
rate.sleep()
|
|
except rospy.ROSException as e:
|
|
self.fail(e)
|
|
|
|
self.assertTrue(arm_set, (
|
|
"failed to set arm | new arm: {0}, old arm: {1} | timeout(seconds): {2}".
|
|
format(arm, old_arm, timeout)))
|
|
|
|
def set_mode(self, mode, timeout):
|
|
"""mode: PX4 mode string, timeout(int): seconds"""
|
|
rospy.loginfo("setting FCU mode: {0}".format(mode))
|
|
old_mode = self.state.mode
|
|
loop_freq = 1 # Hz
|
|
rate = rospy.Rate(loop_freq)
|
|
mode_set = False
|
|
for i in xrange(timeout * loop_freq):
|
|
if self.state.mode == mode:
|
|
mode_set = True
|
|
rospy.loginfo("set mode success | seconds: {0} of {1}".format(
|
|
i / loop_freq, timeout))
|
|
break
|
|
else:
|
|
try:
|
|
res = self.set_mode_srv(0, mode) # 0 is custom mode
|
|
if not res.mode_sent:
|
|
rospy.logerr("failed to send mode command")
|
|
except rospy.ServiceException as e:
|
|
rospy.logerr(e)
|
|
|
|
try:
|
|
rate.sleep()
|
|
except rospy.ROSException as e:
|
|
self.fail(e)
|
|
|
|
self.assertTrue(mode_set, (
|
|
"failed to set mode | new mode: {0}, old mode: {1} | timeout(seconds): {2}".
|
|
format(mode, old_mode, timeout)))
|
|
|
|
def wait_for_topics(self, timeout):
|
|
"""wait for simulation to be ready, make sure we're getting topic info
|
|
from all topics by checking dictionary of flag values set in callbacks,
|
|
timeout(int): seconds"""
|
|
rospy.loginfo("waiting for subscribed topics to be ready")
|
|
loop_freq = 1 # Hz
|
|
rate = rospy.Rate(loop_freq)
|
|
simulation_ready = False
|
|
for i in xrange(timeout * loop_freq):
|
|
if all(value for value in self.sub_topics_ready.values()):
|
|
simulation_ready = True
|
|
rospy.loginfo("simulation topics ready | seconds: {0} of {1}".
|
|
format(i / loop_freq, timeout))
|
|
break
|
|
|
|
try:
|
|
rate.sleep()
|
|
except rospy.ROSException as e:
|
|
self.fail(e)
|
|
|
|
self.assertTrue(simulation_ready, (
|
|
"failed to hear from all subscribed simulation topics | topic ready flags: {0} | timeout(seconds): {1}".
|
|
format(self.sub_topics_ready, timeout)))
|
|
|
|
def wait_for_landed_state(self, desired_landed_state, timeout, index):
|
|
rospy.loginfo("waiting for landed state | state: {0}, index: {1}".
|
|
format(mavutil.mavlink.enums['MAV_LANDED_STATE'][
|
|
desired_landed_state].name, index))
|
|
loop_freq = 10 # Hz
|
|
rate = rospy.Rate(loop_freq)
|
|
landed_state_confirmed = False
|
|
for i in xrange(timeout * loop_freq):
|
|
if self.extended_state.landed_state == desired_landed_state:
|
|
landed_state_confirmed = True
|
|
rospy.loginfo("landed state confirmed | seconds: {0} of {1}".
|
|
format(i / loop_freq, timeout))
|
|
break
|
|
|
|
try:
|
|
rate.sleep()
|
|
except rospy.ROSException as e:
|
|
self.fail(e)
|
|
|
|
self.assertTrue(landed_state_confirmed, (
|
|
"landed state not detected | desired: {0}, current: {1} | index: {2}, timeout(seconds): {3}".
|
|
format(mavutil.mavlink.enums['MAV_LANDED_STATE'][
|
|
desired_landed_state].name, mavutil.mavlink.enums[
|
|
'MAV_LANDED_STATE'][self.extended_state.landed_state].name,
|
|
index, timeout)))
|
|
|
|
def wait_for_vtol_state(self, transition, timeout, index):
|
|
"""Wait for VTOL transition, timeout(int): seconds"""
|
|
rospy.loginfo(
|
|
"waiting for VTOL transition | transition: {0}, index: {1}".format(
|
|
mavutil.mavlink.enums['MAV_VTOL_STATE'][
|
|
transition].name, index))
|
|
loop_freq = 10 # Hz
|
|
rate = rospy.Rate(loop_freq)
|
|
transitioned = False
|
|
for i in xrange(timeout * loop_freq):
|
|
if transition == self.extended_state.vtol_state:
|
|
rospy.loginfo("transitioned | seconds: {0} of {1}".format(
|
|
i / loop_freq, timeout))
|
|
transitioned = True
|
|
break
|
|
|
|
try:
|
|
rate.sleep()
|
|
except rospy.ROSException as e:
|
|
self.fail(e)
|
|
|
|
self.assertTrue(transitioned, (
|
|
"transition not detected | desired: {0}, current: {1} | index: {2} timeout(seconds): {3}".
|
|
format(mavutil.mavlink.enums['MAV_VTOL_STATE'][transition].name,
|
|
mavutil.mavlink.enums['MAV_VTOL_STATE'][
|
|
self.extended_state.vtol_state].name, index, timeout)))
|
|
|
|
def clear_wps(self, timeout):
|
|
"""timeout(int): seconds"""
|
|
loop_freq = 1 # Hz
|
|
rate = rospy.Rate(loop_freq)
|
|
wps_cleared = False
|
|
for i in xrange(timeout * loop_freq):
|
|
if not self.mission_wp.waypoints:
|
|
wps_cleared = True
|
|
rospy.loginfo("clear waypoints success | seconds: {0} of {1}".
|
|
format(i / loop_freq, timeout))
|
|
break
|
|
else:
|
|
try:
|
|
res = self.wp_clear_srv()
|
|
if not res.success:
|
|
rospy.logerr("failed to send waypoint clear command")
|
|
except rospy.ServiceException as e:
|
|
rospy.logerr(e)
|
|
|
|
try:
|
|
rate.sleep()
|
|
except rospy.ROSException as e:
|
|
self.fail(e)
|
|
|
|
self.assertTrue(wps_cleared, (
|
|
"failed to clear waypoints | timeout(seconds): {0}".format(timeout)
|
|
))
|
|
|
|
def send_wps(self, waypoints, timeout):
|
|
"""waypoints, timeout(int): seconds"""
|
|
rospy.loginfo("sending mission waypoints")
|
|
if self.mission_wp.waypoints:
|
|
rospy.loginfo("FCU already has mission waypoints")
|
|
|
|
loop_freq = 1 # Hz
|
|
rate = rospy.Rate(loop_freq)
|
|
wps_sent = False
|
|
wps_verified = False
|
|
for i in xrange(timeout * loop_freq):
|
|
if not wps_sent:
|
|
try:
|
|
res = self.wp_push_srv(start_index=0, waypoints=waypoints)
|
|
wps_sent = res.success
|
|
if wps_sent:
|
|
rospy.loginfo("waypoints successfully transferred")
|
|
except rospy.ServiceException as e:
|
|
rospy.logerr(e)
|
|
else:
|
|
if len(waypoints) == len(self.mission_wp.waypoints):
|
|
rospy.loginfo("number of waypoints transferred: {0}".
|
|
format(len(waypoints)))
|
|
wps_verified = True
|
|
|
|
if wps_sent and wps_verified:
|
|
rospy.loginfo("send waypoints success | seconds: {0} of {1}".
|
|
format(i / loop_freq, timeout))
|
|
break
|
|
|
|
try:
|
|
rate.sleep()
|
|
except rospy.ROSException as e:
|
|
self.fail(e)
|
|
|
|
self.assertTrue((
|
|
wps_sent and wps_verified
|
|
), "mission could not be transferred and verified | timeout(seconds): {0}".
|
|
format(timeout))
|
|
|
|
def wait_for_mav_type(self, timeout):
|
|
"""Wait for MAV_TYPE parameter, timeout(int): seconds"""
|
|
rospy.loginfo("waiting for MAV_TYPE")
|
|
loop_freq = 1 # Hz
|
|
rate = rospy.Rate(loop_freq)
|
|
res = False
|
|
for i in xrange(timeout * loop_freq):
|
|
try:
|
|
res = self.get_param_srv('MAV_TYPE')
|
|
if res.success:
|
|
self.mav_type = res.value.integer
|
|
rospy.loginfo(
|
|
"MAV_TYPE received | type: {0} | seconds: {1} of {2}".
|
|
format(mavutil.mavlink.enums['MAV_TYPE'][self.mav_type]
|
|
.name, i / loop_freq, timeout))
|
|
break
|
|
except rospy.ServiceException as e:
|
|
rospy.logerr(e)
|
|
|
|
try:
|
|
rate.sleep()
|
|
except rospy.ROSException as e:
|
|
self.fail(e)
|
|
|
|
self.assertTrue(res.success, (
|
|
"MAV_TYPE param get failed | timeout(seconds): {0}".format(timeout)
|
|
))
|
|
|
|
def log_topic_vars(self):
|
|
"""log the state of topic variables"""
|
|
rospy.loginfo("========================")
|
|
rospy.loginfo("===== topic values =====")
|
|
rospy.loginfo("========================")
|
|
rospy.loginfo("altitude:\n{}".format(self.altitude))
|
|
rospy.loginfo("========================")
|
|
rospy.loginfo("extended_state:\n{}".format(self.extended_state))
|
|
rospy.loginfo("========================")
|
|
rospy.loginfo("global_position:\n{}".format(self.global_position))
|
|
rospy.loginfo("========================")
|
|
rospy.loginfo("home_position:\n{}".format(self.home_position))
|
|
rospy.loginfo("========================")
|
|
rospy.loginfo("local_position:\n{}".format(self.local_position))
|
|
rospy.loginfo("========================")
|
|
rospy.loginfo("mission_wp:\n{}".format(self.mission_wp))
|
|
rospy.loginfo("========================")
|
|
rospy.loginfo("state:\n{}".format(self.state))
|
|
rospy.loginfo("========================")
|