forked from Archive/PX4-Autopilot
148 lines
6.0 KiB
Python
148 lines
6.0 KiB
Python
#! /usr/bin/env python3
|
|
"""
|
|
function collection for performing ecl ekf checks
|
|
"""
|
|
|
|
from typing import Tuple, List, Dict
|
|
|
|
|
|
def perform_ecl_ekf_checks(
        metrics: Dict[str, float], sensor_checks: List[str], innov_fail_checks: List[str],
        check_levels: Dict[str, float]) -> Tuple[Dict[str, str], str]:
    """
    runs the imu, sensor and ekf checks and derives a master status from their results.
    :param metrics: metric values by name; 'filter_faults_max' is read here, the dict is
        also handed to the imu and sensor checks
    :param sensor_checks: forwarded to perform_sensor_innov_checks
    :param innov_fail_checks: forwarded to perform_sensor_innov_checks
    :param check_levels: threshold values by name, forwarded to the imu and sensor checks
    :return: (status of every individual check, master status). the master status is
        'Fail' if any check is 'Fail', otherwise 'Warning' if any check is 'Warning',
        otherwise 'Pass'.
    """
    imu_results = perform_imu_checks(metrics, check_levels)
    sensor_results = perform_sensor_innov_checks(
        metrics, sensor_checks, innov_fail_checks, check_levels)

    # any reported filter fault fails the ekf check
    filter_fault = 'Fail' if metrics['filter_faults_max'] > 0 else 'Pass'

    # merge the partial results; on a key collision the later source wins
    combined_status = {}
    for partial in (imu_results, sensor_results, {'filter_fault_status': filter_fault}):
        combined_status.update(partial)

    # the most severe individual status present becomes the master status
    for severity in ('Fail', 'Warning'):
        if severity in combined_status.values():
            return combined_status, severity

    return combined_status, 'Pass'
|
|
|
|
|
|
def perform_imu_checks(
        imu_metrics: Dict[str, float], check_levels: Dict[str, float]) -> Dict[str, str]:
    """
    runs the imu vibration, bias and output predictor checks.
    :param imu_metrics: metric values by name. the vibration metrics
        ('imu_coning', 'imu_hfgyro', 'imu_hfaccel' with '_mean' / '_peak' suffix) are
        optional, the bias and output predictor metrics are read unconditionally.
    :param check_levels: warning thresholds by name (metric name with '_warn' suffix for
        the vibration and bias checks, 'obs_*_err_median_warn' for the output predictor)
    :return: dict with 'Pass' or 'Warning' for 'imu_vibration_check', 'imu_bias_check',
        'imu_output_predictor_check' and the summary entry 'imu_sensor_status'.
        a message is printed for every check that results in a warning.
    """

    def exceeds(metric_key, level_key):
        # strict comparison: a metric sitting exactly on its threshold does not warn
        return imu_metrics[metric_key] > check_levels[level_key]

    status = {}

    # vibration check: a metric is only evaluated if both its mean and peak are available.
    # the list is built completely (no short-circuit), so every available metric is compared.
    vibration_flags = [
        exceeds(name + '_mean', name + '_mean_warn')
        or exceeds(name + '_peak', name + '_peak_warn')
        for name in ('imu_coning', 'imu_hfgyro', 'imu_hfaccel')
        if name + '_mean' in imu_metrics and name + '_peak' in imu_metrics
    ]
    vibration_warning = any(vibration_flags)
    status['imu_vibration_check'] = 'Warning' if vibration_warning else 'Pass'
    if vibration_warning:
        print('IMU vibration check warning.')

    # bias check
    bias_warning = exceeds('imu_dang_bias_median', 'imu_dang_bias_median_warn') \
        or exceeds('imu_dvel_bias_median', 'imu_dvel_bias_median_warn')
    status['imu_bias_check'] = 'Warning' if bias_warning else 'Pass'
    if bias_warning:
        print('IMU bias check warning.')

    # output predictor check
    predictor_warning = exceeds('output_obs_ang_err_median', 'obs_ang_err_median_warn') \
        or exceeds('output_obs_vel_err_median', 'obs_vel_err_median_warn') \
        or exceeds('output_obs_pos_err_median', 'obs_pos_err_median_warn')
    status['imu_output_predictor_check'] = 'Warning' if predictor_warning else 'Pass'
    if predictor_warning:
        print('IMU output predictor check warning.')

    # summary entry: warn as soon as one of the checks above warned
    if 'Warning' in status.values():
        status['imu_sensor_status'] = 'Warning'
    else:
        status['imu_sensor_status'] = 'Pass'

    return status
