autotest: flake8 compliance for arduplane.py

This commit is contained in:
Peter Barker 2021-02-12 13:44:28 +11:00 committed by Peter Barker
parent 78aa729b90
commit 628a3843b2

View File

@ -1,6 +1,11 @@
#!/usr/bin/env python #!/usr/bin/env python
# Fly ArduPlane in SITL '''
Fly ArduPlane in SITL
AP_FLAKE8_CLEAN
'''
from __future__ import print_function from __future__ import print_function
import math import math
import os import os
@ -640,7 +645,7 @@ class AutoTestPlane(AutoTest):
) )
self.delay_sim_time(10) self.delay_sim_time(10)
self.progress("Ensuring initial speed is known and relatively constant") self.progress("Ensuring initial speed is known and relatively constant")
initial_speed = 21.5; initial_speed = 21.5
timeout = 10 timeout = 10
tstart = self.get_sim_time() tstart = self.get_sim_time()
while True: while True:
@ -702,12 +707,14 @@ class AutoTestPlane(AutoTest):
def fly_home_land_and_disarm(self, timeout=120): def fly_home_land_and_disarm(self, timeout=120):
filename = "flaps.txt" filename = "flaps.txt"
self.progress("Using %s to fly home" % filename) self.progress("Using %s to fly home" % filename)
num_wp = self.load_mission(filename) self.load_mission(filename)
self.change_mode("AUTO") self.change_mode("AUTO")
self.mavproxy.send('wp set 7\n') self.mavproxy.send('wp set 7\n')
self.drain_mav() self.drain_mav()
# TODO: reflect on file to find this magic waypoint number? # TODO: reflect on file to find this magic waypoint number?
# self.wait_waypoint(7, num_wp-1, timeout=500) # we tend to miss the final waypoint by a fair bit, and this is probably too noisy anyway? # self.wait_waypoint(7, num_wp-1, timeout=500) # we
# tend to miss the final waypoint by a fair bit, and
# this is probably too noisy anyway?
self.wait_disarmed(timeout=timeout) self.wait_disarmed(timeout=timeout)
def fly_flaps(self): def fly_flaps(self):
@ -865,7 +872,8 @@ class AutoTestPlane(AutoTest):
self.context_collect("HEARTBEAT") self.context_collect("HEARTBEAT")
self.set_parameter("SIM_RC_FAIL", 2) # throttle-to-950 self.set_parameter("SIM_RC_FAIL", 2) # throttle-to-950
self.wait_mode('RTL') # long failsafe self.wait_mode('RTL') # long failsafe
if (not self.get_mode_from_mode_mapping("CIRCLE") in [x.custom_mode for x in self.context_stop_collecting("HEARTBEAT")]): if (not self.get_mode_from_mode_mapping("CIRCLE") in
[x.custom_mode for x in self.context_stop_collecting("HEARTBEAT")]):
raise NotAchievedException("Did not go via circle mode") raise NotAchievedException("Did not go via circle mode")
self.progress("Ensure we've had our throttle squashed to 950") self.progress("Ensure we've had our throttle squashed to 950")
self.wait_rc_channel_value(3, 950) self.wait_rc_channel_value(3, 950)
@ -902,7 +910,8 @@ class AutoTestPlane(AutoTest):
self.context_collect("HEARTBEAT") self.context_collect("HEARTBEAT")
self.set_parameter("SIM_RC_FAIL", 1) # no-pulses self.set_parameter("SIM_RC_FAIL", 1) # no-pulses
self.wait_mode('RTL') # long failsafe self.wait_mode('RTL') # long failsafe
if (not self.get_mode_from_mode_mapping("CIRCLE") in [x.custom_mode for x in self.context_stop_collecting("HEARTBEAT")]): if (not self.get_mode_from_mode_mapping("CIRCLE") in
[x.custom_mode for x in self.context_stop_collecting("HEARTBEAT")]):
raise NotAchievedException("Did not go via circle mode") raise NotAchievedException("Did not go via circle mode")
self.drain_mav_unparsed() self.drain_mav_unparsed()
m = self.mav.recv_match(type='SYS_STATUS', blocking=True) m = self.mav.recv_match(type='SYS_STATUS', blocking=True)
@ -1035,7 +1044,8 @@ class AutoTestPlane(AutoTest):
m = self.mav.recv_match(type='SYS_STATUS', blocking=True, timeout=1) m = self.mav.recv_match(type='SYS_STATUS', blocking=True, timeout=1)
if m is None: if m is None:
raise NotAchievedException("Did not receive SYS_STATUS") raise NotAchievedException("Did not receive SYS_STATUS")
tests = [ ( "present", present, m.onboard_control_sensors_present ), tests = [
("present", present, m.onboard_control_sensors_present),
("enabled", enabled, m.onboard_control_sensors_enabled), ("enabled", enabled, m.onboard_control_sensors_enabled),
("health", health, m.onboard_control_sensors_health), ("health", health, m.onboard_control_sensors_health),
] ]
@ -1119,7 +1129,7 @@ class AutoTestPlane(AutoTest):
self.load_fence("CMAC-fence.txt") self.load_fence("CMAC-fence.txt")
m = self.mav.recv_match(type='FENCE_STATUS', blocking=True, timeout=2) m = self.mav.recv_match(type='FENCE_STATUS', blocking=True, timeout=2)
if m is not None: if m is not None:
raise NotAchievedException("Got FENCE_STATUS unexpectedly"); raise NotAchievedException("Got FENCE_STATUS unexpectedly")
self.drain_mav_unparsed() self.drain_mav_unparsed()
self.set_parameter("FENCE_ACTION", mavutil.mavlink.FENCE_ACTION_NONE) # report only self.set_parameter("FENCE_ACTION", mavutil.mavlink.FENCE_ACTION_NONE) # report only
self.assert_fence_sys_status(False, False, True) self.assert_fence_sys_status(False, False, True)
@ -1130,7 +1140,7 @@ class AutoTestPlane(AutoTest):
self.assert_fence_sys_status(True, True, True) self.assert_fence_sys_status(True, True, True)
m = self.mav.recv_match(type='FENCE_STATUS', blocking=True, timeout=2) m = self.mav.recv_match(type='FENCE_STATUS', blocking=True, timeout=2)
if m is None: if m is None:
raise NotAchievedException("Did not get FENCE_STATUS"); raise NotAchievedException("Did not get FENCE_STATUS")
if m.breach_status: if m.breach_status:
raise NotAchievedException("Breached fence unexpectedly (%u)" % raise NotAchievedException("Breached fence unexpectedly (%u)" %
(m.breach_status)) (m.breach_status))
@ -1192,7 +1202,7 @@ class AutoTestPlane(AutoTest):
raise NotAchievedException("Did not breach fence") raise NotAchievedException("Did not breach fence")
m = self.mav.recv_match(type='FENCE_STATUS', blocking=True, timeout=2) m = self.mav.recv_match(type='FENCE_STATUS', blocking=True, timeout=2)
if m is None: if m is None:
raise NotAchievedException("Did not get FENCE_STATUS"); raise NotAchievedException("Did not get FENCE_STATUS")
if m.breach_status == 0: if m.breach_status == 0:
continue continue
@ -1367,6 +1377,7 @@ class AutoTestPlane(AutoTest):
self.simstate = None self.simstate = None
self.last_print = 0 self.last_print = 0
self.max_divergence = 0 self.max_divergence = 0
def validate_global_position_int_against_simstate(mav, m): def validate_global_position_int_against_simstate(mav, m):
if m.get_type() == 'GLOBAL_POSITION_INT': if m.get_type() == 'GLOBAL_POSITION_INT':
self.gpi = m self.gpi = m
@ -1512,7 +1523,9 @@ class AutoTestPlane(AutoTest):
if gpi is None: if gpi is None:
raise NotAchievedException("Did not receive GLOBAL_POSITION_INT message") raise NotAchievedException("Did not receive GLOBAL_POSITION_INT message")
if abs(rf.distance - gpi.relative_alt/1000.0) > 3: if abs(rf.distance - gpi.relative_alt/1000.0) > 3:
raise NotAchievedException("rangefinder alt (%s) disagrees with global-position-int.relative_alt (%s)" % (rf.distance, gpi.relative_alt/1000.0)) raise NotAchievedException(
"rangefinder alt (%s) disagrees with global-position-int.relative_alt (%s)" %
(rf.distance, gpi.relative_alt/1000.0))
self.wait_statustext("Auto disarmed", timeout=60) self.wait_statustext("Auto disarmed", timeout=60)
self.progress("Ensure RFND messages in log") self.progress("Ensure RFND messages in log")
@ -1544,7 +1557,6 @@ class AutoTestPlane(AutoTest):
self.change_mode("FBWA") # we don't update PIDs in MANUAL self.change_mode("FBWA") # we don't update PIDs in MANUAL
super(AutoTestPlane, self).test_pid_tuning() super(AutoTestPlane, self).test_pid_tuning()
def test_setting_modes_via_auxswitches(self): def test_setting_modes_via_auxswitches(self):
self.set_parameter("FLTMODE1", 1) # circle self.set_parameter("FLTMODE1", 1) # circle
self.set_rc(8, 950) self.set_rc(8, 950)
@ -1590,7 +1602,8 @@ class AutoTestPlane(AutoTest):
def test_adsb_send_threatening_adsb_message(self, here): def test_adsb_send_threatening_adsb_message(self, here):
self.progress("Sending ABSD_VEHICLE message") self.progress("Sending ABSD_VEHICLE message")
self.mav.mav.adsb_vehicle_send(37, # ICAO address self.mav.mav.adsb_vehicle_send(
37, # ICAO address
int(here.lat * 1e7), int(here.lat * 1e7),
int(here.lng * 1e7), int(here.lng * 1e7),
mavutil.mavlink.ADSB_ALTITUDE_TYPE_PRESSURE_QNH, mavutil.mavlink.ADSB_ALTITUDE_TYPE_PRESSURE_QNH,
@ -1633,7 +1646,8 @@ class AutoTestPlane(AutoTest):
self.wait_mode("RTL") self.wait_mode("RTL")
self.progress("Sending far-away ABSD_VEHICLE message") self.progress("Sending far-away ABSD_VEHICLE message")
self.mav.mav.adsb_vehicle_send(37, # ICAO address self.mav.mav.adsb_vehicle_send(
37, # ICAO address
int(here.lat+1 * 1e7), int(here.lat+1 * 1e7),
int(here.lng * 1e7), int(here.lng * 1e7),
mavutil.mavlink.ADSB_ALTITUDE_TYPE_PRESSURE_QNH, mavutil.mavlink.ADSB_ALTITUDE_TYPE_PRESSURE_QNH,
@ -1798,14 +1812,14 @@ class AutoTestPlane(AutoTest):
model = "plane-soaring" model = "plane-soaring"
self.customise_SITL_commandline([], self.customise_SITL_commandline(
[],
model=model, model=model,
defaults_filepath=self.model_defaults_filepath("ArduPlane", model), defaults_filepath=self.model_defaults_filepath("ArduPlane", model),
wipe=True) wipe=True)
self.load_mission('CMAC-soar.txt') self.load_mission('CMAC-soar.txt')
self.mavproxy.send("wp set 1\n") self.mavproxy.send("wp set 1\n")
self.change_mode('AUTO') self.change_mode('AUTO')
self.wait_ready_to_arm() self.wait_ready_to_arm()
@ -1816,7 +1830,7 @@ class AutoTestPlane(AutoTest):
for i in range(8): for i in range(8):
rcx_option = self.get_parameter('RC{0}_OPTION'.format(i+1)) rcx_option = self.get_parameter('RC{0}_OPTION'.format(i+1))
if rcx_option == 88: if rcx_option == 88:
rc_chan = i+1; rc_chan = i+1
break break
if rc_chan == 0: if rc_chan == 0:
@ -1825,7 +1839,7 @@ class AutoTestPlane(AutoTest):
self.set_rc_from_map({ self.set_rc_from_map({
rc_chan: 1900, rc_chan: 1900,
3: 1500, # Use trim airspeed. 3: 1500, # Use trim airspeed.
}); })
# Wait to detect thermal # Wait to detect thermal
self.progress("Waiting for thermal") self.progress("Waiting for thermal")
@ -1843,7 +1857,6 @@ class AutoTestPlane(AutoTest):
# Disable thermals # Disable thermals
self.set_parameter("SIM_THML_SCENARI", 0) self.set_parameter("SIM_THML_SCENARI", 0)
# Wait to descend to SOAR_ALT_MIN # Wait to descend to SOAR_ALT_MIN
self.progress("Waiting for glide to min altitude") self.progress("Waiting for glide to min altitude")
alt_min = self.get_parameter('SOAR_ALT_MIN') alt_min = self.get_parameter('SOAR_ALT_MIN')
@ -2113,7 +2126,6 @@ class AutoTestPlane(AutoTest):
if not bad_value: if not bad_value:
raise NotAchievedException("uncompensated IMUs did not vary enough") raise NotAchievedException("uncompensated IMUs did not vary enough")
def ekf_lane_switch(self): def ekf_lane_switch(self):
self.context_push() self.context_push()
@ -2139,6 +2151,7 @@ class AutoTestPlane(AutoTest):
self.reboot_sitl() self.reboot_sitl()
self.lane_switches = [] self.lane_switches = []
# add an EKF lane switch hook # add an EKF lane switch hook
def statustext_hook(mav, message): def statustext_hook(mav, message):
if message.get_type() != 'STATUSTEXT': if message.get_type() != 'STATUSTEXT':
@ -2155,14 +2168,18 @@ class AutoTestPlane(AutoTest):
self.change_mode('CIRCLE') self.change_mode('CIRCLE')
try: try:
##################################################################################################################################################### ###################################################################
self.progress("Checking EKF3 Lane Switching trigger from all sensors") self.progress("Checking EKF3 Lane Switching trigger from all sensors")
##################################################################################################################################################### ###################################################################
self.start_subtest("ACCELEROMETER: Change z-axis offset") self.start_subtest("ACCELEROMETER: Change z-axis offset")
# create an accelerometer error by changing the Z-axis offset # create an accelerometer error by changing the Z-axis offset
self.context_collect("STATUSTEXT") self.context_collect("STATUSTEXT")
old_parameter = self.get_parameter("INS_ACCOFFS_Z") old_parameter = self.get_parameter("INS_ACCOFFS_Z")
self.wait_statustext(text="EKF3 lane switch", timeout=30, the_function=self.set_parameter("INS_ACCOFFS_Z", old_parameter + 5), check_context=True) self.wait_statustext(
text="EKF3 lane switch",
timeout=30,
the_function=self.set_parameter("INS_ACCOFFS_Z", old_parameter + 5),
check_context=True)
if self.lane_switches != [1]: if self.lane_switches != [1]:
raise NotAchievedException("Expected lane switch 1, got %s" % str(self.lane_switches[-1])) raise NotAchievedException("Expected lane switch 1, got %s" % str(self.lane_switches[-1]))
# Cleanup # Cleanup
@ -2170,13 +2187,17 @@ class AutoTestPlane(AutoTest):
self.context_clear_collection("STATUSTEXT") self.context_clear_collection("STATUSTEXT")
self.wait_heading(0, accuracy=10, timeout=60) self.wait_heading(0, accuracy=10, timeout=60)
self.wait_heading(180, accuracy=10, timeout=60) self.wait_heading(180, accuracy=10, timeout=60)
##################################################################################################################################################### ###################################################################
self.start_subtest("BAROMETER: Freeze to last measured value") self.start_subtest("BAROMETER: Freeze to last measured value")
self.context_collect("STATUSTEXT") self.context_collect("STATUSTEXT")
# create a barometer error by inhibiting any pressure change while changing altitude # create a barometer error by inhibiting any pressure change while changing altitude
old_parameter = self.get_parameter("SIM_BAR2_FREEZE") old_parameter = self.get_parameter("SIM_BAR2_FREEZE")
self.set_parameter("SIM_BAR2_FREEZE", 1) self.set_parameter("SIM_BAR2_FREEZE", 1)
self.wait_statustext(text="EKF3 lane switch", timeout=30, the_function=lambda: self.set_rc(2, 2000), check_context=True) self.wait_statustext(
text="EKF3 lane switch",
timeout=30,
the_function=lambda: self.set_rc(2, 2000),
check_context=True)
if self.lane_switches != [1, 0]: if self.lane_switches != [1, 0]:
raise NotAchievedException("Expected lane switch 0, got %s" % str(self.lane_switches[-1])) raise NotAchievedException("Expected lane switch 0, got %s" % str(self.lane_switches[-1]))
# Cleanup # Cleanup
@ -2185,11 +2206,13 @@ class AutoTestPlane(AutoTest):
self.context_clear_collection("STATUSTEXT") self.context_clear_collection("STATUSTEXT")
self.wait_heading(0, accuracy=10, timeout=60) self.wait_heading(0, accuracy=10, timeout=60)
self.wait_heading(180, accuracy=10, timeout=60) self.wait_heading(180, accuracy=10, timeout=60)
##################################################################################################################################################### ###################################################################
self.start_subtest("GPS: Apply GPS Velocity Error in NED") self.start_subtest("GPS: Apply GPS Velocity Error in NED")
self.context_push() self.context_push()
self.context_collect("STATUSTEXT") self.context_collect("STATUSTEXT")
# create a GPS velocity error by adding a random 2m/s noise on each axis
# create a GPS velocity error by adding a random 2m/s
# noise on each axis
def sim_gps_verr(): def sim_gps_verr():
self.set_parameters({ self.set_parameters({
"SIM_GPS_VERR_X": self.get_parameter("SIM_GPS_VERR_X") + 2, "SIM_GPS_VERR_X": self.get_parameter("SIM_GPS_VERR_X") + 2,
@ -2204,12 +2227,16 @@ class AutoTestPlane(AutoTest):
self.context_clear_collection("STATUSTEXT") self.context_clear_collection("STATUSTEXT")
self.wait_heading(0, accuracy=10, timeout=60) self.wait_heading(0, accuracy=10, timeout=60)
self.wait_heading(180, accuracy=10, timeout=60) self.wait_heading(180, accuracy=10, timeout=60)
##################################################################################################################################################### ###################################################################
self.start_subtest("MAGNETOMETER: Change X-Axis Offset") self.start_subtest("MAGNETOMETER: Change X-Axis Offset")
self.context_collect("STATUSTEXT") self.context_collect("STATUSTEXT")
# create a magnetometer error by changing the X-axis offset # create a magnetometer error by changing the X-axis offset
old_parameter = self.get_parameter("SIM_MAG2_OFS_X") old_parameter = self.get_parameter("SIM_MAG2_OFS_X")
self.wait_statustext(text="EKF3 lane switch", timeout=30, the_function=self.set_parameter("SIM_MAG2_OFS_X", old_parameter + 150), check_context=True) self.wait_statustext(
text="EKF3 lane switch",
timeout=30,
the_function=self.set_parameter("SIM_MAG2_OFS_X", old_parameter + 150),
check_context=True)
if self.lane_switches != [1, 0, 1, 0]: if self.lane_switches != [1, 0, 1, 0]:
raise NotAchievedException("Expected lane switch 0, got %s" % str(self.lane_switches[-1])) raise NotAchievedException("Expected lane switch 0, got %s" % str(self.lane_switches[-1]))
# Cleanup # Cleanup
@ -2217,14 +2244,16 @@ class AutoTestPlane(AutoTest):
self.context_clear_collection("STATUSTEXT") self.context_clear_collection("STATUSTEXT")
self.wait_heading(0, accuracy=10, timeout=60) self.wait_heading(0, accuracy=10, timeout=60)
self.wait_heading(180, accuracy=10, timeout=60) self.wait_heading(180, accuracy=10, timeout=60)
##################################################################################################################################################### ###################################################################
self.start_subtest("AIRSPEED: Fail to constant value") self.start_subtest("AIRSPEED: Fail to constant value")
self.context_push() self.context_push()
self.context_collect("STATUSTEXT") self.context_collect("STATUSTEXT")
# create an airspeed sensor error by freezing to the current airspeed then changing the groundspeed # create an airspeed sensor error by freezing to the
# current airspeed then changing the groundspeed
old_parameter = self.get_parameter("SIM_ARSPD_FAIL") old_parameter = self.get_parameter("SIM_ARSPD_FAIL")
m = self.mav.recv_match(type='VFR_HUD', blocking=True) m = self.mav.recv_match(type='VFR_HUD', blocking=True)
self.set_parameter("SIM_ARSPD_FAIL", m.airspeed) self.set_parameter("SIM_ARSPD_FAIL", m.airspeed)
def change_speed(): def change_speed():
self.change_mode("GUIDED") self.change_mode("GUIDED")
self.run_cmd_int( self.run_cmd_int(
@ -2258,18 +2287,22 @@ class AutoTestPlane(AutoTest):
self.context_clear_collection("STATUSTEXT") self.context_clear_collection("STATUSTEXT")
self.wait_heading(0, accuracy=10, timeout=60) self.wait_heading(0, accuracy=10, timeout=60)
self.wait_heading(180, accuracy=10, timeout=60) self.wait_heading(180, accuracy=10, timeout=60)
##################################################################################################################################################### ###################################################################
self.progress("GYROSCOPE: Change Y-Axis Offset") self.progress("GYROSCOPE: Change Y-Axis Offset")
self.context_collect("STATUSTEXT") self.context_collect("STATUSTEXT")
# create a gyroscope error by changing the Y-axis offset # create a gyroscope error by changing the Y-axis offset
old_parameter = self.get_parameter("INS_GYR2OFFS_Y") old_parameter = self.get_parameter("INS_GYR2OFFS_Y")
self.wait_statustext(text="EKF3 lane switch", timeout=30, the_function=self.set_parameter("INS_GYR2OFFS_Y", old_parameter + 1), check_context=True) self.wait_statustext(
text="EKF3 lane switch",
timeout=30,
the_function=self.set_parameter("INS_GYR2OFFS_Y", old_parameter + 1),
check_context=True)
if self.lane_switches != [1, 0, 1, 0, 1, 0]: if self.lane_switches != [1, 0, 1, 0, 1, 0]:
raise NotAchievedException("Expected lane switch 0, got %s" % str(self.lane_switches[-1])) raise NotAchievedException("Expected lane switch 0, got %s" % str(self.lane_switches[-1]))
# Cleanup # Cleanup
self.set_parameter("INS_GYR2OFFS_Y", old_parameter) self.set_parameter("INS_GYR2OFFS_Y", old_parameter)
self.context_clear_collection("STATUSTEXT") self.context_clear_collection("STATUSTEXT")
##################################################################################################################################################### ###################################################################
self.disarm_vehicle() self.disarm_vehicle()