From b0aceaea703e5ec8b0a11eee9f1b84ef42e3a733 Mon Sep 17 00:00:00 2001 From: Peter Barker Date: Fri, 7 Feb 2020 14:57:08 +1100 Subject: [PATCH] autotest: add tests for LTM telemetry output --- Tools/autotest/arduplane.py | 4 + Tools/autotest/common.py | 255 ++++++++++++++++++++++++++++++++++++ 2 files changed, 259 insertions(+) diff --git a/Tools/autotest/arduplane.py b/Tools/autotest/arduplane.py index faed3893c7..d79e0bfda4 100644 --- a/Tools/autotest/arduplane.py +++ b/Tools/autotest/arduplane.py @@ -1713,6 +1713,10 @@ class AutoTestPlane(AutoTest): "Test FrSkyD serial output", self.test_frsky_d), + ("LTM", + "Test LTM serial output", + self.test_ltm), + ("AdvancedFailsafe", "Test Advanced Failsafe", self.test_advanced_failsafe), diff --git a/Tools/autotest/common.py b/Tools/autotest/common.py index a043aad9c5..356bbb723d 100644 --- a/Tools/autotest/common.py +++ b/Tools/autotest/common.py @@ -216,6 +216,152 @@ class Telem(object): return self.update_read() +class LTM(Telem): + def __init__(self, destination_address): + super(LTM, self).__init__(destination_address) + + self.HEADER1 = 0x24 + self.HEADER2 = 0x54 + + self.FRAME_G = 0x47 + self.FRAME_A = 0x41 + self.FRAME_S = 0x53 + + self.frame_lengths = { + self.FRAME_G: 18, + self.FRAME_A: 10, + self.FRAME_S: 11, + } + self.frame_lengths = { + self.FRAME_G: 18, + self.FRAME_A: 10, + self.FRAME_S: 11, + } + + self.data_by_id = {} + self.frames = {} + + def g(self): + return self.frames.get(self.FRAME_G, None) + def a(self): + return self.frames.get(self.FRAME_A, None) + def s(self): + return self.frames.get(self.FRAME_S, None) + + def progress(self, message): + print("LTM: %s" % message) + + def handle_data(self, dataid, value): + self.progress("%u=%u" % (dataid, value)) + self.data_by_id[dataid] = value + + def consume_frame(self): + frame_type = ord(self.buffer[2]) + frame_length = self.frame_lengths[frame_type] + # check frame CRC + crc = 0 + count = 0 + for c in self.buffer[3:frame_length-1]: + old = crc + crc ^= ord(c) + count += 1 + if crc != ord(self.buffer[frame_length-1]): + raise NotAchievedException("Invalid checksum on frame type %s" % str(chr(frame_type))) +# self.progress("Received valid %s frame" % str(chr(frame_type))) + + class Frame(object): + def __init__(self, buffer): + self.buffer = buffer + def intn(self, offset, count): + ret = 0 + for i in range(offset, offset+count): + print("byte: %02x" % ord(self.buffer[i])) + ret = ret | (ord(self.buffer[i]) << ((i-offset)*8)) + return ret + def int32(self, offset): + t = struct.unpack(">2) + def fix_type(self): + s = ord(self.buffer[16]) + return s & 0b11 + + class FrameA(Frame): + def __init__(self, buffer): + super(FrameA, self,).__init__(buffer) + def pitch(self): + return self.int16(3) + def roll(self): + return self.int16(5) + def hdg(self): + return self.int16(7) + class FrameS(Frame): + def __init__(self, buffer): + super(FrameS, self,).__init__(buffer) + + if frame_type == self.FRAME_G: + frame = FrameG(self.buffer[0:frame_length-1]) + elif frame_type == self.FRAME_A: + frame = FrameA(self.buffer[0:frame_length-1]) + elif frame_type == self.FRAME_S: + frame = FrameS(self.buffer[0:frame_length-1]) + else: + raise NotAchievedException("Bad frame?!?!?!") + self.buffer = self.buffer[frame_length:] + self.frames[frame_type] = frame + + def update_read(self): + self.buffer += self.do_read() + while len(self.buffer): + if len(self.buffer) == 0: + break + if sys.version_info.major >= 3: + b = self.buffer[0] + else: + b = ord(self.buffer[0]) + if ord(self.buffer[0]) != self.HEADER1: + self.bad_chars += 1 + self.buffer = self.buffer[1:] + continue + if ord(self.buffer[1]) != self.HEADER2: + self.bad_chars += 1 + self.buffer = self.buffer[1:] + continue + if ord(self.buffer[2]) not in [self.FRAME_G, self.FRAME_A, self.FRAME_S]: + self.bad_chars += 1 + self.buffer = self.buffer[1:] + continue + frame_len = self.frame_lengths[ord(self.buffer[2])] + if len(self.buffer) < frame_len: + continue + self.consume_frame() + + def get_data(self, dataid): + try: + return self.data_by_id[dataid] + except KeyError as e: + pass + return None + class FRSky(Telem): def __init__(self, destination_address): super(FRSky, self).__init__(destination_address) @@ -4433,6 +4579,115 @@ switch value''' if alt == gpi_abs_alt: break + def test_ltm_g(self, ltm): + g = ltm.g() + if g is None: + return + m = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True) + print("m: %s" % str(m)) + + print("g.lat=%s m.lat=%s" % (str(g.lat()), str(m.lat))) + if abs(m.lat - g.lat()) > 10: + return False + + print("g.lon:%s m.lon:%s" % (str(g.lon()), str(m.lon))) + if abs(m.lon - g.lon()) > 10: + return False + + print("gndspeed: %s" % str(g.gndspeed())) + if g.gndspeed() != 0: + # FIXME if we start the vehicle moving.... check against VFR_HUD? + return False + + print("g.alt=%s m.alt=%s" % (str(g.alt()/100.0), str(m.relative_alt/1000.0))) + if abs(m.relative_alt/1000.0 - g.alt()/100.0) > 1: + return False + + print("sats: %s" % str(g.sats())) + m = self.mav.recv_match(type='GPS_RAW_INT', blocking=True) + if m.satellites_visible != g.sats(): + return False + + constrained_fix_type = m.fix_type + if constrained_fix_type > 3: + constrained_fix_type = 3 + print("fix_type: %s" % g.fix_type()) + if constrained_fix_type != g.fix_type(): + return False + + return True + + def test_ltm_a(self, ltm): + a = ltm.a() + if a is None: + return + m = self.mav.recv_match(type='ATTITUDE', blocking=True) + + pitch = a.pitch() + print("pitch: %s" % str(pitch)) + if abs(math.degrees(m.pitch) - pitch) > 1: + return False + + roll = a.roll() + print("roll: %s" % str(roll)) + if abs(math.degrees(m.roll) - roll) > 1: + return False + + hdg = a.hdg() + myaw = math.degrees(m.yaw) + myaw += 360 + myaw %= 360 + print("a.hdg=%s m.hdg=%s" % (str(hdg), str(myaw))) + if abs(myaw - hdg) > 1: + return False + + return True + + def test_ltm_s(self, ltm): + s = ltm.s() + if s is None: + return + # FIXME. Actually check the field values are correct :-) + return True + + def test_ltm(self): + self.set_parameter("SERIAL5_PROTOCOL", 25) # serial5 is LTM output + self.customise_SITL_commandline([ + "--uartF=tcp:6735" # serial5 spews to localhost:6735 + ]) + ltm = LTM(("127.0.0.1", 6735)) + self.wait_ready_to_arm() + self.drain_mav_unparsed() + m = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1) + if m is None: + raise NotAchievedException("Did not receive GLOBAL_POSITION_INT") + gpi_abs_alt = int(m.alt / 1000) # mm -> m + + wants = { + "g": self.test_ltm_g, + "a": self.test_ltm_a, + "s": self.test_ltm_s, + } + + tstart = self.get_sim_time_cached() + while True: + self.progress("Still wanting (%s)" % + ",".join([ ("%s" % x) for x in wants.keys()])) + if len(wants) == 0: + break + now = self.get_sim_time_cached() + if now - tstart > 10: + raise AutoTestTimeoutException("Failed to get ltm data") + + ltm.update() + + wants_copy = copy.copy(wants) + for want in wants_copy: + self.progress("Checking %s" % str(want)) + if wants[want](ltm): + self.progress(" Fulfilled") + del wants[want] + def tests(self): return [ ("PIDTuning",