mirror of
https://github.com/ArduPilot/ardupilot
synced 2025-02-22 07:44:03 -04:00
autotest: test FRSky serial output by taking output via TCP
This commit is contained in:
parent
3074855ea3
commit
1cfb4e4a49
@ -1642,6 +1642,10 @@ class AutoTestPlane(AutoTest):
|
||||
"Test Buttons",
|
||||
self.test_button),
|
||||
|
||||
("FRSkyD",
|
||||
"Test FrSkyD serial output",
|
||||
self.test_frsky_d),
|
||||
|
||||
("AdvancedFailsafe",
|
||||
"Test Advanced Failsafe",
|
||||
self.test_advanced_failsafe),
|
||||
|
@ -1,6 +1,7 @@
|
||||
from __future__ import print_function
|
||||
|
||||
import abc
|
||||
import errno
|
||||
import math
|
||||
import os
|
||||
import re
|
||||
@ -12,6 +13,7 @@ import pexpect
|
||||
import fnmatch
|
||||
import operator
|
||||
import numpy
|
||||
import socket
|
||||
|
||||
from MAVProxy.modules.lib import mp_util
|
||||
|
||||
@ -162,6 +164,145 @@ class MAVProxyLogFile(object):
|
||||
else:
|
||||
sys.stdout.flush()
|
||||
|
||||
class FRSkyD(object):
|
||||
def __init__(self, destination_address):
|
||||
self.destination_address = destination_address
|
||||
|
||||
self.state_WANT_START_STOP_D = 16,
|
||||
self.state_WANT_ID = 17,
|
||||
self.state_WANT_BYTE1 = 18,
|
||||
self.state_WANT_BYTE2 = 19,
|
||||
|
||||
self.START_STOP_D = 0x5E
|
||||
self.BYTESTUFF_D = 0x5D
|
||||
|
||||
self.dataid_GPS_ALT_BP = 0x01
|
||||
self.dataid_TEMP1 = 0x02
|
||||
self.dataid_FUEL = 0x04
|
||||
self.dataid_TEMP2 = 0x05
|
||||
self.dataid_GPS_ALT_AP = 0x09
|
||||
self.dataid_BARO_ALT_BP = 0x10
|
||||
self.dataid_GPS_SPEED_BP = 0x11
|
||||
self.dataid_GPS_LONG_BP = 0x12
|
||||
self.dataid_GPS_LAT_BP = 0x13
|
||||
self.dataid_GPS_COURS_BP = 0x14
|
||||
self.dataid_GPS_SPEED_AP = 0x19
|
||||
self.dataid_GPS_LONG_AP = 0x1A
|
||||
self.dataid_GPS_LAT_AP = 0x1B
|
||||
self.dataid_BARO_ALT_AP = 0x21
|
||||
self.dataid_GPS_LONG_EW = 0x22
|
||||
self.dataid_GPS_LAT_NS = 0x23
|
||||
self.dataid_CURRENT = 0x28
|
||||
self.dataid_VFAS = 0x39
|
||||
|
||||
self.state = self.state_WANT_START_STOP_D
|
||||
|
||||
self.buffer = ""
|
||||
self.connected = False
|
||||
self.data_by_id = {}
|
||||
self.port = None
|
||||
self.bad_chars = 0
|
||||
|
||||
def progress(self, message):
|
||||
print("FRSky: %s" % message)
|
||||
|
||||
def connect(self):
|
||||
try:
|
||||
self.connected = False
|
||||
self.progress("Connecting to (%s:%u)" % self.destination_address)
|
||||
if self.port is not None:
|
||||
try:
|
||||
self.port.close() # might be reopening
|
||||
except Exception as e:
|
||||
pass
|
||||
self.port = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.port.connect(self.destination_address)
|
||||
self.port.setblocking(0)
|
||||
self.port.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
|
||||
self.connected = True
|
||||
self.progress("Connected")
|
||||
except IOError as e:
|
||||
self.progress("Failed to connect: %s" % str(e))
|
||||
time.sleep(0.5)
|
||||
return False
|
||||
return True
|
||||
|
||||
def do_read(self):
|
||||
try:
|
||||
data = self.port.recv(1024)
|
||||
except socket.error as e:
|
||||
if e.errno not in [ errno.EAGAIN, errno.EWOULDBLOCK ]:
|
||||
self.progress("Exception: %s" % str(e))
|
||||
self.connected = False
|
||||
return ""
|
||||
if len(data) == 0:
|
||||
self.progress("EOF")
|
||||
self.connected = False
|
||||
return ""
|
||||
# self.progress("Read %u bytes" % len(data))
|
||||
return data
|
||||
|
||||
def handle_data(self, dataid, value):
|
||||
self.progress("%u=%u" % (dataid, value))
|
||||
self.data_by_id[dataid] = value
|
||||
|
||||
def update_read(self):
|
||||
self.buffer += self.do_read()
|
||||
consume = None
|
||||
while len(self.buffer):
|
||||
if consume is not None:
|
||||
self.buffer = self.buffer[consume:]
|
||||
if len(self.buffer) == 0:
|
||||
break
|
||||
consume = 1
|
||||
b = ord(self.buffer[0])
|
||||
if self.state == self.state_WANT_START_STOP_D:
|
||||
if b != self.START_STOP_D:
|
||||
# we may come into a stream mid-way, so we can't judge
|
||||
self.bad_chars += 1
|
||||
continue
|
||||
self.state = self.state_WANT_ID
|
||||
continue
|
||||
elif self.state == self.state_WANT_ID:
|
||||
self.dataid = b
|
||||
self.state = self.state_WANT_BYTE1
|
||||
continue
|
||||
elif self.state in [self.state_WANT_BYTE1, self.state_WANT_BYTE2]:
|
||||
if b == 0x5D:
|
||||
# byte-stuffed
|
||||
if len(self.buffer) < 2:
|
||||
# try again in a little while
|
||||
consume = 0
|
||||
return
|
||||
if ord(self.buffer[1]) == 0x3E:
|
||||
b = self.START_STOP_D
|
||||
elif ord(self.buffer[1]) == 0x3D:
|
||||
b = self.BYTESTUFF_D;
|
||||
else:
|
||||
raise ValueError("Unknown stuffed byte")
|
||||
consume = 2
|
||||
if self.state == self.state_WANT_BYTE1:
|
||||
self.b1 = b
|
||||
self.state = self.state_WANT_BYTE2
|
||||
continue
|
||||
|
||||
data = self.b1 | b << 8
|
||||
self.handle_data(self.dataid, data)
|
||||
self.state = self.state_WANT_START_STOP_D
|
||||
|
||||
def get_data(self, dataid):
|
||||
try:
|
||||
return self.data_by_id[dataid]
|
||||
except KeyError as e:
|
||||
pass
|
||||
return None
|
||||
|
||||
def update(self):
|
||||
if not self.connected:
|
||||
if not self.connect():
|
||||
return
|
||||
self.update_read()
|
||||
|
||||
class AutoTest(ABC):
|
||||
"""Base abstract class.
|
||||
It implements the common function for all vehicle types.
|
||||
@ -3738,6 +3879,32 @@ switch value'''
|
||||
if m3.state != 0:
|
||||
raise NotAchievedException("Didn't get expected mask back in message (mask=0 state=%u" % (m3.state))
|
||||
|
||||
def test_frsky_d(self):
|
||||
self.set_parameter("SERIAL5_PROTOCOL", 3) # serial5 is FRSky output
|
||||
self.customise_SITL_commandline([
|
||||
"--uartF=tcp:6735" # serial5 spews to localhost:6735
|
||||
])
|
||||
frsky = FRSkyD(("127.0.0.1", 6735))
|
||||
self.wait_ready_to_arm()
|
||||
self.drain_mav_unparsed()
|
||||
m = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1)
|
||||
if m is None:
|
||||
raise NotAchievedException("Did not receive GLOBAL_POSITION_INT")
|
||||
gpi_abs_alt = m.alt / 1000 # mm -> m
|
||||
tstart = self.get_sim_time_cached()
|
||||
while True:
|
||||
t2 = self.get_sim_time_cached()
|
||||
if t2 - tstart > 10:
|
||||
raise AutoTestTimeoutException("Failed to get frsky data")
|
||||
frsky.update()
|
||||
alt = frsky.get_data(frsky.dataid_GPS_ALT_BP)
|
||||
self.progress("Got alt (%s)" % str(alt))
|
||||
if alt is None:
|
||||
continue
|
||||
self.drain_mav_unparsed()
|
||||
if alt == gpi_abs_alt:
|
||||
break
|
||||
|
||||
def tests(self):
|
||||
return [
|
||||
("PIDTuning",
|
||||
|
Loading…
Reference in New Issue
Block a user