Tools: add more tests for log downloads

This commit is contained in:
Pierre Kancir 2022-05-04 13:46:43 +02:00 committed by Andrew Tridgell
parent 879937fffe
commit 8cd91b4789
2 changed files with 136 additions and 27 deletions

View File

@ -1524,7 +1524,8 @@ class AutoTestQuadPlane(vehicle_test_suite.TestSuite):
self.PilotYaw, self.PilotYaw,
self.ParameterChecks, self.ParameterChecks,
self.QAUTOTUNE, self.QAUTOTUNE,
self.LogDownload, self.TestLogDownload,
self.TestLogDownloadWrap,
self.EXTENDED_SYS_STATE, self.EXTENDED_SYS_STATE,
self.Mission, self.Mission,
self.Weathervane, self.Weathervane,

View File

@ -32,6 +32,7 @@ import random
import tempfile import tempfile
import threading import threading
import enum import enum
from pathlib import Path
from MAVProxy.modules.lib import mp_util from MAVProxy.modules.lib import mp_util
from MAVProxy.modules.lib import mp_elevation from MAVProxy.modules.lib import mp_elevation
@ -3454,34 +3455,13 @@ class TestSuite(ABC):
if ex is not None: if ex is not None:
raise ex raise ex
def LogDownload(self): def download_full_log_list(self, print_logs=True):
'''Test Onboard Log Download'''
if self.is_tracker():
# tracker starts armed, which is annoying
return
self.progress("Ensuring we have contents we care about")
self.set_parameter("LOG_FILE_DSRMROT", 1)
self.set_parameter("LOG_DISARMED", 0)
self.reboot_sitl()
original_log_list = self.log_list()
for i in range(0, 10):
self.wait_ready_to_arm()
self.arm_vehicle()
self.delay_sim_time(1)
self.disarm_vehicle()
new_log_list = self.log_list()
new_log_count = len(new_log_list) - len(original_log_list)
if new_log_count != 10:
raise NotAchievedException("Expected exactly 10 new logs got %u (%s) to (%s)" %
(new_log_count, original_log_list, new_log_list))
self.progress("Directory contents: %s" % str(new_log_list))
tstart = self.get_sim_time() tstart = self.get_sim_time()
self.mav.mav.log_request_list_send(self.sysid_thismav(), self.mav.mav.log_request_list_send(self.sysid_thismav(),
1, # target component 1, # target component
0, 0,
0xff) 0xffff)
logs = [] logs = {}
last_id = None last_id = None
num_logs = None num_logs = None
while True: while True:
@ -3491,10 +3471,11 @@ class TestSuite(ABC):
m = self.mav.recv_match(type='LOG_ENTRY', m = self.mav.recv_match(type='LOG_ENTRY',
blocking=True, blocking=True,
timeout=1) timeout=1)
self.progress("Received (%s)" % str(m)) if print_logs:
self.progress("Received (%s)" % str(m))
if m is None: if m is None:
continue continue
logs.append(m) logs[m.id] = m
if last_id is None: if last_id is None:
if m.num_logs == 0: if m.num_logs == 0:
# caller to guarantee this works: # caller to guarantee this works:
@ -3521,7 +3502,134 @@ class TestSuite(ABC):
timeout=2) timeout=2)
if m is not None: if m is not None:
raise NotAchievedException("Received extra LOG_ENTRY?!") raise NotAchievedException("Received extra LOG_ENTRY?!")
return logs
def TestLogDownloadWrap(self):
"""Test log wrapping."""
if self.is_tracker():
# tracker starts armed, which is annoying
return
self.progress("Ensuring we have contents we care about")
self.set_parameter("LOG_FILE_DSRMROT", 1)
self.set_parameter("LOG_DISARMED", 0)
self.reboot_sitl()
logspath = Path("logs")
def create_num_logs(num_logs, logsdir, clear_logsdir=True):
if clear_logsdir:
shutil.rmtree(logsdir, ignore_errors=True)
logsdir.mkdir()
lastlogfile_path = logsdir / Path("LASTLOG.TXT")
self.progress(f"Add LASTLOG.TXT file with counter at {num_logs}")
with open(lastlogfile_path, 'w') as lastlogfile:
lastlogfile.write(f"{num_logs}\n")
self.progress(f"Create fakelogs from 1 to {num_logs} on logs directory")
for ii in range(1, num_logs + 1):
new_log = logsdir / Path(f"{str(ii).zfill(8)}.BIN")
with open(new_log, 'w+') as logfile:
logfile.write(f"I AM LOG {ii}\n")
logfile.write('1' * ii)
def verify_logs(test_log_num):
try:
wrap = False
offset = 0
max_logs_num = int(self.get_parameter("LOG_MAX_FILES"))
if test_log_num > max_logs_num:
wrap = True
offset = test_log_num - max_logs_num
test_log_num = max_logs_num
logs_dict = self.download_full_log_list(print_logs=False)
if len(logs_dict) != test_log_num:
raise NotAchievedException(
f"Didn't get the full log list, expect {test_log_num} got {len(logs_dict)}")
self.progress("Checking logs size are matching")
start_log = offset if wrap else 1
for ii in range(start_log, test_log_num + 1 - offset):
log_i = logspath / Path(f"{str(ii + offset).zfill(8)}.BIN")
if logs_dict[ii].size != log_i.stat().st_size:
logs_dict = self.download_full_log_list(print_logs=False)
# sometimes we don't have finish writing the log, so get it again prevent failure
if logs_dict[ii].size != log_i.stat().st_size:
raise NotAchievedException(
f"Log{ii} size mismatch : {logs_dict[ii].size} vs {log_i.stat().st_size}"
)
if wrap:
self.progress("Checking wrapped logs size are matching")
for ii in range(1, offset):
log_i = logspath / Path(f"{str(ii).zfill(8)}.BIN")
if logs_dict[test_log_num + 1 - offset + ii].size != log_i.stat().st_size:
self.progress(f"{logs_dict[test_log_num + 1 - offset + ii]}")
raise NotAchievedException(
f"Log{test_log_num + 1 - offset + ii} size mismatch :"
f" {logs_dict[test_log_num + 1 - offset + ii].size} vs {log_i.stat().st_size}"
)
except NotAchievedException as e:
shutil.rmtree(logspath, ignore_errors=True)
logspath.mkdir()
with open(logspath / Path("LASTLOG.TXT"), 'w') as lastlogfile:
lastlogfile.write("1\n")
raise e
def add_and_verify_log(test_log_num):
self.wait_ready_to_arm()
self.arm_vehicle()
self.delay_sim_time(1)
self.disarm_vehicle()
self.delay_sim_time(10)
verify_logs(test_log_num + 1)
def create_and_verify_logs(test_log_num, clear_logsdir=True):
self.progress(f"Test {test_log_num} logs")
create_num_logs(test_log_num, logspath, clear_logsdir)
self.reboot_sitl()
verify_logs(test_log_num)
self.start_subsubtest("Adding one more log")
add_and_verify_log(test_log_num)
self.start_subtest("Checking log list match with filesystem info")
create_and_verify_logs(500)
create_and_verify_logs(10)
create_and_verify_logs(1)
self.start_subtest("Change LOG_MAX_FILES and Checking log list match with filesystem info")
self.set_parameter("LOG_MAX_FILES", 250)
create_and_verify_logs(250)
self.set_parameter("LOG_MAX_FILES", 1)
create_and_verify_logs(1)
self.start_subtest("Change LOG_MAX_FILES, don't clear old logs and Checking log list match with filesystem info")
self.set_parameter("LOG_MAX_FILES", 500)
create_and_verify_logs(500)
self.set_parameter("LOG_MAX_FILES", 250)
create_and_verify_logs(250, clear_logsdir=False)
# cleanup
shutil.rmtree(logspath, ignore_errors=True)
def TestLogDownload(self):
"""Test Onboard Log Download."""
if self.is_tracker():
# tracker starts armed, which is annoying
return
self.progress("Ensuring we have contents we care about")
self.set_parameter("LOG_FILE_DSRMROT", 1)
self.set_parameter("LOG_DISARMED", 0)
self.reboot_sitl()
original_log_list = self.log_list()
for i in range(0, 10):
self.wait_ready_to_arm()
self.arm_vehicle()
self.delay_sim_time(1)
self.disarm_vehicle()
new_log_list = self.log_list()
new_log_count = len(new_log_list) - len(original_log_list)
if new_log_count != 10:
raise NotAchievedException("Expected exactly 10 new logs got %u (%s) to (%s)" %
(new_log_count, original_log_list, new_log_list))
self.progress("Directory contents: %s" % str(new_log_list))
self.download_full_log_list()
log_id = 5 log_id = 5
ofs = 6 ofs = 6
count = 2 count = 2