autotest: add a poll_message method

This commit is contained in:
Peter Barker 2020-10-06 22:54:58 +11:00 committed by Peter Barker
parent d31a00a6a1
commit 1d0735f206
2 changed files with 29 additions and 7 deletions

View File

@ -269,6 +269,13 @@ class AutoTestCopter(AutoTest):
self.set_rc(2, 1500) self.set_rc(2, 1500)
self.do_RTL() self.do_RTL()
def fly_to_origin(self, final_alt=10):
origin = self.poll_message("GPS_GLOBAL_ORIGIN")
self.change_mode("GUIDED")
self.guided_move_global_relative_alt(origin.latitude,
origin.longitude,
final_alt)
def change_alt(self, alt_min, climb_throttle=1920, descend_throttle=1080): def change_alt(self, alt_min, climb_throttle=1920, descend_throttle=1080):
"""Change altitude.""" """Change altitude."""
def adjust_altitude(current_alt, target_alt, accuracy): def adjust_altitude(current_alt, target_alt, accuracy):

View File

@ -5960,15 +5960,15 @@ Also, ignores heartbeats not from our target system'''
if ex is not None: if ex is not None:
raise ex raise ex
def test_request_message(self, timeout=60): def poll_message(self, message_id, timeout=10):
rate = round(self.get_message_rate("CAMERA_FEEDBACK", 10)) if type(message_id) == str:
if rate != 0: message_id = eval("mavutil.mavlink.MAVLINK_MSG_ID_%s" % message_id)
raise PreconditionFailedException("Receving camera feedback")
# temporarily use a constant in place of # temporarily use a constant in place of
# mavutil.mavlink.MAV_CMD_REQUEST_MESSAGE until we have a # mavutil.mavlink.MAV_CMD_REQUEST_MESSAGE until we have a
# pymavlink release: # pymavlink release:
self.run_cmd(512, tstart = self.get_sim_time()
180, self.run_cmd(mavutil.mavlink.MAV_CMD_REQUEST_MESSAGE,
message_id,
0, 0,
0, 0,
0, 0,
@ -5976,7 +5976,22 @@ Also, ignores heartbeats not from our target system'''
0, 0,
0, 0,
timeout=timeout) timeout=timeout)
m = self.mav.recv_match(type='CAMERA_FEEDBACK', blocking=True, timeout=1) while True:
if self.get_sim_time_cached() - tstart > timeout:
raise NotAchievedException("Did not receive polled message")
m = self.mav.recv_match(blocking=True,
timeout=0.1)
if m is None:
continue
if m.id != message_id:
continue
return m
def test_request_message(self, timeout=60):
rate = round(self.get_message_rate("CAMERA_FEEDBACK", 10))
if rate != 0:
raise PreconditionFailedException("Receving camera feedback")
m = self.poll_message("CAMERA_FEEDBACK")
if m is None: if m is None:
raise NotAchievedException("Requested CAMERA_FEEDBACK did not arrive") raise NotAchievedException("Requested CAMERA_FEEDBACK did not arrive")