autotest: imported python quadcopter model as sim_quad.py

this allows us to keep it in sync with the main SITL code
This commit is contained in:
Andrew Tridgell 2011-12-02 15:13:50 +11:00
parent 544d2e6793
commit 0887804096
7 changed files with 2911 additions and 0 deletions

View File

@ -0,0 +1,62 @@
import euclid, math, util
class Aircraft(object):
'''a basic aircraft class'''
def __init__(self):
self.home_latitude = 0
self.home_longitude = 0
self.home_altitude = 0
self.ground_level = 0
self.frame_height = 0.0
self.latitude = self.home_latitude
self.longitude = self.home_longitude
self.altitude = self.home_altitude
self.pitch = 0.0 # degrees
self.roll = 0.0 # degrees
self.yaw = 0.0 # degrees
self.pitch_rate = 0.0 # degrees/s
self.roll_rate = 0.0 # degrees/s
self.yaw_rate = 0.0 # degrees/s
self.velocity = euclid.Vector3(0, 0, 0) # m/s, North, East, Up
self.position = euclid.Vector3(0, 0, 0) # m North, East, Up
self.accel = euclid.Vector3(0, 0, 0) # m/s/s North, East, Up
self.mass = 0.0
self.update_frequency = 50 # in Hz
self.gravity = 9.8 # m/s/s
self.accelerometer = euclid.Vector3(0, 0, -self.gravity)
def normalise(self):
'''normalise roll, pitch and yaw
roll between -180 and 180
pitch between -180 and 180
yaw between 0 and 360
'''
def norm(angle, min, max):
while angle > max:
angle -= 360
while angle < min:
angle += 360
return angle
self.roll = norm(self.roll, -180, 180)
self.pitch = norm(self.pitch, -180, 180)
self.yaw = norm(self.yaw, 0, 360)
def update_position(self):
'''update lat/lon/alt from position'''
radius_of_earth = 6378100.0 # in meters
dlat = math.degrees(math.atan(self.position.x/radius_of_earth))
dlon = math.degrees(math.atan(self.position.y/radius_of_earth))
self.altitude = self.home_altitude + self.position.z
self.latitude = self.home_latitude + dlat
self.longitude = self.home_longitude + dlon
# work out what the accelerometer would see
self.accelerometer = util.RPY_to_XYZ(self.roll, self.pitch, self.yaw, 1) * -self.gravity
self.accelerometer -= self.accel

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,209 @@
#!/usr/bin/env python
# parse and construct FlightGear NET FDM packets
# Andrew Tridgell, November 2011
# released under GNU GPL version 2 or later
import struct, math
class fgFDMError(Exception):
'''fgFDM error class'''
def __init__(self, msg):
Exception.__init__(self, msg)
self.message = 'fgFDMError: ' + msg
class fgFDMVariable(object):
'''represent a single fgFDM variable'''
def __init__(self, index, arraylength, units):
self.index = index
self.arraylength = arraylength
self.units = units
class fgFDMVariableList(object):
'''represent a list of fgFDM variable'''
def __init__(self):
self.vars = {}
self._nextidx = 0
def add(self, varname, arraylength=1, units=None):
self.vars[varname] = fgFDMVariable(self._nextidx, arraylength, units=units)
self._nextidx += arraylength
class fgFDM(object):
'''a flightgear native FDM parser/generator'''
def __init__(self):
'''init a fgFDM object'''
self.FG_NET_FDM_VERSION = 24
self.pack_string = '>I 4x 3d 6f 11f 3f 2f I 4I 4f 4f 4f 4f 4f 4f 4f 4f 4f I 4f I 3I 3f 3f 3f I i f 10f'
self.values = [0]*98
self.FG_MAX_ENGINES = 4
self.FG_MAX_WHEELS = 3
self.FG_MAX_TANKS = 4
# supported unit mappings
self.unitmap = {
('radians', 'degrees') : math.degrees(1),
('rps', 'dps') : math.degrees(1),
('feet', 'meters') : 0.3048,
('fps', 'mps') : 0.3048,
('knots', 'mps') : 0.514444444,
('knots', 'fps') : 0.514444444/0.3048,
('fpss', 'mpss') : 0.3048,
('seconds', 'minutes') : 60,
('seconds', 'hours') : 3600,
}
# build a mapping between variable name and index in the values array
# note that the order of this initialisation is critical - it must
# match the wire structure
self.mapping = fgFDMVariableList()
self.mapping.add('version')
# position
self.mapping.add('longitude', units='radians') # geodetic (radians)
self.mapping.add('latitude', units='radians') # geodetic (radians)
self.mapping.add('altitude', units='meters') # above sea level (meters)
self.mapping.add('agl', units='meters') # above ground level (meters)
# attitude
self.mapping.add('phi', units='radians') # roll (radians)
self.mapping.add('theta', units='radians') # pitch (radians)
self.mapping.add('psi', units='radians') # yaw or true heading (radians)
self.mapping.add('alpha', units='radians') # angle of attack (radians)
self.mapping.add('beta', units='radians') # side slip angle (radians)
# Velocities
self.mapping.add('phidot', units='rps') # roll rate (radians/sec)
self.mapping.add('thetadot', units='rps') # pitch rate (radians/sec)
self.mapping.add('psidot', units='rps') # yaw rate (radians/sec)
self.mapping.add('vcas', units='fps') # calibrated airspeed
self.mapping.add('climb_rate', units='fps') # feet per second
self.mapping.add('v_north', units='fps') # north velocity in local/body frame, fps
self.mapping.add('v_east', units='fps') # east velocity in local/body frame, fps
self.mapping.add('v_down', units='fps') # down/vertical velocity in local/body frame, fps
self.mapping.add('v_wind_body_north', units='fps') # north velocity in local/body frame
self.mapping.add('v_wind_body_east', units='fps') # east velocity in local/body frame
self.mapping.add('v_wind_body_down', units='fps') # down/vertical velocity in local/body
# Accelerations
self.mapping.add('A_X_pilot', units='fpss') # X accel in body frame ft/sec^2
self.mapping.add('A_Y_pilot', units='fpss') # Y accel in body frame ft/sec^2
self.mapping.add('A_Z_pilot', units='fpss') # Z accel in body frame ft/sec^2
# Stall
self.mapping.add('stall_warning') # 0.0 - 1.0 indicating the amount of stall
self.mapping.add('slip_deg', units='degrees') # slip ball deflection
# Engine status
self.mapping.add('num_engines') # Number of valid engines
self.mapping.add('eng_state', self.FG_MAX_ENGINES) # Engine state (off, cranking, running)
self.mapping.add('rpm', self.FG_MAX_ENGINES) # Engine RPM rev/min
self.mapping.add('fuel_flow', self.FG_MAX_ENGINES) # Fuel flow gallons/hr
self.mapping.add('fuel_px', self.FG_MAX_ENGINES) # Fuel pressure psi
self.mapping.add('egt', self.FG_MAX_ENGINES) # Exhuast gas temp deg F
self.mapping.add('cht', self.FG_MAX_ENGINES) # Cylinder head temp deg F
self.mapping.add('mp_osi', self.FG_MAX_ENGINES) # Manifold pressure
self.mapping.add('tit', self.FG_MAX_ENGINES) # Turbine Inlet Temperature
self.mapping.add('oil_temp', self.FG_MAX_ENGINES) # Oil temp deg F
self.mapping.add('oil_px', self.FG_MAX_ENGINES) # Oil pressure psi
# Consumables
self.mapping.add('num_tanks') # Max number of fuel tanks
self.mapping.add('fuel_quantity', self.FG_MAX_TANKS)
# Gear status
self.mapping.add('num_wheels')
self.mapping.add('wow', self.FG_MAX_WHEELS)
self.mapping.add('gear_pos', self.FG_MAX_WHEELS)
self.mapping.add('gear_steer', self.FG_MAX_WHEELS)
self.mapping.add('gear_compression', self.FG_MAX_WHEELS)
# Environment
self.mapping.add('cur_time', units='seconds') # current unix time
self.mapping.add('warp', units='seconds') # offset in seconds to unix time
self.mapping.add('visibility', units='meters') # visibility in meters (for env. effects)
# Control surface positions (normalized values)
self.mapping.add('elevator')
self.mapping.add('elevator_trim_tab')
self.mapping.add('left_flap')
self.mapping.add('right_flap')
self.mapping.add('left_aileron')
self.mapping.add('right_aileron')
self.mapping.add('rudder')
self.mapping.add('nose_wheel')
self.mapping.add('speedbrake')
self.mapping.add('spoilers')
self.set('version', self.FG_NET_FDM_VERSION)
self._packet_size = struct.calcsize(self.pack_string)
if len(self.values) != self.mapping._nextidx:
raise fgFDMError('Invalid variable list in initialisation')
def packet_size(self):
'''return expected size of FG FDM packets'''
return self._packet_size
def convert(self, value, fromunits, tounits):
'''convert a value from one set of units to another'''
if fromunits == tounits:
return value
if (fromunits,tounits) in self.unitmap:
return value * self.unitmap[(fromunits,tounits)]
if (tounits,fromunits) in self.unitmap:
return value / self.unitmap[(tounits,fromunits)]
raise fgFDMError("unknown unit mapping (%s,%s)" % (fromunits, tounits))
def units(self, varname):
'''return the default units of a variable'''
if not varname in self.mapping.vars:
raise fgFDMError('Unknown variable %s' % varname)
return self.mapping.vars[varname].units
def variables(self):
'''return a list of available variables'''
return sorted(self.mapping.vars.keys(),
key = lambda v : self.mapping.vars[v].index)
def get(self, varname, idx=0, units=None):
'''get a variable value'''
if not varname in self.mapping.vars:
raise fgFDMError('Unknown variable %s' % varname)
if idx >= self.mapping.vars[varname].arraylength:
raise fgFDMError('index of %s beyond end of array idx=%u arraylength=%u' % (
varname, idx, self.mapping.vars[varname].arraylength))
value = self.values[self.mapping.vars[varname].index + idx]
if units:
value = self.convert(value, self.mapping.vars[varname].units, units)
return value
def set(self, varname, value, idx=0, units=None):
'''set a variable value'''
if not varname in self.mapping.vars:
raise fgFDMError('Unknown variable %s' % varname)
if idx >= self.mapping.vars[varname].arraylength:
raise fgFDMError('index of %s beyond end of array idx=%u arraylength=%u' % (
varname, idx, self.mapping.vars[varname].arraylength))
if units:
value = self.convert(value, units, self.mapping.vars[varname].units)
self.values[self.mapping.vars[varname].index + idx] = value
def parse(self, buf):
'''parse a FD FDM buffer'''
try:
t = struct.unpack(self.pack_string, buf)
except struct.error, msg:
raise fgFDMError('unable to parse - %s' % msg)
self.values = list(t)
def pack(self):
'''pack a FD FDM buffer from current values'''
for i in range(len(self.values)):
if math.isnan(self.values[i]):
self.values[i] = 0
return struct.pack(self.pack_string, *self.values)

