Tools: autotest: flake8 compliance

Taking the opportunity to make massive changes while we're destroying
history anyway.
This commit is contained in:
Peter Barker 2018-03-14 22:08:53 +11:00 committed by Randy Mackay
parent 259dda810d
commit b348cfa985
7 changed files with 1011 additions and 709 deletions

View File

@ -3,26 +3,43 @@
# Drive APMrover2 in SITL
from __future__ import print_function
import shutil
import os
import pexpect
import shutil
import time
from common import AutoTest
from common import *
from pysim import util
from pysim import vehicleinfo
from pymavlink import mavutil
# get location of scripts
testdir = os.path.dirname(os.path.realpath(__file__))
# HOME = mavutil.location(-35.362938, 149.165085, 584, 270)
HOME = mavutil.location(40.071374969556928, -105.22978898137808, 1583.702759, 246)
HOME = mavutil.location(40.071374969556928,
-105.22978898137808,
1583.702759,
246)
class AutotestRover(Autotest):
def __init__(self, binary, viewerip=None, use_map=False, valgrind=False, gdb=False, speedup=10, frame=None, params=None, gdbserver=False):
super(AutotestRover, self).__init__()
class AutoTestRover(AutoTest):
def __init__(self,
binary,
viewerip=None,
use_map=False,
valgrind=False,
gdb=False,
speedup=10,
frame=None,
params=None,
gdbserver=False):
super(AutoTestRover, self).__init__()
self.binary = binary
self.options = '--sitl=127.0.0.1:5501 --out=127.0.0.1:19550 --streamrate=10'
self.options = ("--sitl=127.0.0.1:5501 --out=127.0.0.1:19550"
" --streamrate=10")
self.viewerip = viewerip
self.use_map = use_map
self.valgrind = valgrind
@ -31,7 +48,10 @@ class AutotestRover(Autotest):
self.params = params
self.gdbserver = gdbserver
self.home = "%f,%f,%u,%u" % (HOME.lat, HOME.lng, HOME.alt, HOME.heading)
self.home = "%f,%f,%u,%u" % (HOME.lat,
HOME.lng,
HOME.alt,
HOME.heading)
self.homeloc = None
self.speedup = speedup
self.speedup_default = 10
@ -48,17 +68,21 @@ class AutotestRover(Autotest):
if self.use_map:
self.options += ' --map'
self.sitl = util.start_SITL(self.binary, wipe=True, model=self.frame, home=self.home,
self.sitl = util.start_SITL(self.binary,
wipe=True,
model=self.frame,
home=self.home,
speedup=self.speedup_default)
self.mavproxy = util.start_MAVProxy_SITL('APMrover2')
progress("WAITING FOR PARAMETERS")
self.progress("WAITING FOR PARAMETERS")
self.mavproxy.expect('Received [0-9]+ parameters')
# setup test parameters
vinfo = vehicleinfo.VehicleInfo()
if self.params is None:
self.params = vinfo.options["APMrover2"]["frames"][self.frame]["default_params_filename"]
frames = vinfo.options["APMrover2"]["frames"]
self.params = frames[self.frame]["default_params_filename"]
if not isinstance(self.params, list):
self.params = [self.params]
for x in self.params:
@ -66,21 +90,27 @@ class AutotestRover(Autotest):
self.mavproxy.expect('Loaded [0-9]+ parameters')
self.set_parameter('LOG_REPLAY', 1)
self.set_parameter('LOG_DISARMED', 1)
progress("RELOADING SITL WITH NEW PARAMETERS")
self.progress("RELOADING SITL WITH NEW PARAMETERS")
# restart with new parms
util.pexpect_close(self.mavproxy)
util.pexpect_close(self.sitl)
self.sitl = util.start_SITL(self.binary, model=self.frame, home=self.home, speedup=self.speedup,
valgrind=self.valgrind, gdb=self.gdb, gdbserver=self.gdbserver)
self.mavproxy = util.start_MAVProxy_SITL('APMrover2', options=self.options)
self.sitl = util.start_SITL(self.binary,
model=self.frame,
home=self.home,
speedup=self.speedup,
valgrind=self.valgrind,
gdb=self.gdb,
gdbserver=self.gdbserver)
self.mavproxy = util.start_MAVProxy_SITL('APMrover2',
options=self.options)
self.mavproxy.expect('Telemetry log: (\S+)')
logfile = self.mavproxy.match.group(1)
progress("LOGFILE %s" % logfile)
self.progress("LOGFILE %s" % logfile)
buildlog = util.reltopdir("../buildlogs/APMrover2-test.tlog")
progress("buildlog=%s" % buildlog)
buildlog = self.buildlogs_path("APMrover2-test.tlog")
self.progress("buildlog=%s" % buildlog)
if os.path.exists(buildlog):
os.unlink(buildlog)
try:
@ -90,23 +120,26 @@ class AutotestRover(Autotest):
self.mavproxy.expect('Received [0-9]+ parameters')
util.expect_setup_callback(self.mavproxy, expect_callback)
util.expect_setup_callback(self.mavproxy, self.expect_callback)
expect_list_clear()
expect_list_extend([self.sitl, self.mavproxy])
self.expect_list_clear()
self.expect_list_extend([self.sitl, self.mavproxy])
progress("Started simulator")
self.progress("Started simulator")
# get a mavlink connection going
connection_string = '127.0.0.1:19550'
try:
self.mav = mavutil.mavlink_connection('127.0.0.1:19550', robust_parsing=True)
self.mav = mavutil.mavlink_connection(connection_string,
robust_parsing=True)
except Exception as msg:
progress("Failed to start mavlink connection on 127.0.0.1:19550" % msg)
self.progress("Failed to start mavlink connection on %s" %
connection_string)
raise
self.mav.message_hooks.append(message_hook)
self.mav.idle_hooks.append(idle_hook)
self.mav.message_hooks.append(self.message_hook)
self.mav.idle_hooks.append(self.idle_hook)
self.hasInit = True
progress("Ready to start testing!")
self.progress("Ready to start testing!")
def close(self):
if self.use_map:
@ -117,10 +150,12 @@ class AutotestRover(Autotest):
util.pexpect_close(self.mavproxy)
util.pexpect_close(self.sitl)
valgrind_log = util.valgrind_log_filepath(binary=self.binary, model=self.frame)
valgrind_log = util.valgrind_log_filepath(binary=self.binary,
model=self.frame)
if os.path.exists(valgrind_log):
os.chmod(valgrind_log, 0o644)
shutil.copy(valgrind_log, util.reltopdir("../buildlogs/APMrover2-valgrind.log"))
shutil.copy(valgrind_log,
self.buildlogs_path("APMrover2-valgrind.log"))
# def reset_and_arm(self):
# """Reset RC, set to MANUAL and arm."""
@ -136,7 +171,7 @@ class AutotestRover(Autotest):
# # TEST ARM RADIO
# def test_arm_motors_radio(self):
# """Test Arming motors with radio."""
# progress("Test arming motors with radio")
# self.progress("Test arming motors with radio")
# self.mavproxy.send('switch 6\n') # stabilize/manual mode
# self.wait_mode('MANUAL')
# self.mavproxy.send('rc 3 1500\n') # throttle at zero
@ -145,13 +180,13 @@ class AutotestRover(Autotest):
# self.mavproxy.send('rc 1 1500\n')
#
# self.mav.motors_armed_wait()
# progress("MOTORS ARMED OK")
# self.progress("MOTORS ARMED OK")
# return True
#
# # TEST DISARM RADIO
# def test_disarm_motors_radio(self):
# """Test Disarm motors with radio."""
# progress("Test disarming motors with radio")
# self.progress("Test disarming motors with radio")
# self.mavproxy.send('switch 6\n') # stabilize/manual mode
# self.wait_mode('MANUAL')
# self.mavproxy.send('rc 3 1500\n') # throttle at zero
@ -163,21 +198,21 @@ class AutotestRover(Autotest):
# self.mav.wait_heartbeat()
# if not self.mav.motors_armed():
# disarm_delay = self.get_sim_time() - tstart
# progress("MOTORS DISARMED OK WITH RADIO")
# self.progress("MOTORS DISARMED OK WITH RADIO")
# self.mavproxy.send('rc 1 1500\n') # steer full right
# self.mavproxy.send('rc 4 1500\n') # yaw full right
# progress("Disarm in %ss" % disarm_delay)
# self.progress("Disarm in %ss" % disarm_delay)
# return True
# progress("FAILED TO DISARM WITH RADIO")
# self.progress("FAILED TO DISARM WITH RADIO")
# return False
#
# # TEST AUTO DISARM
# def test_autodisarm_motors(self):
# """Test Autodisarm motors."""
# progress("Test Autodisarming motors")
# self.progress("Test Autodisarming motors")
# self.mavproxy.send('switch 6\n') # stabilize/manual mode
# # NOT IMPLEMENTED ON ROVER
# progress("MOTORS AUTODISARMED OK")
# self.progress("MOTORS AUTODISARMED OK")
# return True
#
# # TEST RC OVERRIDE
@ -188,7 +223,7 @@ class AutotestRover(Autotest):
#
# def test_mission(self, filename):
# """Test a mission from a file."""
# progress("Test mission %s" % filename)
# self.progress("Test mission %s" % filename)
# num_wp = self.load_mission_from_file(filename)
# self.mavproxy.send('wp set 1\n')
# self.mav.wait_heartbeat()
@ -200,7 +235,7 @@ class AutotestRover(Autotest):
# self.mavproxy.expect("Mission Complete")
# self.mav.wait_heartbeat()
# self.wait_mode('HOLD')
# progress("test: MISSION COMPLETE: passed=%s" % ret)
# self.progress("test: MISSION COMPLETE: passed=%s" % ret)
# return ret
##########################################################
@ -209,7 +244,7 @@ class AutotestRover(Autotest):
# Drive a square in manual mode
def drive_square(self, side=50):
"""Drive a square, Driving N then E ."""
progress("TEST SQUARE")
self.progress("TEST SQUARE")
success = True
# use LEARNING Mode
@ -217,54 +252,54 @@ class AutotestRover(Autotest):
self.wait_mode('MANUAL')
# first aim north
progress("\nTurn right towards north")
self.progress("\nTurn right towards north")
if not self.reach_heading_manual(10):
success = False
# save bottom left corner of box as waypoint
progress("Save WP 1 & 2")
self.progress("Save WP 1 & 2")
self.save_wp()
# pitch forward to fly north
progress("\nGoing north %u meters" % side)
self.progress("\nGoing north %u meters" % side)
if not self.reach_distance_manual(side):
success = False
# save top left corner of square as waypoint
progress("Save WP 3")
self.progress("Save WP 3")
self.save_wp()
# roll right to fly east
progress("\nGoing east %u meters" % side)
self.progress("\nGoing east %u meters" % side)
if not self.reach_heading_manual(100):
success = False
if not self.reach_distance_manual(side):
success = False
# save top right corner of square as waypoint
progress("Save WP 4")
self.progress("Save WP 4")
self.save_wp()
# pitch back to fly south
progress("\nGoing south %u meters" % side)
self.progress("\nGoing south %u meters" % side)
if not self.reach_heading_manual(190):
success = False
if not self.reach_distance_manual(side):
success = False
# save bottom right corner of square as waypoint
progress("Save WP 5")
self.progress("Save WP 5")
self.save_wp()
# roll left to fly west
progress("\nGoing west %u meters" % side)
self.progress("\nGoing west %u meters" % side)
if not self.reach_heading_manual(280):
success = False
if not self.reach_distance_manual(side):
success = False
# save bottom left corner of square (should be near home) as waypoint
progress("Save WP 6")
self.progress("Save WP 6")
self.save_wp()
return success
@ -275,23 +310,24 @@ class AutotestRover(Autotest):
self.wait_mode('MANUAL')
self.set_rc(3, 2000)
progress("Driving left circuit")
self.progress("Driving left circuit")
# do 4 turns
for i in range(0, 4):
# hard left
progress("Starting turn %u" % i)
self.progress("Starting turn %u" % i)
self.set_rc(1, 1000)
if not self.wait_heading(270 - (90*i), accuracy=10):
return False
self.set_rc(1, 1500)
progress("Starting leg %u" % i)
self.progress("Starting leg %u" % i)
if not self.wait_distance(50, accuracy=7):
return False
self.set_rc(3, 1500)
progress("Circuit complete")
self.progress("Circuit complete")
return True
# def test_throttle_failsafe(self, home, distance_min=10, side=60, timeout=300):
# def test_throttle_failsafe(self, home, distance_min=10, side=60,
# timeout=300):
# """Fly east, Failsafe, return, land."""
#
# self.mavproxy.send('switch 6\n') # manual mode
@ -299,17 +335,17 @@ class AutotestRover(Autotest):
# self.mavproxy.send("param set FS_ACTION 1\n")
#
# # first aim east
# progress("turn east")
# self.progress("turn east")
# if not self.reach_heading_manual(135):
# return False
#
# # fly east 60 meters
# progress("# Going forward %u meters" % side)
# self.progress("# Going forward %u meters" % side)
# if not self.reach_distance_manual(side):
# return False
#
# # pull throttle low
# progress("# Enter Failsafe")
# self.progress("# Enter Failsafe")
# self.mavproxy.send('rc 3 900\n')
#
# tstart = self.get_sim_time()
@ -318,10 +354,11 @@ class AutotestRover(Autotest):
# m = self.mav.recv_match(type='VFR_HUD', blocking=True)
# pos = self.mav.location()
# home_distance = self.get_distance(home, pos)
# progress("Alt: %u HomeDistance: %.0f" % (m.alt, home_distance))
# self.progress("Alt: %u HomeDistance: %.0f" %
# (m.alt, home_distance))
# # check if we've reached home
# if home_distance <= distance_min:
# progress("RTL Complete")
# self.progress("RTL Complete")
# success = True
#
# # reduce throttle
@ -332,10 +369,11 @@ class AutotestRover(Autotest):
# self.wait_mode('MANUAL')
#
# if success:
# progress("Reached failsafe home OK")
# self.progress("Reached failsafe home OK")
# return True
# else:
# progress("Failed to reach Home on failsafe RTL - timed out after %u seconds" % timeout)
# self.progress("Failed to reach Home on failsafe RTL - "
# "timed out after %u seconds" % timeout)
# return False
#################################################
@ -343,7 +381,7 @@ class AutotestRover(Autotest):
#################################################
def drive_mission(self, filename):
"""Drive a mission from a file."""
progress("Driving mission %s" % filename)
self.progress("Driving mission %s" % filename)
self.mavproxy.send('wp load %s\n' % filename)
self.mavproxy.expect('Flight plan received')
self.mavproxy.send('wp list\n')
@ -354,21 +392,23 @@ class AutotestRover(Autotest):
if not self.wait_waypoint(1, 4, max_dist=5):
return False
self.wait_mode('HOLD')
progress("Mission OK")
self.progress("Mission OK")
return True
def do_get_banner(self):
self.mavproxy.send("long DO_SEND_BANNER 1\n")
start = time.time()
while True:
m = self.mav.recv_match(type='STATUSTEXT', blocking=True, timeout=1)
m = self.mav.recv_match(type='STATUSTEXT',
blocking=True,
timeout=1)
if m is not None and "ArduRover" in m.text:
progress("banner received: %s" % m.text)
self.progress("banner received: %s" % m.text)
return True
if time.time() - start > 10:
break
progress("banner not received")
self.progress("banner not received")
return False
@ -427,15 +467,23 @@ class AutotestRover(Autotest):
delta = distance_without_brakes - distance_with_brakes
if delta < distance_without_brakes * 0.05: # 5% isn't asking for much
progress("Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)" % (distance_with_brakes, distance_without_brakes, delta))
self.progress("Brakes have negligible effect"
"(with=%0.2fm without=%0.2fm delta=%0.2fm)" %
(distance_with_brakes,
distance_without_brakes,
delta))
return False
else:
progress("Brakes work (with=%0.2fm without=%0.2fm delta=%0.2fm)" % (distance_with_brakes, distance_without_brakes, delta))
self.progress(
"Brakes work (with=%0.2fm without=%0.2fm delta=%0.2fm)" %
(distance_with_brakes, distance_without_brakes, delta))
return True
def drive_rtl_mission(self):
mission_filepath = os.path.join(testdir, "ArduRover-Missions", "rtl.txt")
mission_filepath = os.path.join(testdir,
"ArduRover-Missions",
"rtl.txt")
self.mavproxy.send('wp load %s\n' % mission_filepath)
self.mavproxy.expect('Flight plan received')
self.mavproxy.send('switch 4\n') # auto mode
@ -447,16 +495,16 @@ class AutotestRover(Autotest):
blocking=True,
timeout=0.1)
if m is None:
progress("Did not receive NAV_CONTROLLER_OUTPUT message")
self.progress("Did not receive NAV_CONTROLLER_OUTPUT message")
return False
wp_dist_min = 5
if m.wp_dist < wp_dist_min:
progress("Did not start at least 5 metres from destination")
self.progress("Did not start at least 5 metres from destination")
return False
progress("NAV_CONTROLLER_OUTPUT.wp_dist looks good (%u >= %u)" %
(m.wp_dist, wp_dist_min,))
self.progress("NAV_CONTROLLER_OUTPUT.wp_dist looks good (%u >= %u)" %
(m.wp_dist, wp_dist_min,))
self.wait_mode('HOLD')
@ -464,112 +512,114 @@ class AutotestRover(Autotest):
home_distance = self.get_distance(HOME, pos)
home_distance_max = 5
if home_distance > home_distance_max:
progress("Did not get home (%u metres distant > %u)" %
(home_distance, home_distance_max))
self.progress("Did not get home (%u metres distant > %u)" %
(home_distance, home_distance_max))
return False
self.mavproxy.send('switch 6\n')
self.wait_mode('MANUAL')
progress("RTL Mission OK")
self.progress("RTL Mission OK")
return True
def autotest(self):
"""Autotest APMrover2 in SITL."""
if not self.hasInit:
self.init()
progress("Started simulator")
self.progress("Started simulator")
failed = False
e = 'None'
try:
progress("Waiting for a heartbeat with mavlink protocol %s" % self.mav.WIRE_PROTOCOL_VERSION)
self.progress("Waiting for a heartbeat with mavlink protocol %s" %
self.mav.WIRE_PROTOCOL_VERSION)
self.mav.wait_heartbeat()
progress("Setting up RC parameters")
self.progress("Setting up RC parameters")
self.set_rc_default()
self.set_rc(8, 1800)
progress("Waiting for GPS fix")
self.progress("Waiting for GPS fix")
self.mav.wait_gps_fix()
self.homeloc = self.mav.location()
progress("Home location: %s" % self.homeloc)
self.progress("Home location: %s" % self.homeloc)
self.mavproxy.send('switch 6\n') # Manual mode
self.wait_mode('MANUAL')
progress("Waiting reading for arm")
self.progress("Waiting reading for arm")
self.wait_ready_to_arm()
if not self.arm_vehicle():
progress("Failed to ARM")
self.progress("Failed to ARM")
failed = True
progress("#")
progress("########## Drive an RTL mission ##########")
progress("#")
self.progress("#")
self.progress("########## Drive an RTL mission ##########")
self.progress("#")
# Drive a square in learning mode
# self.reset_and_arm()
if not self.drive_rtl_mission():
progress("Failed RTL mission")
self.progress("Failed RTL mission")
failed = True
progress("#")
progress("########## Drive a square and save WPs with CH7 switch ##########")
progress("#")
self.progress("#")
self.progress("########## Drive a square and save WPs with CH7"
"switch ##########")
self.progress("#")
# Drive a square in learning mode
# self.reset_and_arm()
if not self.drive_square():
progress("Failed drive square")
self.progress("Failed drive square")
failed = True
if not self.drive_mission(os.path.join(testdir, "rover1.txt")):
progress("Failed mission")
self.progress("Failed mission")
failed = True
if not self.drive_brake():
progress("Failed brake")
self.progress("Failed brake")
failed = True
if not self.disarm_vehicle():
progress("Failed to DISARM")
self.progress("Failed to DISARM")
failed = True
# do not move this to be the first test. MAVProxy's dedupe
# function may bite you.
progress("Getting banner")
self.progress("Getting banner")
if not self.do_get_banner():
progress("FAILED: get banner")
self.progress("FAILED: get banner")
failed = True
progress("Getting autopilot capabilities")
self.progress("Getting autopilot capabilities")
if not self.do_get_autopilot_capabilities():
progress("FAILED: get capabilities")
self.progress("FAILED: get capabilities")
failed = True
progress("Setting mode via MAV_COMMAND_DO_SET_MODE")
self.progress("Setting mode via MAV_COMMAND_DO_SET_MODE")
if not self.do_set_mode_via_command_long():
failed = True
# Throttle Failsafe
progress("#")
progress("########## Test Failsafe ##########")
progress("#")
self.progress("#")
self.progress("########## Test Failsafe ##########")
self.progress("#")
# self.reset_and_arm()
# if not self.test_throttle_failsafe(HOME, distance_min=4):
# progress("Throttle failsafe failed")
# self.progress("Throttle failsafe failed")
# sucess = False
if not self.log_download(util.reltopdir("../buildlogs/APMrover2-log.bin")):
progress("Failed log download")
if not self.log_download(self.buildlogs_path("APMrover2-log.bin")):
self.progress("Failed log download")
failed = True
# if not drive_left_circuit(self):
# progress("Failed left circuit")
# self.progress("Failed left circuit")
# failed = True
# if not drive_RTL(self):
# progress("Failed RTL")
# self.progress("Failed RTL")
# failed = True
except pexpect.TIMEOUT as e:
progress("Failed with timeout")
self.progress("Failed with timeout")
failed = True
self.close()
if failed:
progress("FAILED: %s" % e)
self.progress("FAILED: %s" % e)
return False
return True

