autotest: add new wait_and_maintain_range method

This commit is contained in:
Peter Barker 2022-07-05 13:30:20 +10:00 committed by Peter Barker
parent e77d0ce1a4
commit 86096261d0
1 changed files with 137 additions and 42 deletions

View File

@ -5700,18 +5700,12 @@ class AutoTest(ABC):
return getattr(msg, field)
raise MsgRcvTimeoutException("Failed to get %s" % field)
def validator(value2, target2=None):
if speed_min <= value2 <= speed_max:
return True
else:
return False
self.wait_and_maintain(
self.wait_and_maintain_range(
value_name=field,
target=(speed_min+speed_max)/2,
minimum=speed_min,
maximum=speed_max,
current_value_getter=lambda: get_speed(timeout),
accuracy=(speed_max - speed_min)/2,
validator=lambda value2, target2: validator(value2, target2),
validator=None,
timeout=timeout,
**kwargs
)
@ -5765,32 +5759,126 @@ class AutoTest(ABC):
)
def wait_and_maintain(self, value_name, target, current_value_getter, validator=None, accuracy=2.0, timeout=30, **kwargs):
if type(target) is Vector3:
return self.wait_and_maintain_vector(
value_name,
target,
current_value_getter,
validator,
timeout=30,
**kwargs
)
return self.wait_and_maintain_range(
value_name,
minimum=target - accuracy/2,
maximum=target + accuracy/2,
current_value_getter=current_value_getter,
validator=validator,
timeout=timeout,
print_diagnostics_as_target_not_range=True,
**kwargs
)
def wait_and_maintain_vector(self,
value_name,
target,
current_value_getter,
validator,
timeout=30,
**kwargs):
tstart = self.get_sim_time()
achieving_duration_start = None
if type(target) is Vector3:
sum_of_achieved_values = Vector3()
last_value = Vector3()
else:
sum_of_achieved_values = 0.0
last_value = 0.0
sum_of_achieved_values = Vector3()
last_value = Vector3()
count_of_achieved_values = 0
called_function = kwargs.get("called_function", None)
minimum_duration = kwargs.get("minimum_duration", 0)
if minimum_duration >= timeout:
raise ValueError("minimum_duration >= timeout")
if type(target) is Vector3:
self.progress("Waiting for %s=(%s) with accuracy %.02f" % (value_name, str(target), accuracy))
else:
self.progress("Waiting for %s=(%s)" % (value_name, str(target)))
last_print_time = 0
while self.get_sim_time_cached() < tstart + timeout: # if we failed to received message with the getter the sim time isn't updated # noqa
last_value = current_value_getter()
if called_function is not None:
called_function(last_value, target)
is_value_valid = validator(last_value, target)
if self.get_sim_time_cached() - last_print_time > 1:
if is_value_valid:
want_or_got = "got"
else:
want_or_got = "want"
achieved_duration_bit = ""
if achieving_duration_start is not None:
so_far = self.get_sim_time_cached() - achieving_duration_start
achieved_duration_bit = " (maintain=%.1f/%.1f)" % (so_far, minimum_duration)
self.progress(
"%s=(%s) (%s (%s))%s" %
(value_name,
str(last_value),
want_or_got,
str(target),
achieved_duration_bit)
)
last_print_time = self.get_sim_time_cached()
if is_value_valid:
sum_of_achieved_values += last_value
count_of_achieved_values += 1.0
if achieving_duration_start is None:
achieving_duration_start = self.get_sim_time_cached()
if self.get_sim_time_cached() - achieving_duration_start >= minimum_duration:
self.progress("Attained %s=%s" % (
value_name,
str(sum_of_achieved_values * (1.0 / count_of_achieved_values))))
return True
else:
achieving_duration_start = None
sum_of_achieved_values.zero()
count_of_achieved_values = 0
raise AutoTestTimeoutException(
"Failed to attain %s want %s, reached %s" %
(value_name,
str(target),
str(sum_of_achieved_values / count_of_achieved_values) if count_of_achieved_values != 0 else str(last_value)))
def wait_and_maintain_range(self,
value_name,
minimum,
maximum,
current_value_getter,
validator=None,
timeout=30,
print_diagnostics_as_target_not_range=False,
**kwargs):
if print_diagnostics_as_target_not_range:
target = (minimum + maximum) / 2
accuracy = (maximum - minimum) / 2
tstart = self.get_sim_time()
achieving_duration_start = None
sum_of_achieved_values = 0.0
last_value = 0.0
count_of_achieved_values = 0
called_function = kwargs.get("called_function", None)
minimum_duration = kwargs.get("minimum_duration", 0)
if minimum_duration >= timeout:
raise ValueError("minimum_duration >= timeout")
if print_diagnostics_as_target_not_range:
self.progress("Waiting for %s=%.02f with accuracy %.02f" % (value_name, target, accuracy))
else:
self.progress("Waiting for %s between (%s) and (%s)" % (value_name, str(minimum), str(maximum)))
last_print_time = 0
while self.get_sim_time_cached() < tstart + timeout: # if we failed to received message with the getter the sim time isn't updated # noqa
last_value = current_value_getter()
if called_function is not None:
called_function(last_value, target)
if validator is not None:
is_value_valid = validator(last_value, target)
if print_diagnostics_as_target_not_range:
is_value_valid = validator(last_value, target)
else:
is_value_valid = validator(last_value, minimum, maximum)
else:
is_value_valid = math.fabs(last_value - target) <= accuracy
is_value_valid = (minimum <= last_value) and (last_value <= maximum)
if self.get_sim_time_cached() - last_print_time > 1:
if is_value_valid:
want_or_got = "got"
@ -5801,19 +5889,26 @@ class AutoTest(ABC):
so_far = self.get_sim_time_cached() - achieving_duration_start
achieved_duration_bit = " (maintain=%.1f/%.1f)" % (so_far, minimum_duration)
if type(target) is Vector3:
if print_diagnostics_as_target_not_range:
self.progress(
"%s=(%s) (%s (%s) +- %f)%s" %
"%s=%0.2f (%s %f +- %f)%s" %
(value_name,
str(last_value),
last_value,
want_or_got,
str(target),
target,
accuracy,
achieved_duration_bit)
)
else:
self.progress("%s=%0.2f (%s %f +- %f)%s" %
(value_name, last_value, want_or_got, target, accuracy, achieved_duration_bit))
self.progress(
"%s=%0.2f (%s between %s and %s)%s" %
(value_name,
last_value,
want_or_got,
str(minimum),
str(maximum),
achieved_duration_bit)
)
last_print_time = self.get_sim_time_cached()
if is_value_valid:
sum_of_achieved_values += last_value
@ -5821,25 +5916,25 @@ class AutoTest(ABC):
if achieving_duration_start is None:
achieving_duration_start = self.get_sim_time_cached()
if self.get_sim_time_cached() - achieving_duration_start >= minimum_duration:
if type(target) is Vector3:
self.progress("Attained %s=%s" % (
value_name,
str(sum_of_achieved_values * (1.0 / count_of_achieved_values))))
else:
self.progress("Attained %s=%f" % (value_name, sum_of_achieved_values / count_of_achieved_values))
self.progress("Attained %s=%f" % (value_name, sum_of_achieved_values / count_of_achieved_values))
return True
else:
achieving_duration_start = None
if type(target) is Vector3:
sum_of_achieved_values.zero()
else:
sum_of_achieved_values = 0.0
sum_of_achieved_values = 0.0
count_of_achieved_values = 0
raise AutoTestTimeoutException(
"Failed to attain %s want %s, reached %s" %
(value_name,
str(target),
str(sum_of_achieved_values / count_of_achieved_values) if count_of_achieved_values != 0 else str(last_value)))
if print_diagnostics_as_target_not_range:
raise AutoTestTimeoutException(
"Failed to attain %s want %s, reached %s" %
(value_name,
str(target),
str(sum_of_achieved_values / count_of_achieved_values) if count_of_achieved_values != 0 else str(last_value)))
else:
raise AutoTestTimeoutException(
"Failed to attain %s between %s and %s, reached %s" %
(value_name,
str(minimum),
str(maximum),
str(sum_of_achieved_values / count_of_achieved_values) if count_of_achieved_values != 0 else str(last_value)))
def wait_heading(self, heading, accuracy=5, timeout=30, **kwargs):
"""Wait for a given heading."""