ardupilot/Tools/autotest/pysim/util.py
2022-02-01 17:35:26 +11:00

853 lines
26 KiB
Python

from __future__ import print_function
import atexit
import math
import os
import random
import re
import shlex
import signal
import subprocess
import sys
import tempfile
import time
from math import acos, atan2, cos, pi, sqrt
import pexpect
from pymavlink.rotmat import Vector3, Matrix3
# Encoding handed to pexpect.spawn(): 'ascii' on Python 3, None on Python 2.
ENCODING = 'ascii' if sys.version_info[0] >= 3 else None

# Earth radius used by the GPS helpers below, in meters.
RADIUS_OF_EARTH = 6378100.0

# IDs of the terminal windows opened on macOS.
windowID = []
def m2ft(x):
    """Convert a length in meters to feet.

    x is coerced with float(), so numeric strings are accepted too.
    """
    meters = float(x)
    return meters / 0.3048
def ft2m(x):
    """Convert a length in feet to meters.

    x is coerced with float(), so numeric strings are accepted too.
    """
    feet = float(x)
    return feet * 0.3048
def kt2mps(x):
    """Convert a speed in knots to meters per second."""
    mps_per_knot = 0.514444444
    return x * mps_per_knot
def mps2kt(x):
    """Convert a speed in meters per second to knots."""
    mps_per_knot = 0.514444444
    return x / mps_per_knot
def topdir():
    """Return top of git tree where autotest is running from.

    Climbs three levels up from the directory holding this file, asserting
    on the way that those levels are named ``pysim``, ``autotest`` and
    ``Tools`` (innermost first).
    """
    here = os.path.dirname(os.path.realpath(__file__))
    for expected in ('pysim', 'autotest', 'Tools'):
        assert(os.path.basename(here) == expected)
        here = os.path.dirname(here)
    return here
def reltopdir(path):
    """Return path expressed relative to topdir()."""
    root = topdir()
    return os.path.relpath(path, root)
def run_cmd(cmd, directory=".", show=True, output=False, checkfail=True):
    """Run a command, optionally through the shell.

    cmd: a list is executed directly as argv; anything else is wrapped in a
        one-element list and executed with shell=True.
    directory: working directory for the child process.
    show: when true, print the command and directory before running it.
    output: when true, return the child's captured stdout instead of a
        return code (the exit status is not checked in this mode).
    checkfail: when true (and output is false) use subprocess.check_call,
        which raises on a non-zero exit; otherwise use subprocess.call.
    """
    use_shell = not isinstance(cmd, list)
    if use_shell:
        cmd = [cmd]
    if show:
        print("Running: (%s) in (%s)" % (cmd_as_shell(cmd), directory,))
    if output:
        proc = subprocess.Popen(cmd, shell=use_shell, stdout=subprocess.PIPE, cwd=directory)
        captured = proc.communicate()[0]
        return captured
    runner = subprocess.check_call if checkfail else subprocess.call
    return runner(cmd, shell=use_shell, cwd=directory)
def rmfile(path):
    """Remove a file if it exists.

    Best effort: any exception raised by os.unlink (missing file, permission
    problem, bad argument) is swallowed and None is returned.
    """
    try:
        os.unlink(path)
    except Exception:
        # deliberately ignored - the caller only wants the file gone
        return
def deltree(path):
    """Delete a tree of files.

    path is handed to ``rm -rf`` as a single argument without going through
    the shell, so spaces and shell metacharacters in the path are taken
    literally. The ``--`` stops rm treating a path starting with '-' as an
    option.
    """
    run_cmd(['rm', '-rf', '--', path])
def relwaf():
    """Return the path of the waf script used by the build helpers.

    The path is relative; the build helpers run it with the working
    directory set to topdir().
    """
    waf_script = "./modules/waf/waf-light"
    return waf_script
def waf_configure(board, j=None, debug=False, math_check_indexes=False, coverage=False, ekf_single=False, postype_single=False, sitl_32bit=False, extra_args=[], extra_hwdef=None):
    """Run ``waf configure`` for board from the top of the tree.

    Each boolean keyword adds the matching waf switch when true.
    extra_hwdef adds ``--extra-hwdef <value>``, j adds ``-j <value>``, and
    every string in extra_args is split with shlex and appended last.
    Raises whatever run_cmd raises when the command fails.
    """
    # (enabled, switch) pairs, in the order the switches are emitted
    switches = (
        (debug, '--debug'),
        (coverage, '--coverage'),
        (math_check_indexes, '--enable-math-check-indexes'),
        (ekf_single, '--ekf-single'),
        (postype_single, '--postype-single'),
        (sitl_32bit, '--sitl-32bit'),
    )
    cmd_configure = [relwaf(), "configure", "--board", board]
    cmd_configure += [switch for enabled, switch in switches if enabled]
    if extra_hwdef is not None:
        cmd_configure += ['--extra-hwdef', extra_hwdef]
    if j is not None:
        cmd_configure += ['-j', str(j)]
    for arg in extra_args:
        cmd_configure += shlex.split(arg)
    run_cmd(cmd_configure, directory=topdir(), checkfail=True)
