2017-07-21 20:09:51 -03:00
from __future__ import print_function
2014-02-12 15:55:00 -04:00
import DataflashLog
2022-07-18 11:09:04 -03:00
from LogAnalyzer import Test , TestResult
2014-02-12 15:55:00 -04:00
class TestDupeLogData ( Test ) :
2022-07-18 11:09:04 -03:00
''' test for duplicated data in log, which has been happening on PX4/Pixhawk '''
def __init__ ( self ) :
Test . __init__ ( self )
self . name = " Dupe Log Data "
def __matchSample ( self , sample , sampleStartIndex , logdata ) :
''' return the line number where a match is found, otherwise return False '''
# ignore if all data in sample is the same value
nSame = 0
for s in sample :
if s [ 1 ] == sample [ 0 ] [ 1 ] :
nSame + = 1
if nSame == 20 :
return False
# c
data = logdata . channels [ " ATT " ] [ " Pitch " ] . listData
for i in range ( sampleStartIndex , len ( data ) ) :
# print("Checking against index %d" % i)
if i == sampleStartIndex :
continue # skip matching against ourselves
j = 0
while j < 20 and ( i + j ) < len ( data ) and data [ i + j ] [ 1 ] == sample [ j ] [ 1 ] :
# print("### Match found, j=%d, data=%f, sample=%f, log data matched to sample at line %d" % (j,data[i+j][1],sample[j][1],data[i+j][0]))
j + = 1
if j == 20 : # all samples match
return data [ i ] [ 0 ]
return False
def run ( self , logdata , verbose ) :
self . result = TestResult ( )
self . result . status = TestResult . StatusType . GOOD
# this could be made more flexible by not hard-coding to use ATT data, could make it dynamic based on whatever is available as long as it is highly variable
if " ATT " not in logdata . channels :
self . result . status = TestResult . StatusType . UNKNOWN
self . result . statusMessage = " No ATT log data "
return
# pick 10 sample points within the range of ATT data we have
sampleStartIndices = [ ]
attStartIndex = 0
attEndIndex = len ( logdata . channels [ " ATT " ] [ " Pitch " ] . listData ) - 1
step = int ( attEndIndex / 11 )
for i in range ( step , attEndIndex - step , step ) :
sampleStartIndices . append ( i )
# print("Dupe data sample point index %d at line %d" % (i, logdata.channels["ATT"]["Pitch"].listData[i][0]))
# get 20 datapoints of pitch from each sample location and check for a match elsewhere
sampleIndex = 0
for i in range ( sampleStartIndices [ 0 ] , len ( logdata . channels [ " ATT " ] [ " Pitch " ] . listData ) ) :
if i == sampleStartIndices [ sampleIndex ] :
# print("Checking sample %d" % i)
sample = logdata . channels [ " ATT " ] [ " Pitch " ] . listData [ i : i + 20 ]
matchedLine = self . __matchSample ( sample , i , logdata )
if matchedLine :
# print("Data from line %d found duplicated at line %d" % (sample[0][0],matchedLine))
self . result . status = TestResult . StatusType . FAIL
self . result . statusMessage = " Duplicate data chunks found in log ( %d and %d ) " % (
sample [ 0 ] [ 0 ] ,
matchedLine ,
)
return
sampleIndex + = 1
if sampleIndex > = len ( sampleStartIndices ) :
break