LogAnalyzer: added check for binary log files (can't parse yet)

This commit is contained in:
Andrew Chapman 2014-03-03 19:55:25 +01:00 committed by Andrew Tridgell
parent 70c2682c28
commit 0f5a89f22e
6 changed files with 32 additions and 36 deletions

View File

@ -1,10 +1,9 @@
#
# Code to abstract the parsing of APM Dataflash log files, currently only used by the LogAnalyzer
#
# Initial code by Andrew Chapman (chapman@skymount.com), 16th Jan 2014
# Initial code by Andrew Chapman (amchapman@gmail.com), 16th Jan 2014
#
import pprint # temp
import collections
import os
import numpy
@ -12,7 +11,7 @@ import bisect
class Format:
'''Channel format as specified by the FMT lines in the log file'''
'''Data channel format as specified by the FMT lines in the log file'''
msgType = 0
msgLen = 0
name = ""
@ -31,11 +30,10 @@ class Format:
class Channel:
'''storage for a single stream of data, i.e. all GPS.RelAlt values'''
# TODO: rethink data storage, but do regression test suite first before refactoring it
# TODO: rethink data storage, but do more thorough regression testing before refactoring it
# TODO: store data as a scipy spline curve so we can more easily interpolate and sample the slope?
dictData = None # dict of linenum->value # store dupe data in dict and list for now, until we decide which is the better way to go
listData = None # list of (linenum,value)
listData = None # list of (linenum,value) # store dupe data in dict and list for now, until we decide which is the better way to go
def __init__(self):
self.dictData = {}
@ -134,6 +132,7 @@ class LogIterator:
def __getitem__(self, lineLabel):
return LogIterator.LogIteratorSubValue(self.logdata, self.iterators, lineLabel)
def next(self):
'''increment iterator to next log line'''
self.currentLine += 1
if self.currentLine > self.logdata.lineCount:
return self
@ -148,16 +147,17 @@ class LogIterator:
self.iterators[lineLabel] = (index,lineNumber)
return self
def jump(self, lineNumber):
'''jump iterator to specified log line'''
self.currentLine = lineNumber
for lineLabel in self.iterators.keys():
dataLabel = self.logdata.formats[lineLabel].labels[0]
(value,lineNumber) = self.logdata.channels[lineLabel][dataLabel].getNearestValue(self.currentLine)
#print " Found value: %.2f, lineNumber: %d" % (value,lineNumber)
#print " Found index: %d" % self.logdata.channels[lineLabel][dataLabel].getIndexOf(lineNumber)
self.iterators[lineLabel] = (self.logdata.channels[lineLabel][dataLabel].getIndexOf(lineNumber), lineNumber)
class DataflashLogHelper:
'''helper functions for dealing with log data, put here to keep DataflashLog class as a simple parser and data store'''
@staticmethod
def getTimeAtLine(logdata, lineNumber):
'''returns the nearest GPS timestamp in milliseconds after the given line number'''
@ -242,6 +242,7 @@ class DataflashLog:
skippedLines = 0
def getCopterType(self):
'''returns quad/hex/octo/tradheli if this is a copter log'''
if self.vehicleType != "ArduCopter":
return None
motLabels = []
@ -259,7 +260,7 @@ class DataflashLog:
return ""
def __castToFormatType(self,value,valueType):
'''using format characters from libraries/DataFlash/DataFlash.h to cast to basic python int/float/string types'''
'''using format characters from libraries/DataFlash/DataFlash.h to cast strings to basic python int/float/string types'''
intTypes = "bBhHiIM"
floatTypes = "fcCeEL"
charTypes = "nNZ"
@ -277,9 +278,12 @@ class DataflashLog:
def read(self, logfile, ignoreBadlines=False):
'''returns on successful log read (including bad lines if ignoreBadlines==True), will throw an Exception otherwise'''
# TODO: dataflash log parsing code is *SUPER* hacky, should re-write more methodically
# TODO: dataflash log parsing code is pretty hacky, should re-write more methodically
self.filename = logfile
f = open(self.filename, 'r')
if f.read(4) == '\xa3\x95\x80\x80':
raise Exception("Unable to parse binary log files at this time, will be added soon")
f.seek(0)
lineNumber = 0
knownHardwareTypes = ["APM", "PX4", "MPNG"]
for line in f:
@ -382,5 +386,6 @@ class DataflashLog:
self.durationSecs = (lastTimeGPS-firstTimeGPS) / 1000
# TODO: calculate logging rate based on timestamps
# ...

View File

@ -2,7 +2,7 @@
#
# A module to analyze and identify any common problems which can be determined from log files
#
# Initial code by Andrew Chapman (chapman@skymount.com), 16th Jan 2014
# Initial code by Andrew Chapman (amchapman@gmail.com), 16th Jan 2014
#
@ -14,6 +14,8 @@
# - copter+plane use 'V' in their vehicle type/version/build line, rover uses lower case 'v'. Copter+Rover give a build number, plane does not
# - CTUN.ThrOut on copter is 0-1000, on plane+rover it is 0-100
# TODO: add test for noisy baro values
import DataflashLog
@ -33,27 +35,28 @@ class TestResult:
# NA means not applicable for this log (e.g. copter tests against a plane log), UNKNOWN means it is missing data required for the test
PASS, FAIL, WARN, UNKNOWN, NA = range(5)
status = None
statusMessage = ""
statusMessage = "" # can be multi-line
class Test:
'''base class to be inherited by each specific log test. Each test should be quite granular so we have lots of small tests with clear results'''
name = ""
result = None # will be an instance of TestResult after being run
'''base class to be inherited by log tests. Each test should be quite granular so we have lots of small tests with clear results'''
name = ""
result = None # will be an instance of TestResult after being run
execTime = None
enable = True
enable = True
def run(self, logdata, verbose=False):
pass
class TestSuite:
'''registers test classes'''
'''registers test classes, loading using a basic plugin architecture, and can run them all in one run() operation'''
tests = []
logfile = None
logdata = None
def __init__(self):
# dynamically load in Test subclasses from the 'tests' folder
# to prevent one being loaded, move it out of that folder, or set that test's .enable attribute to False
dirName = os.path.dirname(os.path.abspath(__file__))
testScripts = glob.glob(dirName + '/tests/*.py')
testClasses = []
@ -69,7 +72,7 @@ class TestSuite:
# self.tests.append(m.TestBadParams())
def run(self, logdata, verbose):
'''run all registered tests in a single call'''
'''run all registered tests in a single call, gathering execution timing info'''
self.logdata = logdata
self.logfile = logdata.filename
for test in self.tests:
@ -81,6 +84,7 @@ class TestSuite:
test.execTime = 1000 * (endTime-startTime)
def outputPlainText(self, outputStats):
'''output test results in plain text'''
print 'Dataflash log analysis report for file: ' + self.logfile
print 'Log size: %.2fmb (%d lines)' % (self.logdata.filesizeKB / 1024.0, self.logdata.lineCount)
print 'Log duration: %s' % str(datetime.timedelta(seconds=self.logdata.durationSecs)) + '\n'
@ -125,6 +129,8 @@ class TestSuite:
print '\n'
def outputXML(self, xmlFile):
'''output test results to an XML file'''
# open the file for writing
xml = None
try:
@ -156,9 +162,6 @@ class TestSuite:
print >>xml, "<params>"
for param, value in self.logdata.parameters.items():
print >>xml, " <param name=\"%s\" value=\"%s\" />" % (param,`value`)
#print >>xml, " <paramname>" + param + "</paramname>"
#print >>xml, " <paramvalue>" + `value` + "</paramvalue>"
#print >>xml, " </param>"
print >>xml, "</params>"
# output test results
@ -241,13 +244,6 @@ def main():
if not args.quiet:
print "XML output written to file: %s\n" % args.xml
# temp - test some spot values - include a bunch of this in a unit test at some point
#print "GPS abs alt on line 24126 is " + `self.logdata.channels["GPS"]["Alt"].dictData[24126]` # 52.03
#print "ATT pitch on line 22153 is " + `self.logdata.channels["ATT"]["Pitch"].dictData[22153]` # -7.03
#gpsAlt = self.logdata.channels["GPS"]["Alt"]
#print "All GPS Alt data: %s\n\n" % gpsAlt.dictData
#gpsAltSeg = gpsAlt.getSegment(426,711)
#print "Segment of GPS Alt data from %d to %d: %s\n\n" % (426,711,gpsAltSeg.dictData)
if __name__ == "__main__":
main()

View File

@ -4,6 +4,7 @@ import DataflashLog
class TestEvents(Test):
'''test for erroneous events and failsafes'''
# TODO: need to check for vehicle-specific codes
def __init__(self):
self.name = "Event/Failsafe"

View File

@ -51,3 +51,5 @@ class TestGPSGlitch(Test):
if not gpsGlitchCount:
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = satsMsg

View File

@ -39,32 +39,26 @@ class TestPitchRollCoupling(Test):
if mode in autoModes:
if not isAuto:
manualSegments.append((prevLine,line-1))
#print "Adding manual segment: %d,%d" % (prevLine,line-1)
prevLine = line
isAuto = True
elif mode in manualModes:
if isAuto:
autoSegments.append((prevLine,line-1))
#print "Adding auto segment: %d,%d" % (prevLine,line-1)
prevLine = line
isAuto = False
elif mode in ignoreModes:
if isAuto:
autoSegments.append((prevLine,line-1))
#print "Adding auto segment: %d,%d" % (prevLine,line-1)
else:
manualSegments.append((prevLine,line-1))
#print "Adding manual segment: %d,%d" % (prevLine,line-1)
prevLine = 0
else:
raise Exception("Unknown mode in TestPitchRollCoupling: %s" % mode)
# and handle the last segment, which doesn't have an ending
if mode in autoModes:
autoSegments.append((prevLine,logdata.lineCount))
#print "Adding final auto segment: %d,%d" % (prevLine,logdata.lineCount)
elif mode in manualModes:
manualSegments.append((prevLine,logdata.lineCount))
#print "Adding final manual segment: %d,%d" % (prevLine,logdata.lineCount)
# figure out max lean angle, the ANGLE_MAX param was added in AC3.1
maxLeanAngle = 45.0
@ -80,7 +74,6 @@ class TestPitchRollCoupling(Test):
(maxRoll, maxRollLine) = (0.0, 0)
(maxPitch, maxPitchLine) = (0.0, 0)
for (startLine,endLine) in manualSegments+autoSegments:
#print "Checking segment %d,%d" % (startLine,endLine)
# quick up-front test, only fallover into more complex line-by-line check if max()>threshold
rollSeg = logdata.channels["ATT"]["Roll"].getSegment(startLine,endLine)
pitchSeg = logdata.channels["ATT"]["Pitch"].getSegment(startLine,endLine)

View File

@ -61,7 +61,6 @@ class TestUnderpowered(Test):
(startLine,endLine) = (data[seg[0]][0], data[seg[1]][0])
avgClimbRate = logdata.channels["CTUN"]["CRate"].getSegment(startLine,endLine).avg()
avgThrOut = logdata.channels["CTUN"]["ThrOut"].getSegment(startLine,endLine).avg()
#print " Average CRate for this chunk is %.2f" % avgClimbRate
if avgClimbRate < climbThresholdFAIL:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Avg climb rate %.2f cm/s for throttle avg %d" % (avgClimbRate,avgThrOut)