LogAnalyzer: fixed plane+rover parsing, added test for underpowered copters

This commit is contained in:
Andrew Chapman 2014-02-22 20:36:30 +01:00 committed by Andrew Tridgell
parent 301a3bcdaf
commit c4828e1d3d
8 changed files with 112 additions and 23 deletions

View File

@ -10,6 +10,8 @@
import pprint # temp
import collections
import os
import numpy
class Format:
'''Channel format as specified by the FMT lines in the log file'''
@ -30,6 +32,7 @@ class Format:
class Channel:
'''storage for a single stream of data, i.e. all GPS.RelAlt values'''
# TODO: store data as a curve so we can more easily interpolate and sample the slope
dictData = None # dict of linenum->value # store dupe data in dict and list for now, until we decide which is the better way to go
listData = None # list of (linenum,value)
def __init__(self):
@ -45,11 +48,11 @@ class Channel:
def max(self):
return max(self.dictData.values())
def avg(self):
return avg(self.dictData.values())
return numpy.mean(self.dictData.values())
def getNearestValue(self, lineNumber, lookForwards=True):
'''return the nearest data value to the given lineNumber'''
'''find the nearest data value to the given lineNumber, defaults to looking forwards. Returns (value,lineNumber)'''
if lineNumber in self.dictData:
return self.dictData[lineNumber]
return (self.dictData[lineNumber], lineNumber)
offset = 1
if not lookForwards:
offset = -1
@ -59,9 +62,16 @@ class Channel:
line = min(maxLine,line)
while line >= minLine and line <= maxLine:
if line in self.dictData:
return self.dictData[line]
return (self.dictData[line], line)
line = line + offset
raise Exception("Error finding nearest value for line %d" % lineNumber)
def getInterpolatedValue(self, lineNumber):
(prevValue,prevValueLine) = self.getNearestValue(lineNumber, lookForwards=False)
(nextValue,nextValueLine) = self.getNearestValue(lineNumber, lookForwards=False)
if prevValueLine == nextValueLine:
return prevValue
weight = (lineNumber-prevValueLine) / float(nextValueLine-prevValueLine)
return ((weight*prevValue) + ((1-weight)*nextValue))
class DataflashLogHelper:
@ -116,7 +126,9 @@ class DataflashLogHelper:
def isLogEmpty(logdata):
'''returns an human readable error string if the log is essentially empty, otherwise returns None'''
# naive check for now, see if the throttle output was ever above 20%
throttleThreshold = 200
throttleThreshold = 20
if logdata.vehicleType == "ArduCopter":
throttleThreshold = 200 # copter uses 0-1000, plane+rover use 0-100
if "CTUN" in logdata.channels:
maxThrottle = logdata.channels["CTUN"]["ThrOut"].max()
if maxThrottle < throttleThreshold:
@ -126,6 +138,8 @@ class DataflashLogHelper:
class DataflashLog:
'''APM Dataflash log file reader and container class. Keep this simple, add more advanced or specific functions to DataflashLogHelper class'''
# TODO: implement some kind of iterator or different data storage approeach where we can step through the log by time/line and easily access, interpolate and cross-reference values from all channels at that point
filename = None
vehicleType = "" # ArduCopter, ArduPlane, ArduRover, etc, verbatim as given by header
@ -196,6 +210,8 @@ class DataflashLog:
if copterLogPre3Header and line[0:15] == "SYSID_SW_MREV: ":
copterLogPre3Header = False
copterLogPre3Params = True
if line == " Ready to drive." or line == " Ready to FLY.":
continue
if len(tokens) == 1:
tokens2 = line.split(' ')
if line == "":
@ -210,7 +226,7 @@ class DataflashLog:
copterLogPre3Header = True
elif copterLogPre3Header:
pass # just skip over all that until we hit the PARM lines
elif (len(tokens2) == 2 or len(tokens2) == 3) and tokens2[1][0] == "V": # e.g. ArduCopter V3.1 (5c6503e2)
elif (len(tokens2) == 2 or len(tokens2) == 3) and tokens2[1][0].lower() == "v": # e.g. ArduCopter V3.1 (5c6503e2)
self.vehicleType = tokens2[0]
self.firmwareVersion = tokens2[1]
if len(tokens2) == 3:
@ -241,7 +257,12 @@ class DataflashLog:
elif tokens[0] == "MSG":
self.messages[lineNumber] = tokens[1]
elif tokens[0] == "MODE":
self.modeChanges[lineNumber] = (tokens[1],int(tokens[2]))
if self.vehicleType == "ArduCopter":
self.modeChanges[lineNumber] = (tokens[1],int(tokens[2]))
elif self.vehicleType == "ArduPlane" or self.vehicleType == "ArduRover":
self.modeChanges[lineNumber] = (tokens[2],int(tokens[3]))
else:
raise Exception("Unknown log type for MODE line")
# anything else must be the log data
elif not copterLogPre3Header:
groupName = tokens[0]

View File

@ -11,6 +11,8 @@
# - Pixhawk doesn't output one of the FMT labels... forget which one
# - MAG offsets seem to be constant (only seen data on Pixhawk)
# - MAG offsets seem to be cast to int before being output? (param is -84.67, logged as -84)
# - copter+plane use 'V' in their vehicle type/version/build line, rover uses lower case 'v'. Copter+Rover give a build number, plane does not
# - CTUN.ThrOut on copter is 0-1000, on plane+rover it is 0-100
# TODO - unify result statusMessage and extraOutput. simple tests set statusMessage, complex ones append to it with newlines

View File

@ -1,3 +1,5 @@
ArduCopter V3.1
FMT, 128, 89, FMT, BBnNZ, Type,Length,Name,Format
FMT, 129, 23, PARM, Nf, Name,Value
FMT, 130, 45, GPS, BIHBcLLeeEefI, Status,TimeMS,Week,NSats,HDop,Lat,Lng,RelAlt,Alt,Spd,GCrs,VZ,T

View File

@ -13,20 +13,26 @@ class TestBrownout(Test):
self.result = TestResult()
self.result.status = TestResult.StatusType.PASS
# step through the arm/disarm events in order, to see if they're symmetrical
# note: it seems landing detection isn't robust enough to rely upon here, so we'll only consider arm+disarm, not takeoff+land
evData = logdata.channels["EV"]["Id"]
orderedEVData = collections.OrderedDict(sorted(evData.dictData.items(), key=lambda t: t[0]))
isArmed = False
for line,ev in orderedEVData.iteritems():
if ev == 10:
isArmed = True
elif ev == 11:
isArmed = False
if "EV" in logdata.channels:
# step through the arm/disarm events in order, to see if they're symmetrical
# note: it seems landing detection isn't robust enough to rely upon here, so we'll only consider arm+disarm, not takeoff+land
evData = logdata.channels["EV"]["Id"]
orderedEVData = collections.OrderedDict(sorted(evData.dictData.items(), key=lambda t: t[0]))
isArmed = False
for line,ev in orderedEVData.iteritems():
if ev == 10:
isArmed = True
elif ev == 11:
isArmed = False
if "CTUN" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No CTUN log data"
return
# check for relative altitude at end
if "CTUN" in logdata.channels and "BarAlt" in logdata.channels["CTUN"]:
finalAlt = logdata.channels["CTUN"]["BarAlt"].getNearestValue(logdata.lineCount, lookForwards=False)
(finalAlt,finalAltLine) = logdata.channels["CTUN"]["BarAlt"].getNearestValue(logdata.lineCount, lookForwards=False)
finalAltMax = 3.0 # max alt offset that we'll still consider to be on the ground
if isArmed and finalAlt > finalAltMax:

View File

