autotest: add sanity checking for undocumented/overdocumented logger messages

This commit is contained in:
Peter Barker 2020-03-21 11:19:43 +11:00 committed by Peter Barker
parent 3914ae23ca
commit 6206cbd1fa

View File

@ -1033,6 +1033,227 @@ class AutoTest(ABC):
# if fail:
# raise NotAchievedException("Extra parameters in XML")
def find_format_defines(self, filepath):
ret = {}
for line in open(filepath,'rb').readlines():
m = re.match('#define (\w+_(?:LABELS|FMT|UNITS|MULTS))\s+(".*")', line)
if m is None:
continue
(a, b) = (m.group(1), m.group(2))
if a in ret:
raise NotAchievedException("Duplicate define for (%s)" % a)
ret[a] = b
return ret
def vehicle_code_dirpath(self):
'''returns path to vehicle-specific code directory e.g. ~/ardupilot/APMrover2'''
dirname = self.log_name()
if dirname == "QuadPlane":
dirname = "ArduPlane"
elif dirname == "HeliCopter":
dirname = "ArduCopter"
return os.path.join(self.rootdir(), dirname)
def all_log_format_ids(self):
ids = {}
filepath = os.path.join(self.rootdir(), 'libraries', 'AP_Logger', 'LogStructure.h')
state_outside = 0
state_inside = 1
state = state_outside
defines = self.find_format_defines(filepath)
linestate_none = 45
linestate_within = 46
linestate = linestate_none
message_infos = []
for line in open(filepath,'rb').readlines():
# print("line: %s" % line)
line = re.sub("//.*", "", line) # trim comments
if re.match("\s*$", line):
# blank line
continue
if state == state_outside:
if "#define LOG_BASE_STRUCTURES" in line:
# self.progress("Moving inside")
state = state_inside
continue
if state == state_inside:
if "#define LOG_COMMON_STRUCTURES" in line:
# self.progress("Moving outside")
state = state_outside
break
if linestate == linestate_none:
if "#define LOG_SBP_STRUCTURES" in line:
continue
m = re.match("\s*{(.*)},\s*", line)
if m is not None:
# complete line
# print("Complete line: %s" % str(line))
message_infos.append(m.group(1))
continue
m = re.match("\s*{(.*)[\\\]", line)
if m is None:
raise NotAchievedException("Bad line %s" % line)
partial_line = m.group(1)
linestate = linestate_within
continue
if linestate == linestate_within:
m = re.match("(.*)}", line)
if m is None:
raise NotAchievedException("Bad closing line (%s)" % line)
message_infos.append(partial_line + m.group(1))
linestate = linestate_none
continue
raise NotAchievedException("Bad line (%s)")
if linestate != linestate_none:
raise NotAchievedException("Must be linestate-none at end of file")
if state == state_inside:
raise NotAchievedException("Must be outside at end of file")
# now look in the vehicle-specific logfile:
filepath = os.path.join(self.vehicle_code_dirpath(), "Log.cpp")
state_outside = 67
state_inside = 68
state = state_outside
linestate_none = 89
linestate_within = 90
linestate = linestate_none
for line in open(filepath,'rb').readlines():
line = re.sub("//.*", "", line) # trim comments
if re.match("\s*$", line):
# blank line
continue
if state == state_outside:
if ("const LogStructure" in line or
"const struct LogStructure" in line):
state = state_inside;
continue
if state == state_inside:
if re.match("};", line):
state = state_outside;
break;
if linestate == linestate_none:
if "#if FRAME_CONFIG == HELI_FRAME" in line:
continue
if "#if PRECISION_LANDING == ENABLED" in line:
continue
if "#end" in line:
continue
if "LOG_COMMON_STRUCTURES" in line:
continue
m = re.match("\s*{(.*)},\s*", line)
if m is not None:
# complete line
# print("Complete line: %s" % str(line))
message_infos.append(m.group(1))
continue
m = re.match("\s*{(.*)", line)
if m is None:
raise NotAchievedException("Bad line %s" % line)
partial_line = m.group(1)
linestate = linestate_within
continue
if linestate == linestate_within:
m = re.match("(.*)}", line)
if m is None:
raise NotAchievedException("Bad closing line (%s)" % line)
message_infos.append(partial_line + m.group(1))
linestate = linestate_none
continue
raise NotAchievedException("Bad line (%s)")
if state == state_inside:
raise NotAchievedException("Should not be in state_inside at end")
for message_info in message_infos:
for define in defines:
message_info = re.sub(define, defines[define], message_info)
m = re.match('\s*LOG_\w+\s*,\s*sizeof\([^)]+\)\s*,\s*"(\w+)"\s*,\s*"(\w+)"\s*,\s*"([\w,]+)"\s*,\s*"([^"]+)"\s*,\s*"([^"]+)"\s*$', message_info)
if m is None:
raise NotAchievedException("Failed to match (%s)" % message_info)
(name, fmt, labels, units, multipliers) = (m.group(1), m.group(2), m.group(3), m.group(4), m.group(5))
if name in ids:
raise NotAchievedException("Already seen a (%s) message" % name)
ids[name] = {
"name": name,
"format": fmt,
"labels": labels,
"units": units,
"multipliers": multipliers,
}
# now look for Log_Write(...) messages:
base_directories = [
os.path.join(self.rootdir(), 'libraries'),
self.vehicle_code_dirpath(),
]
log_write_statements = []
for base_directory in base_directories:
for root, dirs, files in os.walk(base_directory):
state_outside = 37
state_inside = 38
state = state_outside
for f in files:
if not re.search("[.]cpp$", f):
continue
filepath = os.path.join(root, f)
count = 0
for line in open(filepath,'rb').readlines():
if state == state_outside:
if re.match("\s*AP::logger\(\)[.]Write\(", line):
state = state_inside
log_write_statement = line
continue
if state == state_inside:
log_write_statement += line
if re.match(".*\);", line):
log_write_statements.append(log_write_statement)
state = state_outside
count += 1
if state != state_outside:
raise NotAchievedException("Expected to be outside at end of file")
# print("%s has %u lines" % (f, count))
# change all whitespace to single space
log_write_statements = [re.sub("\s+", " ", x) for x in log_write_statements]
# print("Got log-write-statements: %s" % str(log_write_statements))
results = []
for log_write_statement in log_write_statements:
for define in defines:
log_write_statement = re.sub(define, defines[define], log_write_statement)
my_re = ' AP::logger\(\)[.]Write\(\s*"(\w+)"\s*,\s*"([\w,]+)".*\);'
m = re.match(my_re, log_write_statement)
if m is None:
if "TimeUS,C,Cnt,IMUMin,IMUMax,EKFMin" in log_write_statement:
# special-case for logging ekf timing:
for name in ["NKT", "XKT"]:
x = re.sub(" name,", '"'+name+'",', log_write_statement)
m = re.match(my_re, x)
if m is None:
raise NotAchievedException("Did not match (%s)", x)
results.append((m.group(1), m.group(2)))
continue
raise NotAchievedException("Did not match (%s)" % (log_write_statement))
else:
results.append((m.group(1), m.group(2)))
for result in results:
(name, labels) = result
if name in ids:
raise Exception("Already have id for (%s)" % name)
# self.progress("Adding Log_Write result (%s)" % name)
ids[name] = {
"name": name,
"labels": labels,
}
if len(ids) == 0:
raise NotAchievedException("Did not get any ids")
return ids
def test_onboard_logging_generation(self):
'''just generates, as we can't do a lot of testing'''
xml_filepath = os.path.join(self.rootdir(), "LogMessages.xml")
@ -1070,10 +1291,39 @@ class AutoTest(ABC):
xml = open(xml_filepath,'rb').read()
objectify.enable_recursive_str()
tree = objectify.fromstring(xml)
docco_ids = {}
for thing in tree.logformat:
print("Got (%s)" % str(thing.get("name")))
# for entry in tree.children:
# print("Got entry (%s)" % str(entry))
name = str(thing.get("name"))
docco_ids[name] = {
"name": name,
"labels": [],
}
for field in thing.fields.field:
# print("field: (%s)" % str(field))
fieldname = field.get("name")
# print("Got (%s.%s)" % (name,str(fieldname)))
docco_ids[name]["labels"].append(fieldname)
code_ids = self.all_log_format_ids()
print("Code ids: (%s)" % str(code_ids.keys()))
print("Docco ids: (%s)" % str(docco_ids.keys()))
for name in sorted(code_ids.keys()):
if name not in docco_ids:
self.progress("Undocumented message: %s" % name)
continue
for label in code_ids[name]["labels"].split(","):
if label not in docco_ids[name]["labels"]:
raise NotAchievedException("%s.%s not in documented fields" %
(name, label))
for name in sorted(docco_ids):
if name not in code_ids:
raise NotAchievedException("Documented message (%s) not in code" % name)
for label in docco_ids[name]["labels"]:
if label not in code_ids[name]["labels"].split(","):
raise NotAchievedException("documented field %s.%s not found in code" %
(name, label))
def initialise_after_reboot_sitl(self):