removed copy of pymavlink code

use upstream code instead
This commit is contained in:
Andrew Tridgell 2012-06-04 17:49:18 +10:00
parent 7589de689b
commit 47d5f31fc2
10 changed files with 3 additions and 12294 deletions

View File

@ -6,7 +6,7 @@ import pexpect, os, sys, shutil, atexit
import optparse, fnmatch, time, glob, traceback import optparse, fnmatch, time, glob, traceback
sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), 'pysim')) sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), 'pysim'))
sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), 'pymavlink')) sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', '..', '..', 'mavlink', 'pymavlink'))
import util import util
os.environ['PYTHONUNBUFFERED'] = '1' os.environ['PYTHONUNBUFFERED'] = '1'

View File

@ -5,7 +5,7 @@ import sys, os, pexpect, fdpexpect, socket
import math, time, select, struct, signal, errno import math, time, select, struct, signal, errno
sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', 'pysim')) sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', 'pysim'))
sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', 'pymavlink')) sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', '..', '..', '..', 'mavlink', 'pymavlink'))
import util, fgFDM, atexit import util, fgFDM, atexit

View File

@ -1,209 +0,0 @@
#!/usr/bin/env python
# parse and construct FlightGear NET FDM packets
# Andrew Tridgell, November 2011
# released under GNU GPL version 2 or later
import struct, math
class fgFDMError(Exception):
'''fgFDM error class'''
def __init__(self, msg):
Exception.__init__(self, msg)
self.message = 'fgFDMError: ' + msg
class fgFDMVariable(object):
'''represent a single fgFDM variable'''
def __init__(self, index, arraylength, units):
self.index = index
self.arraylength = arraylength
self.units = units
class fgFDMVariableList(object):
'''represent a list of fgFDM variable'''
def __init__(self):
self.vars = {}
self._nextidx = 0
def add(self, varname, arraylength=1, units=None):
self.vars[varname] = fgFDMVariable(self._nextidx, arraylength, units=units)
self._nextidx += arraylength
class fgFDM(object):
'''a flightgear native FDM parser/generator'''
def __init__(self):
'''init a fgFDM object'''
self.FG_NET_FDM_VERSION = 24
self.pack_string = '>I 4x 3d 6f 11f 3f 2f I 4I 4f 4f 4f 4f 4f 4f 4f 4f 4f I 4f I 3I 3f 3f 3f I i f 10f'
self.values = [0]*98
self.FG_MAX_ENGINES = 4
self.FG_MAX_WHEELS = 3
self.FG_MAX_TANKS = 4
# supported unit mappings
self.unitmap = {
('radians', 'degrees') : math.degrees(1),
('rps', 'dps') : math.degrees(1),
('feet', 'meters') : 0.3048,
('fps', 'mps') : 0.3048,
('knots', 'mps') : 0.514444444,
('knots', 'fps') : 0.514444444/0.3048,
('fpss', 'mpss') : 0.3048,
('seconds', 'minutes') : 60,
('seconds', 'hours') : 3600,
}
# build a mapping between variable name and index in the values array
# note that the order of this initialisation is critical - it must
# match the wire structure
self.mapping = fgFDMVariableList()
self.mapping.add('version')
# position
self.mapping.add('longitude', units='radians') # geodetic (radians)
self.mapping.add('latitude', units='radians') # geodetic (radians)
self.mapping.add('altitude', units='meters') # above sea level (meters)
self.mapping.add('agl', units='meters') # above ground level (meters)
# attitude
self.mapping.add('phi', units='radians') # roll (radians)
self.mapping.add('theta', units='radians') # pitch (radians)
self.mapping.add('psi', units='radians') # yaw or true heading (radians)
self.mapping.add('alpha', units='radians') # angle of attack (radians)
self.mapping.add('beta', units='radians') # side slip angle (radians)
# Velocities
self.mapping.add('phidot', units='rps') # roll rate (radians/sec)
self.mapping.add('thetadot', units='rps') # pitch rate (radians/sec)
self.mapping.add('psidot', units='rps') # yaw rate (radians/sec)
self.mapping.add('vcas', units='fps') # calibrated airspeed
self.mapping.add('climb_rate', units='fps') # feet per second
self.mapping.add('v_north', units='fps') # north velocity in local/body frame, fps
self.mapping.add('v_east', units='fps') # east velocity in local/body frame, fps
self.mapping.add('v_down', units='fps') # down/vertical velocity in local/body frame, fps
self.mapping.add('v_wind_body_north', units='fps') # north velocity in local/body frame
self.mapping.add('v_wind_body_east', units='fps') # east velocity in local/body frame
self.mapping.add('v_wind_body_down', units='fps') # down/vertical velocity in local/body
# Accelerations
self.mapping.add('A_X_pilot', units='fpss') # X accel in body frame ft/sec^2
self.mapping.add('A_Y_pilot', units='fpss') # Y accel in body frame ft/sec^2
self.mapping.add('A_Z_pilot', units='fpss') # Z accel in body frame ft/sec^2
# Stall
self.mapping.add('stall_warning') # 0.0 - 1.0 indicating the amount of stall
self.mapping.add('slip_deg', units='degrees') # slip ball deflection
# Engine status
self.mapping.add('num_engines') # Number of valid engines
self.mapping.add('eng_state', self.FG_MAX_ENGINES) # Engine state (off, cranking, running)
self.mapping.add('rpm', self.FG_MAX_ENGINES) # Engine RPM rev/min
self.mapping.add('fuel_flow', self.FG_MAX_ENGINES) # Fuel flow gallons/hr
self.mapping.add('fuel_px', self.FG_MAX_ENGINES) # Fuel pressure psi
self.mapping.add('egt', self.FG_MAX_ENGINES) # Exhuast gas temp deg F
self.mapping.add('cht', self.FG_MAX_ENGINES) # Cylinder head temp deg F
self.mapping.add('mp_osi', self.FG_MAX_ENGINES) # Manifold pressure
self.mapping.add('tit', self.FG_MAX_ENGINES) # Turbine Inlet Temperature
self.mapping.add('oil_temp', self.FG_MAX_ENGINES) # Oil temp deg F
self.mapping.add('oil_px', self.FG_MAX_ENGINES) # Oil pressure psi
# Consumables
self.mapping.add('num_tanks') # Max number of fuel tanks
self.mapping.add('fuel_quantity', self.FG_MAX_TANKS)
# Gear status
self.mapping.add('num_wheels')
self.mapping.add('wow', self.FG_MAX_WHEELS)
self.mapping.add('gear_pos', self.FG_MAX_WHEELS)
self.mapping.add('gear_steer', self.FG_MAX_WHEELS)
self.mapping.add('gear_compression', self.FG_MAX_WHEELS)
# Environment
self.mapping.add('cur_time', units='seconds') # current unix time
self.mapping.add('warp', units='seconds') # offset in seconds to unix time
self.mapping.add('visibility', units='meters') # visibility in meters (for env. effects)
# Control surface positions (normalized values)
self.mapping.add('elevator')
self.mapping.add('elevator_trim_tab')
self.mapping.add('left_flap')
self.mapping.add('right_flap')
self.mapping.add('left_aileron')
self.mapping.add('right_aileron')
self.mapping.add('rudder')
self.mapping.add('nose_wheel')
self.mapping.add('speedbrake')
self.mapping.add('spoilers')
self._packet_size = struct.calcsize(self.pack_string)
self.set('version', self.FG_NET_FDM_VERSION)
if len(self.values) != self.mapping._nextidx:
raise fgFDMError('Invalid variable list in initialisation')
def packet_size(self):
'''return expected size of FG FDM packets'''
return self._packet_size
def convert(self, value, fromunits, tounits):
'''convert a value from one set of units to another'''
if fromunits == tounits:
return value
if (fromunits,tounits) in self.unitmap:
return value * self.unitmap[(fromunits,tounits)]
if (tounits,fromunits) in self.unitmap:
return value / self.unitmap[(tounits,fromunits)]
raise fgFDMError("unknown unit mapping (%s,%s)" % (fromunits, tounits))
def units(self, varname):
'''return the default units of a variable'''
if not varname in self.mapping.vars:
raise fgFDMError('Unknown variable %s' % varname)
return self.mapping.vars[varname].units
def variables(self):
'''return a list of available variables'''
return sorted(self.mapping.vars.keys(),
key = lambda v : self.mapping.vars[v].index)
def get(self, varname, idx=0, units=None):
'''get a variable value'''
if not varname in self.mapping.vars:
raise fgFDMError('Unknown variable %s' % varname)
if idx >= self.mapping.vars[varname].arraylength:
raise fgFDMError('index of %s beyond end of array idx=%u arraylength=%u' % (
varname, idx, self.mapping.vars[varname].arraylength))
value = self.values[self.mapping.vars[varname].index + idx]
if units:
value = self.convert(value, self.mapping.vars[varname].units, units)
return value
def set(self, varname, value, idx=0, units=None):
'''set a variable value'''
if not varname in self.mapping.vars:
raise fgFDMError('Unknown variable %s' % varname)
if idx >= self.mapping.vars[varname].arraylength:
raise fgFDMError('index of %s beyond end of array idx=%u arraylength=%u' % (
varname, idx, self.mapping.vars[varname].arraylength))
if units:
value = self.convert(value, units, self.mapping.vars[varname].units)
self.values[self.mapping.vars[varname].index + idx] = value
def parse(self, buf):
'''parse a FD FDM buffer'''
try:
t = struct.unpack(self.pack_string, buf)
except struct.error, msg:
raise fgFDMError('unable to parse - %s' % msg)
self.values = list(t)
def pack(self):
'''pack a FD FDM buffer from current values'''
for i in range(len(self.values)):
if math.isnan(self.values[i]):
self.values[i] = 0
return struct.pack(self.pack_string, *self.values)

