Tools: add autotest for DEVO telemetry

This adds an autotest to test if we receive data with devo telemetry correctly.
This commit is contained in:
Shiv Tyagi 2021-11-11 22:00:22 +05:30 committed by Peter Barker
parent a747a4a19a
commit 5811a62bc6
2 changed files with 156 additions and 0 deletions

View File

@ -3322,6 +3322,10 @@ class AutoTestPlane(AutoTest):
"Test LTM serial output",
self.test_ltm),
("DEVO",
"Test DEVO serial output",
self.DEVO),
("AdvancedFailsafe",
"Test Advanced Failsafe",
self.test_advanced_failsafe),

View File

@ -521,6 +521,82 @@ class CRSF(Telem):
return "CRSF"
class DEVO(Telem):
def __init__(self, destination_address):
super(DEVO, self).__init__(destination_address)
self.HEADER = 0xAA
self.frame_length = 20
self.frame = None
self.bad_chars = 0
def progress_tag(self):
return "DEVO"
def consume_frame(self):
# check frame checksum
checksum = 0
for c in self.buffer[:self.frame_length-1]:
if sys.version_info.major < 3:
c = ord(c)
checksum += c
checksum &= 0xff # since we receive 8 bit checksum
buffer_checksum = self.buffer[self.frame_length-1]
if sys.version_info.major < 3:
buffer_checksum = ord(buffer_checksum)
if checksum != buffer_checksum:
raise NotAchievedException("Invalid checksum")
class FRAME(object):
def __init__(self, buffer):
self.buffer = buffer
def int32(self, offset):
t = struct.unpack("<i", self.buffer[offset:offset+4])
return t[0]
def int16(self, offset):
t = struct.unpack("<h", self.buffer[offset:offset+2])
return t[0]
def lon(self):
return self.int32(1)
def lat(self):
return self.int32(5)
def alt(self):
return self.int32(9)
def speed(self):
return self.int16(13)
def temp(self):
return self.int16(15)
def volt(self):
return self.int16(17)
self.frame = FRAME(self.buffer[0:self.frame_length-1])
self.buffer = self.buffer[self.frame_length:]
def update_read(self):
self.buffer += self.do_read()
while len(self.buffer):
if len(self.buffer) == 0:
break
b0 = self.buffer[0]
if sys.version_info.major < 3:
b0 = ord(b0)
if b0 != self.HEADER:
self.bad_chars += 1
self.buffer = self.buffer[1:]
continue
if len(self.buffer) < self.frame_length:
continue
self.consume_frame()
class FRSky(Telem):
def __init__(self, destination_address, verbose=False):
super(FRSky, self).__init__(destination_address, verbose=verbose)
@ -10772,6 +10848,82 @@ switch value'''
self.progress(" Fulfilled")
del wants[want]
def convertDmsToDdFormat(self, dms):
deg = math.trunc(dms * 1e-7)
dd = deg + (((dms * 1.0e-7) - deg) * 100.0 / 60.0)
if dd < -180.0 or dd > 180.0:
dd = 0.0
return math.trunc(dd * 1.0e7)
def DEVO(self):
self.context_push()
self.set_parameter("SERIAL5_PROTOCOL", 17) # serial5 is DEVO output
self.customise_SITL_commandline([
"--uartF=tcp:6735" # serial5 spews to localhost:6735
])
devo = DEVO(("127.0.0.1", 6735))
self.wait_ready_to_arm()
self.drain_mav_unparsed()
m = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1)
if m is None:
raise NotAchievedException("Did not receive GLOBAL_POSITION_INT")
tstart = self.get_sim_time_cached()
while True:
now = self.get_sim_time_cached()
if now - tstart > 10:
raise AutoTestTimeoutException("Failed to get devo data")
devo.update()
frame = devo.frame
if frame is None:
self.progress("No data received")
continue
m = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True)
loc = LocationInt(self.convertDmsToDdFormat(frame.lat()), self.convertDmsToDdFormat(frame.lon()), 0, 0)
print("received lat:%s expected lat:%s" % (str(loc.lat), str(m.lat)))
print("received lon:%s expected lon:%s" % (str(loc.lon), str(m.lon)))
dist_diff = self.get_distance_int(loc, m)
print("Distance:%s" % str(dist_diff))
if abs(dist_diff) > 2:
continue
gpi_rel_alt = int(m.relative_alt / 10) # mm -> cm, since driver send alt in cm
print("received alt:%s expected alt:%s" % (str(frame.alt()), str(gpi_rel_alt)))
if abs(gpi_rel_alt - frame.alt()) > 10:
continue
print("received gndspeed: %s" % str(frame.speed()))
if frame.speed() != 0:
# FIXME if we start the vehicle moving.... check against VFR_HUD?
continue
print("received temp:%s expected temp:%s" % (str(frame.temp()), str(self.mav.messages['HEARTBEAT'].custom_mode)))
if frame.temp() != self.mav.messages['HEARTBEAT'].custom_mode:
# currently we receive mode as temp. This should be fixed when driver is updated
continue
# we match the received voltage with the voltage of primary instance
batt = self.mav.recv_match(
type='BATTERY_STATUS',
blocking=True,
timeout=5,
condition="BATTERY_STATUS.id==0"
)
if batt is None:
raise NotAchievedException("Did not get BATTERY_STATUS message")
volt = batt.voltages[0]*0.001
print("received voltage:%s expected voltage:%s" % (str(frame.volt()*0.1), str(volt)))
if abs(frame.volt()*0.1 - volt) > 0.1:
continue
# if we reach here, exit
break
self.context_pop()
self.reboot_sitl()
def test_crsf(self):
self.context_push()
ex = None