LogAnalyzer: convert tabs to spaces (only) per coding conventions

This commit is contained in:
Kevin Hester 2014-08-12 08:54:15 -07:00 committed by Randy Mackay
parent 68be36d4f8
commit 332ab9bc1b
7 changed files with 422 additions and 422 deletions

View File

@ -5,106 +5,106 @@ import math
class TestCompass(Test):
'''test for compass offsets and throttle interference'''
'''test for compass offsets and throttle interference'''
def __init__(self):
Test.__init__(self)
self.name = "Compass"
def __init__(self):
Test.__init__(self)
self.name = "Compass"
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
# test that compass offset parameters are within recommended range (+/- 150)
maxOffset = 150
if logdata.hardwareType == "PX4":
maxOffset = 250 # Pixhawks have their offsets scaled larger
compassOfsX = logdata.parameters["COMPASS_OFS_X"]
compassOfsY = logdata.parameters["COMPASS_OFS_Y"]
compassOfsZ = logdata.parameters["COMPASS_OFS_Z"]
if abs(compassOfsX) > maxOffset or abs(compassOfsY) > maxOffset or abs(compassOfsZ) > maxOffset:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Large compass off params (X:%.2f, Y:%.2f, Z:%.2f)\n" % (compassOfsX,compassOfsY,compassOfsZ)
# test that compass offset parameters are within recommended range (+/- 150)
maxOffset = 150
if logdata.hardwareType == "PX4":
maxOffset = 250 # Pixhawks have their offsets scaled larger
compassOfsX = logdata.parameters["COMPASS_OFS_X"]
compassOfsY = logdata.parameters["COMPASS_OFS_Y"]
compassOfsZ = logdata.parameters["COMPASS_OFS_Z"]
if abs(compassOfsX) > maxOffset or abs(compassOfsY) > maxOffset or abs(compassOfsZ) > maxOffset:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Large compass off params (X:%.2f, Y:%.2f, Z:%.2f)\n" % (compassOfsX,compassOfsY,compassOfsZ)
# check for excessive compass offsets using MAG data if present (it can change during flight is compass learn is on)
if "MAG" in logdata.channels:
errMsg = ""
if logdata.channels["MAG"]["OfsX"].max() > maxOffset:
errMsg = errMsg + "\nX: %.2f" % logdata.channels["MAG"]["OfsX"].max()
err = True
if logdata.channels["MAG"]["OfsX"].min() < -maxOffset:
errMsg = errMsg + "\nX: %.2f" % logdata.channels["MAG"]["OfsX"].min()
err = True
if logdata.channels["MAG"]["OfsY"].max() > maxOffset:
errMsg = errMsg + "\nY: %.2f" % logdata.channels["MAG"]["OfsY"].max()
err = True
if logdata.channels["MAG"]["OfsY"].min() < -maxOffset:
errMsg = errMsg + "\nY: %.2f" % logdata.channels["MAG"]["OfsY"].min()
err = True
if logdata.channels["MAG"]["OfsZ"].max() > maxOffset:
errMsg = errMsg + "\nZ: %.2f" % logdata.channels["MAG"]["OfsZ"].max()
err = True
if logdata.channels["MAG"]["OfsZ"].min() < -maxOffset:
errMsg = errMsg + "\nZ: %.2f" % logdata.channels["MAG"]["OfsZ"].min()
err = True
if errMsg:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = self.result.statusMessage + "Large compass offset in MAG data:" + errMsg + "\n"
# check for excessive compass offsets using MAG data if present (it can change during flight is compass learn is on)
if "MAG" in logdata.channels:
errMsg = ""
if logdata.channels["MAG"]["OfsX"].max() > maxOffset:
errMsg = errMsg + "\nX: %.2f" % logdata.channels["MAG"]["OfsX"].max()
err = True
if logdata.channels["MAG"]["OfsX"].min() < -maxOffset:
errMsg = errMsg + "\nX: %.2f" % logdata.channels["MAG"]["OfsX"].min()
err = True
if logdata.channels["MAG"]["OfsY"].max() > maxOffset:
errMsg = errMsg + "\nY: %.2f" % logdata.channels["MAG"]["OfsY"].max()
err = True
if logdata.channels["MAG"]["OfsY"].min() < -maxOffset:
errMsg = errMsg + "\nY: %.2f" % logdata.channels["MAG"]["OfsY"].min()
err = True
if logdata.channels["MAG"]["OfsZ"].max() > maxOffset:
errMsg = errMsg + "\nZ: %.2f" % logdata.channels["MAG"]["OfsZ"].max()
err = True
if logdata.channels["MAG"]["OfsZ"].min() < -maxOffset:
errMsg = errMsg + "\nZ: %.2f" % logdata.channels["MAG"]["OfsZ"].min()
err = True
if errMsg:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = self.result.statusMessage + "Large compass offset in MAG data:" + errMsg + "\n"
# check for mag field length change, and length outside of recommended range
if "MAG" in logdata.channels:
percentDiffThresholdWARN = 0.25
percentDiffThresholdFAIL = 0.35
minMagFieldThreshold = 120.0
maxMagFieldThreshold = 550.0
index = 0
length = len(logdata.channels["MAG"]["MagX"].listData)
magField = []
(minMagField, maxMagField) = (None,None)
(minMagFieldLine, maxMagFieldLine) = (None,None)
zerosFound = False
while index<length:
mx = logdata.channels["MAG"]["MagX"].listData[index][1]
my = logdata.channels["MAG"]["MagY"].listData[index][1]
mz = logdata.channels["MAG"]["MagZ"].listData[index][1]
if ((mx==0) and (my==0) and (mz==0)): # sometimes they're zero, not sure why, same reason as why we get NaNs as offsets?
zerosFound = True
else:
mf = math.sqrt(mx*mx + my*my + mz*mz)
magField.append(mf)
if mf<minMagField:
minMagField = mf
minMagFieldLine = logdata.channels["MAG"]["MagX"].listData[index][0]
if mf>maxMagField:
maxMagField = mf
maxMagFieldLine = logdata.channels["MAG"]["MagX"].listData[index][0]
if index == 0:
(minMagField, maxMagField) = (mf,mf)
index += 1
percentDiff = (maxMagField-minMagField) / minMagField
if percentDiff > percentDiffThresholdFAIL:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = self.result.statusMessage + "Large change in mag_field (%.2f%%)\n" % (percentDiff*100)
elif percentDiff > percentDiffThresholdWARN:
if self.result.status != TestResult.StatusType.FAIL:
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = self.result.statusMessage + "Moderate change in mag_field (%.2f%%)\n" % (percentDiff*100)
else:
self.result.statusMessage = self.result.statusMessage + "mag_field interference within limits (%.2f%%)\n" % (percentDiff*100)
if minMagField < minMagFieldThreshold:
self.result.statusMessage = self.result.statusMessage + "Min mag field length (%.2f) < recommended (%.2f)\n" % (minMagField,minMagFieldThreshold)
if maxMagField > maxMagFieldThreshold:
self.result.statusMessage = self.result.statusMessage + "Max mag field length (%.2f) > recommended (%.2f)\n" % (maxMagField,maxMagFieldThreshold)
if zerosFound:
if self.result.status != TestResult.StatusType.FAIL:
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = self.result.statusMessage + "All zeros found in MAG X/Y/Z log data\n"
if verbose:
self.result.statusMessage = self.result.statusMessage + "Min mag_field of %.2f on line %d\n" % (minMagField,minMagFieldLine)
self.result.statusMessage = self.result.statusMessage + "Max mag_field of %.2f on line %d\n" % (maxMagField,maxMagFieldLine)
# check for mag field length change, and length outside of recommended range
if "MAG" in logdata.channels:
percentDiffThresholdWARN = 0.25
percentDiffThresholdFAIL = 0.35
minMagFieldThreshold = 120.0
maxMagFieldThreshold = 550.0
index = 0
length = len(logdata.channels["MAG"]["MagX"].listData)
magField = []
(minMagField, maxMagField) = (None,None)
(minMagFieldLine, maxMagFieldLine) = (None,None)
zerosFound = False
while index<length:
mx = logdata.channels["MAG"]["MagX"].listData[index][1]
my = logdata.channels["MAG"]["MagY"].listData[index][1]
mz = logdata.channels["MAG"]["MagZ"].listData[index][1]
if ((mx==0) and (my==0) and (mz==0)): # sometimes they're zero, not sure why, same reason as why we get NaNs as offsets?
zerosFound = True
else:
mf = math.sqrt(mx*mx + my*my + mz*mz)
magField.append(mf)
if mf<minMagField:
minMagField = mf
minMagFieldLine = logdata.channels["MAG"]["MagX"].listData[index][0]
if mf>maxMagField:
maxMagField = mf
maxMagFieldLine = logdata.channels["MAG"]["MagX"].listData[index][0]
if index == 0:
(minMagField, maxMagField) = (mf,mf)
index += 1
percentDiff = (maxMagField-minMagField) / minMagField
if percentDiff > percentDiffThresholdFAIL:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = self.result.statusMessage + "Large change in mag_field (%.2f%%)\n" % (percentDiff*100)
elif percentDiff > percentDiffThresholdWARN:
if self.result.status != TestResult.StatusType.FAIL:
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = self.result.statusMessage + "Moderate change in mag_field (%.2f%%)\n" % (percentDiff*100)
else:
self.result.statusMessage = self.result.statusMessage + "mag_field interference within limits (%.2f%%)\n" % (percentDiff*100)
if minMagField < minMagFieldThreshold:
self.result.statusMessage = self.result.statusMessage + "Min mag field length (%.2f) < recommended (%.2f)\n" % (minMagField,minMagFieldThreshold)
if maxMagField > maxMagFieldThreshold:
self.result.statusMessage = self.result.statusMessage + "Max mag field length (%.2f) > recommended (%.2f)\n" % (maxMagField,maxMagFieldThreshold)
if zerosFound:
if self.result.status != TestResult.StatusType.FAIL:
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = self.result.statusMessage + "All zeros found in MAG X/Y/Z log data\n"
if verbose:
self.result.statusMessage = self.result.statusMessage + "Min mag_field of %.2f on line %d\n" % (minMagField,minMagFieldLine)
self.result.statusMessage = self.result.statusMessage + "Max mag_field of %.2f on line %d\n" % (maxMagField,maxMagFieldLine)
else:
self.result.statusMessage = self.result.statusMessage + "No MAG data, unable to test mag_field interference\n"
else:
self.result.statusMessage = self.result.statusMessage + "No MAG data, unable to test mag_field interference\n"

View File

@ -5,55 +5,55 @@ import math # for isnan()
class TestParams(Test):
'''test for any obviously bad parameters in the config'''
'''test for any obviously bad parameters in the config'''
def __init__(self):
Test.__init__(self)
self.name = "Parameters"
def __init__(self):
Test.__init__(self)
self.name = "Parameters"
# helper functions
def __checkParamIsEqual(self, paramName, expectedValue, logdata):
value = logdata.parameters[paramName]
if value != expectedValue:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting %s\n" % (paramName, `value`, `expectedValue`)
def __checkParamIsLessThan(self, paramName, maxValue, logdata):
value = logdata.parameters[paramName]
if value >= maxValue:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting less than %s\n" % (paramName, `value`, `maxValue`)
def __checkParamIsMoreThan(self, paramName, minValue, logdata):
value = logdata.parameters[paramName]
if value <= minValue:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting less than %s\n" % (paramName, `value`, `minValue`)
# helper functions
def __checkParamIsEqual(self, paramName, expectedValue, logdata):
value = logdata.parameters[paramName]
if value != expectedValue:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting %s\n" % (paramName, `value`, `expectedValue`)
def __checkParamIsLessThan(self, paramName, maxValue, logdata):
value = logdata.parameters[paramName]
if value >= maxValue:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting less than %s\n" % (paramName, `value`, `maxValue`)
def __checkParamIsMoreThan(self, paramName, minValue, logdata):
value = logdata.parameters[paramName]
if value <= minValue:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting less than %s\n" % (paramName, `value`, `minValue`)
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD # GOOD by default, tests below will override it if they fail
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD # GOOD by default, tests below will override it if they fail
# check all params for NaN
for name,value in logdata.parameters.iteritems():
if math.isnan(value):
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = self.result.statusMessage + name + " is NaN\n"
# check all params for NaN
for name,value in logdata.parameters.iteritems():
if math.isnan(value):
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = self.result.statusMessage + name + " is NaN\n"
# add parameter checks below using the helper functions, any failures will trigger a FAIL status and accumulate info in statusMessage
# if more complex checking or correlations are required you can access parameter values directly using the logdata.parameters[paramName] dict
if logdata.vehicleType == "ArduCopter":
self.__checkParamIsEqual ("MAG_ENABLE", 1, logdata)
self.__checkParamIsLessThan("THR_MIN", 200, logdata)
self.__checkParamIsLessThan("THR_MID", 701, logdata)
self.__checkParamIsMoreThan("THR_MID", 299, logdata)
# TODO: add more parameter tests, these are just an example...
elif logdata.vehicleType == "ArduPlane":
# TODO: add parameter checks for plane...
pass
elif logdata.vehicleType == "ArduRover":
# TODO: add parameter checks for rover...
pass
# add parameter checks below using the helper functions, any failures will trigger a FAIL status and accumulate info in statusMessage
# if more complex checking or correlations are required you can access parameter values directly using the logdata.parameters[paramName] dict
if logdata.vehicleType == "ArduCopter":
self.__checkParamIsEqual ("MAG_ENABLE", 1, logdata)
self.__checkParamIsLessThan("THR_MIN", 200, logdata)
self.__checkParamIsLessThan("THR_MID", 701, logdata)
self.__checkParamIsMoreThan("THR_MID", 299, logdata)
# TODO: add more parameter tests, these are just an example...
elif logdata.vehicleType == "ArduPlane":
# TODO: add parameter checks for plane...
pass
elif logdata.vehicleType == "ArduRover":
# TODO: add parameter checks for rover...
pass
if self.result.status == TestResult.StatusType.FAIL:
self.result.statusMessage = "Bad parameters found:\n" + self.result.statusMessage
if self.result.status == TestResult.StatusType.FAIL:
self.result.statusMessage = "Bad parameters found:\n" + self.result.statusMessage

View File

@ -3,62 +3,62 @@ import DataflashLog
class TestPerformance(Test):
'''check performance monitoring messages (PM) for issues with slow loops, etc'''
'''check performance monitoring messages (PM) for issues with slow loops, etc'''
def __init__(self):
Test.__init__(self)
self.name = "PM"
def __init__(self):
Test.__init__(self)
self.name = "PM"
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
# this test should be valid for all vehicle types, just need to figure out why PM logging data is different in each
if logdata.vehicleType != "ArduCopter":
self.result.status = TestResult.StatusType.NA
return
# this test should be valid for all vehicle types, just need to figure out why PM logging data is different in each
if logdata.vehicleType != "ArduCopter":
self.result.status = TestResult.StatusType.NA
return
# NOTE: we'll ignore MaxT altogether for now, it seems there are quite regularly one or two high values in there, even ignoring the ones expected after arm/disarm events
# gather info on arm/disarm lines, we will ignore the MaxT data from the first line found after each of these
# armingLines = []
# for line,ev in logdata.channels["EV"]["Id"].listData:
# if (ev == 10) or (ev == 11):
# armingLines.append(line)
# ignoreMaxTLines = []
# for maxT in logdata.channels["PM"]["MaxT"].listData:
# if not armingLines:
# break
# if maxT[0] > armingLines[0]:
# #print "Ignoring maxT from line %d, as it is the first PM line after arming on line %d" % (maxT[0],armingLines[0])
# ignoreMaxTLines.append(maxT[0])
# armingLines.pop(0)
# NOTE: we'll ignore MaxT altogether for now, it seems there are quite regularly one or two high values in there, even ignoring the ones expected after arm/disarm events
# gather info on arm/disarm lines, we will ignore the MaxT data from the first line found after each of these
# armingLines = []
# for line,ev in logdata.channels["EV"]["Id"].listData:
# if (ev == 10) or (ev == 11):
# armingLines.append(line)
# ignoreMaxTLines = []
# for maxT in logdata.channels["PM"]["MaxT"].listData:
# if not armingLines:
# break
# if maxT[0] > armingLines[0]:
# #print "Ignoring maxT from line %d, as it is the first PM line after arming on line %d" % (maxT[0],armingLines[0])
# ignoreMaxTLines.append(maxT[0])
# armingLines.pop(0)
if "PM" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No PM log data"
return
if "PM" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No PM log data"
return
# check for slow loops, i.e. NLon greater than 6% of NLoop
maxPercentSlow = 0
maxPercentSlowLine = 0
slowLoopLineCount = 0
for i in range(len(logdata.channels["PM"]["NLon"].listData)):
(line, nLon) = logdata.channels["PM"]["NLon"].listData[i]
(line, nLoop) = logdata.channels["PM"]["NLoop"].listData[i]
(line, maxT) = logdata.channels["PM"]["MaxT"].listData[i]
percentSlow = (nLon / float(nLoop)) * 100
if percentSlow > 6.0:
slowLoopLineCount = slowLoopLineCount + 1
if percentSlow > maxPercentSlow:
maxPercentSlow = percentSlow
maxPercentSlowLine = line
#if (maxT > 13000) and line not in ignoreMaxTLines:
# print "MaxT of %d detected on line %d" % (maxT,line)
if (maxPercentSlow > 10) or (slowLoopLineCount > 6):
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "%d slow loop lines found, max %.2f%% on line %d" % (slowLoopLineCount,maxPercentSlow,maxPercentSlowLine)
elif (maxPercentSlow > 6):
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = "%d slow loop lines found, max %.2f%% on line %d" % (slowLoopLineCount,maxPercentSlow,maxPercentSlowLine)
# check for slow loops, i.e. NLon greater than 6% of NLoop
maxPercentSlow = 0
maxPercentSlowLine = 0
slowLoopLineCount = 0
for i in range(len(logdata.channels["PM"]["NLon"].listData)):
(line, nLon) = logdata.channels["PM"]["NLon"].listData[i]
(line, nLoop) = logdata.channels["PM"]["NLoop"].listData[i]
(line, maxT) = logdata.channels["PM"]["MaxT"].listData[i]
percentSlow = (nLon / float(nLoop)) * 100
if percentSlow > 6.0:
slowLoopLineCount = slowLoopLineCount + 1
if percentSlow > maxPercentSlow:
maxPercentSlow = percentSlow
maxPercentSlowLine = line
#if (maxT > 13000) and line not in ignoreMaxTLines:
# print "MaxT of %d detected on line %d" % (maxT,line)
if (maxPercentSlow > 10) or (slowLoopLineCount > 6):
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "%d slow loop lines found, max %.2f%% on line %d" % (slowLoopLineCount,maxPercentSlow,maxPercentSlowLine)
elif (maxPercentSlow > 6):
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = "%d slow loop lines found, max %.2f%% on line %d" % (slowLoopLineCount,maxPercentSlow,maxPercentSlowLine)

View File

@ -5,116 +5,116 @@ import collections
class TestPitchRollCoupling(Test):
'''test for divergence between input and output pitch/roll, i.e. mechanical failure or bad PID tuning'''
# TODO: currently we're only checking for roll/pitch outside of max lean angle, will come back later to analyze roll/pitch in versus out values
'''test for divergence between input and output pitch/roll, i.e. mechanical failure or bad PID tuning'''
# TODO: currently we're only checking for roll/pitch outside of max lean angle, will come back later to analyze roll/pitch in versus out values
def __init__(self):
Test.__init__(self)
self.name = "Pitch/Roll"
self.enable = True # TEMP
def __init__(self):
Test.__init__(self)
self.name = "Pitch/Roll"
self.enable = True # TEMP
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
if logdata.vehicleType != "ArduCopter":
self.result.status = TestResult.StatusType.NA
return
if logdata.vehicleType != "ArduCopter":
self.result.status = TestResult.StatusType.NA
return
if not "ATT" in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No ATT log data"
return
if not "ATT" in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No ATT log data"
return
# figure out where each mode begins and ends, so we can treat auto and manual modes differently and ignore acro/tune modes
autoModes = ["RTL","AUTO","LAND","LOITER","GUIDED","CIRCLE","OF_LOITER","HYBRID"] # use NTUN DRol+DPit
manualModes = ["STABILIZE","DRIFT","ALTHOLD","ALT_HOLD"] # use CTUN RollIn/DesRoll + PitchIn/DesPitch
ignoreModes = ["ACRO","SPORT","FLIP","AUTOTUNE",""] # ignore data from these modes
autoSegments = [] # list of (startLine,endLine) pairs
manualSegments = [] # list of (startLine,endLine) pairs
orderedModes = collections.OrderedDict(sorted(logdata.modeChanges.items(), key=lambda t: t[0]))
isAuto = False # we always start in a manual control mode
prevLine = 0
mode = ""
for line,modepair in orderedModes.iteritems():
mode = modepair[0].upper()
if prevLine == 0:
prevLine = line
if mode in autoModes:
if not isAuto:
manualSegments.append((prevLine,line-1))
prevLine = line
isAuto = True
elif mode in manualModes:
if isAuto:
autoSegments.append((prevLine,line-1))
prevLine = line
isAuto = False
elif mode in ignoreModes:
if isAuto:
autoSegments.append((prevLine,line-1))
else:
manualSegments.append((prevLine,line-1))
prevLine = 0
else:
raise Exception("Unknown mode in TestPitchRollCoupling: %s" % mode)
# and handle the last segment, which doesn't have an ending
if mode in autoModes:
autoSegments.append((prevLine,logdata.lineCount))
elif mode in manualModes:
manualSegments.append((prevLine,logdata.lineCount))
# figure out where each mode begins and ends, so we can treat auto and manual modes differently and ignore acro/tune modes
autoModes = ["RTL","AUTO","LAND","LOITER","GUIDED","CIRCLE","OF_LOITER","HYBRID"] # use NTUN DRol+DPit
manualModes = ["STABILIZE","DRIFT","ALTHOLD","ALT_HOLD"] # use CTUN RollIn/DesRoll + PitchIn/DesPitch
ignoreModes = ["ACRO","SPORT","FLIP","AUTOTUNE",""] # ignore data from these modes
autoSegments = [] # list of (startLine,endLine) pairs
manualSegments = [] # list of (startLine,endLine) pairs
orderedModes = collections.OrderedDict(sorted(logdata.modeChanges.items(), key=lambda t: t[0]))
isAuto = False # we always start in a manual control mode
prevLine = 0
mode = ""
for line,modepair in orderedModes.iteritems():
mode = modepair[0].upper()
if prevLine == 0:
prevLine = line
if mode in autoModes:
if not isAuto:
manualSegments.append((prevLine,line-1))
prevLine = line
isAuto = True
elif mode in manualModes:
if isAuto:
autoSegments.append((prevLine,line-1))
prevLine = line
isAuto = False
elif mode in ignoreModes:
if isAuto:
autoSegments.append((prevLine,line-1))
else:
manualSegments.append((prevLine,line-1))
prevLine = 0
else:
raise Exception("Unknown mode in TestPitchRollCoupling: %s" % mode)
# and handle the last segment, which doesn't have an ending
if mode in autoModes:
autoSegments.append((prevLine,logdata.lineCount))
elif mode in manualModes:
manualSegments.append((prevLine,logdata.lineCount))
# figure out max lean angle, the ANGLE_MAX param was added in AC3.1
maxLeanAngle = 45.0
if "ANGLE_MAX" in logdata.parameters:
maxLeanAngle = logdata.parameters["ANGLE_MAX"] / 100.0
maxLeanAngleBuffer = 10 # allow a buffer margin
# figure out max lean angle, the ANGLE_MAX param was added in AC3.1
maxLeanAngle = 45.0
if "ANGLE_MAX" in logdata.parameters:
maxLeanAngle = logdata.parameters["ANGLE_MAX"] / 100.0
maxLeanAngleBuffer = 10 # allow a buffer margin
# ignore anything below this altitude, to discard any data while not flying
minAltThreshold = 2.0
# ignore anything below this altitude, to discard any data while not flying
minAltThreshold = 2.0
# look through manual+auto flight segments
# TODO: filter to ignore single points outside range?
(maxRoll, maxRollLine) = (0.0, 0)
(maxPitch, maxPitchLine) = (0.0, 0)
for (startLine,endLine) in manualSegments+autoSegments:
# quick up-front test, only fallover into more complex line-by-line check if max()>threshold
rollSeg = logdata.channels["ATT"]["Roll"].getSegment(startLine,endLine)
pitchSeg = logdata.channels["ATT"]["Pitch"].getSegment(startLine,endLine)
if not rollSeg.dictData and not pitchSeg.dictData:
continue
# check max roll+pitch for any time where relative altitude is above minAltThreshold
roll = max(abs(rollSeg.min()), abs(rollSeg.max()))
pitch = max(abs(pitchSeg.min()), abs(pitchSeg.max()))
if (roll>(maxLeanAngle+maxLeanAngleBuffer) and abs(roll)>abs(maxRoll)) or (pitch>(maxLeanAngle+maxLeanAngleBuffer) and abs(pitch)>abs(maxPitch)):
lit = DataflashLog.LogIterator(logdata, startLine)
assert(lit.currentLine == startLine)
while lit.currentLine <= endLine:
relativeAlt = lit["CTUN"]["BarAlt"]
if relativeAlt > minAltThreshold:
roll = lit["ATT"]["Roll"]
pitch = lit["ATT"]["Pitch"]
if abs(roll)>(maxLeanAngle+maxLeanAngleBuffer) and abs(roll)>abs(maxRoll):
maxRoll = roll
maxRollLine = lit.currentLine
if abs(pitch)>(maxLeanAngle+maxLeanAngleBuffer) and abs(pitch)>abs(maxPitch):
maxPitch = pitch
maxPitchLine = lit.currentLine
lit.next()
# check for breaking max lean angles
if maxRoll and abs(maxRoll)>abs(maxPitch):
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Roll (%.2f, line %d) > maximum lean angle (%.2f)" % (maxRoll, maxRollLine, maxLeanAngle)
return
if maxPitch:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Pitch (%.2f, line %d) > maximum lean angle (%.2f)" % (maxPitch, maxPitchLine, maxLeanAngle)
return
# look through manual+auto flight segments
# TODO: filter to ignore single points outside range?
(maxRoll, maxRollLine) = (0.0, 0)
(maxPitch, maxPitchLine) = (0.0, 0)
for (startLine,endLine) in manualSegments+autoSegments:
# quick up-front test, only fallover into more complex line-by-line check if max()>threshold
rollSeg = logdata.channels["ATT"]["Roll"].getSegment(startLine,endLine)
pitchSeg = logdata.channels["ATT"]["Pitch"].getSegment(startLine,endLine)
if not rollSeg.dictData and not pitchSeg.dictData:
continue
# check max roll+pitch for any time where relative altitude is above minAltThreshold
roll = max(abs(rollSeg.min()), abs(rollSeg.max()))
pitch = max(abs(pitchSeg.min()), abs(pitchSeg.max()))
if (roll>(maxLeanAngle+maxLeanAngleBuffer) and abs(roll)>abs(maxRoll)) or (pitch>(maxLeanAngle+maxLeanAngleBuffer) and abs(pitch)>abs(maxPitch)):
lit = DataflashLog.LogIterator(logdata, startLine)
assert(lit.currentLine == startLine)
while lit.currentLine <= endLine:
relativeAlt = lit["CTUN"]["BarAlt"]
if relativeAlt > minAltThreshold:
roll = lit["ATT"]["Roll"]
pitch = lit["ATT"]["Pitch"]
if abs(roll)>(maxLeanAngle+maxLeanAngleBuffer) and abs(roll)>abs(maxRoll):
maxRoll = roll
maxRollLine = lit.currentLine
if abs(pitch)>(maxLeanAngle+maxLeanAngleBuffer) and abs(pitch)>abs(maxPitch):
maxPitch = pitch
maxPitchLine = lit.currentLine
lit.next()
# check for breaking max lean angles
if maxRoll and abs(maxRoll)>abs(maxPitch):
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Roll (%.2f, line %d) > maximum lean angle (%.2f)" % (maxRoll, maxRollLine, maxLeanAngle)
return
if maxPitch:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Pitch (%.2f, line %d) > maximum lean angle (%.2f)" % (maxPitch, maxPitchLine, maxLeanAngle)
return
# TODO: use numpy/scipy to check Roll+RollIn curves for fitness (ignore where we're not airborne)
# ...
# TODO: use numpy/scipy to check Roll+RollIn curves for fitness (ignore where we're not airborne)
# ...

View File

@ -3,76 +3,76 @@ import DataflashLog
class TestThrust(Test):
'''test for sufficient thrust (copter only for now)'''
'''test for sufficient thrust (copter only for now)'''
def __init__(self):
Test.__init__(self)
self.name = "Thrust"
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
def __init__(self):
Test.__init__(self)
self.name = "Thrust"
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
if logdata.vehicleType != "ArduCopter":
self.result.status = TestResult.StatusType.NA
return
if logdata.vehicleType != "ArduCopter":
self.result.status = TestResult.StatusType.NA
return
if not "CTUN" in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No CTUN log data"
return
if not "ATT" in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No ATT log data"
return
if not "CTUN" in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No CTUN log data"
return
if not "ATT" in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No ATT log data"
return
# check for throttle (CTUN.ThrOut) above 700 for a chunk of time with copter not rising
# check for throttle (CTUN.ThrOut) above 700 for a chunk of time with copter not rising
highThrottleThreshold = 700
tiltThreshold = 20 # ignore high throttle when roll or tilt is above this value
climbThresholdWARN = 100
climbThresholdFAIL = 50
minSampleLength = 50
highThrottleThreshold = 700
tiltThreshold = 20 # ignore high throttle when roll or tilt is above this value
climbThresholdWARN = 100
climbThresholdFAIL = 50
minSampleLength = 50
highThrottleSegments = []
highThrottleSegments = []
# find any contiguous chunks where CTUN.ThrOut > highThrottleThreshold, ignore high throttle if tilt > tiltThreshold, and discard any segments shorter than minSampleLength
start = None
data = logdata.channels["CTUN"]["ThrOut"].listData
for i in range(0,len(data)):
(lineNumber,value) = data[i]
isBelowTiltThreshold = True
if value > highThrottleThreshold:
(roll,meh) = logdata.channels["ATT"]["Roll"].getNearestValue(lineNumber)
(pitch,meh) = logdata.channels["ATT"]["Pitch"].getNearestValue(lineNumber)
if (abs(roll) > tiltThreshold) or (abs(pitch) > tiltThreshold):
isBelowTiltThreshold = False
if (value > highThrottleThreshold) and isBelowTiltThreshold:
if start == None:
start = i
elif start != None:
if (i-start) > minSampleLength:
#print "Found high throttle chunk from line %d to %d (%d samples)" % (data[start][0],data[i][0],i-start+1)
highThrottleSegments.append((start,i))
start = None
# find any contiguous chunks where CTUN.ThrOut > highThrottleThreshold, ignore high throttle if tilt > tiltThreshold, and discard any segments shorter than minSampleLength
start = None
data = logdata.channels["CTUN"]["ThrOut"].listData
for i in range(0,len(data)):
(lineNumber,value) = data[i]
isBelowTiltThreshold = True
if value > highThrottleThreshold:
(roll,meh) = logdata.channels["ATT"]["Roll"].getNearestValue(lineNumber)
(pitch,meh) = logdata.channels["ATT"]["Pitch"].getNearestValue(lineNumber)
if (abs(roll) > tiltThreshold) or (abs(pitch) > tiltThreshold):
isBelowTiltThreshold = False
if (value > highThrottleThreshold) and isBelowTiltThreshold:
if start == None:
start = i
elif start != None:
if (i-start) > minSampleLength:
#print "Found high throttle chunk from line %d to %d (%d samples)" % (data[start][0],data[i][0],i-start+1)
highThrottleSegments.append((start,i))
start = None
climbRate = "CRate"
if "CRate" not in logdata.channels["CTUN"]:
climbRate = "CRt"
climbRate = "CRate"
if "CRate" not in logdata.channels["CTUN"]:
climbRate = "CRt"
# loop through each checking climbRate, if < 50 FAIL, if < 100 WARN
# TODO: we should filter climbRate and use its slope rather than value for this test
for seg in highThrottleSegments:
(startLine,endLine) = (data[seg[0]][0], data[seg[1]][0])
avgClimbRate = logdata.channels["CTUN"][climbRate].getSegment(startLine,endLine).avg()
avgThrOut = logdata.channels["CTUN"]["ThrOut"].getSegment(startLine,endLine).avg()
if avgClimbRate < climbThresholdFAIL:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Avg climb rate %.2f cm/s for throttle avg %d" % (avgClimbRate,avgThrOut)
return
if avgClimbRate < climbThresholdWARN:
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = "Avg climb rate %.2f cm/s for throttle avg %d" % (avgClimbRate,avgThrOut)
# loop through each checking climbRate, if < 50 FAIL, if < 100 WARN
# TODO: we should filter climbRate and use its slope rather than value for this test
for seg in highThrottleSegments:
(startLine,endLine) = (data[seg[0]][0], data[seg[1]][0])
avgClimbRate = logdata.channels["CTUN"][climbRate].getSegment(startLine,endLine).avg()
avgThrOut = logdata.channels["CTUN"]["ThrOut"].getSegment(startLine,endLine).avg()
if avgClimbRate < climbThresholdFAIL:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Avg climb rate %.2f cm/s for throttle avg %d" % (avgClimbRate,avgThrOut)
return
if avgClimbRate < climbThresholdWARN:
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = "Avg climb rate %.2f cm/s for throttle avg %d" % (avgClimbRate,avgThrOut)

View File

@ -5,31 +5,31 @@ import collections
class TestVCC(Test):
'''test for VCC within recommendations, or abrupt end to log in flight'''
'''test for VCC within recommendations, or abrupt end to log in flight'''
def __init__(self):
Test.__init__(self)
self.name = "VCC"
def __init__(self):
Test.__init__(self)
self.name = "VCC"
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
if not "CURR" in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No CURR log data"
return
if not "CURR" in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No CURR log data"
return
# just a naive min/max test for now
vccMin = logdata.channels["CURR"]["Vcc"].min()
vccMax = logdata.channels["CURR"]["Vcc"].max()
vccDiff = vccMax - vccMin;
vccMinThreshold = 4.6 * 1000;
vccMaxDiff = 0.3 * 1000;
if vccDiff > vccMaxDiff:
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = "VCC min/max diff %sv, should be <%sv" % (vccDiff/1000.0, vccMaxDiff/1000.0)
elif vccMin < vccMinThreshold:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "VCC below minimum of %sv (%sv)" % (`vccMinThreshold/1000.0`,`vccMin/1000.0`)
# just a naive min/max test for now
vccMin = logdata.channels["CURR"]["Vcc"].min()
vccMax = logdata.channels["CURR"]["Vcc"].max()
vccDiff = vccMax - vccMin;
vccMinThreshold = 4.6 * 1000;
vccMaxDiff = 0.3 * 1000;
if vccDiff > vccMaxDiff:
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = "VCC min/max diff %sv, should be <%sv" % (vccDiff/1000.0, vccMaxDiff/1000.0)
elif vccMin < vccMinThreshold:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "VCC below minimum of %sv (%sv)" % (`vccMinThreshold/1000.0`,`vccMin/1000.0`)

View File

@ -5,63 +5,63 @@ import numpy
class TestVibration(Test):
'''test for accelerometer vibration (accX/accY/accZ) within recommendations'''
'''test for accelerometer vibration (accX/accY/accZ) within recommendations'''
def __init__(self):
Test.__init__(self)
self.name = "Vibration"
def __init__(self):
Test.__init__(self)
self.name = "Vibration"
def run(self, logdata, verbose):
self.result = TestResult()
def run(self, logdata, verbose):
self.result = TestResult()
if logdata.vehicleType != "ArduCopter":
self.result.status = TestResult.StatusType.NA
return
if logdata.vehicleType != "ArduCopter":
self.result.status = TestResult.StatusType.NA
return
# constants
gravity = -9.81
aimRangeWarnXY = 1.5
aimRangeFailXY = 3.0
aimRangeWarnZ = 2.0 # gravity +/- aim range
aimRangeFailZ = 5.0 # gravity +/- aim range
# constants
gravity = -9.81
aimRangeWarnXY = 1.5
aimRangeFailXY = 3.0
aimRangeWarnZ = 2.0 # gravity +/- aim range
aimRangeFailZ = 5.0 # gravity +/- aim range
if not "IMU" in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No IMU log data"
return
if not "IMU" in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No IMU log data"
return
# find some stable LOITER data to analyze, at least 10 seconds
chunks = DataflashLog.DataflashLogHelper.findLoiterChunks(logdata, minLengthSeconds=10, noRCInputs=True)
if not chunks:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No stable LOITER log data found"
return
# find some stable LOITER data to analyze, at least 10 seconds
chunks = DataflashLog.DataflashLogHelper.findLoiterChunks(logdata, minLengthSeconds=10, noRCInputs=True)
if not chunks:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No stable LOITER log data found"
return
# for now we'll just use the first (largest) chunk of LOITER data
# TODO: ignore the first couple of secs to avoid bad data during transition - or can we check more analytically that we're stable?
# TODO: accumulate all LOITER chunks over min size, or just use the largest one?
startLine = chunks[0][0]
endLine = chunks[0][1]
#print "TestVibration using LOITER chunk from lines %s to %s" % (`startLine`, `endLine`)
# for now we'll just use the first (largest) chunk of LOITER data
# TODO: ignore the first couple of secs to avoid bad data during transition - or can we check more analytically that we're stable?
# TODO: accumulate all LOITER chunks over min size, or just use the largest one?
startLine = chunks[0][0]
endLine = chunks[0][1]
#print "TestVibration using LOITER chunk from lines %s to %s" % (`startLine`, `endLine`)
def getStdDevIMU(logdata, channelName, startLine,endLine):
loiterData = logdata.channels["IMU"][channelName].getSegment(startLine,endLine)
numpyData = numpy.array(loiterData.dictData.values())
return numpy.std(numpyData)
def getStdDevIMU(logdata, channelName, startLine,endLine):
loiterData = logdata.channels["IMU"][channelName].getSegment(startLine,endLine)
numpyData = numpy.array(loiterData.dictData.values())
return numpy.std(numpyData)
# use 2x standard deviations as the metric, so if 95% of samples lie within the aim range we're good
stdDevX = abs(2 * getStdDevIMU(logdata,"AccX",startLine,endLine))
stdDevY = abs(2 * getStdDevIMU(logdata,"AccY",startLine,endLine))
stdDevZ = abs(2 * getStdDevIMU(logdata,"AccZ",startLine,endLine))
if (stdDevX > aimRangeFailXY) or (stdDevY > aimRangeFailXY) or (stdDevZ > aimRangeFailZ):
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Vibration too high (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (stdDevX,stdDevY,stdDevZ)
elif (stdDevX > aimRangeWarnXY) or (stdDevY > aimRangeWarnXY) or (stdDevZ > aimRangeWarnZ):
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = "Vibration slightly high (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (stdDevX,stdDevY,stdDevZ)
else:
self.result.status = TestResult.StatusType.GOOD
self.result.statusMessage = "Good vibration values (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (stdDevX,stdDevY,stdDevZ)
# use 2x standard deviations as the metric, so if 95% of samples lie within the aim range we're good
stdDevX = abs(2 * getStdDevIMU(logdata,"AccX",startLine,endLine))
stdDevY = abs(2 * getStdDevIMU(logdata,"AccY",startLine,endLine))
stdDevZ = abs(2 * getStdDevIMU(logdata,"AccZ",startLine,endLine))
if (stdDevX > aimRangeFailXY) or (stdDevY > aimRangeFailXY) or (stdDevZ > aimRangeFailZ):
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Vibration too high (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (stdDevX,stdDevY,stdDevZ)
elif (stdDevX > aimRangeWarnXY) or (stdDevY > aimRangeWarnXY) or (stdDevZ > aimRangeWarnZ):
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = "Vibration slightly high (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (stdDevX,stdDevY,stdDevZ)
else:
self.result.status = TestResult.StatusType.GOOD
self.result.statusMessage = "Good vibration values (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (stdDevX,stdDevY,stdDevZ)