From 61196b7d04f4d097f1119e7c1d919194971543c2 Mon Sep 17 00:00:00 2001 From: Peter Barker Date: Mon, 8 Feb 2021 10:52:35 +1100 Subject: [PATCH] autotest: add tests for iBus telemetry --- Tools/autotest/vehicle_test_suite.py | 233 ++++++++++++++++++++++++++- 1 file changed, 232 insertions(+), 1 deletion(-) diff --git a/Tools/autotest/vehicle_test_suite.py b/Tools/autotest/vehicle_test_suite.py index 7d2d6055ba..a596dc2a37 100644 --- a/Tools/autotest/vehicle_test_suite.py +++ b/Tools/autotest/vehicle_test_suite.py @@ -341,7 +341,129 @@ class Telem(object): if not self.connected: if not self.connect(): return - self.update_read() + return self.update_read() + + +class IBusMessage(object): + def checksum_bytes(self, out): + checksum = 0xFFFF + for b in iter(out): + checksum -= b + return checksum + + +class IBusResponse(IBusMessage): + def __init__(self, some_bytes): + self.len = some_bytes[0] + checksum = self.checksum_bytes(some_bytes[:self.len-2]) + if checksum >> 8 != some_bytes[self.len-1]: + raise ValueError("Checksum bad (-1)") + if checksum & 0xff != some_bytes[self.len-2]: + raise ValueError("Checksum bad (-2)") + self.address = some_bytes[1] & 0x0F + self.handle_payload_bytes(some_bytes[2:self.len-2]) + + +class IBusResponse_DISCOVER(IBusResponse): + def handle_payload_bytes(self, payload_bytes): + if len(payload_bytes): + raise ValueError("Not expecting payload bytes (%u)" % + (len(payload_bytes), )) + + +class IBusResponse_GET_SENSOR_TYPE(IBusResponse): + def handle_payload_bytes(self, payload_bytes): + if len(payload_bytes) != 2: + raise ValueError("Expected 2 payload bytes") + self.sensor_type = payload_bytes[0] + self.sensor_length = payload_bytes[1] + + +class IBusResponse_GET_SENSOR_VALUE(IBusResponse): + def handle_payload_bytes(self, payload_bytes): + self.sensor_value = payload_bytes + + def get_sensor_value(self): + '''returns an integer based off content''' + ret = 0 + for i in range(len(self.sensor_value)): + x = self.sensor_value[i] + if sys.version_info.major < 3: + x = ord(x) + ret = ret | (x << (i*8)) + return ret + + +class IBusRequest(IBusMessage): + def __init__(self, command, address): + self.command = command + self.address = address + + def payload_bytes(self): + '''most requests don't have a payload''' + return bytearray() + + def for_wire(self): + out = bytearray() + payload_bytes = self.payload_bytes() + payload_length = len(payload_bytes) + length = 1 + 1 + payload_length + 2 # len+cmd|adr+payloadlen+cksum + format_string = '> 8)) + return out + + +class IBusRequest_DISCOVER(IBusRequest): + def __init__(self, address): + super(IBusRequest_DISCOVER, self).__init__(0x08, address) + + +class IBusRequest_GET_SENSOR_TYPE(IBusRequest): + def __init__(self, address): + super(IBusRequest_GET_SENSOR_TYPE, self).__init__(0x09, address) + + +class IBusRequest_GET_SENSOR_VALUE(IBusRequest): + def __init__(self, address): + super(IBusRequest_GET_SENSOR_VALUE, self).__init__(0x0A, address) + + +class IBus(Telem): + def __init__(self, destination_address): + super(IBus, self).__init__(destination_address) + + def progress_tag(self): + return "IBus" + + def packet_from_buffer(self, buffer): + t = buffer[1] >> 4 + if sys.version_info.major < 3: + t = ord(t) + if t == 0x08: + return IBusResponse_DISCOVER(buffer) + if t == 0x09: + return IBusResponse_GET_SENSOR_TYPE(buffer) + if t == 0x0A: + return IBusResponse_GET_SENSOR_VALUE(buffer) + raise ValueError("Unknown response type (%u)" % t) + + def update_read(self): + self.buffer += self.do_read() + while len(self.buffer): + msglen = self.buffer[0] + if sys.version_info.major < 3: + msglen = ord(msglen) + if len(self.buffer) < msglen: + return + packet = self.packet_from_buffer(self.buffer[:msglen]) + self.buffer = self.buffer[msglen:] + return packet class WaitAndMaintain(object): @@ -14349,6 +14471,115 @@ SERIAL5_BAUD 128 self._MotorTest(self.run_cmd, **kwargs) self._MotorTest(self.run_cmd_int, **kwargs) + def test_ibus_voltage(self, message): + batt = self.mav.recv_match( + type='BATTERY_STATUS', + blocking=True, + timeout=5, + condition="BATTERY_STATUS.id==0" + ) + if batt is None: + raise NotAchievedException("Did not get BATTERY_STATUS message") + want = int(batt.voltages[0] * 0.1) + + if want != message.get_sensor_value(): + raise NotAchievedException("Bad voltage (want=%u got=%u)" % + (want, message.get_sensor_value())) + self.progress("iBus voltage OK") + + def test_ibus_armed(self, message): + got = message.get_sensor_value() + want = 1 if self.armed() else 0 + if got != want: + raise NotAchievedException("Expected armed %u got %u" % + (want, got)) + self.progress("iBus armed OK") + + def test_ibus_mode(self, message): + got = message.get_sensor_value() + want = self.mav.messages['HEARTBEAT'].custom_mode + if got != want: + raise NotAchievedException("Expected mode %u got %u" % + (want, got)) + self.progress("iBus mode OK") + + def test_ibus_get_response(self, ibus, timeout=5): + tstart = self.get_sim_time() + while True: + now = self.get_sim_time() + if now - tstart > timeout: + raise AutoTestTimeoutException("Failed to get ibus data") + packet = ibus.update() + if packet is not None: + return packet + + def IBus(self): + '''test the IBus protocol''' + self.set_parameter("SERIAL5_PROTOCOL", 49) + self.customise_SITL_commandline([ + "--serial5=tcp:6735" # serial5 spews to localhost:6735 + ]) + ibus = IBus(("127.0.0.1", 6735)) + ibus.connect() + + # expected_sensors should match the list created in AP_IBus_Telem + expected_sensors = { + # sensor id : (len, IBUS_MEAS_TYPE_*, test_function) + 1: (2, 0x15, self.test_ibus_armed), + 2: (2, 0x16, self.test_ibus_mode), + 5: (2, 0x03, self.test_ibus_voltage), + } + + for (sensor_addr, results) in expected_sensors.items(): + # first make sure it is present: + request = IBusRequest_DISCOVER(sensor_addr) + ibus.port.sendall(request.for_wire()) + + packet = self.test_ibus_get_response(ibus) + if packet.address != sensor_addr: + raise ValueError("Unexpected sensor address %u" % + (packet.address,)) + + (expected_length, expected_type, validator) = results + + self.progress("Getting sensor (%x) type" % (sensor_addr)) + request = IBusRequest_GET_SENSOR_TYPE(sensor_addr) + ibus.port.sendall(request.for_wire()) + + packet = self.test_ibus_get_response(ibus) + if packet.address != sensor_addr: + raise ValueError("Unexpected sensor address %u" % + (packet.address,)) + + if packet.sensor_type != expected_type: + raise ValueError("Unexpected sensor type want=%u got=%u" % + (expected_type, packet.sensor_type)) + + if packet.sensor_length != expected_length: + raise ValueError("Unexpected sensor len want=%u got=%u" % + (expected_length, packet.sensor_length)) + + self.progress("Getting sensor (%x) value" % (sensor_addr)) + request = IBusRequest_GET_SENSOR_VALUE(sensor_addr) + ibus.port.sendall(request.for_wire()) + + packet = self.test_ibus_get_response(ibus) + validator(packet) + + # self.progress("Ensure we cover all sensors") + # for i in range(1, 17): # zero is special + # if i in expected_sensors: + # continue + # request = IBusRequest_DISCOVER(i) + # ibus.port.sendall(request.for_wire()) + + # try: + # packet = self.test_ibus_get_response(ibus, timeout=1) + # except AutoTestTimeoutException: + # continue + # self.progress("Received packet (%s)" % str(packet)) + # raise NotAchievedException("IBus sensor %u is untested" % i) + def tests(self): return [ self.PIDTuning,