def waf_clean():
    """Run ``waf clean`` from the top of the tree."""
    clean_cmd = [relwaf(), "clean"]
    run_cmd(clean_cmd, directory=topdir(), checkfail=True)
def waf_build(target=None):
    """Run ``waf build`` from the top of the tree.

    When target is not None it is appended to the command line as-is.
    """
    extra = [] if target is None else [target]
    run_cmd([relwaf(), "build"] + extra, directory=topdir(), checkfail=True)
def build_SITL(build_target, j=None, debug=False, board='sitl', clean=True, configure=True, math_check_indexes=False, coverage=False,
               ekf_single=False, postype_single=False, sitl_32bit=False, extra_configure_args=[]):
    """Configure, clean and build a single waf target.

    The configure step runs only when configure is true and the clean step
    only when clean is true. The build is ``waf build --target
    <build_target>``, with ``-j <j>`` added when j is given. Returns True;
    failures surface as exceptions from run_cmd.
    """
    if configure:
        configure_options = dict(
            j=j,
            debug=debug,
            math_check_indexes=math_check_indexes,
            ekf_single=ekf_single,
            postype_single=postype_single,
            coverage=coverage,
            sitl_32bit=sitl_32bit,
            extra_args=extra_configure_args,
        )
        waf_configure(board, **configure_options)

    if clean:
        waf_clean()

    jobs = [] if j is None else ['-j', str(j)]
    cmd_make = [relwaf(), "build", "--target", build_target] + jobs
    run_cmd(cmd_make, directory=topdir(), checkfail=True, show=True)
    return True
def build_examples(board, j=None, debug=False, clean=False, configure=True, math_check_indexes=False, coverage=False,
                   ekf_single=False, postype_single=False, sitl_32bit=False,
                   extra_configure_args=[]):
    """Configure, clean and run ``waf examples`` for board.

    The configure step runs only when configure is true and the clean step
    only when clean is true. j is forwarded to the configure step only.
    Returns True; failures surface as exceptions from run_cmd.
    """
    if configure:
        configure_options = dict(
            j=j,
            debug=debug,
            math_check_indexes=math_check_indexes,
            ekf_single=ekf_single,
            postype_single=postype_single,
            coverage=coverage,
            sitl_32bit=sitl_32bit,
            extra_args=extra_configure_args,
        )
        waf_configure(board, **configure_options)

    if clean:
        waf_clean()

    run_cmd([relwaf(), "examples"], directory=topdir(), checkfail=True, show=True)
    return True
def build_replay(board, j=None, debug=False, clean=False):
    """Configure for board, optionally clean, then run ``waf replay``.

    Unlike the other build helpers here, the configure step always runs.
    Returns True; failures surface as exceptions from run_cmd.
    """
    waf_configure(board, j=j, debug=debug)

    if clean:
        waf_clean()

    run_cmd([relwaf(), "replay"], directory=topdir(), checkfail=True, show=True)
    return True
def build_tests(board, j=None, debug=False, clean=False, configure=True, math_check_indexes=False, coverage=False,
                ekf_single=False, postype_single=False, sitl_32bit=False, extra_configure_args=[]):
    """Configure, clean and run ``waf tests`` for board.

    The configure step runs only when configure is true and the clean step
    only when clean is true. j is forwarded to the configure step only.
    Returns True; failures surface as exceptions from run_cmd.
    """
    if configure:
        configure_options = dict(
            j=j,
            debug=debug,
            math_check_indexes=math_check_indexes,
            ekf_single=ekf_single,
            postype_single=postype_single,
            coverage=coverage,
            sitl_32bit=sitl_32bit,
            extra_args=extra_configure_args,
        )
        waf_configure(board, **configure_options)

    if clean:
        waf_clean()

    cmd_tests = [relwaf(), "tests"]
    run_cmd(cmd_tests, directory=topdir(), checkfail=True, show=True)
    return True
# pexpect children registered for closing on exit
close_list = []


def pexpect_autoclose(p):
    """Register p in close_list so pexpect_close_all() will close it."""
    close_list.append(p)
