Tools: use base class to share code between vehicle tests

Autotest: common.py add new base class and test

Autotest : Common add heartbeat handling on wait_mode timeout

Autotest: common.py add reach_heading_manual and reach_distance_manual

Autotest: new autotest implementation for Rover

Autotest : rover add drive_square test

Autotest: new autotest implementation for Copter

Autotest: new autotest implementation for Plane

Autotest: new autotest implementation for Sub

Autotest: new autotest implementation

Autotest: new autotest implementation for QuadPlane

Autotest : Sub disable GCS_Failsafe on autotest to prevent timeout. The failsafe aren't currently tested on Sub
This commit is contained in:
Pierre Kancir 2018-03-05 16:14:34 +01:00 committed by Randy Mackay
parent 5b7116bbbd
commit 259dda810d
7 changed files with 3167 additions and 2759 deletions

View File

@ -2,363 +2,574 @@
# Drive APMrover2 in SITL # Drive APMrover2 in SITL
from __future__ import print_function from __future__ import print_function
import os
import shutil import shutil
import pexpect import pexpect
from common import * from common import *
from pysim import util from pysim import util
from pysim import vehicleinfo from pysim import vehicleinfo
from pymavlink import mavutil, mavwp
# get location of scripts # get location of scripts
testdir = os.path.dirname(os.path.realpath(__file__)) testdir = os.path.dirname(os.path.realpath(__file__))
#################################################
# CONFIG
#################################################
# HOME = mavutil.location(-35.362938, 149.165085, 584, 270) # HOME = mavutil.location(-35.362938, 149.165085, 584, 270)
HOME = mavutil.location(40.071374969556928, -105.22978898137808, 1583.702759, 246) HOME = mavutil.location(40.071374969556928, -105.22978898137808, 1583.702759, 246)
homeloc = None
num_wp = 0
speedup_default = 10
########################################################## class AutotestRover(Autotest):
# TESTS DRIVE def __init__(self, binary, viewerip=None, use_map=False, valgrind=False, gdb=False, speedup=10, frame=None, params=None, gdbserver=False):
########################################################## super(AutotestRover, self).__init__()
def drive_left_circuit(mavproxy, mav): self.binary = binary
"""Drive a left circuit, 50m on a side.""" self.options = '--sitl=127.0.0.1:5501 --out=127.0.0.1:19550 --streamrate=10'
mavproxy.send('switch 6\n') self.viewerip = viewerip
wait_mode(mav, 'MANUAL') self.use_map = use_map
set_rc(mavproxy, mav, 3, 2000) self.valgrind = valgrind
self.gdb = gdb
self.frame = frame
self.params = params
self.gdbserver = gdbserver
progress("Driving left circuit") self.home = "%f,%f,%u,%u" % (HOME.lat, HOME.lng, HOME.alt, HOME.heading)
# do 4 turns self.homeloc = None
for i in range(0, 4): self.speedup = speedup
# hard left self.speedup_default = 10
progress("Starting turn %u" % i)
set_rc(mavproxy, mav, 1, 1000) self.sitl = None
if not wait_heading(mav, 270 - (90*i), accuracy=10): self.hasInit = False
def init(self):
if self.frame is None:
self.frame = 'rover'
if self.viewerip:
self.options += " --out=%s:14550" % self.viewerip
if self.use_map:
self.options += ' --map'
self.sitl = util.start_SITL(self.binary, wipe=True, model=self.frame, home=self.home,
speedup=self.speedup_default)
self.mavproxy = util.start_MAVProxy_SITL('APMrover2')
progress("WAITING FOR PARAMETERS")
self.mavproxy.expect('Received [0-9]+ parameters')
# setup test parameters
vinfo = vehicleinfo.VehicleInfo()
if self.params is None:
self.params = vinfo.options["APMrover2"]["frames"][self.frame]["default_params_filename"]
if not isinstance(self.params, list):
self.params = [self.params]
for x in self.params:
self.mavproxy.send("param load %s\n" % os.path.join(testdir, x))
self.mavproxy.expect('Loaded [0-9]+ parameters')
self.set_parameter('LOG_REPLAY', 1)
self.set_parameter('LOG_DISARMED', 1)
progress("RELOADING SITL WITH NEW PARAMETERS")
# restart with new parms
util.pexpect_close(self.mavproxy)
util.pexpect_close(self.sitl)
self.sitl = util.start_SITL(self.binary, model=self.frame, home=self.home, speedup=self.speedup,
valgrind=self.valgrind, gdb=self.gdb, gdbserver=self.gdbserver)
self.mavproxy = util.start_MAVProxy_SITL('APMrover2', options=self.options)
self.mavproxy.expect('Telemetry log: (\S+)')
logfile = self.mavproxy.match.group(1)
progress("LOGFILE %s" % logfile)
buildlog = util.reltopdir("../buildlogs/APMrover2-test.tlog")
progress("buildlog=%s" % buildlog)
if os.path.exists(buildlog):
os.unlink(buildlog)
try:
os.link(logfile, buildlog)
except Exception:
pass
self.mavproxy.expect('Received [0-9]+ parameters')
util.expect_setup_callback(self.mavproxy, expect_callback)
expect_list_clear()
expect_list_extend([self.sitl, self.mavproxy])
progress("Started simulator")
# get a mavlink connection going
try:
self.mav = mavutil.mavlink_connection('127.0.0.1:19550', robust_parsing=True)
except Exception as msg:
progress("Failed to start mavlink connection on 127.0.0.1:19550" % msg)
raise
self.mav.message_hooks.append(message_hook)
self.mav.idle_hooks.append(idle_hook)
self.hasInit = True
progress("Ready to start testing!")
def close(self):
if self.use_map:
self.mavproxy.send("module unload map\n")
self.mavproxy.expect("Unloaded module map")
self.mav.close()
util.pexpect_close(self.mavproxy)
util.pexpect_close(self.sitl)
valgrind_log = util.valgrind_log_filepath(binary=self.binary, model=self.frame)
if os.path.exists(valgrind_log):
os.chmod(valgrind_log, 0o644)
shutil.copy(valgrind_log, util.reltopdir("../buildlogs/APMrover2-valgrind.log"))
# def reset_and_arm(self):
# """Reset RC, set to MANUAL and arm."""
# self.mav.wait_heartbeat()
# # ensure all sticks in the middle
# self.set_rc_default()
# self.mavproxy.send('switch 1\n')
# self.mav.wait_heartbeat()
# self.disarm_vehicle()
# self.mav.wait_heartbeat()
# self.arm_vehicle()
#
# # TEST ARM RADIO
# def test_arm_motors_radio(self):
# """Test Arming motors with radio."""
# progress("Test arming motors with radio")
# self.mavproxy.send('switch 6\n') # stabilize/manual mode
# self.wait_mode('MANUAL')
# self.mavproxy.send('rc 3 1500\n') # throttle at zero
# self.mavproxy.send('rc 1 2000\n') # steer full right
# self.mavproxy.expect('APM: Throttle armed')
# self.mavproxy.send('rc 1 1500\n')
#
# self.mav.motors_armed_wait()
# progress("MOTORS ARMED OK")
# return True
#
# # TEST DISARM RADIO
# def test_disarm_motors_radio(self):
# """Test Disarm motors with radio."""
# progress("Test disarming motors with radio")
# self.mavproxy.send('switch 6\n') # stabilize/manual mode
# self.wait_mode('MANUAL')
# self.mavproxy.send('rc 3 1500\n') # throttle at zero
# self.mavproxy.send('rc 1 1000\n') # steer full right
# tstart = self.get_sim_time()
# self.mav.wait_heartbeat()
# timeout = 15
# while self.get_sim_time() < tstart + timeout:
# self.mav.wait_heartbeat()
# if not self.mav.motors_armed():
# disarm_delay = self.get_sim_time() - tstart
# progress("MOTORS DISARMED OK WITH RADIO")
# self.mavproxy.send('rc 1 1500\n') # steer full right
# self.mavproxy.send('rc 4 1500\n') # yaw full right
# progress("Disarm in %ss" % disarm_delay)
# return True
# progress("FAILED TO DISARM WITH RADIO")
# return False
#
# # TEST AUTO DISARM
# def test_autodisarm_motors(self):
# """Test Autodisarm motors."""
# progress("Test Autodisarming motors")
# self.mavproxy.send('switch 6\n') # stabilize/manual mode
# # NOT IMPLEMENTED ON ROVER
# progress("MOTORS AUTODISARMED OK")
# return True
#
# # TEST RC OVERRIDE
# # TEST RC OVERRIDE TIMEOUT
# def test_rtl(self, home, distance_min=5, timeout=250):
# """Return, land."""
# super(AutotestRover, self).test_rtl(home, distance_min, timeout)
#
# def test_mission(self, filename):
# """Test a mission from a file."""
# progress("Test mission %s" % filename)
# num_wp = self.load_mission_from_file(filename)
# self.mavproxy.send('wp set 1\n')
# self.mav.wait_heartbeat()
# self.mavproxy.send('switch 4\n') # auto mode
# self.wait_mode('AUTO')
# ret = self.wait_waypoint(0, num_wp-1, max_dist=5, timeout=500)
#
# if ret:
# self.mavproxy.expect("Mission Complete")
# self.mav.wait_heartbeat()
# self.wait_mode('HOLD')
# progress("test: MISSION COMPLETE: passed=%s" % ret)
# return ret
##########################################################
# TESTS DRIVE
##########################################################
# Drive a square in manual mode
def drive_square(self, side=50):
"""Drive a square, Driving N then E ."""
progress("TEST SQUARE")
success = True
# use LEARNING Mode
self.mavproxy.send('switch 5\n')
self.wait_mode('MANUAL')
# first aim north
progress("\nTurn right towards north")
if not self.reach_heading_manual(10):
success = False
# save bottom left corner of box as waypoint
progress("Save WP 1 & 2")
self.save_wp()
# pitch forward to fly north
progress("\nGoing north %u meters" % side)
if not self.reach_distance_manual(side):
success = False
# save top left corner of square as waypoint
progress("Save WP 3")
self.save_wp()
# roll right to fly east
progress("\nGoing east %u meters" % side)
if not self.reach_heading_manual(100):
success = False
if not self.reach_distance_manual(side):
success = False
# save top right corner of square as waypoint
progress("Save WP 4")
self.save_wp()
# pitch back to fly south
progress("\nGoing south %u meters" % side)
if not self.reach_heading_manual(190):
success = False
if not self.reach_distance_manual(side):
success = False
# save bottom right corner of square as waypoint
progress("Save WP 5")
self.save_wp()
# roll left to fly west
progress("\nGoing west %u meters" % side)
if not self.reach_heading_manual(280):
success = False
if not self.reach_distance_manual(side):
success = False
# save bottom left corner of square (should be near home) as waypoint
progress("Save WP 6")
self.save_wp()
return success
def drive_left_circuit(self):
"""Drive a left circuit, 50m on a side."""
self.mavproxy.send('switch 6\n')
self.wait_mode('MANUAL')
self.set_rc(3, 2000)
progress("Driving left circuit")
# do 4 turns
for i in range(0, 4):
# hard left
progress("Starting turn %u" % i)
self.set_rc(1, 1000)
if not self.wait_heading(270 - (90*i), accuracy=10):
return False
self.set_rc(1, 1500)
progress("Starting leg %u" % i)
if not self.wait_distance(50, accuracy=7):
return False
self.set_rc(3, 1500)
progress("Circuit complete")
return True
# def test_throttle_failsafe(self, home, distance_min=10, side=60, timeout=300):
# """Fly east, Failsafe, return, land."""
#
# self.mavproxy.send('switch 6\n') # manual mode
# self.wait_mode('MANUAL')
# self.mavproxy.send("param set FS_ACTION 1\n")
#
# # first aim east
# progress("turn east")
# if not self.reach_heading_manual(135):
# return False
#
# # fly east 60 meters
# progress("# Going forward %u meters" % side)
# if not self.reach_distance_manual(side):
# return False
#
# # pull throttle low
# progress("# Enter Failsafe")
# self.mavproxy.send('rc 3 900\n')
#
# tstart = self.get_sim_time()
# success = False
# while self.get_sim_time() < tstart + timeout and not success:
# m = self.mav.recv_match(type='VFR_HUD', blocking=True)
# pos = self.mav.location()
# home_distance = self.get_distance(home, pos)
# progress("Alt: %u HomeDistance: %.0f" % (m.alt, home_distance))
# # check if we've reached home
# if home_distance <= distance_min:
# progress("RTL Complete")
# success = True
#
# # reduce throttle
# self.mavproxy.send('rc 3 1500\n')
# self.mavproxy.expect('APM: Failsafe ended')
# self.mavproxy.send('switch 2\n') # manual mode
# self.mav.wait_heartbeat()
# self.wait_mode('MANUAL')
#
# if success:
# progress("Reached failsafe home OK")
# return True
# else:
# progress("Failed to reach Home on failsafe RTL - timed out after %u seconds" % timeout)
# return False
#################################################
# AUTOTEST ALL
#################################################
def drive_mission(self, filename):
"""Drive a mission from a file."""
progress("Driving mission %s" % filename)
self.mavproxy.send('wp load %s\n' % filename)
self.mavproxy.expect('Flight plan received')
self.mavproxy.send('wp list\n')
self.mavproxy.expect('Requesting [0-9]+ waypoints')
self.mavproxy.send('switch 4\n') # auto mode
self.set_rc(3, 1500)
self.wait_mode('AUTO')
if not self.wait_waypoint(1, 4, max_dist=5):
return False return False
set_rc(mavproxy, mav, 1, 1500) self.wait_mode('HOLD')
progress("Starting leg %u" % i) progress("Mission OK")
if not wait_distance(mav, 50, accuracy=7): return True
def do_get_banner(self):
self.mavproxy.send("long DO_SEND_BANNER 1\n")
start = time.time()
while True:
m = self.mav.recv_match(type='STATUSTEXT', blocking=True, timeout=1)
if m is not None and "ArduRover" in m.text:
progress("banner received: %s" % m.text)
return True
if time.time() - start > 10:
break
progress("banner not received")
return False
def drive_brake_get_stopping_distance(self, speed):
# measure our stopping distance:
old_cruise_speed = self.get_parameter('CRUISE_SPEED')
old_accel_max = self.get_parameter('ATC_ACCEL_MAX')
# controller tends not to meet cruise speed (max of ~14 when 15
# set), thus *1.2
self.set_parameter('CRUISE_SPEED', speed*1.2)
# at time of writing, the vehicle is only capable of 10m/s/s accel
self.set_parameter('ATC_ACCEL_MAX', 15)
self.mavproxy.send("mode STEERING\n")
self.wait_mode('STEERING')
self.set_rc(3, 2000)
self.wait_groundspeed(15, 100)
initial = self.mav.location()
initial_time = time.time()
while time.time() - initial_time < 2:
# wait for a position update from the autopilot
start = self.mav.location()
if start != initial:
break
self.set_rc(3, 1500)
self.wait_groundspeed(0, 0.2) # why do we not stop?!
initial = self.mav.location()
initial_time = time.time()
while time.time() - initial_time < 2:
# wait for a position update from the autopilot
stop = self.mav.location()
if stop != initial:
break
delta = self.get_distance(start, stop)
self.set_parameter('CRUISE_SPEED', old_cruise_speed)
self.set_parameter('ATC_ACCEL_MAX', old_accel_max)
return delta
def drive_brake(self):
old_using_brake = self.get_parameter('ATC_BRAKE')
old_cruise_speed = self.get_parameter('CRUISE_SPEED')
self.set_parameter('CRUISE_SPEED', 15)
self.set_parameter('ATC_BRAKE', 0)
distance_without_brakes = self.drive_brake_get_stopping_distance(15)
# brakes on:
self.set_parameter('ATC_BRAKE', 1)
distance_with_brakes = self.drive_brake_get_stopping_distance(15)
# revert state:
self.set_parameter('ATC_BRAKE', old_using_brake)
self.set_parameter('CRUISE_SPEED', old_cruise_speed)
delta = distance_without_brakes - distance_with_brakes
if delta < distance_without_brakes * 0.05: # 5% isn't asking for much
progress("Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)" % (distance_with_brakes, distance_without_brakes, delta))
return False return False
set_rc(mavproxy, mav, 3, 1500) else:
progress("Circuit complete") progress("Brakes work (with=%0.2fm without=%0.2fm delta=%0.2fm)" % (distance_with_brakes, distance_without_brakes, delta))
return True
return True
def drive_RTL(mavproxy, mav): def drive_rtl_mission(self):
"""Drive to home.""" mission_filepath = os.path.join(testdir, "ArduRover-Missions", "rtl.txt")
progress("Driving home in RTL") self.mavproxy.send('wp load %s\n' % mission_filepath)
mavproxy.send('switch 3\n') self.mavproxy.expect('Flight plan received')
if not wait_location(mav, homeloc, accuracy=22, timeout=90): self.mavproxy.send('switch 4\n') # auto mode
return False self.set_rc(3, 1500)
progress("RTL Complete") self.wait_mode('AUTO')
return True self.mavproxy.expect('Executing RTL')
m = self.mav.recv_match(type='NAV_CONTROLLER_OUTPUT',
blocking=True,
timeout=0.1)
if m is None:
progress("Did not receive NAV_CONTROLLER_OUTPUT message")
return False
################################################# wp_dist_min = 5
# AUTOTEST ALL if m.wp_dist < wp_dist_min:
################################################# progress("Did not start at least 5 metres from destination")
def drive_mission(mavproxy, mav, filename): return False
"""Drive a mission from a file."""
global homeloc
progress("Driving mission %s" % filename)
mavproxy.send('wp load %s\n' % filename)
mavproxy.expect('Flight plan received')
mavproxy.send('wp list\n')
mavproxy.expect('Requesting [0-9]+ waypoints')
mavproxy.send('switch 4\n') # auto mode
set_rc(mavproxy, mav, 3, 1500)
wait_mode(mav, 'AUTO')
if not wait_waypoint(mav, 1, 4, max_dist=5):
return False
wait_mode(mav, 'HOLD')
progress("Mission OK")
return True
progress("NAV_CONTROLLER_OUTPUT.wp_dist looks good (%u >= %u)" %
(m.wp_dist, wp_dist_min,))
def do_get_banner(mavproxy, mav): self.wait_mode('HOLD')
mavproxy.send("long DO_SEND_BANNER 1\n")
start = time.time()
while True:
m = mav.recv_match(type='STATUSTEXT', blocking=True, timeout=1)
if m is not None and "ArduRover" in m.text:
progress("banner received: %s" % (m.text))
return True
if time.time() - start > 10:
break
progress("banner not received") pos = self.mav.location()
home_distance = self.get_distance(HOME, pos)
home_distance_max = 5
if home_distance > home_distance_max:
progress("Did not get home (%u metres distant > %u)" %
(home_distance, home_distance_max))
return False
self.mavproxy.send('switch 6\n')
self.wait_mode('MANUAL')
progress("RTL Mission OK")
return True
return False def autotest(self):
"""Autotest APMrover2 in SITL."""
if not self.hasInit:
self.init()
progress("Started simulator")
failed = False
e = 'None'
try:
progress("Waiting for a heartbeat with mavlink protocol %s" % self.mav.WIRE_PROTOCOL_VERSION)
self.mav.wait_heartbeat()
progress("Setting up RC parameters")
self.set_rc_default()
self.set_rc(8, 1800)
progress("Waiting for GPS fix")
self.mav.wait_gps_fix()
self.homeloc = self.mav.location()
progress("Home location: %s" % self.homeloc)
self.mavproxy.send('switch 6\n') # Manual mode
self.wait_mode('MANUAL')
progress("Waiting reading for arm")
self.wait_ready_to_arm()
if not self.arm_vehicle():
progress("Failed to ARM")
failed = True
def drive_brake_get_stopping_distance(mavproxy, mav, speed): progress("#")
# measure our stopping distance: progress("########## Drive an RTL mission ##########")
old_cruise_speed = get_parameter(mavproxy, 'CRUISE_SPEED') progress("#")
old_accel_max = get_parameter(mavproxy, 'ATC_ACCEL_MAX') # Drive a square in learning mode
# self.reset_and_arm()
if not self.drive_rtl_mission():
progress("Failed RTL mission")
failed = True
# controller tends not to meet cruise speed (max of ~14 when 15 progress("#")
# set), thus *1.2 progress("########## Drive a square and save WPs with CH7 switch ##########")
set_parameter(mavproxy, 'CRUISE_SPEED', speed*1.2) progress("#")
# at time of writing, the vehicle is only capable of 10m/s/s accel # Drive a square in learning mode
set_parameter(mavproxy, 'ATC_ACCEL_MAX', 15) # self.reset_and_arm()
mavproxy.send("mode STEERING\n") if not self.drive_square():
wait_mode(mav, 'STEERING') progress("Failed drive square")
set_rc(mavproxy, mav, 3, 2000) failed = True
wait_groundspeed(mav, 15, 100)
initial = mav.location()
initial_time = time.time()
while time.time() - initial_time < 2:
# wait for a position update from the autopilot
start = mav.location()
if start != initial:
break
set_rc(mavproxy, mav, 3, 1500)
wait_groundspeed(mav, 0, 0.2) # why do we not stop?!
initial = mav.location()
initial_time = time.time()
while time.time() - initial_time < 2:
# wait for a position update from the autopilot
stop = mav.location()
if stop != initial:
break
delta = get_distance(start, stop)
set_parameter(mavproxy, 'CRUISE_SPEED', old_cruise_speed) if not self.drive_mission(os.path.join(testdir, "rover1.txt")):
set_parameter(mavproxy, 'ATC_ACCEL_MAX', old_accel_max) progress("Failed mission")
failed = True
return delta if not self.drive_brake():
progress("Failed brake")
failed = True
if not self.disarm_vehicle():
progress("Failed to DISARM")
failed = True
def drive_brake(mavproxy, mav): # do not move this to be the first test. MAVProxy's dedupe
old_using_brake = get_parameter(mavproxy, 'ATC_BRAKE') # function may bite you.
old_cruise_speed = get_parameter(mavproxy, 'CRUISE_SPEED') progress("Getting banner")
if not self.do_get_banner():
progress("FAILED: get banner")
failed = True
set_parameter(mavproxy, 'CRUISE_SPEED', 15) progress("Getting autopilot capabilities")
set_parameter(mavproxy, 'ATC_BRAKE', 0) if not self.do_get_autopilot_capabilities():
progress("FAILED: get capabilities")
failed = True
distance_without_brakes = drive_brake_get_stopping_distance(mavproxy, mav, 15) progress("Setting mode via MAV_COMMAND_DO_SET_MODE")
if not self.do_set_mode_via_command_long():
failed = True
# brakes on: # Throttle Failsafe
set_parameter(mavproxy, 'ATC_BRAKE', 1) progress("#")
distance_with_brakes = drive_brake_get_stopping_distance(mavproxy, mav, 15) progress("########## Test Failsafe ##########")
# revert state: progress("#")
set_parameter(mavproxy, 'ATC_BRAKE', old_using_brake) # self.reset_and_arm()
set_parameter(mavproxy, 'CRUISE_SPEED', old_cruise_speed) # if not self.test_throttle_failsafe(HOME, distance_min=4):
# progress("Throttle failsafe failed")
# sucess = False
delta = distance_without_brakes - distance_with_brakes if not self.log_download(util.reltopdir("../buildlogs/APMrover2-log.bin")):
if delta < distance_without_brakes * 0.05: # 5% isn't asking for much progress("Failed log download")
progress("Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)" % (distance_with_brakes, distance_without_brakes, delta)) failed = True
return False # if not drive_left_circuit(self):
else: # progress("Failed left circuit")
progress("Brakes work (with=%0.2fm without=%0.2fm delta=%0.2fm)" % (distance_with_brakes, distance_without_brakes, delta)) # failed = True
# if not drive_RTL(self):
# progress("Failed RTL")
# failed = True
return True except pexpect.TIMEOUT as e:
progress("Failed with timeout")
vinfo = vehicleinfo.VehicleInfo()
def drive_rtl_mission(mavproxy, mav):
mission_filepath = os.path.join(testdir, "ArduRover-Missions", "rtl.txt")
mavproxy.send('wp load %s\n' % mission_filepath)
mavproxy.expect('Flight plan received')
mavproxy.send('switch 4\n') # auto mode
set_rc(mavproxy, mav, 3, 1500)
wait_mode(mav, 'AUTO')
mavproxy.expect('Executing RTL')
m = mav.recv_match(type='NAV_CONTROLLER_OUTPUT',
blocking=True,
timeout=0.1)
if m is None:
progress("Did not receive NAV_CONTROLLER_OUTPUT message")
return False
wp_dist_min = 5
if m.wp_dist < wp_dist_min:
progress("Did not start at least 5 metres from destination")
return False
progress("NAV_CONTROLLER_OUTPUT.wp_dist looks good (%u >= %u)" %
(m.wp_dist, wp_dist_min,))
wait_mode(mav, 'HOLD')
pos = mav.location()
home_distance = get_distance(HOME, pos)
home_distance_max = 5
if home_distance > home_distance_max:
progress("Did not get home (%u metres distant > %u)" %
(home_distance,home_distance_max))
return False
mavproxy.send('switch 6\n')
wait_mode(mav, 'MANUAL')
progress("RTL Mission OK")
return True
def drive_APMrover2(binary, viewerip=None, use_map=False, valgrind=False, gdb=False, frame=None, params=None, gdbserver=False, speedup=10):
"""Drive APMrover2 in SITL.
you can pass viewerip as an IP address to optionally send fg and
mavproxy packets too for local viewing of the mission in real time
"""
global homeloc
if frame is None:
frame = 'rover'
options = '--sitl=127.0.0.1:5501 --out=127.0.0.1:19550 --streamrate=10'
if viewerip:
options += " --out=%s:14550" % viewerip
if use_map:
options += ' --map'
home = "%f,%f,%u,%u" % (HOME.lat, HOME.lng, HOME.alt, HOME.heading)
sitl = util.start_SITL(binary, wipe=True, model=frame, home=home, speedup=speedup)
mavproxy = util.start_MAVProxy_SITL('APMrover2')
progress("WAITING FOR PARAMETERS")
mavproxy.expect('Received [0-9]+ parameters')
# setup test parameters
if params is None:
params = vinfo.options["APMrover2"]["frames"][frame]["default_params_filename"]
if not isinstance(params, list):
params = [params]
for x in params:
mavproxy.send("param load %s\n" % os.path.join(testdir, x))
mavproxy.expect('Loaded [0-9]+ parameters')
set_parameter(mavproxy, 'LOG_REPLAY', 1)
set_parameter(mavproxy, 'LOG_DISARMED', 1)
# restart with new parms
util.pexpect_close(mavproxy)
util.pexpect_close(sitl)
sitl = util.start_SITL(binary, model='rover', home=home, speedup=speedup, valgrind=valgrind, gdb=gdb, gdbserver=gdbserver)
mavproxy = util.start_MAVProxy_SITL('APMrover2', options=options)
mavproxy.expect('Telemetry log: (\S+)\r\n')
logfile = mavproxy.match.group(1)
progress("LOGFILE %s" % logfile)
buildlog = util.reltopdir("../buildlogs/APMrover2-test.tlog")
progress("buildlog=%s" % buildlog)
if os.path.exists(buildlog):
os.unlink(buildlog)
try:
os.link(logfile, buildlog)
except Exception:
pass
mavproxy.expect('Received [0-9]+ parameters')
util.expect_setup_callback(mavproxy, expect_callback)
expect_list_clear()
expect_list_extend([sitl, mavproxy])
progress("Started simulator")
# get a mavlink connection going
try:
mav = mavutil.mavlink_connection('127.0.0.1:19550', robust_parsing=True)
except Exception as msg:
progress("Failed to start mavlink connection on 127.0.0.1:19550" % msg)
raise
mav.message_hooks.append(message_hook)
mav.idle_hooks.append(idle_hook)
failed = False
e = 'None'
try:
progress("Waiting for a heartbeat with mavlink protocol %s" % mav.WIRE_PROTOCOL_VERSION)
mav.wait_heartbeat()
progress("Setting up RC parameters")
set_rc_default(mavproxy)
set_rc(mavproxy, mav, 8, 1800)
progress("Waiting for GPS fix")
mav.wait_gps_fix()
homeloc = mav.location()
progress("Home location: %s" % homeloc)
mavproxy.send('switch 6\n') # Manual mode
wait_mode(mav, 'MANUAL')
progress("Waiting reading for arm")
wait_ready_to_arm(mav)
if not arm_vehicle(mavproxy, mav):
progress("Failed to ARM")
failed = True failed = True
progress("#") self.close()
progress("########## Drive an RTL mission ##########")
progress("#")
# Drive a square in learning mode
if not drive_rtl_mission(mavproxy, mav):
progress("Failed RTL mission")
failed = True
progress("#") if failed:
progress("########## Drive a square and save WPs with CH7 switch ##########") progress("FAILED: %s" % e)
progress("#") return False
# Drive a square in learning mode return True
if not drive_mission(mavproxy, mav, os.path.join(testdir, "rover1.txt")):
progress("Failed mission")
failed = True
if not drive_brake(mavproxy, mav):
progress("Failed brake")
failed = True
if not disarm_vehicle(mavproxy, mav):
progress("Failed to DISARM")
failed = True
# do not move this to be the first test. MAVProxy's dedupe
# function may bite you.
progress("Getting banner")
if not do_get_banner(mavproxy, mav):
progress("FAILED: get banner")
failed = True
progress("Getting autopilot capabilities")
if not do_get_autopilot_capabilities(mavproxy, mav):
progress("FAILED: get capabilities")
failed = True
progress("Setting mode via MAV_COMMAND_DO_SET_MODE")
if not do_set_mode_via_command_long(mavproxy, mav):
failed = True
if not log_download(mavproxy, mav, util.reltopdir("../buildlogs/APMrover2-log.bin")):
progress("Failed log download")
failed = True
# if not drive_left_circuit(mavproxy, mav):
# progress("Failed left circuit")
# failed = True
# if not drive_RTL(mavproxy, mav):
# progress("Failed RTL")
# failed = True
except pexpect.TIMEOUT as e:
progress("Failed with timeout")
failed = True
mav.close()
util.pexpect_close(mavproxy)
util.pexpect_close(sitl)
valgrind_log = util.valgrind_log_filepath(binary=binary, model='rover')
if os.path.exists(valgrind_log):
os.chmod(valgrind_log, 0o644)
shutil.copy(valgrind_log, util.reltopdir("../buildlogs/APMrover2-valgrind.log"))
if failed:
progress("FAILED: %s" % e)
return False
return True

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -10,179 +10,216 @@ from pymavlink import mavutil
from common import * from common import *
from pysim import util from pysim import util
from pysim import vehicleinfo
# get location of scripts # get location of scripts
testdir = os.path.dirname(os.path.realpath(__file__)) testdir = os.path.dirname(os.path.realpath(__file__))
HOME = mavutil.location(33.810313, -118.393867, 0, 185) HOME = mavutil.location(33.810313, -118.393867, 0, 185)
homeloc = None
def dive_manual(mavproxy, mav): class AutotestSub(Autotest):
set_rc(mavproxy, mav, 3, 1600) def __init__(self, binary, viewerip=None, use_map=False, valgrind=False, gdb=False, speedup=10, frame=None, params=None, gdbserver=False):
set_rc(mavproxy, mav, 5, 1600) super(AutotestSub, self).__init__()
set_rc(mavproxy, mav, 6, 1550) self.binary = binary
self.options = '--sitl=127.0.0.1:5501 --out=127.0.0.1:19550 --streamrate=10'
self.viewerip = viewerip
self.use_map = use_map
self.valgrind = valgrind
self.gdb = gdb
self.frame = frame
self.params = params
self.gdbserver = gdbserver
if not wait_distance(mav, 50, accuracy=7, timeout=200): self.home = "%f,%f,%u,%u" % (HOME.lat, HOME.lng, HOME.alt, HOME.heading)
return False self.homeloc = None
self.speedup = speedup
self.speedup_default = 10
set_rc(mavproxy, mav, 4, 1550) self.sitl = None
self.hasInit = False
if not wait_heading(mav, 0): def init(self):
return False if self.frame is None:
self.frame = 'vectored'
set_rc(mavproxy, mav, 4, 1500) if self.viewerip:
self.options += " --out=%s:14550" % self.viewerip
if self.use_map:
self.options += ' --map'
if not wait_distance(mav, 50, accuracy=7, timeout=100): self.sitl = util.start_SITL(self.binary, wipe=True, model=self.frame, home=self.home,
return False speedup=self.speedup_default)
self.mavproxy = util.start_MAVProxy_SITL('ArduSub')
set_rc(mavproxy, mav, 4, 1550) progress("WAITING FOR PARAMETERS")
self.mavproxy.expect('Received [0-9]+ parameters')
if not wait_heading(mav, 0): # setup test parameters
return False vinfo = vehicleinfo.VehicleInfo()
if self.params is None:
self.params = vinfo.options["ArduSub"]["frames"][self.frame]["default_params_filename"]
if not isinstance(self.params, list):
self.params = [self.params]
for x in self.params:
self.mavproxy.send("param load %s\n" % os.path.join(testdir, x))
self.mavproxy.expect('Loaded [0-9]+ parameters')
self.set_parameter('LOG_REPLAY', 1)
self.set_parameter('LOG_DISARMED', 1)
progress("RELOADING SITL WITH NEW PARAMETERS")
set_rc(mavproxy, mav, 4, 1500) # restart with new parms
set_rc(mavproxy, mav, 5, 1500) util.pexpect_close(self.mavproxy)
set_rc(mavproxy, mav, 6, 1100) util.pexpect_close(self.sitl)
if not wait_distance(mav, 75, accuracy=7, timeout=100): self.sitl = util.start_SITL(self.binary, model=self.frame, home=self.home, speedup=self.speedup,
return False valgrind=self.valgrind, gdb=self.gdb, gdbserver=self.gdbserver)
self.mavproxy = util.start_MAVProxy_SITL('ArduSub', options=self.options)
self.mavproxy.expect('Telemetry log: (\S+)')
logfile = self.mavproxy.match.group(1)
progress("LOGFILE %s" % logfile)
set_rc_default(mavproxy) buildlog = util.reltopdir("../buildlogs/ArduSub-test.tlog")
progress("buildlog=%s" % buildlog)
if os.path.exists(buildlog):
os.unlink(buildlog)
try:
os.link(logfile, buildlog)
except Exception:
pass
disarm_vehicle(mavproxy, mav) self.mavproxy.expect('Received [0-9]+ parameters')
progress("Manual dive OK")
return True
util.expect_setup_callback(self.mavproxy, expect_callback)
def dive_mission(mavproxy, mav, filename): expect_list_clear()
expect_list_extend([self.sitl, self.mavproxy])
progress("Executing mission %s" % filename) progress("Started simulator")
mavproxy.send('wp load %s\n' % filename)
mavproxy.expect('Flight plan received')
mavproxy.send('wp list\n')
mavproxy.expect('Saved [0-9]+ waypoints')
set_rc_default(mavproxy)
if not arm_vehicle(mavproxy, mav): # get a mavlink connection going
progress("Failed to ARM") try:
return False self.mav = mavutil.mavlink_connection('127.0.0.1:19550', robust_parsing=True)
except Exception as msg:
progress("Failed to start mavlink connection on 127.0.0.1:19550" % msg)
raise
self.mav.message_hooks.append(message_hook)
self.mav.idle_hooks.append(idle_hook)
self.hasInit = True
progress("Ready to start testing!")
mavproxy.send('mode auto\n') def close(self):
wait_mode(mav, 'AUTO') if self.use_map:
self.mavproxy.send("module unload map\n")
self.mavproxy.expect("Unloaded module map")
if not wait_waypoint(mav, 1, 5, max_dist=5): self.mav.close()
return False util.pexpect_close(self.mavproxy)
util.pexpect_close(self.sitl)
disarm_vehicle(mavproxy, mav) valgrind_log = util.valgrind_log_filepath(binary=self.binary, model=self.frame)
if os.path.exists(valgrind_log):
os.chmod(valgrind_log, 0o644)
shutil.copy(valgrind_log, util.reltopdir("../buildlogs/ArduSub-valgrind.log"))
progress("Mission OK") def dive_manual(self):
return True self.set_rc(3, 1600)
self.set_rc(5, 1600)
self.set_rc(6, 1550)
if not self.wait_distance(50, accuracy=7, timeout=200):
return False
def dive_ArduSub(binary, viewerip=None, use_map=False, valgrind=False, gdb=False, gdbserver=False, speedup=10): self.set_rc(4, 1550)
"""Dive ArduSub in SITL.
you can pass viewerip as an IP address to optionally send fg and if not self.wait_heading(0):
mavproxy packets too for local viewing of the mission in real time return False
"""
options = '--sitl=127.0.0.1:5501 --out=127.0.0.1:19550 --streamrate=10'
if viewerip:
options += " --out=%s:14550" % viewerip
if use_map:
options += ' --map'
home = "%f,%f,%u,%u" % (HOME.lat, HOME.lng, HOME.alt, HOME.heading) self.set_rc(4, 1500)
sitl = util.start_SITL(binary, model='vectored', wipe=True, home=home, speedup=speedup)
mavproxy = util.start_MAVProxy_SITL('ArduSub')
mavproxy.expect('Received [0-9]+ parameters')
# setup test parameters if not self.wait_distance(50, accuracy=7, timeout=100):
mavproxy.send("param load %s/default_params/sub.parm\n" % testdir) return False
mavproxy.expect('Loaded [0-9]+ parameters')
mavproxy.send('param set FS_GCS_ENABLE 0\n')
mavproxy.send("param set LOG_REPLAY 1\n")
mavproxy.send("param set LOG_DISARMED 1\n")
time.sleep(3)
# reboot with new parameters self.set_rc(4, 1550)
util.pexpect_close(mavproxy)
util.pexpect_close(sitl)
sitl = util.start_SITL(binary, model='vectored', home=home, speedup=speedup, valgrind=valgrind, gdb=gdb, gdbserver=gdbserver) if not self.wait_heading(0):
mavproxy = util.start_MAVProxy_SITL('ArduSub', options=options) return False
mavproxy.expect('Telemetry log: (\S+)\r\n')
logfile = mavproxy.match.group(1)
progress("LOGFILE %s" % logfile)
buildlog = util.reltopdir("../buildlogs/ArduSub-test.tlog") self.set_rc(4, 1500)
progress("buildlog=%s" % buildlog) self.set_rc(5, 1500)
if os.path.exists(buildlog): self.set_rc(6, 1100)
os.unlink(buildlog)
try:
os.link(logfile, buildlog)
except Exception:
pass
mavproxy.expect('Received [0-9]+ parameters') if not self.wait_distance(75, accuracy=7, timeout=100):
return False
util.expect_setup_callback(mavproxy, expect_callback) self.set_rc_default()
expect_list_clear() self.disarm_vehicle()
expect_list_extend([sitl, mavproxy]) progress("Manual dive OK")
return True
progress("Started simulator") def dive_mission(self, filename):
# get a mavlink connection going progress("Executing mission %s" % filename)
try: self.mavproxy.send('wp load %s\n' % filename)
mav = mavutil.mavlink_connection('127.0.0.1:19550', robust_parsing=True) self.mavproxy.expect('Flight plan received')
except Exception as msg: self.mavproxy.send('wp list\n')
progress("Failed to start mavlink connection on 127.0.0.1:19550" % msg) self.mavproxy.expect('Saved [0-9]+ waypoints')
raise self.set_rc_default()
mav.message_hooks.append(message_hook)
mav.idle_hooks.append(idle_hook)
failed = False if not self.arm_vehicle():
e = 'None'
try:
progress("Waiting for a heartbeat with mavlink protocol %s" % mav.WIRE_PROTOCOL_VERSION)
mav.wait_heartbeat()
progress("Waiting for GPS fix")
mav.wait_gps_fix()
# wait for EKF and GPS checks to pass
mavproxy.expect('IMU0 is using GPS')
homeloc = mav.location()
progress("Home location: %s" % homeloc)
set_rc_default(mavproxy)
if not arm_vehicle(mavproxy, mav):
progress("Failed to ARM") progress("Failed to ARM")
failed = True return False
if not dive_manual(mavproxy, mav):
progress("Failed manual dive")
failed = True
if not dive_mission(mavproxy, mav, os.path.join(testdir, "sub_mission.txt")):
progress("Failed auto mission")
failed = True
if not log_download(mavproxy, mav, util.reltopdir("../buildlogs/ArduSub-log.bin")):
progress("Failed log download")
failed = True
except pexpect.TIMEOUT as e:
progress("Failed with timeout")
failed = True
mav.close() self.mavproxy.send('mode auto\n')
util.pexpect_close(mavproxy) self.wait_mode('AUTO')
util.pexpect_close(sitl)
valgrind_log = util.valgrind_log_filepath(binary=binary, model='sub') if not self.wait_waypoint(1, 5, max_dist=5):
if os.path.exists(valgrind_log): return False
os.chmod(valgrind_log, 0o644)
shutil.copy(valgrind_log, util.reltopdir("../buildlogs/APMrover2-valgrind.log"))
if failed: self.disarm_vehicle()
progress("FAILED: %s" % e)
return False progress("Mission OK")
return True return True
def autotest(self):
"""Autotest ArduSub in SITL."""
if not self.hasInit:
self.init()
failed = False
e = 'None'
try:
progress("Waiting for a heartbeat with mavlink protocol %s" % self.mav.WIRE_PROTOCOL_VERSION)
self.mav.wait_heartbeat()
self.mavproxy.send('param set FS_GCS_ENABLE 0\n')
progress("Waiting for GPS fix")
self.mav.wait_gps_fix()
# wait for EKF and GPS checks to pass
self.mavproxy.expect('IMU0 is using GPS')
self.homeloc = self.mav.location()
progress("Home location: %s" % self.homeloc)
self.set_rc_default()
if not self.arm_vehicle():
progress("Failed to ARM")
failed = True
if not self.dive_manual():
progress("Failed manual dive")
failed = True
if not self.dive_mission(os.path.join(testdir, "sub_mission.txt")):
progress("Failed auto mission")
failed = True
if not self.log_download(util.reltopdir("../buildlogs/ArduSub-log.bin")):
progress("Failed log download")
failed = True
except pexpect.TIMEOUT as e:
progress("Failed with timeout")
failed = True
self.close()
if failed:
progress("FAILED: %s" % e)
return False
return True

