Tools/LogAnalyzer: apply Black and isort

This commit is contained in:
Luiz Georg 2022-07-18 11:09:04 -03:00 committed by Peter Barker
parent 545cf0504a
commit 42f202d0ff
22 changed files with 1388 additions and 841 deletions

View File

@ -5,29 +5,32 @@
#
from __future__ import print_function
import collections
import numpy
import bisect
import sys
import ctypes
import bisect
import collections
import ctypes
import sys
import numpy
from VehicleType import VehicleType, VehicleTypeString
class Format(object):
'''Data channel format as specified by the FMT lines in the log file'''
def __init__(self,msgType,msgLen,name,types,labels):
self.NAME = 'FMT'
def __init__(self, msgType, msgLen, name, types, labels):
self.NAME = 'FMT'
self.msgType = msgType
self.msgLen = msgLen
self.name = name
self.types = types
self.labels = labels.split(',')
self.msgLen = msgLen
self.name = name
self.types = types
self.labels = labels.split(',')
def __str__(self):
return "%8s %s" % (self.name, repr(self.labels))
@staticmethod
def trycastToFormatType(value,valueType):
def trycastToFormatType(value, valueType):
'''using format characters from libraries/DataFlash/DataFlash.h to cast strings to basic python int/float/string types
tries a cast, if it does not work, well, acceptable as the text logs do not match the format, e.g. MODE is expected to be int'''
try:
@ -43,8 +46,8 @@ class Format(object):
def to_class(self):
members = dict(
NAME = self.name,
labels = self.labels[:],
NAME=self.name,
labels=self.labels[:],
)
fieldtypes = [i for i in self.types]
@ -52,51 +55,56 @@ class Format(object):
# field access
for (label, _type) in zip(fieldlabels, fieldtypes):
def createproperty(name, format):
# extra scope for variable sanity
# scaling via _NAME and def NAME(self): return self._NAME / SCALE
propertyname = name
attributename = '_' + name
p = property(lambda x:getattr(x, attributename),
lambda x, v:setattr(x,attributename, Format.trycastToFormatType(v,format)))
p = property(
lambda x: getattr(x, attributename),
lambda x, v: setattr(x, attributename, Format.trycastToFormatType(v, format)),
)
members[propertyname] = p
members[attributename] = None
createproperty(label, _type)
# repr shows all values but the header
members['__repr__'] = lambda x: "<{cls} {data}>".format(cls=x.__class__.__name__, data = ' '.join(["{}:{}".format(k,getattr(x,'_'+k)) for k in x.labels]))
members['__repr__'] = lambda x: "<{cls} {data}>".format(
cls=x.__class__.__name__, data=' '.join(["{}:{}".format(k, getattr(x, '_' + k)) for k in x.labels])
)
def init(a, *x):
if len(x) != len(a.labels):
raise ValueError("Invalid Length")
#print(list(zip(a.labels, x)))
for (l,v) in zip(a.labels, x):
# print(list(zip(a.labels, x)))
for (l, v) in zip(a.labels, x):
try:
setattr(a, l, v)
except Exception as e:
print("{} {} {} failed".format(a,l,v))
print("{} {} {} failed".format(a, l, v))
print(e)
members['__init__'] = init
# finally, create the class
cls = type(\
'Log__{:s}'.format(self.name),
(object,),
members
)
#print(members)
cls = type('Log__{:s}'.format(self.name), (object,), members)
# print(members)
return cls
class logheader(ctypes.LittleEndianStructure):
_fields_ = [ \
_fields_ = [
('head1', ctypes.c_uint8),
('head2', ctypes.c_uint8),
('msgid', ctypes.c_uint8),
]
def __repr__(self):
return "<logheader head1=0x{self.head1:x} head2=0x{self.head2:x} msgid=0x{self.msgid:x} ({self.msgid})>".format(self=self)
return "<logheader head1=0x{self.head1:x} head2=0x{self.head2:x} msgid=0x{self.msgid:x} ({self.msgid})>".format(
self=self
)
class BinaryFormat(ctypes.LittleEndianStructure):
@ -116,10 +124,10 @@ class BinaryFormat(ctypes.LittleEndianStructure):
'n': ctypes.c_char * 4,
'N': ctypes.c_char * 16,
'Z': ctypes.c_char * 64,
'c': ctypes.c_int16,# * 100,
'C': ctypes.c_uint16,# * 100,
'e': ctypes.c_int32,# * 100,
'E': ctypes.c_uint32,# * 100,
'c': ctypes.c_int16, # * 100,
'C': ctypes.c_uint16, # * 100,
'e': ctypes.c_int32, # * 100,
'E': ctypes.c_uint32, # * 100,
'L': ctypes.c_int32,
'M': ctypes.c_uint8,
'q': ctypes.c_int64,
@ -134,7 +142,7 @@ class BinaryFormat(ctypes.LittleEndianStructure):
}
_packed_ = True
_fields_ = [ \
_fields_ = [
('head', logheader),
('type', ctypes.c_uint8),
('length', ctypes.c_uint8),
@ -142,17 +150,18 @@ class BinaryFormat(ctypes.LittleEndianStructure):
('types', ctypes.c_char * 16),
('labels', ctypes.c_char * 64),
]
def __repr__(self):
return "<{cls} {data}>".format(cls=self.__class__.__name__, data = ' '.join(["{}:{}".format(k,getattr(self,k)) for (k,_) in self._fields_[1:]]))
return "<{cls} {data}>".format(
cls=self.__class__.__name__,
data=' '.join(["{}:{}".format(k, getattr(self, k)) for (k, _) in self._fields_[1:]]),
)
def to_class(self):
labels = self.labels.decode('ascii') if self.labels else ""
members = dict(
NAME = self.name.decode('ascii'),
MSG = self.type,
SIZE = self.length,
labels = labels.split(","),
_pack_ = True)
NAME=self.name.decode('ascii'), MSG=self.type, SIZE=self.length, labels=labels.split(","), _pack_=True
)
if type(self.types[0]) == str:
fieldtypes = [i for i in self.types]
@ -163,53 +172,57 @@ class BinaryFormat(ctypes.LittleEndianStructure):
print("Broken FMT message for {} .. ignoring".format(self.name), file=sys.stderr)
return None
fields = [('head',logheader)]
fields = [('head', logheader)]
# field access
for (label, _type) in zip(fieldlabels, fieldtypes):
def createproperty(name, format):
# extra scope for variable sanity
# scaling via _NAME and def NAME(self): return self._NAME / SCALE
propertyname = name
attributename = '_' + name
scale = BinaryFormat.FIELD_SCALE.get(format, None)
def get_message_attribute(x):
ret = getattr(x, attributename)
if str(format) in ['Z','n','N']:
if str(format) in ['Z', 'n', 'N']:
ret = ret.decode('ascii')
return ret
p = property(get_message_attribute)
if scale is not None:
p = property(lambda x:getattr(x, attributename) / scale)
p = property(lambda x: getattr(x, attributename) / scale)
members[propertyname] = p
try:
fields.append((attributename, BinaryFormat.FIELD_FORMAT[format]))
except KeyError:
print('ERROR: Failed to add FMT type: {}, with format: {}'.format(attributename, format))
raise
createproperty(label, _type)
members['_fields_'] = fields
# repr shows all values but the header
members['__repr__'] = lambda x: "<{cls} {data}>".format(cls=x.__class__.__name__, data = ' '.join(["{}:{}".format(k,getattr(x,k)) for k in x.labels]))
members['__repr__'] = lambda x: "<{cls} {data}>".format(
cls=x.__class__.__name__, data=' '.join(["{}:{}".format(k, getattr(x, k)) for k in x.labels])
)
# finally, create the class
cls = type(\
'Log__%s' % self.name,
(ctypes.LittleEndianStructure,),
members
)
cls = type('Log__%s' % self.name, (ctypes.LittleEndianStructure,), members)
if ctypes.sizeof(cls) != cls.SIZE:
print("size mismatch for {} expected {} got {}".format(cls, ctypes.sizeof(cls), cls.SIZE), file=sys.stderr)
# for i in cls.labels:
# print("{} = {}".format(i,getattr(cls,'_'+i)))
# for i in cls.labels:
# print("{} = {}".format(i,getattr(cls,'_'+i)))
return None
return cls
BinaryFormat.SIZE = ctypes.sizeof(BinaryFormat)
class Channel(object):
'''storage for a single stream of data, i.e. all GPS.RelAlt values'''
@ -217,39 +230,50 @@ class Channel(object):
# TODO: store data as a scipy spline curve so we can more easily interpolate and sample the slope?
def __init__(self):
self.dictData = {} # dict of linenum->value # store dupe data in dict and list for now, until we decide which is the better way to go
self.listData = [] # list of (linenum,value) # store dupe data in dict and list for now, until we decide which is the better way to go
self.dictData = (
{}
) # dict of linenum->value # store dupe data in dict and list for now, until we decide which is the better way to go
self.listData = (
[]
) # list of (linenum,value) # store dupe data in dict and list for now, until we decide which is the better way to go
def getSegment(self, startLine, endLine):
'''returns a segment of this data (from startLine to endLine, inclusive) as a new Channel instance'''
segment = Channel()
segment.dictData = {k:v for k,v in self.dictData.items() if k >= startLine and k <= endLine}
segment.dictData = {k: v for k, v in self.dictData.items() if k >= startLine and k <= endLine}
return segment
def min(self):
return min(self.dictData.values())
def max(self):
return max(self.dictData.values())
def avg(self):
return numpy.mean(self.dictData.values())
def getNearestValueFwd(self, lineNumber):
'''Returns (value,lineNumber)'''
index = bisect.bisect_left(self.listData, (lineNumber,-99999))
while index<len(self.listData):
line = self.listData[index][0]
#print("Looking forwards for nearest value to line number %d, starting at line %d" % (lineNumber,line)) # TEMP
index = bisect.bisect_left(self.listData, (lineNumber, -99999))
while index < len(self.listData):
line = self.listData[index][0]
# print("Looking forwards for nearest value to line number %d, starting at line %d" % (lineNumber,line)) # TEMP
if line >= lineNumber:
return (self.listData[index][1],line)
return (self.listData[index][1], line)
index += 1
raise Exception("Error finding nearest value for line %d" % lineNumber)
def getNearestValueBack(self, lineNumber):
'''Returns (value,lineNumber)'''
index = bisect.bisect_left(self.listData, (lineNumber,-99999)) - 1
while index>=0:
line = self.listData[index][0]
#print("Looking backwards for nearest value to line number %d, starting at line %d" % (lineNumber,line)) # TEMP
index = bisect.bisect_left(self.listData, (lineNumber, -99999)) - 1
while index >= 0:
line = self.listData[index][0]
# print("Looking backwards for nearest value to line number %d, starting at line %d" % (lineNumber,line)) # TEMP
if line <= lineNumber:
return (self.listData[index][1],line)
return (self.listData[index][1], line)
index -= 1
raise Exception("Error finding nearest value for line %d" % lineNumber)
def getNearestValue(self, lineNumber, lookForwards=True):
'''find the nearest data value to the given lineNumber, defaults to first looking forwards. Returns (value,lineNumber)'''
if lookForwards:
@ -263,42 +287,49 @@ class Channel(object):
except:
return self.getNearestValueFwd(lineNumber)
raise Exception("Error finding nearest value for line %d" % lineNumber)
def getInterpolatedValue(self, lineNumber):
(prevValue,prevValueLine) = self.getNearestValue(lineNumber, lookForwards=False)
(nextValue,nextValueLine) = self.getNearestValue(lineNumber, lookForwards=True)
(prevValue, prevValueLine) = self.getNearestValue(lineNumber, lookForwards=False)
(nextValue, nextValueLine) = self.getNearestValue(lineNumber, lookForwards=True)
if prevValueLine == nextValueLine:
return prevValue
weight = (lineNumber-prevValueLine) / float(nextValueLine-prevValueLine)
return ((weight*prevValue) + ((1-weight)*nextValue))
weight = (lineNumber - prevValueLine) / float(nextValueLine - prevValueLine)
return (weight * prevValue) + ((1 - weight) * nextValue)
def getIndexOf(self, lineNumber):
'''returns the index within this channel's listData of the given lineNumber, or raises an Exception if not found'''
index = bisect.bisect_left(self.listData, (lineNumber,-99999))
#print("INDEX of line %d: %d" % (lineNumber,index))
#print("self.listData[index][0]: %d" % self.listData[index][0])
if (self.listData[index][0] == lineNumber):
index = bisect.bisect_left(self.listData, (lineNumber, -99999))
# print("INDEX of line %d: %d" % (lineNumber,index))
# print("self.listData[index][0]: %d" % self.listData[index][0])
if self.listData[index][0] == lineNumber:
return index
else:
raise Exception("Error finding index for line %d" % lineNumber)
class LogIterator:
'''Smart iterator that can move through a log by line number and maintain an index into the nearest values of all data channels'''
# TODO: LogIterator currently indexes the next available value rather than the nearest value, we should make it configurable between next/nearest
class LogIteratorSubValue:
'''syntactic sugar to allow access by LogIterator[lineLabel][dataLabel]'''
logdata = None
logdata = None
iterators = None
lineLabel = None
def __init__(self, logdata, iterators, lineLabel):
self.logdata = logdata
self.lineLabel = lineLabel
self.iterators = iterators
def __getitem__(self, dataLabel):
index = self.iterators[self.lineLabel][0]
return self.logdata.channels[self.lineLabel][dataLabel].listData[index][1]
iterators = {} # lineLabel -> (listIndex,lineNumber)
logdata = None
iterators = {} # lineLabel -> (listIndex,lineNumber)
logdata = None
currentLine = None
def __init__(self, logdata, lineNumber=0):
@ -308,10 +339,13 @@ class LogIterator:
if lineLabel in self.logdata.channels:
self.iterators[lineLabel] = ()
self.jump(lineNumber)
def __iter__(self):
return self
def __getitem__(self, lineLabel):
return LogIterator.LogIteratorSubValue(self.logdata, self.iterators, lineLabel)
def next(self):
'''increment iterator to next log line'''
self.currentLine += 1
@ -322,17 +356,20 @@ class LogIterator:
dataLabel = self.logdata.formats[lineLabel].labels[0]
(index, lineNumber) = self.iterators[lineLabel]
# if so, and it is not the last entry in the log, then increment the indices for all dataLabels under that lineLabel
if (self.currentLine > lineNumber) and (index < len(self.logdata.channels[lineLabel][dataLabel].listData)-1):
if (self.currentLine > lineNumber) and (
index < len(self.logdata.channels[lineLabel][dataLabel].listData) - 1
):
index += 1
lineNumber = self.logdata.channels[lineLabel][dataLabel].listData[index][0]
self.iterators[lineLabel] = (index,lineNumber)
self.iterators[lineLabel] = (index, lineNumber)
return self
def jump(self, lineNumber):
'''jump iterator to specified log line'''
self.currentLine = lineNumber
for lineLabel in self.iterators.keys():
dataLabel = self.logdata.formats[lineLabel].labels[0]
(value,lineNumber) = self.logdata.channels[lineLabel][dataLabel].getNearestValue(self.currentLine)
(value, lineNumber) = self.logdata.channels[lineLabel][dataLabel].getNearestValue(self.currentLine)
self.iterators[lineLabel] = (self.logdata.channels[lineLabel][dataLabel].getIndexOf(lineNumber), lineNumber)
@ -367,8 +404,8 @@ class DataflashLogHelper:
# TODO: implement noRCInputs handling when identifying stable loiter chunks, for now we're ignoring it
def chunkSizeCompare(chunk1, chunk2):
chunk1Len = chunk1[1]-chunk1[0]
chunk2Len = chunk2[1]-chunk2[0]
chunk1Len = chunk1[1] - chunk1[0]
chunk2Len = chunk2[1] - chunk2[0]
if chunk1Len == chunk2Len:
return 0
elif chunk1Len > chunk2Len:
@ -381,16 +418,20 @@ class DataflashLogHelper:
for i in range(len(od.keys())):
if od.values()[i][0] == "LOITER":
startLine = od.keys()[i]
endLine = None
if i == len(od.keys())-1:
endLine = None
if i == len(od.keys()) - 1:
endLine = logdata.lineCount
else:
endLine = od.keys()[i+1]-1
chunkTimeSeconds = (DataflashLogHelper.getTimeAtLine(logdata,endLine)-DataflashLogHelper.getTimeAtLine(logdata,startLine)+1) / 1000.0
endLine = od.keys()[i + 1] - 1
chunkTimeSeconds = (
DataflashLogHelper.getTimeAtLine(logdata, endLine)
- DataflashLogHelper.getTimeAtLine(logdata, startLine)
+ 1
) / 1000.0
if chunkTimeSeconds > minLengthSeconds:
chunks.append((startLine,endLine))
#print("LOITER chunk: %d to %d, %d lines" % (startLine,endLine,endLine-startLine+1))
#print(" (time %d to %d, %d seconds)" % (DataflashLogHelper.getTimeAtLine(logdata,startLine), DataflashLogHelper.getTimeAtLine(logdata,endLine), chunkTimeSeconds))
chunks.append((startLine, endLine))
# print("LOITER chunk: %d to %d, %d lines" % (startLine,endLine,endLine-startLine+1))
# print(" (time %d to %d, %d seconds)" % (DataflashLogHelper.getTimeAtLine(logdata,startLine), DataflashLogHelper.getTimeAtLine(logdata,endLine), chunkTimeSeconds))
chunks.sort(chunkSizeCompare)
return chunks
@ -400,7 +441,7 @@ class DataflashLogHelper:
# naive check for now, see if the throttle output was ever above 20%
throttleThreshold = 20
if logdata.vehicleType == VehicleType.Copter:
throttleThreshold = 200 # copter uses 0-1000, plane+rover use 0-100
throttleThreshold = 200 # copter uses 0-1000, plane+rover use 0-100
if "CTUN" in logdata.channels:
try:
maxThrottle = logdata.channels["CTUN"]["ThrOut"].max()
@ -422,25 +463,25 @@ class DataflashLog(object):
def __init__(self, logfile=None, format="auto", ignoreBadlines=False):
self.filename = None
self.vehicleType = None # from VehicleType enumeration; value derived from header
self.vehicleTypeString = None # set at same time has the enum value
self.vehicleType = None # from VehicleType enumeration; value derived from header
self.vehicleTypeString = None # set at same time has the enum value
self.firmwareVersion = ""
self.firmwareHash = ""
self.freeRAM = 0
self.hardwareType = "" # APM 1, APM 2, PX4, MPNG, etc What is VRBrain? BeagleBone, etc? Needs more testing
self.formats = {} # name -> Format
self.parameters = {} # token -> value
self.messages = {} # lineNum -> message
self.modeChanges = {} # lineNum -> (mode,value)
self.channels = {} # lineLabel -> {dataLabel:Channel}
self.filesizeKB = 0
self.firmwareHash = ""
self.freeRAM = 0
self.hardwareType = "" # APM 1, APM 2, PX4, MPNG, etc What is VRBrain? BeagleBone, etc? Needs more testing
self.formats = {} # name -> Format
self.parameters = {} # token -> value
self.messages = {} # lineNum -> message
self.modeChanges = {} # lineNum -> (mode,value)
self.channels = {} # lineLabel -> {dataLabel:Channel}
self.filesizeKB = 0
self.durationSecs = 0
self.lineCount = 0
self.lineCount = 0
self.skippedLines = 0
self.backpatch_these_modechanges = []
self.frame = None
self.frame = None
if logfile:
self.read(logfile, format, ignoreBadlines)
@ -450,7 +491,7 @@ class DataflashLog(object):
if self.vehicleType != VehicleType.Copter:
return None
motLabels = []
if "MOT" in self.formats: # not listed in PX4 log header for some reason?
if "MOT" in self.formats: # not listed in PX4 log header for some reason?
motLabels = self.formats["MOT"].labels
if "GGain" in motLabels:
return "tradheli"
@ -471,20 +512,20 @@ class DataflashLog(object):
"OCTA": 8,
"OCTA_QUAD": 8,
"DECA": 10,
# "HELI": 1,
# "HELI_DUAL": 2,
# "HELI": 1,
# "HELI_DUAL": 2,
"TRI": 3,
"SINGLE": 1,
"COAX": 2,
"TAILSITTER": 1,
"DODECA_HEXA" : 12,
"DODECA_HEXA": 12,
}
return motor_channels_for_frame[self.frame]
def read(self, logfile, format="auto", ignoreBadlines=False):
'''returns on successful log read (including bad lines if ignoreBadlines==True), will throw an Exception otherwise'''
# TODO: dataflash log parsing code is pretty hacky, should re-write more methodically
df_header = bytearray([0xa3, 0x95, 0x80, 0x80])
df_header = bytearray([0xA3, 0x95, 0x80, 0x80])
self.filename = logfile
if self.filename == '<stdin>':
f = sys.stdin
@ -498,7 +539,7 @@ class DataflashLog(object):
elif format == 'auto':
if self.filename == '<stdin>':
# assuming TXT format
# raise ValueError("Invalid log format for stdin: {}".format(format))
# raise ValueError("Invalid log format for stdin: {}".format(format))
head = ""
else:
head = f.read(4)
@ -513,22 +554,22 @@ class DataflashLog(object):
numBytes, lineNumber = self.read_text(f, ignoreBadlines)
# gather some general stats about the log
self.lineCount = lineNumber
self.lineCount = lineNumber
self.filesizeKB = numBytes / 1024.0
# TODO: switch duration calculation to use TimeMS values rather than GPS timestemp
if "GPS" in self.channels:
# the GPS time label changed at some point, need to handle both
timeLabel = None
for i in 'TimeMS','TimeUS','Time':
for i in 'TimeMS', 'TimeUS', 'Time':
if i in self.channels["GPS"]:
timeLabel = i
break
firstTimeGPS = int(self.channels["GPS"][timeLabel].listData[0][1])
lastTimeGPS = int(self.channels["GPS"][timeLabel].listData[-1][1])
lastTimeGPS = int(self.channels["GPS"][timeLabel].listData[-1][1])
if timeLabel == 'TimeUS':
firstTimeGPS /= 1000
lastTimeGPS /= 1000
self.durationSecs = (lastTimeGPS-firstTimeGPS) / 1000
self.durationSecs = (lastTimeGPS - firstTimeGPS) / 1000
# TODO: calculate logging rate based on timestamps
# ...
@ -537,7 +578,7 @@ class DataflashLog(object):
"ArduCopter": VehicleType.Copter,
"APM:Copter": VehicleType.Copter,
"ArduPlane": VehicleType.Plane,
"ArduRover": VehicleType.Rover
"ArduRover": VehicleType.Rover,
}
# takes the vehicle type supplied via "MSG" and sets vehicleType from
@ -552,26 +593,26 @@ class DataflashLog(object):
def handleModeChange(self, lineNumber, e):
if self.vehicleType == VehicleType.Copter:
modes = {
0:'STABILIZE',
1:'ACRO',
2:'ALT_HOLD',
3:'AUTO',
4:'GUIDED',
5:'LOITER',
6:'RTL',
7:'CIRCLE',
9:'LAND',
10:'OF_LOITER',
11:'DRIFT',
13:'SPORT',
14:'FLIP',
15:'AUTOTUNE',
16:'POSHOLD',
17:'BRAKE',
18:'THROW',
19:'AVOID_ADSB',
20:'GUIDED_NOGPS',
21:'SMART_RTL',
0: 'STABILIZE',
1: 'ACRO',
2: 'ALT_HOLD',
3: 'AUTO',
4: 'GUIDED',
5: 'LOITER',
6: 'RTL',
7: 'CIRCLE',
9: 'LAND',
10: 'OF_LOITER',
11: 'DRIFT',
13: 'SPORT',
14: 'FLIP',
15: 'AUTOTUNE',
16: 'POSHOLD',
17: 'BRAKE',
18: 'THROW',
19: 'AVOID_ADSB',
20: 'GUIDED_NOGPS',
21: 'SMART_RTL',
}
try:
if hasattr(e, 'ThrCrs'):
@ -599,7 +640,9 @@ class DataflashLog(object):
else:
# if you've gotten to here the chances are we don't
# know what vehicle you're flying...
raise Exception("Unknown log type for MODE line vehicletype=({}) line=({})".format(self.vehicleTypeString, repr(e)))
raise Exception(
"Unknown log type for MODE line vehicletype=({}) line=({})".format(self.vehicleTypeString, repr(e))
)
def backPatchModeChanges(self):
for (lineNumber, e) in self.backpatch_these_modechanges:
@ -611,8 +654,8 @@ class DataflashLog(object):
def process(self, lineNumber, e):
if e.NAME == 'FMT':
cls = e.to_class()
if cls is not None: # FMT messages can be broken ...
if hasattr(e, 'type') and e.type not in self._formats: # binary log specific
if cls is not None: # FMT messages can be broken ...
if hasattr(e, 'type') and e.type not in self._formats: # binary log specific
self._formats[e.type] = cls
if cls.NAME not in self.formats:
self.formats[cls.NAME] = cls
@ -625,7 +668,7 @@ class DataflashLog(object):
self.set_frame(tokens[1])
if not self.vehicleType:
try:
self.set_vehicleType_from_MSG_vehicle(tokens[0]);
self.set_vehicleType_from_MSG_vehicle(tokens[0])
except ValueError:
return
self.backPatchModeChanges()
@ -636,7 +679,7 @@ class DataflashLog(object):
self.messages[lineNumber] = e.Message
elif e.NAME == "MODE":
if self.vehicleType is None:
self.backpatch_these_modechanges.append( (lineNumber, e) )
self.backpatch_these_modechanges.append((lineNumber, e))
else:
self.handleModeChange(lineNumber, e)
# anything else must be the log data
@ -656,9 +699,8 @@ class DataflashLog(object):
channel.dictData[lineNumber] = value
channel.listData.append((lineNumber, value))
def read_text(self, f, ignoreBadlines):
self.formats = {'FMT':Format}
self.formats = {'FMT': Format}
lineNumber = 0
numBytes = 0
knownHardwareTypes = ["APM", "PX4", "MPNG"]
@ -666,32 +708,36 @@ class DataflashLog(object):
lineNumber = lineNumber + 1
numBytes += len(line) + 1
try:
#print("Reading line: %d" % lineNumber)
# print("Reading line: %d" % lineNumber)
line = line.strip('\n\r')
tokens = line.split(', ')
# first handle the log header lines
if line == " Ready to drive." or line == " Ready to FLY.":
continue
if line == "----------------------------------------": # present in pre-3.0 logs
raise Exception("Log file seems to be in the older format (prior to self-describing logs), which isn't supported")
raise Exception(
"Log file seems to be in the older format (prior to self-describing logs), which isn't supported"
)
if len(tokens) == 1:
tokens2 = line.split(' ')
if line == "":
pass
elif len(tokens2) == 1 and tokens2[0].isdigit(): # log index
elif len(tokens2) == 1 and tokens2[0].isdigit(): # log index
pass
elif len(tokens2) == 3 and tokens2[0] == "Free" and tokens2[1] == "RAM:":
self.freeRAM = int(tokens2[2])
elif tokens2[0] in knownHardwareTypes:
self.hardwareType = line # not sure if we can parse this more usefully, for now only need to report it back verbatim
elif (len(tokens2) == 2 or len(tokens2) == 3) and tokens2[1][0].lower() == "v": # e.g. ArduCopter V3.1 (5c6503e2)
self.hardwareType = line # not sure if we can parse this more usefully, for now only need to report it back verbatim
elif (len(tokens2) == 2 or len(tokens2) == 3) and tokens2[1][
0
].lower() == "v": # e.g. ArduCopter V3.1 (5c6503e2)
try:
self.set_vehicleType_from_MSG_vehicle(tokens2[0])
except ValueError:
pass
self.firmwareVersion = tokens2[1]
if len(tokens2) == 3:
self.firmwareHash = tokens2[2][1:-1]
self.firmwareHash = tokens2[2][1:-1]
else:
errorMsg = "Error parsing line %d of log file: %s" % (lineNumber, self.filename)
if ignoreBadlines:
@ -707,8 +753,10 @@ class DataflashLog(object):
except Exception as e:
print("BAD LINE: " + str(line), file=sys.stderr)
if not ignoreBadlines:
raise Exception("Error parsing line %d of log file %s - %s" % (lineNumber,self.filename,e.args[0]))
return (numBytes,lineNumber)
raise Exception(
"Error parsing line %d of log file %s - %s" % (lineNumber, self.filename, e.args[0])
)
return (numBytes, lineNumber)
def read_binary(self, f, ignoreBadlines):
lineNumber = 0
@ -718,22 +766,27 @@ class DataflashLog(object):
if e is None:
continue
numBytes += e.SIZE
# print(e)
# print(e)
self.process(lineNumber, e)
return (numBytes,lineNumber)
return (numBytes, lineNumber)
def _read_binary(self, f, ignoreBadlines):
self._formats = {128:BinaryFormat}
self._formats = {128: BinaryFormat}
data = bytearray(f.read())
offset = 0
while len(data) > offset + ctypes.sizeof(logheader):
h = logheader.from_buffer(data, offset)
if not (h.head1 == 0xa3 and h.head2 == 0x95):
if not (h.head1 == 0xA3 and h.head2 == 0x95):
if ignoreBadlines == False:
raise ValueError(h)
else:
if h.head1 == 0xff and h.head2 == 0xff and h.msgid == 0xff:
print("Assuming EOF due to dataflash block tail filled with \\xff... (offset={off})".format(off=offset), file=sys.stderr)
if h.head1 == 0xFF and h.head2 == 0xFF and h.msgid == 0xFF:
print(
"Assuming EOF due to dataflash block tail filled with \\xff... (offset={off})".format(
off=offset
),
file=sys.stderr,
)
break
offset += 1
continue
@ -745,7 +798,11 @@ class DataflashLog(object):
try:
e = typ.from_buffer(data, offset)
except:
print("data:{} offset:{} size:{} sizeof:{} sum:{}".format(len(data),offset,typ.SIZE,ctypes.sizeof(typ),offset+typ.SIZE))
print(
"data:{} offset:{} size:{} sizeof:{} sum:{}".format(
len(data), offset, typ.SIZE, ctypes.sizeof(typ), offset + typ.SIZE
)
)
raise
offset += typ.SIZE
else:

View File

@ -19,36 +19,40 @@
from __future__ import print_function
import DataflashLog
import pprint # temp
import imp
import glob
import inspect
import os, sys
import argparse
import datetime
import glob
import imp
import inspect
import os
import pprint # temp
import sys
import time
from xml.sax.saxutils import escape
import DataflashLog
from VehicleType import VehicleType
class TestResult(object):
'''all tests return a standardized result type'''
class StatusType:
# NA means not applicable for this log (e.g. copter tests against a plane log), UNKNOWN means it is missing data required for the test
GOOD, FAIL, WARN, UNKNOWN, NA = range(5)
status = None
statusMessage = "" # can be multi-line
statusMessage = "" # can be multi-line
class Test(object):
'''base class to be inherited by log tests. Each test should be quite granular so we have lots of small tests with clear results'''
def __init__(self):
self.name = ""
self.result = None # will be an instance of TestResult after being run
self.name = ""
self.result = None # will be an instance of TestResult after being run
self.execTime = None
self.enable = True
self.enable = True
def run(self, logdata, verbose=False):
pass
@ -56,17 +60,18 @@ class Test(object):
class TestSuite(object):
'''registers test classes, loading using a basic plugin architecture, and can run them all in one run() operation'''
def __init__(self):
self.tests = []
self.tests = []
self.logfile = None
self.logdata = None
self.logdata = None
# dynamically load in Test subclasses from the 'tests' folder
# to prevent one being loaded, move it out of that folder, or set that test's .enable attribute to False
dirName = os.path.dirname(os.path.abspath(__file__))
testScripts = glob.glob(dirName + '/tests/*.py')
testClasses = []
for script in testScripts:
m = imp.load_source("m",script)
m = imp.load_source("m", script)
for name, obj in inspect.getmembers(m, inspect.isclass):
if name not in testClasses and inspect.getsourcefile(obj) == script:
testClasses.append(name)
@ -90,7 +95,7 @@ class TestSuite(object):
startTime = time.time()
test.run(self.logdata, verbose) # RUN THE TEST
endTime = time.time()
test.execTime = 1000 * (endTime-startTime)
test.execTime = 1000 * (endTime - startTime)
def outputPlainText(self, outputStats):
'''output test results in plain text'''
@ -114,7 +119,7 @@ class TestSuite(object):
if not test.enable:
continue
statusMessageFirstLine = test.result.statusMessage.strip('\n\r').split('\n')[0]
statusMessageExtra = test.result.statusMessage.strip('\n\r').split('\n')[1:]
statusMessageExtra = test.result.statusMessage.strip('\n\r').split('\n')[1:]
execTime = ""
if outputStats:
execTime = " (%6.2fms)" % (test.execTime)
@ -129,12 +134,14 @@ class TestSuite(object):
continue
else:
print(" %20s: UNKNOWN %-55s%s" % (test.name, statusMessageFirstLine, execTime))
#if statusMessageExtra:
# if statusMessageExtra:
for line in statusMessageExtra:
print(" %29s %s" % ("",line))
print(" %29s %s" % ("", line))
print('\n')
print('The Log Analyzer is currently BETA code.\nFor any support or feedback on the log analyzer please email Andrew Chapman (amchapman@gmail.com)')
print(
'The Log Analyzer is currently BETA code.\nFor any support or feedback on the log analyzer please email Andrew Chapman (amchapman@gmail.com)'
)
print('\n')
def outputXML(self, xmlFile):
@ -151,18 +158,17 @@ class TestSuite(object):
sys.stderr.write("Error opening output xml file: %s" % xmlFile)
sys.exit(1)
# output header info
xml.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n")
xml.write("<loganalysis>\n")
xml.write("<header>\n")
xml.write(" <logfile>" + escape(self.logfile) + "</logfile>\n")
xml.write(" <sizekb>" + escape(repr(self.logdata.filesizeKB)) + "</sizekb>\n")
xml.write(" <logfile>" + escape(self.logfile) + "</logfile>\n")
xml.write(" <sizekb>" + escape(repr(self.logdata.filesizeKB)) + "</sizekb>\n")
xml.write(" <sizelines>" + escape(repr(self.logdata.lineCount)) + "</sizelines>\n")
xml.write(" <duration>" + escape(str(datetime.timedelta(seconds=self.logdata.durationSecs))) + "</duration>\n")
xml.write(" <duration>" + escape(str(datetime.timedelta(seconds=self.logdata.durationSecs))) + "</duration>\n")
xml.write(" <vehicletype>" + escape(self.logdata.vehicleTypeString) + "</vehicletype>\n")
if self.logdata.vehicleType == VehicleType.Copter and self.logdata.getCopterType():
xml.write(" <coptertype>" + escape(self.logdata.getCopterType()) + "</coptertype>\n")
xml.write(" <coptertype>" + escape(self.logdata.getCopterType()) + "</coptertype>\n")
xml.write(" <firmwareversion>" + escape(self.logdata.firmwareVersion) + "</firmwareversion>\n")
xml.write(" <firmwarehash>" + escape(self.logdata.firmwareHash) + "</firmwarehash>\n")
xml.write(" <hardwaretype>" + escape(self.logdata.hardwareType) + "</hardwaretype>\n")
@ -173,7 +179,7 @@ class TestSuite(object):
# output parameters
xml.write("<params>\n")
for param, value in self.logdata.parameters.items():
xml.write(" <param name=\"%s\" value=\"%s\" />\n" % (param,escape(repr(value))))
xml.write(" <param name=\"%s\" value=\"%s\" />\n" % (param, escape(repr(value))))
xml.write("</params>\n")
# output test results
@ -217,21 +223,47 @@ def main():
# deal with command line arguments
parser = argparse.ArgumentParser(description='Analyze an APM Dataflash log for known issues')
parser.add_argument('logfile', type=argparse.FileType('r'), help='path to Dataflash log file (or - for stdin)')
parser.add_argument('-f', '--format', metavar='', type=str, action='store', choices=['bin','log','auto'], default='auto', help='log file format: \'bin\',\'log\' or \'auto\'')
parser.add_argument('-q', '--quiet', metavar='', action='store_const', const=True, help='quiet mode, do not print results')
parser.add_argument('-p', '--profile', metavar='', action='store_const', const=True, help='output performance profiling data')
parser.add_argument('-s', '--skip_bad', metavar='', action='store_const', const=True, help='skip over corrupt dataflash lines')
parser.add_argument('-e', '--empty', metavar='', action='store_const', const=True, help='run an initial check for an empty log')
parser.add_argument('-x', '--xml', type=str, metavar='XML file', nargs='?', const='', default='', help='write output to specified XML file (or - for stdout)')
parser.add_argument(
'-f',
'--format',
metavar='',
type=str,
action='store',
choices=['bin', 'log', 'auto'],
default='auto',
help='log file format: \'bin\',\'log\' or \'auto\'',
)
parser.add_argument(
'-q', '--quiet', metavar='', action='store_const', const=True, help='quiet mode, do not print results'
)
parser.add_argument(
'-p', '--profile', metavar='', action='store_const', const=True, help='output performance profiling data'
)
parser.add_argument(
'-s', '--skip_bad', metavar='', action='store_const', const=True, help='skip over corrupt dataflash lines'
)
parser.add_argument(
'-e', '--empty', metavar='', action='store_const', const=True, help='run an initial check for an empty log'
)
parser.add_argument(
'-x',
'--xml',
type=str,
metavar='XML file',
nargs='?',
const='',
default='',
help='write output to specified XML file (or - for stdout)',
)
parser.add_argument('-v', '--verbose', metavar='', action='store_const', const=True, help='verbose output')
args = parser.parse_args()
# load the log
startTime = time.time()
logdata = DataflashLog.DataflashLog(args.logfile.name, format=args.format, ignoreBadlines=args.skip_bad) # read log
logdata = DataflashLog.DataflashLog(args.logfile.name, format=args.format, ignoreBadlines=args.skip_bad) # read log
endTime = time.time()
if args.profile:
print("Log file read time: %.2f seconds" % (endTime-startTime))
print("Log file read time: %.2f seconds" % (endTime - startTime))
# check for empty log if requested
if args.empty:
@ -240,13 +272,13 @@ def main():
sys.stderr.write("Empty log file: %s, %s" % (logdata.filename, emptyErr))
sys.exit(1)
#run the tests, and gather timings
# run the tests, and gather timings
testSuite = TestSuite()
startTime = time.time()
testSuite.run(logdata, args.verbose) # run tests
endTime = time.time()
if args.profile:
print("Test suite run time: %.2f seconds" % (endTime-startTime))
print("Test suite run time: %.2f seconds" % (endTime - startTime))
# deal with output
if not args.quiet:
@ -259,4 +291,3 @@ def main():
if __name__ == "__main__":
main()

