2016-11-08 07:06:05 -04:00
from __future__ import print_function
2018-03-15 08:54:34 -03:00
import abc
2016-07-31 07:22:06 -03:00
import math
2018-11-09 08:32:02 -04:00
import itertools
2018-03-15 08:54:34 -03:00
import os
2018-11-09 08:32:02 -04:00
import re
2018-03-15 08:54:34 -03:00
import shutil
import sys
2016-07-31 07:22:06 -03:00
import time
2018-08-20 13:40:24 -03:00
import pexpect
2018-08-23 04:59:20 -03:00
import fnmatch
2016-07-31 07:22:06 -03:00
2017-08-16 10:55:21 -03:00
from pymavlink import mavwp , mavutil
2018-03-15 19:56:37 -03:00
from pysim import util , vehicleinfo
2016-07-31 07:22:06 -03:00
2011-11-12 05:01:58 -04:00
# a list of pexpect objects to read while waiting for
# messages. This keeps the output to stdout flowing
expect_list = [ ]
2018-03-05 11:14:34 -04:00
# get location of scripts
testdir = os . path . dirname ( os . path . realpath ( __file__ ) )
# Check python version for abstract base class
if sys . version_info [ 0 ] > = 3 and sys . version_info [ 1 ] > = 4 :
2018-11-12 01:39:12 -04:00
ABC = abc . ABC
2018-03-05 11:14:34 -04:00
else :
ABC = abc . ABCMeta ( ' ABC ' , ( ) , { } )
2018-04-27 11:48:06 -03:00
class ErrorException ( Exception ) :
""" Base class for other exceptions """
2017-08-26 02:21:31 -03:00
pass
2018-05-09 00:32:23 -03:00
2018-04-27 11:48:06 -03:00
class AutoTestTimeoutException ( ErrorException ) :
pass
2018-05-09 00:32:23 -03:00
2018-04-27 11:48:06 -03:00
class WaitModeTimeout ( AutoTestTimeoutException ) :
""" Thrown when fails to achieve given mode change. """
pass
2018-05-09 00:32:23 -03:00
2018-04-27 11:48:06 -03:00
class WaitAltitudeTimout ( AutoTestTimeoutException ) :
""" Thrown when fails to achieve given altitude range. """
pass
2018-05-09 00:32:23 -03:00
2018-04-27 11:48:06 -03:00
class WaitGroundSpeedTimeout ( AutoTestTimeoutException ) :
""" Thrown when fails to achieve given ground speed range. """
pass
2018-05-09 00:32:23 -03:00
2018-04-27 11:48:06 -03:00
class WaitRollTimeout ( AutoTestTimeoutException ) :
""" Thrown when fails to achieve given roll in degrees. """
pass
2018-05-09 00:32:23 -03:00
2018-04-27 11:48:06 -03:00
class WaitPitchTimeout ( AutoTestTimeoutException ) :
""" Thrown when fails to achieve given pitch in degrees. """
pass
2018-05-09 00:32:23 -03:00
2018-04-27 11:48:06 -03:00
class WaitHeadingTimeout ( AutoTestTimeoutException ) :
""" Thrown when fails to achieve given heading. """
pass
2018-05-09 00:32:23 -03:00
2018-04-27 11:48:06 -03:00
class WaitDistanceTimeout ( AutoTestTimeoutException ) :
""" Thrown when fails to attain distance """
pass
2018-05-09 00:32:23 -03:00
2018-04-27 11:48:06 -03:00
class WaitLocationTimeout ( AutoTestTimeoutException ) :
""" Thrown when fails to attain location """
pass
2018-05-09 00:32:23 -03:00
2018-04-27 11:48:06 -03:00
class WaitWaypointTimeout ( AutoTestTimeoutException ) :
""" Thrown when fails to attain waypoint ranges """
pass
2018-05-09 00:32:23 -03:00
2018-04-27 11:48:06 -03:00
class SetRCTimeout ( AutoTestTimeoutException ) :
""" Thrown when fails to send RC commands """
pass
2018-05-09 00:32:23 -03:00
2018-04-27 11:48:06 -03:00
class MsgRcvTimeoutException ( AutoTestTimeoutException ) :
""" Thrown when fails to receive an expected message """
pass
2018-05-09 00:32:23 -03:00
2018-04-27 11:48:06 -03:00
class NotAchievedException ( ErrorException ) :
""" Thrown when fails to achieve a goal """
pass
2018-05-09 00:32:23 -03:00
2018-04-27 11:48:06 -03:00
class PreconditionFailedException ( ErrorException ) :
""" Thrown when a precondition for a test is not met """
pass
2016-07-31 07:22:06 -03:00
2018-05-09 00:32:23 -03:00
2018-07-14 21:41:03 -03:00
class Context ( object ) :
def __init__ ( self ) :
self . parameters = [ ]
2018-07-31 06:50:02 -03:00
2018-03-14 08:08:53 -03:00
class AutoTest ( ABC ) :
2018-03-05 11:14:34 -04:00
""" Base abstract class.
It implements the common function for all vehicle types .
"""
2018-03-15 08:31:19 -03:00
def __init__ ( self ,
viewerip = None ,
use_map = False ) :
2018-03-05 11:14:34 -04:00
self . mavproxy = None
self . mav = None
2018-03-15 08:31:19 -03:00
self . viewerip = viewerip
self . use_map = use_map
2018-07-14 21:41:03 -03:00
self . contexts = [ ]
self . context_push ( )
2018-07-31 06:49:22 -03:00
self . buildlog = None
self . copy_tlog = False
self . logfile = None
2018-09-07 20:48:45 -03:00
self . max_set_rc_timeout = 0
2018-11-09 08:32:02 -04:00
self . last_wp_load = 0
2018-03-05 11:14:34 -04:00
2018-07-12 06:44:37 -03:00
@staticmethod
def progress ( text ) :
2018-03-14 08:08:53 -03:00
""" Display autotest progress text. """
print ( " AUTOTEST: " + text )
# following two functions swiped from autotest.py:
2018-07-12 06:44:37 -03:00
@staticmethod
def buildlogs_dirpath ( ) :
2018-03-14 08:08:53 -03:00
return os . getenv ( " BUILDLOGS " , util . reltopdir ( " ../buildlogs " ) )
def buildlogs_path ( self , path ) :
2018-07-31 06:50:02 -03:00
""" Return a string representing path in the buildlogs directory. """
2018-03-14 08:08:53 -03:00
bits = [ self . buildlogs_dirpath ( ) ]
if isinstance ( path , list ) :
bits . extend ( path )
else :
bits . append ( path )
return os . path . join ( * bits )
2018-03-15 08:31:19 -03:00
def sitl_streamrate ( self ) :
2018-07-31 06:50:02 -03:00
""" Allow subclasses to override SITL streamrate. """
2018-03-15 08:31:19 -03:00
return 10
2018-08-03 21:04:19 -03:00
def autotest_connection_hostport ( self ) :
''' returns host and port of connection between MAVProxy and autotest,
colon - separated '''
return " 127.0.0.1:19550 "
def autotest_connection_string_from_mavproxy ( self ) :
return " tcpin: " + self . autotest_connection_hostport ( )
def autotest_connection_string_to_mavproxy ( self ) :
return " tcp: " + self . autotest_connection_hostport ( )
2018-03-15 08:31:19 -03:00
def mavproxy_options ( self ) :
2018-07-31 06:50:02 -03:00
""" Returns options to be passed to MAVProxy. """
2018-03-15 08:31:19 -03:00
ret = [ ' --sitl=127.0.0.1:5501 ' ,
2018-08-03 21:04:19 -03:00
' --out= ' + self . autotest_connection_string_from_mavproxy ( ) ,
2018-03-15 08:31:19 -03:00
' --streamrate= %u ' % self . sitl_streamrate ( ) ]
if self . viewerip :
ret . append ( " --out= %s :14550 " % self . viewerip )
if self . use_map :
ret . append ( ' --map ' )
return ret
2018-03-15 19:56:37 -03:00
def vehicleinfo_key ( self ) :
return self . log_name
2018-06-26 21:56:01 -03:00
def apply_defaultfile_parameters ( self ) :
2018-07-31 06:50:02 -03:00
""" Apply parameter file. """
2018-03-15 19:56:37 -03:00
# setup test parameters
vinfo = vehicleinfo . VehicleInfo ( )
if self . params is None :
frames = vinfo . options [ self . vehicleinfo_key ( ) ] [ " frames " ]
self . params = frames [ self . frame ] [ " default_params_filename " ]
if not isinstance ( self . params , list ) :
self . params = [ self . params ]
for x in self . params :
self . mavproxy . send ( " param load %s \n " % os . path . join ( testdir , x ) )
self . mavproxy . expect ( ' Loaded [0-9]+ parameters ' )
self . set_parameter ( ' LOG_REPLAY ' , 1 )
self . set_parameter ( ' LOG_DISARMED ' , 1 )
2018-06-26 21:56:01 -03:00
self . reboot_sitl ( )
2018-08-22 21:15:46 -03:00
self . fetch_parameters ( )
2018-03-15 19:56:37 -03:00
2018-07-14 21:41:03 -03:00
def fetch_parameters ( self ) :
self . mavproxy . send ( " param fetch \n " )
self . mavproxy . expect ( " Received [0-9]+ parameters " )
2018-06-26 21:56:01 -03:00
def reboot_sitl ( self ) :
2018-07-31 06:50:02 -03:00
""" Reboot SITL instance and wait it to reconnect. """
2018-06-26 21:56:01 -03:00
self . mavproxy . send ( " reboot \n " )
2018-10-10 18:35:53 -03:00
self . mavproxy . expect ( " Initialising APM " )
2018-06-29 07:38:10 -03:00
# empty mav to avoid getting old timestamps:
if self . mav is not None :
while self . mav . recv_match ( blocking = False ) :
pass
# after reboot stream-rates may be zero. Prompt MAVProxy to
# send a rate-change message by changing away from our normal
# stream rates and back again:
if self . mav is not None :
tstart = self . get_sim_time ( )
while True :
2018-10-08 04:10:53 -03:00
self . mavproxy . send ( " set streamrate %u \n " % ( self . sitl_streamrate ( ) + 1 ) )
2018-06-29 07:38:10 -03:00
if self . mav is None :
break
if self . get_sim_time ( ) - tstart > 10 :
2018-10-17 23:55:16 -03:00
raise AutoTestTimeoutException ( " SYSTEM_TIME not received " )
2018-06-29 07:38:10 -03:00
m = self . mav . recv_match ( type = ' SYSTEM_TIME ' ,
blocking = True ,
timeout = 1 )
if m is not None :
print ( " Received ( %s ) " % str ( m ) )
2018-07-12 06:44:03 -03:00
break
2018-06-29 07:38:10 -03:00
self . mavproxy . send ( " set streamrate %u \n " % self . sitl_streamrate ( ) )
self . progress ( " Reboot complete " )
2018-03-15 19:56:37 -03:00
2018-03-15 08:54:34 -03:00
def close ( self ) :
2018-07-31 06:50:02 -03:00
""" Tidy up after running all tests. """
2018-03-15 08:54:34 -03:00
if self . use_map :
self . mavproxy . send ( " module unload map \n " )
self . mavproxy . expect ( " Unloaded module map " )
self . mav . close ( )
util . pexpect_close ( self . mavproxy )
util . pexpect_close ( self . sitl )
valgrind_log = util . valgrind_log_filepath ( binary = self . binary ,
model = self . frame )
if os . path . exists ( valgrind_log ) :
os . chmod ( valgrind_log , 0o644 )
shutil . copy ( valgrind_log ,
self . buildlogs_path ( " %s -valgrind.log " %
self . log_name ) )
2018-04-14 08:31:22 -03:00
def start_test ( self , description ) :
self . progress ( " # " )
self . progress ( " ########## %s ########## " % description )
self . progress ( " # " )
2018-07-31 06:49:22 -03:00
def try_symlink_tlog ( self ) :
self . buildlog = self . buildlogs_path ( self . log_name + " -test.tlog " )
self . progress ( " buildlog= %s " % self . buildlog )
if os . path . exists ( self . buildlog ) :
os . unlink ( self . buildlog )
try :
os . link ( self . logfile , self . buildlog )
except OSError as error :
self . progress ( " OSError [ %d ]: %s " % ( error . errno , error . strerror ) )
self . progress ( " WARN: Failed to create symlink: %s => %s , "
" will copy tlog manually to target location " %
( self . logfile , self . buildlog ) )
self . copy_tlog = True
2018-03-14 08:08:53 -03:00
#################################################
# GENERAL UTILITIES
#################################################
def expect_list_clear ( self ) :
""" clear the expect list. """
global expect_list
for p in expect_list [ : ] :
expect_list . remove ( p )
def expect_list_extend ( self , list_to_add ) :
""" Extend the expect list. """
global expect_list
expect_list . extend ( list_to_add )
def idle_hook ( self , mav ) :
""" Called when waiting for a mavlink message. """
global expect_list
for p in expect_list :
util . pexpect_drain ( p )
def message_hook ( self , mav , msg ) :
""" Called as each mavlink msg is received. """
self . idle_hook ( mav )
def expect_callback ( self , e ) :
""" Called when waiting for a expect pattern. """
global expect_list
for p in expect_list :
if p == e :
continue
2018-10-02 12:44:16 -03:00
util . pexpect_drain ( p )
2018-03-14 08:08:53 -03:00
2018-09-07 20:48:45 -03:00
def drain_mav ( self ) :
count = 0
while self . mav . recv_match ( type = ' SYSTEM_TIME ' , blocking = False ) is not None :
count + = 1
self . progress ( " Drained %u messages from mav " % count )
2018-03-05 11:14:34 -04:00
#################################################
# SIM UTILITIES
#################################################
def get_sim_time ( self ) :
""" Get SITL time. """
m = self . mav . recv_match ( type = ' SYSTEM_TIME ' , blocking = True )
return m . time_boot_ms * 1.0e-3
2018-04-17 23:47:31 -03:00
def get_sim_time_cached ( self ) :
""" Get SITL time. """
x = self . mav . messages . get ( " SYSTEM_TIME " , None )
if x is None :
return self . get_sim_time ( )
return x . time_boot_ms * 1.0e-3
2018-03-05 11:14:34 -04:00
def sim_location ( self ) :
""" Return current simulator location. """
m = self . mav . recv_match ( type = ' SIMSTATE ' , blocking = True )
2018-03-14 08:08:53 -03:00
return mavutil . location ( m . lat * 1.0e-7 ,
m . lng * 1.0e-7 ,
0 ,
math . degrees ( m . yaw ) )
2018-03-05 11:14:34 -04:00
def save_wp ( self ) :
""" Trigger RC 7 to save waypoint. """
self . mavproxy . send ( ' rc 7 1000 \n ' )
2018-03-14 08:08:53 -03:00
self . mav . recv_match ( condition = ' RC_CHANNELS.chan7_raw==1000 ' ,
blocking = True )
2018-03-05 11:14:34 -04:00
self . wait_seconds ( 1 )
self . mavproxy . send ( ' rc 7 2000 \n ' )
2018-03-14 08:08:53 -03:00
self . mav . recv_match ( condition = ' RC_CHANNELS.chan7_raw==2000 ' ,
blocking = True )
2018-03-05 11:14:34 -04:00
self . wait_seconds ( 1 )
self . mavproxy . send ( ' rc 7 1000 \n ' )
2018-03-14 08:08:53 -03:00
self . mav . recv_match ( condition = ' RC_CHANNELS.chan7_raw==1000 ' ,
blocking = True )
2018-03-05 11:14:34 -04:00
self . wait_seconds ( 1 )
2018-08-16 23:48:21 -03:00
def clear_wp ( self ) :
""" Trigger RC 8 to clear waypoint. """
2018-09-06 01:32:00 -03:00
self . progress ( " Clearing waypoints " )
2018-08-16 23:48:21 -03:00
self . set_rc ( 8 , 1000 )
self . wait_seconds ( 0.5 )
self . set_rc ( 8 , 2000 )
self . wait_seconds ( 0.5 )
self . set_rc ( 8 , 1000 )
2018-09-06 01:32:00 -03:00
self . mavproxy . send ( ' wp list \n ' )
self . mavproxy . expect ( ' Requesting 0 waypoints ' )
2018-08-16 23:48:21 -03:00
2018-11-10 01:16:28 -04:00
def log_download ( self , filename , timeout = 360 , upload_logs = False ) :
2018-03-05 11:14:34 -04:00
""" Download latest log. """
self . mav . wait_heartbeat ( )
self . mavproxy . send ( " log list \n " )
self . mavproxy . expect ( " numLogs " )
self . mav . wait_heartbeat ( )
self . mav . wait_heartbeat ( )
self . mavproxy . send ( " set shownoise 0 \n " )
self . mavproxy . send ( " log download latest %s \n " % filename )
self . mavproxy . expect ( " Finished downloading " , timeout = timeout )
self . mav . wait_heartbeat ( )
self . mav . wait_heartbeat ( )
2018-11-10 01:16:28 -04:00
if upload_logs and not os . getenv ( " AUTOTEST_NO_UPLOAD " ) :
# optionally upload logs to server so we can see travis failure logs
import subprocess , glob , datetime
logdir = os . path . dirname ( filename )
datedir = datetime . datetime . now ( ) . strftime ( " % Y- % m- %d - % H- % M " )
flist = glob . glob ( " logs/*.BIN " )
for e in [ ' BIN ' , ' bin ' , ' tlog ' ] :
flist + = glob . glob ( os . path . join ( logdir , ' *. %s ' % e ) )
print ( " Uploading %u logs to http://firmware.ardupilot.org/CI-Logs/ %s " % ( len ( flist ) , datedir ) )
cmd = [ ' rsync ' , ' -avz ' ] + flist + [ ' cilogs@autotest.ardupilot.org::CI-Logs/ %s / ' % datedir ]
subprocess . call ( cmd )
2018-03-05 11:14:34 -04:00
def show_gps_and_sim_positions ( self , on_off ) :
""" Allow to display gps and actual position on map. """
if on_off is True :
# turn on simulator display of gps and actual position
self . mavproxy . send ( ' map set showgpspos 1 \n ' )
self . mavproxy . send ( ' map set showsimpos 1 \n ' )
else :
# turn off simulator display of gps and actual position
self . mavproxy . send ( ' map set showgpspos 0 \n ' )
self . mavproxy . send ( ' map set showsimpos 0 \n ' )
2018-07-12 06:44:37 -03:00
@staticmethod
def mission_count ( filename ) :
2018-03-05 11:14:34 -04:00
""" Load a mission from a file and return number of waypoints. """
wploader = mavwp . MAVWPLoader ( )
wploader . load ( filename )
num_wp = wploader . count ( )
return num_wp
2018-11-09 08:32:02 -04:00
def mission_directory ( self ) :
return testdir
def assert_mission_files_same ( self , file1 , file2 ) :
self . progress ( " Comparing ( %s ) and ( %s ) " % ( file1 , file2 , ) )
f1 = open ( file1 )
f2 = open ( file2 )
for l1 , l2 in itertools . izip ( f1 , f2 ) :
if l1 == l2 :
# e.g. the first "QGC WPL 110" line
continue
if re . match ( " 0 \ s " , l1 ) :
# home changes...
continue
l1 = l1 . rstrip ( )
l2 = l2 . rstrip ( )
fields1 = re . split ( " \ s+ " , l1 )
fields2 = re . split ( " \ s+ " , l2 )
line = int ( fields1 [ 0 ] )
t = int ( fields1 [ 3 ] ) # mission item type
for ( count , ( i1 , i2 ) ) in enumerate ( itertools . izip ( fields1 , fields2 ) ) :
if count == 2 : # frame
if t in [ mavutil . mavlink . MAV_CMD_DO_CHANGE_SPEED ,
mavutil . mavlink . MAV_CMD_CONDITION_YAW ,
mavutil . mavlink . MAV_CMD_NAV_RETURN_TO_LAUNCH ,
mavutil . mavlink . MAV_CMD_NAV_LOITER_TIME ,
mavutil . mavlink . MAV_CMD_DO_JUMP ,
mavutil . mavlink . MAV_CMD_DO_DIGICAM_CONTROL ,
] :
# ardupilot doesn't remember frame on these commands
if int ( i1 ) == 3 :
i1 = 0
if int ( i2 ) == 3 :
i2 = 0
if count == 6 : # param 3
if t in [ mavutil . mavlink . MAV_CMD_NAV_LOITER_TIME ] :
# ardupilot canonicalises this to -1 for ccw or 1 for cw.
if float ( i1 ) == 0 :
i1 = 1.0
if float ( i2 ) == 0 :
i2 = 1.0
if count == 7 : # param 4
if t == mavutil . mavlink . MAV_CMD_NAV_LAND :
# ardupilot canonicalises "0" to "1" param 4 (yaw)
if int ( float ( i1 ) ) == 0 :
i1 = 1
if int ( float ( i2 ) ) == 0 :
i2 = 1
if 0 < = count < = 3 or 11 < = count < = 11 :
if int ( i1 ) != int ( i2 ) :
raise ValueError ( " Files have different content: ( %s vs %s ) ( %s vs %s ) ( %d vs %d ) (count= %u ) " % ( file1 , file2 , l1 , l2 , int ( i1 ) , int ( i2 ) , count ) ) # NOCI
continue
if 4 < = count < = 10 :
delta = abs ( float ( i1 ) - float ( i2 ) )
max_allowed_delta = 0.000009
if delta > max_allowed_delta :
# print("Files have different (float) content: (%s) and (%s) (%s vs %s) (%f vs %f) (%.10f) (count=%u)" % (file1, file2, l1, l2, float(i1), float(i2), delta, count))
# sys.exit(0)
raise ValueError ( " Files have different (float) content: ( %s ) and ( %s ) ( %s vs %s ) ( %f vs %f ) ( %.10f ) (count= %u ) " % ( file1 , file2 , l1 , l2 , float ( i1 ) , float ( i2 ) , delta , count ) ) # NOCI
continue
raise ValueError ( " count %u not handled " % count )
self . progress ( " Files same " )
def load_mission ( self , filename ) :
2018-03-05 11:14:34 -04:00
""" Load a mission from a file to flight controller. """
2018-11-09 08:32:02 -04:00
path = os . path . join ( self . mission_directory ( ) , filename )
tstart = self . get_sim_time_cached ( )
while True :
t2 = self . get_sim_time ( )
if t2 - tstart > 10 :
raise AutoTestTimeoutException ( " Failed to do waypoint thing " )
self . mavproxy . send ( ' wp load %s \n ' % path )
self . mavproxy . expect ( ' Loaded ([0-9]+) waypoints from ' )
load_count = self . mavproxy . match . group ( 1 )
# the following hack is to get around MAVProxy statustext deduping:
while time . time ( ) - self . last_wp_load < 3 :
self . progress ( " Waiting for MAVProxy de-dupe timer to expire " )
time . sleep ( 1 )
self . last_wp_load = time . time ( )
self . mavproxy . expect ( " Flight plan received " )
self . mavproxy . send ( ' wp list \n ' )
self . mavproxy . expect ( ' Requesting ([0-9]+) waypoints ' )
request_count = self . mavproxy . match . group ( 1 )
if load_count != request_count :
self . progress ( " request_count= %s != load_count= %s " %
( request_count , load_count ) )
continue
self . mavproxy . expect ( ' Saved ([0-9]+) waypoints to (.+?way.txt) ' )
save_count = self . mavproxy . match . group ( 1 )
if save_count != request_count :
raise NotAchievedException ( " request count != load count " )
saved_filepath = util . reltopdir ( self . mavproxy . match . group ( 2 ) )
saved_filepath = saved_filepath . rstrip ( )
self . assert_mission_files_same ( path , saved_filepath )
break
self . mavproxy . send ( ' wp status \n ' )
self . mavproxy . expect ( ' Have ( \ d+) of ( \ d+) ' )
status_have = self . mavproxy . match . group ( 1 )
status_want = self . mavproxy . match . group ( 2 )
if status_have != status_want :
raise ValueError ( " status count mismatch " )
if status_have != save_count :
raise ValueError ( " status have not equal to save count " )
2018-03-05 11:14:34 -04:00
# update num_wp
wploader = mavwp . MAVWPLoader ( )
2018-11-09 08:32:02 -04:00
wploader . load ( path )
2018-03-05 11:14:34 -04:00
num_wp = wploader . count ( )
return num_wp
def save_mission_to_file ( self , filename ) :
""" Save a mission to a file """
2018-08-16 23:48:21 -03:00
self . mavproxy . send ( ' wp list \n ' )
self . mavproxy . expect ( ' Requesting [0-9]+ waypoints ' )
2018-03-05 11:14:34 -04:00
self . mavproxy . send ( ' wp save %s \n ' % filename )
self . mavproxy . expect ( ' Saved ([0-9]+) waypoints ' )
num_wp = int ( self . mavproxy . match . group ( 1 ) )
2018-03-14 08:08:53 -03:00
self . progress ( " num_wp: %d " % num_wp )
2018-03-05 11:14:34 -04:00
return num_wp
def set_rc_default ( self ) :
""" Setup all simulated RC control to 1500. """
for chan in range ( 1 , 16 ) :
self . mavproxy . send ( ' rc %u 1500 \n ' % chan )
2018-09-07 20:48:45 -03:00
def set_rc ( self , chan , pwm , timeout = 2000 ) :
2018-03-05 11:14:34 -04:00
""" Setup a simulated RC control to a PWM value """
2018-09-07 20:48:45 -03:00
self . drain_mav ( )
2018-03-05 11:14:34 -04:00
tstart = self . get_sim_time ( )
2018-09-07 20:48:45 -03:00
wclock = time . time ( )
2018-04-17 23:47:31 -03:00
while self . get_sim_time_cached ( ) < tstart + timeout :
2018-03-05 11:14:34 -04:00
self . mavproxy . send ( ' rc %u %u \n ' % ( chan , pwm ) )
m = self . mav . recv_match ( type = ' RC_CHANNELS ' , blocking = True )
chan_pwm = getattr ( m , " chan " + str ( chan ) + " _raw " )
2018-09-07 20:48:45 -03:00
wclock_delta = time . time ( ) - wclock
sim_time_delta = self . get_sim_time_cached ( ) - tstart
if sim_time_delta == 0 :
time_ratio = None
else :
time_ratio = wclock_delta / sim_time_delta
self . progress ( " set_rc (wc= %s st= %s r= %s ): want= %u got= %u " %
( wclock_delta ,
sim_time_delta ,
time_ratio ,
pwm ,
chan_pwm ) )
2018-03-05 11:14:34 -04:00
if chan_pwm == pwm :
2018-09-07 20:48:45 -03:00
delta = self . get_sim_time_cached ( ) - tstart
if delta > self . max_set_rc_timeout :
self . max_set_rc_timeout = delta
2018-03-05 11:14:34 -04:00
return True
2018-10-17 23:55:16 -03:00
raise SetRCTimeout ( (
" Failed to send RC commands to channel %s " % str ( chan ) ) )
2018-03-05 11:14:34 -04:00
2018-08-03 06:42:19 -03:00
def set_throttle_zero ( self ) :
""" Set throttle to zero. """
if self . mav . mav_type == mavutil . mavlink . MAV_TYPE_GROUND_ROVER :
self . set_rc ( 3 , 1500 )
else :
self . set_rc ( 3 , 1000 )
2018-09-17 11:50:30 -03:00
def set_output_to_max ( self , chan ) :
""" Set output to max with RC Radio taking into account REVERSED parameter. """
is_reversed = self . get_parameter ( " RC %u _REVERSED " % chan )
out_max = int ( self . get_parameter ( " RC %u _MAX " % chan ) )
out_min = int ( self . get_parameter ( " RC %u _MIN " % chan ) )
if is_reversed == 0 :
self . set_rc ( chan , out_max )
else :
self . set_rc ( chan , out_min )
def set_output_to_min ( self , chan ) :
""" Set output to min with RC Radio taking into account REVERSED parameter. """
is_reversed = self . get_parameter ( " RC %u _REVERSED " % chan )
out_max = int ( self . get_parameter ( " RC %u _MAX " % chan ) )
out_min = int ( self . get_parameter ( " RC %u _MIN " % chan ) )
if is_reversed == 0 :
self . set_rc ( chan , out_min )
else :
self . set_rc ( chan , out_max )
def set_output_to_trim ( self , chan ) :
""" Set output to trim with RC Radio. """
out_trim = int ( self . get_parameter ( " RC %u _TRIM " % chan ) )
self . set_rc ( chan , out_trim )
def get_rudder_channel ( self ) :
if self . mav . mav_type in [ mavutil . mavlink . MAV_TYPE_QUADROTOR ,
mavutil . mavlink . MAV_TYPE_HELICOPTER ,
mavutil . mavlink . MAV_TYPE_HEXAROTOR ,
mavutil . mavlink . MAV_TYPE_OCTOROTOR ,
mavutil . mavlink . MAV_TYPE_COAXIAL ,
mavutil . mavlink . MAV_TYPE_TRICOPTER ] :
return int ( self . get_parameter ( " RCMAP_YAW " ) )
if self . mav . mav_type == mavutil . mavlink . MAV_TYPE_FIXED_WING :
return int ( self . get_parameter ( " RCMAP_YAW " ) )
if self . mav . mav_type == mavutil . mavlink . MAV_TYPE_GROUND_ROVER :
return int ( self . get_parameter ( " RCMAP_ROLL " ) )
if self . mav . mav_type == mavutil . mavlink . MAV_TYPE_SUBMARINE :
raise ErrorException ( " Arming with rudder is not supported by Submarine " )
2018-05-10 04:15:38 -03:00
def armed ( self ) :
2018-07-31 06:50:02 -03:00
""" Return true if vehicle is armed and safetyoff """
2018-07-12 06:44:03 -03:00
return self . mav . motors_armed ( )
2018-05-10 04:15:38 -03:00
2018-09-10 04:56:28 -03:00
def arm_vehicle ( self , timeout = 20 ) :
2018-03-05 11:14:34 -04:00
""" Arm vehicle with mavlink arm message. """
2018-08-06 05:45:51 -03:00
self . progress ( " Arm motors with MAVLink cmd " )
self . run_cmd ( mavutil . mavlink . MAV_CMD_COMPONENT_ARM_DISARM ,
1 , # ARM
0 ,
0 ,
0 ,
0 ,
0 ,
0 ,
)
tstart = self . get_sim_time ( )
2018-09-10 04:56:28 -03:00
while self . get_sim_time ( ) - tstart < timeout :
2018-08-06 05:45:51 -03:00
self . mav . wait_heartbeat ( )
if self . mav . motors_armed ( ) :
self . progress ( " Motors ARMED " )
return True
2018-10-17 23:55:16 -03:00
raise AutoTestTimeoutException ( " Unable to ARM with mavlink " )
2018-08-06 05:45:51 -03:00
2018-09-10 04:56:28 -03:00
def disarm_vehicle ( self , timeout = 20 ) :
2018-08-06 05:45:51 -03:00
""" Disarm vehicle with mavlink disarm message. """
self . progress ( " Disarm motors with MAVLink cmd " )
self . run_cmd ( mavutil . mavlink . MAV_CMD_COMPONENT_ARM_DISARM ,
0 , # DISARM
0 ,
0 ,
0 ,
0 ,
0 ,
0 ,
)
tstart = self . get_sim_time ( )
2018-09-10 04:56:28 -03:00
while self . get_sim_time ( ) - tstart < timeout :
2018-08-06 05:45:51 -03:00
self . mav . wait_heartbeat ( )
if not self . mav . motors_armed ( ) :
self . progress ( " Motors DISARMED " )
return True
2018-10-17 23:55:16 -03:00
raise AutoTestTimeoutException ( " Unable to DISARM with mavlink " )
2018-08-06 05:45:51 -03:00
def mavproxy_arm_vehicle ( self ) :
""" Arm vehicle with mavlink arm message send from MAVProxy. """
self . progress ( " Arm motors with MavProxy " )
2018-03-05 11:14:34 -04:00
self . mavproxy . send ( ' arm throttle \n ' )
self . mav . motors_armed_wait ( )
2018-03-14 08:08:53 -03:00
self . progress ( " ARMED " )
2018-03-05 11:14:34 -04:00
return True
2018-08-06 05:45:51 -03:00
def mavproxy_disarm_vehicle ( self ) :
""" Disarm vehicle with mavlink disarm message send from MAVProxy. """
self . progress ( " Disarm motors with MavProxy " )
2018-03-05 11:14:34 -04:00
self . mavproxy . send ( ' disarm \n ' )
self . mav . motors_disarmed_wait ( )
2018-03-14 08:08:53 -03:00
self . progress ( " DISARMED " )
2018-03-05 11:14:34 -04:00
return True
2018-09-10 04:56:28 -03:00
def arm_motors_with_rc_input ( self , timeout = 20 ) :
2018-08-03 06:42:19 -03:00
""" Arm motors with radio. """
self . progress ( " Arm motors with radio " )
2018-09-17 11:50:30 -03:00
self . set_output_to_max ( self . get_rudder_channel ( ) )
2018-08-03 06:42:19 -03:00
tstart = self . get_sim_time ( )
2018-10-23 21:00:18 -03:00
while True :
2018-08-03 06:42:19 -03:00
self . mav . wait_heartbeat ( )
2018-09-17 11:50:30 -03:00
if self . mav . motors_armed ( ) :
2018-08-03 06:42:19 -03:00
arm_delay = self . get_sim_time ( ) - tstart
self . progress ( " MOTORS ARMED OK WITH RADIO " )
2018-09-17 11:50:30 -03:00
self . set_output_to_trim ( self . get_rudder_channel ( ) )
2018-08-03 06:42:19 -03:00
self . progress ( " Arm in %s s " % arm_delay ) # TODO check arming time
return True
2018-10-23 21:00:18 -03:00
tdelta = self . get_sim_time ( ) - tstart
print ( " Not armed after %f seconds " % ( tdelta ) )
if tdelta > timeout :
break
2018-08-03 06:42:19 -03:00
self . progress ( " FAILED TO ARM WITH RADIO " )
2018-09-17 11:50:30 -03:00
self . set_output_to_trim ( self . get_rudder_channel ( ) )
2018-08-03 06:42:19 -03:00
return False
2018-09-10 04:56:28 -03:00
def disarm_motors_with_rc_input ( self , timeout = 20 ) :
2018-08-03 06:42:19 -03:00
""" Disarm motors with radio. """
self . progress ( " Disarm motors with radio " )
2018-09-17 11:50:30 -03:00
self . set_output_to_min ( self . get_rudder_channel ( ) )
2018-08-03 06:42:19 -03:00
tstart = self . get_sim_time ( )
while self . get_sim_time ( ) < tstart + timeout :
self . mav . wait_heartbeat ( )
if not self . mav . motors_armed ( ) :
disarm_delay = self . get_sim_time ( ) - tstart
self . progress ( " MOTORS DISARMED OK WITH RADIO " )
2018-09-17 11:50:30 -03:00
self . set_output_to_trim ( self . get_rudder_channel ( ) )
2018-08-03 06:42:19 -03:00
self . progress ( " Disarm in %s s " % disarm_delay ) # TODO check disarming time
return True
self . progress ( " FAILED TO DISARM WITH RADIO " )
2018-09-17 11:50:30 -03:00
self . set_output_to_trim ( self . get_rudder_channel ( ) )
2018-08-03 06:42:19 -03:00
return False
2018-09-19 13:36:50 -03:00
def arm_motors_with_switch ( self , switch_chan , timeout = 20 ) :
""" Arm motors with switch. """
self . progress ( " Arm motors with switch %d " % switch_chan )
self . set_rc ( switch_chan , 2000 )
tstart = self . get_sim_time ( )
while self . get_sim_time ( ) < tstart + timeout :
self . mav . wait_heartbeat ( )
if self . mav . motors_armed ( ) :
self . progress ( " MOTORS ARMED OK WITH SWITCH " )
return True
self . progress ( " FAILED TO ARM WITH SWITCH " )
return False
def disarm_motors_with_switch ( self , switch_chan , timeout = 20 ) :
""" Disarm motors with switch. """
self . progress ( " Disarm motors with switch %d " % switch_chan )
self . set_rc ( switch_chan , 1000 )
tstart = self . get_sim_time ( )
while self . get_sim_time ( ) < tstart + timeout :
self . mav . wait_heartbeat ( )
if not self . mav . motors_armed ( ) :
self . progress ( " MOTORS DISARMED OK WITH SWITCH " )
return True
self . progress ( " FAILED TO DISARM WITH SWITCH " )
return False
2018-08-03 06:42:19 -03:00
def autodisarm_motors ( self ) :
""" Autodisarm motors. """
self . progress ( " Autodisarming motors " )
if self . mav . mav_type == mavutil . mavlink . MAV_TYPE_GROUND_ROVER : # NOT IMPLEMENTED ON ROVER
self . progress ( " MOTORS AUTODISARMED OK " )
return True
tstart = self . get_sim_time ( )
timeout = 15
while self . get_sim_time ( ) < tstart + timeout :
self . mav . wait_heartbeat ( )
if not self . mav . motors_armed ( ) :
disarm_delay = self . get_sim_time ( ) - tstart
self . progress ( " MOTORS AUTODISARMED " )
self . progress ( " Autodisarm in %s s " % disarm_delay ) # TODO check disarming time
return True
self . progress ( " FAILED TO AUTODISARM " )
return False
2018-08-23 04:59:20 -03:00
@staticmethod
def should_fetch_all_for_parameter_change ( param_name ) :
if fnmatch . fnmatch ( param_name , " *_ENABLE " ) or fnmatch . fnmatch ( param_name , " *_ENABLED " ) :
return True
if param_name in [ " ARSPD_TYPE " ,
" ARSPD2_TYPE " ,
" BATT2_MONITOR " ,
" CAN_DRIVER " ,
" COMPASS_PMOT_EN " ,
" OSD_TYPE " ,
" RSSI_TYPE " ,
" WENC_TYPE " ] :
return True
return False
2018-10-02 19:28:21 -03:00
def set_parameter ( self , name , value , add_to_context = True , epsilon = 0.00001 ) :
2018-07-31 06:50:02 -03:00
""" Set parameters from vehicle. """
2018-08-20 13:40:24 -03:00
old_value = self . get_parameter ( name , retry = 2 )
2018-03-05 11:14:34 -04:00
for i in range ( 1 , 10 ) :
self . mavproxy . send ( " param set %s %s \n " % ( name , str ( value ) ) )
2018-07-12 06:45:17 -03:00
returned_value = self . get_parameter ( name )
2018-10-02 19:28:21 -03:00
delta = float ( value ) - returned_value
if abs ( delta ) < epsilon :
2018-03-05 11:14:34 -04:00
# yes, exactly equal.
2018-07-14 21:41:03 -03:00
if add_to_context :
2018-07-31 06:50:02 -03:00
self . context_get ( ) . parameters . append ( ( name , old_value ) )
2018-08-27 10:29:48 -03:00
if self . should_fetch_all_for_parameter_change ( name . upper ( ) ) and value != 0 :
2018-08-23 04:59:20 -03:00
self . fetch_parameters ( )
2018-07-14 21:41:03 -03:00
return
2018-10-17 23:55:16 -03:00
raise ValueError ( " Param fetch returned incorrect value ( %s ) vs ( %s ) "
2018-03-14 08:08:53 -03:00
% ( returned_value , value ) )
2018-03-05 11:14:34 -04:00
2018-08-20 13:40:24 -03:00
def get_parameter ( self , name , retry = 1 , timeout = 60 ) :
2018-07-31 06:50:02 -03:00
""" Get parameters from vehicle. """
2018-08-20 13:40:24 -03:00
for i in range ( 0 , retry ) :
self . mavproxy . send ( " param fetch %s \n " % name )
try :
self . mavproxy . expect ( " %s = ([-0-9.]*) \r \n " % ( name , ) , timeout = timeout / retry )
return float ( self . mavproxy . match . group ( 1 ) )
except pexpect . TIMEOUT :
2018-10-08 23:53:28 -03:00
pass
raise NotAchievedException ( " Failed to retrieve parameter " )
2018-03-05 11:14:34 -04:00
2018-07-14 21:41:03 -03:00
def context_get ( self ) :
2018-07-31 06:50:02 -03:00
""" Get Saved parameters. """
2018-07-14 21:41:03 -03:00
return self . contexts [ - 1 ]
def context_push ( self ) :
2018-07-31 06:50:02 -03:00
""" Save a copy of the parameters. """
2018-07-14 21:41:03 -03:00
self . contexts . append ( Context ( ) )
def context_pop ( self ) :
2018-07-31 06:50:02 -03:00
""" Set parameters to origin values in reverse order. """
2018-07-14 21:41:03 -03:00
dead = self . contexts . pop ( )
dead_parameters = dead . parameters
dead_parameters . reverse ( )
for p in dead_parameters :
( name , old_value ) = p
self . set_parameter ( name ,
old_value ,
add_to_context = False )
2018-07-31 06:59:15 -03:00
def run_cmd ( self ,
command ,
p1 ,
p2 ,
p3 ,
p4 ,
p5 ,
p6 ,
p7 ,
want_result = mavutil . mavlink . MAV_RESULT_ACCEPTED ) :
2018-07-31 06:50:02 -03:00
""" Send a MAVLink command long. """
2018-07-31 06:59:15 -03:00
self . mav . mav . command_long_send ( 1 ,
1 ,
command ,
1 , # confirmation
p1 ,
p2 ,
p3 ,
p4 ,
p5 ,
p6 ,
p7 )
while True :
m = self . mav . recv_match ( type = ' COMMAND_ACK ' , blocking = True )
2018-07-31 06:59:45 -03:00
self . progress ( " ACK received: %s " % str ( m ) )
2018-07-31 06:59:15 -03:00
if m . command == command :
2018-07-31 06:59:45 -03:00
if m . result != want_result :
2018-10-23 01:56:36 -03:00
raise ValueError ( " Expected %s got %s " % ( want_result ,
m . result ) )
2018-07-31 06:59:15 -03:00
break
2018-03-05 11:14:34 -04:00
#################################################
# UTILITIES
#################################################
@staticmethod
def get_distance ( loc1 , loc2 ) :
""" Get ground distance between two locations. """
dlat = loc2 . lat - loc1 . lat
2018-07-30 08:19:14 -03:00
try :
dlong = loc2 . lng - loc1 . lng
except AttributeError :
dlong = loc2 . lon - loc1 . lon
return math . sqrt ( ( dlat * dlat ) + ( dlong * dlong ) ) * 1.113195e5
@staticmethod
def get_distance_int ( loc1 , loc2 ) :
""" Get ground distance between two locations in the normal " int " form
- lat / lon multiplied by 1e7 """
dlat = loc2 . lat - loc1 . lat
try :
dlong = loc2 . lng - loc1 . lng
except AttributeError :
dlong = loc2 . lon - loc1 . lon
dlat / = 10000000.0
dlong / = 10000000.0
2018-03-05 11:14:34 -04:00
return math . sqrt ( ( dlat * dlat ) + ( dlong * dlong ) ) * 1.113195e5
@staticmethod
def get_bearing ( loc1 , loc2 ) :
""" Get bearing from loc1 to loc2. """
off_x = loc2 . lng - loc1 . lng
off_y = loc2 . lat - loc1 . lat
bearing = 90.00 + math . atan2 ( - off_y , off_x ) * 57.2957795
if bearing < 0 :
bearing + = 360.00
return bearing
def do_get_autopilot_capabilities ( self ) :
2018-08-06 05:46:25 -03:00
tstart = self . get_sim_time ( )
while self . get_sim_time ( ) - tstart < 10 :
# Cannot use run_cmd otherwise the respond is lost during the wait for ACK
self . mav . mav . command_long_send ( 1 ,
1 ,
mavutil . mavlink . MAV_CMD_REQUEST_AUTOPILOT_CAPABILITIES ,
0 , # confirmation
1 , # 1: Request autopilot version
0 ,
0 ,
0 ,
0 ,
0 ,
0 )
m = self . mav . recv_match ( type = ' AUTOPILOT_VERSION ' ,
blocking = True ,
timeout = 10 )
if m is not None :
self . progress ( " AUTOPILOT_VERSION received " )
return
2018-10-17 23:55:16 -03:00
raise AutoTestTimeoutException ( " No AUTOPILOT_VERSION received " )
2018-03-05 11:14:34 -04:00
2018-09-10 04:56:54 -03:00
def get_mode_from_mode_mapping ( self , mode ) :
""" Validate and return the mode number from a string or int. """
mode_map = self . mav . mode_mapping ( )
if mode_map is None :
2018-10-17 23:55:16 -03:00
raise ErrorException ( " No mode map " )
2018-09-10 04:56:54 -03:00
if isinstance ( mode , str ) :
if mode in mode_map :
return mode_map . get ( mode )
if mode in mode_map . values ( ) :
return mode
self . progress ( " Available modes ' %s ' " % mode_map )
2018-10-17 23:55:16 -03:00
raise ErrorException ( " Unknown mode ' %s ' " % mode )
2018-09-10 04:56:54 -03:00
def do_set_mode_via_command_long ( self , mode , timeout = 30 ) :
""" Set mode with a command long message. """
2018-03-05 11:14:34 -04:00
base_mode = mavutil . mavlink . MAV_MODE_FLAG_CUSTOM_MODE_ENABLED
2018-09-10 04:56:54 -03:00
custom_mode = self . get_mode_from_mode_mapping ( mode )
2018-08-06 05:46:25 -03:00
tstart = self . get_sim_time ( )
2018-09-10 04:56:54 -03:00
while True :
remaining = timeout - ( self . get_sim_time_cached ( ) - tstart )
if remaining < = 0 :
2018-10-17 23:55:16 -03:00
raise AutoTestTimeoutException ( " Failed to change mode " )
2018-08-06 05:46:25 -03:00
self . run_cmd ( mavutil . mavlink . MAV_CMD_DO_SET_MODE ,
base_mode ,
custom_mode ,
0 ,
0 ,
0 ,
0 ,
0 ,
)
m = self . mav . recv_match ( type = ' HEARTBEAT ' ,
blocking = True ,
2018-10-17 23:55:16 -03:00
timeout = 5 )
2018-08-06 05:46:25 -03:00
if m is None :
2018-10-17 23:55:16 -03:00
raise ErrorException ( " Heartbeat not received " )
2018-08-06 05:46:25 -03:00
if m . custom_mode == custom_mode :
return
2018-09-10 04:56:54 -03:00
def mavproxy_do_set_mode_via_command_long ( self , mode , timeout = 30 ) :
""" Set mode with a command long message with Mavproxy. """
2018-08-06 05:46:25 -03:00
base_mode = mavutil . mavlink . MAV_MODE_FLAG_CUSTOM_MODE_ENABLED
2018-09-10 04:56:54 -03:00
custom_mode = self . get_mode_from_mode_mapping ( mode )
tstart = self . get_sim_time ( )
while True :
remaining = timeout - ( self . get_sim_time_cached ( ) - tstart )
if remaining < = 0 :
2018-10-17 23:55:16 -03:00
raise AutoTestTimeoutException ( " Failed to change mode " )
2018-03-14 08:08:53 -03:00
self . mavproxy . send ( " long DO_SET_MODE %u %u \n " %
( base_mode , custom_mode ) )
m = self . mav . recv_match ( type = ' HEARTBEAT ' ,
blocking = True ,
2018-10-17 23:55:16 -03:00
timeout = 5 )
2018-03-05 11:14:34 -04:00
if m is None :
2018-10-17 23:55:16 -03:00
raise ErrorException ( " Did not receive heartbeat " )
2018-03-05 11:14:34 -04:00
if m . custom_mode == custom_mode :
2018-09-10 04:56:54 -03:00
return True
2017-11-14 00:30:31 -04:00
2018-03-05 11:14:34 -04:00
def reach_heading_manual ( self , heading ) :
""" Manually direct the vehicle to the target heading. """
if self . mav . mav_type in [ mavutil . mavlink . MAV_TYPE_QUADROTOR ,
mavutil . mavlink . MAV_TYPE_HELICOPTER ,
mavutil . mavlink . MAV_TYPE_HEXAROTOR ,
mavutil . mavlink . MAV_TYPE_OCTOROTOR ,
mavutil . mavlink . MAV_TYPE_COAXIAL ,
mavutil . mavlink . MAV_TYPE_TRICOPTER ] :
self . mavproxy . send ( ' rc 4 1580 \n ' )
2018-04-27 11:48:06 -03:00
self . wait_heading ( heading )
2018-03-05 11:14:34 -04:00
self . mavproxy . send ( ' rc 4 1500 \n ' )
2018-03-14 08:08:53 -03:00
self . mav . recv_match ( condition = ' RC_CHANNELS.chan4_raw==1500 ' ,
blocking = True )
2018-03-05 11:14:34 -04:00
if self . mav . mav_type == mavutil . mavlink . MAV_TYPE_FIXED_WING :
2018-03-14 08:08:53 -03:00
self . progress ( " NOT IMPLEMENTED " )
2018-03-05 11:14:34 -04:00
if self . mav . mav_type == mavutil . mavlink . MAV_TYPE_GROUND_ROVER :
self . mavproxy . send ( ' rc 1 1700 \n ' )
self . mavproxy . send ( ' rc 3 1550 \n ' )
2018-04-27 11:48:06 -03:00
self . wait_heading ( heading )
2018-03-05 11:14:34 -04:00
self . mavproxy . send ( ' rc 3 1500 \n ' )
2018-03-14 08:08:53 -03:00
self . mav . recv_match ( condition = ' RC_CHANNELS.chan3_raw==1500 ' ,
blocking = True )
2018-03-05 11:14:34 -04:00
self . mavproxy . send ( ' rc 1 1500 \n ' )
2018-03-14 08:08:53 -03:00
self . mav . recv_match ( condition = ' RC_CHANNELS.chan1_raw==1500 ' ,
blocking = True )
2018-03-05 11:14:34 -04:00
2018-07-31 06:50:02 -03:00
def reach_distance_manual ( self , distance ) :
2018-03-05 11:14:34 -04:00
""" Manually direct the vehicle to the target distance from home. """
if self . mav . mav_type in [ mavutil . mavlink . MAV_TYPE_QUADROTOR ,
mavutil . mavlink . MAV_TYPE_HELICOPTER ,
mavutil . mavlink . MAV_TYPE_HEXAROTOR ,
mavutil . mavlink . MAV_TYPE_OCTOROTOR ,
mavutil . mavlink . MAV_TYPE_COAXIAL ,
mavutil . mavlink . MAV_TYPE_TRICOPTER ] :
self . mavproxy . send ( ' rc 2 1350 \n ' )
2018-04-27 11:48:06 -03:00
self . wait_distance ( distance , accuracy = 5 , timeout = 60 )
2018-03-05 11:14:34 -04:00
self . mavproxy . send ( ' rc 2 1500 \n ' )
2018-03-14 08:08:53 -03:00
self . mav . recv_match ( condition = ' RC_CHANNELS.chan2_raw==1500 ' ,
blocking = True )
2018-03-05 11:14:34 -04:00
if self . mav . mav_type == mavutil . mavlink . MAV_TYPE_FIXED_WING :
2018-03-14 08:08:53 -03:00
self . progress ( " NOT IMPLEMENTED " )
2018-03-05 11:14:34 -04:00
if self . mav . mav_type == mavutil . mavlink . MAV_TYPE_GROUND_ROVER :
self . mavproxy . send ( ' rc 3 1700 \n ' )
2018-04-27 11:48:06 -03:00
self . wait_distance ( distance , accuracy = 2 )
2018-03-05 11:14:34 -04:00
self . mavproxy . send ( ' rc 3 1500 \n ' )
2018-03-14 08:08:53 -03:00
self . mav . recv_match ( condition = ' RC_CHANNELS.chan3_raw==1500 ' ,
blocking = True )
2018-03-05 11:14:34 -04:00
2018-07-31 07:02:11 -03:00
def guided_achieve_heading ( self , heading ) :
tstart = self . get_sim_time ( )
self . run_cmd ( mavutil . mavlink . MAV_CMD_CONDITION_YAW ,
heading , # target angle
10 , # degrees/second
1 , # -1 is counter-clockwise, 1 clockwise
0 , # 1 for relative, 0 for absolute
0 , # p5
0 , # p6
0 , # p7
)
while True :
if self . get_sim_time ( ) - tstart > 200 :
2018-10-17 23:55:16 -03:00
raise NotAchievedException ( " Did not achieve heading " )
2018-07-31 07:02:11 -03:00
m = self . mav . recv_match ( type = ' VFR_HUD ' , blocking = True )
self . progress ( " heading= %f want= %f " % ( m . heading , heading ) )
if m . heading == heading :
return
2018-03-05 11:14:34 -04:00
#################################################
# WAIT UTILITIES
#################################################
def wait_seconds ( self , seconds_to_wait ) :
""" Wait some second in SITL time. """
tstart = self . get_sim_time ( )
tnow = tstart
while tstart + seconds_to_wait > tnow :
tnow = self . get_sim_time ( )
2018-05-29 05:05:56 -03:00
def wait_altitude ( self , alt_min , alt_max , timeout = 30 , relative = False ) :
2018-03-05 11:14:34 -04:00
""" Wait for a given altitude range. """
previous_alt = 0
tstart = self . get_sim_time ( )
2018-03-14 08:08:53 -03:00
self . progress ( " Waiting for altitude between %u and %u " %
( alt_min , alt_max ) )
2018-09-07 20:48:45 -03:00
last_wait_alt_msg = 0
while self . get_sim_time_cached ( ) < tstart + timeout :
2018-05-29 05:05:56 -03:00
m = self . mav . recv_match ( type = ' GLOBAL_POSITION_INT ' , blocking = True )
if m is None :
continue
if relative :
alt = m . relative_alt / 1000.0 # mm -> m
else :
alt = m . alt / 1000.0 # mm -> m
climb_rate = alt - previous_alt
previous_alt = alt
2018-09-07 20:48:45 -03:00
if self . get_sim_time_cached ( ) - last_wait_alt_msg > 1 :
self . progress ( " Wait Altitude: Cur: %u , min_alt: %u , climb_rate: %u "
% ( alt , alt_min , climb_rate ) )
last_wait_alt_msg = self . get_sim_time_cached ( )
2018-05-29 05:05:56 -03:00
if alt > = alt_min and alt < = alt_max :
2018-03-14 08:08:53 -03:00
self . progress ( " Altitude OK " )
2018-03-05 11:14:34 -04:00
return True
2018-10-17 23:55:16 -03:00
raise WaitAltitudeTimout ( " Failed to attain altitude range " )
2017-08-16 10:55:21 -03:00
2018-03-05 11:14:34 -04:00
def wait_groundspeed ( self , gs_min , gs_max , timeout = 30 ) :
""" Wait for a given ground speed range. """
2018-03-14 08:08:53 -03:00
self . progress ( " Waiting for groundspeed between %.1f and %.1f " %
( gs_min , gs_max ) )
2018-09-07 20:48:45 -03:00
last_print = 0
tstart = self . get_sim_time ( )
while self . get_sim_time_cached ( ) < tstart + timeout :
2018-03-05 11:14:34 -04:00
m = self . mav . recv_match ( type = ' VFR_HUD ' , blocking = True )
2018-09-07 20:48:45 -03:00
if self . get_sim_time_cached ( ) - last_print > 1 :
self . progress ( " Wait groundspeed %.1f , target: %.1f " %
( m . groundspeed , gs_min ) )
last_print = self . get_sim_time_cached ( ) ;
2018-03-05 11:14:34 -04:00
if m . groundspeed > = gs_min and m . groundspeed < = gs_max :
return True
2018-10-17 23:55:16 -03:00
raise WaitGroundSpeedTimeout ( " Failed to attain groundspeed range " )
2017-08-16 10:55:21 -03:00
2018-03-05 11:14:34 -04:00
def wait_roll ( self , roll , accuracy , timeout = 30 ) :
""" Wait for a given roll in degrees. """
tstart = self . get_sim_time ( )
2018-03-14 08:08:53 -03:00
self . progress ( " Waiting for roll of %d at %s " % ( roll , time . ctime ( ) ) )
2018-03-05 11:14:34 -04:00
while self . get_sim_time ( ) < tstart + timeout :
m = self . mav . recv_match ( type = ' ATTITUDE ' , blocking = True )
p = math . degrees ( m . pitch )
r = math . degrees ( m . roll )
2018-03-14 08:08:53 -03:00
self . progress ( " Roll %d Pitch %d " % ( r , p ) )
2018-03-05 11:14:34 -04:00
if math . fabs ( r - roll ) < = accuracy :
2018-03-14 08:08:53 -03:00
self . progress ( " Attained roll %d " % roll )
2018-03-05 11:14:34 -04:00
return True
2018-10-17 23:55:16 -03:00
raise WaitRollTimeout ( " Failed to attain roll %d " % roll )
2017-08-16 10:55:21 -03:00
2018-03-05 11:14:34 -04:00
def wait_pitch ( self , pitch , accuracy , timeout = 30 ) :
""" Wait for a given pitch in degrees. """
tstart = self . get_sim_time ( )
2018-03-14 08:08:53 -03:00
self . progress ( " Waiting for pitch of %u at %s " % ( pitch , time . ctime ( ) ) )
2018-03-05 11:14:34 -04:00
while self . get_sim_time ( ) < tstart + timeout :
m = self . mav . recv_match ( type = ' ATTITUDE ' , blocking = True )
p = math . degrees ( m . pitch )
r = math . degrees ( m . roll )
2018-03-14 08:08:53 -03:00
self . progress ( " Pitch %d Roll %d " % ( p , r ) )
2018-03-05 11:14:34 -04:00
if math . fabs ( p - pitch ) < = accuracy :
2018-03-14 08:08:53 -03:00
self . progress ( " Attained pitch %d " % pitch )
2018-03-05 11:14:34 -04:00
return True
2018-10-17 23:55:16 -03:00
raise WaitPitchTimeout ( " Failed to attain pitch %d " % pitch )
2017-08-16 10:55:21 -03:00
2018-03-05 11:14:34 -04:00
def wait_heading ( self , heading , accuracy = 5 , timeout = 30 ) :
""" Wait for a given heading. """
tstart = self . get_sim_time ( )
2018-03-14 08:08:53 -03:00
self . progress ( " Waiting for heading %u with accuracy %u " %
( heading , accuracy ) )
2018-08-16 23:48:36 -03:00
last_print_time = 0
while True :
now = self . get_sim_time_cached ( )
if now - tstart > = timeout :
break
2018-03-05 11:14:34 -04:00
m = self . mav . recv_match ( type = ' VFR_HUD ' , blocking = True )
2018-09-07 20:48:45 -03:00
if now - last_print_time > 1 :
2018-08-16 23:48:36 -03:00
self . progress ( " Heading %u (want %f +- %f ) " % (
m . heading , heading , accuracy ) )
last_print_time = now
2018-03-05 11:14:34 -04:00
if math . fabs ( m . heading - heading ) < = accuracy :
2018-03-14 08:08:53 -03:00
self . progress ( " Attained heading %u " % heading )
2018-03-05 11:14:34 -04:00
return True
2018-10-17 23:55:16 -03:00
raise WaitHeadingTimeout ( " Failed to attain heading %u " % heading )
2017-08-16 10:55:21 -03:00
2018-03-05 11:14:34 -04:00
def wait_distance ( self , distance , accuracy = 5 , timeout = 30 ) :
""" Wait for flight of a given distance. """
tstart = self . get_sim_time ( )
start = self . mav . location ( )
2018-09-07 20:48:45 -03:00
last_distance_message = 0
2018-03-05 11:14:34 -04:00
while self . get_sim_time ( ) < tstart + timeout :
pos = self . mav . location ( )
delta = self . get_distance ( start , pos )
2018-09-07 20:48:45 -03:00
if self . get_sim_time_cached ( ) - last_distance_message > = 1 :
self . progress ( " Distance= %.2f meters want= %.2f " %
( delta , distance ) )
last_distance_message = self . get_sim_time_cached ( )
2018-03-05 11:14:34 -04:00
if math . fabs ( delta - distance ) < = accuracy :
2018-03-14 08:08:53 -03:00
self . progress ( " Attained distance %.2f meters OK " % delta )
2018-03-05 11:14:34 -04:00
return True
if delta > ( distance + accuracy ) :
2018-10-17 23:55:16 -03:00
raise WaitDistanceTimeout (
" Failed distance - overshoot delta= %f dist= %f "
% ( delta , distance ) )
raise WaitDistanceTimeout ( " Failed to attain distance %u " % distance )
2017-08-16 10:55:21 -03:00
2018-08-15 04:48:54 -03:00
def wait_servo_channel_value ( self , channel , value , timeout = 2 ) :
""" wait for channel to hit value """
channel_field = " servo %u _raw " % channel
tstart = self . get_sim_time ( )
while True :
remaining = timeout - ( self . get_sim_time_cached ( ) - tstart )
if remaining < = 0 :
2018-10-17 23:55:16 -03:00
raise NotAchievedException ( " Channel never achieved value " )
2018-08-15 04:48:54 -03:00
m = self . mav . recv_match ( type = ' SERVO_OUTPUT_RAW ' ,
blocking = True ,
timeout = remaining )
m_value = getattr ( m , channel_field , None )
self . progress ( " SERVO_OUTPUT_RAW. %s = %u want= %u " %
( channel_field , m_value , value ) )
if m_value is None :
2018-10-17 23:55:16 -03:00
raise ValueError ( " message has no field %s " % channel_field )
2018-08-15 04:48:54 -03:00
if m_value == value :
return
2018-03-14 08:08:53 -03:00
def wait_location ( self ,
loc ,
accuracy = 5 ,
timeout = 30 ,
target_altitude = None ,
height_accuracy = - 1 ) :
2018-03-05 11:14:34 -04:00
""" Wait for arrival at a location. """
tstart = self . get_sim_time ( )
if target_altitude is None :
target_altitude = loc . alt
2018-03-14 08:08:53 -03:00
self . progress ( " Waiting for location "
" %.4f , %.4f at altitude %.1f height_accuracy= %.1f " %
( loc . lat , loc . lng , target_altitude , height_accuracy ) )
2018-09-07 20:48:45 -03:00
last_distance_message = 0
2018-03-05 11:14:34 -04:00
while self . get_sim_time ( ) < tstart + timeout :
pos = self . mav . location ( )
delta = self . get_distance ( loc , pos )
2018-09-07 20:48:45 -03:00
if self . get_sim_time_cached ( ) - last_distance_message > = 1 :
self . progress ( " Distance %.2f meters alt %.1f " % ( delta , pos . alt ) )
last_distance_message = self . get_sim_time_cached ( )
2018-03-05 11:14:34 -04:00
if delta < = accuracy :
2018-03-14 08:08:53 -03:00
height_delta = math . fabs ( pos . alt - target_altitude )
2018-07-31 06:50:02 -03:00
if height_accuracy != - 1 and height_delta > height_accuracy :
2018-03-05 11:14:34 -04:00
continue
2018-03-14 08:08:53 -03:00
self . progress ( " Reached location ( %.2f meters) " % delta )
2018-03-05 11:14:34 -04:00
return True
2018-10-17 23:55:16 -03:00
raise WaitLocationTimeout ( " Failed to attain location " )
2017-08-16 10:55:21 -03:00
2018-07-24 23:57:08 -03:00
def wait_current_waypoint ( self , wpnum , timeout = 60 ) :
tstart = self . get_sim_time ( )
while self . get_sim_time ( ) < tstart + timeout :
seq = self . mav . waypoint_current ( )
self . progress ( " Waiting for wp= %u current= %u " % ( wpnum , seq ) )
if seq == wpnum :
break ;
2018-03-14 08:08:53 -03:00
def wait_waypoint ( self ,
wpnum_start ,
wpnum_end ,
allow_skip = True ,
max_dist = 2 ,
timeout = 400 ) :
2018-03-05 11:14:34 -04:00
""" Wait for waypoint ranges. """
tstart = self . get_sim_time ( )
# this message arrives after we set the current WP
start_wp = self . mav . waypoint_current ( )
current_wp = start_wp
mode = self . mav . flightmode
2018-09-07 20:48:45 -03:00
self . progress ( " wait for waypoint ranges start= %u end= %u "
2018-03-14 08:08:53 -03:00
% ( wpnum_start , wpnum_end ) )
2018-03-05 11:14:34 -04:00
# if start_wp != wpnum_start:
2018-10-17 23:55:16 -03:00
# raise WaitWaypointTimeout("test: Expected start waypoint %u "
# "but got %u" %
2018-03-14 08:08:53 -03:00
# (wpnum_start, start_wp))
2018-03-05 11:14:34 -04:00
2018-09-07 20:48:45 -03:00
last_wp_msg = 0
while self . get_sim_time_cached ( ) < tstart + timeout :
2018-03-05 11:14:34 -04:00
seq = self . mav . waypoint_current ( )
2018-03-14 08:08:53 -03:00
m = self . mav . recv_match ( type = ' NAV_CONTROLLER_OUTPUT ' ,
blocking = True )
2018-03-05 11:14:34 -04:00
wp_dist = m . wp_dist
m = self . mav . recv_match ( type = ' VFR_HUD ' , blocking = True )
# if we changed mode, fail
if self . mav . flightmode != mode :
2018-10-17 23:55:16 -03:00
raise WaitWaypointTimeout ( ' Exited %s mode ' % mode )
2018-03-05 11:14:34 -04:00
2018-09-07 20:48:45 -03:00
if self . get_sim_time_cached ( ) - last_wp_msg > 1 :
self . progress ( " WP %u (wp_dist= %u Alt= %d ), current_wp: %u , "
" wpnum_end: %u " %
( seq , wp_dist , m . alt , current_wp , wpnum_end ) )
last_wp_msg = self . get_sim_time_cached ( )
2018-03-05 11:14:34 -04:00
if seq == current_wp + 1 or ( seq > current_wp + 1 and allow_skip ) :
2018-03-14 08:08:53 -03:00
self . progress ( " test: Starting new waypoint %u " % seq )
2018-03-05 11:14:34 -04:00
tstart = self . get_sim_time ( )
current_wp = seq
2018-03-14 08:08:53 -03:00
# the wp_dist check is a hack until we can sort out
# the right seqnum for end of mission
# if current_wp == wpnum_end or (current_wp == wpnum_end-1 and
# wp_dist < 2):
2018-03-05 11:14:34 -04:00
if current_wp == wpnum_end and wp_dist < max_dist :
2018-03-14 08:08:53 -03:00
self . progress ( " Reached final waypoint %u " % seq )
2018-03-05 11:14:34 -04:00
return True
if seq > = 255 :
2018-03-14 08:08:53 -03:00
self . progress ( " Reached final waypoint %u " % seq )
2018-03-05 11:14:34 -04:00
return True
if seq > current_wp + 1 :
2018-10-17 23:55:16 -03:00
raise WaitWaypointTimeout ( (
" Skipped waypoint! Got wp %u expected %u "
% ( seq , current_wp + 1 ) ) )
raise WaitWaypointTimeout ( " Timed out waiting for waypoint %u of %u " %
( wpnum_end , wpnum_end ) )
2013-01-14 03:03:10 -04:00
2018-04-14 08:31:22 -03:00
def wait_mode ( self , mode , timeout = 60 ) :
2018-03-05 11:14:34 -04:00
""" Wait for mode to change. """
2018-09-10 04:56:54 -03:00
self . get_mode_from_mode_mapping ( mode )
2018-03-14 08:08:53 -03:00
self . progress ( " Waiting for mode %s " % mode )
2018-03-05 11:14:34 -04:00
tstart = self . get_sim_time ( )
2018-04-14 08:31:22 -03:00
self . mav . wait_heartbeat ( )
while self . mav . flightmode != mode :
if ( timeout is not None and
2018-07-31 06:50:02 -03:00
self . get_sim_time ( ) > tstart + timeout ) :
2018-10-17 23:55:16 -03:00
raise WaitModeTimeout ( " Did not change mode " )
2018-03-05 11:14:34 -04:00
self . mav . wait_heartbeat ( )
2018-07-31 06:50:02 -03:00
# self.progress("heartbeat mode %s Want: %s" % (
# self.mav.flightmode, mode))
2018-03-14 08:08:53 -03:00
self . progress ( " Got mode %s " % mode )
2018-03-05 11:14:34 -04:00
2018-10-02 19:28:21 -03:00
def wait_ready_to_arm ( self , timeout = None , require_absolute = True ) :
2018-03-05 11:14:34 -04:00
# wait for EKF checks to pass
2018-07-30 08:19:14 -03:00
self . progress ( " Waiting reading for arm " )
2018-10-02 19:28:21 -03:00
return self . wait_ekf_happy ( timeout = timeout ,
require_absolute = require_absolute )
2018-03-05 11:14:34 -04:00
2018-10-02 19:28:21 -03:00
def wait_ekf_happy ( self , timeout = 30 , require_absolute = True ) :
2018-03-05 11:14:34 -04:00
""" Wait for EKF to be happy """
2018-07-20 08:53:31 -03:00
""" if using SITL estimates directly """
if ( int ( self . get_parameter ( ' AHRS_EKF_TYPE ' ) ) == 10 ) :
return True
2018-03-05 11:14:34 -04:00
tstart = self . get_sim_time ( )
2018-07-15 21:59:45 -03:00
# all of these must be set for arming to happen:
required_value = ( mavutil . mavlink . EKF_ATTITUDE |
mavutil . mavlink . ESTIMATOR_VELOCITY_HORIZ |
mavutil . mavlink . ESTIMATOR_VELOCITY_VERT |
mavutil . mavlink . ESTIMATOR_POS_HORIZ_REL |
2018-10-02 19:28:21 -03:00
mavutil . mavlink . ESTIMATOR_PRED_POS_HORIZ_REL )
2018-07-15 21:59:45 -03:00
# none of these bits must be set for arming to happen:
error_bits = ( mavutil . mavlink . ESTIMATOR_CONST_POS_MODE |
mavutil . mavlink . ESTIMATOR_ACCEL_ERROR )
2018-10-02 19:28:21 -03:00
if require_absolute :
required_value | = ( mavutil . mavlink . ESTIMATOR_POS_HORIZ_ABS |
mavutil . mavlink . ESTIMATOR_POS_VERT_ABS |
mavutil . mavlink . ESTIMATOR_PRED_POS_HORIZ_ABS )
error_bits | = mavutil . mavlink . ESTIMATOR_GPS_GLITCH
2018-03-14 08:08:53 -03:00
self . progress ( " Waiting for EKF value %u " % required_value )
2018-09-07 20:48:45 -03:00
last_err_print_time = 0
last_print_time = 0
while timeout is None or self . get_sim_time_cached ( ) < tstart + timeout :
2018-03-05 11:14:34 -04:00
m = self . mav . recv_match ( type = ' EKF_STATUS_REPORT ' , blocking = True )
current = m . flags
2018-09-07 20:48:45 -03:00
if self . get_sim_time_cached ( ) - last_print_time > 1 :
2018-03-14 08:08:53 -03:00
self . progress ( " Wait EKF.flags: required: %u current: %u " %
( required_value , current ) )
2018-09-07 20:48:45 -03:00
last_print_time = self . get_sim_time_cached ( )
2018-07-15 21:59:45 -03:00
errors = current & error_bits
2018-09-07 20:48:45 -03:00
if errors and self . get_sim_time_cached ( ) - last_err_print_time > 1 :
2018-07-15 21:59:45 -03:00
self . progress ( " Wait EKF.flags: errors= %u " % errors )
2018-09-07 20:48:45 -03:00
last_err_print_time = self . get_sim_time_cached ( )
2018-07-15 21:59:45 -03:00
continue
if ( current & required_value == required_value ) :
2018-03-14 08:08:53 -03:00
self . progress ( " EKF Flags OK " )
2018-04-27 11:48:06 -03:00
return True
2018-10-17 23:55:16 -03:00
raise AutoTestTimeoutException ( " Failed to get EKF.flags= %u " %
required_value )
2018-03-05 11:14:34 -04:00
2018-08-20 06:45:45 -03:00
def wait_text ( self , text , timeout = 20 , the_function = None ) :
""" Wait a specific STATUS_TEXT. """
self . progress ( " Waiting for text : %s " % text . lower ( ) )
tstart = self . get_sim_time ( )
while self . get_sim_time ( ) < tstart + timeout :
if the_function is not None :
the_function ( )
m = self . mav . recv_match ( type = ' STATUSTEXT ' , blocking = True )
if text . lower ( ) in m . text . lower ( ) :
self . progress ( " Received expected text : %s " % m . text . lower ( ) )
return True
2018-10-17 23:55:16 -03:00
self . progress ( )
raise AutoTestTimeoutException ( " Failed to received text : %s " %
text . lower ( ) )
2018-08-20 06:45:45 -03:00
2018-08-03 20:20:15 -03:00
def get_mavlink_connection_going ( self ) :
# get a mavlink connection going
2018-08-03 21:04:19 -03:00
connection_string = self . autotest_connection_string_to_mavproxy ( )
2018-08-03 20:20:15 -03:00
try :
self . mav = mavutil . mavlink_connection ( connection_string ,
2018-08-07 06:58:09 -03:00
robust_parsing = True ,
source_component = 250 )
2018-08-03 20:20:15 -03:00
except Exception as msg :
self . progress ( " Failed to start mavlink connection on %s : %s " %
( connection_string , msg , ) )
raise
self . mav . message_hooks . append ( self . message_hook )
self . mav . idle_hooks . append ( self . idle_hook )
2018-07-31 06:50:02 -03:00
def run_test ( self , desc , test_function , interact = False ) :
2018-08-20 12:47:58 -03:00
self . start_test ( desc )
2018-05-08 09:12:13 -03:00
try :
2018-07-31 06:50:02 -03:00
test_function ( )
2018-05-08 09:12:13 -03:00
except Exception as e :
2018-05-15 03:42:36 -03:00
self . progress ( ' FAILED: " %s " : %s ' % ( desc , repr ( e ) ) )
2018-05-09 00:32:23 -03:00
self . fail_list . append ( ( desc , e ) )
2018-04-14 08:31:22 -03:00
if interact :
self . progress ( " Starting MAVProxy interaction as directed " )
self . mavproxy . interact ( )
2018-05-08 09:12:13 -03:00
return
self . progress ( ' PASSED: " %s " ' % desc )
2018-08-20 12:47:58 -03:00
def check_test_syntax ( self , test_file ) :
""" Check mistake on autotest function syntax. """
import re
self . start_test ( " Check for syntax mistake in autotest lambda " )
if not os . path . isfile ( test_file ) :
self . progress ( " File %s does not exist " % test_file )
test_file = test_file . rstrip ( ' c ' )
try :
with open ( test_file ) as f :
# check for lambda: test_function without paranthesis
faulty_strings = re . findall ( r " lambda \ s*: \ s* \ w+. \ w+ \ s* \ ) " , f . read ( ) )
if faulty_strings :
2018-10-18 18:54:56 -03:00
desc = ( " Syntax error in autotest lambda at : " +
2018-10-17 23:55:16 -03:00
faulty_strings )
raise ErrorException ( desc )
2018-08-20 12:47:58 -03:00
except ErrorException :
self . progress ( ' FAILED: " %s " ' % " Check for syntax mistake in autotest lambda " )
exit ( 1 )
self . progress ( ' PASSED: " %s " ' % " Check for syntax mistake in autotest lambda " )
2018-03-05 11:14:34 -04:00
@abc.abstractmethod
def init ( self ) :
""" Initilialize autotest feature. """
pass
2018-08-03 06:42:19 -03:00
def test_arm_feature ( self ) :
""" Common feature to test. """
2018-09-17 11:50:30 -03:00
self . context_push ( )
2018-08-03 06:42:19 -03:00
# TEST ARMING/DISARM
2018-09-19 13:36:50 -03:00
self . set_parameter ( " ARMING_RUDDER " , 2 ) # allow arm and disarm with rudder on first tests
interlock_channel = 8 # Plane got flighmode_ch on channel 8
if self . mav . mav_type is not mavutil . mavlink . MAV_TYPE_HELICOPTER : # heli don't need interlock option
interlock_channel = 9
self . set_parameter ( " RC %u _OPTION " % interlock_channel , 32 )
self . set_rc ( interlock_channel , 1000 )
self . set_throttle_zero ( )
self . start_test ( " Test normal arm and disarm features " )
2018-08-03 06:42:19 -03:00
if not self . arm_vehicle ( ) :
2018-10-17 23:55:16 -03:00
raise NotAchievedException ( " Failed to ARM " )
2018-08-03 06:42:19 -03:00
if not self . disarm_vehicle ( ) :
2018-10-17 23:55:16 -03:00
raise NotAchievedException ( " Failed to DISARM " )
2018-08-06 05:45:51 -03:00
if not self . mavproxy_arm_vehicle ( ) :
2018-10-17 23:55:16 -03:00
raise NotAchievedException ( " Failed to ARM " )
2018-08-06 05:45:51 -03:00
if not self . mavproxy_disarm_vehicle ( ) :
2018-10-17 23:55:16 -03:00
raise NotAchievedException ( " Failed to DISARM " )
2018-09-17 11:50:30 -03:00
if self . mav . mav_type != mavutil . mavlink . MAV_TYPE_SUBMARINE :
if not self . arm_motors_with_rc_input ( ) :
2018-10-17 23:55:16 -03:00
raise NotAchievedException ( " Failed to arm with RC input " )
2018-09-17 11:50:30 -03:00
if not self . disarm_motors_with_rc_input ( ) :
2018-10-17 23:55:16 -03:00
raise NotAchievedException ( " Failed to disarm with RC input " )
2018-09-19 13:33:11 -03:00
# self.arm_vehicle()
# if not self.autodisarm_motors():
2018-10-17 23:55:16 -03:00
# raise NotAchievedException("Did not autodisarm")
2018-10-10 18:35:53 -03:00
# Disable auto disarm for next test
# Rover and Sub don't have auto disarm
2018-09-19 13:36:50 -03:00
if self . mav . mav_type in [ mavutil . mavlink . MAV_TYPE_QUADROTOR ,
mavutil . mavlink . MAV_TYPE_HELICOPTER ,
mavutil . mavlink . MAV_TYPE_HEXAROTOR ,
mavutil . mavlink . MAV_TYPE_OCTOROTOR ,
mavutil . mavlink . MAV_TYPE_COAXIAL ,
mavutil . mavlink . MAV_TYPE_TRICOPTER ] :
self . set_parameter ( " DISARM_DELAY " , 0 )
if self . mav . mav_type == mavutil . mavlink . MAV_TYPE_FIXED_WING :
self . set_parameter ( " LAND_DISARMDELAY " , 0 )
2018-10-10 18:35:53 -03:00
# Sub has no 'switches'
if self . mav . mav_type == mavutil . mavlink . MAV_TYPE_SUBMARINE :
if not self . disarm_vehicle ( ) :
raise NotAchievedException ( " Failed to isarm " )
else :
self . start_test ( " Test arm and disarm with switch " )
arming_switch = 7
self . set_parameter ( " RC %d _OPTION " % arming_switch , 41 )
self . set_rc ( arming_switch , 1000 )
if not self . arm_motors_with_switch ( arming_switch ) :
raise NotAchievedException ( " Failed to arm with switch " )
if not self . disarm_motors_with_switch ( arming_switch ) :
raise NotAchievedException ( " Failed to disarm with switch " )
self . set_rc ( arming_switch , 1000 )
2018-09-19 13:43:22 -03:00
if self . mav . mav_type in [ mavutil . mavlink . MAV_TYPE_QUADROTOR ,
mavutil . mavlink . MAV_TYPE_HELICOPTER ,
mavutil . mavlink . MAV_TYPE_HEXAROTOR ,
mavutil . mavlink . MAV_TYPE_OCTOROTOR ,
mavutil . mavlink . MAV_TYPE_COAXIAL ,
mavutil . mavlink . MAV_TYPE_TRICOPTER ] :
self . start_test ( " Test arming failure with throttle too high " )
self . set_rc ( 3 , 1800 )
try :
if self . arm_vehicle ( ) :
2018-10-17 23:55:16 -03:00
raise NotAchievedException ( " Failed to NOT ARM " )
2018-09-19 13:43:22 -03:00
except AutoTestTimeoutException ( ) :
pass
except ValueError :
pass
if self . arm_motors_with_rc_input ( ) :
2018-10-17 23:55:16 -03:00
raise NotAchievedException ( " Failed to NOT ARM " )
2018-09-19 13:43:22 -03:00
if self . arm_motors_with_switch ( arming_switch ) :
2018-10-17 23:55:16 -03:00
raise NotAchievedException ( " Failed to NOT ARM " )
2018-09-19 13:43:22 -03:00
self . set_throttle_zero ( )
self . set_rc ( arming_switch , 1000 )
2018-10-10 18:35:53 -03:00
# Sub doesn't have 'stick commands'
if self . mav . mav_type != mavutil . mavlink . MAV_TYPE_SUBMARINE :
self . start_test ( " Test arming failure with ARMING_RUDDER=0 " )
self . set_parameter ( " ARMING_RUDDER " , 0 )
if self . arm_motors_with_rc_input ( ) :
raise NotAchievedException ( " Failed to NOT ARM " )
self . start_test ( " Test disarming failure with ARMING_RUDDER=0 " )
self . arm_vehicle ( )
if self . disarm_motors_with_rc_input ( ) :
raise NotAchievedException ( " Failed to NOT DISARM " )
self . disarm_vehicle ( )
self . mav . wait_heartbeat ( )
self . start_test ( " Test disarming failure with ARMING_RUDDER=1 " )
self . set_parameter ( " ARMING_RUDDER " , 1 )
self . arm_vehicle ( )
if self . disarm_motors_with_rc_input ( ) :
raise NotAchievedException ( " Failed to NOT ARM " )
self . disarm_vehicle ( )
self . mav . wait_heartbeat ( )
self . set_parameter ( " ARMING_RUDDER " , 2 )
2018-09-19 13:43:59 -03:00
if self . mav . mav_type in [ mavutil . mavlink . MAV_TYPE_QUADROTOR ,
mavutil . mavlink . MAV_TYPE_HELICOPTER ,
mavutil . mavlink . MAV_TYPE_HEXAROTOR ,
mavutil . mavlink . MAV_TYPE_OCTOROTOR ,
mavutil . mavlink . MAV_TYPE_COAXIAL ,
mavutil . mavlink . MAV_TYPE_TRICOPTER ] :
self . start_test ( " Test arming failure with interlock enabled " )
self . set_rc ( interlock_channel , 2000 )
if self . arm_motors_with_rc_input ( ) :
2018-10-17 23:55:16 -03:00
raise NotAchievedException ( " Failed to NOT ARM " )
2018-09-19 13:43:59 -03:00
if self . arm_motors_with_switch ( arming_switch ) :
2018-10-17 23:55:16 -03:00
raise NotAchievedException ( " Failed to NOT ARM " )
2018-09-19 13:43:59 -03:00
self . disarm_vehicle ( )
self . mav . wait_heartbeat ( )
self . set_rc ( arming_switch , 1000 )
self . set_rc ( interlock_channel , 1000 )
2018-09-19 13:45:07 -03:00
if self . mav . mav_type is mavutil . mavlink . MAV_TYPE_HELICOPTER :
self . start_test ( " Test motor interlock enable can ' t be set while disarmed " )
self . set_rc ( interlock_channel , 2000 )
channel_field = " servo %u _raw " % interlock_channel
interlock_value = self . get_parameter ( " SERVO %u _MIN " % interlock_channel )
tstart = self . get_sim_time ( )
while True :
2018-11-04 06:38:15 -04:00
if self . get_sim_time_cached ( ) - tstart > 20 :
self . set_rc ( 8 , 1000 )
break # success!
2018-09-19 13:45:07 -03:00
m = self . mav . recv_match ( type = ' SERVO_OUTPUT_RAW ' ,
blocking = True ,
2018-11-04 06:38:15 -04:00
timeout = 2 )
if m is None :
continue
2018-09-19 13:45:07 -03:00
m_value = getattr ( m , channel_field , None )
if m_value is None :
2018-11-04 06:38:15 -04:00
self . set_rc ( 8 , 1000 )
2018-10-17 23:55:16 -03:00
raise ValueError ( " Message has no %s field " %
channel_field )
2018-09-19 13:45:07 -03:00
self . progress ( " SERVO_OUTPUT_RAW. %s = %u want= %u " %
( channel_field , m_value , interlock_value ) )
if m_value != interlock_value :
2018-11-04 06:38:15 -04:00
self . set_rc ( 8 , 1000 )
2018-10-17 23:55:16 -03:00
raise NotAchievedException (
" Motor interlock was changed while disarmed " )
2018-11-04 06:38:15 -04:00
2018-09-19 13:45:07 -03:00
self . set_rc ( 8 , 1000 )
self . progress ( " ALL PASS " )
2018-09-17 11:50:30 -03:00
self . context_pop ( )
2018-08-03 06:42:19 -03:00
# TODO : add failure test : arming check, wrong mode; Test arming magic; Same for disarm
2018-08-20 06:45:45 -03:00
def test_gripper ( self ) :
self . context_push ( )
self . set_parameter ( " GRIP_ENABLE " , 1 )
self . fetch_parameters ( )
self . set_parameter ( " GRIP_GRAB " , 2000 )
self . set_parameter ( " GRIP_RELEASE " , 1000 )
self . set_parameter ( " GRIP_TYPE " , 1 )
self . set_parameter ( " SIM_GRPS_ENABLE " , 1 )
self . set_parameter ( " SIM_GRPS_PIN " , 8 )
self . set_parameter ( " SERVO8_FUNCTION " , 28 )
self . set_parameter ( " SERVO8_MIN " , 1000 )
self . set_parameter ( " SERVO8_MAX " , 2000 )
2018-10-03 06:44:02 -03:00
self . set_parameter ( " SERVO9_MIN " , 1000 )
self . set_parameter ( " SERVO9_MAX " , 2000 )
2018-08-20 06:45:45 -03:00
self . set_parameter ( " RC9_OPTION " , 19 )
2018-10-03 06:44:02 -03:00
self . set_rc ( 9 , 1500 )
2018-08-20 06:45:45 -03:00
self . reboot_sitl ( )
self . progress ( " Waiting reading for arm " )
self . wait_ready_to_arm ( )
self . progress ( " Test gripper with RC9_OPTION " )
self . progress ( " Releasing load " )
# non strict string matching because of catching text issue....
self . wait_text ( " Gripper load releas " , the_function = lambda : self . set_rc ( 9 , 1000 ) )
self . progress ( " Grabbing load " )
self . wait_text ( " Gripper load grabb " , the_function = lambda : self . set_rc ( 9 , 2000 ) )
self . progress ( " Releasing load " )
self . wait_text ( " Gripper load releas " , the_function = lambda : self . set_rc ( 9 , 1000 ) )
self . progress ( " Grabbing load " )
self . wait_text ( " Gripper load grabb " , the_function = lambda : self . set_rc ( 9 , 2000 ) )
self . progress ( " Test gripper with Mavlink cmd " )
self . progress ( " Releasing load " )
self . wait_text ( " Gripper load releas " ,
the_function = lambda : self . mav . mav . command_long_send ( 1 ,
1 ,
mavutil . mavlink . MAV_CMD_DO_GRIPPER ,
0 ,
1 ,
mavutil . mavlink . GRIPPER_ACTION_RELEASE ,
0 ,
0 ,
0 ,
0 ,
0 ,
) )
self . progress ( " Grabbing load " )
self . wait_text ( " Gripper load grabb " ,
the_function = lambda : self . mav . mav . command_long_send ( 1 ,
1 ,
mavutil . mavlink . MAV_CMD_DO_GRIPPER ,
0 ,
1 ,
mavutil . mavlink . GRIPPER_ACTION_GRAB ,
0 ,
0 ,
0 ,
0 ,
0 ,
) )
self . progress ( " Releasing load " )
self . wait_text ( " Gripper load releas " ,
the_function = lambda : self . mav . mav . command_long_send ( 1 ,
1 ,
mavutil . mavlink . MAV_CMD_DO_GRIPPER ,
0 ,
1 ,
mavutil . mavlink . GRIPPER_ACTION_RELEASE ,
0 ,
0 ,
0 ,
0 ,
0 ,
) )
self . progress ( " Grabbing load " )
self . wait_text ( " Gripper load grabb " ,
the_function = lambda : self . mav . mav . command_long_send ( 1 ,
1 ,
mavutil . mavlink . MAV_CMD_DO_GRIPPER ,
0 ,
1 ,
mavutil . mavlink . GRIPPER_ACTION_GRAB ,
0 ,
0 ,
0 ,
0 ,
0 ,
) )
self . context_pop ( )
self . reboot_sitl ( )
2018-03-05 11:14:34 -04:00
# # TEST MISSION FILE
# # TODO : rework that to work on autotest server
2018-03-14 08:08:53 -03:00
# # self.progress("TEST LOADING MISSION")
2018-11-09 08:32:02 -04:00
# # num_wp = self.load_mission(
2018-03-14 08:08:53 -03:00
# os.path.join(testdir, "fake_mission.txt"))
2018-03-05 11:14:34 -04:00
# # if num_wp == 0:
2018-03-14 08:08:53 -03:00
# # self.progress("Failed to load all_msg_mission")
2018-03-05 11:14:34 -04:00
# # sucess = False
# #
2018-03-14 08:08:53 -03:00
# # self.progress("TEST SAVING MISSION")
2018-03-05 11:14:34 -04:00
# # num_wp_old = num_wp
2018-03-14 08:08:53 -03:00
# # num_wp = self.save_mission_to_file(os.path.join(testdir,
# "fake_mission2.txt"))
2018-03-05 11:14:34 -04:00
# # if num_wp != num_wp_old:
2018-03-14 08:08:53 -03:00
# # self.progress("Failed to save all_msg_mission")
2018-03-05 11:14:34 -04:00
# # sucess = False
#
2018-03-14 08:08:53 -03:00
# self.progress("TEST CLEARING MISSION")
2018-03-05 11:14:34 -04:00
# self.mavproxy.send("wp clear\n")
# self.mavproxy.send('wp list\n')
# self.mavproxy.expect('Requesting [0-9]+ waypoints')
# num_wp = mavwp.MAVWPLoader().count()
# if num_wp != 0:
2018-03-14 08:08:53 -03:00
# self.progress("Failed to clear mission ")
2018-03-05 11:14:34 -04:00
# sucess = False
#
# return sucess
#
# # TESTS FAILSAFE
# @abc.abstractmethod
2018-03-14 08:08:53 -03:00
# def test_throttle_failsafe(self, home, distance_min=10, side=60,
# timeout=180):
2018-03-05 11:14:34 -04:00
# """Test that RTL success in case of thottle failsafe."""
# pass
#
# # TEST ARM RADIO
# @abc.abstractmethod
# def test_arm_motors_radio(self):
# """Test arming with RC sticks."""
# pass
#
# # TEST DISARM RADIO
# @abc.abstractmethod
# def test_disarm_motors_radio(self):
# """Test disarming with RC sticks."""
# pass
#
# # TEST AUTO DISARM
# @abc.abstractmethod
# def test_autodisarm_motors(self):
# """Test auto disarming."""
# pass
#
# # TEST RC OVERRIDE
# # TEST RC OVERRIDE TIMEOUT
# @abc.abstractmethod
# def test_rtl(self, home, distance_min=10, timeout=250):
# """Test that RTL success."""
2018-03-14 08:08:53 -03:00
# self.progress("# Enter RTL")
2018-03-05 11:14:34 -04:00
# self.mavproxy.send('switch 3\n')
# tstart = self.get_sim_time()
# while self.get_sim_time() < tstart + timeout:
# m = self.mav.recv_match(type='VFR_HUD', blocking=True)
# pos = self.mav.location()
# home_distance = self.get_distance(home, pos)
2018-03-14 08:08:53 -03:00
# self.progress("Alt: %u HomeDistance: %.0f" %
# (m.alt, home_distance))
2018-03-05 11:14:34 -04:00
# if m.alt <= 1 and home_distance < distance_min:
2018-03-14 08:08:53 -03:00
# self.progress("RTL Complete")
2018-03-05 11:14:34 -04:00
# return True
# return False
#
# @abc.abstractmethod
# def test_mission(self, filename):
# pass
@abc.abstractmethod
def autotest ( self ) :
""" Autotest used by ArduPilot autotest CI. """
pass