Tools: LogAnalyzer: remove

the web-based tools are supplanting this
Authored by Peter Barker on 2024-08-14 09:09:51 +10:00; committed by Andrew Tridgell
parent faf769d8cd
commit bdea9be7fb
47 changed files with 0 additions and 33083 deletions

View File

@@ -26,7 +26,6 @@ on:
- 'Tools/gittools/**'
- 'Tools/Hello/**'
- 'Tools/IO_Firmware/**'
- 'Tools/LogAnalyzer/**'
- 'Tools/mavproxy_modules/**'
- 'Tools/Pozyx/**'
- 'Tools/PrintVersion.py'

View File

@@ -21,7 +21,6 @@ on:
- 'Tools/Hello/**'
- 'Tools/IO_Firmware/**'
- 'Tools/Linux_HAL_Essentials/**'
- 'Tools/LogAnalyzer/**'
- 'Tools/Pozyx/**'
- 'Tools/PrintVersion.py'
- 'Tools/Replay/**'

View File

@@ -29,7 +29,6 @@ on:
- 'Tools/Hello/**'
- 'Tools/IO_Firmware/**'
- 'Tools/Linux_HAL_Essentials/**'
- 'Tools/LogAnalyzer/**'
- 'Tools/mavproxy_modules/**'
- 'Tools/Pozyx/**'
- 'Tools/PrintVersion.py'

View File

@@ -27,7 +27,6 @@ on:
- 'Tools/gittools/**'
- 'Tools/Hello/**'
- 'Tools/Linux_HAL_Essentials/**'
- 'Tools/LogAnalyzer/**'
- 'Tools/mavproxy_modules/**'
- 'Tools/Pozyx/**'
- 'Tools/PrintVersion.py'

View File

@@ -21,7 +21,6 @@ on:
- 'Tools/Hello/**'
- 'Tools/IO_Firmware/**'
- 'Tools/Linux_HAL_Essentials/**'
- 'Tools/LogAnalyzer/**'
- 'Tools/Pozyx/**'
- 'Tools/PrintVersion.py'
- 'Tools/Replay/**'

View File

@@ -25,7 +25,6 @@ on:
- 'Tools/GIT_Test/**'
- 'Tools/gittools/**'
- 'Tools/Hello/**'
- 'Tools/LogAnalyzer/**'
- 'Tools/mavproxy_modules/**'
- 'Tools/Pozyx/**'
- 'Tools/PrintVersion.py'

View File

@@ -19,7 +19,6 @@ on:
- 'Tools/GIT_Test/**'
- 'Tools/gittools/**'
- 'Tools/Hello/**'
- 'Tools/LogAnalyzer/**'
- 'Tools/mavproxy_modules/**'
- 'Tools/Pozyx/**'
- 'Tools/PrintVersion.py'

View File

@@ -26,7 +26,6 @@ on:
- 'Tools/gittools/**'
- 'Tools/Hello/**'
- 'Tools/IO_Firmware/**'
- 'Tools/LogAnalyzer/**'
- 'Tools/mavproxy_modules/**'
- 'Tools/Pozyx/**'
- 'Tools/PrintVersion.py'

View File

@@ -24,7 +24,6 @@ on:
- 'Tools/gittools/**'
- 'Tools/Hello/**'
- 'Tools/IO_Firmware/**'
- 'Tools/LogAnalyzer/**'
- 'Tools/mavproxy_modules/**'
- 'Tools/Pozyx/**'
- 'Tools/PrintVersion.py'

View File

@@ -31,7 +31,6 @@ on:
- 'Tools/Hello/**'
- 'Tools/IO_Firmware/**'
- 'Tools/Linux_HAL_Essentials/**'
- 'Tools/LogAnalyzer/**'
- 'Tools/mavproxy_modules/**'
- 'Tools/Pozyx/**'
- 'Tools/PrintVersion.py'

View File

@@ -30,7 +30,6 @@ on:
- 'Tools/Hello/**'
- 'Tools/IO_Firmware/**'
- 'Tools/Linux_HAL_Essentials/**'
- 'Tools/LogAnalyzer/**'
- 'Tools/Pozyx/**'
- 'Tools/PrintVersion.py'
- 'Tools/Replay/**'

View File

@@ -30,7 +30,6 @@ on:
- 'Tools/Hello/**'
- 'Tools/IO_Firmware/**'
- 'Tools/Linux_HAL_Essentials/**'
- 'Tools/LogAnalyzer/**'
- 'Tools/Pozyx/**'
- 'Tools/PrintVersion.py'
- 'Tools/simulink/**'

View File

@@ -28,7 +28,6 @@ on:
- 'Tools/Hello/**'
- 'Tools/IO_Firmware/**'
- 'Tools/Linux_HAL_Essentials/**'
- 'Tools/LogAnalyzer/**'
- 'Tools/Pozyx/**'
- 'Tools/PrintVersion.py'
- 'Tools/Replay/**'

View File

@@ -30,7 +30,6 @@ on:
- 'Tools/Hello/**'
- 'Tools/IO_Firmware/**'
- 'Tools/Linux_HAL_Essentials/**'
- 'Tools/LogAnalyzer/**'
- 'Tools/Pozyx/**'
- 'Tools/PrintVersion.py'
- 'Tools/Replay/**'

View File

@@ -30,7 +30,6 @@ on:
- 'Tools/Hello/**'
- 'Tools/IO_Firmware/**'
- 'Tools/Linux_HAL_Essentials/**'
- 'Tools/LogAnalyzer/**'
- 'Tools/Pozyx/**'
- 'Tools/PrintVersion.py'
- 'Tools/Replay/**'

View File

@@ -30,7 +30,6 @@ on:
- 'Tools/Hello/**'
- 'Tools/IO_Firmware/**'
- 'Tools/Linux_HAL_Essentials/**'
- 'Tools/LogAnalyzer/**'
- 'Tools/Pozyx/**'
- 'Tools/PrintVersion.py'
- 'Tools/Replay/**'

View File

@@ -30,7 +30,6 @@ on:
- 'Tools/Hello/**'
- 'Tools/IO_Firmware/**'
- 'Tools/Linux_HAL_Essentials/**'
- 'Tools/LogAnalyzer/**'
- 'Tools/Pozyx/**'
- 'Tools/PrintVersion.py'
- 'Tools/Replay/**'

View File

@@ -37,7 +37,6 @@ on:
- 'Tools/gittools/**'
- 'Tools/Hello/**'
- 'Tools/Linux_HAL_Essentials/**'
- 'Tools/LogAnalyzer/**'
- 'Tools/mavproxy_modules/**'
- 'Tools/Pozyx/**'
- 'Tools/PrintVersion.py'

View File

@@ -33,7 +33,6 @@ on:
- 'Tools/gittools/**'
- 'Tools/Hello/**'
- 'Tools/Linux_HAL_Essentials/**'
- 'Tools/LogAnalyzer/**'
- 'Tools/mavproxy_modules/**'
- 'Tools/Pozyx/**'
- 'Tools/PrintVersion.py'

View File

@@ -1,820 +0,0 @@
#
# Code to abstract the parsing of APM Dataflash log files, currently only used by the LogAnalyzer
#
# Initial code by Andrew Chapman (amchapman@gmail.com), 16th Jan 2014
#
# AP_FLAKE8_CLEAN
from __future__ import print_function, division
import bisect
import ctypes
import sys
import numpy
from VehicleType import VehicleType, VehicleTypeString
class Format(object):
'''Data channel format as specified by the FMT lines in the log file'''
def __init__(self, msgType, msgLen, name, types, labels):
self.NAME = 'FMT'
self.msgType = msgType
self.msgLen = msgLen
self.name = name
self.types = types
self.labels = labels.split(',')
def __str__(self):
return "%8s %s" % (self.name, repr(self.labels))
@staticmethod
def trycastToFormatType(value, valueType):
"""
Uses format characters from libraries/DataFlash/DataFlash.h to cast strings to basic Python int/float/string
types. Tries the cast and, if it fails, returns the value unchanged; this is acceptable, as the text logs do not
always match the format, e.g. MODE is expected to be an int
"""
try:
if valueType in "fcCeELd":
return float(value)
elif valueType in "bBhHiIMQq":
return int(value)
elif valueType in "nNZ":
return str(value)
except ValueError:
pass
return value
def to_class(self):
members = dict(
NAME=self.name,
labels=self.labels[:],
)
fieldtypes = [i for i in self.types]
fieldlabels = self.labels[:]
# field access
for (label, _type) in zip(fieldlabels, fieldtypes):
def createproperty(name, format):
# extra scope for variable sanity
# scaling via _NAME and def NAME(self): return self._NAME / SCALE
propertyname = name
attributename = '_' + name
p = property(
lambda x: getattr(x, attributename),
lambda x, v: setattr(x, attributename, Format.trycastToFormatType(v, format)),
)
members[propertyname] = p
members[attributename] = None
createproperty(label, _type)
# repr shows all values but the header
members['__repr__'] = lambda x: "<{cls} {data}>".format(
cls=x.__class__.__name__, data=' '.join(["{}:{}".format(k, getattr(x, '_' + k)) for k in x.labels])
)
def init(a, *x):
if len(x) != len(a.labels):
raise ValueError("Invalid Length")
for (l, v) in zip(a.labels, x):
try:
setattr(a, l, v)
except Exception as e:
print("{} {} {} failed".format(a, l, v))
print(e)
members['__init__'] = init
# finally, create the class
cls = type('Log__{:s}'.format(self.name), (object,), members)
return cls
class logheader(ctypes.LittleEndianStructure):
_fields_ = [
('head1', ctypes.c_uint8),
('head2', ctypes.c_uint8),
('msgid', ctypes.c_uint8),
]
def __repr__(self):
return "<logheader head1=0x{self.head1:x} head2=0x{self.head2:x} msgid=0x{self.msgid:x} ({self.msgid})>".format(
self=self
)
class BinaryFormat(ctypes.LittleEndianStructure):
NAME = 'FMT'
MSG = 128
SIZE = 0
FIELD_FORMAT = {
'a': ctypes.c_int16 * 32,
'b': ctypes.c_int8,
'B': ctypes.c_uint8,
'h': ctypes.c_int16,
'H': ctypes.c_uint16,
'i': ctypes.c_int32,
'I': ctypes.c_uint32,
'f': ctypes.c_float,
'd': ctypes.c_double,
'n': ctypes.c_char * 4,
'N': ctypes.c_char * 16,
'Z': ctypes.c_char * 64,
'c': ctypes.c_int16, # * 100,
'C': ctypes.c_uint16, # * 100,
'e': ctypes.c_int32, # * 100,
'E': ctypes.c_uint32, # * 100,
'L': ctypes.c_int32,
'M': ctypes.c_uint8,
'q': ctypes.c_int64,
'Q': ctypes.c_uint64,
}
FIELD_SCALE = {
'c': 100,
'C': 100,
'e': 100,
'E': 100,
}
_packed_ = True
_fields_ = [
('head', logheader),
('type', ctypes.c_uint8),
('length', ctypes.c_uint8),
('name', ctypes.c_char * 4),
('types', ctypes.c_char * 16),
('labels', ctypes.c_char * 64),
]
def __repr__(self):
return "<{cls} {data}>".format(
cls=self.__class__.__name__,
data=' '.join(["{}:{}".format(k, getattr(self, k)) for (k, _) in self._fields_[1:]]),
)
def to_class(self):
labels = self.labels.decode(encoding="utf-8") if self.labels else ""
members = dict(
NAME=self.name.decode(encoding="utf-8"),
MSG=self.type,
SIZE=self.length,
labels=labels.split(","),
_pack_=True,
)
if isinstance(self.types[0], str):
fieldtypes = [i for i in self.types]
else:
fieldtypes = [chr(i) for i in self.types]
fieldlabels = members["labels"]
if self.labels and (len(fieldtypes) != len(fieldlabels)):
print("Broken FMT message for {} .. ignoring".format(self.name), file=sys.stderr)
return None
fields = [('head', logheader)]
# field access
for (label, _type) in zip(fieldlabels, fieldtypes):
def createproperty(name, format):
# extra scope for variable sanity
# scaling via _NAME and def NAME(self): return self._NAME / SCALE
propertyname = name
attributename = '_' + name
scale = BinaryFormat.FIELD_SCALE.get(format, None)
def get_message_attribute(x):
ret = getattr(x, attributename)
if str(format) in ['Z', 'n', 'N']:
ret = ret.decode(encoding="utf-8")
return ret
p = property(get_message_attribute)
if scale is not None:
p = property(lambda x: getattr(x, attributename) / scale)
members[propertyname] = p
try:
fields.append((attributename, BinaryFormat.FIELD_FORMAT[format]))
except KeyError:
print('ERROR: Failed to add FMT type: {}, with format: {}'.format(attributename, format))
raise
createproperty(label, _type)
members['_fields_'] = fields
# repr shows all values but the header
members['__repr__'] = lambda x: "<{cls} {data}>".format(
cls=x.__class__.__name__, data=' '.join(["{}:{}".format(k, getattr(x, k)) for k in x.labels])
)
# finally, create the class
cls = type('Log__%s' % self.name, (ctypes.LittleEndianStructure,), members)
if ctypes.sizeof(cls) != cls.SIZE:
print("size mismatch for {} expected {} got {}".format(cls, ctypes.sizeof(cls), cls.SIZE), file=sys.stderr)
return None
return cls
BinaryFormat.SIZE = ctypes.sizeof(BinaryFormat)
class Channel(object):
'''storage for a single stream of data, e.g. all GPS.RelAlt values'''
# TODO: rethink data storage, but do more thorough regression testing before refactoring it
# TODO: store data as a scipy spline curve so we can more easily interpolate and sample the slope?
def __init__(self):
# store dupe data in dict and list for now, until we decide which is the better way to go
self.dictData = {} # dict of linenum->value
self.listData = [] # list of (linenum,value)
def getSegment(self, startLine, endLine):
'''returns a segment of this data (from startLine to endLine, inclusive) as a new Channel instance'''
segment = Channel()
segment.dictData = {k: v for k, v in self.dictData.items() if k >= startLine and k <= endLine}
segment.listData = [(k, v) for k, v in self.listData if k >= startLine and k <= endLine]
return segment
def min(self):
return min(self.dictData.values())
def max(self):
return max(self.dictData.values())
def avg(self):
return numpy.mean(self.dictData.values())
def getNearestValueFwd(self, lineNumber):
'''Returns (value,lineNumber)'''
index = bisect.bisect_left(self.listData, (lineNumber, -99999))
while index < len(self.listData):
line = self.listData[index][0]
if line >= lineNumber:
return (self.listData[index][1], line)
index += 1
raise ValueError("Error finding nearest value for line %d" % lineNumber)
def getNearestValueBack(self, lineNumber):
'''Returns (value,lineNumber)'''
index = bisect.bisect_left(self.listData, (lineNumber, -99999)) - 1
while index >= 0:
line = self.listData[index][0]
if line <= lineNumber:
return (self.listData[index][1], line)
index -= 1
raise ValueError("Error finding nearest value for line %d" % lineNumber)
def getNearestValue(self, lineNumber, lookForwards=True):
"""
Find the nearest data value to the given lineNumber, defaults to first looking forwards.
Returns (value,lineNumber)
"""
if lookForwards:
try:
return self.getNearestValueFwd(lineNumber)
except ValueError:
return self.getNearestValueBack(lineNumber)
else:
try:
return self.getNearestValueBack(lineNumber)
except ValueError:
return self.getNearestValueFwd(lineNumber)
raise Exception("Error finding nearest value for line %d" % lineNumber)
def getInterpolatedValue(self, lineNumber):
(prevValue, prevValueLine) = self.getNearestValue(lineNumber, lookForwards=False)
(nextValue, nextValueLine) = self.getNearestValue(lineNumber, lookForwards=True)
if prevValueLine == nextValueLine:
return prevValue
weight = (lineNumber - prevValueLine) / float(nextValueLine - prevValueLine)
return (weight * prevValue) + ((1 - weight) * nextValue)
def getIndexOf(self, lineNumber):
'''returns the index within this channel's listData of the given lineNumber, or raises an Exception if not found'''
index = bisect.bisect_left(self.listData, (lineNumber, -99999))
if self.listData[index][0] == lineNumber:
return index
else:
raise Exception("Error finding index for line %d" % lineNumber)
class LogIterator:
"""
Smart iterator that can move through a log by line number and maintain an index into the nearest values of all data
channels
"""
# TODO: LogIterator currently indexes the next available value rather than the nearest value, we should make it
# configurable between next/nearest
class LogIteratorSubValue:
'''syntactic sugar to allow access by LogIterator[lineLabel][dataLabel]'''
logdata = None
iterators = None
lineLabel = None
def __init__(self, logdata, iterators, lineLabel):
self.logdata = logdata
self.lineLabel = lineLabel
self.iterators = iterators
def __getitem__(self, dataLabel):
index = self.iterators[self.lineLabel][0]
return self.logdata.channels[self.lineLabel][dataLabel].listData[index][1]
iterators = {} # lineLabel -> (listIndex,lineNumber)
logdata = None
currentLine = None
def __init__(self, logdata, lineNumber=0):
self.logdata = logdata
self.currentLine = lineNumber
for lineLabel in self.logdata.formats:
if lineLabel in self.logdata.channels:
self.iterators[lineLabel] = ()
self.jump(lineNumber)
def __iter__(self):
return self
def __getitem__(self, lineLabel):
return LogIterator.LogIteratorSubValue(self.logdata, self.iterators, lineLabel)
def next(self):
'''increment iterator to next log line'''
self.currentLine += 1
if self.currentLine > self.logdata.lineCount:
return self
for lineLabel in self.iterators.keys():
# check if the currentLine has gone past the line we're pointing to for this type of data
dataLabel = self.logdata.formats[lineLabel].labels[0]
(index, lineNumber) = self.iterators[lineLabel]
# if so, and it is not the last entry in the log, increment the indices for dataLabels under that lineLabel
if (self.currentLine > lineNumber) and (
index < len(self.logdata.channels[lineLabel][dataLabel].listData) - 1
):
index += 1
lineNumber = self.logdata.channels[lineLabel][dataLabel].listData[index][0]
self.iterators[lineLabel] = (index, lineNumber)
return self
__next__ = next
def jump(self, lineNumber):
'''jump iterator to specified log line'''
self.currentLine = lineNumber
for lineLabel in self.iterators.keys():
dataLabel = self.logdata.formats[lineLabel].labels[0]
(value, lineNumber) = self.logdata.channels[lineLabel][dataLabel].getNearestValue(self.currentLine)
self.iterators[lineLabel] = (self.logdata.channels[lineLabel][dataLabel].getIndexOf(lineNumber), lineNumber)
class DataflashLogHelper:
'''helper functions for dealing with log data, put here to keep DataflashLog class as a simple parser and data store'''
@staticmethod
def getTimeAtLine(logdata, lineNumber):
'''returns the nearest GPS timestamp in milliseconds after the given line number'''
if "GPS" not in logdata.channels:
raise Exception("no GPS log data found")
# older logs use 'Time', newer logs use 'TimeMS'
# even newer logs use TimeUS
timeLabel = None
for possible in "TimeMS", "Time", "TimeUS":
if possible in logdata.channels["GPS"]:
timeLabel = possible
break
if timeLabel is None:
raise Exception("Unable to get time label")
while lineNumber <= logdata.lineCount:
if lineNumber in logdata.channels["GPS"][timeLabel].dictData:
return logdata.channels["GPS"][timeLabel].dictData[lineNumber]
lineNumber = lineNumber + 1
sys.stderr.write("didn't find GPS data for " + str(lineNumber) + " - using maxtime\n")
return logdata.channels["GPS"][timeLabel].max()
@staticmethod
def findLoiterChunks(logdata, minLengthSeconds=0, noRCInputs=True):
"""
Returns a list of (startLine, endLine) pairs defining sections of the log which are in loiter mode, ordered from
longest to shortest in time. If `noRCInputs == True` it only returns chunks with no control inputs
"""
# TODO: implement noRCInputs handling when identifying stable loiter chunks, for now we're ignoring it
def chunkSizeCompare(chunk1, chunk2):
chunk1Len = chunk1[1] - chunk1[0]
chunk2Len = chunk2[1] - chunk2[0]
if chunk1Len == chunk2Len:
return 0
elif chunk1Len > chunk2Len:
return -1
else:
return 1
changes = [{"line": k, "modeName": v[0], "modeNum": v[1]} for k, v in sorted(logdata.modeChanges.items())]
chunks = []
for i in range(len(changes)):
if changes[i]["modeName"] == "LOITER":
startLine = changes[i]["line"]
try:
endLine = changes[i + 1]["line"]
except IndexError:
endLine = logdata.lineCount
chunkTimeSeconds = (
DataflashLogHelper.getTimeAtLine(logdata, endLine)
- DataflashLogHelper.getTimeAtLine(logdata, startLine)
+ 1
) / 1000.0
if chunkTimeSeconds > minLengthSeconds:
chunks.append((startLine, endLine))
chunks.sort(key=lambda chunk: chunk[1] - chunk[0])
return chunks
@staticmethod
def isLogEmpty(logdata):
'''returns a human-readable error string if the log is essentially empty, otherwise returns None'''
# naive check for now, see if the throttle output was ever above 20%
throttleThreshold = 20
if logdata.vehicleType == VehicleType.Copter:
throttleThreshold = 200 # copter uses 0-1000, plane+rover use 0-100
if "CTUN" in logdata.channels:
try:
maxThrottle = logdata.channels["CTUN"]["ThrOut"].max()
except KeyError:
# ThrOut was shortened to ThO at some stage...
maxThrottle = logdata.channels["CTUN"]["ThO"].max()
# at roughly the same time ThO became a range from 0 to 1
throttleThreshold = 0.2
if maxThrottle < throttleThreshold:
return "Throttle never above 20%"
return None
class DataflashLog(object):
"""
ArduPilot Dataflash log file reader and container class. Keep this simple, add more advanced or specific functions
to DataflashLogHelper class
"""
knownHardwareTypes = ["APM", "PX4", "MPNG"]
def __init__(self, logfile=None, format="auto", ignoreBadlines=False):
self.filename = None
self.vehicleType = None # from VehicleType enumeration; value derived from header
self.vehicleTypeString = None # set at the same time as the enum value
self.firmwareVersion = ""
self.firmwareHash = ""
self.freeRAM = 0
self.hardwareType = "" # APM 1, APM 2, PX4, MPNG, etc. What about VRBrain, BeagleBone, etc.? Needs more testing
self.formats = {} # name -> Format
self.parameters = {} # token -> value
self.messages = {} # lineNum -> message
self.modeChanges = {} # lineNum -> (mode,value)
self.channels = {} # lineLabel -> {dataLabel:Channel}
self.filesizeKB = 0
self.durationSecs = 0
self.lineCount = 0
self.skippedLines = 0
self.backpatch_these_modechanges = []
self.frame = None
if logfile:
self.read(logfile, format, ignoreBadlines)
def getCopterType(self):
'''returns quad/hex/octo/tradheli if this is a copter log'''
if self.vehicleType != VehicleType.Copter:
return None
motLabels = []
if "MOT" in self.formats: # not listed in PX4 log header for some reason?
motLabels = self.formats["MOT"].labels
if "GGain" in motLabels:
return "tradheli"
elif len(motLabels) == 4:
return "quad"
elif len(motLabels) == 6:
return "hex"
elif len(motLabels) == 8:
return "octo"
else:
return ""
def num_motor_channels(self):
motor_channels_for_frame = {
"QUAD": 4,
"HEXA": 6,
"Y6": 6,
"OCTA": 8,
"OCTA_QUAD": 8,
"DECA": 10,
# "HELI": 1,
# "HELI_DUAL": 2,
"TRI": 3,
"SINGLE": 1,
"COAX": 2,
"TAILSITTER": 1,
"DODECA_HEXA": 12,
}
return motor_channels_for_frame[self.frame]
def read(self, logfile, format="auto", ignoreBadlines=False):
'''returns on successful log read (including bad lines if ignoreBadlines==True), will throw an Exception otherwise'''
# TODO: dataflash log parsing code is pretty hacky, should re-write more methodically
df_header = bytearray([0xA3, 0x95, 0x80, 0x80])
self.filename = logfile
if self.filename == '<stdin>':
f = sys.stdin
else:
f = open(self.filename, 'rb')
if format.lower() == 'bin':
head = df_header
elif format == 'log':
head = ""
elif format == 'auto':
if self.filename == '<stdin>':
# assuming TXT format
head = ""
else:
head = f.read(4)
f.seek(0)
else:
raise ValueError("Unknown log format for {}: {}".format(self.filename, format))
if head == df_header:
numBytes, lineNumber = self.read_binary(f, ignoreBadlines)
pass
else:
numBytes, lineNumber = self.read_text(f, ignoreBadlines)
# gather some general stats about the log
self.lineCount = lineNumber
self.filesizeKB = numBytes / 1024.0
# TODO: switch duration calculation to use TimeMS values rather than GPS timestamp
if "GPS" in self.channels:
# the GPS time label changed at some point, need to handle both
timeLabel = None
for i in 'TimeMS', 'TimeUS', 'Time':
if i in self.channels["GPS"]:
timeLabel = i
break
firstTimeGPS = int(self.channels["GPS"][timeLabel].listData[0][1])
lastTimeGPS = int(self.channels["GPS"][timeLabel].listData[-1][1])
if timeLabel == 'TimeUS':
firstTimeGPS /= 1000
lastTimeGPS /= 1000
self.durationSecs = (lastTimeGPS - firstTimeGPS) / 1000
# TODO: calculate logging rate based on timestamps
# ...
msg_vehicle_to_vehicle_map = {
"ArduCopter": VehicleType.Copter,
"APM:Copter": VehicleType.Copter,
"ArduPlane": VehicleType.Plane,
"ArduRover": VehicleType.Rover,
}
# takes the vehicle type supplied via "MSG" and sets vehicleType from
# the VehicleType enumeration
def set_vehicleType_from_MSG_vehicle(self, MSG_vehicle):
ret = self.msg_vehicle_to_vehicle_map.get(MSG_vehicle, None)
if ret is None:
raise ValueError("Unknown vehicle type (%s)" % (MSG_vehicle))
self.vehicleType = ret
self.vehicleTypeString = VehicleTypeString[ret]
def handleModeChange(self, lineNumber, e):
if self.vehicleType == VehicleType.Copter:
modes = {
0: 'STABILIZE',
1: 'ACRO',
2: 'ALT_HOLD',
3: 'AUTO',
4: 'GUIDED',
5: 'LOITER',
6: 'RTL',
7: 'CIRCLE',
9: 'LAND',
10: 'OF_LOITER',
11: 'DRIFT',
13: 'SPORT',
14: 'FLIP',
15: 'AUTOTUNE',
16: 'POSHOLD',
17: 'BRAKE',
18: 'THROW',
19: 'AVOID_ADSB',
20: 'GUIDED_NOGPS',
21: 'SMART_RTL',
}
try:
if hasattr(e, 'ThrCrs'):
self.modeChanges[lineNumber] = (modes[int(e.Mode)], e.ThrCrs)
else:
# assume it has ModeNum:
self.modeChanges[lineNumber] = (modes[int(e.Mode)], e.ModeNum)
except ValueError:
if hasattr(e, 'ThrCrs'):
self.modeChanges[lineNumber] = (e.Mode, e.ThrCrs)
else:
# some .log files have the mode spelt out by name
# rather than by number, contrary to the format
# string. Attempt to map that back to a number:
uppername = str(e.Mode).upper()
for num in modes:
if modes[num].upper() == uppername:
self.modeChanges[lineNumber] = (uppername, num)
return
# assume it has ModeNum:
print("Unknown mode=%u" % e.ModeNum)
self.modeChanges[lineNumber] = (e.Mode, "mode=%u" % e.ModeNum)
elif self.vehicleType in [VehicleType.Plane, VehicleType.Copter, VehicleType.Rover]:
self.modeChanges[lineNumber] = (e.Mode, e.ModeNum)
else:
# if you've gotten to here the chances are we don't
# know what vehicle you're flying...
raise Exception(
"Unknown log type for MODE line vehicletype=({}) line=({})".format(self.vehicleTypeString, repr(e))
)
def backPatchModeChanges(self):
for (lineNumber, e) in self.backpatch_these_modechanges:
self.handleModeChange(lineNumber, e)
def set_frame(self, frame):
self.frame = frame
def process(self, lineNumber, e):
if e.NAME == 'FMT':
cls = e.to_class()
if cls is not None: # FMT messages can be broken ...
if hasattr(e, 'type') and e.type not in self._formats: # binary log specific
self._formats[e.type] = cls
if cls.NAME not in self.formats:
self.formats[cls.NAME] = cls
elif e.NAME == "PARM":
self.parameters[e.Name] = e.Value
elif e.NAME == "MSG":
tokens = e.Message.split(' ')
if not self.frame:
if "Frame" in tokens[0]:
self.set_frame(tokens[1])
if not self.vehicleType:
try:
self.set_vehicleType_from_MSG_vehicle(tokens[0])
except ValueError:
return
self.backPatchModeChanges()
self.firmwareVersion = tokens[1]
if len(tokens) == 3:
self.firmwareHash = tokens[2][1:-1]
else:
self.messages[lineNumber] = e.Message
elif e.NAME == "MODE":
if self.vehicleType is None:
self.backpatch_these_modechanges.append((lineNumber, e))
else:
self.handleModeChange(lineNumber, e)
# anything else must be the log data
else:
groupName = e.NAME
# first time seeing this type of log line, create the channel storage
if groupName not in self.channels:
self.channels[groupName] = {}
for label in e.labels:
self.channels[groupName][label] = Channel()
# store each token in its relevant channel
for label in e.labels:
value = getattr(e, label)
channel = self.channels[groupName][label]
channel.dictData[lineNumber] = value
channel.listData.append((lineNumber, value))
def read_text(self, f, ignoreBadlines):
self.formats = {'FMT': Format}
lineNumber = 0
numBytes = 0
knownHardwareTypes = ["APM", "PX4", "MPNG"]
for line in f:
lineNumber = lineNumber + 1
numBytes += len(line) + 1
line = line.decode(encoding="utf-8")
try:
line = line.strip('\n\r')
tokens = line.split(', ')
# first handle the log header lines
if line == " Ready to drive." or line == " Ready to FLY.":
continue
if line == "----------------------------------------": # present in pre-3.0 logs
raise Exception(
"Log file seems to be in the older format (prior to self-describing logs), which isn't supported"
)
if len(tokens) == 1:
tokens2 = line.split(' ')
if line == "":
pass
elif len(tokens2) == 1 and tokens2[0].isdigit(): # log index
pass
elif len(tokens2) == 3 and tokens2[0] == "Free" and tokens2[1] == "RAM:":
self.freeRAM = int(tokens2[2])
elif tokens2[0] in knownHardwareTypes:
# not sure if we can parse this more usefully, for now only need to report it back verbatim
self.hardwareType = line
elif (len(tokens2) == 2 or len(tokens2) == 3) and tokens2[1][
0
].lower() == "v": # e.g. ArduCopter V3.1 (5c6503e2)
try:
self.set_vehicleType_from_MSG_vehicle(tokens2[0])
except ValueError:
pass
self.firmwareVersion = tokens2[1]
if len(tokens2) == 3:
self.firmwareHash = tokens2[2][1:-1]
else:
errorMsg = "Error parsing line %d of log file: %s" % (lineNumber, self.filename)
if ignoreBadlines:
print(errorMsg + " (skipping line)", file=sys.stderr)
self.skippedLines += 1
else:
raise Exception("")
else:
if tokens[0] not in self.formats:
raise ValueError("Unknown Format {}".format(tokens[0]))
e = self.formats[tokens[0]](*tokens[1:])
self.process(lineNumber, e)
except Exception as e:
print("BAD LINE: " + str(line), file=sys.stderr)
if not ignoreBadlines:
raise Exception(
"Error parsing line %d of log file %s - %s" % (lineNumber, self.filename, e.args[0])
)
return (numBytes, lineNumber)
def read_binary(self, f, ignoreBadlines):
lineNumber = 0
numBytes = 0
for e in self._read_binary(f, ignoreBadlines):
lineNumber += 1
if e is None:
continue
numBytes += e.SIZE
# print(e)
self.process(lineNumber, e)
return (numBytes, lineNumber)
def _read_binary(self, f, ignoreBadlines):
self._formats = {128: BinaryFormat}
data = bytearray(f.read())
offset = 0
while len(data) > offset + ctypes.sizeof(logheader):
h = logheader.from_buffer(data, offset)
if not (h.head1 == 0xA3 and h.head2 == 0x95):
if ignoreBadlines is False:
raise ValueError(h)
else:
if h.head1 == 0xFF and h.head2 == 0xFF and h.msgid == 0xFF:
print(
"Assuming EOF due to dataflash block tail filled with \\xff... (offset={off})".format(
off=offset
),
file=sys.stderr,
)
break
offset += 1
continue
if h.msgid in self._formats:
typ = self._formats[h.msgid]
if len(data) <= offset + typ.SIZE:
break
try:
e = typ.from_buffer(data, offset)
except ValueError:
print(
"data:{} offset:{} size:{} sizeof:{} sum:{}".format(
len(data), offset, typ.SIZE, ctypes.sizeof(typ), offset + typ.SIZE
)
)
raise
offset += typ.SIZE
else:
raise ValueError(str(h) + "unknown type")
yield e
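For reference, a minimal sketch of how the classes above were typically driven (the same access patterns the unit/regression test file further below exercises; the log path here is a placeholder):

import DataflashLog

# parse a dataflash log; format="auto" distinguishes text .log files from binary .bin logs
logdata = DataflashLog.DataflashLog("examples/flight.log", format="auto", ignoreBadlines=True)
print(logdata.vehicleTypeString, logdata.firmwareVersion, logdata.lineCount)

# each message field is stored as a Channel, keyed by message name and then field label
nsats = logdata.channels["GPS"]["NSats"]
print(nsats.min(), nsats.max(), nsats.getNearestValue(500))

# LogIterator walks the log by line number, tracking the nearest sample of every channel
lit = DataflashLog.LogIterator(logdata)
lit.jump(500)
print(lit["CTUN"]["ThrIn"], lit["ATT"]["Roll"])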

View File

@@ -1,313 +0,0 @@
#!/usr/bin/env python
#
# A module to analyze and identify any common problems which can be determined from log files
#
# Initial code by Andrew Chapman (amchapman@gmail.com), 16th Jan 2014
#
# AP_FLAKE8_CLEAN
# some logging oddities noticed while doing this, to be followed up on:
# - tradheli MOT labels Mot1,Mot2,Mot3,Mot4,GGain
# - Pixhawk doesn't output one of the FMT labels... forget which one
# - MAG offsets seem to be constant (only seen data on Pixhawk)
# - MAG offsets seem to be cast to int before being output? (param is -84.67, logged as -84)
# - copter+plane use 'V' in their vehicle type/version/build line, rover uses lower case 'v'.
# Copter+Rover give a build number, plane does not
# - CTUN.ThrOut on copter is 0-1000, on plane+rover it is 0-100
# TODO: add test for noisy baro values
# TODO: support loading binary log files (use Tridge's mavlogdump?)
from __future__ import print_function
import argparse
import datetime
import glob
import os
import sys
import time
from xml.sax.saxutils import escape
import DataflashLog
from VehicleType import VehicleType
class TestResult(object):
'''all tests return a standardized result type'''
class StatusType:
# NA means not applicable for this log (e.g. copter tests against a plane log)
# UNKNOWN means it is missing data required for the test
GOOD, FAIL, WARN, UNKNOWN, NA = range(5)
status = None
statusMessage = "" # can be multi-line
class Test(object):
"""
Base class to be inherited by log tests. Each test should be quite granular so we have lots of small tests with
clear results
"""
def __init__(self):
self.name = ""
self.result = None # will be an instance of TestResult after being run
self.execTime = None
self.enable = True
def run(self, logdata, verbose=False):
pass
class TestSuite(object):
'''registers test classes, loaded via a basic plugin architecture, and can run them all in one run() operation'''
def __init__(self):
self.tests = []
self.logfile = None
self.logdata = None
# dynamically load in Test subclasses from the 'tests' folder
# to prevent one being loaded, move it out of that folder, or set that test's .enable attribute to False
dirName = os.path.dirname(os.path.abspath(__file__))
testScripts = glob.glob(dirName + '/tests/*.py')
def getTests(module_path):
import inspect
tests = []
module_name = os.path.basename(module_path)[:-3]
if sys.version_info >= (3, 5):
from importlib.util import spec_from_file_location, module_from_spec
spec = spec_from_file_location(module_name, module_path)
m = module_from_spec(spec)
spec.loader.exec_module(m)
else:
from imp import load_source
m = load_source(module_name, module_path)
for _, _class in inspect.getmembers(m, inspect.isclass):
if _class.__module__ == m.__name__:
tests.append(_class())
return tests
for script in testScripts:
self.tests.extend(getTests(script))
# and here's an example of explicitly loading a Test class if you wanted to do that
# spec = importlib.util.spec_from_file_location("m", dirName + "/tests/TestBadParams.py")
# m = importlib.util.module_from_spec(spec)
# spec.loader.exec_module(m)
# self.tests.append(m.TestBadParams())
def run(self, logdata, verbose):
'''run all registered tests in a single call, gathering execution timing info'''
self.logdata = logdata
if 'GPS' not in self.logdata.channels and 'GPS2' in self.logdata.channels:
# *cough*
self.logdata.channels['GPS'] = self.logdata.channels['GPS2']
self.logfile = logdata.filename
for test in self.tests:
# run each test in turn, gathering timing info
if test.enable:
startTime = time.time()
test.run(self.logdata, verbose) # RUN THE TEST
endTime = time.time()
test.execTime = 1000 * (endTime - startTime)
def outputPlainText(self, outputStats):
'''output test results in plain text'''
print('Dataflash log analysis report for file: ' + self.logfile)
print('Log size: %.2fmb (%d lines)' % (self.logdata.filesizeKB / 1024.0, self.logdata.lineCount))
print('Log duration: %s' % str(datetime.timedelta(seconds=self.logdata.durationSecs)) + '\n')
if self.logdata.vehicleType == VehicleType.Copter and self.logdata.getCopterType():
print('Vehicle Type: %s (%s)' % (self.logdata.vehicleTypeString, self.logdata.getCopterType()))
else:
print('Vehicle Type: %s' % self.logdata.vehicleTypeString)
print('Firmware Version: %s (%s)' % (self.logdata.firmwareVersion, self.logdata.firmwareHash))
print('Hardware: %s' % self.logdata.hardwareType)
print('Free RAM: %s' % self.logdata.freeRAM)
if self.logdata.skippedLines:
print("\nWARNING: %d malformed log lines skipped during read" % self.logdata.skippedLines)
print('\n')
print("Test Results:")
for test in self.tests:
if not test.enable:
continue
statusMessageFirstLine = test.result.statusMessage.strip('\n\r').split('\n')[0]
statusMessageExtra = test.result.statusMessage.strip('\n\r').split('\n')[1:]
execTime = ""
if outputStats:
execTime = " (%6.2fms)" % (test.execTime)
if test.result.status == TestResult.StatusType.GOOD:
print(" %20s: GOOD %-55s%s" % (test.name, statusMessageFirstLine, execTime))
elif test.result.status == TestResult.StatusType.FAIL:
print(" %20s: FAIL %-55s%s [GRAPH]" % (test.name, statusMessageFirstLine, execTime))
elif test.result.status == TestResult.StatusType.WARN:
print(" %20s: WARN %-55s%s [GRAPH]" % (test.name, statusMessageFirstLine, execTime))
elif test.result.status == TestResult.StatusType.NA:
# skip any that aren't relevant for this vehicle/hardware/etc
continue
else:
print(" %20s: UNKNOWN %-55s%s" % (test.name, statusMessageFirstLine, execTime))
# if statusMessageExtra:
for line in statusMessageExtra:
print(" %29s %s" % ("", line))
print('\n')
print(
"The Log Analyzer is currently BETA code."
"\nFor any support or feedback on the log analyzer please email Andrew Chapman (amchapman@gmail.com)"
)
print('\n')
def outputXML(self, xmlFile):
'''output test results to an XML file'''
# open the file for writing
xml = None
try:
if xmlFile == '-':
xml = sys.stdout
else:
xml = open(xmlFile, 'w')
except IOError:
sys.stderr.write("Error opening output xml file: %s" % xmlFile)
sys.exit(1)
# output header info
xml.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n")
xml.write("<loganalysis>\n")
xml.write("<header>\n")
xml.write(" <logfile>" + escape(self.logfile) + "</logfile>\n")
xml.write(" <sizekb>" + escape(repr(self.logdata.filesizeKB)) + "</sizekb>\n")
xml.write(" <sizelines>" + escape(repr(self.logdata.lineCount)) + "</sizelines>\n")
xml.write(" <duration>" + escape(str(datetime.timedelta(seconds=self.logdata.durationSecs))) + "</duration>\n")
xml.write(" <vehicletype>" + escape(self.logdata.vehicleTypeString) + "</vehicletype>\n")
if self.logdata.vehicleType == VehicleType.Copter and self.logdata.getCopterType():
xml.write(" <coptertype>" + escape(self.logdata.getCopterType()) + "</coptertype>\n")
xml.write(" <firmwareversion>" + escape(self.logdata.firmwareVersion) + "</firmwareversion>\n")
xml.write(" <firmwarehash>" + escape(self.logdata.firmwareHash) + "</firmwarehash>\n")
xml.write(" <hardwaretype>" + escape(self.logdata.hardwareType) + "</hardwaretype>\n")
xml.write(" <freemem>" + escape(repr(self.logdata.freeRAM)) + "</freemem>\n")
xml.write(" <skippedlines>" + escape(repr(self.logdata.skippedLines)) + "</skippedlines>\n")
xml.write("</header>\n")
# output parameters
xml.write("<params>\n")
for param, value in self.logdata.parameters.items():
xml.write(" <param name=\"%s\" value=\"%s\" />\n" % (param, escape(repr(value))))
xml.write("</params>\n")
# output test results
xml.write("<results>\n")
for test in self.tests:
if not test.enable:
continue
xml.write(" <result>\n")
if test.result.status == TestResult.StatusType.GOOD:
xml.write(" <name>" + escape(test.name) + "</name>\n")
xml.write(" <status>GOOD</status>\n")
xml.write(" <message>" + escape(test.result.statusMessage) + "</message>\n")
elif test.result.status == TestResult.StatusType.FAIL:
xml.write(" <name>" + escape(test.name) + "</name>\n")
xml.write(" <status>FAIL</status>\n")
xml.write(" <message>" + escape(test.result.statusMessage) + "</message>\n")
xml.write(" <data>(test data will be embedded here at some point)</data>\n")
elif test.result.status == TestResult.StatusType.WARN:
xml.write(" <name>" + escape(test.name) + "</name>\n")
xml.write(" <status>WARN</status>\n")
xml.write(" <message>" + escape(test.result.statusMessage) + "</message>\n")
xml.write(" <data>(test data will be embedded here at some point)</data>\n")
elif test.result.status == TestResult.StatusType.NA:
xml.write(" <name>" + escape(test.name) + "</name>\n")
xml.write(" <status>NA</status>\n")
else:
xml.write(" <name>" + escape(test.name) + "</name>\n")
xml.write(" <status>UNKNOWN</status>\n")
xml.write(" <message>" + escape(test.result.statusMessage) + "</message>\n")
xml.write(" </result>\n")
xml.write("</results>\n")
xml.write("</loganalysis>\n")
xml.close()
def main():
# deal with command line arguments
parser = argparse.ArgumentParser(description='Analyze an APM Dataflash log for known issues')
parser.add_argument('logfile', type=argparse.FileType('r'), help='path to Dataflash log file (or - for stdin)')
parser.add_argument(
'-f',
'--format',
metavar='',
type=str,
action='store',
choices=['bin', 'log', 'auto'],
default='auto',
help='log file format: \'bin\',\'log\' or \'auto\'',
)
parser.add_argument(
'-q', '--quiet', metavar='', action='store_const', const=True, help='quiet mode, do not print results'
)
parser.add_argument(
'-p', '--profile', metavar='', action='store_const', const=True, help='output performance profiling data'
)
parser.add_argument(
'-s', '--skip_bad', metavar='', action='store_const', const=True, help='skip over corrupt dataflash lines'
)
parser.add_argument(
'-e', '--empty', metavar='', action='store_const', const=True, help='run an initial check for an empty log'
)
parser.add_argument(
'-x',
'--xml',
type=str,
metavar='XML file',
nargs='?',
const='',
default='',
help='write output to specified XML file (or - for stdout)',
)
parser.add_argument('-v', '--verbose', metavar='', action='store_const', const=True, help='verbose output')
args = parser.parse_args()
# load the log
startTime = time.time()
logdata = DataflashLog.DataflashLog(args.logfile.name, format=args.format, ignoreBadlines=args.skip_bad) # read log
endTime = time.time()
if args.profile:
print("Log file read time: %.2f seconds" % (endTime - startTime))
# check for empty log if requested
if args.empty:
emptyErr = DataflashLog.DataflashLogHelper.isLogEmpty(logdata)
if emptyErr:
sys.stderr.write("Empty log file: %s, %s" % (logdata.filename, emptyErr))
sys.exit(1)
# run the tests, and gather timings
testSuite = TestSuite()
startTime = time.time()
testSuite.run(logdata, args.verbose) # run tests
endTime = time.time()
if args.profile:
print("Test suite run time: %.2f seconds" % (endTime - startTime))
# deal with output
if not args.quiet:
testSuite.outputPlainText(args.profile)
if args.xml:
testSuite.outputXML(args.xml)
if not args.quiet:
print("XML output written to file: %s\n" % args.xml)
if __name__ == "__main__":
main()
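Besides the command line above (e.g. python LogAnalyzer.py -s -x report.xml flight.log, using the options defined by the argument parser), the suite can be driven programmatically; a minimal sketch following the same flow as main(), with placeholder file names and assuming the module is importable as LogAnalyzer:

import DataflashLog
from LogAnalyzer import TestSuite

# parse the log, skipping corrupt lines (the equivalent of -s/--skip_bad)
logdata = DataflashLog.DataflashLog("flight.log", format="auto", ignoreBadlines=True)

# TestSuite() discovers every Test subclass under the tests/ folder; run() executes them all
suite = TestSuite()
suite.run(logdata, verbose=False)

# print the plain-text report (pass True to append per-test timings) and write the XML report
suite.outputPlainText(False)
suite.outputXML("report.xml")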

View File

@@ -1,404 +0,0 @@
#!/usr/bin/env python
#
#
# Unit and regression tests for the LogAnalyzer code
#
#
# AP_FLAKE8_CLEAN
# TODO: implement more unit+regression tests
from __future__ import print_function
import traceback
import DataflashLog
from VehicleType import VehicleType
try:
# test DataflashLog reading 1
logdata = DataflashLog.DataflashLog()
logdata.read("examples/robert_lefebvre_octo_PM.log", ignoreBadlines=False)
assert logdata.filename == "examples/robert_lefebvre_octo_PM.log"
assert logdata.vehicleType == VehicleType.Copter
assert logdata.vehicleTypeString == "ArduCopter"
assert logdata.firmwareVersion == "V3.0.1"
assert logdata.firmwareHash == "5c6503e2"
assert logdata.freeRAM == 1331
assert logdata.hardwareType == "APM 2"
assert len(logdata.formats) == 27
assert logdata.formats['GPS'].labels == [
'Status',
'Time',
'NSats',
'HDop',
'Lat',
'Lng',
'RelAlt',
'Alt',
'Spd',
'GCrs',
]
assert logdata.formats['ATT'].labels == ['RollIn', 'Roll', 'PitchIn', 'Pitch', 'YawIn', 'Yaw', 'NavYaw']
assert logdata.parameters == {
'RC7_REV': 1.0,
'MNT_MODE': 3.0,
'LOITER_LON_P': 1.0,
'FLTMODE1': 1.0,
'FLTMODE3': 0.0,
'FLTMODE2': 6.0,
'TUNE_HIGH': 10000.0,
'FLTMODE4': 5.0,
'FLTMODE6': 2.0,
'SYSID_SW_TYPE': 10.0,
'LOITER_LON_D': 0.0,
'RC5_REV': 1.0,
'THR_RATE_IMAX': 300.0,
'MNT_RC_IN_PAN': 0.0,
'RC2_MIN': 1110.0,
'LOITER_LON_I': 0.5,
'HLD_LON_P': 1.0,
'STB_RLL_I': 0.0,
'LOW_VOLT': 10.5,
'MNT_CONTROL_Y': 0.0,
'MNT_CONTROL_X': 0.0,
'FRAME': 1.0,
'MNT_CONTROL_Z': 0.0,
'OF_PIT_IMAX': 100.0,
'AHRS_ORIENTATION': 0.0,
'SIMPLE': 0.0,
'RC2_MAX': 1929.0,
'RC8_FUNCTION': 0.0,
'INS_ACCSCAL_X': 0.992788,
'ACRO_P': 4.5,
'MNT_ANGMIN_ROL': -4500.0,
'OF_RLL_P': 2.5,
'STB_RLL_P': 3.5,
'STB_YAW_P': 3.0,
'SR0_RAW_SENS': 2.0,
'FLTMODE5': 0.0,
'RATE_YAW_I': 0.02,
'MAG_ENABLE': 1.0,
'MNT_RETRACT_Y': 0.0,
'MNT_RETRACT_X': 0.0,
'RATE_YAW_IMAX': 800.0,
'WPNAV_SPEED_DN': 150.0,
'WP_YAW_BEHAVIOR': 2.0,
'RC11_REV': 1.0,
'SYSID_THISMAV': 1.0,
'SR0_EXTRA1': 10.0,
'SR0_EXTRA2': 10.0,
'ACRO_BAL_PITCH': 200.0,
'STB_YAW_I': 0.0,
'INS_ACCSCAL_Z': 0.97621,
'INS_ACCSCAL_Y': 1.00147,
'LED_MODE': 9.0,
'FS_GCS_ENABLE': 0.0,
'MNT_RC_IN_ROLL': 0.0,
'INAV_TC_Z': 8.0,
'RATE_PIT_IMAX': 4500.0,
'HLD_LON_IMAX': 3000.0,
'THR_RATE_I': 0.0,
'SR3_EXTRA1': 0.0,
'STB_PIT_IMAX': 800.0,
'AHRS_TRIM_Z': 0.0,
'RC2_REV': 1.0,
'INS_MPU6K_FILTER': 20.0,
'THR_MIN': 130.0,
'AHRS_TRIM_Y': 0.021683,
'RC11_DZ': 0.0,
'THR_MAX': 1000.0,
'SR3_EXTRA2': 0.0,
'MNT_NEUTRAL_Z': 0.0,
'THR_MID': 300.0,
'MNT_NEUTRAL_X': 0.0,
'AMP_PER_VOLT': 18.002001,
'SR0_POSITION': 3.0,
'MNT_STAB_PAN': 0.0,
'FS_BATT_ENABLE': 0.0,
'LAND_SPEED': 50.0,
'OF_PIT_D': 0.12,
'SR0_PARAMS': 50.0,
'COMPASS_ORIENT': 0.0,
'WPNAV_ACCEL': 200.0,
'THR_ACCEL_IMAX': 5000.0,
'SR3_POSITION': 0.0,
'WPNAV_RADIUS': 100.0,
'WP_TOTAL': 14.0,
'RC8_MAX': 1856.0,
'OF_PIT_P': 2.5,
'SR3_RAW_SENS': 0.0,
'RTL_ALT_FINAL': 0.0,
'SR3_PARAMS': 0.0,
'SR0_EXTRA3': 2.0,
'LOITER_LAT_I': 0.5,
'RC6_DZ': 0.0,
'RC4_TRIM': 1524.0,
'RATE_RLL_P': 0.07,
'LOITER_LAT_D': 0.0,
'STB_PIT_P': 3.5,
'OF_PIT_I': 0.5,
'RATE_RLL_I': 1.0,
'AHRS_TRIM_X': 0.003997,
'RC3_REV': 1.0,
'STB_PIT_I': 0.0,
'FS_THR_ENABLE': 0.0,
'LOITER_LAT_P': 1.0,
'AHRS_RP_P': 0.1,
'FENCE_ACTION': 1.0,
'TOY_RATE': 1.0,
'RATE_RLL_D': 0.006,
'RC5_MIN': 1151.0,
'RC5_TRIM': 1676.0,
'STB_RLL_IMAX': 800.0,
'RC4_DZ': 40.0,
'AHRS_YAW_P': 0.1,
'RC11_TRIM': 1500.0,
'MOT_TCRV_ENABLE': 1.0,
'CAM_TRIGG_TYPE': 1.0,
'STB_YAW_IMAX': 800.0,
'RC4_MAX': 1942.0,
'LOITER_LAT_IMAX': 400.0,
'CH7_OPT': 9.0,
'RC11_FUNCTION': 7.0,
'SR0_EXT_STAT': 2.0,
'SONAR_TYPE': 0.0,
'RC3_MAX': 1930.0,
'RATE_YAW_D': 0.0,
'FENCE_ALT_MAX': 30.0,
'COMPASS_MOT_Y': 0.0,
'AXIS_ENABLE': 1.0,
'FENCE_ENABLE': 0.0,
'RC10_DZ': 0.0,
'PILOT_VELZ_MAX': 250.0,
'BATT_CAPACITY': 1760.0,
'FS_THR_VALUE': 975.0,
'RC4_MIN': 1115.0,
'MNT_ANGMAX_TIL': 4500.0,
'RTL_LOIT_TIME': 5000.0,
'ARMING_CHECK': 1.0,
'THR_RATE_P': 6.0,
'OF_RLL_IMAX': 100.0,
'RC6_MIN': 971.0,
'SR0_RAW_CTRL': 0.0,
'RC6_MAX': 2078.0,
'RC5_MAX': 1829.0,
'LOITER_LON_IMAX': 400.0,
'MNT_STAB_TILT': 0.0,
'MOT_TCRV_MIDPCT': 52.0,
'COMPASS_OFS_Z': -5.120774,
'COMPASS_OFS_Y': 46.709824,
'COMPASS_OFS_X': -20.490345,
'THR_ALT_I': 0.0,
'RC10_TRIM': 1500.0,
'INS_PRODUCT_ID': 88.0,
'RC11_MIN': 1100.0,
'FS_GPS_ENABLE': 1.0,
'HLD_LAT_IMAX': 3000.0,
'RC3_TRIM': 1476.0,
'RC6_FUNCTION': 0.0,
'TRIM_THROTTLE': 260.0,
'MNT_STAB_ROLL': 0.0,
'INAV_TC_XY': 2.5,
'RC1_DZ': 30.0,
'MNT_RETRACT_Z': 0.0,
'THR_ACC_ENABLE': 1.0,
'LOG_BITMASK': 830.0,
'TUNE_LOW': 0.0,
'CIRCLE_RATE': 5.0,
'CAM_DURATION': 10.0,
'MNT_NEUTRAL_Y': 0.0,
'RC10_MIN': 1100.0,
'INS_ACCOFFS_X': -0.019376,
'THR_RATE_D': 0.0,
'INS_ACCOFFS_Z': 1.370947,
'RC4_REV': 1.0,
'CIRCLE_RADIUS': 10.0,
'RATE_RLL_IMAX': 4500.0,
'HLD_LAT_P': 1.0,
'AHRS_GPS_MINSATS': 6.0,
'RC8_REV': 1.0,
'SONAR_GAIN': 0.2,
'RC2_TRIM': 1521.0,
'WP_INDEX': 0.0,
'RC1_REV': 1.0,
'RC7_DZ': 0.0,
'AHRS_GPS_USE': 1.0,
'MNT_ANGMIN_PAN': -4500.0,
'SR3_RC_CHAN': 0.0,
'COMPASS_LEARN': 0.0,
'ACRO_TRAINER': 1.0,
'CAM_SERVO_OFF': 1100.0,
'RC5_DZ': 0.0,
'SCHED_DEBUG': 0.0,
'RC11_MAX': 1900.0,
'AHRS_WIND_MAX': 0.0,
'SR3_EXT_STAT': 0.0,
'MNT_ANGMAX_PAN': 4500.0,
'MNT_ANGMAX_ROL': 4500.0,
'RC_SPEED': 490.0,
'SUPER_SIMPLE': 0.0,
'VOLT_DIVIDER': 10.0,
'COMPASS_MOTCT': 0.0,
'SR3_RAW_CTRL': 0.0,
'SONAR_ENABLE': 0.0,
'INS_ACCOFFS_Y': 0.362242,
'SYSID_SW_MREV': 120.0,
'WPNAV_LOIT_SPEED': 1000.0,
'BATT_MONITOR': 4.0,
'MNT_RC_IN_TILT': 8.0,
'CH8_OPT': 0.0,
'RTL_ALT': 1000.0,
'SR0_RC_CHAN': 2.0,
'RC1_MIN': 1111.0,
'RSSI_PIN': -1.0,
'MOT_TCRV_MAXPCT': 93.0,
'GND_ABS_PRESS': 101566.97,
'RC1_MAX': 1936.0,
'FENCE_TYPE': 3.0,
'RC5_FUNCTION': 0.0,
'OF_RLL_D': 0.12,
'BATT_VOLT_PIN': 13.0,
'WPNAV_SPEED': 1000.0,
'RC7_MAX': 1884.0,
'CAM_SERVO_ON': 1300.0,
'RATE_PIT_I': 1.0,
'RC7_MIN': 969.0,
'AHRS_COMP_BETA': 0.1,
'OF_RLL_I': 0.5,
'COMPASS_DEC': 0.0,
'RC3_MIN': 1113.0,
'RC2_DZ': 30.0,
'FENCE_RADIUS': 30.0,
'HLD_LON_I': 0.0,
'ACRO_BAL_ROLL': 200.0,
'COMPASS_AUTODEC': 1.0,
'SR3_EXTRA3': 0.0,
'COMPASS_USE': 1.0,
'RC10_MAX': 1900.0,
'RATE_PIT_P': 0.07,
'GND_TEMP': 21.610104,
'RC7_TRIM': 970.0,
'RC10_REV': 1.0,
'RATE_YAW_P': 0.2,
'THR_ALT_P': 1.0,
'RATE_PIT_D': 0.006,
'ESC': 0.0,
'MNT_ANGMIN_TIL': -4500.0,
'SERIAL3_BAUD': 57.0,
'RC8_MIN': 968.0,
'THR_ALT_IMAX': 300.0,
'SYSID_MYGCS': 255.0,
'INS_GYROFFS_Y': 0.581989,
'TUNE': 0.0,
'RC8_TRIM': 970.0,
'RC3_DZ': 30.0,
'AHRS_GPS_GAIN': 1.0,
'THR_ACCEL_D': 0.0,
'TELEM_DELAY': 0.0,
'THR_ACCEL_I': 0.5,
'COMPASS_MOT_X': 0.0,
'COMPASS_MOT_Z': 0.0,
'RC10_FUNCTION': 0.0,
'INS_GYROFFS_X': -0.001698,
'INS_GYROFFS_Z': 0.01517,
'RC6_TRIM': 1473.0,
'THR_ACCEL_P': 1.2,
'RC8_DZ': 0.0,
'HLD_LAT_I': 0.0,
'RC7_FUNCTION': 0.0,
'RC6_REV': 1.0,
'BATT_CURR_PIN': 12.0,
'WPNAV_SPEED_UP': 250.0,
'RC1_TRIM': 1524.0,
"MNT_JSTICK_SPD": 0.0,
"FLOW_ENABLE": 0.0,
}
assert logdata.messages == {}
assert logdata.modeChanges == {
2204: ('LOITER', 269),
4594: ('STABILIZE', 269),
644: ('ALT_HOLD', 269),
4404: ('ALT_HOLD', 269),
}
assert logdata.channels['GPS']['NSats'].min() == 6
assert logdata.channels['GPS']['NSats'].max() == 8
assert logdata.channels['GPS']['HDop'].listData[0] == (552, 4.68)
assert logdata.channels['GPS']['HDop'].listData[44] == (768, 4.67)
assert logdata.channels['GPS']['HDop'].listData[157] == (1288, 2.28)
assert logdata.channels['CTUN']['ThrOut'].listData[5] == (321, 139)
assert logdata.channels['CTUN']['ThrOut'].listData[45] == (409, 242)
assert logdata.channels['CTUN']['ThrOut'].listData[125] == (589, 266)
assert logdata.channels['CTUN']['CRate'].listData[3] == (317, 35)
assert logdata.channels['CTUN']['CRate'].listData[51] == (421, 31)
assert logdata.channels['CTUN']['CRate'].listData[115] == (563, -8)
assert int(logdata.filesizeKB) == 307
assert abs(logdata.durationSecs - 155.399) < 1e-9
assert logdata.lineCount == 4750
# test LogIterator class
lit = DataflashLog.LogIterator(logdata)
assert lit.currentLine == 0
assert lit.iterators == {
'CURR': (0, 310),
'ERR': (0, 307),
'NTUN': (0, 2206),
'CTUN': (0, 308),
'GPS': (0, 552),
'CMD': (0, 607),
'D32': (0, 305),
'ATT': (0, 311),
'EV': (0, 306),
'DU32': (0, 309),
'PM': (0, 479),
}
lit.jump(500)
assert lit.iterators == {
'CURR': (9, 514),
'ERR': (1, 553),
'NTUN': (0, 2206),
'CTUN': (87, 500),
'GPS': (0, 552),
'CMD': (0, 607),
'D32': (0, 305),
'ATT': (83, 501),
'EV': (4, 606),
'DU32': (9, 513),
'PM': (1, 719),
}
assert lit['CTUN']['ThrIn'] == 450
assert lit['ATT']['RollIn'] == 11.19
assert lit['CURR']['CurrTot'] == 25.827288
assert lit['D32']['Value'] == 11122
next(lit)
assert lit.iterators == {
'CURR': (9, 514),
'ERR': (1, 553),
'NTUN': (0, 2206),
'CTUN': (88, 502),
'GPS': (0, 552),
'CMD': (0, 607),
'D32': (0, 305),
'ATT': (83, 501),
'EV': (4, 606),
'DU32': (9, 513),
'PM': (1, 719),
}
lit.jump(4750)
next(lit)
assert lit.currentLine == 4751
assert lit['ATT']['Roll'] == 2.99
# TODO: unit test DataflashLog reading 2
# ...
# TODO: unit test the log test classes
# ...
print("All unit/regression tests GOOD\n")
except Exception:
print("Error found: " + traceback.format_exc())
print("UNIT TEST FAILED\n")

View File

@@ -1,12 +0,0 @@
# AP_FLAKE8_CLEAN
class VehicleType:
Plane = 17
Copter = 23
Rover = 37
# these should really be "Plane", "Copter" and "Rover", but many
# things use these values as triggers in their code:
VehicleTypeString = {17: "ArduPlane", 23: "ArduCopter", 37: "ArduRover"}

View File

@@ -1,356 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<loganalysis>
<header>
<logfile>examples/robert_lefebvre_octo_PM.log</logfile>
<sizekb>302.7548828125</sizekb>
<sizelines>4750</sizelines>
<duration>0:02:35</duration>
<vehicletype>ArduCopter</vehicletype>
<coptertype>octo</coptertype>
<firmwareversion>V3.0.1</firmwareversion>
<firmwarehash>5c6503e2</firmwarehash>
<hardwaretype>APM 2</hardwaretype>
<freemem>1331</freemem>
<skippedlines>0</skippedlines>
</header>
<params>
<param name="RC7_REV" value="1.0" />
<param name="MNT_MODE" value="3.0" />
<param name="LOITER_LON_P" value="1.0" />
<param name="FLTMODE1" value="1.0" />
<param name="FLTMODE3" value="0.0" />
<param name="FLTMODE2" value="6.0" />
<param name="TUNE_HIGH" value="10000.0" />
<param name="FLTMODE4" value="5.0" />
<param name="FLTMODE6" value="2.0" />
<param name="SYSID_SW_TYPE" value="10.0" />
<param name="LOITER_LON_D" value="0.0" />
<param name="RC5_REV" value="1.0" />
<param name="THR_RATE_IMAX" value="300.0" />
<param name="MNT_RC_IN_PAN" value="0.0" />
<param name="RC2_MIN" value="1110.0" />
<param name="LOITER_LON_I" value="0.5" />
<param name="HLD_LON_P" value="1.0" />
<param name="STB_RLL_I" value="0.0" />
<param name="LOW_VOLT" value="10.5" />
<param name="MNT_CONTROL_Y" value="0.0" />
<param name="MNT_CONTROL_X" value="0.0" />
<param name="FRAME" value="1.0" />
<param name="MNT_CONTROL_Z" value="0.0" />
<param name="OF_PIT_IMAX" value="100.0" />
<param name="AHRS_ORIENTATION" value="0.0" />
<param name="SIMPLE" value="0.0" />
<param name="RC2_MAX" value="1929.0" />
<param name="MNT_JSTICK_SPD" value="0.0" />
<param name="RC8_FUNCTION" value="0.0" />
<param name="INS_ACCSCAL_X" value="0.992788" />
<param name="ACRO_P" value="4.5" />
<param name="MNT_ANGMIN_ROL" value="-4500.0" />
<param name="OF_RLL_P" value="2.5" />
<param name="STB_RLL_P" value="3.5" />
<param name="STB_YAW_P" value="3.0" />
<param name="SR0_RAW_SENS" value="2.0" />
<param name="FLTMODE5" value="0.0" />
<param name="RATE_YAW_I" value="0.02" />
<param name="MAG_ENABLE" value="1.0" />
<param name="MNT_RETRACT_Y" value="0.0" />
<param name="MNT_RETRACT_X" value="0.0" />
<param name="RATE_YAW_IMAX" value="800.0" />
<param name="WPNAV_SPEED_DN" value="150.0" />
<param name="WP_YAW_BEHAVIOR" value="2.0" />
<param name="RC11_REV" value="1.0" />
<param name="SYSID_THISMAV" value="1.0" />
<param name="SR0_EXTRA1" value="10.0" />
<param name="SR0_EXTRA2" value="10.0" />
<param name="ACRO_BAL_PITCH" value="200.0" />
<param name="STB_YAW_I" value="0.0" />
<param name="INS_ACCSCAL_Z" value="0.97621" />
<param name="INS_ACCSCAL_Y" value="1.00147" />
<param name="LED_MODE" value="9.0" />
<param name="FS_GCS_ENABLE" value="0.0" />
<param name="MNT_RC_IN_ROLL" value="0.0" />
<param name="INAV_TC_Z" value="8.0" />
<param name="RATE_PIT_IMAX" value="4500.0" />
<param name="HLD_LON_IMAX" value="3000.0" />
<param name="THR_RATE_I" value="0.0" />
<param name="SR3_EXTRA1" value="0.0" />
<param name="STB_PIT_IMAX" value="800.0" />
<param name="AHRS_TRIM_Z" value="0.0" />
<param name="RC2_REV" value="1.0" />
<param name="INS_MPU6K_FILTER" value="20.0" />
<param name="THR_MIN" value="130.0" />
<param name="AHRS_TRIM_Y" value="0.021683" />
<param name="RC11_DZ" value="0.0" />
<param name="THR_MAX" value="1000.0" />
<param name="SR3_EXTRA2" value="0.0" />
<param name="MNT_NEUTRAL_Z" value="0.0" />
<param name="THR_MID" value="300.0" />
<param name="MNT_NEUTRAL_X" value="0.0" />
<param name="AMP_PER_VOLT" value="18.002001" />
<param name="SR0_POSITION" value="3.0" />
<param name="MNT_STAB_PAN" value="0.0" />
<param name="FS_BATT_ENABLE" value="0.0" />
<param name="LAND_SPEED" value="50.0" />
<param name="OF_PIT_D" value="0.12" />
<param name="SR0_PARAMS" value="50.0" />
<param name="COMPASS_ORIENT" value="0.0" />
<param name="WPNAV_ACCEL" value="200.0" />
<param name="THR_ACCEL_IMAX" value="5000.0" />
<param name="SR3_POSITION" value="0.0" />
<param name="WPNAV_RADIUS" value="100.0" />
<param name="WP_TOTAL" value="14.0" />
<param name="RC8_MAX" value="1856.0" />
<param name="OF_PIT_P" value="2.5" />
<param name="SR3_RAW_SENS" value="0.0" />
<param name="RTL_ALT_FINAL" value="0.0" />
<param name="SR3_PARAMS" value="0.0" />
<param name="SR0_EXTRA3" value="2.0" />
<param name="LOITER_LAT_I" value="0.5" />
<param name="RC6_DZ" value="0.0" />
<param name="RC4_TRIM" value="1524.0" />
<param name="RATE_RLL_P" value="0.07" />
<param name="LOITER_LAT_D" value="0.0" />
<param name="STB_PIT_P" value="3.5" />
<param name="OF_PIT_I" value="0.5" />
<param name="RATE_RLL_I" value="1.0" />
<param name="AHRS_TRIM_X" value="0.003997" />
<param name="RC3_REV" value="1.0" />
<param name="STB_PIT_I" value="0.0" />
<param name="FS_THR_ENABLE" value="0.0" />
<param name="LOITER_LAT_P" value="1.0" />
<param name="AHRS_RP_P" value="0.1" />
<param name="FENCE_ACTION" value="1.0" />
<param name="TOY_RATE" value="1.0" />
<param name="RATE_RLL_D" value="0.006" />
<param name="RC5_MIN" value="1151.0" />
<param name="RC5_TRIM" value="1676.0" />
<param name="STB_RLL_IMAX" value="800.0" />
<param name="RC4_DZ" value="40.0" />
<param name="AHRS_YAW_P" value="0.1" />
<param name="RC11_TRIM" value="1500.0" />
<param name="MOT_TCRV_ENABLE" value="1.0" />
<param name="CAM_TRIGG_TYPE" value="1.0" />
<param name="STB_YAW_IMAX" value="800.0" />
<param name="RC4_MAX" value="1942.0" />
<param name="LOITER_LAT_IMAX" value="400.0" />
<param name="CH7_OPT" value="9.0" />
<param name="RC11_FUNCTION" value="7.0" />
<param name="SR0_EXT_STAT" value="2.0" />
<param name="SONAR_TYPE" value="0.0" />
<param name="RC3_MAX" value="1930.0" />
<param name="RATE_YAW_D" value="0.0" />
<param name="FENCE_ALT_MAX" value="30.0" />
<param name="COMPASS_MOT_Y" value="0.0" />
<param name="AXIS_ENABLE" value="1.0" />
<param name="FENCE_ENABLE" value="0.0" />
<param name="RC10_DZ" value="0.0" />
<param name="PILOT_VELZ_MAX" value="250.0" />
<param name="BATT_CAPACITY" value="1760.0" />
<param name="FS_THR_VALUE" value="975.0" />
<param name="RC4_MIN" value="1115.0" />
<param name="MNT_ANGMAX_TIL" value="4500.0" />
<param name="RTL_LOIT_TIME" value="5000.0" />
<param name="ARMING_CHECK" value="1.0" />
<param name="THR_RATE_P" value="6.0" />
<param name="OF_RLL_IMAX" value="100.0" />
<param name="RC6_MIN" value="971.0" />
<param name="SR0_RAW_CTRL" value="0.0" />
<param name="RC6_MAX" value="2078.0" />
<param name="RC5_MAX" value="1829.0" />
<param name="LOITER_LON_IMAX" value="400.0" />
<param name="MNT_STAB_TILT" value="0.0" />
<param name="MOT_TCRV_MIDPCT" value="52.0" />
<param name="COMPASS_OFS_Z" value="-5.120774" />
<param name="COMPASS_OFS_Y" value="46.709824" />
<param name="COMPASS_OFS_X" value="-20.490345" />
<param name="THR_ALT_I" value="0.0" />
<param name="RC10_TRIM" value="1500.0" />
<param name="INS_PRODUCT_ID" value="88.0" />
<param name="RC11_MIN" value="1100.0" />
<param name="FS_GPS_ENABLE" value="1.0" />
<param name="HLD_LAT_IMAX" value="3000.0" />
<param name="RC3_TRIM" value="1476.0" />
<param name="RC6_FUNCTION" value="0.0" />
<param name="TRIM_THROTTLE" value="260.0" />
<param name="MNT_STAB_ROLL" value="0.0" />
<param name="INAV_TC_XY" value="2.5" />
<param name="RC1_DZ" value="30.0" />
<param name="MNT_RETRACT_Z" value="0.0" />
<param name="THR_ACC_ENABLE" value="1.0" />
<param name="LOG_BITMASK" value="830.0" />
<param name="TUNE_LOW" value="0.0" />
<param name="CIRCLE_RATE" value="5.0" />
<param name="CAM_DURATION" value="10.0" />
<param name="MNT_NEUTRAL_Y" value="0.0" />
<param name="RC10_MIN" value="1100.0" />
<param name="INS_ACCOFFS_X" value="-0.019376" />
<param name="THR_RATE_D" value="0.0" />
<param name="INS_ACCOFFS_Z" value="1.370947" />
<param name="RC4_REV" value="1.0" />
<param name="CIRCLE_RADIUS" value="10.0" />
<param name="RATE_RLL_IMAX" value="4500.0" />
<param name="HLD_LAT_P" value="1.0" />
<param name="AHRS_GPS_MINSATS" value="6.0" />
<param name="FLOW_ENABLE" value="0.0" />
<param name="RC8_REV" value="1.0" />
<param name="SONAR_GAIN" value="0.2" />
<param name="RC2_TRIM" value="1521.0" />
<param name="WP_INDEX" value="0.0" />
<param name="RC1_REV" value="1.0" />
<param name="RC7_DZ" value="0.0" />
<param name="AHRS_GPS_USE" value="1.0" />
<param name="MNT_ANGMIN_PAN" value="-4500.0" />
<param name="SR3_RC_CHAN" value="0.0" />
<param name="COMPASS_LEARN" value="0.0" />
<param name="ACRO_TRAINER" value="1.0" />
<param name="CAM_SERVO_OFF" value="1100.0" />
<param name="RC5_DZ" value="0.0" />
<param name="SCHED_DEBUG" value="0.0" />
<param name="RC11_MAX" value="1900.0" />
<param name="AHRS_WIND_MAX" value="0.0" />
<param name="SR3_EXT_STAT" value="0.0" />
<param name="MNT_ANGMAX_PAN" value="4500.0" />
<param name="MNT_ANGMAX_ROL" value="4500.0" />
<param name="RC_SPEED" value="490.0" />
<param name="SUPER_SIMPLE" value="0.0" />
<param name="VOLT_DIVIDER" value="10.0" />
<param name="COMPASS_MOTCT" value="0.0" />
<param name="SR3_RAW_CTRL" value="0.0" />
<param name="SONAR_ENABLE" value="0.0" />
<param name="INS_ACCOFFS_Y" value="0.362242" />
<param name="SYSID_SW_MREV" value="120.0" />
<param name="WPNAV_LOIT_SPEED" value="1000.0" />
<param name="BATT_MONITOR" value="4.0" />
<param name="MNT_RC_IN_TILT" value="8.0" />
<param name="CH8_OPT" value="0.0" />
<param name="RTL_ALT" value="1000.0" />
<param name="SR0_RC_CHAN" value="2.0" />
<param name="RC1_MIN" value="1111.0" />
<param name="RSSI_PIN" value="-1.0" />
<param name="MOT_TCRV_MAXPCT" value="93.0" />
<param name="GND_ABS_PRESS" value="101566.97" />
<param name="RC1_MAX" value="1936.0" />
<param name="FENCE_TYPE" value="3.0" />
<param name="RC5_FUNCTION" value="0.0" />
<param name="OF_RLL_D" value="0.12" />
<param name="BATT_VOLT_PIN" value="13.0" />
<param name="WPNAV_SPEED" value="1000.0" />
<param name="RC7_MAX" value="1884.0" />
<param name="CAM_SERVO_ON" value="1300.0" />
<param name="RATE_PIT_I" value="1.0" />
<param name="RC7_MIN" value="969.0" />
<param name="AHRS_COMP_BETA" value="0.1" />
<param name="OF_RLL_I" value="0.5" />
<param name="COMPASS_DEC" value="0.0" />
<param name="RC3_MIN" value="1113.0" />
<param name="RC2_DZ" value="30.0" />
<param name="FENCE_RADIUS" value="30.0" />
<param name="HLD_LON_I" value="0.0" />
<param name="ACRO_BAL_ROLL" value="200.0" />
<param name="COMPASS_AUTODEC" value="1.0" />
<param name="SR3_EXTRA3" value="0.0" />
<param name="COMPASS_USE" value="1.0" />
<param name="RC10_MAX" value="1900.0" />
<param name="RATE_PIT_P" value="0.07" />
<param name="GND_TEMP" value="21.610104" />
<param name="RC7_TRIM" value="970.0" />
<param name="RC10_REV" value="1.0" />
<param name="RATE_YAW_P" value="0.2" />
<param name="THR_ALT_P" value="1.0" />
<param name="RATE_PIT_D" value="0.006" />
<param name="ESC" value="0.0" />
<param name="MNT_ANGMIN_TIL" value="-4500.0" />
<param name="SERIAL3_BAUD" value="57.0" />
<param name="RC8_MIN" value="968.0" />
<param name="THR_ALT_IMAX" value="300.0" />
<param name="SYSID_MYGCS" value="255.0" />
<param name="INS_GYROFFS_Y" value="0.581989" />
<param name="TUNE" value="0.0" />
<param name="RC8_TRIM" value="970.0" />
<param name="RC3_DZ" value="30.0" />
<param name="AHRS_GPS_GAIN" value="1.0" />
<param name="THR_ACCEL_D" value="0.0" />
<param name="TELEM_DELAY" value="0.0" />
<param name="THR_ACCEL_I" value="0.5" />
<param name="COMPASS_MOT_X" value="0.0" />
<param name="COMPASS_MOT_Z" value="0.0" />
<param name="RC10_FUNCTION" value="0.0" />
<param name="INS_GYROFFS_X" value="-0.001698" />
<param name="INS_GYROFFS_Z" value="0.01517" />
<param name="RC6_TRIM" value="1473.0" />
<param name="THR_ACCEL_P" value="1.2" />
<param name="RC8_DZ" value="0.0" />
<param name="HLD_LAT_I" value="0.0" />
<param name="RC7_FUNCTION" value="0.0" />
<param name="RC6_REV" value="1.0" />
<param name="BATT_CURR_PIN" value="12.0" />
<param name="WPNAV_SPEED_UP" value="250.0" />
<param name="RC1_TRIM" value="1524.0" />
</params>
<results>
<result>
<name>Brownout</name>
<status>GOOD</status>
<message></message>
</result>
<result>
<name>Compass</name>
<status>GOOD</status>
<message>No MAG data, unable to test mag_field interference
</message>
</result>
<result>
<name>Dupe Log Data</name>
<status>GOOD</status>
<message></message>
</result>
<result>
<name>Empty</name>
<status>GOOD</status>
<message></message>
</result>
<result>
<name>Event/Failsafe</name>
<status>FAIL</status>
<message>ERR found: GPS </message>
<data>(test data will be embedded here at some point)</data>
</result>
<result>
<name>GPS</name>
<status>WARN</status>
<message>Min satellites: 6, Max HDop: 4.68</message>
<data>(test data will be embedded here at some point)</data>
</result>
<result>
<name>Parameters</name>
<status>GOOD</status>
<message></message>
</result>
<result>
<name>PM</name>
<status>FAIL</status>
<message>14 slow loop lines found, max 18.56% on line 2983</message>
<data>(test data will be embedded here at some point)</data>
</result>
<result>
<name>Pitch/Roll</name>
<status>GOOD</status>
<message></message>
</result>
<result>
<name>Thrust</name>
<status>GOOD</status>
<message></message>
</result>
<result>
<name>VCC</name>
<status>GOOD</status>
<message></message>
</result>
<result>
<name>Vibration</name>
<status>UNKNOWN</status>
<message>No IMU log data</message>
</result>
</results>
</loganalysis>
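
The report above is plain XML, so its results can be post-processed without LogAnalyzer itself. A minimal standalone sketch using only the Python standard library (the function name and file path are illustrative, not part of the tool):

import xml.etree.ElementTree as ET

def summarise_report(path):
    """Collect (test name, status, message) triples from a LogAnalyzer XML report."""
    root = ET.parse(path).getroot()
    return [
        (r.findtext("name"), r.findtext("status"), (r.findtext("message") or "").strip())
        for r in root.iter("result")
    ]

# summarise_report("report.xml") would include
# ("GPS", "WARN", "Min satellites: 6, Max HDop: 4.68") for the report above.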

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

@@ -1,135 +0,0 @@
# AP_FLAKE8_CLEAN
from LogAnalyzer import Test, TestResult
from VehicleType import VehicleType
# from ArduCopter/defines.h
AUTOTUNE_INITIALISED = 30
AUTOTUNE_OFF = 31
AUTOTUNE_RESTART = 32
AUTOTUNE_SUCCESS = 33
AUTOTUNE_FAILED = 34
AUTOTUNE_REACHED_LIMIT = 35
AUTOTUNE_PILOT_TESTING = 36
AUTOTUNE_SAVEDGAINS = 37
AUTOTUNE_EVENTS = frozenset(
[
AUTOTUNE_INITIALISED,
AUTOTUNE_OFF,
AUTOTUNE_RESTART,
AUTOTUNE_SUCCESS,
AUTOTUNE_FAILED,
AUTOTUNE_REACHED_LIMIT,
AUTOTUNE_PILOT_TESTING,
AUTOTUNE_SAVEDGAINS,
]
)
class TestAutotune(Test):
'''test for autotune success (copter only)'''
class AutotuneSession(object):
def __init__(self, events):
self.events = events
@property
def linestart(self):
return self.events[0][0]
@property
def linestop(self):
return self.events[-1][0]
@property
def success(self):
return AUTOTUNE_SUCCESS in [i for _, i in self.events]
@property
def failure(self):
return AUTOTUNE_FAILED in [i for _, i in self.events]
@property
def limit(self):
return AUTOTUNE_REACHED_LIMIT in [i for _, i in self.events]
def __repr__(self):
return "<AutotuneSession {}-{}>".format(self.linestart, self.linestop)
def __init__(self):
Test.__init__(self)
self.name = "Autotune"
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
if logdata.vehicleType != VehicleType.Copter:
self.result.status = TestResult.StatusType.NA
return
        # flag any missing required channel; checked once the loop completes
        r = False
        for i in ['EV', 'ATDE', 'ATUN']:
if i not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No {} log data".format(i)
r = True
if r:
return
events = list(filter(lambda x: x[1] in AUTOTUNE_EVENTS, logdata.channels["EV"]["Id"].listData))
attempts = []
j = None
for i in range(0, len(events)):
line, ev = events[i]
if ev == AUTOTUNE_INITIALISED:
if j is not None:
attempts.append(TestAutotune.AutotuneSession(events[j:i]))
j = i
# last attempt
if j is not None:
attempts.append(TestAutotune.AutotuneSession(events[j:]))
for a in attempts:
# this should not be necessary!
def class_from_channel(c):
members = dict({'__init__': lambda x: setattr(x, i, None) for i in logdata.channels[c]})
cls = type('Channel__{:s}'.format(c), (object,), members)
return cls
# last wins
if a.success:
self.result.status = TestResult.StatusType.GOOD
s = "[+]"
elif a.failure:
self.result.status = TestResult.StatusType.FAIL
s = "[-]"
else:
self.result.status = TestResult.StatusType.UNKNOWN
s = "[?]"
s += " Autotune {}-{}\n".format(a.linestart, a.linestop)
self.result.statusMessage += s
if verbose:
linenext = a.linestart + 1
while linenext < a.linestop:
try:
line = logdata.channels['ATUN']['RateMax'].getNearestValueFwd(linenext)[1]
if line > a.linestop:
break
except ValueError:
break
atun = class_from_channel('ATUN')()
for key in logdata.channels['ATUN']:
setattr(atun, key, logdata.channels['ATUN'][key].getNearestValueFwd(linenext)[0])
linenext = logdata.channels['ATUN'][key].getNearestValueFwd(linenext)[1] + 1
self.result.statusMessage += (
"ATUN Axis:{atun.Axis} TuneStep:{atun.TuneStep} RateMin:{atun.RateMin:5.0f}"
" RateMax:{atun.RateMax:5.0f} RPGain:{atun.RPGain:1.4f} RDGain:{atun.RDGain:1.4f}"
" SPGain:{atun.SPGain:1.1f} (@line:{l})\n"
).format(l=linenext, atun=atun)
self.result.statusMessage += '\n'
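
The grouping of EV records into autotune sessions can be exercised in isolation. A minimal standalone sketch of the same idea on plain (line, event_id) tuples; only AUTOTUNE_INITIALISED is taken from the constants above, the helper itself is illustrative:

AUTOTUNE_INITIALISED = 30

def group_autotune_sessions(events):
    """Split a list of (line, event_id) pairs into one sub-list per autotune attempt."""
    sessions = []
    current = None
    for line, ev in events:
        if ev == AUTOTUNE_INITIALISED:
            if current:
                sessions.append(current)
            current = []
        if current is not None:
            current.append((line, ev))
    if current:
        sessions.append(current)
    return sessions

# group_autotune_sessions([(10, 30), (55, 35), (90, 30), (140, 33)])
# -> [[(10, 30), (55, 35)], [(90, 30), (140, 33)]]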

@@ -1,47 +0,0 @@
# AP_FLAKE8_CLEAN
from LogAnalyzer import Test, TestResult
class TestBrownout(Test):
'''test for a log that has been truncated in flight'''
def __init__(self):
Test.__init__(self)
self.name = "Brownout"
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
isArmed = False
# FIXME: cope with LOG_ARM_DISARM_MSG message
if "EV" in logdata.channels:
# step through the arm/disarm events in order, to see if they're symmetrical
# note: it seems landing detection isn't robust enough to rely upon here, so we'll only consider arm+disarm,
# not takeoff+land
for line, ev in logdata.channels["EV"]["Id"].listData:
if ev == 10:
isArmed = True
elif ev == 11:
isArmed = False
if "CTUN" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No CTUN log data"
return
if "BarAlt" in logdata.channels['CTUN']:
self.ctun_baralt_att = 'BarAlt'
else:
self.ctun_baralt_att = 'BAlt'
# check for relative altitude at end
(finalAlt, finalAltLine) = logdata.channels["CTUN"][self.ctun_baralt_att].getNearestValue(
logdata.lineCount, lookForwards=False
)
finalAltMax = 3.0 # max alt offset that we'll still consider to be on the ground
if isArmed and finalAlt > finalAltMax:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Truncated Log? Ends while armed at altitude %.2fm" % finalAlt
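
The brownout heuristic reduces to two facts about the end of the log: whether the vehicle was still armed and how far above ground the last barometric altitude was. A standalone sketch of that check (the names and example values are illustrative):

def looks_truncated(armed_at_end, final_baro_alt_m, max_ground_alt_m=3.0):
    """A log that ends while armed and well above ground was probably cut short in flight."""
    return armed_at_end and final_baro_alt_m > max_ground_alt_m

# looks_truncated(True, 27.4) -> True, looks_truncated(False, 27.4) -> False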

@@ -1,161 +0,0 @@
# AP_FLAKE8_CLEAN
import math
from functools import reduce
from LogAnalyzer import Test, TestResult
class TestCompass(Test):
'''test for compass offsets and throttle interference'''
def __init__(self):
Test.__init__(self)
self.name = "Compass"
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
def vec_len(x):
return math.sqrt(x[0] ** 2 + x[1] ** 2 + x[2] ** 2)
def FAIL():
self.result.status = TestResult.StatusType.FAIL
def WARN():
if self.result.status != TestResult.StatusType.FAIL:
self.result.status = TestResult.StatusType.WARN
try:
warnOffset = 300
failOffset = 500
param_offsets = (
logdata.parameters["COMPASS_OFS_X"],
logdata.parameters["COMPASS_OFS_Y"],
logdata.parameters["COMPASS_OFS_Z"],
)
if vec_len(param_offsets) > failOffset:
FAIL()
self.result.statusMessage = "FAIL: Large compass offset params (X:%.2f, Y:%.2f, Z:%.2f)\n" % (
param_offsets[0],
param_offsets[1],
param_offsets[2],
)
elif vec_len(param_offsets) > warnOffset:
WARN()
self.result.statusMessage = "WARN: Large compass offset params (X:%.2f, Y:%.2f, Z:%.2f)\n" % (
param_offsets[0],
param_offsets[1],
param_offsets[2],
)
if "MAG" in logdata.channels:
max_log_offsets = zip(
map(lambda x: x[1], logdata.channels["MAG"]["OfsX"].listData),
map(lambda x: x[1], logdata.channels["MAG"]["OfsY"].listData),
map(lambda x: x[1], logdata.channels["MAG"]["OfsZ"].listData),
)
max_log_offsets = reduce(lambda x, y: x if vec_len(x) > vec_len(y) else y, max_log_offsets)
if vec_len(max_log_offsets) > failOffset:
FAIL()
self.result.statusMessage += "FAIL: Large compass offset in MAG data (X:%.2f, Y:%.2f, Z:%.2f)\n" % (
max_log_offsets[0],
max_log_offsets[1],
max_log_offsets[2],
)
elif vec_len(max_log_offsets) > warnOffset:
WARN()
self.result.statusMessage += "WARN: Large compass offset in MAG data (X:%.2f, Y:%.2f, Z:%.2f)\n" % (
max_log_offsets[0],
max_log_offsets[1],
max_log_offsets[2],
)
# check for mag field length change, and length outside of recommended range
if "MAG" in logdata.channels:
percentDiffThresholdWARN = 0.25
percentDiffThresholdFAIL = 0.35
minMagFieldThreshold = 120.0
maxMagFieldThreshold = 550.0
index = 0
length = len(logdata.channels["MAG"]["MagX"].listData)
magField = []
(minMagField, maxMagField) = (None, None)
(minMagFieldLine, maxMagFieldLine) = (None, None)
zerosFound = False
while index < length:
mx = logdata.channels["MAG"]["MagX"].listData[index][1]
my = logdata.channels["MAG"]["MagY"].listData[index][1]
mz = logdata.channels["MAG"]["MagZ"].listData[index][1]
if (
(mx == 0) and (my == 0) and (mz == 0)
): # sometimes they're zero, not sure why, same reason as why we get NaNs as offsets?
zerosFound = True
else:
mf = math.sqrt(mx * mx + my * my + mz * mz)
magField.append(mf)
                        if minMagField is None or mf < minMagField:
minMagField = mf
minMagFieldLine = logdata.channels["MAG"]["MagX"].listData[index][0]
                        if maxMagField is None or mf > maxMagField:
maxMagField = mf
maxMagFieldLine = logdata.channels["MAG"]["MagX"].listData[index][0]
if index == 0:
(minMagField, maxMagField) = (mf, mf)
index += 1
if minMagField is None:
FAIL()
self.result.statusMessage = self.result.statusMessage + "No valid mag data found\n"
else:
percentDiff = (maxMagField - minMagField) / minMagField
if percentDiff > percentDiffThresholdFAIL:
FAIL()
self.result.statusMessage = (
self.result.statusMessage + "Large change in mag_field (%.2f%%)\n" % (percentDiff * 100)
)
elif percentDiff > percentDiffThresholdWARN:
WARN()
self.result.statusMessage = (
self.result.statusMessage + "Moderate change in mag_field (%.2f%%)\n" % (percentDiff * 100)
)
else:
self.result.statusMessage = (
self.result.statusMessage
+ "mag_field interference within limits (%.2f%%)\n" % (percentDiff * 100)
)
if minMagField < minMagFieldThreshold:
self.result.statusMessage = (
self.result.statusMessage
+ "Min mag field length (%.2f) < recommended (%.2f)\n" % (minMagField, minMagFieldThreshold)
)
if maxMagField > maxMagFieldThreshold:
self.result.statusMessage = (
self.result.statusMessage
+ "Max mag field length (%.2f) > recommended (%.2f)\n" % (maxMagField, maxMagFieldThreshold)
)
if verbose:
self.result.statusMessage = self.result.statusMessage + "Min mag_field of %.2f on line %d\n" % (
minMagField,
minMagFieldLine,
)
self.result.statusMessage = self.result.statusMessage + "Max mag_field of %.2f on line %d\n" % (
maxMagField,
maxMagFieldLine,
)
if zerosFound:
if self.result.status == TestResult.StatusType.GOOD:
WARN()
self.result.statusMessage = self.result.statusMessage + "All zeros found in MAG X/Y/Z log data\n"
else:
self.result.statusMessage = (
self.result.statusMessage + "No MAG data, unable to test mag_field interference\n"
)
except KeyError as e:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = str(e) + ' not found'
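
Both checks above reduce to simple arithmetic on the offset vector and on the per-sample field lengths. A standalone sketch with the same 300/500 offset thresholds (the helper names are illustrative):

import math

def offset_status(ofs, warn=300.0, fail=500.0):
    """Classify a compass offset vector by its length."""
    length = math.sqrt(sum(v * v for v in ofs))
    if length > fail:
        return "FAIL"
    if length > warn:
        return "WARN"
    return "GOOD"

def field_variation(mag_lengths):
    """Relative change between the weakest and strongest mag_field samples."""
    lo, hi = min(mag_lengths), max(mag_lengths)
    return (hi - lo) / lo

# offset_status((-20.5, 46.7, -5.1)) -> "GOOD"
# (roughly the COMPASS_OFS_X/Y/Z values in the example report above)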

@@ -1,110 +0,0 @@
from __future__ import print_function
import DataflashLog
from LogAnalyzer import Test, TestResult
# import scipy
# import pylab #### TEMP!!! only for dev
# from scipy import signal
class TestDualGyroDrift(Test):
'''test for gyro drift between dual IMU data'''
def __init__(self):
Test.__init__(self)
self.name = "Gyro Drift"
self.enable = False
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
# if "IMU" not in logdata.channels or "IMU2" not in logdata.channels:
# self.result.status = TestResult.StatusType.NA
# return
# imuX = logdata.channels["IMU"]["GyrX"].listData
# imu2X = logdata.channels["IMU2"]["GyrX"].listData
# # NOTE: weird thing about Holger's log is that the counts of IMU+IMU2 are different
# print("length 1: %.2f, length 2: %.2f" % (len(imuX),len(imu2X)))
# #assert(len(imuX) == len(imu2X))
# # divide the curve into segments and get the average of each segment
# # we will get the diff between those averages, rather than a per-sample diff as the IMU+IMU2 arrays are often not the same length
# diffThresholdWARN = 0.03
# diffThresholdFAIL = 0.05
# nSamples = 10
# imu1XAverages, imu1YAverages, imu1ZAverages, imu2XAverages, imu2YAverages, imu2ZAverages = ([],[],[],[],[],[])
# imuXDiffAverages, imuYDiffAverages, imuZDiffAverages = ([],[],[])
# maxDiffX, maxDiffY, maxDiffZ = (0,0,0)
# sliceLength1 = len(logdata.channels["IMU"]["GyrX"].dictData.values()) / nSamples
# sliceLength2 = len(logdata.channels["IMU2"]["GyrX"].dictData.values()) / nSamples
# for i in range(0,nSamples):
# imu1XAverages.append(numpy.mean(logdata.channels["IMU"]["GyrX"].dictData.values()[i*sliceLength1:i*sliceLength1+sliceLength1]))
# imu1YAverages.append(numpy.mean(logdata.channels["IMU"]["GyrY"].dictData.values()[i*sliceLength1:i*sliceLength1+sliceLength1]))
# imu1ZAverages.append(numpy.mean(logdata.channels["IMU"]["GyrZ"].dictData.values()[i*sliceLength1:i*sliceLength1+sliceLength1]))
# imu2XAverages.append(numpy.mean(logdata.channels["IMU2"]["GyrX"].dictData.values()[i*sliceLength2:i*sliceLength2+sliceLength2]))
# imu2YAverages.append(numpy.mean(logdata.channels["IMU2"]["GyrY"].dictData.values()[i*sliceLength2:i*sliceLength2+sliceLength2]))
# imu2ZAverages.append(numpy.mean(logdata.channels["IMU2"]["GyrZ"].dictData.values()[i*sliceLength2:i*sliceLength2+sliceLength2]))
# imuXDiffAverages.append(imu2XAverages[-1]-imu1XAverages[-1])
# imuYDiffAverages.append(imu2YAverages[-1]-imu1YAverages[-1])
# imuZDiffAverages.append(imu2ZAverages[-1]-imu1ZAverages[-1])
# if abs(imuXDiffAverages[-1]) > maxDiffX:
# maxDiffX = imuXDiffAverages[-1]
# if abs(imuYDiffAverages[-1]) > maxDiffY:
# maxDiffY = imuYDiffAverages[-1]
# if abs(imuZDiffAverages[-1]) > maxDiffZ:
# maxDiffZ = imuZDiffAverages[-1]
# if max(maxDiffX,maxDiffY,maxDiffZ) > diffThresholdFAIL:
# self.result.status = TestResult.StatusType.FAIL
# self.result.statusMessage = "IMU/IMU2 gyro averages differ by more than %s radians" % diffThresholdFAIL
# elif max(maxDiffX,maxDiffY,maxDiffZ) > diffThresholdWARN:
# self.result.status = TestResult.StatusType.WARN
# self.result.statusMessage = "IMU/IMU2 gyro averages differ by more than %s radians" % diffThresholdWARN
# # pylab.plot(zip(*imuX)[0], zip(*imuX)[1], 'g')
# # pylab.plot(zip(*imu2X)[0], zip(*imu2X)[1], 'r')
# #pylab.plot(range(0,(nSamples*sliceLength1),sliceLength1), imu1ZAverages, 'b')
# print("Gyro averages1X: " + repr(imu1XAverages))
# print("Gyro averages1Y: " + repr(imu1YAverages))
# print("Gyro averages1Z: " + repr(imu1ZAverages) + "\n")
# print("Gyro averages2X: " + repr(imu2XAverages))
# print("Gyro averages2Y: " + repr(imu2YAverages))
# print("Gyro averages2Z: " + repr(imu2ZAverages) + "\n")
# print("Gyro averages diff X: " + repr(imuXDiffAverages))
# print("Gyro averages diff Y: " + repr(imuYDiffAverages))
# print("Gyro averages diff Z: " + repr(imuZDiffAverages))
# # lowpass filter using numpy
# # cutoff = 100
# # fs = 10000.0
# # b,a = scipy.signal.filter_design.butter(5,cutoff/(fs/2))
# # imuXFiltered = scipy.signal.filtfilt(b,a,zip(*imuX)[1])
# # imu2XFiltered = scipy.signal.filtfilt(b,a,zip(*imu2X)[1])
# #pylab.plot(imuXFiltered, 'r')
# # TMP: DISPLAY BEFORE+AFTER plots
# pylab.show()
# # print("imuX average before lowpass filter: %.8f" % logdata.channels["IMU"]["GyrX"].avg())
# # print("imuX average after lowpass filter: %.8f" % numpy.mean(imuXFiltered))
# # print("imu2X average before lowpass filter: %.8f" % logdata.channels["IMU2"]["GyrX"].avg())
# # print("imu2X average after lowpass filter: %.8f" % numpy.mean(imu2XFiltered))
# avg1X = logdata.channels["IMU"]["GyrX"].avg()
# avg1Y = logdata.channels["IMU"]["GyrY"].avg()
# avg1Z = logdata.channels["IMU"]["GyrZ"].avg()
# avg2X = logdata.channels["IMU2"]["GyrX"].avg()
# avg2Y = logdata.channels["IMU2"]["GyrY"].avg()
# avg2Z = logdata.channels["IMU2"]["GyrZ"].avg()
# avgRatioX = (max(avg1X,avg2X) - min(avg1X,avg2X)) / #abs(max(avg1X,avg2X) / min(avg1X,avg2X))
# avgRatioY = abs(max(avg1Y,avg2Y) / min(avg1Y,avg2Y))
# avgRatioZ = abs(max(avg1Z,avg2Z) / min(avg1Z,avg2Z))
# self.result.statusMessage = "IMU gyro avg: %.4f,%.4f,%.4f\nIMU2 gyro avg: %.4f,%.4f,%.4f\nAvg ratio: %.4f,%.4f,%.4f" % (avg1X,avg1Y,avg1Z, avg2X,avg2Y,avg2Z, avgRatioX,avgRatioY,avgRatioZ)

@@ -1,73 +0,0 @@
# AP_FLAKE8_CLEAN
from __future__ import print_function
from LogAnalyzer import Test, TestResult
class TestDupeLogData(Test):
'''test for duplicated data in log, which has been happening on PX4/Pixhawk'''
def __init__(self):
Test.__init__(self)
self.name = "Dupe Log Data"
def __matchSample(self, sample, sampleStartIndex, logdata):
'''return the line number where a match is found, otherwise return False'''
# ignore if all data in sample is the same value
nSame = 0
for s in sample:
if s[1] == sample[0][1]:
nSame += 1
if nSame == 20:
return False
# c
data = logdata.channels["ATT"]["Pitch"].listData
for i in range(sampleStartIndex, len(data)):
if i == sampleStartIndex:
continue # skip matching against ourselves
j = 0
while j < 20 and (i + j) < len(data) and data[i + j][1] == sample[j][1]:
j += 1
if j == 20: # all samples match
return data[i][0]
return False
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
# this could be made more flexible by not hard-coding to use ATT data, could make it dynamic based on whatever
# is available as long as it is highly variable
if "ATT" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No ATT log data"
return
# pick 10 sample points within the range of ATT data we have
sampleStartIndices = []
attEndIndex = len(logdata.channels["ATT"]["Pitch"].listData) - 1
step = int(attEndIndex / 11)
for i in range(step, attEndIndex - step, step):
sampleStartIndices.append(i)
# get 20 datapoints of pitch from each sample location and check for a match elsewhere
sampleIndex = 0
for i in range(sampleStartIndices[0], len(logdata.channels["ATT"]["Pitch"].listData)):
if i == sampleStartIndices[sampleIndex]:
sample = logdata.channels["ATT"]["Pitch"].listData[i : i + 20]
matchedLine = self.__matchSample(sample, i, logdata)
if matchedLine:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Duplicate data chunks found in log (%d and %d)" % (
sample[0][0],
matchedLine,
)
return
sampleIndex += 1
if sampleIndex >= len(sampleStartIndices):
break
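
The matching step can be expressed more compactly on a plain list of values. A standalone sketch of the same idea (the 20-sample window matches the test above; the helper itself is illustrative):

def find_duplicate_chunk(values, start, length=20):
    """Return the index of another window identical to values[start:start+length], or None."""
    sample = values[start:start + length]
    if len(set(sample)) == 1:
        return None  # constant data would match everywhere and proves nothing
    for i in range(len(values) - length + 1):
        if i != start and values[i:i + length] == sample:
            return i
    return None

# find_duplicate_chunk([1, 2, 3, 9, 1, 2, 3], 0, length=3) -> 4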

@@ -1,23 +0,0 @@
# AP_FLAKE8_CLEAN
import DataflashLog
from LogAnalyzer import Test, TestResult
class TestEmpty(Test):
'''test for empty or near-empty logs'''
def __init__(self):
Test.__init__(self)
self.name = "Empty"
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
# all the logic for this test is in the helper function, as it can also be called up front as an early exit
emptyErr = DataflashLog.DataflashLogHelper.isLogEmpty(logdata)
if emptyErr:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Empty log? " + emptyErr

@@ -1,57 +0,0 @@
# AP_FLAKE8_CLEAN
from LogAnalyzer import Test, TestResult
class TestEvents(Test):
'''test for erroneous events and failsafes'''
# TODO: need to check for vehicle-specific codes
def __init__(self):
Test.__init__(self)
self.name = "Event/Failsafe"
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
errors = set()
if "ERR" in logdata.channels:
assert len(logdata.channels["ERR"]["Subsys"].listData) == len(logdata.channels["ERR"]["ECode"].listData)
for i in range(len(logdata.channels["ERR"]["Subsys"].listData)):
subSys = logdata.channels["ERR"]["Subsys"].listData[i][1]
eCode = logdata.channels["ERR"]["ECode"].listData[i][1]
if subSys == 2 and (eCode == 1):
errors.add("PPM")
elif subSys == 3 and (eCode == 1 or eCode == 2):
errors.add("COMPASS")
elif subSys == 5 and (eCode == 1):
errors.add("FS_THR")
elif subSys == 6 and (eCode == 1):
errors.add("FS_BATT")
elif subSys == 7 and (eCode == 1):
errors.add("GPS")
elif subSys == 8 and (eCode == 1):
errors.add("GCS")
elif subSys == 9 and (eCode == 1 or eCode == 2):
errors.add("FENCE")
elif subSys == 10:
errors.add("FLT_MODE")
elif subSys == 11 and (eCode == 2):
errors.add("GPS_GLITCH")
elif subSys == 12 and (eCode == 1):
errors.add("CRASH")
if errors:
if len(errors) == 1 and "FENCE" in errors:
self.result.status = TestResult.StatusType.WARN
else:
self.result.status = TestResult.StatusType.FAIL
if len(errors) == 1:
self.result.statusMessage = "ERR found: "
else:
self.result.statusMessage = "ERRs found: "
for err in errors:
self.result.statusMessage = self.result.statusMessage + err + " "
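
The Subsys/ECode decoding is a fixed mapping, so it can equally be written as a lookup table. A sketch of the same mapping (the table and helper are illustrative, not part of the original file):

ERR_LABELS = {
    (2, 1): "PPM",
    (3, 1): "COMPASS",
    (3, 2): "COMPASS",
    (5, 1): "FS_THR",
    (6, 1): "FS_BATT",
    (7, 1): "GPS",
    (8, 1): "GCS",
    (9, 1): "FENCE",
    (9, 2): "FENCE",
    (11, 2): "GPS_GLITCH",
    (12, 1): "CRASH",
}

def decode_err(subsys, ecode):
    if subsys == 10:  # any flight-mode change error code
        return "FLT_MODE"
    return ERR_LABELS.get((subsys, ecode))

# decode_err(7, 1) -> "GPS", matching the failsafe flagged in the example report above.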

@@ -1,72 +0,0 @@
# AP_FLAKE8_CLEAN
from LogAnalyzer import Test, TestResult
class TestGPSGlitch(Test):
'''test for GPS glitch reporting or bad GPS data (satellite count, hdop)'''
def __init__(self):
Test.__init__(self)
self.name = "GPS"
def findSatsChan(self, channels):
for chan in "NSats", "NSat", "numSV":
if chan in channels:
return channels[chan]
def findHDopChan(self, channels):
for chan in "HDop", "HDp", "EPH":
if chan in channels:
return channels[chan]
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
if "GPS" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No GPS log data"
return
# glitch protection is currently copter-only, but might be
# added to other vehicle types later and there's no harm in
# leaving the test in for all
gpsGlitchCount = 0
if "ERR" in logdata.channels:
assert len(logdata.channels["ERR"]["Subsys"].listData) == len(logdata.channels["ERR"]["ECode"].listData)
for i in range(len(logdata.channels["ERR"]["Subsys"].listData)):
subSys = logdata.channels["ERR"]["Subsys"].listData[i][1]
eCode = logdata.channels["ERR"]["ECode"].listData[i][1]
if subSys == 11 and (eCode == 2):
gpsGlitchCount += 1
if gpsGlitchCount:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "GPS glitch errors found (%d)" % gpsGlitchCount
# define and check different thresholds for WARN level and
# FAIL level
# TODO: for plane, only check after first instance of throttle
# > 0, or after takeoff if we can reliably detect it
minSatsWARN = 6
minSatsFAIL = 5
maxHDopWARN = 3.0
maxHDopFAIL = 10.0
satsChan = self.findSatsChan(logdata.channels["GPS"])
hdopChan = self.findHDopChan(logdata.channels["GPS"])
foundBadSatsWarn = satsChan.min() < minSatsWARN
foundBadHDopWarn = hdopChan.max() > maxHDopWARN
foundBadSatsFail = satsChan.min() < minSatsFAIL
foundBadHDopFail = hdopChan.max() > maxHDopFAIL
satsMsg = "Min satellites: %s, Max HDop: %s" % (satsChan.min(), hdopChan.max())
if gpsGlitchCount:
self.result.statusMessage = "\n".join([self.result.statusMessage, satsMsg])
if foundBadSatsFail or foundBadHDopFail:
if not gpsGlitchCount:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = satsMsg
elif foundBadSatsWarn or foundBadHDopWarn:
if not gpsGlitchCount:
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = satsMsg
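
The WARN/FAIL decision depends only on the worst satellite count and HDop seen in the log. A standalone sketch with the same thresholds (the function is illustrative):

def gps_quality(min_sats, max_hdop):
    """WARN below 6 sats or HDop over 3.0; FAIL below 5 sats or HDop over 10.0."""
    if min_sats < 5 or max_hdop > 10.0:
        return "FAIL"
    if min_sats < 6 or max_hdop > 3.0:
        return "WARN"
    return "GOOD"

# gps_quality(6, 4.68) -> "WARN", which is why the example report above warns on HDop alone.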

@@ -1,139 +0,0 @@
# AP_FLAKE8_CLEAN
from __future__ import print_function
from math import sqrt
from LogAnalyzer import Test, TestResult
class TestIMUMatch(Test):
    '''test for accelerometer mismatch between IMU and IMU2'''
def __init__(self):
Test.__init__(self)
self.name = "IMU Mismatch"
def run(self, logdata, verbose):
# tuning parameters:
warn_threshold = 0.75
fail_threshold = 1.5
filter_tc = 5.0
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
if ("IMU" in logdata.channels) and ("IMU2" not in logdata.channels):
self.result.status = TestResult.StatusType.NA
self.result.statusMessage = "No IMU2"
return
if ("IMU" not in logdata.channels) or ("IMU2" not in logdata.channels):
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No IMU log data"
return
imu1 = logdata.channels["IMU"]
imu2 = logdata.channels["IMU2"]
timeLabel = None
for i in 'TimeMS', 'TimeUS', 'Time':
            if i in logdata.channels["IMU"]:
timeLabel = i
break
imu1_timems = imu1[timeLabel].listData
imu1_accx = imu1["AccX"].listData
imu1_accy = imu1["AccY"].listData
imu1_accz = imu1["AccZ"].listData
imu2_timems = imu2[timeLabel].listData
imu2_accx = imu2["AccX"].listData
imu2_accy = imu2["AccY"].listData
imu2_accz = imu2["AccZ"].listData
imu_multiplier = 1.0e-3
if timeLabel == 'TimeUS':
imu_multiplier = 1.0e-6
imu1 = []
imu2 = []
for i in range(len(imu1_timems)):
imu1.append(
{
't': imu1_timems[i][1] * imu_multiplier,
'x': imu1_accx[i][1],
'y': imu1_accy[i][1],
'z': imu1_accz[i][1],
}
)
for i in range(len(imu2_timems)):
imu2.append(
{
't': imu2_timems[i][1] * imu_multiplier,
'x': imu2_accx[i][1],
'y': imu2_accy[i][1],
'z': imu2_accz[i][1],
}
)
imu1.sort(key=lambda x: x['t'])
imu2.sort(key=lambda x: x['t'])
imu2_index = 0
last_t = None
xdiff_filtered = 0
ydiff_filtered = 0
zdiff_filtered = 0
max_diff_filtered = 0
for i in range(len(imu1)):
# find closest imu2 value
t = imu1[i]['t']
dt = 0 if last_t is None else t - last_t
dt = min(dt, 0.1)
next_imu2 = None
            # use a separate loop variable so the outer sample index is not clobbered
            for j in range(imu2_index, len(imu2)):
                next_imu2 = imu2[j]
                imu2_index = j
if next_imu2['t'] >= t:
break
prev_imu2 = imu2[imu2_index - 1]
closest_imu2 = next_imu2 if abs(next_imu2['t'] - t) < abs(prev_imu2['t'] - t) else prev_imu2
xdiff = imu1[i]['x'] - closest_imu2['x']
ydiff = imu1[i]['y'] - closest_imu2['y']
zdiff = imu1[i]['z'] - closest_imu2['z']
xdiff_filtered += (xdiff - xdiff_filtered) * dt / filter_tc
ydiff_filtered += (ydiff - ydiff_filtered) * dt / filter_tc
zdiff_filtered += (zdiff - zdiff_filtered) * dt / filter_tc
diff_filtered = sqrt(xdiff_filtered**2 + ydiff_filtered**2 + zdiff_filtered**2)
max_diff_filtered = max(max_diff_filtered, diff_filtered)
last_t = t
if max_diff_filtered > fail_threshold:
self.result.statusMessage = (
"Check vibration or accelerometer calibration. (Mismatch: %.2f, WARN: %.2f, FAIL: %.2f)"
% (max_diff_filtered, warn_threshold, fail_threshold)
)
self.result.status = TestResult.StatusType.FAIL
elif max_diff_filtered > warn_threshold:
self.result.statusMessage = (
"Check vibration or accelerometer calibration. (Mismatch: %.2f, WARN: %.2f, FAIL: %.2f)"
% (max_diff_filtered, warn_threshold, fail_threshold)
)
self.result.status = TestResult.StatusType.WARN
else:
self.result.statusMessage = "(Mismatch: %.2f, WARN: %.2f, FAIL: %.2f)" % (
max_diff_filtered,
warn_threshold,
fail_threshold,
)
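
The per-axis accelerometer differences are smoothed with a simple first-order low-pass before being compared to the thresholds. A sketch of that filter step (the helper is illustrative):

def lowpass(prev, sample, dt, tc=5.0):
    """One filter update: move the filtered value toward the new sample by dt/tc."""
    return prev + (sample - prev) * dt / tc

# lowpass(0.0, 1.0, 0.1) -> 0.02

With dt capped at 0.1 s and a 5 s time constant, each update moves at most 2% of the way toward the raw difference, so brief vibration spikes are suppressed and only a sustained IMU/IMU2 disagreement can reach the WARN/FAIL levels.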

@@ -1,85 +0,0 @@
# AP_FLAKE8_CLEAN
from LogAnalyzer import Test, TestResult
from VehicleType import VehicleType
class TestBalanceTwist(Test):
'''test for badly unbalanced copter, including yaw twist'''
def __init__(self):
Test.__init__(self)
self.name = "Motor Balance"
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
if logdata.vehicleType != VehicleType.Copter:
self.result.status = TestResult.StatusType.NA
return
self.result.status = TestResult.StatusType.UNKNOWN
if "RCOU" not in logdata.channels:
return
ch = []
for i in range(8):
for prefix in "Chan", "Ch", "C":
if prefix + repr((i + 1)) in logdata.channels["RCOU"]:
ch.append(map(lambda x: x[1], logdata.channels["RCOU"][prefix + repr((i + 1))].listData))
ch = zip(*ch)
num_channels = 0
ch = list(ch)
for i in range(len(ch)):
ch[i] = list(filter(lambda x: (x > 0 and x < 3000), ch[i]))
if num_channels < len(ch[i]):
num_channels = len(ch[i])
if logdata.frame:
num_channels = logdata.num_motor_channels()
if num_channels < 2:
return
try:
min_throttle = (
logdata.parameters["RC3_MIN"]
+ logdata.parameters["THR_MIN"]
/ (logdata.parameters["RC3_MAX"] - logdata.parameters["RC3_MIN"])
/ 1000.0
)
except KeyError:
min_throttle = (
logdata.parameters["MOT_PWM_MIN"]
/ (logdata.parameters["MOT_PWM_MAX"] - logdata.parameters["RC3_MIN"])
/ 1000.0
)
ch = list(filter(lambda x: sum(x) / num_channels > min_throttle, ch))
if len(ch) == 0:
return
avg_sum = 0
avg_ch = []
for i in range(num_channels):
avg = list(map(lambda x: x[i], ch))
avg = sum(avg) / len(avg)
avg_ch.append(avg)
avg_sum += avg
avg_all = avg_sum / num_channels
self.result.statusMessage = (
"Motor channel averages = %s\nAverage motor output = %.0f\nDifference between min and max motor averages = %.0f"
% (str(avg_ch), avg_all, abs(min(avg_ch) - max(avg_ch)))
)
self.result.status = TestResult.StatusType.GOOD
if abs(min(avg_ch) - max(avg_ch)) > 75:
self.result.status = TestResult.StatusType.WARN
if abs(min(avg_ch) - max(avg_ch)) > 150:
self.result.status = TestResult.StatusType.FAIL
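
The balance metric is simply the spread between the lowest and highest averaged motor outputs. A standalone sketch with the same 75/150 PWM thresholds (the function is illustrative):

def motor_balance_status(channel_averages):
    """Classify the spread between the lowest and highest averaged motor outputs (PWM us)."""
    spread = max(channel_averages) - min(channel_averages)
    if spread > 150:
        return "FAIL"
    if spread > 75:
        return "WARN"
    return "GOOD"

# motor_balance_status([1500, 1520, 1490, 1610]) -> "WARN" (spread of 120)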

@@ -1,43 +0,0 @@
# AP_FLAKE8_CLEAN
import math
from LogAnalyzer import Test, TestResult
class TestNaN(Test):
'''test for NaNs present in log'''
def __init__(self):
Test.__init__(self)
self.name = "NaNs"
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
def FAIL():
self.result.status = TestResult.StatusType.FAIL
nans_ok = {
"CTUN": ["DSAlt", "TAlt"],
"POS": ["RelOriginAlt"],
}
for channel in logdata.channels.keys():
for field in logdata.channels[channel].keys():
if channel in nans_ok and field in nans_ok[channel]:
continue
try:
for tupe in logdata.channels[channel][field].listData:
(ts, val) = tupe
if isinstance(val, float) and math.isnan(val):
FAIL()
self.result.statusMessage += "Found NaN in %s.%s\n" % (
channel,
field,
)
raise ValueError()
except ValueError:
continue

@@ -1,336 +0,0 @@
# AP_FLAKE8_CLEAN
from math import sqrt
import matplotlib.pyplot as plt
import numpy as np
from LogAnalyzer import Test, TestResult
class TestFlow(Test):
'''test optical flow sensor scale factor calibration'''
#
    # Use the following procedure to log the calibration data. It is assumed that the optical flow sensor has been
    # correctly aligned, is focussed and the test is performed over a textured surface with adequate lighting.
    # Note that the strobing effect from non-incandescent artificial lighting can produce poor optical flow measurements.
#
# 1) Set LOG_DISARMED and FLOW_TYPE to 10 and verify that ATT and OF messages are being logged onboard
# 2) Place on level ground, apply power and wait for EKF to complete attitude alignment
# 3) Keeping the copter level, lift it to shoulder height and rock between +-20 and +-30 degrees
# in roll about an axis that passes through the flow sensor lens assembly. The time taken to rotate from
# maximum left roll to maximum right roll should be about 1 second.
# 4) Repeat 3) about the pitch axis
# 5) Holding the copter level, lower it to the ground and remove power
# 6) Transfer the logfile from the sdcard.
# 7) Open a terminal and cd to the ardupilot/Tools/LogAnalyzer directory
# 8) Enter to run the analysis 'python LogAnalyzer.py <log file name including full path>'
# 9) Check the OpticalFlow test status printed to the screen. The analysis plots are saved to
# flow_calibration.pdf and the recommended scale factors to flow_calibration.param
def __init__(self):
Test.__init__(self)
self.name = "OpticalFlow"
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
def FAIL():
self.result.status = TestResult.StatusType.FAIL
def WARN():
if self.result.status != TestResult.StatusType.FAIL:
self.result.status = TestResult.StatusType.WARN
try:
# tuning parameters used by the algorithm
tilt_threshold = 15 # roll and pitch threshold used to start and stop calibration (deg)
quality_threshold = 124 # minimum flow quality required for data to be used by the curve fit (N/A)
min_rate_threshold = (
0.0 # if the gyro rate is less than this, the data will not be used by the curve fit (rad/sec)
)
max_rate_threshold = (
                2.0  # if the gyro rate is greater than this, the data will not be used by the curve fit (rad/sec)
)
param_std_threshold = 5.0 # maximum allowable 1-std uncertainty in scaling parameter (scale factor * 1000)
# max/min allowable scale factor parameter. Values of FLOW_FXSCALER and FLOW_FYSCALER outside the range
# of +-param_abs_threshold indicate a sensor configuration problem.
param_abs_threshold = 200
# minimum number of points required for a curve fit - this is necessary, but not sufficient condition - the
# standard deviation estimate of the fit gradient is also important.
min_num_points = 100
# get the existing scale parameters
flow_fxscaler = logdata.parameters["FLOW_FXSCALER"]
flow_fyscaler = logdata.parameters["FLOW_FYSCALER"]
# load required optical flow data
if "OF" in logdata.channels:
flowX = np.zeros(len(logdata.channels["OF"]["flowX"].listData))
for i in range(len(logdata.channels["OF"]["flowX"].listData)):
(line, flowX[i]) = logdata.channels["OF"]["flowX"].listData[i]
bodyX = np.zeros(len(logdata.channels["OF"]["bodyX"].listData))
for i in range(len(logdata.channels["OF"]["bodyX"].listData)):
(line, bodyX[i]) = logdata.channels["OF"]["bodyX"].listData[i]
flowY = np.zeros(len(logdata.channels["OF"]["flowY"].listData))
for i in range(len(logdata.channels["OF"]["flowY"].listData)):
(line, flowY[i]) = logdata.channels["OF"]["flowY"].listData[i]
bodyY = np.zeros(len(logdata.channels["OF"]["bodyY"].listData))
for i in range(len(logdata.channels["OF"]["bodyY"].listData)):
(line, bodyY[i]) = logdata.channels["OF"]["bodyY"].listData[i]
flow_time_us = np.zeros(len(logdata.channels["OF"]["TimeUS"].listData))
for i in range(len(logdata.channels["OF"]["TimeUS"].listData)):
(line, flow_time_us[i]) = logdata.channels["OF"]["TimeUS"].listData[i]
flow_qual = np.zeros(len(logdata.channels["OF"]["Qual"].listData))
for i in range(len(logdata.channels["OF"]["Qual"].listData)):
(line, flow_qual[i]) = logdata.channels["OF"]["Qual"].listData[i]
else:
FAIL()
self.result.statusMessage = "FAIL: no optical flow data\n"
return
# load required attitude data
if "ATT" in logdata.channels:
Roll = np.zeros(len(logdata.channels["ATT"]["Roll"].listData))
for i in range(len(logdata.channels["ATT"]["Roll"].listData)):
(line, Roll[i]) = logdata.channels["ATT"]["Roll"].listData[i]
Pitch = np.zeros(len(logdata.channels["ATT"]["Pitch"].listData))
for i in range(len(logdata.channels["ATT"]["Pitch"].listData)):
(line, Pitch[i]) = logdata.channels["ATT"]["Pitch"].listData[i]
att_time_us = np.zeros(len(logdata.channels["ATT"]["TimeUS"].listData))
for i in range(len(logdata.channels["ATT"]["TimeUS"].listData)):
(line, att_time_us[i]) = logdata.channels["ATT"]["TimeUS"].listData[i]
else:
FAIL()
self.result.statusMessage = "FAIL: no attitude data\n"
return
# calculate the start time for the roll calibration
startTime = int(0)
startRollIndex = int(0)
for i in range(len(Roll)):
if abs(Roll[i]) > tilt_threshold:
startTime = att_time_us[i]
break
for i in range(len(flow_time_us)):
if flow_time_us[i] > startTime:
startRollIndex = i
break
# calculate the end time for the roll calibration
endTime = int(0)
endRollIndex = int(0)
for i in range(len(Roll) - 1, -1, -1):
if abs(Roll[i]) > tilt_threshold:
endTime = att_time_us[i]
break
for i in range(len(flow_time_us) - 1, -1, -1):
if flow_time_us[i] < endTime:
endRollIndex = i
break
# check we have enough roll data points
if endRollIndex - startRollIndex <= min_num_points:
FAIL()
                self.result.statusMessage = "FAIL: insufficient roll data points\n"
return
# resample roll test data excluding data before first movement and after last movement
# also exclude data where there is insufficient angular rate
flowX_resampled = []
bodyX_resampled = []
flowX_time_us_resampled = []
for i in range(len(Roll)):
if (
(i >= startRollIndex)
and (i <= endRollIndex)
and (abs(bodyX[i]) > min_rate_threshold)
and (abs(bodyX[i]) < max_rate_threshold)
and (flow_qual[i] > quality_threshold)
):
flowX_resampled.append(flowX[i])
bodyX_resampled.append(bodyX[i])
flowX_time_us_resampled.append(flow_time_us[i])
# calculate the start time for the pitch calibration
startTime = 0
startPitchIndex = int(0)
for i in range(len(Pitch)):
if abs(Pitch[i]) > tilt_threshold:
startTime = att_time_us[i]
break
for i in range(len(flow_time_us)):
if flow_time_us[i] > startTime:
startPitchIndex = i
break
# calculate the end time for the pitch calibration
endTime = 0
endPitchIndex = int(0)
for i in range(len(Pitch) - 1, -1, -1):
if abs(Pitch[i]) > tilt_threshold:
endTime = att_time_us[i]
break
for i in range(len(flow_time_us) - 1, -1, -1):
if flow_time_us[i] < endTime:
endPitchIndex = i
break
# check we have enough pitch data points
if endPitchIndex - startPitchIndex <= min_num_points:
FAIL()
                self.result.statusMessage = "FAIL: insufficient pitch data points\n"
return
# resample pitch test data excluding data before first movement and after last movement
# also exclude data where there is insufficient or too much angular rate
flowY_resampled = []
bodyY_resampled = []
flowY_time_us_resampled = []
for i in range(len(Roll)):
if (
(i >= startPitchIndex)
and (i <= endPitchIndex)
and (abs(bodyY[i]) > min_rate_threshold)
and (abs(bodyY[i]) < max_rate_threshold)
and (flow_qual[i] > quality_threshold)
):
flowY_resampled.append(flowY[i])
bodyY_resampled.append(bodyY[i])
flowY_time_us_resampled.append(flow_time_us[i])
# fit a straight line to the flow vs body rate data and calculate the scale factor parameter required to
# achieve a slope of 1
coef_flow_x, cov_x = np.polyfit(
bodyX_resampled, flowX_resampled, 1, rcond=None, full=False, w=None, cov=True
)
coef_flow_y, cov_y = np.polyfit(
bodyY_resampled, flowY_resampled, 1, rcond=None, full=False, w=None, cov=True
)
            # taking the existing scale factor parameters into account, calculate the parameter values required to
            # achieve a unity slope
flow_fxscaler_new = int(1000 * (((1 + 0.001 * float(flow_fxscaler)) / coef_flow_x[0] - 1)))
flow_fyscaler_new = int(1000 * (((1 + 0.001 * float(flow_fyscaler)) / coef_flow_y[0] - 1)))
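            # Derivation sketch (added for clarity, not from the original file): the logged
            # flow data already has the existing scaler s_old applied, and the fitted slope
            # m = coef_flow_x[0] (or coef_flow_y[0]) is flow/gyro under that scaling.
            # Requiring the corrected slope to be unity,
            #     m * (1 + 0.001 * s_new) / (1 + 0.001 * s_old) = 1
            # gives s_new = 1000 * ((1 + 0.001 * s_old) / m - 1), which is the expression above.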
# Do a sanity check on the scale factor variance
if sqrt(cov_x[0][0]) > param_std_threshold or sqrt(cov_y[0][0]) > param_std_threshold:
FAIL()
self.result.statusMessage = (
"FAIL: inaccurate fit - poor quality or insufficient data"
"\nFLOW_FXSCALER 1STD = %u"
"\nFLOW_FYSCALER 1STD = %u\n" % (round(1000 * sqrt(cov_x[0][0])), round(1000 * sqrt(cov_y[0][0])))
)
# Do a sanity check on the scale factors
if abs(flow_fxscaler_new) > param_abs_threshold or abs(flow_fyscaler_new) > param_abs_threshold:
FAIL()
self.result.statusMessage = (
"FAIL: required scale factors are excessive\nFLOW_FXSCALER=%i\nFLOW_FYSCALER=%i\n"
                    % (flow_fxscaler_new, flow_fyscaler_new)
)
# display recommended scale factors
self.result.statusMessage = (
"Set FLOW_FXSCALER to %i"
"\nSet FLOW_FYSCALER to %i"
"\n\nCal plots saved to flow_calibration.pdf"
"\nCal parameters saved to flow_calibration.param"
"\n\nFLOW_FXSCALER 1STD = %u"
"\nFLOW_FYSCALER 1STD = %u\n"
% (
flow_fxscaler_new,
flow_fyscaler_new,
round(1000 * sqrt(cov_x[0][0])),
round(1000 * sqrt(cov_y[0][0])),
)
)
# calculate fit display data
body_rate_display = [-max_rate_threshold, max_rate_threshold]
fit_coef_x = np.poly1d(coef_flow_x)
flowX_display = fit_coef_x(body_rate_display)
fit_coef_y = np.poly1d(coef_flow_y)
flowY_display = fit_coef_y(body_rate_display)
# plot and save calibration test points to PDF
from matplotlib.backends.backend_pdf import PdfPages
output_plot_filename = "flow_calibration.pdf"
pp = PdfPages(output_plot_filename)
plt.figure(1, figsize=(20, 13))
plt.subplot(2, 1, 1)
plt.plot(bodyX_resampled, flowX_resampled, 'b', linestyle=' ', marker='o', label="test points")
plt.plot(body_rate_display, flowX_display, 'r', linewidth=2.5, label="linear fit")
plt.title('X axis flow rate vs gyro rate')
plt.ylabel('flow rate (rad/s)')
plt.xlabel('gyro rate (rad/sec)')
plt.grid()
plt.legend(loc='upper left')
# draw plots
plt.subplot(2, 1, 2)
plt.plot(bodyY_resampled, flowY_resampled, 'b', linestyle=' ', marker='o', label="test points")
plt.plot(body_rate_display, flowY_display, 'r', linewidth=2.5, label="linear fit")
plt.title('Y axis flow rate vs gyro rate')
plt.ylabel('flow rate (rad/s)')
plt.xlabel('gyro rate (rad/sec)')
plt.grid()
plt.legend(loc='upper left')
pp.savefig()
plt.figure(2, figsize=(20, 13))
plt.subplot(2, 1, 1)
plt.plot(flow_time_us, flowX, 'b', label="flow rate - all")
plt.plot(flow_time_us, bodyX, 'r', label="gyro rate - all")
plt.plot(flowX_time_us_resampled, flowX_resampled, 'c', linestyle=' ', marker='o', label="flow rate - used")
plt.plot(flowX_time_us_resampled, bodyX_resampled, 'm', linestyle=' ', marker='o', label="gyro rate - used")
plt.title('X axis flow and body rate vs time')
plt.ylabel('rate (rad/s)')
plt.xlabel('time (usec)')
plt.grid()
plt.legend(loc='upper left')
# draw plots
plt.subplot(2, 1, 2)
plt.plot(flow_time_us, flowY, 'b', label="flow rate - all")
plt.plot(flow_time_us, bodyY, 'r', label="gyro rate - all")
plt.plot(flowY_time_us_resampled, flowY_resampled, 'c', linestyle=' ', marker='o', label="flow rate - used")
plt.plot(flowY_time_us_resampled, bodyY_resampled, 'm', linestyle=' ', marker='o', label="gyro rate - used")
plt.title('Y axis flow and body rate vs time')
plt.ylabel('rate (rad/s)')
plt.xlabel('time (usec)')
plt.grid()
plt.legend(loc='upper left')
pp.savefig()
# close the pdf file
pp.close()
# close all figures
plt.close("all")
# write correction parameters to file
test_results_filename = "flow_calibration.param"
file = open(test_results_filename, "w")
file.write("FLOW_FXSCALER" + " " + str(flow_fxscaler_new) + "\n")
file.write("FLOW_FYSCALER" + " " + str(flow_fyscaler_new) + "\n")
file.close()
except KeyError as e:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = str(e) + ' not found'

@@ -1,81 +0,0 @@
# AP_FLAKE8_CLEAN
import math # for isnan()
from LogAnalyzer import Test, TestResult
from VehicleType import VehicleType
class TestParams(Test):
'''test for any obviously bad parameters in the config'''
def __init__(self):
Test.__init__(self)
self.name = "Parameters"
# helper functions
def __checkParamIsEqual(self, paramName, expectedValue, logdata):
value = logdata.parameters[paramName]
if value != expectedValue:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting %s\n" % (
paramName,
repr(value),
repr(expectedValue),
)
def __checkParamIsLessThan(self, paramName, maxValue, logdata):
value = logdata.parameters[paramName]
if value >= maxValue:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting less than %s\n" % (
paramName,
repr(value),
repr(maxValue),
)
def __checkParamIsMoreThan(self, paramName, minValue, logdata):
value = logdata.parameters[paramName]
if value <= minValue:
self.result.status = TestResult.StatusType.FAIL
            self.result.statusMessage = self.result.statusMessage + "%s set to %s, expecting more than %s\n" % (
paramName,
repr(value),
repr(minValue),
)
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD # GOOD by default, tests below will override it if they fail
# check all params for NaN
for name, value in logdata.parameters.items():
if math.isnan(value):
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = self.result.statusMessage + name + " is NaN\n"
try:
# add parameter checks below using the helper functions, any failures will trigger a FAIL status and
# accumulate info in statusMessage.
# If more complex checking or correlations are required you can access parameter values directly using the
# logdata.parameters[paramName] dict
if logdata.vehicleType == VehicleType.Copter:
self.__checkParamIsEqual("MAG_ENABLE", 1, logdata)
if "THR_MIN" in logdata.parameters:
self.__checkParamIsLessThan("THR_MIN", 200, logdata)
self.__checkParamIsLessThan("THR_MID", 701, logdata)
self.__checkParamIsMoreThan("THR_MID", 299, logdata)
# TODO: add more parameter tests, these are just an example...
elif logdata.vehicleType == VehicleType.Plane:
# TODO: add parameter checks for plane...
pass
elif logdata.vehicleType == VehicleType.Rover:
# TODO: add parameter checks for rover...
pass
if self.result.status == TestResult.StatusType.FAIL:
self.result.statusMessage = "Bad parameters found:\n" + self.result.statusMessage
except KeyError as e:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = str(e) + ' not found'
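
As the comments note, further checks are added by calling the private helpers inside run(). An equivalent standalone range check, convenient for experimenting outside the class, might look like this (the helper and example values are illustrative):

def check_param_in_range(parameters, name, lo, hi):
    """Return a failure message if a parameter is missing or outside (lo, hi), else None."""
    value = parameters.get(name)
    if value is None:
        return "%s not found" % name
    if not (lo < value < hi):
        return "%s set to %r, expecting a value between %r and %r" % (name, value, lo, hi)
    return None

# check_param_in_range({"THR_MID": 500.0}, "THR_MID", 299, 701) -> None (in range)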

@@ -1,73 +0,0 @@
# AP_FLAKE8_CLEAN
from __future__ import print_function
from LogAnalyzer import Test, TestResult
from VehicleType import VehicleType
class TestPerformance(Test):
'''check performance monitoring messages (PM) for issues with slow loops, etc'''
def __init__(self):
Test.__init__(self)
self.name = "PM"
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
# this test should be valid for all vehicle types, just need to figure out why PM logging data is different in each
if logdata.vehicleType != VehicleType.Copter:
self.result.status = TestResult.StatusType.NA
return
# NOTE: we'll ignore MaxT altogether for now, it seems there are quite regularly one or two high values in
# there, even ignoring the ones expected after arm/disarm events.
# gather info on arm/disarm lines, we will ignore the MaxT data from the first line found after each of these
# armingLines = []
# for line,ev in logdata.channels["EV"]["Id"].listData:
# if (ev == 10) or (ev == 11):
# armingLines.append(line)
# ignoreMaxTLines = []
# for maxT in logdata.channels["PM"]["MaxT"].listData:
# if not armingLines:
# break
# if maxT[0] > armingLines[0]:
# ignoreMaxTLines.append(maxT[0])
# armingLines.pop(0)
if "PM" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No PM log data"
return
# check for slow loops, i.e. NLon greater than 6% of NLoop
maxPercentSlow = 0
maxPercentSlowLine = 0
slowLoopLineCount = 0
for i in range(len(logdata.channels["PM"]["NLon"].listData)):
(line, nLon) = logdata.channels["PM"]["NLon"].listData[i]
(line, nLoop) = logdata.channels["PM"]["NLoop"].listData[i]
(line, maxT) = logdata.channels["PM"]["MaxT"].listData[i]
percentSlow = (nLon / float(nLoop)) * 100
if percentSlow > 6.0:
slowLoopLineCount = slowLoopLineCount + 1
if percentSlow > maxPercentSlow:
maxPercentSlow = percentSlow
maxPercentSlowLine = line
if (maxPercentSlow > 10) or (slowLoopLineCount > 6):
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "%d slow loop lines found, max %.2f%% on line %d" % (
slowLoopLineCount,
maxPercentSlow,
maxPercentSlowLine,
)
elif maxPercentSlow > 6:
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = "%d slow loop lines found, max %.2f%% on line %d" % (
slowLoopLineCount,
maxPercentSlow,
maxPercentSlowLine,
)
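
The core metric is the fraction of slow loops in each PM record. A standalone sketch of that calculation (the helper is illustrative):

def slow_loop_percent(nlon, nloop):
    """Percentage of slow loops in one PM record."""
    return 100.0 * nlon / float(nloop)

# slow_loop_percent(19, 1000) -> 1.9; the 6% / 10% thresholds above then decide WARN/FAIL
# together with how many records exceeded 6%.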

@@ -1,163 +0,0 @@
# AP_FLAKE8_CLEAN
import collections
import DataflashLog
from LogAnalyzer import Test, TestResult
from VehicleType import VehicleType
class TestPitchRollCoupling(Test):
'''test for divergence between input and output pitch/roll, i.e. mechanical failure or bad PID tuning'''
# TODO: currently we're only checking for roll/pitch outside of max lean angle, will come back later to analyze
# roll/pitch in versus out values
def __init__(self):
Test.__init__(self)
self.name = "Pitch/Roll"
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
if logdata.vehicleType != VehicleType.Copter:
self.result.status = TestResult.StatusType.NA
return
if "ATT" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No ATT log data"
return
if "CTUN" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No CTUN log data"
return
if "BarAlt" in logdata.channels['CTUN']:
self.ctun_baralt_att = 'BarAlt'
else:
self.ctun_baralt_att = 'BAlt'
# figure out where each mode begins and ends, so we can treat auto and manual modes differently and ignore
# acro/tune modes
autoModes = [
"RTL",
"AUTO",
"LAND",
"LOITER",
"GUIDED",
"CIRCLE",
"OF_LOITER",
"POSHOLD",
"BRAKE",
"AVOID_ADSB",
"GUIDED_NOGPS",
"SMARTRTL",
]
# use CTUN RollIn/DesRoll + PitchIn/DesPitch
manualModes = ["STABILIZE", "DRIFT", "ALTHOLD", "ALT_HOLD", "POSHOLD"]
# ignore data from these modes:
ignoreModes = [
"ACRO",
"SPORT",
"FLIP",
"AUTOTUNE",
"",
"THROW",
]
autoSegments = [] # list of (startLine,endLine) pairs
manualSegments = [] # list of (startLine,endLine) pairs
orderedModes = collections.OrderedDict(sorted(logdata.modeChanges.items(), key=lambda t: t[0]))
isAuto = False # we always start in a manual control mode
prevLine = 0
mode = ""
for line, modepair in orderedModes.items():
mode = modepair[0].upper()
if prevLine == 0:
prevLine = line
if mode in autoModes:
if not isAuto:
manualSegments.append((prevLine, line - 1))
prevLine = line
isAuto = True
elif mode in manualModes:
if isAuto:
autoSegments.append((prevLine, line - 1))
prevLine = line
isAuto = False
elif mode in ignoreModes:
if isAuto:
autoSegments.append((prevLine, line - 1))
else:
manualSegments.append((prevLine, line - 1))
prevLine = 0
else:
raise Exception("Unknown mode in TestPitchRollCoupling: %s" % mode)
# and handle the last segment, which doesn't have an ending
if mode in autoModes:
autoSegments.append((prevLine, logdata.lineCount))
elif mode in manualModes:
manualSegments.append((prevLine, logdata.lineCount))
# figure out max lean angle, the ANGLE_MAX param was added in AC3.1
maxLeanAngle = 45.0
if "ANGLE_MAX" in logdata.parameters:
maxLeanAngle = logdata.parameters["ANGLE_MAX"] / 100.0
maxLeanAngleBuffer = 10 # allow a buffer margin
# ignore anything below this altitude, to discard any data while not flying
minAltThreshold = 2.0
# look through manual+auto flight segments
# TODO: filter to ignore single points outside range?
(maxRoll, maxRollLine) = (0.0, 0)
(maxPitch, maxPitchLine) = (0.0, 0)
for (startLine, endLine) in manualSegments + autoSegments:
# quick up-front test, only fallover into more complex line-by-line check if max()>threshold
rollSeg = logdata.channels["ATT"]["Roll"].getSegment(startLine, endLine)
pitchSeg = logdata.channels["ATT"]["Pitch"].getSegment(startLine, endLine)
if not rollSeg.dictData and not pitchSeg.dictData:
continue
# check max roll+pitch for any time where relative altitude is above minAltThreshold
roll = max(abs(rollSeg.min()), abs(rollSeg.max()))
pitch = max(abs(pitchSeg.min()), abs(pitchSeg.max()))
if (roll > (maxLeanAngle + maxLeanAngleBuffer) and abs(roll) > abs(maxRoll)) or (
pitch > (maxLeanAngle + maxLeanAngleBuffer) and abs(pitch) > abs(maxPitch)
):
lit = DataflashLog.LogIterator(logdata, startLine)
assert lit.currentLine == startLine
while lit.currentLine <= endLine:
relativeAlt = lit["CTUN"][self.ctun_baralt_att]
if relativeAlt > minAltThreshold:
roll = lit["ATT"]["Roll"]
pitch = lit["ATT"]["Pitch"]
if abs(roll) > (maxLeanAngle + maxLeanAngleBuffer) and abs(roll) > abs(maxRoll):
maxRoll = roll
maxRollLine = lit.currentLine
if abs(pitch) > (maxLeanAngle + maxLeanAngleBuffer) and abs(pitch) > abs(maxPitch):
maxPitch = pitch
maxPitchLine = lit.currentLine
next(lit)
# check for breaking max lean angles
if maxRoll and abs(maxRoll) > abs(maxPitch):
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Roll (%.2f, line %d) > maximum lean angle (%.2f)" % (
maxRoll,
maxRollLine,
maxLeanAngle,
)
return
if maxPitch:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Pitch (%.2f, line %d) > maximum lean angle (%.2f)" % (
maxPitch,
maxPitchLine,
maxLeanAngle,
)
return
# TODO: use numpy/scipy to check Roll+RollIn curves for fitness (ignore where we're not airborne)
# ...
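
The lean-angle comparison converts ANGLE_MAX from centidegrees and allows a small buffer before flagging. A standalone sketch of that check (the function is illustrative; 4500 matches the 45-degree fallback used above):

def exceeds_lean_angle(roll_deg, pitch_deg, angle_max_cdeg=4500, buffer_deg=10):
    """True when roll or pitch is beyond the configured lean angle plus a buffer."""
    limit = angle_max_cdeg / 100.0 + buffer_deg
    return abs(roll_deg) > limit or abs(pitch_deg) > limit

# exceeds_lean_angle(38.0, 12.0) -> False, exceeds_lean_angle(61.5, 3.0) -> True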

@@ -1,90 +0,0 @@
# AP_FLAKE8_CLEAN
from __future__ import print_function
from LogAnalyzer import Test, TestResult
from VehicleType import VehicleType
class TestThrust(Test):
'''test for sufficient thrust (copter only for now)'''
def __init__(self):
Test.__init__(self)
self.name = "Thrust"
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
if logdata.vehicleType != VehicleType.Copter:
self.result.status = TestResult.StatusType.NA
return
if "CTUN" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No CTUN log data"
return
if "ATT" not in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No ATT log data"
return
throut_key = None
for key in "ThO", "ThrOut":
if key in logdata.channels["CTUN"]:
throut_key = key
break
if throut_key is None:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "Could not find throttle out column"
return
# check for throttle (CTUN.ThrOut) above 700 for a chunk of time with copter not rising
highThrottleThreshold = 700
        tiltThreshold = 20  # ignore high throttle when roll or pitch is above this value
climbThresholdWARN = 100
climbThresholdFAIL = 50
minSampleLength = 50
highThrottleSegments = []
# find any contiguous chunks where CTUN.ThrOut > highThrottleThreshold, ignore high throttle if
# tilt > tiltThreshold, and discard any segments shorter than minSampleLength
start = None
data = logdata.channels["CTUN"][throut_key].listData
for i in range(0, len(data)):
(lineNumber, value) = data[i]
isBelowTiltThreshold = True
if value > highThrottleThreshold:
(roll, meh) = logdata.channels["ATT"]["Roll"].getNearestValue(lineNumber)
(pitch, meh) = logdata.channels["ATT"]["Pitch"].getNearestValue(lineNumber)
if (abs(roll) > tiltThreshold) or (abs(pitch) > tiltThreshold):
isBelowTiltThreshold = False
if (value > highThrottleThreshold) and isBelowTiltThreshold:
if start is None:
start = i
elif start is not None:
if (i - start) > minSampleLength:
highThrottleSegments.append((start, i))
start = None
climbRate = "CRate"
if "CRate" not in logdata.channels["CTUN"]:
climbRate = "CRt"
# loop through each checking climbRate, if < 50 FAIL, if < 100 WARN
# TODO: we should filter climbRate and use its slope rather than value for this test
for seg in highThrottleSegments:
(startLine, endLine) = (data[seg[0]][0], data[seg[1]][0])
avgClimbRate = logdata.channels["CTUN"][climbRate].getSegment(startLine, endLine).avg()
avgThrOut = logdata.channels["CTUN"][throut_key].getSegment(startLine, endLine).avg()
if avgClimbRate < climbThresholdFAIL:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Avg climb rate %.2f cm/s for throttle avg %d" % (avgClimbRate, avgThrOut)
return
if avgClimbRate < climbThresholdWARN:
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = "Avg climb rate %.2f cm/s for throttle avg %d" % (avgClimbRate, avgThrOut)
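
The TODO above proposes smoothing the climb rate and using its slope rather than the raw value. A sketch of that idea follows; it assumes climb-rate samples in cm/s at a roughly constant sample rate, and the window size and function name are illustrative only, not part of the original tool.

import numpy as np

def smoothed_climb_slope(climb_rate, window=25):
    """Smooth climb rate with a moving average and return the least-squares
    slope (cm/s per sample) of the smoothed series over the segment."""
    cr = np.asarray(climb_rate, dtype=float)
    if cr.size < window + 2:
        return 0.0
    kernel = np.ones(window) / window
    smoothed = np.convolve(cr, kernel, mode="valid")
    x = np.arange(smoothed.size)
    slope = np.polyfit(x, smoothed, 1)[0]
    return float(slope)

# a segment with high throttle but a flat or falling smoothed climb rate
# (slope near zero or negative) would then be the one worth flagging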

View File

@@ -1,43 +0,0 @@
# AP_FLAKE8_CLEAN

from LogAnalyzer import Test, TestResult


class TestVCC(Test):
    '''test for VCC within recommendations, or abrupt end to log in flight'''

    def __init__(self):
        Test.__init__(self)
        self.name = "VCC"

    def run(self, logdata, verbose):
        self.result = TestResult()
        self.result.status = TestResult.StatusType.GOOD

        if "CURR" not in logdata.channels:
            self.result.status = TestResult.StatusType.UNKNOWN
            self.result.statusMessage = "No CURR log data"
            return

        # just a naive min/max test for now
        try:
            vccMin = logdata.channels["CURR"]["Vcc"].min()
            vccMax = logdata.channels["CURR"]["Vcc"].max()
        except KeyError:
            vccMin = logdata.channels["POWR"]["Vcc"].min()
            vccMax = logdata.channels["POWR"]["Vcc"].max()
            vccMin *= 1000
            vccMax *= 1000

        vccDiff = vccMax - vccMin
        vccMinThreshold = 4.6 * 1000
        vccMaxDiff = 0.3 * 1000

        if vccDiff > vccMaxDiff:
            self.result.status = TestResult.StatusType.WARN
            self.result.statusMessage = "VCC min/max diff %sv, should be <%sv" % (vccDiff / 1000.0, vccMaxDiff / 1000.0)
        elif vccMin < vccMinThreshold:
            self.result.status = TestResult.StatusType.FAIL
            self.result.statusMessage = "VCC below minimum of %sv (%sv)" % (
                repr(vccMinThreshold / 1000.0),
                repr(vccMin / 1000.0),
            )
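
The thresholds above are expressed in millivolts: fail below 4.6 V, warn if the min/max spread exceeds 0.3 V. A standalone sketch of the same check follows, runnable without a log; the function name and example values are illustrative only, not part of the original tool.

def check_vcc_mv(vcc_min_mv, vcc_max_mv):
    """Return (status, message) using the same thresholds as TestVCC,
    with both inputs in millivolts."""
    vcc_min_threshold = 4.6 * 1000   # fail below 4.6 V
    vcc_max_diff = 0.3 * 1000        # warn if sag/ripple exceeds 0.3 V
    diff = vcc_max_mv - vcc_min_mv
    if diff > vcc_max_diff:
        return "WARN", "VCC min/max diff %.2fv, should be <%.2fv" % (diff / 1000.0, vcc_max_diff / 1000.0)
    if vcc_min_mv < vcc_min_threshold:
        return "FAIL", "VCC below minimum of %.2fv (%.2fv)" % (vcc_min_threshold / 1000.0, vcc_min_mv / 1000.0)
    return "GOOD", ""

# e.g. check_vcc_mv(4820, 5010) -> ("GOOD", "")
# e.g. check_vcc_mv(4400, 4900) -> ("WARN", ...)  # 0.5 V of sag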

View File

@@ -1,76 +0,0 @@
# AP_FLAKE8_CLEAN

from __future__ import print_function

import DataflashLog
import numpy

from LogAnalyzer import Test, TestResult
from VehicleType import VehicleType


class TestVibration(Test):
    '''test for accelerometer vibration (accX/accY/accZ) within recommendations'''

    def __init__(self):
        Test.__init__(self)
        self.name = "Vibration"

    def run(self, logdata, verbose):
        self.result = TestResult()

        if logdata.vehicleType != VehicleType.Copter:
            self.result.status = TestResult.StatusType.NA
            return

        # constants
        aimRangeWarnXY = 1.5
        aimRangeFailXY = 3.0
        aimRangeWarnZ = 2.0  # gravity +/- aim range
        aimRangeFailZ = 5.0  # gravity +/- aim range

        if "IMU" not in logdata.channels:
            self.result.status = TestResult.StatusType.UNKNOWN
            self.result.statusMessage = "No IMU log data"
            return

        # find some stable LOITER data to analyze, at least 10 seconds
        chunks = DataflashLog.DataflashLogHelper.findLoiterChunks(logdata, minLengthSeconds=10, noRCInputs=True)
        if not chunks:
            self.result.status = TestResult.StatusType.UNKNOWN
            self.result.statusMessage = "No stable LOITER log data found"
            return

        # for now we'll just use the first (largest) chunk of LOITER data
        # TODO: ignore the first couple of secs to avoid bad data during transition - or can we check more
        # analytically that we're stable?
        # TODO: accumulate all LOITER chunks over min size, or just use the largest one?
        startLine = chunks[0][0]
        endLine = chunks[0][1]

        def getStdDevIMU(logdata, channelName, startLine, endLine):
            loiterData = logdata.channels["IMU"][channelName].getSegment(startLine, endLine)
            # list() is needed so numpy sees a sequence rather than a dict view under Python 3
            numpyData = numpy.array(list(loiterData.dictData.values()))
            return numpy.std(numpyData)

        # use 2x standard deviations as the metric, so if 95% of samples lie within the aim range we're good
        stdDevX = abs(2 * getStdDevIMU(logdata, "AccX", startLine, endLine))
        stdDevY = abs(2 * getStdDevIMU(logdata, "AccY", startLine, endLine))
        stdDevZ = abs(2 * getStdDevIMU(logdata, "AccZ", startLine, endLine))

        if (stdDevX > aimRangeFailXY) or (stdDevY > aimRangeFailXY) or (stdDevZ > aimRangeFailZ):
            self.result.status = TestResult.StatusType.FAIL
            self.result.statusMessage = "Vibration too high (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (stdDevX, stdDevY, stdDevZ)
        elif (stdDevX > aimRangeWarnXY) or (stdDevY > aimRangeWarnXY) or (stdDevZ > aimRangeWarnZ):
            self.result.status = TestResult.StatusType.WARN
            self.result.statusMessage = "Vibration slightly high (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (
                stdDevX,
                stdDevY,
                stdDevZ,
            )
        else:
            self.result.status = TestResult.StatusType.GOOD
            self.result.statusMessage = "Good vibration values (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (
                stdDevX,
                stdDevY,
                stdDevZ,
            )
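
The 2-sigma metric above can be tried in isolation on synthetic data. The sketch below is illustrative only: the function name, the random-noise parameters, and the printed comparison against the 1.5 warn level are assumptions for demonstration, not part of the original tool.

import numpy as np

rng = np.random.default_rng(0)

def two_sigma(samples):
    """Return 2x the standard deviation of a channel, the same metric
    TestVibration uses so that ~95% of samples fall inside the reported band."""
    return abs(2 * np.std(np.asarray(samples, dtype=float)))

# synthetic accelerometer channel: small noise around a constant offset
acc_x = 0.05 + 0.3 * rng.standard_normal(2000)
print("AccX 2-sigma: %.2f" % two_sigma(acc_x))  # ~0.6, well under the 1.5 XY warn level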