diff --git a/Tools/autotest/arducopter.py b/Tools/autotest/arducopter.py index 52ca115901..aff4c48de6 100644 --- a/Tools/autotest/arducopter.py +++ b/Tools/autotest/arducopter.py @@ -5239,6 +5239,10 @@ class AutoTestCopter(AutoTest): "Test SITL onboard compass calibration", self.test_mag_calibration), + ("CRSF", + "Test RC CRSF", + self.test_crsf), + ("LogUpload", "Log upload", self.log_upload), diff --git a/Tools/autotest/common.py b/Tools/autotest/common.py index 417db0bf68..fcf731d072 100644 --- a/Tools/autotest/common.py +++ b/Tools/autotest/common.py @@ -212,6 +212,17 @@ class Telem(object): # self.progress("Read %u bytes" % len(data)) return data + def do_write(self, some_bytes): + try: + written = self.port.send(some_bytes) + except socket.error as e: + if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK ]: + return 0 + self.progress("Exception: %s" % str(e)) + raise + if written != len(some_bytes): + raise ValueError("Short write") + def update(self): if not self.connected: if not self.connect(): @@ -384,6 +395,26 @@ class LTM(Telem): pass return None +class CRSF(Telem): + def __init__(self, destination_address): + super(CRSF, self).__init__(destination_address) + + self.dataid_vtx_frame = 0 + self.dataid_vtx_telem = 1 + self.dataid_vtx_unknown = 2 + + self.data_id_map = { + self.dataid_vtx_frame: bytearray([0xC8, 0x8, 0xF, 0xCE, 0x30, 0x8, 0x16, 0xE9, 0x0, 0x5F]), + self.dataid_vtx_telem: bytearray([0xC8, 0x7, 0x10, 0xCE, 0xE, 0x16, 0x65, 0x0, 0x1B]), + self.dataid_vtx_unknown: bytearray([0xC8, 0x9, 0x8, 0x0, 0x9E, 0x0, 0x0, 0x0, 0x0, 0x0, 0x95]), + } + + def write_data_id(self, dataid): + self.do_write(self.data_id_map[dataid]) + + def progress(self, message): + print("CRSF: %s" % message) + class FRSky(Telem): def __init__(self, destination_address): super(FRSky, self).__init__(destination_address) @@ -6741,6 +6772,41 @@ switch value''' self.progress(" Fulfilled") del wants[want] + + + def test_crsf(self): + self.context_push() + ex = None + try: + self.set_parameter("SERIAL5_PROTOCOL", 23) # serial5 is RCIN input + self.customise_SITL_commandline([ + "--uartF=tcp:6735" # serial5 reads from to localhost:6735 + ]) + crsf = CRSF(("127.0.0.1", 6735)) + crsf.connect() + self.wait_ready_to_arm() + self.drain_mav_unparsed() + + prev = self.get_parameter("LOG_BITMASK") + self.progress("Writing vtx_frame") + crsf.write_data_id(crsf.dataid_vtx_frame) + self.delay_sim_time(5) + self.progress("Writing vtx_telem") + crsf.write_data_id(crsf.dataid_vtx_telem) + self.delay_sim_time(5) + self.progress("Writing vtx_unknown") + crsf.write_data_id(crsf.dataid_vtx_unknown) + self.delay_sim_time(5) + except Exception as e: + self.progress("Caught exception: %s" % + self.get_exception_stacktrace(e)) + ex = e + self.context_pop() + self.disarm_vehicle(force=True) + self.reboot_sitl() + if ex is not None: + raise ex + def tests(self): return [ ("PIDTuning",