From d0236ceac991611c4f9ff8ff5a5e3e13c1463a9d Mon Sep 17 00:00:00 2001 From: Peter Barker Date: Wed, 2 Dec 2020 19:23:38 +1100 Subject: [PATCH] Replay: check_replay_branch.py: inspect all logs for replayability --- Tools/Replay/check_replay_branch.py | 135 +++++++++++++++++++++++++--- 1 file changed, 123 insertions(+), 12 deletions(-) diff --git a/Tools/Replay/check_replay_branch.py b/Tools/Replay/check_replay_branch.py index e04e28ceb3..ee233207b3 100755 --- a/Tools/Replay/check_replay_branch.py +++ b/Tools/Replay/check_replay_branch.py @@ -7,9 +7,13 @@ Run check_replay.py over the produced log ''' import git +import glob import os import subprocess import sys +import re + +from pymavlink import DFReader import check_replay @@ -38,11 +42,65 @@ class CheckReplayBranch(object): if self.repo.is_dirty(): raise ValueError("Tree is dirty") - def run(self): - self.topdir = self.find_topdir() - self.repo = self.find_repo() - self.assert_tree_clean() + def is_replayable_log(self, logfile_path): + '''returns true if an XKF1 or NKF1 message appears in first 5000 + messages''' + dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True); + seen_log_replay = False + seen_log_disarmed = False + while True: + m = dfreader.recv_match(type='PARM') + if m is None: + if not seen_log_replay: + return False + if not seen_log_disarmed: + return False + break + if m.Name == "LOG_REPLAY": + if seen_log_replay: + return False + if m.Value != 1: + return False + seen_log_replay = True + if m.Name == "LOG_DISARMED": + if seen_log_disarmed: + return False + seen_log_disarmed = True + if m.Value != 1: + return False + return False + def is_replay_log(self, logfile_path): + '''returns true if an XKF1 or NKF1 message appears in first 5000 + messages''' + dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True); + count = 0 + while True: + m = dfreader.recv_match() + if m is None: + break + if m.get_type() == 'XKF1' and m.C >= 100: + return True + if m.get_type() == 'NKF1' and m.C >= 100: + return True + count += 1 + if count > 5000: + return False + return False + + def progress(self, message): + print("CRB: %s" % message) + + def build_replay(self): + subprocess.check_call(["./waf", "replay"]) + + def run_replay_on_log(self, logfile_path): + subprocess.check_call(["./build/sitl/tools/Replay", logfile_path]) + + def get_logs(self): + return sorted(glob.glob("logs/*.BIN")) + + def run_autotest_replay_on_master(self): # remember where we were: old_branch = self.repo.active_branch @@ -50,20 +108,73 @@ class CheckReplayBranch(object): self.repo.head.reference = self.master self.repo.head.reset(index=True, working_tree=True) - # generate a log: - os.chdir(self.topdir) - print("chdir (%s)" % str(self.topdir)) + # generate logs: subprocess.check_call(["Tools/autotest/autotest.py", "--debug", "build.Copter", "test.Copter.Replay"]) - logfile_name = "00000004.BIN" # FIXME; parse output of subprocess? - logfile_path = os.path.join("logs", logfile_name) - # check out the original branch: self.repo.head.reference = old_branch self.repo.head.reset(index=True, working_tree=True) - # run check_replay across Replay log - return check_replay.check_log(logfile_path) + def find_replayed_logs(self): + '''find logs which were replayed in the autotest''' + replayed_logs = set() + for logfile_path in self.get_logs(): + self.progress(" Checking %s" % logfile_path) + dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True); + while True: + m = dfreader.recv_match(type='MSG') + if m is None: + break + match = re.match(".*Running replay on \(([^)]+)\).*", m.Message) + if match is None: + continue + replayed_logs.add(match.group(1)) + return sorted(list(replayed_logs)) + + def run(self): + self.topdir = self.find_topdir() + self.repo = self.find_repo() + self.assert_tree_clean() + + os.chdir(self.topdir) + self.progress("chdir (%s)" % str(self.topdir)) + + self.progress("Running autotest Replay on %s" % self.master) + self.run_autotest_replay_on_master() + + self.progress("Building Replay") + self.build_replay() + self.progress("Build of Replay done") + + # check all replayable logs + self.progress("Finding replayed logs") + replay_logs = self.find_replayed_logs() + success = True + if len(replay_logs) == 0: + raise ValueError("Found no Replay logs") + for log in replay_logs: + self.progress("Running Replay on (%s)" % log) + old_logs = self.get_logs() + self.run_replay_on_log(log) + new_logs = self.get_logs() + delta = [x for x in new_logs if x not in old_logs] + if len(delta) != 1: + raise ValueError("Expected a single new log") + new_log = delta[0] + self.progress("Running check_replay.py on Replay output log: %s" % new_log) + + # run check_replay across Replay log + if check_replay.check_log(new_log, verbose=True): + self.progress("check_replay.py of (%s): OK" % new_log) + else: + self.progress("check_replay.py of (%s): FAILED" % new_log) + success = False + if success: + self.progress("All OK") + else: + self.progress("Failed") + + return success if __name__ == '__main__': import sys