From 130a2dcb0bf7c6e0e3b89f9a5c4fe79df11229c7 Mon Sep 17 00:00:00 2001 From: Andrew Chapman Date: Mon, 3 Mar 2014 09:07:45 +0100 Subject: [PATCH] LogAnalyzer: added LogIterator, copter roll/pitch > max lean angle test, --- Tools/LogAnalyzer/DataflashLog.py | 135 ++++++++++++------ Tools/LogAnalyzer/LogAnalyzer.py | 2 +- Tools/LogAnalyzer/UnitTest.py | 20 ++- .../tests/TestPitchRollCoupling.py | 107 +++++++++----- Tools/LogAnalyzer/tests/TestVCC.py | 4 +- 5 files changed, 188 insertions(+), 80 deletions(-) diff --git a/Tools/LogAnalyzer/DataflashLog.py b/Tools/LogAnalyzer/DataflashLog.py index da245ae908..c4daadb8fe 100644 --- a/Tools/LogAnalyzer/DataflashLog.py +++ b/Tools/LogAnalyzer/DataflashLog.py @@ -30,9 +30,10 @@ class Format: class Channel: '''storage for a single stream of data, i.e. all GPS.RelAlt values''' - + # TODO: rethink data storage, but do regression test suite first before refactoring it - # TODO: store data as a curve so we can more easily interpolate and sample the slope? + # TODO: store data as a scipy spline curve so we can more easily interpolate and sample the slope? + dictData = None # dict of linenum->value # store dupe data in dict and list for now, until we decide which is the better way to go listData = None # list of (linenum,value) @@ -50,58 +51,111 @@ class Channel: return max(self.dictData.values()) def avg(self): return numpy.mean(self.dictData.values()) + def getNearestValueFwd(self, lineNumber): + '''Returns (value,lineNumber)''' + index = bisect.bisect_left(self.listData, (lineNumber,-99999)) + while index= lineNumber: + return (self.listData[index][1],line) + index += 1 + raise Exception("Error finding nearest value for line %d" % lineNumber) + def getNearestValueBack(self, lineNumber): + '''Returns (value,lineNumber)''' + index = bisect.bisect_left(self.listData, (lineNumber,-99999)) - 1 + while index>=0: + line = self.listData[index][0] + #print "Looking backwards for nearest value to line number %d, starting at line %d" % (lineNumber,line) # TEMP + if line <= lineNumber: + return (self.listData[index][1],line) + index -= 1 + raise Exception("Error finding nearest value for line %d" % lineNumber) def getNearestValue(self, lineNumber, lookForwards=True): - # TODO: redo Channel.getNearestValue() using listData and bisect, profile speed of TestUnderpowered before/after - '''find the nearest data value to the given lineNumber, defaults to looking forwards. Returns (value,lineNumber)''' - if lineNumber in self.dictData: - return (self.dictData[lineNumber], lineNumber) - offset = 1 - if not lookForwards: - offset = -1 - minLine = min(self.dictData.keys()) - maxLine = max(self.dictData.keys()) - line = max(minLine,lineNumber) - line = min(maxLine,line) - while line >= minLine and line <= maxLine: - if line in self.dictData: - return (self.dictData[line], line) - line = line + offset + '''find the nearest data value to the given lineNumber, defaults to first looking forwards. Returns (value,lineNumber)''' + if lookForwards: + try: + return self.getNearestValueFwd(lineNumber) + except: + return self.getNearestValueBack(lineNumber) + else: + try: + return self.getNearestValueBack(lineNumber) + except: + return self.getNearestValueFwd(lineNumber) raise Exception("Error finding nearest value for line %d" % lineNumber) def getInterpolatedValue(self, lineNumber): (prevValue,prevValueLine) = self.getNearestValue(lineNumber, lookForwards=False) - (nextValue,nextValueLine) = self.getNearestValue(lineNumber, lookForwards=False) + (nextValue,nextValueLine) = self.getNearestValue(lineNumber, lookForwards=True) if prevValueLine == nextValueLine: return prevValue weight = (lineNumber-prevValueLine) / float(nextValueLine-prevValueLine) return ((weight*prevValue) + ((1-weight)*nextValue)) def getIndexOf(self, lineNumber): - index = bisect.bisect_left(self.listData, (lineNumber,0)) - 1 - print "INDEX: %d" % index - assert(self.listData[index][0] == lineNumber) - return index + '''returns the index within this channel's listData of the given lineNumber, or raises an Exception if not found''' + index = bisect.bisect_left(self.listData, (lineNumber,-99999)) + #print "INDEX of line %d: %d" % (lineNumber,index) + #print "self.listData[index][0]: %d" % self.listData[index][0] + if (self.listData[index][0] == lineNumber): + return index + else: + raise Exception("Error finding index for line %d" % lineNumber) -# class LogIterator: -# '''Smart iterator that can move through a log by line number and maintain an index into the nearest values of all data channels''' +class LogIterator: + '''Smart iterator that can move through a log by line number and maintain an index into the nearest values of all data channels''' + # TODO: LogIterator currently indexes the next available value rather than the nearest value, we should make it configurable between next/nearest -# iterators = [] # (lineLabel, dataLabel) -> listIndex -# logdata = None -# currentLine = None + class LogIteratorSubValue: + '''syntactic sugar to allow access by LogIterator[lineLabel][dataLabel]''' + logdata = None + iterators = None + lineLabel = None + def __init__(self, logdata, iterators, lineLabel): + self.logdata = logdata + self.lineLabel = lineLabel + self.iterators = iterators + def __getitem__(self, dataLabel): + index = self.iterators[self.lineLabel][0] + return self.logdata.channels[self.lineLabel][dataLabel].listData[index][1] -# def __init__(self, logdata): -# self.logdata = logdata -# self.currentLine = 0 -# for format in self.logdata.formats: -# for label in format.labels: -# iterators[(format,label)] = 0 + iterators = {} # lineLabel -> (listIndex,lineNumber) + logdata = None + currentLine = None -# def __iter__(self): -# return self + def __init__(self, logdata, lineNumber=0): + self.logdata = logdata + self.currentLine = lineNumber + for lineLabel in self.logdata.formats: + if lineLabel in self.logdata.channels: + self.iterators[lineLabel] = () + self.jump(lineNumber) + def __iter__(self): + return self + def __getitem__(self, lineLabel): + return LogIterator.LogIteratorSubValue(self.logdata, self.iterators, lineLabel) + def next(self): + self.currentLine += 1 + if self.currentLine > self.logdata.lineCount: + return self + for lineLabel in self.iterators.keys(): + # check if the currentLine has gone past our the line we're pointing to for this type of data + dataLabel = self.logdata.formats[lineLabel].labels[0] + (index, lineNumber) = self.iterators[lineLabel] + # if so, and it is not the last entry in the log, then increment the indices for all dataLabels under that lineLabel + if (self.currentLine > lineNumber) and (index < len(self.logdata.channels[lineLabel][dataLabel].listData)-1): + index += 1 + lineNumber = self.logdata.channels[lineLabel][dataLabel].listData[index][0] + self.iterators[lineLabel] = (index,lineNumber) + return self + def jump(self, lineNumber): + self.currentLine = lineNumber + for lineLabel in self.iterators.keys(): + dataLabel = self.logdata.formats[lineLabel].labels[0] + (value,lineNumber) = self.logdata.channels[lineLabel][dataLabel].getNearestValue(self.currentLine) + #print " Found value: %.2f, lineNumber: %d" % (value,lineNumber) + #print " Found index: %d" % self.logdata.channels[lineLabel][dataLabel].getIndexOf(lineNumber) + self.iterators[lineLabel] = (self.logdata.channels[lineLabel][dataLabel].getIndexOf(lineNumber), lineNumber) -# def next(self): -# pass - -# def jump(self, lineNumber): -# pass class DataflashLogHelper: @staticmethod @@ -167,7 +221,6 @@ class DataflashLogHelper: class DataflashLog: '''APM Dataflash log file reader and container class. Keep this simple, add more advanced or specific functions to DataflashLogHelper class''' - # TODO: implement some kind of iterator or different data storage approeach where we can step through the log by time/line and easily access, interpolate and cross-reference values from all channels at that point filename = None diff --git a/Tools/LogAnalyzer/LogAnalyzer.py b/Tools/LogAnalyzer/LogAnalyzer.py index 5855a44ae8..80aeacaef1 100755 --- a/Tools/LogAnalyzer/LogAnalyzer.py +++ b/Tools/LogAnalyzer/LogAnalyzer.py @@ -105,7 +105,7 @@ class TestSuite: statusMessageExtra = test.result.statusMessage.strip('\n\r').split('\n')[1:] execTime = "" if outputStats: - execTime = " (%.2fms)" % (test.execTime) + execTime = " (%6.2fms)" % (test.execTime) if test.result.status == TestResult.StatusType.PASS: print " %20s: PASS %-55s%s" % (test.name, statusMessageFirstLine, execTime) elif test.result.status == TestResult.StatusType.FAIL: diff --git a/Tools/LogAnalyzer/UnitTest.py b/Tools/LogAnalyzer/UnitTest.py index 8b4022bad6..a7cd7de6d1 100755 --- a/Tools/LogAnalyzer/UnitTest.py +++ b/Tools/LogAnalyzer/UnitTest.py @@ -5,7 +5,7 @@ # # -# TODO: implement unit+regression tests +# TODO: implement more unit+regression tests import DataflashLog import traceback @@ -42,6 +42,24 @@ try: assert(logdata.durationSecs == 155) assert(logdata.lineCount == 4750) + # test LogIterator class + lit = DataflashLog.LogIterator(logdata) + assert(lit.currentLine == 0) + assert(lit.iterators == {'CURR': (0, 310), 'ERR': (0, 307), 'NTUN': (0, 2206), 'CTUN': (0, 308), 'GPS': (0, 552), 'CMD': (0, 607), 'D32': (0, 305), 'ATT': (0, 311), 'EV': (0, 306), 'DU32': (0, 309), 'PM': (0, 479)}) + lit.jump(500) + assert(lit.iterators == {'CURR': (9, 514), 'ERR': (1, 553), 'NTUN': (0, 2206), 'CTUN': (87, 500), 'GPS': (0, 552), 'CMD': (0, 607), 'D32': (0, 305), 'ATT': (83, 501), 'EV': (4, 606), 'DU32': (9, 513), 'PM': (1, 719)}) + assert(lit['CTUN']['ThrIn'] == 450) + assert(lit['ATT']['RollIn'] == 11.19) + assert(lit['CURR']['CurrTot'] == 25.827288) + assert(lit['D32']['Value'] == 11122) + lit.next() + assert(lit.iterators == {'CURR': (9, 514), 'ERR': (1, 553), 'NTUN': (0, 2206), 'CTUN': (88, 502), 'GPS': (0, 552), 'CMD': (0, 607), 'D32': (0, 305), 'ATT': (83, 501), 'EV': (4, 606), 'DU32': (9, 513), 'PM': (1, 719)}) + lit.jump(4750) + lit.next() + assert(lit.currentLine == 4751) + assert(lit['ATT']['Roll'] == 2.99) + + # TODO: unit test DataflashLog reading 2 # ... diff --git a/Tools/LogAnalyzer/tests/TestPitchRollCoupling.py b/Tools/LogAnalyzer/tests/TestPitchRollCoupling.py index 7b911f5b19..5489d64648 100644 --- a/Tools/LogAnalyzer/tests/TestPitchRollCoupling.py +++ b/Tools/LogAnalyzer/tests/TestPitchRollCoupling.py @@ -4,10 +4,11 @@ import DataflashLog class TestPitchRollCoupling(Test): '''test for divergence between input and output pitch/roll, i.e. mechanical failure or bad PID tuning''' + # TODO: currently we're only checking for roll/pitch outside of max lean angle, will come back later to analyze roll/pitch in versus out values def __init__(self): self.name = "Pitch/Roll" - self.enable = False # TEMP + self.enable = True # TEMP def run(self, logdata): self.result = TestResult() @@ -15,69 +16,107 @@ class TestPitchRollCoupling(Test): if logdata.vehicleType != "ArduCopter": self.result.status = TestResult.StatusType.NA + return if not "ATT" in logdata.channels: self.result.status = TestResult.StatusType.UNKNOWN self.result.statusMessage = "No ATT log data" return - # TODO: implement pitch/roll input/output divergence testing - - - # note: names changed from PitchIn to DesPitch at some point, check for both - - # what to test for? - # - only analyse while we're airborne - # - absolute diff between in+out? - # - accumulated diff between in+out? - # - slope diff between in+out curves? - # - roll/pitch over max in non-acro modes? - # - if direct control use CTUN roll+pitch, if auto mode use NTUN data - - - # figure out where each mode begins and ends, so we can treat auto and manual modes differently + # figure out where each mode begins and ends, so we can treat auto and manual modes differently and ignore acro/tune modes autoModes = ["RTL","AUTO","LAND","LOITER","GUIDED","CIRCLE","OF_LOITER"] # use NTUN DRol+DPit manualModes = ["STABILIZE","DRIFT","ALT_HOLD"] # use CTUN RollIn/DesRoll + PitchIn/DesPitch ignoreModes = ["ACRO","SPORT","FLIP","AUTOTUNE"] # ignore data from these modes autoSegments = [] # list of (startLine,endLine) pairs manualSegments = [] # list of (startLine,endLine) pairs orderedModes = collections.OrderedDict(sorted(logdata.modeChanges.items(), key=lambda t: t[0])) - isAuto = False # always start in a manual control mode - prevLine = 1 + isAuto = False # we always start in a manual control mode + prevLine = 0 for line,modepair in orderedModes.iteritems(): mode = modepair[0].upper() + if prevLine == 0: + prevLine = line if mode in autoModes: - print "On line %d mode changed to %s (AUTO)" % (line,mode) # TEMP if not isAuto: manualSegments.append((prevLine,line-1)) - print " Previous manual segment: " + `(prevLine,line-1)` # TEMP + #print "Adding manual segment: %d,%d" % (prevLine,line-1) prevLine = line isAuto = True elif mode in manualModes: - print "On line %d mode changed to %s (MANUAL)" % (line,mode) # TEMP if isAuto: autoSegments.append((prevLine,line-1)) - print " Previous auto segment: " + `(prevLine,line-1)` # TEMP + #print "Adding auto segment: %d,%d" % (prevLine,line-1) prevLine = line isAuto = False elif mode in ignoreModes: - pass + if isAuto: + autoSegments.append((prevLine,line-1)) + #print "Adding auto segment: %d,%d" % (prevLine,line-1) + else: + manualSegments.append((prevLine,line-1)) + #print "Adding manual segment: %d,%d" % (prevLine,line-1) + prevLine = 0 else: raise Exception("Unknown mode in TestPitchRollCoupling: %s" % mode) + # and handle the last segment, which doesn't have an ending + if mode in autoModes: + autoSegments.append((prevLine,logdata.lineCount)) + #print "Adding final auto segment: %d,%d" % (prevLine,logdata.lineCount) + elif mode in manualModes: + manualSegments.append((prevLine,logdata.lineCount)) + #print "Adding final manual segment: %d,%d" % (prevLine,logdata.lineCount) - # look through manual segments - for startLine,endLine in manualSegments: - (value,attLine) = logdata.channels["ATT"]["Roll"].getNearestValue(startLine, lookForwards=True) - print "Nearest ATT line after %d is %d" % (startLine,attLine) - index = logdata.channels["ATT"]["Roll"].getIndexOf(attLine) - print "First ATT line in manual segment (%d,%d) is line %d" % (startLine,endLine,logdata.channels["ATT"]["Roll"].listData[index][0]) + # figure out max lean angle, the ANGLE_MAX param was added in AC3.1 + maxLeanAngle = 45.0 + if "ANGLE_MAX" in logdata.parameters: + maxLeanAngle = logdata.parameters["ANGLE_MAX"] / 100.0 + maxLeanAngleBuffer = 10 # allow a buffer margin + + # ignore anything below this altitude, to discard any data while not flying + minAltThreshold = 2.0 + + # look through manual+auto flight segments + # TODO: filter to ignore single points outside range? + (maxRoll, maxRollLine) = (0.0, 0) + (maxPitch, maxPitchLine) = (0.0, 0) + for (startLine,endLine) in manualSegments+autoSegments: + #print "Checking segment %d,%d" % (startLine,endLine) + # quick up-front test, only fallover into more complex line-by-line check if max()>threshold + rollSeg = logdata.channels["ATT"]["Roll"].getSegment(startLine,endLine) + pitchSeg = logdata.channels["ATT"]["Pitch"].getSegment(startLine,endLine) + if not rollSeg.dictData and not pitchSeg.dictData: + continue + # check max roll+pitch for any time where relative altitude is above minAltThreshold + roll = max(abs(rollSeg.min()), abs(rollSeg.max())) + pitch = max(abs(pitchSeg.min()), abs(pitchSeg.max())) + if (roll>(maxLeanAngle+maxLeanAngleBuffer) and abs(roll)>abs(maxRoll)) or (pitch>(maxLeanAngle+maxLeanAngleBuffer) and abs(pitch)>abs(maxPitch)): + lit = DataflashLog.LogIterator(logdata, startLine) + assert(lit.currentLine == startLine) + while lit.currentLine <= endLine: + relativeAlt = lit["CTUN"]["BarAlt"] + if relativeAlt > minAltThreshold: + roll = lit["ATT"]["Roll"] + pitch = lit["ATT"]["Pitch"] + if abs(roll)>(maxLeanAngle+maxLeanAngleBuffer) and abs(roll)>abs(maxRoll): + maxRoll = roll + maxRollLine = lit.currentLine + if abs(pitch)>(maxLeanAngle+maxLeanAngleBuffer) and abs(pitch)>abs(maxPitch): + maxPitch = pitch + maxPitchLine = lit.currentLine + lit.next() + # check for breaking max lean angles + if maxRoll and abs(maxRoll)>abs(maxPitch): + self.result.status = TestResult.StatusType.FAIL + self.result.statusMessage = "Roll (%.2f, line %d) > maximum lean angle (%.2f)" % (maxRoll, maxRollLine, maxLeanAngle) + return + if maxPitch: + self.result.status = TestResult.StatusType.FAIL + self.result.statusMessage = "Pitch (%.2f, line %d) > maximum lean angle (%.2f)" % (maxPitch, maxPitchLine, maxLeanAngle) + return - - - - - # look through auto segments + # TODO: use numpy/scipy to check Roll+RollIn curves for fitness (ignore where we're not airborne) # ... @@ -86,5 +125,3 @@ class TestPitchRollCoupling(Test): - - diff --git a/Tools/LogAnalyzer/tests/TestVCC.py b/Tools/LogAnalyzer/tests/TestVCC.py index 353fe95c32..f4437bc4ec 100644 --- a/Tools/LogAnalyzer/tests/TestVCC.py +++ b/Tools/LogAnalyzer/tests/TestVCC.py @@ -27,8 +27,8 @@ class TestVCC(Test): vccMaxDiff = 0.3 * 1000; if vccDiff > vccMaxDiff: self.result.status = TestResult.StatusType.WARN - self.result.statusMessage = "VCC min/max diff %s, should be <%s" % (vccDiff/1000.0, vccMaxDiff/1000.0) + self.result.statusMessage = "VCC min/max diff %sv, should be <%sv" % (vccDiff/1000.0, vccMaxDiff/1000.0) elif vccMin < vccMinThreshold: self.result.status = TestResult.StatusType.FAIL - self.result.statusMessage = "VCC below minimum of %s (%s)" % (`vccMinThreshold/1000.0`,`vccMin/1000.0`) + self.result.statusMessage = "VCC below minimum of %sv (%sv)" % (`vccMinThreshold/1000.0`,`vccMin/1000.0`)