def pexpect_close(p):
    """Close a pexpect child and drop it from close_list.

    Sends SIGTERM, then polls isalive() up to 20 times at 50 ms intervals
    for the process to go away. The polling is skipped when the kill raised
    IOError. Afterwards close() and close(force=True) are both attempted,
    ignoring any exception from either. Passing None only prints a message.
    """
    global close_list
    if p is None:
        print("Nothing to close")
        return

    kill_failed = False
    try:
        p.kill(signal.SIGTERM)
    except IOError as err:
        print("Caught exception: %s" % str(err))
        kill_failed = True

    if not kill_failed:
        # give the process some time to go away
        polls_left = 20
        while polls_left > 0 and p.isalive():
            time.sleep(0.05)
            polls_left -= 1

    # polite close first, then a forced one; both are best effort
    for close_kwargs in ({}, {'force': True}):
        try:
            p.close(**close_kwargs)
        except Exception:
            pass

    if p in close_list:
        close_list.remove(p)
def pexpect_close_all():
    """Close every pexpect child currently registered in close_list."""
    global close_list
    # iterate over a snapshot: pexpect_close() removes entries as it goes
    for child in list(close_list):
        pexpect_close(child)
def pexpect_drain(p):
    """Drain any pending input from a pexpect child.

    Performs a single non-blocking read of up to 1000 characters and throws
    the data away. Any exception from the read (for example a timeout
    because nothing was pending) is deliberately ignored.
    """
    try:
        p.read_nonblocking(1000, timeout=0)
    except Exception:
        pass
def cmd_as_shell(cmd):
    """Render an argv list as one string for display.

    Each element is wrapped in double quotes and the results are joined
    with spaces. No escaping is applied to the elements themselves.
    """
    quoted = []
    for part in cmd:
        quoted.append('"%s"' % part)
    return " ".join(quoted)
def make_safe_filename(text):
    """Return a version of text safe for use as a filename.

    Path separators ('/') become '-'. Every other character outside
    ``[a-zA-Z0-9_.+-]`` is replaced by its upper-cased hex code as produced
    by hex(), e.g. a space becomes ``0X20``.
    """
    # str.replace returns a new string; the result must be kept
    text = text.replace('/', '-')
    unsafe = re.compile("([^a-zA-Z0-9_.+-])")
    return unsafe.sub(lambda m: str(hex(ord(str(m.group(1))))).upper(), text)
def valgrind_log_filepath(binary, model):
    """Return the valgrind log file name for a binary/model pair.

    The name is ``<basename of binary>-<model>-valgrind.log`` passed through
    make_safe_filename().
    """
    binary_name = os.path.basename(binary)
    raw_name = '%s-%s-valgrind.log' % (binary_name, model,)
    return make_safe_filename(raw_name)
def kill_screen_gdb():
    """Ask the ``ardupilot-gdb`` screen session to quit.

    The screen command is started without waiting for it to finish.
    """
    subprocess.Popen(["screen", "-X", "-S", "ardupilot-gdb", "quit"])
def kill_mac_terminal():
    """Close every macOS Terminal window whose ID is recorded in windowID.

    One osascript command is run through os.system() per recorded window.
    """
    template = ("osascript -e 'tell application \"Terminal\" to close "
                "(window(get index of window id %s))'")
    for window in windowID:
        os.system(template % window)
def _write_debugger_script(path, header, breakpoints, disable_breakpoints, footer):
    """Write a gdb/lldb command file: header, breakpoints, optional "disable", footer."""
    with open(path, "w") as script:
        for line in header:
            script.write(line)
        for location in breakpoints:
            script.write("b %s\n" % (location,))
        if disable_breakpoints:
            script.write("disable\n")
        for line in footer:
            script.write(line)


