Tools: LogAnalyzer: use print_function

This commit is contained in:
Peter Barker 2017-07-22 09:09:51 +10:00
parent 22578bd7f0
commit 44d1c77b0c
9 changed files with 110 additions and 95 deletions

View File

@ -226,7 +226,7 @@ class Channel(object):
index = bisect.bisect_left(self.listData, (lineNumber,-99999))
while index<len(self.listData):
line = self.listData[index][0]
#print "Looking forwards for nearest value to line number %d, starting at line %d" % (lineNumber,line) # TEMP
#print("Looking forwards for nearest value to line number %d, starting at line %d" % (lineNumber,line)) # TEMP
if line >= lineNumber:
return (self.listData[index][1],line)
index += 1
@ -236,7 +236,7 @@ class Channel(object):
index = bisect.bisect_left(self.listData, (lineNumber,-99999)) - 1
while index>=0:
line = self.listData[index][0]
#print "Looking backwards for nearest value to line number %d, starting at line %d" % (lineNumber,line) # TEMP
#print("Looking backwards for nearest value to line number %d, starting at line %d" % (lineNumber,line)) # TEMP
if line <= lineNumber:
return (self.listData[index][1],line)
index -= 1
@ -264,8 +264,8 @@ class Channel(object):
def getIndexOf(self, lineNumber):
'''returns the index within this channel's listData of the given lineNumber, or raises an Exception if not found'''
index = bisect.bisect_left(self.listData, (lineNumber,-99999))
#print "INDEX of line %d: %d" % (lineNumber,index)
#print "self.listData[index][0]: %d" % self.listData[index][0]
#print("INDEX of line %d: %d" % (lineNumber,index))
#print("self.listData[index][0]: %d" % self.listData[index][0])
if (self.listData[index][0] == lineNumber):
return index
else:
@ -375,8 +375,8 @@ class DataflashLogHelper:
chunkTimeSeconds = (DataflashLogHelper.getTimeAtLine(logdata,endLine)-DataflashLogHelper.getTimeAtLine(logdata,startLine)+1) / 1000.0
if chunkTimeSeconds > minLengthSeconds:
chunks.append((startLine,endLine))
#print "LOITER chunk: %d to %d, %d lines" % (startLine,endLine,endLine-startLine+1)
#print " (time %d to %d, %d seconds)" % (DataflashLogHelper.getTimeAtLine(logdata,startLine), DataflashLogHelper.getTimeAtLine(logdata,endLine), chunkTimeSeconds)
#print("LOITER chunk: %d to %d, %d lines" % (startLine,endLine,endLine-startLine+1))
#print(" (time %d to %d, %d seconds)" % (DataflashLogHelper.getTimeAtLine(logdata,startLine), DataflashLogHelper.getTimeAtLine(logdata,endLine), chunkTimeSeconds))
chunks.sort(chunkSizeCompare)
return chunks
@ -610,7 +610,7 @@ class DataflashLog(object):
lineNumber = lineNumber + 1
numBytes += len(line) + 1
try:
#print "Reading line: %d" % lineNumber
#print("Reading line: %d" % lineNumber)
line = line.strip('\n\r')
tokens = line.split(', ')
# first handle the log header lines

View File