View File

@ -0,0 +1,75 @@
#!/usr/bin/env python
import socket, struct, time, math
import fgFDM
class udp_socket(object):
'''a UDP socket'''
def __init__(self, device, blocking=True, input=True):
a = device.split(':')
if len(a) != 2:
print("UDP ports must be specified as host:port")
sys.exit(1)
self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if input:
self.port.bind((a[0], int(a[1])))
self.destination_addr = None
else:
self.destination_addr = (a[0], int(a[1]))
if not blocking:
self.port.setblocking(0)
self.last_address = None
def recv(self,n=1000):
try:
data, self.last_address = self.port.recvfrom(n)
except socket.error as e:
if e.errno in [ 11, 35 ]:
return ""
raise
return data
def write(self, buf):
try:
if self.destination_addr:
self.port.sendto(buf, self.destination_addr)
else:
self.port.sendto(buf, self.last_addr)
except socket.error:
pass
def ft2m(x):
return x * 0.3048
def m2ft(x):
return x / 0.3048
def kt2mps(x):
return x * 0.514444444
def mps2kt(x):
return x / 0.514444444
udp = udp_socket("127.0.0.1:5123")
fgout = udp_socket("127.0.0.1:5124", input=False)
tlast = time.time()
count = 0
fg = fgFDM.fgFDM()
while True:
buf = udp.recv(1000)
fg.parse(buf)
fgout.write(fg.pack())
count += 1
if time.time() - tlast > 1.0:
print("%u FPS len=%u" % (count, len(buf)))
count = 0
tlast = time.time()
print(fg.get('latitude', units='degrees'),
fg.get('longitude', units='degrees'),
fg.get('altitude', units='meters'),
fg.get('vcas', units='mps'))