File diff suppressed because one or more lines are too long

View File

@ -1,12 +1,9 @@
class VehicleType():
class VehicleType:
Plane = 17
Copter = 23
Rover = 37
# these should really be "Plane", "Copter" and "Rover", but many
# things use these values as triggers in their code:
VehicleTypeString = {
17: "ArduPlane",
23: "ArduCopter",
37: "ArduRover"
}
VehicleTypeString = {17: "ArduPlane", 23: "ArduCopter", 37: "ArduRover"}

View File

@ -1,25 +1,30 @@
from LogAnalyzer import Test, TestResult
import DataflashLog
from LogAnalyzer import Test, TestResult
from VehicleType import VehicleType
# from ArduCopter/defines.h
AUTOTUNE_INITIALISED = 30
AUTOTUNE_OFF = 31
AUTOTUNE_RESTART = 32
AUTOTUNE_SUCCESS = 33
AUTOTUNE_FAILED = 34
AUTOTUNE_REACHED_LIMIT = 35
AUTOTUNE_PILOT_TESTING = 36
AUTOTUNE_SAVEDGAINS = 37
AUTOTUNE_INITIALISED = 30
AUTOTUNE_OFF = 31
AUTOTUNE_RESTART = 32
AUTOTUNE_SUCCESS = 33
AUTOTUNE_FAILED = 34
AUTOTUNE_REACHED_LIMIT = 35
AUTOTUNE_PILOT_TESTING = 36
AUTOTUNE_SAVEDGAINS = 37
AUTOTUNE_EVENTS = frozenset(
[
AUTOTUNE_INITIALISED,
AUTOTUNE_OFF,
AUTOTUNE_RESTART,
AUTOTUNE_SUCCESS,
AUTOTUNE_FAILED,
AUTOTUNE_REACHED_LIMIT,
AUTOTUNE_PILOT_TESTING,
AUTOTUNE_SAVEDGAINS,
]
)
AUTOTUNE_EVENTS = frozenset([AUTOTUNE_INITIALISED,
AUTOTUNE_OFF,
AUTOTUNE_RESTART,
AUTOTUNE_SUCCESS,
AUTOTUNE_FAILED,
AUTOTUNE_REACHED_LIMIT,
AUTOTUNE_PILOT_TESTING,
AUTOTUNE_SAVEDGAINS])
class TestAutotune(Test):
'''test for autotune success (copter only)'''
@ -27,25 +32,29 @@ class TestAutotune(Test):
class AutotuneSession(object):
def __init__(self, events):
self.events = events
@property
def linestart(self):
return self.events[0][0]
@property
def linestop(self):
return self.events[-1][0]
@property
def success(self):
return AUTOTUNE_SUCCESS in [i for _,i in self.events]
return AUTOTUNE_SUCCESS in [i for _, i in self.events]
@property
def failure(self):
return AUTOTUNE_FAILED in [i for _,i in self.events]
return AUTOTUNE_FAILED in [i for _, i in self.events]
@property
def limit(self):
return AUTOTUNE_REACHED_LIMIT in [i for _,i in self.events]
return AUTOTUNE_REACHED_LIMIT in [i for _, i in self.events]
def __repr__(self):
return "<AutotuneSession {}-{}>".format(self.linestart,self.linestop)
return "<AutotuneSession {}-{}>".format(self.linestart, self.linestop)
def __init__(self):
Test.__init__(self)
@ -59,7 +68,7 @@ class TestAutotune(Test):
self.result.status = TestResult.StatusType.NA
return
for i in ['EV','ATDE','ATUN']:
for i in ['EV', 'ATDE', 'ATUN']:
r = False
if not i in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
@ -72,8 +81,8 @@ class TestAutotune(Test):
attempts = []
j = None
for i in range(0,len(events)):
line,ev = events[i]
for i in range(0, len(events)):
line, ev = events[i]
if ev == AUTOTUNE_INITIALISED:
if j is not None:
attempts.append(TestAutotune.AutotuneSession(events[j:i]))
@ -84,43 +93,40 @@ class TestAutotune(Test):
attempts.append(TestAutotune.AutotuneSession(events[j:]))
for a in attempts:
# this should not be necessary!
def class_from_channel(c):
members = dict({'__init__':lambda x: setattr(x,i,None) for i in logdata.channels[c]})
cls = type(\
'Channel__{:s}'.format(c),
(object,),
members
)
return cls
# this should not be necessary!
def class_from_channel(c):
members = dict({'__init__': lambda x: setattr(x, i, None) for i in logdata.channels[c]})
cls = type('Channel__{:s}'.format(c), (object,), members)
return cls
# last wins
if a.success:
self.result.status = TestResult.StatusType.GOOD
s = "[+]"
elif a.failure:
self.result.status = TestResult.StatusType.FAIL
s = "[-]"
else:
self.result.status = TestResult.StatusType.UNKNOWN
s = "[?]"
# last wins
if a.success:
self.result.status = TestResult.StatusType.GOOD
s = "[+]"
elif a.failure:
self.result.status = TestResult.StatusType.FAIL
s = "[-]"
else:
self.result.status = TestResult.StatusType.UNKNOWN
s = "[?]"
s += " Autotune {}-{}\n".format(a.linestart,a.linestop)
self.result.statusMessage += s
s += " Autotune {}-{}\n".format(a.linestart, a.linestop)
self.result.statusMessage += s
if verbose:
linenext = a.linestart + 1
while linenext < a.linestop:
try:
line = logdata.channels['ATUN']['RateMax'].getNearestValueFwd(linenext)[1]
if line > a.linestop:
break
except:
if verbose:
linenext = a.linestart + 1
while linenext < a.linestop:
try:
line = logdata.channels['ATUN']['RateMax'].getNearestValueFwd(linenext)[1]
if line > a.linestop:
break
atun = class_from_channel('ATUN')()
for key in logdata.channels['ATUN']:
setattr(atun, key, logdata.channels['ATUN'][key].getNearestValueFwd(linenext)[0])
linenext = logdata.channels['ATUN'][key].getNearestValueFwd(linenext)[1] + 1
self.result.statusMessage += 'ATUN Axis:{atun.Axis} TuneStep:{atun.TuneStep} RateMin:{atun.RateMin:5.0f} RateMax:{atun.RateMax:5.0f} RPGain:{atun.RPGain:1.4f} RDGain:{atun.RDGain:1.4f} SPGain:{atun.SPGain:1.1f} (@line:{l})\n'.format(l=linenext,s=s, atun=atun)
self.result.statusMessage += '\n'
except:
break
atun = class_from_channel('ATUN')()
for key in logdata.channels['ATUN']:
setattr(atun, key, logdata.channels['ATUN'][key].getNearestValueFwd(linenext)[0])
linenext = logdata.channels['ATUN'][key].getNearestValueFwd(linenext)[1] + 1
self.result.statusMessage += 'ATUN Axis:{atun.Axis} TuneStep:{atun.TuneStep} RateMin:{atun.RateMin:5.0f} RateMax:{atun.RateMax:5.0f} RPGain:{atun.RPGain:1.4f} RDGain:{atun.RDGain:1.4f} SPGain:{atun.SPGain:1.1f} (@line:{l})\n'.format(
l=linenext, s=s, atun=atun
)
self.result.statusMessage += '\n'

View File

@ -1,44 +1,47 @@
from LogAnalyzer import Test,TestResult
import DataflashLog
import collections
import DataflashLog
from LogAnalyzer import Test, TestResult
class TestBrownout(Test):
'''test for a log that has been truncated in flight'''
'''test for a log that has been truncated in flight'''
def __init__(self):
Test.__init__(self)
self.name = "Brownout"
def __init__(self):
Test.__init__(self)
self.name = "Brownout"
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
isArmed = False
# FIXME: cope with LOG_ARM_DISARM_MSG message
if "EV" in logdata.channels:
# step through the arm/disarm events in order, to see if they're symmetrical
# note: it seems landing detection isn't robust enough to rely upon here, so we'll only consider arm+disarm, not takeoff+land
for line,ev in logdata.channels["EV"]["Id"].listData:
if ev == 10:
isArmed = True
elif ev == 11:
isArmed = False
isArmed = False
# FIXME: cope with LOG_ARM_DISARM_MSG message
if "EV" in logdata.channels:
# step through the arm/disarm events in order, to see if they're symmetrical
# note: it seems landing detection isn't robust enough to rely upon here, so we'll only consider arm+disarm, not takeoff+land
for line, ev in logdata.channels["EV"]["Id"].listData:
if ev == 10:
isArmed = True
elif ev == 11:
isArmed = False
if "CTUN" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No CTUN log data"
return
if "CTUN" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No CTUN log data"
return
if "BarAlt" in logdata.channels['CTUN']:
self.ctun_baralt_att = 'BarAlt'
else:
self.ctun_baralt_att = 'BAlt'
if "BarAlt" in logdata.channels['CTUN']:
self.ctun_baralt_att = 'BarAlt'
else:
self.ctun_baralt_att = 'BAlt'
# check for relative altitude at end
(finalAlt,finalAltLine) = logdata.channels["CTUN"][self.ctun_baralt_att].getNearestValue(logdata.lineCount, lookForwards=False)
# check for relative altitude at end
(finalAlt, finalAltLine) = logdata.channels["CTUN"][self.ctun_baralt_att].getNearestValue(
logdata.lineCount, lookForwards=False
)
finalAltMax = 3.0 # max alt offset that we'll still consider to be on the ground
if isArmed and finalAlt > finalAltMax:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Truncated Log? Ends while armed at altitude %.2fm" % finalAlt
finalAltMax = 3.0 # max alt offset that we'll still consider to be on the ground
if isArmed and finalAlt > finalAltMax:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Truncated Log? Ends while armed at altitude %.2fm" % finalAlt

View File

