mavlink_shell.py: allow scripted use

E.g. echo free|./Tools/mavlink_shell.py

Or creating a virtual nsh console:
socat pty,rawer,link=/tmp/fmu_nsh_pts exec:"mavlink_shell.py 0.0.0.0\\:14550,pty,rawer"
This commit is contained in:
Beat Küng 2021-06-11 14:24:17 +02:00
parent aa60d76963
commit d79229f958
1 changed files with 27 additions and 12 deletions

View File

@ -12,6 +12,7 @@ import sys, select
import termios
from timeit import default_timer as timer
from argparse import ArgumentParser
import os
try:
from pymavlink import mavutil
@ -135,15 +136,20 @@ def main():
mav_serialport.write('\n') # make sure the shell is started
# setup the console, so we can read one char at a time
# disable echo & avoid buffering on stdin
fd_in = sys.stdin.fileno()
old_attr = termios.tcgetattr(fd_in)
new_attr = termios.tcgetattr(fd_in)
new_attr[3] = new_attr[3] & ~termios.ECHO # lflags
new_attr[3] = new_attr[3] & ~termios.ICANON
try:
old_attr = termios.tcgetattr(fd_in)
new_attr = termios.tcgetattr(fd_in)
new_attr[3] = new_attr[3] & ~termios.ECHO # lflags
new_attr[3] = new_attr[3] & ~termios.ICANON
termios.tcsetattr(fd_in, termios.TCSANOW, new_attr)
except termios.error:
# tcgetattr can fail if stdin is not a tty
old_attr = None
ubuf_stdin = os.fdopen(fd_in, 'rb', buffering=0)
try:
termios.tcsetattr(fd_in, termios.TCSANOW, new_attr)
cur_line = ''
command_history = []
cur_history_index = 0
@ -156,11 +162,19 @@ def main():
next_heartbeat_time = timer()
while True:
quit_time = None
while quit_time is None or quit_time > timer():
while True:
i, o, e = select.select([sys.stdin], [], [], 0)
i, o, e = select.select([ubuf_stdin], [], [], 0)
if not i: break
ch = sys.stdin.read(1)
ch = ubuf_stdin.read(1).decode('utf8')
if len(ch) == 0: # EOF
if quit_time is None:
# run a bit longer to read the response (we could also
# read until we get a prompt)
quit_time = timer() + 1
break
# provide a simple shell with command history
if ch == '\n':
@ -182,8 +196,8 @@ def main():
cur_line = cur_line[:-1]
sys.stdout.write(ch)
elif ord(ch) == 27:
ch = sys.stdin.read(1) # skip one
ch = sys.stdin.read(1)
ch = ubuf_stdin.read(1).decode('utf8') # skip one
ch = ubuf_stdin.read(1).decode('utf8')
if ch == 'A': # arrow up
if cur_history_index > 0:
cur_history_index -= 1
@ -223,7 +237,8 @@ def main():
mav_serialport.close()
finally:
termios.tcsetattr(fd_in, termios.TCSADRAIN, old_attr)
if old_attr:
termios.tcsetattr(fd_in, termios.TCSADRAIN, old_attr)
if __name__ == '__main__':