wind_est: extract utility functions to separate file

This commit is contained in:
bresch 2022-09-22 15:59:10 +02:00 committed by Mathieu Bresciani
parent 2549054b28
commit 89bc28e836
2 changed files with 149 additions and 72 deletions

View File

@ -1,7 +1,43 @@
import os
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Copyright (c) 2022 PX4 Development Team
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the
distribution.
3. Neither the name PX4 nor the names of its contributors may be
used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
File: derivation.py
Description:
Derivation of a wind and airspeed scale (EKF) estimator using SymForce
"""
from symforce import symbolic as sm
from symforce import geo
from symforce import typing as T
from derivation_utils import *
def fuse_airspeed(
v_local: geo.V3,
@ -18,36 +54,12 @@ def fuse_airspeed(
innov = airspeed - airspeed_pred
H = geo.V1(airspeed_pred).jacobian(state)
innov_var = (H * P * H.transpose() + R)[0,0]
innov_var = (H * P * H.T + R)[0,0]
K = P * H.transpose() / sm.Max(innov_var, epsilon)
K = P * H.T / sm.Max(innov_var, epsilon)
return (geo.V3(H), K, innov_var, innov)
# q: quaternion describing rotation from frame 1 to frame 2
# returns a rotation matrix derived form q which describes the same
# rotation
def quat2Rot(q):
q0 = q[0]
q1 = q[1]
q2 = q[2]
q3 = q[3]
Rot = geo.M33([[q0**2 + q1**2 - q2**2 - q3**2, 2*(q1*q2 - q0*q3), 2*(q1*q3 + q0*q2)],
[2*(q1*q2 + q0*q3), q0**2 - q1**2 + q2**2 - q3**2, 2*(q2*q3 - q0*q1)],
[2*(q1*q3-q0*q2), 2*(q2*q3 + q0*q1), q0**2 - q1**2 - q2**2 + q3**2]])
return Rot
def sign_no_zero(x) -> T.Scalar:
"""
Returns -1 if x is negative, 1 if x is positive, and 1 if x is zero
"""
return 2 * sm.Min(sm.sign(x), 0) + 1
def add_epsilon_sign(expr, var, eps):
return expr.subs(var, var + eps * sign_no_zero(var))
def fuse_beta(
v_local: geo.V3,
state: geo.V3,
@ -58,18 +70,18 @@ def fuse_beta(
) -> (geo.V3, geo.V3, T.Scalar, T.Scalar):
vel_rel = geo.V3(v_local[0] - state[0], v_local[1] - state[1], v_local[2])
relative_wind_body = quat2Rot(q_att).T * vel_rel
relative_wind_body = quat_to_rot(q_att).T * vel_rel
# small angle approximation of side slip model
# protect division by zero using epsilon
# Small angle approximation of side slip model
# Protect division by zero using epsilon
beta_pred = add_epsilon_sign(relative_wind_body[1] / relative_wind_body[0], relative_wind_body[0], epsilon)
innov = 0.0 - beta_pred
H = geo.V1(beta_pred).jacobian(state)
innov_var = (H * P * H.transpose() + R)[0,0]
innov_var = (H * P * H.T + R)[0,0]
K = P * H.transpose() / sm.Max(innov_var, epsilon)
K = P * H.T / sm.Max(innov_var, epsilon)
return (geo.V3(H), K, innov_var, innov)
@ -99,44 +111,8 @@ def init_wind_using_airspeed(
return (wind, P)
def generatePx4Function(function_name, output_names):
from symforce.codegen import Codegen, CppConfig
codegen = Codegen.function(
function_name,
output_names=output_names,
config=CppConfig())
metadata = codegen.generate_function(
output_dir="generated",
skip_directory_nesting=True)
generate_px4_function(fuse_airspeed, output_names=["H", "K", "innov_var", "innov"])
generate_px4_function(fuse_beta, output_names=["H", "K", "innov_var", "innov"])
generate_px4_function(init_wind_using_airspeed, output_names=["wind", "P"])
print("Files generated in {}:\n".format(metadata.output_dir))
for f in metadata.generated_files:
print(" |- {}".format(os.path.relpath(f, metadata.output_dir)))
# Replace cstdlib and Eigen functions by PX4 equivalents
import fileinput
with fileinput.FileInput(os.path.abspath(metadata.generated_files[0]), inplace=True, backup='.bak') as file:
for line in file:
line = line.replace("std::max", "math::max")
line = line.replace("std::min", "math::min")
line = line.replace("Eigen", "matrix")
line = line.replace("matrix/Dense", "matrix/math.hpp")
print(line, end='')
def generatePythonFunction(function_name, output_names):
from symforce.codegen import Codegen, PythonConfig
codegen = Codegen.function(
function_name,
output_names=output_names,
config=PythonConfig())
metadata = codegen.generate_function(
output_dir="generated",
skip_directory_nesting=True)
generatePx4Function(fuse_airspeed, output_names=["H", "K", "innov_var", "innov"])
generatePx4Function(fuse_beta, output_names=["H", "K", "innov_var", "innov"])
generatePx4Function(init_wind_using_airspeed, output_names=["wind", "P"])
generatePythonFunction(fuse_airspeed, output_names=["H", "K", "innov_var", "innov"])
generate_python_function(fuse_airspeed, output_names=["H", "K", "innov_var", "innov"])

View File

@ -0,0 +1,101 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Copyright (c) 2022 PX4 Development Team
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the
distribution.
3. Neither the name PX4 nor the names of its contributors may be
used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
File: derivation_utils.py
Description:
Common functions used for the derivation of most estimators
"""
from symforce import symbolic as sm
from symforce import geo
from symforce import typing as T
# q: quaternion describing rotation from frame 1 to frame 2
# returns a rotation matrix derived form q which describes the same
# rotation
def quat_to_rot(q):
q0 = q[0]
q1 = q[1]
q2 = q[2]
q3 = q[3]
Rot = geo.M33([[q0**2 + q1**2 - q2**2 - q3**2, 2*(q1*q2 - q0*q3), 2*(q1*q3 + q0*q2)],
[2*(q1*q2 + q0*q3), q0**2 - q1**2 + q2**2 - q3**2, 2*(q2*q3 - q0*q1)],
[2*(q1*q3-q0*q2), 2*(q2*q3 + q0*q1), q0**2 - q1**2 - q2**2 + q3**2]])
return Rot
def sign_no_zero(x) -> T.Scalar:
"""
Returns -1 if x is negative, 1 if x is positive, and 1 if x is zero
"""
return 2 * sm.Min(sm.sign(x), 0) + 1
def add_epsilon_sign(expr, var, eps):
# Avoids a singularity at 0 while keeping the derivative correct
return expr.subs(var, var + eps * sign_no_zero(var))
def generate_px4_function(function_name, output_names):
from symforce.codegen import Codegen, CppConfig
import os
import fileinput
codegen = Codegen.function(
function_name,
output_names=output_names,
config=CppConfig())
metadata = codegen.generate_function(
output_dir="generated",
skip_directory_nesting=True)
print("Files generated in {}:\n".format(metadata.output_dir))
for f in metadata.generated_files:
print(" |- {}".format(os.path.relpath(f, metadata.output_dir)))
# Replace cstdlib and Eigen functions by PX4 equivalents
with fileinput.FileInput(os.path.abspath(metadata.generated_files[0]), inplace=True, backup='.bak') as file:
for line in file:
line = line.replace("std::max", "math::max")
line = line.replace("std::min", "math::min")
line = line.replace("Eigen", "matrix")
line = line.replace("matrix/Dense", "matrix/math.hpp")
print(line, end='')
def generate_python_function(function_name, output_names):
from symforce.codegen import Codegen, PythonConfig
codegen = Codegen.function(
function_name,
output_names=output_names,
config=PythonConfig())
metadata = codegen.generate_function(
output_dir="generated",
skip_directory_nesting=True)