View File

@ -1,199 +0,0 @@
#!/usr/bin/env python
'''
useful extra functions for use by mavlink clients
Copyright Andrew Tridgell 2011
Released under GNU GPL version 3 or later
'''
from math import *
def kmh(mps):
'''convert m/s to Km/h'''
return mps*3.6
def altitude(SCALED_PRESSURE):
'''calculate barometric altitude'''
import mavutil
self = mavutil.mavfile_global
if self.param('GND_ABS_PRESS', None) is None:
return 0
scaling = self.param('GND_ABS_PRESS', 1) / (SCALED_PRESSURE.press_abs*100.0)
temp = self.param('GND_TEMP', 0) + 273.15
return log(scaling) * temp * 29271.267 * 0.001
def mag_heading(RAW_IMU, ATTITUDE, declination=0, SENSOR_OFFSETS=None, ofs=None):
'''calculate heading from raw magnetometer'''
mag_x = RAW_IMU.xmag
mag_y = RAW_IMU.ymag
mag_z = RAW_IMU.zmag
if SENSOR_OFFSETS is not None and ofs is not None:
mag_x += ofs[0] - SENSOR_OFFSETS.mag_ofs_x
mag_y += ofs[1] - SENSOR_OFFSETS.mag_ofs_y
mag_z += ofs[2] - SENSOR_OFFSETS.mag_ofs_z
headX = mag_x*cos(ATTITUDE.pitch) + mag_y*sin(ATTITUDE.roll)*sin(ATTITUDE.pitch) + mag_z*cos(ATTITUDE.roll)*sin(ATTITUDE.pitch)
headY = mag_y*cos(ATTITUDE.roll) - mag_z*sin(ATTITUDE.roll)
heading = degrees(atan2(-headY,headX)) + declination
if heading < 0:
heading += 360
return heading
def mag_field(RAW_IMU, SENSOR_OFFSETS=None, ofs=None):
'''calculate magnetic field strength from raw magnetometer'''
mag_x = RAW_IMU.xmag
mag_y = RAW_IMU.ymag
mag_z = RAW_IMU.zmag
if SENSOR_OFFSETS is not None and ofs is not None:
mag_x += ofs[0] - SENSOR_OFFSETS.mag_ofs_x
mag_y += ofs[1] - SENSOR_OFFSETS.mag_ofs_y
mag_z += ofs[2] - SENSOR_OFFSETS.mag_ofs_z
return sqrt(mag_x**2 + mag_y**2 + mag_z**2)
def angle_diff(angle1, angle2):
'''show the difference between two angles in degrees'''
ret = angle1 - angle2
if ret > 180:
ret -= 360;
if ret < -180:
ret += 360
return ret
lowpass_data = {}
def lowpass(var, key, factor):
'''a simple lowpass filter'''
global lowpass_data
if not key in lowpass_data:
lowpass_data[key] = var
else:
lowpass_data[key] = factor*lowpass_data[key] + (1.0 - factor)*var
return lowpass_data[key]
last_delta = {}
def delta(var, key):
'''calculate slope'''
global last_delta
dv = 0
if key in last_delta:
dv = var - last_delta[key]
last_delta[key] = var
return dv
def delta_angle(var, key):
'''calculate slope of an angle'''
global last_delta
dv = 0
if key in last_delta:
dv = var - last_delta[key]
last_delta[key] = var
if dv > 180:
dv -= 360
if dv < -180:
dv += 360
return dv
def roll_estimate(RAW_IMU,SENSOR_OFFSETS=None, ofs=None, mul=None,smooth=0.7):
'''estimate roll from accelerometer'''
rx = RAW_IMU.xacc * 9.81 / 1000.0
ry = RAW_IMU.yacc * 9.81 / 1000.0
rz = RAW_IMU.zacc * 9.81 / 1000.0
if SENSOR_OFFSETS is not None and ofs is not None:
rx += SENSOR_OFFSETS.accel_cal_x
ry += SENSOR_OFFSETS.accel_cal_y
rz += SENSOR_OFFSETS.accel_cal_z
rx -= ofs[0]
ry -= ofs[1]
rz -= ofs[2]
if mul is not None:
rx *= mul[0]
ry *= mul[1]
rz *= mul[2]
return lowpass(degrees(-asin(ry/sqrt(rx**2+ry**2+rz**2))),'_roll',smooth)
def pitch_estimate(RAW_IMU, SENSOR_OFFSETS=None, ofs=None, mul=None, smooth=0.7):
'''estimate pitch from accelerometer'''
rx = RAW_IMU.xacc * 9.81 / 1000.0
ry = RAW_IMU.yacc * 9.81 / 1000.0
rz = RAW_IMU.zacc * 9.81 / 1000.0
if SENSOR_OFFSETS is not None and ofs is not None:
rx += SENSOR_OFFSETS.accel_cal_x
ry += SENSOR_OFFSETS.accel_cal_y
rz += SENSOR_OFFSETS.accel_cal_z
rx -= ofs[0]
ry -= ofs[1]
rz -= ofs[2]
if mul is not None:
rx *= mul[0]
ry *= mul[1]
rz *= mul[2]
return lowpass(degrees(asin(rx/sqrt(rx**2+ry**2+rz**2))),'_pitch',smooth)
def gravity(RAW_IMU, SENSOR_OFFSETS=None, ofs=None, mul=None, smooth=0.7):
'''estimate pitch from accelerometer'''
rx = RAW_IMU.xacc * 9.81 / 1000.0
ry = RAW_IMU.yacc * 9.81 / 1000.0
rz = RAW_IMU.zacc * 9.81 / 1000.0
if SENSOR_OFFSETS is not None and ofs is not None:
rx += SENSOR_OFFSETS.accel_cal_x
ry += SENSOR_OFFSETS.accel_cal_y
rz += SENSOR_OFFSETS.accel_cal_z
rx -= ofs[0]
ry -= ofs[1]
rz -= ofs[2]
if mul is not None:
rx *= mul[0]
ry *= mul[1]
rz *= mul[2]
return lowpass(sqrt(rx**2+ry**2+rz**2),'_gravity',smooth)
def pitch_sim(SIMSTATE, GPS_RAW):
'''estimate pitch from SIMSTATE accels'''
xacc = SIMSTATE.xacc - lowpass(delta(GPS_RAW.v,"v")*6.6, "v", 0.9)
zacc = SIMSTATE.zacc
zacc += SIMSTATE.ygyro * GPS_RAW.v;
if xacc/zacc >= 1:
return 0
if xacc/zacc <= -1:
return -0
return degrees(-asin(xacc/zacc))
def distance_two(GPS_RAW1, GPS_RAW2):
'''distance between two points'''
lat1 = radians(GPS_RAW1.lat)
lat2 = radians(GPS_RAW2.lat)
lon1 = radians(GPS_RAW1.lon)
lon2 = radians(GPS_RAW2.lon)
dLat = lat2 - lat1
dLon = lon2 - lon1
a = sin(0.5*dLat)**2 + sin(0.5*dLon)**2 * cos(lat1) * cos(lat2)
c = 2.0 * atan2(sqrt(a), sqrt(1.0-a))
return 6371 * 1000 * c
first_fix = None
def distance_home(GPS_RAW):
'''distance from first fix point'''
global first_fix
if GPS_RAW.fix_type < 2:
return 0
if first_fix == None or first_fix.fix_type < 2:
first_fix = GPS_RAW
return 0
return distance_two(GPS_RAW, first_fix)
def sawtooth(ATTITUDE, amplitude=2.0, period=5.0):
'''sawtooth pattern based on uptime'''
mins = (ATTITUDE.usec * 1.0e-6)/60
p = fmod(mins, period*2)
if p < period:
return amplitude * (p/period)
return amplitude * (period - (p-period))/period

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,889 +0,0 @@
#!/usr/bin/env python
'''
mavlink python utility functions
Copyright Andrew Tridgell 2011
Released under GNU GPL version 3 or later
'''
import socket, math, struct, time, os, fnmatch, array, sys, errno, fcntl
from math import *
from mavextra import *
if os.getenv('MAVLINK10') or 'MAVLINK10' in os.environ:
import mavlinkv10 as mavlink
else:
import mavlink
def evaluate_expression(expression, vars):
'''evaluation an expression'''
try:
v = eval(expression, globals(), vars)
except NameError:
return None
return v
def evaluate_condition(condition, vars):
'''evaluation a conditional (boolean) statement'''
if condition is None:
return True
v = evaluate_expression(condition, vars)
if v is None:
return False
return v
mavfile_global = None
class location(object):
'''represent a GPS coordinate'''
def __init__(self, lat, lng, alt=0, heading=0):
self.lat = lat
self.lng = lng
self.alt = alt
self.heading = heading
def __str__(self):
return "lat=%.6f,lon=%.6f,alt=%.1f" % (self.lat, self.lng, self.alt)
class mavfile(object):
'''a generic mavlink port'''
def __init__(self, fd, address, source_system=255, notimestamps=False, input=True):
global mavfile_global
if input:
mavfile_global = self
self.fd = fd
self.address = address
self.messages = { 'MAV' : self }
if mavlink.WIRE_PROTOCOL_VERSION == "1.0":
self.messages['HOME'] = mavlink.MAVLink_gps_raw_int_message(0,0,0,0,0,0,0,0,0,0)
mavlink.MAVLink_waypoint_message = mavlink.MAVLink_mission_item_message
else:
self.messages['HOME'] = mavlink.MAVLink_gps_raw_message(0,0,0,0,0,0,0,0,0)
self.params = {}
self.target_system = 0
self.target_component = 0
self.source_system = source_system
self.first_byte = True
self.robust_parsing = True
self.mav = mavlink.MAVLink(self, srcSystem=self.source_system)
self.mav.robust_parsing = self.robust_parsing
self.logfile = None
self.logfile_raw = None
self.param_fetch_in_progress = False
self.param_fetch_complete = False
self.start_time = time.time()
self.flightmode = "UNKNOWN"
self.timestamp = 0
self.message_hooks = []
self.idle_hooks = []
self.uptime = 0.0
self.notimestamps = notimestamps
self._timestamp = None
self.ground_pressure = None
self.ground_temperature = None
self.altitude = 0
self.WIRE_PROTOCOL_VERSION = mavlink.WIRE_PROTOCOL_VERSION
self.last_seq = -1
self.mav_loss = 0
self.mav_count = 0
self.stop_on_EOF = False
def auto_mavlink_version(self, buf):
'''auto-switch mavlink protocol version'''
global mavlink
if len(buf) == 0:
return
if not ord(buf[0]) in [ 85, 254 ]:
return
self.first_byte = False
if self.WIRE_PROTOCOL_VERSION == "0.9" and ord(buf[0]) == 254:
import mavlinkv10 as mavlink
os.environ['MAVLINK10'] = '1'
elif self.WIRE_PROTOCOL_VERSION == "1.0" and ord(buf[0]) == 85:
import mavlink as mavlink
else:
return
# switch protocol
(callback, callback_args, callback_kwargs) = (self.mav.callback,
self.mav.callback_args,
self.mav.callback_kwargs)
self.mav = mavlink.MAVLink(self, srcSystem=self.source_system)
self.mav.robust_parsing = self.robust_parsing
self.WIRE_PROTOCOL_VERSION = mavlink.WIRE_PROTOCOL_VERSION
(self.mav.callback, self.mav.callback_args, self.mav.callback_kwargs) = (callback,
callback_args,
callback_kwargs)
def recv(self, n=None):
'''default recv method'''
raise RuntimeError('no recv() method supplied')
def close(self, n=None):
'''default close method'''
raise RuntimeError('no close() method supplied')
def write(self, buf):
'''default write method'''
raise RuntimeError('no write() method supplied')
def pre_message(self):
'''default pre message call'''
return
def post_message(self, msg):
'''default post message call'''
if '_posted' in msg.__dict__:
return
msg._posted = True
msg._timestamp = time.time()
type = msg.get_type()
self.messages[type] = msg
if 'usec' in msg.__dict__:
self.uptime = msg.usec * 1.0e-6
if self._timestamp is not None:
if self.notimestamps:
msg._timestamp = self.uptime
else:
msg._timestamp = self._timestamp
if not (
# its the radio or planner
(msg.get_srcSystem() == ord('3') and msg.get_srcComponent() == ord('D')) or
msg.get_srcSystem() == 255 or
msg.get_type() == 'BAD_DATA'):
seq = (self.last_seq+1) % 256
seq2 = msg.get_seq()
if seq != seq2 and self.last_seq != -1:
diff = (seq2 - seq) % 256
self.mav_loss += diff
self.last_seq = seq2
self.mav_count += 1
self.timestamp = msg._timestamp
if type == 'HEARTBEAT':
self.target_system = msg.get_srcSystem()
self.target_component = msg.get_srcComponent()
if mavlink.WIRE_PROTOCOL_VERSION == '1.0':
self.flightmode = mode_string_v10(msg)
elif type == 'PARAM_VALUE':
s = str(msg.param_id)
self.params[str(msg.param_id)] = msg.param_value
if msg.param_index+1 == msg.param_count:
self.param_fetch_in_progress = False
self.param_fetch_complete = True
elif type == 'SYS_STATUS' and mavlink.WIRE_PROTOCOL_VERSION == '0.9':
self.flightmode = mode_string_v09(msg)
elif type == 'GPS_RAW':
if self.messages['HOME'].fix_type < 2:
self.messages['HOME'] = msg
elif type == 'GPS_RAW_INT':
if self.messages['HOME'].fix_type < 3:
self.messages['HOME'] = msg
for hook in self.message_hooks:
hook(self, msg)
def packet_loss(self):
'''packet loss as a percentage'''
if self.mav_count == 0:
return 0
return (100.0*self.mav_loss)/(self.mav_count+self.mav_loss)
def recv_msg(self):
'''message receive routine'''
self.pre_message()
while True:
n = self.mav.bytes_needed()
s = self.recv(n)
if len(s) == 0 and (len(self.mav.buf) == 0 or self.stop_on_EOF):
return None
if self.logfile_raw:
self.logfile_raw.write(str(s))
if self.first_byte:
self.auto_mavlink_version(s)
msg = self.mav.parse_char(s)
if msg:
self.post_message(msg)
return msg
def recv_match(self, condition=None, type=None, blocking=False):
'''recv the next MAVLink message that matches the given condition'''
while True:
m = self.recv_msg()
if m is None:
if blocking:
for hook in self.idle_hooks:
hook(self)
time.sleep(0.01)
continue
return None
if type is not None and type != m.get_type():
continue
if not evaluate_condition(condition, self.messages):
continue
return m
def mavlink10(self):
'''return True if using MAVLink 1.0'''
return self.WIRE_PROTOCOL_VERSION == "1.0"
def setup_logfile(self, logfile, mode='w'):
'''start logging to the given logfile, with timestamps'''
self.logfile = open(logfile, mode=mode)
def setup_logfile_raw(self, logfile, mode='w'):
'''start logging raw bytes to the given logfile, without timestamps'''
self.logfile_raw = open(logfile, mode=mode)
def wait_heartbeat(self, blocking=True):
'''wait for a heartbeat so we know the target system IDs'''
return self.recv_match(type='HEARTBEAT', blocking=blocking)
def param_fetch_all(self):
'''initiate fetch of all parameters'''
if time.time() - getattr(self, 'param_fetch_start', 0) < 2.0:
# don't fetch too often
return
self.param_fetch_start = time.time()
self.param_fetch_in_progress = True
self.mav.param_request_list_send(self.target_system, self.target_component)
def time_since(self, mtype):
'''return the time since the last message of type mtype was received'''
if not mtype in self.messages:
return time.time() - self.start_time
return time.time() - self.messages[mtype]._timestamp
def param_set_send(self, parm_name, parm_value, parm_type=None):
'''wrapper for parameter set'''
if self.mavlink10():
if parm_type == None:
parm_type = mavlink.MAVLINK_TYPE_FLOAT
self.mav.param_set_send(self.target_system, self.target_component,
parm_name, parm_value, parm_type)
else:
self.mav.param_set_send(self.target_system, self.target_component,
parm_name, parm_value)
def waypoint_request_list_send(self):
'''wrapper for waypoint_request_list_send'''
if self.mavlink10():
self.mav.mission_request_list_send(self.target_system, self.target_component)
else:
self.mav.waypoint_request_list_send(self.target_system, self.target_component)
def waypoint_clear_all_send(self):
'''wrapper for waypoint_clear_all_send'''
if self.mavlink10():
self.mav.mission_clear_all_send(self.target_system, self.target_component)
else:
self.mav.waypoint_clear_all_send(self.target_system, self.target_component)
def waypoint_request_send(self, seq):
'''wrapper for waypoint_request_send'''
if self.mavlink10():
self.mav.mission_request_send(self.target_system, self.target_component, seq)
else:
self.mav.waypoint_request_send(self.target_system, self.target_component, seq)
def waypoint_set_current_send(self, seq):
'''wrapper for waypoint_set_current_send'''
if self.mavlink10():
self.mav.mission_set_current_send(self.target_system, self.target_component, seq)
else:
self.mav.waypoint_set_current_send(self.target_system, self.target_component, seq)
def waypoint_current(self):
'''return current waypoint'''
if self.mavlink10():
m = self.recv_match(type='MISSION_CURRENT', blocking=True)
else:
m = self.recv_match(type='WAYPOINT_CURRENT', blocking=True)
return m.seq
def waypoint_count_send(self, seq):
'''wrapper for waypoint_count_send'''
if self.mavlink10():
self.mav.mission_count_send(self.target_system, self.target_component, seq)
else:
self.mav.waypoint_count_send(self.target_system, self.target_component, seq)
def set_mode_auto(self):
'''enter auto mode'''
if self.mavlink10():
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_MISSION_START, 0, 0, 0, 0, 0, 0, 0, 0)
else:
MAV_ACTION_SET_AUTO = 13
self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_SET_AUTO)
def set_mode_rtl(self):
'''enter RTL mode'''
if self.mavlink10():
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_NAV_RETURN_TO_LAUNCH, 0, 0, 0, 0, 0, 0, 0, 0)
else:
MAV_ACTION_RETURN = 3
self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_RETURN)
def set_mode_loiter(self):
'''enter LOITER mode'''
if self.mavlink10():
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_NAV_LOITER_UNLIM, 0, 0, 0, 0, 0, 0, 0, 0)
else:
MAV_ACTION_LOITER = 27
self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_LOITER)
def calibrate_imu(self):
'''calibrate IMU'''
if self.mavlink10():
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_PREFLIGHT_CALIBRATION, 0,
1, 1, 1, 1, 0, 0, 0)
else:
MAV_ACTION_CALIBRATE_GYRO = 17
self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_CALIBRATE_GYRO)
def calibrate_level(self):
'''calibrate accels'''
if self.mavlink10():
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_PREFLIGHT_CALIBRATION, 0,
1, 1, 1, 1, 0, 0, 0)
else:
MAV_ACTION_CALIBRATE_ACC = 19
self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_CALIBRATE_ACC)
def wait_gps_fix(self):
self.recv_match(type='VFR_HUD', blocking=True)
if self.mavlink10():
self.recv_match(type='GPS_RAW_INT', blocking=True,
condition='GPS_RAW_INT.fix_type==3 and GPS_RAW_INT.lat != 0 and GPS_RAW_INT.alt != 0')
else:
self.recv_match(type='GPS_RAW', blocking=True,
condition='GPS_RAW.fix_type==2 and GPS_RAW.lat != 0 and GPS_RAW.alt != 0')
def location(self):
'''return current location'''
self.wait_gps_fix()
# wait for another VFR_HUD, to ensure we have correct altitude
self.recv_match(type='VFR_HUD', blocking=True)
if self.mavlink10():
return location(self.messages['GPS_RAW_INT'].lat*1.0e-7,
self.messages['GPS_RAW_INT'].lon*1.0e-7,
self.messages['VFR_HUD'].alt,
self.messages['VFR_HUD'].heading)
else:
return location(self.messages['GPS_RAW'].lat,
self.messages['GPS_RAW'].lon,
self.messages['VFR_HUD'].alt,
self.messages['VFR_HUD'].heading)
def field(self, type, field, default=None):
'''convenient function for returning an arbitrary MAVLink
field with a default'''
if not type in self.messages:
return default
return getattr(self.messages[type], field, default)
def param(self, name, default=None):
'''convenient function for returning an arbitrary MAVLink
parameter with a default'''
if not name in self.params:
return default
return self.params[name]
class mavserial(mavfile):
'''a serial mavlink port'''
def __init__(self, device, baud=115200, autoreconnect=False, source_system=255):
import serial
self.baud = baud
self.device = device
self.autoreconnect = autoreconnect
self.port = serial.Serial(self.device, self.baud, timeout=0,
dsrdtr=False, rtscts=False, xonxoff=False)
try:
fd = self.port.fileno()
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
flags |= fcntl.FD_CLOEXEC
fcntl.fcntl(fd, fcntl.F_SETFD, flags)
except Exception:
fd = None
mavfile.__init__(self, fd, device, source_system=source_system)
def close(self):
self.port.close()
def recv(self,n=None):
if n is None:
n = self.mav.bytes_needed()
if self.fd is None:
waiting = self.port.inWaiting()
if waiting < n:
n = waiting
return self.port.read(n)
def write(self, buf):
try:
return self.port.write(buf)
except Exception:
if self.autoreconnect:
self.reset()
return -1
def reset(self):
import serial
self.port.close()
while True:
try:
self.port = serial.Serial(self.device, self.baud, timeout=1,
dsrdtr=False, rtscts=False, xonxoff=False)
try:
self.fd = self.port.fileno()
except Exception:
self.fd = None
return
except Exception:
print("Failed to reopen %s" % self.device)
time.sleep(0.5)
class mavudp(mavfile):
'''a UDP mavlink socket'''
def __init__(self, device, input=True, source_system=255):
a = device.split(':')
if len(a) != 2:
print("UDP ports must be specified as host:port")
sys.exit(1)
self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_server = input
if input:
self.port.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.port.bind((a[0], int(a[1])))
else:
self.destination_addr = (a[0], int(a[1]))
flags = fcntl.fcntl(self.port.fileno(), fcntl.F_GETFD)
flags |= fcntl.FD_CLOEXEC
fcntl.fcntl(self.port.fileno(), fcntl.F_SETFD, flags)
self.port.setblocking(0)
self.last_address = None
mavfile.__init__(self, self.port.fileno(), device, source_system=source_system, input=input)
def close(self):
self.port.close()
def recv(self,n=None):
try:
data, self.last_address = self.port.recvfrom(300)
except socket.error as e:
if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.ECONNREFUSED ]:
return ""
raise
return data
def write(self, buf):
try:
if self.udp_server:
if self.last_address:
self.port.sendto(buf, self.last_address)
else:
self.port.sendto(buf, self.destination_addr)
except socket.error:
pass
def recv_msg(self):
'''message receive routine for UDP link'''
self.pre_message()
s = self.recv()
if len(s) == 0:
return None
if self.first_byte:
self.auto_mavlink_version(s)
msg = self.mav.parse_buffer(s)
if msg is not None:
for m in msg:
self.post_message(m)
return msg[0]
return None
class mavtcp(mavfile):
'''a TCP mavlink socket'''
def __init__(self, device, source_system=255):
a = device.split(':')
if len(a) != 2:
print("TCP ports must be specified as host:port")
sys.exit(1)
self.port = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.destination_addr = (a[0], int(a[1]))
self.port.connect(self.destination_addr)
self.port.setblocking(0)
flags = fcntl.fcntl(self.port.fileno(), fcntl.F_GETFD)
flags |= fcntl.FD_CLOEXEC
fcntl.fcntl(self.port.fileno(), fcntl.F_SETFD, flags)
self.port.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
mavfile.__init__(self, self.port.fileno(), device, source_system=source_system)
def close(self):
self.port.close()
def recv(self,n=None):
if n is None:
n = self.mav.bytes_needed()
try:
data = self.port.recv(n)
except socket.error as e:
if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK ]:
return ""
raise
return data
def write(self, buf):
try:
self.port.send(buf)
except socket.error:
pass
class mavlogfile(mavfile):
'''a MAVLink logfile reader/writer'''
def __init__(self, filename, planner_format=None,
write=False, append=False,
robust_parsing=True, notimestamps=False, source_system=255):
self.filename = filename
self.writeable = write
self.robust_parsing = robust_parsing
self.planner_format = planner_format
self._two64 = math.pow(2.0, 63)
mode = 'rb'
if self.writeable:
if append:
mode = 'ab'
else:
mode = 'wb'
self.f = open(filename, mode)
self.filesize = os.path.getsize(filename)
self.percent = 0
mavfile.__init__(self, None, filename, source_system=source_system, notimestamps=notimestamps)
if self.notimestamps:
self._timestamp = 0
else:
self._timestamp = time.time()
self.stop_on_EOF = True
def close(self):
self.f.close()
def recv(self,n=None):
if n is None:
n = self.mav.bytes_needed()
return self.f.read(n)
def write(self, buf):
self.f.write(buf)
def pre_message(self):
'''read timestamp if needed'''
# read the timestamp
if self.filesize != 0:
self.percent = (100.0 * self.f.tell()) / self.filesize
if self.notimestamps:
return
if self.planner_format:
tbuf = self.f.read(21)
if len(tbuf) != 21 or tbuf[0] != '-' or tbuf[20] != ':':
raise RuntimeError('bad planner timestamp %s' % tbuf)
hnsec = self._two64 + float(tbuf[0:20])
t = hnsec * 1.0e-7 # convert to seconds
t -= 719163 * 24 * 60 * 60 # convert to 1970 base
self._link = 0
else:
tbuf = self.f.read(8)
if len(tbuf) != 8:
return
(tusec,) = struct.unpack('>Q', tbuf)
t = tusec * 1.0e-6
self._link = tusec & 0x3
self._timestamp = t
def post_message(self, msg):
'''add timestamp to message'''
# read the timestamp
super(mavlogfile, self).post_message(msg)
if self.planner_format:
self.f.read(1) # trailing newline
self.timestamp = msg._timestamp
class mavchildexec(mavfile):
'''a MAVLink child processes reader/writer'''
def __init__(self, filename, source_system=255):
from subprocess import Popen, PIPE
import fcntl
self.filename = filename
self.child = Popen(filename, shell=True, stdout=PIPE, stdin=PIPE)
self.fd = self.child.stdout.fileno()
fl = fcntl.fcntl(self.fd, fcntl.F_GETFL)
fcntl.fcntl(self.fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
fl = fcntl.fcntl(self.child.stdout.fileno(), fcntl.F_GETFL)
fcntl.fcntl(self.child.stdout.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK)
mavfile.__init__(self, self.fd, filename, source_system=source_system)
def close(self):
self.child.close()
def recv(self,n=None):
try:
x = self.child.stdout.read(1)
except Exception:
return ''
return x
def write(self, buf):
self.child.stdin.write(buf)
def mavlink_connection(device, baud=115200, source_system=255,
planner_format=None, write=False, append=False,
robust_parsing=True, notimestamps=False, input=True):
'''make a serial or UDP mavlink connection'''
if device.startswith('tcp:'):
return mavtcp(device[4:], source_system=source_system)
if device.startswith('udp:'):
return mavudp(device[4:], input=input, source_system=source_system)
if device.find(':') != -1 and not device.endswith('log'):
return mavudp(device, source_system=source_system, input=input)
if os.path.isfile(device):
if device.endswith(".elf"):
return mavchildexec(device, source_system=source_system)
else:
return mavlogfile(device, planner_format=planner_format, write=write,
append=append, robust_parsing=robust_parsing, notimestamps=notimestamps,
source_system=source_system)
return mavserial(device, baud=baud, source_system=source_system)
class periodic_event(object):
'''a class for fixed frequency events'''
def __init__(self, frequency):
self.frequency = float(frequency)
self.last_time = time.time()
def force(self):
'''force immediate triggering'''
self.last_time = 0
def trigger(self):
'''return True if we should trigger now'''
tnow = time.time()
if self.last_time + (1.0/self.frequency) <= tnow:
self.last_time = tnow
return True
return False
try:
from curses import ascii
have_ascii = True
except:
have_ascii = False
def is_printable(c):
'''see if a character is printable'''
global have_ascii
if have_ascii:
return ascii.isprint(c)
if isinstance(c, int):
ic = c
else:
ic = ord(c)
return ic >= 32 and ic <= 126
def all_printable(buf):
'''see if a string is all printable'''
for c in buf:
if not is_printable(c) and not c in ['\r', '\n', '\t']:
return False
return True
class SerialPort(object):
'''auto-detected serial port'''
def __init__(self, device, description=None, hwid=None):
self.device = device
self.description = description
self.hwid = hwid
def __str__(self):
ret = self.device
if self.description is not None:
ret += " : " + self.description
if self.hwid is not None:
ret += " : " + self.hwid
return ret
def auto_detect_serial_win32(preferred_list=['*']):
'''try to auto-detect serial ports on win32'''
try:
import scanwin32
list = sorted(scanwin32.comports())
except:
return []
ret = []
for order, port, desc, hwid in list:
for preferred in preferred_list:
if fnmatch.fnmatch(desc, preferred) or fnmatch.fnmatch(hwid, preferred):
ret.append(SerialPort(port, description=desc, hwid=hwid))
if len(ret) > 0:
return ret
# now the rest
for order, port, desc, hwid in list:
ret.append(SerialPort(port, description=desc, hwid=hwid))
return ret
def auto_detect_serial_unix(preferred_list=['*']):
'''try to auto-detect serial ports on win32'''
import glob
glist = glob.glob('/dev/ttyS*') + glob.glob('/dev/ttyUSB*') + glob.glob('/dev/ttyACM*') + glob.glob('/dev/serial/by-id/*')
ret = []
# try preferred ones first
for d in glist:
for preferred in preferred_list:
if fnmatch.fnmatch(d, preferred):
ret.append(SerialPort(d))
if len(ret) > 0:
return ret
# now the rest
for d in glist:
ret.append(SerialPort(d))
return ret
def auto_detect_serial(preferred_list=['*']):
'''try to auto-detect serial port'''
# see if
if os.name == 'nt':
return auto_detect_serial_win32(preferred_list=preferred_list)
return auto_detect_serial_unix(preferred_list=preferred_list)
def mode_string_v09(msg):
'''mode string for 0.9 protocol'''
mode = msg.mode
nav_mode = msg.nav_mode
MAV_MODE_UNINIT = 0
MAV_MODE_MANUAL = 2
MAV_MODE_GUIDED = 3
MAV_MODE_AUTO = 4
MAV_MODE_TEST1 = 5
MAV_MODE_TEST2 = 6
MAV_MODE_TEST3 = 7
MAV_NAV_GROUNDED = 0
MAV_NAV_LIFTOFF = 1
MAV_NAV_HOLD = 2
MAV_NAV_WAYPOINT = 3
MAV_NAV_VECTOR = 4
MAV_NAV_RETURNING = 5
MAV_NAV_LANDING = 6
MAV_NAV_LOST = 7
MAV_NAV_LOITER = 8
cmode = (mode, nav_mode)
mapping = {
(MAV_MODE_UNINIT, MAV_NAV_GROUNDED) : "INITIALISING",
(MAV_MODE_MANUAL, MAV_NAV_VECTOR) : "MANUAL",
(MAV_MODE_TEST3, MAV_NAV_VECTOR) : "CIRCLE",
(MAV_MODE_GUIDED, MAV_NAV_VECTOR) : "GUIDED",
(MAV_MODE_TEST1, MAV_NAV_VECTOR) : "STABILIZE",
(MAV_MODE_TEST2, MAV_NAV_LIFTOFF) : "FBWA",
(MAV_MODE_AUTO, MAV_NAV_WAYPOINT) : "AUTO",
(MAV_MODE_AUTO, MAV_NAV_RETURNING) : "RTL",
(MAV_MODE_AUTO, MAV_NAV_LOITER) : "LOITER",
(MAV_MODE_AUTO, MAV_NAV_LIFTOFF) : "TAKEOFF",
(MAV_MODE_AUTO, MAV_NAV_LANDING) : "LANDING",
(MAV_MODE_AUTO, MAV_NAV_HOLD) : "LOITER",
(MAV_MODE_GUIDED, MAV_NAV_VECTOR) : "GUIDED",
(MAV_MODE_GUIDED, MAV_NAV_WAYPOINT) : "GUIDED",
(100, MAV_NAV_VECTOR) : "STABILIZE",
(101, MAV_NAV_VECTOR) : "ACRO",
(102, MAV_NAV_VECTOR) : "ALT_HOLD",
(107, MAV_NAV_VECTOR) : "CIRCLE",
(109, MAV_NAV_VECTOR) : "LAND",
}
if cmode in mapping:
return mapping[cmode]
return "Mode(%s,%s)" % cmode
def mode_string_v10(msg):
'''mode string for 1.0 protocol, from heartbeat'''
if not msg.base_mode & mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED:
return "Mode(0x%08x)" % msg.base_mode
mapping_apm = {
0 : 'MANUAL',
1 : 'CIRCLE',
2 : 'STABILIZE',
5 : 'FBWA',
6 : 'FBWB',
7 : 'FBWC',
10 : 'AUTO',
11 : 'RTL',
12 : 'LOITER',
13 : 'TAKEOFF',
14 : 'LAND',
15 : 'GUIDED',
16 : 'INITIALISING'
}
mapping_acm = {
0 : 'STABILIZE',
1 : 'ACRO',
2 : 'ALT_HOLD',
3 : 'AUTO',
4 : 'GUIDED',
5 : 'LOITER',
6 : 'RTL',
7 : 'CIRCLE',
8 : 'POSITION',
9 : 'LAND',
10 : 'OF_LOITER',
11 : 'APPROACH'
}
if msg.type == mavlink.MAV_TYPE_QUADROTOR:
if msg.custom_mode in mapping_acm:
return mapping_acm[msg.custom_mode]
if msg.type == mavlink.MAV_TYPE_FIXED_WING:
if msg.custom_mode in mapping_apm:
return mapping_apm[msg.custom_mode]
return "Mode(%u)" % msg.custom_mode
class x25crc(object):
'''x25 CRC - based on checksum.h from mavlink library'''
def __init__(self, buf=''):
self.crc = 0xffff
self.accumulate(buf)
def accumulate(self, buf):
'''add in some more bytes'''
bytes = array.array('B')
if isinstance(buf, array.array):
bytes.extend(buf)
else:
bytes.fromstring(buf)
accum = self.crc
for b in bytes:
tmp = b ^ (accum & 0xff)
tmp = (tmp ^ (tmp<<4)) & 0xFF
accum = (accum>>8) ^ (tmp<<8) ^ (tmp<<3) ^ (tmp>>4)
accum = accum & 0xFFFF
self.crc = accum

View File

@ -1,209 +0,0 @@
'''
module for loading/saving waypoints
'''
import os
if os.getenv('MAVLINK10'):
import mavlinkv10 as mavlink
else:
import mavlink
class MAVWPError(Exception):
'''MAVLink WP error class'''
def __init__(self, msg):
Exception.__init__(self, msg)
self.message = msg
class MAVWPLoader(object):
'''MAVLink waypoint loader'''
def __init__(self, target_system=0, target_component=0):
self.wpoints = []
self.target_system = target_system
self.target_component = target_component
def count(self):
'''return number of waypoints'''
return len(self.wpoints)
def wp(self, i):
'''return a waypoint'''
return self.wpoints[i]
def add(self, w):
'''add a waypoint'''
w.seq = self.count()
self.wpoints.append(w)
def set(self, w, idx):
'''set a waypoint'''
w.seq = idx
if w.seq == self.count():
return self.add(w)
if self.count() <= idx:
raise MAVWPError('adding waypoint at idx=%u past end of list (count=%u)' % (idx, self.count()))
self.wpoints[idx] = w
def remove(self, w):
'''remove a waypoint'''
self.wpoints.remove(w)
def clear(self):
'''clear waypoint list'''
self.wpoints = []
def _read_waypoint_v100(self, line):
'''read a version 100 waypoint'''
cmdmap = {
2 : mavlink.MAV_CMD_NAV_TAKEOFF,
3 : mavlink.MAV_CMD_NAV_RETURN_TO_LAUNCH,
4 : mavlink.MAV_CMD_NAV_LAND,
24: mavlink.MAV_CMD_NAV_TAKEOFF,
26: mavlink.MAV_CMD_NAV_LAND,
25: mavlink.MAV_CMD_NAV_WAYPOINT ,
27: mavlink.MAV_CMD_NAV_LOITER_UNLIM
}
a = line.split()
if len(a) != 13:
raise MAVWPError("invalid waypoint line with %u values" % len(a))
w = mavlink.MAVLink_waypoint_message(self.target_system, self.target_component,
int(a[0]), # seq
int(a[1]), # frame
int(a[2]), # action
int(a[7]), # current
int(a[12]), # autocontinue
float(a[5]), # param1,
float(a[6]), # param2,
float(a[3]), # param3
float(a[4]), # param4
float(a[9]), # x, latitude
float(a[8]), # y, longitude
float(a[10]) # z
)
if not w.command in cmdmap:
raise MAVWPError("Unknown v100 waypoint action %u" % w.command)
w.command = cmdmap[w.command]
return w
def _read_waypoint_v110(self, line):
'''read a version 110 waypoint'''
a = line.split()
if len(a) != 12:
raise MAVWPError("invalid waypoint line with %u values" % len(a))
w = mavlink.MAVLink_waypoint_message(self.target_system, self.target_component,
int(a[0]), # seq
int(a[2]), # frame
int(a[3]), # command
int(a[1]), # current
int(a[11]), # autocontinue
float(a[4]), # param1,
float(a[5]), # param2,
float(a[6]), # param3
float(a[7]), # param4
float(a[8]), # x (latitude)
float(a[9]), # y (longitude)
float(a[10]) # z (altitude)
)
return w
def load(self, filename):
'''load waypoints from a file.
returns number of waypoints loaded'''
f = open(filename, mode='r')
version_line = f.readline().strip()
if version_line == "QGC WPL 100":
readfn = self._read_waypoint_v100
elif version_line == "QGC WPL 110":
readfn = self._read_waypoint_v110
else:
f.close()
raise MAVWPError("Unsupported waypoint format '%s'" % version_line)
self.clear()
for line in f:
if line.startswith('#'):
continue
line = line.strip()
if not line:
continue
w = readfn(line)
if w is not None:
self.add(w)
f.close()
return len(self.wpoints)
def save(self, filename):
'''save waypoints to a file'''
f = open(filename, mode='w')
f.write("QGC WPL 110\n")
for w in self.wpoints:
f.write("%u\t%u\t%u\t%u\t%f\t%f\t%f\t%f\t%f\t%f\t%f\t%u\n" % (
w.seq, w.current, w.frame, w.command,
w.param1, w.param2, w.param3, w.param4,
w.x, w.y, w.z, w.autocontinue))
f.close()
class MAVFenceError(Exception):
'''MAVLink fence error class'''
def __init__(self, msg):
Exception.__init__(self, msg)
self.message = msg
class MAVFenceLoader(object):
'''MAVLink geo-fence loader'''
def __init__(self, target_system=0, target_component=0):
self.points = []
self.target_system = target_system
self.target_component = target_component
def count(self):
'''return number of points'''
return len(self.points)
def point(self, i):
'''return a point'''
return self.points[i]
def add(self, p):
'''add a point'''
self.points.append(p)
def clear(self):
'''clear point list'''
self.points = []
def load(self, filename):
'''load points from a file.
returns number of points loaded'''
f = open(filename, mode='r')
self.clear()
for line in f:
if line.startswith('#'):
continue
line = line.strip()
if not line:
continue
a = line.split()
if len(a) != 2:
raise MAVFenceError("invalid fence point line: %s" % line)
p = mavlink.MAVLink_fence_point_message(self.target_system, self.target_component,
self.count(), 0, float(a[0]), float(a[1]))
self.add(p)
f.close()
for i in range(self.count()):
self.points[i].count = self.count()
return len(self.points)
def save(self, filename):
'''save fence points to a file'''
f = open(filename, mode='w')
for p in self.points:
f.write("%f\t%f\n" % (p.lat, p.lng))
f.close()

View File

@ -1,236 +0,0 @@
#!/usr/bin/env python
# this is taken from the pySerial documentation at
# http://pyserial.sourceforge.net/examples.html
import ctypes
import re
def ValidHandle(value):
if value == 0:
raise ctypes.WinError()
return value
NULL = 0
HDEVINFO = ctypes.c_int
BOOL = ctypes.c_int
CHAR = ctypes.c_char
PCTSTR = ctypes.c_char_p
HWND = ctypes.c_uint
DWORD = ctypes.c_ulong
PDWORD = ctypes.POINTER(DWORD)
ULONG = ctypes.c_ulong
ULONG_PTR = ctypes.POINTER(ULONG)
#~ PBYTE = ctypes.c_char_p
PBYTE = ctypes.c_void_p
class GUID(ctypes.Structure):
_fields_ = [
('Data1', ctypes.c_ulong),
('Data2', ctypes.c_ushort),
('Data3', ctypes.c_ushort),
('Data4', ctypes.c_ubyte*8),
]
def __str__(self):
return "{%08x-%04x-%04x-%s-%s}" % (
self.Data1,
self.Data2,
self.Data3,
''.join(["%02x" % d for d in self.Data4[:2]]),
''.join(["%02x" % d for d in self.Data4[2:]]),
)
class SP_DEVINFO_DATA(ctypes.Structure):
_fields_ = [
('cbSize', DWORD),
('ClassGuid', GUID),
('DevInst', DWORD),
('Reserved', ULONG_PTR),
]
def __str__(self):
return "ClassGuid:%s DevInst:%s" % (self.ClassGuid, self.DevInst)
PSP_DEVINFO_DATA = ctypes.POINTER(SP_DEVINFO_DATA)
class SP_DEVICE_INTERFACE_DATA(ctypes.Structure):
_fields_ = [
('cbSize', DWORD),
('InterfaceClassGuid', GUID),
('Flags', DWORD),
('Reserved', ULONG_PTR),
]
def __str__(self):
return "InterfaceClassGuid:%s Flags:%s" % (self.InterfaceClassGuid, self.Flags)
PSP_DEVICE_INTERFACE_DATA = ctypes.POINTER(SP_DEVICE_INTERFACE_DATA)
PSP_DEVICE_INTERFACE_DETAIL_DATA = ctypes.c_void_p
class dummy(ctypes.Structure):
_fields_=[("d1", DWORD), ("d2", CHAR)]
_pack_ = 1
SIZEOF_SP_DEVICE_INTERFACE_DETAIL_DATA_A = ctypes.sizeof(dummy)
SetupDiDestroyDeviceInfoList = ctypes.windll.setupapi.SetupDiDestroyDeviceInfoList
SetupDiDestroyDeviceInfoList.argtypes = [HDEVINFO]
SetupDiDestroyDeviceInfoList.restype = BOOL
SetupDiGetClassDevs = ctypes.windll.setupapi.SetupDiGetClassDevsA
SetupDiGetClassDevs.argtypes = [ctypes.POINTER(GUID), PCTSTR, HWND, DWORD]
SetupDiGetClassDevs.restype = ValidHandle # HDEVINFO
SetupDiEnumDeviceInterfaces = ctypes.windll.setupapi.SetupDiEnumDeviceInterfaces
SetupDiEnumDeviceInterfaces.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, ctypes.POINTER(GUID), DWORD, PSP_DEVICE_INTERFACE_DATA]
SetupDiEnumDeviceInterfaces.restype = BOOL
SetupDiGetDeviceInterfaceDetail = ctypes.windll.setupapi.SetupDiGetDeviceInterfaceDetailA
SetupDiGetDeviceInterfaceDetail.argtypes = [HDEVINFO, PSP_DEVICE_INTERFACE_DATA, PSP_DEVICE_INTERFACE_DETAIL_DATA, DWORD, PDWORD, PSP_DEVINFO_DATA]
SetupDiGetDeviceInterfaceDetail.restype = BOOL
SetupDiGetDeviceRegistryProperty = ctypes.windll.setupapi.SetupDiGetDeviceRegistryPropertyA
SetupDiGetDeviceRegistryProperty.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, DWORD, PDWORD, PBYTE, DWORD, PDWORD]
SetupDiGetDeviceRegistryProperty.restype = BOOL
GUID_CLASS_COMPORT = GUID(0x86e0d1e0L, 0x8089, 0x11d0,
(ctypes.c_ubyte*8)(0x9c, 0xe4, 0x08, 0x00, 0x3e, 0x30, 0x1f, 0x73))
DIGCF_PRESENT = 2
DIGCF_DEVICEINTERFACE = 16
INVALID_HANDLE_VALUE = 0
ERROR_INSUFFICIENT_BUFFER = 122
SPDRP_HARDWAREID = 1
SPDRP_FRIENDLYNAME = 12
SPDRP_LOCATION_INFORMATION = 13
ERROR_NO_MORE_ITEMS = 259
def comports(available_only=True):
"""This generator scans the device registry for com ports and yields
(order, port, desc, hwid). If available_only is true only return currently
existing ports. Order is a helper to get sorted lists. it can be ignored
otherwise."""
flags = DIGCF_DEVICEINTERFACE
if available_only:
flags |= DIGCF_PRESENT
g_hdi = SetupDiGetClassDevs(ctypes.byref(GUID_CLASS_COMPORT), None, NULL, flags);
#~ for i in range(256):
for dwIndex in range(256):
did = SP_DEVICE_INTERFACE_DATA()
did.cbSize = ctypes.sizeof(did)
if not SetupDiEnumDeviceInterfaces(
g_hdi,
None,
ctypes.byref(GUID_CLASS_COMPORT),
dwIndex,
ctypes.byref(did)
):
if ctypes.GetLastError() != ERROR_NO_MORE_ITEMS:
raise ctypes.WinError()
break
dwNeeded = DWORD()
# get the size
if not SetupDiGetDeviceInterfaceDetail(
g_hdi,
ctypes.byref(did),
None, 0, ctypes.byref(dwNeeded),
None
):
# Ignore ERROR_INSUFFICIENT_BUFFER
if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
raise ctypes.WinError()
# allocate buffer
class SP_DEVICE_INTERFACE_DETAIL_DATA_A(ctypes.Structure):
_fields_ = [
('cbSize', DWORD),
('DevicePath', CHAR*(dwNeeded.value - ctypes.sizeof(DWORD))),
]
def __str__(self):
return "DevicePath:%s" % (self.DevicePath,)
idd = SP_DEVICE_INTERFACE_DETAIL_DATA_A()
idd.cbSize = SIZEOF_SP_DEVICE_INTERFACE_DETAIL_DATA_A
devinfo = SP_DEVINFO_DATA()
devinfo.cbSize = ctypes.sizeof(devinfo)
if not SetupDiGetDeviceInterfaceDetail(
g_hdi,
ctypes.byref(did),
ctypes.byref(idd), dwNeeded, None,
ctypes.byref(devinfo)
):
raise ctypes.WinError()
# hardware ID
szHardwareID = ctypes.create_string_buffer(250)
if not SetupDiGetDeviceRegistryProperty(
g_hdi,
ctypes.byref(devinfo),
SPDRP_HARDWAREID,
None,
ctypes.byref(szHardwareID), ctypes.sizeof(szHardwareID) - 1,
None
):
# Ignore ERROR_INSUFFICIENT_BUFFER
if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
raise ctypes.WinError()
# friendly name
szFriendlyName = ctypes.create_string_buffer(1024)
if not SetupDiGetDeviceRegistryProperty(
g_hdi,
ctypes.byref(devinfo),
SPDRP_FRIENDLYNAME,
None,
ctypes.byref(szFriendlyName), ctypes.sizeof(szFriendlyName) - 1,
None
):
# Ignore ERROR_INSUFFICIENT_BUFFER
if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
#~ raise ctypes.WinError()
# not getting friendly name for com0com devices, try something else
szFriendlyName = ctypes.create_string_buffer(1024)
if SetupDiGetDeviceRegistryProperty(
g_hdi,
ctypes.byref(devinfo),
SPDRP_LOCATION_INFORMATION,
None,
ctypes.byref(szFriendlyName), ctypes.sizeof(szFriendlyName) - 1,
None
):
port_name = "\\\\.\\" + szFriendlyName.value
order = None
else:
port_name = szFriendlyName.value
order = None
else:
try:
m = re.search(r"\((.*?(\d+))\)", szFriendlyName.value)
#~ print szFriendlyName.value, m.groups()
port_name = m.group(1)
order = int(m.group(2))
except AttributeError, msg:
port_name = szFriendlyName.value
order = None
yield order, port_name, szFriendlyName.value, szHardwareID.value
SetupDiDestroyDeviceInfoList(g_hdi)
if __name__ == '__main__':
import serial
print "-"*78
print "Serial ports"
print "-"*78
for order, port, desc, hwid in sorted(comports()):
print "%-10s: %s (%s) ->" % (port, desc, hwid),
try:
serial.Serial(port) # test open
except serial.serialutil.SerialException:
print "can't be openend"
else:
print "Ready"
print
# list of all ports the system knows
print "-"*78
print "All serial ports (registry)"
print "-"*78
for order, port, desc, hwid in sorted(comports(False)):
print "%-10s: %s (%s)" % (port, desc, hwid)

View File

@ -5,7 +5,7 @@ import util, time, os, sys, math
import socket, struct import socket, struct
import select, errno import select, errno
sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', 'pymavlink')) sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', '..', '..', '..', 'mavlink', 'pymavlink'))
import fgFDM import fgFDM
def sim_send(m, a): def sim_send(m, a):