autotest: add more convenience methods for checking received data

This commit is contained in:
Peter Barker 2022-04-02 18:24:51 +11:00 committed by Peter Barker
parent 8e37c93e7d
commit 62fe90caa1
1 changed files with 51 additions and 6 deletions

View File

@ -3283,21 +3283,44 @@ class AutoTest(ABC):
if not temp_ok:
raise NotAchievedException("target temperature")
def assert_message_field_values(self, m, fieldvalues, verbose=True):
def message_has_field_values(self, m, fieldvalues, verbose=True, epsilon=None):
for (fieldname, value) in fieldvalues.items():
got = getattr(m, fieldname)
if got != value:
raise NotAchievedException("Expected %s.%s to be %s, got %s" %
if math.isnan(value) or math.isnan(got):
equal = math.isnan(value) and math.isnan(got)
elif epsilon is not None:
equal = abs(got - value) <= epsilon
else:
equal = got == value
if not equal:
self.progress("Expected %s.%s to be %s, got %s" %
(m.get_type(), fieldname, value, got))
return False
if verbose:
self.progress("%s.%s has expected value %s" %
(m.get_type(), fieldname, value))
return True
def assert_received_message_field_values(self, message, fieldvalues, verbose=True, very_verbose=False):
def assert_message_field_values(self, m, fieldvalues, verbose=True, epsilon=None):
if self.message_has_field_values(m, fieldvalues, verbose=verbose, epsilon=epsilon):
return
raise NotAchievedException("Did not get expected field values")
def assert_received_message_field_values(self, message, fieldvalues, verbose=True, very_verbose=False, epsilon=None):
m = self.assert_receive_message(message, verbose=verbose, very_verbose=very_verbose)
self.assert_message_field_values(m, fieldvalues, verbose=verbose)
self.assert_message_field_values(m, fieldvalues, verbose=verbose, epsilon=epsilon)
return m
def wait_message_field_values(self, message, fieldvalues, timeout=10, epsilon=None):
tstart = self.get_sim_time_cached()
while True:
if self.get_sim_time_cached() - tstart > timeout:
raise NotAchievedException("Field never reached values")
m = self.assert_receive_message(message)
if self.message_has_field_values(m, fieldvalues, epsilon=epsilon):
break
def onboard_logging_not_log_disarmed(self):
self.start_subtest("Test LOG_DISARMED-is-false behaviour")
self.set_parameter("LOG_DISARMED", 0)
@ -3671,6 +3694,21 @@ class AutoTest(ABC):
raise NotAchievedException("Did not get %s" % type)
return m
def assert_receive_named_value_float(self, name, timeout=10):
tstart = self.get_sim_time_cached()
while True:
if self.get_sim_time_cached() - tstart > timeout:
raise NotAchievedException("Did not get NAMED_VALUE_FLOAT %s" % name)
m = self.assert_receive_message('NAMED_VALUE_FLOAT', verbose=1, very_verbose=1, timeout=timeout)
if m.name != name:
continue
return m
def assert_receive_named_value_float_value(self, name, value, epsilon=0.0001, timeout=10):
m = self.assert_receive_named_value_float_value(name, timeout=timeout)
if abs(m.value - value) > epsilon:
raise NotAchievedException("Bad %s want=%f got=%f" % (name, value, m.value))
def assert_rally_files_same(self, file1, file2):
self.progress("Comparing (%s) and (%s)" % (file1, file2, ))
f1 = open(file1)
@ -4076,6 +4114,9 @@ class AutoTest(ABC):
def set_rc_from_map(self, _map, timeout=20):
map_copy = _map.copy()
for v in map_copy.values():
if type(v) != int:
raise NotAchievedException("RC values must be integers")
self.rc_queue.put(map_copy)
if self.rc_thread is None:
@ -9749,6 +9790,10 @@ switch value'''
print("m=%s" % str(m))
return m is not None
def assert_current_onboard_log_contains_message(self, messagetype):
if not self.current_onboard_log_contains_message(messagetype):
raise NotAchievedException("Current onboard log does not contain message %s" % messagetype)
def run_tests(self, tests):
"""Autotest vehicle in SITL."""
if self.run_tests_called: