2017-07-21 20:09:51 -03:00
from __future__ import print_function
2014-02-12 15:55:00 -04:00
from LogAnalyzer import Test , TestResult
import DataflashLog
class TestDupeLogData ( Test ) :
''' test for duplicated data in log, which has been happening on PX4/Pixhawk '''
def __init__ ( self ) :
2014-06-27 20:11:23 -03:00
Test . __init__ ( self )
2014-02-12 15:55:00 -04:00
self . name = " Dupe Log Data "
def __matchSample ( self , sample , sampleStartIndex , logdata ) :
''' return the line number where a match is found, otherwise return False '''
# ignore if all data in sample is the same value
nSame = 0
for s in sample :
if s [ 1 ] == sample [ 0 ] [ 1 ] :
nSame + = 1
if nSame == 20 :
return False
# c
data = logdata . channels [ " ATT " ] [ " Pitch " ] . listData
for i in range ( sampleStartIndex , len ( data ) ) :
2017-07-21 20:09:51 -03:00
#print("Checking against index %d" % i)
2014-02-12 15:55:00 -04:00
if i == sampleStartIndex :
continue # skip matching against ourselves
j = 0
while j < 20 and ( i + j ) < len ( data ) and data [ i + j ] [ 1 ] == sample [ j ] [ 1 ] :
2017-07-21 20:09:51 -03:00
#print("### Match found, j=%d, data=%f, sample=%f, log data matched to sample at line %d" % (j,data[i+j][1],sample[j][1],data[i+j][0]))
2014-02-12 15:55:00 -04:00
j + = 1
if j == 20 : # all samples match
return data [ i ] [ 0 ]
return False
2014-03-03 13:46:17 -04:00
def run ( self , logdata , verbose ) :
2014-02-12 15:55:00 -04:00
self . result = TestResult ( )
2014-06-15 18:35:14 -03:00
self . result . status = TestResult . StatusType . GOOD
2014-02-12 15:55:00 -04:00
# this could be made more flexible by not hard-coding to use ATT data, could make it dynamic based on whatever is available as long as it is highly variable
if " ATT " not in logdata . channels :
self . result . status = TestResult . StatusType . UNKNOWN
self . result . statusMessage = " No ATT log data "
return
# pick 10 sample points within the range of ATT data we have
sampleStartIndices = [ ]
attStartIndex = 0
attEndIndex = len ( logdata . channels [ " ATT " ] [ " Pitch " ] . listData ) - 1
2020-07-16 20:22:10 -03:00
step = int ( attEndIndex / 11 )
2014-02-12 15:55:00 -04:00
for i in range ( step , attEndIndex - step , step ) :
sampleStartIndices . append ( i )
2017-07-21 20:09:51 -03:00
#print("Dupe data sample point index %d at line %d" % (i, logdata.channels["ATT"]["Pitch"].listData[i][0]))
2014-02-12 15:55:00 -04:00
# get 20 datapoints of pitch from each sample location and check for a match elsewhere
sampleIndex = 0
for i in range ( sampleStartIndices [ 0 ] , len ( logdata . channels [ " ATT " ] [ " Pitch " ] . listData ) ) :
if i == sampleStartIndices [ sampleIndex ] :
2017-07-21 20:09:51 -03:00
#print("Checking sample %d" % i)
2014-02-12 15:55:00 -04:00
sample = logdata . channels [ " ATT " ] [ " Pitch " ] . listData [ i : i + 20 ]
matchedLine = self . __matchSample ( sample , i , logdata )
if matchedLine :
2017-07-21 20:09:51 -03:00
#print("Data from line %d found duplicated at line %d" % (sample[0][0],matchedLine))
2014-02-12 15:55:00 -04:00
self . result . status = TestResult . StatusType . FAIL
self . result . statusMessage = " Duplicate data chunks found in log ( %d and %d ) " % ( sample [ 0 ] [ 0 ] , matchedLine )
return
sampleIndex + = 1
if sampleIndex > = len ( sampleStartIndices ) :
break