@ -1,8 +1,8 @@
from LogAnalyzer import Test,TestResult
import DataflashLog
from functools import reduce
import math
from functools import reduce
import DataflashLog
from LogAnalyzer import Test, TestResult
class TestCompass(Test):
@ -17,7 +17,7 @@ class TestCompass(Test):
self.result.status = TestResult.StatusType.GOOD
def vec_len(x):
return math.sqrt(x[0]**2+x[1]**2+x[2]**2)
return math.sqrt(x[0] ** 2 + x[1] ** 2 + x[2] ** 2)
def FAIL():
self.result.status = TestResult.StatusType.FAIL
@ -32,30 +32,46 @@ class TestCompass(Test):
param_offsets = (
logdata.parameters["COMPASS_OFS_X"],
logdata.parameters["COMPASS_OFS_Y"],
logdata.parameters["COMPASS_OFS_Z"]
)
logdata.parameters["COMPASS_OFS_Z"],
)
if vec_len(param_offsets) > failOffset:
FAIL()
self.result.statusMessage = "FAIL: Large compass offset params (X:%.2f, Y:%.2f, Z:%.2f)\n" % (param_offsets[0],param_offsets[1],param_offsets[2])
self.result.statusMessage = "FAIL: Large compass offset params (X:%.2f, Y:%.2f, Z:%.2f)\n" % (
param_offsets[0],
param_offsets[1],
param_offsets[2],
)
elif vec_len(param_offsets) > warnOffset:
WARN()
self.result.statusMessage = "WARN: Large compass offset params (X:%.2f, Y:%.2f, Z:%.2f)\n" % (param_offsets[0],param_offsets[1],param_offsets[2])
self.result.statusMessage = "WARN: Large compass offset params (X:%.2f, Y:%.2f, Z:%.2f)\n" % (
param_offsets[0],
param_offsets[1],
param_offsets[2],
)
if "MAG" in logdata.channels:
max_log_offsets = zip(
map(lambda x: x[1],logdata.channels["MAG"]["OfsX"].listData),
map(lambda x: x[1],logdata.channels["MAG"]["OfsY"].listData),
map(lambda x: x[1],logdata.channels["MAG"]["OfsZ"].listData)
)
max_log_offsets = reduce(lambda x,y: x if vec_len(x) > vec_len(y) else y, max_log_offsets)
map(lambda x: x[1], logdata.channels["MAG"]["OfsX"].listData),
map(lambda x: x[1], logdata.channels["MAG"]["OfsY"].listData),
map(lambda x: x[1], logdata.channels["MAG"]["OfsZ"].listData),
)
max_log_offsets = reduce(lambda x, y: x if vec_len(x) > vec_len(y) else y, max_log_offsets)
if vec_len(max_log_offsets) > failOffset:
FAIL()
self.result.statusMessage += "FAIL: Large compass offset in MAG data (X:%.2f, Y:%.2f, Z:%.2f)\n" % (max_log_offsets[0],max_log_offsets[1],max_log_offsets[2])
self.result.statusMessage += "FAIL: Large compass offset in MAG data (X:%.2f, Y:%.2f, Z:%.2f)\n" % (
max_log_offsets[0],
max_log_offsets[1],
max_log_offsets[2],
)
elif vec_len(max_log_offsets) > warnOffset:
WARN()
self.result.statusMessage += "WARN: Large compass offset in MAG data (X:%.2f, Y:%.2f, Z:%.2f)\n" % (max_log_offsets[0],max_log_offsets[1],max_log_offsets[2])
self.result.statusMessage += "WARN: Large compass offset in MAG data (X:%.2f, Y:%.2f, Z:%.2f)\n" % (
max_log_offsets[0],
max_log_offsets[1],
max_log_offsets[2],
)
# check for mag field length change, and length outside of recommended range
if "MAG" in logdata.channels:
@ -66,54 +82,77 @@ class TestCompass(Test):
index = 0
length = len(logdata.channels["MAG"]["MagX"].listData)
magField = []
(minMagField, maxMagField) = (None,None)
(minMagFieldLine, maxMagFieldLine) = (None,None)
(minMagField, maxMagField) = (None, None)
(minMagFieldLine, maxMagFieldLine) = (None, None)
zerosFound = False
while index<length:
while index < length:
mx = logdata.channels["MAG"]["MagX"].listData[index][1]
my = logdata.channels["MAG"]["MagY"].listData[index][1]
mz = logdata.channels["MAG"]["MagZ"].listData[index][1]
if ((mx==0) and (my==0) and (mz==0)): # sometimes they're zero, not sure why, same reason as why we get NaNs as offsets?
if (
(mx == 0) and (my == 0) and (mz == 0)
): # sometimes they're zero, not sure why, same reason as why we get NaNs as offsets?
zerosFound = True
else:
mf = math.sqrt(mx*mx + my*my + mz*mz)
mf = math.sqrt(mx * mx + my * my + mz * mz)
magField.append(mf)
if mf<minMagField:
if mf < minMagField:
minMagField = mf
minMagFieldLine = logdata.channels["MAG"]["MagX"].listData[index][0]
if mf>maxMagField:
if mf > maxMagField:
maxMagField = mf
maxMagFieldLine = logdata.channels["MAG"]["MagX"].listData[index][0]
if index == 0:
(minMagField, maxMagField) = (mf,mf)
(minMagField, maxMagField) = (mf, mf)
index += 1
if minMagField is None:
FAIL()
self.result.statusMessage = self.result.statusMessage + "No valid mag data found\n"
else:
percentDiff = (maxMagField-minMagField) / minMagField
percentDiff = (maxMagField - minMagField) / minMagField
if percentDiff > percentDiffThresholdFAIL:
FAIL()
self.result.statusMessage = self.result.statusMessage + "Large change in mag_field (%.2f%%)\n" % (percentDiff*100)
self.result.statusMessage = (
self.result.statusMessage + "Large change in mag_field (%.2f%%)\n" % (percentDiff * 100)
)
elif percentDiff > percentDiffThresholdWARN:
WARN()
self.result.statusMessage = self.result.statusMessage + "Moderate change in mag_field (%.2f%%)\n" % (percentDiff*100)
self.result.statusMessage = (
self.result.statusMessage + "Moderate change in mag_field (%.2f%%)\n" % (percentDiff * 100)
)
else:
self.result.statusMessage = self.result.statusMessage + "mag_field interference within limits (%.2f%%)\n" % (percentDiff*100)
self.result.statusMessage = (
self.result.statusMessage
+ "mag_field interference within limits (%.2f%%)\n" % (percentDiff * 100)
)
if minMagField < minMagFieldThreshold:
self.result.statusMessage = self.result.statusMessage + "Min mag field length (%.2f) < recommended (%.2f)\n" % (minMagField,minMagFieldThreshold)
self.result.statusMessage = (
self.result.statusMessage
+ "Min mag field length (%.2f) < recommended (%.2f)\n" % (minMagField, minMagFieldThreshold)
)
if maxMagField > maxMagFieldThreshold:
self.result.statusMessage = self.result.statusMessage + "Max mag field length (%.2f) > recommended (%.2f)\n" % (maxMagField,maxMagFieldThreshold)
self.result.statusMessage = (
self.result.statusMessage
+ "Max mag field length (%.2f) > recommended (%.2f)\n" % (maxMagField, maxMagFieldThreshold)
)
if verbose:
self.result.statusMessage = self.result.statusMessage + "Min mag_field of %.2f on line %d\n" % (minMagField,minMagFieldLine)
self.result.statusMessage = self.result.statusMessage + "Max mag_field of %.2f on line %d\n" % (maxMagField,maxMagFieldLine)
self.result.statusMessage = self.result.statusMessage + "Min mag_field of %.2f on line %d\n" % (
minMagField,
minMagFieldLine,
)
self.result.statusMessage = self.result.statusMessage + "Max mag_field of %.2f on line %d\n" % (
maxMagField,
maxMagFieldLine,
)
if zerosFound:
if self.result.status == TestResult.StatusType.GOOD:
WARN()
self.result.statusMessage = self.result.statusMessage + "All zeros found in MAG X/Y/Z log data\n"
else:
self.result.statusMessage = self.result.statusMessage + "No MAG data, unable to test mag_field interference\n"
self.result.statusMessage = (
self.result.statusMessage + "No MAG data, unable to test mag_field interference\n"
)
except KeyError as e:
self.result.status = TestResult.StatusType.FAIL

View File

@ -1,7 +1,7 @@
from __future__ import print_function
from LogAnalyzer import Test,TestResult
import DataflashLog
from LogAnalyzer import Test, TestResult
# import scipy
# import pylab #### TEMP!!! only for dev
@ -9,113 +9,102 @@ import DataflashLog
class TestDualGyroDrift(Test):
'''test for gyro drift between dual IMU data'''
'''test for gyro drift between dual IMU data'''
def __init__(self):
Test.__init__(self)
self.name = "Gyro Drift"
self.enable = False
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
def __init__(self):
Test.__init__(self)
self.name = "Gyro Drift"
self.enable = False
# if "IMU" not in logdata.channels or "IMU2" not in logdata.channels:
# self.result.status = TestResult.StatusType.NA
# return
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
# imuX = logdata.channels["IMU"]["GyrX"].listData
# imu2X = logdata.channels["IMU2"]["GyrX"].listData
# if "IMU" not in logdata.channels or "IMU2" not in logdata.channels:
# self.result.status = TestResult.StatusType.NA
# return
# # NOTE: weird thing about Holger's log is that the counts of IMU+IMU2 are different
# print("length 1: %.2f, length 2: %.2f" % (len(imuX),len(imu2X)))
# #assert(len(imuX) == len(imu2X))
# imuX = logdata.channels["IMU"]["GyrX"].listData
# imu2X = logdata.channels["IMU2"]["GyrX"].listData
# # divide the curve into segments and get the average of each segment
# # we will get the diff between those averages, rather than a per-sample diff as the IMU+IMU2 arrays are often not the same length
# diffThresholdWARN = 0.03
# diffThresholdFAIL = 0.05
# nSamples = 10
# imu1XAverages, imu1YAverages, imu1ZAverages, imu2XAverages, imu2YAverages, imu2ZAverages = ([],[],[],[],[],[])
# imuXDiffAverages, imuYDiffAverages, imuZDiffAverages = ([],[],[])
# maxDiffX, maxDiffY, maxDiffZ = (0,0,0)
# sliceLength1 = len(logdata.channels["IMU"]["GyrX"].dictData.values()) / nSamples
# sliceLength2 = len(logdata.channels["IMU2"]["GyrX"].dictData.values()) / nSamples
# for i in range(0,nSamples):
# imu1XAverages.append(numpy.mean(logdata.channels["IMU"]["GyrX"].dictData.values()[i*sliceLength1:i*sliceLength1+sliceLength1]))
# imu1YAverages.append(numpy.mean(logdata.channels["IMU"]["GyrY"].dictData.values()[i*sliceLength1:i*sliceLength1+sliceLength1]))
# imu1ZAverages.append(numpy.mean(logdata.channels["IMU"]["GyrZ"].dictData.values()[i*sliceLength1:i*sliceLength1+sliceLength1]))
# imu2XAverages.append(numpy.mean(logdata.channels["IMU2"]["GyrX"].dictData.values()[i*sliceLength2:i*sliceLength2+sliceLength2]))
# imu2YAverages.append(numpy.mean(logdata.channels["IMU2"]["GyrY"].dictData.values()[i*sliceLength2:i*sliceLength2+sliceLength2]))
# imu2ZAverages.append(numpy.mean(logdata.channels["IMU2"]["GyrZ"].dictData.values()[i*sliceLength2:i*sliceLength2+sliceLength2]))
# imuXDiffAverages.append(imu2XAverages[-1]-imu1XAverages[-1])
# imuYDiffAverages.append(imu2YAverages[-1]-imu1YAverages[-1])
# imuZDiffAverages.append(imu2ZAverages[-1]-imu1ZAverages[-1])
# if abs(imuXDiffAverages[-1]) > maxDiffX:
# maxDiffX = imuXDiffAverages[-1]
# if abs(imuYDiffAverages[-1]) > maxDiffY:
# maxDiffY = imuYDiffAverages[-1]
# if abs(imuZDiffAverages[-1]) > maxDiffZ:
# maxDiffZ = imuZDiffAverages[-1]
# if max(maxDiffX,maxDiffY,maxDiffZ) > diffThresholdFAIL:
# self.result.status = TestResult.StatusType.FAIL
# self.result.statusMessage = "IMU/IMU2 gyro averages differ by more than %s radians" % diffThresholdFAIL
# elif max(maxDiffX,maxDiffY,maxDiffZ) > diffThresholdWARN:
# self.result.status = TestResult.StatusType.WARN
# self.result.statusMessage = "IMU/IMU2 gyro averages differ by more than %s radians" % diffThresholdWARN
# # NOTE: weird thing about Holger's log is that the counts of IMU+IMU2 are different
# print("length 1: %.2f, length 2: %.2f" % (len(imuX),len(imu2X)))
# #assert(len(imuX) == len(imu2X))
# # pylab.plot(zip(*imuX)[0], zip(*imuX)[1], 'g')
# # pylab.plot(zip(*imu2X)[0], zip(*imu2X)[1], 'r')
# #pylab.plot(range(0,(nSamples*sliceLength1),sliceLength1), imu1ZAverages, 'b')
# print("Gyro averages1X: " + repr(imu1XAverages))
# print("Gyro averages1Y: " + repr(imu1YAverages))
# print("Gyro averages1Z: " + repr(imu1ZAverages) + "\n")
# print("Gyro averages2X: " + repr(imu2XAverages))
# print("Gyro averages2Y: " + repr(imu2YAverages))
# print("Gyro averages2Z: " + repr(imu2ZAverages) + "\n")
# print("Gyro averages diff X: " + repr(imuXDiffAverages))
# print("Gyro averages diff Y: " + repr(imuYDiffAverages))
# print("Gyro averages diff Z: " + repr(imuZDiffAverages))
# # lowpass filter using numpy
# # cutoff = 100
# # fs = 10000.0
# # b,a = scipy.signal.filter_design.butter(5,cutoff/(fs/2))
# # imuXFiltered = scipy.signal.filtfilt(b,a,zip(*imuX)[1])
# # imu2XFiltered = scipy.signal.filtfilt(b,a,zip(*imu2X)[1])
# #pylab.plot(imuXFiltered, 'r')
# # TMP: DISPLAY BEFORE+AFTER plots
# pylab.show()
# # print("imuX average before lowpass filter: %.8f" % logdata.channels["IMU"]["GyrX"].avg())
# # print("imuX average after lowpass filter: %.8f" % numpy.mean(imuXFiltered))
# # print("imu2X average before lowpass filter: %.8f" % logdata.channels["IMU2"]["GyrX"].avg())
# # print("imu2X average after lowpass filter: %.8f" % numpy.mean(imu2XFiltered))
# avg1X = logdata.channels["IMU"]["GyrX"].avg()
# avg1Y = logdata.channels["IMU"]["GyrY"].avg()
# avg1Z = logdata.channels["IMU"]["GyrZ"].avg()
# avg2X = logdata.channels["IMU2"]["GyrX"].avg()
# avg2Y = logdata.channels["IMU2"]["GyrY"].avg()
# avg2Z = logdata.channels["IMU2"]["GyrZ"].avg()
# avgRatioX = (max(avg1X,avg2X) - min(avg1X,avg2X)) / #abs(max(avg1X,avg2X) / min(avg1X,avg2X))
# avgRatioY = abs(max(avg1Y,avg2Y) / min(avg1Y,avg2Y))
# avgRatioZ = abs(max(avg1Z,avg2Z) / min(avg1Z,avg2Z))
# self.result.statusMessage = "IMU gyro avg: %.4f,%.4f,%.4f\nIMU2 gyro avg: %.4f,%.4f,%.4f\nAvg ratio: %.4f,%.4f,%.4f" % (avg1X,avg1Y,avg1Z, avg2X,avg2Y,avg2Z, avgRatioX,avgRatioY,avgRatioZ)
# # divide the curve into segments and get the average of each segment
# # we will get the diff between those averages, rather than a per-sample diff as the IMU+IMU2 arrays are often not the same length
# diffThresholdWARN = 0.03
# diffThresholdFAIL = 0.05
# nSamples = 10
# imu1XAverages, imu1YAverages, imu1ZAverages, imu2XAverages, imu2YAverages, imu2ZAverages = ([],[],[],[],[],[])
# imuXDiffAverages, imuYDiffAverages, imuZDiffAverages = ([],[],[])
# maxDiffX, maxDiffY, maxDiffZ = (0,0,0)
# sliceLength1 = len(logdata.channels["IMU"]["GyrX"].dictData.values()) / nSamples
# sliceLength2 = len(logdata.channels["IMU2"]["GyrX"].dictData.values()) / nSamples
# for i in range(0,nSamples):
# imu1XAverages.append(numpy.mean(logdata.channels["IMU"]["GyrX"].dictData.values()[i*sliceLength1:i*sliceLength1+sliceLength1]))
# imu1YAverages.append(numpy.mean(logdata.channels["IMU"]["GyrY"].dictData.values()[i*sliceLength1:i*sliceLength1+sliceLength1]))
# imu1ZAverages.append(numpy.mean(logdata.channels["IMU"]["GyrZ"].dictData.values()[i*sliceLength1:i*sliceLength1+sliceLength1]))
# imu2XAverages.append(numpy.mean(logdata.channels["IMU2"]["GyrX"].dictData.values()[i*sliceLength2:i*sliceLength2+sliceLength2]))
# imu2YAverages.append(numpy.mean(logdata.channels["IMU2"]["GyrY"].dictData.values()[i*sliceLength2:i*sliceLength2+sliceLength2]))
# imu2ZAverages.append(numpy.mean(logdata.channels["IMU2"]["GyrZ"].dictData.values()[i*sliceLength2:i*sliceLength2+sliceLength2]))
# imuXDiffAverages.append(imu2XAverages[-1]-imu1XAverages[-1])
# imuYDiffAverages.append(imu2YAverages[-1]-imu1YAverages[-1])
# imuZDiffAverages.append(imu2ZAverages[-1]-imu1ZAverages[-1])
# if abs(imuXDiffAverages[-1]) > maxDiffX:
# maxDiffX = imuXDiffAverages[-1]
# if abs(imuYDiffAverages[-1]) > maxDiffY:
# maxDiffY = imuYDiffAverages[-1]
# if abs(imuZDiffAverages[-1]) > maxDiffZ:
# maxDiffZ = imuZDiffAverages[-1]
# if max(maxDiffX,maxDiffY,maxDiffZ) > diffThresholdFAIL:
# self.result.status = TestResult.StatusType.FAIL
# self.result.statusMessage = "IMU/IMU2 gyro averages differ by more than %s radians" % diffThresholdFAIL
# elif max(maxDiffX,maxDiffY,maxDiffZ) > diffThresholdWARN:
# self.result.status = TestResult.StatusType.WARN
# self.result.statusMessage = "IMU/IMU2 gyro averages differ by more than %s radians" % diffThresholdWARN
# # pylab.plot(zip(*imuX)[0], zip(*imuX)[1], 'g')
# # pylab.plot(zip(*imu2X)[0], zip(*imu2X)[1], 'r')
# #pylab.plot(range(0,(nSamples*sliceLength1),sliceLength1), imu1ZAverages, 'b')
# print("Gyro averages1X: " + repr(imu1XAverages))
# print("Gyro averages1Y: " + repr(imu1YAverages))
# print("Gyro averages1Z: " + repr(imu1ZAverages) + "\n")
# print("Gyro averages2X: " + repr(imu2XAverages))
# print("Gyro averages2Y: " + repr(imu2YAverages))
# print("Gyro averages2Z: " + repr(imu2ZAverages) + "\n")
# print("Gyro averages diff X: " + repr(imuXDiffAverages))
# print("Gyro averages diff Y: " + repr(imuYDiffAverages))
# print("Gyro averages diff Z: " + repr(imuZDiffAverages))
# # lowpass filter using numpy
# # cutoff = 100
# # fs = 10000.0
# # b,a = scipy.signal.filter_design.butter(5,cutoff/(fs/2))
# # imuXFiltered = scipy.signal.filtfilt(b,a,zip(*imuX)[1])
# # imu2XFiltered = scipy.signal.filtfilt(b,a,zip(*imu2X)[1])
# #pylab.plot(imuXFiltered, 'r')
# # TMP: DISPLAY BEFORE+AFTER plots
# pylab.show()
# # print("imuX average before lowpass filter: %.8f" % logdata.channels["IMU"]["GyrX"].avg())
# # print("imuX average after lowpass filter: %.8f" % numpy.mean(imuXFiltered))
# # print("imu2X average before lowpass filter: %.8f" % logdata.channels["IMU2"]["GyrX"].avg())
# # print("imu2X average after lowpass filter: %.8f" % numpy.mean(imu2XFiltered))
# avg1X = logdata.channels["IMU"]["GyrX"].avg()
# avg1Y = logdata.channels["IMU"]["GyrY"].avg()
# avg1Z = logdata.channels["IMU"]["GyrZ"].avg()
# avg2X = logdata.channels["IMU2"]["GyrX"].avg()
# avg2Y = logdata.channels["IMU2"]["GyrY"].avg()
# avg2Z = logdata.channels["IMU2"]["GyrZ"].avg()
# avgRatioX = (max(avg1X,avg2X) - min(avg1X,avg2X)) / #abs(max(avg1X,avg2X) / min(avg1X,avg2X))
# avgRatioY = abs(max(avg1Y,avg2Y) / min(avg1Y,avg2Y))
# avgRatioZ = abs(max(avg1Z,avg2Z) / min(avg1Z,avg2Z))
# self.result.statusMessage = "IMU gyro avg: %.4f,%.4f,%.4f\nIMU2 gyro avg: %.4f,%.4f,%.4f\nAvg ratio: %.4f,%.4f,%.4f" % (avg1X,avg1Y,avg1Z, avg2X,avg2Y,avg2Z, avgRatioX,avgRatioY,avgRatioZ)

