autotest: comprehensive dataflash test

This commit is contained in:
Andy Piper 2020-06-21 16:00:15 +01:00 committed by Peter Barker
parent ccb583d092
commit 6ebc7622fe
2 changed files with 152 additions and 5 deletions

View File

@ -5476,6 +5476,14 @@ class AutoTestCopter(AutoTest):
"Test that Alt Estimation is mandatory for ALT_HOLD",
self.test_alt_estimate_prearm),
("DataFlash",
"Test DataFlash Block backend",
self.test_dataflash_sitl),
("DataFlashErase",
"Test DataFlash Block backend erase",
self.test_dataflash_erase),
("LogUpload",
"Log upload",
self.log_upload),

View File

@ -26,6 +26,7 @@ from pymavlink import mavextra
from pymavlink import mavparm
from pysim import util, vehicleinfo
from io import StringIO
# a list of pexpect objects to read while waiting for
# messages. This keeps the output to stdout flowing
@ -5319,20 +5320,57 @@ Also, ignores heartbeats not from our target system'''
raise ex
def test_dataflash_sitl(self):
"""Test the basic functionality of block logging"""
self.context_push()
ex = None
try:
self.set_parameter("LOG_BACKEND_TYPE", 5)
self.set_parameter("LOG_BACKEND_TYPE", 4)
self.set_parameter("LOG_FILE_DSRMROT", 1)
self.reboot_sitl()
self.drain_mav() # hopefully draining COMMAND_ACK from that failed arm
# First log created here, but we are in chip erase so ignored
self.mavproxy.send("module load log\n")
# self.mavproxy.expect("Loaded module log\n")
self.mavproxy.send("log erase\n")
self.mavproxy.expect("Chip erase complete")
self.wait_ready_to_arm()
if self.is_copter() or self.is_plane():
self.set_autodisarm_delay(0)
self.arm_vehicle()
self.delay_sim_time(5)
self.disarm_vehicle()
# First log created here
self.delay_sim_time(2)
self.arm_vehicle()
self.delay_sim_time(5)
self.disarm_vehicle()
# Second log created here
self.delay_sim_time(2)
self.mavproxy.send("log list\n")
self.mavproxy.expect("Log 1 numLogs 1 lastLog 1 size")
self.mavproxy.expect("Log ([0-9]+) numLogs ([0-9]+) lastLog ([0-9]+) size ([0-9]+)", timeout=120)
log_num = int(self.mavproxy.match.group(1))
numlogs = int(self.mavproxy.match.group(2))
lastlog = int(self.mavproxy.match.group(3))
size = int(self.mavproxy.match.group(4))
if numlogs != 2 or log_num != 1 or size <= 0:
raise NotAchievedException("Unexpected log information %d %d %d" % (log_num, numlogs, lastlog))
self.progress("Log size: %d" % size)
self.reboot_sitl()
# This starts a new log with a time of 0, wait for arm so that we can insert the correct time
self.wait_ready_to_arm()
# Third log created here
self.mavproxy.send("log list\n")
self.mavproxy.expect("Log 1 numLogs 2 lastLog 2 size")
self.mavproxy.expect("Log 1 numLogs 3 lastLog 3 size")
# Download second and third logs
self.mavproxy.send("log download 2 logs/dataflash-log-002.BIN\n")
self.mavproxy.expect("Finished downloading", timeout=120)
self.mavproxy.send("log download 3 logs/dataflash-log-003.BIN\n")
self.mavproxy.expect("Finished downloading", timeout=120)
# Erase the logs
self.mavproxy.send("log erase\n")
self.mavproxy.expect("Chip erase complete")
except Exception as e:
self.progress("Exception (%s) caught" % str(e))
ex = e
@ -5342,6 +5380,107 @@ Also, ignores heartbeats not from our target system'''
if ex is not None:
raise ex
def validate_log_file(self, logname, header_errors=0):
"""Validate the contents of a log file"""
# read the downloaded log - it must parse without error
class Capturing(list):
def __enter__(self):
self._stderr = sys.stderr
sys.stderr = self._stringio = StringIO()
return self
def __exit__(self, *args):
self.extend(self._stringio.getvalue().splitlines())
del self._stringio # free up some memory
sys.stderr = self._stderr
with Capturing() as df_output:
try:
mlog = mavutil.mavlink_connection(logname)
while True:
m = mlog.recv_match()
if m is None:
break
except Exception as e:
raise NotAchievedException("Error reading log file %s: %s" % (logname, str(e)))
herrors = 0
for msg in df_output:
if msg.startswith("bad header") or msg.startswith("unknown msg type"):
herrors = herrors + 1
if herrors > header_errors:
raise NotAchievedException("Error parsing log file %s, %d header errors" % (logname, herrors))
def test_dataflash_erase(self):
"""Test that erasing the dataflash chip and creating a new log is error free"""
ex = None
self.context_push()
try:
self.set_parameter("LOG_BACKEND_TYPE", 4)
self.reboot_sitl()
self.mavproxy.send("module load log\n")
self.mavproxy.send("log erase\n")
self.mavproxy.expect("Chip erase complete")
self.set_parameter("LOG_DISARMED", 1)
self.delay_sim_time(3)
self.set_parameter("LOG_DISARMED", 0)
self.mavproxy.send("log download 1 logs/dataflash-log-erase.BIN\n")
self.mavproxy.expect("Finished downloading", timeout=120)
# read the downloaded log - it must parse without error
self.validate_log_file("logs/dataflash-log-erase.BIN")
self.start_subtest("Test file wrapping results in a valid file")
# roughly 4mb
self.set_parameter("LOG_FILE_DSRMROT", 1)
self.set_parameter("LOG_BITMASK", 131071)
self.wait_ready_to_arm()
if self.is_copter() or self.is_plane():
self.set_autodisarm_delay(0)
self.arm_vehicle()
self.delay_sim_time(30)
self.disarm_vehicle()
# roughly 4mb
self.arm_vehicle()
self.delay_sim_time(30)
self.disarm_vehicle()
# roughly 9mb, should wrap around
self.arm_vehicle()
self.delay_sim_time(50)
self.disarm_vehicle()
# make sure we have finished logging
self.delay_sim_time(15)
self.mavproxy.send("log list\n")
self.mavproxy.expect("Log ([0-9]+) numLogs ([0-9]+) lastLog ([0-9]+) size ([0-9]+)", timeout=120)
if int(self.mavproxy.match.group(2)) != 2:
raise NotAchievedException("Expected 2 logs")
self.mavproxy.send("log download 1 logs/dataflash-log-erase2.BIN\n")
self.mavproxy.expect("Finished downloading", timeout=120)
self.validate_log_file("logs/dataflash-log-erase2.BIN", 1)
self.mavproxy.send("log download latest logs/dataflash-log-erase3.BIN\n")
self.mavproxy.expect("Finished downloading", timeout=120)
self.validate_log_file("logs/dataflash-log-erase3.BIN", 1)
# clean up
self.mavproxy.send("log erase\n")
self.mavproxy.expect("Chip erase complete")
# clean up
self.mavproxy.send("log erase\n")
self.mavproxy.expect("Chip erase complete")
except Exception as e:
self.progress("Exception (%s) caught" % str(e))
ex = e
self.mavproxy.send("module unload log\n")
self.context_pop()
self.reboot_sitl()
if ex is not None:
raise ex
def test_arm_feature(self):
"""Common feature to test."""
# TEST ARMING/DISARM