LogAnalyzer: added LogIterator, copter roll/pitch > max lean angle test,

This commit is contained in:
Andrew Chapman 2014-03-03 09:07:45 +01:00 committed by Andrew Tridgell
parent 81fcf4bda7
commit 130a2dcb0b
5 changed files with 188 additions and 80 deletions

View File

@ -30,9 +30,10 @@ class Format:
class Channel:
'''storage for a single stream of data, i.e. all GPS.RelAlt values'''
# TODO: rethink data storage, but do regression test suite first before refactoring it
# TODO: store data as a curve so we can more easily interpolate and sample the slope?
# TODO: store data as a scipy spline curve so we can more easily interpolate and sample the slope?
dictData = None # dict of linenum->value # store dupe data in dict and list for now, until we decide which is the better way to go
listData = None # list of (linenum,value)
@ -50,58 +51,111 @@ class Channel:
return max(self.dictData.values())
def avg(self):
return numpy.mean(self.dictData.values())
def getNearestValueFwd(self, lineNumber):
'''Returns (value,lineNumber)'''
index = bisect.bisect_left(self.listData, (lineNumber,-99999))
while index<len(self.listData):
line = self.listData[index][0]
#print "Looking forwards for nearest value to line number %d, starting at line %d" % (lineNumber,line) # TEMP
if line >= lineNumber:
return (self.listData[index][1],line)
index += 1
raise Exception("Error finding nearest value for line %d" % lineNumber)
def getNearestValueBack(self, lineNumber):
'''Returns (value,lineNumber)'''
index = bisect.bisect_left(self.listData, (lineNumber,-99999)) - 1
while index>=0:
line = self.listData[index][0]
#print "Looking backwards for nearest value to line number %d, starting at line %d" % (lineNumber,line) # TEMP
if line <= lineNumber:
return (self.listData[index][1],line)
index -= 1
raise Exception("Error finding nearest value for line %d" % lineNumber)
def getNearestValue(self, lineNumber, lookForwards=True):
# TODO: redo Channel.getNearestValue() using listData and bisect, profile speed of TestUnderpowered before/after
'''find the nearest data value to the given lineNumber, defaults to looking forwards. Returns (value,lineNumber)'''
if lineNumber in self.dictData:
return (self.dictData[lineNumber], lineNumber)
offset = 1
if not lookForwards:
offset = -1
minLine = min(self.dictData.keys())
maxLine = max(self.dictData.keys())
line = max(minLine,lineNumber)
line = min(maxLine,line)
while line >= minLine and line <= maxLine:
if line in self.dictData:
return (self.dictData[line], line)
line = line + offset
'''find the nearest data value to the given lineNumber, defaults to first looking forwards. Returns (value,lineNumber)'''
if lookForwards:
try:
return self.getNearestValueFwd(lineNumber)
except:
return self.getNearestValueBack(lineNumber)
else:
try:
return self.getNearestValueBack(lineNumber)
except:
return self.getNearestValueFwd(lineNumber)
raise Exception("Error finding nearest value for line %d" % lineNumber)
def getInterpolatedValue(self, lineNumber):
(prevValue,prevValueLine) = self.getNearestValue(lineNumber, lookForwards=False)
(nextValue,nextValueLine) = self.getNearestValue(lineNumber, lookForwards=False)
(nextValue,nextValueLine) = self.getNearestValue(lineNumber, lookForwards=True)
if prevValueLine == nextValueLine:
return prevValue
weight = (lineNumber-prevValueLine) / float(nextValueLine-prevValueLine)
return ((weight*prevValue) + ((1-weight)*nextValue))
def getIndexOf(self, lineNumber):
index = bisect.bisect_left(self.listData, (lineNumber,0)) - 1
print "INDEX: %d" % index
assert(self.listData[index][0] == lineNumber)
return index
'''returns the index within this channel's listData of the given lineNumber, or raises an Exception if not found'''
index = bisect.bisect_left(self.listData, (lineNumber,-99999))
#print "INDEX of line %d: %d" % (lineNumber,index)
#print "self.listData[index][0]: %d" % self.listData[index][0]
if (self.listData[index][0] == lineNumber):
return index
else:
raise Exception("Error finding index for line %d" % lineNumber)
# class LogIterator:
# '''Smart iterator that can move through a log by line number and maintain an index into the nearest values of all data channels'''
class LogIterator:
'''Smart iterator that can move through a log by line number and maintain an index into the nearest values of all data channels'''
# TODO: LogIterator currently indexes the next available value rather than the nearest value, we should make it configurable between next/nearest
# iterators = [] # (lineLabel, dataLabel) -> listIndex
# logdata = None
# currentLine = None
class LogIteratorSubValue:
'''syntactic sugar to allow access by LogIterator[lineLabel][dataLabel]'''
logdata = None
iterators = None
lineLabel = None
def __init__(self, logdata, iterators, lineLabel):
self.logdata = logdata
self.lineLabel = lineLabel
self.iterators = iterators
def __getitem__(self, dataLabel):
index = self.iterators[self.lineLabel][0]
return self.logdata.channels[self.lineLabel][dataLabel].listData[index][1]
# def __init__(self, logdata):
# self.logdata = logdata
# self.currentLine = 0
# for format in self.logdata.formats:
# for label in format.labels:
# iterators[(format,label)] = 0
iterators = {} # lineLabel -> (listIndex,lineNumber)
logdata = None
currentLine = None
# def __iter__(self):
# return self
def __init__(self, logdata, lineNumber=0):
self.logdata = logdata
self.currentLine = lineNumber
for lineLabel in self.logdata.formats:
if lineLabel in self.logdata.channels:
self.iterators[lineLabel] = ()
self.jump(lineNumber)
def __iter__(self):
return self
def __getitem__(self, lineLabel):
return LogIterator.LogIteratorSubValue(self.logdata, self.iterators, lineLabel)
def next(self):
self.currentLine += 1
if self.currentLine > self.logdata.lineCount:
return self
for lineLabel in self.iterators.keys():
# check if the currentLine has gone past our the line we're pointing to for this type of data
dataLabel = self.logdata.formats[lineLabel].labels[0]
(index, lineNumber) = self.iterators[lineLabel]
# if so, and it is not the last entry in the log, then increment the indices for all dataLabels under that lineLabel
if (self.currentLine > lineNumber) and (index < len(self.logdata.channels[lineLabel][dataLabel].listData)-1):
index += 1
lineNumber = self.logdata.channels[lineLabel][dataLabel].listData[index][0]
self.iterators[lineLabel] = (index,lineNumber)
return self
def jump(self, lineNumber):
self.currentLine = lineNumber
for lineLabel in self.iterators.keys():
dataLabel = self.logdata.formats[lineLabel].labels[0]
(value,lineNumber) = self.logdata.channels[lineLabel][dataLabel].getNearestValue(self.currentLine)
#print " Found value: %.2f, lineNumber: %d" % (value,lineNumber)
#print " Found index: %d" % self.logdata.channels[lineLabel][dataLabel].getIndexOf(lineNumber)
self.iterators[lineLabel] = (self.logdata.channels[lineLabel][dataLabel].getIndexOf(lineNumber), lineNumber)
# def next(self):
# pass
# def jump(self, lineNumber):
# pass
class DataflashLogHelper:
@staticmethod
@ -167,7 +221,6 @@ class DataflashLogHelper:
class DataflashLog:
'''APM Dataflash log file reader and container class. Keep this simple, add more advanced or specific functions to DataflashLogHelper class'''
# TODO: implement some kind of iterator or different data storage approeach where we can step through the log by time/line and easily access, interpolate and cross-reference values from all channels at that point
filename = None

View File

@ -105,7 +105,7 @@ class TestSuite:
statusMessageExtra = test.result.statusMessage.strip('\n\r').split('\n')[1:]
execTime = ""
if outputStats:
execTime = " (%.2fms)" % (test.execTime)
execTime = " (%6.2fms)" % (test.execTime)
if test.result.status == TestResult.StatusType.PASS:
print " %20s: PASS %-55s%s" % (test.name, statusMessageFirstLine, execTime)
elif test.result.status == TestResult.StatusType.FAIL:

View File

@ -5,7 +5,7 @@
#
#
# TODO: implement unit+regression tests
# TODO: implement more unit+regression tests
import DataflashLog
import traceback
@ -42,6 +42,24 @@ try:
assert(logdata.durationSecs == 155)
assert(logdata.lineCount == 4750)
# test LogIterator class
lit = DataflashLog.LogIterator(logdata)
assert(lit.currentLine == 0)
assert(lit.iterators == {'CURR': (0, 310), 'ERR': (0, 307), 'NTUN': (0, 2206), 'CTUN': (0, 308), 'GPS': (0, 552), 'CMD': (0, 607), 'D32': (0, 305), 'ATT': (0, 311), 'EV': (0, 306), 'DU32': (0, 309), 'PM': (0, 479)})
lit.jump(500)
assert(lit.iterators == {'CURR': (9, 514), 'ERR': (1, 553), 'NTUN': (0, 2206), 'CTUN': (87, 500), 'GPS': (0, 552), 'CMD': (0, 607), 'D32': (0, 305), 'ATT': (83, 501), 'EV': (4, 606), 'DU32': (9, 513), 'PM': (1, 719)})
assert(lit['CTUN']['ThrIn'] == 450)
assert(lit['ATT']['RollIn'] == 11.19)
assert(lit['CURR']['CurrTot'] == 25.827288)
assert(lit['D32']['Value'] == 11122)
lit.next()
assert(lit.iterators == {'CURR': (9, 514), 'ERR': (1, 553), 'NTUN': (0, 2206), 'CTUN': (88, 502), 'GPS': (0, 552), 'CMD': (0, 607), 'D32': (0, 305), 'ATT': (83, 501), 'EV': (4, 606), 'DU32': (9, 513), 'PM': (1, 719)})
lit.jump(4750)
lit.next()
assert(lit.currentLine == 4751)
assert(lit['ATT']['Roll'] == 2.99)
# TODO: unit test DataflashLog reading 2
# ...

View File

@ -4,10 +4,11 @@ import DataflashLog
class TestPitchRollCoupling(Test):
'''test for divergence between input and output pitch/roll, i.e. mechanical failure or bad PID tuning'''
# TODO: currently we're only checking for roll/pitch outside of max lean angle, will come back later to analyze roll/pitch in versus out values
def __init__(self):
self.name = "Pitch/Roll"
self.enable = False # TEMP
self.enable = True # TEMP
def run(self, logdata):
self.result = TestResult()
@ -15,69 +16,107 @@ class TestPitchRollCoupling(Test):
if logdata.vehicleType != "ArduCopter":
self.result.status = TestResult.StatusType.NA
return
if not "ATT" in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No ATT log data"
return
# TODO: implement pitch/roll input/output divergence testing -
# note: names changed from PitchIn to DesPitch at some point, check for both
# what to test for?
# - only analyse while we're airborne
# - absolute diff between in+out?
# - accumulated diff between in+out?
# - slope diff between in+out curves?
# - roll/pitch over max in non-acro modes?
# - if direct control use CTUN roll+pitch, if auto mode use NTUN data
# figure out where each mode begins and ends, so we can treat auto and manual modes differently
# figure out where each mode begins and ends, so we can treat auto and manual modes differently and ignore acro/tune modes
autoModes = ["RTL","AUTO","LAND","LOITER","GUIDED","CIRCLE","OF_LOITER"] # use NTUN DRol+DPit
manualModes = ["STABILIZE","DRIFT","ALT_HOLD"] # use CTUN RollIn/DesRoll + PitchIn/DesPitch
ignoreModes = ["ACRO","SPORT","FLIP","AUTOTUNE"] # ignore data from these modes
autoSegments = [] # list of (startLine,endLine) pairs
manualSegments = [] # list of (startLine,endLine) pairs
orderedModes = collections.OrderedDict(sorted(logdata.modeChanges.items(), key=lambda t: t[0]))
isAuto = False # always start in a manual control mode
prevLine = 1
isAuto = False # we always start in a manual control mode
prevLine = 0
for line,modepair in orderedModes.iteritems():
mode = modepair[0].upper()
if prevLine == 0:
prevLine = line
if mode in autoModes:
print "On line %d mode changed to %s (AUTO)" % (line,mode) # TEMP
if not isAuto:
manualSegments.append((prevLine,line-1))
print " Previous manual segment: " + `(prevLine,line-1)` # TEMP
#print "Adding manual segment: %d,%d" % (prevLine,line-1)
prevLine = line
isAuto = True
elif mode in manualModes:
print "On line %d mode changed to %s (MANUAL)" % (line,mode) # TEMP
if isAuto:
autoSegments.append((prevLine,line-1))
print " Previous auto segment: " + `(prevLine,line-1)` # TEMP
#print "Adding auto segment: %d,%d" % (prevLine,line-1)
prevLine = line
isAuto = False
elif mode in ignoreModes:
pass
if isAuto:
autoSegments.append((prevLine,line-1))
#print "Adding auto segment: %d,%d" % (prevLine,line-1)
else:
manualSegments.append((prevLine,line-1))
#print "Adding manual segment: %d,%d" % (prevLine,line-1)
prevLine = 0
else:
raise Exception("Unknown mode in TestPitchRollCoupling: %s" % mode)
# and handle the last segment, which doesn't have an ending
if mode in autoModes:
autoSegments.append((prevLine,logdata.lineCount))
#print "Adding final auto segment: %d,%d" % (prevLine,logdata.lineCount)
elif mode in manualModes:
manualSegments.append((prevLine,logdata.lineCount))
#print "Adding final manual segment: %d,%d" % (prevLine,logdata.lineCount)
# look through manual segments
for startLine,endLine in manualSegments:
(value,attLine) = logdata.channels["ATT"]["Roll"].getNearestValue(startLine, lookForwards=True)
print "Nearest ATT line after %d is %d" % (startLine,attLine)
index = logdata.channels["ATT"]["Roll"].getIndexOf(attLine)
print "First ATT line in manual segment (%d,%d) is line %d" % (startLine,endLine,logdata.channels["ATT"]["Roll"].listData[index][0])
# figure out max lean angle, the ANGLE_MAX param was added in AC3.1
maxLeanAngle = 45.0
if "ANGLE_MAX" in logdata.parameters:
maxLeanAngle = logdata.parameters["ANGLE_MAX"] / 100.0
maxLeanAngleBuffer = 10 # allow a buffer margin
# ignore anything below this altitude, to discard any data while not flying
minAltThreshold = 2.0
# look through manual+auto flight segments
# TODO: filter to ignore single points outside range?
(maxRoll, maxRollLine) = (0.0, 0)
(maxPitch, maxPitchLine) = (0.0, 0)
for (startLine,endLine) in manualSegments+autoSegments:
#print "Checking segment %d,%d" % (startLine,endLine)
# quick up-front test, only fallover into more complex line-by-line check if max()>threshold
rollSeg = logdata.channels["ATT"]["Roll"].getSegment(startLine,endLine)
pitchSeg = logdata.channels["ATT"]["Pitch"].getSegment(startLine,endLine)
if not rollSeg.dictData and not pitchSeg.dictData:
continue
# check max roll+pitch for any time where relative altitude is above minAltThreshold
roll = max(abs(rollSeg.min()), abs(rollSeg.max()))
pitch = max(abs(pitchSeg.min()), abs(pitchSeg.max()))
if (roll>(maxLeanAngle+maxLeanAngleBuffer) and abs(roll)>abs(maxRoll)) or (pitch>(maxLeanAngle+maxLeanAngleBuffer) and abs(pitch)>abs(maxPitch)):
lit = DataflashLog.LogIterator(logdata, startLine)
assert(lit.currentLine == startLine)
while lit.currentLine <= endLine:
relativeAlt = lit["CTUN"]["BarAlt"]
if relativeAlt > minAltThreshold:
roll = lit["ATT"]["Roll"]
pitch = lit["ATT"]["Pitch"]
if abs(roll)>(maxLeanAngle+maxLeanAngleBuffer) and abs(roll)>abs(maxRoll):
maxRoll = roll
maxRollLine = lit.currentLine
if abs(pitch)>(maxLeanAngle+maxLeanAngleBuffer) and abs(pitch)>abs(maxPitch):
maxPitch = pitch
maxPitchLine = lit.currentLine
lit.next()
# check for breaking max lean angles
if maxRoll and abs(maxRoll)>abs(maxPitch):
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Roll (%.2f, line %d) > maximum lean angle (%.2f)" % (maxRoll, maxRollLine, maxLeanAngle)
return
if maxPitch:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Pitch (%.2f, line %d) > maximum lean angle (%.2f)" % (maxPitch, maxPitchLine, maxLeanAngle)
return
# look through auto segments
# TODO: use numpy/scipy to check Roll+RollIn curves for fitness (ignore where we're not airborne)
# ...
@ -86,5 +125,3 @@ class TestPitchRollCoupling(Test):

View File

@ -27,8 +27,8 @@ class TestVCC(Test):
vccMaxDiff = 0.3 * 1000;
if vccDiff > vccMaxDiff:
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = "VCC min/max diff %s, should be <%s" % (vccDiff/1000.0, vccMaxDiff/1000.0)
self.result.statusMessage = "VCC min/max diff %sv, should be <%sv" % (vccDiff/1000.0, vccMaxDiff/1000.0)
elif vccMin < vccMinThreshold:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "VCC below minimum of %s (%s)" % (`vccMinThreshold/1000.0`,`vccMin/1000.0`)
self.result.statusMessage = "VCC below minimum of %sv (%sv)" % (`vccMinThreshold/1000.0`,`vccMin/1000.0`)