ardupilot/Tools/Replay/check_replay_branch.py

192 lines
6.2 KiB
Python
Raw Normal View History

#!/usr/bin/env python
'''
Create a replay log using master branch.
Check out a specified branch, compile and run Replay against replay log
Run check_replay.py over the produced log
'''
import git
import glob
import os
import subprocess
import sys
import re
from pymavlink import DFReader
import check_replay
class CheckReplayBranch(object):
def __init__(self, master='remotes/origin/master'):
self.master = master
def find_topdir(self):
here = os.getcwd()
bits = here.split(os.path.sep)
while len(bits):
# print("bits: %s" % str(bits))
tmp = bits[:]
tmp.extend([".git"]) # can we look for something more specific?
flagfile = os.path.sep.join(tmp)
if os.path.exists(flagfile):
# print("Found in (%s)" % str(flagfile))
return os.path.sep.join(bits)
bits = bits[:-2]
raise FileNotFoundError()
def find_repo(self):
return git.Repo(self.topdir)
def assert_tree_clean(self):
if self.repo.is_dirty():
raise ValueError("Tree is dirty")
def is_replayable_log(self, logfile_path):
'''returns true if an XKF1 or NKF1 message appears in first 5000
messages'''
dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True);
seen_log_replay = False
seen_log_disarmed = False
while True:
m = dfreader.recv_match(type='PARM')
if m is None:
if not seen_log_replay:
return False
if not seen_log_disarmed:
return False
break
if m.Name == "LOG_REPLAY":
if seen_log_replay:
return False
if m.Value != 1:
return False
seen_log_replay = True
if m.Name == "LOG_DISARMED":
if seen_log_disarmed:
return False
seen_log_disarmed = True
if m.Value != 1:
return False
return False
def is_replay_log(self, logfile_path):
'''returns true if an XKF1 or NKF1 message appears in first 5000
messages'''
dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True);
count = 0
while True:
m = dfreader.recv_match()
if m is None:
break
if m.get_type() == 'XKF1' and m.C >= 100:
return True
if m.get_type() == 'NKF1' and m.C >= 100:
return True
count += 1
if count > 5000:
return False
return False
def progress(self, message):
print("CRB: %s" % message)
def build_replay(self):
subprocess.check_call(["./waf", "replay"])
def run_replay_on_log(self, logfile_path):
subprocess.check_call(["./build/sitl/tool/Replay", logfile_path])
def get_logs(self):
return sorted(glob.glob("logs/*.BIN"))
def run_autotest_replay_on_master(self):
# remember where we were:
old_branch = self.repo.active_branch
# check out the master branch:
self.repo.head.reference = self.master
self.repo.head.reset(index=True, working_tree=True)
# generate logs:
subprocess.check_call(["Tools/autotest/autotest.py", "--debug", "build.Copter", "test.Copter.Replay"])
# check out the original branch:
self.repo.head.reference = old_branch
self.repo.head.reset(index=True, working_tree=True)
def find_replayed_logs(self):
'''find logs which were replayed in the autotest'''
replayed_logs = set()
for logfile_path in self.get_logs():
self.progress(" Checking %s" % logfile_path)
dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True);
while True:
m = dfreader.recv_match(type='MSG')
if m is None:
break
match = re.match(".*Running replay on \(([^)]+)\).*", m.Message)
if match is None:
continue
replayed_logs.add(match.group(1))
return sorted(list(replayed_logs))
def run(self):
self.topdir = self.find_topdir()
self.repo = self.find_repo()
self.assert_tree_clean()
os.chdir(self.topdir)
self.progress("chdir (%s)" % str(self.topdir))
self.progress("Running autotest Replay on %s" % self.master)
self.run_autotest_replay_on_master()
self.progress("Building Replay")
self.build_replay()
self.progress("Build of Replay done")
# check all replayable logs
self.progress("Finding replayed logs")
replay_logs = self.find_replayed_logs()
success = True
if len(replay_logs) == 0:
raise ValueError("Found no Replay logs")
for log in replay_logs:
self.progress("Running Replay on (%s)" % log)
old_logs = self.get_logs()
self.run_replay_on_log(log)
new_logs = self.get_logs()
delta = [x for x in new_logs if x not in old_logs]
if len(delta) != 1:
raise ValueError("Expected a single new log")
new_log = delta[0]
self.progress("Running check_replay.py on Replay output log: %s" % new_log)
# run check_replay across Replay log
if check_replay.check_log(new_log, verbose=True):
self.progress("check_replay.py of (%s): OK" % new_log)
else:
self.progress("check_replay.py of (%s): FAILED" % new_log)
success = False
if success:
self.progress("All OK")
else:
self.progress("Failed")
return success
if __name__ == '__main__':
import sys
from argparse import ArgumentParser
parser = ArgumentParser(description=__doc__)
parser.add_argument("--master", default='remotes/origin/master', help="branch to consider master branch")
args = parser.parse_args()
s = CheckReplayBranch(master=args.master)
if not s.run():
sys.exit(1)
sys.exit(0)