Tools: autotest added frsky serial_protocol 4 and 10 testing against mavlink telemetry

This commit is contained in:
yaapu 2020-03-21 01:24:35 +01:00 committed by Peter Barker
parent 1191a5e1ea
commit 75b621a09f

View File

@ -505,7 +505,7 @@ class FRSkySPort(FRSky):
self.SENSOR_ID_FAS = 0x22 # Sensor ID 2 self.SENSOR_ID_FAS = 0x22 # Sensor ID 2
self.SENSOR_ID_GPS = 0x83 # Sensor ID 3 self.SENSOR_ID_GPS = 0x83 # Sensor ID 3
self.SENSOR_ID_SP2UR = 0xC6 # Sensor ID 6 self.SENSOR_ID_SP2UR = 0xC6 # Sensor ID 6
self.SENSOR_ID_28 = 0x1B # Sensor ID 28 self.SENSOR_ID_27 = 0x1B # Sensor ID 27
self.state = self.state_WANT_FRAME_TYPE self.state = self.state_WANT_FRAME_TYPE
@ -527,13 +527,19 @@ class FRSkySPort(FRSky):
0x5007: "parameters", 0x5007: "parameters",
# SPort non-passthrough: # SPort non-passthrough:
0x01: "GPS_ALT_BP",
0x02: "Temp1", 0x02: "Temp1",
0x04: "Fuel", 0x04: "Fuel",
0x05: "Temp2", 0x05: "Temp2",
0x10: "Baro Alt BP", 0x09: "GPS_ALT_AP",
0x10: "BARO_ALT_BP",
0x11: "GPS_ALT_AP",
0x14: "HDG",
0x19: "GPS_SPEED_AP",
0x21: "BARO_ALT_AP", 0x21: "BARO_ALT_AP",
0x28: "CURR",
0x30: "VARIO", 0x30: "VARIO",
0x39: "VFAS", 0x39: "VFAS"
# 0x800: "GPS", ## comments as duplicated dictrionary key # 0x800: "GPS", ## comments as duplicated dictrionary key
} }
@ -686,7 +692,7 @@ class FRSkyPassThrough(FRSkySPort):
def __init__(self, destination_address): def __init__(self, destination_address):
super(FRSkyPassThrough, self).__init__(destination_address) super(FRSkyPassThrough, self).__init__(destination_address)
self.sensors_to_poll = [self.SENSOR_ID_28] self.sensors_to_poll = [self.SENSOR_ID_27]
def progress(self, message): def progress(self, message):
print("FRSkyPassthrough: %s" % message) print("FRSkyPassthrough: %s" % message)
@ -6053,22 +6059,188 @@ switch value'''
if m3.state != 0: if m3.state != 0:
raise NotAchievedException("Didn't get expected mask back in message (mask=0 state=%u" % (m3.state)) raise NotAchievedException("Didn't get expected mask back in message (mask=0 state=%u" % (m3.state))
def tfp_validate_vel_and_yaw(self, value): def compare_number_percent(self, num1, num2, percent):
self.progress("validating vel_and_yaw(0x%02x)" % value) if num1 == 0 and num2 == 0:
VELANDYAW_XYVEL_OFFSET = 9 return True
VELANDYAW_YAW_LIMIT = 0x7FF if abs(num1-num2)/max(abs(num1),abs(num2)) <= percent*0.01:
VELANDYAW_YAW_OFFSET = 16 return True
yaw = value >> VELANDYAW_YAW_OFFSET return False
xy_vel = value >> VELANDYAW_XYVEL_OFFSET & 0xFF
z_vel_dm_per_second = value & 0xFFFF
gpi = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1) def bit_extract(self,number,offset,length):
mask = 0
for i in range(offset,offset+length):
mask |= 1 << i
return (number & mask) >> offset
def tf_encode_gps_latitude(self,lat):
value = 0
if lat < 0:
value = ((abs(lat)/100)*6) | 0x40000000
else:
value = ((abs(lat)/100)*6)
return value
def tf_validate_gps(self, value): # shared by proto 4 and proto 10
self.progress("validating gps (0x%02x)" % value)
lat = value
gpi = self.mav.recv_match(
type='GLOBAL_POSITION_INT',
blocking=True,
timeout=1
)
if gpi is None:
raise NotAchievedException("Did not get GLOBAL_POSITION_INT message")
gpi_lat = self.tf_encode_gps_latitude(gpi.lat)
self.progress("GLOBAL_POSITION_INT lat==%f frsky==%f" % (gpi_lat, lat))
if gpi_lat == lat:
return True
return False
def tfp_prep_number(self,number,digits,power):
res = 0
abs_number = abs(number)
if digits == 2 and power == 1: # number encoded on 8 bits: 7 bits for digits + 1 for 10^power
if abs_number < 100:
res = abs_number<<1
elif abs_number < 1270:
res = (round(abs_number * 0.1)<<1)|0x1
else: # transmit max possible value (0x7F x 10^1 = 1270)
res = 0xFF
if number < 0: # if number is negative, add sign bit in front
res |= 0x1<<8
elif digits == 2 and power == 2: # number encoded on 9 bits: 7 bits for digits + 2 for 10^power
if abs_number < 100:
res = abs_number<<2
elif abs_number < 1000:
res = (round(abs_number * 0.1)<<2)|0x1
elif abs_number < 10000:
res = (round(abs_number * 0.01)<<2)|0x2
elif abs_number < 127000:
res = (round(abs_number * 0.001)<<2)|0x3
else: # transmit max possible value (0x7F x 10^3 = 127000)
res = 0x1FF
if number < 0: # if number is negative, add sign bit in front
res |= 0x1<<9
elif digits == 3 and power == 1: # number encoded on 11 bits: 10 bits for digits + 1 for 10^power
if abs_number < 1000:
res = abs_number<<1
elif abs_number < 10240:
res = (round(abs_number * 0.1)<<1)|0x1
else: # transmit max possible value (0x3FF x 10^1 = 10240)
res = 0x7FF
if number < 0: # if number is negative, add sign bit in front
res |= 0x1<<11
elif digits == 3 and power == 2: # number encoded on 12 bits: 10 bits for digits + 2 for 10^power
if abs_number < 1000:
res = abs_number<<2
elif abs_number < 10000:
res = (round(abs_number * 0.1)<<2)|0x1
elif abs_number < 100000:
res = (round(abs_number * 0.01)<<2)|0x2
elif abs_number < 1024000:
res = (round(abs_number * 0.001)<<2)|0x3
else: # transmit max possible value (0x3FF x 10^3 = 127000)
res = 0xFFF
if number < 0: # if number is negative, add sign bit in front
res |= 0x1<<12
return res
def tfp_validate_ap_status(self, value): # 0x5001
self.progress("validating ap_status(0x%02x)" % value)
flight_mode = self.bit_extract(value,0,5) - 1 # first mode is 1 not 0 :-)
simple_mode = self.bit_extract(value,5,2)
is_flying = not self.bit_extract(value,7,1)
status_armed = self.bit_extract(value,8,1)
batt_failsafe = self.bit_extract(value,9,1)
ekf_failsafe = self.bit_extract(value,10,2)
imu_temp = self.bit_extract(value,26,6) + 19 # IMU temperature: 0 means temp =< 19, 63 means temp => 82
heartbeat = self.mav.recv_match(
type='HEARTBEAT',
blocking=True,
timeout=1
)
if heartbeat is None:
raise NotAchievedException("Did not get HEARTBEAT message")
mav_flight_mode = heartbeat.custom_mode
self.progress(" mode=%u heartbeat=%u" % (flight_mode, mav_flight_mode))
if mav_flight_mode == flight_mode:
self.progress("flight mode match")
return True
# FIXME: need to check other values as well
return False
def tfp_validate_attitude(self, value):
self.progress("validating attitude(0x%02x)" % value)
roll = (min(self.bit_extract(value,0,11),1800) - 900) * 0.2 # roll [0,1800] ==> [-180,180]
pitch = (min(self.bit_extract(value,11,10),900) - 450) * 0.2 # pitch [0,900] ==> [-90,90]
rng_cm = self.bit_extract(value,22,10) * (10^self.bit_extract(value,21,1)) # cm
atti = self.mav.recv_match(
type='ATTITUDE',
blocking=True,
timeout=1
)
if atti is None:
raise NotAchievedException("Did not get ATTITUDE message")
atti_roll = round(atti.roll)
self.progress("ATTITUDE roll==%f frsky==%f" % (atti_roll, roll))
if abs(atti_roll - roll) < 5:
return True
# FIXME: need to check other values as well
return False
def tfp_validate_home_status(self, value):
self.progress("validating home status(0x%02x)" % value)
home_dist_m = self.bit_extract(value,2,10) * (10^self.bit_extract(value,0,2))
home_alt_m = self.bit_extract(value,14,10) * (10^self.bit_extract(value,12,2)) * 0.1 * (self.bit_extract(value,24,1) == 1 and -1 or 1)
home_angle_d = self.bit_extract(value, 25, 7) * 3
gpi = self.mav.recv_match(
type='GLOBAL_POSITION_INT',
blocking=True,
timeout=1
)
if gpi is None:
raise NotAchievedException("Did not get GLOBAL_POSITION_INT message")
gpi_relative_alt = gpi.relative_alt
self.progress("GLOBAL_POSITION_INT rel_alt==%f frsky==%f" % (gpi_relative_alt, home_alt_m))
if abs(gpi_relative_alt - home_alt_m) < 1:
return True
# FIXME: need to check other values as well
return False
def tfp_validate_gps_status(self, value):
self.progress("validating gps status(0x%02x)" % value)
num_sats = self.bit_extract(value,0,4)
gps_status = self.bit_extract(value,4,2) + self.bit_extract(value,14,2)
gps_hdop = self.bit_extract(value,7,7) * (10^self.bit_extract(value,6,1)) # dm
gps_alt = self.bit_extract(value,24,7) * (10^self.bit_extract(value,22,2)) * (self.bit_extract(value,31,1) == 1 and -1 or 1) # dm
gri = self.mav.recv_match(
type='GPS_RAW_INT',
blocking=True,
timeout=1
)
if gri is None:
raise NotAchievedException("Did not get GPS_RAW_INT message")
gri_status = gri.fix_type
self.progress("GPS_RAW_INT fix_type==%f frsky==%f" % (gri_status, gps_status))
if gps_status == gri_status:
return True
# FIXME: need to check other values as well
return False
def tfp_validate_vel_and_yaw(self, value): # 0x5005
self.progress("validating vel_and_yaw(0x%02x)" % value)
z_vel_dm_per_second = self.bit_extract(value,1,7) * (10^self.bit_extract(value,0,1)) * (self.bit_extract(value,8,1) == 1 and -1 or 1)
xy_vel = self.bit_extract(value,10,7) * (10^self.bit_extract(value,9,1))
yaw = self.bit_extract(value,17,11) * 0.2
gpi = self.mav.recv_match(
type='GLOBAL_POSITION_INT',
blocking=True,
timeout=1
)
if gpi is None: if gpi is None:
return return
self.progress(" yaw=%u gpi=%u" % (yaw, gpi.hdg)) self.progress(" yaw=%u gpi=%u" % (yaw, gpi.hdg*0.01))
self.progress(" xy_vel=%u" % xy_vel) self.progress(" xy_vel=%u" % xy_vel)
self.progress(" z_vel_dm_per_second=%u" % z_vel_dm_per_second) self.progress(" z_vel_dm_per_second=%u" % z_vel_dm_per_second)
if int(round(yaw/10.0)) == int(round(gpi.hdg/100.0)): if self.compare_number_percent(gpi.hdg*0.01,yaw,10):
self.progress("Yaw match") self.progress("Yaw match")
return True return True
# FIXME: need to be under way to check the velocities, really.... # FIXME: need to be under way to check the velocities, really....
@ -6076,11 +6248,10 @@ switch value'''
def tfp_validate_battery1(self, value): def tfp_validate_battery1(self, value):
self.progress("validating battery1 (0x%02x)" % value) self.progress("validating battery1 (0x%02x)" % value)
BATT_VOLTAGE_LIMIT = 0x1FF voltage = self.bit_extract(value,0,9) #dV
BATT_CURRENT_OFFSET = 9 current = self.bit_extract(value,10,7) * (10^self.bit_extract(value,9,1))
BATT_TOTALMAH_LIMIT = 0x7FFF mah = self.bit_extract(value,17,15)
BATT_TOTALMAH_OFFSET = 17 voltage = value * 0.1
voltage = (value & BATT_VOLTAGE_LIMIT)/10.0
batt = self.mav.recv_match( batt = self.mav.recv_match(
type='BATTERY_STATUS', type='BATTERY_STATUS',
blocking=True, blocking=True,
@ -6090,10 +6261,31 @@ switch value'''
if batt is None: if batt is None:
raise NotAchievedException("Did not get BATTERY_STATUS message") raise NotAchievedException("Did not get BATTERY_STATUS message")
battery_status_value = batt.voltages[0]/1000.0 battery_status_value = batt.voltages[0]/1000.0
self.progress("BATTERY_STATUS==%f frsky==%f" % (battery_status_value, voltage)) self.progress("BATTERY_STATUS voltage==%f frsky==%f" % (battery_status_value, voltage))
if abs(battery_status_value - voltage) > 0.1: if abs(battery_status_value - voltage) > 0.1:
return False return False
# FIXME: need to check other values as well
return True return True
def tfp_validate_params(self, value):
self.progress("validating params (0x%02x)" % value)
param_id = self.bit_extract(value,24,4)
param_value = self.bit_extract(value,0,24)
if param_id != 1:
return False
frame_type = param_value
hb = self.mav.recv_match(
type='HEARTBEAT',
blocking=True,
timeout=1,
)
if hb is None:
raise NotAchievedException("Did not get HEARTBEAT message")
hb_type = hb.type
self.progress("HEARTBEAT type==%f frsky==%f" % (hb_type, frame_type))
if hb_type == frame_type:
return True
# FIXME: need to check other values as well
return False
def test_frsky_passthrough(self): def test_frsky_passthrough(self):
self.set_parameter("SERIAL5_PROTOCOL", 10) # serial5 is FRSky passthrough self.set_parameter("SERIAL5_PROTOCOL", 10) # serial5 is FRSky passthrough
@ -6170,15 +6362,15 @@ switch value'''
# message. These are ordered according to the wfq scheduler # message. These are ordered according to the wfq scheduler
wants = { wants = {
0x5000: lambda xx : True, 0x5000: lambda xx : True,
0x5006: lambda xx : True, 0x5006: self.tfp_validate_attitude,
0x800: lambda xx : True, 0x800: self.tf_validate_gps,
0x5005: self.tfp_validate_vel_and_yaw, 0x5005: self.tfp_validate_vel_and_yaw,
0x5001: lambda xx : True, 0x5001: self.tfp_validate_ap_status,
0x5002: lambda xx : True, 0x5002: self.tfp_validate_gps_status,
0x5004: lambda xx : True, 0x5004: self.tfp_validate_home_status,
# 0x5008: lambda x : True, # no second battery, so this doesn't arrive #0x5008: lambda x : True, # no second battery, so this doesn't arrive
0x5003: self.tfp_validate_battery1, 0x5003: self.tfp_validate_battery1,
0x5007: lambda xx : True, 0x5007: self.tfp_validate_params,
} }
tstart = self.get_sim_time_cached() tstart = self.get_sim_time_cached()
while len(wants): while len(wants):
@ -6197,6 +6389,160 @@ switch value'''
self.progress(" Fulfilled") self.progress(" Fulfilled")
del wants[want] del wants[want]
def tfs_validate_gps_alt(self, value):
self.progress("validating gps altitude integer part (0x%02x)" % value)
alt = value
gpi = self.mav.recv_match(
type='GLOBAL_POSITION_INT',
blocking=True,
timeout=1
)
if gpi is None:
raise NotAchievedException("Did not get GLOBAL_POSITION_INT message")
gpi_alt = round(gpi.alt*0.001)
self.progress("GLOBAL_POSITION_INT alt==%f frsky==%f" % (gpi_alt, alt))
if self.compare_number_percent(gpi_alt,alt,10):
return True
return False
def tfs_validate_baro_alt(self, value):
self.progress("validating baro altitude integer part (0x%02x)" % value)
alt = value
gpi = self.mav.recv_match(
type='GLOBAL_POSITION_INT',
blocking=True,
timeout=1
)
if gpi is None:
raise NotAchievedException("Did not get GLOBAL_POSITION_INT message")
gpi_alt = round(gpi.relative_alt)
self.progress("GLOBAL_POSITION_INT relative_alt==%f frsky==%f" % (gpi_alt, alt))
if abs(gpi_alt -alt) < 10:
return True
return False
def tfs_validate_gps_speed(self, value):
self.progress("validating gps speed integer part (0x%02x)" % value)
speed = value
vfr_hud = self.mav.recv_match(
type='VFR_HUD',
blocking=True,
timeout=1
)
if vfr_hud is None:
raise NotAchievedException("Did not get VFR_HUD message")
vfr_hud_speed = round(vfr_hud.groundspeed)
self.progress("VFR_HUD groundspeed==%f frsky==%f" % (vfr_hud_speed, speed))
if self.compare_number_percent(vfr_hud_speed,speed,10):
return True
return False
def tfs_validate_yaw(self, value):
self.progress("validating yaw (0x%02x)" % value)
yaw = value
vfr_hud = self.mav.recv_match(
type='VFR_HUD',
blocking=True,
timeout=1
)
if vfr_hud is None:
raise NotAchievedException("Did not get VFR_HUD message")
vfr_hud_yaw = round(vfr_hud.heading)
self.progress("VFR_HUD heading==%f frsky==%f" % (vfr_hud_yaw, yaw))
if self.compare_number_percent(vfr_hud_yaw,yaw,10):
return True
return False
def tfs_validate_vspeed(self, value):
self.progress("validating vspeed (0x%02x)" % value)
vspeed = value
vfr_hud = self.mav.recv_match(
type='VFR_HUD',
blocking=True,
timeout=1
)
if vfr_hud is None:
raise NotAchievedException("Did not get VFR_HUD message")
vfr_hud_vspeed = round(vfr_hud.climb)
self.progress("VFR_HUD climb==%f frsky==%f" % (vfr_hud_vspeed, vspeed))
if self.compare_number_percent(vfr_hud_vspeed,vspeed,10):
return True
return False
def tfs_validate_battery1(self, value):
self.progress("validating battery1 (0x%02x)" % value)
voltage = value/10.0
batt = self.mav.recv_match(
type='BATTERY_STATUS',
blocking=True,
timeout=5,
condition="BATTERY_STATUS.id==0"
)
if batt is None:
raise NotAchievedException("Did not get BATTERY_STATUS message")
battery_status_value = batt.voltages[0]/1000.0
self.progress("BATTERY_STATUS volatge==%f frsky==%f" % (battery_status_value, voltage))
if self.compare_number_percent(battery_status_value,voltage,10):
return True
return False
def tfs_validate_current1(self, value):
self.progress("validating battery1 (0x%02x)" % value)
current = value
batt = self.mav.recv_match(
type='BATTERY_STATUS',
blocking=True,
timeout=5,
condition="BATTERY_STATUS.id==0"
)
if batt is None:
raise NotAchievedException("Did not get BATTERY_STATUS message")
battery_status_current = batt.current_battery/100.0
self.progress("BATTERY_STATUS current==%f frsky==%f" % (battery_status_current, current))
if self.compare_number_percent(battery_status_current,current,10):
return True
return False
def tfs_validate_fuel(self, value):
self.progress("validating fuel (0x%02x)" % value)
fuel = value
batt = self.mav.recv_match(
type='BATTERY_STATUS',
blocking=True,
timeout=5,
condition="BATTERY_STATUS.id==0"
)
if batt is None:
raise NotAchievedException("Did not get BATTERY_STATUS message")
battery_status_fuel = batt.battery_remaining
self.progress("BATTERY_STATUS fuel==%f frsky==%f" % (battery_status_fuel, fuel))
if self.compare_number_percent(battery_status_fuel,fuel,10):
return True
return False
def tfs_validate_tmp1(self, value):
self.progress("validating tmp1 (0x%02x)" % value)
tmp1 = value
heartbeat = self.mav.recv_match(
type='HEARTBEAT',
blocking=True,
timeout=1
)
if heartbeat is None:
raise NotAchievedException("Did not get HEARTBEAT message")
heartbeat_tmp1 = heartbeat.custom_mode
self.progress("GLOBAL_POSITION_INT custom_mode==%f frsky==%f" % (heartbeat_tmp1, tmp1))
if heartbeat_tmp1 == tmp1:
return True
return False
def tfs_validate_tmp2(self, value):
self.progress("validating tmp2 (0x%02x)" % value)
tmp2 = value
gps_raw = self.mav.recv_match(
type='GPS_RAW_INT',
blocking=True,
timeout=1
)
if gps_raw is None:
raise NotAchievedException("Did not get GPS_RAW_INT message")
gps_raw_tmp2 = gps_raw.satellites_visible*10 + gps_raw.fix_type
self.progress("GPS_RAW_INT tmp2==%f frsky==%f" % (gps_raw_tmp2, tmp2))
if gps_raw_tmp2 == tmp2:
return True
return False
def test_frsky_sport(self): def test_frsky_sport(self):
self.set_parameter("SERIAL5_PROTOCOL", 4) # serial5 is FRSky sport self.set_parameter("SERIAL5_PROTOCOL", 4) # serial5 is FRSky sport
self.customise_SITL_commandline([ self.customise_SITL_commandline([
@ -6209,14 +6555,20 @@ switch value'''
# This, at least makes sure we're getting some of each # This, at least makes sure we're getting some of each
# message. # message.
wants = { wants = {
0x02: lambda x : True, 0x01: self.tfs_validate_gps_alt, # gps altitude integer m
0x04: lambda x : True, 0x02: self.tfs_validate_tmp1, # Tmp1
0x05: lambda x : True, 0x04: self.tfs_validate_fuel, # fuel
0x10: lambda x : True, 0x05: self.tfs_validate_tmp2, # Tmp2
0x21: lambda x : True, 0x09: lambda x : True, # gps altitude decimal cm
0x30: lambda x : True, 0x10: self.tfs_validate_baro_alt, # baro alt integer m
0x39: lambda x : True, 0x11: self.tfs_validate_gps_speed, # gps speed integer m/s
0x800: lambda x : True, 0x14: self.tfs_validate_yaw, # yaw in degrees
0x19: lambda x : True, # gps speed decimal cm/s
0x21: lambda x : True, # altitude decimal m
0x28: self.tfs_validate_current1, # current A
0x30: self.tfs_validate_vspeed, # vertical speed m/s
0x39: self.tfs_validate_battery1, # battery 1 voltage
0x800: self.tf_validate_gps, # gps lat/lon
} }
tstart = self.get_sim_time_cached() tstart = self.get_sim_time_cached()
last_wanting_print = 0 last_wanting_print = 0
@ -6226,7 +6578,7 @@ switch value'''
self.progress("Still wanting (%s)" % ",".join([ ("0x%02x" % x) for x in wants.keys()])) self.progress("Still wanting (%s)" % ",".join([ ("0x%02x" % x) for x in wants.keys()]))
last_wanting_print = now last_wanting_print = now
wants_copy = copy.copy(wants) wants_copy = copy.copy(wants)
if now - tstart > 10: if now - tstart > 60:
raise AutoTestTimeoutException("Failed to get frsky data") raise AutoTestTimeoutException("Failed to get frsky data")
frsky.update() frsky.update()
for want in wants_copy: for want in wants_copy: