autotest: use assert_receive_message to remove redundant code

This commit is contained in:
Peter Barker 2022-02-10 11:59:34 +11:00 committed by Peter Barker
parent 0f05cb06fa
commit 376d7ade50
8 changed files with 53 additions and 207 deletions

View File

@ -86,9 +86,7 @@ class AutoTestTracker(AutoTest):
0, # pitch rate
0, # yaw rate
0) # thrust, 0 to 1, translated to a climb/descent rate
m = self.mav.recv_match(type='ATTITUDE', blocking=True, timeout=2)
if m is None:
raise NotAchievedException("Did not get ATTITUDE")
m = self.assert_receive_message('ATTITUDE', timeout=2)
if now - last_debug > 1:
last_debug = now
self.progress("yaw=%f desyaw=%f pitch=%f despitch=%f" %

View File

@ -1241,10 +1241,7 @@ class AutoTestCopter(AutoTest):
self.wait_ready_to_arm()
# fence requires home to be set:
m = self.poll_home_position()
if m is None:
raise NotAchievedException("Did not receive HOME_POSITION")
self.progress("home: %s" % str(m))
m = self.poll_home_position(quiet=False)
self.start_subtest("ensure we can't arm if outside fence")
self.load_fence("fence-in-middle-of-nowhere.txt")
@ -2937,11 +2934,7 @@ class AutoTestCopter(AutoTest):
self.reboot_sitl()
self.progress("Making sure we now get RANGEFINDER messages")
m = self.mav.recv_match(type='RANGEFINDER',
blocking=True,
timeout=10)
if m is None:
raise NotAchievedException("Did not get expected RANGEFINDER msg")
m = self.assert_receive_message('RANGEFINDER', timeout=10)
self.progress("Checking RangeFinder is marked as enabled in mavlink")
m = self.mav.recv_match(type='SYS_STATUS',
@ -3931,10 +3924,7 @@ class AutoTestCopter(AutoTest):
0, # yaw
0, # yawrate
)
m = self.mav.recv_match(type='POSITION_TARGET_LOCAL_NED', blocking=True, timeout=1)
if m is None:
raise NotAchievedException("Did not receive any message for 1 sec")
m = self.assert_receive_message('POSITION_TARGET_LOCAL_NED')
self.progress("Received local target: %s" % str(m))
@ -6170,9 +6160,7 @@ class AutoTestCopter(AutoTest):
if self.get_sim_time_cached() - tstart > timeout:
raise NotAchievedException("Did not move to state/speed")
m = self.mav.recv_match(type="GENERATOR_STATUS", blocking=True, timeout=10)
if m is None:
raise NotAchievedException("Did not get GENERATOR_STATUS")
m = self.assert_receive_message("GENERATOR_STATUS", timeout=10)
if m.generator_speed < rpm_min:
self.progress("Too slow (%u<%u)" % (m.generator_speed, rpm_min))
@ -7336,10 +7324,7 @@ class AutoTestCopter(AutoTest):
if self.get_sim_time_cached() - tstart > 30 and self.mode_is('LAND'):
self.progress("Bug not reproduced")
break
m = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1)
self.progress("Received (%s)" % str(m))
if m is None:
raise NotAchievedException("No GLOBAL_POSITION_INT?!")
m = self.assert_receive_message('GLOBAL_POSITION_INT', timeout=1, verbose=True)
pos_delta = self.get_distance_int(old_pos, m)
self.progress("Distance: %f" % pos_delta)
if pos_delta < 5:
@ -7634,10 +7619,7 @@ class AutoTestCopter(AutoTest):
if self.get_sim_time_cached() - tstart > 30 and self.mode_is('LAND'):
self.progress("Bug not reproduced")
break
m = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1)
self.progress("Received (%s)" % str(m))
if m is None:
raise NotAchievedException("No GLOBAL_POSITION_INT?!")
m = self.assert_receive_message('GLOBAL_POSITION_INT', timeout=1, verbose=True)
pos_delta = self.get_distance_int(old_pos, m)
self.progress("Distance: %f" % pos_delta)
if pos_delta < 5:

View File

