Ardupilot2/libraries/AP_HAL/examples/DSP_test/gyro_isb_reader.py
Andy Piper e2ef0bd36e AP_HAL: collect data for three largest peaks
new dsp peak detection algorithm
add DSP sketch with frequency ascii art
tool to generate gyro data frames from batch sampled DF logs
add generated data from real Y6B flight
allow fft_start() to use ObjectBuffer<float> for lock-free access
allow ObjectBuffer to be resized
2020-05-24 07:43:34 +10:00

134 lines
4.0 KiB
Python

#!/usr/bin/env python
'''
extract ISBH and ISBD messages from AP_Logging files and produce C++ arrays for consumption by the DSP subsystem
'''
from __future__ import print_function
import os
import sys
import time
import copy
import numpy
from argparse import ArgumentParser
parser = ArgumentParser(description=__doc__)
parser.add_argument("--condition", default=None, help="select packets by condition")
parser.add_argument("logs", metavar="LOG", nargs="+")
args = parser.parse_args()
from pymavlink import mavutil
def isb_parser(logfile):
'''display fft for raw ACC data in logfile'''
'''object to store data about a single FFT plot'''
class ISBData(object):
def __init__(self, ffth):
self.seqno = -1
self.fftnum = ffth.N
self.sensor_type = ffth.type
self.instance = ffth.instance
self.sample_rate_hz = ffth.smp_rate
self.multiplier = ffth.mul
self.data = {}
self.data["X"] = []
self.data["Y"] = []
self.data["Z"] = []
self.holes = False
self.freq = None
def add_isb(self, fftd):
if fftd.N != self.fftnum:
print("Skipping ISBD with wrong fftnum (%u vs %u)\n" % (fftd.fftnum, self.fftnum), file=sys.stderr)
return
if self.holes:
print("Skipping ISBD(%u) for ISBH(%u) with holes in it" % (fftd.seqno, self.fftnum), file=sys.stderr)
return
if fftd.seqno != self.seqno+1:
print("ISBH(%u) has holes in it" % fftd.N, file=sys.stderr)
self.holes = True
return
self.seqno += 1
self.data["X"].extend(fftd.x)
self.data["Y"].extend(fftd.y)
self.data["Z"].extend(fftd.z)
def prefix(self):
if self.sensor_type == 0:
return "Accel"
elif self.sensor_type == 1:
return "Gyro"
else:
return "?Unknown Sensor Type?"
def tag(self):
return str(self)
def __str__(self):
return "%s[%u]" % (self.prefix(), self.instance)
mlog = mavutil.mavlink_connection(logfile)
isb_frames = []
isbdata = None
msgcount = 0
start_time = time.time()
while True:
m = mlog.recv_match(condition=args.condition)
if m is None:
break
msgcount += 1
if msgcount % 1000 == 0:
sys.stderr.write(".")
msg_type = m.get_type()
if msg_type == "ISBH":
if isbdata is not None:
sensor = isbdata.tag()
isb_frames.append(isbdata)
isbdata = ISBData(m)
continue
if msg_type == "ISBD":
if isbdata is None:
sys.stderr.write("?(fftnum=%u)" % m.N)
continue
isbdata.add_isb(m)
print("", file=sys.stderr)
time_delta = time.time() - start_time
print("Extracted %u fft data sets" % len(isb_frames), file=sys.stderr)
sample_rates = {}
counts = {}
print("#include \"GyroFrame.h\"")
print("const uint32_t NUM_FRAMES = %d;" % len(isb_frames))
print("const GyroFrame gyro_frames[] = {")
sample_rate = None
for isb_frame in isb_frames:
if isb_frame.sensor_type == 1 and isb_frame.instance == 0: # gyro data
print(" {")
for axis in [ "X","Y","Z" ]:
# normalize data
d = numpy.array(isb_frame.data[axis]) # / float(isb_frame.multiplier)
print(" { " + ", ".join(d.astype(numpy.dtype(str))) + " },")
if len(d) == 0:
print("No data?!?!?!", file=sys.stderr)
if sample_rate is None:
sample_rate = isb_frame.sample_rate_hz
print(" },")
print("};")
print("const uint16_t SAMPLE_RATE = %d;" % sample_rate)
for filename in args.logs:
isb_parser(filename)