autotest: add option to retry tests / create and use Test definition object

This commit is contained in:
Peter Barker 2021-01-20 13:58:20 +11:00 committed by Peter Barker
parent d81c2ee407
commit c7a6047904
3 changed files with 125 additions and 87 deletions

View File

@ -19,6 +19,7 @@ from pysim import vehicleinfo
from common import AutoTest
from common import NotAchievedException, AutoTestTimeoutException, PreconditionFailedException
from common import Test
# get location of scripts
testdir = os.path.dirname(os.path.realpath(__file__))
@ -6273,61 +6274,62 @@ class AutoTestCopter(AutoTest):
def tests2b(self): #this block currently around 9.5mins here
'''return list of all tests'''
ret = ([
("MotorVibration",
"Fly motor vibration test",
self.fly_motor_vibration),
Test("MotorVibration",
"Fly motor vibration test",
self.fly_motor_vibration),
("DynamicNotches",
"Fly Dynamic Notches",
self.fly_dynamic_notches),
Test("DynamicNotches",
"Fly Dynamic Notches",
self.fly_dynamic_notches),
("GyroFFT",
"Fly Gyro FFT",
self.fly_gyro_fft),
("GyroFFTHarmonic",
"Fly Gyro FFT Harmonic Matching",
self.fly_gyro_fft_harmonic),
Test("GyroFFT",
"Fly Gyro FFT",
self.fly_gyro_fft,
attempts=4),
("CompassReordering",
"Test Compass reordering when priorities are changed",
self.test_mag_reordering), # 40sec?
Test("GyroFFTHarmonic",
"Fly Gyro FFT Harmonic Matching",
self.fly_gyro_fft_harmonic),
("CRSF",
"Test RC CRSF",
self.test_crsf), #20secs ish
Test("CompassReordering",
"Test Compass reordering when priorities are changed",
self.test_mag_reordering), # 40sec?
("MotorTest",
"Run Motor Tests",
self.test_motortest), #20secs ish
Test("CRSF",
"Test RC CRSF",
self.test_crsf), #20secs ish
("AltEstimation",
"Test that Alt Estimation is mandatory for ALT_HOLD",
self.test_alt_estimate_prearm), #20secs ish
Test("MotorTest",
"Run Motor Tests",
self.test_motortest), #20secs ish
("EKFSource",
"Check EKF Source Prearms work",
self.test_ekf_source),
Test("AltEstimation",
"Test that Alt Estimation is mandatory for ALT_HOLD",
self.test_alt_estimate_prearm), #20secs ish
("DataFlash",
"Test DataFlash Block backend",
self.test_dataflash_sitl),
Test("EKFSource",
"Check EKF Source Prearms work",
self.test_ekf_source),
("DataFlashErase",
"Test DataFlash Block backend erase",
self.test_dataflash_erase),
Test("DataFlash",
"Test DataFlash Block backend",
self.test_dataflash_sitl),
("Callisto",
"Test Callisto",
self.test_callisto),
Test("DataFlashErase",
"Test DataFlash Block backend erase",
self.test_dataflash_erase),
("Replay",
"Test Replay",
self.test_replay),
Test("Callisto",
"Test Callisto",
self.test_callisto),
("LogUpload",
"Log upload",
self.log_upload),
Test("Replay",
"Test Replay",
self.test_replay),
Test("LogUpload",
"Log upload",
self.log_upload),
])
return ret

View File

@ -372,8 +372,12 @@ def run_specific_test(step, *args, **kwargs):
print("Got %s" % str(tester))
for a in tester.tests():
print("Got %s" % (a[0]))
if a[0] == test:
if hasattr(a, 'name'):
name = a.name
else:
name = a[0]
print("Got %s" % (name))
if name == test:
return tester.run_tests([a])
print("Failed to find test %s on %s" % (test, testname))
sys.exit(1)

View File

