autotest: add tests for high latency control protocol

This commit is contained in:
Peter Barker 2022-03-03 14:54:55 +11:00 committed by Peter Barker
parent 2e5c201222
commit 3fa12152f6
1 changed files with 208 additions and 56 deletions

View File

@ -1829,6 +1829,22 @@ class AutoTest(ABC):
0,
0)
def run_cmd_enable_high_latency(self, new_state):
p1 = 0
if new_state:
p1 = 1
self.run_cmd(
mavutil.mavlink.MAV_CMD_CONTROL_HIGH_LATENCY,
p1, # p1 - enable/disable
0, # p2
0, # p3
0, # p4
0, # p5
0, # p6
0, # p7
)
def reboot_sitl_mav(self, required_bootcount=None):
"""Reboot SITL instance using mavlink and wait for it to reconnect."""
# we must make sure that stats have been reset - otherwise
@ -2662,7 +2678,7 @@ class AutoTest(ABC):
count = 0
tstart = time.time()
while True:
this = self.mav.recv(1000000)
this = mav.recv(1000000)
if len(this) == 0:
break
count += len(this)
@ -2798,6 +2814,87 @@ class AutoTest(ABC):
if m.temperature_air == -128: # High_Latency2 defaults to INT8_MIN for no temperature available
raise NotAchievedException("Air Temperature not received from HIGH_LATENCY2")
self.HIGH_LATENCY2_links()
def HIGH_LATENCY2_links(self):
self.start_subtest("SerialProtocol_MAVLinkHL links")
ex = None
self.context_push()
mav2 = None
try:
self.set_parameter("SERIAL2_PROTOCOL", 43) # HL)
self.reboot_sitl()
mav2 = mavutil.mavlink_connection(
"tcp:localhost:5763",
robust_parsing=True,
source_system=7,
source_component=7,
)
self.start_subsubtest("Don't get HIGH_LATENCY2 by default")
for mav in self.mav, mav2:
self.assert_not_receive_message('HIGH_LATENCY2', mav=mav, timeout=10)
self.start_subsubtest("Get HIGH_LATENCY2 upon link enabled only on HL link")
self.run_cmd_enable_high_latency(True)
self.assert_receive_message("HIGH_LATENCY2", mav=mav2, timeout=10)
self.assert_not_receive_message("HIGH_LATENCY2", mav=self.mav, timeout=10)
self.start_subsubtest("Not get HIGH_LATENCY2 upon HL disable")
self.run_cmd_enable_high_latency(False)
self.delay_sim_time(10)
self.assert_not_receive_message('HIGH_LATENCY2', mav=self.mav, timeout=10)
self.drain_mav(mav2)
self.assert_not_receive_message('HIGH_LATENCY2', mav=mav2, timeout=10)
self.start_subsubtest("Stream rate adjustments")
self.run_cmd_enable_high_latency(True)
self.assert_message_rate_hz("HIGH_LATENCY2", 0.2, ndigits=1, mav=mav2)
for test_rate in (1, 0.1, 2):
self.test_rate(
"HIGH_LATENCY2 on enabled link",
test_rate,
test_rate,
mav=mav2,
ndigits=1,
victim_message="HIGH_LATENCY2",
message_rate_sample_period=60,
)
self.assert_not_receive_message("HIGH_LATENCY2", mav=self.mav, timeout=10)
self.run_cmd_enable_high_latency(False)
self.start_subsubtest("Not get HIGH_LATENCY2 after disabling after playing with rates")
self.assert_not_receive_message('HIGH_LATENCY2', mav=self.mav, timeout=10)
self.delay_sim_time(1)
self.drain_mav(mav2)
self.assert_not_receive_message('HIGH_LATENCY2', mav=mav2, timeout=10)
self.start_subsubtest("Enable and disable should not affect non-HL links getting HIGH_LATENCY2")
self.set_message_rate_hz("HIGH_LATENCY2", 5, mav=self.mav)
self.assert_message_rate_hz("HIGH_LATENCY2", 5, mav=self.mav)
self.run_cmd_enable_high_latency(True)
self.assert_message_rate_hz("HIGH_LATENCY2", 5, mav=self.mav),
self.run_cmd_enable_high_latency(False)
self.assert_message_rate_hz("HIGH_LATENCY2", 5, mav=self.mav)
except Exception as e:
self.print_exception_caught(e)
ex = e
self.context_pop()
self.reboot_sitl()
self.set_message_rate_hz("HIGH_LATENCY2", 0)
if ex is not None:
raise ex
def test_log_download(self):
if self.is_tracker():
@ -3057,7 +3154,7 @@ class AutoTest(ABC):
timeout = 30
if self.valgrind:
timeout *= 10
if time.time() - self.last_sim_time_cached_wallclock > timeout:
if time.time() - self.last_sim_time_cached_wallclock > timeout and not self.gdb:
raise AutoTestTimeoutException("sim_time_cached is not updating!")
return ret
@ -3543,8 +3640,29 @@ class AutoTest(ABC):
raise ValueError("count %u not handled" % count)
self.progress("Files same")
def assert_receive_message(self, type, timeout=1, verbose=False, very_verbose=False):
m = self.mav.recv_match(type=type, blocking=True, timeout=timeout)
def assert_not_receive_message(self, message, timeout=1, mav=None):
'''this is like assert_not_receiving_message but uses sim time not
wallclock time'''
self.progress("making sure we're not getting %s messages" % message)
if mav is None:
mav = self.mav
tstart = self.get_sim_time_cached()
while True:
m = mav.recv_match(type=message, blocking=True, timeout=0.1)
if m is not None:
self.progress("Received: %s" % self.dump_message_verbose(m))
raise PreconditionFailedException("Receiving %s messages" % message)
if mav != self.mav:
# update timestamp....
self.drain_mav(self.mav)
if self.get_sim_time_cached() - tstart > timeout:
return
def assert_receive_message(self, type, timeout=1, verbose=False, very_verbose=False, mav=None):
if mav is None:
mav = self.mav
m = mav.recv_match(type=type, blocking=True, timeout=timeout)
if verbose:
self.progress("Received (%s)" % str(m))
if very_verbose:
@ -4796,8 +4914,11 @@ class AutoTest(ABC):
p7,
target_sysid=None,
target_compid=None,
mav=None,
):
"""Send a MAVLink command long."""
if mav is None:
mav = self.mav
if target_sysid is None:
target_sysid = self.sysid_thismav()
if target_compid is None:
@ -4818,17 +4939,17 @@ class AutoTest(ABC):
p5,
p6,
p7))
self.mav.mav.command_long_send(target_sysid,
target_compid,
command,
1, # confirmation
p1,
p2,
p3,
p4,
p5,
p6,
p7)
mav.mav.command_long_send(target_sysid,
target_compid,
command,
1, # confirmation
p1,
p2,
p3,
p4,
p5,
p6,
p7)
def run_cmd(self,
command,
@ -4843,8 +4964,9 @@ class AutoTest(ABC):
target_sysid=None,
target_compid=None,
timeout=10,
quiet=False):
self.drain_mav()
quiet=False,
mav=None):
self.drain_mav(mav=mav)
self.get_sim_time() # required for timeout in run_cmd_get_ack to work
self.send_cmd(
command,
@ -4857,20 +4979,25 @@ class AutoTest(ABC):
p7,
target_sysid=target_sysid,
target_compid=target_compid,
mav=mav,
)
self.run_cmd_get_ack(command, want_result, timeout, quiet=quiet)
self.run_cmd_get_ack(command, want_result, timeout, quiet=quiet, mav=mav)
def run_cmd_get_ack(self, command, want_result, timeout, quiet=False):
def run_cmd_get_ack(self, command, want_result, timeout, quiet=False, mav=None):
# note that the caller should ensure that this cached
# timestamp is reasonably up-to-date!
if mav is None:
mav = self.mav
tstart = self.get_sim_time_cached()
while True:
if mav != self.mav:
self.drain_mav()
delta_time = self.get_sim_time_cached() - tstart
if delta_time > timeout:
raise AutoTestTimeoutException("Did not get good COMMAND_ACK within %fs" % timeout)
m = self.mav.recv_match(type='COMMAND_ACK',
blocking=True,
timeout=0.1)
m = mav.recv_match(type='COMMAND_ACK',
blocking=True,
timeout=0.1)
if m is None:
continue
if not quiet:
@ -8328,16 +8455,22 @@ Also, ignores heartbeats not from our target system'''
self.progress("ALL PASS")
# TODO : Test arming magic;
def get_message_rate(self, victim_message, timeout):
def get_message_rate(self, victim_message, timeout=10, mav=None):
if mav is None:
mav = self.mav
tstart = self.get_sim_time()
count = 0
while self.get_sim_time_cached() < tstart + timeout:
m = self.mav.recv_match(type=victim_message,
blocking=True,
timeout=0.1
)
m = mav.recv_match(
type=victim_message,
blocking=True,
timeout=0.1
)
if m is not None:
count += 1
if mav != self.mav:
self.drain_mav(self.mav)
time_delta = self.get_sim_time_cached() - tstart
self.progress("%s count after %f seconds: %u" %
(victim_message, time_delta, count))
@ -8346,7 +8479,7 @@ Also, ignores heartbeats not from our target system'''
def rate_to_interval_us(self, rate):
return 1/float(rate)*1000000.0
def set_message_rate_hz(self, id, rate_hz):
def set_message_rate_hz(self, id, rate_hz, mav=None):
'''set a message rate in Hz; 0 for original, -1 to disable'''
if type(id) == str:
id = eval("mavutil.mavlink.MAVLINK_MSG_ID_%s" % id)
@ -8361,15 +8494,20 @@ Also, ignores heartbeats not from our target system'''
0,
0,
0,
0)
0,
mav=mav)
def send_get_message_interval(self, victim_message_id):
self.mav.mav.command_long_send(
def send_get_message_interval(self, victim_message, mav=None):
if mav is None:
mav = self.mav
if type(victim_message) == str:
victim_message = eval("mavutil.mavlink.MAVLINK_MSG_ID_%s" % victim_message)
mav.mav.command_long_send(
1,
1,
mavutil.mavlink.MAV_CMD_GET_MESSAGE_INTERVAL,
1, # confirmation
float(victim_message_id),
float(victim_message),
0,
0,
0,
@ -8377,23 +8515,36 @@ Also, ignores heartbeats not from our target system'''
0,
0)
def test_rate(self, desc, in_rate, expected_rate):
def test_rate(self,
desc,
in_rate,
expected_rate
, mav=None,
victim_message="VFR_HUD",
ndigits=0,
message_rate_sample_period=10):
if mav is None:
mav = self.mav
self.progress("###### %s" % desc)
self.progress("Setting rate to %u" % in_rate)
self.progress("Setting rate to %f" % round(in_rate, ndigits=ndigits))
self.set_message_rate_hz(self.victim_message_id, in_rate)
self.set_message_rate_hz(victim_message, in_rate, mav=mav)
new_measured_rate = self.get_message_rate(self.victim_message, 10)
self.progress("Measured rate: %f (want %u)" %
(new_measured_rate, expected_rate))
if round(new_measured_rate) != expected_rate:
raise NotAchievedException("Rate not achieved (got %f want %u)" %
(new_measured_rate, expected_rate))
new_measured_rate = self.get_message_rate(victim_message, timeout=message_rate_sample_period, mav=mav)
self.progress(
"Measured rate: %f (want %f)" %
(round(new_measured_rate, ndigits=ndigits),
round(expected_rate, ndigits=ndigits))
)
if round(new_measured_rate, ndigits=ndigits) != round(expected_rate, ndigits=ndigits):
raise NotAchievedException("Rate not achieved (got %f want %f)" %
(round(new_measured_rate, ndigits), round(expected_rate, ndigits)))
# make sure get_message_interval works:
self.send_get_message_interval(self.victim_message_id)
self.send_get_message_interval(victim_message, mav=mav)
m = self.mav.recv_match(type='MESSAGE_INTERVAL', blocking=True)
m = mav.recv_match(type='MESSAGE_INTERVAL', blocking=True)
if in_rate == 0:
want = self.rate_to_interval_us(expected_rate)
@ -8405,7 +8556,7 @@ Also, ignores heartbeats not from our target system'''
if m.interval_us != want:
raise NotAchievedException("Did not read same interval back from autopilot: want=%d got=%d)" %
(want, m.interval_us))
m = self.mav.recv_match(type='COMMAND_ACK', blocking=True)
m = mav.recv_match(type='COMMAND_ACK', blocking=True)
if m.result != mavutil.mavlink.MAV_RESULT_ACCEPTED:
raise NotAchievedException("Expected ACCEPTED for reading message interval")
@ -8439,22 +8590,22 @@ Also, ignores heartbeats not from our target system'''
if ex is not None:
raise ex
def assert_message_rate_hz(self, message, want_rate, sample_period=20):
self.drain_mav()
rate = round(self.get_message_rate(message, sample_period))
self.progress("%s: Want=%u got=%u" % (message, want_rate, rate))
def assert_message_rate_hz(self, message, want_rate, sample_period=20, ndigits=0, mav=None):
if mav is None:
mav = self.mav
self.drain_mav(mav)
rate = round(self.get_message_rate(message, sample_period, mav=mav), ndigits=ndigits)
self.progress("%s: Want=%f got=%f" % (message, round(want_rate, ndigits=ndigits), round(rate, ndigits=ndigits)))
if rate != want_rate:
raise NotAchievedException("Did not get expected rate (want=%u got=%u" % (want_rate, rate))
raise NotAchievedException("Did not get expected rate (want=%f got=%f)" % (want_rate, rate))
def test_set_message_interval_basic(self):
self.victim_message = 'VFR_HUD'
self.victim_message_id = mavutil.mavlink.MAVLINK_MSG_ID_VFR_HUD
ex = None
try:
rate = round(self.get_message_rate(self.victim_message, 20))
rate = round(self.get_message_rate("VFR_HUD", 20))
self.progress("Initial rate: %u" % rate)
self.test_rate("Test set to %u" % (rate/2,), rate/2, rate/2)
self.test_rate("Test set to %u" % (rate/2,), rate/2, rate/2, victim_message="VFR_HUD")
# this assumes the streamrates have not been played with:
self.test_rate("Resetting original rate using 0-value", 0, rate)
self.test_rate("Disabling using -1-value", -1, 0)
@ -9768,9 +9919,11 @@ switch value'''
want_result=mavutil.mavlink.MAV_RESULT_ACCEPTED)
self.disarm_vehicle()
def assert_not_receiving_message(self, message, timeout=1):
def assert_not_receiving_message(self, message, timeout=1, mav=None):
self.progress("making sure we're not getting %s messages" % message)
m = self.mav.recv_match(type=message, blocking=True, timeout=timeout)
if mav is None:
mav = self.mav
m = mav.recv_match(type=message, blocking=True, timeout=timeout)
if m is not None:
raise PreconditionFailedException("Receiving %s messags" % message)
@ -10055,7 +10208,6 @@ switch value'''
self.wait_attitude(desroll=0, despitch=0, timeout=120, tolerance=1.5)
if ahrs_type != 0: # we don't get secondary msgs while DCM is primary
self.wait_attitude(desroll=0, despitch=0, message_type='AHRS2', tolerance=1, timeout=120)
# self.send_debug_trap()
self.wait_attitude_quaternion(desroll=0, despitch=0, tolerance=1, timeout=120)
self.context_pop()