def start_SITL(binary,
               valgrind=False,
               callgrind=False,
               gdb=False,
               gdb_no_tui=False,
               wipe=False,
               synthetic_clock=True,
               home=None,
               model=None,
               speedup=1,
               defaults_filepath=None,
               unhide_parameters=False,
               gdbserver=False,
               breakpoints=[],
               disable_breakpoints=False,
               customisations=[],
               lldb=False,
               enable_fgview_output=False,
               supplementary=False):
    """Launch a SITL instance.

    binary: path of the executable to run.
    valgrind / callgrind: run under valgrind when /usr/bin/valgrind exists;
        callgrind additionally selects ``--tool=callgrind``.
    gdbserver: run under ``gdbserver localhost:3333``; with gdb also set, a
        gdb attached to it is started in a detached screen session.
    gdb / gdb_no_tui: run under gdb (natively on macOS with DISPLAY set, in
        an xterm when DISPLAY is set, otherwise in a detached screen
        session); gdb_no_tui suppresses "tui enable".
    lldb: run under lldb; needs DISPLAY to be set.
    breakpoints / disable_breakpoints: breakpoints written to the debugger
        script, optionally followed by "disable".
    wipe, synthetic_clock, home, model, speedup, defaults_filepath,
    unhide_parameters, enable_fgview_output: translated to the ``-w``,
        ``-S``, ``--home``, ``--model``, ``--speedup``, ``--defaults``,
        ``--unhide-groups`` and ``--disable-fgview`` options.
        defaults_filepath may be a string or a list (joined with commas; an
        empty list adds nothing).
    customisations: extra arguments appended to the command line.
    supplementary: when true none of the options above are added and model
        may be None.

    Returns the pexpect child wrapping the process. When gdb runs inside a
    screen session a dummy child running ``true`` is returned instead, and
    on the macOS terminal-window path None is returned.

    Raises ValueError when model is None for a non-supplementary instance
    and RuntimeError when lldb is requested without DISPLAY.
    """
    if model is None and not supplementary:
        raise ValueError("model must not be None")

    cmd = []
    if (callgrind or valgrind) and os.path.exists('/usr/bin/valgrind'):
        # we specify a prefix for vgdb-pipe because on Vagrant virtual
        # machines the pipes are created on the mountpoint for the
        # shared directory with the host machine.  mmap's,
        # unsurprisingly, fail on files created on that mountpoint.
        vgdb_prefix = os.path.join(tempfile.gettempdir(), "vgdb-pipe")
        log_file = valgrind_log_filepath(binary=binary, model=model)
        cmd.extend([
            'valgrind',
            # adding this option allows valgrind to cope with the overload
            # of operator new
            "--soname-synonyms=somalloc=nouserintercepts",
            '--vgdb-prefix=%s' % vgdb_prefix,
            '-q',
            '--log-file=%s' % log_file])
        if callgrind:
            cmd.extend(["--tool=callgrind"])

    if gdbserver:
        cmd.extend(['gdbserver', 'localhost:3333'])
        if gdb:
            # attach gdb to the gdbserver:
            _write_debugger_script(
                "/tmp/x.gdb",
                ["target extended-remote localhost:3333\nc\n"],
                breakpoints,
                disable_breakpoints,
                [])
            run_cmd('screen -d -m -S ardupilot-gdbserver '
                    'bash -c "gdb -x /tmp/x.gdb"')
    elif gdb:
        footer = []
        if not gdb_no_tui:
            footer.append("tui enable\n")
        footer.append("r\n")
        _write_debugger_script(
            "/tmp/x.gdb",
            ["set pagination off\n"],
            breakpoints,
            disable_breakpoints,
            footer)
        if sys.platform == "darwin" and os.getenv('DISPLAY'):
            cmd.extend(['gdb', '-x', '/tmp/x.gdb', '--args'])
        elif os.environ.get('DISPLAY'):
            cmd.extend(['xterm', '-e', 'gdb', '-x', '/tmp/x.gdb', '--args'])
        else:
            cmd.extend(['screen',
                        '-L', '-Logfile', 'gdb.log',
                        '-d',
                        '-m',
                        '-S', 'ardupilot-gdb',
                        'gdb', '-x', '/tmp/x.gdb', binary, '--args'])
    elif lldb:
        _write_debugger_script(
            "/tmp/x.lldb",
            [],
            breakpoints,
            disable_breakpoints,
            ["settings set target.process.stop-on-exec false\n",
             "process launch\n"])
        if sys.platform == "darwin" and os.getenv('DISPLAY'):
            cmd.extend(['lldb', '-s', '/tmp/x.lldb', '--'])
        elif os.environ.get('DISPLAY'):
            cmd.extend(['xterm', '-e', 'lldb', '-s', '/tmp/x.lldb', '--'])
        else:
            raise RuntimeError("DISPLAY was not set")

    cmd.append(binary)

    if not supplementary:
        if wipe:
            cmd.append('-w')
        if synthetic_clock:
            cmd.append('-S')
        if home is not None:
            cmd.extend(['--home', home])
        cmd.extend(['--model', model])
        if speedup != 1:
            cmd.extend(['--speedup', str(speedup)])
        if defaults_filepath is not None:
            if isinstance(defaults_filepath, list):
                if len(defaults_filepath):
                    cmd.extend(['--defaults', ",".join(defaults_filepath)])
            else:
                cmd.extend(['--defaults', defaults_filepath])
        if unhide_parameters:
            cmd.extend(['--unhide-groups'])
        # somewhere for MAVProxy to connect to:
        cmd.append('--uartC=tcp:2')
        if not enable_fgview_output:
            cmd.append("--disable-fgview")

    cmd.extend(customisations)

    if (gdb or lldb) and sys.platform == "darwin" and os.getenv('DISPLAY'):
        # on MacOS record the window IDs so we can close them later
        atexit.register(kill_mac_terminal)
        child = None
        mydir = os.path.dirname(os.path.realpath(__file__))
        autotest_dir = os.path.realpath(os.path.join(mydir, '..'))
        runme = [os.path.join(autotest_dir, "run_in_terminal_window.sh"), 'mactest']
        runme.extend(cmd)
        print(runme)
        print(cmd)
        out = subprocess.Popen(runme, stdout=subprocess.PIPE).communicate()[0]
        out = out.decode('utf-8')
        p = re.compile('tab 1 of window id (.*)')

        # NOTE(review): `out` does not change inside this loop, so when no
        # window id is present it just waits out the 5 seconds. Kept as-is
        # to preserve the existing start-up timing.
        tstart = time.time()
        while time.time() - tstart < 5:
            tabs = p.findall(out)
            if len(tabs) > 0:
                break
            time.sleep(0.1)
        # sleep for extra 2 seconds for application to start
        time.sleep(2)
        if len(tabs) > 0:
            windowID.append(tabs[0])
        else:
            print("Cannot find %s process terminal" % binary)
    elif gdb and not os.getenv('DISPLAY'):
        subprocess.Popen(cmd)
        atexit.register(kill_screen_gdb)
        # we are expected to return a pexpect wrapped around the
        # stdout of the ArduPilot binary.  Not going to happen until
        # AP gets a redirect-stdout-to-filehandle option.  So, in the
        # meantime, return a dummy:
        return pexpect.spawn("true", ["true"],
                             logfile=sys.stdout,
                             encoding=ENCODING,
                             timeout=5)
    else:
        print("Running: %s" % cmd_as_shell(cmd))
        first = cmd[0]
        rest = cmd[1:]
        child = pexpect.spawn(first, rest, logfile=sys.stdout, encoding=ENCODING, timeout=5)
        pexpect_autoclose(child)

    # give time for parameters to properly setup
    time.sleep(3)
    if gdb or lldb:
        # if we run GDB we do so in an xterm.  "Waiting for
        # connection" is never going to appear on xterm's output.
        # ... so let's give it another magic second.
        time.sleep(1)
        # TODO: have a SITL-compiled ardupilot able to have its
        # console on an output fd.
    else:
        child.expect('Waiting for ', timeout=300)
    return child