View File

@ -15,11 +15,12 @@ import sys
import time import time
import traceback import traceback
import apmrover2 from apmrover2 import *
import arducopter from arducopter import *
import arduplane from quadplane import *
import quadplane from arduplane import *
import ardusub from ardusub import *
from pysim import util from pysim import util
from pymavlink import mavutil from pymavlink import mavutil
from pymavlink.generator import mavtemplate from pymavlink.generator import mavtemplate
@ -260,22 +261,28 @@ def run_step(step):
fly_opts["speedup"] = opts.speedup fly_opts["speedup"] = opts.speedup
if step == 'fly.ArduCopter': if step == 'fly.ArduCopter':
return arducopter.fly_ArduCopter(binary, frame=opts.frame, **fly_opts) arducopter = AutotestCopter(binary, frame=opts.frame, **fly_opts)
return arducopter.autotest()
if step == 'fly.CopterAVC': if step == 'fly.CopterAVC':
return arducopter.fly_CopterAVC(binary, **fly_opts) arducopter = AutotestCopter(binary, **fly_opts)
return arducopter.autotest_heli()
if step == 'fly.ArduPlane': if step == 'fly.ArduPlane':
return arduplane.fly_ArduPlane(binary, **fly_opts) arduplane = AutotestPlane(binary, **fly_opts)
return arduplane.autotest()
if step == 'fly.QuadPlane': if step == 'fly.QuadPlane':
return quadplane.fly_QuadPlane(binary, **fly_opts) quadplane = AutotestQuadPlane(binary, **fly_opts)
return quadplane.autotest()
if step == 'drive.APMrover2': if step == 'drive.APMrover2':
return apmrover2.drive_APMrover2(binary, frame=opts.frame, **fly_opts) apmrover2 = AutotestRover(binary, frame=opts.frame, **fly_opts)
return apmrover2.autotest()
if step == 'dive.ArduSub': if step == 'dive.ArduSub':
return ardusub.dive_ArduSub(binary, **fly_opts) ardusub = AutotestSub(binary, **fly_opts)
return ardusub.autotest()
if step == 'build.All': if step == 'build.All':
return build_all() return build_all()

