autotest: add trivial test for MSP

This commit is contained in:
Peter Barker 2021-11-17 10:07:15 +11:00 committed by Peter Barker
parent 02ad9879fa
commit b42432ea3e
2 changed files with 189 additions and 0 deletions

View File

@ -3442,6 +3442,10 @@ class AutoTestPlane(AutoTest):
"Test MegaSquirt EFI", "Test MegaSquirt EFI",
self.MegaSquirt), self.MegaSquirt),
("MSP_DJI",
"Test MSP DJI serial output",
self.test_msp_dji),
("LogUpload", ("LogUpload",
"Log upload", "Log upload",
self.log_upload), self.log_upload),

View File

@ -320,6 +320,165 @@ class Telem(object):
self.update_read() self.update_read()
class MSP_Generic(Telem):
def __init__(self, destination_address):
super(MSP_Generic, self).__init__(destination_address)
self.callback = None
self.STATE_IDLE = "IDLE"
self.STATE_WANT_HEADER_DOLLARS = "WANT_DOLLARS"
self.STATE_WANT_HEADER_M = "WANT_M"
self.STATE_WANT_HEADER_GT = "WANT_GT"
self.STATE_WANT_DATA_SIZE = "WANT_DATA_SIZE"
self.STATE_WANT_COMMAND = "WANT_COMMAND"
self.STATE_WANT_DATA = "WANT_DATA"
self.STATE_WANT_CHECKSUM = "WANT_CHECKSUM"
self.state = self.STATE_IDLE
def progress(self, message):
print("MSP: %s" % message)
def set_state(self, state):
# self.progress("Moving to state (%s)" % state)
self.state = state
def init_checksum(self, b):
self.checksum = 0
self.add_to_checksum(b)
def add_to_checksum(self, b):
self.checksum ^= (b & 0xFF)
def process_command(self, cmd, data):
if self.callback is not None:
self.callback(cmd, data)
else:
print("cmd=%s" % str(cmd))
def update_read(self):
for byte in self.do_read():
# print("Got (0x%02x) (%s) (%s) state=%s" % (byte, chr(byte), str(type(byte)), self.state))
if self.state == self.STATE_IDLE:
# reset state
self.set_state(self.STATE_WANT_HEADER_DOLLARS)
# deliberate fallthrough right here
if self.state == self.STATE_WANT_HEADER_DOLLARS:
if chr(byte) == '$':
self.set_state(self.STATE_WANT_HEADER_M)
continue
if self.state == self.STATE_WANT_HEADER_M:
if chr(byte) != 'M':
raise Exception("Malformed packet")
self.set_state(self.STATE_WANT_HEADER_GT)
continue
if self.state == self.STATE_WANT_HEADER_GT:
if chr(byte) != '>':
raise Exception("Malformed packet")
self.set_state(self.STATE_WANT_DATA_SIZE)
continue
if self.state == self.STATE_WANT_DATA_SIZE:
self.data_size = byte
self.set_state(self.STATE_WANT_COMMAND)
self.data = bytearray()
self.checksum = 0
self.add_to_checksum(byte)
continue
if self.state == self.STATE_WANT_COMMAND:
self.command = byte
self.add_to_checksum(byte)
if self.data_size != 0:
self.set_state(self.STATE_WANT_DATA)
else:
self.set_state(self.STATE_WANT_CHECKSUM)
continue
if self.state == self.STATE_WANT_DATA:
self.add_to_checksum(byte)
self.data.append(byte)
if len(self.data) == self.data_size:
self.set_state(self.STATE_WANT_CHECKSUM)
continue
if self.state == self.STATE_WANT_CHECKSUM:
if self.checksum != byte:
raise Exception("Checksum fail (want=0x%02x calced=0x%02x" %
(byte, self.checksum))
self.process_command(self.command, self.data)
self.set_state(self.STATE_IDLE)
class MSP_DJI(MSP_Generic):
FRAME_GPS_RAW = 106
FRAME_ATTITUDE = 108
def __init__(self, destination_address):
super(MSP_DJI, self).__init__(destination_address)
self.callback = self.command_callback
self.frames = {}
class Frame(object):
def __init__(self, data):
self.data = data
def intn(self, offset, count):
ret = 0
for i in range(offset, offset+count):
ret = ret | (ord(self.data[i]) << ((i-offset)*8))
return ret
def int32(self, offset):
t = struct.unpack("<i", self.data[offset:offset+4])
return t[0]
def int16(self, offset):
t = struct.unpack("<h", self.data[offset:offset+2])
return t[0]
class FrameATTITUDE(Frame):
def roll(self):
'''roll in degrees'''
return self.int16(0) * 10
def pitch(self):
'''pitch in degrees'''
return self.int16(2) * 10
def yaw(self):
'''yaw in degrees'''
return self.int16(4)
class FrameGPS_RAW(Frame):
'''see gps_state_s'''
def fix_type(self):
return self.uint8(0)
def num_sats(self):
return self.uint8(1)
def lat(self):
return self.int32(2) / 1e7
def lon(self):
return self.int32(6) / 1e7
def LocationInt(self):
# other fields are available, I'm just lazy
return LocationInt(self.int32(2), self.int32(6), 0, 0)
def command_callback(self, frametype, data):
# print("X: %s %s" % (str(frametype), str(data)))
if frametype == MSP_DJI.FRAME_ATTITUDE:
frame = MSP_DJI.FrameATTITUDE(data)
elif frametype == MSP_DJI.FRAME_GPS_RAW:
frame = MSP_DJI.FrameGPS_RAW(data)
else:
return
self.frames[frametype] = frame
def get_frame(self, frametype):
return self.frames[frametype]
class LTM(Telem): class LTM(Telem):
def __init__(self, destination_address): def __init__(self, destination_address):
super(LTM, self).__init__(destination_address) super(LTM, self).__init__(destination_address)
@ -10924,6 +11083,32 @@ switch value'''
self.context_pop() self.context_pop()
self.reboot_sitl() self.reboot_sitl()
def test_msp_dji(self):
self.set_parameter("SERIAL5_PROTOCOL", 33) # serial5 is MSP DJI output
self.set_parameter("MSP_OPTIONS", 1) # telemetry (unpolled) mode
self.customise_SITL_commandline([
"--uartF=tcp:6735" # serial5 spews to localhost:6735
])
msp = MSP_DJI(("127.0.0.1", 6735))
self.wait_ready_to_arm()
self.drain_mav_unparsed()
tstart = self.get_sim_time_cached()
while True:
self.drain_mav()
if self.get_sim_time_cached() - tstart > 10:
raise NotAchievedException("Did not get location")
msp.update()
self.drain_mav_unparsed(quiet=True)
try:
f = msp.get_frame(msp.FRAME_GPS_RAW)
except KeyError:
continue
dist = self.get_distance_int(f.LocationInt(), self.sim_location_int())
print("lat=%f lon=%f dist=%f" % (f.lat(), f.lon(), dist))
if dist < 1:
break
def test_crsf(self): def test_crsf(self):
self.context_push() self.context_push()
ex = None ex = None