autotest: add test for Frsky SPort

This commit is contained in:
Peter Barker 2020-01-13 11:48:01 +11:00 committed by Peter Barker
parent 280a44fc73
commit ba13f0bb29
2 changed files with 102 additions and 24 deletions

View File

@ -1642,6 +1642,10 @@ class AutoTestPlane(AutoTest):
"Test Buttons",
self.test_button),
("FRSkySPort",
"Test FrSky SPort mode",
self.test_frsky_sport),
("FRSkyPassThrough",
"Test FrSky PassThrough serial output",
self.test_frsky_passthrough),

View File

@ -311,9 +311,9 @@ class FRSkyD(FRSky):
return None
class FRSkyPassThrough(FRSky):
class FRSkySPort(FRSky):
def __init__(self, destination_address):
super(FRSkyPassThrough, self).__init__(destination_address)
super(FRSkySPort, self).__init__(destination_address)
self.state_SEND_POLL = 34
self.state_WANT_FRAME_TYPE = 35
@ -322,22 +322,21 @@ class FRSkyPassThrough(FRSky):
self.state_WANT_DATA = 38,
self.state_WANT_CRC = 39,
# self.START_STOP_D = 0x5E
# self.BYTESTUFF_D = 0x5D
self.START_STOP_SPORT = 0x7E
self.BYTESTUFF_SPORT = 0x7D
self.SPORT_DATA_FRAME = 0x10
self.SENSOR_ID_28 = 0x1B
self.SENSOR_ID_VARIO = 0x00 # Sensor ID 0
self.SENSOR_ID_FAS = 0x22 # Sensor ID 2
self.SENSOR_ID_GPS = 0x83 # Sensor ID 3
self.SENSOR_ID_SP2UR = 0xC6 # Sensor ID 6
self.SENSOR_ID_28 = 0x1B # Sensor ID 28
self.state = self.state_WANT_FRAME_TYPE
self.data_by_id = {}
self.bad_chars = 0
self.last_message_received = 0
self.warn_delta = 0
self.poll_sent = 0
self.id_descriptions = {
@ -352,10 +351,28 @@ class FRSkyPassThrough(FRSky):
0x5008: "Battery 2 status",
0x5003: "Battery 1 status",
0x5007: "parameters",
# SPort non-passthrough:
0x02: "Temp1",
0x04: "Fuel",
0x05: "Temp2",
0x10: "Baro Alt BP",
0x13: "GPS_LAT_BP",
0x1B: "GPS_LAT_AP",
0x21: "BARO_ALT_AP",
0x39: "VFAS",
}
self.sensors_to_poll = [
self.SENSOR_ID_VARIO,
self.SENSOR_ID_FAS,
self.SENSOR_ID_GPS,
self.SENSOR_ID_SP2UR,
]
self.next_sensor_id_to_poll = 0 # offset into sensors_to_poll
def progress(self, message):
print("FRSkyPassthrough: %s" % message)
print("FRSkySPort: %s" % message)
def handle_data(self, dataid, value):
self.progress("%s (0x%x)=%u" % (self.id_descriptions[dataid], dataid, value))
@ -380,13 +397,15 @@ class FRSkyPassThrough(FRSky):
self.crc += self.crc >> 8
self.crc &= 0xFF
def update(self):
if not self.connected:
if not self.connect():
return
self.buffer += self.do_read()
def next_sensor(self):
ret = self.sensors_to_poll[self.next_sensor_id_to_poll]
self.next_sensor_id_to_poll += 1
if self.next_sensor_id_to_poll >= len(self.sensors_to_poll):
self.next_sensor_id_to_poll = 0
return ret
def check_poll(self):
now = time.time()
tdelta = now - self.last_message_received
if now - self.poll_sent > 2:
if self.state != self.state_WANT_FRAME_TYPE:
raise ValueError("Expected to be wanting a frame type when repolling")
@ -394,12 +413,22 @@ class FRSkyPassThrough(FRSky):
self.state = self.state_SEND_POLL
if self.state == self.state_SEND_POLL:
self.progress("Sending poll")
buf = struct.pack('<BB', self.START_STOP_SPORT, self.SENSOR_ID_28)
sensor_id = self.next_sensor()
self.progress("Sending poll for 0x%02x" % sensor_id)
buf = struct.pack('<BB', self.START_STOP_SPORT, sensor_id)
self.port.sendall(buf)
self.state = self.state_WANT_FRAME_TYPE
self.poll_sent = now
def update(self):
if not self.connected:
if not self.connect():
return
self.check_poll()
self.do_sport_read()
def do_sport_read(self):
self.buffer += self.do_read()
self.consume = None
while len(self.buffer):
if self.consume is not None:
@ -457,7 +486,6 @@ class FRSkyPassThrough(FRSky):
# raise ValueError("Incorrect frsky checksum (want=%02x got=%02x id=0x%x)" % (crc, self.crc, dataid))
else:
self.handle_data(dataid, self.data)
self.last_message_received = now
self.state = self.state_SEND_POLL
else:
raise ValueError("Unknown state (%u)" % self.state)
@ -469,6 +497,15 @@ class FRSkyPassThrough(FRSky):
pass
return None
class FRSkyPassThrough(FRSkySPort):
def __init__(self, destination_address):
super(FRSkyPassThrough, self).__init__(destination_address)
self.sensors_to_poll = [self.SENSOR_ID_28]
def progress(self, message):
print("FRSkyPassthrough: %s" % message)
class AutoTest(ABC):
"""Base abstract class.
It implements the common function for all vehicle types.
@ -4095,11 +4132,6 @@ switch value'''
frsky = FRSkyPassThrough(("127.0.0.1", 6735))
self.wait_ready_to_arm()
self.drain_mav_unparsed()
m = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1)
if m is None:
raise NotAchievedException("Did not receive GLOBAL_POSITION_INT")
gpi_abs_alt = m.alt / 1000 # mm -> m
tstart = self.get_sim_time_cached()
# anything with a lambda in here needs a proper test written.
# This, at least makes sure we're getting some of each
# message. These are ordered according to the wfq scheduler
@ -4115,8 +4147,9 @@ switch value'''
0x5003: self.tfp_validate_battery1,
0x5007: lambda x : True,
}
tstart = self.get_sim_time_cached()
while len(wants):
print("Still wanting (%s)" % ",".join([ ("0x%02x" % x) for x in wants.keys()]))
self.progress("Still wanting (%s)" % ",".join([ ("0x%02x" % x) for x in wants.keys()]))
wants_copy = copy.copy(wants)
t2 = self.get_sim_time_cached()
if t2 - tstart > 10:
@ -4131,6 +4164,47 @@ switch value'''
self.progress(" Fulfilled")
del wants[want]
def test_frsky_sport(self):
self.set_parameter("SERIAL5_PROTOCOL", 4) # serial5 is FRSky sport
self.customise_SITL_commandline([
"--uartF=tcp:6735" # serial5 spews to localhost:6735
])
frsky = FRSkySPort(("127.0.0.1", 6735))
self.wait_ready_to_arm()
self.drain_mav_unparsed()
# anything with a lambda in here needs a proper test written.
# This, at least makes sure we're getting some of each
# message.
wants = {
0x02: lambda x : True,
0x04: lambda x : True,
0x05: lambda x : True,
0x10: lambda x : True,
0x13: lambda x : True,
0x1B: lambda x : True,
0x21: lambda x : True,
0x39: lambda x : True,
}
tstart = self.get_sim_time_cached()
last_wanting_print = 0
while len(wants):
now = self.get_sim_time()
if now - last_wanting_print > 1:
self.progress("Still wanting (%s)" % ",".join([ ("0x%02x" % x) for x in wants.keys()]))
last_wanting_print = now
wants_copy = copy.copy(wants)
if now - tstart > 10:
raise AutoTestTimeoutException("Failed to get frsky data")
frsky.update()
for want in wants_copy:
data = frsky.get_data(want)
if data is None:
continue
self.progress("Checking %s" % str(want))
if wants[want](data):
self.progress(" Fulfilled")
del wants[want]
def test_frsky_d(self):
self.set_parameter("SERIAL5_PROTOCOL", 3) # serial5 is FRSky output
self.customise_SITL_commandline([