def mavproxy_cmd():
    '''return the mavproxy command to use

    Taken from the MAVPROXY_CMD environment variable, falling back to
    'mavproxy.py' when it is not set.
    '''
    return os.environ.get('MAVPROXY_CMD', 'mavproxy.py')
def MAVProxy_version():
    '''return the current version of mavproxy as a tuple e.g. (1,8,8)

    Runs "<mavproxy command> --version" through the shell and parses the
    "MAVProxy Version: X.Y.Z" line from its stdout. Raises ValueError when
    no such line is found.
    '''
    command = "%s --version" % mavproxy_cmd()
    proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
    raw_output = proc.communicate()[0]
    output = raw_output.decode('ascii')
    match = re.search("MAVProxy Version: ([0-9]+)[.]([0-9]+)[.]([0-9]+)", output)
    if match is None:
        raise ValueError("Unable to determine MAVProxy version from (%s)" % output)
    return tuple(int(match.group(index)) for index in (1, 2, 3))
def start_MAVProxy_SITL(atype,
                        aircraft=None,
                        setup=False,
                        master='tcp:127.0.0.1:5762',
                        options=[],
                        pexpect_timeout=60,
                        logfile=sys.stdout):
    """Launch mavproxy connected to a SITL instance.

    atype: used to build the default aircraft name ``test.<atype>``.
    aircraft: value for ``--aircraft``; defaults as described above.
    setup: add ``--setup`` when true.
    master: value for ``--master``.
    options: extra command-line arguments, added after ``--aircraft``.
    pexpect_timeout / logfile: passed to pexpect.spawn().

    The child runs with the tree's ``mavproxy_modules`` directory put in
    front of any existing PYTHONPATH, and is registered for autoclose.
    Returns the pexpect child.
    """
    import pexpect

    local_mp_modules_dir = os.path.abspath(
        os.path.join(__file__, '..', '..', '..', 'mavproxy_modules'))
    env = dict(os.environ)
    search_path = [local_mp_modules_dir]
    inherited = env.get('PYTHONPATH', None)
    if inherited is not None:
        search_path.append(inherited)
    env['PYTHONPATH'] = os.path.pathsep.join(search_path)

    if aircraft is None:
        aircraft = 'test.%s' % atype

    cmd = [mavproxy_cmd(), '--master', master]
    if setup:
        cmd.append('--setup')
    cmd += ['--aircraft', aircraft]
    cmd += options
    cmd += ['--default-modules', 'misc,terrain,wp,rally,fence,param,arm,mode,rc,cmdlong,output']

    print("PYTHONPATH: %s" % str(env['PYTHONPATH']))
    print("Running: %s" % cmd_as_shell(cmd))

    child = pexpect.spawn(cmd[0], cmd[1:], logfile=logfile, encoding=ENCODING, timeout=pexpect_timeout, env=env)
    child.delaybeforesend = 0
    pexpect_autoclose(child)
    return child
