From d26c09ccacbe744b7640358b9678186a51352b45 Mon Sep 17 00:00:00 2001 From: Karthik Desai Date: Fri, 27 Apr 2018 16:48:06 +0200 Subject: [PATCH] Tools: autotest: Add and raise exceptions for wait_* helper functions --- Tools/autotest/common.py | 119 +++++++++++++++++++++++++++------------ 1 file changed, 82 insertions(+), 37 deletions(-) diff --git a/Tools/autotest/common.py b/Tools/autotest/common.py index 75af6a6ccc..f7d2ce248f 100644 --- a/Tools/autotest/common.py +++ b/Tools/autotest/common.py @@ -24,9 +24,64 @@ else: ABC = abc.ABCMeta('ABC', (), {}) -class AutoTestTimeoutException(Exception): +class ErrorException(Exception): + """Base class for other exceptions""" pass +class AutoTestTimeoutException(ErrorException): + pass + +class WaitModeTimeout(AutoTestTimeoutException): + """Thrown when fails to achieve given mode change.""" + pass + +class WaitAltitudeTimout(AutoTestTimeoutException): + """Thrown when fails to achieve given altitude range.""" + pass + +class WaitGroundSpeedTimeout(AutoTestTimeoutException): + """Thrown when fails to achieve given ground speed range.""" + pass + +class WaitRollTimeout(AutoTestTimeoutException): + """Thrown when fails to achieve given roll in degrees.""" + pass + +class WaitPitchTimeout(AutoTestTimeoutException): + """Thrown when fails to achieve given pitch in degrees.""" + pass + +class WaitHeadingTimeout(AutoTestTimeoutException): + """Thrown when fails to achieve given heading.""" + pass + +class WaitDistanceTimeout(AutoTestTimeoutException): + """Thrown when fails to attain distance""" + pass + +class WaitLocationTimeout(AutoTestTimeoutException): + """Thrown when fails to attain location""" + pass + +class WaitWaypointTimeout(AutoTestTimeoutException): + """Thrown when fails to attain waypoint ranges""" + pass + +class SetRCTimeout(AutoTestTimeoutException): + """Thrown when fails to send RC commands""" + pass + +class MsgRcvTimeoutException(AutoTestTimeoutException): + """Thrown when fails to receive an expected message""" + pass + +class NotAchievedException(ErrorException): + """Thrown when fails to achieve a goal""" + pass + +class PreconditionFailedException(ErrorException): + """Thrown when a precondition for a test is not met""" + pass class AutoTest(ABC): """Base abstract class. @@ -200,7 +255,6 @@ class AutoTest(ABC): self.mavproxy.expect("Finished downloading", timeout=timeout) self.mav.wait_heartbeat() self.mav.wait_heartbeat() - return True def show_gps_and_sim_positions(self, on_off): """Allow to display gps and actual position on map.""" @@ -255,8 +309,8 @@ class AutoTest(ABC): chan_pwm = getattr(m, "chan" + str(chan) + "_raw") if chan_pwm == pwm: return True - self.progress("Failed to send RC commands") - return False + self.progress("Failed to send RC commands to channel %s" % str(chan)) + raise SetRCTimeout() def arm_vehicle(self): """Arm vehicle with mavlink arm message.""" @@ -316,9 +370,8 @@ class AutoTest(ABC): timeout=10) if m is None: self.progress("AUTOPILOT_VERSION not received") - return False + raise NotAchievedException() self.progress("AUTOPILOT_VERSION received") - return True def do_set_mode_via_command_long(self): base_mode = mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED @@ -331,11 +384,11 @@ class AutoTest(ABC): blocking=True, timeout=10) if m is None: - return False + raise ErrorException() if m.custom_mode == custom_mode: - return True + return time.sleep(0.1) - return False + return AutoTestTimeoutException() def reach_heading_manual(self, heading): """Manually direct the vehicle to the target heading.""" @@ -346,9 +399,7 @@ class AutoTest(ABC): mavutil.mavlink.MAV_TYPE_COAXIAL, mavutil.mavlink.MAV_TYPE_TRICOPTER]: self.mavproxy.send('rc 4 1580\n') - if not self.wait_heading(heading): - self.progress("Failed to reach heading") - return False + self.wait_heading(heading) self.mavproxy.send('rc 4 1500\n') self.mav.recv_match(condition='RC_CHANNELS.chan4_raw==1500', blocking=True) @@ -357,16 +408,13 @@ class AutoTest(ABC): if self.mav.mav_type == mavutil.mavlink.MAV_TYPE_GROUND_ROVER: self.mavproxy.send('rc 1 1700\n') self.mavproxy.send('rc 3 1550\n') - if not self.wait_heading(heading): - self.progress("Failed to reach heading") - return False + self.wait_heading(heading) self.mavproxy.send('rc 3 1500\n') self.mav.recv_match(condition='RC_CHANNELS.chan3_raw==1500', blocking=True) self.mavproxy.send('rc 1 1500\n') self.mav.recv_match(condition='RC_CHANNELS.chan1_raw==1500', blocking=True) - return True def reach_distance_manual(self, distance): """Manually direct the vehicle to the target distance from home.""" @@ -377,9 +425,7 @@ class AutoTest(ABC): mavutil.mavlink.MAV_TYPE_COAXIAL, mavutil.mavlink.MAV_TYPE_TRICOPTER]: self.mavproxy.send('rc 2 1350\n') - if not self.wait_distance(distance, accuracy=5, timeout=60): - self.progress("Failed to reach distance of %u" % distance) - return False + self.wait_distance(distance, accuracy=5, timeout=60) self.mavproxy.send('rc 2 1500\n') self.mav.recv_match(condition='RC_CHANNELS.chan2_raw==1500', blocking=True) @@ -387,13 +433,10 @@ class AutoTest(ABC): self.progress("NOT IMPLEMENTED") if self.mav.mav_type == mavutil.mavlink.MAV_TYPE_GROUND_ROVER: self.mavproxy.send('rc 3 1700\n') - if not self.wait_distance(distance, accuracy=2): - self.progress("Failed to reach distance of %u" % distance) - return False + self.wait_distance(distance, accuracy=2) self.mavproxy.send('rc 3 1500\n') self.mav.recv_match(condition='RC_CHANNELS.chan3_raw==1500', blocking=True) - return True ################################################# # WAIT UTILITIES @@ -423,7 +466,7 @@ class AutoTest(ABC): self.progress("Altitude OK") return True self.progress("Failed to attain altitude range") - return False + raise WaitAltitudeTimout() def wait_groundspeed(self, gs_min, gs_max, timeout=30): """Wait for a given ground speed range.""" @@ -437,7 +480,7 @@ class AutoTest(ABC): if m.groundspeed >= gs_min and m.groundspeed <= gs_max: return True self.progress("Failed to attain groundspeed range") - return False + raise WaitGroundSpeedTimeout() def wait_roll(self, roll, accuracy, timeout=30): """Wait for a given roll in degrees.""" @@ -452,7 +495,7 @@ class AutoTest(ABC): self.progress("Attained roll %d" % roll) return True self.progress("Failed to attain roll %d" % roll) - return False + raise WaitRollTimeout() def wait_pitch(self, pitch, accuracy, timeout=30): """Wait for a given pitch in degrees.""" @@ -467,7 +510,7 @@ class AutoTest(ABC): self.progress("Attained pitch %d" % pitch) return True self.progress("Failed to attain pitch %d" % pitch) - return False + raise WaitPitchTimeout() def wait_heading(self, heading, accuracy=5, timeout=30): """Wait for a given heading.""" @@ -481,7 +524,7 @@ class AutoTest(ABC): self.progress("Attained heading %u" % heading) return True self.progress("Failed to attain heading %u" % heading) - return False + raise WaitHeadingTimeout() def wait_distance(self, distance, accuracy=5, timeout=30): """Wait for flight of a given distance.""" @@ -497,9 +540,9 @@ class AutoTest(ABC): if delta > (distance + accuracy): self.progress("Failed distance - overshoot delta=%f dist=%f" % (delta, distance)) - return False + raise WaitDistanceTimeout() self.progress("Failed to attain distance %u" % distance) - return False + raise WaitDistanceTimeout() def wait_location(self, loc, @@ -525,7 +568,7 @@ class AutoTest(ABC): self.progress("Reached location (%.2f meters)" % delta) return True self.progress("Failed to attain location") - return False + raise WaitLocationTimeout() def wait_waypoint(self, wpnum_start, @@ -545,7 +588,7 @@ class AutoTest(ABC): # if start_wp != wpnum_start: # self.progress("test: Expected start waypoint %u but got %u" % # (wpnum_start, start_wp)) - # return False + # raise WaitWaypointTimeout() while self.get_sim_time() < tstart + timeout: seq = self.mav.waypoint_current() @@ -557,7 +600,7 @@ class AutoTest(ABC): # if we changed mode, fail if self.mav.flightmode != mode: self.progress('Exited %s mode' % mode) - return False + raise WaitWaypointTimeout() self.progress("test: WP %u (wp_dist=%u Alt=%d), current_wp: %u," "wpnum_end: %u" % @@ -579,10 +622,10 @@ class AutoTest(ABC): if seq > current_wp+1: self.progress("Failed: Skipped waypoint! Got wp %u expected %u" % (seq, current_wp+1)) - return False + raise WaitWaypointTimeout() self.progress("Failed: Timed out waiting for waypoint %u of %u" % (wpnum_end, wpnum_end)) - return False + raise WaitWaypointTimeout() def wait_mode(self, mode, timeout=None): """Wait for mode to change.""" @@ -594,7 +637,9 @@ class AutoTest(ABC): hastimeout = self.get_sim_time() > tstart + timeout self.mav.wait_heartbeat() self.progress("Got mode %s" % mode) - return self.mav.flightmode + if self.mav.flightmode != mode and hastimeout: + raise WaitModeTimeout() + return True def wait_ready_to_arm(self, timeout=None): # wait for EKF checks to pass @@ -614,7 +659,7 @@ class AutoTest(ABC): (required_value, current)) if current == required_value: self.progress("EKF Flags OK") - return + return True self.progress("Failed to get EKF.flags=%u" % required_value) raise AutoTestTimeoutException()