From 5074b6d3363d4bcfb4bc840bee9147bafd311852 Mon Sep 17 00:00:00 2001 From: bugobliterator Date: Mon, 1 Mar 2021 21:04:08 +0530 Subject: [PATCH] autotest: add test case for GPS ordering also fix flake8 issues --- Tools/autotest/arducopter.py | 85 ++++++++++++++++++++++++++---------- Tools/autotest/autotest.py | 2 +- Tools/autotest/common.py | 16 ++++--- 3 files changed, 71 insertions(+), 32 deletions(-) diff --git a/Tools/autotest/arducopter.py b/Tools/autotest/arducopter.py index 45ede7d9b7..e6ffc35a96 100644 --- a/Tools/autotest/arducopter.py +++ b/Tools/autotest/arducopter.py @@ -10,7 +10,6 @@ from __future__ import print_function import copy import math import os -import signal import shutil import time import numpy @@ -2103,35 +2102,73 @@ class AutoTestCopter(AutoTest): self.set_parameter("GPS_TYPE", 9) self.set_parameter("GPS_TYPE2", 9) self.set_parameter("SIM_GPS2_DISABLE", 0) - os.kill(self.sup_prog[0].pid, signal.SIGSTOP) - os.kill(self.sup_prog[1].pid, signal.SIGSTOP) + + self.context_push() + self.set_parameter("ARMING_CHECK", 1 << 3) + self.context_collect('STATUSTEXT') + self.reboot_sitl() # Test UAVCAN GPS ordering working - os.kill(self.sup_prog[0].pid, signal.SIGCONT) - gps1_det_text = self.wait_text("GPS 1: specified as UAVCAN") - os.kill(self.sup_prog[1].pid, signal.SIGCONT) - gps2_det_text = self.wait_text("GPS 2: specified as UAVCAN") + gps1_det_text = self.wait_statustext("GPS 1: specified as UAVCAN.*", regex=True, check_context=True) + gps2_det_text = self.wait_statustext("GPS 2: specified as UAVCAN.*", regex=True, check_context=True) gps1_nodeid = int(gps1_det_text.split('-')[1]) gps2_nodeid = int(gps2_det_text.split('-')[1]) if gps1_nodeid is None or gps2_nodeid is None: raise NotAchievedException("GPS not ordered per the order of Node IDs") - self.set_parameter("GPS1_CAN_OVRIDE", gps2_nodeid) - self.set_parameter("GPS2_CAN_OVRIDE", gps1_nodeid) - - # Reboot the SITL, and shuffle the AP_Periph - os.kill(self.sup_prog[0].pid, signal.SIGSTOP) - os.kill(self.sup_prog[1].pid, signal.SIGSTOP) - self.drain_mav() - self.reboot_sitl() - try: - # order should have changed this time - os.kill(self.sup_prog[0].pid, signal.SIGCONT) - gps1_det_text = self.wait_text("GPS 2: specified as UAVCAN") - os.kill(self.sup_prog[1].pid, signal.SIGCONT) - gps2_det_text = self.wait_text("GPS 1: specified as UAVCAN") - except: - raise NotAchievedException("GPS not ordered as requested") - + + self.context_stop_collecting('STATUSTEXT') + + GPS_Order_Tests = [[gps2_nodeid, gps2_nodeid, gps2_nodeid, 0, + "PreArm: Same Node Id {} set for multiple GPS".format(gps2_nodeid)], + [gps1_nodeid, int(gps2_nodeid/2), gps1_nodeid, 0, + "Selected GPS Node {} not set as instance {}".format(int(gps2_nodeid/2), 2)], + [int(gps1_nodeid/2), gps2_nodeid, 0, gps2_nodeid, + "Selected GPS Node {} not set as instance {}".format(int(gps1_nodeid/2), 1)], + [gps1_nodeid, gps2_nodeid, gps1_nodeid, gps2_nodeid, ""], + [gps2_nodeid, gps1_nodeid, gps2_nodeid, gps1_nodeid, ""], + [gps1_nodeid, 0, gps1_nodeid, gps2_nodeid, ""], + [0, gps2_nodeid, gps1_nodeid, gps2_nodeid, ""]] + for case in GPS_Order_Tests: + self.progress("############################### Trying Case: " + str(case)) + self.set_parameter("GPS1_CAN_OVRIDE", case[0]) + self.set_parameter("GPS2_CAN_OVRIDE", case[1]) + self.drain_mav() + self.context_collect('STATUSTEXT') + self.reboot_sitl() + gps1_det_text = None + gps2_det_text = None + try: + gps1_det_text = self.wait_statustext("GPS 1: specified as UAVCAN.*", regex=True, check_context=True) + except AutoTestTimeoutException: + pass + try: + gps2_det_text = self.wait_statustext("GPS 2: specified as UAVCAN.*", regex=True, check_context=True) + except AutoTestTimeoutException: + pass + + self.context_stop_collecting('STATUSTEXT') + self.change_mode('LOITER') + if case[2] == 0 and case[3] == 0: + if gps1_det_text or gps2_det_text: + raise NotAchievedException("Failed ordering for requested CASE:", case) + + if case[2] == 0 or case[3] == 0: + if bool(gps1_det_text is not None) == bool(gps2_det_text is not None): + print(gps1_det_text) + print(gps2_det_text) + raise NotAchievedException("Failed ordering for requested CASE:", case) + + if gps1_det_text: + if case[2] != int(gps1_det_text.split('-')[1]): + raise NotAchievedException("Failed ordering for requested CASE:", case) + if gps2_det_text: + if case[3] != int(gps2_det_text.split('-')[1]): + raise NotAchievedException("Failed ordering for requested CASE:", case) + if len(case[4]): + self.mavproxy.send('arm throttle\n') + self.mavproxy.expect(case[4]) + self.progress("############################### All GPS Order Cases Tests Passed") + self.context_pop() self.fly_auto_test() def fly_motor_fail(self, fail_servo=0, fail_mul=0.0, holdtime=30): diff --git a/Tools/autotest/autotest.py b/Tools/autotest/autotest.py index b2cb6eeaae..2875c30921 100755 --- a/Tools/autotest/autotest.py +++ b/Tools/autotest/autotest.py @@ -465,7 +465,7 @@ def run_step(step): supplementary_binaries.append([util.reltopdir(os.path.join('build', config_name, 'bin', - binary_name)), + binary_name)), '-I {}'.format(instance_num)]) # we are running in conjunction with a supplementary app # can't have speedup diff --git a/Tools/autotest/common.py b/Tools/autotest/common.py index c3f2b1361b..a95f0959c6 100644 --- a/Tools/autotest/common.py +++ b/Tools/autotest/common.py @@ -5403,10 +5403,10 @@ Also, ignores heartbeats not from our target system''' for statustext in [x.text for x in c.collections["STATUSTEXT"]]: if regex: if re.match(text, statustext): - return True + return statustext elif text.lower() in statustext.lower(): - return True - return False + return statustext + return None def wait_statustext(self, text, timeout=20, the_function=None, check_context=False, regex=False, wallclock_timeout=False): """Wait for a specific STATUSTEXT.""" @@ -5419,12 +5419,14 @@ Also, ignores heartbeats not from our target system''' # which checks all incoming messages. self.progress("Waiting for text : %s" % text.lower()) if check_context: - if self.statustext_in_collections(text, regex=regex): + statustext = self.statustext_in_collections(text, regex=regex) + if statustext: self.progress("Found expected text in collection: %s" % text.lower()) - return text + return statustext global statustext_found global statustext_full + statustext_full = None statustext_found = False def mh(mav, m): @@ -5436,6 +5438,7 @@ Also, ignores heartbeats not from our target system''' self.re_match = re.match(text, m.text) if self.re_match: statustext_found = True + statustext_full = m.text if text.lower() in m.text.lower(): self.progress("Received expected text: %s" % m.text.lower()) statustext_found = True @@ -5460,8 +5463,7 @@ Also, ignores heartbeats not from our target system''' self.mav.recv_match(type='STATUSTEXT', blocking=True, timeout=0.1) finally: self.remove_message_hook(mh) - if statustext_found: - return statustext_full + return statustext_full def get_mavlink_connection_going(self): # get a mavlink connection going