@ -17,6 +17,7 @@
# TODO: add test for noisy baro values
# TODO: support loading binary log files (use Tridge's mavlogdump?)
from __future__ import print_function
import DataflashLog
@ -93,22 +94,22 @@ class TestSuite(object):
def outputPlainText(self, outputStats):
'''output test results in plain text'''
print 'Dataflash log analysis report for file: ' + self.logfile
print 'Log size: %.2fmb (%d lines)' % (self.logdata.filesizeKB / 1024.0, self.logdata.lineCount)
print 'Log duration: %s' % str(datetime.timedelta(seconds=self.logdata.durationSecs)) + '\n'
print('Dataflash log analysis report for file: ' + self.logfile)
print('Log size: %.2fmb (%d lines)' % (self.logdata.filesizeKB / 1024.0, self.logdata.lineCount))
print('Log duration: %s' % str(datetime.timedelta(seconds=self.logdata.durationSecs)) + '\n')
if self.logdata.vehicleType == VehicleType.Copter and self.logdata.getCopterType():
print 'Vehicle Type: %s (%s)' % (self.logdata.vehicleTypeString, self.logdata.getCopterType())
print('Vehicle Type: %s (%s)' % (self.logdata.vehicleTypeString, self.logdata.getCopterType()))
else:
print 'Vehicle Type: %s' % self.logdata.vehicleTypeString
print 'Firmware Version: %s (%s)' % (self.logdata.firmwareVersion, self.logdata.firmwareHash)
print 'Hardware: %s' % self.logdata.hardwareType
print 'Free RAM: %s' % self.logdata.freeRAM
print('Vehicle Type: %s' % self.logdata.vehicleTypeString)
print('Firmware Version: %s (%s)' % (self.logdata.firmwareVersion, self.logdata.firmwareHash))
print('Hardware: %s' % self.logdata.hardwareType)
print('Free RAM: %s' % self.logdata.freeRAM)
if self.logdata.skippedLines:
print "\nWARNING: %d malformed log lines skipped during read" % self.logdata.skippedLines
print '\n'
print("\nWARNING: %d malformed log lines skipped during read" % self.logdata.skippedLines)
print('\n')
print "Test Results:"
print("Test Results:")
for test in self.tests:
if not test.enable:
continue
@ -118,23 +119,23 @@ class TestSuite(object):
if outputStats:
execTime = " (%6.2fms)" % (test.execTime)
if test.result.status == TestResult.StatusType.GOOD:
print " %20s: GOOD %-55s%s" % (test.name, statusMessageFirstLine, execTime)
print(" %20s: GOOD %-55s%s" % (test.name, statusMessageFirstLine, execTime))
elif test.result.status == TestResult.StatusType.FAIL:
print " %20s: FAIL %-55s%s [GRAPH]" % (test.name, statusMessageFirstLine, execTime)
print(" %20s: FAIL %-55s%s [GRAPH]" % (test.name, statusMessageFirstLine, execTime))
elif test.result.status == TestResult.StatusType.WARN:
print " %20s: WARN %-55s%s [GRAPH]" % (test.name, statusMessageFirstLine, execTime)
print(" %20s: WARN %-55s%s [GRAPH]" % (test.name, statusMessageFirstLine, execTime))
elif test.result.status == TestResult.StatusType.NA:
# skip any that aren't relevant for this vehicle/hardware/etc
continue
else:
print " %20s: UNKNOWN %-55s%s" % (test.name, statusMessageFirstLine, execTime)
print(" %20s: UNKNOWN %-55s%s" % (test.name, statusMessageFirstLine, execTime))
#if statusMessageExtra:
for line in statusMessageExtra:
print " %29s %s" % ("",line)
print(" %29s %s" % ("",line))
print '\n'
print 'The Log Analyzer is currently BETA code.\nFor any support or feedback on the log analyzer please email Andrew Chapman (amchapman@gmail.com)'
print '\n'
print('\n')
print('The Log Analyzer is currently BETA code.\nFor any support or feedback on the log analyzer please email Andrew Chapman (amchapman@gmail.com)')
print('\n')
def outputXML(self, xmlFile):
'''output test results to an XML file'''
@ -152,60 +153,60 @@ class TestSuite(object):
# output header info
print >>xml, "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
print >>xml, "<loganalysis>"
print >>xml, "<header>"
print >>xml, " <logfile>" + escape(self.logfile) + "</logfile>"
print >>xml, " <sizekb>" + escape(`self.logdata.filesizeKB`) + "</sizekb>"
print >>xml, " <sizelines>" + escape(`self.logdata.lineCount`) + "</sizelines>"
print >>xml, " <duration>" + escape(str(datetime.timedelta(seconds=self.logdata.durationSecs))) + "</duration>"
print >>xml, " <vehicletype>" + escape(self.logdata.vehicleTypeString) + "</vehicletype>"
xml.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n")
xml.write("<loganalysis>\n")
xml.write("<header>\n")
xml.write(" <logfile>" + escape(self.logfile) + "</logfile>\n")
xml.write(" <sizekb>" + escape(`self.logdata.filesizeKB`) + "</sizekb>\n")
xml.write(" <sizelines>" + escape(`self.logdata.lineCount`) + "</sizelines>\n")
xml.write(" <duration>" + escape(str(datetime.timedelta(seconds=self.logdata.durationSecs))) + "</duration>\n")
xml.write(" <vehicletype>" + escape(self.logdata.vehicleTypeString) + "</vehicletype>\n")
if self.logdata.vehicleType == VehicleType.Copter and self.logdata.getCopterType():
print >>xml, " <coptertype>" + escape(self.logdata.getCopterType()) + "</coptertype>"
print >>xml, " <firmwareversion>" + escape(self.logdata.firmwareVersion) + "</firmwareversion>"
print >>xml, " <firmwarehash>" + escape(self.logdata.firmwareHash) + "</firmwarehash>"
print >>xml, " <hardwaretype>" + escape(self.logdata.hardwareType) + "</hardwaretype>"
print >>xml, " <freemem>" + escape(`self.logdata.freeRAM`) + "</freemem>"
print >>xml, " <skippedlines>" + escape(`self.logdata.skippedLines`) + "</skippedlines>"
print >>xml, "</header>"
xml.write(" <coptertype>" + escape(self.logdata.getCopterType()) + "</coptertype>\n")
xml.write(" <firmwareversion>" + escape(self.logdata.firmwareVersion) + "</firmwareversion>\n")
xml.write(" <firmwarehash>" + escape(self.logdata.firmwareHash) + "</firmwarehash>\n")
xml.write(" <hardwaretype>" + escape(self.logdata.hardwareType) + "</hardwaretype>\n")
xml.write(" <freemem>" + escape(`self.logdata.freeRAM`) + "</freemem>\n")
xml.write(" <skippedlines>" + escape(`self.logdata.skippedLines`) + "</skippedlines>\n")
xml.write("</header>\n")
# output parameters
print >>xml, "<params>"
xml.write("<params>\n")
for param, value in self.logdata.parameters.items():
print >>xml, " <param name=\"%s\" value=\"%s\" />" % (param,escape(`value`))
print >>xml, "</params>"
xml.write(" <param name=\"%s\" value=\"%s\" />\n" % (param,escape(`value`)))
xml.write("</params>\n")
# output test results
print >>xml, "<results>"
xml.write("<results>\n")
for test in self.tests:
if not test.enable:
continue
print >>xml, " <result>"
xml.write(" <result>\n")
if test.result.status == TestResult.StatusType.GOOD:
print >>xml, " <name>" + escape(test.name) + "</name>"
print >>xml, " <status>GOOD</status>"
print >>xml, " <message>" + escape(test.result.statusMessage) + "</message>"
xml.write(" <name>" + escape(test.name) + "</name>\n")
xml.write(" <status>GOOD</status>\n")
xml.write(" <message>" + escape(test.result.statusMessage) + "</message>\n")
elif test.result.status == TestResult.StatusType.FAIL:
print >>xml, " <name>" + escape(test.name) + "</name>"
print >>xml, " <status>FAIL</status>"
print >>xml, " <message>" + escape(test.result.statusMessage) + "</message>"
print >>xml, " <data>(test data will be embedded here at some point)</data>"
xml.write(" <name>" + escape(test.name) + "</name>\n")
xml.write(" <status>FAIL</status>\n")
xml.write(" <message>" + escape(test.result.statusMessage) + "</message>\n")
xml.write(" <data>(test data will be embedded here at some point)</data>\n")
elif test.result.status == TestResult.StatusType.WARN:
print >>xml, " <name>" + escape(test.name) + "</name>"
print >>xml, " <status>WARN</status>"
print >>xml, " <message>" + escape(test.result.statusMessage) + "</message>"
print >>xml, " <data>(test data will be embedded here at some point)</data>"
xml.write(" <name>" + escape(test.name) + "</name>\n")
xml.write(" <status>WARN</status>\n")
xml.write(" <message>" + escape(test.result.statusMessage) + "</message>\n")
xml.write(" <data>(test data will be embedded here at some point)</data>\n")
elif test.result.status == TestResult.StatusType.NA:
print >>xml, " <name>" + escape(test.name) + "</name>"
print >>xml, " <status>NA</status>"
xml.write(" <name>" + escape(test.name) + "</name>\n")
xml.write(" <status>NA</status>\n")
else:
print >>xml, " <name>" + escape(test.name) + "</name>"
print >>xml, " <status>UNKNOWN</status>"
print >>xml, " <message>" + escape(test.result.statusMessage) + "</message>"
print >>xml, " </result>"
print >>xml, "</results>"
xml.write(" <name>" + escape(test.name) + "</name>\n")
xml.write(" <status>UNKNOWN</status>\n")
xml.write(" <message>" + escape(test.result.statusMessage) + "</message>\n")
xml.write(" </result>\n")
xml.write("</results>\n")
print >>xml, "</loganalysis>"
xml.write("</loganalysis>\n")
xml.close()
@ -230,7 +231,7 @@ def main():
logdata = DataflashLog.DataflashLog(args.logfile.name, format=args.format, ignoreBadlines=args.skip_bad) # read log
endTime = time.time()
if args.profile:
print "Log file read time: %.2f seconds" % (endTime-startTime)
print("Log file read time: %.2f seconds" % (endTime-startTime))
# check for empty log if requested
if args.empty:
@ -245,7 +246,7 @@ def main():
testSuite.run(logdata, args.verbose) # run tests
endTime = time.time()
if args.profile:
print "Test suite run time: %.2f seconds" % (endTime-startTime)
print("Test suite run time: %.2f seconds" % (endTime-startTime))
# deal with output
if not args.quiet:
@ -253,7 +254,7 @@ def main():
if args.xml:
testSuite.outputXML(args.xml)
if not args.quiet:
print "XML output written to file: %s\n" % args.xml
print("XML output written to file: %s\n" % args.xml)
if __name__ == "__main__":