@ -1104,6 +1104,14 @@ class LocationInt(object):
self.alt = alt
self.yaw = yaw
class Test(object):
'''a test definition - information about a test'''
def __init__(self, name, description, function, attempts=1):
self.name = name
self.description = description
self.function = function
self.attempts = attempts
class AutoTest(ABC):
"""Base abstract class.
It implements the common function for all vehicle types.
@ -4890,10 +4898,27 @@ Also, ignores heartbeats not from our target system'''
util.run_cmd('/bin/cp build/sitl/bin/* %s' % to_dir,
directory=util.reltopdir('.'))
def run_one_test(self, name, desc, test_function, interact=False):
def run_one_test(self, test, interact=False):
'''new-style run-one-test used by run_tests'''
test_output_filename = self.buildlogs_path("%s-%s.txt" %
(self.log_name(), name))
for i in range(0, test.attempts-1):
if self.run_one_test_attempt(test, interact=interact, attempt=i+2, do_fail_list=False):
return
self.progress("Run attempt failed. Retrying")
self.run_one_test_attempt(test, interact=interact, attempt=1)
def run_one_test_attempt(self, test, interact=False, attempt=1, do_fail_list=True):
'''called by run_one_test to actually run the test in a retry loop'''
name = test.name
desc = test.description
test_function = test.function
if attempt != 1:
self.progress("RETRYING %s" % name)
test_output_filename = self.buildlogs_path("%s-%s-retry-%u.txt" %
(self.log_name(), name, attempt-1))
else:
test_output_filename = self.buildlogs_path("%s-%s.txt" %
(self.log_name(), name))
tee = TeeBoth(test_output_filename, 'w', self.mavproxy_logfile)
prettyname = "%s (%s)" % (name, desc)
@ -4958,7 +4983,8 @@ Also, ignores heartbeats not from our target system'''
else:
self.progress('FAILED: "%s": %s (see %s)' %
(prettyname, repr(ex), test_output_filename))
self.fail_list.append((prettyname, ex, test_output_filename))
if do_fail_list:
self.fail_list.append((prettyname, ex, test_output_filename))
if interact:
self.progress("Starting MAVProxy interaction as directed")
self.mavproxy.interact()
@ -4970,6 +4996,8 @@ Also, ignores heartbeats not from our target system'''
tee.close()
return passed
def check_test_syntax(self, test_file):
"""Check mistake on autotest function syntax."""
self.start_test("Check for syntax mistake in autotest lambda")
@ -7591,8 +7619,7 @@ switch value'''
pass
def test_skipped(self, test, reason):
(name, desc, func) = test
self.progress("##### %s is skipped: %s" % (name, reason))
self.progress("##### %s is skipped: %s" % (test, reason))
self.skip_list.append((test, reason))
def last_onboard_log(self):
@ -7652,8 +7679,7 @@ switch value'''
self.clear_mission(mavutil.mavlink.MAV_MISSION_TYPE_ALL)
for test in tests:
(name, desc, func) = test
self.run_one_test(name, desc, func)
self.run_one_test(test)
except pexpect.TIMEOUT:
self.progress("Failed with timeout")
@ -7673,8 +7699,7 @@ switch value'''
self.progress("Skipped tests:")
for skipped in self.skip_list:
(test, reason) = skipped
(name, desc, func) = test
print(" %s (see %s)" % (name, reason))
print(" %s (see %s)" % (test.name, reason))
if len(self.fail_list):
self.progress("Failing tests:")
@ -8985,43 +9010,43 @@ switch value'''
def tests(self):
return [
("PIDTuning",
"Test PID Tuning",
self.test_pid_tuning),
Test("PIDTuning",
"Test PID Tuning",
self.test_pid_tuning),
("ArmFeatures", "Arm features", self.test_arm_feature),
Test("ArmFeatures", "Arm features", self.test_arm_feature),
("SetHome",
"Test Set Home",
self.fly_test_set_home),
Test("SetHome",
"Test Set Home",
self.fly_test_set_home),
("ConfigErrorLoop",
"Test Config Error Loop",
self.test_config_error_loop),
Test("ConfigErrorLoop",
"Test Config Error Loop",
self.test_config_error_loop),
("CPUFailsafe",
"Ensure we do something appropriate when the main loop stops",
self.CPUFailsafe),
Test("CPUFailsafe",
"Ensure we do something appropriate when the main loop stops",
self.CPUFailsafe),
("Parameters",
"Test Parameter Set/Get",
self.test_parameters),
Test("Parameters",
"Test Parameter Set/Get",
self.test_parameters),
("LoggerDocumentation",
"Test Onboard Logging Generation",
self.test_onboard_logging_generation),
Test("LoggerDocumentation",
"Test Onboard Logging Generation",
self.test_onboard_logging_generation),
("Logging",
"Test Onboard Logging",
self.test_onboard_logging),
Test("Logging",
"Teqst Onboard Logging",
self.test_onboard_logging),
("GetCapabilities",
"Get Capabilities",
self.test_get_autopilot_capabilities),
Test("GetCapabilities",
"Get Capabilities",
self.test_get_autopilot_capabilities),
("InitialMode",
"Test initial mode switching",
self.test_initial_mode),
Test("InitialMode",
"Test initial mode switching",
self.test_initial_mode),
]
def post_tests_announcements(self):
@ -9041,13 +9066,20 @@ switch value'''
def autotest(self):
"""Autotest used by ArduPilot autotest CI."""
all_tests = self.tests()
all_tests = []
for test in self.tests():
if type(test) == Test:
all_tests.append(test)
continue
(name, desc, func) = test
actual_test = Test(name, desc, func)
all_tests.append(actual_test)
disabled = self.disabled_tests()
tests = []
for test in all_tests:
(name, desc, func) = test
if name in disabled:
self.test_skipped(test, disabled[name])
if test.name in disabled:
self.test_skipped(test, disabled[test.name])
continue
tests.append(test)