Merge remote-tracking branch 'origin/master'

This commit is contained in:
Randy Mackay 2011-11-13 22:22:35 +09:00
commit 4d22385266
8 changed files with 555 additions and 196 deletions

View File

@ -24,9 +24,9 @@ RC7_TRIM 1500.000000
RC8_MAX 2000.000000
RC8_MIN 1000.000000
RC8_TRIM 1500.000000
FLTMODE1 7
FLTMODE2 5
FLTMODE3 4
FLTMODE4 3
FLTMODE5 1
FLTMODE1 7
FLTMODE2 1
FLTMODE3 2
FLTMODE4 3
FLTMODE5 5
FLTMODE6 0

View File

@ -0,0 +1,38 @@
LOG_BITMASK 4095
SWITCH_ENABLE 0
RC2_REV -1
RC1_MAX 2000
RC1_MIN 1000
RC1_TRIM 1500
RC2_MAX 2000
RC2_MIN 1000
RC2_TRIM 1500
RC3_MAX 2000
RC3_MIN 1000
RC3_TRIM 1500
RC4_MAX 2000
RC4_MIN 1000
RC4_TRIM 1500
RC5_MAX 2000
RC5_MIN 1000
RC5_TRIM 1500
RC6_MAX 2000
RC6_MIN 1000
RC6_TRIM 1500
RC7_MAX 2000
RC7_MIN 1000
RC7_TRIM 1500
RC8_MAX 2000
RC8_MIN 1000
RC8_TRIM 1500
FLTMODE1 10
FLTMODE2 11
FLTMODE3 12
FLTMODE4 5
FLTMODE5 2
FLTMODE6 0
FLTMODE_CH 8
PTCH2SRV_P 2
RLL2SRV_I 0.1
RLL2SRV_P 1.2
INVERTEDFLT_CH 7

7
Tools/autotest/ap1.txt Normal file
View File

@ -0,0 +1,7 @@
QGC WPL 110
0 1 0 16 0.000000 0.000000 0.000000 0.000000 -35.362881 149.165222 582.000000 1
1 0 3 16 0.000000 0.000000 0.000000 0.000000 -35.361553 149.163956 120.000000 1
2 0 3 16 0.000000 0.000000 0.000000 0.000000 -35.364540 149.162857 120.000000 1
3 0 3 16 0.000000 0.000000 0.000000 0.000000 -35.361721 149.161835 120.000000 1
4 0 3 16 0.000000 0.000000 0.000000 0.000000 -35.364170 149.164627 120.000000 1
5 0 3 20 0.000000 0.000000 0.000000 0.000000 -35.363289 149.165253 50.000000 1

View File

