Replay: check_replay_branch.py: inspect all logs for replayability

This commit is contained in:
Peter Barker 2020-12-02 19:23:38 +11:00 committed by Peter Barker
parent b289701d20
commit d0236ceac9

View File

@ -7,9 +7,13 @@ Run check_replay.py over the produced log
''' '''
import git import git
import glob
import os import os
import subprocess import subprocess
import sys import sys
import re
from pymavlink import DFReader
import check_replay import check_replay
@ -38,11 +42,65 @@ class CheckReplayBranch(object):
if self.repo.is_dirty(): if self.repo.is_dirty():
raise ValueError("Tree is dirty") raise ValueError("Tree is dirty")
def run(self): def is_replayable_log(self, logfile_path):
self.topdir = self.find_topdir() '''returns true if an XKF1 or NKF1 message appears in first 5000
self.repo = self.find_repo() messages'''
self.assert_tree_clean() dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True);
seen_log_replay = False
seen_log_disarmed = False
while True:
m = dfreader.recv_match(type='PARM')
if m is None:
if not seen_log_replay:
return False
if not seen_log_disarmed:
return False
break
if m.Name == "LOG_REPLAY":
if seen_log_replay:
return False
if m.Value != 1:
return False
seen_log_replay = True
if m.Name == "LOG_DISARMED":
if seen_log_disarmed:
return False
seen_log_disarmed = True
if m.Value != 1:
return False
return False
def is_replay_log(self, logfile_path):
'''returns true if an XKF1 or NKF1 message appears in first 5000
messages'''
dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True);
count = 0
while True:
m = dfreader.recv_match()
if m is None:
break
if m.get_type() == 'XKF1' and m.C >= 100:
return True
if m.get_type() == 'NKF1' and m.C >= 100:
return True
count += 1
if count > 5000:
return False
return False
def progress(self, message):
print("CRB: %s" % message)
def build_replay(self):
subprocess.check_call(["./waf", "replay"])
def run_replay_on_log(self, logfile_path):
subprocess.check_call(["./build/sitl/tools/Replay", logfile_path])
def get_logs(self):
return sorted(glob.glob("logs/*.BIN"))
def run_autotest_replay_on_master(self):
# remember where we were: # remember where we were:
old_branch = self.repo.active_branch old_branch = self.repo.active_branch
@ -50,20 +108,73 @@ class CheckReplayBranch(object):
self.repo.head.reference = self.master self.repo.head.reference = self.master
self.repo.head.reset(index=True, working_tree=True) self.repo.head.reset(index=True, working_tree=True)
# generate a log: # generate logs:
os.chdir(self.topdir)
print("chdir (%s)" % str(self.topdir))
subprocess.check_call(["Tools/autotest/autotest.py", "--debug", "build.Copter", "test.Copter.Replay"]) subprocess.check_call(["Tools/autotest/autotest.py", "--debug", "build.Copter", "test.Copter.Replay"])
logfile_name = "00000004.BIN" # FIXME; parse output of subprocess?
logfile_path = os.path.join("logs", logfile_name)
# check out the original branch: # check out the original branch:
self.repo.head.reference = old_branch self.repo.head.reference = old_branch
self.repo.head.reset(index=True, working_tree=True) self.repo.head.reset(index=True, working_tree=True)
# run check_replay across Replay log def find_replayed_logs(self):
return check_replay.check_log(logfile_path) '''find logs which were replayed in the autotest'''
replayed_logs = set()
for logfile_path in self.get_logs():
self.progress(" Checking %s" % logfile_path)
dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True);
while True:
m = dfreader.recv_match(type='MSG')
if m is None:
break
match = re.match(".*Running replay on \(([^)]+)\).*", m.Message)
if match is None:
continue
replayed_logs.add(match.group(1))
return sorted(list(replayed_logs))
def run(self):
self.topdir = self.find_topdir()
self.repo = self.find_repo()
self.assert_tree_clean()
os.chdir(self.topdir)
self.progress("chdir (%s)" % str(self.topdir))
self.progress("Running autotest Replay on %s" % self.master)
self.run_autotest_replay_on_master()
self.progress("Building Replay")
self.build_replay()
self.progress("Build of Replay done")
# check all replayable logs
self.progress("Finding replayed logs")
replay_logs = self.find_replayed_logs()
success = True
if len(replay_logs) == 0:
raise ValueError("Found no Replay logs")
for log in replay_logs:
self.progress("Running Replay on (%s)" % log)
old_logs = self.get_logs()
self.run_replay_on_log(log)
new_logs = self.get_logs()
delta = [x for x in new_logs if x not in old_logs]
if len(delta) != 1:
raise ValueError("Expected a single new log")
new_log = delta[0]
self.progress("Running check_replay.py on Replay output log: %s" % new_log)
# run check_replay across Replay log
if check_replay.check_log(new_log, verbose=True):
self.progress("check_replay.py of (%s): OK" % new_log)
else:
self.progress("check_replay.py of (%s): FAILED" % new_log)
success = False
if success:
self.progress("All OK")
else:
self.progress("Failed")
return success
if __name__ == '__main__': if __name__ == '__main__':
import sys import sys