Tools/LogAnalyzer: pass flake8

`TestDualGyroDrift.py` ignored because it is mostly commented out code
This commit is contained in:
Luiz Georg 2022-07-18 10:52:56 -03:00 committed by Peter Barker
parent da0c365f3d
commit d76f3d71ae
21 changed files with 155 additions and 103 deletions

View File

@ -4,6 +4,8 @@
# Initial code by Andrew Chapman (amchapman@gmail.com), 16th Jan 2014
#
# AP_FLAKE8_CLEAN
from __future__ import print_function
import bisect
@ -31,8 +33,11 @@ class Format(object):
@staticmethod
def trycastToFormatType(value, valueType):
'''using format characters from libraries/DataFlash/DataFlash.h to cast strings to basic python int/float/string types
tries a cast, if it does not work, well, acceptable as the text logs do not match the format, e.g. MODE is expected to be int'''
"""
Using format characters from libraries/DataFlash/DataFlash.h to cast strings to basic python int/float/string
types tries a cast, if it does not work, well, acceptable as the text logs do not match the format, e.g. MODE is
expected to be int
"""
try:
if valueType in "fcCeELd":
return float(value)
@ -78,7 +83,6 @@ class Format(object):
def init(a, *x):
if len(x) != len(a.labels):
raise ValueError("Invalid Length")
# print(list(zip(a.labels, x)))
for (l, v) in zip(a.labels, x):
try:
setattr(a, l, v)
@ -90,7 +94,6 @@ class Format(object):
# finally, create the class
cls = type('Log__{:s}'.format(self.name), (object,), members)
# print(members)
return cls
@ -213,8 +216,6 @@ class BinaryFormat(ctypes.LittleEndianStructure):
if ctypes.sizeof(cls) != cls.SIZE:
print("size mismatch for {} expected {} got {}".format(cls, ctypes.sizeof(cls), cls.SIZE), file=sys.stderr)
# for i in cls.labels:
# print("{} = {}".format(i,getattr(cls,'_'+i)))
return None
return cls
@ -230,12 +231,9 @@ class Channel(object):
# TODO: store data as a scipy spline curve so we can more easily interpolate and sample the slope?
def __init__(self):
self.dictData = (
{}
) # dict of linenum->value # store dupe data in dict and list for now, until we decide which is the better way to go
self.listData = (
[]
) # list of (linenum,value) # store dupe data in dict and list for now, until we decide which is the better way to go
# store dupe data in dict and list for now, until we decide which is the better way to go
self.dictData = {} # dict of linenum->value
self.listData = [] # list of (linenum,value)
def getSegment(self, startLine, endLine):
'''returns a segment of this data (from startLine to endLine, inclusive) as a new Channel instance'''
@ -257,7 +255,6 @@ class Channel(object):
index = bisect.bisect_left(self.listData, (lineNumber, -99999))
while index < len(self.listData):
line = self.listData[index][0]
# print("Looking forwards for nearest value to line number %d, starting at line %d" % (lineNumber,line)) # TEMP
if line >= lineNumber:
return (self.listData[index][1], line)
index += 1
@ -268,14 +265,16 @@ class Channel(object):
index = bisect.bisect_left(self.listData, (lineNumber, -99999)) - 1
while index >= 0:
line = self.listData[index][0]
# print("Looking backwards for nearest value to line number %d, starting at line %d" % (lineNumber,line)) # TEMP
if line <= lineNumber:
return (self.listData[index][1], line)
index -= 1
raise ValueError("Error finding nearest value for line %d" % lineNumber)
def getNearestValue(self, lineNumber, lookForwards=True):
'''find the nearest data value to the given lineNumber, defaults to first looking forwards. Returns (value,lineNumber)'''
"""
Find the nearest data value to the given lineNumber, defaults to first looking forwards.
Returns (value,lineNumber)
"""
if lookForwards:
try:
return self.getNearestValueFwd(lineNumber)
@ -299,8 +298,6 @@ class Channel(object):
def getIndexOf(self, lineNumber):
'''returns the index within this channel's listData of the given lineNumber, or raises an Exception if not found'''
index = bisect.bisect_left(self.listData, (lineNumber, -99999))
# print("INDEX of line %d: %d" % (lineNumber,index))
# print("self.listData[index][0]: %d" % self.listData[index][0])
if self.listData[index][0] == lineNumber:
return index
else:
@ -308,9 +305,13 @@ class Channel(object):
class LogIterator:
'''Smart iterator that can move through a log by line number and maintain an index into the nearest values of all data channels'''
"""
Smart iterator that can move through a log by line number and maintain an index into the nearest values of all data
channels
"""
# TODO: LogIterator currently indexes the next available value rather than the nearest value, we should make it configurable between next/nearest
# TODO: LogIterator currently indexes the next available value rather than the nearest value, we should make it
# configurable between next/nearest
class LogIteratorSubValue:
'''syntactic sugar to allow access by LogIterator[lineLabel][dataLabel]'''
@ -355,7 +356,7 @@ class LogIterator:
# check if the currentLine has gone past our the line we're pointing to for this type of data
dataLabel = self.logdata.formats[lineLabel].labels[0]
(index, lineNumber) = self.iterators[lineLabel]
# if so, and it is not the last entry in the log, then increment the indices for all dataLabels under that lineLabel
# if so, and it is not the last entry in the log, increment the indices for dataLabels under that lineLabel
if (self.currentLine > lineNumber) and (
index < len(self.logdata.channels[lineLabel][dataLabel].listData) - 1
):
@ -379,7 +380,7 @@ class DataflashLogHelper:
@staticmethod
def getTimeAtLine(logdata, lineNumber):
'''returns the nearest GPS timestamp in milliseconds after the given line number'''
if not "GPS" in logdata.channels:
if "GPS" not in logdata.channels:
raise Exception("no GPS log data found")
# older logs use 'TIme', newer logs use 'TimeMS'
# even newer logs use TimeUS
@ -400,7 +401,10 @@ class DataflashLogHelper:
@staticmethod
def findLoiterChunks(logdata, minLengthSeconds=0, noRCInputs=True):
'''returns a list of (to,from) pairs defining sections of the log which are in loiter mode. Ordered from longest to shortest in time. If noRCInputs == True it only returns chunks with no control inputs'''
"""
Returns a list of (to, from) pairs defining sections of the log which are in loiter mode, ordered from longest
to shortest in time. If `noRCInputs == True` it only returns chunks with no control inputs
"""
# TODO: implement noRCInputs handling when identifying stable loiter chunks, for now we're ignoring it
def chunkSizeCompare(chunk1, chunk2):
@ -430,9 +434,7 @@ class DataflashLogHelper:
) / 1000.0
if chunkTimeSeconds > minLengthSeconds:
chunks.append((startLine, endLine))
# print("LOITER chunk: %d to %d, %d lines" % (startLine,endLine,endLine-startLine+1))
# print(" (time %d to %d, %d seconds)" % (DataflashLogHelper.getTimeAtLine(logdata,startLine), DataflashLogHelper.getTimeAtLine(logdata,endLine), chunkTimeSeconds))
chunks.sort(chunkSizeCompare)
chunks.sort(key=lambda chunk: chunk[1] - chunk[0])
return chunks
@staticmethod
@ -445,7 +447,7 @@ class DataflashLogHelper:
if "CTUN" in logdata.channels:
try:
maxThrottle = logdata.channels["CTUN"]["ThrOut"].max()
except KeyError as e:
except KeyError:
# ThrOut was shorted to ThO at some stage...
maxThrottle = logdata.channels["CTUN"]["ThO"].max()
# at roughly the same time ThO became a range from 0 to 1
@ -456,7 +458,10 @@ class DataflashLogHelper:
class DataflashLog(object):
'''ArduPilot Dataflash log file reader and container class. Keep this simple, add more advanced or specific functions to DataflashLogHelper class'''
"""
ArduPilot Dataflash log file reader and container class. Keep this simple, add more advanced or specific functions
to DataflashLogHelper class
"""
knownHardwareTypes = ["APM", "PX4", "MPNG"]
@ -539,7 +544,6 @@ class DataflashLog(object):
elif format == 'auto':
if self.filename == '<stdin>':
# assuming TXT format
# raise ValueError("Invalid log format for stdin: {}".format(format))
head = ""
else:
head = f.read(4)
@ -620,7 +624,7 @@ class DataflashLog(object):
else:
# assume it has ModeNum:
self.modeChanges[lineNumber] = (modes[int(e.Mode)], e.ModeNum)
except ValueError as x:
except ValueError:
if hasattr(e, 'ThrCrs'):
self.modeChanges[lineNumber] = (e.Mode, e.ThrCrs)
else:
@ -687,7 +691,7 @@ class DataflashLog(object):
groupName = e.NAME
# first time seeing this type of log line, create the channel storage
if not groupName in self.channels:
if groupName not in self.channels:
self.channels[groupName] = {}
for label in e.labels:
self.channels[groupName][label] = Channel()
@ -708,7 +712,6 @@ class DataflashLog(object):
lineNumber = lineNumber + 1
numBytes += len(line) + 1
try:
# print("Reading line: %d" % lineNumber)
line = line.strip('\n\r')
tokens = line.split(', ')
# first handle the log header lines
@ -727,7 +730,8 @@ class DataflashLog(object):
elif len(tokens2) == 3 and tokens2[0] == "Free" and tokens2[1] == "RAM:":
self.freeRAM = int(tokens2[2])
elif tokens2[0] in knownHardwareTypes:
self.hardwareType = line # not sure if we can parse this more usefully, for now only need to report it back verbatim
# not sure if we can parse this more usefully, for now only need to report it back verbatim
self.hardwareType = line
elif (len(tokens2) == 2 or len(tokens2) == 3) and tokens2[1][
0
].lower() == "v": # e.g. ArduCopter V3.1 (5c6503e2)
@ -777,7 +781,7 @@ class DataflashLog(object):
while len(data) > offset + ctypes.sizeof(logheader):
h = logheader.from_buffer(data, offset)
if not (h.head1 == 0xA3 and h.head2 == 0x95):
if ignoreBadlines == False:
if ignoreBadlines is False:
raise ValueError(h)
else:
if h.head1 == 0xFF and h.head2 == 0xFF and h.msgid == 0xFF:

View File

@ -5,13 +5,15 @@
# Initial code by Andrew Chapman (amchapman@gmail.com), 16th Jan 2014
#
# AP_FLAKE8_CLEAN
# some logging oddities noticed while doing this, to be followed up on:
# - tradheli MOT labels Mot1,Mot2,Mot3,Mot4,GGain
# - Pixhawk doesn't output one of the FMT labels... forget which one
# - MAG offsets seem to be constant (only seen data on Pixhawk)
# - MAG offsets seem to be cast to int before being output? (param is -84.67, logged as -84)
# - copter+plane use 'V' in their vehicle type/version/build line, rover uses lower case 'v'. Copter+Rover give a build number, plane does not
# - copter+plane use 'V' in their vehicle type/version/build line, rover uses lower case 'v'.
# Copter+Rover give a build number, plane does not
# - CTUN.ThrOut on copter is 0-1000, on plane+rover it is 0-100
# TODO: add test for noisy baro values
@ -25,7 +27,6 @@ import glob
import imp
import inspect
import os
import pprint # temp
import sys
import time
from xml.sax.saxutils import escape
@ -38,7 +39,8 @@ class TestResult(object):
'''all tests return a standardized result type'''
class StatusType:
# NA means not applicable for this log (e.g. copter tests against a plane log), UNKNOWN means it is missing data required for the test
# NA means not applicable for this log (e.g. copter tests against a plane log)
# UNKNOWN means it is missing data required for the test
GOOD, FAIL, WARN, UNKNOWN, NA = range(5)
status = None
@ -46,7 +48,10 @@ class TestResult(object):
class Test(object):
'''base class to be inherited by log tests. Each test should be quite granular so we have lots of small tests with clear results'''
"""
Base class to be inherited by log tests. Each test should be quite granular so we have lots of small tests with
clear results
"""
def __init__(self):
self.name = ""
@ -140,7 +145,8 @@ class TestSuite(object):
print('\n')
print(
'The Log Analyzer is currently BETA code.\nFor any support or feedback on the log analyzer please email Andrew Chapman (amchapman@gmail.com)'
"The Log Analyzer is currently BETA code."
"\nFor any support or feedback on the log analyzer please email Andrew Chapman (amchapman@gmail.com)"
)
print('\n')
@ -218,8 +224,6 @@ class TestSuite(object):
def main():
dirName = os.path.dirname(os.path.abspath(__file__))
# deal with command line arguments
parser = argparse.ArgumentParser(description='Analyze an APM Dataflash log for known issues')
parser.add_argument('logfile', type=argparse.FileType('r'), help='path to Dataflash log file (or - for stdin)')

View File

@ -5,6 +5,8 @@
#
#
# AP_FLAKE8_CLEAN
# TODO: implement more unit+regression tests
from __future__ import print_function
@ -397,6 +399,6 @@ try:
print("All unit/regression tests GOOD\n")
except Exception as e:
except Exception:
print("Error found: " + traceback.format_exc())
print("UNIT TEST FAILED\n")

View File

@ -1,3 +1,6 @@
# AP_FLAKE8_CLEAN
class VehicleType:
Plane = 17
Copter = 23

View File

@ -1,4 +1,5 @@
import DataflashLog
# AP_FLAKE8_CLEAN
from LogAnalyzer import Test, TestResult
from VehicleType import VehicleType
@ -70,7 +71,7 @@ class TestAutotune(Test):
for i in ['EV', 'ATDE', 'ATUN']:
r = False
if not i in logdata.channels:
if i not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No {} log data".format(i)
r = True
@ -126,7 +127,9 @@ class TestAutotune(Test):
for key in logdata.channels['ATUN']:
setattr(atun, key, logdata.channels['ATUN'][key].getNearestValueFwd(linenext)[0])
linenext = logdata.channels['ATUN'][key].getNearestValueFwd(linenext)[1] + 1
self.result.statusMessage += 'ATUN Axis:{atun.Axis} TuneStep:{atun.TuneStep} RateMin:{atun.RateMin:5.0f} RateMax:{atun.RateMax:5.0f} RPGain:{atun.RPGain:1.4f} RDGain:{atun.RDGain:1.4f} SPGain:{atun.SPGain:1.1f} (@line:{l})\n'.format(
l=linenext, s=s, atun=atun
)
self.result.statusMessage += (
"ATUN Axis:{atun.Axis} TuneStep:{atun.TuneStep} RateMin:{atun.RateMin:5.0f}"
" RateMax:{atun.RateMax:5.0f} RPGain:{atun.RPGain:1.4f} RDGain:{atun.RDGain:1.4f}"
" SPGain:{atun.SPGain:1.1f} (@line:{l})\n"
).format(l=linenext, atun=atun)
self.result.statusMessage += '\n'

View File

@ -1,6 +1,5 @@
import collections
# AP_FLAKE8_CLEAN
import DataflashLog
from LogAnalyzer import Test, TestResult
@ -19,7 +18,8 @@ class TestBrownout(Test):
# FIXME: cope with LOG_ARM_DISARM_MSG message
if "EV" in logdata.channels:
# step through the arm/disarm events in order, to see if they're symmetrical
# note: it seems landing detection isn't robust enough to rely upon here, so we'll only consider arm+disarm, not takeoff+land
# note: it seems landing detection isn't robust enough to rely upon here, so we'll only consider arm+disarm,
# not takeoff+land
for line, ev in logdata.channels["EV"]["Id"].listData:
if ev == 10:
isArmed = True

View File

@ -1,7 +1,9 @@
# AP_FLAKE8_CLEAN
import math
from functools import reduce
import DataflashLog
from LogAnalyzer import Test, TestResult

View File

@ -1,6 +1,8 @@
# AP_FLAKE8_CLEAN
from __future__ import print_function
import DataflashLog
from LogAnalyzer import Test, TestResult
@ -25,12 +27,10 @@ class TestDupeLogData(Test):
# c
data = logdata.channels["ATT"]["Pitch"].listData
for i in range(sampleStartIndex, len(data)):
# print("Checking against index %d" % i)
if i == sampleStartIndex:
continue # skip matching against ourselves
j = 0
while j < 20 and (i + j) < len(data) and data[i + j][1] == sample[j][1]:
# print("### Match found, j=%d, data=%f, sample=%f, log data matched to sample at line %d" % (j,data[i+j][1],sample[j][1],data[i+j][0]))
j += 1
if j == 20: # all samples match
return data[i][0]
@ -41,7 +41,8 @@ class TestDupeLogData(Test):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
# this could be made more flexible by not hard-coding to use ATT data, could make it dynamic based on whatever is available as long as it is highly variable
# this could be made more flexible by not hard-coding to use ATT data, could make it dynamic based on whatever
# is available as long as it is highly variable
if "ATT" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No ATT log data"
@ -49,22 +50,18 @@ class TestDupeLogData(Test):
# pick 10 sample points within the range of ATT data we have
sampleStartIndices = []
attStartIndex = 0
attEndIndex = len(logdata.channels["ATT"]["Pitch"].listData) - 1
step = int(attEndIndex / 11)
for i in range(step, attEndIndex - step, step):
sampleStartIndices.append(i)
# print("Dupe data sample point index %d at line %d" % (i, logdata.channels["ATT"]["Pitch"].listData[i][0]))
# get 20 datapoints of pitch from each sample location and check for a match elsewhere
sampleIndex = 0
for i in range(sampleStartIndices[0], len(logdata.channels["ATT"]["Pitch"].listData)):
if i == sampleStartIndices[sampleIndex]:
# print("Checking sample %d" % i)
sample = logdata.channels["ATT"]["Pitch"].listData[i : i + 20]
matchedLine = self.__matchSample(sample, i, logdata)
if matchedLine:
# print("Data from line %d found duplicated at line %d" % (sample[0][0],matchedLine))
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Duplicate data chunks found in log (%d and %d)" % (
sample[0][0],