File diff suppressed because it is too large Load Diff

View File

@ -9,20 +9,31 @@ import shutil
import pexpect
from pymavlink import mavutil
from common import *
from pysim import util
from common import AutoTest
# get location of scripts
testdir = os.path.dirname(os.path.realpath(__file__))
HOME = mavutil.location(-35.362938, 149.165085, 585, 354)
WIND = "0,180,0.2" # speed,direction,variance
class AutotestPlane(Autotest):
def __init__(self, binary, viewerip=None, use_map=False, valgrind=False, gdb=False, speedup=10, frame=None, params=None, gdbserver=False):
super(AutotestPlane, self).__init__()
class AutoTestPlane(AutoTest):
def __init__(self,
binary,
viewerip=None,
use_map=False,
valgrind=False,
gdb=False,
speedup=10,
frame=None,
params=None,
gdbserver=False):
super(AutoTestPlane, self).__init__()
self.binary = binary
self.options = '--sitl=127.0.0.1:5501 --out=127.0.0.1:19550 --streamrate=10'
self.options = ('--sitl=127.0.0.1:5501 --out=127.0.0.1:19550'
' --streamrate=10')
self.viewerip = viewerip
self.use_map = use_map
self.valgrind = valgrind
@ -31,7 +42,10 @@ class AutotestPlane(Autotest):
self.params = params
self.gdbserver = gdbserver
self.home = "%f,%f,%u,%u" % (HOME.lat, HOME.lng, HOME.alt, HOME.heading)
self.home = "%f,%f,%u,%u" % (HOME.lat,
HOME.lng,
HOME.alt,
HOME.heading)
self.homeloc = None
self.speedup = speedup
self.speedup_default = 10
@ -48,16 +62,25 @@ class AutotestPlane(Autotest):
if self.use_map:
self.options += ' --map'
self.sitl = util.start_SITL(self.binary, wipe=True, model=self.frame, home=self.home, speedup=self.speedup,
defaults_file=os.path.join(testdir, 'default_params/plane-jsbsim.parm'),
valgrind=self.valgrind, gdb=self.gdb, gdbserver=self.gdbserver)
self.mavproxy = util.start_MAVProxy_SITL('ArduPlane', options=self.options)
defaults_file = os.path.join(testdir,
'default_params/plane-jsbsim.parm')
self.sitl = util.start_SITL(self.binary,
wipe=True,
model=self.frame,
home=self.home,
speedup=self.speedup,
defaults_file=defaults_file,
valgrind=self.valgrind,
gdb=self.gdb,
gdbserver=self.gdbserver)
self.mavproxy = util.start_MAVProxy_SITL('ArduPlane',
options=self.options)
self.mavproxy.expect('Telemetry log: (\S+)')
logfile = self.mavproxy.match.group(1)
progress("LOGFILE %s" % logfile)
self.progress("LOGFILE %s" % logfile)
buildlog = util.reltopdir("../buildlogs/ArduPlane-test.tlog")
progress("buildlog=%s" % buildlog)
buildlog = self.buildlogs_path("ArduPlane-test.tlog")
self.progress("buildlog=%s" % buildlog)
if os.path.exists(buildlog):
os.unlink(buildlog)
try:
@ -67,23 +90,26 @@ class AutotestPlane(Autotest):
self.mavproxy.expect('Received [0-9]+ parameters')
util.expect_setup_callback(self.mavproxy, expect_callback)
util.expect_setup_callback(self.mavproxy, self.expect_callback)
expect_list_clear()
expect_list_extend([self.sitl, self.mavproxy])
self.expect_list_clear()
self.expect_list_extend([self.sitl, self.mavproxy])
progress("Started simulator")
self.progress("Started simulator")
# get a mavlink connection going
connection_string = '127.0.0.1:19550'
try:
self.mav = mavutil.mavlink_connection('127.0.0.1:19550', robust_parsing=True)
self.mav = mavutil.mavlink_connection(connection_string,
robust_parsing=True)
except Exception as msg:
progress("Failed to start mavlink connection on 127.0.0.1:19550" % msg)
self.progress("Failed to start mavlink connection on %s: %s"
% (connection_string, msg))
raise
self.mav.message_hooks.append(message_hook)
self.mav.idle_hooks.append(idle_hook)
self.mav.message_hooks.append(self.message_hook)
self.mav.idle_hooks.append(self.idle_hook)
self.hasInit = True
progress("Ready to start testing!")
self.progress("Ready to start testing!")
def close(self):
if self.use_map:
@ -94,10 +120,12 @@ class AutotestPlane(Autotest):
util.pexpect_close(self.mavproxy)
util.pexpect_close(self.sitl)
valgrind_log = util.valgrind_log_filepath(binary=self.binary, model=self.frame)
valgrind_log = util.valgrind_log_filepath(binary=self.binary,
model=self.frame)
if os.path.exists(valgrind_log):
os.chmod(valgrind_log, 0o644)
shutil.copy(valgrind_log, util.reltopdir("../buildlogs/ArduPlane-valgrind.log"))
shutil.copy(valgrind_log,
self.buildlogs_path("ArduPlane-valgrind.log"))
def takeoff(self):
"""Takeoff get to 30m altitude."""
@ -128,13 +156,15 @@ class AutotestPlane(Autotest):
self.set_rc(3, 2000)
# gain a bit of altitude
if not self.wait_altitude(self.homeloc.alt+150, self.homeloc.alt+180, timeout=30):
if not self.wait_altitude(self.homeloc.alt+150,
self.homeloc.alt+180,
timeout=30):
return False
# level off
self.set_rc(2, 1500)
progress("TAKEOFF COMPLETE")
self.progress("TAKEOFF COMPLETE")
return True
def fly_left_circuit(self):
@ -144,43 +174,45 @@ class AutotestPlane(Autotest):
self.set_rc(3, 2000)
if not self.wait_level_flight():
return False
progress("Flying left circuit")
self.progress("Flying left circuit")
# do 4 turns
for i in range(0, 4):
# hard left
progress("Starting turn %u" % i)
self.progress("Starting turn %u" % i)
self.set_rc(1, 1000)
if not self.wait_heading(270 - (90*i), accuracy=10):
return False
self.set_rc(1, 1500)
progress("Starting leg %u" % i)
self.progress("Starting leg %u" % i)
if not self.wait_distance(100, accuracy=20):
return False
progress("Circuit complete")
self.progress("Circuit complete")
return True
def fly_RTL(self):
"""Fly to home."""
progress("Flying home in RTL")
self.progress("Flying home in RTL")
self.mavproxy.send('switch 2\n')
self.wait_mode('RTL')
if not self.wait_location(self.homeloc, accuracy=120,
target_altitude=self.homeloc.alt+100, height_accuracy=20,
if not self.wait_location(self.homeloc,
accuracy=120,
target_altitude=self.homeloc.alt+100,
height_accuracy=20,
timeout=180):
return False
progress("RTL Complete")
self.progress("RTL Complete")
return True
def fly_LOITER(self, num_circles=4):
"""Loiter where we are."""
progress("Testing LOITER for %u turns" % num_circles)
self.progress("Testing LOITER for %u turns" % num_circles)
self.mavproxy.send('loiter\n')
self.wait_mode('LOITER')
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
initial_alt = m.alt
progress("Initial altitude %u\n" % initial_alt)
self.progress("Initial altitude %u\n" % initial_alt)
while num_circles > 0:
if not self.wait_heading(0, accuracy=10, timeout=60):
@ -188,31 +220,32 @@ class AutotestPlane(Autotest):
if not self.wait_heading(180, accuracy=10, timeout=60):
return False
num_circles -= 1
progress("Loiter %u circles left" % num_circles)
self.progress("Loiter %u circles left" % num_circles)
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
final_alt = m.alt
progress("Final altitude %u initial %u\n" % (final_alt, initial_alt))
self.progress("Final altitude %u initial %u\n" %
(final_alt, initial_alt))
self.mavproxy.send('mode FBWA\n')
self.wait_mode('FBWA')
if abs(final_alt - initial_alt) > 20:
progress("Failed to maintain altitude")
self.progress("Failed to maintain altitude")
return False
progress("Completed Loiter OK")
self.progress("Completed Loiter OK")
return True
def fly_CIRCLE(self, num_circles=1):
"""Circle where we are."""
progress("Testing CIRCLE for %u turns" % num_circles)
self.progress("Testing CIRCLE for %u turns" % num_circles)
self.mavproxy.send('mode CIRCLE\n')
self.wait_mode('CIRCLE')
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
initial_alt = m.alt
progress("Initial altitude %u\n" % initial_alt)
self.progress("Initial altitude %u\n" % initial_alt)
while num_circles > 0:
if not self.wait_heading(0, accuracy=10, timeout=60):
@ -220,26 +253,27 @@ class AutotestPlane(Autotest):
if not self.wait_heading(180, accuracy=10, timeout=60):
return False
num_circles -= 1
progress("CIRCLE %u circles left" % num_circles)
self.progress("CIRCLE %u circles left" % num_circles)
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
final_alt = m.alt
progress("Final altitude %u initial %u\n" % (final_alt, initial_alt))
self.progress("Final altitude %u initial %u\n" %
(final_alt, initial_alt))
self.mavproxy.send('mode FBWA\n')
self.wait_mode('FBWA')
if abs(final_alt - initial_alt) > 20:
progress("Failed to maintain altitude")
self.progress("Failed to maintain altitude")
return False
progress("Completed CIRCLE OK")
self.progress("Completed CIRCLE OK")
return True
def wait_level_flight(self, accuracy=5, timeout=30):
"""Wait for level flight."""
tstart = self.get_sim_time()
progress("Waiting for level flight")
self.progress("Waiting for level flight")
self.set_rc(1, 1500)
self.set_rc(2, 1500)
self.set_rc(4, 1500)
@ -247,11 +281,11 @@ class AutotestPlane(Autotest):
m = self.mav.recv_match(type='ATTITUDE', blocking=True)
roll = math.degrees(m.roll)
pitch = math.degrees(m.pitch)
progress("Roll=%.1f Pitch=%.1f" % (roll, pitch))
self.progress("Roll=%.1f Pitch=%.1f" % (roll, pitch))
if math.fabs(roll) <= accuracy and math.fabs(pitch) <= accuracy:
progress("Attained level flight")
self.progress("Attained level flight")
return True
progress("Failed to attain level flight")
self.progress("Failed to attain level flight")
return False
def change_altitude(self, altitude, accuracy=30):
@ -266,7 +300,8 @@ class AutotestPlane(Autotest):
if not self.wait_altitude(altitude-accuracy/2, altitude+accuracy/2):
return False
self.set_rc(2, 1500)
progress("Reached target altitude at %u" % self.mav.messages['VFR_HUD'].alt)
self.progress("Reached target altitude at %u" %
self.mav.messages['VFR_HUD'].alt)
return self.wait_level_flight()
def axial_left_roll(self, count=1):
@ -281,7 +316,7 @@ class AutotestPlane(Autotest):
self.wait_mode('MANUAL')
while count > 0:
progress("Starting roll")
self.progress("Starting roll")
self.set_rc(1, 1000)
if not self.wait_roll(-150, accuracy=90):
self.set_rc(1, 1500)
@ -313,7 +348,7 @@ class AutotestPlane(Autotest):
self.wait_mode('MANUAL')
while count > 0:
progress("Starting loop")
self.progress("Starting loop")
self.set_rc(2, 1000)
if not self.wait_pitch(-60, accuracy=20):
return False
@ -342,7 +377,7 @@ class AutotestPlane(Autotest):
count = 1
while count > 0:
progress("Starting roll")
self.progress("Starting roll")
self.set_rc(1, 2000)
if not self.wait_roll(-150, accuracy=90):
return False
@ -376,7 +411,7 @@ class AutotestPlane(Autotest):
count = 1
while count > 0:
progress("Starting roll")
self.progress("Starting roll")
self.set_rc(1, 1000)
if not self.wait_roll(-150, accuracy=90):
return False
@ -398,7 +433,7 @@ class AutotestPlane(Autotest):
count = 2
while count > 0:
progress("Starting loop")
self.progress("Starting loop")
self.set_rc(2, 1000)
if not self.wait_pitch(-60, accuracy=20):
return False
@ -429,55 +464,56 @@ class AutotestPlane(Autotest):
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
initial_alt = m.alt
progress("Initial altitude %u\n" % initial_alt)
self.progress("Initial altitude %u\n" % initial_alt)
progress("Flying right circuit")
self.progress("Flying right circuit")
# do 4 turns
for i in range(0, 4):
# hard left
progress("Starting turn %u" % i)
self.progress("Starting turn %u" % i)
self.set_rc(1, 1800)
if not self.wait_heading(0 + (90*i), accuracy=20, timeout=60):
self.set_rc(1, 1500)
return False
self.set_rc(1, 1500)
progress("Starting leg %u" % i)
self.progress("Starting leg %u" % i)
if not self.wait_distance(100, accuracy=20):
return False
progress("Circuit complete")
self.progress("Circuit complete")
progress("Flying rudder left circuit")
self.progress("Flying rudder left circuit")
# do 4 turns
for i in range(0, 4):
# hard left
progress("Starting turn %u" % i)
self.progress("Starting turn %u" % i)
self.set_rc(4, 1900)
if not self.wait_heading(360 - (90*i), accuracy=20, timeout=60):
self.set_rc(4, 1500)
return False
self.set_rc(4, 1500)
progress("Starting leg %u" % i)
self.progress("Starting leg %u" % i)
if not self.wait_distance(100, accuracy=20):
return False
progress("Circuit complete")
self.progress("Circuit complete")
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
final_alt = m.alt
progress("Final altitude %u initial %u\n" % (final_alt, initial_alt))
self.progress("Final altitude %u initial %u\n" %
(final_alt, initial_alt))
# back to FBWA
self.mavproxy.send('mode FBWA\n')
self.wait_mode('FBWA')
if abs(final_alt - initial_alt) > 20:
progress("Failed to maintain altitude")
self.progress("Failed to maintain altitude")
return False
return self.wait_level_flight()
def fly_mission(self, filename, height_accuracy=-1, target_altitude=None):
"""Fly a mission from a file."""
progress("Flying mission %s" % filename)
self.progress("Flying mission %s" % filename)
self.mavproxy.send('wp load %s\n' % filename)
self.mavproxy.expect('Flight plan received')
self.mavproxy.send('wp list\n')
@ -489,7 +525,7 @@ class AutotestPlane(Autotest):
if not self.wait_groundspeed(0, 0.5, timeout=60):
return False
self.mavproxy.expect("Auto disarmed")
progress("Mission OK")
self.progress("Mission OK")
return True
def autotest(self):
@ -501,81 +537,83 @@ class AutotestPlane(Autotest):
fail_list = []
e = 'None'
try:
progress("Waiting for a heartbeat with mavlink protocol %s" % self.mav.WIRE_PROTOCOL_VERSION)
self.progress("Waiting for a heartbeat with mavlink protocol %s"
% self.mav.WIRE_PROTOCOL_VERSION)
self.mav.wait_heartbeat()
progress("Setting up RC parameters")
self.progress("Setting up RC parameters")
self.set_rc_default()
self.set_rc(3, 1000)
self.set_rc(8, 1800)
progress("Waiting for GPS fix")
self.progress("Waiting for GPS fix")
self.mav.recv_match(condition='VFR_HUD.alt>10', blocking=True)
self.mav.wait_gps_fix()
while self.mav.location().alt < 10:
self.mav.wait_gps_fix()
self.homeloc = self.mav.location()
progress("Home location: %s" % self.homeloc)
self.progress("Home location: %s" % self.homeloc)
if not self.takeoff():
progress("Failed takeoff")
self.progress("Failed takeoff")
failed = True
fail_list.append("takeoff")
if not self.fly_left_circuit():
progress("Failed left circuit")
self.progress("Failed left circuit")
failed = True
fail_list.append("left_circuit")
if not self.axial_left_roll(1):
progress("Failed left roll")
self.progress("Failed left roll")
failed = True
fail_list.append("left_roll")
if not self.inside_loop():
progress("Failed inside loop")
self.progress("Failed inside loop")
failed = True
fail_list.append("inside_loop")
if not self.test_stabilize():
progress("Failed stabilize test")
self.progress("Failed stabilize test")
failed = True
fail_list.append("stabilize")
if not self.test_acro():
progress("Failed ACRO test")
self.progress("Failed ACRO test")
failed = True
fail_list.append("acro")
if not self.test_FBWB():
progress("Failed FBWB test")
self.progress("Failed FBWB test")
failed = True
fail_list.append("fbwb")
if not self.test_FBWB(mode='CRUISE'):
progress("Failed CRUISE test")
self.progress("Failed CRUISE test")
failed = True
fail_list.append("cruise")
if not self.fly_RTL():
progress("Failed RTL")
self.progress("Failed RTL")
failed = True
fail_list.append("RTL")
if not self.fly_LOITER():
progress("Failed LOITER")
self.progress("Failed LOITER")
failed = True
fail_list.append("LOITER")
if not self.fly_CIRCLE():
progress("Failed CIRCLE")
self.progress("Failed CIRCLE")
failed = True
fail_list.append("LOITER")
if not self.fly_mission(os.path.join(testdir, "ap1.txt"), height_accuracy=10,
if not self.fly_mission(os.path.join(testdir, "ap1.txt"),
height_accuracy=10,
target_altitude=self.homeloc.alt+100):
progress("Failed mission")
self.progress("Failed mission")
failed = True
fail_list.append("mission")
if not self.log_download(util.reltopdir("../buildlogs/ArduPlane-log.bin")):
progress("Failed log download")
if not self.log_download(self.buildlogs_path("ArduPlane-log.bin")):
self.progress("Failed log download")
failed = True
fail_list.append("log_download")
except pexpect.TIMEOUT as e:
progress("Failed with timeout")
self.progress("Failed with timeout")
failed = True
fail_list.append("timeout")
self.close()
if failed:
progress("FAILED: %s" % e)
progress("Fail list: %s" % fail_list)
self.progress("FAILED: %s" % e)
self.progress("Fail list: %s" % fail_list)
return False
return True

View File

@ -8,20 +8,31 @@ import shutil
import pexpect
from pymavlink import mavutil
from common import *
from pysim import util
from pysim import vehicleinfo
from common import AutoTest
# get location of scripts
testdir = os.path.dirname(os.path.realpath(__file__))
HOME = mavutil.location(33.810313, -118.393867, 0, 185)
class AutotestSub(Autotest):
def __init__(self, binary, viewerip=None, use_map=False, valgrind=False, gdb=False, speedup=10, frame=None, params=None, gdbserver=False):
super(AutotestSub, self).__init__()
class AutoTestSub(AutoTest):
def __init__(self,
binary,
viewerip=None,
use_map=False,
valgrind=False,
gdb=False,
speedup=10,
frame=None,
params=None,
gdbserver=False):
super(AutoTestSub, self).__init__()
self.binary = binary
self.options = '--sitl=127.0.0.1:5501 --out=127.0.0.1:19550 --streamrate=10'
self.options = ('--sitl=127.0.0.1:5501 --out=127.0.0.1:19550'
' --streamrate=10')
self.viewerip = viewerip
self.use_map = use_map
self.valgrind = valgrind
@ -30,7 +41,10 @@ class AutotestSub(Autotest):
self.params = params
self.gdbserver = gdbserver
self.home = "%f,%f,%u,%u" % (HOME.lat, HOME.lng, HOME.alt, HOME.heading)
self.home = "%f,%f,%u,%u" % (HOME.lat,
HOME.lng,
HOME.alt,
HOME.heading)
self.homeloc = None
self.speedup = speedup
self.speedup_default = 10
@ -47,17 +61,21 @@ class AutotestSub(Autotest):
if self.use_map:
self.options += ' --map'
self.sitl = util.start_SITL(self.binary, wipe=True, model=self.frame, home=self.home,
self.sitl = util.start_SITL(self.binary,
wipe=True,
model=self.frame,
home=self.home,
speedup=self.speedup_default)
self.mavproxy = util.start_MAVProxy_SITL('ArduSub')
progress("WAITING FOR PARAMETERS")
self.progress("WAITING FOR PARAMETERS")
self.mavproxy.expect('Received [0-9]+ parameters')
# setup test parameters
vinfo = vehicleinfo.VehicleInfo()
if self.params is None:
self.params = vinfo.options["ArduSub"]["frames"][self.frame]["default_params_filename"]
frames = vinfo.options["ArduSub"]["frames"]
self.params = frames[self.frame]["default_params_filename"]
if not isinstance(self.params, list):
self.params = [self.params]
for x in self.params:
@ -65,21 +83,27 @@ class AutotestSub(Autotest):
self.mavproxy.expect('Loaded [0-9]+ parameters')
self.set_parameter('LOG_REPLAY', 1)
self.set_parameter('LOG_DISARMED', 1)
progress("RELOADING SITL WITH NEW PARAMETERS")
self.progress("RELOADING SITL WITH NEW PARAMETERS")
# restart with new parms
util.pexpect_close(self.mavproxy)
util.pexpect_close(self.sitl)
self.sitl = util.start_SITL(self.binary, model=self.frame, home=self.home, speedup=self.speedup,
valgrind=self.valgrind, gdb=self.gdb, gdbserver=self.gdbserver)
self.mavproxy = util.start_MAVProxy_SITL('ArduSub', options=self.options)
self.sitl = util.start_SITL(self.binary,
model=self.frame,
home=self.home,
speedup=self.speedup,
valgrind=self.valgrind,
gdb=self.gdb,
gdbserver=self.gdbserver)
self.mavproxy = util.start_MAVProxy_SITL('ArduSub',
options=self.options)
self.mavproxy.expect('Telemetry log: (\S+)')
logfile = self.mavproxy.match.group(1)
progress("LOGFILE %s" % logfile)
self.progress("LOGFILE %s" % logfile)
buildlog = util.reltopdir("../buildlogs/ArduSub-test.tlog")
progress("buildlog=%s" % buildlog)
self.progress("buildlog=%s" % buildlog)
if os.path.exists(buildlog):
os.unlink(buildlog)
try:
@ -89,23 +113,26 @@ class AutotestSub(Autotest):
self.mavproxy.expect('Received [0-9]+ parameters')
util.expect_setup_callback(self.mavproxy, expect_callback)
util.expect_setup_callback(self.mavproxy, self.expect_callback)
expect_list_clear()
expect_list_extend([self.sitl, self.mavproxy])
self.expect_list_clear()
self.expect_list_extend([self.sitl, self.mavproxy])
progress("Started simulator")
self.progress("Started simulator")
# get a mavlink connection going
connection_string = '127.0.0.1:19550'
try:
self.mav = mavutil.mavlink_connection('127.0.0.1:19550', robust_parsing=True)
self.mav = mavutil.mavlink_connection(connection_string,
robust_parsing=True)
except Exception as msg:
progress("Failed to start mavlink connection on 127.0.0.1:19550" % msg)
self.progress("Failed to start mavlink connection on %s: %s" %
(connection_string, msg,))
raise
self.mav.message_hooks.append(message_hook)
self.mav.idle_hooks.append(idle_hook)
self.mav.message_hooks.append(self.message_hook)
self.mav.idle_hooks.append(self.idle_hook)
self.hasInit = True
progress("Ready to start testing!")
self.progress("Ready to start testing!")
def close(self):
if self.use_map:
@ -116,10 +143,12 @@ class AutotestSub(Autotest):
util.pexpect_close(self.mavproxy)
util.pexpect_close(self.sitl)
valgrind_log = util.valgrind_log_filepath(binary=self.binary, model=self.frame)
valgrind_log = util.valgrind_log_filepath(binary=self.binary,
model=self.frame)
if os.path.exists(valgrind_log):
os.chmod(valgrind_log, 0o644)
shutil.copy(valgrind_log, util.reltopdir("../buildlogs/ArduSub-valgrind.log"))
shutil.copy(valgrind_log,
self.buildlogs_path("ArduSub-valgrind.log"))
def dive_manual(self):
self.set_rc(3, 1600)
@ -154,12 +183,12 @@ class AutotestSub(Autotest):
self.set_rc_default()
self.disarm_vehicle()
progress("Manual dive OK")
self.progress("Manual dive OK")
return True
def dive_mission(self, filename):
progress("Executing mission %s" % filename)
self.progress("Executing mission %s" % filename)
self.mavproxy.send('wp load %s\n' % filename)
self.mavproxy.expect('Flight plan received')
self.mavproxy.send('wp list\n')
@ -167,7 +196,7 @@ class AutotestSub(Autotest):
self.set_rc_default()
if not self.arm_vehicle():
progress("Failed to ARM")
self.progress("Failed to ARM")
return False
self.mavproxy.send('mode auto\n')
@ -178,7 +207,7 @@ class AutotestSub(Autotest):
self.disarm_vehicle()
progress("Mission OK")
self.progress("Mission OK")
return True
def autotest(self):
@ -189,37 +218,38 @@ class AutotestSub(Autotest):
failed = False
e = 'None'
try:
progress("Waiting for a heartbeat with mavlink protocol %s" % self.mav.WIRE_PROTOCOL_VERSION)
self.progress("Waiting for a heartbeat with mavlink protocol %s"
% self.mav.WIRE_PROTOCOL_VERSION)
self.mav.wait_heartbeat()
self.mavproxy.send('param set FS_GCS_ENABLE 0\n')
progress("Waiting for GPS fix")
self.progress("Waiting for GPS fix")
self.mav.wait_gps_fix()
# wait for EKF and GPS checks to pass
self.mavproxy.expect('IMU0 is using GPS')
self.homeloc = self.mav.location()
progress("Home location: %s" % self.homeloc)
self.progress("Home location: %s" % self.homeloc)
self.set_rc_default()
if not self.arm_vehicle():
progress("Failed to ARM")
self.progress("Failed to ARM")
failed = True
if not self.dive_manual():
progress("Failed manual dive")
self.progress("Failed manual dive")
failed = True
if not self.dive_mission(os.path.join(testdir, "sub_mission.txt")):
progress("Failed auto mission")
self.progress("Failed auto mission")
failed = True
if not self.log_download(util.reltopdir("../buildlogs/ArduSub-log.bin")):
progress("Failed log download")
if not self.log_download(self.buildlogs_path("ArduSub-log.bin")):
self.progress("Failed log download")
failed = True
except pexpect.TIMEOUT as e:
progress("Failed with timeout")
self.progress("Failed with timeout")
failed = True
self.close()
if failed:
progress("FAILED: %s" % e)
self.progress("FAILED: %s" % e)
return False
return True

View File

@ -261,27 +261,27 @@ def run_step(step):
fly_opts["speedup"] = opts.speedup
if step == 'fly.ArduCopter':
arducopter = AutotestCopter(binary, frame=opts.frame, **fly_opts)
arducopter = AutoTestCopter(binary, frame=opts.frame, **fly_opts)
return arducopter.autotest()
if step == 'fly.CopterAVC':
arducopter = AutotestCopter(binary, **fly_opts)
arducopter = AutoTestCopter(binary, **fly_opts)
return arducopter.autotest_heli()
if step == 'fly.ArduPlane':
arduplane = AutotestPlane(binary, **fly_opts)
arduplane = AutoTestPlane(binary, **fly_opts)
return arduplane.autotest()
if step == 'fly.QuadPlane':
quadplane = AutotestQuadPlane(binary, **fly_opts)
quadplane = AutoTestQuadPlane(binary, **fly_opts)
return quadplane.autotest()
if step == 'drive.APMrover2':
apmrover2 = AutotestRover(binary, frame=opts.frame, **fly_opts)
apmrover2 = AutoTestRover(binary, frame=opts.frame, **fly_opts)
return apmrover2.autotest()
if step == 'dive.ArduSub':
ardusub = AutotestSub(binary, **fly_opts)
ardusub = AutoTestSub(binary, **fly_opts)
return ardusub.autotest()
if step == 'build.All':

View File

@ -24,53 +24,11 @@ else:
ABC = abc.ABCMeta('ABC', (), {})
def progress(text):
"""Display autotest progress text."""
print("AUTOTEST: " + text)
class AutoTestTimeoutException(Exception):
pass
#################################################
# GENERAL UTILITIES
#################################################
def expect_list_clear():
"""clear the expect list."""
global expect_list
for p in expect_list[:]:
expect_list.remove(p)
def expect_list_extend(list_to_add):
"""Extend the expect list."""
global expect_list
expect_list.extend(list_to_add)
def idle_hook(mav):
"""Called when waiting for a mavlink message."""
global expect_list
for p in expect_list:
util.pexpect_drain(p)
def message_hook(mav, msg):
"""Called as each mavlink msg is received."""
idle_hook(mav)
def expect_callback(e):
"""Called when waiting for a expect pattern."""
global expect_list
for p in expect_list:
if p == e:
continue
util.pexpect_drain(p)
class Autotest(ABC):
class AutoTest(ABC):
"""Base abstract class.
It implements the common function for all vehicle types.
"""
@ -78,6 +36,55 @@ class Autotest(ABC):
self.mavproxy = None
self.mav = None
def progress(self, text):
"""Display autotest progress text."""
print("AUTOTEST: " + text)
# following two functions swiped from autotest.py:
def buildlogs_dirpath(self):
return os.getenv("BUILDLOGS", util.reltopdir("../buildlogs"))
def buildlogs_path(self, path):
'''return a string representing path in the buildlogs directory'''
bits = [self.buildlogs_dirpath()]
if isinstance(path, list):
bits.extend(path)
else:
bits.append(path)
return os.path.join(*bits)
#################################################
# GENERAL UTILITIES
#################################################
def expect_list_clear(self):
"""clear the expect list."""
global expect_list
for p in expect_list[:]:
expect_list.remove(p)
def expect_list_extend(self, list_to_add):
"""Extend the expect list."""
global expect_list
expect_list.extend(list_to_add)
def idle_hook(self, mav):
"""Called when waiting for a mavlink message."""
global expect_list
for p in expect_list:
util.pexpect_drain(p)
def message_hook(self, mav, msg):
"""Called as each mavlink msg is received."""
self.idle_hook(mav)
def expect_callback(self, e):
"""Called when waiting for a expect pattern."""
global expect_list
for p in expect_list:
if p == e:
continue
util.pexpect_drain(p)
#################################################
# SIM UTILITIES
#################################################
@ -89,18 +96,24 @@ class Autotest(ABC):
def sim_location(self):
"""Return current simulator location."""
m = self.mav.recv_match(type='SIMSTATE', blocking=True)
return mavutil.location(m.lat*1.0e-7, m.lng*1.0e-7, 0, math.degrees(m.yaw))
return mavutil.location(m.lat*1.0e-7,
m.lng*1.0e-7,
0,
math.degrees(m.yaw))
def save_wp(self):
"""Trigger RC 7 to save waypoint."""
self.mavproxy.send('rc 7 1000\n')
self.mav.recv_match(condition='RC_CHANNELS.chan7_raw==1000', blocking=True)
self.mav.recv_match(condition='RC_CHANNELS.chan7_raw==1000',
blocking=True)
self.wait_seconds(1)
self.mavproxy.send('rc 7 2000\n')
self.mav.recv_match(condition='RC_CHANNELS.chan7_raw==2000', blocking=True)
self.mav.recv_match(condition='RC_CHANNELS.chan7_raw==2000',
blocking=True)
self.wait_seconds(1)
self.mavproxy.send('rc 7 1000\n')
self.mav.recv_match(condition='RC_CHANNELS.chan7_raw==1000', blocking=True)
self.mav.recv_match(condition='RC_CHANNELS.chan7_raw==1000',
blocking=True)
self.wait_seconds(1)
def log_download(self, filename, timeout=360):
@ -154,7 +167,7 @@ class Autotest(ABC):
self.mavproxy.send('wp save %s\n' % filename)
self.mavproxy.expect('Saved ([0-9]+) waypoints')
num_wp = int(self.mavproxy.match.group(1))
progress("num_wp: %d" % num_wp)
self.progress("num_wp: %d" % num_wp)
return num_wp
def set_rc_default(self):
@ -171,21 +184,21 @@ class Autotest(ABC):
chan_pwm = getattr(m, "chan" + str(chan) + "_raw")
if chan_pwm == pwm:
return True
progress("Failed to send RC commands")
self.progress("Failed to send RC commands")
return False
def arm_vehicle(self):
"""Arm vehicle with mavlink arm message."""
self.mavproxy.send('arm throttle\n')
self.mav.motors_armed_wait()
progress("ARMED")
self.progress("ARMED")
return True
def disarm_vehicle(self):
"""Disarm vehicle with mavlink disarm message."""
self.mavproxy.send('disarm\n')
self.mav.motors_disarmed_wait()
progress("DISARMED")
self.progress("DISARMED")
return True
def set_parameter(self, name, value):
@ -197,7 +210,8 @@ class Autotest(ABC):
if float(returned_value) == float(value):
# yes, exactly equal.
break
progress("Param fetch returned incorrect value (%s) vs (%s)" % (returned_value, value))
self.progress("Param fetch returned incorrect value (%s) vs (%s)"
% (returned_value, value))
def get_parameter(self, name):
self.mavproxy.send("param fetch %s\n" % name)
@ -226,11 +240,13 @@ class Autotest(ABC):
def do_get_autopilot_capabilities(self):
self.mavproxy.send("long REQUEST_AUTOPILOT_CAPABILITIES 1\n")
m = self.mav.recv_match(type='AUTOPILOT_VERSION', blocking=True, timeout=10)
m = self.mav.recv_match(type='AUTOPILOT_VERSION',
blocking=True,
timeout=10)
if m is None:
progress("AUTOPILOT_VERSION not received")
self.progress("AUTOPILOT_VERSION not received")
return False
progress("AUTOPILOT_VERSION received")
self.progress("AUTOPILOT_VERSION received")
return True
def do_set_mode_via_command_long(self):
@ -238,8 +254,11 @@ class Autotest(ABC):
custom_mode = 4 # hold
start = time.time()
while time.time() - start < 5:
self.mavproxy.send("long DO_SET_MODE %u %u\n" % (base_mode, custom_mode))
m = self.mav.recv_match(type='HEARTBEAT', blocking=True, timeout=10)
self.mavproxy.send("long DO_SET_MODE %u %u\n" %
(base_mode, custom_mode))
m = self.mav.recv_match(type='HEARTBEAT',
blocking=True,
timeout=10)
if m is None:
return False
if m.custom_mode == custom_mode:
@ -257,22 +276,25 @@ class Autotest(ABC):
mavutil.mavlink.MAV_TYPE_TRICOPTER]:
self.mavproxy.send('rc 4 1580\n')
if not self.wait_heading(heading):
progress("Failed to reach heading")
self.progress("Failed to reach heading")
return False
self.mavproxy.send('rc 4 1500\n')
self.mav.recv_match(condition='RC_CHANNELS.chan4_raw==1500', blocking=True)
self.mav.recv_match(condition='RC_CHANNELS.chan4_raw==1500',
blocking=True)
if self.mav.mav_type == mavutil.mavlink.MAV_TYPE_FIXED_WING:
progress("NOT IMPLEMENTED")
self.progress("NOT IMPLEMENTED")
if self.mav.mav_type == mavutil.mavlink.MAV_TYPE_GROUND_ROVER:
self.mavproxy.send('rc 1 1700\n')
self.mavproxy.send('rc 3 1550\n')
if not self.wait_heading(heading):
progress("Failed to reach heading")
self.progress("Failed to reach heading")
return False
self.mavproxy.send('rc 3 1500\n')
self.mav.recv_match(condition='RC_CHANNELS.chan3_raw==1500', blocking=True)
self.mav.recv_match(condition='RC_CHANNELS.chan3_raw==1500',
blocking=True)
self.mavproxy.send('rc 1 1500\n')
self.mav.recv_match(condition='RC_CHANNELS.chan1_raw==1500', blocking=True)
self.mav.recv_match(condition='RC_CHANNELS.chan1_raw==1500',
blocking=True)
return True
def reach_distance_manual(self, distance):
@ -285,19 +307,21 @@ class Autotest(ABC):
mavutil.mavlink.MAV_TYPE_TRICOPTER]:
self.mavproxy.send('rc 2 1350\n')
if not self.wait_distance(distance, accuracy=5, timeout=60):
progress("Failed to reach distance of %u" % distance)
self.progress("Failed to reach distance of %u" % distance)
return False
self.mavproxy.send('rc 2 1500\n')
self.mav.recv_match(condition='RC_CHANNELS.chan2_raw==1500', blocking=True)
self.mav.recv_match(condition='RC_CHANNELS.chan2_raw==1500',
blocking=True)
if self.mav.mav_type == mavutil.mavlink.MAV_TYPE_FIXED_WING:
progress("NOT IMPLEMENTED")
self.progress("NOT IMPLEMENTED")
if self.mav.mav_type == mavutil.mavlink.MAV_TYPE_GROUND_ROVER:
self.mavproxy.send('rc 3 1700\n')
if not self.wait_distance(distance, accuracy=2):
progress("Failed to reach distance of %u" % distance)
self.progress("Failed to reach distance of %u" % distance)
return False
self.mavproxy.send('rc 3 1500\n')
self.mav.recv_match(condition='RC_CHANNELS.chan3_raw==1500', blocking=True)
self.mav.recv_match(condition='RC_CHANNELS.chan3_raw==1500',
blocking=True)
return True
#################################################
@ -316,71 +340,76 @@ class Autotest(ABC):
previous_alt = 0
tstart = self.get_sim_time()
progress("Waiting for altitude between %u and %u" % (alt_min, alt_max))
self.progress("Waiting for altitude between %u and %u" %
(alt_min, alt_max))
while self.get_sim_time() < tstart + timeout:
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
climb_rate = m.alt - previous_alt
previous_alt = m.alt
progress("Wait Altitude: Cur:%u, min_alt:%u, climb_rate: %u" % (m.alt, alt_min, climb_rate))
self.progress("Wait Altitude: Cur:%u, min_alt:%u, climb_rate: %u"
% (m.alt, alt_min, climb_rate))
if m.alt >= alt_min and m.alt <= alt_max:
progress("Altitude OK")
self.progress("Altitude OK")
return True
progress("Failed to attain altitude range")
self.progress("Failed to attain altitude range")
return False
def wait_groundspeed(self, gs_min, gs_max, timeout=30):
"""Wait for a given ground speed range."""
tstart = self.get_sim_time()
progress("Waiting for groundspeed between %.1f and %.1f" % (gs_min, gs_max))
self.progress("Waiting for groundspeed between %.1f and %.1f" %
(gs_min, gs_max))
while self.get_sim_time() < tstart + timeout:
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
progress("Wait groundspeed %.1f, target:%.1f" % (m.groundspeed, gs_min))
self.progress("Wait groundspeed %.1f, target:%.1f" %
(m.groundspeed, gs_min))
if m.groundspeed >= gs_min and m.groundspeed <= gs_max:
return True
progress("Failed to attain groundspeed range")
self.progress("Failed to attain groundspeed range")
return False
def wait_roll(self, roll, accuracy, timeout=30):
"""Wait for a given roll in degrees."""
tstart = self.get_sim_time()
progress("Waiting for roll of %d at %s" % (roll, time.ctime()))
self.progress("Waiting for roll of %d at %s" % (roll, time.ctime()))
while self.get_sim_time() < tstart + timeout:
m = self.mav.recv_match(type='ATTITUDE', blocking=True)
p = math.degrees(m.pitch)
r = math.degrees(m.roll)
progress("Roll %d Pitch %d" % (r, p))
self.progress("Roll %d Pitch %d" % (r, p))
if math.fabs(r - roll) <= accuracy:
progress("Attained roll %d" % roll)
self.progress("Attained roll %d" % roll)
return True
progress("Failed to attain roll %d" % roll)
self.progress("Failed to attain roll %d" % roll)
return False
def wait_pitch(self, pitch, accuracy, timeout=30):
"""Wait for a given pitch in degrees."""
tstart = self.get_sim_time()
progress("Waiting for pitch of %u at %s" % (pitch, time.ctime()))
self.progress("Waiting for pitch of %u at %s" % (pitch, time.ctime()))
while self.get_sim_time() < tstart + timeout:
m = self.mav.recv_match(type='ATTITUDE', blocking=True)
p = math.degrees(m.pitch)
r = math.degrees(m.roll)
progress("Pitch %d Roll %d" % (p, r))
self.progress("Pitch %d Roll %d" % (p, r))
if math.fabs(p - pitch) <= accuracy:
progress("Attained pitch %d" % pitch)
self.progress("Attained pitch %d" % pitch)
return True
progress("Failed to attain pitch %d" % pitch)
self.progress("Failed to attain pitch %d" % pitch)
return False
def wait_heading(self, heading, accuracy=5, timeout=30):
"""Wait for a given heading."""
tstart = self.get_sim_time()
progress("Waiting for heading %u with accuracy %u" % (heading, accuracy))
self.progress("Waiting for heading %u with accuracy %u" %
(heading, accuracy))
while self.get_sim_time() < tstart + timeout:
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
progress("Heading %u" % m.heading)
self.progress("Heading %u" % m.heading)
if math.fabs(m.heading - heading) <= accuracy:
progress("Attained heading %u" % heading)
self.progress("Attained heading %u" % heading)
return True
progress("Failed to attain heading %u" % heading)
self.progress("Failed to attain heading %u" % heading)
return False
def wait_distance(self, distance, accuracy=5, timeout=30):
@ -390,36 +419,49 @@ class Autotest(ABC):
while self.get_sim_time() < tstart + timeout:
pos = self.mav.location()
delta = self.get_distance(start, pos)
progress("Distance %.2f meters" % delta)
self.progress("Distance %.2f meters" % delta)
if math.fabs(delta - distance) <= accuracy:
progress("Attained distance %.2f meters OK" % delta)
self.progress("Attained distance %.2f meters OK" % delta)
return True
if delta > (distance + accuracy):
progress("Failed distance - overshoot delta=%f distance=%f" % (delta, distance))
self.progress("Failed distance - overshoot delta=%f dist=%f"
% (delta, distance))
return False
progress("Failed to attain distance %u" % distance)
self.progress("Failed to attain distance %u" % distance)
return False
def wait_location(self, loc, accuracy=5, timeout=30, target_altitude=None, height_accuracy=-1):
def wait_location(self,
loc,
accuracy=5,
timeout=30,
target_altitude=None,
height_accuracy=-1):
"""Wait for arrival at a location."""
tstart = self.get_sim_time()
if target_altitude is None:
target_altitude = loc.alt
progress("Waiting for location %.4f,%.4f at altitude %.1f height_accuracy=%.1f" % (
loc.lat, loc.lng, target_altitude, height_accuracy))
self.progress("Waiting for location"
"%.4f,%.4f at altitude %.1f height_accuracy=%.1f" %
(loc.lat, loc.lng, target_altitude, height_accuracy))
while self.get_sim_time() < tstart + timeout:
pos = self.mav.location()
delta = self.get_distance(loc, pos)
progress("Distance %.2f meters alt %.1f" % (delta, pos.alt))
self.progress("Distance %.2f meters alt %.1f" % (delta, pos.alt))
if delta <= accuracy:
if height_accuracy != -1 and math.fabs(pos.alt - target_altitude) > height_accuracy:
height_delta = math.fabs(pos.alt - target_altitude)
if (height_accuracy != -1 and height_delta > height_accuracy):
continue
progress("Reached location (%.2f meters)" % delta)
self.progress("Reached location (%.2f meters)" % delta)
return True
progress("Failed to attain location")
self.progress("Failed to attain location")
return False
def wait_waypoint(self, wpnum_start, wpnum_end, allow_skip=True, max_dist=2, timeout=400):
def wait_waypoint(self,
wpnum_start,
wpnum_end,
allow_skip=True,
max_dist=2,
timeout=400):
"""Wait for waypoint ranges."""
tstart = self.get_sim_time()
# this message arrives after we set the current WP
@ -427,52 +469,60 @@ class Autotest(ABC):
current_wp = start_wp
mode = self.mav.flightmode
progress("\ntest: wait for waypoint ranges start=%u end=%u\n\n" % (wpnum_start, wpnum_end))
self.progress("\ntest: wait for waypoint ranges start=%u end=%u\n\n"
% (wpnum_start, wpnum_end))
# if start_wp != wpnum_start:
# progress("test: Expected start waypoint %u but got %u" % (wpnum_start, start_wp))
# self.progress("test: Expected start waypoint %u but got %u" %
# (wpnum_start, start_wp))
# return False
while self.get_sim_time() < tstart + timeout:
seq = self.mav.waypoint_current()
m = self.mav.recv_match(type='NAV_CONTROLLER_OUTPUT', blocking=True)
m = self.mav.recv_match(type='NAV_CONTROLLER_OUTPUT',
blocking=True)
wp_dist = m.wp_dist
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
# if we changed mode, fail
if self.mav.flightmode != mode:
progress('Exited %s mode' % mode)
self.progress('Exited %s mode' % mode)
return False
progress("test: WP %u (wp_dist=%u Alt=%d), current_wp: %u, wpnum_end: %u" % (seq, wp_dist, m.alt, current_wp, wpnum_end))
self.progress("test: WP %u (wp_dist=%u Alt=%d), current_wp: %u,"
"wpnum_end: %u" %
(seq, wp_dist, m.alt, current_wp, wpnum_end))
if seq == current_wp+1 or (seq > current_wp+1 and allow_skip):
progress("test: Starting new waypoint %u" % seq)
self.progress("test: Starting new waypoint %u" % seq)
tstart = self.get_sim_time()
current_wp = seq
# the wp_dist check is a hack until we can sort out the right seqnum
# for end of mission
# if current_wp == wpnum_end or (current_wp == wpnum_end-1 and wp_dist < 2):
# the wp_dist check is a hack until we can sort out
# the right seqnum for end of mission
# if current_wp == wpnum_end or (current_wp == wpnum_end-1 and
# wp_dist < 2):
if current_wp == wpnum_end and wp_dist < max_dist:
progress("Reached final waypoint %u" % seq)
self.progress("Reached final waypoint %u" % seq)
return True
if seq >= 255:
progress("Reached final waypoint %u" % seq)
self.progress("Reached final waypoint %u" % seq)
return True
if seq > current_wp+1:
progress("Failed: Skipped waypoint! Got wp %u expected %u" % (seq, current_wp+1))
self.progress("Failed: Skipped waypoint! Got wp %u expected %u"
% (seq, current_wp+1))
return False
progress("Failed: Timed out waiting for waypoint %u of %u" % (wpnum_end, wpnum_end))
self.progress("Failed: Timed out waiting for waypoint %u of %u" %
(wpnum_end, wpnum_end))
return False
def wait_mode(self, mode, timeout=None):
"""Wait for mode to change."""
progress("Waiting for mode %s" % mode)
self.progress("Waiting for mode %s" % mode)
tstart = self.get_sim_time()
hastimeout = False
while self.mav.flightmode.upper() != mode.upper() and not hastimeout:
if timeout is not None:
hastimeout = self.get_sim_time() > tstart + timeout
self.mav.wait_heartbeat()
progress("Got mode %s" % mode)
self.progress("Got mode %s" % mode)
return self.mav.flightmode
def wait_ready_to_arm(self, timeout=None):
@ -484,16 +534,17 @@ class Autotest(ABC):
tstart = self.get_sim_time()
required_value = 831
progress("Waiting for EKF value %u" % required_value)
self.progress("Waiting for EKF value %u" % required_value)
while timeout is None or self.get_sim_time() < tstart + timeout:
m = self.mav.recv_match(type='EKF_STATUS_REPORT', blocking=True)
current = m.flags
if (tstart - self.get_sim_time()) % 5 == 0:
progress("Wait EKF.flags: required:%u current:%u" % (required_value, current))
self.progress("Wait EKF.flags: required:%u current:%u" %
(required_value, current))
if current == required_value:
progress("EKF Flags OK")
self.progress("EKF Flags OK")
return
progress("Failed to get EKF.flags=%u" % required_value)
self.progress("Failed to get EKF.flags=%u" % required_value)
raise AutoTestTimeoutException()
@abc.abstractmethod
@ -506,50 +557,53 @@ class Autotest(ABC):
# sucess = True
# # TEST ARMING/DISARM
# if not self.arm_vehicle():
# progress("Failed to ARM")
# self.progress("Failed to ARM")
# sucess = False
# if not self.disarm_vehicle():
# progress("Failed to DISARM")
# self.progress("Failed to DISARM")
# sucess = False
# if not self.test_arm_motors_radio():
# progress("Failed to ARM with radio")
# self.progress("Failed to ARM with radio")
# sucess = False
# if not self.test_disarm_motors_radio():
# progress("Failed to ARM with radio")
# self.progress("Failed to ARM with radio")
# sucess = False
# if not self.test_autodisarm_motors():
# progress("Failed to AUTO DISARM")
# self.progress("Failed to AUTO DISARM")
# sucess = False
# # TODO: Test failure on arm (with arming check)
# # TEST MISSION FILE
# # TODO : rework that to work on autotest server
# # progress("TEST LOADING MISSION")
# # num_wp = self.load_mission_from_file(os.path.join(testdir, "fake_mission.txt"))
# # self.progress("TEST LOADING MISSION")
# # num_wp = self.load_mission_from_file(
# os.path.join(testdir, "fake_mission.txt"))
# # if num_wp == 0:
# # progress("Failed to load all_msg_mission")
# # self.progress("Failed to load all_msg_mission")
# # sucess = False
# #
# # progress("TEST SAVING MISSION")
# # self.progress("TEST SAVING MISSION")
# # num_wp_old = num_wp
# # num_wp = self.save_mission_to_file(os.path.join(testdir, "fake_mission2.txt"))
# # num_wp = self.save_mission_to_file(os.path.join(testdir,
# "fake_mission2.txt"))
# # if num_wp != num_wp_old:
# # progress("Failed to save all_msg_mission")
# # self.progress("Failed to save all_msg_mission")
# # sucess = False
#
# progress("TEST CLEARING MISSION")
# self.progress("TEST CLEARING MISSION")
# self.mavproxy.send("wp clear\n")
# self.mavproxy.send('wp list\n')
# self.mavproxy.expect('Requesting [0-9]+ waypoints')
# num_wp = mavwp.MAVWPLoader().count()
# if num_wp != 0:
# progress("Failed to clear mission ")
# self.progress("Failed to clear mission ")
# sucess = False
#
# return sucess
#
# # TESTS FAILSAFE
# @abc.abstractmethod
# def test_throttle_failsafe(self, home, distance_min=10, side=60, timeout=180):
# def test_throttle_failsafe(self, home, distance_min=10, side=60,
# timeout=180):
# """Test that RTL success in case of thottle failsafe."""
# pass
#
@ -576,16 +630,17 @@ class Autotest(ABC):
# @abc.abstractmethod
# def test_rtl(self, home, distance_min=10, timeout=250):
# """Test that RTL success."""
# progress("# Enter RTL")
# self.progress("# Enter RTL")
# self.mavproxy.send('switch 3\n')
# tstart = self.get_sim_time()
# while self.get_sim_time() < tstart + timeout:
# m = self.mav.recv_match(type='VFR_HUD', blocking=True)
# pos = self.mav.location()
# home_distance = self.get_distance(home, pos)
# progress("Alt: %u HomeDistance: %.0f" % (m.alt, home_distance))
# self.progress("Alt: %u HomeDistance: %.0f" %
# (m.alt, home_distance))
# if m.alt <= 1 and home_distance < distance_min:
# progress("RTL Complete")
# self.progress("RTL Complete")
# return True
# return False
#

View File

@ -7,7 +7,7 @@ import pexpect
import shutil
from pymavlink import mavutil
from common import *
from common import AutoTest
from pysim import util
# get location of scripts
@ -18,11 +18,21 @@ FENCE = 'ArduPlane-Missions/Dalby-OBC2016-fence.txt'
WIND = "0,180,0.2" # speed,direction,variance
class AutotestQuadPlane(Autotest):
def __init__(self, binary, viewerip=None, use_map=False, valgrind=False, gdb=False, speedup=10, frame=None, params=None, gdbserver=False):
super(AutotestQuadPlane, self).__init__()
class AutoTestQuadPlane(AutoTest):
def __init__(self,
binary,
viewerip=None,
use_map=False,
valgrind=False,
gdb=False,
speedup=10,
frame=None,
params=None,
gdbserver=False):
super(AutoTestQuadPlane, self).__init__()
self.binary = binary
self.options = '--sitl=127.0.0.1:5501 --out=127.0.0.1:19550 --streamrate=10'
self.options = ('--sitl=127.0.0.1:5501 --out=127.0.0.1:19550'
' --streamrate=10')
self.viewerip = viewerip
self.use_map = use_map
self.valgrind = valgrind
@ -31,7 +41,10 @@ class AutotestQuadPlane(Autotest):
self.params = params
self.gdbserver = gdbserver
self.home = "%f,%f,%u,%u" % (HOME.lat, HOME.lng, HOME.alt, HOME.heading)
self.home = "%f,%f,%u,%u" % (HOME.lat,
HOME.lng,
HOME.alt,
HOME.heading)
self.homeloc = None
self.speedup = speedup
self.speedup_default = 10
@ -53,16 +66,24 @@ class AutotestQuadPlane(Autotest):
if self.use_map:
self.options += ' --map'
self.sitl = util.start_SITL(self.binary, wipe=True, model=self.frame, home=self.home, speedup=self.speedup,
defaults_file=os.path.join(testdir, 'default_params/quadplane.parm'),
valgrind=self.valgrind, gdb=self.gdb, gdbserver=self.gdbserver)
self.mavproxy = util.start_MAVProxy_SITL('QuadPlane', options=self.options)
defaults_file = os.path.join(testdir, 'default_params/quadplane.parm')
self.sitl = util.start_SITL(self.binary,
wipe=True,
model=self.frame,
home=self.home,
speedup=self.speedup,
defaults_file=defaults_file,
valgrind=self.valgrind,
gdb=self.gdb,
gdbserver=self.gdbserver)
self.mavproxy = util.start_MAVProxy_SITL('QuadPlane',
options=self.options)
self.mavproxy.expect('Telemetry log: (\S+)')
logfile = self.mavproxy.match.group(1)
progress("LOGFILE %s" % logfile)
self.progress("LOGFILE %s" % logfile)
buildlog = util.reltopdir("../buildlogs/QuadPlane-test.tlog")
progress("buildlog=%s" % buildlog)
self.progress("buildlog=%s" % buildlog)
if os.path.exists(buildlog):
os.unlink(buildlog)
try:
@ -72,23 +93,26 @@ class AutotestQuadPlane(Autotest):
self.mavproxy.expect('Received [0-9]+ parameters')
util.expect_setup_callback(self.mavproxy, expect_callback)
util.expect_setup_callback(self.mavproxy, self.expect_callback)
expect_list_clear()
expect_list_extend([self.sitl, self.mavproxy])
self.expect_list_clear()
self.expect_list_extend([self.sitl, self.mavproxy])
progress("Started simulator")
self.progress("Started simulator")
# get a mavlink connection going
connection_string = '127.0.0.1:19550'
try:
self.mav = mavutil.mavlink_connection('127.0.0.1:19550', robust_parsing=True)
self.mav = mavutil.mavlink_connection(connection_string,
robust_parsing=True)
except Exception as msg:
progress("Failed to start mavlink connection on 127.0.0.1:19550" % msg)
self.progress("Failed to start mavlink connection on %s: %s" %
(connection_string, msg))
raise
self.mav.message_hooks.append(message_hook)
self.mav.idle_hooks.append(idle_hook)
self.mav.message_hooks.append(self.message_hook)
self.mav.idle_hooks.append(self.idle_hook)
self.hasInit = True
progress("Ready to start testing!")
self.progress("Ready to start testing!")
def close(self):
if self.use_map:
@ -99,10 +123,12 @@ class AutotestQuadPlane(Autotest):
util.pexpect_close(self.mavproxy)
util.pexpect_close(self.sitl)
valgrind_log = util.valgrind_log_filepath(binary=self.binary, model=self.frame)
valgrind_log = util.valgrind_log_filepath(binary=self.binary,
model=self.frame)
if os.path.exists(valgrind_log):
os.chmod(valgrind_log, 0o644)
shutil.copy(valgrind_log, util.reltopdir("../buildlogs/QuadPlane-valgrind.log"))
shutil.copy(valgrind_log,
self.buildlogs_path("QuadPlane-valgrind.log"))
# def test_arm_motors_radio(self):
# super(AutotestQuadPlane, self).test_arm_motors_radio()
@ -114,17 +140,20 @@ class AutotestQuadPlane(Autotest):
# super(AutotestQuadPlane, self).test_autodisarm_motors()
#
# def test_rtl(self, home, distance_min=10, timeout=250):
# super(AutotestQuadPlane, self).test_rtl(home, distance_min=10, timeout=250)
# super(AutotestQuadPlane, self).test_rtl(home,
# distance_min=10, timeout=250)
#
# def test_throttle_failsafe(self, home, distance_min=10, side=60, timeout=180):
# super(AutotestQuadPlane, self).test_throttle_failsafe(home, distance_min=10, side=60, timeout=180)
# def test_throttle_failsafe(self, home, distance_min=10,
# side=60, timeout=180):
# super(AutotestQuadPlane, self).test_throttle_failsafe(home,
# distance_min=10, side=60, timeout=180)
#
# def test_mission(self, filename):
# super(AutotestQuadPlane, self).test_mission(filename)
def fly_mission(self, filename, fence, height_accuracy=-1):
"""Fly a mission from a file."""
progress("Flying mission %s" % filename)
self.progress("Flying mission %s" % filename)
self.mavproxy.send('wp load %s\n' % filename)
self.mavproxy.expect('Flight plan received')
self.mavproxy.send('fence load %s\n' % fence)
@ -141,7 +170,7 @@ class AutotestQuadPlane(Autotest):
if not self.wait_waypoint(20, 34, max_dist=60, timeout=1200):
return False
self.mavproxy.expect('DISARMED')
progress("Mission OK")
self.progress("Mission OK")
return True
def autotest(self):
@ -153,33 +182,36 @@ class AutotestQuadPlane(Autotest):
failed = False
e = 'None'
try:
progress("Waiting for a heartbeat with mavlink protocol %s" % self.mav.WIRE_PROTOCOL_VERSION)
self.progress("Waiting for a heartbeat with mavlink protocol %s"
% self.mav.WIRE_PROTOCOL_VERSION)
self.mav.wait_heartbeat()
progress("Waiting for GPS fix")
self.progress("Waiting for GPS fix")
self.mav.recv_match(condition='VFR_HUD.alt>10', blocking=True)
self.mav.wait_gps_fix()
while self.mav.location().alt < 10:
self.mav.wait_gps_fix()
self.homeloc = self.mav.location()
progress("Home location: %s" % self.homeloc)
self.progress("Home location: %s" % self.homeloc)
# wait for EKF and GPS checks to pass
progress("Waiting reading for arm")
self.progress("Waiting reading for arm")
self.wait_seconds(30)
self.arm_vehicle()
if not self.fly_mission(os.path.join(testdir, "ArduPlane-Missions/Dalby-OBC2016.txt"),
os.path.join(testdir, "ArduPlane-Missions/Dalby-OBC2016-fence.txt")):
progress("Failed mission")
m = os.path.join(testdir, "ArduPlane-Missions/Dalby-OBC2016.txt")
f = os.path.join(testdir,
"ArduPlane-Missions/Dalby-OBC2016-fence.txt")
if not self.fly_mission(m, f):
self.progress("Failed mission")
failed = True
except pexpect.TIMEOUT as e:
progress("Failed with timeout")
self.progress("Failed with timeout")
failed = True
self.close()
if failed:
progress("FAILED: %s" % e)
self.progress("FAILED: %s" % e)
return False
return True