View File

@ -7,6 +7,8 @@
# TODO: implement more unit+regression tests
from __future__ import print_function
import DataflashLog
import traceback
from VehicleType import VehicleType
@ -69,11 +71,11 @@ try:
# ...
print "All unit/regression tests GOOD\n"
print("All unit/regression tests GOOD\n")
except Exception as e:
print "Error found: " + traceback.format_exc()
print "UNIT TEST FAILED\n"
print("Error found: " + traceback.format_exc())
print("UNIT TEST FAILED\n")

View File

@ -1,3 +1,5 @@
from __future__ import print_function
from LogAnalyzer import Test,TestResult
import DataflashLog
@ -26,7 +28,7 @@ class TestDualGyroDrift(Test):
# imu2X = logdata.channels["IMU2"]["GyrX"].listData
# # NOTE: weird thing about Holger's log is that the counts of IMU+IMU2 are different
# print "length 1: %.2f, length 2: %.2f" % (len(imuX),len(imu2X))
# print("length 1: %.2f, length 2: %.2f" % (len(imuX),len(imu2X)))
# #assert(len(imuX) == len(imu2X))
# # divide the curve into segments and get the average of each segment
@ -68,15 +70,15 @@ class TestDualGyroDrift(Test):
# #pylab.plot(range(0,(nSamples*sliceLength1),sliceLength1), imu1ZAverages, 'b')
# print "Gyro averages1X: " + `imu1XAverages`
# print "Gyro averages1Y: " + `imu1YAverages`
# print "Gyro averages1Z: " + `imu1ZAverages` + "\n"
# print "Gyro averages2X: " + `imu2XAverages`
# print "Gyro averages2Y: " + `imu2YAverages`
# print "Gyro averages2Z: " + `imu2ZAverages` + "\n"
# print "Gyro averages diff X: " + `imuXDiffAverages`
# print "Gyro averages diff Y: " + `imuYDiffAverages`
# print "Gyro averages diff Z: " + `imuZDiffAverages`
# print("Gyro averages1X: " + `imu1XAverages`)
# print("Gyro averages1Y: " + `imu1YAverages`)
# print("Gyro averages1Z: " + `imu1ZAverages` + "\n")
# print("Gyro averages2X: " + `imu2XAverages`)
# print("Gyro averages2Y: " + `imu2YAverages`)
# print("Gyro averages2Z: " + `imu2ZAverages` + "\n")
# print("Gyro averages diff X: " + `imuXDiffAverages`)
# print("Gyro averages diff Y: " + `imuYDiffAverages`)
# print("Gyro averages diff Z: " + `imuZDiffAverages`)
# # lowpass filter using numpy
# # cutoff = 100
@ -90,10 +92,10 @@ class TestDualGyroDrift(Test):
# # TMP: DISPLAY BEFORE+AFTER plots
# pylab.show()
# # print "imuX average before lowpass filter: %.8f" % logdata.channels["IMU"]["GyrX"].avg()
# # print "imuX average after lowpass filter: %.8f" % numpy.mean(imuXFiltered)
# # print "imu2X average before lowpass filter: %.8f" % logdata.channels["IMU2"]["GyrX"].avg()
# # print "imu2X average after lowpass filter: %.8f" % numpy.mean(imu2XFiltered)
# # print("imuX average before lowpass filter: %.8f" % logdata.channels["IMU"]["GyrX"].avg())
# # print("imuX average after lowpass filter: %.8f" % numpy.mean(imuXFiltered))
# # print("imu2X average before lowpass filter: %.8f" % logdata.channels["IMU2"]["GyrX"].avg())
# # print("imu2X average after lowpass filter: %.8f" % numpy.mean(imu2XFiltered))
# avg1X = logdata.channels["IMU"]["GyrX"].avg()
# avg1Y = logdata.channels["IMU"]["GyrY"].avg()

