px4-firmware/integrationtests/python_src/px4_it/mavros/mavros_test_common.py

461 lines
18 KiB
Python

#!/usr/bin/env python2
from __future__ import division
import unittest
import rospy
import math
from geometry_msgs.msg import PoseStamped
from mavros_msgs.msg import Altitude, ExtendedState, HomePosition, ParamValue, State, \
WaypointList
from mavros_msgs.srv import CommandBool, ParamGet, ParamSet, SetMode, SetModeRequest, WaypointClear, \
WaypointPush
from pymavlink import mavutil
from sensor_msgs.msg import NavSatFix, Imu
from six.moves import xrange
class MavrosTestCommon(unittest.TestCase):
def __init__(self, *args):
super(MavrosTestCommon, self).__init__(*args)
def setUp(self):
self.altitude = Altitude()
self.extended_state = ExtendedState()
self.global_position = NavSatFix()
self.imu_data = Imu()
self.home_position = HomePosition()
self.local_position = PoseStamped()
self.mission_wp = WaypointList()
self.state = State()
self.mav_type = None
self.sub_topics_ready = {
key: False
for key in [
'alt', 'ext_state', 'global_pos', 'home_pos', 'local_pos',
'mission_wp', 'state', 'imu'
]
}
# ROS services
service_timeout = 30
rospy.loginfo("waiting for ROS services")
try:
rospy.wait_for_service('mavros/param/get', service_timeout)
rospy.wait_for_service('mavros/param/set', service_timeout)
rospy.wait_for_service('mavros/cmd/arming', service_timeout)
rospy.wait_for_service('mavros/mission/push', service_timeout)
rospy.wait_for_service('mavros/mission/clear', service_timeout)
rospy.wait_for_service('mavros/set_mode', service_timeout)
rospy.loginfo("ROS services are up")
except rospy.ROSException:
self.fail("failed to connect to services")
self.get_param_srv = rospy.ServiceProxy('mavros/param/get', ParamGet)
self.set_param_srv = rospy.ServiceProxy('mavros/param/set', ParamSet)
self.set_arming_srv = rospy.ServiceProxy('mavros/cmd/arming',
CommandBool)
self.set_mode_srv = rospy.ServiceProxy('mavros/set_mode', SetMode)
self.wp_clear_srv = rospy.ServiceProxy('mavros/mission/clear',
WaypointClear)
self.wp_push_srv = rospy.ServiceProxy('mavros/mission/push',
WaypointPush)
# ROS subscribers
self.alt_sub = rospy.Subscriber('mavros/altitude', Altitude,
self.altitude_callback)
self.ext_state_sub = rospy.Subscriber('mavros/extended_state',
ExtendedState,
self.extended_state_callback)
self.global_pos_sub = rospy.Subscriber('mavros/global_position/global',
NavSatFix,
self.global_position_callback)
self.imu_data_sub = rospy.Subscriber('mavros/imu/data',
Imu,
self.imu_data_callback)
self.home_pos_sub = rospy.Subscriber('mavros/home_position/home',
HomePosition,
self.home_position_callback)
self.local_pos_sub = rospy.Subscriber('mavros/local_position/pose',
PoseStamped,
self.local_position_callback)
self.mission_wp_sub = rospy.Subscriber(
'mavros/mission/waypoints', WaypointList, self.mission_wp_callback)
self.state_sub = rospy.Subscriber('mavros/state', State,
self.state_callback)
def tearDown(self):
self.log_topic_vars()
#
# Callback functions
#
def altitude_callback(self, data):
self.altitude = data
# amsl has been observed to be nan while other fields are valid
if not self.sub_topics_ready['alt'] and not math.isnan(data.amsl):
self.sub_topics_ready['alt'] = True
def extended_state_callback(self, data):
if self.extended_state.vtol_state != data.vtol_state:
rospy.loginfo("VTOL state changed from {0} to {1}".format(
mavutil.mavlink.enums['MAV_VTOL_STATE']
[self.extended_state.vtol_state].name, mavutil.mavlink.enums[
'MAV_VTOL_STATE'][data.vtol_state].name))
if self.extended_state.landed_state != data.landed_state:
rospy.loginfo("landed state changed from {0} to {1}".format(
mavutil.mavlink.enums['MAV_LANDED_STATE']
[self.extended_state.landed_state].name, mavutil.mavlink.enums[
'MAV_LANDED_STATE'][data.landed_state].name))
self.extended_state = data
if not self.sub_topics_ready['ext_state']:
self.sub_topics_ready['ext_state'] = True
def global_position_callback(self, data):
self.global_position = data
if not self.sub_topics_ready['global_pos']:
self.sub_topics_ready['global_pos'] = True
def imu_data_callback(self, data):
self.imu_data = data
if not self.sub_topics_ready['imu']:
self.sub_topics_ready['imu'] = True
def home_position_callback(self, data):
self.home_position = data
if not self.sub_topics_ready['home_pos']:
self.sub_topics_ready['home_pos'] = True
def local_position_callback(self, data):
self.local_position = data
if not self.sub_topics_ready['local_pos']:
self.sub_topics_ready['local_pos'] = True
def mission_wp_callback(self, data):
if self.mission_wp.current_seq != data.current_seq:
rospy.loginfo("current mission waypoint sequence updated: {0}".
format(data.current_seq))
self.mission_wp = data
if not self.sub_topics_ready['mission_wp']:
self.sub_topics_ready['mission_wp'] = True
def state_callback(self, data):
if self.state.armed != data.armed:
rospy.loginfo("armed state changed from {0} to {1}".format(
self.state.armed, data.armed))
if self.state.connected != data.connected:
rospy.loginfo("connected changed from {0} to {1}".format(
self.state.connected, data.connected))
if self.state.mode != data.mode:
rospy.loginfo("mode changed from {0} to {1}".format(
self.state.mode, data.mode))
if self.state.system_status != data.system_status:
rospy.loginfo("system_status changed from {0} to {1}".format(
mavutil.mavlink.enums['MAV_STATE'][
self.state.system_status].name, mavutil.mavlink.enums[
'MAV_STATE'][data.system_status].name))
self.state = data
# mavros publishes a disconnected state message on init
if not self.sub_topics_ready['state'] and data.connected:
self.sub_topics_ready['state'] = True
#
# Helper methods
#
def set_arm(self, arm, timeout):
"""arm: True to arm or False to disarm, timeout(int): seconds"""
rospy.loginfo("setting FCU arm: {0}".format(arm))
old_arm = self.state.armed
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
arm_set = False
for i in xrange(timeout * loop_freq):
if self.state.armed == arm:
arm_set = True
rospy.loginfo("set arm success | seconds: {0} of {1}".format(
i / loop_freq, timeout))
break
else:
try:
res = self.set_arming_srv(arm)
if not res.success:
rospy.logerr("failed to send arm command")
except rospy.ServiceException as e:
rospy.logerr(e)
try:
rate.sleep()
except rospy.ROSException as e:
self.fail(e)
self.assertTrue(arm_set, (
"failed to set arm | new arm: {0}, old arm: {1} | timeout(seconds): {2}".
format(arm, old_arm, timeout)))
def set_mode(self, mode, timeout):
"""mode: PX4 mode string, timeout(int): seconds"""
rospy.loginfo("setting FCU mode: {0}".format(mode))
old_mode = self.state.mode
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
mode_set = False
for i in xrange(timeout * loop_freq):
if self.state.mode == mode:
mode_set = True
rospy.loginfo("set mode success | seconds: {0} of {1}".format(
i / loop_freq, timeout))
break
else:
try:
res = self.set_mode_srv(0, mode) # 0 is custom mode
if not res.mode_sent:
rospy.logerr("failed to send mode command")
except rospy.ServiceException as e:
rospy.logerr(e)
try:
rate.sleep()
except rospy.ROSException as e:
self.fail(e)
self.assertTrue(mode_set, (
"failed to set mode | new mode: {0}, old mode: {1} | timeout(seconds): {2}".
format(mode, old_mode, timeout)))
def set_param(self, param_id, param_value, timeout):
"""param: PX4 param string, ParamValue, timeout(int): seconds"""
if param_value.integer != 0:
value = param_value.integer
else:
value = param_value.real
rospy.loginfo("setting PX4 parameter: {0} with value {1}".
format(param_id, value))
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
param_set = False
for i in xrange(timeout * loop_freq):
try:
res = self.set_param_srv(param_id, param_value)
if res.success:
rospy.loginfo("param {0} set to {1} | seconds: {2} of {3}".
format(param_id, value, i / loop_freq, timeout))
break
except rospy.ServiceException as e:
rospy.logerr(e)
try:
rate.sleep()
except rospy.ROSException as e:
self.fail(e)
self.assertTrue(res.success, (
"failed to set param | param_id: {0}, param_value: {1} | timeout(seconds): {2}".
format(param_id, value, timeout)))
def wait_for_topics(self, timeout):
"""wait for simulation to be ready, make sure we're getting topic info
from all topics by checking dictionary of flag values set in callbacks,
timeout(int): seconds"""
rospy.loginfo("waiting for subscribed topics to be ready")
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
simulation_ready = False
for i in xrange(timeout * loop_freq):
if all(value for value in self.sub_topics_ready.values()):
simulation_ready = True
rospy.loginfo("simulation topics ready | seconds: {0} of {1}".
format(i / loop_freq, timeout))
break
try:
rate.sleep()
except rospy.ROSException as e:
self.fail(e)
self.assertTrue(simulation_ready, (
"failed to hear from all subscribed simulation topics | topic ready flags: {0} | timeout(seconds): {1}".
format(self.sub_topics_ready, timeout)))
def wait_for_landed_state(self, desired_landed_state, timeout, index):
rospy.loginfo("waiting for landed state | state: {0}, index: {1}".
format(mavutil.mavlink.enums['MAV_LANDED_STATE'][
desired_landed_state].name, index))
loop_freq = 10 # Hz
rate = rospy.Rate(loop_freq)
landed_state_confirmed = False
for i in xrange(timeout * loop_freq):
if self.extended_state.landed_state == desired_landed_state:
landed_state_confirmed = True
rospy.loginfo("landed state confirmed | seconds: {0} of {1}".
format(i / loop_freq, timeout))
break
try:
rate.sleep()
except rospy.ROSException as e:
self.fail(e)
self.assertTrue(landed_state_confirmed, (
"landed state not detected | desired: {0}, current: {1} | index: {2}, timeout(seconds): {3}".
format(mavutil.mavlink.enums['MAV_LANDED_STATE'][
desired_landed_state].name, mavutil.mavlink.enums[
'MAV_LANDED_STATE'][self.extended_state.landed_state].name,
index, timeout)))
def wait_for_vtol_state(self, transition, timeout, index):
"""Wait for VTOL transition, timeout(int): seconds"""
rospy.loginfo(
"waiting for VTOL transition | transition: {0}, index: {1}".format(
mavutil.mavlink.enums['MAV_VTOL_STATE'][
transition].name, index))
loop_freq = 10 # Hz
rate = rospy.Rate(loop_freq)
transitioned = False
for i in xrange(timeout * loop_freq):
if transition == self.extended_state.vtol_state:
rospy.loginfo("transitioned | seconds: {0} of {1}".format(
i / loop_freq, timeout))
transitioned = True
break
try:
rate.sleep()
except rospy.ROSException as e:
self.fail(e)
self.assertTrue(transitioned, (
"transition not detected | desired: {0}, current: {1} | index: {2} timeout(seconds): {3}".
format(mavutil.mavlink.enums['MAV_VTOL_STATE'][transition].name,
mavutil.mavlink.enums['MAV_VTOL_STATE'][
self.extended_state.vtol_state].name, index, timeout)))
def clear_wps(self, timeout):
"""timeout(int): seconds"""
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
wps_cleared = False
for i in xrange(timeout * loop_freq):
if not self.mission_wp.waypoints:
wps_cleared = True
rospy.loginfo("clear waypoints success | seconds: {0} of {1}".
format(i / loop_freq, timeout))
break
else:
try:
res = self.wp_clear_srv()
if not res.success:
rospy.logerr("failed to send waypoint clear command")
except rospy.ServiceException as e:
rospy.logerr(e)
try:
rate.sleep()
except rospy.ROSException as e:
self.fail(e)
self.assertTrue(wps_cleared, (
"failed to clear waypoints | timeout(seconds): {0}".format(timeout)
))
def send_wps(self, waypoints, timeout):
"""waypoints, timeout(int): seconds"""
rospy.loginfo("sending mission waypoints")
if self.mission_wp.waypoints:
rospy.loginfo("FCU already has mission waypoints")
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
wps_sent = False
wps_verified = False
for i in xrange(timeout * loop_freq):
if not wps_sent:
try:
res = self.wp_push_srv(start_index=0, waypoints=waypoints)
wps_sent = res.success
if wps_sent:
rospy.loginfo("waypoints successfully transferred")
except rospy.ServiceException as e:
rospy.logerr(e)
else:
if len(waypoints) == len(self.mission_wp.waypoints):
rospy.loginfo("number of waypoints transferred: {0}".
format(len(waypoints)))
wps_verified = True
if wps_sent and wps_verified:
rospy.loginfo("send waypoints success | seconds: {0} of {1}".
format(i / loop_freq, timeout))
break
try:
rate.sleep()
except rospy.ROSException as e:
self.fail(e)
self.assertTrue((
wps_sent and wps_verified
), "mission could not be transferred and verified | timeout(seconds): {0}".
format(timeout))
def wait_for_mav_type(self, timeout):
"""Wait for MAV_TYPE parameter, timeout(int): seconds"""
rospy.loginfo("waiting for MAV_TYPE")
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
res = False
for i in xrange(timeout * loop_freq):
try:
res = self.get_param_srv('MAV_TYPE')
if res.success:
self.mav_type = res.value.integer
rospy.loginfo(
"MAV_TYPE received | type: {0} | seconds: {1} of {2}".
format(mavutil.mavlink.enums['MAV_TYPE'][self.mav_type]
.name, i / loop_freq, timeout))
break
except rospy.ServiceException as e:
rospy.logerr(e)
try:
rate.sleep()
except rospy.ROSException as e:
self.fail(e)
self.assertTrue(res.success, (
"MAV_TYPE param get failed | timeout(seconds): {0}".format(timeout)
))
def log_topic_vars(self):
"""log the state of topic variables"""
rospy.loginfo("========================")
rospy.loginfo("===== topic values =====")
rospy.loginfo("========================")
rospy.loginfo("altitude:\n{}".format(self.altitude))
rospy.loginfo("========================")
rospy.loginfo("extended_state:\n{}".format(self.extended_state))
rospy.loginfo("========================")
rospy.loginfo("global_position:\n{}".format(self.global_position))
rospy.loginfo("========================")
rospy.loginfo("home_position:\n{}".format(self.home_position))
rospy.loginfo("========================")
rospy.loginfo("local_position:\n{}".format(self.local_position))
rospy.loginfo("========================")
rospy.loginfo("mission_wp:\n{}".format(self.mission_wp))
rospy.loginfo("========================")
rospy.loginfo("state:\n{}".format(self.state))
rospy.loginfo("========================")