View File

@ -1,79 +1,76 @@
from __future__ import print_function
from LogAnalyzer import Test,TestResult
import DataflashLog
from LogAnalyzer import Test, TestResult
class TestDupeLogData(Test):
'''test for duplicated data in log, which has been happening on PX4/Pixhawk'''
'''test for duplicated data in log, which has been happening on PX4/Pixhawk'''
def __init__(self):
Test.__init__(self)
self.name = "Dupe Log Data"
def __matchSample(self, sample, sampleStartIndex, logdata):
'''return the line number where a match is found, otherwise return False'''
def __init__(self):
Test.__init__(self)
self.name = "Dupe Log Data"
# ignore if all data in sample is the same value
nSame = 0
for s in sample:
if s[1] == sample[0][1]:
nSame += 1
if nSame == 20:
return False
def __matchSample(self, sample, sampleStartIndex, logdata):
'''return the line number where a match is found, otherwise return False'''
# c
data = logdata.channels["ATT"]["Pitch"].listData
for i in range(sampleStartIndex, len(data)):
#print("Checking against index %d" % i)
if i == sampleStartIndex:
continue # skip matching against ourselves
j = 0
while j<20 and (i+j)<len(data) and data[i+j][1] == sample[j][1]:
#print("### Match found, j=%d, data=%f, sample=%f, log data matched to sample at line %d" % (j,data[i+j][1],sample[j][1],data[i+j][0]))
j += 1
if j == 20: # all samples match
return data[i][0]
return False
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
# this could be made more flexible by not hard-coding to use ATT data, could make it dynamic based on whatever is available as long as it is highly variable
if "ATT" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No ATT log data"
return
# pick 10 sample points within the range of ATT data we have
sampleStartIndices = []
attStartIndex = 0
attEndIndex = len(logdata.channels["ATT"]["Pitch"].listData)-1
step = int(attEndIndex / 11)
for i in range(step,attEndIndex-step,step):
sampleStartIndices.append(i)
#print("Dupe data sample point index %d at line %d" % (i, logdata.channels["ATT"]["Pitch"].listData[i][0]))
# get 20 datapoints of pitch from each sample location and check for a match elsewhere
sampleIndex = 0
for i in range(sampleStartIndices[0], len(logdata.channels["ATT"]["Pitch"].listData)):
if i == sampleStartIndices[sampleIndex]:
#print("Checking sample %d" % i)
sample = logdata.channels["ATT"]["Pitch"].listData[i:i+20]
matchedLine = self.__matchSample(sample, i, logdata)
if matchedLine:
#print("Data from line %d found duplicated at line %d" % (sample[0][0],matchedLine))
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Duplicate data chunks found in log (%d and %d)" % (sample[0][0],matchedLine)
return
sampleIndex += 1
if sampleIndex >= len(sampleStartIndices):
break
# ignore if all data in sample is the same value
nSame = 0
for s in sample:
if s[1] == sample[0][1]:
nSame += 1
if nSame == 20:
return False
# c
data = logdata.channels["ATT"]["Pitch"].listData
for i in range(sampleStartIndex, len(data)):
# print("Checking against index %d" % i)
if i == sampleStartIndex:
continue # skip matching against ourselves
j = 0
while j < 20 and (i + j) < len(data) and data[i + j][1] == sample[j][1]:
# print("### Match found, j=%d, data=%f, sample=%f, log data matched to sample at line %d" % (j,data[i+j][1],sample[j][1],data[i+j][0]))
j += 1
if j == 20: # all samples match
return data[i][0]
return False
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
# this could be made more flexible by not hard-coding to use ATT data, could make it dynamic based on whatever is available as long as it is highly variable
if "ATT" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No ATT log data"
return
# pick 10 sample points within the range of ATT data we have
sampleStartIndices = []
attStartIndex = 0
attEndIndex = len(logdata.channels["ATT"]["Pitch"].listData) - 1
step = int(attEndIndex / 11)
for i in range(step, attEndIndex - step, step):
sampleStartIndices.append(i)
# print("Dupe data sample point index %d at line %d" % (i, logdata.channels["ATT"]["Pitch"].listData[i][0]))
# get 20 datapoints of pitch from each sample location and check for a match elsewhere
sampleIndex = 0
for i in range(sampleStartIndices[0], len(logdata.channels["ATT"]["Pitch"].listData)):
if i == sampleStartIndices[sampleIndex]:
# print("Checking sample %d" % i)
sample = logdata.channels["ATT"]["Pitch"].listData[i : i + 20]
matchedLine = self.__matchSample(sample, i, logdata)
if matchedLine:
# print("Data from line %d found duplicated at line %d" % (sample[0][0],matchedLine))
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Duplicate data chunks found in log (%d and %d)" % (
sample[0][0],
matchedLine,
)
return
sampleIndex += 1
if sampleIndex >= len(sampleStartIndices):
break

View File

@ -1,20 +1,20 @@
from LogAnalyzer import Test,TestResult
import DataflashLog
from LogAnalyzer import Test, TestResult
class TestEmpty(Test):
'''test for empty or near-empty logs'''
'''test for empty or near-empty logs'''
def __init__(self):
Test.__init__(self)
self.name = "Empty"
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
def __init__(self):
Test.__init__(self)
self.name = "Empty"
# all the logic for this test is in the helper function, as it can also be called up front as an early exit
emptyErr = DataflashLog.DataflashLogHelper.isLogEmpty(logdata)
if emptyErr:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Empty log? " + emptyErr
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
# all the logic for this test is in the helper function, as it can also be called up front as an early exit
emptyErr = DataflashLog.DataflashLogHelper.isLogEmpty(logdata)
if emptyErr:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Empty log? " + emptyErr

View File

@ -1,56 +1,56 @@
from LogAnalyzer import Test,TestResult
import DataflashLog
from LogAnalyzer import Test, TestResult
class TestEvents(Test):
'''test for erroneous events and failsafes'''
# TODO: need to check for vehicle-specific codes
'''test for erroneous events and failsafes'''
def __init__(self):
Test.__init__(self)
self.name = "Event/Failsafe"
# TODO: need to check for vehicle-specific codes
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
def __init__(self):
Test.__init__(self)
self.name = "Event/Failsafe"
errors = set()
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
if "ERR" in logdata.channels:
assert(len(logdata.channels["ERR"]["Subsys"].listData) == len(logdata.channels["ERR"]["ECode"].listData))
for i in range(len(logdata.channels["ERR"]["Subsys"].listData)):
subSys = logdata.channels["ERR"]["Subsys"].listData[i][1]
eCode = logdata.channels["ERR"]["ECode"].listData[i][1]
if subSys == 2 and (eCode == 1):
errors.add("PPM")
elif subSys == 3 and (eCode == 1 or eCode == 2):
errors.add("COMPASS")
elif subSys == 5 and (eCode == 1):
errors.add("FS_THR")
elif subSys == 6 and (eCode == 1):
errors.add("FS_BATT")
elif subSys == 7 and (eCode == 1):
errors.add("GPS")
elif subSys == 8 and (eCode == 1):
errors.add("GCS")
elif subSys == 9 and (eCode == 1 or eCode == 2):
errors.add("FENCE")
elif subSys == 10:
errors.add("FLT_MODE")
elif subSys == 11 and (eCode == 2):
errors.add("GPS_GLITCH")
elif subSys == 12 and (eCode == 1):
errors.add("CRASH")
errors = set()
if errors:
if len(errors) == 1 and "FENCE" in errors:
self.result.status = TestResult.StatusType.WARN
else:
self.result.status = TestResult.StatusType.FAIL
if len(errors) == 1:
self.result.statusMessage = "ERR found: "
else:
self.result.statusMessage = "ERRs found: "
for err in errors:
self.result.statusMessage = self.result.statusMessage + err + " "
if "ERR" in logdata.channels:
assert len(logdata.channels["ERR"]["Subsys"].listData) == len(logdata.channels["ERR"]["ECode"].listData)
for i in range(len(logdata.channels["ERR"]["Subsys"].listData)):
subSys = logdata.channels["ERR"]["Subsys"].listData[i][1]
eCode = logdata.channels["ERR"]["ECode"].listData[i][1]
if subSys == 2 and (eCode == 1):
errors.add("PPM")
elif subSys == 3 and (eCode == 1 or eCode == 2):
errors.add("COMPASS")
elif subSys == 5 and (eCode == 1):
errors.add("FS_THR")
elif subSys == 6 and (eCode == 1):
errors.add("FS_BATT")
elif subSys == 7 and (eCode == 1):
errors.add("GPS")
elif subSys == 8 and (eCode == 1):
errors.add("GCS")
elif subSys == 9 and (eCode == 1 or eCode == 2):
errors.add("FENCE")
elif subSys == 10:
errors.add("FLT_MODE")
elif subSys == 11 and (eCode == 2):
errors.add("GPS_GLITCH")
elif subSys == 12 and (eCode == 1):
errors.add("CRASH")
if errors:
if len(errors) == 1 and "FENCE" in errors:
self.result.status = TestResult.StatusType.WARN
else:
self.result.status = TestResult.StatusType.FAIL
if len(errors) == 1:
self.result.statusMessage = "ERR found: "
else:
self.result.statusMessage = "ERRs found: "
for err in errors:
self.result.statusMessage = self.result.statusMessage + err + " "

View File

@ -32,8 +32,7 @@ class TestGPSGlitch(Test):
# leaving the test in for all
gpsGlitchCount = 0
if "ERR" in logdata.channels:
assert(len(logdata.channels["ERR"]["Subsys"].listData) ==
len(logdata.channels["ERR"]["ECode"].listData))
assert len(logdata.channels["ERR"]["Subsys"].listData) == len(logdata.channels["ERR"]["ECode"].listData)
for i in range(len(logdata.channels["ERR"]["Subsys"].listData)):
subSys = logdata.channels["ERR"]["Subsys"].listData[i][1]
eCode = logdata.channels["ERR"]["ECode"].listData[i][1]
@ -41,8 +40,7 @@ class TestGPSGlitch(Test):
gpsGlitchCount += 1
if gpsGlitchCount:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = ("GPS glitch errors found (%d)" %
gpsGlitchCount)
self.result.statusMessage = "GPS glitch errors found (%d)" % gpsGlitchCount
# define and check different thresholds for WARN level and
# FAIL level
@ -58,11 +56,9 @@ class TestGPSGlitch(Test):
foundBadHDopWarn = hdopChan.max() > maxHDopWARN
foundBadSatsFail = satsChan.min() < minSatsFAIL
foundBadHDopFail = hdopChan.max() > maxHDopFAIL
satsMsg = ("Min satellites: %s, Max HDop: %s" %
(satsChan.min(), hdopChan.max()))
satsMsg = "Min satellites: %s, Max HDop: %s" % (satsChan.min(), hdopChan.max())
if gpsGlitchCount:
self.result.statusMessage = "\n".join([self.result.statusMessage,
satsMsg])
self.result.statusMessage = "\n".join([self.result.statusMessage, satsMsg])
if foundBadSatsFail or foundBadHDopFail:
if not gpsGlitchCount:
self.result.status = TestResult.StatusType.FAIL

View File

