from __future__ import print_function import atexit import math import os import random import re import shlex import signal import subprocess import sys import tempfile import time from math import acos, atan2, cos, pi, sqrt import pexpect from pymavlink.rotmat import Vector3, Matrix3 if (sys.version_info[0] >= 3): ENCODING = 'ascii' else: ENCODING = None RADIUS_OF_EARTH = 6378100.0 # in meters # List of open terminal windows for macosx windowID = [] def m2ft(x): """Meters to feet.""" return float(x) / 0.3048 def ft2m(x): """Feet to meters.""" return float(x) * 0.3048 def kt2mps(x): return x * 0.514444444 def mps2kt(x): return x / 0.514444444 def topdir(): """Return top of git tree where autotest is running from.""" d = os.path.dirname(os.path.realpath(__file__)) assert(os.path.basename(d) == 'pysim') d = os.path.dirname(d) assert(os.path.basename(d) == 'autotest') d = os.path.dirname(d) assert(os.path.basename(d) == 'Tools') d = os.path.dirname(d) return d def relcurdir(path): """Return a path relative to current dir""" return os.path.relpath(path, os.getcwd()) def reltopdir(path): """Returns the normalized ABSOLUTE path for 'path', where path is a path relative to topdir""" return os.path.normpath(os.path.join(topdir(), path)) def run_cmd(cmd, directory=".", show=True, output=False, checkfail=True): """Run a shell command.""" shell = False if not isinstance(cmd, list): cmd = [cmd] shell = True if show: print("Running: (%s) in (%s)" % (cmd_as_shell(cmd), directory,)) if output: return subprocess.Popen(cmd, shell=shell, stdout=subprocess.PIPE, cwd=directory).communicate()[0] elif checkfail: return subprocess.check_call(cmd, shell=shell, cwd=directory) else: return subprocess.call(cmd, shell=shell, cwd=directory) def rmfile(path): """Remove a file if it exists.""" try: os.unlink(path) except Exception: pass def deltree(path): """Delete a tree of files.""" run_cmd('rm -rf %s' % path) def relwaf(): return "./modules/waf/waf-light" def waf_configure(board, j=None, debug=False, math_check_indexes=False, coverage=False, ekf_single=False, postype_single=False, sitl_32bit=False, extra_args=[], extra_hwdef=None): cmd_configure = [relwaf(), "configure", "--board", board] if debug: cmd_configure.append('--debug') if coverage: cmd_configure.append('--coverage') if math_check_indexes: cmd_configure.append('--enable-math-check-indexes') if ekf_single: cmd_configure.append('--ekf-single') if postype_single: cmd_configure.append('--postype-single') if sitl_32bit: cmd_configure.append('--sitl-32bit') if extra_hwdef is not None: cmd_configure.extend(['--extra-hwdef', extra_hwdef]) if j is not None: cmd_configure.extend(['-j', str(j)]) pieces = [shlex.split(x) for x in extra_args] for piece in pieces: cmd_configure.extend(piece) run_cmd(cmd_configure, directory=topdir(), checkfail=True) def waf_clean(): run_cmd([relwaf(), "clean"], directory=topdir(), checkfail=True) def waf_build(target=None): cmd = [relwaf(), "build"] if target is not None: cmd.append(target) run_cmd(cmd, directory=topdir(), checkfail=True) def build_SITL(build_target, j=None, debug=False, board='sitl', clean=True, configure=True, math_check_indexes=False, coverage=False, ekf_single=False, postype_single=False, sitl_32bit=False, extra_configure_args=[]): # first configure if configure: waf_configure(board, j=j, debug=debug, math_check_indexes=math_check_indexes, ekf_single=ekf_single, postype_single=postype_single, coverage=coverage, sitl_32bit=sitl_32bit, extra_args=extra_configure_args) # then clean if clean: waf_clean() # then build cmd_make = [relwaf(), "build", "--target", build_target] if j is not None: cmd_make.extend(['-j', str(j)]) run_cmd(cmd_make, directory=topdir(), checkfail=True, show=True) return True def build_examples(board, j=None, debug=False, clean=False, configure=True, math_check_indexes=False, coverage=False, ekf_single=False, postype_single=False, sitl_32bit=False, extra_configure_args=[]): # first configure if configure: waf_configure(board, j=j, debug=debug, math_check_indexes=math_check_indexes, ekf_single=ekf_single, postype_single=postype_single, coverage=coverage, sitl_32bit=sitl_32bit, extra_args=extra_configure_args) # then clean if clean: waf_clean() # then build cmd_make = [relwaf(), "examples"] run_cmd(cmd_make, directory=topdir(), checkfail=True, show=True) return True def build_replay(board, j=None, debug=False, clean=False): # first configure waf_configure(board, j=j, debug=debug) # then clean if clean: waf_clean() # then build cmd_make = [relwaf(), "replay"] run_cmd(cmd_make, directory=topdir(), checkfail=True, show=True) return True def build_tests(board, j=None, debug=False, clean=False, configure=True, math_check_indexes=False, coverage=False, ekf_single=False, postype_single=False, sitl_32bit=False, extra_configure_args=[]): # first configure if configure: waf_configure(board, j=j, debug=debug, math_check_indexes=math_check_indexes, ekf_single=ekf_single, postype_single=postype_single, coverage=coverage, sitl_32bit=sitl_32bit, extra_args=extra_configure_args) # then clean if clean: waf_clean() # then build run_cmd([relwaf(), "tests"], directory=topdir(), checkfail=True, show=True) return True # list of pexpect children to close on exit close_list = [] def pexpect_autoclose(p): """Mark for autoclosing.""" global close_list close_list.append(p) def pexpect_close(p): """Close a pexpect child.""" global close_list ex = None if p is None: print("Nothing to close") return try: p.kill(signal.SIGTERM) except IOError as e: print("Caught exception: %s" % str(e)) ex = e pass if ex is None: # give the process some time to go away for i in range(20): if not p.isalive(): break time.sleep(0.05) try: p.close() except Exception: pass try: p.close(force=True) except Exception: pass if p in close_list: close_list.remove(p) def pexpect_close_all(): """Close all pexpect children.""" global close_list for p in close_list[:]: pexpect_close(p) def pexpect_drain(p): """Drain any pending input.""" import pexpect try: p.read_nonblocking(1000, timeout=0) except Exception: pass def cmd_as_shell(cmd): return (" ".join(['"%s"' % x for x in cmd])) def make_safe_filename(text): """Return a version of text safe for use as a filename.""" r = re.compile("([^a-zA-Z0-9_.+-])") text.replace('/', '-') filename = r.sub(lambda m: str(hex(ord(str(m.group(1))))).upper(), text) return filename def valgrind_log_filepath(binary, model): return make_safe_filename('%s-%s-valgrind.log' % (os.path.basename(binary), model,)) def kill_screen_gdb(): cmd = ["screen", "-X", "-S", "ardupilot-gdb", "quit"] subprocess.Popen(cmd) def kill_mac_terminal(): global windowID for window in windowID: cmd = ("osascript -e \'tell application \"Terminal\" to close " "(window(get index of window id %s))\'" % window) os.system(cmd) def start_SITL(binary, valgrind=False, callgrind=False, gdb=False, gdb_no_tui=False, wipe=False, synthetic_clock=True, home=None, model=None, speedup=1, defaults_filepath=None, unhide_parameters=False, gdbserver=False, breakpoints=[], disable_breakpoints=False, customisations=[], lldb=False, enable_fgview_output=False, supplementary=False): if model is None and not supplementary: raise ValueError("model must not be None") """Launch a SITL instance.""" cmd = [] if (callgrind or valgrind) and os.path.exists('/usr/bin/valgrind'): # we specify a prefix for vgdb-pipe because on Vagrant virtual # machines the pipes are created on the mountpoint for the # shared directory with the host machine. mmap's, # unsurprisingly, fail on files created on that mountpoint. vgdb_prefix = os.path.join(tempfile.gettempdir(), "vgdb-pipe") log_file = valgrind_log_filepath(binary=binary, model=model) cmd.extend([ 'valgrind', # adding this option allows valgrind to cope with the overload # of operator new "--soname-synonyms=somalloc=nouserintercepts", '--vgdb-prefix=%s' % vgdb_prefix, '-q', '--log-file=%s' % log_file]) if callgrind: cmd.extend(["--tool=callgrind"]) if gdbserver: cmd.extend(['gdbserver', 'localhost:3333']) if gdb: # attach gdb to the gdbserver: f = open("/tmp/x.gdb", "w") f.write("target extended-remote localhost:3333\nc\n") for breakpoint in breakpoints: f.write("b %s\n" % (breakpoint,)) if disable_breakpoints: f.write("disable\n") f.close() run_cmd('screen -d -m -S ardupilot-gdbserver ' 'bash -c "gdb -x /tmp/x.gdb"') elif gdb: f = open("/tmp/x.gdb", "w") f.write("set pagination off\n") for breakpoint in breakpoints: f.write("b %s\n" % (breakpoint,)) if disable_breakpoints: f.write("disable\n") if not gdb_no_tui: f.write("tui enable\n") f.write("r\n") f.close() if sys.platform == "darwin" and os.getenv('DISPLAY'): cmd.extend(['gdb', '-x', '/tmp/x.gdb', '--args']) elif os.environ.get('DISPLAY'): cmd.extend(['xterm', '-e', 'gdb', '-x', '/tmp/x.gdb', '--args']) else: cmd.extend(['screen', '-L', '-Logfile', 'gdb.log', '-d', '-m', '-S', 'ardupilot-gdb', 'gdb', '-x', '/tmp/x.gdb', binary, '--args']) elif lldb: f = open("/tmp/x.lldb", "w") for breakpoint in breakpoints: f.write("b %s\n" % (breakpoint,)) if disable_breakpoints: f.write("disable\n") f.write("settings set target.process.stop-on-exec false\n") f.write("process launch\n") f.close() if sys.platform == "darwin" and os.getenv('DISPLAY'): cmd.extend(['lldb', '-s', '/tmp/x.lldb', '--']) elif os.environ.get('DISPLAY'): cmd.extend(['xterm', '-e', 'lldb', '-s','/tmp/x.lldb', '--']) else: raise RuntimeError("DISPLAY was not set") cmd.append(binary) if not supplementary: if wipe: cmd.append('-w') if synthetic_clock: cmd.append('-S') if home is not None: cmd.extend(['--home', home]) cmd.extend(['--model', model]) if speedup != 1: cmd.extend(['--speedup', str(speedup)]) if defaults_filepath is not None: if type(defaults_filepath) == list: defaults = [reltopdir(path) for path in defaults_filepath] if len(defaults): cmd.extend(['--defaults', ",".join(defaults)]) else: cmd.extend(['--defaults', reltopdir(defaults_filepath)]) if unhide_parameters: cmd.extend(['--unhide-groups']) # somewhere for MAVProxy to connect to: cmd.append('--uartC=tcp:2') if not enable_fgview_output: cmd.append("--disable-fgview"); cmd.extend(customisations) if (gdb or lldb) and sys.platform == "darwin" and os.getenv('DISPLAY'): global windowID # on MacOS record the window IDs so we can close them later atexit.register(kill_mac_terminal) child = None mydir = os.path.dirname(os.path.realpath(__file__)) autotest_dir = os.path.realpath(os.path.join(mydir, '..')) runme = [os.path.join(autotest_dir, "run_in_terminal_window.sh"), 'mactest'] runme.extend(cmd) print(runme) print(cmd) out = subprocess.Popen(runme, stdout=subprocess.PIPE).communicate()[0] out = out.decode('utf-8') p = re.compile('tab 1 of window id (.*)') tstart = time.time() while time.time() - tstart < 5: tabs = p.findall(out) if len(tabs) > 0: break time.sleep(0.1) # sleep for extra 2 seconds for application to start time.sleep(2) if len(tabs) > 0: windowID.append(tabs[0]) else: print("Cannot find %s process terminal" % binary) elif gdb and not os.getenv('DISPLAY'): subprocess.Popen(cmd) atexit.register(kill_screen_gdb) # we are expected to return a pexpect wrapped around the # stdout of the ArduPilot binary. Not going to happen until # AP gets a redirect-stdout-to-filehandle option. So, in the # meantime, return a dummy: return pexpect.spawn("true", ["true"], logfile=sys.stdout, encoding=ENCODING, timeout=5) else: print("Running: %s" % cmd_as_shell(cmd)) first = cmd[0] rest = cmd[1:] child = pexpect.spawn(first, rest, logfile=sys.stdout, encoding=ENCODING, timeout=5) pexpect_autoclose(child) # give time for parameters to properly setup time.sleep(3) if gdb or lldb: # if we run GDB we do so in an xterm. "Waiting for # connection" is never going to appear on xterm's output. # ... so let's give it another magic second. time.sleep(1) # TODO: have a SITL-compiled ardupilot able to have its # console on an output fd. else: child.expect('Waiting for ', timeout=300) return child def mavproxy_cmd(): '''return path to which mavproxy to use''' return os.getenv('MAVPROXY_CMD', 'mavproxy.py') def MAVProxy_version(): '''return the current version of mavproxy as a tuple e.g. (1,8,8)''' command = "%s --version" % mavproxy_cmd() output = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE).communicate()[0] output = output.decode('ascii') match = re.search("MAVProxy Version: ([0-9]+)[.]([0-9]+)[.]([0-9]+)", output) if match is None: raise ValueError("Unable to determine MAVProxy version from (%s)" % output) return (int(match.group(1)), int(match.group(2)), int(match.group(3))) def start_MAVProxy_SITL(atype, aircraft=None, setup=False, master='tcp:127.0.0.1:5762', options=[], pexpect_timeout=60, logfile=sys.stdout): """Launch mavproxy connected to a SITL instance.""" local_mp_modules_dir = os.path.abspath( os.path.join(__file__, '..', '..', '..', 'mavproxy_modules')) env = dict(os.environ) old = env.get('PYTHONPATH', None) env['PYTHONPATH'] = local_mp_modules_dir if old is not None: env['PYTHONPATH'] += os.path.pathsep + old import pexpect global close_list cmd = [] cmd.append(mavproxy_cmd()) cmd.extend(['--master', master]) if setup: cmd.append('--setup') if aircraft is None: aircraft = 'test.%s' % atype cmd.extend(['--aircraft', aircraft]) cmd.extend(options) cmd.extend(['--default-modules', 'misc,terrain,wp,rally,fence,param,arm,mode,rc,cmdlong,output']) print("PYTHONPATH: %s" % str(env['PYTHONPATH'])) print("Running: %s" % cmd_as_shell(cmd)) ret = pexpect.spawn(cmd[0], cmd[1:], logfile=logfile, encoding=ENCODING, timeout=pexpect_timeout, env=env) ret.delaybeforesend = 0 pexpect_autoclose(ret) return ret def expect_setup_callback(e, callback): """Setup a callback that is called once a second while waiting for patterns.""" import pexpect def _expect_callback(pattern, timeout=e.timeout): tstart = time.time() while time.time() < tstart + timeout: try: ret = e.expect_saved(pattern, timeout=1) return ret except pexpect.TIMEOUT: e.expect_user_callback(e) print("Timed out looking for %s" % pattern) raise pexpect.TIMEOUT(timeout) e.expect_user_callback = callback e.expect_saved = e.expect e.expect = _expect_callback def mkdir_p(directory): """Like mkdir -p .""" if not directory: return if directory.endswith("/"): mkdir_p(directory[:-1]) return if os.path.isdir(directory): return mkdir_p(os.path.dirname(directory)) os.mkdir(directory) def loadfile(fname): """Load a file as a string.""" f = open(fname, mode='r') r = f.read() f.close() return r def lock_file(fname): """Lock a file.""" import fcntl f = open(fname, mode='w') try: fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB) except Exception: return None return f def check_parent(parent_pid=None): """Check our parent process is still alive.""" if parent_pid is None: try: parent_pid = os.getppid() except Exception: pass if parent_pid is None: return try: os.kill(parent_pid, 0) except Exception: print("Parent had finished - exiting") sys.exit(1) def EarthRatesToBodyRates(dcm, earth_rates): """Convert the angular velocities from earth frame to body frame. Thanks to James Goppert for the formula all inputs and outputs are in radians returns a gyro vector in body frame, in rad/s . """ from math import sin, cos (phi, theta, psi) = dcm.to_euler() phiDot = earth_rates.x thetaDot = earth_rates.y psiDot = earth_rates.z p = phiDot - psiDot * sin(theta) q = cos(phi) * thetaDot + sin(phi) * psiDot * cos(theta) r = cos(phi) * psiDot * cos(theta) - sin(phi) * thetaDot return Vector3(p, q, r) def BodyRatesToEarthRates(dcm, gyro): """Convert the angular velocities from body frame to earth frame. all inputs and outputs are in radians/s returns a earth rate vector. """ from math import sin, cos, tan, fabs p = gyro.x q = gyro.y r = gyro.z (phi, theta, psi) = dcm.to_euler() phiDot = p + tan(theta) * (q * sin(phi) + r * cos(phi)) thetaDot = q * cos(phi) - r * sin(phi) if fabs(cos(theta)) < 1.0e-20: theta += 1.0e-10 psiDot = (q * sin(phi) + r * cos(phi)) / cos(theta) return Vector3(phiDot, thetaDot, psiDot) def gps_newpos(lat, lon, bearing, distance): """Extrapolate latitude/longitude given a heading and distance thanks to http://www.movable-type.co.uk/scripts/latlong.html . """ from math import sin, asin, cos, atan2, radians, degrees lat1 = radians(lat) lon1 = radians(lon) brng = radians(bearing) dr = distance / RADIUS_OF_EARTH lat2 = asin(sin(lat1) * cos(dr) + cos(lat1) * sin(dr) * cos(brng)) lon2 = lon1 + atan2(sin(brng) * sin(dr) * cos(lat1), cos(dr) - sin(lat1) * sin(lat2)) return (degrees(lat2), degrees(lon2)) def gps_distance(lat1, lon1, lat2, lon2): """Return distance between two points in meters, coordinates are in degrees thanks to http://www.movable-type.co.uk/scripts/latlong.html .""" lat1 = math.radians(lat1) lat2 = math.radians(lat2) lon1 = math.radians(lon1) lon2 = math.radians(lon2) dLat = lat2 - lat1 dLon = lon2 - lon1 a = math.sin(0.5 * dLat)**2 + math.sin(0.5 * dLon)**2 * math.cos(lat1) * math.cos(lat2) c = 2.0 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a)) return RADIUS_OF_EARTH * c def gps_bearing(lat1, lon1, lat2, lon2): """Return bearing between two points in degrees, in range 0-360 thanks to http://www.movable-type.co.uk/scripts/latlong.html .""" lat1 = math.radians(lat1) lat2 = math.radians(lat2) lon1 = math.radians(lon1) lon2 = math.radians(lon2) dLon = lon2 - lon1 y = math.sin(dLon) * math.cos(lat2) x = math.cos(lat1) * math.sin(lat2) - math.sin(lat1) * math.cos(lat2) * math.cos(dLon) bearing = math.degrees(math.atan2(y, x)) if bearing < 0: bearing += 360.0 return bearing class Wind(object): """A wind generation object.""" def __init__(self, windstring, cross_section=0.1): a = windstring.split(',') if len(a) != 3: raise RuntimeError("Expected wind in speed,direction,turbulance form, not %s" % windstring) self.speed = float(a[0]) # m/s self.direction = float(a[1]) # direction the wind is going in self.turbulance = float(a[2]) # turbulance factor (standard deviation) # the cross-section of the aircraft to wind. This is multiplied by the # difference in the wind and the velocity of the aircraft to give the acceleration self.cross_section = cross_section # the time constant for the turbulance - the average period of the # changes over time self.turbulance_time_constant = 5.0 # wind time record self.tlast = time.time() # initial turbulance multiplier self.turbulance_mul = 1.0 def current(self, deltat=None): """Return current wind speed and direction as a tuple speed is in m/s, direction in degrees.""" if deltat is None: tnow = time.time() deltat = tnow - self.tlast self.tlast = tnow # update turbulance random walk w_delta = math.sqrt(deltat) * (1.0 - random.gauss(1.0, self.turbulance)) w_delta -= (self.turbulance_mul - 1.0) * (deltat / self.turbulance_time_constant) self.turbulance_mul += w_delta speed = self.speed * math.fabs(self.turbulance_mul) return (speed, self.direction) # Calculate drag. def drag(self, velocity, deltat=None): """Return current wind force in Earth frame. The velocity parameter is a Vector3 of the current velocity of the aircraft in earth frame, m/s .""" from math import radians # (m/s, degrees) : wind vector as a magnitude and angle. (speed, direction) = self.current(deltat=deltat) # speed = self.speed # direction = self.direction # Get the wind vector. w = toVec(speed, radians(direction)) obj_speed = velocity.length() # Compute the angle between the object vector and wind vector by taking # the dot product and dividing by the magnitudes. d = w.length() * obj_speed if d == 0: alpha = 0 else: alpha = acos((w * velocity) / d) # Get the relative wind speed and angle from the object. Note that the # relative wind speed includes the velocity of the object; i.e., there # is a headwind equivalent to the object's speed even if there is no # absolute wind. (rel_speed, beta) = apparent_wind(speed, obj_speed, alpha) # Return the vector of the relative wind, relative to the coordinate # system. relWindVec = toVec(rel_speed, beta + atan2(velocity.y, velocity.x)) # Combine them to get the acceleration vector. return Vector3(acc(relWindVec.x, drag_force(self, relWindVec.x)), acc(relWindVec.y, drag_force(self, relWindVec.y)), 0) def apparent_wind(wind_sp, obj_speed, alpha): """http://en.wikipedia.org/wiki/Apparent_wind Returns apparent wind speed and angle of apparent wind. Alpha is the angle between the object and the true wind. alpha of 0 rads is a headwind; pi a tailwind. Speeds should always be positive.""" delta = wind_sp * cos(alpha) x = wind_sp**2 + obj_speed**2 + 2 * obj_speed * delta rel_speed = sqrt(x) if rel_speed == 0: beta = pi else: beta = acos((delta + obj_speed) / rel_speed) return (rel_speed, beta) def drag_force(wind, sp): """See http://en.wikipedia.org/wiki/Drag_equation Drag equation is F(a) = cl * p/2 * v^2 * a, where cl : drag coefficient (let's assume it's low, .e.g., 0.2), p : density of air (assume about 1 kg/m^3, the density just over 1500m elevation), v : relative speed of wind (to the body), a : area acted on (this is captured by the cross_section parameter). So then we have F(a) = 0.2 * 1/2 * v^2 * cross_section = 0.1 * v^2 * cross_section.""" return (sp**2.0) * 0.1 * wind.cross_section def acc(val, mag): """ Function to make the force vector. relWindVec is the direction the apparent wind comes *from*. We want to compute the accleration vector in the direction the wind blows to.""" if val == 0: return mag else: return (val / abs(val)) * (0 - mag) def toVec(magnitude, angle): """Converts a magnitude and angle (radians) to a vector in the xy plane.""" v = Vector3(magnitude, 0, 0) m = Matrix3() m.from_euler(0, 0, angle) return m.transposed() * v def constrain(value, minv, maxv): """Constrain a value to a range.""" if value < minv: value = minv if value > maxv: value = maxv return value def load_local_module(fname): '''load a python module from within the ardupilot tree''' fname = os.path.join(topdir(), fname) if sys.version_info.major >= 3: import importlib.util spec = importlib.util.spec_from_file_location("local_module", fname) ret = importlib.util.module_from_spec(spec) spec.loader.exec_module(ret) else: import imp ret = imp.load_source("local_module", fname) return ret if __name__ == "__main__": import doctest doctest.testmod()