@ -1,6 +1,7 @@
# fly ArduCopter in SIL
import util, pexpect, sys, time, math, shutil, os
from common import *
# get location of scripts
testdir=os.path.dirname(os.path.realpath(__file__))
@ -13,120 +14,11 @@ HOME_LOCATION='-35.362938,149.165085,584,270'
homeloc = None
num_wp = 0
# a list of pexpect objects to read while waiting for
# messages. This keeps the output to stdout flowing
expect_list = []
def message_hook(mav, msg):
'''called as each mavlink msg is received'''
global expect_list
#if msg.get_type() in [ 'NAV_CONTROLLER_OUTPUT', 'GPS_RAW' ]:
# print(msg)
for p in expect_list:
try:
p.read_nonblocking(100, timeout=0)
except pexpect.TIMEOUT:
pass
def expect_callback(e):
'''called when waiting for a expect pattern'''
global expect_list
for p in expect_list:
if p == e:
continue
try:
while p.read_nonblocking(100, timeout=0):
pass
except pexpect.TIMEOUT:
pass
class location(object):
'''represent a GPS coordinate'''
def __init__(self, lat, lng, alt=0):
self.lat = lat
self.lng = lng
self.alt = alt
def get_distance(loc1, loc2):
'''get ground distance between two locations'''
dlat = loc2.lat - loc1.lat
dlong = loc2.lng - loc1.lng
return math.sqrt((dlat*dlat) + (dlong*dlong)) * 1.113195e5
def get_bearing(loc1, loc2):
'''get bearing from loc1 to loc2'''
off_x = loc2.lng - loc1.lng
off_y = loc2.lat - loc1.lat
bearing = 90.00 + math.atan2(-off_y, off_x) * 57.2957795
if bearing < 0:
bearing += 360.00
return bearing;
def current_location(mav):
'''return current location'''
# ensure we have a position
mav.recv_match(type='VFR_HUD', blocking=True)
mav.recv_match(type='GPS_RAW', blocking=True)
return location(mav.messages['GPS_RAW'].lat,
mav.messages['GPS_RAW'].lon,
mav.messages['VFR_HUD'].alt)
def wait_altitude(mav, alt_min, alt_max, timeout=60):
'''wait for a given altitude range'''
tstart = time.time()
print("Waiting for altitude between %u and %u" % (alt_min, alt_max))
while time.time() < tstart + timeout:
m = mav.recv_match(type='VFR_HUD', blocking=True)
print("Altitude %u" % m.alt)
if m.alt >= alt_min and m.alt <= alt_max:
return True
print("Failed to attain altitude range")
return False
def wait_waypoint(mav, wpnum_start, wpnum_end, allow_skip=True, timeout=60):
'''wait for waypoint ranges'''
tstart = time.time()
m = mav.recv_match(type='WAYPOINT_CURRENT', blocking=True)
start_wp = m.seq
current_wp = start_wp
print("\n***wait for waypoint ranges***\n\n\n")
if start_wp != wpnum_start:
print("Expected start waypoint %u but got %u" % (wpnum_start, start_wp))
return False
while time.time() < tstart + timeout:
m = mav.recv_match(type='WAYPOINT_CURRENT', blocking=True)
print("WP %u" % m.seq)
if m.seq == current_wp:
continue
if m.seq == current_wp+1 or (m.seq > current_wp+1 and allow_skip):
print("Starting new waypoint %u" % m.seq)
tstart = time.time()
current_wp = m.seq
if current_wp == wpnum_end:
print("Reached final waypoint %u" % m.seq)
return True
if m.seq > current_wp+1:
print("Skipped waypoint! Got wp %u expected %u" % (m.seq, current_wp+1))
return False
print("Timed out waiting for waypoint %u" % wpnum_end)
return False
def save_wp(mavproxy, mav):
mavproxy.send('rc 7 2000\n')
mav.recv_match(condition='RC_CHANNELS_RAW.chan7_raw==2000', blocking=True)
mavproxy.send('rc 7 1000\n')
mav.recv_match(condition='RC_CHANNELS_RAW.chan7_raw==1000', blocking=True)
mavproxy.send('wp list\n')
def arm_motors(mavproxy):
def arm_motors(mavproxy, mav):
'''arm motors'''
print("Arming motors")
mavproxy.send('switch 6\n') # stabilize mode
mavproxy.expect('STABILIZE>')
wait_mode(mav, 'STABILIZE')
mavproxy.send('rc 3 1000\n')
mavproxy.send('rc 4 2000\n')
mavproxy.expect('APM: ARMING MOTORS')
@ -134,7 +26,7 @@ def arm_motors(mavproxy):
print("MOTORS ARMED OK")
return True
def disarm_motors(mavproxy):
def disarm_motors(mavproxy, mav):
'''disarm motors'''
print("Disarming motors")
mavproxy.send('switch 6\n') # stabilize mode
@ -149,7 +41,7 @@ def disarm_motors(mavproxy):
def takeoff(mavproxy, mav):
'''takeoff get to 30m altitude'''
mavproxy.send('switch 6\n') # stabilize mode
mavproxy.expect('STABILIZE>')
wait_mode(mav, 'STABILIZE')
mavproxy.send('rc 3 1500\n')
wait_altitude(mav, 30, 40)
print("TAKEOFF COMPLETE")
@ -158,10 +50,8 @@ def takeoff(mavproxy, mav):
def loiter(mavproxy, mav, maxaltchange=10, holdtime=10, timeout=60):
'''hold loiter position'''
mavproxy.send('switch 2\n') # loiter mode
mavproxy.expect('LOITER>')
mavproxy.send('status\n')
mavproxy.expect('>')
mavproxy.send('switch 5\n') # loiter mode
wait_mode(mav, 'LOITER')
m = mav.recv_match(type='VFR_HUD', blocking=True)
start_altitude = m.alt
tstart = time.time()
@ -179,56 +69,10 @@ def loiter(mavproxy, mav, maxaltchange=10, holdtime=10, timeout=60):
return False
def wait_heading(mav, heading, accuracy=5, timeout=30):
'''wait for a given heading'''
tstart = time.time()
while time.time() < tstart + timeout:
m = mav.recv_match(type='VFR_HUD', blocking=True)
print("Heading %u" % m.heading)
if math.fabs(m.heading - heading) <= accuracy:
return True
print("Failed to attain heading %u" % heading)
return False
def wait_distance(mav, distance, accuracy=5, timeout=30):
'''wait for flight of a given distance'''
tstart = time.time()
start = current_location(mav)
while time.time() < tstart + timeout:
m = mav.recv_match(type='GPS_RAW', blocking=True)
pos = current_location(mav)
delta = get_distance(start, pos)
print("Distance %.2f meters" % delta)
if math.fabs(delta - distance) <= accuracy:
return True
print("Failed to attain distance %u" % distance)
return False
def wait_location(mav, loc, accuracy=5, timeout=30, target_altitude=None, height_accuracy=-1):
'''wait for arrival at a location'''
tstart = time.time()
if target_altitude is None:
target_altitude = loc.alt
while time.time() < tstart + timeout:
m = mav.recv_match(type='GPS_RAW', blocking=True)
pos = current_location(mav)
delta = get_distance(loc, pos)
print("Distance %.2f meters" % delta)
if delta <= accuracy:
if height_accuracy != -1 and math.fabs(pos.alt - target_altitude) > height_accuracy:
continue
print("Reached location (%.2f meters)" % delta)
return True
print("Failed to attain location")
return False
def fly_square(mavproxy, mav, side=50, timeout=120):
'''fly a square, flying N then E'''
mavproxy.send('switch 6\n')
mavproxy.expect('STABILIZE>')
wait_mode(mav, 'STABILIZE')
tstart = time.time()
failed = False
@ -288,9 +132,7 @@ def land(mavproxy, mav, timeout=60):
'''land the quad'''
print("STARTING LANDING")
mavproxy.send('switch 6\n')
mavproxy.expect('STABILIZE>')
mavproxy.send('status\n')
mavproxy.expect('>')
wait_mode(mav, 'STABILIZE')
# start by dropping throttle till we have lost 5m
mavproxy.send('rc 3 1380\n')
@ -301,20 +143,16 @@ def land(mavproxy, mav, timeout=60):
mavproxy.send('rc 3 1400\n')
tstart = time.time()
if wait_altitude(mav, -5, 0):
print("LANDING OK")
return True
else:
print("LANDING FAILED")
return False
ret = wait_altitude(mav, -5, 0)
print("LANDING: ok=%s" % ret)
return ret
def circle(mavproxy, mav, maxaltchange=10, holdtime=90, timeout=35):
'''fly circle'''
print("FLY CIRCLE")
mavproxy.send('switch 1\n') # CIRCLE mode
mavproxy.expect('CIRCLE>')
mavproxy.send('status\n')
mavproxy.expect('>')
wait_mode(mav, 'CIRCLE')
m = mav.recv_match(type='VFR_HUD', blocking=True)
start_altitude = m.alt
tstart = time.time()
@ -333,21 +171,27 @@ def fly_mission(mavproxy, mav, height_accuracy=-1, target_altitude=None):
print("Fly a mission")
global homeloc
global num_wp
mavproxy.send('wp set 1\n')
mavproxy.send('switch 4\n') # auto mode
mavproxy.expect('AUTO>')
wait_mode(mav, 'AUTO')
wait_altitude(mav, 30, 40)
if wait_waypoint(mav, 1, num_wp):
print("MISSION COMPLETE")
return True
else:
return False
#wait_altitude(mav, 30, 40)
ret = wait_waypoint(mav, 1, num_wp, timeout=90)
print("MISSION COMPLETE: passed=%s" % ret)
# wait here until ready
mavproxy.send('switch 5\n')
wait_mode(mav, 'LOITER')
return ret
#if not wait_distance(mav, 30, timeout=120):
# return False
#if not wait_location(mav, homeloc, timeout=600, target_altitude=target_altitude, height_accuracy=height_accuracy):
# return False
def load_mission(mavproxy, mav, filename):
'''load a mission from a file'''
global num_wp
@ -361,7 +205,8 @@ def load_mission(mavproxy, mav, filename):
num_wp = wploader.count()
print("loaded mission")
for i in range(num_wp):
print (dir(wploader.wp(i)))
print wploader.wp(i)
return True
def setup_rc(mavproxy):
'''setup RC override control'''
@ -437,12 +282,13 @@ def fly_ArduCopter(viewerip=None):
failed = False
e = 'None'
try:
mav.wait_heartbeat()
mav.recv_match(type='GPS_RAW', blocking=True)
setup_rc(mavproxy)
homeloc = current_location(mav)
if not arm_motors(mavproxy):
if not arm_motors(mavproxy, mav):
failed = True
if not takeoff(mavproxy, mav):
@ -458,22 +304,24 @@ def fly_ArduCopter(viewerip=None):
if not circle(mavproxy, mav):
failed = True
# fly the stores mission
# fly the stored mission
if not fly_mission(mavproxy, mav,height_accuracy = 0.5, target_altitude=10):
failed = True
#fly_mission(mavproxy, mav, os.path.join(testdir, "mission_ttt.txt"), height_accuracy=0.2)
if not load_mission(mavproxy, mav, os.path.join(testdir, "mission_ttt.txt")):
if not load_mission(mavproxy, mav, os.path.join(testdir, "mission1.txt")):
failed = True
if not fly_mission(mavproxy, mav,height_accuracy = 0.5, target_altitude=10):
failed = True
else:
print("Flew mission_ttt OK")
if not land(mavproxy, mav):
failed = True
if not disarm_motors(mavproxy):
if not disarm_motors(mavproxy, mav):
failed = True
except pexpect.TIMEOUT, e:
failed = True