@ -1,9 +1,10 @@
from __future__ import print_function
from LogAnalyzer import Test,TestResult
import DataflashLog
from math import sqrt
import DataflashLog
from LogAnalyzer import Test, TestResult
class TestIMUMatch(Test):
'''test for empty or near-empty logs'''
@ -14,8 +15,8 @@ class TestIMUMatch(Test):
def run(self, logdata, verbose):
#tuning parameters:
warn_threshold = .75
# tuning parameters:
warn_threshold = 0.75
fail_threshold = 1.5
filter_tc = 5.0
@ -36,7 +37,7 @@ class TestIMUMatch(Test):
imu2 = logdata.channels["IMU2"]
timeLabel = None
for i in 'TimeMS','TimeUS','Time':
for i in 'TimeMS', 'TimeUS', 'Time':
if i in logdata.channels["GPS"]:
timeLabel = i
break
@ -50,18 +51,32 @@ class TestIMUMatch(Test):
imu2_accy = imu2["AccY"].listData
imu2_accz = imu2["AccZ"].listData
imu_multiplier = 1.0E-3
imu_multiplier = 1.0e-3
if timeLabel == 'TimeUS':
imu_multiplier = 1.0E-6
imu_multiplier = 1.0e-6
imu1 = []
imu2 = []
for i in range(len(imu1_timems)):
imu1.append({ 't': imu1_timems[i][1]*imu_multiplier, 'x': imu1_accx[i][1], 'y': imu1_accy[i][1], 'z': imu1_accz[i][1]})
imu1.append(
{
't': imu1_timems[i][1] * imu_multiplier,
'x': imu1_accx[i][1],
'y': imu1_accy[i][1],
'z': imu1_accz[i][1],
}
)
for i in range(len(imu2_timems)):
imu2.append({ 't': imu2_timems[i][1]*imu_multiplier, 'x': imu2_accx[i][1], 'y': imu2_accy[i][1], 'z': imu2_accz[i][1]})
imu2.append(
{
't': imu2_timems[i][1] * imu_multiplier,
'x': imu2_accx[i][1],
'y': imu2_accy[i][1],
'z': imu2_accz[i][1],
}
)
imu1.sort(key=lambda x: x['t'])
imu2.sort(key=lambda x: x['t'])
@ -76,40 +91,48 @@ class TestIMUMatch(Test):
max_diff_filtered = 0
for i in range(len(imu1)):
#find closest imu2 value
# find closest imu2 value
t = imu1[i]['t']
dt = 0 if last_t is None else t-last_t
dt=min(dt,.1)
dt = 0 if last_t is None else t - last_t
dt = min(dt, 0.1)
next_imu2 = None
for i in range(imu2_index,len(imu2)):
for i in range(imu2_index, len(imu2)):
next_imu2 = imu2[i]
imu2_index=i
imu2_index = i
if next_imu2['t'] >= t:
break
prev_imu2 = imu2[imu2_index-1]
closest_imu2 = next_imu2 if abs(next_imu2['t']-t)<abs(prev_imu2['t']-t) else prev_imu2
prev_imu2 = imu2[imu2_index - 1]
closest_imu2 = next_imu2 if abs(next_imu2['t'] - t) < abs(prev_imu2['t'] - t) else prev_imu2
xdiff = imu1[i]['x']-closest_imu2['x']
ydiff = imu1[i]['y']-closest_imu2['y']
zdiff = imu1[i]['z']-closest_imu2['z']
xdiff = imu1[i]['x'] - closest_imu2['x']
ydiff = imu1[i]['y'] - closest_imu2['y']
zdiff = imu1[i]['z'] - closest_imu2['z']
xdiff_filtered += (xdiff-xdiff_filtered)*dt/filter_tc
ydiff_filtered += (ydiff-ydiff_filtered)*dt/filter_tc
zdiff_filtered += (zdiff-zdiff_filtered)*dt/filter_tc
xdiff_filtered += (xdiff - xdiff_filtered) * dt / filter_tc
ydiff_filtered += (ydiff - ydiff_filtered) * dt / filter_tc
zdiff_filtered += (zdiff - zdiff_filtered) * dt / filter_tc
diff_filtered = sqrt(xdiff_filtered**2+ydiff_filtered**2+zdiff_filtered**2)
max_diff_filtered = max(max_diff_filtered,diff_filtered)
#print(max_diff_filtered)
diff_filtered = sqrt(xdiff_filtered**2 + ydiff_filtered**2 + zdiff_filtered**2)
max_diff_filtered = max(max_diff_filtered, diff_filtered)
# print(max_diff_filtered)
last_t = t
if max_diff_filtered > fail_threshold:
self.result.statusMessage = "Check vibration or accelerometer calibration. (Mismatch: %.2f, WARN: %.2f, FAIL: %.2f)" % (max_diff_filtered,warn_threshold,fail_threshold)
self.result.statusMessage = (
"Check vibration or accelerometer calibration. (Mismatch: %.2f, WARN: %.2f, FAIL: %.2f)"
% (max_diff_filtered, warn_threshold, fail_threshold)
)
self.result.status = TestResult.StatusType.FAIL
elif max_diff_filtered > warn_threshold:
self.result.statusMessage = "Check vibration or accelerometer calibration. (Mismatch: %.2f, WARN: %.2f, FAIL: %.2f)" % (max_diff_filtered,warn_threshold,fail_threshold)
self.result.statusMessage = (
"Check vibration or accelerometer calibration. (Mismatch: %.2f, WARN: %.2f, FAIL: %.2f)"
% (max_diff_filtered, warn_threshold, fail_threshold)
)
self.result.status = TestResult.StatusType.WARN
else:
self.result.statusMessage = "(Mismatch: %.2f, WARN: %.2f, FAIL: %.2f)" % (max_diff_filtered,warn_threshold, fail_threshold)
self.result.statusMessage = "(Mismatch: %.2f, WARN: %.2f, FAIL: %.2f)" % (
max_diff_filtered,
warn_threshold,
fail_threshold,
)

View File

@ -1,10 +1,11 @@
from LogAnalyzer import Test,TestResult
import DataflashLog
from LogAnalyzer import Test, TestResult
from VehicleType import VehicleType
class TestBalanceTwist(Test):
'''test for badly unbalanced copter, including yaw twist'''
def __init__(self):
Test.__init__(self)
self.name = "Motor Balance"
@ -25,14 +26,14 @@ class TestBalanceTwist(Test):
for i in range(8):
for prefix in "Chan", "Ch", "C":
if prefix+repr((i+1)) in logdata.channels["RCOU"]:
ch.append(map(lambda x: x[1], logdata.channels["RCOU"][prefix+repr((i+1))].listData))
if prefix + repr((i + 1)) in logdata.channels["RCOU"]:
ch.append(map(lambda x: x[1], logdata.channels["RCOU"][prefix + repr((i + 1))].listData))
ch = zip(*ch)
num_channels = 0
ch = list(ch)
for i in range(len(ch)):
ch[i] = list(filter(lambda x: (x>0 and x<3000), ch[i]))
ch[i] = list(filter(lambda x: (x > 0 and x < 3000), ch[i]))
if num_channels < len(ch[i]):
num_channels = len(ch[i])
@ -43,11 +44,20 @@ class TestBalanceTwist(Test):
return
try:
min_throttle = logdata.parameters["RC3_MIN"] + logdata.parameters["THR_MIN"] / (logdata.parameters["RC3_MAX"]-logdata.parameters["RC3_MIN"])/1000.0
min_throttle = (
logdata.parameters["RC3_MIN"]
+ logdata.parameters["THR_MIN"]
/ (logdata.parameters["RC3_MAX"] - logdata.parameters["RC3_MIN"])
/ 1000.0
)
except KeyError as e:
min_throttle = logdata.parameters["MOT_PWM_MIN"] / (logdata.parameters["MOT_PWM_MAX"]-logdata.parameters["RC3_MIN"])/1000.0
min_throttle = (
logdata.parameters["MOT_PWM_MIN"]
/ (logdata.parameters["MOT_PWM_MAX"] - logdata.parameters["RC3_MIN"])
/ 1000.0
)
ch = list(filter(lambda x:sum(x)/num_channels > min_throttle, ch))
ch = list(filter(lambda x: sum(x) / num_channels > min_throttle, ch))
if len(ch) == 0:
return
@ -55,17 +65,20 @@ class TestBalanceTwist(Test):
avg_sum = 0
avg_ch = []
for i in range(num_channels):
avg = list(map(lambda x: x[i],ch))
avg = sum(avg)/len(avg)
avg = list(map(lambda x: x[i], ch))
avg = sum(avg) / len(avg)
avg_ch.append(avg)
avg_sum += avg
avg_all = avg_sum / num_channels
self.result.statusMessage = "Motor channel averages = %s\nAverage motor output = %.0f\nDifference between min and max motor averages = %.0f" % (str(avg_ch),avg_all,abs(min(avg_ch)-max(avg_ch)))
self.result.statusMessage = (
"Motor channel averages = %s\nAverage motor output = %.0f\nDifference between min and max motor averages = %.0f"
% (str(avg_ch), avg_all, abs(min(avg_ch) - max(avg_ch)))
)
self.result.status = TestResult.StatusType.GOOD
if abs(min(avg_ch)-max(avg_ch)) > 75:
if abs(min(avg_ch) - max(avg_ch)) > 75:
self.result.status = TestResult.StatusType.WARN
if abs(min(avg_ch)-max(avg_ch)) > 150:
if abs(min(avg_ch) - max(avg_ch)) > 150:
self.result.status = TestResult.StatusType.FAIL

View File

@ -1,6 +1,8 @@
from LogAnalyzer import Test,TestResult
import math
from LogAnalyzer import Test, TestResult
class TestNaN(Test):
'''test for NaNs present in log'''
@ -16,8 +18,8 @@ class TestNaN(Test):
self.result.status = TestResult.StatusType.FAIL
nans_ok = {
"CTUN": [ "DSAlt", "TAlt" ],
"POS": [ "RelOriginAlt"],
"CTUN": ["DSAlt", "TAlt"],
"POS": ["RelOriginAlt"],
}
for channel in logdata.channels.keys():
@ -29,7 +31,10 @@ class TestNaN(Test):
(ts, val) = tupe
if isinstance(val, float) and math.isnan(val):
FAIL()
self.result.statusMessage += "Found NaN in %s.%s\n" % (channel, field,)
self.result.statusMessage += "Found NaN in %s.%s\n" % (
channel,
field,
)
raise ValueError()
except ValueError as e:
continue

View File

