Tools/HIL: use pyserial spy:// to log all serial traffic and dump on failure

This commit is contained in:
Daniel Agar 2022-01-01 21:04:47 -05:00
parent 7df1721d32
commit c421bff73c
6 changed files with 242 additions and 205 deletions

View File

@ -25,7 +25,7 @@ pipeline {
}
post {
always {
sh 'make distclean; git clean -ff -x -d .'
sh 'make distclean; git clean -ff -x -d .'
}
}
} // stage build
@ -73,8 +73,11 @@ pipeline {
}
}
post {
failure {
sh 'cat /tmp/pyserial_spy_file.txt'
}
always {
sh './platforms/nuttx/Debug/jlink_gdb_backtrace_simple.sh build/cubepilot_cubeorange_test/cubepilot_cubeorange_test.elf || true'
sh './platforms/nuttx/Debug/jlink_gdb_backtrace_simple.sh build/cubepilot_cubeorange_test/cubepilot_cubeorange_test.elf || true'
}
}
} // stage test
@ -99,7 +102,7 @@ pipeline {
}
post {
always {
sh 'make distclean; git clean -ff -x -d .'
sh 'make distclean; git clean -ff -x -d .'
}
}
} // stage build
@ -142,8 +145,11 @@ pipeline {
}
}
post {
failure {
sh 'cat /tmp/pyserial_spy_file.txt'
}
always {
sh './platforms/nuttx/Debug/jlink_gdb_backtrace_simple.sh build/cuav_x7pro_test/cuav_x7pro_test.elf || true'
sh './platforms/nuttx/Debug/jlink_gdb_backtrace_simple.sh build/cuav_x7pro_test/cuav_x7pro_test.elf || true'
}
}
} // stage test
@ -168,7 +174,7 @@ pipeline {
}
post {
always {
sh 'make distclean; git clean -ff -x -d .'
sh 'make distclean; git clean -ff -x -d .'
}
}
} // stage build
@ -211,8 +217,11 @@ pipeline {
}
}
post {
failure {
sh 'cat /tmp/pyserial_spy_file.txt'
}
always {
sh './platforms/nuttx/Debug/jlink_gdb_backtrace_simple.sh build/px4_fmu-v3_test/px4_fmu-v3_test.elf || true'
sh './platforms/nuttx/Debug/jlink_gdb_backtrace_simple.sh build/px4_fmu-v3_test/px4_fmu-v3_test.elf || true'
}
}
} // stage test
@ -237,7 +246,7 @@ pipeline {
}
post {
always {
sh 'make distclean; git clean -ff -x -d .'
sh 'make distclean; git clean -ff -x -d .'
}
}
} // stage build
@ -279,8 +288,11 @@ pipeline {
}
}
post {
failure {
sh 'cat /tmp/pyserial_spy_file.txt'
}
always {
sh './platforms/nuttx/Debug/jlink_gdb_backtrace_simple.sh build/px4_fmu-v4_test/px4_fmu-v4_test.elf || true'
sh './platforms/nuttx/Debug/jlink_gdb_backtrace_simple.sh build/px4_fmu-v4_test/px4_fmu-v4_test.elf || true'
}
}
} // stage test
@ -305,7 +317,7 @@ pipeline {
}
post {
always {
sh 'make distclean; git clean -ff -x -d .'
sh 'make distclean; git clean -ff -x -d .'
}
}
} // stage build
@ -348,8 +360,11 @@ pipeline {
}
}
post {
failure {
sh 'cat /tmp/pyserial_spy_file.txt'
}
always {
sh './platforms/nuttx/Debug/jlink_gdb_backtrace_simple.sh build/px4_fmu-v4pro_test/px4_fmu-v4pro_test.elf || true'
sh './platforms/nuttx/Debug/jlink_gdb_backtrace_simple.sh build/px4_fmu-v4pro_test/px4_fmu-v4pro_test.elf || true'
}
}
} // stage test
@ -374,7 +389,7 @@ pipeline {
}
post {
always {
sh 'make distclean; git clean -ff -x -d .'
sh 'make distclean; git clean -ff -x -d .'
}
}
} // stage build
@ -437,8 +452,11 @@ pipeline {
}
}
post {
failure {
sh 'cat /tmp/pyserial_spy_file.txt'
}
always {
sh './platforms/nuttx/Debug/jlink_gdb_backtrace_simple.sh build/px4_fmu-v5_debug/px4_fmu-v5_debug.elf || true'
sh './platforms/nuttx/Debug/jlink_gdb_backtrace_simple.sh build/px4_fmu-v5_debug/px4_fmu-v5_debug.elf || true'
}
}
} // stage test
@ -463,7 +481,7 @@ pipeline {
}
post {
always {
sh 'make distclean; git clean -ff -x -d .'
sh 'make distclean; git clean -ff -x -d .'
}
}
} // stage build
@ -518,8 +536,11 @@ pipeline {
}
}
post {
failure {
sh 'cat /tmp/pyserial_spy_file.txt'
}
always {
sh './platforms/nuttx/Debug/jlink_gdb_backtrace_simple.sh build/px4_fmu-v5_stackcheck/px4_fmu-v5_stackcheck.elf || true'
sh './platforms/nuttx/Debug/jlink_gdb_backtrace_simple.sh build/px4_fmu-v5_stackcheck/px4_fmu-v5_stackcheck.elf || true'
}
}
} // stage test
@ -544,7 +565,7 @@ pipeline {
}
post {
always {
sh 'make distclean; git clean -ff -x -d .'
sh 'make distclean; git clean -ff -x -d .'
}
}
} // stage build
@ -587,8 +608,11 @@ pipeline {
}
}
post {
failure {
sh 'cat /tmp/pyserial_spy_file.txt'
}
always {
sh './platforms/nuttx/Debug/jlink_gdb_backtrace_simple.sh build/px4_fmu-v5_test/px4_fmu-v5_test.elf || true'
sh './platforms/nuttx/Debug/jlink_gdb_backtrace_simple.sh build/px4_fmu-v5_test/px4_fmu-v5_test.elf || true'
}
}
} // stage test
@ -613,7 +637,7 @@ pipeline {
}
post {
always {
sh 'make distclean; git clean -ff -x -d .'
sh 'make distclean; git clean -ff -x -d .'
}
}
} // stage build
@ -656,8 +680,11 @@ pipeline {
}
}
post {
failure {
sh 'cat /tmp/pyserial_spy_file.txt'
}
always {
sh './platforms/nuttx/Debug/jlink_gdb_backtrace_simple.sh build/nxp_fmuk66-v3_test/nxp_fmuk66-v3_test.elf || true'
sh './platforms/nuttx/Debug/jlink_gdb_backtrace_simple.sh build/nxp_fmuk66-v3_test/nxp_fmuk66-v3_test.elf || true'
}
}
} // stage test
@ -723,11 +750,15 @@ void checkStatus() {
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "logger on"'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "sleep 1"' // sleep before continuing
// status commands
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "cat /proc/fs/blocks"'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "cat /proc/fs/mount"'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "cat /proc/fs/usage"'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "cat /proc/meminfo"'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "cat /proc/uptime"'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "commander check" || true'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "commander status"'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "dataman status"'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "df -h"'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "df"'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "ekf2 status"'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "free"'
@ -738,8 +769,11 @@ void checkStatus() {
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "ls /bin"'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "ls /dev"'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "ls /etc"'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "ls /fs"'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "ls /fs/microsd"'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "ls /obj"'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "ls /proc"'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "ls /proc/fs"'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "mavlink status streams" || true'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "mavlink status" || true'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "mount"'
@ -917,10 +951,6 @@ void printTopics() {
}
void resetBoard() {
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "df -h" || true'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "ls /fs/" || true'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "ls /fs/microsd" || true'
resetParameters()
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "param set SYS_AUTOSTART 0" || true'
@ -930,12 +960,4 @@ void resetBoard() {
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "param set SDLOG_MODE -1" || true' // limit cpu usage
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "echo > /fs/microsd/.format" || true'
sh './Tools/HIL/reboot.py --device `find /dev/serial -name *usb-*`' // reboot to apply
// check SD card
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "df -h" || true'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "ls /fs/microsd" || true'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "ls /proc/fs" || true'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "cat /proc/fs/blocks" || true'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "cat /proc/fs/mount" || true'
sh './Tools/HIL/run_nsh_cmd.py --device `find /dev/serial -name *usb-*` --cmd "cat /proc/fs/usage" || true'
}

View File

@ -8,6 +8,7 @@ import re
import sys
import datetime
import serial.tools.list_ports as list_ports
import tempfile
COLOR_RED = "\x1b[31m"
COLOR_GREEN = "\x1b[32m"
@ -38,8 +39,8 @@ def print_line(line):
print('{0}'.format(line), end='')
def monitor_firmware_upload(port, baudrate):
ser = serial.Serial(port, baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1, xonxoff=False, rtscts=False, dsrdtr=False)
def monitor_firmware_upload(port_url, baudrate):
ser = serial.serial_for_url(url=port_url, baudrate=baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=3, xonxoff=False, rtscts=False, dsrdtr=False, inter_byte_timeout=1)
timeout = 180 # 3 minutes
timeout_start = time.monotonic()
@ -56,20 +57,21 @@ def monitor_firmware_upload(port, baudrate):
print_line(serial_line)
if "NuttShell (NSH)" in serial_line:
sys.exit(return_code)
elif "nsh>" in serial_line:
sys.exit(return_code)
if "NuttShell (NSH)" in serial_line:
sys.exit(return_code)
elif "nsh>" in serial_line:
sys.exit(return_code)
if time.monotonic() > timeout_start + timeout:
print("Error, timeout")
sys.exit(-1)
else:
if time.monotonic() > timeout_start + timeout:
print("Error, timeout")
sys.exit(-1)
# newline every 10 seconds if still running
if (len(serial_line) <= 0) and (time.monotonic() - timeout_newline > 10):
timeout_newline = time.monotonic()
ser.write("\n".encode("ascii"))
# newline every 10 seconds if still running
if time.monotonic() - timeout_newline > 10:
timeout_newline = time.monotonic()
ser.write("\n".encode("ascii"))
ser.flush()
def main():
@ -96,10 +98,15 @@ def main():
parser = ArgumentParser(description=__doc__)
parser.add_argument('--device', "-d", nargs='?', default=default_device, help='', required=device_required)
parser.add_argument("--baudrate", "-b", dest="baudrate", type=int, help="Mavlink port baud rate (default=57600)", default=57600)
parser.add_argument("--baudrate", "-b", dest="baudrate", type=int, help="serial port baud rate (default=57600)", default=57600)
args = parser.parse_args()
monitor_firmware_upload(args.device, args.baudrate)
tmp_file = "{0}/pyserial_spy_file.txt".format(tempfile.gettempdir())
port_url = "spy://{0}?file={1}".format(args.device, tmp_file)
print("pyserial url: {0}".format(port_url))
monitor_firmware_upload(port_url, args.baudrate)
if __name__ == "__main__":
main()

View File

@ -8,6 +8,7 @@ import re
import sys
import datetime
import serial.tools.list_ports as list_ports
import tempfile
COLOR_RED = "\x1b[31m"
COLOR_GREEN = "\x1b[32m"
@ -38,28 +39,28 @@ def print_line(line):
print('{0}'.format(line), end='')
def do_param_set_cmd(port, baudrate, param_name, param_value):
ser = serial.Serial(port, baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1, xonxoff=False, rtscts=False, dsrdtr=False)
def do_param_set_cmd(port_url, baudrate, param_name, param_value):
ser = serial.serial_for_url(url=port_url, baudrate=baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=3, xonxoff=False, rtscts=False, dsrdtr=False, inter_byte_timeout=1)
timeout_start = time.monotonic()
timeout = 30 # 30 seconds
ser.write("\n\n\n".encode("ascii"))
# wait for nsh prompt
while True:
ser.write("\n".encode("ascii"))
ser.flush()
serial_line = ser.readline().decode("ascii", errors='ignore')
if "nsh>" in serial_line:
break
if len(serial_line) > 0:
if "nsh>" in serial_line:
break
else:
if len(serial_line) > 0:
print_line(serial_line)
if time.monotonic() > timeout_start + timeout:
print("Error, timeout waiting for prompt")
sys.exit(1)
ser.write("\n".encode("ascii"))
if time.monotonic() > timeout_start + timeout:
print("Error, timeout waiting for prompt")
sys.exit(1)
# clear
ser.reset_input_buffer()
@ -72,28 +73,32 @@ def do_param_set_cmd(port, baudrate, param_name, param_value):
# write command (param set) and wait for command echo
print("Running command: \'{0}\'".format(cmd))
serial_cmd = '{0}\r\n'.format(cmd)
serial_cmd = '{0}\n'.format(cmd)
ser.write(serial_cmd.encode("ascii"))
ser.flush()
while True:
serial_line = ser.readline().decode("ascii", errors='ignore')
if cmd in serial_line:
print_line(serial_line)
break
else:
if len(serial_line) > 0:
print_line(serial_line)
if len(serial_line) > 0:
if cmd in serial_line:
break
if time.monotonic() > timeout_start + timeout:
print("Error, timeout waiting for command echo")
break
print_line(serial_line)
else:
if time.monotonic() > timeout_start + timeout:
print("Error, timeout waiting for command echo")
break
# clear
ser.reset_input_buffer()
# verify param value
cmd = "param show " + param_name
serial_cmd = '{0}\r\n'.format(cmd)
print("Running command: \'{0}\'".format(cmd))
serial_cmd = '{0}\n'.format(cmd)
ser.write(serial_cmd.encode("ascii"))
ser.flush()
param_show_response = param_name + " ["
@ -103,34 +108,27 @@ def do_param_set_cmd(port, baudrate, param_name, param_value):
while True:
serial_line = ser.readline().decode("ascii", errors='ignore')
if param_show_response in serial_line:
if len(serial_line) > 0:
print_line(serial_line)
current_param_value = serial_line.split(":")[-1].strip()
if (current_param_value == param_value):
sys.exit(0)
if param_show_response in serial_line:
current_param_value = serial_line.split(":")[-1].strip()
if (current_param_value == param_value):
sys.exit(0)
else:
sys.exit(1)
else:
sys.exit(1)
if time.monotonic() > timeout_start + timeout:
if "nsh>" in serial_line:
sys.exit(1) # error, command didn't complete successfully
elif "NuttShell (NSH)" in serial_line:
sys.exit(1) # error, command didn't complete successfully
else:
if len(serial_line) > 0:
print_line(serial_line)
if time.monotonic() > timeout_start + timeout:
if "nsh>" in serial_line:
sys.exit(1) # error, command didn't complete successfully
elif "NuttShell (NSH)" in serial_line:
sys.exit(1) # error, command didn't complete successfully
print("Error, timeout")
sys.exit(-1)
if time.monotonic() > timeout_start + timeout:
print("Error, timeout")
sys.exit(-1)
if len(serial_line) <= 0:
ser.write("\r\n".encode("ascii"))
ser.flush()
time.sleep(0.2)
ser.close()
def main():
@ -157,12 +155,17 @@ def main():
parser = ArgumentParser(description=__doc__)
parser.add_argument('--device', "-d", nargs='?', default=default_device, help='', required=device_required)
parser.add_argument("--baudrate", "-b", dest="baudrate", type=int, help="Mavlink port baud rate (default=57600)", default=57600)
parser.add_argument("--baudrate", "-b", dest="baudrate", type=int, help="serial port baud rate (default=57600)", default=57600)
parser.add_argument("--name", "-p", dest="param_name", help="Parameter name")
parser.add_argument("--value", "-v", dest="param_value", help="Parameter value")
args = parser.parse_args()
do_param_set_cmd(args.device, args.baudrate, args.param_name, args.param_value)
tmp_file = "{0}/pyserial_spy_file.txt".format(tempfile.gettempdir())
port_url = "spy://{0}?file={1}".format(args.device, tmp_file)
print("pyserial url: {0}".format(port_url))
do_param_set_cmd(port_url, args.baudrate, args.param_name, args.param_value)
if __name__ == "__main__":
main()

View File

@ -8,6 +8,7 @@ import re
import sys
import datetime
import serial.tools.list_ports as list_ports
import tempfile
COLOR_RED = "\x1b[31m"
COLOR_GREEN = "\x1b[32m"
@ -38,14 +39,13 @@ def print_line(line):
print('{0}'.format(line), end='')
def reboot(port, baudrate):
ser = serial.Serial(port, baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1, xonxoff=False, rtscts=False, dsrdtr=False)
# clear
ser.reset_input_buffer()
def reboot(port_url, baudrate):
ser = serial.serial_for_url(url=port_url, baudrate=baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=3, xonxoff=False, rtscts=False, dsrdtr=False, inter_byte_timeout=1)
time_start = time.monotonic()
ser.write("\n".encode("ascii"))
ser.write("\n\n\n".encode("ascii"))
ser.write("reboot\n".encode("ascii"))
time_reboot_cmd = time_start
@ -55,26 +55,27 @@ def reboot(port, baudrate):
return_code = 0
while True:
if time.monotonic() > time_reboot_cmd + timeout_reboot_cmd:
time_reboot_cmd = time.monotonic()
print("sending reboot cmd again")
ser.write("reboot\n".encode("ascii"))
time.sleep(0.5)
serial_line = ser.readline().decode("ascii", errors='ignore')
if len(serial_line) > 0:
print_line(serial_line)
if "ERROR" in serial_line:
return_code = -1
print_line(serial_line)
if "NuttShell (NSH)" in serial_line:
sys.exit(return_code)
if "NuttShell (NSH)" in serial_line:
sys.exit(return_code)
else:
if time.monotonic() > time_start + timeout:
print("Error, timeout")
sys.exit(-1)
if time.monotonic() > time_reboot_cmd + timeout_reboot_cmd:
time_reboot_cmd = time.monotonic()
print("sending reboot cmd again")
ser.write("reboot\n".encode("ascii"))
if time.monotonic() > time_start + timeout:
print("Error, timeout")
sys.exit(-1)
def main():

View File

@ -8,6 +8,7 @@ import re
import sys
import datetime
import serial.tools.list_ports as list_ports
import tempfile
COLOR_RED = "\x1b[31m"
COLOR_GREEN = "\x1b[32m"
@ -38,59 +39,59 @@ def print_line(line):
print('{0}'.format(line), end='')
def do_nsh_cmd(port, baudrate, cmd):
ser = serial.Serial(port, baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1, xonxoff=False, rtscts=False, dsrdtr=False)
def do_nsh_cmd(port_url, baudrate, cmd):
ser = serial.serial_for_url(url=port_url, baudrate=baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1, xonxoff=False, rtscts=False, dsrdtr=False, inter_byte_timeout=1)
timeout_start = time.monotonic()
timeout = 30 # 30 seconds
ser.write("\n\n\n".encode("ascii"))
# wait for nsh prompt
while True:
ser.write("\n".encode("ascii"))
ser.flush()
serial_line = ser.readline().decode("ascii", errors='ignore')
if "nsh>" in serial_line:
break
if len(serial_line) > 0:
if "nsh>" in serial_line:
break
else:
if len(serial_line) > 0:
print_line(serial_line)
if time.monotonic() > timeout_start + timeout:
print("Error, timeout waiting for prompt")
sys.exit(1)
ser.write("\n".encode("ascii"))
if time.monotonic() > timeout_start + timeout:
print("Error, timeout waiting for prompt")
sys.exit(1)
# clear
ser.reset_input_buffer()
# run command
timeout_start = time.monotonic()
timeout = 1 # 1 second
timeout = 5 # 5 seconds
success_cmd = "cmd succeeded!"
# wait for command echo
print("Running command: \'{0}\'".format(cmd))
serial_cmd = '{0}; echo "{1}"; echo "{2}";\r\n'.format(cmd, success_cmd, success_cmd)
serial_cmd = '{0}; echo "{1}"; echo "{2}";\n'.format(cmd, success_cmd, success_cmd)
ser.write(serial_cmd.encode("ascii"))
ser.flush()
while True:
serial_line = ser.readline().decode("ascii", errors='ignore')
if cmd in serial_line:
break
elif serial_line.startswith(success_cmd) and len(serial_line) <= len(success_cmd) + 2:
print_line(serial_line)
# we missed the echo, but command ran and succeeded
sys.exit(0)
else:
if len(serial_line) > 0:
if len(serial_line) > 0:
if cmd in serial_line:
break
elif serial_line.startswith(success_cmd) and len(serial_line) <= len(success_cmd) + 2:
print_line(serial_line)
if (len(serial_line) <= 0) and (time.monotonic() > timeout_start + timeout):
print("Error, timeout waiting for command echo")
break
# we missed the echo, but command ran and succeeded
sys.exit(0)
else:
print_line(serial_line)
else:
if time.monotonic() > timeout_start + timeout:
print("Error, timeout waiting for command echo")
break
timeout_start = time.monotonic()
@ -101,31 +102,24 @@ def do_nsh_cmd(port, baudrate, cmd):
while True:
serial_line = ser.readline().decode("ascii", errors='ignore')
if success_cmd in serial_line:
sys.exit(return_code)
break
else:
if len(serial_line) > 0:
if len(serial_line) > 0:
if success_cmd in serial_line:
sys.exit(return_code)
else:
if "ERROR " in serial_line:
return_code = -1
print_line(serial_line)
if "nsh>" in serial_line:
sys.exit(1) # error, command didn't complete successfully
elif "NuttShell (NSH)" in serial_line:
sys.exit(1) # error, command didn't complete successfully
if "nsh>" in serial_line:
sys.exit(1) # error, command didn't complete successfully
elif "NuttShell (NSH)" in serial_line:
sys.exit(1) # error, command didn't complete successfully
else:
if time.monotonic() > timeout_start + timeout:
print("Error, timeout")
sys.exit(-1)
if (len(serial_line) <= 0) and (time.monotonic() > timeout_start + timeout):
print("Error, timeout")
sys.exit(-1)
if len(serial_line) <= 0:
ser.write("\r\n".encode("ascii"))
ser.flush()
time.sleep(0.2)
ser.close()
def main():
@ -152,11 +146,16 @@ def main():
parser = ArgumentParser(description=__doc__)
parser.add_argument('--device', "-d", nargs='?', default=default_device, help='', required=device_required)
parser.add_argument("--baudrate", "-b", dest="baudrate", type=int, help="Mavlink port baud rate (default=57600)", default=57600)
parser.add_argument("--baudrate", "-b", dest="baudrate", type=int, help="serial port baud rate (default=57600)", default=57600)
parser.add_argument("--cmd", "-c", dest="cmd", help="Command to run")
args = parser.parse_args()
do_nsh_cmd(args.device, args.baudrate, args.cmd)
tmp_file = "{0}/pyserial_spy_file.txt".format(tempfile.gettempdir())
port_url = "spy://{0}?file={1}".format(args.device, tmp_file)
print("pyserial url: {0}".format(port_url))
do_nsh_cmd(port_url, args.baudrate, args.cmd)
if __name__ == "__main__":
main()

View File

@ -10,6 +10,8 @@ import os
import sys
import datetime
import serial.tools.list_ports as list_ports
import tempfile
import warnings
COLOR_RED = "\x1b[31m"
COLOR_GREEN = "\x1b[32m"
@ -40,34 +42,36 @@ def print_line(line):
print('{0}'.format(line), end='')
def do_test(port, baudrate, test_name):
ser = serial.Serial(port, baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1, xonxoff=False, rtscts=False, dsrdtr=False)
def do_test(port_url, baudrate, test_name):
# ignore pyserial spy:// resource warnings
warnings.filterwarnings(action="ignore", message="unclosed", category=ResourceWarning)
ser = serial.serial_for_url(url=port_url, baudrate=baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=3, xonxoff=False, rtscts=False, dsrdtr=False, inter_byte_timeout=1)
timeout_start = time.monotonic()
timeout = 30 # 30 seconds
ser.write("\n\n\n".encode("ascii"))
# wait for nsh prompt
while True:
ser.write("\n".encode("ascii"))
ser.flush()
serial_line = ser.readline().decode("ascii", errors='ignore')
if "nsh>" in serial_line:
break
if len(serial_line) > 0:
if "nsh>" in serial_line:
break
else:
if len(serial_line) > 0:
print(serial_line, end='')
if time.monotonic() > timeout_start + timeout:
print("Error, timeout waiting for prompt")
return False
ser.write("\n".encode("ascii"))
if time.monotonic() > timeout_start + timeout:
print("Error, timeout waiting for prompt")
return False
# clear
ser.reset_input_buffer()
success = False
# run test cmd
print('\n|======================================================================')
cmd = 'tests ' + test_name
@ -81,20 +85,17 @@ def do_test(port, baudrate, test_name):
print("Running command: \'{0}\'".format(cmd))
serial_cmd = '{0}\n'.format(cmd)
ser.write(serial_cmd.encode("ascii"))
ser.flush()
while True:
serial_line = ser.readline().decode("ascii", errors='ignore')
if cmd in serial_line:
break
if len(serial_line) > 0:
if cmd in serial_line:
break
else:
if len(serial_line) > 0:
print_line(serial_line)
if time.monotonic() > timeout_start + timeout:
print("Error, timeout waiting for command echo")
break
if time.monotonic() > timeout_start + timeout:
print("Error, timeout waiting for command echo")
break
# print results, wait for final result (PASSED or FAILED)
timeout = 300 # 5 minutes
@ -107,27 +108,26 @@ def do_test(port, baudrate, test_name):
if len(serial_line) > 0:
print_line(serial_line)
if test_name + " PASSED" in serial_line:
success = True
break
elif test_name + " FAILED" in serial_line:
success = False
break
if test_name + " PASSED" in serial_line:
ser.close()
return True
elif test_name + " FAILED" in serial_line:
ser.close()
return False
else:
if time.monotonic() > timeout_start + timeout:
print("Error, timeout")
print(test_name + f" {COLOR_RED}FAILED{COLOR_RESET}")
ser.close()
return False
if time.monotonic() > timeout_start + timeout:
print("Error, timeout")
print(test_name + f" {COLOR_RED}FAILED{COLOR_RESET}")
success = False
break
# newline every 10 seconds if still running
if (len(serial_line) <= 0) and (time.monotonic() - timeout_newline > 10):
ser.write("\n".encode("ascii"))
timeout_newline = time.monotonic()
# newline every 30 seconds if still running
if time.monotonic() - timeout_newline > 30:
ser.write("\n".encode("ascii"))
timeout_newline = time.monotonic()
ser.close()
return success
return False
class TestHardwareMethods(unittest.TestCase):
TEST_DEVICE = 0
@ -233,10 +233,15 @@ def main():
parser = ArgumentParser(description=__doc__)
parser.add_argument('--device', "-d", nargs='?', default=default_device, help='', required=device_required)
parser.add_argument("--baudrate", "-b", dest="baudrate", type=int, help="Mavlink port baud rate (default=57600)", default=57600)
parser.add_argument("--baudrate", "-b", dest="baudrate", type=int, help="serial port baud rate (default=57600)", default=57600)
args = parser.parse_args()
TestHardwareMethods.TEST_DEVICE = args.device
tmp_file = "{0}/pyserial_spy_file.txt".format(tempfile.gettempdir())
port_url = "spy://{0}?file={1}".format(args.device, tmp_file)
print("pyserial url: {0}".format(port_url))
TestHardwareMethods.TEST_DEVICE = port_url
TestHardwareMethods.TEST_BAUDRATE = args.baudrate
unittest.main(__name__, failfast=True, verbosity=0, argv=['main'])