#!/usr/bin/env python ''' Create a replay log using master branch. Check out a specified branch, compile and run Replay against replay log Run check_replay.py over the produced log ''' import git import glob import os import subprocess import sys import re from pymavlink import DFReader import check_replay class CheckReplayBranch(object): def __init__(self, master='remotes/origin/master'): self.master = master def find_topdir(self): here = os.getcwd() bits = here.split(os.path.sep) while len(bits): # print("bits: %s" % str(bits)) tmp = bits[:] tmp.extend([".git"]) # can we look for something more specific? flagfile = os.path.sep.join(tmp) if os.path.exists(flagfile): # print("Found in (%s)" % str(flagfile)) return os.path.sep.join(bits) bits = bits[:-2] raise FileNotFoundError() def find_repo(self): return git.Repo(self.topdir) def assert_tree_clean(self): if self.repo.is_dirty(): raise ValueError("Tree is dirty") def is_replayable_log(self, logfile_path): '''returns true if an XKF1 or NKF1 message appears in first 5000 messages''' dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True); seen_log_replay = False seen_log_disarmed = False while True: m = dfreader.recv_match(type='PARM') if m is None: if not seen_log_replay: return False if not seen_log_disarmed: return False break if m.Name == "LOG_REPLAY": if seen_log_replay: return False if m.Value != 1: return False seen_log_replay = True if m.Name == "LOG_DISARMED": if seen_log_disarmed: return False seen_log_disarmed = True if m.Value != 1: return False return False def is_replay_log(self, logfile_path): '''returns true if an XKF1 or NKF1 message appears in first 5000 messages''' dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True); count = 0 while True: m = dfreader.recv_match() if m is None: break if m.get_type() == 'XKF1' and m.C >= 100: return True if m.get_type() == 'NKF1' and m.C >= 100: return True count += 1 if count > 5000: return False return False def progress(self, message): print("CRB: %s" % message) def build_replay(self): subprocess.check_call(["./waf", "replay"]) def run_replay_on_log(self, logfile_path): subprocess.check_call(["./build/sitl/tools/Replay", logfile_path]) def get_logs(self): return sorted(glob.glob("logs/*.BIN")) def run_autotest_replay_on_master(self): # remember where we were: old_branch = self.repo.active_branch # check out the master branch: self.repo.head.reference = self.master self.repo.head.reset(index=True, working_tree=True) # generate logs: subprocess.check_call(["Tools/autotest/autotest.py", "--debug", "build.Copter", "test.Copter.Replay"]) # check out the original branch: self.repo.head.reference = old_branch self.repo.head.reset(index=True, working_tree=True) def find_replayed_logs(self): '''find logs which were replayed in the autotest''' replayed_logs = set() for logfile_path in self.get_logs(): self.progress(" Checking %s" % logfile_path) dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True); while True: m = dfreader.recv_match(type='MSG') if m is None: break match = re.match(".*Running replay on \(([^)]+)\).*", m.Message) if match is None: continue replayed_logs.add(match.group(1)) return sorted(list(replayed_logs)) def run(self): self.topdir = self.find_topdir() self.repo = self.find_repo() self.assert_tree_clean() os.chdir(self.topdir) self.progress("chdir (%s)" % str(self.topdir)) self.progress("Running autotest Replay on %s" % self.master) self.run_autotest_replay_on_master() self.progress("Building Replay") self.build_replay() self.progress("Build of Replay done") # check all replayable logs self.progress("Finding replayed logs") replay_logs = self.find_replayed_logs() success = True if len(replay_logs) == 0: raise ValueError("Found no Replay logs") for log in replay_logs: self.progress("Running Replay on (%s)" % log) old_logs = self.get_logs() self.run_replay_on_log(log) new_logs = self.get_logs() delta = [x for x in new_logs if x not in old_logs] if len(delta) != 1: raise ValueError("Expected a single new log") new_log = delta[0] self.progress("Running check_replay.py on Replay output log: %s" % new_log) # run check_replay across Replay log if check_replay.check_log(new_log, verbose=True): self.progress("check_replay.py of (%s): OK" % new_log) else: self.progress("check_replay.py of (%s): FAILED" % new_log) success = False if success: self.progress("All OK") else: self.progress("Failed") return success if __name__ == '__main__': import sys from argparse import ArgumentParser parser = ArgumentParser(description=__doc__) parser.add_argument("--master", default='remotes/origin/master', help="branch to consider master branch") args = parser.parse_args() s = CheckReplayBranch(master=args.master) if not s.run(): sys.exit(1) sys.exit(0)