diff --git a/Tools/LogAnalyzer/tests/TestCompass.py b/Tools/LogAnalyzer/tests/TestCompass.py index 0d345ab5ee..a642bd1227 100644 --- a/Tools/LogAnalyzer/tests/TestCompass.py +++ b/Tools/LogAnalyzer/tests/TestCompass.py @@ -5,106 +5,106 @@ import math class TestCompass(Test): - '''test for compass offsets and throttle interference''' + '''test for compass offsets and throttle interference''' - def __init__(self): - Test.__init__(self) - self.name = "Compass" + def __init__(self): + Test.__init__(self) + self.name = "Compass" - def run(self, logdata, verbose): - self.result = TestResult() - self.result.status = TestResult.StatusType.GOOD + def run(self, logdata, verbose): + self.result = TestResult() + self.result.status = TestResult.StatusType.GOOD - # test that compass offset parameters are within recommended range (+/- 150) - maxOffset = 150 - if logdata.hardwareType == "PX4": - maxOffset = 250 # Pixhawks have their offsets scaled larger - compassOfsX = logdata.parameters["COMPASS_OFS_X"] - compassOfsY = logdata.parameters["COMPASS_OFS_Y"] - compassOfsZ = logdata.parameters["COMPASS_OFS_Z"] - if abs(compassOfsX) > maxOffset or abs(compassOfsY) > maxOffset or abs(compassOfsZ) > maxOffset: - self.result.status = TestResult.StatusType.FAIL - self.result.statusMessage = "Large compass off params (X:%.2f, Y:%.2f, Z:%.2f)\n" % (compassOfsX,compassOfsY,compassOfsZ) + # test that compass offset parameters are within recommended range (+/- 150) + maxOffset = 150 + if logdata.hardwareType == "PX4": + maxOffset = 250 # Pixhawks have their offsets scaled larger + compassOfsX = logdata.parameters["COMPASS_OFS_X"] + compassOfsY = logdata.parameters["COMPASS_OFS_Y"] + compassOfsZ = logdata.parameters["COMPASS_OFS_Z"] + if abs(compassOfsX) > maxOffset or abs(compassOfsY) > maxOffset or abs(compassOfsZ) > maxOffset: + self.result.status = TestResult.StatusType.FAIL + self.result.statusMessage = "Large compass off params (X:%.2f, Y:%.2f, Z:%.2f)\n" % (compassOfsX,compassOfsY,compassOfsZ) - # check for excessive compass offsets using MAG data if present (it can change during flight is compass learn is on) - if "MAG" in logdata.channels: - errMsg = "" - if logdata.channels["MAG"]["OfsX"].max() > maxOffset: - errMsg = errMsg + "\nX: %.2f" % logdata.channels["MAG"]["OfsX"].max() - err = True - if logdata.channels["MAG"]["OfsX"].min() < -maxOffset: - errMsg = errMsg + "\nX: %.2f" % logdata.channels["MAG"]["OfsX"].min() - err = True - if logdata.channels["MAG"]["OfsY"].max() > maxOffset: - errMsg = errMsg + "\nY: %.2f" % logdata.channels["MAG"]["OfsY"].max() - err = True - if logdata.channels["MAG"]["OfsY"].min() < -maxOffset: - errMsg = errMsg + "\nY: %.2f" % logdata.channels["MAG"]["OfsY"].min() - err = True - if logdata.channels["MAG"]["OfsZ"].max() > maxOffset: - errMsg = errMsg + "\nZ: %.2f" % logdata.channels["MAG"]["OfsZ"].max() - err = True - if logdata.channels["MAG"]["OfsZ"].min() < -maxOffset: - errMsg = errMsg + "\nZ: %.2f" % logdata.channels["MAG"]["OfsZ"].min() - err = True - if errMsg: - self.result.status = TestResult.StatusType.FAIL - self.result.statusMessage = self.result.statusMessage + "Large compass offset in MAG data:" + errMsg + "\n" + # check for excessive compass offsets using MAG data if present (it can change during flight is compass learn is on) + if "MAG" in logdata.channels: + errMsg = "" + if logdata.channels["MAG"]["OfsX"].max() > maxOffset: + errMsg = errMsg + "\nX: %.2f" % logdata.channels["MAG"]["OfsX"].max() + err = True + if logdata.channels["MAG"]["OfsX"].min() < -maxOffset: + errMsg = errMsg + "\nX: %.2f" % logdata.channels["MAG"]["OfsX"].min() + err = True + if logdata.channels["MAG"]["OfsY"].max() > maxOffset: + errMsg = errMsg + "\nY: %.2f" % logdata.channels["MAG"]["OfsY"].max() + err = True + if logdata.channels["MAG"]["OfsY"].min() < -maxOffset: + errMsg = errMsg + "\nY: %.2f" % logdata.channels["MAG"]["OfsY"].min() + err = True + if logdata.channels["MAG"]["OfsZ"].max() > maxOffset: + errMsg = errMsg + "\nZ: %.2f" % logdata.channels["MAG"]["OfsZ"].max() + err = True + if logdata.channels["MAG"]["OfsZ"].min() < -maxOffset: + errMsg = errMsg + "\nZ: %.2f" % logdata.channels["MAG"]["OfsZ"].min() + err = True + if errMsg: + self.result.status = TestResult.StatusType.FAIL + self.result.statusMessage = self.result.statusMessage + "Large compass offset in MAG data:" + errMsg + "\n" - # check for mag field length change, and length outside of recommended range - if "MAG" in logdata.channels: - percentDiffThresholdWARN = 0.25 - percentDiffThresholdFAIL = 0.35 - minMagFieldThreshold = 120.0 - maxMagFieldThreshold = 550.0 - index = 0 - length = len(logdata.channels["MAG"]["MagX"].listData) - magField = [] - (minMagField, maxMagField) = (None,None) - (minMagFieldLine, maxMagFieldLine) = (None,None) - zerosFound = False - while indexmaxMagField: - maxMagField = mf - maxMagFieldLine = logdata.channels["MAG"]["MagX"].listData[index][0] - if index == 0: - (minMagField, maxMagField) = (mf,mf) - index += 1 - percentDiff = (maxMagField-minMagField) / minMagField - if percentDiff > percentDiffThresholdFAIL: - self.result.status = TestResult.StatusType.FAIL - self.result.statusMessage = self.result.statusMessage + "Large change in mag_field (%.2f%%)\n" % (percentDiff*100) - elif percentDiff > percentDiffThresholdWARN: - if self.result.status != TestResult.StatusType.FAIL: - self.result.status = TestResult.StatusType.WARN - self.result.statusMessage = self.result.statusMessage + "Moderate change in mag_field (%.2f%%)\n" % (percentDiff*100) - else: - self.result.statusMessage = self.result.statusMessage + "mag_field interference within limits (%.2f%%)\n" % (percentDiff*100) - if minMagField < minMagFieldThreshold: - self.result.statusMessage = self.result.statusMessage + "Min mag field length (%.2f) < recommended (%.2f)\n" % (minMagField,minMagFieldThreshold) - if maxMagField > maxMagFieldThreshold: - self.result.statusMessage = self.result.statusMessage + "Max mag field length (%.2f) > recommended (%.2f)\n" % (maxMagField,maxMagFieldThreshold) - if zerosFound: - if self.result.status != TestResult.StatusType.FAIL: - self.result.status = TestResult.StatusType.WARN - self.result.statusMessage = self.result.statusMessage + "All zeros found in MAG X/Y/Z log data\n" - if verbose: - self.result.statusMessage = self.result.statusMessage + "Min mag_field of %.2f on line %d\n" % (minMagField,minMagFieldLine) - self.result.statusMessage = self.result.statusMessage + "Max mag_field of %.2f on line %d\n" % (maxMagField,maxMagFieldLine) + # check for mag field length change, and length outside of recommended range + if "MAG" in logdata.channels: + percentDiffThresholdWARN = 0.25 + percentDiffThresholdFAIL = 0.35 + minMagFieldThreshold = 120.0 + maxMagFieldThreshold = 550.0 + index = 0 + length = len(logdata.channels["MAG"]["MagX"].listData) + magField = [] + (minMagField, maxMagField) = (None,None) + (minMagFieldLine, maxMagFieldLine) = (None,None) + zerosFound = False + while indexmaxMagField: + maxMagField = mf + maxMagFieldLine = logdata.channels["MAG"]["MagX"].listData[index][0] + if index == 0: + (minMagField, maxMagField) = (mf,mf) + index += 1 + percentDiff = (maxMagField-minMagField) / minMagField + if percentDiff > percentDiffThresholdFAIL: + self.result.status = TestResult.StatusType.FAIL + self.result.statusMessage = self.result.statusMessage + "Large change in mag_field (%.2f%%)\n" % (percentDiff*100) + elif percentDiff > percentDiffThresholdWARN: + if self.result.status != TestResult.StatusType.FAIL: + self.result.status = TestResult.StatusType.WARN + self.result.statusMessage = self.result.statusMessage + "Moderate change in mag_field (%.2f%%)\n" % (percentDiff*100) + else: + self.result.statusMessage = self.result.statusMessage + "mag_field interference within limits (%.2f%%)\n" % (percentDiff*100) + if minMagField < minMagFieldThreshold: + self.result.statusMessage = self.result.statusMessage + "Min mag field length (%.2f) < recommended (%.2f)\n" % (minMagField,minMagFieldThreshold) + if maxMagField > maxMagFieldThreshold: + self.result.statusMessage = self.result.statusMessage + "Max mag field length (%.2f) > recommended (%.2f)\n" % (maxMagField,maxMagFieldThreshold) + if zerosFound: + if self.result.status != TestResult.StatusType.FAIL: + self.result.status = TestResult.StatusType.WARN + self.result.statusMessage = self.result.statusMessage + "All zeros found in MAG X/Y/Z log data\n" + if verbose: + self.result.statusMessage = self.result.statusMessage + "Min mag_field of %.2f on line %d\n" % (minMagField,minMagFieldLine) + self.result.statusMessage = self.result.statusMessage + "Max mag_field of %.2f on line %d\n" % (maxMagField,maxMagFieldLine) - else: - self.result.statusMessage = self.result.statusMessage + "No MAG data, unable to test mag_field interference\n" + else: + self.result.statusMessage = self.result.statusMessage + "No MAG data, unable to test mag_field interference\n" diff --git a/Tools/LogAnalyzer/tests/TestParams.py b/Tools/LogAnalyzer/tests/TestParams.py index fcee9df912..e7057a4c93 100644 --- a/Tools/LogAnalyzer/tests/TestParams.py +++ b/Tools/LogAnalyzer/tests/TestParams.py @@ -5,55 +5,55 @@ import math # for isnan() class TestParams(Test): - '''test for any obviously bad parameters in the config''' + '''test for any obviously bad parameters in the config''' - def __init__(self): - Test.__init__(self) - self.name = "Parameters" + def __init__(self): + Test.__init__(self) + self.name = "Parameters" - # helper functions - def __checkParamIsEqual(self, paramName, expectedValue, logdata): - value = logdata.parameters[paramName] - if value != expectedValue: - self.result.status = TestResult.StatusType.FAIL - self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting %s\n" % (paramName, `value`, `expectedValue`) - def __checkParamIsLessThan(self, paramName, maxValue, logdata): - value = logdata.parameters[paramName] - if value >= maxValue: - self.result.status = TestResult.StatusType.FAIL - self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting less than %s\n" % (paramName, `value`, `maxValue`) - def __checkParamIsMoreThan(self, paramName, minValue, logdata): - value = logdata.parameters[paramName] - if value <= minValue: - self.result.status = TestResult.StatusType.FAIL - self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting less than %s\n" % (paramName, `value`, `minValue`) + # helper functions + def __checkParamIsEqual(self, paramName, expectedValue, logdata): + value = logdata.parameters[paramName] + if value != expectedValue: + self.result.status = TestResult.StatusType.FAIL + self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting %s\n" % (paramName, `value`, `expectedValue`) + def __checkParamIsLessThan(self, paramName, maxValue, logdata): + value = logdata.parameters[paramName] + if value >= maxValue: + self.result.status = TestResult.StatusType.FAIL + self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting less than %s\n" % (paramName, `value`, `maxValue`) + def __checkParamIsMoreThan(self, paramName, minValue, logdata): + value = logdata.parameters[paramName] + if value <= minValue: + self.result.status = TestResult.StatusType.FAIL + self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting less than %s\n" % (paramName, `value`, `minValue`) - def run(self, logdata, verbose): - self.result = TestResult() - self.result.status = TestResult.StatusType.GOOD # GOOD by default, tests below will override it if they fail + def run(self, logdata, verbose): + self.result = TestResult() + self.result.status = TestResult.StatusType.GOOD # GOOD by default, tests below will override it if they fail - # check all params for NaN - for name,value in logdata.parameters.iteritems(): - if math.isnan(value): - self.result.status = TestResult.StatusType.FAIL - self.result.statusMessage = self.result.statusMessage + name + " is NaN\n" + # check all params for NaN + for name,value in logdata.parameters.iteritems(): + if math.isnan(value): + self.result.status = TestResult.StatusType.FAIL + self.result.statusMessage = self.result.statusMessage + name + " is NaN\n" - # add parameter checks below using the helper functions, any failures will trigger a FAIL status and accumulate info in statusMessage - # if more complex checking or correlations are required you can access parameter values directly using the logdata.parameters[paramName] dict - if logdata.vehicleType == "ArduCopter": - self.__checkParamIsEqual ("MAG_ENABLE", 1, logdata) - self.__checkParamIsLessThan("THR_MIN", 200, logdata) - self.__checkParamIsLessThan("THR_MID", 701, logdata) - self.__checkParamIsMoreThan("THR_MID", 299, logdata) - # TODO: add more parameter tests, these are just an example... - elif logdata.vehicleType == "ArduPlane": - # TODO: add parameter checks for plane... - pass - elif logdata.vehicleType == "ArduRover": - # TODO: add parameter checks for rover... - pass + # add parameter checks below using the helper functions, any failures will trigger a FAIL status and accumulate info in statusMessage + # if more complex checking or correlations are required you can access parameter values directly using the logdata.parameters[paramName] dict + if logdata.vehicleType == "ArduCopter": + self.__checkParamIsEqual ("MAG_ENABLE", 1, logdata) + self.__checkParamIsLessThan("THR_MIN", 200, logdata) + self.__checkParamIsLessThan("THR_MID", 701, logdata) + self.__checkParamIsMoreThan("THR_MID", 299, logdata) + # TODO: add more parameter tests, these are just an example... + elif logdata.vehicleType == "ArduPlane": + # TODO: add parameter checks for plane... + pass + elif logdata.vehicleType == "ArduRover": + # TODO: add parameter checks for rover... + pass - if self.result.status == TestResult.StatusType.FAIL: - self.result.statusMessage = "Bad parameters found:\n" + self.result.statusMessage + if self.result.status == TestResult.StatusType.FAIL: + self.result.statusMessage = "Bad parameters found:\n" + self.result.statusMessage diff --git a/Tools/LogAnalyzer/tests/TestPerformance.py b/Tools/LogAnalyzer/tests/TestPerformance.py index 1d1e0f187c..9154da9b7d 100644 --- a/Tools/LogAnalyzer/tests/TestPerformance.py +++ b/Tools/LogAnalyzer/tests/TestPerformance.py @@ -3,62 +3,62 @@ import DataflashLog class TestPerformance(Test): - '''check performance monitoring messages (PM) for issues with slow loops, etc''' + '''check performance monitoring messages (PM) for issues with slow loops, etc''' - def __init__(self): - Test.__init__(self) - self.name = "PM" + def __init__(self): + Test.__init__(self) + self.name = "PM" - def run(self, logdata, verbose): - self.result = TestResult() - self.result.status = TestResult.StatusType.GOOD + def run(self, logdata, verbose): + self.result = TestResult() + self.result.status = TestResult.StatusType.GOOD - # this test should be valid for all vehicle types, just need to figure out why PM logging data is different in each - if logdata.vehicleType != "ArduCopter": - self.result.status = TestResult.StatusType.NA - return + # this test should be valid for all vehicle types, just need to figure out why PM logging data is different in each + if logdata.vehicleType != "ArduCopter": + self.result.status = TestResult.StatusType.NA + return - # NOTE: we'll ignore MaxT altogether for now, it seems there are quite regularly one or two high values in there, even ignoring the ones expected after arm/disarm events - # gather info on arm/disarm lines, we will ignore the MaxT data from the first line found after each of these - # armingLines = [] - # for line,ev in logdata.channels["EV"]["Id"].listData: - # if (ev == 10) or (ev == 11): - # armingLines.append(line) - # ignoreMaxTLines = [] - # for maxT in logdata.channels["PM"]["MaxT"].listData: - # if not armingLines: - # break - # if maxT[0] > armingLines[0]: - # #print "Ignoring maxT from line %d, as it is the first PM line after arming on line %d" % (maxT[0],armingLines[0]) - # ignoreMaxTLines.append(maxT[0]) - # armingLines.pop(0) + # NOTE: we'll ignore MaxT altogether for now, it seems there are quite regularly one or two high values in there, even ignoring the ones expected after arm/disarm events + # gather info on arm/disarm lines, we will ignore the MaxT data from the first line found after each of these + # armingLines = [] + # for line,ev in logdata.channels["EV"]["Id"].listData: + # if (ev == 10) or (ev == 11): + # armingLines.append(line) + # ignoreMaxTLines = [] + # for maxT in logdata.channels["PM"]["MaxT"].listData: + # if not armingLines: + # break + # if maxT[0] > armingLines[0]: + # #print "Ignoring maxT from line %d, as it is the first PM line after arming on line %d" % (maxT[0],armingLines[0]) + # ignoreMaxTLines.append(maxT[0]) + # armingLines.pop(0) - if "PM" not in logdata.channels: - self.result.status = TestResult.StatusType.UNKNOWN - self.result.statusMessage = "No PM log data" - return + if "PM" not in logdata.channels: + self.result.status = TestResult.StatusType.UNKNOWN + self.result.statusMessage = "No PM log data" + return - # check for slow loops, i.e. NLon greater than 6% of NLoop - maxPercentSlow = 0 - maxPercentSlowLine = 0 - slowLoopLineCount = 0 - for i in range(len(logdata.channels["PM"]["NLon"].listData)): - (line, nLon) = logdata.channels["PM"]["NLon"].listData[i] - (line, nLoop) = logdata.channels["PM"]["NLoop"].listData[i] - (line, maxT) = logdata.channels["PM"]["MaxT"].listData[i] - percentSlow = (nLon / float(nLoop)) * 100 - if percentSlow > 6.0: - slowLoopLineCount = slowLoopLineCount + 1 - if percentSlow > maxPercentSlow: - maxPercentSlow = percentSlow - maxPercentSlowLine = line - #if (maxT > 13000) and line not in ignoreMaxTLines: - # print "MaxT of %d detected on line %d" % (maxT,line) - if (maxPercentSlow > 10) or (slowLoopLineCount > 6): - self.result.status = TestResult.StatusType.FAIL - self.result.statusMessage = "%d slow loop lines found, max %.2f%% on line %d" % (slowLoopLineCount,maxPercentSlow,maxPercentSlowLine) - elif (maxPercentSlow > 6): - self.result.status = TestResult.StatusType.WARN - self.result.statusMessage = "%d slow loop lines found, max %.2f%% on line %d" % (slowLoopLineCount,maxPercentSlow,maxPercentSlowLine) + # check for slow loops, i.e. NLon greater than 6% of NLoop + maxPercentSlow = 0 + maxPercentSlowLine = 0 + slowLoopLineCount = 0 + for i in range(len(logdata.channels["PM"]["NLon"].listData)): + (line, nLon) = logdata.channels["PM"]["NLon"].listData[i] + (line, nLoop) = logdata.channels["PM"]["NLoop"].listData[i] + (line, maxT) = logdata.channels["PM"]["MaxT"].listData[i] + percentSlow = (nLon / float(nLoop)) * 100 + if percentSlow > 6.0: + slowLoopLineCount = slowLoopLineCount + 1 + if percentSlow > maxPercentSlow: + maxPercentSlow = percentSlow + maxPercentSlowLine = line + #if (maxT > 13000) and line not in ignoreMaxTLines: + # print "MaxT of %d detected on line %d" % (maxT,line) + if (maxPercentSlow > 10) or (slowLoopLineCount > 6): + self.result.status = TestResult.StatusType.FAIL + self.result.statusMessage = "%d slow loop lines found, max %.2f%% on line %d" % (slowLoopLineCount,maxPercentSlow,maxPercentSlowLine) + elif (maxPercentSlow > 6): + self.result.status = TestResult.StatusType.WARN + self.result.statusMessage = "%d slow loop lines found, max %.2f%% on line %d" % (slowLoopLineCount,maxPercentSlow,maxPercentSlowLine) diff --git a/Tools/LogAnalyzer/tests/TestPitchRollCoupling.py b/Tools/LogAnalyzer/tests/TestPitchRollCoupling.py index b4c5375a3a..5d72754237 100644 --- a/Tools/LogAnalyzer/tests/TestPitchRollCoupling.py +++ b/Tools/LogAnalyzer/tests/TestPitchRollCoupling.py @@ -5,116 +5,116 @@ import collections class TestPitchRollCoupling(Test): - '''test for divergence between input and output pitch/roll, i.e. mechanical failure or bad PID tuning''' - # TODO: currently we're only checking for roll/pitch outside of max lean angle, will come back later to analyze roll/pitch in versus out values + '''test for divergence between input and output pitch/roll, i.e. mechanical failure or bad PID tuning''' + # TODO: currently we're only checking for roll/pitch outside of max lean angle, will come back later to analyze roll/pitch in versus out values - def __init__(self): - Test.__init__(self) - self.name = "Pitch/Roll" - self.enable = True # TEMP + def __init__(self): + Test.__init__(self) + self.name = "Pitch/Roll" + self.enable = True # TEMP - def run(self, logdata, verbose): - self.result = TestResult() - self.result.status = TestResult.StatusType.GOOD + def run(self, logdata, verbose): + self.result = TestResult() + self.result.status = TestResult.StatusType.GOOD - if logdata.vehicleType != "ArduCopter": - self.result.status = TestResult.StatusType.NA - return + if logdata.vehicleType != "ArduCopter": + self.result.status = TestResult.StatusType.NA + return - if not "ATT" in logdata.channels: - self.result.status = TestResult.StatusType.UNKNOWN - self.result.statusMessage = "No ATT log data" - return + if not "ATT" in logdata.channels: + self.result.status = TestResult.StatusType.UNKNOWN + self.result.statusMessage = "No ATT log data" + return - # figure out where each mode begins and ends, so we can treat auto and manual modes differently and ignore acro/tune modes - autoModes = ["RTL","AUTO","LAND","LOITER","GUIDED","CIRCLE","OF_LOITER","HYBRID"] # use NTUN DRol+DPit - manualModes = ["STABILIZE","DRIFT","ALTHOLD","ALT_HOLD"] # use CTUN RollIn/DesRoll + PitchIn/DesPitch - ignoreModes = ["ACRO","SPORT","FLIP","AUTOTUNE",""] # ignore data from these modes - autoSegments = [] # list of (startLine,endLine) pairs - manualSegments = [] # list of (startLine,endLine) pairs - orderedModes = collections.OrderedDict(sorted(logdata.modeChanges.items(), key=lambda t: t[0])) - isAuto = False # we always start in a manual control mode - prevLine = 0 - mode = "" - for line,modepair in orderedModes.iteritems(): - mode = modepair[0].upper() - if prevLine == 0: - prevLine = line - if mode in autoModes: - if not isAuto: - manualSegments.append((prevLine,line-1)) - prevLine = line - isAuto = True - elif mode in manualModes: - if isAuto: - autoSegments.append((prevLine,line-1)) - prevLine = line - isAuto = False - elif mode in ignoreModes: - if isAuto: - autoSegments.append((prevLine,line-1)) - else: - manualSegments.append((prevLine,line-1)) - prevLine = 0 - else: - raise Exception("Unknown mode in TestPitchRollCoupling: %s" % mode) - # and handle the last segment, which doesn't have an ending - if mode in autoModes: - autoSegments.append((prevLine,logdata.lineCount)) - elif mode in manualModes: - manualSegments.append((prevLine,logdata.lineCount)) + # figure out where each mode begins and ends, so we can treat auto and manual modes differently and ignore acro/tune modes + autoModes = ["RTL","AUTO","LAND","LOITER","GUIDED","CIRCLE","OF_LOITER","HYBRID"] # use NTUN DRol+DPit + manualModes = ["STABILIZE","DRIFT","ALTHOLD","ALT_HOLD"] # use CTUN RollIn/DesRoll + PitchIn/DesPitch + ignoreModes = ["ACRO","SPORT","FLIP","AUTOTUNE",""] # ignore data from these modes + autoSegments = [] # list of (startLine,endLine) pairs + manualSegments = [] # list of (startLine,endLine) pairs + orderedModes = collections.OrderedDict(sorted(logdata.modeChanges.items(), key=lambda t: t[0])) + isAuto = False # we always start in a manual control mode + prevLine = 0 + mode = "" + for line,modepair in orderedModes.iteritems(): + mode = modepair[0].upper() + if prevLine == 0: + prevLine = line + if mode in autoModes: + if not isAuto: + manualSegments.append((prevLine,line-1)) + prevLine = line + isAuto = True + elif mode in manualModes: + if isAuto: + autoSegments.append((prevLine,line-1)) + prevLine = line + isAuto = False + elif mode in ignoreModes: + if isAuto: + autoSegments.append((prevLine,line-1)) + else: + manualSegments.append((prevLine,line-1)) + prevLine = 0 + else: + raise Exception("Unknown mode in TestPitchRollCoupling: %s" % mode) + # and handle the last segment, which doesn't have an ending + if mode in autoModes: + autoSegments.append((prevLine,logdata.lineCount)) + elif mode in manualModes: + manualSegments.append((prevLine,logdata.lineCount)) - # figure out max lean angle, the ANGLE_MAX param was added in AC3.1 - maxLeanAngle = 45.0 - if "ANGLE_MAX" in logdata.parameters: - maxLeanAngle = logdata.parameters["ANGLE_MAX"] / 100.0 - maxLeanAngleBuffer = 10 # allow a buffer margin + # figure out max lean angle, the ANGLE_MAX param was added in AC3.1 + maxLeanAngle = 45.0 + if "ANGLE_MAX" in logdata.parameters: + maxLeanAngle = logdata.parameters["ANGLE_MAX"] / 100.0 + maxLeanAngleBuffer = 10 # allow a buffer margin - # ignore anything below this altitude, to discard any data while not flying - minAltThreshold = 2.0 + # ignore anything below this altitude, to discard any data while not flying + minAltThreshold = 2.0 - # look through manual+auto flight segments - # TODO: filter to ignore single points outside range? - (maxRoll, maxRollLine) = (0.0, 0) - (maxPitch, maxPitchLine) = (0.0, 0) - for (startLine,endLine) in manualSegments+autoSegments: - # quick up-front test, only fallover into more complex line-by-line check if max()>threshold - rollSeg = logdata.channels["ATT"]["Roll"].getSegment(startLine,endLine) - pitchSeg = logdata.channels["ATT"]["Pitch"].getSegment(startLine,endLine) - if not rollSeg.dictData and not pitchSeg.dictData: - continue - # check max roll+pitch for any time where relative altitude is above minAltThreshold - roll = max(abs(rollSeg.min()), abs(rollSeg.max())) - pitch = max(abs(pitchSeg.min()), abs(pitchSeg.max())) - if (roll>(maxLeanAngle+maxLeanAngleBuffer) and abs(roll)>abs(maxRoll)) or (pitch>(maxLeanAngle+maxLeanAngleBuffer) and abs(pitch)>abs(maxPitch)): - lit = DataflashLog.LogIterator(logdata, startLine) - assert(lit.currentLine == startLine) - while lit.currentLine <= endLine: - relativeAlt = lit["CTUN"]["BarAlt"] - if relativeAlt > minAltThreshold: - roll = lit["ATT"]["Roll"] - pitch = lit["ATT"]["Pitch"] - if abs(roll)>(maxLeanAngle+maxLeanAngleBuffer) and abs(roll)>abs(maxRoll): - maxRoll = roll - maxRollLine = lit.currentLine - if abs(pitch)>(maxLeanAngle+maxLeanAngleBuffer) and abs(pitch)>abs(maxPitch): - maxPitch = pitch - maxPitchLine = lit.currentLine - lit.next() - # check for breaking max lean angles - if maxRoll and abs(maxRoll)>abs(maxPitch): - self.result.status = TestResult.StatusType.FAIL - self.result.statusMessage = "Roll (%.2f, line %d) > maximum lean angle (%.2f)" % (maxRoll, maxRollLine, maxLeanAngle) - return - if maxPitch: - self.result.status = TestResult.StatusType.FAIL - self.result.statusMessage = "Pitch (%.2f, line %d) > maximum lean angle (%.2f)" % (maxPitch, maxPitchLine, maxLeanAngle) - return + # look through manual+auto flight segments + # TODO: filter to ignore single points outside range? + (maxRoll, maxRollLine) = (0.0, 0) + (maxPitch, maxPitchLine) = (0.0, 0) + for (startLine,endLine) in manualSegments+autoSegments: + # quick up-front test, only fallover into more complex line-by-line check if max()>threshold + rollSeg = logdata.channels["ATT"]["Roll"].getSegment(startLine,endLine) + pitchSeg = logdata.channels["ATT"]["Pitch"].getSegment(startLine,endLine) + if not rollSeg.dictData and not pitchSeg.dictData: + continue + # check max roll+pitch for any time where relative altitude is above minAltThreshold + roll = max(abs(rollSeg.min()), abs(rollSeg.max())) + pitch = max(abs(pitchSeg.min()), abs(pitchSeg.max())) + if (roll>(maxLeanAngle+maxLeanAngleBuffer) and abs(roll)>abs(maxRoll)) or (pitch>(maxLeanAngle+maxLeanAngleBuffer) and abs(pitch)>abs(maxPitch)): + lit = DataflashLog.LogIterator(logdata, startLine) + assert(lit.currentLine == startLine) + while lit.currentLine <= endLine: + relativeAlt = lit["CTUN"]["BarAlt"] + if relativeAlt > minAltThreshold: + roll = lit["ATT"]["Roll"] + pitch = lit["ATT"]["Pitch"] + if abs(roll)>(maxLeanAngle+maxLeanAngleBuffer) and abs(roll)>abs(maxRoll): + maxRoll = roll + maxRollLine = lit.currentLine + if abs(pitch)>(maxLeanAngle+maxLeanAngleBuffer) and abs(pitch)>abs(maxPitch): + maxPitch = pitch + maxPitchLine = lit.currentLine + lit.next() + # check for breaking max lean angles + if maxRoll and abs(maxRoll)>abs(maxPitch): + self.result.status = TestResult.StatusType.FAIL + self.result.statusMessage = "Roll (%.2f, line %d) > maximum lean angle (%.2f)" % (maxRoll, maxRollLine, maxLeanAngle) + return + if maxPitch: + self.result.status = TestResult.StatusType.FAIL + self.result.statusMessage = "Pitch (%.2f, line %d) > maximum lean angle (%.2f)" % (maxPitch, maxPitchLine, maxLeanAngle) + return - # TODO: use numpy/scipy to check Roll+RollIn curves for fitness (ignore where we're not airborne) - # ... + # TODO: use numpy/scipy to check Roll+RollIn curves for fitness (ignore where we're not airborne) + # ... diff --git a/Tools/LogAnalyzer/tests/TestThrust.py b/Tools/LogAnalyzer/tests/TestThrust.py index 7a5319922f..44b6686fa6 100644 --- a/Tools/LogAnalyzer/tests/TestThrust.py +++ b/Tools/LogAnalyzer/tests/TestThrust.py @@ -3,76 +3,76 @@ import DataflashLog class TestThrust(Test): - '''test for sufficient thrust (copter only for now)''' + '''test for sufficient thrust (copter only for now)''' - def __init__(self): - Test.__init__(self) - self.name = "Thrust" - - def run(self, logdata, verbose): - self.result = TestResult() - self.result.status = TestResult.StatusType.GOOD + def __init__(self): + Test.__init__(self) + self.name = "Thrust" + + def run(self, logdata, verbose): + self.result = TestResult() + self.result.status = TestResult.StatusType.GOOD - if logdata.vehicleType != "ArduCopter": - self.result.status = TestResult.StatusType.NA - return + if logdata.vehicleType != "ArduCopter": + self.result.status = TestResult.StatusType.NA + return - if not "CTUN" in logdata.channels: - self.result.status = TestResult.StatusType.UNKNOWN - self.result.statusMessage = "No CTUN log data" - return - if not "ATT" in logdata.channels: - self.result.status = TestResult.StatusType.UNKNOWN - self.result.statusMessage = "No ATT log data" - return + if not "CTUN" in logdata.channels: + self.result.status = TestResult.StatusType.UNKNOWN + self.result.statusMessage = "No CTUN log data" + return + if not "ATT" in logdata.channels: + self.result.status = TestResult.StatusType.UNKNOWN + self.result.statusMessage = "No ATT log data" + return - # check for throttle (CTUN.ThrOut) above 700 for a chunk of time with copter not rising + # check for throttle (CTUN.ThrOut) above 700 for a chunk of time with copter not rising - highThrottleThreshold = 700 - tiltThreshold = 20 # ignore high throttle when roll or tilt is above this value - climbThresholdWARN = 100 - climbThresholdFAIL = 50 - minSampleLength = 50 + highThrottleThreshold = 700 + tiltThreshold = 20 # ignore high throttle when roll or tilt is above this value + climbThresholdWARN = 100 + climbThresholdFAIL = 50 + minSampleLength = 50 - highThrottleSegments = [] + highThrottleSegments = [] - # find any contiguous chunks where CTUN.ThrOut > highThrottleThreshold, ignore high throttle if tilt > tiltThreshold, and discard any segments shorter than minSampleLength - start = None - data = logdata.channels["CTUN"]["ThrOut"].listData - for i in range(0,len(data)): - (lineNumber,value) = data[i] - isBelowTiltThreshold = True - if value > highThrottleThreshold: - (roll,meh) = logdata.channels["ATT"]["Roll"].getNearestValue(lineNumber) - (pitch,meh) = logdata.channels["ATT"]["Pitch"].getNearestValue(lineNumber) - if (abs(roll) > tiltThreshold) or (abs(pitch) > tiltThreshold): - isBelowTiltThreshold = False - if (value > highThrottleThreshold) and isBelowTiltThreshold: - if start == None: - start = i - elif start != None: - if (i-start) > minSampleLength: - #print "Found high throttle chunk from line %d to %d (%d samples)" % (data[start][0],data[i][0],i-start+1) - highThrottleSegments.append((start,i)) - start = None + # find any contiguous chunks where CTUN.ThrOut > highThrottleThreshold, ignore high throttle if tilt > tiltThreshold, and discard any segments shorter than minSampleLength + start = None + data = logdata.channels["CTUN"]["ThrOut"].listData + for i in range(0,len(data)): + (lineNumber,value) = data[i] + isBelowTiltThreshold = True + if value > highThrottleThreshold: + (roll,meh) = logdata.channels["ATT"]["Roll"].getNearestValue(lineNumber) + (pitch,meh) = logdata.channels["ATT"]["Pitch"].getNearestValue(lineNumber) + if (abs(roll) > tiltThreshold) or (abs(pitch) > tiltThreshold): + isBelowTiltThreshold = False + if (value > highThrottleThreshold) and isBelowTiltThreshold: + if start == None: + start = i + elif start != None: + if (i-start) > minSampleLength: + #print "Found high throttle chunk from line %d to %d (%d samples)" % (data[start][0],data[i][0],i-start+1) + highThrottleSegments.append((start,i)) + start = None - climbRate = "CRate" - if "CRate" not in logdata.channels["CTUN"]: - climbRate = "CRt" + climbRate = "CRate" + if "CRate" not in logdata.channels["CTUN"]: + climbRate = "CRt" - # loop through each checking climbRate, if < 50 FAIL, if < 100 WARN - # TODO: we should filter climbRate and use its slope rather than value for this test - for seg in highThrottleSegments: - (startLine,endLine) = (data[seg[0]][0], data[seg[1]][0]) - avgClimbRate = logdata.channels["CTUN"][climbRate].getSegment(startLine,endLine).avg() - avgThrOut = logdata.channels["CTUN"]["ThrOut"].getSegment(startLine,endLine).avg() - if avgClimbRate < climbThresholdFAIL: - self.result.status = TestResult.StatusType.FAIL - self.result.statusMessage = "Avg climb rate %.2f cm/s for throttle avg %d" % (avgClimbRate,avgThrOut) - return - if avgClimbRate < climbThresholdWARN: - self.result.status = TestResult.StatusType.WARN - self.result.statusMessage = "Avg climb rate %.2f cm/s for throttle avg %d" % (avgClimbRate,avgThrOut) + # loop through each checking climbRate, if < 50 FAIL, if < 100 WARN + # TODO: we should filter climbRate and use its slope rather than value for this test + for seg in highThrottleSegments: + (startLine,endLine) = (data[seg[0]][0], data[seg[1]][0]) + avgClimbRate = logdata.channels["CTUN"][climbRate].getSegment(startLine,endLine).avg() + avgThrOut = logdata.channels["CTUN"]["ThrOut"].getSegment(startLine,endLine).avg() + if avgClimbRate < climbThresholdFAIL: + self.result.status = TestResult.StatusType.FAIL + self.result.statusMessage = "Avg climb rate %.2f cm/s for throttle avg %d" % (avgClimbRate,avgThrOut) + return + if avgClimbRate < climbThresholdWARN: + self.result.status = TestResult.StatusType.WARN + self.result.statusMessage = "Avg climb rate %.2f cm/s for throttle avg %d" % (avgClimbRate,avgThrOut) diff --git a/Tools/LogAnalyzer/tests/TestVCC.py b/Tools/LogAnalyzer/tests/TestVCC.py index b5c250d556..2983730e65 100644 --- a/Tools/LogAnalyzer/tests/TestVCC.py +++ b/Tools/LogAnalyzer/tests/TestVCC.py @@ -5,31 +5,31 @@ import collections class TestVCC(Test): - '''test for VCC within recommendations, or abrupt end to log in flight''' + '''test for VCC within recommendations, or abrupt end to log in flight''' - def __init__(self): - Test.__init__(self) - self.name = "VCC" + def __init__(self): + Test.__init__(self) + self.name = "VCC" - def run(self, logdata, verbose): - self.result = TestResult() - self.result.status = TestResult.StatusType.GOOD + def run(self, logdata, verbose): + self.result = TestResult() + self.result.status = TestResult.StatusType.GOOD - if not "CURR" in logdata.channels: - self.result.status = TestResult.StatusType.UNKNOWN - self.result.statusMessage = "No CURR log data" - return + if not "CURR" in logdata.channels: + self.result.status = TestResult.StatusType.UNKNOWN + self.result.statusMessage = "No CURR log data" + return - # just a naive min/max test for now - vccMin = logdata.channels["CURR"]["Vcc"].min() - vccMax = logdata.channels["CURR"]["Vcc"].max() - vccDiff = vccMax - vccMin; - vccMinThreshold = 4.6 * 1000; - vccMaxDiff = 0.3 * 1000; - if vccDiff > vccMaxDiff: - self.result.status = TestResult.StatusType.WARN - self.result.statusMessage = "VCC min/max diff %sv, should be <%sv" % (vccDiff/1000.0, vccMaxDiff/1000.0) - elif vccMin < vccMinThreshold: - self.result.status = TestResult.StatusType.FAIL - self.result.statusMessage = "VCC below minimum of %sv (%sv)" % (`vccMinThreshold/1000.0`,`vccMin/1000.0`) - + # just a naive min/max test for now + vccMin = logdata.channels["CURR"]["Vcc"].min() + vccMax = logdata.channels["CURR"]["Vcc"].max() + vccDiff = vccMax - vccMin; + vccMinThreshold = 4.6 * 1000; + vccMaxDiff = 0.3 * 1000; + if vccDiff > vccMaxDiff: + self.result.status = TestResult.StatusType.WARN + self.result.statusMessage = "VCC min/max diff %sv, should be <%sv" % (vccDiff/1000.0, vccMaxDiff/1000.0) + elif vccMin < vccMinThreshold: + self.result.status = TestResult.StatusType.FAIL + self.result.statusMessage = "VCC below minimum of %sv (%sv)" % (`vccMinThreshold/1000.0`,`vccMin/1000.0`) + diff --git a/Tools/LogAnalyzer/tests/TestVibration.py b/Tools/LogAnalyzer/tests/TestVibration.py index 42b0391542..a44d7bcb22 100644 --- a/Tools/LogAnalyzer/tests/TestVibration.py +++ b/Tools/LogAnalyzer/tests/TestVibration.py @@ -5,63 +5,63 @@ import numpy class TestVibration(Test): - '''test for accelerometer vibration (accX/accY/accZ) within recommendations''' + '''test for accelerometer vibration (accX/accY/accZ) within recommendations''' - def __init__(self): - Test.__init__(self) - self.name = "Vibration" + def __init__(self): + Test.__init__(self) + self.name = "Vibration" - def run(self, logdata, verbose): - self.result = TestResult() + def run(self, logdata, verbose): + self.result = TestResult() - if logdata.vehicleType != "ArduCopter": - self.result.status = TestResult.StatusType.NA - return + if logdata.vehicleType != "ArduCopter": + self.result.status = TestResult.StatusType.NA + return - # constants - gravity = -9.81 - aimRangeWarnXY = 1.5 - aimRangeFailXY = 3.0 - aimRangeWarnZ = 2.0 # gravity +/- aim range - aimRangeFailZ = 5.0 # gravity +/- aim range + # constants + gravity = -9.81 + aimRangeWarnXY = 1.5 + aimRangeFailXY = 3.0 + aimRangeWarnZ = 2.0 # gravity +/- aim range + aimRangeFailZ = 5.0 # gravity +/- aim range - if not "IMU" in logdata.channels: - self.result.status = TestResult.StatusType.UNKNOWN - self.result.statusMessage = "No IMU log data" - return + if not "IMU" in logdata.channels: + self.result.status = TestResult.StatusType.UNKNOWN + self.result.statusMessage = "No IMU log data" + return - # find some stable LOITER data to analyze, at least 10 seconds - chunks = DataflashLog.DataflashLogHelper.findLoiterChunks(logdata, minLengthSeconds=10, noRCInputs=True) - if not chunks: - self.result.status = TestResult.StatusType.UNKNOWN - self.result.statusMessage = "No stable LOITER log data found" - return + # find some stable LOITER data to analyze, at least 10 seconds + chunks = DataflashLog.DataflashLogHelper.findLoiterChunks(logdata, minLengthSeconds=10, noRCInputs=True) + if not chunks: + self.result.status = TestResult.StatusType.UNKNOWN + self.result.statusMessage = "No stable LOITER log data found" + return - # for now we'll just use the first (largest) chunk of LOITER data - # TODO: ignore the first couple of secs to avoid bad data during transition - or can we check more analytically that we're stable? - # TODO: accumulate all LOITER chunks over min size, or just use the largest one? - startLine = chunks[0][0] - endLine = chunks[0][1] - #print "TestVibration using LOITER chunk from lines %s to %s" % (`startLine`, `endLine`) + # for now we'll just use the first (largest) chunk of LOITER data + # TODO: ignore the first couple of secs to avoid bad data during transition - or can we check more analytically that we're stable? + # TODO: accumulate all LOITER chunks over min size, or just use the largest one? + startLine = chunks[0][0] + endLine = chunks[0][1] + #print "TestVibration using LOITER chunk from lines %s to %s" % (`startLine`, `endLine`) - def getStdDevIMU(logdata, channelName, startLine,endLine): - loiterData = logdata.channels["IMU"][channelName].getSegment(startLine,endLine) - numpyData = numpy.array(loiterData.dictData.values()) - return numpy.std(numpyData) + def getStdDevIMU(logdata, channelName, startLine,endLine): + loiterData = logdata.channels["IMU"][channelName].getSegment(startLine,endLine) + numpyData = numpy.array(loiterData.dictData.values()) + return numpy.std(numpyData) - # use 2x standard deviations as the metric, so if 95% of samples lie within the aim range we're good - stdDevX = abs(2 * getStdDevIMU(logdata,"AccX",startLine,endLine)) - stdDevY = abs(2 * getStdDevIMU(logdata,"AccY",startLine,endLine)) - stdDevZ = abs(2 * getStdDevIMU(logdata,"AccZ",startLine,endLine)) - if (stdDevX > aimRangeFailXY) or (stdDevY > aimRangeFailXY) or (stdDevZ > aimRangeFailZ): - self.result.status = TestResult.StatusType.FAIL - self.result.statusMessage = "Vibration too high (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (stdDevX,stdDevY,stdDevZ) - elif (stdDevX > aimRangeWarnXY) or (stdDevY > aimRangeWarnXY) or (stdDevZ > aimRangeWarnZ): - self.result.status = TestResult.StatusType.WARN - self.result.statusMessage = "Vibration slightly high (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (stdDevX,stdDevY,stdDevZ) - else: - self.result.status = TestResult.StatusType.GOOD - self.result.statusMessage = "Good vibration values (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (stdDevX,stdDevY,stdDevZ) + # use 2x standard deviations as the metric, so if 95% of samples lie within the aim range we're good + stdDevX = abs(2 * getStdDevIMU(logdata,"AccX",startLine,endLine)) + stdDevY = abs(2 * getStdDevIMU(logdata,"AccY",startLine,endLine)) + stdDevZ = abs(2 * getStdDevIMU(logdata,"AccZ",startLine,endLine)) + if (stdDevX > aimRangeFailXY) or (stdDevY > aimRangeFailXY) or (stdDevZ > aimRangeFailZ): + self.result.status = TestResult.StatusType.FAIL + self.result.statusMessage = "Vibration too high (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (stdDevX,stdDevY,stdDevZ) + elif (stdDevX > aimRangeWarnXY) or (stdDevY > aimRangeWarnXY) or (stdDevZ > aimRangeWarnZ): + self.result.status = TestResult.StatusType.WARN + self.result.statusMessage = "Vibration slightly high (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (stdDevX,stdDevY,stdDevZ) + else: + self.result.status = TestResult.StatusType.GOOD + self.result.statusMessage = "Good vibration values (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (stdDevX,stdDevY,stdDevZ) - +