px4-firmware/Tools/ecl_ekf/process_logdata_ekf.py

166 lines
6.2 KiB
Python
Executable File

#! /usr/bin/env python3
from __future__ import print_function
import argparse
import os
import sys
import csv
from typing import Dict
from pyulog import ULog
from analyse_logdata_ekf import analyse_ekf
from plotting.pdf_report import create_pdf_report
from analysis.detectors import PreconditionError
"""
Performs a health assessment on the ecl EKF navigation estimator data contained in a an ULog file
Outputs a health assessment summary in a csv file named <inputfilename>.mdat.csv
Outputs summary plots in a pdf file named <inputfilename>.pdf
"""
def get_arguments():
parser = argparse.ArgumentParser(description='Analyse the estimator_status and ekf2_innovation message data')
parser.add_argument('filename', metavar='file.ulg', help='ULog input file')
parser.add_argument('--no-plots', action='store_true',
help='Whether to only analyse and not plot the summaries for developers.')
parser.add_argument('--check-level-thresholds', type=str, default=None,
help='The csv file of fail and warning test thresholds for analysis.')
parser.add_argument('--check-table', type=str, default=None,
help='The csv file with descriptions of the checks.')
parser.add_argument('--no-sensor-safety-margin', action='store_true',
help='Whether to not cut-off 5s after take-off and 5s before landing '
'(for certain sensors that might be influence by proximity to ground).')
return parser.parse_args()
def create_results_table(
check_table_filename: str, master_status: str, check_status: Dict[str, str],
metrics: Dict[str, float], airtime_info: Dict[str, float]) -> Dict[str, list]:
"""
creates the output results table
:param check_table_filename:
:param master_status:
:param check_status:
:param metrics:
:param airtime_info:
:return:
"""
try:
with open(check_table_filename, 'r') as file:
reader = csv.DictReader(file)
test_results_table = {
row['check_id']: [float('NaN'), row['check_description']] for row in reader}
print('Using test description loaded from {:s}'.format(check_table_filename))
except:
raise PreconditionError('could not find {:s}'.format(check_table_filename))
# store metrics
for key, value in metrics.items():
test_results_table[key][0] = value
# store check results
for key, value in check_status.items():
test_results_table[key][0] = value
# store check results
for key, value in test_results_table.items():
if key.endswith('_status'):
test_results_table[key][0] = str(value[0])
# store master status
test_results_table['master_status'][0] = master_status
# store take_off and landing information
test_results_table['in_air_transition_time'][0] = airtime_info['in_air_transition_time']
test_results_table['on_ground_transition_time'][0] = airtime_info['on_ground_transition_time']
return test_results_table
def process_logdata_ekf(
filename: str, check_level_dict_filename: str, check_table_filename: str,
plot: bool = True, sensor_safety_margins: bool = True):
## load the log and extract the necessary data for the analyses
try:
ulog = ULog(filename)
except:
raise PreconditionError('could not open {:s}'.format(filename))
try:
# get the dictionary of fail and warning test thresholds from a csv file
with open(check_level_dict_filename, 'r') as file:
reader = csv.DictReader(file)
check_levels = {row['check_id']: float(row['threshold']) for row in reader}
print('Using test criteria loaded from {:s}'.format(check_level_dict_filename))
except:
raise PreconditionError('could not find {:s}'.format(check_level_dict_filename))
in_air_margin = 5.0 if sensor_safety_margins else 0.0
# perform the ekf analysis
master_status, check_status, metrics, airtime_info = analyse_ekf(
ulog, check_levels, red_thresh=1.0, amb_thresh=0.5, min_flight_duration_seconds=5.0,
in_air_margin_seconds=in_air_margin)
test_results = create_results_table(
check_table_filename, master_status, check_status, metrics, airtime_info)
# write metadata to a .csv file
with open('{:s}.mdat.csv'.format(filename), "w") as file:
file.write("name,value,description\n")
# loop through the test results dictionary and write each entry on a separate row, with data comma separated
# save data in alphabetical order
key_list = list(test_results.keys())
key_list.sort()
for key in key_list:
file.write(key + "," + str(test_results[key][0]) + "," + test_results[key][1] + "\n")
print('Test results written to {:s}.mdat.csv'.format(filename))
if plot:
create_pdf_report(ulog, '{:s}.pdf'.format(filename))
print('Plots saved to {:s}.pdf'.format(filename))
return test_results
def main() -> None:
args = get_arguments()
if args.check_level_thresholds is not None:
check_level_dict_filename = args.check_level_thresholds
else:
file_dir = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
check_level_dict_filename = os.path.join(file_dir, "check_level_dict.csv")
if args.check_table is not None:
check_table_filename = args.check_table
else:
file_dir = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
check_table_filename = os.path.join(file_dir, "check_table.csv")
try:
test_results = process_logdata_ekf(
args.filename, check_level_dict_filename, check_table_filename,
plot=not args.no_plots, sensor_safety_margins=not args.no_sensor_safety_margin)
except Exception as e:
print(str(e))
sys.exit(-1)
# print master test status to console
if (test_results['master_status'][0] == 'Pass'):
print('No anomalies detected')
elif (test_results['master_status'][0] == 'Warning'):
print('Minor anomalies detected')
elif (test_results['master_status'][0] == 'Fail'):
print('Major anomalies detected')
sys.exit(-1)
if __name__ == '__main__':
main()