autotest: add test case for GPS ordering

also fix flake8 issues
This commit is contained in:
bugobliterator 2021-03-01 21:04:08 +05:30 committed by Randy Mackay
parent 48277b43e4
commit 5074b6d336
3 changed files with 71 additions and 32 deletions

View File

@ -10,7 +10,6 @@ from __future__ import print_function
import copy import copy
import math import math
import os import os
import signal
import shutil import shutil
import time import time
import numpy import numpy
@ -2103,35 +2102,73 @@ class AutoTestCopter(AutoTest):
self.set_parameter("GPS_TYPE", 9) self.set_parameter("GPS_TYPE", 9)
self.set_parameter("GPS_TYPE2", 9) self.set_parameter("GPS_TYPE2", 9)
self.set_parameter("SIM_GPS2_DISABLE", 0) self.set_parameter("SIM_GPS2_DISABLE", 0)
os.kill(self.sup_prog[0].pid, signal.SIGSTOP)
os.kill(self.sup_prog[1].pid, signal.SIGSTOP) self.context_push()
self.set_parameter("ARMING_CHECK", 1 << 3)
self.context_collect('STATUSTEXT')
self.reboot_sitl() self.reboot_sitl()
# Test UAVCAN GPS ordering working # Test UAVCAN GPS ordering working
os.kill(self.sup_prog[0].pid, signal.SIGCONT) gps1_det_text = self.wait_statustext("GPS 1: specified as UAVCAN.*", regex=True, check_context=True)
gps1_det_text = self.wait_text("GPS 1: specified as UAVCAN") gps2_det_text = self.wait_statustext("GPS 2: specified as UAVCAN.*", regex=True, check_context=True)
os.kill(self.sup_prog[1].pid, signal.SIGCONT)
gps2_det_text = self.wait_text("GPS 2: specified as UAVCAN")
gps1_nodeid = int(gps1_det_text.split('-')[1]) gps1_nodeid = int(gps1_det_text.split('-')[1])
gps2_nodeid = int(gps2_det_text.split('-')[1]) gps2_nodeid = int(gps2_det_text.split('-')[1])
if gps1_nodeid is None or gps2_nodeid is None: if gps1_nodeid is None or gps2_nodeid is None:
raise NotAchievedException("GPS not ordered per the order of Node IDs") raise NotAchievedException("GPS not ordered per the order of Node IDs")
self.set_parameter("GPS1_CAN_OVRIDE", gps2_nodeid)
self.set_parameter("GPS2_CAN_OVRIDE", gps1_nodeid) self.context_stop_collecting('STATUSTEXT')
# Reboot the SITL, and shuffle the AP_Periph GPS_Order_Tests = [[gps2_nodeid, gps2_nodeid, gps2_nodeid, 0,
os.kill(self.sup_prog[0].pid, signal.SIGSTOP) "PreArm: Same Node Id {} set for multiple GPS".format(gps2_nodeid)],
os.kill(self.sup_prog[1].pid, signal.SIGSTOP) [gps1_nodeid, int(gps2_nodeid/2), gps1_nodeid, 0,
self.drain_mav() "Selected GPS Node {} not set as instance {}".format(int(gps2_nodeid/2), 2)],
self.reboot_sitl() [int(gps1_nodeid/2), gps2_nodeid, 0, gps2_nodeid,
try: "Selected GPS Node {} not set as instance {}".format(int(gps1_nodeid/2), 1)],
# order should have changed this time [gps1_nodeid, gps2_nodeid, gps1_nodeid, gps2_nodeid, ""],
os.kill(self.sup_prog[0].pid, signal.SIGCONT) [gps2_nodeid, gps1_nodeid, gps2_nodeid, gps1_nodeid, ""],
gps1_det_text = self.wait_text("GPS 2: specified as UAVCAN") [gps1_nodeid, 0, gps1_nodeid, gps2_nodeid, ""],
os.kill(self.sup_prog[1].pid, signal.SIGCONT) [0, gps2_nodeid, gps1_nodeid, gps2_nodeid, ""]]
gps2_det_text = self.wait_text("GPS 1: specified as UAVCAN") for case in GPS_Order_Tests:
except: self.progress("############################### Trying Case: " + str(case))
raise NotAchievedException("GPS not ordered as requested") self.set_parameter("GPS1_CAN_OVRIDE", case[0])
self.set_parameter("GPS2_CAN_OVRIDE", case[1])
self.drain_mav()
self.context_collect('STATUSTEXT')
self.reboot_sitl()
gps1_det_text = None
gps2_det_text = None
try:
gps1_det_text = self.wait_statustext("GPS 1: specified as UAVCAN.*", regex=True, check_context=True)
except AutoTestTimeoutException:
pass
try:
gps2_det_text = self.wait_statustext("GPS 2: specified as UAVCAN.*", regex=True, check_context=True)
except AutoTestTimeoutException:
pass
self.context_stop_collecting('STATUSTEXT')
self.change_mode('LOITER')
if case[2] == 0 and case[3] == 0:
if gps1_det_text or gps2_det_text:
raise NotAchievedException("Failed ordering for requested CASE:", case)
if case[2] == 0 or case[3] == 0:
if bool(gps1_det_text is not None) == bool(gps2_det_text is not None):
print(gps1_det_text)
print(gps2_det_text)
raise NotAchievedException("Failed ordering for requested CASE:", case)
if gps1_det_text:
if case[2] != int(gps1_det_text.split('-')[1]):
raise NotAchievedException("Failed ordering for requested CASE:", case)
if gps2_det_text:
if case[3] != int(gps2_det_text.split('-')[1]):
raise NotAchievedException("Failed ordering for requested CASE:", case)
if len(case[4]):
self.mavproxy.send('arm throttle\n')
self.mavproxy.expect(case[4])
self.progress("############################### All GPS Order Cases Tests Passed")
self.context_pop()
self.fly_auto_test() self.fly_auto_test()
def fly_motor_fail(self, fail_servo=0, fail_mul=0.0, holdtime=30): def fly_motor_fail(self, fail_servo=0, fail_mul=0.0, holdtime=30):

