Tools: autotest: add test for SET_MESSAGE_INTERVAL

This commit is contained in:
Peter Barker 2018-05-22 14:19:45 +10:00 committed by Randy Mackay
parent ef3654e343
commit aff81d6f76

View File

@ -775,6 +775,101 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
if ex is not None: if ex is not None:
raise ex raise ex
def get_message_rate(self, victim_message, timeout):
tstart = self.get_sim_time()
count = 0
while self.get_sim_time_cached() < tstart + timeout:
m = self.mav.recv_match(type=victim_message,
blocking=True,
timeout=0.1
)
if m is not None:
count += 1
time_delta = self.get_sim_time_cached() - tstart
self.progress("%s count after %f seconds: %u" %
(victim_message, time_delta, count))
return count/time_delta
def rate_to_interval_us(self, rate):
return 1/float(rate)*1000000.0
def test_rate(self, desc, in_rate, expected_rate):
self.progress("###### %s" % desc)
self.progress("Setting rate to %u" % in_rate)
# SET_MESSAGE_INTERVAL rates are given in microseconds
if in_rate == 0 or in_rate == -1:
set_interval = in_rate
else:
set_interval = self.rate_to_interval_us(in_rate)
self.mavproxy.send("long SET_MESSAGE_INTERVAL %u %d\n" %
(self.victim_message_id, set_interval))
self.mav.recv_match(type='COMMAND_ACK', blocking=True)
new_measured_rate = self.get_message_rate(self.victim_message, 10)
self.progress("Measured rate: %f (want %u)" %
(new_measured_rate, expected_rate))
if round(new_measured_rate) != expected_rate:
raise NotAchievedException("Rate not achieved (got %f want %u)" %
(new_measured_rate, expected_rate))
def test_set_message_interval(self):
self.victim_message = 'VFR_HUD'
self.victim_message_id = mavutil.mavlink.MAVLINK_MSG_ID_VFR_HUD
try:
# tell MAVProxy to stop stuffing around with the rates:
self.mavproxy.send("set streamrate -1\n")
rate = round(self.get_message_rate(self.victim_message, 20))
self.progress("Initial rate: %u" % rate)
self.test_rate("Test set to %u" % (rate/2,), rate/2, rate/2)
# this assumes the streamrates have not been played with:
self.test_rate("Resetting original rate using 0-value", 0, rate)
self.test_rate("Disabling using -1-value", -1, 0)
self.test_rate("Resetting original rate", rate, rate)
self.progress("try getting a message which is not ordinarily streamed out")
rate = round(self.get_message_rate("CAMERA_FEEDBACK", 20))
if rate != 0:
raise PreconditionFailedException("Already getting CAMERA_FEEDBACK")
self.progress("try various message rates")
camera_feedback_id = 180
for want_rate in 5, 6, 7, 8, 9, 10, 11, 12, 13:
self.mavproxy.send("long SET_MESSAGE_INTERVAL %u %d\n" %
(camera_feedback_id,
self.rate_to_interval_us(want_rate)))
rate = round(self.get_message_rate("CAMERA_FEEDBACK", 20))
self.progress("Want=%f got=%f" % (want_rate, rate))
if rate != want_rate:
raise NotAchievedException("Did not get expected rate")
self.progress("try at the main loop rate")
# have to reset the speedup as MAVProxy can't keep up otherwise
old_speedup = self.get_parameter("SIM_SPEEDUP")
self.set_parameter("SIM_SPEEDUP", 1.0)
camera_feedback_id = 180
# ArduPilot currently limits message rate to 80% of main loop rate:
want_rate = self.get_parameter("SCHED_LOOP_RATE") * 0.8
self.mavproxy.send("long SET_MESSAGE_INTERVAL %u %d\n" %
(camera_feedback_id,
self.rate_to_interval_us(want_rate)))
rate = round(self.get_message_rate("CAMERA_FEEDBACK", 20))
self.set_parameter("SIM_SPEEDUP", old_speedup)
self.progress("Want=%f got=%f" % (want_rate, rate))
if abs(rate - want_rate) > 2:
raise NotAchievedException("Did not get expected rate")
sr = self.sitl_streamrate()
self.mavproxy.send("set streamrate %u\n" % sr)
except Exception as e:
# tell MAVProxy to start stuffing around with the rates:
sr = self.sitl_streamrate()
self.mavproxy.send("set streamrate %u\n" % sr)
raise e
def autotest(self): def autotest(self):
"""Autotest APMrover2 in SITL.""" """Autotest APMrover2 in SITL."""
self.check_test_syntax(test_file=os.path.realpath(__file__)) self.check_test_syntax(test_file=os.path.realpath(__file__))
@ -853,6 +948,9 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
self.run_test("Test Camera Mission Items", self.run_test("Test Camera Mission Items",
self.test_camera_mission_items) self.test_camera_mission_items)
self.run_test("Test MAV_CMD_SET_MESSAGE_INTERVAL",
self.test_set_message_interval)
self.run_test("Download logs", lambda: self.run_test("Download logs", lambda:
self.log_download( self.log_download(
self.buildlogs_path("APMrover2-log.bin"), self.buildlogs_path("APMrover2-log.bin"),