px4-firmware/Tools/ecl_ekf/analysis/checks.py

148 lines
6.0 KiB
Python

#! /usr/bin/env python3
"""
function collection for performing ecl ekf checks
"""
from typing import Tuple, List, Dict
def perform_ecl_ekf_checks(
metrics: Dict[str, float], sensor_checks: List[str], innov_fail_checks: List[str],
check_levels: Dict[str, float]) -> Tuple[Dict[str, str], str]:
"""
# performs the imu, sensor, amd ekf checks and calculates a master status.
:param metrics:
:param sensor_checks:
:param innov_fail_checks:
:param check_levels:
:return:
"""
imu_status = perform_imu_checks(metrics, check_levels)
sensor_status = perform_sensor_innov_checks(
metrics, sensor_checks, innov_fail_checks, check_levels)
ekf_status = dict()
ekf_status['filter_fault_status'] = 'Fail' if metrics['filter_faults_max'] > 0 else 'Pass'
# combine the status from the checks
combined_status = dict()
combined_status.update(imu_status)
combined_status.update(sensor_status)
combined_status.update(ekf_status)
if any(val == 'Fail' for val in combined_status.values()):
master_status = 'Fail'
elif any(val == 'Warning' for val in combined_status.values()):
master_status = 'Warning'
else:
master_status = 'Pass'
return combined_status, master_status
def perform_imu_checks(
imu_metrics: Dict[str, float], check_levels: Dict[str, float]) -> Dict[str, str]:
"""
performs the imu checks.
:param imu_metrics:
:param check_levels:
:return:
"""
# check for IMU sensor warnings
imu_status = dict()
# perform the vibration check
imu_status['imu_vibration_check'] = 'Pass'
for imu_vibr_metric in ['imu_coning', 'imu_hfgyro', 'imu_hfaccel']:
mean_metric = '{:s}_mean_warn'.format(imu_vibr_metric)
peak_metric = '{:s}_peak_warn'.format(imu_vibr_metric)
mean_key = '{:s}_mean'.format(imu_vibr_metric)
peak_key = '{:s}_peak'.format(imu_vibr_metric)
if mean_key in imu_metrics and peak_key in imu_metrics:
if imu_metrics[mean_key] > check_levels[mean_metric] \
or imu_metrics[peak_key] > check_levels[peak_metric]:
imu_status['imu_vibration_check'] = 'Warning'
if imu_status['imu_vibration_check'] == 'Warning':
print('IMU vibration check warning.')
# perform the imu bias check
if imu_metrics['imu_dang_bias_median'] > check_levels['imu_dang_bias_median_warn'] \
or imu_metrics['imu_dvel_bias_median'] > check_levels['imu_dvel_bias_median_warn']:
imu_status['imu_bias_check'] = 'Warning'
print('IMU bias check warning.')
else:
imu_status['imu_bias_check'] = 'Pass'
# perform output predictor
if imu_metrics['output_obs_ang_err_median'] > check_levels['obs_ang_err_median_warn'] \
or imu_metrics['output_obs_vel_err_median'] > check_levels['obs_vel_err_median_warn'] \
or imu_metrics['output_obs_pos_err_median'] > check_levels['obs_pos_err_median_warn']:
imu_status['imu_output_predictor_check'] = 'Warning'
print('IMU output predictor check warning.')
else:
imu_status['imu_output_predictor_check'] = 'Pass'
imu_status['imu_sensor_status'] = 'Warning' if any(
val == 'Warning' for val in imu_status.values()) else 'Pass'
return imu_status
def perform_sensor_innov_checks(
metrics: Dict[str, float], sensor_checks: List[str], innov_fail_checks: List[str],
check_levels: Dict[str, float]) -> Dict[str, str]:
"""
performs the sensor checks.
:param metrics:
:param sensor_checks:
:param innov_fail_checks:
:param check_levels:
:return:
"""
sensor_status = dict()
for result_id in ['hgt', 'mag', 'vel', 'pos', 'tas', 'hagl']:
# only run sensor checks, if they apply.
if result_id in sensor_checks:
if metrics['{:s}_percentage_amber'.format(result_id)] > check_levels[
'{:s}_amber_fail_pct'.format(result_id)]:
sensor_status['{:s}_sensor_status'.format(result_id)] = 'Fail'
print('{:s} sensor check failure.'.format(result_id))
elif metrics['{:s}_percentage_amber'.format(result_id)] > check_levels[
'{:s}_amber_warn_pct'.format(result_id)]:
sensor_status['{:s}_sensor_status'.format(result_id)] = 'Warning'
print('{:s} sensor check warning.'.format(result_id))
else:
sensor_status['{:s}_sensor_status'.format(result_id)] = 'Pass'
# perform innovation checks.
for signal_id, metric_name, result_id in [('posv', 'hgt_fail_percentage', 'hgt'),
('magx', 'magx_fail_percentage', 'mag'),
('magy', 'magy_fail_percentage', 'mag'),
('magz', 'magz_fail_percentage', 'mag'),
('yaw', 'yaw_fail_percentage', 'yaw'),
('velh', 'vel_fail_percentage', 'vel'),
('velv', 'vel_fail_percentage', 'vel'),
('posh', 'pos_fail_percentage', 'pos'),
('tas', 'tas_fail_percentage', 'tas'),
('hagl', 'hagl_fail_percentage', 'hagl'),
('ofx', 'ofx_fail_percentage', 'flow'),
('ofy', 'ofy_fail_percentage', 'flow')]:
# only run innov fail checks, if they apply.
if signal_id in innov_fail_checks:
if metrics[metric_name] > check_levels['{:s}_fail_pct'.format(result_id)]:
sensor_status['{:s}_sensor_status'.format(result_id)] = 'Fail'
print('{:s} sensor check failure.'.format(result_id))
else:
if not ('{:s}_sensor_status'.format(result_id) in sensor_status):
sensor_status['{:s}_sensor_status'.format(result_id)] = 'Pass'
return sensor_status