View File

@ -465,7 +465,7 @@ def run_step(step):
supplementary_binaries.append([util.reltopdir(os.path.join('build', supplementary_binaries.append([util.reltopdir(os.path.join('build',
config_name, config_name,
'bin', 'bin',
binary_name)), binary_name)),
'-I {}'.format(instance_num)]) '-I {}'.format(instance_num)])
# we are running in conjunction with a supplementary app # we are running in conjunction with a supplementary app
# can't have speedup # can't have speedup

View File

@ -5403,10 +5403,10 @@ Also, ignores heartbeats not from our target system'''
for statustext in [x.text for x in c.collections["STATUSTEXT"]]: for statustext in [x.text for x in c.collections["STATUSTEXT"]]:
if regex: if regex:
if re.match(text, statustext): if re.match(text, statustext):
return True return statustext
elif text.lower() in statustext.lower(): elif text.lower() in statustext.lower():
return True return statustext
return False return None
def wait_statustext(self, text, timeout=20, the_function=None, check_context=False, regex=False, wallclock_timeout=False): def wait_statustext(self, text, timeout=20, the_function=None, check_context=False, regex=False, wallclock_timeout=False):
"""Wait for a specific STATUSTEXT.""" """Wait for a specific STATUSTEXT."""
@ -5419,12 +5419,14 @@ Also, ignores heartbeats not from our target system'''
# which checks all incoming messages. # which checks all incoming messages.
self.progress("Waiting for text : %s" % text.lower()) self.progress("Waiting for text : %s" % text.lower())
if check_context: if check_context:
if self.statustext_in_collections(text, regex=regex): statustext = self.statustext_in_collections(text, regex=regex)
if statustext:
self.progress("Found expected text in collection: %s" % text.lower()) self.progress("Found expected text in collection: %s" % text.lower())
return text return statustext
global statustext_found global statustext_found
global statustext_full global statustext_full
statustext_full = None
statustext_found = False statustext_found = False
def mh(mav, m): def mh(mav, m):
@ -5436,6 +5438,7 @@ Also, ignores heartbeats not from our target system'''
self.re_match = re.match(text, m.text) self.re_match = re.match(text, m.text)
if self.re_match: if self.re_match:
statustext_found = True statustext_found = True
statustext_full = m.text
if text.lower() in m.text.lower(): if text.lower() in m.text.lower():
self.progress("Received expected text: %s" % m.text.lower()) self.progress("Received expected text: %s" % m.text.lower())
statustext_found = True statustext_found = True
@ -5460,8 +5463,7 @@ Also, ignores heartbeats not from our target system'''
self.mav.recv_match(type='STATUSTEXT', blocking=True, timeout=0.1) self.mav.recv_match(type='STATUSTEXT', blocking=True, timeout=0.1)
finally: finally:
self.remove_message_hook(mh) self.remove_message_hook(mh)
if statustext_found: return statustext_full
return statustext_full
def get_mavlink_connection_going(self): def get_mavlink_connection_going(self):
# get a mavlink connection going # get a mavlink connection going