def expect_setup_callback(e, callback):
    """Make e.expect() invoke callback while it waits for a pattern.

    e.expect is replaced by a wrapper that retries the original expect
    (kept as e.expect_saved) with a one-second timeout. After every
    one-second timeout callback(e) is called. Once the overall timeout has
    elapsed a message is printed and pexpect.TIMEOUT is raised. The
    wrapper's default timeout is e.timeout as it was when this function
    ran.
    """
    import pexpect

    def _expect_callback(pattern, timeout=e.timeout):
        started = time.time()
        while time.time() < started + timeout:
            try:
                return e.expect_saved(pattern, timeout=1)
            except pexpect.TIMEOUT:
                e.expect_user_callback(e)
        print("Timed out looking for %s" % pattern)
        raise pexpect.TIMEOUT(timeout)

    e.expect_user_callback = callback
    e.expect_saved = e.expect
    e.expect = _expect_callback
def mkdir_p(directory):
    """Create directory and any missing parents, like ``mkdir -p``.

    Trailing slashes are ignored. An empty or None argument and an already
    existing directory are both no-ops.
    """
    target = directory.rstrip("/") if directory else directory
    if not target or os.path.isdir(target):
        return
    # make sure the parent exists before creating the leaf
    mkdir_p(os.path.dirname(target))
    os.mkdir(target)
def loadfile(fname):
    """Load a file as a string.

    The file is opened in text mode and its whole content is returned. The
    handle is closed even when reading fails.
    """
    with open(fname, mode='r') as f:
        return f.read()
def lock_file(fname):
    """Take an exclusive, non-blocking lock on a file.

    The file is opened for writing (which creates or truncates it) and
    locked with fcntl.lockf(). Returns the open file object on success; the
    lock is tied to that object, so the caller must keep it open. Returns
    None when the lock cannot be taken, in which case the handle is closed
    instead of being leaked.
    """
    import fcntl
    f = open(fname, mode='w')
    try:
        fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except Exception:
        f.close()
        return None
    return f
def check_parent(parent_pid=None):
    """Exit this process if its parent is no longer alive.

    parent_pid defaults to os.getppid(). The check is a signal-0 os.kill();
    if that raises, a message is printed and sys.exit(1) is called. When no
    parent pid can be determined the function returns without checking.
    """
    pid = parent_pid
    if pid is None:
        try:
            pid = os.getppid()
        except Exception:
            return
    if pid is None:
        return
    try:
        # signal 0 only tests for the process, nothing is delivered
        os.kill(pid, 0)
    except Exception:
        print("Parent had finished - exiting")
        sys.exit(1)
def EarthRatesToBodyRates(dcm, earth_rates):
    """Convert the angular velocities from earth frame to
    body frame. Thanks to James Goppert for the formula

    dcm supplies the attitude through to_euler(); earth_rates carries the
    roll, pitch and yaw angle rates in its x, y and z attributes.
    all inputs and outputs are in radians
    returns a gyro vector in body frame, in rad/s .
    """
    from math import sin, cos

    (phi, theta, psi) = dcm.to_euler()

    roll_rate = earth_rates.x
    pitch_rate = earth_rates.y
    yaw_rate = earth_rates.z

    p = roll_rate - yaw_rate * sin(theta)
    q = cos(phi) * pitch_rate + sin(phi) * yaw_rate * cos(theta)
    r = cos(phi) * yaw_rate * cos(theta) - sin(phi) * pitch_rate
    return Vector3(p, q, r)
def BodyRatesToEarthRates(dcm, gyro):
    """Convert the angular velocities from body frame to
    earth frame.

    dcm supplies the attitude through to_euler(); gyro carries the body
    rates in its x, y and z attributes.
    all inputs and outputs are in radians/s
    returns a earth rate vector.
    """
    from math import sin, cos, tan, fabs

    p, q, r = gyro.x, gyro.y, gyro.z
    (phi, theta, psi) = dcm.to_euler()

    # term shared by the roll-rate and yaw-rate expressions
    coupled = q * sin(phi) + r * cos(phi)

    phiDot = p + tan(theta) * coupled
    thetaDot = q * cos(phi) - r * sin(phi)
    # nudge theta off the singularity before dividing by cos(theta)
    if fabs(cos(theta)) < 1.0e-20:
        theta += 1.0e-10
    psiDot = coupled / cos(theta)
    return Vector3(phiDot, thetaDot, psiDot)
