134 lines
4.0 KiB
Python
134 lines
4.0 KiB
Python
|
#!/usr/bin/env python
|
||
|
|
||
|
'''
|
||
|
extract ISBH and ISBD messages from AP_Logging files and produce C++ arrays for consumption by the DSP subsystem
|
||
|
'''
|
||
|
from __future__ import print_function
|
||
|
|
||
|
import os
|
||
|
import sys
|
||
|
import time
|
||
|
import copy
|
||
|
import numpy
|
||
|
|
||
|
from argparse import ArgumentParser
|
||
|
|
||
|
parser = ArgumentParser(description=__doc__)
|
||
|
parser.add_argument("--condition", default=None, help="select packets by condition")
|
||
|
parser.add_argument("logs", metavar="LOG", nargs="+")
|
||
|
|
||
|
args = parser.parse_args()
|
||
|
|
||
|
from pymavlink import mavutil
|
||
|
|
||
|
def isb_parser(logfile):
|
||
|
'''display fft for raw ACC data in logfile'''
|
||
|
|
||
|
'''object to store data about a single FFT plot'''
|
||
|
class ISBData(object):
|
||
|
def __init__(self, ffth):
|
||
|
self.seqno = -1
|
||
|
self.fftnum = ffth.N
|
||
|
self.sensor_type = ffth.type
|
||
|
self.instance = ffth.instance
|
||
|
self.sample_rate_hz = ffth.smp_rate
|
||
|
self.multiplier = ffth.mul
|
||
|
self.data = {}
|
||
|
self.data["X"] = []
|
||
|
self.data["Y"] = []
|
||
|
self.data["Z"] = []
|
||
|
self.holes = False
|
||
|
self.freq = None
|
||
|
|
||
|
def add_isb(self, fftd):
|
||
|
if fftd.N != self.fftnum:
|
||
|
print("Skipping ISBD with wrong fftnum (%u vs %u)\n" % (fftd.fftnum, self.fftnum), file=sys.stderr)
|
||
|
return
|
||
|
if self.holes:
|
||
|
print("Skipping ISBD(%u) for ISBH(%u) with holes in it" % (fftd.seqno, self.fftnum), file=sys.stderr)
|
||
|
return
|
||
|
if fftd.seqno != self.seqno+1:
|
||
|
print("ISBH(%u) has holes in it" % fftd.N, file=sys.stderr)
|
||
|
self.holes = True
|
||
|
return
|
||
|
self.seqno += 1
|
||
|
self.data["X"].extend(fftd.x)
|
||
|
self.data["Y"].extend(fftd.y)
|
||
|
self.data["Z"].extend(fftd.z)
|
||
|
|
||
|
def prefix(self):
|
||
|
if self.sensor_type == 0:
|
||
|
return "Accel"
|
||
|
elif self.sensor_type == 1:
|
||
|
return "Gyro"
|
||
|
else:
|
||
|
return "?Unknown Sensor Type?"
|
||
|
|
||
|
def tag(self):
|
||
|
return str(self)
|
||
|
|
||
|
def __str__(self):
|
||
|
return "%s[%u]" % (self.prefix(), self.instance)
|
||
|
|
||
|
mlog = mavutil.mavlink_connection(logfile)
|
||
|
|
||
|
isb_frames = []
|
||
|
isbdata = None
|
||
|
msgcount = 0
|
||
|
start_time = time.time()
|
||
|
while True:
|
||
|
m = mlog.recv_match(condition=args.condition)
|
||
|
if m is None:
|
||
|
break
|
||
|
msgcount += 1
|
||
|
if msgcount % 1000 == 0:
|
||
|
sys.stderr.write(".")
|
||
|
msg_type = m.get_type()
|
||
|
if msg_type == "ISBH":
|
||
|
if isbdata is not None:
|
||
|
sensor = isbdata.tag()
|
||
|
isb_frames.append(isbdata)
|
||
|
isbdata = ISBData(m)
|
||
|
continue
|
||
|
|
||
|
if msg_type == "ISBD":
|
||
|
if isbdata is None:
|
||
|
sys.stderr.write("?(fftnum=%u)" % m.N)
|
||
|
continue
|
||
|
isbdata.add_isb(m)
|
||
|
|
||
|
print("", file=sys.stderr)
|
||
|
time_delta = time.time() - start_time
|
||
|
print("Extracted %u fft data sets" % len(isb_frames), file=sys.stderr)
|
||
|
|
||
|
sample_rates = {}
|
||
|
counts = {}
|
||
|
|
||
|
print("#include \"GyroFrame.h\"")
|
||
|
print("const uint32_t NUM_FRAMES = %d;" % len(isb_frames))
|
||
|
print("const GyroFrame gyro_frames[] = {")
|
||
|
|
||
|
sample_rate = None
|
||
|
for isb_frame in isb_frames:
|
||
|
if isb_frame.sensor_type == 1 and isb_frame.instance == 0: # gyro data
|
||
|
print(" {")
|
||
|
|
||
|
for axis in [ "X","Y","Z" ]:
|
||
|
# normalize data
|
||
|
d = numpy.array(isb_frame.data[axis]) # / float(isb_frame.multiplier)
|
||
|
print(" { " + ", ".join(d.astype(numpy.dtype(str))) + " },")
|
||
|
|
||
|
if len(d) == 0:
|
||
|
print("No data?!?!?!", file=sys.stderr)
|
||
|
if sample_rate is None:
|
||
|
sample_rate = isb_frame.sample_rate_hz
|
||
|
print(" },")
|
||
|
|
||
|
print("};")
|
||
|
print("const uint16_t SAMPLE_RATE = %d;" % sample_rate)
|
||
|
|
||
|
|
||
|
|
||
|
for filename in args.logs:
|
||
|
isb_parser(filename)
|