autotest: add additional tests for private channels

This commit is contained in:
Peter Barker 2022-11-24 14:56:15 +11:00 committed by Andrew Tridgell
parent 33a319aaf3
commit dcf41f2396
2 changed files with 59 additions and 2 deletions

View File

@ -3848,7 +3848,7 @@ class AutoTest(ABC):
raise ValueError("count %u not handled" % count)
self.progress("Files same")
def assert_not_receive_message(self, message, timeout=1, mav=None):
def assert_not_receive_message(self, message, timeout=1, mav=None, condition=None):
'''this is like assert_not_receiving_message but uses sim time not
wallclock time'''
self.progress("making sure we're not getting %s messages" % message)
@ -3857,7 +3857,7 @@ class AutoTest(ABC):
tstart = self.get_sim_time_cached()
while True:
m = mav.recv_match(type=message, blocking=True, timeout=0.1)
m = mav.recv_match(type=message, blocking=True, timeout=0.1, condition=condition)
if m is not None:
self.progress("Received: %s" % self.dump_message_verbose(m))
raise PreconditionFailedException("Receiving %s messages" % message)

View File

@ -6144,6 +6144,36 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
self.send_poll_message("AUTOPILOT_VERSION", mav=mav2, target_sysid=134)
self.assert_not_receive_message("AUTOPILOT_VERSION", mav=mav2, timeout=10)
# make sure we get heartbeats on the main channel from the non-private mav2:
tstart = self.get_sim_time()
while True:
if self.get_sim_time_cached() - tstart > 5:
raise NotAchievedException("Did not get expected heartbeat from %u" % 7)
m = self.assert_receive_message("HEARTBEAT")
if m.get_srcSystem() == 7:
self.progress("Got heartbeat from (%u) on non-private channel" % 7)
break
# make sure we receive heartbeats from the autotest suite into
# the component:
tstart = self.get_sim_time()
while True:
if self.get_sim_time_cached() - tstart > 5:
raise NotAchievedException("Did not get expected heartbeat from %u" % self.mav.source_system)
m = self.assert_receive_message("HEARTBEAT", mav=mav2)
if m.get_srcSystem() == self.mav.source_system:
self.progress("Got heartbeat from (%u) on non-private channel" % self.mav.source_system)
break
def printmessage(mav, m):
global mav2
if mav == mav2:
return
print("Got (%u/%u) (%s) " % (m.get_srcSystem(), m.get_srcComponent(), str(m)))
# self.install_message_hook_context(printmessage)
# ensure setting the private channel mask doesn't cause us to
# execute these commands:
self.set_parameter("SERIAL2_OPTIONS", 1024)
@ -6156,6 +6186,33 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
self.send_poll_message("AUTOPILOT_VERSION", mav=mav2, target_sysid=134)
self.assert_not_receive_message("AUTOPILOT_VERSION", mav=mav2, timeout=10)
# make sure messages from a private channel don't make it to
# the main channel:
self.drain_mav(self.mav)
self.drain_mav(mav2)
# make sure we do NOT get heartbeats on the main channel from
# the private mav2:
tstart = self.get_sim_time()
while True:
if self.get_sim_time_cached() - tstart > 5:
break
m = self.assert_receive_message("HEARTBEAT")
if m.get_srcSystem() == 7:
raise NotAchievedException("Got heartbeat from private channel")
self.progress("ensure no outside heartbeats reach private channels")
tstart = self.get_sim_time()
while True:
if self.get_sim_time_cached() - tstart > 5:
break
m = self.assert_receive_message("HEARTBEAT")
if m.get_srcSystem() == 1 and m.get_srcComponent() == 1:
continue
# note the above test which shows we get heartbeats from
# both the vehicle and this tests's special heartbeat
raise NotAchievedException("Got heartbeat on private channel from non-vehicle")
def tests(self):
'''return list of all tests'''
ret = super(AutoTestRover, self).tests()