def gps_newpos(lat, lon, bearing, distance):
    """Extrapolate latitude/longitude given a heading and distance
    thanks to http://www.movable-type.co.uk/scripts/latlong.html .

    lat, lon and bearing are in degrees; distance is in the same unit as
    RADIUS_OF_EARTH (meters). Returns (latitude, longitude) in degrees.
    """
    from math import sin, asin, cos, atan2, radians, degrees

    start_lat = radians(lat)
    start_lon = radians(lon)
    heading = radians(bearing)
    # distance expressed as an angle at the earth's centre
    angular_dist = distance / RADIUS_OF_EARTH

    end_lat = asin(sin(start_lat) * cos(angular_dist) +
                   cos(start_lat) * sin(angular_dist) * cos(heading))
    lon_offset = atan2(sin(heading) * sin(angular_dist) * cos(start_lat),
                       cos(angular_dist) - sin(start_lat) * sin(end_lat))
    end_lon = start_lon + lon_offset
    return (degrees(end_lat), degrees(end_lon))
def gps_distance(lat1, lon1, lat2, lon2):
    """Return distance between two points in meters,
    coordinates are in degrees
    thanks to http://www.movable-type.co.uk/scripts/latlong.html .

    Uses the haversine formula on a sphere of radius RADIUS_OF_EARTH.
    """
    lat1 = math.radians(lat1)
    lat2 = math.radians(lat2)
    lon1 = math.radians(lon1)
    lon2 = math.radians(lon2)
    dLat = lat2 - lat1
    dLon = lon2 - lon1

    a = math.sin(0.5 * dLat)**2 + math.sin(0.5 * dLon)**2 * math.cos(lat1) * math.cos(lat2)
    # `a` is mathematically within [0, 1], but for (near-)antipodal points
    # rounding can push it just past 1, which would make sqrt(1 - a) raise
    # a domain error.
    a = min(max(a, 0.0), 1.0)
    c = 2.0 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a))
    return RADIUS_OF_EARTH * c
def gps_bearing(lat1, lon1, lat2, lon2):
    """Return bearing between two points in degrees, in range 0-360
    thanks to http://www.movable-type.co.uk/scripts/latlong.html .

    All four coordinates are in degrees; the bearing is from point 1 to
    point 2.
    """
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    delta_lon = math.radians(lon2) - math.radians(lon1)

    y_term = math.sin(delta_lon) * math.cos(phi2)
    x_term = math.cos(phi1) * math.sin(phi2) - math.sin(phi1) * math.cos(phi2) * math.cos(delta_lon)

    heading = math.degrees(math.atan2(y_term, x_term))
    # atan2 yields (-180, 180]; shift the negative half up
    return heading + 360.0 if heading < 0 else heading
class Wind(object):
    """A wind generation object.

    Built from a "speed,direction,turbulance" string. The effective wind
    speed is the base speed scaled by a multiplier that performs a random
    walk each time current() is called.
    """

    def __init__(self, windstring, cross_section=0.1):
        """Parse windstring and set up the turbulence state.

        Raises RuntimeError when windstring does not have exactly three
        comma-separated fields.
        """
        a = windstring.split(',')
        if len(a) != 3:
            raise RuntimeError("Expected wind in speed,direction,turbulance form, not %s" % windstring)
        self.speed = float(a[0])  # m/s
        self.direction = float(a[1])  # direction the wind is going in
        self.turbulance = float(a[2])  # turbulance factor (standard deviation)

        # the cross-section of the aircraft to wind. This is multiplied by the
        # difference in the wind and the velocity of the aircraft to give the acceleration
        self.cross_section = cross_section

        # the time constant for the turbulance - the average period of the
        # changes over time
        self.turbulance_time_constant = 5.0

        # wind time record
        self.tlast = time.time()

        # initial turbulance multiplier
        self.turbulance_mul = 1.0

    def current(self, deltat=None):
        """Return current wind speed and direction as a tuple
        speed is in m/s, direction in degrees.

        deltat is the time step in seconds; when None it is the wall-clock
        time since the previous such call (or since construction).
        """
        if deltat is None:
            tnow = time.time()
            deltat = tnow - self.tlast
            self.tlast = tnow

        # update turbulance random walk
        w_delta = math.sqrt(deltat) * (1.0 - random.gauss(1.0, self.turbulance))
        # pull the multiplier back towards 1.0 over the time constant
        w_delta -= (self.turbulance_mul - 1.0) * (deltat / self.turbulance_time_constant)
        self.turbulance_mul += w_delta
        speed = self.speed * math.fabs(self.turbulance_mul)
        return (speed, self.direction)

    # Calculate drag.
    def drag(self, velocity, deltat=None):
        """Return current wind force in Earth frame.  The velocity parameter is
        a Vector3 of the current velocity of the aircraft in earth frame, m/s .

        deltat is forwarded to current(). The z component of the result is
        always 0.
        """
        from math import radians

        # (m/s, degrees) : wind vector as a magnitude and angle.
        (speed, direction) = self.current(deltat=deltat)

        # Get the wind vector.
        w = toVec(speed, radians(direction))

        obj_speed = velocity.length()

        # Compute the angle between the object vector and wind vector by taking
        # the dot product and dividing by the magnitudes.
        d = w.length() * obj_speed
        if d == 0:
            alpha = 0
        else:
            # Rounding can leave the cosine marginally outside [-1, 1] when
            # the two vectors are (anti)parallel, which would make acos()
            # raise; clamp it into range first.
            cos_alpha = min(max((w * velocity) / d, -1.0), 1.0)
            alpha = acos(cos_alpha)

        # Get the relative wind speed and angle from the object.  Note that the
        # relative wind speed includes the velocity of the object; i.e., there
        # is a headwind equivalent to the object's speed even if there is no
        # absolute wind.
        (rel_speed, beta) = apparent_wind(speed, obj_speed, alpha)

        # Return the vector of the relative wind, relative to the coordinate
        # system.
        relWindVec = toVec(rel_speed, beta + atan2(velocity.y, velocity.x))

        # Combine them to get the acceleration vector.
        return Vector3(acc(relWindVec.x, drag_force(self, relWindVec.x)), acc(relWindVec.y, drag_force(self, relWindVec.y)), 0)