View File

@ -1,3 +1,5 @@
from __future__ import print_function
from LogAnalyzer import Test,TestResult
import DataflashLog
@ -23,12 +25,12 @@ class TestDupeLogData(Test):
# c
data = logdata.channels["ATT"]["Pitch"].listData
for i in range(sampleStartIndex, len(data)):
#print "Checking against index %d" % i
#print("Checking against index %d" % i)
if i == sampleStartIndex:
continue # skip matching against ourselves
j = 0
while j<20 and (i+j)<len(data) and data[i+j][1] == sample[j][1]:
#print "### Match found, j=%d, data=%f, sample=%f, log data matched to sample at line %d" % (j,data[i+j][1],sample[j][1],data[i+j][0])
#print("### Match found, j=%d, data=%f, sample=%f, log data matched to sample at line %d" % (j,data[i+j][1],sample[j][1],data[i+j][0]))
j += 1
if j == 20: # all samples match
return data[i][0]
@ -52,17 +54,17 @@ class TestDupeLogData(Test):
step = attEndIndex / 11
for i in range(step,attEndIndex-step,step):
sampleStartIndices.append(i)
#print "Dupe data sample point index %d at line %d" % (i, logdata.channels["ATT"]["Pitch"].listData[i][0])
#print("Dupe data sample point index %d at line %d" % (i, logdata.channels["ATT"]["Pitch"].listData[i][0]))
# get 20 datapoints of pitch from each sample location and check for a match elsewhere
sampleIndex = 0
for i in range(sampleStartIndices[0], len(logdata.channels["ATT"]["Pitch"].listData)):
if i == sampleStartIndices[sampleIndex]:
#print "Checking sample %d" % i
#print("Checking sample %d" % i)
sample = logdata.channels["ATT"]["Pitch"].listData[i:i+20]
matchedLine = self.__matchSample(sample, i, logdata)
if matchedLine:
#print "Data from line %d found duplicated at line %d" % (sample[0][0],matchedLine)
#print("Data from line %d found duplicated at line %d" % (sample[0][0],matchedLine))
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Duplicate data chunks found in log (%d and %d)" % (sample[0][0],matchedLine)
return

