Tools: Common : create generic wait_and_maintain function, upgrade all wait functions to use it

This commit is contained in:
Pierre Kancir 2019-07-08 18:14:28 +02:00 committed by Peter Barker
parent 87d363c3f4
commit 2d65cbd884

View File

@ -2736,137 +2736,149 @@ class AutoTest(ABC):
while tstart + seconds_to_wait > tnow:
tnow = self.get_sim_time()
def wait_altitude(self, alt_min, alt_max, timeout=30, relative=False):
def wait_altitude(self, altitude_min, altitude_max, relative=False, timeout=30, **kwargs):
"""Wait for a given altitude range."""
previous_alt = None
assert altitude_min <= altitude_max, "Minimum altitude should be less than maximum altitude."
tstart = self.get_sim_time()
self.progress("Waiting for altitude between %.02f and %.02f" %
(alt_min, alt_max))
last_wait_alt_msg = 0
while self.get_sim_time_cached() < tstart + timeout:
m = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True)
if m is None:
continue
if relative:
alt = m.relative_alt/1000.0 # mm -> m
def get_altitude(alt_relative=False, timeout2=30):
msg = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=timeout2)
if msg:
if alt_relative:
return msg.relative_alt / 1000.0 # mm -> m
else:
alt = m.alt/1000.0 # mm -> m
return msg.alt / 1000.0 # mm -> m
raise MsgRcvTimeoutException("Failed to get Global Position")
if previous_alt is None:
climb_rate = "-"
else:
climb_rate = "%.02f" % (alt - previous_alt)
previous_alt = alt
ok = alt >= alt_min and alt <= alt_max
if ok or self.get_sim_time_cached() - last_wait_alt_msg > 1:
self.progress("Wait Altitude: Cur:%.02f min:%.02f max:%.02f climb_rate: %s"
% (alt, alt_min, alt_max, climb_rate))
last_wait_alt_msg = self.get_sim_time_cached()
if ok:
self.progress("Altitude OK")
def validator(value2, target2=None):
if altitude_min <= value2 <= altitude_max:
return True
raise WaitAltitudeTimout("Failed to attain altitude range")
else:
return False
def wait_groundspeed(self, gs_min, gs_max, timeout=30):
self.wait_and_maintain(value_name="Altitude", target=altitude_min, current_value_getter=lambda: get_altitude(relative, timeout), accuracy=(altitude_max - altitude_min), validator=lambda value2, target2: validator(value2, target2), timeout=timeout, **kwargs)
def wait_groundspeed(self, speed_min, speed_max, timeout=30, **kwargs):
"""Wait for a given ground speed range."""
self.progress("Waiting for groundspeed between %.1f and %.1f" %
(gs_min, gs_max))
last_print = 0
tstart = self.get_sim_time()
while self.get_sim_time_cached() < tstart + timeout:
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
if self.get_sim_time_cached() - last_print > 1:
self.progress("Wait groundspeed %.3f, target:%.3f" %
(m.groundspeed, gs_min))
last_print = self.get_sim_time_cached()
if m.groundspeed >= gs_min and m.groundspeed <= gs_max:
return True
raise WaitGroundSpeedTimeout("Failed to attain groundspeed range")
assert speed_min <= speed_max, "Minimum speed should be less than maximum speed."
def wait_roll(self, roll, accuracy, timeout=30):
def get_groundspeed(timeout2):
msg = self.mav.recv_match(type='VFR_HUD', blocking=True, timeout=timeout2)
if msg:
return msg.groundspeed
raise MsgRcvTimeoutException("Failed to get Groundspeed")
def validator(value2, target2=None):
if speed_min <= value2 <= speed_max:
return True
else:
return False
self.wait_and_maintain(value_name="Groundspeed", target=speed_min, current_value_getter=lambda: get_groundspeed(timeout), accuracy=(speed_max - speed_min), validator=lambda value2, target2: validator(value2, target2), timeout=timeout, **kwargs)
def wait_roll(self, roll, accuracy, timeout=30, **kwargs):
"""Wait for a given roll in degrees."""
tstart = self.get_sim_time()
self.progress("Waiting for roll of %d at %s" % (roll, time.ctime()))
while self.get_sim_time_cached() < tstart + timeout:
m = self.mav.recv_match(type='ATTITUDE', blocking=True)
p = math.degrees(m.pitch)
r = math.degrees(m.roll)
def get_roll(timeout2):
msg = self.mav.recv_match(type='ATTITUDE', blocking=True, timeout=timeout2)
if msg:
p = math.degrees(msg.pitch)
r = math.degrees(msg.roll)
self.progress("Roll %d Pitch %d" % (r, p))
if math.fabs(r - roll) <= accuracy:
self.progress("Attained roll %d" % roll)
return True
raise WaitRollTimeout("Failed to attain roll %d" % roll)
return r
raise MsgRcvTimeoutException("Failed to get Roll")
def wait_pitch(self, pitch, accuracy, timeout=30):
def validator(value2, target2):
return math.fabs((value2 - target2 + 180) % 360 - 180) <= accuracy
self.wait_and_maintain(value_name="Roll", target=roll, current_value_getter=lambda: get_roll(timeout), validator=lambda value2, target2: validator(value2, target2), accuracy=accuracy, timeout=timeout, **kwargs)
def wait_pitch(self, pitch, accuracy, timeout=30, **kwargs):
"""Wait for a given pitch in degrees."""
tstart = self.get_sim_time()
self.progress("Waiting for pitch of %.02f at %s" % (pitch, time.ctime()))
while self.get_sim_time_cached() < tstart + timeout:
m = self.mav.recv_match(type='ATTITUDE', blocking=True)
p = math.degrees(m.pitch)
r = math.degrees(m.roll)
def get_pitch(timeout2):
msg = self.mav.recv_match(type='ATTITUDE', blocking=True, timeout=timeout2)
if msg:
p = math.degrees(msg.pitch)
r = math.degrees(msg.roll)
self.progress("Pitch %d Roll %d" % (p, r))
if math.fabs(p - pitch) <= accuracy:
self.progress("Attained pitch %d" % pitch)
return True
raise WaitPitchTimeout("Failed to attain pitch %d" % pitch)
return p
raise MsgRcvTimeoutException("Failed to get Pitch")
def wait_heading(self, heading, accuracy=5, timeout=30):
"""Wait for a given heading."""
def validator(value2, target2):
return math.fabs((value2 - target2 + 180) % 360 - 180) <= accuracy
self.wait_and_maintain(value_name="Pitch", target=pitch, current_value_getter=lambda: get_pitch(timeout), validator=lambda value2, target2: validator(value2, target2), accuracy=accuracy, timeout=timeout, **kwargs)
def wait_and_maintain(self, value_name, target, current_value_getter, validator=None, accuracy=2.0, timeout=30, **kwargs):
tstart = self.get_sim_time()
self.progress("Waiting for heading %.02f with accuracy %.02f" %
(heading, accuracy))
achieving_duration_start = None
sum_of_achieved_values = 0.0
last_value = 0.0
count_of_achieved_values = 0
called_function = kwargs.get("called_function", None)
minimum_duration = kwargs.get("minimum_duration", 0)
self.progress("Waiting for %s %.02f with accuracy %.02f" % (value_name, target, accuracy))
last_print_time = 0
while True:
now = self.get_sim_time_cached()
if now - tstart >= timeout:
break
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
if now - last_print_time > 1:
self.progress("Heading %d (want %f +- %f)" %
(m.heading, heading, accuracy))
last_print_time = now
if math.fabs(m.heading - heading) <= accuracy:
self.progress("Attained heading %d" % heading)
while self.get_sim_time_cached() < tstart + timeout: # if we failed to received message with the getter the sim time isn't updated
last_value = current_value_getter()
if called_function is not None:
called_function(last_value, target)
if self.get_sim_time_cached() - last_print_time > 1:
self.progress("%s=%0.2f (want %f +- %f)" %
(value_name, last_value, target, accuracy))
last_print_time = self.get_sim_time_cached()
if validator is not None:
is_value_valid = validator(last_value, target)
else:
is_value_valid = math.fabs(last_value - target) <= accuracy
if is_value_valid:
sum_of_achieved_values += last_value
count_of_achieved_values += 1.0
if achieving_duration_start is None:
achieving_duration_start = self.get_sim_time_cached()
if self.get_sim_time_cached() - achieving_duration_start >= minimum_duration:
self.progress("Attained %s=%f" % (value_name, sum_of_achieved_values / count_of_achieved_values))
return True
raise WaitHeadingTimeout("Failed to attain heading %d" % heading)
else:
achieving_duration_start = None
sum_of_achieved_values = 0.0
count_of_achieved_values = 0
raise AutoTestTimeoutException("Failed to attain %s %f, reach %f" % (value_name, target, (sum_of_achieved_values / count_of_achieved_values) if count_of_achieved_values != 0 else last_value))
def wait_distance(self, distance, accuracy=5, timeout=30):
def wait_heading(self, heading, accuracy=5, timeout=30, **kwargs):
"""Wait for a given heading."""
def get_heading_wrapped(timeout2):
msg = self.mav.recv_match(type='VFR_HUD', blocking=True, timeout=timeout2)
if msg:
return msg.heading
raise MsgRcvTimeoutException("Failed to get heading")
def validator(value2, target2):
return math.fabs((value2 - target2 + 180) % 360 - 180) <= accuracy
self.wait_and_maintain(value_name="Heading", target=heading, current_value_getter=lambda: get_heading_wrapped(timeout), validator=lambda value2, target2: validator(value2, target2), accuracy=accuracy, timeout=timeout, **kwargs)
def wait_distance(self, distance, accuracy=2, timeout=30, **kwargs):
"""Wait for flight of a given distance."""
tstart = self.get_sim_time()
start = self.mav.location()
last_distance_message = 0
while self.get_sim_time_cached() < tstart + timeout:
pos = self.mav.location()
delta = self.get_distance(start, pos)
if self.get_sim_time_cached() - last_distance_message >= 1:
self.progress("Distance=%.2f meters want=%.2f" %
(delta, distance))
last_distance_message = self.get_sim_time_cached()
if math.fabs(delta - distance) <= accuracy:
self.progress("Attained distance %.02f meters OK" % delta)
return True
if delta > (distance + accuracy):
raise WaitDistanceTimeout("Failed distance - overshoot delta=%f dist=%f"
% (delta, distance))
raise WaitDistanceTimeout("Failed to attain distance %u" % distance)
def wait_distance_to_location(self, location, distance_min, distance_max, timeout=30):
last_distance_message = 0
tstart = self.get_sim_time()
while self.get_sim_time_cached() < tstart + timeout:
pos = self.mav.location()
delta = self.get_distance(location, pos)
if self.get_sim_time_cached() - last_distance_message >= 1:
self.progress("Distance=%.2f meters want <%.2f and >%.2f" %
(delta, distance_min, distance_max))
last_distance_message = self.get_sim_time_cached()
if (delta >= distance_min and delta <= distance_max):
self.progress("Attained distance %.02f meters OK" % delta)
return True
raise WaitDistanceTimeout("Failed to attain distance <%.2f and >%.2f" %
(distance_min, distance_max))
def get_distance():
return self.get_distance(start, self.mav.location())
def validator(value2, target2):
return (value2 - target2) >= accuracy
self.wait_and_maintain(value_name="Distance", target=distance, current_value_getter=lambda: get_distance(), validator=lambda value2, target2: validator(value2, target2), accuracy=accuracy, timeout=timeout, **kwargs)
def wait_distance_to_location(self, location, distance_min, distance_max, timeout=30, **kwargs):
"""Wait for flight of a given distance."""
assert distance_min <= distance_max, "Distance min should be less than distance max."
def get_distance():
return self.get_distance(location, self.mav.location())
def validator(value2, target2=None):
return distance_min <= value2 <= distance_max
self.wait_and_maintain(value_name="Distance", target=distance_min, current_value_getter=lambda: get_distance(), validator=lambda value2, target2: validator(value2, target2), accuracy=(distance_max - distance_min), timeout=timeout, **kwargs)
def wait_servo_channel_value(self, channel, value, timeout=2, comparator=operator.eq):
"""wait for channel value comparison (default condition is equality)"""
@ -2915,36 +2927,28 @@ class AutoTest(ABC):
def wait_location(self,
loc,
accuracy=5,
accuracy=5.0,
timeout=30,
target_altitude=None,
height_accuracy=-1):
height_accuracy=-1,
**kwargs):
"""Wait for arrival at a location."""
tstart = self.get_sim_time()
if target_altitude is None:
target_altitude = loc.alt
self.progress("Waiting for location"
"%.4f,%.4f at altitude %.1f height_accuracy=%.1f" %
(loc.lat, loc.lng, target_altitude, height_accuracy))
last_distance_message = 0
closest = 10000000
last = 0
while self.get_sim_time() < tstart + timeout:
pos = self.mav.location()
delta = self.get_distance(loc, pos)
if self.get_sim_time_cached() - last_distance_message >= 1:
self.progress("Distance %.2f meters alt %.1f" % (delta, pos.alt))
last_distance_message = self.get_sim_time_cached()
if closest > delta:
closest = delta
last = delta
if delta <= accuracy:
height_delta = math.fabs(pos.alt - target_altitude)
def get_distance_to_loc():
return self.get_distance(self.mav.location(), loc)
def validator(value2, empty=None):
if value2 <= accuracy:
if target_altitude is not None:
height_delta = math.fabs(self.mav.location().alt - target_altitude)
if height_accuracy != -1 and height_delta > height_accuracy:
continue
self.progress("Reached location (%.2f meters)" % delta)
return False
return True
raise WaitLocationTimeout("Failed to attain location (want=<%f) (closest=%f) (last=%f)" % (accuracy, closest, last))
else:
return False
debug_text = "Distance to Location (%.4f, %.4f) " % (loc.lat, loc.lng)
if target_altitude is not None:
debug_text += "at altitude %.1f height_accuracy=%.1f" % (target_altitude, height_accuracy)
self.wait_and_maintain(value_name=debug_text, target=0, current_value_getter=lambda: get_distance_to_loc(), accuracy=accuracy, validator=lambda value2, target2: validator(value2, None), timeout=timeout, **kwargs)
def wait_current_waypoint(self, wpnum, timeout=60):
tstart = self.get_sim_time()