autotest: fix attitude test

autotest: add a get_messages_frame method

Attempts to get a set of messages as close as reasonable in time.
This commit is contained in:
Peter Barker 2021-03-15 12:38:08 +11:00 committed by Peter Barker
parent b873e7fcb0
commit 0c7f3d20a4
2 changed files with 112 additions and 52 deletions

View File

@ -12,6 +12,7 @@ import os
import time import time
from pymavlink import quaternion from pymavlink import quaternion
from pymavlink import mavextra
from pymavlink import mavutil from pymavlink import mavutil
from common import AutoTest from common import AutoTest
@ -1424,6 +1425,70 @@ class AutoTestPlane(AutoTest):
mavproxy:''' mavproxy:'''
self.clear_fence_using_mavproxy() self.clear_fence_using_mavproxy()
def check_attitudes_match(self, a, b):
'''make sure ahrs2 and simstate and ATTTIUDE_QUATERNION all match'''
# these are ordered to bookend the list with timestamps (which
# both attitude messages have):
get_names = ['ATTITUDE', 'SIMSTATE', 'AHRS2', 'ATTITUDE_QUATERNION']
msgs = self.get_messages_frame(get_names)
for get_name in get_names:
self.progress("%s: %s" % (get_name, msgs[get_name]))
simstate = msgs['SIMSTATE']
attitude = msgs['ATTITUDE']
ahrs2 = msgs['AHRS2']
attitude_quaternion = msgs['ATTITUDE_QUATERNION']
# check ATTITUDE
want = math.degrees(simstate.roll)
got = math.degrees(attitude.roll)
if abs(mavextra.angle_diff(want, got)) > 20:
raise NotAchievedException("ATTITUDE.Roll looks bad (want=%f got=%f)" %
(want, got))
want = math.degrees(simstate.pitch)
got = math.degrees(attitude.pitch)
if abs(mavextra.angle_diff(want, got)) > 20:
raise NotAchievedException("ATTITUDE.Pitch looks bad (want=%f got=%f)" %
(want, got))
# check AHRS2
want = math.degrees(simstate.roll)
got = math.degrees(ahrs2.roll)
if abs(mavextra.angle_diff(want, got)) > 20:
raise NotAchievedException("AHRS2.Roll looks bad (want=%f got=%f)" %
(want, got))
want = math.degrees(simstate.pitch)
got = math.degrees(ahrs2.pitch)
if abs(mavextra.angle_diff(want, got)) > 20:
raise NotAchievedException("AHRS2.Pitch looks bad (want=%f got=%f)" %
(want, got))
# check ATTITUDE_QUATERNION
q = quaternion.Quaternion([
attitude_quaternion.q1,
attitude_quaternion.q2,
attitude_quaternion.q3,
attitude_quaternion.q4
])
euler = q.euler
self.progress("attquat:%s q:%s euler:%s" % (
str(attitude_quaternion), q, euler))
want = math.degrees(simstate.roll)
got = math.degrees(euler[0])
if mavextra.angle_diff(want, got) > 20:
raise NotAchievedException("quat roll differs from attitude roll; want=%f got=%f" %
(want, got))
want = math.degrees(simstate.pitch)
got = math.degrees(euler[1])
if mavextra.angle_diff(want, got) > 20:
raise NotAchievedException("quat pitch differs from attitude pitch; want=%f got=%f" %
(want, got))
def fly_ahrs2_test(self): def fly_ahrs2_test(self):
'''check secondary estimator is looking OK''' '''check secondary estimator is looking OK'''
@ -1444,44 +1509,7 @@ class AutoTestPlane(AutoTest):
if self.get_distance_int(gpi, ahrs2) > 10: if self.get_distance_int(gpi, ahrs2) > 10:
raise NotAchievedException("Secondary location looks bad") raise NotAchievedException("Secondary location looks bad")
# check attitude self.check_attitudes_match(1, 2)
simstate = self.mav.recv_match(type='SIMSTATE', blocking=True, timeout=1)
attitude_quaternion = self.poll_message('ATTITUDE_QUATERNION')
if simstate is None:
raise NotAchievedException("Did not receive SIMSTATE message")
self.progress("SIMSTATE: %s" % str(simstate))
want = math.degrees(simstate.roll)
got = math.degrees(ahrs2.roll)
if abs(want - got) > 5:
raise NotAchievedException("Secondary roll looks bad (want=%f got=%f)" %
(want, got))
want = math.degrees(simstate.pitch)
got = math.degrees(ahrs2.pitch)
if abs(want - got) > 5:
raise NotAchievedException("Secondary pitch looks bad (want=%f got=%f)" %
(want, got))
q = quaternion.Quaternion([
attitude_quaternion.q1,
attitude_quaternion.q2,
attitude_quaternion.q3,
attitude_quaternion.q4
])
euler = q.euler
self.progress("attquat:%s q:%s euler:%s" % (
str(attitude_quaternion), q, euler))
want = math.degrees(simstate.roll)
got = math.degrees(euler[0])
if want - got > 5:
raise NotAchievedException("quat roll differs from attitude roll; want=%f got=%f" %
(want, got))
want = math.degrees(simstate.pitch)
got = math.degrees(euler[1])
if want - got > 5:
raise NotAchievedException("quat pitch differs from attitude pitch; want=%f got=%f" %
(want, got))
def test_main_flight(self): def test_main_flight(self):