View File

@ -1,3 +1,5 @@
from __future__ import print_function
from LogAnalyzer import Test,TestResult
import DataflashLog
from math import sqrt
@ -98,7 +100,7 @@ class TestIMUMatch(Test):
diff_filtered = math.sqrt(xdiff_filtered**2+ydiff_filtered**2+zdiff_filtered**2)
max_diff_filtered = max(max_diff_filtered,diff_filtered)
#print max_diff_filtered
#print(max_diff_filtered)
last_t = t
if max_diff_filtered > fail_threshold:

View File

@ -1,3 +1,5 @@
from __future__ import print_function
from LogAnalyzer import Test,TestResult
import DataflashLog
@ -29,7 +31,7 @@ class TestPerformance(Test):
# if not armingLines:
# break
# if maxT[0] > armingLines[0]:
# #print "Ignoring maxT from line %d, as it is the first PM line after arming on line %d" % (maxT[0],armingLines[0])
# #print("Ignoring maxT from line %d, as it is the first PM line after arming on line %d" % (maxT[0],armingLines[0]))
# ignoreMaxTLines.append(maxT[0])
# armingLines.pop(0)
@ -53,7 +55,7 @@ class TestPerformance(Test):
maxPercentSlow = percentSlow
maxPercentSlowLine = line
#if (maxT > 13000) and line not in ignoreMaxTLines:
# print "MaxT of %d detected on line %d" % (maxT,line)
# print("MaxT of %d detected on line %d" % (maxT,line))
if (maxPercentSlow > 10) or (slowLoopLineCount > 6):
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "%d slow loop lines found, max %.2f%% on line %d" % (slowLoopLineCount,maxPercentSlow,maxPercentSlowLine)

View File

@ -1,3 +1,5 @@
from __future__ import print_function
from LogAnalyzer import Test,TestResult
import DataflashLog
@ -52,7 +54,7 @@ class TestThrust(Test):
start = i
elif start != None:
if (i-start) > minSampleLength:
#print "Found high throttle chunk from line %d to %d (%d samples)" % (data[start][0],data[i][0],i-start+1)
#print("Found high throttle chunk from line %d to %d (%d samples)" % (data[start][0],data[i][0],i-start+1))
highThrottleSegments.append((start,i))
start = None

View File

@ -1,3 +1,5 @@
from __future__ import print_function
from LogAnalyzer import Test,TestResult
import DataflashLog
@ -43,7 +45,7 @@ class TestVibration(Test):
# TODO: accumulate all LOITER chunks over min size, or just use the largest one?
startLine = chunks[0][0]
endLine = chunks[0][1]
#print "TestVibration using LOITER chunk from lines %s to %s" % (`startLine`, `endLine`)
#print("TestVibration using LOITER chunk from lines %s to %s" % (`startLine`, `endLine`))
def getStdDevIMU(logdata, channelName, startLine,endLine):
loiterData = logdata.channels["IMU"][channelName].getSegment(startLine,endLine)