View File

@ -1,3 +1,6 @@
# AP_FLAKE8_CLEAN
import DataflashLog
from LogAnalyzer import Test, TestResult

View File

@ -1,4 +1,5 @@
import DataflashLog
# AP_FLAKE8_CLEAN
from LogAnalyzer import Test, TestResult

View File

@ -1,3 +1,6 @@
# AP_FLAKE8_CLEAN
from LogAnalyzer import Test, TestResult

View File

@ -1,8 +1,10 @@
# AP_FLAKE8_CLEAN
from __future__ import print_function
from math import sqrt
import DataflashLog
from LogAnalyzer import Test, TestResult
@ -23,12 +25,12 @@ class TestIMUMatch(Test):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
if ("IMU" in logdata.channels) and (not "IMU2" in logdata.channels):
if ("IMU" in logdata.channels) and ("IMU2" not in logdata.channels):
self.result.status = TestResult.StatusType.NA
self.result.statusMessage = "No IMU2"
return
if (not "IMU" in logdata.channels) or (not "IMU2" in logdata.channels):
if ("IMU" not in logdata.channels) or ("IMU2" not in logdata.channels):
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No IMU log data"
return
@ -115,7 +117,6 @@ class TestIMUMatch(Test):
diff_filtered = sqrt(xdiff_filtered**2 + ydiff_filtered**2 + zdiff_filtered**2)
max_diff_filtered = max(max_diff_filtered, diff_filtered)
# print(max_diff_filtered)
last_t = t
if max_diff_filtered > fail_threshold:

View File

@ -1,4 +1,5 @@
import DataflashLog
# AP_FLAKE8_CLEAN
from LogAnalyzer import Test, TestResult
from VehicleType import VehicleType
@ -19,7 +20,7 @@ class TestBalanceTwist(Test):
return
self.result.status = TestResult.StatusType.UNKNOWN
if not "RCOU" in logdata.channels:
if "RCOU" not in logdata.channels:
return
ch = []
@ -50,7 +51,7 @@ class TestBalanceTwist(Test):
/ (logdata.parameters["RC3_MAX"] - logdata.parameters["RC3_MIN"])
/ 1000.0
)
except KeyError as e:
except KeyError:
min_throttle = (
logdata.parameters["MOT_PWM_MIN"]
/ (logdata.parameters["MOT_PWM_MAX"] - logdata.parameters["RC3_MIN"])

View File

@ -1,3 +1,6 @@
# AP_FLAKE8_CLEAN
import math
from LogAnalyzer import Test, TestResult
@ -36,5 +39,5 @@ class TestNaN(Test):
field,
)
raise ValueError()
except ValueError as e:
except ValueError:
continue

View File

@ -1,6 +1,8 @@
# AP_FLAKE8_CLEAN
from math import sqrt
import DataflashLog
import matplotlib.pyplot as plt
import numpy as np
from LogAnalyzer import Test, TestResult
@ -53,8 +55,12 @@ class TestFlow(Test):
2.0 # if the gyro rate is greter than this, the data will not be used by the curve fit (rad/sec)
)
param_std_threshold = 5.0 # maximum allowable 1-std uncertainty in scaling parameter (scale factor * 1000)
param_abs_threshold = 200 # max/min allowable scale factor parameter. Values of FLOW_FXSCALER and FLOW_FYSCALER outside the range of +-param_abs_threshold indicate a sensor configuration problem.
min_num_points = 100 # minimum number of points required for a curve fit - this is necessary, but not sufficient condition - the standard deviation estimate of the fit gradient is also important.
# max/min allowable scale factor parameter. Values of FLOW_FXSCALER and FLOW_FYSCALER outside the range
# of +-param_abs_threshold indicate a sensor configuration problem.
param_abs_threshold = 200
# minimum number of points required for a curve fit - this is necessary, but not sufficient condition - the
# standard deviation estimate of the fit gradient is also important.
min_num_points = 100
# get the existing scale parameters
flow_fxscaler = logdata.parameters["FLOW_FXSCALER"]
@ -204,7 +210,8 @@ class TestFlow(Test):
bodyY_resampled.append(bodyY[i])
flowY_time_us_resampled.append(flow_time_us[i])
# fit a straight line to the flow vs body rate data and calculate the scale factor parameter required to achieve a slope of 1
# fit a straight line to the flow vs body rate data and calculate the scale factor parameter required to
# achieve a slope of 1
coef_flow_x, cov_x = np.polyfit(
bodyX_resampled, flowX_resampled, 1, rcond=None, full=False, w=None, cov=True
)
@ -212,7 +219,8 @@ class TestFlow(Test):
bodyY_resampled, flowY_resampled, 1, rcond=None, full=False, w=None, cov=True
)
# taking the exisiting scale factor parameters into account, calculate the parameter values reequired to achieve a unity slope
# taking the exisiting scale factor parameters into account, calculate the parameter values reequired to
# achieve a unity slope
flow_fxscaler_new = int(1000 * (((1 + 0.001 * float(flow_fxscaler)) / coef_flow_x[0] - 1)))
flow_fyscaler_new = int(1000 * (((1 + 0.001 * float(flow_fyscaler)) / coef_flow_y[0] - 1)))
@ -220,8 +228,9 @@ class TestFlow(Test):
if sqrt(cov_x[0][0]) > param_std_threshold or sqrt(cov_y[0][0]) > param_std_threshold:
FAIL()
self.result.statusMessage = (
"FAIL: inaccurate fit - poor quality or insufficient data\nFLOW_FXSCALER 1STD = %u\nFLOW_FYSCALER 1STD = %u\n"
% (round(1000 * sqrt(cov_x[0][0])), round(1000 * sqrt(cov_y[0][0])))
"FAIL: inaccurate fit - poor quality or insufficient data"
"\nFLOW_FXSCALER 1STD = %u"
"\nFLOW_FYSCALER 1STD = %u\n" % (round(1000 * sqrt(cov_x[0][0])), round(1000 * sqrt(cov_y[0][0])))
)
# Do a sanity check on the scale factors
@ -234,7 +243,12 @@ class TestFlow(Test):
# display recommended scale factors
self.result.statusMessage = (
"Set FLOW_FXSCALER to %i\nSet FLOW_FYSCALER to %i\n\nCal plots saved to flow_calibration.pdf\nCal parameters saved to flow_calibration.param\n\nFLOW_FXSCALER 1STD = %u\nFLOW_FYSCALER 1STD = %u\n"
"Set FLOW_FXSCALER to %i"
"\nSet FLOW_FYSCALER to %i"
"\n\nCal plots saved to flow_calibration.pdf"
"\nCal parameters saved to flow_calibration.param"
"\n\nFLOW_FXSCALER 1STD = %u"
"\nFLOW_FYSCALER 1STD = %u\n"
% (
flow_fxscaler_new,
flow_fyscaler_new,

View File

@ -1,6 +1,8 @@
# AP_FLAKE8_CLEAN
import math # for isnan()
import DataflashLog
from LogAnalyzer import Test, TestResult
from VehicleType import VehicleType
@ -54,8 +56,10 @@ class TestParams(Test):
self.result.statusMessage = self.result.statusMessage + name + " is NaN\n"
try:
# add parameter checks below using the helper functions, any failures will trigger a FAIL status and accumulate info in statusMessage
# if more complex checking or correlations are required you can access parameter values directly using the logdata.parameters[paramName] dict
# add parameter checks below using the helper functions, any failures will trigger a FAIL status and
# accumulate info in statusMessage.
# If more complex checking or correlations are required you can access parameter values directly using the
# logdata.parameters[paramName] dict
if logdata.vehicleType == VehicleType.Copter:
self.__checkParamIsEqual("MAG_ENABLE", 1, logdata)
if "THR_MIN" in logdata.parameters:

View File

@ -1,6 +1,8 @@
# AP_FLAKE8_CLEAN
from __future__ import print_function
import DataflashLog
from LogAnalyzer import Test, TestResult
from VehicleType import VehicleType
@ -21,7 +23,8 @@ class TestPerformance(Test):
self.result.status = TestResult.StatusType.NA
return
# NOTE: we'll ignore MaxT altogether for now, it seems there are quite regularly one or two high values in there, even ignoring the ones expected after arm/disarm events
# NOTE: we'll ignore MaxT altogether for now, it seems there are quite regularly one or two high values in
# there, even ignoring the ones expected after arm/disarm events.
# gather info on arm/disarm lines, we will ignore the MaxT data from the first line found after each of these
# armingLines = []
# for line,ev in logdata.channels["EV"]["Id"].listData:
@ -32,7 +35,6 @@ class TestPerformance(Test):
# if not armingLines:
# break
# if maxT[0] > armingLines[0]:
# #print("Ignoring maxT from line %d, as it is the first PM line after arming on line %d" % (maxT[0],armingLines[0]))
# ignoreMaxTLines.append(maxT[0])
# armingLines.pop(0)
@ -55,8 +57,6 @@ class TestPerformance(Test):
if percentSlow > maxPercentSlow:
maxPercentSlow = percentSlow
maxPercentSlowLine = line
# if (maxT > 13000) and line not in ignoreMaxTLines:
# print("MaxT of %d detected on line %d" % (maxT,line))
if (maxPercentSlow > 10) or (slowLoopLineCount > 6):
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "%d slow loop lines found, max %.2f%% on line %d" % (

View File

@ -1,3 +1,6 @@
# AP_FLAKE8_CLEAN
import collections
import DataflashLog
@ -8,12 +11,12 @@ from VehicleType import VehicleType
class TestPitchRollCoupling(Test):
'''test for divergence between input and output pitch/roll, i.e. mechanical failure or bad PID tuning'''
# TODO: currently we're only checking for roll/pitch outside of max lean angle, will come back later to analyze roll/pitch in versus out values
# TODO: currently we're only checking for roll/pitch outside of max lean angle, will come back later to analyze
# roll/pitch in versus out values
def __init__(self):
Test.__init__(self)
self.name = "Pitch/Roll"
self.enable = True # TEMP
def run(self, logdata, verbose):
self.result = TestResult()
@ -23,12 +26,12 @@ class TestPitchRollCoupling(Test):
self.result.status = TestResult.StatusType.NA
return
if not "ATT" in logdata.channels:
if "ATT" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No ATT log data"
return
if not "CTUN" in logdata.channels:
if "CTUN" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No CTUN log data"
return
@ -38,7 +41,8 @@ class TestPitchRollCoupling(Test):
else:
self.ctun_baralt_att = 'BAlt'
# figure out where each mode begins and ends, so we can treat auto and manual modes differently and ignore acro/tune modes
# figure out where each mode begins and ends, so we can treat auto and manual modes differently and ignore
# acro/tune modes
autoModes = [
"RTL",
"AUTO",

View File

@ -1,6 +1,8 @@
# AP_FLAKE8_CLEAN
from __future__ import print_function
import DataflashLog
from LogAnalyzer import Test, TestResult
from VehicleType import VehicleType
@ -20,11 +22,11 @@ class TestThrust(Test):
self.result.status = TestResult.StatusType.NA
return
if not "CTUN" in logdata.channels:
if "CTUN" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No CTUN log data"
return
if not "ATT" in logdata.channels:
if "ATT" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No ATT log data"
return
@ -49,7 +51,8 @@ class TestThrust(Test):
highThrottleSegments = []
# find any contiguous chunks where CTUN.ThrOut > highThrottleThreshold, ignore high throttle if tilt > tiltThreshold, and discard any segments shorter than minSampleLength
# find any contiguous chunks where CTUN.ThrOut > highThrottleThreshold, ignore high throttle if
# tilt > tiltThreshold, and discard any segments shorter than minSampleLength
start = None
data = logdata.channels["CTUN"][throut_key].listData
for i in range(0, len(data)):
@ -61,11 +64,10 @@ class TestThrust(Test):
if (abs(roll) > tiltThreshold) or (abs(pitch) > tiltThreshold):
isBelowTiltThreshold = False
if (value > highThrottleThreshold) and isBelowTiltThreshold:
if start == None:
if start is None:
start = i
elif start != None:
elif start is not None:
if (i - start) > minSampleLength:
# print("Found high throttle chunk from line %d to %d (%d samples)" % (data[start][0],data[i][0],i-start+1))
highThrottleSegments.append((start, i))
start = None

View File

@ -1,6 +1,5 @@
import collections
# AP_FLAKE8_CLEAN
import DataflashLog
from LogAnalyzer import Test, TestResult
@ -15,7 +14,7 @@ class TestVCC(Test):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
if not "CURR" in logdata.channels:
if "CURR" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No CURR log data"
return
@ -24,7 +23,7 @@ class TestVCC(Test):
try:
vccMin = logdata.channels["CURR"]["Vcc"].min()
vccMax = logdata.channels["CURR"]["Vcc"].max()
except KeyError as e:
except KeyError:
vccMin = logdata.channels["POWR"]["Vcc"].min()
vccMax = logdata.channels["POWR"]["Vcc"].max()
vccMin *= 1000

View File

@ -1,3 +1,6 @@
# AP_FLAKE8_CLEAN
from __future__ import print_function
import DataflashLog
@ -21,13 +24,12 @@ class TestVibration(Test):
return
# constants
gravity = -9.81
aimRangeWarnXY = 1.5
aimRangeFailXY = 3.0
aimRangeWarnZ = 2.0 # gravity +/- aim range
aimRangeFailZ = 5.0 # gravity +/- aim range
if not "IMU" in logdata.channels:
if "IMU" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No IMU log data"
return
@ -40,11 +42,11 @@ class TestVibration(Test):
return
# for now we'll just use the first (largest) chunk of LOITER data
# TODO: ignore the first couple of secs to avoid bad data during transition - or can we check more analytically that we're stable?
# TODO: ignore the first couple of secs to avoid bad data during transition - or can we check more analytically
# that we're stable?
# TODO: accumulate all LOITER chunks over min size, or just use the largest one?
startLine = chunks[0][0]
endLine = chunks[0][1]
# print("TestVibration using LOITER chunk from lines %s to %s" % (repr(startLine), repr(endLine)))
def getStdDevIMU(logdata, channelName, startLine, endLine):
loiterData = logdata.channels["IMU"][channelName].getSegment(startLine, endLine)