ekf2_post-processing: use estimator_status_flags instead of bitmasks

This commit is contained in:
bresch 2022-05-24 14:55:39 +02:00 committed by Mathieu Bresciani
parent 016b8aeb35
commit d04a91a3ae
5 changed files with 72 additions and 167 deletions

View File

@ -11,7 +11,7 @@ from pyulog import ULog
from analysis.detectors import InAirDetector, PreconditionError from analysis.detectors import InAirDetector, PreconditionError
from analysis.metrics import calculate_ecl_ekf_metrics from analysis.metrics import calculate_ecl_ekf_metrics
from analysis.checks import perform_ecl_ekf_checks from analysis.checks import perform_ecl_ekf_checks
from analysis.post_processing import get_estimator_check_flags from analysis.post_processing import get_gps_check_fail_flags
def analyse_ekf( def analyse_ekf(
ulog: ULog, check_levels: Dict[str, float], multi_instance: int = 0, ulog: ULog, check_levels: Dict[str, float], multi_instance: int = 0,
@ -40,6 +40,11 @@ def analyse_ekf(
except: except:
raise PreconditionError('could not find estimator_status instance', multi_instance) raise PreconditionError('could not find estimator_status instance', multi_instance)
try:
estimator_status_flags = ulog.get_dataset('estimator_status_flags', multi_instance).data
except:
raise PreconditionError('could not find estimator_status_flags instance', multi_instance)
try: try:
_ = ulog.get_dataset('estimator_innovations', multi_instance).data _ = ulog.get_dataset('estimator_innovations', multi_instance).data
except: except:
@ -61,14 +66,14 @@ def analyse_ekf(
'in_air_transition_time': round(in_air.take_off + in_air.log_start, 2), 'in_air_transition_time': round(in_air.take_off + in_air.log_start, 2),
'on_ground_transition_time': round(in_air.landing + in_air.log_start, 2)} 'on_ground_transition_time': round(in_air.landing + in_air.log_start, 2)}
control_mode, innov_flags, gps_fail_flags = get_estimator_check_flags(estimator_status) gps_fail_flags = get_gps_check_fail_flags(estimator_status)
sensor_checks, innov_fail_checks = find_checks_that_apply( sensor_checks, innov_fail_checks = find_checks_that_apply(
control_mode, estimator_status, estimator_status_flags, estimator_status,
pos_checks_when_sensors_not_fused=pos_checks_when_sensors_not_fused) pos_checks_when_sensors_not_fused=pos_checks_when_sensors_not_fused)
metrics = calculate_ecl_ekf_metrics( metrics = calculate_ecl_ekf_metrics(
ulog, innov_flags, innov_fail_checks, sensor_checks, in_air, in_air_no_ground_effects, ulog, estimator_status_flags, innov_fail_checks, sensor_checks, in_air, in_air_no_ground_effects,
multi_instance, red_thresh=red_thresh, amb_thresh=amb_thresh) multi_instance, red_thresh=red_thresh, amb_thresh=amb_thresh)
check_status, master_status = perform_ecl_ekf_checks( check_status, master_status = perform_ecl_ekf_checks(
@ -78,12 +83,12 @@ def analyse_ekf(
def find_checks_that_apply( def find_checks_that_apply(
control_mode: dict, estimator_status: dict, pos_checks_when_sensors_not_fused: bool = False) ->\ estimator_status_flags: dict, estimator_status: dict, pos_checks_when_sensors_not_fused: bool = False) ->\
Tuple[List[str], List[str]]: Tuple[List[str], List[str]]:
""" """
finds the checks that apply and stores them in lists for the std checks and the innovation finds the checks that apply and stores them in lists for the std checks and the innovation
fail checks. fail checks.
:param control_mode: :param estimator_status_flags:
:param estimator_status: :param estimator_status:
:param b_pos_only_when_sensors_fused: :param b_pos_only_when_sensors_fused:
:return: a tuple of two lists that contain strings for the std checks and for the innovation :return: a tuple of two lists that contain strings for the std checks and for the innovation
@ -97,7 +102,7 @@ def find_checks_that_apply(
innov_fail_checks.append('posv') innov_fail_checks.append('posv')
# Magnetometer Sensor Checks # Magnetometer Sensor Checks
if (np.amax(control_mode['yaw_aligned']) > 0.5): if (np.amax(estimator_status_flags['cs_yaw_align']) > 0.5):
sensor_checks.append('mag') sensor_checks.append('mag')
innov_fail_checks.append('magx') innov_fail_checks.append('magx')
@ -106,13 +111,14 @@ def find_checks_that_apply(
innov_fail_checks.append('yaw') innov_fail_checks.append('yaw')
# Velocity Sensor Checks # Velocity Sensor Checks
if (np.amax(control_mode['using_gps']) > 0.5): if (np.amax(estimator_status_flags['cs_gps']) > 0.5):
sensor_checks.append('vel') sensor_checks.append('vel')
innov_fail_checks.append('vel') innov_fail_checks.append('velh')
innov_fail_checks.append('velv')
# Position Sensor Checks # Position Sensor Checks
if (pos_checks_when_sensors_not_fused or (np.amax(control_mode['using_gps']) > 0.5) if (pos_checks_when_sensors_not_fused or (np.amax(estimator_status_flags['cs_gps']) > 0.5)
or (np.amax(control_mode['using_evpos']) > 0.5)): or (np.amax(estimator_status_flags['cs_ev_pos']) > 0.5)):
sensor_checks.append('pos') sensor_checks.append('pos')
innov_fail_checks.append('posh') innov_fail_checks.append('posh')
@ -128,7 +134,7 @@ def find_checks_that_apply(
innov_fail_checks.append('hagl') innov_fail_checks.append('hagl')
# optical flow sensor checks # optical flow sensor checks
if (np.amax(control_mode['using_optflow']) > 0.5): if (np.amax(estimator_status_flags['cs_opt_flow']) > 0.5):
innov_fail_checks.append('ofx') innov_fail_checks.append('ofx')
innov_fail_checks.append('ofy') innov_fail_checks.append('ofy')

View File

@ -123,7 +123,8 @@ def perform_sensor_innov_checks(
('magy', 'magy_fail_percentage', 'mag'), ('magy', 'magy_fail_percentage', 'mag'),
('magz', 'magz_fail_percentage', 'mag'), ('magz', 'magz_fail_percentage', 'mag'),
('yaw', 'yaw_fail_percentage', 'yaw'), ('yaw', 'yaw_fail_percentage', 'yaw'),
('vel', 'vel_fail_percentage', 'vel'), ('velh', 'vel_fail_percentage', 'vel'),
('velv', 'vel_fail_percentage', 'vel'),
('posh', 'pos_fail_percentage', 'pos'), ('posh', 'pos_fail_percentage', 'pos'),
('tas', 'tas_fail_percentage', 'tas'), ('tas', 'tas_fail_percentage', 'tas'),
('hagl', 'hagl_fail_percentage', 'hagl'), ('hagl', 'hagl_fail_percentage', 'hagl'),

View File

@ -11,7 +11,7 @@ import numpy as np
from analysis.detectors import InAirDetector from analysis.detectors import InAirDetector
def calculate_ecl_ekf_metrics( def calculate_ecl_ekf_metrics(
ulog: ULog, innov_flags: Dict[str, float], innov_fail_checks: List[str], ulog: ULog, estimator_status_flags: Dict[str, float], innov_fail_checks: List[str],
sensor_checks: List[str], in_air: InAirDetector, in_air_no_ground_effects: InAirDetector, sensor_checks: List[str], in_air: InAirDetector, in_air_no_ground_effects: InAirDetector,
multi_instance: int = 0, red_thresh: float = 1.0, amb_thresh: float = 0.5) -> Tuple[dict, dict, dict, dict]: multi_instance: int = 0, red_thresh: float = 1.0, amb_thresh: float = 0.5) -> Tuple[dict, dict, dict, dict]:
@ -20,7 +20,7 @@ def calculate_ecl_ekf_metrics(
red_thresh=red_thresh, amb_thresh=amb_thresh) red_thresh=red_thresh, amb_thresh=amb_thresh)
innov_fail_metrics = calculate_innov_fail_metrics( innov_fail_metrics = calculate_innov_fail_metrics(
innov_flags, innov_fail_checks, in_air, in_air_no_ground_effects) estimator_status_flags, innov_fail_checks, in_air, in_air_no_ground_effects)
imu_metrics = calculate_imu_metrics(ulog, multi_instance, in_air_no_ground_effects) imu_metrics = calculate_imu_metrics(ulog, multi_instance, in_air_no_ground_effects)
@ -90,10 +90,10 @@ def calculate_sensor_metrics(
def calculate_innov_fail_metrics( def calculate_innov_fail_metrics(
innov_flags: dict, innov_fail_checks: List[str], in_air: InAirDetector, estimator_status_flags: dict, innov_fail_checks: List[str], in_air: InAirDetector,
in_air_no_ground_effects: InAirDetector) -> dict: in_air_no_ground_effects: InAirDetector) -> dict:
""" """
:param innov_flags: :param estimator_status_flags:
:param innov_fail_checks: :param innov_fail_checks:
:param in_air: :param in_air:
:param in_air_no_ground_effects: :param in_air_no_ground_effects:
@ -103,17 +103,18 @@ def calculate_innov_fail_metrics(
innov_fail_metrics = dict() innov_fail_metrics = dict()
# calculate innovation check fail metrics # calculate innovation check fail metrics
for signal_id, signal, result in [('posv', 'posv_innov_fail', 'hgt_fail_percentage'), for signal_id, signal, result in [('posv', 'reject_ver_pos', 'hgt_fail_percentage'),
('magx', 'magx_innov_fail', 'magx_fail_percentage'), ('magx', 'reject_mag_x', 'magx_fail_percentage'),
('magy', 'magy_innov_fail', 'magy_fail_percentage'), ('magy', 'reject_mag_y', 'magy_fail_percentage'),
('magz', 'magz_innov_fail', 'magz_fail_percentage'), ('magz', 'reject_mag_z', 'magz_fail_percentage'),
('yaw', 'yaw_innov_fail', 'yaw_fail_percentage'), ('yaw', 'reject_yaw', 'yaw_fail_percentage'),
('vel', 'vel_innov_fail', 'vel_fail_percentage'), ('velh', 'reject_hor_vel', 'vel_fail_percentage'),
('posh', 'posh_innov_fail', 'pos_fail_percentage'), ('velv', 'reject_ver_vel', 'vel_fail_percentage'),
('tas', 'tas_innov_fail', 'tas_fail_percentage'), ('posh', 'reject_hor_pos', 'pos_fail_percentage'),
('hagl', 'hagl_innov_fail', 'hagl_fail_percentage'), ('tas', 'reject_airspeed', 'tas_fail_percentage'),
('ofx', 'ofx_innov_fail', 'ofx_fail_percentage'), ('hagl', 'reject_hagl', 'hagl_fail_percentage'),
('ofy', 'ofy_innov_fail', 'ofy_fail_percentage')]: ('ofx', 'reject_optflow_x', 'ofx_fail_percentage'),
('ofy', 'reject_optflow_y', 'ofy_fail_percentage')]:
# only run innov fail checks, if they apply. # only run innov fail checks, if they apply.
if signal_id in innov_fail_checks: if signal_id in innov_fail_checks:
@ -125,7 +126,7 @@ def calculate_innov_fail_metrics(
in_air_detector = in_air in_air_detector = in_air
innov_fail_metrics[result] = calculate_stat_from_signal( innov_fail_metrics[result] = calculate_stat_from_signal(
innov_flags, 'estimator_status', signal, in_air_detector, estimator_status_flags, 'estimator_status_flags', signal, in_air_detector,
lambda x: 100.0 * np.mean(x > 0.5)) lambda x: 100.0 * np.mean(x > 0.5))
return innov_fail_metrics return innov_fail_metrics
@ -152,7 +153,7 @@ def calculate_imu_metrics(ulog: ULog, multi_instance, in_air_no_ground_effects:
if vehicle_imu_status_data['accel_device_id'][0] == estimator_status_data['accel_device_id'][0]: if vehicle_imu_status_data['accel_device_id'][0] == estimator_status_data['accel_device_id'][0]:
for signal, result in [('delta_angle_coning_metric', 'imu_coning'), for signal, result in [('gyro_coning_vibration', 'imu_coning'),
('gyro_vibration_metric', 'imu_hfgyro'), ('gyro_vibration_metric', 'imu_hfgyro'),
('accel_vibration_metric', 'imu_hfaccel')]: ('accel_vibration_metric', 'imu_hfaccel')]:

View File

@ -7,115 +7,6 @@ from typing import Tuple
import numpy as np import numpy as np
def get_estimator_check_flags(estimator_status: dict) -> Tuple[dict, dict, dict]:
"""
:param estimator_status:
:return:
"""
control_mode = get_control_mode_flags(estimator_status)
innov_flags = get_innovation_check_flags(estimator_status)
gps_fail_flags = get_gps_check_fail_flags(estimator_status)
return control_mode, innov_flags, gps_fail_flags
def get_control_mode_flags(estimator_status: dict) -> dict:
"""
:param estimator_status:
:return:
"""
control_mode = dict()
# extract control mode metadata from estimator_status.control_mode_flags
# 0 - true if the filter tilt alignment is complete
# 1 - true if the filter yaw alignment is complete
# 2 - true if GPS measurements are being fused
# 3 - true if optical flow measurements are being fused
# 4 - true if a simple magnetic yaw heading is being fused
# 5 - true if 3-axis magnetometer measurement are being fused
# 6 - true if synthetic magnetic declination measurements are being fused
# 7 - true when the vehicle is airborne
# 8 - true when wind velocity is being estimated
# 9 - true when baro height is being fused as a primary height reference
# 10 - true when range finder height is being fused as a primary height reference
# 11 - true when range finder height is being fused as a primary height reference
# 12 - true when local position data from external vision is being fused
# 13 - true when yaw data from external vision measurements is being fused
# 14 - true when height data from external vision measurements is being fused
# 15 - true when synthetic sideslip measurements are being fused
# 16 - true true when the mag field does not match the expected strength
# 17 - true true when the vehicle is operating as a fixed wing vehicle
# 18 - true when the magnetometer has been declared faulty and is no longer being used
# 19 - true true when airspeed measurements are being fused
# 20 - true true when protection from ground effect induced static pressure rise is active
# 21 - true when rng data wasn't ready for more than 10s and new rng values haven't changed enough
# 22 - true when yaw (not ground course) data from a GPS receiver is being fused
# 23 - true when the in-flight mag field alignment has been completed
# 24 - true when local earth frame velocity data from external vision measurements are being fused
# 25 - true when we are using a synthesized measurement for the magnetometer Z component
control_mode['tilt_aligned'] = ((2 ** 0 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['yaw_aligned'] = ((2 ** 1 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['using_gps'] = ((2 ** 2 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['using_optflow'] = ((2 ** 3 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['using_magyaw'] = ((2 ** 4 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['using_mag3d'] = ((2 ** 5 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['using_magdecl'] = ((2 ** 6 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['airborne'] = ((2 ** 7 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['estimating_wind'] = ((2 ** 8 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['using_barohgt'] = ((2 ** 9 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['using_rnghgt'] = ((2 ** 10 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['using_gpshgt'] = ((2 ** 11 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['using_evpos'] = ((2 ** 12 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['using_evyaw'] = ((2 ** 13 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['using_evhgt'] = ((2 ** 14 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['fuse_beta'] = ((2 ** 15 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['mag_field_disturbed'] = ((2 ** 16 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['fixed_wing'] = ((2 ** 17 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['mag_fault'] = ((2 ** 18 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['fuse_aspd'] = ((2 ** 19 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['gnd_effect'] = ((2 ** 20 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['rng_stuck'] = ((2 ** 21 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['gps_yaw'] = ((2 ** 22 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['mag_aligned_in_flight'] = ((2 ** 23 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['ev_vel'] = ((2 ** 24 & estimator_status['control_mode_flags']) > 0) * 1
control_mode['synthetic_mag_z'] = ((2 ** 25 & estimator_status['control_mode_flags']) > 0) * 1
return control_mode
def get_innovation_check_flags(estimator_status: dict) -> dict:
"""
:param estimator_status:
:return:
"""
innov_flags = dict()
# innovation_check_flags summary
# 0 - true if velocity observations have been rejected
# 1 - true if horizontal position observations have been rejected
# 2 - true if true if vertical position observations have been rejected
# 3 - true if the X magnetometer observation has been rejected
# 4 - true if the Y magnetometer observation has been rejected
# 5 - true if the Z magnetometer observation has been rejected
# 6 - true if the yaw observation has been rejected
# 7 - true if the airspeed observation has been rejected
# 8 - true if synthetic sideslip observation has been rejected
# 9 - true if the height above ground observation has been rejected
# 10 - true if the X optical flow observation has been rejected
# 11 - true if the Y optical flow observation has been rejected
innov_flags['vel_innov_fail'] = ((2 ** 0 & estimator_status['innovation_check_flags']) > 0) * 1
innov_flags['posh_innov_fail'] = ((2 ** 1 & estimator_status['innovation_check_flags']) > 0) * 1
innov_flags['posv_innov_fail'] = ((2 ** 2 & estimator_status['innovation_check_flags']) > 0) * 1
innov_flags['magx_innov_fail'] = ((2 ** 3 & estimator_status['innovation_check_flags']) > 0) * 1
innov_flags['magy_innov_fail'] = ((2 ** 4 & estimator_status['innovation_check_flags']) > 0) * 1
innov_flags['magz_innov_fail'] = ((2 ** 5 & estimator_status['innovation_check_flags']) > 0) * 1
innov_flags['yaw_innov_fail'] = ((2 ** 6 & estimator_status['innovation_check_flags']) > 0) * 1
innov_flags['tas_innov_fail'] = ((2 ** 7 & estimator_status['innovation_check_flags']) > 0) * 1
innov_flags['sli_innov_fail'] = ((2 ** 8 & estimator_status['innovation_check_flags']) > 0) * 1
innov_flags['hagl_innov_fail'] = ((2 ** 9 & estimator_status['innovation_check_flags']) > 0) * 1
innov_flags['ofx_innov_fail'] = ((2 ** 10 & estimator_status['innovation_check_flags']) > 0) * 1
innov_flags['ofy_innov_fail'] = ((2 ** 11 & estimator_status['innovation_check_flags']) > 0) * 1
return innov_flags
def get_gps_check_fail_flags(estimator_status: dict) -> dict: def get_gps_check_fail_flags(estimator_status: dict) -> dict:
""" """
:param estimator_status: :param estimator_status:

View File

@ -11,7 +11,7 @@ import numpy as np
from matplotlib.backends.backend_pdf import PdfPages from matplotlib.backends.backend_pdf import PdfPages
from pyulog import ULog from pyulog import ULog
from analysis.post_processing import magnetic_field_estimates_from_states, get_estimator_check_flags from analysis.post_processing import magnetic_field_estimates_from_states, get_gps_check_fail_flags
from plotting.data_plots import TimeSeriesPlot, InnovationPlot, ControlModeSummaryPlot, \ from plotting.data_plots import TimeSeriesPlot, InnovationPlot, ControlModeSummaryPlot, \
CheckFlagsPlot CheckFlagsPlot
from analysis.detectors import PreconditionError from analysis.detectors import PreconditionError
@ -33,6 +33,11 @@ def create_pdf_report(ulog: ULog, multi_instance: int, output_plot_filename: str
except: except:
raise PreconditionError('could not find estimator_status instance', multi_instance) raise PreconditionError('could not find estimator_status instance', multi_instance)
try:
estimator_status_flags = ulog.get_dataset('estimator_status_flags', multi_instance).data
except:
raise PreconditionError('could not find estimator_status_flags instance', multi_instance)
try: try:
estimator_states = ulog.get_dataset('estimator_states', multi_instance).data estimator_states = ulog.get_dataset('estimator_states', multi_instance).data
except: except:
@ -68,12 +73,13 @@ def create_pdf_report(ulog: ULog, multi_instance: int, output_plot_filename: str
except: except:
raise PreconditionError('could not find innovation data') raise PreconditionError('could not find innovation data')
control_mode, innov_flags, gps_fail_flags = get_estimator_check_flags(estimator_status) gps_fail_flags = get_gps_check_fail_flags(estimator_status)
status_time = 1e-6 * estimator_status['timestamp'] status_time = 1e-6 * estimator_status['timestamp']
status_flags_time = 1e-6 * estimator_status_flags['timestamp']
b_finishes_in_air, b_starts_in_air, in_air_duration, in_air_transition_time, \ b_finishes_in_air, b_starts_in_air, in_air_duration, in_air_transition_time, \
on_ground_transition_time = detect_airtime(control_mode, status_time) on_ground_transition_time = detect_airtime(estimator_status_flags, status_flags_time)
with PdfPages(output_plot_filename) as pdf_pages: with PdfPages(output_plot_filename) as pdf_pages:
@ -173,9 +179,9 @@ def create_pdf_report(ulog: ULog, multi_instance: int, output_plot_filename: str
# plot control mode summary A # plot control mode summary A
data_plot = ControlModeSummaryPlot( data_plot = ControlModeSummaryPlot(
status_time, control_mode, [['tilt_aligned', 'yaw_aligned'], status_flags_time, estimator_status_flags, [['cs_tilt_align', 'cs_yaw_align'],
['using_gps', 'using_optflow', 'using_evpos'], ['using_barohgt', 'using_gpshgt', ['cs_gps', 'cs_opt_flow', 'cs_ev_pos'], ['cs_baro_hgt', 'cs_gps_hgt',
'using_rnghgt', 'using_evhgt'], ['using_magyaw', 'using_mag3d', 'using_magdecl']], 'cs_rng_hgt', 'cs_ev_hgt'], ['cs_mag_hdg', 'cs_mag_3d', 'cs_mag_dec']],
x_label='time (sec)', y_labels=['aligned', 'pos aiding', 'hgt aiding', 'mag aiding'], x_label='time (sec)', y_labels=['aligned', 'pos aiding', 'hgt aiding', 'mag aiding'],
annotation_text=[['tilt alignment', 'yaw alignment'], ['GPS aiding', 'optical flow aiding', annotation_text=[['tilt alignment', 'yaw alignment'], ['GPS aiding', 'optical flow aiding',
'external vision aiding'], ['Baro aiding', 'GPS aiding', 'rangefinder aiding', 'external vision aiding'], ['Baro aiding', 'GPS aiding', 'rangefinder aiding',
@ -188,7 +194,7 @@ def create_pdf_report(ulog: ULog, multi_instance: int, output_plot_filename: str
# plot control mode summary B # plot control mode summary B
# construct additional annotations for the airborne plot # construct additional annotations for the airborne plot
airborne_annotations = list() airborne_annotations = list()
if np.amin(np.diff(control_mode['airborne'])) > -0.5: if np.amin(np.diff(estimator_status_flags['cs_in_air'])) > -0.5:
airborne_annotations.append( airborne_annotations.append(
(on_ground_transition_time, 'air to ground transition not detected')) (on_ground_transition_time, 'air to ground transition not detected'))
else: else:
@ -197,7 +203,7 @@ def create_pdf_report(ulog: ULog, multi_instance: int, output_plot_filename: str
if in_air_duration > 0.0: if in_air_duration > 0.0:
airborne_annotations.append(((in_air_transition_time + on_ground_transition_time) / 2, airborne_annotations.append(((in_air_transition_time + on_ground_transition_time) / 2,
'duration = {:.1f} sec'.format(in_air_duration))) 'duration = {:.1f} sec'.format(in_air_duration)))
if np.amax(np.diff(control_mode['airborne'])) < 0.5: if np.amax(np.diff(estimator_status_flags['cs_in_air'])) < 0.5:
airborne_annotations.append( airborne_annotations.append(
(in_air_transition_time, 'ground to air transition not detected')) (in_air_transition_time, 'ground to air transition not detected'))
else: else:
@ -205,7 +211,7 @@ def create_pdf_report(ulog: ULog, multi_instance: int, output_plot_filename: str
(in_air_transition_time, 'in-air at {:.1f} sec'.format(in_air_transition_time))) (in_air_transition_time, 'in-air at {:.1f} sec'.format(in_air_transition_time)))
data_plot = ControlModeSummaryPlot( data_plot = ControlModeSummaryPlot(
status_time, control_mode, [['airborne'], ['estimating_wind']], status_flags_time, estimator_status_flags, [['cs_in_air'], ['cs_wind']],
x_label='time (sec)', y_labels=['airborne', 'estimating wind'], annotation_text=[[], []], x_label='time (sec)', y_labels=['airborne', 'estimating wind'], annotation_text=[[], []],
additional_annotation=[airborne_annotations, []], additional_annotation=[airborne_annotations, []],
plot_title='EKF Control Status - Figure B', pdf_handle=pdf_pages) plot_title='EKF Control Status - Figure B', pdf_handle=pdf_pages)
@ -214,15 +220,15 @@ def create_pdf_report(ulog: ULog, multi_instance: int, output_plot_filename: str
# plot innovation_check_flags summary # plot innovation_check_flags summary
data_plot = CheckFlagsPlot( data_plot = CheckFlagsPlot(
status_time, innov_flags, [['vel_innov_fail', 'posh_innov_fail'], ['posv_innov_fail', status_flags_time, estimator_status_flags, [['reject_hor_vel', 'reject_hor_pos'], ['reject_ver_vel', 'reject_ver_pos',
'hagl_innov_fail'], 'reject_hagl'],
['magx_innov_fail', 'magy_innov_fail', 'magz_innov_fail', ['reject_mag_x', 'reject_mag_y', 'reject_mag_z',
'yaw_innov_fail'], ['tas_innov_fail'], ['sli_innov_fail'], 'reject_yaw'], ['reject_airspeed'], ['reject_sideslip'],
['ofx_innov_fail', ['reject_optflow_x',
'ofy_innov_fail']], x_label='time (sec)', 'reject_optflow_y']], x_label='time (sec)',
y_labels=['failed', 'failed', 'failed', 'failed', 'failed', 'failed'], y_labels=['failed', 'failed', 'failed', 'failed', 'failed', 'failed'],
y_lim=(-0.1, 1.1), y_lim=(-0.1, 1.1),
legend=[['vel NED', 'pos NE'], ['hgt absolute', 'hgt above ground'], legend=[['vel NE', 'pos NE'], ['vel D', 'hgt absolute', 'hgt above ground'],
['mag_x', 'mag_y', 'mag_z', 'yaw'], ['airspeed'], ['sideslip'], ['mag_x', 'mag_y', 'mag_z', 'yaw'], ['airspeed'], ['sideslip'],
['flow X', 'flow Y']], ['flow X', 'flow Y']],
plot_title='EKF Innovation Test Fails', annotate=False, pdf_handle=pdf_pages) plot_title='EKF Innovation Test Fails', annotate=False, pdf_handle=pdf_pages)
@ -344,33 +350,33 @@ def create_pdf_report(ulog: ULog, multi_instance: int, output_plot_filename: str
data_plot.close() data_plot.close()
def detect_airtime(control_mode, status_time): def detect_airtime(estimator_status_flags, status_flags_time):
# define flags for starting and finishing in air # define flags for starting and finishing in air
b_starts_in_air = False b_starts_in_air = False
b_finishes_in_air = False b_finishes_in_air = False
# calculate in-air transition time # calculate in-air transition time
if (np.amin(control_mode['airborne']) < 0.5) and (np.amax(control_mode['airborne']) > 0.5): if (np.amin(estimator_status_flags['cs_in_air']) < 0.5) and (np.amax(estimator_status_flags['cs_in_air']) > 0.5):
in_air_transtion_time_arg = np.argmax(np.diff(control_mode['airborne'])) in_air_transtion_time_arg = np.argmax(np.diff(estimator_status_flags['cs_in_air']))
in_air_transition_time = status_time[in_air_transtion_time_arg] in_air_transition_time = status_flags_time[in_air_transtion_time_arg]
elif (np.amax(control_mode['airborne']) > 0.5): elif (np.amax(estimator_status_flags['cs_in_air']) > 0.5):
in_air_transition_time = np.amin(status_time) in_air_transition_time = np.amin(status_flags_time)
print('log starts while in-air at ' + str(round(in_air_transition_time, 1)) + ' sec') print('log starts while in-air at ' + str(round(in_air_transition_time, 1)) + ' sec')
b_starts_in_air = True b_starts_in_air = True
else: else:
in_air_transition_time = float('NaN') in_air_transition_time = float('NaN')
print('always on ground') print('always on ground')
# calculate on-ground transition time # calculate on-ground transition time
if (np.amin(np.diff(control_mode['airborne'])) < 0.0): if (np.amin(np.diff(estimator_status_flags['cs_in_air'])) < 0.0):
on_ground_transition_time_arg = np.argmin(np.diff(control_mode['airborne'])) on_ground_transition_time_arg = np.argmin(np.diff(estimator_status_flags['cs_in_air']))
on_ground_transition_time = status_time[on_ground_transition_time_arg] on_ground_transition_time = status_flags_time[on_ground_transition_time_arg]
elif (np.amax(control_mode['airborne']) > 0.5): elif (np.amax(estimator_status_flags['cs_in_air']) > 0.5):
on_ground_transition_time = np.amax(status_time) on_ground_transition_time = np.amax(status_flags_time)
print('log finishes while in-air at ' + str(round(on_ground_transition_time, 1)) + ' sec') print('log finishes while in-air at ' + str(round(on_ground_transition_time, 1)) + ' sec')
b_finishes_in_air = True b_finishes_in_air = True
else: else:
on_ground_transition_time = float('NaN') on_ground_transition_time = float('NaN')
print('always on ground') print('always on ground')
if (np.amax(np.diff(control_mode['airborne'])) > 0.5) and (np.amin(np.diff(control_mode['airborne'])) < -0.5): if (np.amax(np.diff(estimator_status_flags['cs_in_air'])) > 0.5) and (np.amin(np.diff(estimator_status_flags['cs_in_air'])) < -0.5):
if ((on_ground_transition_time - in_air_transition_time) > 0.0): if ((on_ground_transition_time - in_air_transition_time) > 0.0):
in_air_duration = on_ground_transition_time - in_air_transition_time in_air_duration = on_ground_transition_time - in_air_transition_time
else: else: