Tools: autotest: Add and raise exceptions for wait_* helper functions

This commit is contained in:
Karthik Desai 2018-04-27 16:48:06 +02:00 committed by Peter Barker
parent cde2247a0c
commit d26c09ccac
1 changed files with 82 additions and 37 deletions

View File

@ -24,9 +24,64 @@ else:
ABC = abc.ABCMeta('ABC', (), {})
class AutoTestTimeoutException(Exception):
class ErrorException(Exception):
"""Base class for other exceptions"""
pass
class AutoTestTimeoutException(ErrorException):
pass
class WaitModeTimeout(AutoTestTimeoutException):
"""Thrown when fails to achieve given mode change."""
pass
class WaitAltitudeTimout(AutoTestTimeoutException):
"""Thrown when fails to achieve given altitude range."""
pass
class WaitGroundSpeedTimeout(AutoTestTimeoutException):
"""Thrown when fails to achieve given ground speed range."""
pass
class WaitRollTimeout(AutoTestTimeoutException):
"""Thrown when fails to achieve given roll in degrees."""
pass
class WaitPitchTimeout(AutoTestTimeoutException):
"""Thrown when fails to achieve given pitch in degrees."""
pass
class WaitHeadingTimeout(AutoTestTimeoutException):
"""Thrown when fails to achieve given heading."""
pass
class WaitDistanceTimeout(AutoTestTimeoutException):
"""Thrown when fails to attain distance"""
pass
class WaitLocationTimeout(AutoTestTimeoutException):
"""Thrown when fails to attain location"""
pass
class WaitWaypointTimeout(AutoTestTimeoutException):
"""Thrown when fails to attain waypoint ranges"""
pass
class SetRCTimeout(AutoTestTimeoutException):
"""Thrown when fails to send RC commands"""
pass
class MsgRcvTimeoutException(AutoTestTimeoutException):
"""Thrown when fails to receive an expected message"""
pass
class NotAchievedException(ErrorException):
"""Thrown when fails to achieve a goal"""
pass
class PreconditionFailedException(ErrorException):
"""Thrown when a precondition for a test is not met"""
pass
class AutoTest(ABC):
"""Base abstract class.
@ -200,7 +255,6 @@ class AutoTest(ABC):
self.mavproxy.expect("Finished downloading", timeout=timeout)
self.mav.wait_heartbeat()
self.mav.wait_heartbeat()
return True
def show_gps_and_sim_positions(self, on_off):
"""Allow to display gps and actual position on map."""
@ -255,8 +309,8 @@ class AutoTest(ABC):
chan_pwm = getattr(m, "chan" + str(chan) + "_raw")
if chan_pwm == pwm:
return True
self.progress("Failed to send RC commands")
return False
self.progress("Failed to send RC commands to channel %s" % str(chan))
raise SetRCTimeout()
def arm_vehicle(self):
"""Arm vehicle with mavlink arm message."""
@ -316,9 +370,8 @@ class AutoTest(ABC):
timeout=10)
if m is None:
self.progress("AUTOPILOT_VERSION not received")
return False
raise NotAchievedException()
self.progress("AUTOPILOT_VERSION received")
return True
def do_set_mode_via_command_long(self):
base_mode = mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED
@ -331,11 +384,11 @@ class AutoTest(ABC):
blocking=True,
timeout=10)
if m is None:
return False
raise ErrorException()
if m.custom_mode == custom_mode:
return True
return
time.sleep(0.1)
return False
return AutoTestTimeoutException()
def reach_heading_manual(self, heading):
"""Manually direct the vehicle to the target heading."""
@ -346,9 +399,7 @@ class AutoTest(ABC):
mavutil.mavlink.MAV_TYPE_COAXIAL,
mavutil.mavlink.MAV_TYPE_TRICOPTER]:
self.mavproxy.send('rc 4 1580\n')
if not self.wait_heading(heading):
self.progress("Failed to reach heading")
return False
self.wait_heading(heading)
self.mavproxy.send('rc 4 1500\n')
self.mav.recv_match(condition='RC_CHANNELS.chan4_raw==1500',
blocking=True)
@ -357,16 +408,13 @@ class AutoTest(ABC):
if self.mav.mav_type == mavutil.mavlink.MAV_TYPE_GROUND_ROVER:
self.mavproxy.send('rc 1 1700\n')
self.mavproxy.send('rc 3 1550\n')
if not self.wait_heading(heading):
self.progress("Failed to reach heading")
return False
self.wait_heading(heading)
self.mavproxy.send('rc 3 1500\n')
self.mav.recv_match(condition='RC_CHANNELS.chan3_raw==1500',
blocking=True)
self.mavproxy.send('rc 1 1500\n')
self.mav.recv_match(condition='RC_CHANNELS.chan1_raw==1500',
blocking=True)
return True
def reach_distance_manual(self, distance):
"""Manually direct the vehicle to the target distance from home."""
@ -377,9 +425,7 @@ class AutoTest(ABC):
mavutil.mavlink.MAV_TYPE_COAXIAL,
mavutil.mavlink.MAV_TYPE_TRICOPTER]:
self.mavproxy.send('rc 2 1350\n')
if not self.wait_distance(distance, accuracy=5, timeout=60):
self.progress("Failed to reach distance of %u" % distance)
return False
self.wait_distance(distance, accuracy=5, timeout=60)
self.mavproxy.send('rc 2 1500\n')
self.mav.recv_match(condition='RC_CHANNELS.chan2_raw==1500',
blocking=True)
@ -387,13 +433,10 @@ class AutoTest(ABC):
self.progress("NOT IMPLEMENTED")
if self.mav.mav_type == mavutil.mavlink.MAV_TYPE_GROUND_ROVER:
self.mavproxy.send('rc 3 1700\n')
if not self.wait_distance(distance, accuracy=2):
self.progress("Failed to reach distance of %u" % distance)
return False
self.wait_distance(distance, accuracy=2)
self.mavproxy.send('rc 3 1500\n')
self.mav.recv_match(condition='RC_CHANNELS.chan3_raw==1500',
blocking=True)
return True
#################################################
# WAIT UTILITIES
@ -423,7 +466,7 @@ class AutoTest(ABC):
self.progress("Altitude OK")
return True
self.progress("Failed to attain altitude range")
return False
raise WaitAltitudeTimout()
def wait_groundspeed(self, gs_min, gs_max, timeout=30):
"""Wait for a given ground speed range."""
@ -437,7 +480,7 @@ class AutoTest(ABC):
if m.groundspeed >= gs_min and m.groundspeed <= gs_max:
return True
self.progress("Failed to attain groundspeed range")
return False
raise WaitGroundSpeedTimeout()
def wait_roll(self, roll, accuracy, timeout=30):
"""Wait for a given roll in degrees."""
@ -452,7 +495,7 @@ class AutoTest(ABC):
self.progress("Attained roll %d" % roll)
return True
self.progress("Failed to attain roll %d" % roll)
return False
raise WaitRollTimeout()
def wait_pitch(self, pitch, accuracy, timeout=30):
"""Wait for a given pitch in degrees."""
@ -467,7 +510,7 @@ class AutoTest(ABC):
self.progress("Attained pitch %d" % pitch)
return True
self.progress("Failed to attain pitch %d" % pitch)
return False
raise WaitPitchTimeout()
def wait_heading(self, heading, accuracy=5, timeout=30):
"""Wait for a given heading."""
@ -481,7 +524,7 @@ class AutoTest(ABC):
self.progress("Attained heading %u" % heading)
return True
self.progress("Failed to attain heading %u" % heading)
return False
raise WaitHeadingTimeout()
def wait_distance(self, distance, accuracy=5, timeout=30):
"""Wait for flight of a given distance."""
@ -497,9 +540,9 @@ class AutoTest(ABC):
if delta > (distance + accuracy):
self.progress("Failed distance - overshoot delta=%f dist=%f"
% (delta, distance))
return False
raise WaitDistanceTimeout()
self.progress("Failed to attain distance %u" % distance)
return False
raise WaitDistanceTimeout()
def wait_location(self,
loc,
@ -525,7 +568,7 @@ class AutoTest(ABC):
self.progress("Reached location (%.2f meters)" % delta)
return True
self.progress("Failed to attain location")
return False
raise WaitLocationTimeout()
def wait_waypoint(self,
wpnum_start,
@ -545,7 +588,7 @@ class AutoTest(ABC):
# if start_wp != wpnum_start:
# self.progress("test: Expected start waypoint %u but got %u" %
# (wpnum_start, start_wp))
# return False
# raise WaitWaypointTimeout()
while self.get_sim_time() < tstart + timeout:
seq = self.mav.waypoint_current()
@ -557,7 +600,7 @@ class AutoTest(ABC):
# if we changed mode, fail
if self.mav.flightmode != mode:
self.progress('Exited %s mode' % mode)
return False
raise WaitWaypointTimeout()
self.progress("test: WP %u (wp_dist=%u Alt=%d), current_wp: %u,"
"wpnum_end: %u" %
@ -579,10 +622,10 @@ class AutoTest(ABC):
if seq > current_wp+1:
self.progress("Failed: Skipped waypoint! Got wp %u expected %u"
% (seq, current_wp+1))
return False
raise WaitWaypointTimeout()
self.progress("Failed: Timed out waiting for waypoint %u of %u" %
(wpnum_end, wpnum_end))
return False
raise WaitWaypointTimeout()
def wait_mode(self, mode, timeout=None):
"""Wait for mode to change."""
@ -594,7 +637,9 @@ class AutoTest(ABC):
hastimeout = self.get_sim_time() > tstart + timeout
self.mav.wait_heartbeat()
self.progress("Got mode %s" % mode)
return self.mav.flightmode
if self.mav.flightmode != mode and hastimeout:
raise WaitModeTimeout()
return True
def wait_ready_to_arm(self, timeout=None):
# wait for EKF checks to pass
@ -614,7 +659,7 @@ class AutoTest(ABC):
(required_value, current))
if current == required_value:
self.progress("EKF Flags OK")
return
return True
self.progress("Failed to get EKF.flags=%u" % required_value)
raise AutoTestTimeoutException()