def apparent_wind(wind_sp, obj_speed, alpha):
    """http://en.wikipedia.org/wiki/Apparent_wind
    Returns apparent wind speed and angle of apparent wind.  Alpha is the angle
    between the object and the true wind.  alpha of 0 rads is a headwind; pi a
    tailwind.  Speeds should always be positive.

    The result is a (rel_speed, beta) tuple with beta in radians; beta is pi
    when the apparent wind speed is zero.
    """
    delta = wind_sp * cos(alpha)
    x = wind_sp**2 + obj_speed**2 + 2 * obj_speed * delta
    # x is a squared magnitude; guard against it rounding below zero
    rel_speed = sqrt(max(x, 0.0))
    if rel_speed == 0:
        beta = pi
    else:
        # Rounding in the sqrt can leave this ratio marginally outside
        # [-1, 1] for pure head/tail winds, which would make acos() raise.
        ratio = (delta + obj_speed) / rel_speed
        beta = acos(min(max(ratio, -1.0), 1.0))

    return (rel_speed, beta)
def drag_force(wind, sp):
    """See http://en.wikipedia.org/wiki/Drag_equation
    Drag equation is F(a) = cl * p/2 * v^2 * a, where cl : drag coefficient
    (let's assume it's low, .e.g., 0.2), p : density of air (assume about 1
    kg/m^3, the density just over 1500m elevation), v : relative speed of wind
    (to the body), a : area acted on (this is captured by the cross_section
    parameter).

    So then we have
    F(a) = 0.2 * 1/2 * v^2 * cross_section = 0.1 * v^2 * cross_section.

    wind only needs a cross_section attribute; sp is the relative speed.
    """
    speed_squared = sp**2.0
    # 0.1 is the drag coefficient (0.2) times half the air density (0.5)
    return speed_squared * 0.1 * wind.cross_section
def acc(val, mag):
    """Function to make the force vector.  relWindVec is the direction the apparent
    wind comes *from*.  We want to compute the acceleration vector in the direction
    the wind blows to.

    Returns mag with the opposite sign to val, or mag itself when val is 0.
    """
    if val == 0:
        return mag
    sign = val / abs(val)
    return sign * (0 - mag)
def toVec(magnitude, angle):
    """Converts a magnitude and angle (radians) to a vector in the xy plane.

    An x-axis vector of the given magnitude is multiplied by the transpose
    of a rotation matrix built from a yaw of angle.
    """
    rotation = Matrix3()
    rotation.from_euler(0, 0, angle)
    along_x = Vector3(magnitude, 0, 0)
    return rotation.transposed() * along_x
def constrain(value, minv, maxv):
    """Constrain a value to the range [minv, maxv].

    The lower bound is applied first, then the upper bound, so maxv wins if
    the bounds are given in the wrong order.
    """
    return min(max(value, minv), maxv)
def load_local_module(fname):
    '''load a python module from within the ardupilot tree

    fname is taken relative to topdir(). The module is executed and
    returned under the name "local_module". importlib is used on Python 3
    and imp.load_source on Python 2.
    '''
    module_path = os.path.join(topdir(), fname)
    if sys.version_info.major < 3:
        import imp
        return imp.load_source("local_module", module_path)

    import importlib.util
    spec = importlib.util.spec_from_file_location("local_module", module_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
if __name__ == "__main__":
    # When run as a script, execute the doctests found in this module.
    from doctest import testmod
    testmod()