diff --git a/Tools/autotest/arduplane.py b/Tools/autotest/arduplane.py index 3661ef1558..e9d6ab2828 100644 --- a/Tools/autotest/arduplane.py +++ b/Tools/autotest/arduplane.py @@ -3442,6 +3442,10 @@ class AutoTestPlane(AutoTest): "Test MegaSquirt EFI", self.MegaSquirt), + ("MSP_DJI", + "Test MSP DJI serial output", + self.test_msp_dji), + ("LogUpload", "Log upload", self.log_upload), diff --git a/Tools/autotest/common.py b/Tools/autotest/common.py index cd941c618c..d2b8dd9f17 100644 --- a/Tools/autotest/common.py +++ b/Tools/autotest/common.py @@ -320,6 +320,165 @@ class Telem(object): self.update_read() +class MSP_Generic(Telem): + def __init__(self, destination_address): + super(MSP_Generic, self).__init__(destination_address) + + self.callback = None + + self.STATE_IDLE = "IDLE" + self.STATE_WANT_HEADER_DOLLARS = "WANT_DOLLARS" + self.STATE_WANT_HEADER_M = "WANT_M" + self.STATE_WANT_HEADER_GT = "WANT_GT" + self.STATE_WANT_DATA_SIZE = "WANT_DATA_SIZE" + self.STATE_WANT_COMMAND = "WANT_COMMAND" + self.STATE_WANT_DATA = "WANT_DATA" + self.STATE_WANT_CHECKSUM = "WANT_CHECKSUM" + + self.state = self.STATE_IDLE + + def progress(self, message): + print("MSP: %s" % message) + + def set_state(self, state): + # self.progress("Moving to state (%s)" % state) + self.state = state + + def init_checksum(self, b): + self.checksum = 0 + self.add_to_checksum(b) + + def add_to_checksum(self, b): + self.checksum ^= (b & 0xFF) + + def process_command(self, cmd, data): + if self.callback is not None: + self.callback(cmd, data) + else: + print("cmd=%s" % str(cmd)) + + def update_read(self): + for byte in self.do_read(): + # print("Got (0x%02x) (%s) (%s) state=%s" % (byte, chr(byte), str(type(byte)), self.state)) + if self.state == self.STATE_IDLE: + # reset state + self.set_state(self.STATE_WANT_HEADER_DOLLARS) + # deliberate fallthrough right here + if self.state == self.STATE_WANT_HEADER_DOLLARS: + if chr(byte) == '$': + self.set_state(self.STATE_WANT_HEADER_M) + continue + if self.state == self.STATE_WANT_HEADER_M: + if chr(byte) != 'M': + raise Exception("Malformed packet") + self.set_state(self.STATE_WANT_HEADER_GT) + continue + if self.state == self.STATE_WANT_HEADER_GT: + if chr(byte) != '>': + raise Exception("Malformed packet") + self.set_state(self.STATE_WANT_DATA_SIZE) + continue + if self.state == self.STATE_WANT_DATA_SIZE: + self.data_size = byte + self.set_state(self.STATE_WANT_COMMAND) + self.data = bytearray() + self.checksum = 0 + self.add_to_checksum(byte) + continue + if self.state == self.STATE_WANT_COMMAND: + self.command = byte + self.add_to_checksum(byte) + if self.data_size != 0: + self.set_state(self.STATE_WANT_DATA) + else: + self.set_state(self.STATE_WANT_CHECKSUM) + continue + if self.state == self.STATE_WANT_DATA: + self.add_to_checksum(byte) + self.data.append(byte) + if len(self.data) == self.data_size: + self.set_state(self.STATE_WANT_CHECKSUM) + continue + if self.state == self.STATE_WANT_CHECKSUM: + if self.checksum != byte: + raise Exception("Checksum fail (want=0x%02x calced=0x%02x" % + (byte, self.checksum)) + self.process_command(self.command, self.data) + self.set_state(self.STATE_IDLE) + + +class MSP_DJI(MSP_Generic): + FRAME_GPS_RAW = 106 + FRAME_ATTITUDE = 108 + + def __init__(self, destination_address): + super(MSP_DJI, self).__init__(destination_address) + self.callback = self.command_callback + self.frames = {} + + class Frame(object): + def __init__(self, data): + self.data = data + + def intn(self, offset, count): + ret = 0 + for i in range(offset, offset+count): + ret = ret | (ord(self.data[i]) << ((i-offset)*8)) + return ret + + def int32(self, offset): + t = struct.unpack(" 10: + raise NotAchievedException("Did not get location") + msp.update() + self.drain_mav_unparsed(quiet=True) + try: + f = msp.get_frame(msp.FRAME_GPS_RAW) + except KeyError: + continue + dist = self.get_distance_int(f.LocationInt(), self.sim_location_int()) + print("lat=%f lon=%f dist=%f" % (f.lat(), f.lon(), dist)) + if dist < 1: + break + def test_crsf(self): self.context_push() ex = None