View File

@ -0,0 +1,34 @@
#!/usr/bin/env python
'''
useful extra functions for use by mavlink clients
Copyright Andrew Tridgell 2011
Released under GNU GPL version 3 or later
'''
from math import *
def norm_heading(RAW_IMU, ATTITUDE, declination):
'''calculate heading from RAW_IMU and ATTITUDE'''
xmag = RAW_IMU.xmag
ymag = RAW_IMU.ymag
zmag = RAW_IMU.zmag
pitch = ATTITUDE.pitch
roll = ATTITUDE.roll
headX = xmag*cos(pitch) + ymag*sin(roll)*sin(pitch) + zmag*cos(roll)*sin(pitch)
headY = ymag*cos(roll) - zmag*sin(roll)
heading = atan2(-headY, headX)
heading = fmod(degrees(heading) + declination + 360, 360)
return heading
def TrueHeading(SERVO_OUTPUT_RAW):
rc3_min = 1060
rc3_max = 1850
p = float(SERVO_OUTPUT_RAW.servo3_raw - rc3_min) / (rc3_max - rc3_min)
return 172 + (1.0-p)*(326 - 172)
def kmh(mps):
'''convert m/s to Km/h'''
return mps*3.6

View File

@ -0,0 +1,88 @@
#!/usr/bin/env python
from aircraft import Aircraft
import euclid, util, time
class QuadCopter(Aircraft):
'''a QuadCopter'''
def __init__(self):
Aircraft.__init__(self)
self.motor_speed = [ 0.0, 0.0, 0.0, 0.0 ]
self.mass = 1.0 # Kg
self.hover_throttle = 0.37
self.terminal_velocity = 30.0
self.frame_height = 0.1
# scaling from total motor power to Newtons. Allows the copter
# to hover against gravity when each motor is at hover_throttle
self.thrust_scale = (self.mass * self.gravity) / (4.0 * self.hover_throttle)
self.last_time = time.time()
def update(self, servos):
for i in range(0, 4):
if servos[i] <= 0.0:
self.motor_speed[i] = 0
else:
self.motor_speed[i] = servos[i]
m = self.motor_speed
# how much time has passed?
t = time.time()
delta_time = t - self.last_time
self.last_time = t
# rotational acceleration, in degrees/s/s
roll_accel = (m[1] - m[0]) * 5000.0
pitch_accel = (m[2] - m[3]) * 5000.0
yaw_accel = -((m[2]+m[3]) - (m[0]+m[1])) * 400.0
# update rotational rates
self.roll_rate += roll_accel * delta_time
self.pitch_rate += pitch_accel * delta_time
self.yaw_rate += yaw_accel * delta_time
# update rotation
self.roll += self.roll_rate * delta_time
self.pitch += self.pitch_rate * delta_time
self.yaw += self.yaw_rate * delta_time
# air resistance
air_resistance = - self.velocity * (self.gravity/self.terminal_velocity)
# normalise rotations
self.normalise()
thrust = (m[0] + m[1] + m[2] + m[3]) * self.thrust_scale # Newtons
accel = thrust / self.mass
accel3D = util.RPY_to_XYZ(self.roll, self.pitch, self.yaw, accel)
accel3D += euclid.Vector3(0, 0, -self.gravity)
accel3D += air_resistance
# new velocity vector
self.velocity += accel3D * delta_time
self.accel = accel3D
# new position vector
old_position = self.position.copy()
self.position += self.velocity * delta_time
# constrain height to the ground
if self.position.z + self.home_altitude < self.ground_level + self.frame_height:
if old_position.z + self.home_altitude > self.ground_level + self.frame_height:
print("Hit ground at %f m/s" % (-self.velocity.z))
self.velocity = euclid.Vector3(0, 0, 0)
self.roll_rate = 0
self.pitch_rate = 0
self.yaw_rate = 0
self.roll = 0
self.pitch = 0
self.accel = euclid.Vector3(0, 0, 0)
self.position = euclid.Vector3(self.position.x, self.position.y,
self.ground_level + self.frame_height - self.home_altitude)
# update lat/lon/altitude
self.update_position()

