autotest: add context_set_message_rate_hz

for magically unsetting message rates after we are no longer interested in them
This commit is contained in:
Peter Barker 2023-08-11 18:29:34 +10:00 committed by Andrew Tridgell
parent 808fcbda0e
commit 6f65b889c7
1 changed files with 30 additions and 0 deletions

View File

@ -203,6 +203,7 @@ class Context(object):
self.original_heartbeat_interval_ms = None
self.installed_scripts = []
self.installed_modules = []
self.overridden_message_rates = {}
# https://stackoverflow.com/questions/616645/how-do-i-duplicate-sys-stdout-to-a-log-file-in-python
@ -3420,6 +3421,14 @@ class AutoTest(ABC):
raise NotAchievedException("Air Temperature not received from HIGH_LATENCY2")
self.HIGH_LATENCY2_links()
def context_set_message_rate_hz(self, id, rate_hz):
overridden_message_rates = self.context_get().overridden_message_rates
if id not in overridden_message_rates:
overridden_message_rates[id] = self.get_message_rate(id)
self.set_message_rate_hz(id, rate_hz)
def HIGH_LATENCY2_links(self):
self.start_subtest("SerialProtocol_MAVLinkHL links")
@ -5623,6 +5632,8 @@ class AutoTest(ABC):
self.remove_message_hook(hook)
for script in dead.installed_scripts:
self.remove_installed_script(script)
for (message_id, interval_us) in dead.overridden_message_rates.items():
self.set_message_interval(message_id, interval_us)
for module in dead.installed_modules:
print("Removing module (%s)" % module)
self.remove_installed_modules(module)
@ -9745,6 +9756,25 @@ Also, ignores heartbeats not from our target system'''
0,
0)
def get_message_interval(self, victim_message, mav=None):
'''returns message interval in microseconds'''
self.send_get_message_interval(victim_message, mav=mav)
m = self.assert_receive_message('MESSAGE_INTERVAL', timeout=1, mav=mav)
if m.message_id != victim_message:
raise NotAchievedException("Unexpected ID in MESSAGE_INTERVAL")
return m.interval_us
def set_message_interval(self, victim_message, interval_us, mav=None):
'''sets message interval in microseconds'''
if type(victim_message) == str:
victim_message = eval("mavutil.mavlink.MAVLINK_MSG_ID_%s" % victim_message)
self.run_cmd(
mavutil.mavlink.MAV_CMD_SET_MESSAGE_INTERVAL,
p1=victim_message,
p2=interval_us,
mav=mav,
)
def test_rate(self,
desc,
in_rate,