269
Tools/autotest/arduplane.py Normal file
View File

@ -0,0 +1,269 @@
# fly ArduPlane in SIL
import util, pexpect, sys, time, math, shutil, os
from common import *
# get location of scripts
testdir=os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, util.reltopdir('../pymavlink'))
import mavutil
HOME_LOCATION='-35.362938,149.165085,584,270'
homeloc = None
def takeoff(mavproxy, mav):
'''takeoff get to 30m altitude'''
mavproxy.send('switch 4\n')
wait_mode(mav, 'FBWA')
# some rudder to counteract the prop torque
mavproxy.send('rc 4 1600\n')
# get it moving a bit first to avoid bad fgear ground physics
mavproxy.send('rc 3 1150\n')
mav.recv_match(condition='VFR_HUD.groundspeed>3', blocking=True)
# a bit faster
mavproxy.send('rc 3 1600\n')
mav.recv_match(condition='VFR_HUD.groundspeed>10', blocking=True)
# hit the gas harder now, and give it some elevator
mavproxy.send('rc 4 1500\n')
mavproxy.send('rc 2 1200\n')
mavproxy.send('rc 3 1800\n')
# gain a bit of altitude
wait_altitude(mav, homeloc.alt+30, homeloc.alt+40, timeout=30)
# level off
mavproxy.send('rc 2 1500\n')
print("TAKEOFF COMPLETE")
return True
def fly_left_circuit(mavproxy, mav):
'''fly a left circuit, 200m on a side'''
mavproxy.send('switch 4\n')
wait_mode(mav, 'FBWA')
print("Flying left circuit")
# do 4 turns
for i in range(0,4):
# hard left
print("Starting turn %u" % i)
mavproxy.send('rc 1 1000\n')
wait_heading(mav, 270 - (90*i))
mavproxy.send('rc 1 1500\n')
print("Starting leg %u" % i)
wait_distance(mav, 100)
print("Circuit complete")
return True
def fly_RTL(mavproxy, mav):
'''fly to home'''
mavproxy.send('switch 2\n')
wait_mode(mav, 'RTL')
wait_location(mav, homeloc, accuracy=30,
target_altitude=100, height_accuracy=10)
print("RTL Complete")
return True
def fly_LOITER(mavproxy, mav):
'''loiter where we are'''
mavproxy.send('switch 3\n')
wait_mode(mav, 'LOITER')
while True:
wait_heading(mav, 0)
wait_heading(mav, 180)
return True
def change_altitude(mavproxy, mav, altitude, accuracy=10):
'''get to a given altitude'''
mavproxy.send('switch 4\n')
wait_mode(mav, 'FBWA')
alt_error = mav.messages['VFR_HUD'].alt - altitude
if alt_error > 0:
mavproxy.send('rc 2 2000\n')
else:
mavproxy.send('rc 2 1000\n')
wait_altitude(mav, altitude-accuracy/2, altitude+accuracy/2)
mavproxy.send('rc 2 1500\n')
print("Reached target altitude at %u" % mav.messages['VFR_HUD'].alt)
return True
def axial_left_roll(mavproxy, mav, count=1):
'''fly a left axial roll'''
# full throttle!
mavproxy.send('rc 3 2000\n')
change_altitude(mavproxy, mav, homeloc.alt+200)
# fly the roll in manual
mavproxy.send('switch 5\n')
wait_mode(mav, 'STABILIZE')
while count > 0:
print("Starting roll")
mavproxy.send('rc 1 1000\n')
wait_roll(mav, -150, accuracy=20)
wait_roll(mav, 150, accuracy=20)
wait_roll(mav, 0, accuracy=20)
count -= 1
# back to FBWA
mavproxy.send('rc 1 1500\n')
mavproxy.send('switch 4\n')
wait_mode(mav, 'FBWA')
mavproxy.send('rc 3 1700\n')
if not wait_distance(mav, 50):
return False
return True
def setup_rc(mavproxy):
'''setup RC override control'''
for chan in [1,2,4,5,6,7]:
mavproxy.send('rc %u 1500\n' % chan)
mavproxy.send('rc 3 1000\n')
mavproxy.send('rc 8 1800\n')
def fly_mission(mavproxy, mav, filename, height_accuracy=-1, target_altitude=None):
'''fly a mission from a file'''
global homeloc
mavproxy.send('wp load %s\n' % filename)
mavproxy.expect('flight plan received')
mavproxy.send('wp list\n')
mavproxy.expect('Requesting [0-9]+ waypoints')
mavproxy.send('switch 1\n') # auto mode
wait_mode(mav, 'AUTO')
if not wait_distance(mav, 30, timeout=120):
return False
if not wait_location(mav, homeloc, accuracy=50, timeout=600, target_altitude=target_altitude, height_accuracy=height_accuracy):
return False
print("Mission OK")
return True
def fly_ArduPlane(viewerip=None):
'''fly ArduPlane in SIL
you can pass viewerip as an IP address to optionally send fg and
mavproxy packets too for local viewing of the flight in real time
'''
global expect_list, homeloc
sil = util.start_SIL('ArduPlane', wipe=True)
options = '--fgout=127.0.0.1:5502 --fgin=127.0.0.1:5501 --out=127.0.0.1:19550 --streamrate=5'
if viewerip:
options += ' --out=%s:14550' % viewerip
mavproxy = util.start_MAVProxy_SIL('ArduPlane', options=options)
mavproxy.expect('Logging to (\S+)')
logfile = mavproxy.match.group(1)
print("LOGFILE %s" % logfile)
buildlog = util.reltopdir("../buildlogs/ArduPlane-test.mavlog")
print("buildlog=%s" % buildlog)
if os.path.exists(buildlog):
os.unlink(buildlog)
os.link(logfile, buildlog)
mavproxy.expect('Received [0-9]+ parameters')
# setup test parameters
mavproxy.send("param load %s/ArduPlane.parm\n" % testdir)
mavproxy.expect('Loaded [0-9]+ parameters')
util.expect_setup_callback(mavproxy, expect_callback)
fgear_options = '''
--generic=socket,in,25,,5502,udp,MAVLink \
--generic=socket,out,50,,5501,udp,MAVLink \
--aircraft=Rascal110-JSBSim \
--control=mouse \
--disable-intro-music \
--airport=YKRY \
--lat=-35.362851 \
--lon=149.165223 \
--heading=350 \
--altitude=0 \
--geometry=650x550 \
--jpg-httpd=5502 \
--disable-anti-alias-hud \
--disable-hud-3d \
--disable-enhanced-lighting \
--disable-distance-attenuation \
--disable-horizon-effect \
--shading-flat \
--disable-textures \
--timeofday=noon \
--fdm=jsb \
--disable-sound \
--disable-fullscreen \
--disable-random-objects \
--disable-ai-models \
--shading-flat \
--fog-disable \
--disable-specular-highlight \
--disable-skyblend \
--fg-scenery="/home/tridge/project/UAV/fgdata/Scenery" \
--disable-anti-alias-hud \
--wind=0@0 \
'''
# start fgear
if os.getenv('DISPLAY'):
cmd = 'fgfs %s' % fgear_options
else:
cmd = "xvfb-run -s '-screen 0 800x600x24' fgfs --enable-wireframe %s" % fgear_options
fgear = pexpect.spawn(cmd, logfile=sys.stdout, timeout=10)
util.pexpect_autoclose(fgear)
expect_list.extend([fgear, sil, mavproxy])
# get a mavlink connection going
try:
mav = mavutil.mavlink_connection('127.0.0.1:19550', robust_parsing=True)
except Exception, msg:
print("Failed to start mavlink connection on 127.0.0.1:19550" % msg)
raise
mav.message_hooks.append(message_hook)
failed = False
e = 'None'
try:
mav.wait_heartbeat()
setup_rc(mavproxy)
mav.recv_match(type='GPS_RAW', condition='GPS_RAW.lat != 0 and GPS_RAW.alt != 0 and VFR_HUD.alt != 0',
blocking=True)
homeloc = current_location(mav)
print("Home location: %s" % homeloc)
if not takeoff(mavproxy, mav):
failed = True
if not fly_left_circuit(mavproxy, mav):
failed = True
if not axial_left_roll(mavproxy, mav, 1):
failed = True
if not fly_RTL(mavproxy, mav):
failed = True
if not fly_mission(mavproxy, mav, os.path.join(testdir, "ap1.txt"), height_accuracy = 10,
target_altitude=homeloc.alt+100):
failed = True
except pexpect.TIMEOUT, e:
failed = True
util.pexpect_close(mavproxy)
util.pexpect_close(sil)
util.pexpect_close(fgear)
if os.path.exists('ArduPlane-valgrind.log'):
os.chmod('ArduPlane-valgrind.log', 0644)
shutil.copy("ArduPlane-valgrind.log", util.reltopdir("../buildlogs/ArduPlane-valgrind.log"))
if failed:
print("FAILED: %s" % e)
return False
return True

