autotest: add tests for iBus telemetry

This commit is contained in:
Peter Barker 2021-02-08 10:52:35 +11:00 committed by Peter Barker
parent 602f5bc61c
commit 61196b7d04

View File

@ -341,7 +341,129 @@ class Telem(object):
if not self.connected:
if not self.connect():
return
self.update_read()
return self.update_read()
class IBusMessage(object):
def checksum_bytes(self, out):
checksum = 0xFFFF
for b in iter(out):
checksum -= b
return checksum
class IBusResponse(IBusMessage):
def __init__(self, some_bytes):
self.len = some_bytes[0]
checksum = self.checksum_bytes(some_bytes[:self.len-2])
if checksum >> 8 != some_bytes[self.len-1]:
raise ValueError("Checksum bad (-1)")
if checksum & 0xff != some_bytes[self.len-2]:
raise ValueError("Checksum bad (-2)")
self.address = some_bytes[1] & 0x0F
self.handle_payload_bytes(some_bytes[2:self.len-2])
class IBusResponse_DISCOVER(IBusResponse):
def handle_payload_bytes(self, payload_bytes):
if len(payload_bytes):
raise ValueError("Not expecting payload bytes (%u)" %
(len(payload_bytes), ))
class IBusResponse_GET_SENSOR_TYPE(IBusResponse):
def handle_payload_bytes(self, payload_bytes):
if len(payload_bytes) != 2:
raise ValueError("Expected 2 payload bytes")
self.sensor_type = payload_bytes[0]
self.sensor_length = payload_bytes[1]
class IBusResponse_GET_SENSOR_VALUE(IBusResponse):
def handle_payload_bytes(self, payload_bytes):
self.sensor_value = payload_bytes
def get_sensor_value(self):
'''returns an integer based off content'''
ret = 0
for i in range(len(self.sensor_value)):
x = self.sensor_value[i]
if sys.version_info.major < 3:
x = ord(x)
ret = ret | (x << (i*8))
return ret
class IBusRequest(IBusMessage):
def __init__(self, command, address):
self.command = command
self.address = address
def payload_bytes(self):
'''most requests don't have a payload'''
return bytearray()
def for_wire(self):
out = bytearray()
payload_bytes = self.payload_bytes()
payload_length = len(payload_bytes)
length = 1 + 1 + payload_length + 2 # len+cmd|adr+payloadlen+cksum
format_string = '<BB' + ('B' * payload_length)
out.extend(struct.pack(format_string,
length,
(self.command << 4) | self.address,
*payload_bytes,
))
checksum = self.checksum_bytes(out)
out.extend(struct.pack("<BB", checksum & 0xff, checksum >> 8))
return out
class IBusRequest_DISCOVER(IBusRequest):
def __init__(self, address):
super(IBusRequest_DISCOVER, self).__init__(0x08, address)
class IBusRequest_GET_SENSOR_TYPE(IBusRequest):
def __init__(self, address):
super(IBusRequest_GET_SENSOR_TYPE, self).__init__(0x09, address)
class IBusRequest_GET_SENSOR_VALUE(IBusRequest):
def __init__(self, address):
super(IBusRequest_GET_SENSOR_VALUE, self).__init__(0x0A, address)
class IBus(Telem):
def __init__(self, destination_address):
super(IBus, self).__init__(destination_address)
def progress_tag(self):
return "IBus"
def packet_from_buffer(self, buffer):
t = buffer[1] >> 4
if sys.version_info.major < 3:
t = ord(t)
if t == 0x08:
return IBusResponse_DISCOVER(buffer)
if t == 0x09:
return IBusResponse_GET_SENSOR_TYPE(buffer)
if t == 0x0A:
return IBusResponse_GET_SENSOR_VALUE(buffer)
raise ValueError("Unknown response type (%u)" % t)
def update_read(self):
self.buffer += self.do_read()
while len(self.buffer):
msglen = self.buffer[0]
if sys.version_info.major < 3:
msglen = ord(msglen)
if len(self.buffer) < msglen:
return
packet = self.packet_from_buffer(self.buffer[:msglen])
self.buffer = self.buffer[msglen:]
return packet
class WaitAndMaintain(object):
@ -14349,6 +14471,115 @@ SERIAL5_BAUD 128
self._MotorTest(self.run_cmd, **kwargs)
self._MotorTest(self.run_cmd_int, **kwargs)
def test_ibus_voltage(self, message):
batt = self.mav.recv_match(
type='BATTERY_STATUS',
blocking=True,
timeout=5,
condition="BATTERY_STATUS.id==0"
)
if batt is None:
raise NotAchievedException("Did not get BATTERY_STATUS message")
want = int(batt.voltages[0] * 0.1)
if want != message.get_sensor_value():
raise NotAchievedException("Bad voltage (want=%u got=%u)" %
(want, message.get_sensor_value()))
self.progress("iBus voltage OK")
def test_ibus_armed(self, message):
got = message.get_sensor_value()
want = 1 if self.armed() else 0
if got != want:
raise NotAchievedException("Expected armed %u got %u" %
(want, got))
self.progress("iBus armed OK")
def test_ibus_mode(self, message):
got = message.get_sensor_value()
want = self.mav.messages['HEARTBEAT'].custom_mode
if got != want:
raise NotAchievedException("Expected mode %u got %u" %
(want, got))
self.progress("iBus mode OK")
def test_ibus_get_response(self, ibus, timeout=5):
tstart = self.get_sim_time()
while True:
now = self.get_sim_time()
if now - tstart > timeout:
raise AutoTestTimeoutException("Failed to get ibus data")
packet = ibus.update()
if packet is not None:
return packet
def IBus(self):
'''test the IBus protocol'''
self.set_parameter("SERIAL5_PROTOCOL", 49)
self.customise_SITL_commandline([
"--serial5=tcp:6735" # serial5 spews to localhost:6735
])
ibus = IBus(("127.0.0.1", 6735))
ibus.connect()
# expected_sensors should match the list created in AP_IBus_Telem
expected_sensors = {
# sensor id : (len, IBUS_MEAS_TYPE_*, test_function)
1: (2, 0x15, self.test_ibus_armed),
2: (2, 0x16, self.test_ibus_mode),
5: (2, 0x03, self.test_ibus_voltage),
}
for (sensor_addr, results) in expected_sensors.items():
# first make sure it is present:
request = IBusRequest_DISCOVER(sensor_addr)
ibus.port.sendall(request.for_wire())
packet = self.test_ibus_get_response(ibus)
if packet.address != sensor_addr:
raise ValueError("Unexpected sensor address %u" %
(packet.address,))
(expected_length, expected_type, validator) = results
self.progress("Getting sensor (%x) type" % (sensor_addr))
request = IBusRequest_GET_SENSOR_TYPE(sensor_addr)
ibus.port.sendall(request.for_wire())
packet = self.test_ibus_get_response(ibus)
if packet.address != sensor_addr:
raise ValueError("Unexpected sensor address %u" %
(packet.address,))
if packet.sensor_type != expected_type:
raise ValueError("Unexpected sensor type want=%u got=%u" %
(expected_type, packet.sensor_type))
if packet.sensor_length != expected_length:
raise ValueError("Unexpected sensor len want=%u got=%u" %
(expected_length, packet.sensor_length))
self.progress("Getting sensor (%x) value" % (sensor_addr))
request = IBusRequest_GET_SENSOR_VALUE(sensor_addr)
ibus.port.sendall(request.for_wire())
packet = self.test_ibus_get_response(ibus)
validator(packet)
# self.progress("Ensure we cover all sensors")
# for i in range(1, 17): # zero is special
# if i in expected_sensors:
# continue
# request = IBusRequest_DISCOVER(i)
# ibus.port.sendall(request.for_wire())
# try:
# packet = self.test_ibus_get_response(ibus, timeout=1)
# except AutoTestTimeoutException:
# continue
# self.progress("Received packet (%s)" % str(packet))
# raise NotAchievedException("IBus sensor %u is untested" % i)
def tests(self):
return [
self.PIDTuning,