Tools: added autotest for frsky rpm telemetry

This commit is contained in:
yaapu 2021-01-29 10:02:42 +01:00 committed by Peter Barker
parent d5868e3c7d
commit bb576a49b8
1 changed files with 101 additions and 22 deletions

View File

@ -794,6 +794,7 @@ class FRSkySPort(FRSky):
self.SENSOR_ID_VARIO = 0x00 # Sensor ID 0
self.SENSOR_ID_FAS = 0x22 # Sensor ID 2
self.SENSOR_ID_GPS = 0x83 # Sensor ID 3
self.SENSOR_ID_RPM = 0xE4 # Sensor ID 4
self.SENSOR_ID_SP2UR = 0xC6 # Sensor ID 6
self.SENSOR_ID_27 = 0x1B # Sensor ID 27
@ -822,6 +823,7 @@ class FRSkySPort(FRSky):
0x5008: "Battery 2 status",
0x5003: "Battery 1 status",
0x5007: "parameters",
0x500A: "rpm",
# SPort non-passthrough:
0x01: "GPS_ALT_BP",
@ -838,6 +840,7 @@ class FRSkySPort(FRSky):
0x30: "VARIO",
0x39: "VFAS",
# 0x800: "GPS", ## comments as duplicated dictrionary key
0x050E: "RPM1",
0x34: "DOWNLINK1_ID",
0x67: "DOWNLINK2_ID",
@ -848,6 +851,7 @@ class FRSkySPort(FRSky):
self.SENSOR_ID_VARIO,
self.SENSOR_ID_FAS,
self.SENSOR_ID_GPS,
self.SENSOR_ID_RPM,
self.SENSOR_ID_SP2UR,
]
self.next_sensor_id_to_poll = 0 # offset into sensors_to_poll
@ -2644,7 +2648,7 @@ class AutoTest(ABC):
if not temp_ok:
raise NotAchievedException("target temperature")
def onboard_logging_not_log_disarmed(self):
self.set_parameter("LOG_DISARMED", 0)
self.set_parameter("LOG_FILE_DSRMROT", 0)
@ -4892,7 +4896,7 @@ Also, ignores heartbeats not from our target system'''
""" if using SITL estimates directly """
if (int(self.get_parameter('AHRS_EKF_TYPE')) == 10):
self.progress("GPS disable skipped")
return
return
# all of these must NOT be set for arming NOT to happen:
not_required_value = 0
@ -8423,6 +8427,7 @@ switch value'''
if gpi_lat == lat:
return True
return False
def tfp_prep_number(self,number,digits,power):
res = 0
abs_number = abs(number)
@ -8490,7 +8495,7 @@ switch value'''
raise NotAchievedException("Did not get HEARTBEAT message")
mav_flight_mode = heartbeat.custom_mode
self.progress(" mode=%u heartbeat=%u" % (flight_mode, mav_flight_mode))
if mav_flight_mode == flight_mode:
if mav_flight_mode == flight_mode:
self.progress("flight mode match")
return True
# FIXME: need to check other values as well
@ -8514,11 +8519,11 @@ switch value'''
return True
# FIXME: need to check other values as well
return False
def tfp_validate_home_status(self, value):
self.progress("validating home status(0x%02x)" % value)
home_dist_m = self.bit_extract(value,2,10) * (10^self.bit_extract(value,0,2))
home_alt_m = self.bit_extract(value,14,10) * (10^self.bit_extract(value,12,2)) * 0.1 * (self.bit_extract(value,24,1) == 1 and -1 or 1)
home_alt_dm = self.bit_extract(value,14,10) * (10^self.bit_extract(value,12,2)) * 0.1 * (self.bit_extract(value,24,1) == 1 and -1 or 1)
home_angle_d = self.bit_extract(value, 25, 7) * 3
gpi = self.mav.recv_match(
type='GLOBAL_POSITION_INT',
@ -8527,13 +8532,13 @@ switch value'''
)
if gpi is None:
raise NotAchievedException("Did not get GLOBAL_POSITION_INT message")
gpi_relative_alt_m = gpi.relative_alt/1000.0
self.progress("GLOBAL_POSITION_INT rel_alt==%fm frsky_home_alt==%fm" % (gpi_relative_alt_m, home_alt_m))
if abs(gpi_relative_alt_m - home_alt_m) < 1:
gpi_relative_alt_dm = gpi.relative_alt/100.0
self.progress("GLOBAL_POSITION_INT rel_alt==%fm frsky_home_alt==%fm" % (gpi_relative_alt_dm, home_alt_dm))
if abs(gpi_relative_alt_dm - home_alt_dm) < 10:
return True
# FIXME: need to check other values as well
return False
def tfp_validate_gps_status(self, value):
self.progress("validating gps status(0x%02x)" % value)
num_sats = self.bit_extract(value,0,4)
@ -8553,6 +8558,7 @@ switch value'''
return True
# FIXME: need to check other values as well
return False
def tfp_validate_vel_and_yaw(self, value): # 0x5005
self.progress("validating vel_and_yaw(0x%02x)" % value)
z_vel_dm_per_second = self.bit_extract(value,1,7) * (10^self.bit_extract(value,0,1)) * (self.bit_extract(value,8,1) == 1 and -1 or 1)
@ -8568,7 +8574,7 @@ switch value'''
self.progress(" yaw=%u gpi=%u" % (yaw, gpi.hdg*0.01))
self.progress(" xy_vel=%u" % xy_vel)
self.progress(" z_vel_dm_per_second=%u" % z_vel_dm_per_second)
if self.compare_number_percent(gpi.hdg*0.01,yaw,10):
if self.compare_number_percent(gpi.hdg*0.01,yaw,10):
self.progress("Yaw match")
return True
# FIXME: need to be under way to check the velocities, really....
@ -8588,12 +8594,13 @@ switch value'''
)
if batt is None:
raise NotAchievedException("Did not get BATTERY_STATUS message")
battery_status_value = batt.voltages[0]/1000.0
battery_status_value = batt.voltages[0]*0.001
self.progress("BATTERY_STATUS voltage==%f frsky==%f" % (battery_status_value, voltage))
if abs(battery_status_value - voltage) > 0.1:
return False
# FIXME: need to check other values as well
return True
def tfp_validate_params(self, value):
param_id = self.bit_extract(value,24,4)
param_value = self.bit_extract(value,0,24)
@ -8616,8 +8623,25 @@ switch value'''
# FIXME: need to check other values as well
return False
def tfp_validate_rpm(self, value):
self.progress("validating rpm (0x%02x)" % value)
tf_rpm = self.bit_extract(value,0,16)
rpm = self.mav.recv_match(
type='RPM',
blocking=True,
timeout=5
)
if rpm is None:
raise NotAchievedException("Did not get RPM message")
rpm_value = round(rpm.rpm1 * 0.1)
self.progress("RPM rpm==%f frsky==%f" % (rpm_value, tf_rpm))
if rpm_value != tf_rpm:
return False
return True
def test_frsky_passthrough(self):
self.set_parameter("SERIAL5_PROTOCOL", 10) # serial5 is FRSky passthrough
self.set_parameter("RPM_TYPE", 1) # enable RPM output
self.customise_SITL_commandline([
"--uartF=tcp:6735" # serial5 spews to localhost:6735
])
@ -8685,6 +8709,14 @@ switch value'''
text = ""
self.wait_ready_to_arm()
# we need to start the engine to get some RPM readings, we do it for plane only
if self.is_plane():
self.set_autodisarm_delay(0)
if not self.arm_vehicle():
raise NotAchievedException("Failed to ARM")
self.set_rc(3,1050)
self.drain_mav_unparsed()
# anything with a lambda in here needs a proper test written.
# This, at least makes sure we're getting some of each
@ -8700,6 +8732,7 @@ switch value'''
#0x5008: lambda x : True, # no second battery, so this doesn't arrive
0x5003: self.tfp_validate_battery1,
0x5007: self.tfp_validate_params,
0x500A: self.tfp_validate_rpm,
}
tstart = self.get_sim_time_cached()
while len(wants):
@ -8726,6 +8759,11 @@ switch value'''
frsky.dump_sensor_id_poll_counts_as_progress_messages()
self.progress("Counts of dataids received:")
frsky.dump_dataid_counts_as_progress_messages()
# disarm
if self.is_plane():
self.zero_throttle()
if not self.disarm_vehicle():
raise NotAchievedException("Failed to DISARM")
def decode_mavlite_param_value(self, message):
'''returns a tuple of parameter name, value'''
@ -8911,9 +8949,10 @@ switch value'''
if self.compare_number_percent(gpi_alt,alt,10):
return True
return False
def tfs_validate_baro_alt(self, value):
self.progress("validating baro altitude integer part (0x%02x)" % value)
alt = value
alt_m = value
gpi = self.mav.recv_match(
type='GLOBAL_POSITION_INT',
blocking=True,
@ -8921,11 +8960,12 @@ switch value'''
)
if gpi is None:
raise NotAchievedException("Did not get GLOBAL_POSITION_INT message")
gpi_alt = round(gpi.relative_alt)
self.progress("GLOBAL_POSITION_INT relative_alt==%f frsky==%f" % (gpi_alt, alt))
if abs(gpi_alt -alt) < 10:
gpi_alt_m = round(gpi.relative_alt*0.001)
self.progress("GLOBAL_POSITION_INT relative_alt==%f frsky==%f" % (gpi_alt_m, alt_m))
if abs(gpi_alt_m - alt_m) < 1:
return True
return False
def tfs_validate_gps_speed(self, value):
self.progress("validating gps speed integer part (0x%02x)" % value)
speed = value
@ -8941,6 +8981,7 @@ switch value'''
if self.compare_number_percent(vfr_hud_speed,speed,10):
return True
return False
def tfs_validate_yaw(self, value):
self.progress("validating yaw (0x%02x)" % value)
yaw = value
@ -8956,6 +8997,7 @@ switch value'''
if self.compare_number_percent(vfr_hud_yaw,yaw,10):
return True
return False
def tfs_validate_vspeed(self, value):
self.progress("validating vspeed (0x%02x)" % value)
vspeed = value
@ -8971,9 +9013,10 @@ switch value'''
if self.compare_number_percent(vfr_hud_vspeed,vspeed,10):
return True
return False
def tfs_validate_battery1(self, value):
self.progress("validating battery1 (0x%02x)" % value)
voltage = value/10.0
voltage = value*0.1
batt = self.mav.recv_match(
type='BATTERY_STATUS',
blocking=True,
@ -8986,10 +9029,12 @@ switch value'''
self.progress("BATTERY_STATUS volatge==%f frsky==%f" % (battery_status_value, voltage))
if self.compare_number_percent(battery_status_value,voltage,10):
return True
return False
return False
def tfs_validate_current1(self, value):
### test frsky current vs BATTERY_STATUS
self.progress("validating battery1 (0x%02x)" % value)
current = value
current = value*0.1
batt = self.mav.recv_match(
type='BATTERY_STATUS',
blocking=True,
@ -8998,11 +9043,12 @@ switch value'''
)
if batt is None:
raise NotAchievedException("Did not get BATTERY_STATUS message")
battery_status_current = batt.current_battery/100.0
battery_status_current = batt.current_battery*0.01
self.progress("BATTERY_STATUS current==%f frsky==%f" % (battery_status_current, current))
if self.compare_number_percent(battery_status_current,current,10):
return True
return False
return False
def tfs_validate_fuel(self, value):
self.progress("validating fuel (0x%02x)" % value)
fuel = value
@ -9018,7 +9064,7 @@ switch value'''
self.progress("BATTERY_STATUS fuel==%f frsky==%f" % (battery_status_fuel, fuel))
if self.compare_number_percent(battery_status_fuel,fuel,10):
return True
return False
return False
def tfs_validate_tmp1(self, value):
self.progress("validating tmp1 (0x%02x)" % value)
@ -9035,6 +9081,7 @@ switch value'''
if heartbeat_tmp1 == tmp1:
return True
return False
def tfs_validate_tmp2(self, value):
self.progress("validating tmp2 (0x%02x)" % value)
tmp2 = value
@ -9050,13 +9097,39 @@ switch value'''
if gps_raw_tmp2 == tmp2:
return True
return False
def tfs_validate_rpm(self, value):
self.progress("validating rpm (0x%02x)" % value)
tfs_rpm = value
rpm = self.mav.recv_match(
type='RPM',
blocking=True,
timeout=5
)
if rpm is None:
raise NotAchievedException("Did not get RPM message")
rpm_value = round(rpm.rpm1)
self.progress("RPM rpm==%f frsky==%f" % (rpm_value, tfs_rpm))
if rpm_value == tfs_rpm:
return True
return False
def test_frsky_sport(self):
self.set_parameter("SERIAL5_PROTOCOL", 4) # serial5 is FRSky sport
self.set_parameter("RPM_TYPE", 1) # enable RPM sensor
self.customise_SITL_commandline([
"--uartF=tcp:6735" # serial5 spews to localhost:6735
])
frsky = FRSkySPort(("127.0.0.1", 6735))
self.wait_ready_to_arm()
# we need to start the engine to get some RPM readings, we do it for plane only
if self.is_plane():
self.set_autodisarm_delay(0)
if not self.arm_vehicle():
raise NotAchievedException("Failed to ARM")
self.set_rc(3,1050)
self.drain_mav_unparsed()
# anything with a lambda in here needs a proper test written.
# This, at least makes sure we're getting some of each
@ -9076,6 +9149,7 @@ switch value'''
0x30: self.tfs_validate_vspeed, # vertical speed m/s
0x39: self.tfs_validate_battery1, # battery 1 voltage
0x800: self.tf_validate_gps, # gps lat/lon
0x50E: self.tfs_validate_rpm, # rpm 1
}
tstart = self.get_sim_time_cached()
last_wanting_print = 0
@ -9085,7 +9159,7 @@ switch value'''
self.progress("Still wanting (%s)" % ",".join([ ("0x%02x" % x) for x in wants.keys()]))
last_wanting_print = now
wants_copy = copy.copy(wants)
if now - tstart > 60:
if now - tstart > 300:
raise AutoTestTimeoutException("Failed to get frsky data")
frsky.update()
for want in wants_copy:
@ -9096,6 +9170,11 @@ switch value'''
if wants[want](data):
self.progress(" Fulfilled")
del wants[want]
# ok done, stop the engine
if self.is_plane():
self.zero_throttle()
if not self.disarm_vehicle():
raise NotAchievedException("Failed to DISARM")
def test_frsky_d(self):
self.set_parameter("SERIAL5_PROTOCOL", 3) # serial5 is FRSky output