diff --git a/Tools/autotest/arduplane.py b/Tools/autotest/arduplane.py index e017b04562..5d41df7dba 100644 --- a/Tools/autotest/arduplane.py +++ b/Tools/autotest/arduplane.py @@ -1642,6 +1642,10 @@ class AutoTestPlane(AutoTest): "Test Buttons", self.test_button), + ("FRSkyPassThrough", + "Test FrSky PassThrough serial output", + self.test_frsky_passthrough), + ("FRSkyD", "Test FrSkyD serial output", self.test_frsky_d), diff --git a/Tools/autotest/common.py b/Tools/autotest/common.py index e04621b825..040ee3c609 100644 --- a/Tools/autotest/common.py +++ b/Tools/autotest/common.py @@ -1,6 +1,7 @@ from __future__ import print_function import abc +import copy import errno import math import os @@ -14,6 +15,7 @@ import fnmatch import operator import numpy import socket +import struct from MAVProxy.modules.lib import mp_util @@ -164,17 +166,13 @@ class MAVProxyLogFile(object): else: sys.stdout.flush() -class FRSkyD(object): +class FRSky(object): def __init__(self, destination_address): self.destination_address = destination_address - self.state_WANT_START_STOP_D = 16, - self.state_WANT_ID = 17, - self.state_WANT_BYTE1 = 18, - self.state_WANT_BYTE2 = 19, - - self.START_STOP_D = 0x5E - self.BYTESTUFF_D = 0x5D + self.buffer = "" + self.connected = False + self.port = None self.dataid_GPS_ALT_BP = 0x01 self.dataid_TEMP1 = 0x02 @@ -195,17 +193,6 @@ class FRSkyD(object): self.dataid_CURRENT = 0x28 self.dataid_VFAS = 0x39 - self.state = self.state_WANT_START_STOP_D - - self.buffer = "" - self.connected = False - self.data_by_id = {} - self.port = None - self.bad_chars = 0 - - def progress(self, message): - print("FRSky: %s" % message) - def connect(self): try: self.connected = False @@ -242,6 +229,32 @@ class FRSkyD(object): # self.progress("Read %u bytes" % len(data)) return data + def update(self): + if not self.connected: + if not self.connect(): + return + self.update_read() + +class FRSkyD(FRSky): + def __init__(self, destination_address): + super(FRSkyD, self).__init__(destination_address) + + self.state_WANT_START_STOP_D = 16, + self.state_WANT_ID = 17, + self.state_WANT_BYTE1 = 18, + self.state_WANT_BYTE2 = 19, + + self.START_STOP_D = 0x5E + self.BYTESTUFF_D = 0x5D + + self.state = self.state_WANT_START_STOP_D + + self.data_by_id = {} + self.bad_chars = 0 + + def progress(self, message): + print("FRSkyD: %s" % message) + def handle_data(self, dataid, value): self.progress("%u=%u" % (dataid, value)) self.data_by_id[dataid] = value @@ -297,11 +310,164 @@ class FRSkyD(object): pass return None + +class FRSkyPassThrough(FRSky): + def __init__(self, destination_address): + super(FRSkyPassThrough, self).__init__(destination_address) + + self.state_SEND_POLL = 34 + self.state_WANT_FRAME_TYPE = 35 + self.state_WANT_ID1 = 36, + self.state_WANT_ID2 = 37, + self.state_WANT_DATA = 38, + self.state_WANT_CRC = 39, + +# self.START_STOP_D = 0x5E +# self.BYTESTUFF_D = 0x5D + + self.START_STOP_SPORT = 0x7E + self.BYTESTUFF_SPORT = 0x7D + self.SPORT_DATA_FRAME = 0x10 + + self.SENSOR_ID_28 = 0x1B + + self.state = self.state_WANT_FRAME_TYPE + + self.data_by_id = {} + self.bad_chars = 0 + + self.last_message_received = 0 + self.warn_delta = 0 + self.poll_sent = 0 + + self.id_descriptions = { + 0x5000: "status text (dynamic)", + 0x5006: "Attitude and range (dynamic)", + 0x800: "GPS lat (600 with 1 sensor)", + 0x800: "GPS lon (600 with 1 sensor)", + 0x5005: "Vel and Yaw", + 0x5001: "AP status", + 0x5002: "GPS Status", + 0x5004: "Home", + 0x5008: "Battery 2 status", + 0x5003: "Battery 1 status", + 0x5007: "parameters", + } + + def progress(self, message): + print("FRSkyPassthrough: %s" % message) + + def handle_data(self, dataid, value): + self.progress("%s (0x%x)=%u" % (self.id_descriptions[dataid], dataid, value)) + self.data_by_id[dataid] = value + + def read_bytestuffed_byte(self): + if ord(self.buffer[0]) == 0x7D: + # byte-stuffed + if len(self.buffer) < 2: + self.consume = 0 + return None + self.consume = 2 + if ord(self.buffer[1]) == 0x5E: + return self.START_STOP_SPORT + if ord(self.buffer[1]) == 0x5D: + return self.BYTESTUFF_SPORT + raise ValueError("Unknown stuffed byte (0x%02x)" % ord(self.buffer[1])) + return ord(self.buffer[0]) + + def calc_crc(self, byte): + self.crc += byte + self.crc += self.crc >> 8 + self.crc &= 0xFF + def update(self): if not self.connected: if not self.connect(): return - self.update_read() + self.buffer += self.do_read() + now = time.time() + tdelta = now - self.last_message_received + if now - self.poll_sent > 2: + if self.state != self.state_WANT_FRAME_TYPE: + raise ValueError("Expected to be wanting a frame type when repolling") + self.progress("Re-polling") + self.state = self.state_SEND_POLL + + if self.state == self.state_SEND_POLL: + self.progress("Sending poll") + buf = struct.pack('> VELANDYAW_YAW_OFFSET + xy_vel = value >> VELANDYAW_XYVEL_OFFSET & 0xFF + z_vel_dm_per_second = value & 0xFFFF + + gpi = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1) + if gpi is None: + return + self.progress(" yaw=%u gpi=%u" % (yaw, gpi.hdg)) + self.progress(" xy_vel=%u" % xy_vel) + self.progress(" z_vel_dm_per_second=%u" % z_vel_dm_per_second) + if int(round(yaw/10,3)) == int(round(gpi.hdg/100, 3)): + self.progress("Yaw match") + return True + # FIXME: need to be under way to check the velocities, really.... + return False + + def tfp_validate_battery1(self, value): + self.progress("validating battery1 (0x%02x)" % value) + BATT_VOLTAGE_LIMIT = 0x1FF + BATT_CURRENT_OFFSET = 9 + BATT_TOTALMAH_LIMIT = 0x7FFF + BATT_TOTALMAH_OFFSET = 17 + voltage = (value & BATT_VOLTAGE_LIMIT)/10.0 + batt = self.mav.recv_match( + type='BATTERY_STATUS', + blocking=True, + timeout=5, + condition="BATTERY_STATUS.id==0" + ) + if batt is None: + raise NotAchievedException("Did not get BATTERY_STATUS message") + battery_status_value = batt.voltages[0]/1000.0 + self.progress("BATTERY_STATUS==%f frsky==%f" % (battery_status_value, voltage)) + if abs(battery_status_value - voltage) > 0.1: + return False + return True + + def test_frsky_passthrough(self): + self.set_parameter("SERIAL5_PROTOCOL", 10) # serial5 is FRSky passthrough + self.customise_SITL_commandline([ + "--uartF=tcp:6735" # serial5 spews to localhost:6735 + ]) + frsky = FRSkyPassThrough(("127.0.0.1", 6735)) + self.wait_ready_to_arm() + self.drain_mav_unparsed() + m = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1) + if m is None: + raise NotAchievedException("Did not receive GLOBAL_POSITION_INT") + gpi_abs_alt = m.alt / 1000 # mm -> m + tstart = self.get_sim_time_cached() + # anything with a lambda in here needs a proper test written. + # This, at least makes sure we're getting some of each + # message. These are ordered according to the wfq scheduler + wants = { + 0x5000: lambda x : True, + 0x5006: lambda x : True, + 0x800: lambda x : True, + 0x5005: self.tfp_validate_vel_and_yaw, + 0x5001: lambda x : True, + 0x5002: lambda x : True, + 0x5004: lambda x : True, + # 0x5008: lambda x : True, # no second battery, so this doesn't arrive + 0x5003: self.tfp_validate_battery1, + 0x5007: lambda x : True, + } + while len(wants): + print("Still wanting (%s)" % ",".join([ ("0x%02x" % x) for x in wants.keys()])) + wants_copy = copy.copy(wants) + t2 = self.get_sim_time_cached() + if t2 - tstart > 10: + raise AutoTestTimeoutException("Failed to get frsky data") + frsky.update() + for want in wants_copy: + data = frsky.get_data(want) + if data is None: + continue + self.progress("Checking %s" % str(want)) + if wants[want](data): + self.progress(" Fulfilled") + del wants[want] + def test_frsky_d(self): self.set_parameter("SERIAL5_PROTOCOL", 3) # serial5 is FRSky output self.customise_SITL_commandline([