From e8d44da2c0633bacd4b0c7a472ca2118210cea3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beat=20K=C3=BCng?= Date: Thu, 28 Jul 2016 17:43:52 +0200 Subject: [PATCH] Tools: add mavlink_shell.py script to start a shell over mavlink --- Tools/mavlink_shell.py | 154 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100755 Tools/mavlink_shell.py diff --git a/Tools/mavlink_shell.py b/Tools/mavlink_shell.py new file mode 100755 index 0000000000..fc77ad6a22 --- /dev/null +++ b/Tools/mavlink_shell.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python + +""" +Open a shell over MAVLink. + +@author: Beat Kueng (beat-kueng@gmx.net) +""" + + +from __future__ import print_function +import sys, select + +try: + from pymavlink import mavutil + import serial +except: + print("Failed to import pymavlink.") + print("You may need to install it with 'pip install pymavlink pyserial'") + exit(-1) +from argparse import ArgumentParser + + +class MavlinkSerialPort(): + '''an object that looks like a serial port, but + transmits using mavlink SERIAL_CONTROL packets''' + def __init__(self, portname, baudrate, devnum=0, debug=0): + self.baudrate = 0 + self._debug = debug + self.buf = '' + self.port = devnum + self.debug("Connecting with MAVLink to %s ..." % portname) + self.mav = mavutil.mavlink_connection(portname, autoreconnect=True, baud=baudrate) + self.mav.wait_heartbeat() + self.debug("HEARTBEAT OK\n") + self.debug("Locked serial device\n") + + def debug(self, s, level=1): + '''write some debug text''' + if self._debug >= level: + print(s) + + def write(self, b): + '''write some bytes''' + self.debug("sending '%s' (0x%02x) of len %u\n" % (b, ord(b[0]), len(b)), 2) + while len(b) > 0: + n = len(b) + if n > 70: + n = 70 + buf = [ord(x) for x in b[:n]] + buf.extend([0]*(70-len(buf))) + self.mav.mav.serial_control_send(self.port, + mavutil.mavlink.SERIAL_CONTROL_FLAG_EXCLUSIVE | + mavutil.mavlink.SERIAL_CONTROL_FLAG_RESPOND, + 0, + 0, + n, + buf) + b = b[n:] + + def close(self): + self.mav.mav.serial_control_send(self.port, 0, 0, 0, 0, [0]*70) + + def _recv(self): + '''read some bytes into self.buf''' + m = self.mav.recv_match(condition='SERIAL_CONTROL.count!=0', + type='SERIAL_CONTROL', blocking=True, + timeout=0.03) + if m is not None: + if self._debug > 2: + print(m) + data = m.data[:m.count] + self.buf += ''.join(str(chr(x)) for x in data) + + def read(self, n): + '''read some bytes''' + if len(self.buf) == 0: + self._recv() + if len(self.buf) > 0: + if n > len(self.buf): + n = len(self.buf) + ret = self.buf[:n] + self.buf = self.buf[n:] + if self._debug >= 2: + for b in ret: + self.debug("read 0x%x" % ord(b), 2) + return ret + return '' + + +def main(): + parser = ArgumentParser(description=__doc__) + parser.add_argument('port', metavar='PORT', nargs='?', default = None, + help='Mavlink port name: serial: DEVICE[,BAUD], udp: IP:PORT, tcp: tcp:IP:PORT. Eg: \ +/dev/ttyUSB0 or 0.0.0.0:14550. Auto-detect serial if not given.') + parser.add_argument("--baudrate", "-b", dest="baudrate", type=int, + help="Mavlink port baud rate (default=115200)", default=115200) + args = parser.parse_args() + + + if args.port == None: + serial_list = mavutil.auto_detect_serial(preferred_list=['*FTDI*', + "*Arduino_Mega_2560*", "*3D_Robotics*", "*USB_to_UART*", '*PX4*', '*FMU*']) + + if len(serial_list) == 0: + print("Error: no serial connection found") + return + + if len(serial_list) > 1: + print('Auto-detected serial ports are:') + for port in serial_list: + print(" {:}".format(port)) + print('Using port {:}'.format(serial_list[0])) + args.port = serial_list[0].device + + + print("Connecting to MAVLINK...") + mav_serialport = MavlinkSerialPort(args.port, args.baudrate, devnum=10) + + mav_serialport.write('\n') # make sure the shell is started + + try: + while True: + i, o, e = select.select([sys.stdin], [], [], 0.001) + if (i): + data = sys.stdin.readline() + + # Erase stdin output (otherwise we have it twice) + # FIXME: this assumes a prompt of 5 chars, and does not work in all cases + CURSOR_UP_ONE = '\x1b[1A' + CURSOR_RIGHT = '\x1b[5C' + ERASE_LINE = '\x1b[2K' + ERASE_END_LINE = '\x1b[K' + sys.stdout.write(CURSOR_UP_ONE + CURSOR_RIGHT + ERASE_END_LINE) + + # TODO: add a command history? + + #sys.stdout.write(data) + mav_serialport.write(data) + + data = mav_serialport.read(4096) + if data and len(data) > 0: + sys.stdout.write(data) + sys.stdout.flush() + + except serial.serialutil.SerialException as e: + print(e) + + except KeyboardInterrupt: + mav_serialport.close() + + +if __name__ == '__main__': + main() +