autotest: add support for minimum_duration to wait_ekf_flags

.... by re-implementing in terms of a WaitAndMaintain class
This commit is contained in:
Peter Barker 2024-05-17 14:08:00 +10:00 committed by Peter Barker
parent 792fdc2fb8
commit 0b8c72c8bb

View File

@ -465,6 +465,47 @@ class WaitAndMaintainLocation(WaitAndMaintain):
return (f"Want=({self.target.lat},{self.target.lng}) distance={self.horizontal_error(current_value)}")
class WaitAndMaintainEKFFlags(WaitAndMaintain):
'''Waits for EKF status flags to include required_flags and have
error_bits *not* set.'''
def __init__(self, test_suite, required_flags, error_bits, **kwargs):
super(WaitAndMaintainEKFFlags, self).__init__(test_suite, **kwargs)
self.required_flags = required_flags
self.error_bits = error_bits
self.last_EKF_STATUS_REPORT = None
def announce_start_text(self):
return f"Waiting for EKF value {self.required_flags}"
def get_current_value(self):
self.last_EKF_STATUS_REPORT = self.test_suite.assert_receive_message('EKF_STATUS_REPORT', timeout=10)
return self.last_EKF_STATUS_REPORT.flags
def validate_value(self, value):
if value & self.error_bits:
return False
if (value & self.required_flags) != self.required_flags:
return False
return True
def success_text(self):
return "EKF Flags OK"
def timeoutexception(self):
self.progress("Last EKF status report:")
self.progress(self.test_suite.dump_message_verbose(self.last_EKF_STATUS_REPORT))
return AutoTestTimeoutException(f"Failed to get EKF.flags={self.required_flags}")
def progress_text(self, current_value):
error_bits_str = ""
if current_value & self.error_bits:
error_bits_str = " (error bits present)"
return (f"Want=({self.required_flags}) got={current_value}{error_bits_str}")
class WaitAndMaintainArmed(WaitAndMaintain):
def get_current_value(self):
return self.test_suite.armed()
@ -8017,33 +8058,8 @@ Also, ignores heartbeats not from our target system'''
error_bits |= mavutil.mavlink.ESTIMATOR_GPS_GLITCH
self.wait_ekf_flags(required_value, error_bits, timeout=timeout)
def wait_ekf_flags(self, required_value, error_bits, timeout=30):
self.progress("Waiting for EKF value %u" % required_value)
last_print_time = 0
tstart = self.get_sim_time()
m = None
while timeout is None or self.get_sim_time_cached() < tstart + timeout:
m = self.mav.recv_match(type='EKF_STATUS_REPORT', blocking=True, timeout=timeout)
if m is None:
continue
current = m.flags
errors = current & error_bits
everything_ok = (errors == 0 and
current & required_value == required_value)
if everything_ok or self.get_sim_time_cached() - last_print_time > 1:
self.progress("Wait EKF.flags: required:%u current:%u errors=%u" %
(required_value, current, errors))
last_print_time = self.get_sim_time_cached()
if everything_ok:
self.progress("EKF Flags OK")
return True
m_str = str(m)
if m is not None:
m_str = self.dump_message_verbose(m)
self.progress("Last EKF_STATUS_REPORT message:")
self.progress(m_str)
raise AutoTestTimeoutException("Failed to get EKF.flags=%u" %
required_value)
def wait_ekf_flags(self, required_value, error_bits, **kwargs):
WaitAndMaintainEKFFlags(self, required_value, error_bits, **kwargs).run()
def wait_gps_disable(self, position_horizontal=True, position_vertical=False, timeout=30):
"""Disable GPS and wait for EKF to report the end of assistance from GPS."""