ardupilot/Tools/autotest/logger_metadata/parse.py
Simon Hancock c0a503d74d autotest: Provide format and unit/multiplier info for log messages
Definitions of each character are extracted from LogStructure.h
Data is extracted by parsing the logging definition struct
Also parse WriteMessage() calls for messages not defined in struct
Add support to separate log descriptions for messages with same field list
Compute derived unit from combination of format, unit and multiplier
For XML, output the format and derived unit as new attributes
Add enumerations to the XML output (bitmasks were already done)
For MD,RST,HTML, output either derived unit, 'char[n]', 'bitmask' or 'enum'
Fix support for Blimp by adding it to the parse_enum.py lookup table
2024-01-16 11:24:34 +11:00

498 lines
20 KiB
Python
Executable File

#!/usr/bin/env python
from __future__ import print_function
import argparse
import copy
import os
import re
import sys
import emit_html
import emit_rst
import emit_xml
import emit_md
import enum_parse
from enum_parse import EnumDocco
# Directory three levels above the directory containing this script
# (presumably the repository root - confirm against the checkout layout).
# All source-file searches and the LogStructure.h lookup are relative to it.
topdir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '../../../')
topdir = os.path.realpath(topdir)
# Regular expressions for finding message information in code comments
# Each tag pattern below (other than re_loggermessage and re_commentline)
# is anchored to a "//" comment line; for the @Field* tags group 1 is the
# field name and group 2 is the remainder of the line.
re_loggermessage = re.compile(r"@LoggerMessage\s*:\s*([\w,]+)", re.MULTILINE)
re_commentline = re.compile(r"\s*//")
re_description = re.compile(r"\s*//\s*@Description\s*:\s*(.*)")
re_url = re.compile(r"\s*//\s*@URL\s*:\s*(.*)")
re_field = re.compile(r"\s*//\s*@Field\s*:\s*(\w+):\s*(.*)")
re_fieldbits = re.compile(r"\s*//\s*@FieldBits\s*:\s*(\w+):\s*(.*)")
re_fieldbitmaskenum = re.compile(r"\s*//\s*@FieldBitmaskEnum\s*:\s*(\w+):\s*(.*)")
re_fieldvalueenum = re.compile(r"\s*//\s*@FieldValueEnum\s*:\s*(\w+):\s*(.*)")
re_vehicles = re.compile(r"\s*//\s*@Vehicles\s*:\s*(.*)")
# Regular expressions for finding message definitions in structure format
# re_start_messagedef only detects the first line of an entry; the complete
# (possibly multi-line) text is then matched with re_full_messagedef, whose
# groups are read by parse_messagedef as: 1=message name, 2=formats,
# 4=units, 5=multipliers.  Group 3 (a comma-separated word list, presumably
# the field labels) is captured but not used there.
re_start_messagedef = re.compile(r"^\s*{?\s*LOG_[A-Z0-9_]+_[MSGTA]+[A-Z0-9_]*\s*,")
re_deffield = r'[\s\\]*"?([\w\-#?%]+)"?\s*'
re_full_messagedef = re.compile(r'\s*LOG_\w+\s*,\s*\w+\([^)]+\)[\s\\]*,' + f'{re_deffield},{re_deffield},' + r'[\s\\]*"?([\w,]+)"?[\s\\]*,' + f'{re_deffield},{re_deffield}' , re.MULTILINE)
# #define lines whose macro name ends in _FMT / _UNITS / _MULTS:
# group 1 is the macro name, group 2 the quoted string it expands to.
re_fmt_define = re.compile(r'#define\s+(\w+_FMT)\s+"([\w\-#?%]+)"')
re_units_define = re.compile(r'#define\s+(\w+_UNITS)\s+"([\w\-#?%]+)"')
re_mults_define = re.compile(r'#define\s+(\w+_MULTS)\s+"([\w\-#?%]+)"')
# Regular expressions for finding message definitions in Write calls
# In re_full_writecall group 1 is the first quoted argument (used as the
# message name), groups 2 and 3 the next two quoted arguments, group 4 the
# optional pair as a whole and groups 5 and 6 its two members.
# parse_messagedef uses "group 5 is None" to tell the three-string form
# from the five-string form.
re_start_writecall = re.compile(r"\s*[AP:]*logger[\(\)]*.Write[StreamingCrcl]*\(")
re_writefield = r'\s*"([\w\-#?%,]+)"\s*'
re_full_writecall = re.compile(r'\s*[AP:]*logger[\(\)]*.Write[StreamingCrcl]*\(' + f'{re_writefield},{re_writefield},{re_writefield},({re_writefield},{re_writefield})?' , re.MULTILINE)
# Regular expression for extracting unit and multipliers from structure
# Group 1 is the single identifying character, group 2 the associated value
# (kept as text; it may be empty).
re_units_mults_struct = re.compile(r"^\s*{\s*'([\w\-#?%!/])',"+r'\s*"?([\w\-#?%./]*)"?\s*}')
# TODO: validate URLS actually return 200
# Lookup tables are populated by reading LogStructure.h
# (filled in place by LoggerDocco.populate_lookups; keys are the single
# format / unit / multiplier characters, values are strings)
log_fmt_lookup = {}
log_units_lookup = {}
log_mult_lookup = {}
# Lookup table to convert multiplier to prefix
# Keys are the numeric multiplier values (set_units converts the multiplier
# text with float() before looking it up); a multiplier with no entry here
# is shown numerically in front of the base unit instead.
mult_prefix_lookup = {
0: "",
1: "",
1e-1: "d", # deci-
1e-2: "c", # centi-
1e-3: "m", # milli-
1e-6: "μ", # micro-
1e-9: "n" # nano-
}
class LoggerDocco(object):
    """Gather log-message documentation for one vehicle and pass it to the emitters.

    Source files are scanned for ``@LoggerMessage`` comment blocks and for
    message definitions (structure entries, Write calls and the
    ``*_FMT`` / ``*_UNITS`` / ``*_MULTS`` #defines).  emit_output() then
    attaches the format and unit information to the documented fields and
    calls every emitter.
    """

    # Maps the --vehicle name to the vehicle's source directory (relative to
    # topdir) that run() searches in addition to "libraries".
    vehicle_map = {
        "Rover": "Rover",
        "Sub": "ArduSub",
        "Copter": "ArduCopter",
        "Plane": "ArduPlane",
        "Tracker": "AntennaTracker",
        "Blimp": "Blimp",
    }

    def __init__(self, vehicle):
        """Create a parser for *vehicle* (expected to be a key of vehicle_map)."""
        self.vehicle = vehicle
        self.doccos = []
        self.emitters = [
            emit_html.HTMLEmitter(),
            emit_rst.RSTEmitter(),
            emit_xml.XMLEmitter(),
            emit_md.MDEmitter(),
        ]
        # message name (or #define name) -> format / units / multipliers string
        self.msg_fmts_list = {}
        self.msg_units_list = {}
        self.msg_mults_list = {}
        # run() replaces both of these; they are created here as well so that
        # search_for_files(), parse_file() and finalise_docco() can be used
        # on their own without raising AttributeError.
        self.files = []
        self.enumerations = []

    class Docco(object):
        """Documentation collected from one ``@LoggerMessage`` comment block.

        ``name`` is a string for a single message or a list of strings when
        the block documents several messages; ``description`` has the same
        shape (one entry per name).
        """

        def __init__(self, name):
            """Start an empty docco for *name* (a string or a list of strings)."""
            self.name = name
            self.url = None
            if isinstance(name, list):
                self.description = [None] * len(name)
            else:
                self.description = None
            # field name -> dict of attributes ("description", "fmt", "units", ...)
            self.fields = {}
            # field names in the order they were first mentioned
            self.fields_order = []
            self.vehicles = None
            # enumerations created from @FieldBits tags
            self.bits_enums = []

        def add_name(self, name):
            """Add one more message name (or a list of names) to this docco.

            Descriptions that are still unset for the existing names become
            empty strings, so that a later set_description() only applies to
            the names added here.
            """
            # If self.name/description aren't lists, convert them
            if isinstance(self.name, str):
                self.name = [self.name]
                self.description = [self.description]
            # Replace any existing empty descriptions with empty strings
            for i, existing in enumerate(self.description):
                if existing is None:
                    self.description[i] = ""
            # Extend the name and description lists
            if isinstance(name, list):
                self.name.extend(name)
                self.description.extend([None] * len(name))
            else:
                self.name.append(name)
                self.description.append(None)

        def set_description(self, desc):
            """Set the description.

            For a multi-name docco only the entries that are still None are
            filled in; for a single-name docco the description is replaced.
            """
            if isinstance(self.description, list):
                for i, existing in enumerate(self.description):
                    if existing is None:
                        self.description[i] = desc
            else:
                self.description = desc

        def set_url(self, url):
            """Record the URL given by an @URL tag."""
            self.url = url

        def ensure_field(self, field):
            """Create an empty entry for *field* if it is not known yet."""
            if field not in self.fields:
                self.fields[field] = {}
                self.fields_order.append(field)

        def set_field_description(self, field, description):
            """Register *field* with its description.

            Raises:
                ValueError: if the field already exists in this docco.
            """
            if field in self.fields:
                raise ValueError("Already have field %s in %s" %
                                 (field, self.name))
            self.ensure_field(field)
            self.fields[field]["description"] = description

        def set_field_bits(self, field, bits):
            """Build a bitmask enumeration for *field* from a comma-separated list.

            The n-th entry of *bits* is given the value ``1 << n``.  The
            enumeration is named ``<message name><field>``; for a docco with
            several names the first name is used (concatenating the list
            itself with a string would raise TypeError).
            """
            entries = [
                EnumDocco.EnumEntry(bit, 1 << position, None)
                for position, bit in enumerate(bits.split(","))
            ]
            base_name = self.name[0] if isinstance(self.name, list) else self.name
            bitmask_name = base_name + field
            self.bits_enums.append(EnumDocco.Enumeration(bitmask_name, entries))
            self.ensure_field(field)
            self.fields[field]["bitmaskenum"] = bitmask_name

        def set_fieldbitmaskenum(self, field, bits):
            """Record the name of the bitmask enumeration that describes *field*."""
            self.ensure_field(field)
            self.fields[field]["bitmaskenum"] = bits

        def set_fieldvalueenum(self, field, bits):
            """Record the name of the value enumeration that describes *field*."""
            self.ensure_field(field)
            self.fields[field]["valueenum"] = bits

        def set_vehicles(self, vehicles):
            """Restrict this docco to the given list of vehicle names."""
            self.vehicles = vehicles

        def set_fmts(self, fmts):
            """Attach a format description to every field.

            *fmts* holds one format character per field, in fields_order.
            Nothing is changed when no fields are documented or when the
            number of characters differs from the number of fields (a
            message is printed in the latter case).
            """
            # If no fields are defined, do nothing
            if len(self.fields_order) == 0:
                return
            # Make sure lengths match up
            if len(fmts) != len(self.fields_order):
                print(f"Number of fmts don't match fields: msg={self.name} fmts={fmts} num_fields={len(self.fields_order)}")
                return
            for field, fmt_char in zip(self.fields_order, fmts):
                if fmt_char in log_fmt_lookup:
                    self.fields[field]["fmt"] = log_fmt_lookup[fmt_char]
                else:
                    print(f"Unrecognised format character: {fmt_char} in message {self.name}")

        def set_units(self, units, mults):
            """Attach a derived unit string to every field that has a unit.

            *units* and *mults* hold one character per field, in
            fields_order.  Nothing is changed when no fields are documented
            or when the lengths disagree (a message is printed in the latter
            case).  Should be called after set_fmts(), because the field's
            format decides whether the multiplier is shown.
            """
            # If no fields are defined, do nothing
            if len(self.fields_order) == 0:
                return
            # Make sure lengths match up
            if len(units) != len(self.fields_order) or len(units) != len(mults):
                print(f"Number of units/mults/fields don't match: msg={self.name} units={units} mults={mults} num_fields={len(self.fields_order)}")
                return
            for f, unit_char, mult_char in zip(self.fields_order, units, mults):
                # Convert unit char to base unit
                if unit_char in log_units_lookup:
                    baseunit = log_units_lookup[unit_char]
                else:
                    print(f"Unrecognised units character: {unit_char} in message {self.name}")
                    continue
                # Do nothing if this field has no unit defined
                if baseunit == "":
                    continue
                # Convert mult char to value
                if mult_char in log_mult_lookup:
                    mult = log_mult_lookup[mult_char]
                    mult_num = float(mult)
                else:
                    print(f"Unrecognised multiplier character: {mult_char} in message {self.name}")
                    continue
                fmt = self.fields[f].get("fmt")
                # Check if the defined format for this field contains its own multiplier
                # If so, the presented value will be the base-unit directly
                if fmt is not None and (fmt.endswith("* 100") or "latitude/longitude" in fmt):
                    self.fields[f]["units"] = baseunit
                # Check if we have a defined prefix for this multiplier
                elif mult_num in mult_prefix_lookup:
                    self.fields[f]["units"] = f"{mult_prefix_lookup[mult_num]}{baseunit}"
                # If all else fails, set the unit as the multiplier and base unit together
                else:
                    self.fields[f]["units"] = f"{mult} {baseunit}"

    def populate_lookups(self):
        """Fill the module-level format/unit/multiplier tables from LogStructure.h.

        Three parts of the file are read: the comment that starts with
        "Format characters" (up to the closing ``*/``) and the
        UnitStructure / MultiplierStructure initialisers (up to ``};``).
        """
        structfile = os.path.join(topdir, "libraries", "AP_Logger", "LogStructure.h")
        with open(structfile) as f:
            lines = f.readlines()
        # Initialise current section to none
        section = "none"
        for line in lines:
            # Look for the start of fmt/unit/mult info
            if line.startswith("Format characters"):
                section = "fmt"
            elif line.startswith("const struct UnitStructure"):
                section = "units"
            elif line.startswith("const struct MultiplierStructure"):
                section = "mult"
            # Read formats from code comment, e.g.:
            #   b : int8_t
            elif section == "fmt":
                if "*/" in line:
                    section = "none"
                else:
                    parts = line.split(":")
                    # A line without ':' (for example a blank line) carries
                    # no definition; skip it instead of failing on parts[1]
                    if len(parts) < 2:
                        continue
                    log_fmt_lookup[parts[0].strip()] = parts[1].strip()
            # Read units or multipliers from C struct definition, e.g.:
            #   { '2', 1e2 }, or { 'J', "W.s" },
            elif section != "none":
                if "};" in line:
                    section = "none"
                else:
                    u = re_units_mults_struct.search(line)
                    if u is None:
                        continue
                    if section == "units":
                        log_units_lookup[u.group(1)] = u.group(2)
                    elif section == "mult":
                        log_mult_lookup[u.group(1)] = u.group(2)

    def search_for_files(self, dirs_to_search):
        """Append every .cpp/.h file found under *dirs_to_search* to self.files.

        Directories are taken relative to topdir.  All directories of one
        level are listed before their sub-directories are visited.
        """
        subdirs = []
        for _dir in dirs_to_search:
            _dir = os.path.join(topdir, _dir)
            for entry in os.listdir(_dir):
                filepath = os.path.join(_dir, entry)
                if os.path.isdir(filepath):
                    subdirs.append(filepath)
                    continue
                extension = os.path.splitext(filepath)[1]
                if extension in (".cpp", ".h"):
                    self.files.append(filepath)
        if subdirs:
            self.search_for_files(subdirs)

    def parse_messagedef(self, messagedef):
        """Record the formats/units/multipliers found in one message definition.

        *messagedef* is the accumulated text of either a structure entry or
        a Write call.  Text matching neither pattern is ignored.
        """
        # Merge concatenated strings and remove comments
        messagedef = re.sub(r'"\s+"', '', messagedef)
        messagedef = re.sub(r'//[^\n]*', '', messagedef)
        # Extract details from a structure definition
        d = re_full_messagedef.search(messagedef)
        if d is not None:
            self.msg_fmts_list[d.group(1)] = d.group(2)
            self.msg_units_list[d.group(1)] = d.group(4)
            self.msg_mults_list[d.group(1)] = d.group(5)
            return
        # Extract details from a Write call
        d = re_full_writecall.search(messagedef)
        if d is None:
            return
        # Information already recorded for this message takes precedence
        if d.group(1) in self.msg_fmts_list:
            return
        if d.group(5) is None:
            # Three-string form: the third string is the format
            self.msg_fmts_list[d.group(1)] = d.group(3)
        else:
            # Five-string form: units, multipliers, then format
            self.msg_fmts_list[d.group(1)] = d.group(6)
            self.msg_units_list[d.group(1)] = d.group(3)
            self.msg_mults_list[d.group(1)] = d.group(5)

    def search_messagedef_start(self, line, prevmessagedef=""):
        """Check whether *line* starts a structure entry or a Write call.

        Returns:
            "" when the definition was complete on this line (it has then
            already been passed to parse_messagedef), *line* when a
            definition starts here and continues on later lines, and
            *prevmessagedef* when nothing starts on this line.
        """
        # A structure entry is complete at "}", a Write call at ";"
        for pattern, terminator in ((re_start_messagedef, "}"), (re_start_writecall, ";")):
            if pattern.search(line) is None:
                continue
            if terminator in line:
                self.parse_messagedef(line)
                return ""
            return line
        # If we didn't find a new one, continue with any previous state
        return prevmessagedef

    @staticmethod
    def _apply_docco_tag(docco, line):
        """Apply a single docco tag line to *docco*; return False if no tag matched."""
        handlers = (
            (re_description, lambda m: docco.set_description(m.group(1))),
            (re_url, lambda m: docco.set_url(m.group(1))),
            (re_field, lambda m: docco.set_field_description(m.group(1), m.group(2))),
            (re_fieldbits, lambda m: docco.set_field_bits(m.group(1), m.group(2))),
            (re_fieldbitmaskenum, lambda m: docco.set_fieldbitmaskenum(m.group(1), m.group(2))),
            (re_fieldvalueenum, lambda m: docco.set_fieldvalueenum(m.group(1), m.group(2))),
            (re_vehicles, lambda m: docco.set_vehicles([x.strip() for x in m.group(1).split(',')])),
        )
        for pattern, handler in handlers:
            m = pattern.match(line)
            if m is not None:
                handler(m)
                return True
        return False

    def _close_docco(self, docco):
        """Keep a finished docco if it applies to the vehicle being generated."""
        if docco.vehicles is None or self.vehicle in docco.vehicles:
            self.finalise_docco(docco)

    def parse_file(self, filepath):
        """Parse one source file for message definitions and docco blocks.

        Exits the program with status 1 when a comment line inside a docco
        block carries no recognised tag.
        """
        with open(filepath) as f:
            lines = f.readlines()
        messagedef = ""
        # docco is non-None exactly while we are inside a docco comment block
        docco = None
        for line in lines:
            # Keep accumulating a multi-line definition until it is terminated
            if messagedef:
                messagedef = messagedef + line
                if "}" in line or ";" in line:
                    self.parse_messagedef(messagedef)
                    messagedef = ""
            if docco is None:
                # Check for start of a message definition
                messagedef = self.search_messagedef_start(line, messagedef)
                # Check for fmt/unit/mult #define
                for pattern, table in (
                    (re_fmt_define, self.msg_fmts_list),
                    (re_units_define, self.msg_units_list),
                    (re_mults_define, self.msg_mults_list),
                ):
                    u = pattern.search(line)
                    if u is not None:
                        table[u.group(1)] = u.group(2)
                # Check for the @LoggerMessage tag indicating the start of the docco block
                m = re_loggermessage.search(line)
                if m is None:
                    continue
                name = m.group(1)
                if "," in name:
                    name = name.split(",")
                docco = LoggerDocco.Docco(name)
                continue
            # If this line is not a comment, then this is the end of the docco block
            if not re_commentline.match(line):
                self._close_docco(docco)
                docco = None
                messagedef = self.search_messagedef_start(line)
                continue
            # Check for multiple @LoggerMessage lines in this docco block
            m = re_loggermessage.search(line)
            if m is not None:
                name = m.group(1)
                if "," in name:
                    name = name.split(",")
                docco.add_name(name)
                continue
            # Find and extract data from the various docco fields
            if not self._apply_docco_tag(docco, line):
                print("Unknown field (%s)" % str(line))
                sys.exit(1)
        # A docco block that runs to the end of the file has no following
        # code line to close it, so close it here rather than dropping it
        if docco is not None:
            self._close_docco(docco)

    def parse_files(self):
        """Parse every file collected in self.files."""
        for _file in self.files:
            self.parse_file(_file)

    @staticmethod
    def _resolve_spec(table, name, marker):
        """Return the fmt/units/mults string recorded for message *name*.

        When the recorded value contains *marker* ("FMT", "UNITS" or
        "MULTS") it is treated as the name of a #define and looked up once
        more in *table*.  Returns None when the message, or the #define it
        refers to, is not in *table*.
        """
        spec = table.get(name)
        if spec is None:
            return None
        if marker in spec:
            return table.get(spec)
        return spec

    def emit_output(self):
        """Attach format/unit information to the doccos and run all emitters."""
        # expand things like PIDR,PIDQ,PIDA into multiple doccos
        new_doccos = []
        for docco in self.doccos:
            if isinstance(docco.name, list):
                for name, desc in zip(docco.name, docco.description):
                    # Shallow copy: the expanded doccos share one fields dict
                    tmpdocco = copy.copy(docco)
                    tmpdocco.name = name
                    tmpdocco.description = desc
                    new_doccos.append(tmpdocco)
            else:
                new_doccos.append(docco)
        new_doccos = sorted(new_doccos, key=lambda x: x.name)
        # Try to attach the formats/units/multipliers
        for docco in new_doccos:
            # Apply the Formats to the docco
            if docco.name in self.msg_fmts_list:
                fmts = self._resolve_spec(self.msg_fmts_list, docco.name, "FMT")
                if fmts is not None:
                    docco.set_fmts(fmts)
            else:
                print(f"No formats found for message {docco.name}")
            # Get the Units and the Multipliers
            units = self._resolve_spec(self.msg_units_list, docco.name, "UNITS")
            mults = self._resolve_spec(self.msg_mults_list, docco.name, "MULTS")
            # Apply the units/mults to the docco
            if units is not None and mults is not None:
                docco.set_units(units, mults)
            elif units is not None or mults is not None:
                print(f"Cannot find matching units/mults for message {docco.name}")
        enums_by_name = {enum.name: enum for enum in self.enumerations}
        for emitter in self.emitters:
            emitter.emit(new_doccos, enums_by_name)

    def run(self):
        """Run the whole pipeline: lookups, enumerations, file search, parse, emit."""
        self.populate_lookups()
        self.enumerations = enum_parse.EnumDocco(self.vehicle).get_enumerations()
        self.files = []
        self.search_for_files([self.vehicle_map[self.vehicle], "libraries"])
        self.parse_files()
        self.emit_output()

    def finalise_docco(self, docco):
        """Store a completed docco together with the enumerations it defined."""
        self.doccos.append(docco)
        self.enumerations += docco.bits_enums
if __name__ == '__main__':
    # Command-line entry point: generate the log-message documentation for
    # the vehicle named by --vehicle.
    parser = argparse.ArgumentParser(description="Parse parameters.")
    parser.add_argument("-v", "--verbose", dest='verbose', action='store_true', default=False, help="show debugging output")
    parser.add_argument("--vehicle", required=True, help="Vehicle type to generate for")
    args = parser.parse_args()
    # Validate the vehicle before LoggerDocco (and with it every emitter) is
    # constructed, and list the valid names plainly instead of printing the
    # repr of a dict_keys object.
    if args.vehicle not in LoggerDocco.vehicle_map:
        print("Invalid vehicle (choose from: %s)" % ", ".join(LoggerDocco.vehicle_map))
        sys.exit(1)
    s = LoggerDocco(args.vehicle)
    s.run()