@ -1,12 +1,14 @@
from LogAnalyzer import Test,TestResult
import DataflashLog
from math import sqrt
import numpy as np
import DataflashLog
import matplotlib.pyplot as plt
import numpy as np
from LogAnalyzer import Test, TestResult
class TestFlow(Test):
'''test optical flow sensor scale factor calibration'''
#
# Use the following procedure to log the calibration data. is assumed that the optical flow sensor has been
# correctly aligned, is focussed and the test is performed over a textured surface with adequate lighting.
@ -42,13 +44,17 @@ class TestFlow(Test):
try:
# tuning parameters used by the algorithm
tilt_threshold = 15 # roll and pitch threshold used to start and stop calibration (deg)
quality_threshold = 124 # minimum flow quality required for data to be used by the curve fit (N/A)
min_rate_threshold = 0.0 # if the gyro rate is less than this, the data will not be used by the curve fit (rad/sec)
max_rate_threshold = 2.0 # if the gyro rate is greter than this, the data will not be used by the curve fit (rad/sec)
param_std_threshold = 5.0 # maximum allowable 1-std uncertainty in scaling parameter (scale factor * 1000)
param_abs_threshold = 200 # max/min allowable scale factor parameter. Values of FLOW_FXSCALER and FLOW_FYSCALER outside the range of +-param_abs_threshold indicate a sensor configuration problem.
min_num_points = 100 # minimum number of points required for a curve fit - this is necessary, but not sufficient condition - the standard deviation estimate of the fit gradient is also important.
tilt_threshold = 15 # roll and pitch threshold used to start and stop calibration (deg)
quality_threshold = 124 # minimum flow quality required for data to be used by the curve fit (N/A)
min_rate_threshold = (
0.0 # if the gyro rate is less than this, the data will not be used by the curve fit (rad/sec)
)
max_rate_threshold = (
2.0 # if the gyro rate is greter than this, the data will not be used by the curve fit (rad/sec)
)
param_std_threshold = 5.0 # maximum allowable 1-std uncertainty in scaling parameter (scale factor * 1000)
param_abs_threshold = 200 # max/min allowable scale factor parameter. Values of FLOW_FXSCALER and FLOW_FYSCALER outside the range of +-param_abs_threshold indicate a sensor configuration problem.
min_num_points = 100 # minimum number of points required for a curve fit - this is necessary, but not sufficient condition - the standard deviation estimate of the fit gradient is also important.
# get the existing scale parameters
flow_fxscaler = logdata.parameters["FLOW_FXSCALER"]
@ -58,27 +64,27 @@ class TestFlow(Test):
if "OF" in logdata.channels:
flowX = np.zeros(len(logdata.channels["OF"]["flowX"].listData))
for i in range(len(logdata.channels["OF"]["flowX"].listData)):
(line, flowX[i]) = logdata.channels["OF"]["flowX"].listData[i]
(line, flowX[i]) = logdata.channels["OF"]["flowX"].listData[i]
bodyX = np.zeros(len(logdata.channels["OF"]["bodyX"].listData))
for i in range(len(logdata.channels["OF"]["bodyX"].listData)):
(line, bodyX[i]) = logdata.channels["OF"]["bodyX"].listData[i]
(line, bodyX[i]) = logdata.channels["OF"]["bodyX"].listData[i]
flowY = np.zeros(len(logdata.channels["OF"]["flowY"].listData))
for i in range(len(logdata.channels["OF"]["flowY"].listData)):
(line, flowY[i]) = logdata.channels["OF"]["flowY"].listData[i]
(line, flowY[i]) = logdata.channels["OF"]["flowY"].listData[i]
bodyY = np.zeros(len(logdata.channels["OF"]["bodyY"].listData))
for i in range(len(logdata.channels["OF"]["bodyY"].listData)):
(line, bodyY[i]) = logdata.channels["OF"]["bodyY"].listData[i]
(line, bodyY[i]) = logdata.channels["OF"]["bodyY"].listData[i]
flow_time_us = np.zeros(len(logdata.channels["OF"]["TimeUS"].listData))
for i in range(len(logdata.channels["OF"]["TimeUS"].listData)):
(line, flow_time_us[i]) = logdata.channels["OF"]["TimeUS"].listData[i]
(line, flow_time_us[i]) = logdata.channels["OF"]["TimeUS"].listData[i]
flow_qual = np.zeros(len(logdata.channels["OF"]["Qual"].listData))
for i in range(len(logdata.channels["OF"]["Qual"].listData)):
(line, flow_qual[i]) = logdata.channels["OF"]["Qual"].listData[i]
(line, flow_qual[i]) = logdata.channels["OF"]["Qual"].listData[i]
else:
FAIL()
@ -89,15 +95,15 @@ class TestFlow(Test):
if "ATT" in logdata.channels:
Roll = np.zeros(len(logdata.channels["ATT"]["Roll"].listData))
for i in range(len(logdata.channels["ATT"]["Roll"].listData)):
(line, Roll[i]) = logdata.channels["ATT"]["Roll"].listData[i]
(line, Roll[i]) = logdata.channels["ATT"]["Roll"].listData[i]
Pitch = np.zeros(len(logdata.channels["ATT"]["Pitch"].listData))
for i in range(len(logdata.channels["ATT"]["Pitch"].listData)):
(line, Pitch[i]) = logdata.channels["ATT"]["Pitch"].listData[i]
(line, Pitch[i]) = logdata.channels["ATT"]["Pitch"].listData[i]
att_time_us = np.zeros(len(logdata.channels["ATT"]["TimeUS"].listData))
for i in range(len(logdata.channels["ATT"]["TimeUS"].listData)):
(line, att_time_us[i]) = logdata.channels["ATT"]["TimeUS"].listData[i]
(line, att_time_us[i]) = logdata.channels["ATT"]["TimeUS"].listData[i]
else:
FAIL()
@ -119,17 +125,17 @@ class TestFlow(Test):
# calculate the end time for the roll calibration
endTime = int(0)
endRollIndex = int(0)
for i in range(len(Roll)-1,-1,-1):
for i in range(len(Roll) - 1, -1, -1):
if abs(Roll[i]) > tilt_threshold:
endTime = att_time_us[i]
break
for i in range(len(flow_time_us)-1,-1,-1):
for i in range(len(flow_time_us) - 1, -1, -1):
if flow_time_us[i] < endTime:
endRollIndex = i
break
# check we have enough roll data points
if (endRollIndex - startRollIndex <= min_num_points):
if endRollIndex - startRollIndex <= min_num_points:
FAIL()
self.result.statusMessage = "FAIL: insufficient roll data pointsa\n"
return
@ -140,7 +146,13 @@ class TestFlow(Test):
bodyX_resampled = []
flowX_time_us_resampled = []
for i in range(len(Roll)):
if (i >= startRollIndex) and (i <= endRollIndex) and (abs(bodyX[i]) > min_rate_threshold) and (abs(bodyX[i]) < max_rate_threshold) and (flow_qual[i] > quality_threshold):
if (
(i >= startRollIndex)
and (i <= endRollIndex)
and (abs(bodyX[i]) > min_rate_threshold)
and (abs(bodyX[i]) < max_rate_threshold)
and (flow_qual[i] > quality_threshold)
):
flowX_resampled.append(flowX[i])
bodyX_resampled.append(bodyX[i])
flowX_time_us_resampled.append(flow_time_us[i])
@ -160,17 +172,17 @@ class TestFlow(Test):
# calculate the end time for the pitch calibration
endTime = 0
endPitchIndex = int(0)
for i in range(len(Pitch)-1,-1,-1):
for i in range(len(Pitch) - 1, -1, -1):
if abs(Pitch[i]) > tilt_threshold:
endTime = att_time_us[i]
break
for i in range(len(flow_time_us)-1,-1,-1):
for i in range(len(flow_time_us) - 1, -1, -1):
if flow_time_us[i] < endTime:
endPitchIndex = i
break
# check we have enough pitch data points
if (endPitchIndex - startPitchIndex <= min_num_points):
if endPitchIndex - startPitchIndex <= min_num_points:
FAIL()
self.result.statusMessage = "FAIL: insufficient pitch data pointsa\n"
return
@ -181,34 +193,58 @@ class TestFlow(Test):
bodyY_resampled = []
flowY_time_us_resampled = []
for i in range(len(Roll)):
if (i >= startPitchIndex) and (i <= endPitchIndex) and (abs(bodyY[i]) > min_rate_threshold) and (abs(bodyY[i]) < max_rate_threshold) and (flow_qual[i] > quality_threshold):
if (
(i >= startPitchIndex)
and (i <= endPitchIndex)
and (abs(bodyY[i]) > min_rate_threshold)
and (abs(bodyY[i]) < max_rate_threshold)
and (flow_qual[i] > quality_threshold)
):
flowY_resampled.append(flowY[i])
bodyY_resampled.append(bodyY[i])
flowY_time_us_resampled.append(flow_time_us[i])
# fit a straight line to the flow vs body rate data and calculate the scale factor parameter required to achieve a slope of 1
coef_flow_x , cov_x = np.polyfit(bodyX_resampled,flowX_resampled,1,rcond=None, full=False, w=None, cov=True)
coef_flow_y , cov_y = np.polyfit(bodyY_resampled,flowY_resampled,1,rcond=None, full=False, w=None, cov=True)
coef_flow_x, cov_x = np.polyfit(
bodyX_resampled, flowX_resampled, 1, rcond=None, full=False, w=None, cov=True
)
coef_flow_y, cov_y = np.polyfit(
bodyY_resampled, flowY_resampled, 1, rcond=None, full=False, w=None, cov=True
)
# taking the exisiting scale factor parameters into account, calculate the parameter values reequired to achieve a unity slope
flow_fxscaler_new = int(1000 * (((1 + 0.001 * float(flow_fxscaler))/coef_flow_x[0] - 1)))
flow_fyscaler_new = int(1000 * (((1 + 0.001 * float(flow_fyscaler))/coef_flow_y[0] - 1)))
flow_fxscaler_new = int(1000 * (((1 + 0.001 * float(flow_fxscaler)) / coef_flow_x[0] - 1)))
flow_fyscaler_new = int(1000 * (((1 + 0.001 * float(flow_fyscaler)) / coef_flow_y[0] - 1)))
# Do a sanity check on the scale factor variance
if sqrt(cov_x[0][0]) > param_std_threshold or sqrt(cov_y[0][0]) > param_std_threshold:
FAIL()
self.result.statusMessage = "FAIL: inaccurate fit - poor quality or insufficient data\nFLOW_FXSCALER 1STD = %u\nFLOW_FYSCALER 1STD = %u\n" % (round(1000*sqrt(cov_x[0][0])),round(1000*sqrt(cov_y[0][0])))
self.result.statusMessage = (
"FAIL: inaccurate fit - poor quality or insufficient data\nFLOW_FXSCALER 1STD = %u\nFLOW_FYSCALER 1STD = %u\n"
% (round(1000 * sqrt(cov_x[0][0])), round(1000 * sqrt(cov_y[0][0])))
)
# Do a sanity check on the scale factors
if abs(flow_fxscaler_new) > param_abs_threshold or abs(flow_fyscaler_new) > param_abs_threshold:
FAIL()
self.result.statusMessage = "FAIL: required scale factors are excessive\nFLOW_FXSCALER=%i\nFLOW_FYSCALER=%i\n" % (flow_fxscaler,flow_fyscaler)
self.result.statusMessage = (
"FAIL: required scale factors are excessive\nFLOW_FXSCALER=%i\nFLOW_FYSCALER=%i\n"
% (flow_fxscaler, flow_fyscaler)
)
# display recommended scale factors
self.result.statusMessage = "Set FLOW_FXSCALER to %i\nSet FLOW_FYSCALER to %i\n\nCal plots saved to flow_calibration.pdf\nCal parameters saved to flow_calibration.param\n\nFLOW_FXSCALER 1STD = %u\nFLOW_FYSCALER 1STD = %u\n" % (flow_fxscaler_new,flow_fyscaler_new,round(1000*sqrt(cov_x[0][0])),round(1000*sqrt(cov_y[0][0])))
self.result.statusMessage = (
"Set FLOW_FXSCALER to %i\nSet FLOW_FYSCALER to %i\n\nCal plots saved to flow_calibration.pdf\nCal parameters saved to flow_calibration.param\n\nFLOW_FXSCALER 1STD = %u\nFLOW_FYSCALER 1STD = %u\n"
% (
flow_fxscaler_new,
flow_fyscaler_new,
round(1000 * sqrt(cov_x[0][0])),
round(1000 * sqrt(cov_y[0][0])),
)
)
# calculate fit display data
body_rate_display = [-max_rate_threshold,max_rate_threshold]
body_rate_display = [-max_rate_threshold, max_rate_threshold]
fit_coef_x = np.poly1d(coef_flow_x)
flowX_display = fit_coef_x(body_rate_display)
fit_coef_y = np.poly1d(coef_flow_y)
@ -216,13 +252,14 @@ class TestFlow(Test):
# plot and save calibration test points to PDF
from matplotlib.backends.backend_pdf import PdfPages
output_plot_filename = "flow_calibration.pdf"
pp = PdfPages(output_plot_filename)
plt.figure(1,figsize=(20,13))
plt.subplot(2,1,1)
plt.plot(bodyX_resampled,flowX_resampled,'b', linestyle=' ', marker='o',label="test points")
plt.plot(body_rate_display,flowX_display,'r',linewidth=2.5,label="linear fit")
plt.figure(1, figsize=(20, 13))
plt.subplot(2, 1, 1)
plt.plot(bodyX_resampled, flowX_resampled, 'b', linestyle=' ', marker='o', label="test points")
plt.plot(body_rate_display, flowX_display, 'r', linewidth=2.5, label="linear fit")
plt.title('X axis flow rate vs gyro rate')
plt.ylabel('flow rate (rad/s)')
plt.xlabel('gyro rate (rad/sec)')
@ -230,9 +267,9 @@ class TestFlow(Test):
plt.legend(loc='upper left')
# draw plots
plt.subplot(2,1,2)
plt.plot(bodyY_resampled,flowY_resampled,'b', linestyle=' ', marker='o',label="test points")
plt.plot(body_rate_display,flowY_display,'r',linewidth=2.5,label="linear fit")
plt.subplot(2, 1, 2)
plt.plot(bodyY_resampled, flowY_resampled, 'b', linestyle=' ', marker='o', label="test points")
plt.plot(body_rate_display, flowY_display, 'r', linewidth=2.5, label="linear fit")
plt.title('Y axis flow rate vs gyro rate')
plt.ylabel('flow rate (rad/s)')
plt.xlabel('gyro rate (rad/sec)')
@ -241,12 +278,12 @@ class TestFlow(Test):
pp.savefig()
plt.figure(2,figsize=(20,13))
plt.subplot(2,1,1)
plt.plot(flow_time_us,flowX,'b',label="flow rate - all")
plt.plot(flow_time_us,bodyX,'r',label="gyro rate - all")
plt.plot(flowX_time_us_resampled,flowX_resampled,'c', linestyle=' ', marker='o',label="flow rate - used")
plt.plot(flowX_time_us_resampled,bodyX_resampled,'m', linestyle=' ', marker='o',label="gyro rate - used")
plt.figure(2, figsize=(20, 13))
plt.subplot(2, 1, 1)
plt.plot(flow_time_us, flowX, 'b', label="flow rate - all")
plt.plot(flow_time_us, bodyX, 'r', label="gyro rate - all")
plt.plot(flowX_time_us_resampled, flowX_resampled, 'c', linestyle=' ', marker='o', label="flow rate - used")
plt.plot(flowX_time_us_resampled, bodyX_resampled, 'm', linestyle=' ', marker='o', label="gyro rate - used")
plt.title('X axis flow and body rate vs time')
plt.ylabel('rate (rad/s)')
plt.xlabel('time (usec)')
@ -254,11 +291,11 @@ class TestFlow(Test):
plt.legend(loc='upper left')
# draw plots
plt.subplot(2,1,2)
plt.plot(flow_time_us,flowY,'b',label="flow rate - all")
plt.plot(flow_time_us,bodyY,'r',label="gyro rate - all")
plt.plot(flowY_time_us_resampled,flowY_resampled,'c', linestyle=' ', marker='o',label="flow rate - used")
plt.plot(flowY_time_us_resampled,bodyY_resampled,'m', linestyle=' ', marker='o',label="gyro rate - used")
plt.subplot(2, 1, 2)
plt.plot(flow_time_us, flowY, 'b', label="flow rate - all")
plt.plot(flow_time_us, bodyY, 'r', label="gyro rate - all")
plt.plot(flowY_time_us_resampled, flowY_resampled, 'c', linestyle=' ', marker='o', label="flow rate - used")
plt.plot(flowY_time_us_resampled, bodyY_resampled, 'm', linestyle=' ', marker='o', label="gyro rate - used")
plt.title('Y axis flow and body rate vs time')
plt.ylabel('rate (rad/s)')
plt.xlabel('time (usec)')
@ -275,21 +312,11 @@ class TestFlow(Test):
# write correction parameters to file
test_results_filename = "flow_calibration.param"
file = open(test_results_filename,"w")
file.write("FLOW_FXSCALER"+" "+str(flow_fxscaler_new)+"\n")
file.write("FLOW_FYSCALER"+" "+str(flow_fyscaler_new)+"\n")
file = open(test_results_filename, "w")
file.write("FLOW_FXSCALER" + " " + str(flow_fxscaler_new) + "\n")
file.write("FLOW_FYSCALER" + " " + str(flow_fyscaler_new) + "\n")
file.close()
except KeyError as e:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = str(e) + ' not found'

View File

@ -1,8 +1,8 @@
from LogAnalyzer import Test, TestResult
import DataflashLog
from VehicleType import VehicleType
import math # for isnan()
import math # for isnan()
import DataflashLog
from LogAnalyzer import Test, TestResult
from VehicleType import VehicleType
class TestParams(Test):
@ -12,31 +12,43 @@ class TestParams(Test):
Test.__init__(self)
self.name = "Parameters"
# helper functions
def __checkParamIsEqual(self, paramName, expectedValue, logdata):
value = logdata.parameters[paramName]
if value != expectedValue:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting %s\n" % (paramName, repr(value), repr(expectedValue))
self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting %s\n" % (
paramName,
repr(value),
repr(expectedValue),
)
def __checkParamIsLessThan(self, paramName, maxValue, logdata):
value = logdata.parameters[paramName]
if value >= maxValue:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting less than %s\n" % (paramName, repr(value), repr(maxValue))
self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting less than %s\n" % (
paramName,
repr(value),
repr(maxValue),
)
def __checkParamIsMoreThan(self, paramName, minValue, logdata):
value = logdata.parameters[paramName]
if value <= minValue:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting less than %s\n" % (paramName, repr(value), repr(minValue))
self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting less than %s\n" % (
paramName,
repr(value),
repr(minValue),
)
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD # GOOD by default, tests below will override it if they fail
# check all params for NaN
for name,value in logdata.parameters.items():
for name, value in logdata.parameters.items():
if math.isnan(value):
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = self.result.statusMessage + name + " is NaN\n"
@ -45,11 +57,11 @@ class TestParams(Test):
# add parameter checks below using the helper functions, any failures will trigger a FAIL status and accumulate info in statusMessage
# if more complex checking or correlations are required you can access parameter values directly using the logdata.parameters[paramName] dict
if logdata.vehicleType == VehicleType.Copter:
self.__checkParamIsEqual ("MAG_ENABLE", 1, logdata)
self.__checkParamIsEqual("MAG_ENABLE", 1, logdata)
if "THR_MIN" in logdata.parameters:
self.__checkParamIsLessThan("THR_MIN", 200, logdata)
self.__checkParamIsLessThan("THR_MID", 701, logdata)
self.__checkParamIsMoreThan("THR_MID", 299, logdata)
self.__checkParamIsLessThan("THR_MIN", 200, logdata)
self.__checkParamIsLessThan("THR_MID", 701, logdata)
self.__checkParamIsMoreThan("THR_MID", 299, logdata)
# TODO: add more parameter tests, these are just an example...
elif logdata.vehicleType == VehicleType.Plane:
# TODO: add parameter checks for plane...

View File

@ -1,9 +1,10 @@
from __future__ import print_function
from LogAnalyzer import Test, TestResult
import DataflashLog
from LogAnalyzer import Test, TestResult
from VehicleType import VehicleType
class TestPerformance(Test):
'''check performance monitoring messages (PM) for issues with slow loops, etc'''
@ -29,7 +30,7 @@ class TestPerformance(Test):
# ignoreMaxTLines = []
# for maxT in logdata.channels["PM"]["MaxT"].listData:
# if not armingLines:
# break
# break
# if maxT[0] > armingLines[0]:
# #print("Ignoring maxT from line %d, as it is the first PM line after arming on line %d" % (maxT[0],armingLines[0]))
# ignoreMaxTLines.append(maxT[0])
@ -45,22 +46,28 @@ class TestPerformance(Test):
maxPercentSlowLine = 0
slowLoopLineCount = 0
for i in range(len(logdata.channels["PM"]["NLon"].listData)):
(line, nLon) = logdata.channels["PM"]["NLon"].listData[i]
(line, nLon) = logdata.channels["PM"]["NLon"].listData[i]
(line, nLoop) = logdata.channels["PM"]["NLoop"].listData[i]
(line, maxT) = logdata.channels["PM"]["MaxT"].listData[i]
(line, maxT) = logdata.channels["PM"]["MaxT"].listData[i]
percentSlow = (nLon / float(nLoop)) * 100
if percentSlow > 6.0:
slowLoopLineCount = slowLoopLineCount + 1
if percentSlow > maxPercentSlow:
maxPercentSlow = percentSlow
maxPercentSlowLine = line
#if (maxT > 13000) and line not in ignoreMaxTLines:
# if (maxT > 13000) and line not in ignoreMaxTLines:
# print("MaxT of %d detected on line %d" % (maxT,line))
if (maxPercentSlow > 10) or (slowLoopLineCount > 6):
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "%d slow loop lines found, max %.2f%% on line %d" % (slowLoopLineCount,maxPercentSlow,maxPercentSlowLine)
elif (maxPercentSlow > 6):
self.result.statusMessage = "%d slow loop lines found, max %.2f%% on line %d" % (
slowLoopLineCount,
maxPercentSlow,
maxPercentSlowLine,
)
elif maxPercentSlow > 6:
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = "%d slow loop lines found, max %.2f%% on line %d" % (slowLoopLineCount,maxPercentSlow,maxPercentSlowLine)
self.result.statusMessage = "%d slow loop lines found, max %.2f%% on line %d" % (
slowLoopLineCount,
maxPercentSlow,
maxPercentSlowLine,
)

View File