@ -13,7 +13,7 @@ class TestGPSGlitch(Test):
self.result.status = TestResult.StatusType.PASS
if "GPS" not in logdata.channels:
self.result.status = TestResult.StatusType.FAIL
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No GPS log data"
return

View File

@ -6,12 +6,16 @@ class TestPerformance(Test):
'''check performance monitoring messages (PM) for issues with slow loops, etc'''
def __init__(self):
self.name = "Performance"
self.name = "PM"
def run(self, logdata):
self.result = TestResult()
self.result.status = TestResult.StatusType.PASS
if logdata.vehicleType != "ArduCopter":
self.result.status = TestResult.StatusType.NA
return
# NOTE: we'll ignore MaxT altogether for now, it seems there are quite regularly one or two high values in there, even ignoring the ones expected after arm/disarm events
# gather info on arm/disarm lines, we will ignore the MaxT data from the first line found after each of these

View File

@ -12,7 +12,62 @@ class TestUnderpowered(Test):
self.result = TestResult()
self.result.status = TestResult.StatusType.PASS
if logdata.vehicleType == "ArduPlane":
if logdata.vehicleType != "ArduCopter":
self.result.status = TestResult.StatusType.NA
# TODO: implement test for underpowered copter (use log Randy provided)
if not "CTUN" in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No CTUN log data"
return
if not "ATT" in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No ATT log data"
return
# check for throttle (CTUN.ThrOut) above 700 for a chunk of time with copter not rising
highThrottleThreshold = 700
tiltThreshold = 20 # ignore high throttle when roll or tilt is above this value
climbThresholdWARN = 100
climbThresholdFAIL = 50
minSampleLength = 50
highThrottleSegments = []
# find any contiguous chunks where CTUN.ThrOut > highThrottleThreshold, ignore high throttle if tilt > tiltThreshold, and discard any segments shorter than minSampleLength
start = None
data = logdata.channels["CTUN"]["ThrOut"].listData
for i in range(0,len(data)):
(lineNumber,value) = data[i]
isBelowTiltThreshold = True
if value > highThrottleThreshold:
(roll,meh) = logdata.channels["ATT"]["Roll"].getNearestValue(lineNumber)
(pitch,meh) = logdata.channels["ATT"]["Pitch"].getNearestValue(lineNumber)
if (abs(roll) > tiltThreshold) or (abs(pitch) > tiltThreshold):
isBelowTiltThreshold = False
if (value > highThrottleThreshold) and isBelowTiltThreshold:
if start == None:
start = i
elif start != None:
if (i-start) > minSampleLength:
#print "Found high throttle chunk from line %d to %d (%d samples)" % (data[start][0],data[i][0],i-start+1)
highThrottleSegments.append((start,i))
start = None
# loop through each checking CTUN.CRate, if < 50 FAIL, if < 100 WARN
# TODO: we should filter CRate and use its slope rather than value for this test
for seg in highThrottleSegments:
(startLine,endLine) = (data[seg[0]][0], data[seg[1]][0])
avgClimbRate = logdata.channels["CTUN"]["CRate"].getSegment(startLine,endLine).avg()
avgThrOut = logdata.channels["CTUN"]["ThrOut"].getSegment(startLine,endLine).avg()
#print " Average CRate for this chunk is %.2f" % avgClimbRate
if avgClimbRate < climbThresholdFAIL:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Avg climb rate %.2f cm/s for throttle avg %d" % (avgClimbRate,avgThrOut)
return
if avgClimbRate < climbThresholdWARN:
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = "Avg climb rate %.2f cm/s for throttle avg %d" % (avgClimbRate,avgThrOut)

View File

@ -21,7 +21,6 @@ class TestVibration(Test):
aimRangeWarnZ = 2.0 # gravity +/- aim range
aimRangeFailZ = 5.0 # gravity +/- aim range
# TODO: in Pixhawk should we use IMU or IMU2?
if not "IMU" in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No IMU log data"