@ -628,9 +628,7 @@ class AutoTestPlane(AutoTest):
self.reboot_sitl()
self.wait_ready_to_arm()
m = self.mav.recv_match(type='BATTERY_STATUS', blocking=True, timeout=10)
if m is None:
raise NotAchievedException("Did not get BATTERY_STATUS message")
m = self.assert_receive_message('BATTERY_STATUS', timeout=10)
if m.voltages_ext[0] == 65536:
raise NotAchievedException("Flag value rather than voltage")
if abs(m.voltages_ext[0] - 1000) > 300:
@ -1094,9 +1092,7 @@ class AutoTestPlane(AutoTest):
def assert_fence_sys_status(self, present, enabled, health):
self.delay_sim_time(1)
self.drain_mav_unparsed()
m = self.mav.recv_match(type='SYS_STATUS', blocking=True, timeout=1)
if m is None:
raise NotAchievedException("Did not receive SYS_STATUS")
m = self.assert_receive_message('SYS_STATUS', timeout=1)
tests = [
("present", present, m.onboard_control_sensors_present),
("enabled", enabled, m.onboard_control_sensors_enabled),
@ -1170,9 +1166,7 @@ class AutoTestPlane(AutoTest):
self.assert_fence_sys_status(True, False, True)
self.do_fence_enable()
self.assert_fence_sys_status(True, True, True)
m = self.mav.recv_match(type='FENCE_STATUS', blocking=True, timeout=2)
if m is None:
raise NotAchievedException("Did not get FENCE_STATUS")
m = self.assert_receive_message('FENCE_STATUS', timeout=2)
if m.breach_status:
raise NotAchievedException("Breached fence unexpectedly (%u)" %
(m.breach_status))
@ -1302,9 +1296,7 @@ class AutoTestPlane(AutoTest):
while True:
if self.get_sim_time() - tstart > 30:
raise NotAchievedException("Did not breach fence")
m = self.mav.recv_match(type='FENCE_STATUS', blocking=True, timeout=2)
if m is None:
raise NotAchievedException("Did not get FENCE_STATUS")
m = self.assert_receive_message('FENCE_STATUS', timeout=2)
if m.breach_status == 0:
continue
@ -1963,9 +1955,7 @@ function'''
self.delay_sim_time(2) # TODO: work out why this is required...
self.test_adsb_send_threatening_adsb_message(here)
self.progress("Waiting for collision message")
m = self.mav.recv_match(type='COLLISION', blocking=True, timeout=4)
if m is None:
raise NotAchievedException("Did not get collision message")
m = self.assert_receive_message('COLLISION', timeout=4)
if m.threat_level != 2:
raise NotAchievedException("Expected some threat at least")
if m.action != mavutil.mavlink.MAV_COLLISION_ACTION_RTL:
@ -2034,9 +2024,7 @@ function'''
int(loc.lng * 1e7), # longitude
loc.alt, # altitude
mavutil.mavlink.MAV_MISSION_TYPE_MISSION)
m = self.mav.recv_match(type='MISSION_ACK', blocking=True, timeout=5)
if m is None:
raise NotAchievedException("Did not get MISSION_ACK")
m = self.assert_receive_message('MISSION_ACK', timeout=5)
if m.type != mavutil.mavlink.MAV_MISSION_ERROR:
raise NotAchievedException("Did not get appropriate error")
@ -2058,9 +2046,7 @@ function'''
int(loc.lng * 1e7), # longitude
desired_relative_alt, # altitude
mavutil.mavlink.MAV_MISSION_TYPE_MISSION)
m = self.mav.recv_match(type='MISSION_ACK', blocking=True, timeout=5)
if m is None:
raise NotAchievedException("Did not get MISSION_ACK")
m = self.assert_receive_message('MISSION_ACK', timeout=5)
if m.type != mavutil.mavlink.MAV_MISSION_ACCEPTED:
raise NotAchievedException("Did not get accepted response")
self.wait_location(loc, accuracy=100) # based on loiter radius
@ -2113,14 +2099,10 @@ function'''
raise NotAchievedException("Did not get VFR_HUD")
new_throttle = m.throttle
alt = m.alt
m = self.mav.recv_match(type='ATTITUDE', blocking=True, timeout=5)
if m is None:
raise NotAchievedException("Did not get ATTITUDE")
m = self.assert_receive_message('ATTITUDE', timeout=5)
pitch = math.degrees(m.pitch)
self.progress("Pitch:%f throttle:%u alt:%f" % (pitch, new_throttle, alt))
m = self.mav.recv_match(type='VFR_HUD', blocking=True, timeout=5)
if m is None:
raise NotAchievedException("Did not get VFR_HUD")
m = self.assert_receive_message('VFR_HUD', timeout=5)
initial_throttle = m.throttle
initial_alt = m.alt
self.progress("Initial throttle: %u" % initial_throttle)
@ -2136,14 +2118,10 @@ function'''
'''
if now - tstart > 60:
raise NotAchievedException("Did not see increase in throttle")
m = self.mav.recv_match(type='VFR_HUD', blocking=True, timeout=5)
if m is None:
raise NotAchievedException("Did not get VFR_HUD")
m = self.assert_receive_message('VFR_HUD', timeout=5)
new_throttle = m.throttle
alt = m.alt
m = self.mav.recv_match(type='ATTITUDE', blocking=True, timeout=5)
if m is None:
raise NotAchievedException("Did not get ATTITUDE")
m = self.assert_receive_message('ATTITUDE', timeout=5)
pitch = math.degrees(m.pitch)
self.progress("Pitch:%f throttle:%u alt:%f" % (pitch, new_throttle, alt))
if new_throttle - initial_throttle > 20:
@ -2581,9 +2559,7 @@ function'''
self.set_parameter("SIM_IMUT_FIXED", temp)
self.delay_sim_time(2)
for msg in ['RAW_IMU', 'SCALED_IMU2']:
m = self.mav.recv_match(type=msg, blocking=True, timeout=2)
if m is None:
raise NotAchievedException(msg)
m = self.assert_receive_message(msg, timeout=2)
temperature = m.temperature*0.01
if abs(temperature - temp) > 0.2:
@ -2610,9 +2586,7 @@ function'''
self.wait_heartbeat()
self.wait_heartbeat()
for msg in ['RAW_IMU', 'SCALED_IMU2']:
m = self.mav.recv_match(type=msg, blocking=True, timeout=2)
if m is None:
raise NotAchievedException(msg)
m = self.assert_receive_message(msg, timeout=2)
temperature = m.temperature*0.01
if abs(temperature - temp) > 0.2:

View File

@ -245,9 +245,7 @@ class AutoTestSub(AutoTest):
self.disarm_vehicle()
self.progress("Manual dive OK")
m = self.mav.recv_match(type='SCALED_PRESSURE3', blocking=True)
if m is None:
raise NotAchievedException("Did not get SCALED_PRESSURE3")
m = self.assert_receive_message('SCALED_PRESSURE3')
if m.temperature != 2650:
raise NotAchievedException("Did not get correct TSYS01 temperature")

View File

@ -75,9 +75,7 @@ class AutoTestBalanceBot(AutoTestRover):
self.arm_vehicle()
self.set_rc(3, 1600)
m = self.mav.recv_match(type='WHEEL_DISTANCE', blocking=True, timeout=5)
if m is None:
raise NotAchievedException("Did not get WHEEL_DISTANCE")
m = self.assert_receive_message('WHEEL_DISTANCE', timeout=5)
tstart = self.get_sim_time()
while True:

View File

@ -1701,9 +1701,7 @@ class AutoTest(ABC):
self.mav.mav.fence_fetch_point_send(target_system,
target_component,
idx)
m = self.mav.recv_match(type="FENCE_POINT", blocking=True, timeout=2)
if m is None:
raise NotAchievedException("Did not get fence return point back")
m = self.assert_receive_message("FENCE_POINT", timeout=2)
self.progress("m: %s" % str(m))
if m.idx != idx:
raise NotAchievedException("Invalid idx returned (want=%u got=%u)" %
@ -2855,11 +2853,7 @@ class AutoTest(ABC):
log_id,
ofs,
count)
m = self.mav.recv_match(type='LOG_DATA',
blocking=True,
timeout=2)
if m is None:
raise NotAchievedException("Did not get log data")
m = self.assert_receive_message('LOG_DATA', timeout=2)
if m.ofs != ofs:
raise NotAchievedException("Incorrect offset")
if m.count != count:
@ -2918,11 +2912,7 @@ class AutoTest(ABC):
break
if self.get_sim_time_cached() - tstart > 120:
raise NotAchievedException("Did not download log in good time")
m = self.mav.recv_match(type='LOG_DATA',
blocking=True,
timeout=2)
if m is None:
raise NotAchievedException("Did not get data")
m = self.assert_receive_message('LOG_DATA', timeout=2)
if m.ofs != bytes_read:
raise NotAchievedException("Unexpected offset")
if m.id != log_id:
@ -2956,11 +2946,7 @@ class AutoTest(ABC):
bytes_read,
bytes_to_fetch
)
m = self.mav.recv_match(type='LOG_DATA',
blocking=True,
timeout=2)
if m is None:
raise NotAchievedException("Did not get reply")
m = self.assert_receive_message('LOG_DATA', timeout=2)
self.progress("Read %u bytes at offset %u" % (m.count, m.ofs))
if m.ofs != bytes_read:
raise NotAchievedException("Incorrect offset in reply want=%u got=%u (%s)" % (bytes_read, m.ofs, str(m)))
@ -2998,11 +2984,7 @@ class AutoTest(ABC):
ofs,
bytes_to_fetch
)
m = self.mav.recv_match(type='LOG_DATA',
blocking=True,
timeout=2)
if m is None:
raise NotAchievedException("Did not get reply")
m = self.assert_receive_message('LOG_DATA', timeout=2)
if m.count == 0:
raise NotAchievedException("xZero bytes read (ofs=%u)" % (ofs,))
if m.count > bytes_to_fetch:
@ -3168,9 +3150,7 @@ class AutoTest(ABC):
temp_ok = False
last_print_temp = -100
while self.get_sim_time_cached() - tstart < timeout:
m = self.mav.recv_match(type='RAW_IMU', blocking=True, timeout=2)
if m is None:
raise NotAchievedException("RAW_IMU")
m = self.assert_receive_message('RAW_IMU', timeout=2)
temperature = m.temperature*0.01
if temperature >= target:
self.progress("Reached temperature %.1f" % temperature)
@ -4027,9 +4007,7 @@ class AutoTest(ABC):
"""Ensure all rc outputs are at defaults"""
self.do_timesync_roundtrip()
_defaults = self.rc_defaults()
m = self.mav.recv_match(type='RC_CHANNELS', blocking=True, timeout=5)
if m is None:
raise NotAchievedException("No RC_CHANNELS messages?!")
m = self.assert_receive_message('RC_CHANNELS', timeout=5)
need_set = {}
for chan in _defaults:
default_value = _defaults[chan]
@ -4346,10 +4324,7 @@ class AutoTest(ABC):
while True:
if time.time() - start > timeout:
raise NotAchievedException("Did not achieve value")
m = self.mav.recv_match(type='SERVO_OUTPUT_RAW', blocking=True, timeout=1)
if m is None:
raise NotAchievedException("Did not get SERVO_OUTPUT_RAW")
m = self.assert_receive_message('SERVO_OUTPUT_RAW', timeout=1)
channel_field = "servo%u_raw" % channel
m_value = getattr(m, channel_field, None)
self.progress("Servo%u=%u want=%u" % (channel, m_value, value))
@ -5082,11 +5057,7 @@ class AutoTest(ABC):
0,
0,
0)
m = self.mav.recv_match(type='AUTOPILOT_VERSION',
blocking=True,
timeout=10)
if m is None:
raise NotAchievedException("Did not get AUTOPILOT_VERSION")
m = self.assert_receive_message('AUTOPILOT_VERSION', timeout=10)
return m.capabilities
def test_get_autopilot_capabilities(self):
@ -5317,12 +5288,7 @@ class AutoTest(ABC):
(quality, m.signal_quality))
def get_rangefinder_distance(self):
m = self.mav.recv_match(type='RANGEFINDER',
blocking=True,
timeout=5)
if m is None:
raise NotAchievedException("Did not get RANGEFINDER message")
m = self.assert_receive_message('RANGEFINDER', timeout=5)
return m.distance
def wait_rangefinder_distance(self, dist_min, dist_max, timeout=30, **kwargs):
@ -5346,10 +5312,7 @@ class AutoTest(ABC):
def get_esc_rpm(self, esc):
if esc > 4:
raise ValueError("Only does 1-4")
m = self.mav.recv_match(type='ESC_TELEMETRY_1_TO_4', blocking=True, timeout=1)
if m is None:
raise NotAchievedException("Did not get ESC_TELEMETRY_1_TO_4")
self.progress("%s" % str(m))
m = self.assert_receive_message('ESC_TELEMETRY_1_TO_4', timeout=1, verbose=True)
return m.rpm[esc-1]
def find_first_set_bit(self, mask):
@ -5941,8 +5904,7 @@ class AutoTest(ABC):
last_wp_msg = 0
while self.get_sim_time_cached() < tstart + timeout:
seq = self.mav.waypoint_current()
m = self.mav.recv_match(type='NAV_CONTROLLER_OUTPUT',
blocking=True)
m = self.assert_receive_message('NAV_CONTROLLER_OUTPUT')
wp_dist = m.wp_dist
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
@ -6027,11 +5989,7 @@ class AutoTest(ABC):
return self.sensor_has_state(sensor, present, enabled, healthy, do_assert=True, verbose=verbose)
def sensor_has_state(self, sensor, present=True, enabled=True, healthy=True, do_assert=False, verbose=False):
m = self.mav.recv_match(type='SYS_STATUS', blocking=True, timeout=5)
if m is None:
raise NotAchievedException("Did not receive SYS_STATUS")
if verbose:
self.progress("Status: %s" % str(self.dump_message_verbose(m)))
m = self.assert_receive_message('SYS_STATUS', timeout=5, very_verbose=verbose)
reported_present = m.onboard_control_sensors_present & sensor
reported_enabled = m.onboard_control_sensors_enabled & sensor
reported_healthy = m.onboard_control_sensors_health & sensor
@ -6151,9 +6109,7 @@ class AutoTest(ABC):
self.wait_gps_sys_status_not_present_or_enabled_and_healthy()
armable_time = self.get_sim_time() - start
if require_absolute:
m = self.poll_home_position()
if m is None:
raise NotAchievedException("Did not receive a home position")
self.poll_home_position()
if check_prearm_bit:
self.wait_prearm_sys_status_healthy(timeout=timeout)
self.progress("Took %u seconds to become armable" % armable_time)
@ -6852,11 +6808,7 @@ Also, ignores heartbeats not from our target system'''
self.mav.mav.send(items[m.seq])
remaining_to_send.discard(m.seq)
sent.add(m.seq)
m = self.mav.recv_match(type='MISSION_ACK',
blocking=True,
timeout=1)
if m is None:
raise NotAchievedException("Did not receive MISSION_ACK")
m = self.assert_receive_message('MISSION_ACK', timeout=1)
if m.mission_type != mission_type:
raise NotAchievedException("Mission ack not of expected mission type")
if m.type != mavutil.mavlink.MAV_MISSION_ACCEPTED:
@ -6972,13 +6924,7 @@ Also, ignores heartbeats not from our target system'''
'''returns distance to waypoint navigation target in metres'''
m = self.mav.messages.get("NAV_CONTROLLER_OUTPUT", None)
if m is None or not use_cached_nav_controller_output:
m = self.mav.recv_match(
type='NAV_CONTROLLER_OUTPUT',
blocking=True,
timeout=10,
)
if m is None:
raise NotAchievedException("Did not get NAV_CONTROLLER_OUTPUT")
m = self.assert_receive_message('NAV_CONTROLLER_OUTPUT', timeout=10)
return m.wp_dist
def distance_to_home(self, use_cached_home=False):
@ -8559,9 +8505,7 @@ Also, ignores heartbeats not from our target system'''
rate = round(self.get_message_rate("CAMERA_FEEDBACK", 10))
if rate != 0:
raise PreconditionFailedException("Receiving camera feedback")
m = self.poll_message("CAMERA_FEEDBACK")
if m is None:
raise NotAchievedException("Requested CAMERA_FEEDBACK did not arrive")
self.poll_message("CAMERA_FEEDBACK")
def clear_mission(self, mission_type, target_system=1, target_component=1):
'''clear mision_type from autopilot. Note that this does NOT actually
@ -8581,11 +8525,7 @@ Also, ignores heartbeats not from our target system'''
target_component,
0,
mission_type)
m = self.mav.recv_match(type='MISSION_ACK',
blocking=True,
timeout=5)
if m is None:
raise NotAchievedException("Expected ACK for clearing mission")
m = self.assert_receive_message('MISSION_ACK', timeout=5)
if m.target_system != self.mav.mav.srcSystem:
raise NotAchievedException("ACK not targetted at correct system want=%u got=%u" %
(self.mav.mav.srcSystem, m.target_system))
@ -9775,9 +9715,7 @@ switch value'''
self.assert_not_receiving_message('PID_TUNING', timeout=5)
self.set_parameter("GCS_PID_MASK", 1)
self.progress("making sure we are now getting PID_TUNING messages")
m = self.mav.recv_match(type='PID_TUNING', blocking=True, timeout=5)
if m is None:
raise PreconditionFailedException("Did not start to get PID_TUNING message")
self.assert_receive_message('PID_TUNING', timeout=5)
def sample_mission_filename(self):
return "flaps.txt"
@ -10082,10 +10020,7 @@ switch value'''
raise NotAchievedException("Received BUTTON_CHANGE event")
mask = 1 << pin
self.set_parameter("SIM_PIN_MASK", mask)
m = self.mav.recv_match(type='BUTTON_CHANGE', blocking=True, timeout=1)
if m is None:
raise NotAchievedException("Did not receive BUTTON_CHANGE event")
self.progress("### m: %s" % str(m))
m = self.assert_receive_message('BUTTON_CHANGE', timeout=1, verbose=True)
if not (m.state & mask):
raise NotAchievedException("Bit not set in mask (got=%u want=%u)" % (m.state, mask))
m2 = self.mav.recv_match(type='BUTTON_CHANGE', blocking=True, timeout=10)
@ -11050,9 +10985,7 @@ switch value'''
frsky = FRSkyD(("127.0.0.1", 6735))
self.wait_ready_to_arm()
self.drain_mav_unparsed()
m = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1)
if m is None:
raise NotAchievedException("Did not receive GLOBAL_POSITION_INT")
m = self.assert_receive_message('GLOBAL_POSITION_INT', timeout=1)
gpi_abs_alt = int((m.alt+500) / 1000) # mm -> m
# grab a battery-remaining percentage
@ -11065,9 +10998,7 @@ switch value'''
0,
0,
0)
m = self.mav.recv_match(type='BATTERY_STATUS', blocking=True, timeout=1)
if m is None:
raise NotAchievedException("Did not receive BATTERY_STATUS")
m = self.assert_receive_message('BATTERY_STATUS', timeout=1)
want_battery_remaining_pct = m.battery_remaining
tstart = self.get_sim_time_cached()
@ -11177,10 +11108,6 @@ switch value'''
ltm = LTM(("127.0.0.1", 6735))
self.wait_ready_to_arm()
self.drain_mav_unparsed()
m = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1)
if m is None:
raise NotAchievedException("Did not receive GLOBAL_POSITION_INT")
# gpi_abs_alt = int(m.alt / 1000) # mm -> m
wants = {
"g": self.test_ltm_g,
@ -11223,9 +11150,7 @@ switch value'''
devo = DEVO(("127.0.0.1", 6735))
self.wait_ready_to_arm()
self.drain_mav_unparsed()
m = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1)
if m is None:
raise NotAchievedException("Did not receive GLOBAL_POSITION_INT")
m = self.assert_receive_message('GLOBAL_POSITION_INT', timeout=1)
tstart = self.get_sim_time_cached()
while True:

View File

@ -324,11 +324,7 @@ class AutoTestQuadPlane(AutoTest):
return self.enum_state_name("MAV_LANDED_STATE", state, pretrim="MAV_LANDED_STATE_")
def assert_extended_sys_state(self, vtol_state, landed_state):
m = self.mav.recv_match(type='EXTENDED_SYS_STATE',
blocking=True,
timeout=1)
if m is None:
raise NotAchievedException("Did not get extended_sys_state message")
m = self.assert_receive_message('EXTENDED_SYS_STATE', timeout=1)
if m.vtol_state != vtol_state:
raise ValueError("Bad MAV_VTOL_STATE. Want=%s got=%s" %
(self.vtol_state_name(vtol_state),

View File

@ -535,12 +535,7 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
self.drain_mav()
m = self.mav.recv_match(type='NAV_CONTROLLER_OUTPUT',
blocking=True,
timeout=1)
if m is None:
raise MsgRcvTimeoutException(
"Did not receive NAV_CONTROLLER_OUTPUT message")
m = self.assert_receive_message('NAV_CONTROLLER_OUTPUT', timeout=1)
wp_dist_min = 5
if m.wp_dist < wp_dist_min:
@ -963,11 +958,7 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
delta = self.get_sim_time() - tstart
if delta > 20:
break
m = self.mav.recv_match(type='RC_CHANNELS',
blocking=True,
timeout=1)
if m is None:
raise NotAchievedException("Did not get RC_CHANNELS")
m = self.assert_receive_message('RC_CHANNELS', timeout=1)
channel_field = "chan%u_raw" % ch
m_value = getattr(m, channel_field)
if m_value != ch_override_value:
@ -2339,11 +2330,7 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
def assert_receive_mission_item_request(self, mission_type, seq):
self.progress("Expecting request for item %u" % seq)
m = self.mav.recv_match(type='MISSION_REQUEST',
blocking=True,
timeout=1)
if m is None:
raise NotAchievedException("Did not get MISSION_REQUEST")
m = self.assert_receive_message('MISSION_REQUEST', timeout=1)
if m.mission_type != mission_type:
raise NotAchievedException("Incorrect mission type (wanted=%u got=%u)" %
(mission_type, m.mission_type))
@ -3164,11 +3151,7 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
mav2.mav.srcSystem = old_mav2_system
mav2.mav.srcComponent = old_mav2_component
# we continue to receive requests on the original link:
m = self.mav.recv_match(type='MISSION_REQUEST',
blocking=True,
timeout=1)
if m is None:
raise NotAchievedException("Did not get mission request")
m = self.assert_receive_message('MISSION_REQUEST', timeout=1)
if m.mission_type != mavutil.mavlink.MAV_MISSION_TYPE_RALLY:
raise NotAchievedException("Mission request of incorrect type")
if m.seq != 1:
@ -3460,9 +3443,7 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
if self.get_parameter("MIS_TOTAL") != 0:
raise NotAchievedException("Failed to clear mission")
self.drain_mav_unparsed()
m = self.mav.recv_match(type='MISSION_CURRENT', blocking=True, timeout=5)
if m is None:
raise NotAchievedException("Did not get expected MISSION_CURRENT")
m = self.assert_receive_message('MISSION_CURRENT', timeout=5)
if m.seq != 0:
raise NotAchievedException("Bad mission current")
self.load_mission_using_mavproxy(mavproxy, "rover-gripper-mission.txt")
@ -3774,11 +3755,7 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
return
def assert_fence_breached(self):
m = self.mav.recv_match(type='FENCE_STATUS',
blocking=True,
timeout=10)
if m is None:
raise NotAchievedException("Not receiving fence notifications?")
m = self.assert_receive_message('FENCE_STATUS', timeout=10)
if m.breach_status != 1:
raise NotAchievedException("Expected to be breached")
@ -4714,9 +4691,7 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
self.arm_vehicle()
self.set_rc(3, 1600)
m = self.mav.recv_match(type='WHEEL_DISTANCE', blocking=True, timeout=5)
if m is None:
raise NotAchievedException("Did not get WHEEL_DISTANCE")
m = self.assert_receive_message('WHEEL_DISTANCE', timeout=5)
tstart = self.get_sim_time()
while True: