diff --git a/Tools/LogAnalyzer/DataflashLog.py b/Tools/LogAnalyzer/DataflashLog.py index c4daadb8fe..adde7be7c5 100644 --- a/Tools/LogAnalyzer/DataflashLog.py +++ b/Tools/LogAnalyzer/DataflashLog.py @@ -1,10 +1,9 @@ # # Code to abstract the parsing of APM Dataflash log files, currently only used by the LogAnalyzer # -# Initial code by Andrew Chapman (chapman@skymount.com), 16th Jan 2014 +# Initial code by Andrew Chapman (amchapman@gmail.com), 16th Jan 2014 # -import pprint # temp import collections import os import numpy @@ -12,7 +11,7 @@ import bisect class Format: - '''Channel format as specified by the FMT lines in the log file''' + '''Data channel format as specified by the FMT lines in the log file''' msgType = 0 msgLen = 0 name = "" @@ -31,11 +30,10 @@ class Format: class Channel: '''storage for a single stream of data, i.e. all GPS.RelAlt values''' - # TODO: rethink data storage, but do regression test suite first before refactoring it + # TODO: rethink data storage, but do more thorough regression testing before refactoring it # TODO: store data as a scipy spline curve so we can more easily interpolate and sample the slope? - dictData = None # dict of linenum->value # store dupe data in dict and list for now, until we decide which is the better way to go - listData = None # list of (linenum,value) + listData = None # list of (linenum,value) # store dupe data in dict and list for now, until we decide which is the better way to go def __init__(self): self.dictData = {} @@ -134,6 +132,7 @@ class LogIterator: def __getitem__(self, lineLabel): return LogIterator.LogIteratorSubValue(self.logdata, self.iterators, lineLabel) def next(self): + '''increment iterator to next log line''' self.currentLine += 1 if self.currentLine > self.logdata.lineCount: return self @@ -148,16 +147,17 @@ class LogIterator: self.iterators[lineLabel] = (index,lineNumber) return self def jump(self, lineNumber): + '''jump iterator to specified log line''' self.currentLine = lineNumber for lineLabel in self.iterators.keys(): dataLabel = self.logdata.formats[lineLabel].labels[0] (value,lineNumber) = self.logdata.channels[lineLabel][dataLabel].getNearestValue(self.currentLine) - #print " Found value: %.2f, lineNumber: %d" % (value,lineNumber) - #print " Found index: %d" % self.logdata.channels[lineLabel][dataLabel].getIndexOf(lineNumber) self.iterators[lineLabel] = (self.logdata.channels[lineLabel][dataLabel].getIndexOf(lineNumber), lineNumber) class DataflashLogHelper: + '''helper functions for dealing with log data, put here to keep DataflashLog class as a simple parser and data store''' + @staticmethod def getTimeAtLine(logdata, lineNumber): '''returns the nearest GPS timestamp in milliseconds after the given line number''' @@ -242,6 +242,7 @@ class DataflashLog: skippedLines = 0 def getCopterType(self): + '''returns quad/hex/octo/tradheli if this is a copter log''' if self.vehicleType != "ArduCopter": return None motLabels = [] @@ -259,7 +260,7 @@ class DataflashLog: return "" def __castToFormatType(self,value,valueType): - '''using format characters from libraries/DataFlash/DataFlash.h to cast to basic python int/float/string types''' + '''using format characters from libraries/DataFlash/DataFlash.h to cast strings to basic python int/float/string types''' intTypes = "bBhHiIM" floatTypes = "fcCeEL" charTypes = "nNZ" @@ -277,9 +278,12 @@ class DataflashLog: def read(self, logfile, ignoreBadlines=False): '''returns on successful log read (including bad lines if ignoreBadlines==True), will throw an Exception otherwise''' - # TODO: dataflash log parsing code is *SUPER* hacky, should re-write more methodically + # TODO: dataflash log parsing code is pretty hacky, should re-write more methodically self.filename = logfile f = open(self.filename, 'r') + if f.read(4) == '\xa3\x95\x80\x80': + raise Exception("Unable to parse binary log files at this time, will be added soon") + f.seek(0) lineNumber = 0 knownHardwareTypes = ["APM", "PX4", "MPNG"] for line in f: @@ -382,5 +386,6 @@ class DataflashLog: self.durationSecs = (lastTimeGPS-firstTimeGPS) / 1000 # TODO: calculate logging rate based on timestamps + # ... diff --git a/Tools/LogAnalyzer/LogAnalyzer.py b/Tools/LogAnalyzer/LogAnalyzer.py index c12fadeac0..5d9c5edee4 100755 --- a/Tools/LogAnalyzer/LogAnalyzer.py +++ b/Tools/LogAnalyzer/LogAnalyzer.py @@ -2,7 +2,7 @@ # # A module to analyze and identify any common problems which can be determined from log files # -# Initial code by Andrew Chapman (chapman@skymount.com), 16th Jan 2014 +# Initial code by Andrew Chapman (amchapman@gmail.com), 16th Jan 2014 # @@ -14,6 +14,8 @@ # - copter+plane use 'V' in their vehicle type/version/build line, rover uses lower case 'v'. Copter+Rover give a build number, plane does not # - CTUN.ThrOut on copter is 0-1000, on plane+rover it is 0-100 +# TODO: add test for noisy baro values + import DataflashLog @@ -33,27 +35,28 @@ class TestResult: # NA means not applicable for this log (e.g. copter tests against a plane log), UNKNOWN means it is missing data required for the test PASS, FAIL, WARN, UNKNOWN, NA = range(5) status = None - statusMessage = "" + statusMessage = "" # can be multi-line class Test: - '''base class to be inherited by each specific log test. Each test should be quite granular so we have lots of small tests with clear results''' - name = "" - result = None # will be an instance of TestResult after being run + '''base class to be inherited by log tests. Each test should be quite granular so we have lots of small tests with clear results''' + name = "" + result = None # will be an instance of TestResult after being run execTime = None - enable = True + enable = True def run(self, logdata, verbose=False): pass class TestSuite: - '''registers test classes''' + '''registers test classes, loading using a basic plugin architecture, and can run them all in one run() operation''' tests = [] logfile = None logdata = None def __init__(self): # dynamically load in Test subclasses from the 'tests' folder + # to prevent one being loaded, move it out of that folder, or set that test's .enable attribute to False dirName = os.path.dirname(os.path.abspath(__file__)) testScripts = glob.glob(dirName + '/tests/*.py') testClasses = [] @@ -69,7 +72,7 @@ class TestSuite: # self.tests.append(m.TestBadParams()) def run(self, logdata, verbose): - '''run all registered tests in a single call''' + '''run all registered tests in a single call, gathering execution timing info''' self.logdata = logdata self.logfile = logdata.filename for test in self.tests: @@ -81,6 +84,7 @@ class TestSuite: test.execTime = 1000 * (endTime-startTime) def outputPlainText(self, outputStats): + '''output test results in plain text''' print 'Dataflash log analysis report for file: ' + self.logfile print 'Log size: %.2fmb (%d lines)' % (self.logdata.filesizeKB / 1024.0, self.logdata.lineCount) print 'Log duration: %s' % str(datetime.timedelta(seconds=self.logdata.durationSecs)) + '\n' @@ -125,6 +129,8 @@ class TestSuite: print '\n' def outputXML(self, xmlFile): + '''output test results to an XML file''' + # open the file for writing xml = None try: @@ -156,9 +162,6 @@ class TestSuite: print >>xml, "" for param, value in self.logdata.parameters.items(): print >>xml, " " % (param,`value`) - #print >>xml, " " + param + "" - #print >>xml, " " + `value` + "" - #print >>xml, " " print >>xml, "" # output test results @@ -241,13 +244,6 @@ def main(): if not args.quiet: print "XML output written to file: %s\n" % args.xml - # temp - test some spot values - include a bunch of this in a unit test at some point - #print "GPS abs alt on line 24126 is " + `self.logdata.channels["GPS"]["Alt"].dictData[24126]` # 52.03 - #print "ATT pitch on line 22153 is " + `self.logdata.channels["ATT"]["Pitch"].dictData[22153]` # -7.03 - #gpsAlt = self.logdata.channels["GPS"]["Alt"] - #print "All GPS Alt data: %s\n\n" % gpsAlt.dictData - #gpsAltSeg = gpsAlt.getSegment(426,711) - #print "Segment of GPS Alt data from %d to %d: %s\n\n" % (426,711,gpsAltSeg.dictData) if __name__ == "__main__": main() diff --git a/Tools/LogAnalyzer/tests/TestEvents.py b/Tools/LogAnalyzer/tests/TestEvents.py index 21f276200b..819c87a951 100644 --- a/Tools/LogAnalyzer/tests/TestEvents.py +++ b/Tools/LogAnalyzer/tests/TestEvents.py @@ -4,6 +4,7 @@ import DataflashLog class TestEvents(Test): '''test for erroneous events and failsafes''' + # TODO: need to check for vehicle-specific codes def __init__(self): self.name = "Event/Failsafe" diff --git a/Tools/LogAnalyzer/tests/TestGPSGlitch.py b/Tools/LogAnalyzer/tests/TestGPSGlitch.py index 95c624135d..9f997e060d 100644 --- a/Tools/LogAnalyzer/tests/TestGPSGlitch.py +++ b/Tools/LogAnalyzer/tests/TestGPSGlitch.py @@ -51,3 +51,5 @@ class TestGPSGlitch(Test): if not gpsGlitchCount: self.result.status = TestResult.StatusType.WARN self.result.statusMessage = satsMsg + + diff --git a/Tools/LogAnalyzer/tests/TestPitchRollCoupling.py b/Tools/LogAnalyzer/tests/TestPitchRollCoupling.py index b4dd9da213..46357ee5be 100644 --- a/Tools/LogAnalyzer/tests/TestPitchRollCoupling.py +++ b/Tools/LogAnalyzer/tests/TestPitchRollCoupling.py @@ -39,32 +39,26 @@ class TestPitchRollCoupling(Test): if mode in autoModes: if not isAuto: manualSegments.append((prevLine,line-1)) - #print "Adding manual segment: %d,%d" % (prevLine,line-1) prevLine = line isAuto = True elif mode in manualModes: if isAuto: autoSegments.append((prevLine,line-1)) - #print "Adding auto segment: %d,%d" % (prevLine,line-1) prevLine = line isAuto = False elif mode in ignoreModes: if isAuto: autoSegments.append((prevLine,line-1)) - #print "Adding auto segment: %d,%d" % (prevLine,line-1) else: manualSegments.append((prevLine,line-1)) - #print "Adding manual segment: %d,%d" % (prevLine,line-1) prevLine = 0 else: raise Exception("Unknown mode in TestPitchRollCoupling: %s" % mode) # and handle the last segment, which doesn't have an ending if mode in autoModes: autoSegments.append((prevLine,logdata.lineCount)) - #print "Adding final auto segment: %d,%d" % (prevLine,logdata.lineCount) elif mode in manualModes: manualSegments.append((prevLine,logdata.lineCount)) - #print "Adding final manual segment: %d,%d" % (prevLine,logdata.lineCount) # figure out max lean angle, the ANGLE_MAX param was added in AC3.1 maxLeanAngle = 45.0 @@ -80,7 +74,6 @@ class TestPitchRollCoupling(Test): (maxRoll, maxRollLine) = (0.0, 0) (maxPitch, maxPitchLine) = (0.0, 0) for (startLine,endLine) in manualSegments+autoSegments: - #print "Checking segment %d,%d" % (startLine,endLine) # quick up-front test, only fallover into more complex line-by-line check if max()>threshold rollSeg = logdata.channels["ATT"]["Roll"].getSegment(startLine,endLine) pitchSeg = logdata.channels["ATT"]["Pitch"].getSegment(startLine,endLine) diff --git a/Tools/LogAnalyzer/tests/TestUnderpowered.py b/Tools/LogAnalyzer/tests/TestUnderpowered.py index 654061ab8a..d757662970 100644 --- a/Tools/LogAnalyzer/tests/TestUnderpowered.py +++ b/Tools/LogAnalyzer/tests/TestUnderpowered.py @@ -61,7 +61,6 @@ class TestUnderpowered(Test): (startLine,endLine) = (data[seg[0]][0], data[seg[1]][0]) avgClimbRate = logdata.channels["CTUN"]["CRate"].getSegment(startLine,endLine).avg() avgThrOut = logdata.channels["CTUN"]["ThrOut"].getSegment(startLine,endLine).avg() - #print " Average CRate for this chunk is %.2f" % avgClimbRate if avgClimbRate < climbThresholdFAIL: self.result.status = TestResult.StatusType.FAIL self.result.statusMessage = "Avg climb rate %.2f cm/s for throttle avg %d" % (avgClimbRate,avgThrOut)