Tools: autotest: allow autotest to connect directly to SITL

This commit is contained in:
Peter Barker 2019-03-13 16:53:56 +11:00 committed by Peter Barker
parent 8c7d63433c
commit ad2ab333f9
5 changed files with 244 additions and 95 deletions

View File

@ -200,10 +200,6 @@ class AutoTestCopter(AutoTest):
self.hover()
self.progress("Copter staging 50 meters east of home at 50 meters altitude In mode Alt Hold")
# Start and stop the GCS heartbeat for GCS failsafe testing
def set_heartbeat_rate(self, rate):
self.mavproxy.send('set heartbeat %u\n' % rate)
# loiter - fly south west, then loiter within 5m position and altitude
def loiter(self, holdtime=10, maxaltchange=5, maxdistchange=5):
"""Hold loiter position."""
@ -711,6 +707,7 @@ class AutoTestCopter(AutoTest):
def test_gcs_failsafe(self, side=60, timeout=360):
# Test double-SmartRTL; ensure we do SmarRTL twice rather than
# landing (tests fix for actual bug)
self.set_parameter("SYSID_MYGCS", self.mav.source_system)
self.context_push()
self.start_subtest("GCS failsafe SmartRTL twice")
self.setGCSfailsafe(3)
@ -4855,6 +4852,8 @@ class AutoTestCopter(AutoTest):
def test_manual_control(self):
'''test manual_control mavlink message'''
self.set_parameter("SYSID_MYGCS", self.mav.source_system)
self.change_mode('STABILIZE')
self.takeoff(10)
@ -4968,7 +4967,7 @@ class AutoTestCopter(AutoTest):
def fly_follow_mode(self):
self.set_parameter("FOLL_ENABLE", 1)
self.set_parameter("FOLL_SYSID", 255)
self.set_parameter("FOLL_SYSID", self.mav.source_system)
foll_ofs_x = 30 # metres
self.set_parameter("FOLL_OFS_X", -foll_ofs_x)
self.set_parameter("FOLL_OFS_TYPE", 1) # relative to other vehicle heading
@ -4991,8 +4990,11 @@ class AutoTestCopter(AutoTest):
self.progress("expected_loc: %s" % str(expected_loc))
last_sent = 0
tstart = self.get_sim_time()
while True:
now = self.get_sim_time_cached()
if now - tstart > 60:
raise NotAchievedException("Did not FOLLOW")
if now - last_sent > 0.5:
gpi = self.global_position_int_for_location(new_loc,
now,

View File

@ -326,11 +326,16 @@ class AutoTestSub(AutoTest):
def reboot_sitl(self):
"""Reboot SITL instance and wait it to reconnect."""
self.mavproxy.send("reboot\n")
self.mavproxy.expect("Init ArduSub")
# empty mav to avoid getting old timestamps:
while self.mav.recv_match(blocking=False):
pass
self.context_push()
self.context_collect("STATUSTEXT")
self.send_reboot_command()
# "Init ArduSub" comes out as raw text, not a statustext
for i in range(2):
try:
self.wait_statustext("ArduPilot Ready", check_context=True, wallclock_timeout=True)
except ConnectionResetError as e:
pass
self.context_pop()
self.initialise_after_reboot_sitl()
def apply_defaultfile_parameters(self):

View File

@ -22,6 +22,7 @@ import threading
from MAVProxy.modules.lib import mp_util
from pymavlink import mavparm
from pymavlink import mavwp, mavutil, DFReader
from pymavlink import mavextra
from pymavlink import mavparm
@ -174,6 +175,8 @@ class Context(object):
self.sitl_commandline_customised = False
self.message_hooks = []
self.collections = {}
self.heartbeat_interval_ms = 1000
self.original_heartbeat_interval_ms = None
# https://stackoverflow.com/questions/616645/how-do-i-duplicate-sys-stdout-to-a-log-file-in-python
class TeeBoth(object):
@ -1177,6 +1180,10 @@ class AutoTest(ABC):
self.logs_dir = logs_dir
self.timesync_number = 137
self.last_progress_sent_as_statustext = None
self.last_heartbeat_time_ms = None
self.last_heartbeat_time_wc_s = 0
self.in_drain_mav = False
self.tlog = None
self.rc_thread = None
self.rc_thread_should_quit = False
@ -1248,23 +1255,17 @@ class AutoTest(ABC):
"""Allow subclasses to override SITL streamrate."""
return 10
def autotest_connection_hostport(self):
'''returns host and port of connection between MAVProxy and autotest,
colon-separated'''
return "127.0.0.1:19550"
def autotest_connection_string_from_mavproxy(self):
return "tcpin:" + self.autotest_connection_hostport()
def autotest_connection_string_to_mavproxy(self):
return "tcp:" + self.autotest_connection_hostport()
def autotest_connection_string_to_ardupilot(self):
return "tcp:127.0.0.1:5760"
def mavproxy_options(self):
"""Returns options to be passed to MAVProxy."""
ret = ['--sitl=127.0.0.1:5502',
'--out=' + self.autotest_connection_string_from_mavproxy(),
'--streamrate=%u' % self.sitl_streamrate(),
'--cmd="set heartbeat %u"' % self.speedup]
'--cmd="set heartbeat %u"' % self.speedup,
'--target-system=%u' % self.sysid_thismav(),
'--target-component=1',
]
if self.viewerip:
ret.append("--out=%s:14550" % self.viewerip)
if self.use_map:
@ -1318,8 +1319,6 @@ class AutoTest(ABC):
self.set_parameter("EK3_ENABLE", 1)
self.set_parameter("AHRS_EKF_TYPE", self.force_ahrs_type)
self.reboot_sitl()
if False: # FIXME: do do this if using MAVProxy:
self.fetch_parameters()
def count_lines_in_filepath(self, filepath):
return len([i for i in open(filepath)])
@ -1492,10 +1491,20 @@ class AutoTest(ABC):
# run_cmd when we don't do that.
if self.valgrind:
self.reboot_check_valgrind_log()
self.progress("Stopping and restarting SITL")
self.stop_SITL()
self.start_SITL(wipe=False)
else:
self.send_reboot_command()
self.progress("Executing reboot command")
self.run_cmd(mavutil.mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN,
1, # confirmation
1, # reboot autopilot
0,
0,
0,
0,
0,
0)
self.detect_and_handle_reboot(old_bootcount, required_bootcount=required_bootcount)
def send_cmd_enter_cpu_lockup(self):
@ -1516,6 +1525,7 @@ class AutoTest(ABC):
"""Reboot SITL instance and wait for it to reconnect."""
self.progress("Rebooting SITL")
self.reboot_sitl_mav(required_bootcount=required_bootcount)
self.do_heartbeats(force=True)
self.assert_simstate_location_is_at_startup_location()
def reboot_sitl_mavproxy(self, required_bootcount=None):
@ -1540,13 +1550,34 @@ class AutoTest(ABC):
pass
except AutoTestTimeoutException:
pass
except ConnectionResetError:
pass
# empty mav to avoid getting old timestamps:
self.drain_mav()
self.do_timesync_roundtrip(timeout_in_wallclock=True)
self.progress("Calling initialise-after-reboot")
self.initialise_after_reboot_sitl()
def set_streamrate(self, streamrate, timeout=10):
def set_streamrate(self, streamrate, timeout=20):
'''set MAV_DATA_STREAM_ALL; timeout is wallclock time'''
tstart = time.time()
while True:
if time.time() - tstart > timeout:
raise NotAchievedException("Failed to set streamrate")
self.mav.mav.request_data_stream_send(
1,
1,
mavutil.mavlink.MAV_DATA_STREAM_ALL,
streamrate,
1)
m = self.mav.recv_match(type='SYSTEM_TIME',
blocking=True,
timeout=1)
if m is not None:
break
def set_streamrate_mavproxy(self, streamrate, timeout=10):
tstart = time.time()
while True:
if time.time() - tstart > timeout:
@ -1989,7 +2020,8 @@ class AutoTest(ABC):
# after reboot stream-rates may be zero. Prompt MAVProxy to
# send a rate-change message by changing away from our normal
# stream rates and back again:
self.set_streamrate(self.sitl_streamrate()+1)
self.drain_mav()
self.wait_heartbeat()
self.set_streamrate(self.sitl_streamrate())
self.progress("Reboot complete")
@ -2001,7 +2033,15 @@ class AutoTest(ABC):
defaults_filepath=defaults_filepath,
customisations=customisations,
wipe=wipe)
self.wait_heartbeat(drain_mav=True)
tstart = time.time()
while True:
if time.time() - tstart > 30:
raise NotAchievedException("Failed to customise")
try:
self.wait_heartbeat(drain_mav=True)
except IOError as e:
pass
break
# MAVProxy only checks the streamrates once every 15 seconds.
# Encourage it:
self.set_streamrate(self.sitl_streamrate()+1)
@ -2094,6 +2134,48 @@ class AutoTest(ABC):
global expect_list
expect_list.remove(item)
def heartbeat_interval_ms(self):
c = self.context_get()
if c is None:
return 1000
return c.heartbeat_interval_ms
def set_heartbeat_interval_ms(self, interval_ms):
c = self.context_get()
if c is None:
raise ValueError("No context")
if c.original_heartbeat_interval_ms is None:
c.original_heartbeat_interval_ms = c.heartbeat_interval_ms
c.heartbeat_interval_ms = interval_ms
def set_heartbeat_rate(self, rate_hz):
if rate_hz == 0:
self.set_heartbeat_interval_ms(None)
return
self.set_heartbeat_interval_ms(1000.0/rate_hz)
def do_heartbeats(self, force=False):
# self.progress("do_heartbeats")
if self.heartbeat_interval_ms() is None and not force:
return
x = self.mav.messages.get("SYSTEM_TIME", None)
now_wc = time.time()
if (force or
x is None or
self.last_heartbeat_time_ms is None or
self.last_heartbeat_time_ms < x.time_boot_ms or
x.time_boot_ms - self.last_heartbeat_time_ms > self.heartbeat_interval_ms() or
now_wc - self.last_heartbeat_time_wc_s > 1):
# self.progress("Sending heartbeat")
if x is not None:
self.last_heartbeat_time_ms = x.time_boot_ms
self.last_heartbeat_time_wc_s = now_wc
self.mav.mav.heartbeat_send(mavutil.mavlink.MAV_TYPE_GCS,
mavutil.mavlink.MAV_AUTOPILOT_INVALID,
0,
0,
0)
def drain_all_pexpects(self):
global expect_list
for p in expect_list:
@ -2101,11 +2183,32 @@ class AutoTest(ABC):
def idle_hook(self, mav):
"""Called when waiting for a mavlink message."""
if self.in_drain_mav:
return
self.drain_all_pexpects()
def message_hook(self, mav, msg):
"""Called as each mavlink msg is received."""
# print("msg: %s" % str(msg))
if msg.get_type() == 'STATUSTEXT':
self.progress("AP: %s" % msg.text)
self.write_msg_to_tlog(msg)
self.idle_hook(mav)
self.do_heartbeats()
def send_message_hook(self, msg, x):
self.write_msg_to_tlog(msg)
def write_msg_to_tlog(self, msg):
usec = int(time.time() * 1.0e6)
if self.tlog is None:
tlog_filename = "autotest-%u.tlog" % usec
self.tlog = open(tlog_filename, 'wb')
content = bytearray(struct.pack('>Q', usec) + msg.get_msgbuf())
self.tlog.write(content)
def expect_callback(self, e):
"""Called when waiting for a expect pattern."""
@ -2114,6 +2217,8 @@ class AutoTest(ABC):
if p == e:
continue
util.pexpect_drain(p)
self.drain_mav(quiet=True)
self.do_heartbeats()
def drain_mav_unparsed(self, mav=None, quiet=True, freshen_sim_time=False):
if mav is None:
@ -2141,6 +2246,7 @@ class AutoTest(ABC):
return self.drain_mav_unparsed(quiet=quiet, mav=mav)
if mav is None:
mav = self.mav
self.in_drain_mav = True
count = 0
tstart = time.time()
while mav.recv_match(blocking=False) is not None:
@ -2153,16 +2259,24 @@ class AutoTest(ABC):
else:
rate = "%f/s" % (count/float(tdelta),)
self.progress("Drained %u messages from mav (%s)" % (count, rate), send_statustext=False)
if not quiet:
self.progress("Drained %u messages from mav (%s)" % (count, rate), send_statustext=False)
self.in_drain_mav = False
def do_timesync_roundtrip(self, quiet=False):
def do_timesync_roundtrip(self, quiet=False, timeout_in_wallclock=False):
if not quiet:
self.progress("Doing timesync roundtrip")
tstart = self.get_sim_time()
if timeout_in_wallclock:
tstart = time.time()
else:
tstart = self.get_sim_time()
self.mav.mav.timesync_send(0, self.timesync_number * 1000 + self.mav.source_system)
while True:
now = self.get_sim_time_cached()
if now - tstart > 1:
if timeout_in_wallclock:
now = time.time()
else:
now = self.get_sim_time_cached()
if now - tstart > 5:
raise AutoTestTimeoutException("Did not get timesync response")
m = self.mav.recv_match(type='TIMESYNC', blocking=True, timeout=1)
if not quiet:
@ -2959,15 +3073,16 @@ class AutoTest(ABC):
"""Load a mission from a file to flight controller."""
self.progress("Loading mission (%s)" % filename)
path = os.path.join(testdir, filepath, filename)
tstart = self.get_sim_time_cached()
tstart = self.get_sim_time()
while True:
t2 = self.get_sim_time_cached()
t2 = self.get_sim_time()
if t2 - tstart > 10:
raise AutoTestTimeoutException("Failed to do waypoint thing")
# the following hack is to get around MAVProxy statustext deduping:
while time.time() - self.last_wp_load < 3:
self.progress("Waiting for MAVProxy de-dupe timer to expire")
time.sleep(1)
self.drain_mav_unparsed()
time.sleep(0.1)
self.mavproxy.send('wp load %s\n' % path)
self.mavproxy.expect('Loaded ([0-9]+) waypoints from')
load_count = self.mavproxy.match.group(1)
@ -3113,6 +3228,7 @@ class AutoTest(ABC):
def check_rc_defaults(self):
"""Ensure all rc outputs are at defaults"""
self.do_timesync_roundtrip()
_defaults = self.rc_defaults()
m = self.mav.recv_match(type='RC_CHANNELS', blocking=True, timeout=5)
if m is None:
@ -3348,8 +3464,7 @@ class AutoTest(ABC):
# motors are not, and we can't disarm further because Copter
# looks at whether its *motors* are armed as part of its
# disarm process.
self.stop_SITL()
self.start_SITL(wipe=False)
self.reset_SITL_commandline()
def cpufailsafe_wait_servo_channel_value(self, channel, value, timeout=30):
'''we get restricted messages while doing cpufailsafe, this working then'''
@ -3379,15 +3494,19 @@ class AutoTest(ABC):
# so get_sim_time breaks:
self.send_cmd_enter_cpu_lockup()
start_time = time.time() # not sim time!
self.context_push()
self.context_collect("STATUSTEXT")
while True:
want = "Initialising ArduPilot"
if time.time() - start_time > 30:
raise NotAchievedException("Did not get %s" % want)
# we still need to parse the incoming messages:
m = self.mav.recv_match(type='STATUSTEXT', blocking=True, timeout=0.1)
x = self.mav.messages.get("STATUSTEXT", None)
if x is not None and want in x.text:
try:
self.wait_statustext(want, timeout=0.1, check_context=True, wallclock_timeout=1)
break
except AutoTestTimeoutException as e:
pass
self.context_pop()
# Different scaling for RC input and servo output means the
# servo output value isn't the rc input value:
self.progress("Setting RC to 1200")
@ -3543,8 +3662,6 @@ class AutoTest(ABC):
def send_set_parameter(self, name, value, verbose=False):
if verbose:
self.progress("Send set param for (%s) (%f)" % (name, value))
if False:
return self.send_set_parameter_mavproxy(name, value)
return self.send_set_parameter_direct(name, value)
def set_parameter(self, name, value, **kwargs):
@ -4707,7 +4824,7 @@ class AutoTest(ABC):
Also, ignores heartbeats not from our target system'''
if drain_mav:
self.drain_mav(quiet=quiet)
orig_timeout = x.get("timeout", 10)
orig_timeout = x.get("timeout", 20)
x["timeout"] = 1
tstart = time.time()
while True:
@ -4814,7 +4931,7 @@ Also, ignores heartbeats not from our target system'''
return True
return False
def wait_statustext(self, text, timeout=20, the_function=None, check_context=False, regex=False):
def wait_statustext(self, text, timeout=20, the_function=None, check_context=False, regex=False, wallclock_timeout=False):
"""Wait for a specific STATUSTEXT."""
# Statustexts are often triggered by something we've just
@ -4842,33 +4959,47 @@ Also, ignores heartbeats not from our target system'''
self.progress("Received expected text: %s" % m.text.lower())
statustext_found = True
self.install_message_hook(mh)
try:
if wallclock_timeout:
tstart = time.time()
else:
tstart = self.get_sim_time()
while self.get_sim_time_cached() < tstart + timeout:
if statustext_found:
return
try:
while not statustext_found:
if wallclock_timeout:
now = time.time()
else:
now = self.get_sim_time_cached()
if now - tstart > timeout:
raise AutoTestTimeoutException("Failed to receive text: %s" %
text.lower())
if the_function is not None:
the_function()
m = self.mav.recv_match(type='STATUSTEXT', blocking=True, timeout=0.1)
finally:
self.remove_message_hook(mh)
raise AutoTestTimeoutException("Failed to receive text: %s" %
text.lower())
def get_mavlink_connection_going(self):
# get a mavlink connection going
connection_string = self.autotest_connection_string_to_mavproxy()
try:
self.mav = mavutil.mavlink_connection(connection_string,
robust_parsing=True,
source_component=250)
self.mav = mavutil.mavlink_connection(
self.autotest_connection_string_to_ardupilot(),
retries=1000,
robust_parsing=True,
source_system=250,
source_component=250,
autoreconnect = True,
dialect="ardupilotmega", # if we don't pass this in we end up with the wrong mavlink version...
)
except Exception as msg:
self.progress("Failed to start mavlink connection on %s: %s" %
(connection_string, msg,))
raise
self.mav.message_hooks.append(self.message_hook)
self.mav.mav.set_send_callback(self.send_message_hook, self)
self.mav.idle_hooks.append(self.idle_hook)
self.set_streamrate(self.sitl_streamrate())
def show_test_timings_key_sorter(self, t):
(k, v) = t
return ((v, k))
@ -5067,8 +5198,8 @@ Also, ignores heartbeats not from our target system'''
self.progress("LOGFILE %s" % self.logfile)
self.try_symlink_tlog()
self.progress("Waiting for Parameters")
self.mavproxy.expect('Received [0-9]+ parameters')
# self.progress("Waiting for Parameters")
# self.mavproxy.expect('Received [0-9]+ parameters')
def start_SITL(self, **sitl_args):
start_sitl_args = {
@ -5089,7 +5220,7 @@ Also, ignores heartbeats not from our target system'''
if "model" not in start_sitl_args or start_sitl_args["model"] is None:
start_sitl_args["model"] = self.frame
self.progress("Starting SITL")
self.progress("Starting SITL", send_statustext=False)
self.sitl = util.start_SITL(self.binary, **start_sitl_args)
self.expect_list_add(self.sitl)
if self.sup_binary is not None:
@ -5119,10 +5250,11 @@ Also, ignores heartbeats not from our target system'''
self.progress("Starting simulator")
self.start_SITL()
self.start_mavproxy()
os.environ['MAVLINK20'] = '1'
self.progress("Starting MAVLink connection")
self.get_mavlink_connection_going()
self.start_mavproxy()
util.expect_setup_callback(self.mavproxy, self.expect_callback)
@ -6649,22 +6781,18 @@ Also, ignores heartbeats not from our target system'''
0)
def send_get_message_interval(self, victim_message_id):
if False:
self.mavproxy.send("long GET_MESSAGE_INTERVAL %u\n" %
(victim_message_id))
else:
self.mav.mav.command_long_send(
1,
1,
mavutil.mavlink.MAV_CMD_GET_MESSAGE_INTERVAL,
1, # confirmation
float(victim_message_id),
0,
0,
0,
0,
0,
0)
self.mav.mav.command_long_send(
1,
1,
mavutil.mavlink.MAV_CMD_GET_MESSAGE_INTERVAL,
1, # confirmation
float(victim_message_id),
0,
0,
0,
0,
0,
0)
def test_rate(self, desc, in_rate, expected_rate):
self.progress("###### %s" % desc)
@ -6882,7 +7010,7 @@ Also, ignores heartbeats not from our target system'''
self.reboot_sitl(required_bootcount=1);
self.progress("Waiting for 'Config error'")
self.mavproxy.expect("Config error");
self.wait_statustext("Config error");
self.progress("Setting %s to %f" % (parameter_name, new_parameter_value))
self.set_parameter(parameter_name, new_parameter_value)
except Exception as e:
@ -7718,6 +7846,7 @@ switch value'''
self.clear_mission(mavutil.mavlink.MAV_MISSION_TYPE_ALL)
for test in tests:
self.drain_mav_unparsed()
self.run_one_test(test)
except pexpect.TIMEOUT:
@ -8082,7 +8211,6 @@ switch value'''
self.mavproxy_load_module("relay")
self.mavproxy.send("sitl_accelcal\n")
self.mavproxy.send("accelcal\n")
self.mavproxy.expect("Init Gyro")
self.mavproxy.expect("Calibrated")
for wanted in [ "level",
"on its LEFT side",
@ -8123,10 +8251,11 @@ switch value'''
def test_button(self):
self.set_parameter("SIM_PIN_MASK", 0)
self.set_parameter("BTN_ENABLE", 1)
self.drain_mav()
self.do_heartbeats(force=True)
btn = 4
pin = 3
self.drain_mav()
self.set_parameter("BTN_PIN%u" % btn, pin)
self.set_parameter("BTN_PIN%u" % btn, pin, verbose=True)
m = self.mav.recv_match(type='BUTTON_CHANGE', blocking=True, timeout=1)
self.progress("### m: %s" % str(m))
if m is not None:

View File

@ -352,6 +352,9 @@ def start_SITL(binary,
cmd.extend(['--unhide-groups'])
cmd.extend(customisations)
# somewhere for MAVProxy to connect to:
cmd.append('--uartC=tcp:2')
if (gdb or lldb) and sys.platform == "darwin" and os.getenv('DISPLAY'):
global windowID
# on MacOS record the window IDs so we can close them later
@ -428,7 +431,7 @@ def MAVProxy_version():
raise ValueError("Unable to determine MAVProxy version from (%s)" % output)
return (int(match.group(1)), int(match.group(2)), int(match.group(3)))
def start_MAVProxy_SITL(atype, aircraft=None, setup=False, master='tcp:127.0.0.1:5760',
def start_MAVProxy_SITL(atype, aircraft=None, setup=False, master='tcp:127.0.0.1:5762',
options=[], logfile=sys.stdout):
"""Launch mavproxy connected to a SITL instance."""
local_mp_modules_dir = os.path.abspath(

View File

@ -675,6 +675,7 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
raise ex
def test_rc_override_cancel(self):
self.set_parameter("SYSID_MYGCS", self.mav.source_system)
self.change_mode('MANUAL')
self.wait_ready_to_arm()
self.zero_throttle()
@ -745,6 +746,7 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
def test_rc_overrides(self):
self.context_push()
self.set_parameter("SYSID_MYGCS", self.mav.source_system)
ex = None
try:
self.set_parameter("RC12_OPTION", 46)
@ -996,6 +998,7 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
def test_manual_control(self):
self.context_push()
self.set_parameter("SYSID_MYGCS", self.mav.source_system)
ex = None
try:
self.set_parameter("RC12_OPTION", 46) # enable/disable rc overrides
@ -1140,13 +1143,16 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
def test_sysid_enforce(self):
'''Run the same arming code with correct then incorrect SYSID'''
if self.mav.source_system != self.mav.mav.srcSystem:
raise PreconditionFailedException("Expected mav.source_system and mav.srcSystem to match")
self.context_push()
old_srcSystem = self.mav.mav.srcSystem
ex = None
try:
# if set_parameter is ever changed to not use MAVProxy
# this test is going to break horribly. Sorry.
self.set_parameter("SYSID_MYGCS", 255) # assume MAVProxy does this!
self.set_parameter("SYSID_ENFORCE", 1) # assume MAVProxy does this!
self.set_parameter("SYSID_MYGCS", self.mav.source_system)
self.set_parameter("SYSID_ENFORCE", 1, add_to_context=False)
self.change_mode('MANUAL')
@ -1155,23 +1161,25 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
self.arm_vehicle(timeout=5)
self.disarm_vehicle()
# temporarily set a different system ID than MAVProxy:
self.progress("Attempting to arm vehicle myself")
old_srcSystem = self.mav.mav.srcSystem
self.do_timesync_roundtrip()
# should not be able to arm from a system id which is not MY_SYSID
self.progress("Attempting to arm vehicle from bad system-id")
success = None
try:
self.mav.mav.srcSystem = 243
# temporarily set a different system ID than normal:
self.mav.mav.srcSystem = 72
self.arm_vehicle(timeout=5)
self.disarm_vehicle()
success = False
except AutoTestTimeoutException as e:
except AutoTestTimeoutException:
success = True
self.mav.mav.srcSystem = old_srcSystem
if not success:
raise NotAchievedException(
"Managed to arm with SYSID_ENFORCE set")
raise NotAchievedException("Managed to arm with SYSID_ENFORCE set")
# should be able to arm from the vehicle's own components:
self.progress("Attempting to arm vehicle from vehicle component")
old_srcSystem = self.mav.mav.srcSystem
comp_arm_exception = None
try:
self.mav.mav.srcSystem = 1
@ -1187,6 +1195,8 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
self.progress("Exception caught: %s" % (
self.get_exception_stacktrace(e)))
ex = e
self.mav.mav.srcSystem = old_srcSystem
self.set_parameter("SYSID_ENFORCE", 0, add_to_context=False)
self.context_pop()
if ex is not None:
raise ex
@ -2944,9 +2954,9 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
target_component,
1)
m = self.mav.recv_match(type="RALLY_POINT", blocking=True, timeout=1)
if m.target_system != 255:
if m.target_system != self.mav.source_system:
raise NotAchievedException("Bad target_system on received rally point (want=%u got=%u)" % (255, m.target_system))
if m.target_component != 250: # autotest's component ID
if m.target_component != self.mav.source_component: # autotest's component ID
raise NotAchievedException("Bad target_component on received rally point")
if m.lat != item1_lat:
raise NotAchievedException("Bad latitude on received rally point")