View File

@ -6,10 +6,28 @@ from pymavlink import mavwp, mavutil
from pysim import util from pysim import util
import sys
import abc
import os
# a list of pexpect objects to read while waiting for # a list of pexpect objects to read while waiting for
# messages. This keeps the output to stdout flowing # messages. This keeps the output to stdout flowing
expect_list = [] expect_list = []
# get location of scripts
testdir = os.path.dirname(os.path.realpath(__file__))
# Check python version for abstract base class
if sys.version_info[0] >= 3 and sys.version_info[1] >= 4:
ABC = abc.ABC
else:
ABC = abc.ABCMeta('ABC', (), {})
def progress(text):
"""Display autotest progress text."""
print("AUTOTEST: " + text)
class AutoTestTimeoutException(Exception): class AutoTestTimeoutException(Exception):
pass pass
@ -52,393 +70,530 @@ def expect_callback(e):
util.pexpect_drain(p) util.pexpect_drain(p)
################################################# class Autotest(ABC):
# SIM UTILITIES """Base abstract class.
################################################# It implements the common function for all vehicle types.
def progress(text): """
"""Display autotest progress text.""" def __init__(self):
print("AUTOTEST: " + text) self.mavproxy = None
self.mav = None
#################################################
# SIM UTILITIES
#################################################
def get_sim_time(self):
"""Get SITL time."""
m = self.mav.recv_match(type='SYSTEM_TIME', blocking=True)
return m.time_boot_ms * 1.0e-3
def get_sim_time(mav): def sim_location(self):
"""Get SITL time.""" """Return current simulator location."""
m = mav.recv_match(type='SYSTEM_TIME', blocking=True) m = self.mav.recv_match(type='SIMSTATE', blocking=True)
return m.time_boot_ms * 1.0e-3 return mavutil.location(m.lat*1.0e-7, m.lng*1.0e-7, 0, math.degrees(m.yaw))
def save_wp(self):
"""Trigger RC 7 to save waypoint."""
self.mavproxy.send('rc 7 1000\n')
self.mav.recv_match(condition='RC_CHANNELS.chan7_raw==1000', blocking=True)
self.wait_seconds(1)
self.mavproxy.send('rc 7 2000\n')
self.mav.recv_match(condition='RC_CHANNELS.chan7_raw==2000', blocking=True)
self.wait_seconds(1)
self.mavproxy.send('rc 7 1000\n')
self.mav.recv_match(condition='RC_CHANNELS.chan7_raw==1000', blocking=True)
self.wait_seconds(1)
def sim_location(mav): def log_download(self, filename, timeout=360):
"""Return current simulator location.""" """Download latest log."""
m = mav.recv_match(type='SIMSTATE', blocking=True) self.disarm_vehicle()
return mavutil.location(m.lat*1.0e-7, m.lng*1.0e-7, 0, math.degrees(m.yaw)) self.mav.wait_heartbeat()
self.mavproxy.send("log list\n")
self.mavproxy.expect("numLogs")
self.mav.wait_heartbeat()
self.mav.wait_heartbeat()
self.mavproxy.send("set shownoise 0\n")
self.mavproxy.send("log download latest %s\n" % filename)
self.mavproxy.expect("Finished downloading", timeout=timeout)
self.mav.wait_heartbeat()
self.mav.wait_heartbeat()
return True
def show_gps_and_sim_positions(self, on_off):
"""Allow to display gps and actual position on map."""
if on_off is True:
# turn on simulator display of gps and actual position
self.mavproxy.send('map set showgpspos 1\n')
self.mavproxy.send('map set showsimpos 1\n')
else:
# turn off simulator display of gps and actual position
self.mavproxy.send('map set showgpspos 0\n')
self.mavproxy.send('map set showsimpos 0\n')
def save_wp(mavproxy, mav): def mission_count(self, filename):
"""Trigger RC 7 to save waypoint.""" """Load a mission from a file and return number of waypoints."""
mavproxy.send('rc 7 1000\n') wploader = mavwp.MAVWPLoader()
mav.recv_match(condition='RC_CHANNELS.chan7_raw==1000', blocking=True) wploader.load(filename)
wait_seconds(mav, 1) num_wp = wploader.count()
mavproxy.send('rc 7 2000\n') return num_wp
mav.recv_match(condition='RC_CHANNELS.chan7_raw==2000', blocking=True)
wait_seconds(mav, 1)
mavproxy.send('rc 7 1000\n')
mav.recv_match(condition='RC_CHANNELS.chan7_raw==1000', blocking=True)
wait_seconds(mav, 1)
def load_mission_from_file(self, filename):
"""Load a mission from a file to flight controller."""
self.mavproxy.send('wp load %s\n' % filename)
self.mavproxy.expect('Flight plan received')
self.mavproxy.send('wp list\n')
self.mavproxy.expect('Requesting [0-9]+ waypoints')
def log_download(mavproxy, mav, filename, timeout=360): # update num_wp
"""Download latest log.""" wploader = mavwp.MAVWPLoader()
mavproxy.send("log list\n") wploader.load(filename)
mavproxy.expect("numLogs") num_wp = wploader.count()
mav.wait_heartbeat() return num_wp
mav.wait_heartbeat()
mavproxy.send("set shownoise 0\n")
mavproxy.send("log download latest %s\n" % filename)
mavproxy.expect("Finished downloading", timeout=timeout)
mav.wait_heartbeat()
mav.wait_heartbeat()
return True
def save_mission_to_file(self, filename):
"""Save a mission to a file"""
self.mavproxy.send('wp save %s\n' % filename)
self.mavproxy.expect('Saved ([0-9]+) waypoints')
num_wp = int(self.mavproxy.match.group(1))
progress("num_wp: %d" % num_wp)
return num_wp
def show_gps_and_sim_positions(mavproxy, on_off): def set_rc_default(self):
"""Allow to display gps and actual position on map.""" """Setup all simulated RC control to 1500."""
if on_off is True: for chan in range(1, 16):
# turn on simulator display of gps and actual position self.mavproxy.send('rc %u 1500\n' % chan)
mavproxy.send('map set showgpspos 1\n')
mavproxy.send('map set showsimpos 1\n')
else:
# turn off simulator display of gps and actual position
mavproxy.send('map set showgpspos 0\n')
mavproxy.send('map set showsimpos 0\n')
def set_rc(self, chan, pwm, timeout=5):
def mission_count(filename): """Setup a simulated RC control to a PWM value"""
"""Load a mission from a file and return number of waypoints.""" tstart = self.get_sim_time()
wploader = mavwp.MAVWPLoader() while self.get_sim_time() < tstart + timeout:
wploader.load(filename) self.mavproxy.send('rc %u %u\n' % (chan, pwm))
num_wp = wploader.count() m = self.mav.recv_match(type='RC_CHANNELS', blocking=True)
return num_wp chan_pwm = getattr(m, "chan" + str(chan) + "_raw")
if chan_pwm == pwm:
return True
def load_mission_from_file(mavproxy, filename): progress("Failed to send RC commands")
"""Load a mission from a file to flight controller."""
mavproxy.send('wp load %s\n' % filename)
mavproxy.expect('Flight plan received')
mavproxy.send('wp list\n')
mavproxy.expect('Requesting [0-9]+ waypoints')
# update num_wp
wploader = mavwp.MAVWPLoader()
wploader.load(filename)
num_wp = wploader.count()
return num_wp
def save_mission_to_file(mavproxy, filename):
"""Save a mission to a file"""
mavproxy.send('wp save %s\n' % filename)
mavproxy.expect('Saved ([0-9]+) waypoints')
num_wp = int(mavproxy.match.group(1))
progress("num_wp: %d" % num_wp)
return num_wp
def set_rc_default(mavproxy):
"""Setup all simulated RC control to 1500."""
for chan in range(1, 16):
mavproxy.send('rc %u 1500\n' % chan)
def set_rc(mavproxy, mav, chan, pwm, timeout=5):
"""Setup a simulated RC control to a PWM value"""
tstart = get_sim_time(mav)
while get_sim_time(mav) < tstart + timeout:
mavproxy.send('rc %u %u\n' % (chan, pwm))
m = mav.recv_match(type='RC_CHANNELS', blocking=True)
chan_pwm = getattr(m, "chan" + str(chan) + "_raw")
if chan_pwm == pwm:
return True
progress("Failed to send RC commands")
return False
def arm_vehicle(mavproxy, mav):
"""Arm vehicle with mavlink arm message."""
mavproxy.send('arm throttle\n')
mav.motors_armed_wait()
progress("ARMED")
return True
def disarm_vehicle(mavproxy, mav):
"""Disarm vehicle with mavlink disarm message."""
mavproxy.send('disarm\n')
mav.motors_disarmed_wait()
progress("DISARMED")
return True
def set_parameter(mavproxy, name, value):
for i in range(1, 10):
mavproxy.send("param set %s %s\n" % (name, str(value)))
mavproxy.send("param fetch %s\n" % (name))
mavproxy.expect("%s = (.*)" % (name,))
returned_value = mavproxy.match.group(1)
if float(returned_value) == float(value):
# yes, exactly equal.
break
progress("Param fetch returned incorrect value (%s) vs (%s)" % (returned_value, value))
def get_parameter(mavproxy, name):
mavproxy.send("param fetch %s\n" % (name))
mavproxy.expect("%s = (.*)" % (name,))
return float(mavproxy.match.group(1))
#################################################
# UTILITIES
#################################################
def get_distance(loc1, loc2):
"""Get ground distance between two locations."""
dlat = loc2.lat - loc1.lat
dlong = loc2.lng - loc1.lng
return math.sqrt((dlat*dlat) + (dlong*dlong)) * 1.113195e5
def get_bearing(loc1, loc2):
"""Get bearing from loc1 to loc2."""
off_x = loc2.lng - loc1.lng
off_y = loc2.lat - loc1.lat
bearing = 90.00 + math.atan2(-off_y, off_x) * 57.2957795
if bearing < 0:
bearing += 360.00
return bearing
def do_get_autopilot_capabilities(mavproxy, mav):
mavproxy.send("long REQUEST_AUTOPILOT_CAPABILITIES 1\n")
m = mav.recv_match(type='AUTOPILOT_VERSION', blocking=True, timeout=10)
if m is None:
progress("AUTOPILOT_VERSION not received")
return False return False
progress("AUTOPILOT_VERSION received")
return True
def arm_vehicle(self):
"""Arm vehicle with mavlink arm message."""
self.mavproxy.send('arm throttle\n')
self.mav.motors_armed_wait()
progress("ARMED")
return True
def do_set_mode_via_command_long(mavproxy, mav): def disarm_vehicle(self):
base_mode = mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED """Disarm vehicle with mavlink disarm message."""
custom_mode = 4 # hold self.mavproxy.send('disarm\n')
start = time.time() self.mav.motors_disarmed_wait()
while time.time() - start < 5: progress("DISARMED")
mavproxy.send("long DO_SET_MODE %u %u\n" % (base_mode,custom_mode)) return True
m = mav.recv_match(type='HEARTBEAT', blocking=True, timeout=10)
def set_parameter(self, name, value):
for i in range(1, 10):
self.mavproxy.send("param set %s %s\n" % (name, str(value)))
self.mavproxy.send("param fetch %s\n" % name)
self.mavproxy.expect("%s = (.*)" % (name,))
returned_value = self.mavproxy.match.group(1)
if float(returned_value) == float(value):
# yes, exactly equal.
break
progress("Param fetch returned incorrect value (%s) vs (%s)" % (returned_value, value))
def get_parameter(self, name):
self.mavproxy.send("param fetch %s\n" % name)
self.mavproxy.expect("%s = (.*)" % (name,))
return float(self.mavproxy.match.group(1))
#################################################
# UTILITIES
#################################################
@staticmethod
def get_distance(loc1, loc2):
"""Get ground distance between two locations."""
dlat = loc2.lat - loc1.lat
dlong = loc2.lng - loc1.lng
return math.sqrt((dlat*dlat) + (dlong*dlong)) * 1.113195e5
@staticmethod
def get_bearing(loc1, loc2):
"""Get bearing from loc1 to loc2."""
off_x = loc2.lng - loc1.lng
off_y = loc2.lat - loc1.lat
bearing = 90.00 + math.atan2(-off_y, off_x) * 57.2957795
if bearing < 0:
bearing += 360.00
return bearing
def do_get_autopilot_capabilities(self):
self.mavproxy.send("long REQUEST_AUTOPILOT_CAPABILITIES 1\n")
m = self.mav.recv_match(type='AUTOPILOT_VERSION', blocking=True, timeout=10)
if m is None: if m is None:
progress("AUTOPILOT_VERSION not received")
return False return False
if m.custom_mode == custom_mode: progress("AUTOPILOT_VERSION received")
return True return True
time.sleep(0.1)
return False
def do_set_mode_via_command_long(self):
base_mode = mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED
custom_mode = 4 # hold
start = time.time()
while time.time() - start < 5:
self.mavproxy.send("long DO_SET_MODE %u %u\n" % (base_mode, custom_mode))
m = self.mav.recv_match(type='HEARTBEAT', blocking=True, timeout=10)
if m is None:
return False
if m.custom_mode == custom_mode:
return True
time.sleep(0.1)
return False
################################################# def reach_heading_manual(self, heading):
# WAIT UTILITIES """Manually direct the vehicle to the target heading."""
################################################# if self.mav.mav_type in [mavutil.mavlink.MAV_TYPE_QUADROTOR,
def wait_seconds(mav, seconds_to_wait): mavutil.mavlink.MAV_TYPE_HELICOPTER,
"""Wait some second in SITL time.""" mavutil.mavlink.MAV_TYPE_HEXAROTOR,
tstart = get_sim_time(mav) mavutil.mavlink.MAV_TYPE_OCTOROTOR,
tnow = tstart mavutil.mavlink.MAV_TYPE_COAXIAL,
while tstart + seconds_to_wait > tnow: mavutil.mavlink.MAV_TYPE_TRICOPTER]:
tnow = get_sim_time(mav) self.mavproxy.send('rc 4 1580\n')
if not self.wait_heading(heading):
progress("Failed to reach heading")
return False
self.mavproxy.send('rc 4 1500\n')
self.mav.recv_match(condition='RC_CHANNELS.chan4_raw==1500', blocking=True)
if self.mav.mav_type == mavutil.mavlink.MAV_TYPE_FIXED_WING:
progress("NOT IMPLEMENTED")
if self.mav.mav_type == mavutil.mavlink.MAV_TYPE_GROUND_ROVER:
self.mavproxy.send('rc 1 1700\n')
self.mavproxy.send('rc 3 1550\n')
if not self.wait_heading(heading):
progress("Failed to reach heading")
return False
self.mavproxy.send('rc 3 1500\n')
self.mav.recv_match(condition='RC_CHANNELS.chan3_raw==1500', blocking=True)
self.mavproxy.send('rc 1 1500\n')
self.mav.recv_match(condition='RC_CHANNELS.chan1_raw==1500', blocking=True)
return True
def reach_distance_manual(self, distance):
"""Manually direct the vehicle to the target distance from home."""
if self.mav.mav_type in [mavutil.mavlink.MAV_TYPE_QUADROTOR,
mavutil.mavlink.MAV_TYPE_HELICOPTER,
mavutil.mavlink.MAV_TYPE_HEXAROTOR,
mavutil.mavlink.MAV_TYPE_OCTOROTOR,
mavutil.mavlink.MAV_TYPE_COAXIAL,
mavutil.mavlink.MAV_TYPE_TRICOPTER]:
self.mavproxy.send('rc 2 1350\n')
if not self.wait_distance(distance, accuracy=5, timeout=60):
progress("Failed to reach distance of %u" % distance)
return False
self.mavproxy.send('rc 2 1500\n')
self.mav.recv_match(condition='RC_CHANNELS.chan2_raw==1500', blocking=True)
if self.mav.mav_type == mavutil.mavlink.MAV_TYPE_FIXED_WING:
progress("NOT IMPLEMENTED")
if self.mav.mav_type == mavutil.mavlink.MAV_TYPE_GROUND_ROVER:
self.mavproxy.send('rc 3 1700\n')
if not self.wait_distance(distance, accuracy=2):
progress("Failed to reach distance of %u" % distance)
return False
self.mavproxy.send('rc 3 1500\n')
self.mav.recv_match(condition='RC_CHANNELS.chan3_raw==1500', blocking=True)
return True
def wait_altitude(mav, alt_min, alt_max, timeout=30): #################################################
"""Wait for a given altitude range.""" # WAIT UTILITIES
climb_rate = 0 #################################################
previous_alt = 0 def wait_seconds(self, seconds_to_wait):
"""Wait some second in SITL time."""
tstart = self.get_sim_time()
tnow = tstart
while tstart + seconds_to_wait > tnow:
tnow = self.get_sim_time()
tstart = get_sim_time(mav) def wait_altitude(self, alt_min, alt_max, timeout=30):
progress("Waiting for altitude between %u and %u" % (alt_min, alt_max)) """Wait for a given altitude range."""
while get_sim_time(mav) < tstart + timeout: climb_rate = 0
m = mav.recv_match(type='VFR_HUD', blocking=True) previous_alt = 0
climb_rate = m.alt - previous_alt
previous_alt = m.alt
progress("Wait Altitude: Cur:%u, min_alt:%u, climb_rate: %u" % (m.alt, alt_min, climb_rate))
if m.alt >= alt_min and m.alt <= alt_max:
progress("Altitude OK")
return True
progress("Failed to attain altitude range")
return False
tstart = self.get_sim_time()
progress("Waiting for altitude between %u and %u" % (alt_min, alt_max))
while self.get_sim_time() < tstart + timeout:
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
climb_rate = m.alt - previous_alt
previous_alt = m.alt
progress("Wait Altitude: Cur:%u, min_alt:%u, climb_rate: %u" % (m.alt, alt_min, climb_rate))
if m.alt >= alt_min and m.alt <= alt_max:
progress("Altitude OK")
return True
progress("Failed to attain altitude range")
return False
def wait_groundspeed(mav, gs_min, gs_max, timeout=30): def wait_groundspeed(self, gs_min, gs_max, timeout=30):
"""Wait for a given ground speed range.""" """Wait for a given ground speed range."""
tstart = get_sim_time(mav) tstart = self.get_sim_time()
progress("Waiting for groundspeed between %.1f and %.1f" % (gs_min, gs_max)) progress("Waiting for groundspeed between %.1f and %.1f" % (gs_min, gs_max))
while get_sim_time(mav) < tstart + timeout: while self.get_sim_time() < tstart + timeout:
m = mav.recv_match(type='VFR_HUD', blocking=True) m = self.mav.recv_match(type='VFR_HUD', blocking=True)
progress("Wait groundspeed %.1f, target:%.1f" % (m.groundspeed, gs_min)) progress("Wait groundspeed %.1f, target:%.1f" % (m.groundspeed, gs_min))
if m.groundspeed >= gs_min and m.groundspeed <= gs_max: if m.groundspeed >= gs_min and m.groundspeed <= gs_max:
return True return True
progress("Failed to attain groundspeed range") progress("Failed to attain groundspeed range")
return False return False
def wait_roll(self, roll, accuracy, timeout=30):
"""Wait for a given roll in degrees."""
tstart = self.get_sim_time()
progress("Waiting for roll of %d at %s" % (roll, time.ctime()))
while self.get_sim_time() < tstart + timeout:
m = self.mav.recv_match(type='ATTITUDE', blocking=True)
p = math.degrees(m.pitch)
r = math.degrees(m.roll)
progress("Roll %d Pitch %d" % (r, p))
if math.fabs(r - roll) <= accuracy:
progress("Attained roll %d" % roll)
return True
progress("Failed to attain roll %d" % roll)
return False
def wait_roll(mav, roll, accuracy, timeout=30): def wait_pitch(self, pitch, accuracy, timeout=30):
"""Wait for a given roll in degrees.""" """Wait for a given pitch in degrees."""
tstart = get_sim_time(mav) tstart = self.get_sim_time()
progress("Waiting for roll of %d at %s" % (roll, time.ctime())) progress("Waiting for pitch of %u at %s" % (pitch, time.ctime()))
while get_sim_time(mav) < tstart + timeout: while self.get_sim_time() < tstart + timeout:
m = mav.recv_match(type='ATTITUDE', blocking=True) m = self.mav.recv_match(type='ATTITUDE', blocking=True)
p = math.degrees(m.pitch) p = math.degrees(m.pitch)
r = math.degrees(m.roll) r = math.degrees(m.roll)
progress("Roll %d Pitch %d" % (r, p)) progress("Pitch %d Roll %d" % (p, r))
if math.fabs(r - roll) <= accuracy: if math.fabs(p - pitch) <= accuracy:
progress("Attained roll %d" % roll) progress("Attained pitch %d" % pitch)
return True return True
progress("Failed to attain roll %d" % roll) progress("Failed to attain pitch %d" % pitch)
return False return False
def wait_heading(self, heading, accuracy=5, timeout=30):
"""Wait for a given heading."""
tstart = self.get_sim_time()
progress("Waiting for heading %u with accuracy %u" % (heading, accuracy))
while self.get_sim_time() < tstart + timeout:
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
progress("Heading %u" % m.heading)
if math.fabs(m.heading - heading) <= accuracy:
progress("Attained heading %u" % heading)
return True
progress("Failed to attain heading %u" % heading)
return False
def wait_pitch(mav, pitch, accuracy, timeout=30): def wait_distance(self, distance, accuracy=5, timeout=30):
"""Wait for a given pitch in degrees.""" """Wait for flight of a given distance."""
tstart = get_sim_time(mav) tstart = self.get_sim_time()
progress("Waiting for pitch of %u at %s" % (pitch, time.ctime())) start = self.mav.location()
while get_sim_time(mav) < tstart + timeout: while self.get_sim_time() < tstart + timeout:
m = mav.recv_match(type='ATTITUDE', blocking=True) pos = self.mav.location()
p = math.degrees(m.pitch) delta = self.get_distance(start, pos)
r = math.degrees(m.roll) progress("Distance %.2f meters" % delta)
progress("Pitch %d Roll %d" % (p, r)) if math.fabs(delta - distance) <= accuracy:
if math.fabs(p - pitch) <= accuracy: progress("Attained distance %.2f meters OK" % delta)
progress("Attained pitch %d" % pitch) return True
return True if delta > (distance + accuracy):
progress("Failed to attain pitch %d" % pitch) progress("Failed distance - overshoot delta=%f distance=%f" % (delta, distance))
return False return False
progress("Failed to attain distance %u" % distance)
return False
def wait_location(self, loc, accuracy=5, timeout=30, target_altitude=None, height_accuracy=-1):
"""Wait for arrival at a location."""
tstart = self.get_sim_time()
if target_altitude is None:
target_altitude = loc.alt
progress("Waiting for location %.4f,%.4f at altitude %.1f height_accuracy=%.1f" % (
loc.lat, loc.lng, target_altitude, height_accuracy))
while self.get_sim_time() < tstart + timeout:
pos = self.mav.location()
delta = self.get_distance(loc, pos)
progress("Distance %.2f meters alt %.1f" % (delta, pos.alt))
if delta <= accuracy:
if height_accuracy != -1 and math.fabs(pos.alt - target_altitude) > height_accuracy:
continue
progress("Reached location (%.2f meters)" % delta)
return True
progress("Failed to attain location")
return False
def wait_heading(mav, heading, accuracy=5, timeout=30): def wait_waypoint(self, wpnum_start, wpnum_end, allow_skip=True, max_dist=2, timeout=400):
"""Wait for a given heading.""" """Wait for waypoint ranges."""
tstart = get_sim_time(mav) tstart = self.get_sim_time()
progress("Waiting for heading %u with accuracy %u" % (heading, accuracy)) # this message arrives after we set the current WP
while get_sim_time(mav) < tstart + timeout: start_wp = self.mav.waypoint_current()
m = mav.recv_match(type='VFR_HUD', blocking=True) current_wp = start_wp
progress("Heading %u" % m.heading) mode = self.mav.flightmode
if math.fabs(m.heading - heading) <= accuracy:
progress("Attained heading %u" % heading)
return True
progress("Failed to attain heading %u" % heading)
return False
progress("\ntest: wait for waypoint ranges start=%u end=%u\n\n" % (wpnum_start, wpnum_end))
# if start_wp != wpnum_start:
# progress("test: Expected start waypoint %u but got %u" % (wpnum_start, start_wp))
# return False
def wait_distance(mav, distance, accuracy=5, timeout=30): while self.get_sim_time() < tstart + timeout:
"""Wait for flight of a given distance.""" seq = self.mav.waypoint_current()
tstart = get_sim_time(mav) m = self.mav.recv_match(type='NAV_CONTROLLER_OUTPUT', blocking=True)
start = mav.location() wp_dist = m.wp_dist
while get_sim_time(mav) < tstart + timeout: m = self.mav.recv_match(type='VFR_HUD', blocking=True)
pos = mav.location()
delta = get_distance(start, pos)
progress("Distance %.2f meters" % delta)
if math.fabs(delta - distance) <= accuracy:
progress("Attained distance %.2f meters OK" % delta)
return True
if delta > (distance + accuracy):
progress("Failed distance - overshoot delta=%f distance=%f" % (delta, distance))
return False
progress("Failed to attain distance %u" % distance)
return False
# if we changed mode, fail
if self.mav.flightmode != mode:
progress('Exited %s mode' % mode)
return False
def wait_location(mav, loc, accuracy=5, timeout=30, target_altitude=None, height_accuracy=-1): progress("test: WP %u (wp_dist=%u Alt=%d), current_wp: %u, wpnum_end: %u" % (seq, wp_dist, m.alt, current_wp, wpnum_end))
"""Wait for arrival at a location.""" if seq == current_wp+1 or (seq > current_wp+1 and allow_skip):
tstart = get_sim_time(mav) progress("test: Starting new waypoint %u" % seq)
if target_altitude is None: tstart = self.get_sim_time()
target_altitude = loc.alt current_wp = seq
progress("Waiting for location %.4f,%.4f at altitude %.1f height_accuracy=%.1f" % ( # the wp_dist check is a hack until we can sort out the right seqnum
loc.lat, loc.lng, target_altitude, height_accuracy)) # for end of mission
while get_sim_time(mav) < tstart + timeout: # if current_wp == wpnum_end or (current_wp == wpnum_end-1 and wp_dist < 2):
pos = mav.location() if current_wp == wpnum_end and wp_dist < max_dist:
delta = get_distance(loc, pos) progress("Reached final waypoint %u" % seq)
progress("Distance %.2f meters alt %.1f" % (delta, pos.alt)) return True
if delta <= accuracy: if seq >= 255:
if height_accuracy != -1 and math.fabs(pos.alt - target_altitude) > height_accuracy: progress("Reached final waypoint %u" % seq)
continue return True
progress("Reached location (%.2f meters)" % delta) if seq > current_wp+1:
return True progress("Failed: Skipped waypoint! Got wp %u expected %u" % (seq, current_wp+1))
progress("Failed to attain location") return False
return False progress("Failed: Timed out waiting for waypoint %u of %u" % (wpnum_end, wpnum_end))
return False
def wait_mode(self, mode, timeout=None):
"""Wait for mode to change."""
progress("Waiting for mode %s" % mode)
tstart = self.get_sim_time()
hastimeout = False
while self.mav.flightmode.upper() != mode.upper() and not hastimeout:
if timeout is not None:
hastimeout = self.get_sim_time() > tstart + timeout
self.mav.wait_heartbeat()
progress("Got mode %s" % mode)
return self.mav.flightmode
def wait_waypoint(mav, wpnum_start, wpnum_end, allow_skip=True, max_dist=2, timeout=400): def wait_ready_to_arm(self, timeout=None):
"""Wait for waypoint ranges.""" # wait for EKF checks to pass
tstart = get_sim_time(mav) return self.wait_ekf_happy(timeout=timeout)
# this message arrives after we set the current WP
start_wp = mav.waypoint_current()
current_wp = start_wp
mode = mav.flightmode
progress("\ntest: wait for waypoint ranges start=%u end=%u\n\n" % (wpnum_start, wpnum_end)) def wait_ekf_happy(self, timeout=30):
# if start_wp != wpnum_start: """Wait for EKF to be happy"""
# progress("test: Expected start waypoint %u but got %u" % (wpnum_start, start_wp))
# return False
while get_sim_time(mav) < tstart + timeout: tstart = self.get_sim_time()
seq = mav.waypoint_current() required_value = 831
m = mav.recv_match(type='NAV_CONTROLLER_OUTPUT', blocking=True) progress("Waiting for EKF value %u" % required_value)
wp_dist = m.wp_dist while timeout is None or self.get_sim_time() < tstart + timeout:
m = mav.recv_match(type='VFR_HUD', blocking=True) m = self.mav.recv_match(type='EKF_STATUS_REPORT', blocking=True)
current = m.flags
if (tstart - self.get_sim_time()) % 5 == 0:
progress("Wait EKF.flags: required:%u current:%u" % (required_value, current))
if current == required_value:
progress("EKF Flags OK")
return
progress("Failed to get EKF.flags=%u" % required_value)
raise AutoTestTimeoutException()
# if we changed mode, fail @abc.abstractmethod
if mav.flightmode != mode: def init(self):
progress('Exited %s mode' % mode) """Initilialize autotest feature."""
return False pass
progress("test: WP %u (wp_dist=%u Alt=%d), current_wp: %u, wpnum_end: %u" % (seq, wp_dist, m.alt, current_wp, wpnum_end)) # def test_common_feature(self):
if seq == current_wp+1 or (seq > current_wp+1 and allow_skip): # """Common feature to test."""
progress("test: Starting new waypoint %u" % seq) # sucess = True
tstart = get_sim_time(mav) # # TEST ARMING/DISARM
current_wp = seq # if not self.arm_vehicle():
# the wp_dist check is a hack until we can sort out the right seqnum # progress("Failed to ARM")
# for end of mission # sucess = False
# if current_wp == wpnum_end or (current_wp == wpnum_end-1 and wp_dist < 2): # if not self.disarm_vehicle():
if (current_wp == wpnum_end and wp_dist < max_dist): # progress("Failed to DISARM")
progress("Reached final waypoint %u" % seq) # sucess = False
return True # if not self.test_arm_motors_radio():
if (seq >= 255): # progress("Failed to ARM with radio")
progress("Reached final waypoint %u" % seq) # sucess = False
return True # if not self.test_disarm_motors_radio():
if seq > current_wp+1: # progress("Failed to ARM with radio")
progress("Failed: Skipped waypoint! Got wp %u expected %u" % (seq, current_wp+1)) # sucess = False
return False # if not self.test_autodisarm_motors():
progress("Failed: Timed out waiting for waypoint %u of %u" % (wpnum_end, wpnum_end)) # progress("Failed to AUTO DISARM")
return False # sucess = False
# # TODO: Test failure on arm (with arming check)
# # TEST MISSION FILE
# # TODO : rework that to work on autotest server
# # progress("TEST LOADING MISSION")
# # num_wp = self.load_mission_from_file(os.path.join(testdir, "fake_mission.txt"))
# # if num_wp == 0:
# # progress("Failed to load all_msg_mission")
# # sucess = False
# #
# # progress("TEST SAVING MISSION")
# # num_wp_old = num_wp
# # num_wp = self.save_mission_to_file(os.path.join(testdir, "fake_mission2.txt"))
# # if num_wp != num_wp_old:
# # progress("Failed to save all_msg_mission")
# # sucess = False
#
# progress("TEST CLEARING MISSION")
# self.mavproxy.send("wp clear\n")
# self.mavproxy.send('wp list\n')
# self.mavproxy.expect('Requesting [0-9]+ waypoints')
# num_wp = mavwp.MAVWPLoader().count()
# if num_wp != 0:
# progress("Failed to clear mission ")
# sucess = False
#
# return sucess
#
# # TESTS FAILSAFE
# @abc.abstractmethod
# def test_throttle_failsafe(self, home, distance_min=10, side=60, timeout=180):
# """Test that RTL success in case of thottle failsafe."""
# pass
#
# # TEST ARM RADIO
# @abc.abstractmethod
# def test_arm_motors_radio(self):
# """Test arming with RC sticks."""
# pass
#
# # TEST DISARM RADIO
# @abc.abstractmethod
# def test_disarm_motors_radio(self):
# """Test disarming with RC sticks."""
# pass
#
# # TEST AUTO DISARM
# @abc.abstractmethod
# def test_autodisarm_motors(self):
# """Test auto disarming."""
# pass
#
# # TEST RC OVERRIDE
# # TEST RC OVERRIDE TIMEOUT
# @abc.abstractmethod
# def test_rtl(self, home, distance_min=10, timeout=250):
# """Test that RTL success."""
# progress("# Enter RTL")
# self.mavproxy.send('switch 3\n')
# tstart = self.get_sim_time()
# while self.get_sim_time() < tstart + timeout:
# m = self.mav.recv_match(type='VFR_HUD', blocking=True)
# pos = self.mav.location()
# home_distance = self.get_distance(home, pos)
# progress("Alt: %u HomeDistance: %.0f" % (m.alt, home_distance))
# if m.alt <= 1 and home_distance < distance_min:
# progress("RTL Complete")
# return True
# return False
#
# @abc.abstractmethod
# def test_mission(self, filename):
# pass
@abc.abstractmethod
def wait_mode(mav, mode, timeout=None): def autotest(self):
"""Wait for mode to change.""" """Autotest used by ArduPilot autotest CI."""
progress("Waiting for mode %s" % mode) pass
mav.wait_heartbeat()
mav.recv_match(condition='MAV.flightmode.upper()=="%s".upper()' % mode, timeout=timeout, blocking=True)
progress("Got mode %s" % mode)
return mav.flightmode
def wait_ready_to_arm(mav, timeout=None):
# wait for EKF checks to pass
return wait_ekf_happy(mav, timeout=timeout)
def wait_ekf_happy(mav, timeout=30):
"""Wait for EKF to be happy"""
tstart = get_sim_time(mav)
required_value = 831
progress("Waiting for EKF value %u" % (required_value))
while timeout is None or get_sim_time(mav) < tstart + timeout:
m = mav.recv_match(type='EKF_STATUS_REPORT', blocking=True)
current = m.flags
if (tstart - get_sim_time(mav)) % 5 == 0:
progress("Wait EKF.flags: required:%u current:%u" % (required_value, current))
if current == required_value:
progress("EKF Flags OK")
return
progress("Failed to get EKF.flags=%u" % required_value)
raise AutoTestTimeoutException()

View File

@ -12,124 +12,174 @@ from pysim import util
# get location of scripts # get location of scripts
testdir = os.path.dirname(os.path.realpath(__file__)) testdir = os.path.dirname(os.path.realpath(__file__))
HOME = mavutil.location(-27.274439, 151.290064, 343, 8.7)
HOME_LOCATION = '-27.274439,151.290064,343,8.7'
MISSION = 'ArduPlane-Missions/Dalby-OBC2016.txt' MISSION = 'ArduPlane-Missions/Dalby-OBC2016.txt'
FENCE = 'ArduPlane-Missions/Dalby-OBC2016-fence.txt' FENCE = 'ArduPlane-Missions/Dalby-OBC2016-fence.txt'
WIND = "0,180,0.2" # speed,direction,variance WIND = "0,180,0.2" # speed,direction,variance
homeloc = None
class AutotestQuadPlane(Autotest):
def __init__(self, binary, viewerip=None, use_map=False, valgrind=False, gdb=False, speedup=10, frame=None, params=None, gdbserver=False):
super(AutotestQuadPlane, self).__init__()
self.binary = binary
self.options = '--sitl=127.0.0.1:5501 --out=127.0.0.1:19550 --streamrate=10'
self.viewerip = viewerip
self.use_map = use_map
self.valgrind = valgrind
self.gdb = gdb
self.frame = frame
self.params = params
self.gdbserver = gdbserver
def fly_mission(mavproxy, mav, filename, fence, height_accuracy=-1): self.home = "%f,%f,%u,%u" % (HOME.lat, HOME.lng, HOME.alt, HOME.heading)
"""Fly a mission from a file.""" self.homeloc = None
progress("Flying mission %s" % filename) self.speedup = speedup
mavproxy.send('wp load %s\n' % filename) self.speedup_default = 10
mavproxy.expect('Flight plan received')
mavproxy.send('fence load %s\n' % fence)
mavproxy.send('wp list\n')
mavproxy.expect('Requesting [0-9]+ waypoints')
mavproxy.send('mode AUTO\n')
wait_mode(mav, 'AUTO')
if not wait_waypoint(mav, 1, 19, max_dist=60, timeout=1200):
return False
mavproxy.expect('DISARMED')
# wait for blood sample here
mavproxy.send('wp set 20\n')
arm_vehicle(mavproxy, mav)
if not wait_waypoint(mav, 20, 34, max_dist=60, timeout=1200):
return False
mavproxy.expect('DISARMED')
progress("Mission OK")
return True
self.log_name = "ArduCopter"
self.logfile = None
self.buildlog = None
self.copy_tlog = False
def fly_QuadPlane(binary, viewerip=None, use_map=False, valgrind=False, gdb=False, gdbserver=False, speedup=10): self.sitl = None
"""Fly QuadPlane in SITL. self.hasInit = False
you can pass viewerip as an IP address to optionally send fg and def init(self):
mavproxy packets too for local viewing of the flight in real time. if self.frame is None:
""" self.frame = 'quadplane'
global homeloc
options = '--sitl=127.0.0.1:5501 --out=127.0.0.1:19550 --streamrate=10' if self.viewerip:
if viewerip: self.options += " --out=%s:14550" % self.viewerip
options += " --out=%s:14550" % viewerip if self.use_map:
if use_map: self.options += ' --map'
options += ' --map'
sitl = util.start_SITL(binary, model='quadplane', wipe=True, home=HOME_LOCATION, speedup=speedup, self.sitl = util.start_SITL(self.binary, wipe=True, model=self.frame, home=self.home, speedup=self.speedup,
defaults_file=os.path.join(testdir, 'default_params/quadplane.parm'), valgrind=valgrind, gdb=gdb, gdbserver=gdbserver) defaults_file=os.path.join(testdir, 'default_params/quadplane.parm'),
mavproxy = util.start_MAVProxy_SITL('QuadPlane', options=options) valgrind=self.valgrind, gdb=self.gdb, gdbserver=self.gdbserver)
mavproxy.expect('Telemetry log: (\S+)') self.mavproxy = util.start_MAVProxy_SITL('QuadPlane', options=self.options)
logfile = mavproxy.match.group(1) self.mavproxy.expect('Telemetry log: (\S+)')
progress("LOGFILE %s" % logfile) logfile = self.mavproxy.match.group(1)
progress("LOGFILE %s" % logfile)
buildlog = util.reltopdir("../buildlogs/QuadPlane-test.tlog") buildlog = util.reltopdir("../buildlogs/QuadPlane-test.tlog")
progress("buildlog=%s" % buildlog) progress("buildlog=%s" % buildlog)
if os.path.exists(buildlog): if os.path.exists(buildlog):
os.unlink(buildlog) os.unlink(buildlog)
try: try:
os.link(logfile, buildlog) os.link(logfile, buildlog)
except Exception: except Exception:
pass pass
util.expect_setup_callback(mavproxy, expect_callback) self.mavproxy.expect('Received [0-9]+ parameters')
mavproxy.expect('Received [0-9]+ parameters') util.expect_setup_callback(self.mavproxy, expect_callback)
expect_list_clear() expect_list_clear()
expect_list_extend([sitl, mavproxy]) expect_list_extend([self.sitl, self.mavproxy])
progress("Started simulator") progress("Started simulator")
# get a mavlink connection going # get a mavlink connection going
try: try:
mav = mavutil.mavlink_connection('127.0.0.1:19550', robust_parsing=True) self.mav = mavutil.mavlink_connection('127.0.0.1:19550', robust_parsing=True)
except Exception as msg: except Exception as msg:
progress("Failed to start mavlink connection on 127.0.0.1:19550" % msg) progress("Failed to start mavlink connection on 127.0.0.1:19550" % msg)
raise raise
mav.message_hooks.append(message_hook) self.mav.message_hooks.append(message_hook)
mav.idle_hooks.append(idle_hook) self.mav.idle_hooks.append(idle_hook)
self.hasInit = True
progress("Ready to start testing!")
failed = False def close(self):
e = 'None' if self.use_map:
try: self.mavproxy.send("module unload map\n")
progress("Waiting for a heartbeat with mavlink protocol %s" % mav.WIRE_PROTOCOL_VERSION) self.mavproxy.expect("Unloaded module map")
mav.wait_heartbeat()
progress("Waiting for GPS fix")
mav.recv_match(condition='VFR_HUD.alt>10', blocking=True)
mav.wait_gps_fix()
while mav.location().alt < 10:
mav.wait_gps_fix()
homeloc = mav.location()
progress("Home location: %s" % homeloc)
# wait for EKF and GPS checks to pass self.mav.close()
wait_seconds(mav, 30) util.pexpect_close(self.mavproxy)
util.pexpect_close(self.sitl)
arm_vehicle(mavproxy, mav) valgrind_log = util.valgrind_log_filepath(binary=self.binary, model=self.frame)
if os.path.exists(valgrind_log):
os.chmod(valgrind_log, 0o644)
shutil.copy(valgrind_log, util.reltopdir("../buildlogs/QuadPlane-valgrind.log"))
if not fly_mission(mavproxy, mav, # def test_arm_motors_radio(self):
os.path.join(testdir, "ArduPlane-Missions/Dalby-OBC2016.txt"), # super(AutotestQuadPlane, self).test_arm_motors_radio()
os.path.join(testdir, "ArduPlane-Missions/Dalby-OBC2016-fence.txt")): #
progress("Failed mission") # def test_disarm_motors_radio(self):
# super(AutotestQuadPlane, self).test_disarm_motors_radio()
#
# def test_autodisarm_motors(self):
# super(AutotestQuadPlane, self).test_autodisarm_motors()
#
# def test_rtl(self, home, distance_min=10, timeout=250):
# super(AutotestQuadPlane, self).test_rtl(home, distance_min=10, timeout=250)
#
# def test_throttle_failsafe(self, home, distance_min=10, side=60, timeout=180):
# super(AutotestQuadPlane, self).test_throttle_failsafe(home, distance_min=10, side=60, timeout=180)
#
# def test_mission(self, filename):
# super(AutotestQuadPlane, self).test_mission(filename)
def fly_mission(self, filename, fence, height_accuracy=-1):
"""Fly a mission from a file."""
progress("Flying mission %s" % filename)
self.mavproxy.send('wp load %s\n' % filename)
self.mavproxy.expect('Flight plan received')
self.mavproxy.send('fence load %s\n' % fence)
self.mavproxy.send('wp list\n')
self.mavproxy.expect('Requesting [0-9]+ waypoints')
self.mavproxy.send('mode AUTO\n')
self.wait_mode('AUTO')
if not self.wait_waypoint(1, 19, max_dist=60, timeout=1200):
return False
self.mavproxy.expect('DISARMED')
# wait for blood sample here
self.mavproxy.send('wp set 20\n')
self.arm_vehicle()
if not self.wait_waypoint(20, 34, max_dist=60, timeout=1200):
return False
self.mavproxy.expect('DISARMED')
progress("Mission OK")
return True
def autotest(self):
"""Autotest QuadPlane in SITL."""
self.frame = 'quadplane'
if not self.hasInit:
self.init()
failed = False
e = 'None'
try:
progress("Waiting for a heartbeat with mavlink protocol %s" % self.mav.WIRE_PROTOCOL_VERSION)
self.mav.wait_heartbeat()
progress("Waiting for GPS fix")
self.mav.recv_match(condition='VFR_HUD.alt>10', blocking=True)
self.mav.wait_gps_fix()
while self.mav.location().alt < 10:
self.mav.wait_gps_fix()
self.homeloc = self.mav.location()
progress("Home location: %s" % self.homeloc)
# wait for EKF and GPS checks to pass
progress("Waiting reading for arm")
self.wait_seconds(30)
self.arm_vehicle()
if not self.fly_mission(os.path.join(testdir, "ArduPlane-Missions/Dalby-OBC2016.txt"),
os.path.join(testdir, "ArduPlane-Missions/Dalby-OBC2016-fence.txt")):
progress("Failed mission")
failed = True
except pexpect.TIMEOUT as e:
progress("Failed with timeout")
failed = True failed = True
except pexpect.TIMEOUT as e:
progress("Failed with timeout")
failed = True
mav.close() self.close()
util.pexpect_close(mavproxy)
util.pexpect_close(sitl)
valgrind_log = util.valgrind_log_filepath(binary=binary, model='quadplane') if failed:
if os.path.exists(valgrind_log): progress("FAILED: %s" % e)
os.chmod(valgrind_log, 0o644) return False
shutil.copy(valgrind_log, util.reltopdir("../buildlogs/QuadPlane-valgrind.log")) return True
if failed:
progress("FAILED: %s" % e)
return False
return True