autotest: add test for MAVlite

This commit is contained in:
Peter Barker 2020-09-25 12:38:28 +10:00 committed by Andrew Tridgell
parent 4dec72a230
commit 18330d16de
2 changed files with 499 additions and 29 deletions

View File

@ -2086,6 +2086,10 @@ class AutoTestPlane(AutoTest):
"Test FrSky PassThrough serial output",
self.test_frsky_passthrough),
("FRSkyMAVlite",
"Test FrSky MAVlite serial output",
self.test_frsky_mavlite),
("FRSkyD",
"Test FrSkyD serial output",
self.test_frsky_d),

View File

@ -178,19 +178,24 @@ class MAVProxyLogFile(object):
sys.stdout.flush()
class Telem(object):
def __init__(self, destination_address, progress_function=None):
def __init__(self, destination_address, progress_function=None, verbose=False):
self.destination_address = destination_address
self.progress_function = progress_function
self.verbose = verbose
self.buffer = bytes()
self.connected = False
self.port = None
self.progress_function = progress_function
self.progress_log = ""
def progress(self, message):
message = "%s: %s" % (self.progress_tag(), message)
if self.progress_function is not None:
self.progress_function(message)
return
if not self.verbose:
self.progress_log += message
return
print(message)
def connect(self):
@ -460,9 +465,9 @@ class FRSkyD(FRSky):
super(FRSkyD, self).__init__(destination_address)
self.state_WANT_START_STOP_D = 16,
self.state_WANT_ID = 17,
self.state_WANT_BYTE1 = 18,
self.state_WANT_BYTE2 = 19,
self.state_WANT_ID = 17
self.state_WANT_BYTE1 = 18
self.state_WANT_BYTE2 = 19
self.START_STOP_D = 0x5E
self.BYTESTUFF_D = 0x5D
@ -534,20 +539,215 @@ class FRSkyD(FRSky):
return None
class SPortPacket(object):
def __init__(self):
self.START_STOP_SPORT = 0x7E
self.BYTESTUFF_SPORT = 0x7D
class SPortUplinkPacket(SPortPacket):
def __init__(self, appid0, appid1, data0, data1, data2, data3):
super(SPortUplinkPacket, self).__init__()
self.appid0 = appid0
self.appid1 = appid1
self.data0 = data0
self.data1 = data1
self.data2 = data2
self.data3 = data3
self.SENSOR_ID_UPLINK_ID = 0x0D
self.SPORT_UPLINK_FRAME = 0x30
self.uplink_id = self.SENSOR_ID_UPLINK_ID
self.frame = self.SPORT_UPLINK_FRAME
def packed(self):
return struct.pack('<BBBBBBBB',
self.uplink_id,
self.frame,
self.appid0 & 0xff,
self.appid1 & 0xff,
self.data0 & 0xff,
self.data1 & 0xff,
self.data2 & 0xff,
self.data3 & 0xff,
)
def update_checksum(self, byte):
self.checksum += byte;
self.checksum += self.checksum >> 8;
self.checksum &= 0xFF;
def checksum(self):
self.checksum = 0
self.update_checksum(self.frame & 0xff)
self.update_checksum(self.appid0 & 0xff)
self.update_checksum(self.appid1 & 0xff)
self.update_checksum(self.data0 & 0xff)
self.update_checksum(self.data1 & 0xff)
self.update_checksum(self.data2 & 0xff)
self.update_checksum(self.data3 & 0xff)
self.checksum = 0xff - ((self.checksum & 0xff) + (self.checksum >> 8));
return self.checksum & 0xff
def for_wire(self):
out = bytearray()
out.extend(self.packed())
out.extend(struct.pack('<B', self.checksum()))
stuffed = bytearray()
stuffed.extend(struct.pack('<B', self.START_STOP_SPORT))
for pbyte in out:
if pbyte in [self.BYTESTUFF_SPORT,
self.START_STOP_SPORT]:
# bytestuff
stuffed.append(self.BYTESTUFF_SPORT)
stuffed.append(pbyte ^ self.SPORT_FRAME_XOR)
else:
stuffed.append(pbyte)
return stuffed
class SPortPollPacket(SPortPacket):
def __init__(self, sensor):
super(SPortPollPacket, self).__init__()
self.sensor = sensor
def for_wire(self):
return struct.pack('<BB',
self.START_STOP_SPORT,
self.sensor & 0xff,
)
class MAVliteMessage(object):
def __init__(self, msgid, body):
self.msgid = msgid
self.body = body
self.SENSOR_ID_UPLINK_ID = 0x0D
self.SPORT_UPLINK_FRAME = 0x30
def checksum_bytes(self, some_bytes):
checksum = 0
for b in some_bytes:
checksum += b
checksum += checksum >> 8
checksum &= 0xFF
return checksum
def to_sport_packets(self):
ret = []
all_bytes = bytearray([len(self.body), self.msgid])
all_bytes.extend(self.body)
# insert sequence numbers:
seq = 0
sequenced = bytearray()
while len(all_bytes):
chunk = all_bytes[0:5]
all_bytes = all_bytes[5:]
sequenced.append(seq)
sequenced.extend(chunk)
seq += 1
# we may need another sport packet just for the checksum:
if len(sequenced) % 6 == 0:
sequenced.append(seq)
seq += 1
checksum = self.checksum_bytes(sequenced)
sequenced.append(checksum)
while len(sequenced):
chunk = sequenced[0:6]
sequenced = sequenced[6:]
chunk.extend([0] * (6-len(chunk))) # pad to 6
packet = SPortUplinkPacket(
*chunk
)
ret.append(packet)
return ret
class SPortToMAVlite(object):
def __init__(self):
self.state_WANT_LEN = "want len"
self.state_WANT_MSGID = "want msgid"
self.state_WANT_PAYLOAD = "want payload"
self.state_WANT_CHECKSUM = "want checksum"
self.state_MESSAGE_RECEIVED = "message received"
self.reset()
def progress(self, message):
print("SPortToMAVLite: %s" % message)
def reset(self):
self.want_seq = 0
self.all_bytes = bytearray()
self.payload = bytearray()
self.state = self.state_WANT_LEN
def checksum_bytes(self, some_bytes):
checksum = 0
for b in some_bytes:
checksum += b;
checksum += checksum >> 8;
checksum &= 0xFF;
return checksum
def downlink_handler(self, some_bytes):
'''adds some_bytes into a mavlite message'''
if some_bytes[0] == 0x00:
self.reset()
if some_bytes[0] != self.want_seq:
raise NotAchievedException("Unexpected seqno; want=%u got=%u" %
(self.want_seq, some_bytes[0]))
self.all_bytes.append(some_bytes[0])
self.want_seq += 1
for byte in some_bytes[1:]:
if self.state == self.state_WANT_LEN:
self.payload_len = byte
self.all_bytes.append(byte)
self.state = self.state_WANT_MSGID
continue
if self.state == self.state_WANT_MSGID:
self.msgid = byte
self.all_bytes.append(byte)
if self.payload_len == 0:
self.state = self.state_WANT_CHECKSUM
else:
self.state = self.state_WANT_PAYLOAD
continue
if self.state == self.state_WANT_PAYLOAD:
self.payload.append(byte)
self.all_bytes.append(byte)
if len(self.payload) == self.payload_len:
self.state = self.state_WANT_CHECKSUM
continue
if self.state == self.state_WANT_CHECKSUM:
calculated_checksum = self.checksum_bytes(self.all_bytes)
if calculated_checksum != byte:
raise Exception("Checksum failure (calc=%u) (recv=%u)" % (calculated_checksum, byte))
self.state = self.state_MESSAGE_RECEIVED
break
def get_message(self):
if self.state != self.state_MESSAGE_RECEIVED:
raise Exception("Wrong state")
return MAVliteMessage(self.msgid, self.payload)
class FRSkySPort(FRSky):
def __init__(self, destination_address):
def __init__(self, destination_address, verbose=True):
super(FRSkySPort, self).__init__(destination_address)
self.state_SEND_POLL = 34
self.state_WANT_FRAME_TYPE = 35
self.state_WANT_ID1 = 36,
self.state_WANT_ID2 = 37,
self.state_WANT_DATA = 38,
self.state_WANT_CRC = 39,
self.state_SEND_POLL = "sendpoll"
self.state_WANT_FRAME_TYPE = "want_frame_type"
self.state_WANT_ID1 = "want_id1"
self.state_WANT_ID2 = "want id2"
self.state_WANT_DATA = "want data"
self.state_WANT_CRC = "want crc"
self.START_STOP_SPORT = 0x7E
self.BYTESTUFF_SPORT = 0x7D
self.SPORT_DATA_FRAME = 0x10
self.SPORT_DOWNLINK_FRAME = 0x32
self.SPORT_FRAME_XOR = 0x20
self.SENSOR_ID_VARIO = 0x00 # Sensor ID 0
self.SENSOR_ID_FAS = 0x22 # Sensor ID 2
@ -555,6 +755,11 @@ class FRSkySPort(FRSky):
self.SENSOR_ID_SP2UR = 0xC6 # Sensor ID 6
self.SENSOR_ID_27 = 0x1B # Sensor ID 27
# MAVlite support:
self.SENSOR_ID_DOWNLINK1_ID = 0x34
self.SENSOR_ID_DOWNLINK2_ID = 0x67
self.SENSOR_ID_UPLINK_ID = 0x0D
self.state = self.state_WANT_FRAME_TYPE
self.data_by_id = {}
@ -589,8 +794,12 @@ class FRSkySPort(FRSky):
0x21: "BARO_ALT_AP",
0x28: "CURR",
0x30: "VARIO",
0x39: "VFAS"
0x39: "VFAS",
# 0x800: "GPS", ## comments as duplicated dictrionary key
0x34: "DOWNLINK1_ID",
0x67: "DOWNLINK2_ID",
0x0D: "UPLINK_ID",
}
self.sensors_to_poll = [
@ -601,9 +810,16 @@ class FRSkySPort(FRSky):
]
self.next_sensor_id_to_poll = 0 # offset into sensors_to_poll
self.data_downlink_handler = None
def progress_tag(self):
return "FRSkySPort"
def handle_data_downlink(self, some_bytes):
self.progress("DOWNLINK %s" % (str(some_bytes),))
if self.data_downlink_handler is not None:
self.data_downlink_handler(some_bytes)
def handle_data(self, dataid, value):
self.progress("%s (0x%x)=%u" % (self.id_descriptions[dataid], dataid, value))
self.data_by_id[dataid] = value
@ -667,11 +883,77 @@ class FRSkySPort(FRSky):
if sensor_id not in self.sensor_id_poll_counts:
self.sensor_id_poll_counts[sensor_id] = 0
self.sensor_id_poll_counts[sensor_id] += 1
buf = struct.pack('<BB', self.START_STOP_SPORT, sensor_id)
self.port.sendall(buf)
packet = SPortPollPacket(sensor_id)
self.send_sport_packet(packet)
self.state = self.state_WANT_FRAME_TYPE
self.poll_sent = now
def send_sport_packets(self, packets):
for packet in packets:
self.send_sport_packet(packet)
def send_sport_packet(self, packet):
stuffed = packet.for_wire()
self.progress("Sending (%s) (%u)" % (["0x%02x" % x for x in bytearray(stuffed)],len(stuffed)))
self.port.sendall(stuffed)
def send_mavlite_param_request_read(self, parameter_name):
mavlite_msg = MAVliteMessage(
mavutil.mavlink.MAVLINK_MSG_ID_PARAM_REQUEST_READ,
bytearray(parameter_name.encode())
)
packets = mavlite_msg.to_sport_packets()
self.send_sport_packets(packets)
def send_mavlite_param_set(self, parameter_name, value):
out = bytearray(struct.pack("<f", value))
out.extend(parameter_name.encode())
mavlite_msg = MAVliteMessage(
mavutil.mavlink.MAVLINK_MSG_ID_PARAM_SET,
out
)
packets = mavlite_msg.to_sport_packets()
self.send_sport_packets(packets)
def send_mavlite_command_long(self,
command,
p1=None,
p2=None,
p3=None,
p4=None,
p5=None,
p6=None,
p7=None,
):
params = bytearray()
seen_none = False
for p in p1, p2, p3, p4, p5, p6, p7:
if p is None:
seen_none = True
continue
if seen_none:
raise ValueError("Can't have values after Nones!")
params.extend(bytearray(struct.pack("<f", p)))
out = bytearray(struct.pack("<H", command)) # first two bytes are command-id
options = len(params) // 4 # low-three-bits is parameter count
out.extend(bytearray(struct.pack("<B", options))) # second byte is options
out.extend(params) # then the float values
mavlite_msg = MAVliteMessage(
mavutil.mavlink.MAVLINK_MSG_ID_COMMAND_LONG,
out
)
packets = mavlite_msg.to_sport_packets()
self.send_sport_packets(packets)
def update(self):
if not self.connected:
if not self.connect():
@ -694,15 +976,16 @@ class FRSkySPort(FRSky):
b = ord(self.buffer[0])
# self.progress("Have (%s) bytes state=%s b=0x%02x" % (str(len(self.buffer)), str(self.state), b));
if self.state == self.state_WANT_FRAME_TYPE:
if b != self.SPORT_DATA_FRAME:
# we may come into a stream mid-way, so we can't judge
self.progress("############# Bad char %x" % b)
raise ValueError("Bad char (0x%02x)" % b)
self.bad_chars += 1
if b in [self.SPORT_DATA_FRAME, self.SPORT_DOWNLINK_FRAME] :
self.frame = b
self.crc = 0
self.calc_crc(b)
self.state = self.state_WANT_ID1
continue
self.crc = 0
self.calc_crc(b)
self.state = self.state_WANT_ID1
# we may come into a stream mid-way, so we can't judge
self.progress("############# Bad char %x" % b)
raise ValueError("Bad char (0x%02x)" % b)
self.bad_chars += 1
continue
elif self.state == self.state_WANT_ID1:
self.id1 = self.read_bytestuffed_byte()
@ -717,7 +1000,7 @@ class FRSkySPort(FRSky):
break
self.calc_crc(self.id2)
self.state = self.state_WANT_DATA
self.data_byte_count = 0
self.data_bytes = []
self.data = 0
continue
elif self.state == self.state_WANT_DATA:
@ -725,9 +1008,9 @@ class FRSkySPort(FRSky):
if data_byte is None:
break
self.calc_crc(data_byte)
self.data = self.data | (data_byte << (8*(self.data_byte_count)))
self.data_byte_count += 1
if self.data_byte_count == 4:
self.data = self.data | (data_byte << (8*(len(self.data_bytes))))
self.data_bytes.append(data_byte)
if len(self.data_bytes) == 4:
self.state = self.state_WANT_CRC
continue
elif self.state == self.state_WANT_CRC:
@ -740,10 +1023,24 @@ class FRSkySPort(FRSky):
self.progress("Incorrect frsky checksum (received=%02x calculated=%02x id=0x%x)" % (crc, self.crc, dataid))
# raise ValueError("Incorrect frsky checksum (want=%02x got=%02x id=0x%x)" % (crc, self.crc, dataid))
else:
self.handle_data(dataid, self.data)
if self.frame == self.SPORT_DOWNLINK_FRAME:
self.handle_data_downlink([
self.id1,
self.id2,
self.data_bytes[0],
self.data_bytes[1],
self.data_bytes[2],
self.data_bytes[3]]
)
else:
self.handle_data(dataid, self.data)
self.state = self.state_SEND_POLL
elif self.state == self.state_SEND_POLL:
# this is done in check_poll
print("in send_poll state")
pass
else:
raise ValueError("Unknown state (%u)" % self.state)
raise ValueError("Unknown state (%s)" % self.state)
def get_data(self, dataid):
try:
@ -7124,6 +7421,175 @@ switch value'''
self.progress("Counts of dataids received:")
frsky.dump_dataid_counts_as_progress_messages()
def decode_mavlite_param_value(self, message):
'''returns a tuple of parameter name, value'''
(value,) = struct.unpack("<f", message[0:4])
name = message[4:]
return (name, value)
def decode_mavlite_command_ack(self, message):
'''returns a tuple of parameter name, value'''
(command,result) = struct.unpack("<HB", message)
return (command, result)
def read_message_via_mavlite(self, frsky, sport_to_mavlite):
'''read bytes from frsky mavlite stream, trying to form up a mavlite
message'''
tstart = self.get_sim_time_cached()
while True:
self.drain_mav(quiet=True)
tnow = self.get_sim_time_cached()
if tnow - tstart > 30:
raise NotAchievedException("Did not get parameter via mavlite")
frsky.update()
if sport_to_mavlite.state == sport_to_mavlite.state_MESSAGE_RECEIVED:
message = sport_to_mavlite.get_message()
sport_to_mavlite.reset()
# self.progress("############ message received (type=%u)" % message.msgid)
return message
def read_parameter_via_mavlite(self, frsky, sport_to_mavlite, name):
tstart = self.get_sim_time_cached()
while True:
tnow = self.get_sim_time_cached()
if tnow - tstart > 30:
raise NotAchievedException("Did not get parameter via mavlite")
message = self.read_message_via_mavlite(frsky, sport_to_mavlite)
if message.msgid != mavutil.mavlink.MAVLINK_MSG_ID_PARAM_VALUE:
raise NotAchievedException("Unexpected msgid %u received" % message.msgid)
(got_name, value) = self.decode_mavlite_param_value(message.body)
# self.progress("Received parameter: %s=%f" % (name, value))
got_name = got_name.decode('ascii')
if got_name != name:
raise NotAchievedException("Incorrect name received (want=%s) (got=%s)" % (name, got_name))
return value
def get_parameter_via_mavlite(self, frsky, sport_to_mavlite, name):
# self.progress("########## Sending request")
frsky.send_mavlite_param_request_read(name)
return self.read_parameter_via_mavlite(frsky, sport_to_mavlite, name)
def set_parameter_via_mavlite(self, frsky, sport_to_mavlite, name, value):
tstart = self.get_sim_time_cached()
# self.progress("########## Sending request")
frsky.send_mavlite_param_set(name, value)
# new value is echoed back immediately:
got_val = self.read_parameter_via_mavlite(frsky, sport_to_mavlite, name)
if abs(got_val - value) > 0.00001:
raise NotAchievedException("Returned value not same as set value (want=%f got=%f)" % (value, got_val))
def run_cmd_via_mavlite(self,
frsky,
sport_to_mavlite,
command,
p1=None,
p2=None,
p3=None,
p4=None,
p5=None,
p6=None,
p7=None,
want_result=mavutil.mavlink.MAV_RESULT_ACCEPTED):
frsky.send_mavlite_command_long(command,
p1=p1,
p2=p2,
p3=p3,
p4=p4,
p5=p5,
p6=p6,
p7=p7,
)
self.run_cmd_via_mavlite_get_ack(frsky,
sport_to_mavlite,
command,
want_result
)
def run_cmd_via_mavlite_get_ack(self, frsky, sport_to_mavlite, command, want_result):
'''expect and read a command-ack from frsky sport passthrough'''
msg = self.read_message_via_mavlite(frsky, sport_to_mavlite)
if msg.msgid != mavutil.mavlink.MAVLINK_MSG_ID_COMMAND_ACK:
raise NotAchievedException("Expected a command-ack, got a %u" % msg.msgid)
(got_command, got_result) = self.decode_mavlite_command_ack(msg.body)
if got_command != command:
raise NotAchievedException("Did not receive expected command in command_ack; want=%u got=%u" % (command, got_command))
if got_result != want_result:
raise NotAchievedException("Did not receive expected result in command_ack; want=%u got=%u" % (want_result, got_result))
def test_frsky_mavlite(self):
self.set_parameter("SERIAL5_PROTOCOL", 10) # serial5 is FRSky passthrough
self.customise_SITL_commandline([
"--uartF=tcp:6735" # serial5 spews to localhost:6735
])
frsky = FRSkyPassThrough(("127.0.0.1", 6735))
frsky.connect()
sport_to_mavlite = SPortToMAVlite()
frsky.data_downlink_handler = sport_to_mavlite.downlink_handler
self.start_subtest("Get parameter via MAVlite")
param_name = "STAB_PITCH_DOWN" # FIXME: want common across vehicles
set_value = 97.21
self.set_parameter(param_name, set_value) # DO NOT FLY
got_value = self.get_parameter_via_mavlite(frsky,
sport_to_mavlite,
param_name)
if abs(got_value - set_value) > 0.00001:
raise NotAchievedException("Incorrect value retrieved via mavlite (want=%f got=%f)" % (set_value, got_value))
self.progress("Got value OK")
self.end_subtest("Get parameter via MAVlite")
self.start_subtest("Set parameter via MAVlite")
param_name = "STAB_PITCH_DOWN" # FIXME: want common across vehicles
set_value = 91.67
# frsky.verbose = True
self.set_parameter_via_mavlite(frsky, sport_to_mavlite, param_name, set_value) # DO NOT FLY
got_value = self.get_parameter(param_name)
if abs(got_value - set_value) > 0.00001:
raise NotAchievedException("Incorrect value retrieved via mavlink (want=%f got=%f)" % (set_value, got_value))
self.progress("Set value OK")
self.end_subtest("Set parameter via MAVlite")
self.start_subtest("Calibrate Baro via MAVLite")
self.context_push()
self.context_collect("STATUSTEXT")
self.run_cmd_via_mavlite(frsky,
sport_to_mavlite,
mavutil.mavlink.MAV_CMD_PREFLIGHT_CALIBRATION,
p1=0,
p2=0,
p3=1.0,
)
self.wait_statustext("Updating barometer calibration", check_context=True)
self.context_pop()
self.end_subtest("Calibrate Baro via MAVLite")
self.start_subtest("Change mode via MAVLite")
# FIXME: currently plane-specific
self.run_cmd_via_mavlite(frsky,
sport_to_mavlite,
mavutil.mavlink.MAV_CMD_DO_SET_MODE,
p1=mavutil.mavlink.PLANE_MODE_MANUAL,
)
self.wait_mode("MANUAL")
self.run_cmd_via_mavlite(frsky,
sport_to_mavlite,
mavutil.mavlink.MAV_CMD_DO_SET_MODE,
p1=mavutil.mavlink.PLANE_MODE_FLY_BY_WIRE_A,
)
self.wait_mode("FBWA")
self.end_subtest("Change mode via MAVLite")
self.start_subtest("Enable fence via MAVlite")
# FIXME: currently plane-specific
self.run_cmd_via_mavlite(frsky,
sport_to_mavlite,
mavutil.mavlink.MAV_CMD_DO_FENCE_ENABLE,
p1=1,
want_result=mavutil.mavlink.MAV_RESULT_UNSUPPORTED,
)
self.end_subtest("Enable fence via MAVlite")
def tfs_validate_gps_alt(self, value):
self.progress("validating gps altitude integer part (0x%02x)" % value)
alt = value