View File

@ -2,7 +2,8 @@
# APM automatic test suite
# Andrew Tridgell, October 2011
import pexpect, os, util, sys, shutil, arducopter
import pexpect, os, util, sys, shutil
import arducopter, arduplane
import optparse, fnmatch, time, glob, traceback
os.putenv('TMPDIR', util.reltopdir('tmp'))
@ -81,6 +82,8 @@ You can get it from git://git.samba.org/tridge/UAV/HILTest.git
parser = optparse.OptionParser("autotest")
parser.add_option("--skip", type='string', default='', help='list of steps to skip (comma separated)')
parser.add_option("--list", action='store_true', default=False, help='list the available steps')
parser.add_option("--viewerip", default=None, help='IP address to send MAVLink and fg packets to')
parser.add_option("--experimental", default=False, action='store_true', help='enable experimental tests')
opts, args = parser.parse_args()
@ -91,6 +94,7 @@ steps = [
'build.ArduPlane',
'defaults.ArduPlane',
'logs.ArduPlane',
'fly.ArduPlane',
'build1280.ArduCopter',
'build2560.ArduCopter',
'build.ArduCopter',
@ -147,6 +151,12 @@ def run_step(step):
if step == 'fly.ArduCopter':
return arducopter.fly_ArduCopter()
if step == 'fly.ArduPlane':
if not opts.experimental:
print("DISABLED: use --experimental to enable fly.ArduPlane")
return True
return arduplane.fly_ArduPlane(viewerip=opts.viewerip)
if step == 'convertgpx':
return convert_gpx()

181
Tools/autotest/common.py Normal file
View File

@ -0,0 +1,181 @@
import util, pexpect, time, math
# a list of pexpect objects to read while waiting for
# messages. This keeps the output to stdout flowing
expect_list = []
def message_hook(mav, msg):
'''called as each mavlink msg is received'''
global expect_list
# if msg.get_type() in [ 'NAV_CONTROLLER_OUTPUT', 'GPS_RAW' ]:
# print(msg)
for p in expect_list:
try:
p.read_nonblocking(100, timeout=0)
except pexpect.TIMEOUT:
pass
def expect_callback(e):
'''called when waiting for a expect pattern'''
global expect_list
for p in expect_list:
if p == e:
continue
try:
while p.read_nonblocking(100, timeout=0):
pass
except pexpect.TIMEOUT:
pass
class location(object):
'''represent a GPS coordinate'''
def __init__(self, lat, lng, alt=0):
self.lat = lat
self.lng = lng
self.alt = alt
def __str__(self):
return "lat=%.6f,lon=%.6f,alt=%.1f" % (self.lat, self.lng, self.alt)
def get_distance(loc1, loc2):
'''get ground distance between two locations'''
dlat = loc2.lat - loc1.lat
dlong = loc2.lng - loc1.lng
return math.sqrt((dlat*dlat) + (dlong*dlong)) * 1.113195e5
def get_bearing(loc1, loc2):
'''get bearing from loc1 to loc2'''
off_x = loc2.lng - loc1.lng
off_y = loc2.lat - loc1.lat
bearing = 90.00 + math.atan2(-off_y, off_x) * 57.2957795
if bearing < 0:
bearing += 360.00
return bearing;
def current_location(mav):
'''return current location'''
# ensure we have a position
mav.recv_match(type='VFR_HUD', blocking=True)
mav.recv_match(type='GPS_RAW', blocking=True)
return location(mav.messages['GPS_RAW'].lat,
mav.messages['GPS_RAW'].lon,
mav.messages['VFR_HUD'].alt)
def wait_altitude(mav, alt_min, alt_max, timeout=30):
'''wait for a given altitude range'''
tstart = time.time()
print("Waiting for altitude between %u and %u" % (alt_min, alt_max))
while time.time() < tstart + timeout:
m = mav.recv_match(type='VFR_HUD', blocking=True)
print("Altitude %u" % m.alt)
if m.alt >= alt_min and m.alt <= alt_max:
return True
print("Failed to attain altitude range")
return False
def wait_roll(mav, roll, accuracy, timeout=30):
'''wait for a given roll in degrees'''
tstart = time.time()
print("Waiting for roll of %u" % roll)
while time.time() < tstart + timeout:
m = mav.recv_match(type='ATTITUDE', blocking=True)
r = math.degrees(m.roll)
print("Roll %u" % r)
if math.fabs(r - roll) <= accuracy:
return True
print("Failed to attain roll %u" % roll)
return False
def wait_heading(mav, heading, accuracy=5, timeout=30):
'''wait for a given heading'''
tstart = time.time()
while time.time() < tstart + timeout:
m = mav.recv_match(type='VFR_HUD', blocking=True)
print("Heading %u" % m.heading)
if math.fabs(m.heading - heading) <= accuracy:
return True
print("Failed to attain heading %u" % heading)
return False
def wait_distance(mav, distance, accuracy=5, timeout=30):
'''wait for flight of a given distance'''
tstart = time.time()
start = current_location(mav)
while time.time() < tstart + timeout:
m = mav.recv_match(type='GPS_RAW', blocking=True)
pos = current_location(mav)
delta = get_distance(start, pos)
print("Distance %.2f meters" % delta)
if math.fabs(delta - distance) <= accuracy:
return True
print("Failed to attain distance %u" % distance)
return False
def wait_location(mav, loc, accuracy=5, timeout=30, target_altitude=None, height_accuracy=-1):
'''wait for arrival at a location'''
tstart = time.time()
if target_altitude is None:
target_altitude = loc.alt
while time.time() < tstart + timeout:
m = mav.recv_match(type='GPS_RAW', blocking=True)
pos = current_location(mav)
delta = get_distance(loc, pos)
print("Distance %.2f meters" % delta)
if delta <= accuracy:
if height_accuracy != -1 and math.fabs(pos.alt - target_altitude) > height_accuracy:
continue
print("Reached location (%.2f meters)" % delta)
return True
print("Failed to attain location")
return False
def wait_waypoint(mav, wpnum_start, wpnum_end, allow_skip=True, timeout=60):
'''wait for waypoint ranges'''
tstart = time.time()
m = mav.recv_match(type='WAYPOINT_CURRENT', blocking=True)
start_wp = m.seq
current_wp = start_wp
print("\n***wait for waypoint ranges start=%u end=%u ***\n\n\n" % (wpnum_start, wpnum_end))
if start_wp != wpnum_start:
print("Expected start waypoint %u but got %u" % (wpnum_start, start_wp))
return False
while time.time() < tstart + timeout:
m = mav.recv_match(type='WAYPOINT_CURRENT', blocking=True)
seq = m.seq
m = mav.recv_match(type='NAV_CONTROLLER_OUTPUT', blocking=True)
wp_dist = m.wp_dist
print("WP %u (wp_dist=%u)" % (seq, wp_dist))
if seq == current_wp+1 or (seq > current_wp+1 and allow_skip):
print("Starting new waypoint %u" % seq)
tstart = time.time()
current_wp = seq
# the wp_dist check is a hack until we can sort out the right seqnum
# for end of mission
if current_wp == wpnum_end or (current_wp == wpnum_end-1 and wp_dist < 2):
print("Reached final waypoint %u" % seq)
return True
if seq > current_wp+1:
print("Skipped waypoint! Got wp %u expected %u" % (seq, current_wp+1))
return False
print("Timed out waiting for waypoint %u of %u" % (wpnum_end, wpnum_end))
return False
def save_wp(mavproxy, mav):
mavproxy.send('rc 7 2000\n')
mav.recv_match(condition='RC_CHANNELS_RAW.chan7_raw==2000', blocking=True)
mavproxy.send('rc 7 1000\n')
mav.recv_match(condition='RC_CHANNELS_RAW.chan7_raw==1000', blocking=True)
mavproxy.send('wp list\n')
def wait_mode(mav, mode):
'''wait for a flight mode to be engaged'''
mav.recv_match(condition='MAV.flightmode=="%s"' % mode, blocking=True)

View File

@ -73,9 +73,15 @@ def pexpect_autoclose(p):
def pexpect_close(p):
'''close a pexpect child'''
global close_list
p.close()
try:
p.close()
except Exception:
pass
time.sleep(1)
p.close(force=True)
try:
p.close(force=True)
except Exception:
pass
if p in close_list:
close_list.remove(p)