autotest: add trivial test for CRSF in copter

This commit is contained in:
Peter Barker 2020-06-06 10:24:59 +10:00 committed by Andrew Tridgell
parent 2bae080be4
commit 6eb1d289b8
2 changed files with 70 additions and 0 deletions

View File

@ -5239,6 +5239,10 @@ class AutoTestCopter(AutoTest):
"Test SITL onboard compass calibration",
self.test_mag_calibration),
("CRSF",
"Test RC CRSF",
self.test_crsf),
("LogUpload",
"Log upload",
self.log_upload),

View File

@ -212,6 +212,17 @@ class Telem(object):
# self.progress("Read %u bytes" % len(data))
return data
def do_write(self, some_bytes):
try:
written = self.port.send(some_bytes)
except socket.error as e:
if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK ]:
return 0
self.progress("Exception: %s" % str(e))
raise
if written != len(some_bytes):
raise ValueError("Short write")
def update(self):
if not self.connected:
if not self.connect():
@ -384,6 +395,26 @@ class LTM(Telem):
pass
return None
class CRSF(Telem):
def __init__(self, destination_address):
super(CRSF, self).__init__(destination_address)
self.dataid_vtx_frame = 0
self.dataid_vtx_telem = 1
self.dataid_vtx_unknown = 2
self.data_id_map = {
self.dataid_vtx_frame: bytearray([0xC8, 0x8, 0xF, 0xCE, 0x30, 0x8, 0x16, 0xE9, 0x0, 0x5F]),
self.dataid_vtx_telem: bytearray([0xC8, 0x7, 0x10, 0xCE, 0xE, 0x16, 0x65, 0x0, 0x1B]),
self.dataid_vtx_unknown: bytearray([0xC8, 0x9, 0x8, 0x0, 0x9E, 0x0, 0x0, 0x0, 0x0, 0x0, 0x95]),
}
def write_data_id(self, dataid):
self.do_write(self.data_id_map[dataid])
def progress(self, message):
print("CRSF: %s" % message)
class FRSky(Telem):
def __init__(self, destination_address):
super(FRSky, self).__init__(destination_address)
@ -6741,6 +6772,41 @@ switch value'''
self.progress(" Fulfilled")
del wants[want]
def test_crsf(self):
self.context_push()
ex = None
try:
self.set_parameter("SERIAL5_PROTOCOL", 23) # serial5 is RCIN input
self.customise_SITL_commandline([
"--uartF=tcp:6735" # serial5 reads from to localhost:6735
])
crsf = CRSF(("127.0.0.1", 6735))
crsf.connect()
self.wait_ready_to_arm()
self.drain_mav_unparsed()
prev = self.get_parameter("LOG_BITMASK")
self.progress("Writing vtx_frame")
crsf.write_data_id(crsf.dataid_vtx_frame)
self.delay_sim_time(5)
self.progress("Writing vtx_telem")
crsf.write_data_id(crsf.dataid_vtx_telem)
self.delay_sim_time(5)
self.progress("Writing vtx_unknown")
crsf.write_data_id(crsf.dataid_vtx_unknown)
self.delay_sim_time(5)
except Exception as e:
self.progress("Caught exception: %s" %
self.get_exception_stacktrace(e))
ex = e
self.context_pop()
self.disarm_vehicle(force=True)
self.reboot_sitl()
if ex is not None:
raise ex
def tests(self):
return [
("PIDTuning",