autotest: create collections-on-context to record selected messages

This commit is contained in:
Peter Barker 2020-09-07 16:57:10 +10:00 committed by Peter Barker
parent f3866e5f39
commit 9228f47dc9
2 changed files with 63 additions and 10 deletions

View File

@ -843,9 +843,11 @@ class AutoTestPlane(AutoTest):
self.set_parameter("THR_FS_VALUE", 960)
self.progress("Failing receiver (throttle-to-950)")
self.context_collect("HEARTBEAT")
self.set_parameter("SIM_RC_FAIL", 2) # throttle-to-950
self.wait_mode('CIRCLE') # short failsafe
self.wait_mode('RTL') # long failsafe
if (not self.get_mode_from_mode_mapping("CIRCLE") in [x.custom_mode for x in self.context_stop_collecting("HEARTBEAT")]):
raise NotAchievedException("Did not go via circle mode")
self.progress("Ensure we've had our throttle squashed to 950")
self.wait_rc_channel_value(3, 950)
self.drain_mav_unparsed()
@ -878,9 +880,11 @@ class AutoTestPlane(AutoTest):
self.change_mode('MANUAL')
self.progress("Failing receiver (no-pulses)")
self.context_collect("HEARTBEAT")
self.set_parameter("SIM_RC_FAIL", 1) # no-pulses
self.wait_mode('CIRCLE') # short failsafe
self.wait_mode('RTL') # long failsafe
if (not self.get_mode_from_mode_mapping("CIRCLE") in [x.custom_mode for x in self.context_stop_collecting("HEARTBEAT")]):
raise NotAchievedException("Did not go via circle mode")
self.drain_mav_unparsed()
m = self.mav.recv_match(type='SYS_STATUS', blocking=True)
print("%s" % str(m))
@ -913,20 +917,22 @@ class AutoTestPlane(AutoTest):
self.progress("Ensure long failsafe can trigger when short failsafe disabled")
self.context_push()
self.context_collect("STATUSTEXT")
ex = None
try:
self.set_parameter("FS_SHORT_ACTN", 3) # 3 means disabled
self.set_parameter("SIM_RC_FAIL", 1)
self.wait_statustext("Long event on")
self.wait_statustext("Long event on", check_context=True)
self.wait_mode("RTL")
# self.context_clear_collection("STATUSTEXT")
self.set_parameter("SIM_RC_FAIL", 0)
self.wait_text("Long event off")
self.wait_text("Long event off", check_context=True)
self.change_mode("MANUAL")
self.progress("Trying again with THR_FS_VALUE")
self.set_parameter("THR_FS_VALUE", 960)
self.set_parameter("SIM_RC_FAIL", 2)
self.wait_statustext("Long event on")
self.wait_statustext("Long event on", check_context=True)
self.wait_mode("RTL")
except Exception as e:
self.progress("Exception caught:")

View File

@ -134,6 +134,7 @@ class Context(object):
self.parameters = []
self.sitl_commandline_customised = False
self.message_hooks = []
self.collections = {}
# https://stackoverflow.com/questions/616645/how-do-i-duplicate-sys-stdout-to-a-log-file-in-python
class TeeBoth(object):
@ -2222,10 +2223,14 @@ class AutoTest(ABC):
def install_message_hook_context(self, hook):
'''installs a message hook which will be removed when the context goes
away'''
if self.mav is None:
return
self.mav.message_hooks.append(hook)
self.context_get().message_hooks.append(hook)
def remove_message_hook(self, hook):
if self.mav is None:
return
oldlen = len(self.mav.message_hooks)
self.mav.message_hooks = list(filter(lambda x : x != hook,
self.mav.message_hooks))
@ -3019,7 +3024,9 @@ class AutoTest(ABC):
while attempts > 0:
if verbose:
self.progress("Sending param_request_read for (%s)" % name)
self.drain_mav_unparsed(quiet=True)
# we MUST parse here or collections fail where we need
# them to work!
self.drain_mav(quiet=True)
tstart = self.get_sim_time()
encname = name
if sys.version_info.major >= 3 and type(encname) != bytes:
@ -3073,7 +3080,38 @@ class AutoTest(ABC):
def context_push(self):
"""Save a copy of the parameters."""
self.contexts.append(Context())
context = Context()
self.contexts.append(context)
# add a message hook so we can collect messages conveniently:
def mh(mav, m):
t = m.get_type()
if t in context.collections:
print("m=%s" % str(m))
context.collections[t].append(m)
self.install_message_hook_context(mh)
def context_collect(self, msg_type):
'''start collecting messages of type msg_type into context collection'''
context = self.context_get()
if msg_type in context.collections:
return
context.collections[msg_type] = []
def context_clear_collection(self, msg_type):
'''clear collection of message type msg_type'''
context = self.context_get()
if msg_type not in context.collections:
raise NotAchievedException("Not collecting (%s)" % str(msg_type))
context.collections[msg_type] = []
def context_stop_collecting(self, msg_type):
'''stop collecting messages of type msg_type in context collection. Returns the collected messages'''
context = self.context_get()
if msg_type not in context.collections:
raise Exception("Not collecting %s" % str(msg_type))
ret = context.collections[msg_type]
del context.collections[msg_type]
return ret
def context_pop(self):
"""Set parameters to origin values in reverse order."""
@ -4010,7 +4048,7 @@ Also, ignores heartbeats not from our target system'''
def wait_text(self, *args, **kwargs):
self.wait_statustext(*args, **kwargs)
def wait_statustext(self, text, timeout=20, the_function=None):
def wait_statustext(self, text, timeout=20, the_function=None, check_context=False):
"""Wait for a specific STATUSTEXT."""
# Statustexts are often triggered by something we've just
@ -4020,6 +4058,15 @@ Also, ignores heartbeats not from our target system'''
# a new SYSTEM_TIME message), so we install a message hook
# which checks all incoming messages.
self.progress("Waiting for text : %s" % text.lower())
if check_context:
c = self.context_get()
if "STATUSTEXT" not in c.collections:
raise NotAchievedException("Asked to check context but it isn't collecting!")
for statustext in [x.text for x in c.collections["STATUSTEXT"]]:
if text.lower() in statustext.lower():
self.progress("Found expected text in collection: %s" % text.lower())
return
global statustext_found
statustext_found = False
def mh(mav, m):
@ -4027,7 +4074,7 @@ Also, ignores heartbeats not from our target system'''
if m.get_type() != "STATUSTEXT":
return
if text.lower() in m.text.lower():
self.progress("Received expected text : %s" % m.text.lower())
self.progress("Received expected text: %s" % m.text.lower())
statustext_found = True
self.install_message_hook_context(mh)
tstart = self.get_sim_time()
@ -4037,7 +4084,7 @@ Also, ignores heartbeats not from our target system'''
if the_function is not None:
the_function()
m = self.mav.recv_match(type='STATUSTEXT', blocking=True, timeout=0.1)
raise AutoTestTimeoutException("Failed to received text : %s" %
raise AutoTestTimeoutException("Failed to receive text: %s" %
text.lower())
def get_mavlink_connection_going(self):