tools: Change LogAnalyzer tabs to spaces (per APM python conventions)

Kevin Hester 2014-06-24 15:30:55 -07:00 committed by Andrew Tridgell
parent 544d872d65
commit a31498f599
2 changed files with 545 additions and 545 deletions

View File

@@ -11,392 +11,392 @@ import bisect
class Format:
'''Data channel format as specified by the FMT lines in the log file'''
msgType = 0
msgLen = 0
name = ""
types = ""
labels = []
def __init__(self,msgType,msgLen,name,types,labels):
self.msgType = msgType
self.msgLen = msgLen
self.name = name
self.types = types
self.labels = labels.split(',')
def __str__(self):
return "%8s %s" % (self.name, `self.labels`)
class Channel:
'''storage for a single stream of data, i.e. all GPS.RelAlt values'''
# TODO: rethink data storage, but do more thorough regression testing before refactoring it
# TODO: store data as a scipy spline curve so we can more easily interpolate and sample the slope?
dictData = None # dict of linenum->value # store dupe data in dict and list for now, until we decide which is the better way to go
listData = None # list of (linenum,value) # store dupe data in dict and list for now, until we decide which is the better way to go
def __init__(self):
self.dictData = {}
self.listData = []
def getSegment(self, startLine, endLine):
'''returns a segment of this data (from startLine to endLine, inclusive) as a new Channel instance'''
segment = Channel()
segment.dictData = {k:v for k,v in self.dictData.iteritems() if k >= startLine and k <= endLine}
return segment
def min(self):
return min(self.dictData.values())
def max(self):
return max(self.dictData.values())
def avg(self):
return numpy.mean(self.dictData.values())
def getNearestValueFwd(self, lineNumber):
'''Returns (value,lineNumber)'''
index = bisect.bisect_left(self.listData, (lineNumber,-99999))
while index<len(self.listData):
line = self.listData[index][0]
#print "Looking forwards for nearest value to line number %d, starting at line %d" % (lineNumber,line) # TEMP
if line >= lineNumber:
return (self.listData[index][1],line)
index += 1
raise Exception("Error finding nearest value for line %d" % lineNumber)
def getNearestValueBack(self, lineNumber):
'''Returns (value,lineNumber)'''
index = bisect.bisect_left(self.listData, (lineNumber,-99999)) - 1
while index>=0:
line = self.listData[index][0]
#print "Looking backwards for nearest value to line number %d, starting at line %d" % (lineNumber,line) # TEMP
if line <= lineNumber:
return (self.listData[index][1],line)
index -= 1
raise Exception("Error finding nearest value for line %d" % lineNumber)
def getNearestValue(self, lineNumber, lookForwards=True):
'''find the nearest data value to the given lineNumber, defaults to first looking forwards. Returns (value,lineNumber)'''
if lookForwards:
try:
return self.getNearestValueFwd(lineNumber)
except:
return self.getNearestValueBack(lineNumber)
else:
try:
return self.getNearestValueBack(lineNumber)
except:
return self.getNearestValueFwd(lineNumber)
raise Exception("Error finding nearest value for line %d" % lineNumber)
def getInterpolatedValue(self, lineNumber):
(prevValue,prevValueLine) = self.getNearestValue(lineNumber, lookForwards=False)
(nextValue,nextValueLine) = self.getNearestValue(lineNumber, lookForwards=True)
if prevValueLine == nextValueLine:
return prevValue
weight = (lineNumber-prevValueLine) / float(nextValueLine-prevValueLine)
return ((weight*prevValue) + ((1-weight)*nextValue))
def getIndexOf(self, lineNumber):
'''returns the index within this channel's listData of the given lineNumber, or raises an Exception if not found'''
index = bisect.bisect_left(self.listData, (lineNumber,-99999))
#print "INDEX of line %d: %d" % (lineNumber,index)
#print "self.listData[index][0]: %d" % self.listData[index][0]
if (self.listData[index][0] == lineNumber):
return index
else:
raise Exception("Error finding index for line %d" % lineNumber)
class LogIterator:
'''Smart iterator that can move through a log by line number and maintain an index into the nearest values of all data channels'''
# TODO: LogIterator currently indexes the next available value rather than the nearest value, we should make it configurable between next/nearest
class LogIteratorSubValue:
'''syntactic sugar to allow access by LogIterator[lineLabel][dataLabel]'''
logdata = None
iterators = None
lineLabel = None
def __init__(self, logdata, iterators, lineLabel):
self.logdata = logdata
self.lineLabel = lineLabel
self.iterators = iterators
def __getitem__(self, dataLabel):
index = self.iterators[self.lineLabel][0]
return self.logdata.channels[self.lineLabel][dataLabel].listData[index][1]
iterators = {} # lineLabel -> (listIndex,lineNumber)
logdata = None
currentLine = None
def __init__(self, logdata, lineNumber=0):
self.logdata = logdata
self.currentLine = lineNumber
for lineLabel in self.logdata.formats:
if lineLabel in self.logdata.channels:
self.iterators[lineLabel] = ()
self.jump(lineNumber)
def __iter__(self):
return self
def __getitem__(self, lineLabel):
return LogIterator.LogIteratorSubValue(self.logdata, self.iterators, lineLabel)
def next(self):
'''increment iterator to next log line'''
self.currentLine += 1
if self.currentLine > self.logdata.lineCount:
return self
for lineLabel in self.iterators.keys():
# check if the currentLine has gone past the line we're pointing at for this type of data
dataLabel = self.logdata.formats[lineLabel].labels[0]
(index, lineNumber) = self.iterators[lineLabel]
# if so, and it is not the last entry in the log, then increment the indices for all dataLabels under that lineLabel
if (self.currentLine > lineNumber) and (index < len(self.logdata.channels[lineLabel][dataLabel].listData)-1):
index += 1
lineNumber = self.logdata.channels[lineLabel][dataLabel].listData[index][0]
self.iterators[lineLabel] = (index,lineNumber)
return self
def jump(self, lineNumber):
'''jump iterator to specified log line'''
self.currentLine = lineNumber
for lineLabel in self.iterators.keys():
dataLabel = self.logdata.formats[lineLabel].labels[0]
(value,lineNumber) = self.logdata.channels[lineLabel][dataLabel].getNearestValue(self.currentLine)
self.iterators[lineLabel] = (self.logdata.channels[lineLabel][dataLabel].getIndexOf(lineNumber), lineNumber)
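As a rough illustration of the access pattern the docstrings above describe (a sketch only: it assumes a logdata object already populated by DataflashLog.read() and that the log actually contains a GPS.RelAlt channel):
logIter = DataflashLog.LogIterator(logdata, lineNumber=500)  # position the iterator at log line 500
currentRelAlt = logIter["GPS"]["RelAlt"]                     # GPS.RelAlt value indexed at/after line 500
logIter.next()                                               # advance one log line, updating all channel indices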
class DataflashLogHelper:
'''helper functions for dealing with log data, put here to keep DataflashLog class as a simple parser and data store'''
@staticmethod
def getTimeAtLine(logdata, lineNumber):
'''returns the nearest GPS timestamp in milliseconds after the given line number'''
if not "GPS" in logdata.channels:
raise Exception("no GPS log data found")
# older logs use 'Time', newer logs use 'TimeMS'
timeLabel = "TimeMS"
if "Time" in logdata.channels["GPS"]:
timeLabel = "Time"
while lineNumber <= logdata.lineCount:
if lineNumber in logdata.channels["GPS"][timeLabel].dictData:
return logdata.channels["GPS"][timeLabel].dictData[lineNumber]
lineNumber = lineNumber + 1
@staticmethod
def findLoiterChunks(logdata, minLengthSeconds=0, noRCInputs=True):
'''returns a list of (from,to) pairs defining sections of the log which are in loiter mode. Ordered from longest to shortest in time. If noRCInputs == True it only returns chunks with no control inputs'''
# TODO: implement noRCInputs handling when identifying stable loiter chunks, for now we're ignoring it
def chunkSizeCompare(chunk1, chunk2):
chunk1Len = chunk1[1]-chunk1[0]
chunk2Len = chunk2[1]-chunk2[0]
if chunk1Len == chunk2Len:
return 0
elif chunk1Len > chunk2Len:
return -1
else:
return 1
od = collections.OrderedDict(sorted(logdata.modeChanges.items(), key=lambda t: t[0]))
chunks = []
for i in range(len(od.keys())):
if od.values()[i][0] == "LOITER":
startLine = od.keys()[i]
endLine = None
if i == len(od.keys())-1:
endLine = logdata.lineCount
else:
endLine = od.keys()[i+1]-1
chunkTimeSeconds = (DataflashLogHelper.getTimeAtLine(logdata,endLine)-DataflashLogHelper.getTimeAtLine(logdata,startLine)+1) / 1000.0
if chunkTimeSeconds > minLengthSeconds:
chunks.append((startLine,endLine))
#print "LOITER chunk: %d to %d, %d lines" % (startLine,endLine,endLine-startLine+1)
#print " (time %d to %d, %d seconds)" % (DataflashLogHelper.getTimeAtLine(logdata,startLine), DataflashLogHelper.getTimeAtLine(logdata,endLine), chunkTimeSeconds)
chunks.sort(chunkSizeCompare)
return chunks
@staticmethod
def isLogEmpty(logdata):
'''returns a human-readable error string if the log is essentially empty, otherwise returns None'''
# naive check for now, see if the throttle output was ever above 20%
throttleThreshold = 20
if logdata.vehicleType == "ArduCopter":
throttleThreshold = 200 # copter uses 0-1000, plane+rover use 0-100
if "CTUN" in logdata.channels:
maxThrottle = logdata.channels["CTUN"]["ThrOut"].max()
if maxThrottle < throttleThreshold:
return "Throttle never above 20%"
return None
class DataflashLog:
'''APM Dataflash log file reader and container class. Keep this simple, add more advanced or specific functions to DataflashLogHelper class'''
filename = None
vehicleType = "" # ArduCopter, ArduPlane, ArduRover, etc, verbatim as given by header
firmwareVersion = ""
firmwareHash = ""
freeRAM = 0
hardwareType = "" # APM 1, APM 2, PX4, MPNG, etc. What about VRBrain, BeagleBone, etc.? Needs more testing
formats = {} # name -> Format
parameters = {} # token -> value
messages = {} # lineNum -> message
modeChanges = {} # lineNum -> (mode,value)
channels = {} # lineLabel -> {dataLabel:Channel}
filesizeKB = 0
durationSecs = 0
lineCount = 0
skippedLines = 0
def getCopterType(self):
'''returns quad/hex/octo/tradheli if this is a copter log'''
if self.vehicleType != "ArduCopter":
return None
motLabels = []
if "MOT" in self.formats: # not listed in PX4 log header for some reason?
motLabels = self.formats["MOT"].labels
if "GGain" in motLabels:
return "tradheli"
elif len(motLabels) == 4:
return "quad"
elif len(motLabels) == 6:
return "hex"
elif len(motLabels) == 8:
return "octo"
else:
return ""
def __castToFormatType(self,value,valueType):
'''using format characters from libraries/DataFlash/DataFlash.h to cast strings to basic python int/float/string types'''
intTypes = "bBhHiIM"
floatTypes = "fcCeEL"
charTypes = "nNZ"
if valueType in floatTypes:
return float(value)
elif valueType in intTypes:
return int(value)
elif valueType in charTypes:
return str(value)
else:
raise Exception("Unknown value type of '%s' specified to __castToFormatType()" % valueType)
#def __init__(self, logfile, ignoreBadlines=False):
#self.read(logfile, ignoreBadlines)
def read(self, logfile, ignoreBadlines=False):
'''returns on successful log read (including bad lines if ignoreBadlines==True), will throw an Exception otherwise'''
# TODO: dataflash log parsing code is pretty hacky, should re-write more methodically
self.filename = logfile
f = open(self.filename, 'r')
if f.read(4) == '\xa3\x95\x80\x80':
raise Exception("Unable to parse binary log files at this time, will be added soon")
f.seek(0)
lineNumber = 0
knownHardwareTypes = ["APM", "PX4", "MPNG"]
for line in f:
lineNumber = lineNumber + 1
try:
#print "Reading line: %d" % lineNumber
line = line.strip('\n\r')
tokens = line.split(', ')
# first handle the log header lines
if line == " Ready to drive." or line == " Ready to FLY.":
continue
if line == "----------------------------------------": # present in pre-3.0 logs
raise Exception("Log file seems to be in the older format (prior to self-describing logs), which isn't supported")
# Some logs are missing the initial dataflash header which says the log index and the type of log, but we can catch the vehicle
# type here too in the MSG line
if not self.vehicleType and tokens[0] == "MSG":
tokens2 = line.split(' ')
vehicleTypes = ["ArduPlane", "ArduCopter", "ArduRover"]
if tokens2[1] in vehicleTypes and tokens2[2][0].lower() == "v":
self.vehicleType = tokens2[1]
self.firmwareVersion = tokens2[2]
if len(tokens2) == 4:
self.firmwareHash = tokens2[3][1:-1]
continue
if len(tokens) == 1:
tokens2 = line.split(' ')
if line == "":
pass
elif len(tokens2) == 1 and tokens2[0].isdigit(): # log index
pass
elif len(tokens2) == 3 and tokens2[0] == "Free" and tokens2[1] == "RAM:":
self.freeRAM = int(tokens2[2])
elif tokens2[0] in knownHardwareTypes:
self.hardwareType = line # not sure if we can parse this more usefully, for now only need to report it back verbatim
elif (len(tokens2) == 2 or len(tokens2) == 3) and tokens2[1][0].lower() == "v": # e.g. ArduCopter V3.1 (5c6503e2)
self.vehicleType = tokens2[0]
self.firmwareVersion = tokens2[1]
if len(tokens2) == 3:
self.firmwareHash = tokens2[2][1:-1]
else:
errorMsg = "Error parsing line %d of log file: %s" % (lineNumber, self.filename)
if ignoreBadlines:
print errorMsg + " (skipping line)"
self.skippedLines += 1
else:
raise Exception("")
# now handle the non-log data stuff, format descriptions, params, etc
elif tokens[0] == "FMT":
format = None
if len(tokens) == 6:
format = Format(tokens[1],tokens[2],tokens[3],tokens[4],tokens[5])
elif len(tokens) == 5: # some logs have FMT STRT with no labels
format = Format(tokens[1],tokens[2],tokens[3],tokens[4],"")
else:
raise Exception("FMT error, nTokens: %d" % len(tokens))
#print format # TEMP
self.formats[tokens[3]] = format
elif tokens[0] == "PARM":
pName = tokens[1]
self.parameters[pName] = float(tokens[2])
elif tokens[0] == "MSG":
self.messages[lineNumber] = tokens[1]
elif tokens[0] == "MODE":
if self.vehicleType == "ArduCopter":
self.modeChanges[lineNumber] = (tokens[1],int(tokens[2]))
elif self.vehicleType == "ArduPlane" or self.vehicleType == "ArduRover":
self.modeChanges[lineNumber] = (tokens[2],int(tokens[3]))
else:
raise Exception("Unknown log type for MODE line")
# anything else must be the log data
else:
groupName = tokens[0]
tokens2 = line.split(', ')
# first time seeing this type of log line, create the channel storage
if not groupName in self.channels:
self.channels[groupName] = {}
for label in self.formats[groupName].labels:
self.channels[groupName][label] = Channel()
# check the number of tokens matches between the line and what we're expecting from the FMT definition
if (len(tokens2)-1) != len(self.formats[groupName].labels):
errorMsg = "%s line's value count (%d) not matching FMT specification (%d) on line %d" % (groupName, len(tokens2)-1, len(self.formats[groupName].labels), lineNumber)
if ignoreBadlines:
print errorMsg + " (skipping line)"
self.skippedLines += 1
continue
else:
raise Exception(errorMsg)
# store each token in its relevant channel
for (index,label) in enumerate(self.formats[groupName].labels):
#value = float(tokens2[index+1]) # simple read without handling datatype
value = self.__castToFormatType(tokens2[index+1], self.formats[groupName].types[index]) # handling datatype via this call slows down reading by about 50%
channel = self.channels[groupName][label]
#print "Set data {%s,%s} for line %d to value %s, of type %c, stored at address %s" % (groupName, label, lineNumber, `value`, self.formats[groupName].types[index], hex(id(channel.dictData)))
channel.dictData[lineNumber] = value
channel.listData.append((lineNumber,value))
except Exception as e:
raise Exception("Error parsing line %d of log file %s - %s" % (lineNumber,self.filename,e.args[0]))
# gather some general stats about the log
self.lineCount = lineNumber
self.filesizeKB = os.path.getsize(self.filename) / 1024.0
# TODO: switch duration calculation to use TimeMS values rather than GPS timestamp
if "GPS" in self.channels:
# the GPS time label changed at some point, need to handle both
timeLabel = "TimeMS"
if timeLabel not in self.channels["GPS"]:
timeLabel = "Time"
firstTimeGPS = self.channels["GPS"][timeLabel].listData[0][1]
lastTimeGPS = self.channels["GPS"][timeLabel].listData[-1][1]
self.durationSecs = (lastTimeGPS-firstTimeGPS) / 1000
# TODO: calculate logging rate based on timestamps
# ...
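For reference, a minimal usage sketch of the reader above (the file name flight.log is hypothetical, and the GPS/RelAlt lookups assume the log actually contains that data):
import DataflashLog
logdata = DataflashLog.DataflashLog()
logdata.read("flight.log", ignoreBadlines=True)    # parse the log, skipping any corrupt lines
print "Parsed %d lines (%d skipped)" % (logdata.lineCount, logdata.skippedLines)
if "GPS" in logdata.channels:
    relAlt = logdata.channels["GPS"]["RelAlt"]     # a Channel instance
    print "RelAlt min/max/avg: %s %s %s" % (relAlt.min(), relAlt.max(), relAlt.avg())
    (value, line) = relAlt.getNearestValue(100)    # nearest sample at or after log line 100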

View File

@@ -32,221 +32,221 @@ from xml.sax.saxutils import escape
class TestResult:
'''all tests return a standardized result type'''
class StatusType:
# NA means not applicable for this log (e.g. copter tests against a plane log), UNKNOWN means it is missing data required for the test
GOOD, FAIL, WARN, UNKNOWN, NA = range(5)
status = None
statusMessage = "" # can be multi-line
class Test:
'''base class to be inherited by log tests. Each test should be quite granular so we have lots of small tests with clear results'''
name = ""
result = None # will be an instance of TestResult after being run
execTime = None
enable = True
def run(self, logdata, verbose=False):
pass
class TestSuite:
'''registers test classes, loading using a basic plugin architecture, and can run them all in one run() operation'''
tests = []
logfile = None
logdata = None
def __init__(self):
# dynamically load in Test subclasses from the 'tests' folder
# to prevent one being loaded, move it out of that folder, or set that test's .enable attribute to False
dirName = os.path.dirname(os.path.abspath(__file__))
testScripts = glob.glob(dirName + '/tests/*.py')
testClasses = []
for script in testScripts:
m = imp.load_source("m",script)
for name, obj in inspect.getmembers(m, inspect.isclass):
if name not in testClasses and inspect.getsourcefile(obj) == script:
testClasses.append(name)
self.tests.append(obj())
# and here's an example of explicitly loading a Test class if you wanted to do that
# m = imp.load_source("m", dirName + '/tests/TestBadParams.py')
# self.tests.append(m.TestBadParams())
def run(self, logdata, verbose):
'''run all registered tests in a single call, gathering execution timing info'''
self.logdata = logdata
self.logfile = logdata.filename
for test in self.tests:
# run each test in turn, gathering timing info
if test.enable:
startTime = time.time()
test.run(self.logdata, verbose) # RUN THE TEST
endTime = time.time()
test.execTime = 1000 * (endTime-startTime)
def outputPlainText(self, outputStats):
'''output test results in plain text'''
print 'Dataflash log analysis report for file: ' + self.logfile
print 'Log size: %.2fmb (%d lines)' % (self.logdata.filesizeKB / 1024.0, self.logdata.lineCount)
print 'Log duration: %s' % str(datetime.timedelta(seconds=self.logdata.durationSecs)) + '\n'
if self.logdata.vehicleType == "ArduCopter" and self.logdata.getCopterType():
print 'Vehicle Type: %s (%s)' % (self.logdata.vehicleType, self.logdata.getCopterType())
else:
print 'Vehicle Type: %s' % self.logdata.vehicleType
print 'Firmware Version: %s (%s)' % (self.logdata.firmwareVersion, self.logdata.firmwareHash)
print 'Hardware: %s' % self.logdata.hardwareType
print 'Free RAM: %s' % self.logdata.freeRAM
if self.logdata.skippedLines:
print "\nWARNING: %d malformed log lines skipped during read" % self.logdata.skippedLines
print '\n'
print "Test Results:"
for test in self.tests:
if not test.enable:
continue
statusMessageFirstLine = test.result.statusMessage.strip('\n\r').split('\n')[0]
statusMessageExtra = test.result.statusMessage.strip('\n\r').split('\n')[1:]
execTime = ""
if outputStats:
execTime = " (%6.2fms)" % (test.execTime)
if test.result.status == TestResult.StatusType.GOOD:
print " %20s: GOOD %-55s%s" % (test.name, statusMessageFirstLine, execTime)
elif test.result.status == TestResult.StatusType.FAIL:
print " %20s: FAIL %-55s%s [GRAPH]" % (test.name, statusMessageFirstLine, execTime)
elif test.result.status == TestResult.StatusType.WARN:
print " %20s: WARN %-55s%s [GRAPH]" % (test.name, statusMessageFirstLine, execTime)
elif test.result.status == TestResult.StatusType.NA:
# skip any that aren't relevant for this vehicle/hardware/etc
continue
else:
print " %20s: UNKNOWN %-55s%s" % (test.name, statusMessageFirstLine, execTime)
#if statusMessageExtra:
for line in statusMessageExtra:
print " %29s %s" % ("",line)
print '\n'
print 'The Log Analyzer is currently BETA code.\nFor any support or feedback on the log analyzer please email Andrew Chapman (amchapman@gmail.com)'
print '\n'
def outputXML(self, xmlFile):
'''output test results to an XML file'''
# open the file for writing
xml = None
try:
xml = open(xmlFile, 'w')
except:
sys.stderr.write("Error opening output xml file: %s" % xmlFile)
sys.exit(1)
# output header info
print >>xml, "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
print >>xml, "<loganalysis>"
print >>xml, "<header>"
print >>xml, " <logfile>" + escape(self.logfile) + "</logfile>"
print >>xml, " <sizekb>" + escape(`self.logdata.filesizeKB`) + "</sizekb>"
print >>xml, " <sizelines>" + escape(`self.logdata.lineCount`) + "</sizelines>"
print >>xml, " <duration>" + escape(str(datetime.timedelta(seconds=self.logdata.durationSecs))) + "</duration>"
print >>xml, " <vehicletype>" + escape(self.logdata.vehicleType) + "</vehicletype>"
if self.logdata.vehicleType == "ArduCopter" and self.logdata.getCopterType():
print >>xml, " <coptertype>" + escape(self.logdata.getCopterType()) + "</coptertype>"
print >>xml, " <firmwareversion>" + escape(self.logdata.firmwareVersion) + "</firmwareversion>"
print >>xml, " <firmwarehash>" + escape(self.logdata.firmwareHash) + "</firmwarehash>"
print >>xml, " <hardwaretype>" + escape(self.logdata.hardwareType) + "</hardwaretype>"
print >>xml, " <freemem>" + escape(`self.logdata.freeRAM`) + "</freemem>"
print >>xml, " <skippedlines>" + escape(`self.logdata.skippedLines`) + "</skippedlines>"
print >>xml, "</header>"
# output parameters
print >>xml, "<params>"
for param, value in self.logdata.parameters.items():
print >>xml, " <param name=\"%s\" value=\"%s\" />" % (param,escape(`value`))
print >>xml, "</params>"
# output test results
print >>xml, "<results>"
for test in self.tests:
if not test.enable:
continue
print >>xml, " <result>"
if test.result.status == TestResult.StatusType.GOOD:
print >>xml, " <name>" + escape(test.name) + "</name>"
print >>xml, " <status>GOOD</status>"
print >>xml, " <message>" + escape(test.result.statusMessage) + "</message>"
elif test.result.status == TestResult.StatusType.FAIL:
print >>xml, " <name>" + escape(test.name) + "</name>"
print >>xml, " <status>FAIL</status>"
print >>xml, " <message>" + escape(test.result.statusMessage) + "</message>"
print >>xml, " <data>(test data will be embedded here at some point)</data>"
elif test.result.status == TestResult.StatusType.WARN:
print >>xml, " <name>" + escape(test.name) + "</name>"
print >>xml, " <status>WARN</status>"
print >>xml, " <message>" + escape(test.result.statusMessage) + "</message>"
print >>xml, " <data>(test data will be embedded here at some point)</data>"
elif test.result.status == TestResult.StatusType.NA:
print >>xml, " <name>" + escape(test.name) + "</name>"
print >>xml, " <status>NA</status>"
else:
print >>xml, " <name>" + escape(test.name) + "</name>"
print >>xml, " <status>UNKNOWN</status>"
print >>xml, " <message>" + escape(test.result.statusMessage) + "</message>"
print >>xml, " </result>"
print >>xml, "</results>"
print >>xml, "</loganalysis>"
xml.close()
def main():
dirName = os.path.dirname(os.path.abspath(__file__))
# deal with command line arguments
parser = argparse.ArgumentParser(description='Analyze an APM Dataflash log for known issues')
parser.add_argument('logfile', type=argparse.FileType('r'), help='path to Dataflash log file')
parser.add_argument('-q', '--quiet', metavar='', action='store_const', const=True, help='quiet mode, do not print results')
parser.add_argument('-p', '--profile', metavar='', action='store_const', const=True, help='output performance profiling data')
parser.add_argument('-s', '--skip_bad', metavar='', action='store_const', const=True, help='skip over corrupt dataflash lines')
parser.add_argument('-e', '--empty', metavar='', action='store_const', const=True, help='run an initial check for an empty log')
parser.add_argument('-x', '--xml', type=str, metavar='XML file', nargs='?', const='', default='', help='write output to specified XML file')
parser.add_argument('-v', '--verbose', metavar='', action='store_const', const=True, help='verbose output')
args = parser.parse_args()
# load the log
startTime = time.time()
logdata = DataflashLog.DataflashLog()
logdata.read(args.logfile.name, ignoreBadlines=args.skip_bad) # read log
endTime = time.time()
if args.profile:
print "Log file read time: %.2f seconds" % (endTime-startTime)
# check for empty log if requested
if args.empty:
emptyErr = DataflashLog.DataflashLogHelper.isLogEmpty(logdata)
if emptyErr:
sys.stderr.write("Empty log file: %s, %s" % (logdata.filename, emptyErr))
sys.exit(1)
#run the tests, and gather timings
testSuite = TestSuite()
startTime = time.time()
testSuite.run(logdata, args.verbose) # run tests
endTime = time.time()
if args.profile:
print "Test suite run time: %.2f seconds" % (endTime-startTime)
# deal with output
if not args.quiet:
testSuite.outputPlainText(args.profile)
if args.xml:
testSuite.outputXML(args.xml)
if not args.quiet:
print "XML output written to file: %s\n" % args.xml
if __name__ == "__main__":
main()
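For context, a minimal sketch of a Test plugin as TestSuite would auto-load it from the tests folder (the file name tests/TestExample.py, the import path, and the parameter check are illustrative assumptions; only the Test/TestResult API shown above is taken from the source):
from LogAnalyzer import Test, TestResult

class TestExample(Test):
    '''trivial example: fail the log if no PARM lines were found'''
    def __init__(self):
        self.name = "Example"
    def run(self, logdata, verbose=False):
        self.result = TestResult()
        if logdata.parameters:
            self.result.status = TestResult.StatusType.GOOD
        else:
            self.result.status = TestResult.StatusType.FAIL
            self.result.statusMessage = "No PARM lines found in log"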