From d76f3d71aed2ced0e7f148698c71a297b720337b Mon Sep 17 00:00:00 2001 From: Luiz Georg Date: Mon, 18 Jul 2022 10:52:56 -0300 Subject: [PATCH] Tools/LogAnalyzer: pass flake8 `TestDualGyroDrift.py` ignored because it is mostly commented out code --- Tools/LogAnalyzer/DataflashLog.py | 70 ++++++++++--------- Tools/LogAnalyzer/LogAnalyzer.py | 18 +++-- Tools/LogAnalyzer/UnitTest.py | 4 +- Tools/LogAnalyzer/VehicleType.py | 3 + Tools/LogAnalyzer/tests/TestAutotune.py | 13 ++-- Tools/LogAnalyzer/tests/TestBrownout.py | 6 +- Tools/LogAnalyzer/tests/TestCompass.py | 4 +- Tools/LogAnalyzer/tests/TestDupeLogData.py | 13 ++-- Tools/LogAnalyzer/tests/TestEmpty.py | 3 + Tools/LogAnalyzer/tests/TestEvents.py | 3 +- Tools/LogAnalyzer/tests/TestGPSGlitch.py | 3 + Tools/LogAnalyzer/tests/TestIMUMatch.py | 9 +-- Tools/LogAnalyzer/tests/TestMotorBalance.py | 7 +- Tools/LogAnalyzer/tests/TestNaN.py | 5 +- Tools/LogAnalyzer/tests/TestOptFlow.py | 30 +++++--- Tools/LogAnalyzer/tests/TestParams.py | 10 ++- Tools/LogAnalyzer/tests/TestPerformance.py | 10 +-- .../tests/TestPitchRollCoupling.py | 14 ++-- Tools/LogAnalyzer/tests/TestThrust.py | 16 +++-- Tools/LogAnalyzer/tests/TestVCC.py | 7 +- Tools/LogAnalyzer/tests/TestVibration.py | 10 +-- 21 files changed, 155 insertions(+), 103 deletions(-) diff --git a/Tools/LogAnalyzer/DataflashLog.py b/Tools/LogAnalyzer/DataflashLog.py index f7f9555f53..3098955ded 100644 --- a/Tools/LogAnalyzer/DataflashLog.py +++ b/Tools/LogAnalyzer/DataflashLog.py @@ -4,6 +4,8 @@ # Initial code by Andrew Chapman (amchapman@gmail.com), 16th Jan 2014 # +# AP_FLAKE8_CLEAN + from __future__ import print_function import bisect @@ -31,8 +33,11 @@ class Format(object): @staticmethod def trycastToFormatType(value, valueType): - '''using format characters from libraries/DataFlash/DataFlash.h to cast strings to basic python int/float/string types - tries a cast, if it does not work, well, acceptable as the text logs do not match the format, e.g. MODE is expected to be int''' + """ + Using format characters from libraries/DataFlash/DataFlash.h to cast strings to basic python int/float/string + types tries a cast, if it does not work, well, acceptable as the text logs do not match the format, e.g. MODE is + expected to be int + """ try: if valueType in "fcCeELd": return float(value) @@ -78,7 +83,6 @@ class Format(object): def init(a, *x): if len(x) != len(a.labels): raise ValueError("Invalid Length") - # print(list(zip(a.labels, x))) for (l, v) in zip(a.labels, x): try: setattr(a, l, v) @@ -90,7 +94,6 @@ class Format(object): # finally, create the class cls = type('Log__{:s}'.format(self.name), (object,), members) - # print(members) return cls @@ -213,8 +216,6 @@ class BinaryFormat(ctypes.LittleEndianStructure): if ctypes.sizeof(cls) != cls.SIZE: print("size mismatch for {} expected {} got {}".format(cls, ctypes.sizeof(cls), cls.SIZE), file=sys.stderr) - # for i in cls.labels: - # print("{} = {}".format(i,getattr(cls,'_'+i))) return None return cls @@ -230,12 +231,9 @@ class Channel(object): # TODO: store data as a scipy spline curve so we can more easily interpolate and sample the slope? def __init__(self): - self.dictData = ( - {} - ) # dict of linenum->value # store dupe data in dict and list for now, until we decide which is the better way to go - self.listData = ( - [] - ) # list of (linenum,value) # store dupe data in dict and list for now, until we decide which is the better way to go + # store dupe data in dict and list for now, until we decide which is the better way to go + self.dictData = {} # dict of linenum->value + self.listData = [] # list of (linenum,value) def getSegment(self, startLine, endLine): '''returns a segment of this data (from startLine to endLine, inclusive) as a new Channel instance''' @@ -257,7 +255,6 @@ class Channel(object): index = bisect.bisect_left(self.listData, (lineNumber, -99999)) while index < len(self.listData): line = self.listData[index][0] - # print("Looking forwards for nearest value to line number %d, starting at line %d" % (lineNumber,line)) # TEMP if line >= lineNumber: return (self.listData[index][1], line) index += 1 @@ -268,14 +265,16 @@ class Channel(object): index = bisect.bisect_left(self.listData, (lineNumber, -99999)) - 1 while index >= 0: line = self.listData[index][0] - # print("Looking backwards for nearest value to line number %d, starting at line %d" % (lineNumber,line)) # TEMP if line <= lineNumber: return (self.listData[index][1], line) index -= 1 raise ValueError("Error finding nearest value for line %d" % lineNumber) def getNearestValue(self, lineNumber, lookForwards=True): - '''find the nearest data value to the given lineNumber, defaults to first looking forwards. Returns (value,lineNumber)''' + """ + Find the nearest data value to the given lineNumber, defaults to first looking forwards. + Returns (value,lineNumber) + """ if lookForwards: try: return self.getNearestValueFwd(lineNumber) @@ -299,8 +298,6 @@ class Channel(object): def getIndexOf(self, lineNumber): '''returns the index within this channel's listData of the given lineNumber, or raises an Exception if not found''' index = bisect.bisect_left(self.listData, (lineNumber, -99999)) - # print("INDEX of line %d: %d" % (lineNumber,index)) - # print("self.listData[index][0]: %d" % self.listData[index][0]) if self.listData[index][0] == lineNumber: return index else: @@ -308,9 +305,13 @@ class Channel(object): class LogIterator: - '''Smart iterator that can move through a log by line number and maintain an index into the nearest values of all data channels''' + """ + Smart iterator that can move through a log by line number and maintain an index into the nearest values of all data + channels + """ - # TODO: LogIterator currently indexes the next available value rather than the nearest value, we should make it configurable between next/nearest + # TODO: LogIterator currently indexes the next available value rather than the nearest value, we should make it + # configurable between next/nearest class LogIteratorSubValue: '''syntactic sugar to allow access by LogIterator[lineLabel][dataLabel]''' @@ -355,7 +356,7 @@ class LogIterator: # check if the currentLine has gone past our the line we're pointing to for this type of data dataLabel = self.logdata.formats[lineLabel].labels[0] (index, lineNumber) = self.iterators[lineLabel] - # if so, and it is not the last entry in the log, then increment the indices for all dataLabels under that lineLabel + # if so, and it is not the last entry in the log, increment the indices for dataLabels under that lineLabel if (self.currentLine > lineNumber) and ( index < len(self.logdata.channels[lineLabel][dataLabel].listData) - 1 ): @@ -379,7 +380,7 @@ class DataflashLogHelper: @staticmethod def getTimeAtLine(logdata, lineNumber): '''returns the nearest GPS timestamp in milliseconds after the given line number''' - if not "GPS" in logdata.channels: + if "GPS" not in logdata.channels: raise Exception("no GPS log data found") # older logs use 'TIme', newer logs use 'TimeMS' # even newer logs use TimeUS @@ -400,7 +401,10 @@ class DataflashLogHelper: @staticmethod def findLoiterChunks(logdata, minLengthSeconds=0, noRCInputs=True): - '''returns a list of (to,from) pairs defining sections of the log which are in loiter mode. Ordered from longest to shortest in time. If noRCInputs == True it only returns chunks with no control inputs''' + """ + Returns a list of (to, from) pairs defining sections of the log which are in loiter mode, ordered from longest + to shortest in time. If `noRCInputs == True` it only returns chunks with no control inputs + """ # TODO: implement noRCInputs handling when identifying stable loiter chunks, for now we're ignoring it def chunkSizeCompare(chunk1, chunk2): @@ -430,9 +434,7 @@ class DataflashLogHelper: ) / 1000.0 if chunkTimeSeconds > minLengthSeconds: chunks.append((startLine, endLine)) - # print("LOITER chunk: %d to %d, %d lines" % (startLine,endLine,endLine-startLine+1)) - # print(" (time %d to %d, %d seconds)" % (DataflashLogHelper.getTimeAtLine(logdata,startLine), DataflashLogHelper.getTimeAtLine(logdata,endLine), chunkTimeSeconds)) - chunks.sort(chunkSizeCompare) + chunks.sort(key=lambda chunk: chunk[1] - chunk[0]) return chunks @staticmethod @@ -445,7 +447,7 @@ class DataflashLogHelper: if "CTUN" in logdata.channels: try: maxThrottle = logdata.channels["CTUN"]["ThrOut"].max() - except KeyError as e: + except KeyError: # ThrOut was shorted to ThO at some stage... maxThrottle = logdata.channels["CTUN"]["ThO"].max() # at roughly the same time ThO became a range from 0 to 1 @@ -456,7 +458,10 @@ class DataflashLogHelper: class DataflashLog(object): - '''ArduPilot Dataflash log file reader and container class. Keep this simple, add more advanced or specific functions to DataflashLogHelper class''' + """ + ArduPilot Dataflash log file reader and container class. Keep this simple, add more advanced or specific functions + to DataflashLogHelper class + """ knownHardwareTypes = ["APM", "PX4", "MPNG"] @@ -539,7 +544,6 @@ class DataflashLog(object): elif format == 'auto': if self.filename == '': # assuming TXT format - # raise ValueError("Invalid log format for stdin: {}".format(format)) head = "" else: head = f.read(4) @@ -620,7 +624,7 @@ class DataflashLog(object): else: # assume it has ModeNum: self.modeChanges[lineNumber] = (modes[int(e.Mode)], e.ModeNum) - except ValueError as x: + except ValueError: if hasattr(e, 'ThrCrs'): self.modeChanges[lineNumber] = (e.Mode, e.ThrCrs) else: @@ -687,7 +691,7 @@ class DataflashLog(object): groupName = e.NAME # first time seeing this type of log line, create the channel storage - if not groupName in self.channels: + if groupName not in self.channels: self.channels[groupName] = {} for label in e.labels: self.channels[groupName][label] = Channel() @@ -708,7 +712,6 @@ class DataflashLog(object): lineNumber = lineNumber + 1 numBytes += len(line) + 1 try: - # print("Reading line: %d" % lineNumber) line = line.strip('\n\r') tokens = line.split(', ') # first handle the log header lines @@ -727,7 +730,8 @@ class DataflashLog(object): elif len(tokens2) == 3 and tokens2[0] == "Free" and tokens2[1] == "RAM:": self.freeRAM = int(tokens2[2]) elif tokens2[0] in knownHardwareTypes: - self.hardwareType = line # not sure if we can parse this more usefully, for now only need to report it back verbatim + # not sure if we can parse this more usefully, for now only need to report it back verbatim + self.hardwareType = line elif (len(tokens2) == 2 or len(tokens2) == 3) and tokens2[1][ 0 ].lower() == "v": # e.g. ArduCopter V3.1 (5c6503e2) @@ -777,7 +781,7 @@ class DataflashLog(object): while len(data) > offset + ctypes.sizeof(logheader): h = logheader.from_buffer(data, offset) if not (h.head1 == 0xA3 and h.head2 == 0x95): - if ignoreBadlines == False: + if ignoreBadlines is False: raise ValueError(h) else: if h.head1 == 0xFF and h.head2 == 0xFF and h.msgid == 0xFF: diff --git a/Tools/LogAnalyzer/LogAnalyzer.py b/Tools/LogAnalyzer/LogAnalyzer.py index cfa163982e..70abd21692 100755 --- a/Tools/LogAnalyzer/LogAnalyzer.py +++ b/Tools/LogAnalyzer/LogAnalyzer.py @@ -5,13 +5,15 @@ # Initial code by Andrew Chapman (amchapman@gmail.com), 16th Jan 2014 # +# AP_FLAKE8_CLEAN # some logging oddities noticed while doing this, to be followed up on: # - tradheli MOT labels Mot1,Mot2,Mot3,Mot4,GGain # - Pixhawk doesn't output one of the FMT labels... forget which one # - MAG offsets seem to be constant (only seen data on Pixhawk) # - MAG offsets seem to be cast to int before being output? (param is -84.67, logged as -84) -# - copter+plane use 'V' in their vehicle type/version/build line, rover uses lower case 'v'. Copter+Rover give a build number, plane does not +# - copter+plane use 'V' in their vehicle type/version/build line, rover uses lower case 'v'. +# Copter+Rover give a build number, plane does not # - CTUN.ThrOut on copter is 0-1000, on plane+rover it is 0-100 # TODO: add test for noisy baro values @@ -25,7 +27,6 @@ import glob import imp import inspect import os -import pprint # temp import sys import time from xml.sax.saxutils import escape @@ -38,7 +39,8 @@ class TestResult(object): '''all tests return a standardized result type''' class StatusType: - # NA means not applicable for this log (e.g. copter tests against a plane log), UNKNOWN means it is missing data required for the test + # NA means not applicable for this log (e.g. copter tests against a plane log) + # UNKNOWN means it is missing data required for the test GOOD, FAIL, WARN, UNKNOWN, NA = range(5) status = None @@ -46,7 +48,10 @@ class TestResult(object): class Test(object): - '''base class to be inherited by log tests. Each test should be quite granular so we have lots of small tests with clear results''' + """ + Base class to be inherited by log tests. Each test should be quite granular so we have lots of small tests with + clear results + """ def __init__(self): self.name = "" @@ -140,7 +145,8 @@ class TestSuite(object): print('\n') print( - 'The Log Analyzer is currently BETA code.\nFor any support or feedback on the log analyzer please email Andrew Chapman (amchapman@gmail.com)' + "The Log Analyzer is currently BETA code." + "\nFor any support or feedback on the log analyzer please email Andrew Chapman (amchapman@gmail.com)" ) print('\n') @@ -218,8 +224,6 @@ class TestSuite(object): def main(): - dirName = os.path.dirname(os.path.abspath(__file__)) - # deal with command line arguments parser = argparse.ArgumentParser(description='Analyze an APM Dataflash log for known issues') parser.add_argument('logfile', type=argparse.FileType('r'), help='path to Dataflash log file (or - for stdin)') diff --git a/Tools/LogAnalyzer/UnitTest.py b/Tools/LogAnalyzer/UnitTest.py index fc1c36cb80..4eccc87952 100755 --- a/Tools/LogAnalyzer/UnitTest.py +++ b/Tools/LogAnalyzer/UnitTest.py @@ -5,6 +5,8 @@ # # +# AP_FLAKE8_CLEAN + # TODO: implement more unit+regression tests from __future__ import print_function @@ -397,6 +399,6 @@ try: print("All unit/regression tests GOOD\n") -except Exception as e: +except Exception: print("Error found: " + traceback.format_exc()) print("UNIT TEST FAILED\n") diff --git a/Tools/LogAnalyzer/VehicleType.py b/Tools/LogAnalyzer/VehicleType.py index 6dad1b4344..00df675930 100644 --- a/Tools/LogAnalyzer/VehicleType.py +++ b/Tools/LogAnalyzer/VehicleType.py @@ -1,3 +1,6 @@ +# AP_FLAKE8_CLEAN + + class VehicleType: Plane = 17 Copter = 23 diff --git a/Tools/LogAnalyzer/tests/TestAutotune.py b/Tools/LogAnalyzer/tests/TestAutotune.py index 7128da79df..16c04e39cd 100644 --- a/Tools/LogAnalyzer/tests/TestAutotune.py +++ b/Tools/LogAnalyzer/tests/TestAutotune.py @@ -1,4 +1,5 @@ -import DataflashLog +# AP_FLAKE8_CLEAN + from LogAnalyzer import Test, TestResult from VehicleType import VehicleType @@ -70,7 +71,7 @@ class TestAutotune(Test): for i in ['EV', 'ATDE', 'ATUN']: r = False - if not i in logdata.channels: + if i not in logdata.channels: self.result.status = TestResult.StatusType.UNKNOWN self.result.statusMessage = "No {} log data".format(i) r = True @@ -126,7 +127,9 @@ class TestAutotune(Test): for key in logdata.channels['ATUN']: setattr(atun, key, logdata.channels['ATUN'][key].getNearestValueFwd(linenext)[0]) linenext = logdata.channels['ATUN'][key].getNearestValueFwd(linenext)[1] + 1 - self.result.statusMessage += 'ATUN Axis:{atun.Axis} TuneStep:{atun.TuneStep} RateMin:{atun.RateMin:5.0f} RateMax:{atun.RateMax:5.0f} RPGain:{atun.RPGain:1.4f} RDGain:{atun.RDGain:1.4f} SPGain:{atun.SPGain:1.1f} (@line:{l})\n'.format( - l=linenext, s=s, atun=atun - ) + self.result.statusMessage += ( + "ATUN Axis:{atun.Axis} TuneStep:{atun.TuneStep} RateMin:{atun.RateMin:5.0f}" + " RateMax:{atun.RateMax:5.0f} RPGain:{atun.RPGain:1.4f} RDGain:{atun.RDGain:1.4f}" + " SPGain:{atun.SPGain:1.1f} (@line:{l})\n" + ).format(l=linenext, atun=atun) self.result.statusMessage += '\n' diff --git a/Tools/LogAnalyzer/tests/TestBrownout.py b/Tools/LogAnalyzer/tests/TestBrownout.py index 0542ae4891..a72281e965 100644 --- a/Tools/LogAnalyzer/tests/TestBrownout.py +++ b/Tools/LogAnalyzer/tests/TestBrownout.py @@ -1,6 +1,5 @@ -import collections +# AP_FLAKE8_CLEAN -import DataflashLog from LogAnalyzer import Test, TestResult @@ -19,7 +18,8 @@ class TestBrownout(Test): # FIXME: cope with LOG_ARM_DISARM_MSG message if "EV" in logdata.channels: # step through the arm/disarm events in order, to see if they're symmetrical - # note: it seems landing detection isn't robust enough to rely upon here, so we'll only consider arm+disarm, not takeoff+land + # note: it seems landing detection isn't robust enough to rely upon here, so we'll only consider arm+disarm, + # not takeoff+land for line, ev in logdata.channels["EV"]["Id"].listData: if ev == 10: isArmed = True diff --git a/Tools/LogAnalyzer/tests/TestCompass.py b/Tools/LogAnalyzer/tests/TestCompass.py index 959c0de4de..b4e6dc5f5a 100644 --- a/Tools/LogAnalyzer/tests/TestCompass.py +++ b/Tools/LogAnalyzer/tests/TestCompass.py @@ -1,7 +1,9 @@ +# AP_FLAKE8_CLEAN + + import math from functools import reduce -import DataflashLog from LogAnalyzer import Test, TestResult diff --git a/Tools/LogAnalyzer/tests/TestDupeLogData.py b/Tools/LogAnalyzer/tests/TestDupeLogData.py index fbe94634f6..d570f579f4 100644 --- a/Tools/LogAnalyzer/tests/TestDupeLogData.py +++ b/Tools/LogAnalyzer/tests/TestDupeLogData.py @@ -1,6 +1,8 @@ +# AP_FLAKE8_CLEAN + + from __future__ import print_function -import DataflashLog from LogAnalyzer import Test, TestResult @@ -25,12 +27,10 @@ class TestDupeLogData(Test): # c data = logdata.channels["ATT"]["Pitch"].listData for i in range(sampleStartIndex, len(data)): - # print("Checking against index %d" % i) if i == sampleStartIndex: continue # skip matching against ourselves j = 0 while j < 20 and (i + j) < len(data) and data[i + j][1] == sample[j][1]: - # print("### Match found, j=%d, data=%f, sample=%f, log data matched to sample at line %d" % (j,data[i+j][1],sample[j][1],data[i+j][0])) j += 1 if j == 20: # all samples match return data[i][0] @@ -41,7 +41,8 @@ class TestDupeLogData(Test): self.result = TestResult() self.result.status = TestResult.StatusType.GOOD - # this could be made more flexible by not hard-coding to use ATT data, could make it dynamic based on whatever is available as long as it is highly variable + # this could be made more flexible by not hard-coding to use ATT data, could make it dynamic based on whatever + # is available as long as it is highly variable if "ATT" not in logdata.channels: self.result.status = TestResult.StatusType.UNKNOWN self.result.statusMessage = "No ATT log data" @@ -49,22 +50,18 @@ class TestDupeLogData(Test): # pick 10 sample points within the range of ATT data we have sampleStartIndices = [] - attStartIndex = 0 attEndIndex = len(logdata.channels["ATT"]["Pitch"].listData) - 1 step = int(attEndIndex / 11) for i in range(step, attEndIndex - step, step): sampleStartIndices.append(i) - # print("Dupe data sample point index %d at line %d" % (i, logdata.channels["ATT"]["Pitch"].listData[i][0])) # get 20 datapoints of pitch from each sample location and check for a match elsewhere sampleIndex = 0 for i in range(sampleStartIndices[0], len(logdata.channels["ATT"]["Pitch"].listData)): if i == sampleStartIndices[sampleIndex]: - # print("Checking sample %d" % i) sample = logdata.channels["ATT"]["Pitch"].listData[i : i + 20] matchedLine = self.__matchSample(sample, i, logdata) if matchedLine: - # print("Data from line %d found duplicated at line %d" % (sample[0][0],matchedLine)) self.result.status = TestResult.StatusType.FAIL self.result.statusMessage = "Duplicate data chunks found in log (%d and %d)" % ( sample[0][0], diff --git a/Tools/LogAnalyzer/tests/TestEmpty.py b/Tools/LogAnalyzer/tests/TestEmpty.py index 07bfa8ed87..e247702a5e 100644 --- a/Tools/LogAnalyzer/tests/TestEmpty.py +++ b/Tools/LogAnalyzer/tests/TestEmpty.py @@ -1,3 +1,6 @@ +# AP_FLAKE8_CLEAN + + import DataflashLog from LogAnalyzer import Test, TestResult diff --git a/Tools/LogAnalyzer/tests/TestEvents.py b/Tools/LogAnalyzer/tests/TestEvents.py index 2ab10cd2a8..b8556b5baf 100644 --- a/Tools/LogAnalyzer/tests/TestEvents.py +++ b/Tools/LogAnalyzer/tests/TestEvents.py @@ -1,4 +1,5 @@ -import DataflashLog +# AP_FLAKE8_CLEAN + from LogAnalyzer import Test, TestResult diff --git a/Tools/LogAnalyzer/tests/TestGPSGlitch.py b/Tools/LogAnalyzer/tests/TestGPSGlitch.py index bbc6733427..294351e2be 100644 --- a/Tools/LogAnalyzer/tests/TestGPSGlitch.py +++ b/Tools/LogAnalyzer/tests/TestGPSGlitch.py @@ -1,3 +1,6 @@ +# AP_FLAKE8_CLEAN + + from LogAnalyzer import Test, TestResult diff --git a/Tools/LogAnalyzer/tests/TestIMUMatch.py b/Tools/LogAnalyzer/tests/TestIMUMatch.py index 327672c3c9..e26c68756f 100644 --- a/Tools/LogAnalyzer/tests/TestIMUMatch.py +++ b/Tools/LogAnalyzer/tests/TestIMUMatch.py @@ -1,8 +1,10 @@ +# AP_FLAKE8_CLEAN + + from __future__ import print_function from math import sqrt -import DataflashLog from LogAnalyzer import Test, TestResult @@ -23,12 +25,12 @@ class TestIMUMatch(Test): self.result = TestResult() self.result.status = TestResult.StatusType.GOOD - if ("IMU" in logdata.channels) and (not "IMU2" in logdata.channels): + if ("IMU" in logdata.channels) and ("IMU2" not in logdata.channels): self.result.status = TestResult.StatusType.NA self.result.statusMessage = "No IMU2" return - if (not "IMU" in logdata.channels) or (not "IMU2" in logdata.channels): + if ("IMU" not in logdata.channels) or ("IMU2" not in logdata.channels): self.result.status = TestResult.StatusType.UNKNOWN self.result.statusMessage = "No IMU log data" return @@ -115,7 +117,6 @@ class TestIMUMatch(Test): diff_filtered = sqrt(xdiff_filtered**2 + ydiff_filtered**2 + zdiff_filtered**2) max_diff_filtered = max(max_diff_filtered, diff_filtered) - # print(max_diff_filtered) last_t = t if max_diff_filtered > fail_threshold: diff --git a/Tools/LogAnalyzer/tests/TestMotorBalance.py b/Tools/LogAnalyzer/tests/TestMotorBalance.py index 26dc03cc0f..398d9941d3 100644 --- a/Tools/LogAnalyzer/tests/TestMotorBalance.py +++ b/Tools/LogAnalyzer/tests/TestMotorBalance.py @@ -1,4 +1,5 @@ -import DataflashLog +# AP_FLAKE8_CLEAN + from LogAnalyzer import Test, TestResult from VehicleType import VehicleType @@ -19,7 +20,7 @@ class TestBalanceTwist(Test): return self.result.status = TestResult.StatusType.UNKNOWN - if not "RCOU" in logdata.channels: + if "RCOU" not in logdata.channels: return ch = [] @@ -50,7 +51,7 @@ class TestBalanceTwist(Test): / (logdata.parameters["RC3_MAX"] - logdata.parameters["RC3_MIN"]) / 1000.0 ) - except KeyError as e: + except KeyError: min_throttle = ( logdata.parameters["MOT_PWM_MIN"] / (logdata.parameters["MOT_PWM_MAX"] - logdata.parameters["RC3_MIN"]) diff --git a/Tools/LogAnalyzer/tests/TestNaN.py b/Tools/LogAnalyzer/tests/TestNaN.py index 63fde87aa9..d53949a71e 100644 --- a/Tools/LogAnalyzer/tests/TestNaN.py +++ b/Tools/LogAnalyzer/tests/TestNaN.py @@ -1,3 +1,6 @@ +# AP_FLAKE8_CLEAN + + import math from LogAnalyzer import Test, TestResult @@ -36,5 +39,5 @@ class TestNaN(Test): field, ) raise ValueError() - except ValueError as e: + except ValueError: continue diff --git a/Tools/LogAnalyzer/tests/TestOptFlow.py b/Tools/LogAnalyzer/tests/TestOptFlow.py index 92bbb6aca7..32ef7251d8 100644 --- a/Tools/LogAnalyzer/tests/TestOptFlow.py +++ b/Tools/LogAnalyzer/tests/TestOptFlow.py @@ -1,6 +1,8 @@ +# AP_FLAKE8_CLEAN + + from math import sqrt -import DataflashLog import matplotlib.pyplot as plt import numpy as np from LogAnalyzer import Test, TestResult @@ -53,8 +55,12 @@ class TestFlow(Test): 2.0 # if the gyro rate is greter than this, the data will not be used by the curve fit (rad/sec) ) param_std_threshold = 5.0 # maximum allowable 1-std uncertainty in scaling parameter (scale factor * 1000) - param_abs_threshold = 200 # max/min allowable scale factor parameter. Values of FLOW_FXSCALER and FLOW_FYSCALER outside the range of +-param_abs_threshold indicate a sensor configuration problem. - min_num_points = 100 # minimum number of points required for a curve fit - this is necessary, but not sufficient condition - the standard deviation estimate of the fit gradient is also important. + # max/min allowable scale factor parameter. Values of FLOW_FXSCALER and FLOW_FYSCALER outside the range + # of +-param_abs_threshold indicate a sensor configuration problem. + param_abs_threshold = 200 + # minimum number of points required for a curve fit - this is necessary, but not sufficient condition - the + # standard deviation estimate of the fit gradient is also important. + min_num_points = 100 # get the existing scale parameters flow_fxscaler = logdata.parameters["FLOW_FXSCALER"] @@ -204,7 +210,8 @@ class TestFlow(Test): bodyY_resampled.append(bodyY[i]) flowY_time_us_resampled.append(flow_time_us[i]) - # fit a straight line to the flow vs body rate data and calculate the scale factor parameter required to achieve a slope of 1 + # fit a straight line to the flow vs body rate data and calculate the scale factor parameter required to + # achieve a slope of 1 coef_flow_x, cov_x = np.polyfit( bodyX_resampled, flowX_resampled, 1, rcond=None, full=False, w=None, cov=True ) @@ -212,7 +219,8 @@ class TestFlow(Test): bodyY_resampled, flowY_resampled, 1, rcond=None, full=False, w=None, cov=True ) - # taking the exisiting scale factor parameters into account, calculate the parameter values reequired to achieve a unity slope + # taking the exisiting scale factor parameters into account, calculate the parameter values reequired to + # achieve a unity slope flow_fxscaler_new = int(1000 * (((1 + 0.001 * float(flow_fxscaler)) / coef_flow_x[0] - 1))) flow_fyscaler_new = int(1000 * (((1 + 0.001 * float(flow_fyscaler)) / coef_flow_y[0] - 1))) @@ -220,8 +228,9 @@ class TestFlow(Test): if sqrt(cov_x[0][0]) > param_std_threshold or sqrt(cov_y[0][0]) > param_std_threshold: FAIL() self.result.statusMessage = ( - "FAIL: inaccurate fit - poor quality or insufficient data\nFLOW_FXSCALER 1STD = %u\nFLOW_FYSCALER 1STD = %u\n" - % (round(1000 * sqrt(cov_x[0][0])), round(1000 * sqrt(cov_y[0][0]))) + "FAIL: inaccurate fit - poor quality or insufficient data" + "\nFLOW_FXSCALER 1STD = %u" + "\nFLOW_FYSCALER 1STD = %u\n" % (round(1000 * sqrt(cov_x[0][0])), round(1000 * sqrt(cov_y[0][0]))) ) # Do a sanity check on the scale factors @@ -234,7 +243,12 @@ class TestFlow(Test): # display recommended scale factors self.result.statusMessage = ( - "Set FLOW_FXSCALER to %i\nSet FLOW_FYSCALER to %i\n\nCal plots saved to flow_calibration.pdf\nCal parameters saved to flow_calibration.param\n\nFLOW_FXSCALER 1STD = %u\nFLOW_FYSCALER 1STD = %u\n" + "Set FLOW_FXSCALER to %i" + "\nSet FLOW_FYSCALER to %i" + "\n\nCal plots saved to flow_calibration.pdf" + "\nCal parameters saved to flow_calibration.param" + "\n\nFLOW_FXSCALER 1STD = %u" + "\nFLOW_FYSCALER 1STD = %u\n" % ( flow_fxscaler_new, flow_fyscaler_new, diff --git a/Tools/LogAnalyzer/tests/TestParams.py b/Tools/LogAnalyzer/tests/TestParams.py index e5b00ddc26..aedcf76216 100644 --- a/Tools/LogAnalyzer/tests/TestParams.py +++ b/Tools/LogAnalyzer/tests/TestParams.py @@ -1,6 +1,8 @@ +# AP_FLAKE8_CLEAN + + import math # for isnan() -import DataflashLog from LogAnalyzer import Test, TestResult from VehicleType import VehicleType @@ -54,8 +56,10 @@ class TestParams(Test): self.result.statusMessage = self.result.statusMessage + name + " is NaN\n" try: - # add parameter checks below using the helper functions, any failures will trigger a FAIL status and accumulate info in statusMessage - # if more complex checking or correlations are required you can access parameter values directly using the logdata.parameters[paramName] dict + # add parameter checks below using the helper functions, any failures will trigger a FAIL status and + # accumulate info in statusMessage. + # If more complex checking or correlations are required you can access parameter values directly using the + # logdata.parameters[paramName] dict if logdata.vehicleType == VehicleType.Copter: self.__checkParamIsEqual("MAG_ENABLE", 1, logdata) if "THR_MIN" in logdata.parameters: diff --git a/Tools/LogAnalyzer/tests/TestPerformance.py b/Tools/LogAnalyzer/tests/TestPerformance.py index 27ba21e3b5..a4e2ebddc2 100644 --- a/Tools/LogAnalyzer/tests/TestPerformance.py +++ b/Tools/LogAnalyzer/tests/TestPerformance.py @@ -1,6 +1,8 @@ +# AP_FLAKE8_CLEAN + + from __future__ import print_function -import DataflashLog from LogAnalyzer import Test, TestResult from VehicleType import VehicleType @@ -21,7 +23,8 @@ class TestPerformance(Test): self.result.status = TestResult.StatusType.NA return - # NOTE: we'll ignore MaxT altogether for now, it seems there are quite regularly one or two high values in there, even ignoring the ones expected after arm/disarm events + # NOTE: we'll ignore MaxT altogether for now, it seems there are quite regularly one or two high values in + # there, even ignoring the ones expected after arm/disarm events. # gather info on arm/disarm lines, we will ignore the MaxT data from the first line found after each of these # armingLines = [] # for line,ev in logdata.channels["EV"]["Id"].listData: @@ -32,7 +35,6 @@ class TestPerformance(Test): # if not armingLines: # break # if maxT[0] > armingLines[0]: - # #print("Ignoring maxT from line %d, as it is the first PM line after arming on line %d" % (maxT[0],armingLines[0])) # ignoreMaxTLines.append(maxT[0]) # armingLines.pop(0) @@ -55,8 +57,6 @@ class TestPerformance(Test): if percentSlow > maxPercentSlow: maxPercentSlow = percentSlow maxPercentSlowLine = line - # if (maxT > 13000) and line not in ignoreMaxTLines: - # print("MaxT of %d detected on line %d" % (maxT,line)) if (maxPercentSlow > 10) or (slowLoopLineCount > 6): self.result.status = TestResult.StatusType.FAIL self.result.statusMessage = "%d slow loop lines found, max %.2f%% on line %d" % ( diff --git a/Tools/LogAnalyzer/tests/TestPitchRollCoupling.py b/Tools/LogAnalyzer/tests/TestPitchRollCoupling.py index 7268e5437f..1da9f6df80 100644 --- a/Tools/LogAnalyzer/tests/TestPitchRollCoupling.py +++ b/Tools/LogAnalyzer/tests/TestPitchRollCoupling.py @@ -1,3 +1,6 @@ +# AP_FLAKE8_CLEAN + + import collections import DataflashLog @@ -8,12 +11,12 @@ from VehicleType import VehicleType class TestPitchRollCoupling(Test): '''test for divergence between input and output pitch/roll, i.e. mechanical failure or bad PID tuning''' - # TODO: currently we're only checking for roll/pitch outside of max lean angle, will come back later to analyze roll/pitch in versus out values + # TODO: currently we're only checking for roll/pitch outside of max lean angle, will come back later to analyze + # roll/pitch in versus out values def __init__(self): Test.__init__(self) self.name = "Pitch/Roll" - self.enable = True # TEMP def run(self, logdata, verbose): self.result = TestResult() @@ -23,12 +26,12 @@ class TestPitchRollCoupling(Test): self.result.status = TestResult.StatusType.NA return - if not "ATT" in logdata.channels: + if "ATT" not in logdata.channels: self.result.status = TestResult.StatusType.UNKNOWN self.result.statusMessage = "No ATT log data" return - if not "CTUN" in logdata.channels: + if "CTUN" not in logdata.channels: self.result.status = TestResult.StatusType.UNKNOWN self.result.statusMessage = "No CTUN log data" return @@ -38,7 +41,8 @@ class TestPitchRollCoupling(Test): else: self.ctun_baralt_att = 'BAlt' - # figure out where each mode begins and ends, so we can treat auto and manual modes differently and ignore acro/tune modes + # figure out where each mode begins and ends, so we can treat auto and manual modes differently and ignore + # acro/tune modes autoModes = [ "RTL", "AUTO", diff --git a/Tools/LogAnalyzer/tests/TestThrust.py b/Tools/LogAnalyzer/tests/TestThrust.py index ed4a228424..c33ebc9cb0 100644 --- a/Tools/LogAnalyzer/tests/TestThrust.py +++ b/Tools/LogAnalyzer/tests/TestThrust.py @@ -1,6 +1,8 @@ +# AP_FLAKE8_CLEAN + + from __future__ import print_function -import DataflashLog from LogAnalyzer import Test, TestResult from VehicleType import VehicleType @@ -20,11 +22,11 @@ class TestThrust(Test): self.result.status = TestResult.StatusType.NA return - if not "CTUN" in logdata.channels: + if "CTUN" not in logdata.channels: self.result.status = TestResult.StatusType.UNKNOWN self.result.statusMessage = "No CTUN log data" return - if not "ATT" in logdata.channels: + if "ATT" not in logdata.channels: self.result.status = TestResult.StatusType.UNKNOWN self.result.statusMessage = "No ATT log data" return @@ -49,7 +51,8 @@ class TestThrust(Test): highThrottleSegments = [] - # find any contiguous chunks where CTUN.ThrOut > highThrottleThreshold, ignore high throttle if tilt > tiltThreshold, and discard any segments shorter than minSampleLength + # find any contiguous chunks where CTUN.ThrOut > highThrottleThreshold, ignore high throttle if + # tilt > tiltThreshold, and discard any segments shorter than minSampleLength start = None data = logdata.channels["CTUN"][throut_key].listData for i in range(0, len(data)): @@ -61,11 +64,10 @@ class TestThrust(Test): if (abs(roll) > tiltThreshold) or (abs(pitch) > tiltThreshold): isBelowTiltThreshold = False if (value > highThrottleThreshold) and isBelowTiltThreshold: - if start == None: + if start is None: start = i - elif start != None: + elif start is not None: if (i - start) > minSampleLength: - # print("Found high throttle chunk from line %d to %d (%d samples)" % (data[start][0],data[i][0],i-start+1)) highThrottleSegments.append((start, i)) start = None diff --git a/Tools/LogAnalyzer/tests/TestVCC.py b/Tools/LogAnalyzer/tests/TestVCC.py index 1f4ed497ba..fedef39d16 100644 --- a/Tools/LogAnalyzer/tests/TestVCC.py +++ b/Tools/LogAnalyzer/tests/TestVCC.py @@ -1,6 +1,5 @@ -import collections +# AP_FLAKE8_CLEAN -import DataflashLog from LogAnalyzer import Test, TestResult @@ -15,7 +14,7 @@ class TestVCC(Test): self.result = TestResult() self.result.status = TestResult.StatusType.GOOD - if not "CURR" in logdata.channels: + if "CURR" not in logdata.channels: self.result.status = TestResult.StatusType.UNKNOWN self.result.statusMessage = "No CURR log data" return @@ -24,7 +23,7 @@ class TestVCC(Test): try: vccMin = logdata.channels["CURR"]["Vcc"].min() vccMax = logdata.channels["CURR"]["Vcc"].max() - except KeyError as e: + except KeyError: vccMin = logdata.channels["POWR"]["Vcc"].min() vccMax = logdata.channels["POWR"]["Vcc"].max() vccMin *= 1000 diff --git a/Tools/LogAnalyzer/tests/TestVibration.py b/Tools/LogAnalyzer/tests/TestVibration.py index 9024f1604a..ac7d6e9979 100644 --- a/Tools/LogAnalyzer/tests/TestVibration.py +++ b/Tools/LogAnalyzer/tests/TestVibration.py @@ -1,3 +1,6 @@ +# AP_FLAKE8_CLEAN + + from __future__ import print_function import DataflashLog @@ -21,13 +24,12 @@ class TestVibration(Test): return # constants - gravity = -9.81 aimRangeWarnXY = 1.5 aimRangeFailXY = 3.0 aimRangeWarnZ = 2.0 # gravity +/- aim range aimRangeFailZ = 5.0 # gravity +/- aim range - if not "IMU" in logdata.channels: + if "IMU" not in logdata.channels: self.result.status = TestResult.StatusType.UNKNOWN self.result.statusMessage = "No IMU log data" return @@ -40,11 +42,11 @@ class TestVibration(Test): return # for now we'll just use the first (largest) chunk of LOITER data - # TODO: ignore the first couple of secs to avoid bad data during transition - or can we check more analytically that we're stable? + # TODO: ignore the first couple of secs to avoid bad data during transition - or can we check more analytically + # that we're stable? # TODO: accumulate all LOITER chunks over min size, or just use the largest one? startLine = chunks[0][0] endLine = chunks[0][1] - # print("TestVibration using LOITER chunk from lines %s to %s" % (repr(startLine), repr(endLine))) def getStdDevIMU(logdata, channelName, startLine, endLine): loiterData = logdata.channels["IMU"][channelName].getSegment(startLine, endLine)