172
Tools/autotest/pysim/sim_quad.py Executable file
View File

@ -0,0 +1,172 @@
#!/usr/bin/env python
from quadcopter import QuadCopter
import euclid, util, time, os, sys, math
import socket, struct
import select, fgFDM
# find the mavlink.py module
for d in [ 'pymavlink',
os.path.join(os.path.dirname(os.path.realpath(__file__)), '../pymavlink') ]:
if os.path.exists(d):
sys.path.insert(0, d)
import mavlink
def sim_send(m, a, r):
'''send flight information to mavproxy and flightgear'''
global fdm
fdm.set('latitude', a.latitude, units='degrees')
fdm.set('longitude', a.longitude, units='degrees')
fdm.set('altitude', a.altitude, units='meters')
fdm.set('phi', a.roll, units='degrees')
fdm.set('theta', a.pitch, units='degrees')
fdm.set('psi', a.yaw, units='degrees')
fdm.set('phidot', a.roll_rate, units='dps')
fdm.set('thetadot', a.pitch_rate, units='dps')
fdm.set('psidot', a.yaw_rate, units='dps')
fdm.set('vcas', math.sqrt(a.velocity.x*a.velocity.x + a.velocity.y*a.velocity.y), units='mps')
fdm.set('v_north', a.velocity.x, units='mps')
fdm.set('v_east', a.velocity.y, units='mps')
fdm.set('num_engines', 4)
for i in range(4):
fdm.set('rpm', 1000*m[i], idx=i)
fg_out.send(fdm.pack())
buf = struct.pack('>ddddddddddddddddI',
a.latitude, a.longitude, util.m2ft(a.altitude), a.yaw,
util.m2ft(a.velocity.x), util.m2ft(a.velocity.y),
util.m2ft(a.accelerometer.x), util.m2ft(a.accelerometer.y), util.m2ft(a.accelerometer.z),
a.roll_rate, a.pitch_rate, a.yaw_rate,
a.roll, a.pitch, a.yaw,
util.m2ft(math.sqrt(a.velocity.x*a.velocity.x + a.velocity.y*a.velocity.y)),
0x4c56414d)
sim_out.send(buf)
def sim_recv(m, a, r):
'''receive control information from SITL'''
while True:
fd = sim_in.fileno()
rin = [fd]
try:
(rin, win, xin) = select.select(rin, [], [], 1.0)
except select.error:
util.check_parent()
continue
if fd in rin:
break
util.check_parent()
buf = sim_in.recv(32)
if len(buf) != 32:
return
(m0, m1, m2, m3,
r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7]) = struct.unpack('>ffffHHHHHHHH', buf)
m[0] = m0
m[1] = m1
m[2] = m2
m[3] = m3
def interpret_address(addrstr):
'''interpret a IP:port string'''
a = addrstr.split(':')
a[1] = int(a[1])
return tuple(a)
##################
# main program
from optparse import OptionParser
parser = OptionParser("sim_quad.py [options]")
parser.add_option("--fgout", dest="fgout", help="flightgear output (IP:port)", default="127.0.0.1:5503")
parser.add_option("--simin", dest="simin", help="SIM input (IP:port)", default="127.0.0.1:5502")
parser.add_option("--simout", dest="simout", help="SIM output (IP:port)", default="127.0.0.1:5501")
parser.add_option("--home", dest="home", type='string', default=None, help="home lat,lng,alt,hdg (required)")
parser.add_option("--rate", dest="rate", type='int', help="SIM update rate", default=50)
(opts, args) = parser.parse_args()
for m in [ 'home' ]:
if not opts.__dict__[m]:
print("Missing required option '%s'" % m)
parser.print_help()
sys.exit(1)
parent_pid = os.getppid()
# UDP socket addresses
fg_out_address = interpret_address(opts.fgout)
sim_out_address = interpret_address(opts.simout)
sim_in_address = interpret_address(opts.simin)
# setup output to flightgear
fg_out = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
fg_out.connect(fg_out_address)
fg_out.setblocking(0)
# setup input from SITL
sim_in = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sim_in.bind(sim_in_address)
sim_in.setblocking(0)
# setup output to SITL
sim_out = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sim_out.connect(sim_out_address)
sim_out.setblocking(0)
# FG FDM object
fdm = fgFDM.fgFDM()
# create the quadcopter model
a = QuadCopter()
a.update_frequency = opts.rate
# motors initially off
m = [0, 0, 0, 0]
# raw PWM
r = [0, 0, 0, 0, 0, 0, 0, 0]
lastt = time.time()
frame_count = 0
# parse home
v = opts.home.split(',')
if len(v) != 4:
print("home should be lat,lng,alt,hdg")
sys.exit(1)
a.home_latitude = float(v[0])
a.home_longitude = float(v[1])
a.home_altitude = float(v[2])
a.latitude = a.home_latitude
a.longitude = a.home_longitude
a.altitude = a.home_altitude
a.yaw = float(v[3])
a.ground_level = a.home_altitude
a.position.z = 0
print("Starting at lat=%f lon=%f alt=%.1f heading=%.1f" % (
a.home_latitude,
a.home_longitude,
a.home_altitude,
a.yaw))
while True:
sim_recv(m, a, r)
# allow for adding inbalance in flight
m2 = m[:]
a.update(m2)
sim_send(m, a, r)
frame_count += 1
t = time.time()
if t - lastt > 1.0:
print("%.2f fps zspeed=%.2f zaccel=%.2f h=%.1f a=%.1f yaw=%.1f yawrate=%.1f" % (
frame_count/(t-lastt),
a.velocity.z, a.accel.z, a.position.z, a.altitude,
a.yaw, a.yaw_rate))
lastt = t
frame_count = 0