diff --git a/Tools/scripts/tempcal_IMU.py b/Tools/scripts/tempcal_IMU.py index ab6c5410dc..f931cc1af9 100755 --- a/Tools/scripts/tempcal_IMU.py +++ b/Tools/scripts/tempcal_IMU.py @@ -5,8 +5,11 @@ Create temperature calibration parameters for IMUs based on log data. from argparse import ArgumentParser parser = ArgumentParser(description=__doc__) -parser.add_argument("--outfile", default="tcal.parm") -parser.add_argument("--no-graph", action='store_true', default=False) +parser.add_argument("--outfile", default="tcal.parm", help='set output file') +parser.add_argument("--no-graph", action='store_true', default=False, help='disable graph display') +parser.add_argument("--log-parm", action='store_true', default=False, help='show corrections using coefficients from log file') +parser.add_argument("--online", action='store_true', default=False, help='use online polynomial fitting') +parser.add_argument("--tclr", action='store_true', default=False, help='use TCLR messages from log instead of IMU messages') parser.add_argument("log", metavar="LOG") args = parser.parse_args() @@ -27,11 +30,201 @@ POLY_ORDER = 3 # use exponential notation SCALE_FACTOR = 1.0e6 -def moving_average(data, w): - '''apply a moving average filter over a window of width w''' - ret = np.cumsum(data) - ret[w:] = ret[w:] - ret[:-w] - return ret[w - 1:] / w +AXES = ['X','Y','Z'] +AXEST = ['X','Y','Z','T','time'] + +class Coefficients: + '''class representing a set of coefficients''' + def __init__(self): + self.acoef = {} + self.gcoef = {} + self.enable = [0]*3 + self.tmin = [-100]*3 + self.tmax = [-100]*3 + self.gtcal = {} + self.atcal = {} + + def set_accel_poly(self, imu, axis, values): + if not imu in self.acoef: + self.acoef[imu] = {} + if not axis in self.acoef[imu]: + self.acoef[imu][axis] = [0]*4 + self.acoef[imu][axis] = values + + def set_gyro_poly(self, imu, axis, values): + if not imu in self.gcoef: + self.gcoef[imu] = {} + if not axis in self.gcoef[imu]: + self.gcoef[imu][axis] = [0]*4 + self.gcoef[imu][axis] = values + + def set_acoeff(self, imu, axis, order, value): + if not imu in self.acoef: + self.acoef[imu] = {} + if not axis in self.acoef[imu]: + self.acoef[imu][axis] = [0]*4 + self.acoef[imu][axis][POLY_ORDER-order] = value + + def set_gcoeff(self, imu, axis, order, value): + if not imu in self.gcoef: + self.gcoef[imu] = {} + if not axis in self.gcoef[imu]: + self.gcoef[imu][axis] = [0]*4 + self.gcoef[imu][axis][POLY_ORDER-order] = value + + def set_tmin(self, imu, tmin): + self.tmin[imu] = tmin + + def set_tmax(self, imu, tmax): + self.tmax[imu] = tmax + + def set_gyro_tcal(self, imu, value): + self.gtcal[imu] = value + + def set_accel_tcal(self, imu, value): + self.atcal[imu] = value + + def set_enable(self, imu, value): + self.enable[imu] = value + + def correction(self, coeff, imu, temperature, axis): + '''calculate correction from temperature calibration from log data using parameters''' + if self.enable[imu] != 1.0: + return 0.0 + tmid = 0.5 * (self.tmax[imu] + self.tmin[imu]) + if tmid <= 0: + return 0.0 + if self.atcal[imu] <= -80: + return 0.0 + if not axis in coeff: + return 0.0 + temperature = constrain(temperature, self.tmin[imu], self.tmax[imu]) + cal_temp = constrain(self.atcal[imu], self.tmin[imu], self.tmax[imu]) + poly = np.poly1d(coeff[axis]) + ret = poly(self.atcal[imu] - tmid) - poly(temperature-tmid) + return ret + + def correction_accel(self, imu, temperature): + '''calculate accel correction from temperature calibration from + log data using parameters''' + return Vector3(self.correction(self.acoef[imu], imu, temperature, 'X'), + self.correction(self.acoef[imu], imu, temperature, 'Y'), + self.correction(self.acoef[imu], imu, temperature, 'Z')) + + + def correction_gyro(self, imu, temperature): + '''calculate gyro correction from temperature calibration from + log data using parameters''' + return Vector3(self.correction(self.gcoef[imu], imu, temperature, 'X'), + self.correction(self.gcoef[imu], imu, temperature, 'Y'), + self.correction(self.gcoef[imu], imu, temperature, 'Z')) + + def param_string(self, imu): + params = '' + params += 'INS_TCAL%u_ENABLE 1\n' % (imu+1) + params += 'INS_TCAL%u_TMIN %.1f\n' % (imu+1, self.tmin[imu]) + params += 'INS_TCAL%u_TMAX %.1f\n' % (imu+1, self.tmax[imu]) + # note that we don't save the first term of the polynomial as that is a + # constant offset which is already handled by the accel/gyro constant + # offsets. We only same the temperature dependent part of the + # calibration + for p in range(POLY_ORDER): + for axis in AXES: + params += 'INS_TCAL%u_ACC%u_%s %.9f\n' % (imu+1, p+1, axis, self.acoef[imu][axis][POLY_ORDER-(p+1)]*SCALE_FACTOR) + for p in range(POLY_ORDER): + for axis in AXES: + params += 'INS_TCAL%u_GYR%u_%s %.9f\n' % (imu+1, p+1, axis, self.gcoef[imu][axis][POLY_ORDER-(p+1)]*SCALE_FACTOR) + return params + + +class OnlineIMUfit: + '''implement the online learning used in ArduPilot''' + def __init__(self): + pass + + def update(self, x, y): + temp = 1.0 + + for i in range(2*(self.porder - 1), -1, -1): + k = 0 if (i < self.porder) else (i - self.porder + 1) + for j in range(i - k, k-1, -1): + self.mat[j][i-j] += temp + temp *= x + + temp = 1.0 + for i in range(self.porder-1, -1, -1): + self.vec[i] += y * temp + temp *= x + + def get_polynomial(self): + inv_mat = np.linalg.inv(self.mat) + res = np.zeros(self.porder) + for i in range(self.porder): + res[i] = 0.0 + for j in range(self.porder): + res[i] += inv_mat[i][j] * self.vec[j] + return res + + def polyfit(self, x, y, order): + self.porder = order + 1 + self.mat = np.zeros((self.porder, self.porder)) + self.vec = np.zeros(self.porder) + for i in range(len(x)): + self.update(x[i], y[i]) + return self.get_polynomial() + +class IMUData: + def __init__(self): + self.accel = {} + self.gyro = {} + + def IMUs(self): + '''return list of IMUs''' + return self.accel.keys() + + def add_accel(self, imu, temperature, time, value): + if not imu in self.accel: + self.accel[imu] = {} + for axis in AXEST: + self.accel[imu][axis] = np.zeros(0,dtype=float) + self.accel[imu]['T'] = np.append(self.accel[imu]['T'], temperature) + self.accel[imu]['X'] = np.append(self.accel[imu]['X'], value.x) + self.accel[imu]['Y'] = np.append(self.accel[imu]['Y'], value.y) + self.accel[imu]['Z'] = np.append(self.accel[imu]['Z'], value.z) + self.accel[imu]['time'] = np.append(self.accel[imu]['time'], time) + + def add_gyro(self, imu, temperature, time, value): + if not imu in self.gyro: + self.gyro[imu] = {} + for axis in AXEST: + self.gyro[imu][axis] = np.zeros(0,dtype=float) + self.gyro[imu]['T'] = np.append(self.gyro[imu]['T'], temperature) + self.gyro[imu]['X'] = np.append(self.gyro[imu]['X'], value.x) + self.gyro[imu]['Y'] = np.append(self.gyro[imu]['Y'], value.y) + self.gyro[imu]['Z'] = np.append(self.gyro[imu]['Z'], value.z) + self.gyro[imu]['time'] = np.append(self.gyro[imu]['time'], time) + + def moving_average(self, data, w): + '''apply a moving average filter over a window of width w''' + ret = np.cumsum(data) + ret[w:] = ret[w:] - ret[:-w] + return ret[w - 1:] / w + + def FilterArray(self, data, width_s): + '''apply moving average filter of width width_s seconds''' + nseconds = data['time'][-1] - data['time'][0] + nsamples = len(data['time']) + window = int(nsamples / nseconds) * width_s + if window > 1: + for axis in AXEST: + data[axis] = self.moving_average(data[axis], window) + return data + + def Filter(self, width_s): + '''apply moving average filter of width width_s seconds''' + for imu in self.IMUs(): + self.accel[imu] = self.FilterArray(self.accel[imu], width_s) + self.gyro[imu] = self.FilterArray(self.gyro[imu], width_s) def constrain(value, minv, maxv): """Constrain a value to a range.""" @@ -41,178 +234,160 @@ def constrain(value, minv, maxv): value = maxv return value -def correction_parm(enable, tmin, tmax, coeff, temperature, cal_temp, axis): - '''calculate correction from temperature calibration from log data using parameters''' - if enable != 1.0: - return 0.0 - tmid = 0.5 * (tmax + tmin) - if tmid <= 0: - return 0.0 - if cal_temp <= -80: - return 0.0 - if not axis in coeff: - return 0.0 - temperature = constrain(temperature, tmin, tmax) - cal_temp = constrain(cal_temp, tmin, tmax) - poly = np.poly1d(coeff[axis]) - ret = poly(cal_temp-tmid) - poly(temperature-tmid) - return ret - def IMUfit(logfile): '''find IMU calibration parameters from a log file''' print("Processing log %s" % logfile) mlog = mavutil.mavlink_connection(logfile) - accel = {} - gyro = {} - acoef = {} - gcoef = {} - enable = [0]*3 - tmin = [-100]*3 - tmax = [-100]*3 - gtcal = {} - atcal = {} + data = IMUData() + + c = Coefficients() orientation = 0 - axes = ['X','Y','Z'] - axesT = ['X','Y','Z','T','time'] + + stop_capture = [ False ] * 3 + + if args.tclr: + messages = ['PARM','TCLR'] + else: + messages = ['PARM','IMU'] while True: - msg = mlog.recv_match(type=['IMU','PARM']) + msg = mlog.recv_match(type=messages) if msg is None: break if msg.get_type() == 'PARM': # build up the old coefficients so we can remove the impact of # existing coefficients from the data + m = re.match("^INS_TCAL(\d)_ENABLE$", msg.Name) + if m: + imu = int(m.group(1))-1 + if stop_capture[imu]: + continue + if msg.Value == 1 and c.enable[imu] == 2: + print("TCAL[%u] enabled" % imu) + stop_capture[imu] = True + continue + if msg.Value == 0 and c.enable[imu] == 1: + print("TCAL[%u] disabled" % imu) + stop_capture[imu] = True + continue + c.set_enable(imu, msg.Value) m = re.match("^INS_TCAL(\d)_([AG]..)([1-3])_([XYZ])$", msg.Name) if m: imu = int(m.group(1))-1 stype = m.group(2) p = int(m.group(3)) axis = m.group(4) + if stop_capture[imu]: + continue if stype == 'ACC': - if not imu in acoef: - acoef[imu] = {} - if not axis in acoef[imu]: - acoef[imu][axis] = [0.0]*4 - acoef[imu][axis][POLY_ORDER-p] = msg.Value/SCALE_FACTOR + c.set_acoeff(imu, axis, p, msg.Value/SCALE_FACTOR) if stype == 'GYR': - if not imu in gcoef: - gcoef[imu] = {} - if not axis in gcoef[imu]: - gcoef[imu][axis] = [0.0]*4 - gcoef[imu][axis][POLY_ORDER-p] = msg.Value/SCALE_FACTOR - m = re.match("^INS_TCAL(\d)_ENABLE$", msg.Name) - if m: - imu = int(m.group(1))-1 - if msg.Value == 1 and enable[imu] == 2: - print("TCAL[%u] enabled" % imu) - break - enable[imu] = msg.Value + c.set_gcoeff(imu, axis, p, msg.Value/SCALE_FACTOR) m = re.match("^INS_TCAL(\d)_TMIN$", msg.Name) if m: imu = int(m.group(1))-1 - tmin[imu] = msg.Value + if stop_capture[imu]: + continue + c.set_tmin(imu, msg.Value) m = re.match("^INS_TCAL(\d)_TMAX", msg.Name) if m: imu = int(m.group(1))-1 - tmax[imu] = msg.Value + if stop_capture[imu]: + continue + c.set_tmax(imu, msg.Value) m = re.match("^INS_GYR_CALTEMP(\d)", msg.Name) if m: imu = int(m.group(1))-1 - gtcal[imu] = msg.Value + if stop_capture[imu]: + continue + c.set_gyro_tcal(imu, msg.Value) m = re.match("^INS_ACC_CALTEMP(\d)", msg.Name) if m: imu = int(m.group(1))-1 - atcal[imu] = msg.Value + if stop_capture[imu]: + continue + c.set_accel_tcal(imu, msg.Value) if msg.Name == 'AHRS_ORIENTATION': orientation = int(msg.Value) print("Using orientation %d" % orientation) - if msg.get_type() != 'IMU': - continue + if msg.get_type() == 'TCLR' and args.tclr: + imu = msg.I - imu = msg.I + T = msg.Temp + if msg.Si == 0: + # accel + acc = Vector3(msg.X, msg.Y, msg.Z) + time = msg.TimeUS*1.0e-6 + data.add_accel(imu, T, time, acc) + elif msg.Si == 1: + # gyro + gyr = Vector3(msg.X, msg.Y, msg.Z) + time = msg.TimeUS*1.0e-6 + data.add_gyro(imu, T, time, gyr) - if imu not in accel: - accel[imu] = {} - gyro[imu] = {} - for axis in axesT: - accel[imu][axis] = np.zeros(0,dtype=float) - gyro[imu][axis] = np.zeros(0,dtype=float) + if msg.get_type() == 'IMU' and not args.tclr: + imu = msg.I - T = msg.T - accel[imu]['T'] = np.append(accel[imu]['T'], T) - gyro[imu]['T'] = np.append(gyro[imu]['T'], T) - accel[imu]['time'] = np.append(accel[imu]['time'], msg.TimeUS*1.0e-6) - gyro[imu]['time'] = np.append(gyro[imu]['time'], msg.TimeUS*1.0e-6) + T = msg.T + acc = Vector3(msg.AccX, msg.AccY, msg.AccZ) + gyr = Vector3(msg.GyrX, msg.GyrY, msg.GyrZ) - acc = Vector3(msg.AccX, msg.AccY, msg.AccZ) - gyr = Vector3(msg.GyrX, msg.GyrY, msg.GyrZ) + # invert the board orientation rotation. Corrections are in sensor frame + if orientation != 0: + acc = acc.rotate_by_inverse_id(orientation) + gyr = gyr.rotate_by_inverse_id(orientation) + if acc is None or gyr is None: + print("Invalid AHRS_ORIENTATION %u" % orientation) + sys.exit(1) - # invert the board orientation rotation. Corrections are in sensor frame - if orientation != 0: - acc = acc.rotate_by_inverse_id(orientation) - gyr = gyr.rotate_by_inverse_id(orientation) - if acc is None or gyr is None: - print("Invalid AHRS_ORIENTATION %u" % orientation) - sys.exit(1) + if c.enable[imu] == 1: + acc -= c.correction_accel(imu, T) + gyr -= c.correction_gyro(imu, T) - for axis in axes: - value = getattr(acc, axis.lower()) - if enable[imu] == 1: - value -= correction_parm(enable[imu], tmin[imu], tmax[imu], acoef[imu], - T, atcal[imu], axis) - accel[imu][axis] = np.append(accel[imu][axis], value) + time = msg.TimeUS*1.0e-6 + data.add_accel(imu, T, time, acc) + data.add_gyro (imu, T, time, gyr) - value = getattr(gyr, axis.lower()) - if enable[imu] == 1: - value -= correction_parm(enable[imu], tmin[imu], tmax[imu], gcoef[imu], - T, gtcal[imu], axis) - gyro[imu][axis] = np.append(gyro[imu][axis], value) + if len(data.IMUs()) == 0: + print("No data found") + sys.exit(1) - # apply moving average filter with 2s width - for imu in accel: - nseconds = accel[imu]['time'][-1] - accel[imu]['time'][0] - nsamples = len(accel[imu]['time']) - window = int(nsamples / nseconds) * 2 + print("Loaded %u accel and %u gyro samples" % (len(data.accel[0]['T']),len(data.gyro[0]['T']))) - for axis in axesT: - accel[imu][axis] = moving_average(accel[imu][axis], window) - gyro[imu][axis] = moving_average(gyro[imu][axis], window) + if not args.tclr: + # apply moving average filter with 2s width + data.Filter(2) - trel = {} + clog = c + c = Coefficients() calfile = open(args.outfile, "w") - for imu in accel: - tmin = np.amin(accel[imu]['T']) - tmax = np.amax(accel[imu]['T']) + for imu in data.IMUs(): + tmin = np.amin(data.accel[imu]['T']) + tmax = np.amax(data.accel[imu]['T']) tref = (tmin+tmax)*0.5 - acoef[imu] = {} - gcoef[imu] = {} + c.set_tmin(imu, tmin) + c.set_tmax(imu, tmax) - trel[imu] = accel[imu]['T'] - tref + for axis in AXES: + if args.online: + fit = OnlineIMUfit() + trel = data.accel[imu]['T'] - tref + c.set_accel_poly(imu, axis, fit.polyfit(trel, data.accel[imu][axis] - np.median(data.accel[imu][axis]), POLY_ORDER)) + trel = data.gyro[imu]['T'] - tref + c.set_gyro_poly(imu, axis, fit.polyfit(trel, data.gyro[imu][axis], POLY_ORDER)) + else: + trel = data.accel[imu]['T'] - tref + c.set_accel_poly(imu, axis, np.polyfit(trel, data.accel[imu][axis] - np.median(data.accel[imu][axis]), POLY_ORDER)) + trel = data.gyro[imu]['T'] - tref + c.set_gyro_poly(imu, axis, np.polyfit(trel, data.gyro[imu][axis], POLY_ORDER)) - for axis in axes: - acoef[imu][axis] = np.polyfit(trel[imu], accel[imu][axis] - np.median(accel[imu][axis]), POLY_ORDER) - gcoef[imu][axis] = np.polyfit(trel[imu], gyro[imu][axis], POLY_ORDER) - - params = '' - params += 'INS_TCAL%u_ENABLE 1\n' % (imu+1) - params += 'INS_TCAL%u_TMIN %.1f\n' % (imu+1, tmin) - params += 'INS_TCAL%u_TMAX %.1f\n' % (imu+1, tmax) - # note that we don't save the first term of the polynomial as that is a - # constant offset which is already handled by the accel/gyro constant - # offsets. We only same the temperature dependent part of the - # calibration - for p in range(POLY_ORDER): - for axis in axes: - params += 'INS_TCAL%u_ACC%u_%s %.9f\n' % (imu+1, p+1, axis, acoef[imu][axis][POLY_ORDER-(p+1)]*SCALE_FACTOR) - for p in range(POLY_ORDER): - for axis in axes: - params += 'INS_TCAL%u_GYR%u_%s %.9f\n' % (imu+1, p+1, axis, gcoef[imu][axis][POLY_ORDER-(p+1)]*SCALE_FACTOR) + params = c.param_string(imu) print(params) calfile.write(params) @@ -221,40 +396,61 @@ def IMUfit(logfile): if args.no_graph: return - fig, axs = pyplot.subplots(len(gyro), 1, sharex=True) + fig, axs = pyplot.subplots(len(data.IMUs()), 1, sharex=True) - if len(gyro) == 1: + num_imus = len(data.IMUs()) + if num_imus == 1: axs = [axs] - for imu in gyro: + for imu in data.IMUs(): scale = math.degrees(1) - for axis in axes: - axs[imu].plot(gyro[imu]['time'], gyro[imu][axis]*scale, label='Uncorrected %s' % axis) - for axis in axes: - poly = np.poly1d(gcoef[imu][axis]) - correction = poly(trel[imu]) - axs[imu].plot(gyro[imu]['time'], (gyro[imu][axis] - correction)*scale, label='Corrected %s' % axis) + for axis in AXES: + axs[imu].plot(data.gyro[imu]['time'], data.gyro[imu][axis]*scale, label='Uncorrected %s' % axis) + for axis in AXES: + poly = np.poly1d(c.gcoef[imu][axis]) + trel = data.gyro[imu]['T'] - tref + correction = poly(trel) + axs[imu].plot(data.gyro[imu]['time'], (data.gyro[imu][axis] - correction)*scale, label='Corrected %s' % axis) + if args.log_parm: + for axis in AXES: + if clog.enable[imu] == 0.0: + print("IMU[%u] disabled in log parms" % imu) + continue + poly = np.poly1d(clog.gcoef[imu][axis]) + tmid = 0.5*(clog.tmin[imu]+clog.tmax[imu]) + correction = poly(data.gyro[imu]['T'] - tmid) + axs[imu].plot(data.gyro[imu]['time'], (data.gyro[imu][axis] - correction)*scale, label='Corrected %s (log parm)' % axis) ax2 = axs[imu].twinx() - ax2.plot(gyro[imu]['time'], gyro[imu]['T'], label='Temperature(C)', color='black') + ax2.plot(data.gyro[imu]['time'], data.gyro[imu]['T'], label='Temperature(C)', color='black') ax2.legend(loc='upper right') axs[imu].legend(loc='upper left') axs[imu].set_title('IMU[%u] Gyro (deg/s)' % imu) - fig, axs = pyplot.subplots(len(accel), 1, sharex=True) - if len(accel) == 1: + fig, axs = pyplot.subplots(num_imus, 1, sharex=True) + if num_imus == 1: axs = [axs] - for imu in accel: + for imu in data.IMUs(): mean = {} - for axis in axes: - mean[axis] = np.mean(accel[imu][axis]) - axs[imu].plot(accel[imu]['time'], accel[imu][axis] - mean[axis], label='Uncorrected %s' % axis) - for axis in axes: - poly = np.poly1d(acoef[imu][axis]) - correction = poly(trel[imu]) - axs[imu].plot(accel[imu]['time'], (accel[imu][axis]-mean[axis]) - correction, label='Corrected %s' % axis) + for axis in AXES: + mean[axis] = np.mean(data.accel[imu][axis]) + axs[imu].plot(data.accel[imu]['time'], data.accel[imu][axis] - mean[axis], label='Uncorrected %s' % axis) + for axis in AXES: + poly = np.poly1d(c.acoef[imu][axis]) + trel = data.accel[imu]['T'] - tref + correction = poly(trel) + axs[imu].plot(data.accel[imu]['time'], (data.accel[imu][axis]-mean[axis]) - correction, label='Corrected %s' % axis) + if args.log_parm: + for axis in AXES: + if clog.enable[imu] == 0.0: + print("IMU[%u] disabled in log parms" % imu) + continue + poly = np.poly1d(clog.acoef[imu][axis]) + tmid = 0.5*(clog.tmin[imu]+clog.tmax[imu]) + correction = poly(data.accel[imu]['T'] - tmid) + axs[imu].plot(data.accel[imu]['time'], (data.accel[imu][axis]-mean[axis]) - correction, label='Corrected %s (log parm)' % axis) ax2 = axs[imu].twinx() - ax2.plot(accel[imu]['time'], accel[imu]['T'], label='Temperature(C)', color='black') + ax2.plot(data.accel[imu]['time'], data.accel[imu]['T'], label='Temperature(C)', color='black') ax2.legend(loc='upper right') axs[imu].legend(loc='upper left') axs[imu].set_title('IMU[%u] Accel (m/s^2)' % imu)