@ -1,18 +1,19 @@
from LogAnalyzer import Test,TestResult
import DataflashLog
from VehicleType import VehicleType
import collections
import DataflashLog
from LogAnalyzer import Test, TestResult
from VehicleType import VehicleType
class TestPitchRollCoupling(Test):
'''test for divergence between input and output pitch/roll, i.e. mechanical failure or bad PID tuning'''
# TODO: currently we're only checking for roll/pitch outside of max lean angle, will come back later to analyze roll/pitch in versus out values
def __init__(self):
Test.__init__(self)
self.name = "Pitch/Roll"
self.enable = True # TEMP
self.enable = True # TEMP
def run(self, logdata, verbose):
self.result = TestResult()
@ -38,104 +39,121 @@ class TestPitchRollCoupling(Test):
self.ctun_baralt_att = 'BAlt'
# figure out where each mode begins and ends, so we can treat auto and manual modes differently and ignore acro/tune modes
autoModes = ["RTL",
"AUTO",
"LAND",
"LOITER",
"GUIDED",
"CIRCLE",
"OF_LOITER",
"POSHOLD",
"BRAKE",
"AVOID_ADSB",
"GUIDED_NOGPS",
"SMARTRTL"]
autoModes = [
"RTL",
"AUTO",
"LAND",
"LOITER",
"GUIDED",
"CIRCLE",
"OF_LOITER",
"POSHOLD",
"BRAKE",
"AVOID_ADSB",
"GUIDED_NOGPS",
"SMARTRTL",
]
# use CTUN RollIn/DesRoll + PitchIn/DesPitch
manualModes = ["STABILIZE", "DRIFT", "ALTHOLD", "ALT_HOLD", "POSHOLD"]
# ignore data from these modes:
ignoreModes = ["ACRO", "SPORT", "FLIP", "AUTOTUNE","", "THROW",]
autoSegments = [] # list of (startLine,endLine) pairs
ignoreModes = [
"ACRO",
"SPORT",
"FLIP",
"AUTOTUNE",
"",
"THROW",
]
autoSegments = [] # list of (startLine,endLine) pairs
manualSegments = [] # list of (startLine,endLine) pairs
orderedModes = collections.OrderedDict(sorted(logdata.modeChanges.items(), key=lambda t: t[0]))
isAuto = False # we always start in a manual control mode
isAuto = False # we always start in a manual control mode
prevLine = 0
mode = ""
for line,modepair in orderedModes.items():
for line, modepair in orderedModes.items():
mode = modepair[0].upper()
if prevLine == 0:
prevLine = line
if mode in autoModes:
if not isAuto:
manualSegments.append((prevLine,line-1))
manualSegments.append((prevLine, line - 1))
prevLine = line
isAuto = True
elif mode in manualModes:
if isAuto:
autoSegments.append((prevLine,line-1))
autoSegments.append((prevLine, line - 1))
prevLine = line
isAuto = False
elif mode in ignoreModes:
if isAuto:
autoSegments.append((prevLine,line-1))
autoSegments.append((prevLine, line - 1))
else:
manualSegments.append((prevLine,line-1))
manualSegments.append((prevLine, line - 1))
prevLine = 0
else:
raise Exception("Unknown mode in TestPitchRollCoupling: %s" % mode)
# and handle the last segment, which doesn't have an ending
if mode in autoModes:
autoSegments.append((prevLine,logdata.lineCount))
autoSegments.append((prevLine, logdata.lineCount))
elif mode in manualModes:
manualSegments.append((prevLine,logdata.lineCount))
manualSegments.append((prevLine, logdata.lineCount))
# figure out max lean angle, the ANGLE_MAX param was added in AC3.1
maxLeanAngle = 45.0
if "ANGLE_MAX" in logdata.parameters:
maxLeanAngle = logdata.parameters["ANGLE_MAX"] / 100.0
maxLeanAngleBuffer = 10 # allow a buffer margin
maxLeanAngleBuffer = 10 # allow a buffer margin
# ignore anything below this altitude, to discard any data while not flying
minAltThreshold = 2.0
# look through manual+auto flight segments
# TODO: filter to ignore single points outside range?
(maxRoll, maxRollLine) = (0.0, 0)
(maxRoll, maxRollLine) = (0.0, 0)
(maxPitch, maxPitchLine) = (0.0, 0)
for (startLine,endLine) in manualSegments+autoSegments:
for (startLine, endLine) in manualSegments + autoSegments:
# quick up-front test, only fallover into more complex line-by-line check if max()>threshold
rollSeg = logdata.channels["ATT"]["Roll"].getSegment(startLine,endLine)
pitchSeg = logdata.channels["ATT"]["Pitch"].getSegment(startLine,endLine)
rollSeg = logdata.channels["ATT"]["Roll"].getSegment(startLine, endLine)
pitchSeg = logdata.channels["ATT"]["Pitch"].getSegment(startLine, endLine)
if not rollSeg.dictData and not pitchSeg.dictData:
continue
# check max roll+pitch for any time where relative altitude is above minAltThreshold
roll = max(abs(rollSeg.min()), abs(rollSeg.max()))
roll = max(abs(rollSeg.min()), abs(rollSeg.max()))
pitch = max(abs(pitchSeg.min()), abs(pitchSeg.max()))
if (roll>(maxLeanAngle+maxLeanAngleBuffer) and abs(roll)>abs(maxRoll)) or (pitch>(maxLeanAngle+maxLeanAngleBuffer) and abs(pitch)>abs(maxPitch)):
if (roll > (maxLeanAngle + maxLeanAngleBuffer) and abs(roll) > abs(maxRoll)) or (
pitch > (maxLeanAngle + maxLeanAngleBuffer) and abs(pitch) > abs(maxPitch)
):
lit = DataflashLog.LogIterator(logdata, startLine)
assert(lit.currentLine == startLine)
assert lit.currentLine == startLine
while lit.currentLine <= endLine:
relativeAlt = lit["CTUN"][self.ctun_baralt_att]
if relativeAlt > minAltThreshold:
roll = lit["ATT"]["Roll"]
roll = lit["ATT"]["Roll"]
pitch = lit["ATT"]["Pitch"]
if abs(roll)>(maxLeanAngle+maxLeanAngleBuffer) and abs(roll)>abs(maxRoll):
if abs(roll) > (maxLeanAngle + maxLeanAngleBuffer) and abs(roll) > abs(maxRoll):
maxRoll = roll
maxRollLine = lit.currentLine
if abs(pitch)>(maxLeanAngle+maxLeanAngleBuffer) and abs(pitch)>abs(maxPitch):
if abs(pitch) > (maxLeanAngle + maxLeanAngleBuffer) and abs(pitch) > abs(maxPitch):
maxPitch = pitch
maxPitchLine = lit.currentLine
next(lit)
# check for breaking max lean angles
if maxRoll and abs(maxRoll)>abs(maxPitch):
if maxRoll and abs(maxRoll) > abs(maxPitch):
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Roll (%.2f, line %d) > maximum lean angle (%.2f)" % (maxRoll, maxRollLine, maxLeanAngle)
self.result.statusMessage = "Roll (%.2f, line %d) > maximum lean angle (%.2f)" % (
maxRoll,
maxRollLine,
maxLeanAngle,
)
return
if maxPitch:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Pitch (%.2f, line %d) > maximum lean angle (%.2f)" % (maxPitch, maxPitchLine, maxLeanAngle)
self.result.statusMessage = "Pitch (%.2f, line %d) > maximum lean angle (%.2f)" % (
maxPitch,
maxPitchLine,
maxLeanAngle,
)
return
# TODO: use numpy/scipy to check Roll+RollIn curves for fitness (ignore where we're not airborne)
# ...

View File

@ -1,7 +1,7 @@
from __future__ import print_function
from LogAnalyzer import Test, TestResult
import DataflashLog
from LogAnalyzer import Test, TestResult
from VehicleType import VehicleType
@ -11,7 +11,7 @@ class TestThrust(Test):
def __init__(self):
Test.__init__(self)
self.name = "Thrust"
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
@ -42,7 +42,7 @@ class TestThrust(Test):
# check for throttle (CTUN.ThrOut) above 700 for a chunk of time with copter not rising
highThrottleThreshold = 700
tiltThreshold = 20 # ignore high throttle when roll or tilt is above this value
tiltThreshold = 20 # ignore high throttle when roll or tilt is above this value
climbThresholdWARN = 100
climbThresholdFAIL = 50
minSampleLength = 50
@ -52,21 +52,21 @@ class TestThrust(Test):
# find any contiguous chunks where CTUN.ThrOut > highThrottleThreshold, ignore high throttle if tilt > tiltThreshold, and discard any segments shorter than minSampleLength
start = None
data = logdata.channels["CTUN"][throut_key].listData
for i in range(0,len(data)):
(lineNumber,value) = data[i]
for i in range(0, len(data)):
(lineNumber, value) = data[i]
isBelowTiltThreshold = True
if value > highThrottleThreshold:
(roll,meh) = logdata.channels["ATT"]["Roll"].getNearestValue(lineNumber)
(pitch,meh) = logdata.channels["ATT"]["Pitch"].getNearestValue(lineNumber)
(roll, meh) = logdata.channels["ATT"]["Roll"].getNearestValue(lineNumber)
(pitch, meh) = logdata.channels["ATT"]["Pitch"].getNearestValue(lineNumber)
if (abs(roll) > tiltThreshold) or (abs(pitch) > tiltThreshold):
isBelowTiltThreshold = False
if (value > highThrottleThreshold) and isBelowTiltThreshold:
if start == None:
start = i
elif start != None:
if (i-start) > minSampleLength:
#print("Found high throttle chunk from line %d to %d (%d samples)" % (data[start][0],data[i][0],i-start+1))
highThrottleSegments.append((start,i))
if (i - start) > minSampleLength:
# print("Found high throttle chunk from line %d to %d (%d samples)" % (data[start][0],data[i][0],i-start+1))
highThrottleSegments.append((start, i))
start = None
climbRate = "CRate"
@ -76,16 +76,13 @@ class TestThrust(Test):
# loop through each checking climbRate, if < 50 FAIL, if < 100 WARN
# TODO: we should filter climbRate and use its slope rather than value for this test
for seg in highThrottleSegments:
(startLine,endLine) = (data[seg[0]][0], data[seg[1]][0])
avgClimbRate = logdata.channels["CTUN"][climbRate].getSegment(startLine,endLine).avg()
avgThrOut = logdata.channels["CTUN"][throut_key].getSegment(startLine,endLine).avg()
(startLine, endLine) = (data[seg[0]][0], data[seg[1]][0])
avgClimbRate = logdata.channels["CTUN"][climbRate].getSegment(startLine, endLine).avg()
avgThrOut = logdata.channels["CTUN"][throut_key].getSegment(startLine, endLine).avg()
if avgClimbRate < climbThresholdFAIL:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Avg climb rate %.2f cm/s for throttle avg %d" % (avgClimbRate,avgThrOut)
self.result.statusMessage = "Avg climb rate %.2f cm/s for throttle avg %d" % (avgClimbRate, avgThrOut)
return
if avgClimbRate < climbThresholdWARN:
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = "Avg climb rate %.2f cm/s for throttle avg %d" % (avgClimbRate,avgThrOut)
self.result.statusMessage = "Avg climb rate %.2f cm/s for throttle avg %d" % (avgClimbRate, avgThrOut)

View File

@ -1,8 +1,8 @@
from LogAnalyzer import Test,TestResult
import DataflashLog
import collections
import DataflashLog
from LogAnalyzer import Test, TestResult
class TestVCC(Test):
'''test for VCC within recommendations, or abrupt end to log in flight'''
@ -22,21 +22,23 @@ class TestVCC(Test):
# just a naive min/max test for now
try:
vccMin = logdata.channels["CURR"]["Vcc"].min()
vccMax = logdata.channels["CURR"]["Vcc"].max()
vccMin = logdata.channels["CURR"]["Vcc"].min()
vccMax = logdata.channels["CURR"]["Vcc"].max()
except KeyError as e:
vccMin = logdata.channels["POWR"]["Vcc"].min()
vccMax = logdata.channels["POWR"]["Vcc"].max()
vccMin = logdata.channels["POWR"]["Vcc"].min()
vccMax = logdata.channels["POWR"]["Vcc"].max()
vccMin *= 1000
vccMax *= 1000
vccDiff = vccMax - vccMin;
vccMinThreshold = 4.6 * 1000;
vccMaxDiff = 0.3 * 1000;
vccDiff = vccMax - vccMin
vccMinThreshold = 4.6 * 1000
vccMaxDiff = 0.3 * 1000
if vccDiff > vccMaxDiff:
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = "VCC min/max diff %sv, should be <%sv" % (vccDiff/1000.0, vccMaxDiff/1000.0)
self.result.statusMessage = "VCC min/max diff %sv, should be <%sv" % (vccDiff / 1000.0, vccMaxDiff / 1000.0)
elif vccMin < vccMinThreshold:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "VCC below minimum of %sv (%sv)" % (repr(vccMinThreshold/1000.0),repr(vccMin/1000.0))
self.result.statusMessage = "VCC below minimum of %sv (%sv)" % (
repr(vccMinThreshold / 1000.0),
repr(vccMin / 1000.0),
)

View File

@ -1,10 +1,9 @@
from __future__ import print_function
from LogAnalyzer import Test, TestResult
import DataflashLog
from VehicleType import VehicleType
import numpy
from LogAnalyzer import Test, TestResult
from VehicleType import VehicleType
class TestVibration(Test):
@ -14,7 +13,6 @@ class TestVibration(Test):
Test.__init__(self)
self.name = "Vibration"
def run(self, logdata, verbose):
self.result = TestResult()
@ -23,11 +21,11 @@ class TestVibration(Test):
return
# constants
gravity = -9.81
gravity = -9.81
aimRangeWarnXY = 1.5
aimRangeFailXY = 3.0
aimRangeWarnZ = 2.0 # gravity +/- aim range
aimRangeFailZ = 5.0 # gravity +/- aim range
aimRangeWarnZ = 2.0 # gravity +/- aim range
aimRangeFailZ = 5.0 # gravity +/- aim range
if not "IMU" in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
@ -38,33 +36,39 @@ class TestVibration(Test):
chunks = DataflashLog.DataflashLogHelper.findLoiterChunks(logdata, minLengthSeconds=10, noRCInputs=True)
if not chunks:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No stable LOITER log data found"
self.result.statusMessage = "No stable LOITER log data found"
return
# for now we'll just use the first (largest) chunk of LOITER data
# TODO: ignore the first couple of secs to avoid bad data during transition - or can we check more analytically that we're stable?
# TODO: accumulate all LOITER chunks over min size, or just use the largest one?
startLine = chunks[0][0]
endLine = chunks[0][1]
#print("TestVibration using LOITER chunk from lines %s to %s" % (repr(startLine), repr(endLine)))
endLine = chunks[0][1]
# print("TestVibration using LOITER chunk from lines %s to %s" % (repr(startLine), repr(endLine)))
def getStdDevIMU(logdata, channelName, startLine,endLine):
loiterData = logdata.channels["IMU"][channelName].getSegment(startLine,endLine)
numpyData = numpy.array(loiterData.dictData.values())
def getStdDevIMU(logdata, channelName, startLine, endLine):
loiterData = logdata.channels["IMU"][channelName].getSegment(startLine, endLine)
numpyData = numpy.array(loiterData.dictData.values())
return numpy.std(numpyData)
# use 2x standard deviations as the metric, so if 95% of samples lie within the aim range we're good
stdDevX = abs(2 * getStdDevIMU(logdata,"AccX",startLine,endLine))
stdDevY = abs(2 * getStdDevIMU(logdata,"AccY",startLine,endLine))
stdDevZ = abs(2 * getStdDevIMU(logdata,"AccZ",startLine,endLine))
stdDevX = abs(2 * getStdDevIMU(logdata, "AccX", startLine, endLine))
stdDevY = abs(2 * getStdDevIMU(logdata, "AccY", startLine, endLine))
stdDevZ = abs(2 * getStdDevIMU(logdata, "AccZ", startLine, endLine))
if (stdDevX > aimRangeFailXY) or (stdDevY > aimRangeFailXY) or (stdDevZ > aimRangeFailZ):
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Vibration too high (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (stdDevX,stdDevY,stdDevZ)
self.result.statusMessage = "Vibration too high (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (stdDevX, stdDevY, stdDevZ)
elif (stdDevX > aimRangeWarnXY) or (stdDevY > aimRangeWarnXY) or (stdDevZ > aimRangeWarnZ):
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = "Vibration slightly high (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (stdDevX,stdDevY,stdDevZ)
self.result.statusMessage = "Vibration slightly high (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (
stdDevX,
stdDevY,
stdDevZ,
)
else:
self.result.status = TestResult.StatusType.GOOD
self.result.statusMessage = "Good vibration values (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (stdDevX,stdDevY,stdDevZ)
self.result.statusMessage = "Good vibration values (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (
stdDevX,
stdDevY,
stdDevZ,
)