ardupilot/Tools/autotest/pysim/util.py

835 lines
25 KiB
Python

from __future__ import print_function
'''
AP_FLAKE8_CLEAN
'''
import atexit
import math
import os
import re
import shlex
import signal
import subprocess
import sys
import tempfile
import time
import pexpect
if sys.version_info[0] >= 3:
ENCODING = 'ascii'
else:
ENCODING = None
RADIUS_OF_EARTH = 6378100.0 # in meters
# List of open terminal windows for macosx
windowID = []
def topdir():
"""Return top of git tree where autotest is running from."""
d = os.path.dirname(os.path.realpath(__file__))
assert os.path.basename(d) == 'pysim'
d = os.path.dirname(d)
assert os.path.basename(d) == 'autotest'
d = os.path.dirname(d)
assert os.path.basename(d) == 'Tools'
d = os.path.dirname(d)
return d
def relcurdir(path):
"""Return a path relative to current dir"""
return os.path.relpath(path, os.getcwd())
def reltopdir(path):
"""Returns the normalized ABSOLUTE path for 'path', where path is a path relative to topdir"""
return os.path.normpath(os.path.join(topdir(), path))
def run_cmd(cmd, directory=".", show=True, output=False, checkfail=True):
"""Run a shell command."""
shell = False
if not isinstance(cmd, list):
cmd = [cmd]
shell = True
if show:
print("Running: (%s) in (%s)" % (cmd_as_shell(cmd), directory,))
if output:
return subprocess.Popen(cmd, shell=shell, stdout=subprocess.PIPE, cwd=directory).communicate()[0]
elif checkfail:
return subprocess.check_call(cmd, shell=shell, cwd=directory)
else:
return subprocess.call(cmd, shell=shell, cwd=directory)
def rmfile(path):
"""Remove a file if it exists."""
try:
os.unlink(path)
except (OSError, FileNotFoundError):
pass
def deltree(path):
"""Delete a tree of files."""
run_cmd('rm -rf %s' % path)
def relwaf():
return "./modules/waf/waf-light"
def waf_configure(board,
j=None,
debug=False,
math_check_indexes=False,
coverage=False,
ekf_single=False,
postype_single=False,
force_32bit=False,
extra_args=[],
extra_hwdef=None,
ubsan=False,
ubsan_abort=False,
num_aux_imus=0,
dronecan_tests=False,
extra_defines={}):
cmd_configure = [relwaf(), "configure", "--board", board]
if debug:
cmd_configure.append('--debug')
if coverage:
cmd_configure.append('--coverage')
if math_check_indexes:
cmd_configure.append('--enable-math-check-indexes')
if ekf_single:
cmd_configure.append('--ekf-single')
if postype_single:
cmd_configure.append('--postype-single')
if force_32bit:
cmd_configure.append('--force-32bit')
if ubsan:
cmd_configure.append('--ubsan')
if ubsan_abort:
cmd_configure.append('--ubsan-abort')
if num_aux_imus > 0:
cmd_configure.append('--num-aux-imus=%u' % num_aux_imus)
if dronecan_tests:
cmd_configure.append('--enable-dronecan-tests')
if extra_hwdef is not None:
cmd_configure.extend(['--extra-hwdef', extra_hwdef])
for nv in extra_defines.items():
cmd_configure.extend(['--define', "%s=%s" % nv])
if j is not None:
cmd_configure.extend(['-j', str(j)])
pieces = [shlex.split(x) for x in extra_args]
for piece in pieces:
cmd_configure.extend(piece)
run_cmd(cmd_configure, directory=topdir(), checkfail=True)
def waf_clean():
run_cmd([relwaf(), "clean"], directory=topdir(), checkfail=True)
def waf_build(target=None):
cmd = [relwaf(), "build"]
if target is not None:
cmd.append(target)
run_cmd(cmd, directory=topdir(), checkfail=True)
def build_SITL(
build_target,
board='sitl',
clean=True,
configure=True,
coverage=False,
debug=False,
ekf_single=False,
extra_configure_args=[],
extra_defines={},
j=None,
math_check_indexes=False,
postype_single=False,
force_32bit=False,
ubsan=False,
ubsan_abort=False,
num_aux_imus=0,
dronecan_tests=False,
):
# first configure
if configure:
waf_configure(board,
j=j,
debug=debug,
math_check_indexes=math_check_indexes,
ekf_single=ekf_single,
postype_single=postype_single,
coverage=coverage,
force_32bit=force_32bit,
ubsan=ubsan,
ubsan_abort=ubsan_abort,
extra_defines=extra_defines,
num_aux_imus=num_aux_imus,
dronecan_tests=dronecan_tests,
extra_args=extra_configure_args,)
# then clean
if clean:
waf_clean()
# then build
cmd_make = [relwaf(), "build", "--target", build_target]
if j is not None:
cmd_make.extend(['-j', str(j)])
run_cmd(cmd_make, directory=topdir(), checkfail=True, show=True)
return True
def build_examples(board, j=None, debug=False, clean=False, configure=True, math_check_indexes=False, coverage=False,
ekf_single=False, postype_single=False, force_32bit=False, ubsan=False, ubsan_abort=False,
num_aux_imus=0, dronecan_tests=False,
extra_configure_args=[]):
# first configure
if configure:
waf_configure(board,
j=j,
debug=debug,
math_check_indexes=math_check_indexes,
ekf_single=ekf_single,
postype_single=postype_single,
coverage=coverage,
force_32bit=force_32bit,
ubsan=ubsan,
ubsan_abort=ubsan_abort,
extra_args=extra_configure_args,
dronecan_tests=dronecan_tests)
# then clean
if clean:
waf_clean()
# then build
cmd_make = [relwaf(), "examples"]
run_cmd(cmd_make, directory=topdir(), checkfail=True, show=True)
return True
def build_replay(board, j=None, debug=False, clean=False):
# first configure
waf_configure(board, j=j, debug=debug)
# then clean
if clean:
waf_clean()
# then build
cmd_make = [relwaf(), "replay"]
run_cmd(cmd_make, directory=topdir(), checkfail=True, show=True)
return True
def build_tests(board,
j=None,
debug=False,
clean=False,
configure=True,
math_check_indexes=False,
coverage=False,
ekf_single=False,
postype_single=False,
force_32bit=False,
ubsan=False,
ubsan_abort=False,
num_aux_imus=0,
dronecan_tests=False,
extra_configure_args=[]):
# first configure
if configure:
waf_configure(board,
j=j,
debug=debug,
math_check_indexes=math_check_indexes,
ekf_single=ekf_single,
postype_single=postype_single,
coverage=coverage,
force_32bit=force_32bit,
ubsan=ubsan,
ubsan_abort=ubsan_abort,
num_aux_imus=num_aux_imus,
dronecan_tests=dronecan_tests,
extra_args=extra_configure_args,)
# then clean
if clean:
waf_clean()
# then build
run_cmd([relwaf(), "tests"], directory=topdir(), checkfail=True, show=True)
return True
# list of pexpect children to close on exit
close_list = []
def pexpect_autoclose(p):
"""Mark for autoclosing."""
global close_list
close_list.append(p)
def pexpect_close(p):
"""Close a pexpect child."""
global close_list
ex = None
if p is None:
print("Nothing to close")
return
try:
p.kill(signal.SIGTERM)
except IOError as e:
print("Caught exception: %s" % str(e))
ex = e
pass
if ex is None:
# give the process some time to go away
for i in range(20):
if not p.isalive():
break
time.sleep(0.05)
try:
p.close()
except Exception:
pass
try:
p.close(force=True)
except Exception:
pass
if p in close_list:
close_list.remove(p)
def pexpect_close_all():
"""Close all pexpect children."""
global close_list
for p in close_list[:]:
pexpect_close(p)
def pexpect_drain(p):
"""Drain any pending input."""
try:
p.read_nonblocking(1000, timeout=0)
except Exception:
pass
def cmd_as_shell(cmd):
return (" ".join(['"%s"' % x for x in cmd]))
def make_safe_filename(text):
"""Return a version of text safe for use as a filename."""
r = re.compile("([^a-zA-Z0-9_.+-])")
text.replace('/', '-')
filename = r.sub(lambda m: str(hex(ord(str(m.group(1))))).upper(), text)
return filename
def valgrind_log_filepath(binary, model):
return make_safe_filename('%s-%s-valgrind.log' % (os.path.basename(binary), model,))
def kill_screen_gdb():
cmd = ["screen", "-X", "-S", "ardupilot-gdb", "quit"]
subprocess.Popen(cmd)
def kill_mac_terminal():
global windowID
for window in windowID:
cmd = ("osascript -e \'tell application \"Terminal\" to close "
"(window(get index of window id %s))\'" % window)
os.system(cmd)
class FakeMacOSXSpawn(object):
"""something that looks like a pspawn child so we can ignore attempts
to pause (and otherwise kill(1) SITL. MacOSX using osascript to
start/stop sitl
"""
def __init__(self):
pass
def progress(self, message):
print(message)
def kill(self, sig):
# self.progress("FakeMacOSXSpawn: ignoring kill(%s)" % str(sig))
pass
def isalive(self):
self.progress("FakeMacOSXSpawn: assuming process is alive")
return True
class PSpawnStdPrettyPrinter(object):
'''a fake filehandle-like object which prefixes a string to all lines
before printing to stdout/stderr. To be used to pass to
pexpect.spawn's logfile argument
'''
def __init__(self, output=sys.stdout, prefix="stdout"):
self.output = output
self.prefix = prefix
self.buffer = ""
def close(self):
self.print_prefixed_line(self.buffer)
def write(self, data):
self.buffer += data
lines = self.buffer.split("\n")
self.buffer = lines[-1]
lines.pop()
for line in lines:
self.print_prefixed_line(line)
def print_prefixed_line(self, line):
print("%s: %s" % (self.prefix, line), file=self.output)
def flush(self):
pass
def start_SITL(binary,
valgrind=False,
callgrind=False,
gdb=False,
gdb_no_tui=False,
wipe=False,
synthetic_clock=True,
home=None,
model=None,
speedup=1,
sim_rate_hz=None,
defaults_filepath=[],
unhide_parameters=False,
gdbserver=False,
breakpoints=[],
disable_breakpoints=False,
customisations=[],
lldb=False,
enable_fgview=False,
supplementary=False,
stdout_prefix=None):
if model is None and not supplementary:
raise ValueError("model must not be None")
"""Launch a SITL instance."""
cmd = []
if (callgrind or valgrind) and os.path.exists('/usr/bin/valgrind'):
# we specify a prefix for vgdb-pipe because on Vagrant virtual
# machines the pipes are created on the mountpoint for the
# shared directory with the host machine. mmap's,
# unsurprisingly, fail on files created on that mountpoint.
vgdb_prefix = os.path.join(tempfile.gettempdir(), "vgdb-pipe")
log_file = valgrind_log_filepath(binary=binary, model=model)
cmd.extend([
'valgrind',
# adding this option allows valgrind to cope with the overload
# of operator new
"--soname-synonyms=somalloc=nouserintercepts",
'--vgdb-prefix=%s' % vgdb_prefix,
'-q',
'--log-file=%s' % log_file])
if callgrind:
cmd.extend(["--tool=callgrind"])
if gdbserver:
cmd.extend(['gdbserver', 'localhost:3333'])
if gdb:
# attach gdb to the gdbserver:
f = open("/tmp/x.gdb", "w")
f.write("target extended-remote localhost:3333\nc\n")
for breakingpoint in breakpoints:
f.write("b %s\n" % (breakingpoint,))
if disable_breakpoints:
f.write("disable\n")
f.close()
run_cmd('screen -d -m -S ardupilot-gdbserver '
'bash -c "gdb -x /tmp/x.gdb"')
elif gdb:
f = open("/tmp/x.gdb", "w")
f.write("set pagination off\n")
for breakingpoint in breakpoints:
f.write("b %s\n" % (breakingpoint,))
if disable_breakpoints:
f.write("disable\n")
if not gdb_no_tui:
f.write("tui enable\n")
f.write("r\n")
f.close()
if sys.platform == "darwin" and os.getenv('DISPLAY'):
cmd.extend(['gdb', '-x', '/tmp/x.gdb', '--args'])
elif os.environ.get('DISPLAY'):
cmd.extend(['xterm', '-e', 'gdb', '-x', '/tmp/x.gdb', '--args'])
else:
cmd.extend(['screen',
'-L', '-Logfile', 'gdb.log',
'-d',
'-m',
'-S', 'ardupilot-gdb',
'gdb', '--cd', os.getcwd(), '-x', '/tmp/x.gdb', binary, '--args'])
elif lldb:
f = open("/tmp/x.lldb", "w")
for breakingpoint in breakpoints:
f.write("b %s\n" % (breakingpoint,))
if disable_breakpoints:
f.write("disable\n")
f.write("settings set target.process.stop-on-exec false\n")
f.write("process launch\n")
f.close()
if sys.platform == "darwin" and os.getenv('DISPLAY'):
cmd.extend(['lldb', '-s', '/tmp/x.lldb', '--'])
elif os.environ.get('DISPLAY'):
cmd.extend(['xterm', '-e', 'lldb', '-s', '/tmp/x.lldb', '--'])
else:
raise RuntimeError("DISPLAY was not set")
cmd.append(binary)
if defaults_filepath is None:
defaults_filepath = []
if not isinstance(defaults_filepath, list):
defaults_filepath = [defaults_filepath]
defaults = [reltopdir(path) for path in defaults_filepath]
if not supplementary:
if wipe:
cmd.append('-w')
if synthetic_clock:
cmd.append('-S')
if home is not None:
cmd.extend(['--home', home])
cmd.extend(['--model', model])
if speedup is not None and speedup != 1:
ntf = tempfile.NamedTemporaryFile(mode="w", delete=False)
print(f"SIM_SPEEDUP {speedup}", file=ntf)
ntf.close()
# prepend it so that a caller can override the speedup in
# passed-in defaults:
defaults = [ntf.name] + defaults
if sim_rate_hz is not None:
cmd.extend(['--rate', str(sim_rate_hz)])
if unhide_parameters:
cmd.extend(['--unhide-groups'])
# somewhere for MAVProxy to connect to:
cmd.append('--serial1=tcp:2')
if enable_fgview:
cmd.append("--enable-fgview")
if len(defaults):
cmd.extend(['--defaults', ",".join(defaults)])
cmd.extend(customisations)
if "--defaults" in customisations:
raise ValueError("--defaults must be passed in via defaults_filepath keyword argument, not as part of customisation list") # noqa
pexpect_logfile_prefix = stdout_prefix
if pexpect_logfile_prefix is None:
pexpect_logfile_prefix = os.path.basename(binary)
pexpect_logfile = PSpawnStdPrettyPrinter(prefix=pexpect_logfile_prefix)
if (gdb or lldb) and sys.platform == "darwin" and os.getenv('DISPLAY'):
global windowID
# on MacOS record the window IDs so we can close them later
atexit.register(kill_mac_terminal)
child = None
mydir = os.path.dirname(os.path.realpath(__file__))
autotest_dir = os.path.realpath(os.path.join(mydir, '..'))
runme = [os.path.join(autotest_dir, "run_in_terminal_window.sh"), 'mactest']
runme.extend(cmd)
print(cmd)
out = subprocess.Popen(runme, stdout=subprocess.PIPE).communicate()[0]
out = out.decode('utf-8')
p = re.compile('tab 1 of window id (.*)')
tstart = time.time()
while time.time() - tstart < 5:
tabs = p.findall(out)
if len(tabs) > 0:
break
time.sleep(0.1)
# sleep for extra 2 seconds for application to start
time.sleep(2)
if len(tabs) > 0:
windowID.append(tabs[0])
else:
print("Cannot find %s process terminal" % binary)
child = FakeMacOSXSpawn()
elif gdb and not os.getenv('DISPLAY'):
subprocess.Popen(cmd)
atexit.register(kill_screen_gdb)
# we are expected to return a pexpect wrapped around the
# stdout of the ArduPilot binary. Not going to happen until
# AP gets a redirect-stdout-to-filehandle option. So, in the
# meantime, return a dummy:
return pexpect.spawn("true", ["true"],
logfile=pexpect_logfile,
encoding=ENCODING,
timeout=5)
else:
print("Running: %s" % cmd_as_shell(cmd))
first = cmd[0]
rest = cmd[1:]
child = pexpect.spawn(first, rest, logfile=pexpect_logfile, encoding=ENCODING, timeout=5)
pexpect_autoclose(child)
if gdb or lldb:
# if we run GDB we do so in an xterm. "Waiting for
# connection" is never going to appear on xterm's output.
# ... so let's give it another magic second.
time.sleep(1)
# TODO: have a SITL-compiled ardupilot able to have its
# console on an output fd.
else:
child.expect('Waiting for ', timeout=300)
return child
def mavproxy_cmd():
"""return path to which mavproxy to use"""
return os.getenv('MAVPROXY_CMD', 'mavproxy.py')
def MAVProxy_version():
"""return the current version of mavproxy as a tuple e.g. (1,8,8)"""
command = "%s --version" % mavproxy_cmd()
output = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE).communicate()[0]
output = output.decode('ascii')
match = re.search("MAVProxy Version: ([0-9]+)[.]([0-9]+)[.]([0-9]+)", output)
if match is None:
raise ValueError("Unable to determine MAVProxy version from (%s)" % output)
return int(match.group(1)), int(match.group(2)), int(match.group(3))
def start_MAVProxy_SITL(atype,
aircraft=None,
setup=False,
master=None,
options=[],
sitl_rcin_port=5501,
pexpect_timeout=60,
logfile=sys.stdout):
"""Launch mavproxy connected to a SITL instance."""
if master is None:
raise ValueError("Expected a master")
local_mp_modules_dir = os.path.abspath(
os.path.join(__file__, '..', '..', '..', 'mavproxy_modules'))
env = dict(os.environ)
old = env.get('PYTHONPATH', None)
env['PYTHONPATH'] = local_mp_modules_dir
if old is not None:
env['PYTHONPATH'] += os.path.pathsep + old
global close_list
cmd = []
cmd.append(mavproxy_cmd())
cmd.extend(['--master', master])
cmd.extend(['--sitl', "localhost:%u" % sitl_rcin_port])
if setup:
cmd.append('--setup')
if aircraft is None:
aircraft = 'test.%s' % atype
cmd.extend(['--aircraft', aircraft])
cmd.extend(options)
cmd.extend(['--default-modules', 'misc,wp,rally,fence,param,arm,mode,rc,cmdlong,output'])
print("PYTHONPATH: %s" % str(env['PYTHONPATH']))
print("Running: %s" % cmd_as_shell(cmd))
ret = pexpect.spawn(cmd[0], cmd[1:], logfile=logfile, encoding=ENCODING, timeout=pexpect_timeout, env=env)
ret.delaybeforesend = 0
pexpect_autoclose(ret)
return ret
def start_PPP_daemon(ips, sockaddr):
"""Start pppd for networking"""
global close_list
cmd = "sudo pppd socket %s debug noauth nodetach %s" % (sockaddr, ips)
cmd = cmd.split()
print("Running: %s" % cmd_as_shell(cmd))
ret = pexpect.spawn(cmd[0], cmd[1:], logfile=sys.stdout, encoding=ENCODING, timeout=30)
ret.delaybeforesend = 0
pexpect_autoclose(ret)
return ret
def expect_setup_callback(e, callback):
"""Setup a callback that is called once a second while waiting for
patterns."""
def _expect_callback(pattern, timeout=e.timeout):
tstart = time.time()
while time.time() < tstart + timeout:
try:
ret = e.expect_saved(pattern, timeout=1)
return ret
except pexpect.TIMEOUT:
e.expect_user_callback(e)
print("Timed out looking for %s" % pattern)
raise pexpect.TIMEOUT(timeout)
e.expect_user_callback = callback
e.expect_saved = e.expect
e.expect = _expect_callback
def mkdir_p(directory):
"""Like mkdir -p ."""
if not directory:
return
if directory.endswith("/"):
mkdir_p(directory[:-1])
return
if os.path.isdir(directory):
return
mkdir_p(os.path.dirname(directory))
os.mkdir(directory)
def loadfile(fname):
"""Load a file as a string."""
f = open(fname, mode='r')
r = f.read()
f.close()
return r
def lock_file(fname):
"""Lock a file."""
import fcntl
f = open(fname, mode='w')
try:
fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
except OSError:
return None
return f
def check_parent(parent_pid=None):
"""Check our parent process is still alive."""
if parent_pid is None:
try:
parent_pid = os.getppid()
except OSError:
pass
if parent_pid is None:
return
try:
os.kill(parent_pid, 0)
except OSError:
print("Parent had finished - exiting")
sys.exit(1)
def gps_newpos(lat, lon, bearing, distance):
"""Extrapolate latitude/longitude given a heading and distance
thanks to http://www.movable-type.co.uk/scripts/latlong.html .
"""
from math import sin, asin, cos, atan2, radians, degrees
lat1 = radians(lat)
lon1 = radians(lon)
brng = radians(bearing)
dr = distance / RADIUS_OF_EARTH
lat2 = asin(sin(lat1) * cos(dr) +
cos(lat1) * sin(dr) * cos(brng))
lon2 = lon1 + atan2(sin(brng) * sin(dr) * cos(lat1),
cos(dr) - sin(lat1) * sin(lat2))
return degrees(lat2), degrees(lon2)
def gps_distance(lat1, lon1, lat2, lon2):
"""Return distance between two points in meters,
coordinates are in degrees
thanks to http://www.movable-type.co.uk/scripts/latlong.html ."""
lat1 = math.radians(lat1)
lat2 = math.radians(lat2)
lon1 = math.radians(lon1)
lon2 = math.radians(lon2)
dLat = lat2 - lat1
dLon = lon2 - lon1
a = math.sin(0.5 * dLat)**2 + math.sin(0.5 * dLon)**2 * math.cos(lat1) * math.cos(lat2)
c = 2.0 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a))
return RADIUS_OF_EARTH * c
def gps_bearing(lat1, lon1, lat2, lon2):
"""Return bearing between two points in degrees, in range 0-360
thanks to http://www.movable-type.co.uk/scripts/latlong.html ."""
lat1 = math.radians(lat1)
lat2 = math.radians(lat2)
lon1 = math.radians(lon1)
lon2 = math.radians(lon2)
dLon = lon2 - lon1
y = math.sin(dLon) * math.cos(lat2)
x = math.cos(lat1) * math.sin(lat2) - math.sin(lat1) * math.cos(lat2) * math.cos(dLon)
bearing = math.degrees(math.atan2(y, x))
if bearing < 0:
bearing += 360.0
return bearing
def constrain(value, minv, maxv):
"""Constrain a value to a range."""
if value < minv:
value = minv
if value > maxv:
value = maxv
return value
def load_local_module(fname):
"""load a python module from within the ardupilot tree"""
fname = os.path.join(topdir(), fname)
if sys.version_info.major >= 3:
import importlib.util
spec = importlib.util.spec_from_file_location("local_module", fname)
ret = importlib.util.module_from_spec(spec)
spec.loader.exec_module(ret)
else:
import imp
ret = imp.load_source("local_module", fname)
return ret
def get_git_hash(short=False):
short_v = "--short=8 " if short else ""
githash = run_cmd(f'git rev-parse {short_v}HEAD', output=True, directory=reltopdir('.')).strip()
if sys.version_info.major >= 3:
githash = githash.decode('utf-8')
return githash
if __name__ == "__main__":
import doctest
doctest.testmod()