View File

@ -4223,7 +4223,7 @@ class AutoTest(ABC):
target_compid=None, target_compid=None,
timeout=10, timeout=10,
quiet=False): quiet=False):
self.drain_mav_unparsed() self.drain_mav()
self.get_sim_time() # required for timeout in run_cmd_get_ack to work self.get_sim_time() # required for timeout in run_cmd_get_ack to work
self.send_cmd( self.send_cmd(
command, command,
@ -7441,14 +7441,10 @@ Also, ignores heartbeats not from our target system'''
if ex is not None: if ex is not None:
raise ex raise ex
def poll_message(self, message_id, timeout=10): def send_poll_message(self, message_id, target_sysid=None, target_compid=None):
if type(message_id) == str: if type(message_id) == str:
message_id = eval("mavutil.mavlink.MAVLINK_MSG_ID_%s" % message_id) message_id = eval("mavutil.mavlink.MAVLINK_MSG_ID_%s" % message_id)
# temporarily use a constant in place of self.send_cmd(mavutil.mavlink.MAV_CMD_REQUEST_MESSAGE,
# mavutil.mavlink.MAV_CMD_REQUEST_MESSAGE until we have a
# pymavlink release:
tstart = self.get_sim_time()
self.run_cmd(mavutil.mavlink.MAV_CMD_REQUEST_MESSAGE,
message_id, message_id,
0, 0,
0, 0,
@ -7456,7 +7452,21 @@ Also, ignores heartbeats not from our target system'''
0, 0,
0, 0,
0, 0,
timeout=timeout) target_sysid=target_sysid,
target_compid=target_compid)
def poll_message(self, message_id, timeout=10):
if type(message_id) == str:
message_id = eval("mavutil.mavlink.MAVLINK_MSG_ID_%s" % message_id)
self.drain_mav_unparsed()
tstart = self.get_sim_time() # required for timeout in run_cmd_get_ack to work
self.send_poll_message(message_id)
self.run_cmd_get_ack(
mavutil.mavlink.MAV_CMD_REQUEST_MESSAGE,
mavutil.mavlink.MAV_RESULT_ACCEPTED,
timeout,
quiet=False
)
while True: while True:
if self.get_sim_time_cached() - tstart > timeout: if self.get_sim_time_cached() - tstart > timeout:
raise NotAchievedException("Did not receive polled message") raise NotAchievedException("Did not receive polled message")
@ -7468,6 +7478,28 @@ Also, ignores heartbeats not from our target system'''
continue continue
return m return m
def get_messages_frame(self, msg_names):
'''try to get a "frame" of named messages - a set of messages as close
in time as possible'''
msgs = {}
def get_msgs(mav, m):
t = m.get_type()
if t in msg_names:
msgs[t] = m
self.do_timesync_roundtrip()
self.install_message_hook(get_msgs)
for msg_name in msg_names:
self.send_poll_message(msg_name)
while True:
self.mav.recv_match(blocking=True)
if len(msgs.keys()) == len(msg_names):
break
self.remove_message_hook(get_msgs)
return msgs
def test_request_message(self, timeout=60): def test_request_message(self, timeout=60):
rate = round(self.get_message_rate("CAMERA_FEEDBACK", 10)) rate = round(self.get_message_rate("CAMERA_FEEDBACK", 10))
if rate != 0: if rate != 0: