autotest: add test for AHRS backend wind estimates

This commit is contained in:
Peter Barker 2022-10-10 14:47:57 +11:00 committed by Peter Barker
parent c728483a7e
commit 128a6430de
2 changed files with 111 additions and 12 deletions

View File

@ -2798,6 +2798,86 @@ class AutoTestPlane(AutoTest):
self.arm_vehicle()
self.fly_mission(mission)
def wait_and_maintain_wind_estimate(
self,
want_speed,
want_dir,
timeout=10,
speed_tolerance=0.5,
dir_tolerance=5,
**kwargs):
'''wait for wind estimate to reach speed and direction'''
def validator(last, _min, _max):
'''returns false of spd or direction is too-far wrong'''
(spd, di) = last
_min_spd, _min_dir = _min
_max_spd, _max_dir = _max
if spd < _min_spd or spd > _max_spd:
return False
# my apologies to whoever is staring at this and wondering
# why we're not wrapping angles here...
if di < _min_dir or di > _max_dir:
return False
return True
def value_getter():
'''returns a tuple of (wind_speed, wind_dir), where wind_dir is 45 if
wind is coming from NE'''
m = self.assert_receive_message("WIND")
return (m.speed, m.direction)
class ValueAverager(object):
def __init__(self):
self.speed_average = -1
self.dir_average = -1
self.count = 0.0
def add_value(self, value):
(spd, di) = value
if self.speed_average == -1:
self.speed_average = spd
self.dir_average = di
else:
self.speed_average += spd
self.di_average += spd
self.count += 1
return (self.speed_average/self.count, self.dir_average/self.count)
def reset(self):
self.count = 0
self.speed_average = -1
self.dir_average = -1
self.wait_and_maintain_range(
value_name="WindEstimates",
minimum=(want_speed-speed_tolerance, want_dir-dir_tolerance),
maximum=(want_speed+speed_tolerance, want_dir+dir_tolerance),
current_value_getter=value_getter,
value_averager=ValueAverager(),
validator=lambda last, _min, _max: validator(last, _min, _max),
timeout=timeout,
**kwargs
)
def WindEstimates(self):
'''fly non-external AHRS, ensure wind estimate correct'''
self.set_parameters({
"SIM_WIND_SPD": 5,
"SIM_WIND_DIR": 45,
})
self.wait_ready_to_arm()
self.takeoff(70) # default wind sim wind is a sqrt function up to 60m
self.change_mode('LOITER')
# use default estimator to determine when to check others:
self.wait_and_maintain_wind_estimate(5, 45, timeout=120)
for ahrs_type in 0, 2, 3, 10:
self.start_subtest("Checking AHRS_EKF_TYPE=%u" % ahrs_type)
self.set_parameter("AHRS_EKF_TYPE", ahrs_type)
self.wait_and_maintain_wind_estimate(5, 45, speed_tolerance=1)
self.fly_home_land_and_disarm()
def VectorNavEAHRS(self):
'''Test VectorNav EAHRS support'''
self.fly_external_AHRS("VectorNav", 1, "ap1.txt")
@ -4155,6 +4235,7 @@ class AutoTestPlane(AutoTest):
self.EmbeddedParamParser,
self.AerobaticsScripting,
self.MANUAL_CONTROL,
self.WindEstimates,
])
return ret

View File

@ -6125,6 +6125,7 @@ class AutoTest(ABC):
maximum,
current_value_getter,
validator=None,
value_averager=None,
timeout=30,
print_diagnostics_as_target_not_range=False,
**kwargs):
@ -6183,28 +6184,45 @@ class AutoTest(ABC):
achieved_duration_bit)
)
else:
self.progress(
"%s=%0.2f (%s between %s and %s)%s" %
(value_name,
last_value,
want_or_got,
str(minimum),
str(maximum),
achieved_duration_bit)
)
if type(last_value) is float:
self.progress(
"%s=%0.2f (%s between %s and %s)%s" %
(value_name,
last_value,
want_or_got,
str(minimum),
str(maximum),
achieved_duration_bit)
)
else:
self.progress(
"%s=%s (%s between %s and %s)%s" %
(value_name,
last_value,
want_or_got,
str(minimum),
str(maximum),
achieved_duration_bit)
)
last_print_time = self.get_sim_time_cached()
if is_value_valid:
sum_of_achieved_values += last_value
count_of_achieved_values += 1.0
if value_averager is not None:
average = value_averager.add_value(last_value)
else:
sum_of_achieved_values += last_value
count_of_achieved_values += 1.0
average = sum_of_achieved_values / count_of_achieved_values
if achieving_duration_start is None:
achieving_duration_start = self.get_sim_time_cached()
if self.get_sim_time_cached() - achieving_duration_start >= minimum_duration:
self.progress("Attained %s=%f" % (value_name, sum_of_achieved_values / count_of_achieved_values))
self.progress("Attained %s=%s" % (value_name, average))
return True
else:
achieving_duration_start = None
sum_of_achieved_values = 0.0
count_of_achieved_values = 0
if value_averager is not None:
value_averager.reset()
if print_diagnostics_as_target_not_range:
raise AutoTestTimeoutException(
"Failed to attain %s want %s, reached %s" %