|
|
|
|
|
|
def perform_sensor_innov_checks(
        metrics: Dict[str, float], sensor_checks: List[str], innov_fail_checks: List[str],
        check_levels: Dict[str, float]) -> Dict[str, str]:
    """
    runs the sensor checks and the innovation fail checks.
    :param metrics: metric values by name ('<sensor>_percentage_amber' for the sensor
        checks, '<signal>_fail_percentage' for the innovation checks)
    :param sensor_checks: sensor ids to run the amber percentage check for; only
        'hgt', 'mag', 'vel', 'pos', 'tas' and 'hagl' are considered
    :param innov_fail_checks: signal ids ('posv', 'magx', 'magy', 'magz', 'yaw', 'velh',
        'velv', 'posh', 'tas', 'hagl', 'ofx', 'ofy') to run the innovation check for
    :param check_levels: threshold values by name ('<sensor>_amber_fail_pct',
        '<sensor>_amber_warn_pct' and '<sensor>_fail_pct')
    :return: dict mapping '<sensor>_sensor_status' to 'Pass', 'Warning' or 'Fail' for every
        sensor that had at least one check run. a message is printed for each warning and
        failure.
    """
    status = {}

    # sensor checks: rate the amber percentage against the fail and the warning level
    for sensor in ('hgt', 'mag', 'vel', 'pos', 'tas', 'hagl'):
        if sensor not in sensor_checks:
            continue

        status_key = sensor + '_sensor_status'
        amber_key = sensor + '_percentage_amber'

        if metrics[amber_key] > check_levels[sensor + '_amber_fail_pct']:
            status[status_key] = 'Fail'
            print(sensor + ' sensor check failure.')
        elif metrics[amber_key] > check_levels[sensor + '_amber_warn_pct']:
            status[status_key] = 'Warning'
            print(sensor + ' sensor check warning.')
        else:
            status[status_key] = 'Pass'

    # innovation checks: (signal id, metric holding its fail percentage, sensor it belongs to)
    innov_table = (
        ('posv', 'hgt_fail_percentage', 'hgt'),
        ('magx', 'magx_fail_percentage', 'mag'),
        ('magy', 'magy_fail_percentage', 'mag'),
        ('magz', 'magz_fail_percentage', 'mag'),
        ('yaw', 'yaw_fail_percentage', 'yaw'),
        ('velh', 'vel_fail_percentage', 'vel'),
        ('velv', 'vel_fail_percentage', 'vel'),
        ('posh', 'pos_fail_percentage', 'pos'),
        ('tas', 'tas_fail_percentage', 'tas'),
        ('hagl', 'hagl_fail_percentage', 'hagl'),
        ('ofx', 'ofx_fail_percentage', 'flow'),
        ('ofy', 'ofy_fail_percentage', 'flow'),
    )

    for signal, fail_metric, sensor in innov_table:
        if signal not in innov_fail_checks:
            continue

        status_key = sensor + '_sensor_status'

        if metrics[fail_metric] > check_levels[sensor + '_fail_pct']:
            # a failed innovation check overrides whatever was recorded before
            status[status_key] = 'Fail'
            print(sensor + ' sensor check failure.')
        else:
            # a passed innovation check never downgrades an earlier warning or failure
            status.setdefault(status_key, 'Pass')

    return status
|