From 1cfb4e4a4912f2650d0a3263e8b0cdbd4ef09b41 Mon Sep 17 00:00:00 2001 From: Peter Barker Date: Wed, 8 Jan 2020 23:28:43 +1100 Subject: [PATCH] autotest: test FRSky serial output by taking output via TCP --- Tools/autotest/arduplane.py | 4 + Tools/autotest/common.py | 167 ++++++++++++++++++++++++++++++++++++ 2 files changed, 171 insertions(+) diff --git a/Tools/autotest/arduplane.py b/Tools/autotest/arduplane.py index 28622f97db..e017b04562 100644 --- a/Tools/autotest/arduplane.py +++ b/Tools/autotest/arduplane.py @@ -1642,6 +1642,10 @@ class AutoTestPlane(AutoTest): "Test Buttons", self.test_button), + ("FRSkyD", + "Test FrSkyD serial output", + self.test_frsky_d), + ("AdvancedFailsafe", "Test Advanced Failsafe", self.test_advanced_failsafe), diff --git a/Tools/autotest/common.py b/Tools/autotest/common.py index 76d48ebf8f..e04621b825 100644 --- a/Tools/autotest/common.py +++ b/Tools/autotest/common.py @@ -1,6 +1,7 @@ from __future__ import print_function import abc +import errno import math import os import re @@ -12,6 +13,7 @@ import pexpect import fnmatch import operator import numpy +import socket from MAVProxy.modules.lib import mp_util @@ -162,6 +164,145 @@ class MAVProxyLogFile(object): else: sys.stdout.flush() +class FRSkyD(object): + def __init__(self, destination_address): + self.destination_address = destination_address + + self.state_WANT_START_STOP_D = 16, + self.state_WANT_ID = 17, + self.state_WANT_BYTE1 = 18, + self.state_WANT_BYTE2 = 19, + + self.START_STOP_D = 0x5E + self.BYTESTUFF_D = 0x5D + + self.dataid_GPS_ALT_BP = 0x01 + self.dataid_TEMP1 = 0x02 + self.dataid_FUEL = 0x04 + self.dataid_TEMP2 = 0x05 + self.dataid_GPS_ALT_AP = 0x09 + self.dataid_BARO_ALT_BP = 0x10 + self.dataid_GPS_SPEED_BP = 0x11 + self.dataid_GPS_LONG_BP = 0x12 + self.dataid_GPS_LAT_BP = 0x13 + self.dataid_GPS_COURS_BP = 0x14 + self.dataid_GPS_SPEED_AP = 0x19 + self.dataid_GPS_LONG_AP = 0x1A + self.dataid_GPS_LAT_AP = 0x1B + self.dataid_BARO_ALT_AP = 0x21 + self.dataid_GPS_LONG_EW = 0x22 + self.dataid_GPS_LAT_NS = 0x23 + self.dataid_CURRENT = 0x28 + self.dataid_VFAS = 0x39 + + self.state = self.state_WANT_START_STOP_D + + self.buffer = "" + self.connected = False + self.data_by_id = {} + self.port = None + self.bad_chars = 0 + + def progress(self, message): + print("FRSky: %s" % message) + + def connect(self): + try: + self.connected = False + self.progress("Connecting to (%s:%u)" % self.destination_address) + if self.port is not None: + try: + self.port.close() # might be reopening + except Exception as e: + pass + self.port = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.port.connect(self.destination_address) + self.port.setblocking(0) + self.port.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) + self.connected = True + self.progress("Connected") + except IOError as e: + self.progress("Failed to connect: %s" % str(e)) + time.sleep(0.5) + return False + return True + + def do_read(self): + try: + data = self.port.recv(1024) + except socket.error as e: + if e.errno not in [ errno.EAGAIN, errno.EWOULDBLOCK ]: + self.progress("Exception: %s" % str(e)) + self.connected = False + return "" + if len(data) == 0: + self.progress("EOF") + self.connected = False + return "" +# self.progress("Read %u bytes" % len(data)) + return data + + def handle_data(self, dataid, value): + self.progress("%u=%u" % (dataid, value)) + self.data_by_id[dataid] = value + + def update_read(self): + self.buffer += self.do_read() + consume = None + while len(self.buffer): + if consume is not None: + self.buffer = self.buffer[consume:] + if len(self.buffer) == 0: + break + consume = 1 + b = ord(self.buffer[0]) + if self.state == self.state_WANT_START_STOP_D: + if b != self.START_STOP_D: + # we may come into a stream mid-way, so we can't judge + self.bad_chars += 1 + continue + self.state = self.state_WANT_ID + continue + elif self.state == self.state_WANT_ID: + self.dataid = b + self.state = self.state_WANT_BYTE1 + continue + elif self.state in [self.state_WANT_BYTE1, self.state_WANT_BYTE2]: + if b == 0x5D: + # byte-stuffed + if len(self.buffer) < 2: + # try again in a little while + consume = 0 + return + if ord(self.buffer[1]) == 0x3E: + b = self.START_STOP_D + elif ord(self.buffer[1]) == 0x3D: + b = self.BYTESTUFF_D; + else: + raise ValueError("Unknown stuffed byte") + consume = 2 + if self.state == self.state_WANT_BYTE1: + self.b1 = b + self.state = self.state_WANT_BYTE2 + continue + + data = self.b1 | b << 8 + self.handle_data(self.dataid, data) + self.state = self.state_WANT_START_STOP_D + + def get_data(self, dataid): + try: + return self.data_by_id[dataid] + except KeyError as e: + pass + return None + + def update(self): + if not self.connected: + if not self.connect(): + return + self.update_read() + class AutoTest(ABC): """Base abstract class. It implements the common function for all vehicle types. @@ -3738,6 +3879,32 @@ switch value''' if m3.state != 0: raise NotAchievedException("Didn't get expected mask back in message (mask=0 state=%u" % (m3.state)) + def test_frsky_d(self): + self.set_parameter("SERIAL5_PROTOCOL", 3) # serial5 is FRSky output + self.customise_SITL_commandline([ + "--uartF=tcp:6735" # serial5 spews to localhost:6735 + ]) + frsky = FRSkyD(("127.0.0.1", 6735)) + self.wait_ready_to_arm() + self.drain_mav_unparsed() + m = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1) + if m is None: + raise NotAchievedException("Did not receive GLOBAL_POSITION_INT") + gpi_abs_alt = m.alt / 1000 # mm -> m + tstart = self.get_sim_time_cached() + while True: + t2 = self.get_sim_time_cached() + if t2 - tstart > 10: + raise AutoTestTimeoutException("Failed to get frsky data") + frsky.update() + alt = frsky.get_data(frsky.dataid_GPS_ALT_BP) + self.progress("Got alt (%s)" % str(alt)) + if alt is None: + continue + self.drain_mav_unparsed() + if alt == gpi_abs_alt: + break + def tests(self): return [ ("PIDTuning",