mirror of
https://github.com/ArduPilot/ardupilot
synced 2025-01-10 18:08:30 -04:00
c0a503d74d
Definitions of each character are extracted from LogStructure.h Data is extracted by parsing the logging definition struct Also parse WriteMessage() calls for messages not defined in struct Add support to separate log descriptions for messages with same field list Compute derived unit from combination of format, unit and multiplier For XML output the format and derived unit into new attributes Add enumerations to the XML output (bitmasks were already done) For MD,RST,HTML, output either derived unit, 'char[n]', 'bitmask' or 'enum' Fix support for Blimp by adding it to the parse_enum.py lookup table
498 lines
20 KiB
Python
Executable File
498 lines
20 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
from __future__ import print_function
|
|
|
|
import argparse
|
|
import copy
|
|
import os
|
|
import re
|
|
import sys
|
|
|
|
import emit_html
|
|
import emit_rst
|
|
import emit_xml
|
|
import emit_md
|
|
|
|
import enum_parse
|
|
from enum_parse import EnumDocco
|
|
|
|
# Canonical absolute path of the directory three levels above the one
# containing this script; all source paths are resolved relative to it.
topdir = os.path.realpath(
    os.path.join(os.path.dirname(os.path.realpath(__file__)), '../../../')
)
|
|
|
|
# Regular expressions for finding message information in code comments


def _comment_tag_re(tag, payload):
    """Compile a matcher for a ``// @<tag>: <payload>`` documentation line."""
    return re.compile(r"\s*//\s*@" + tag + r"\s*:\s*" + payload)


# Payload capturing the rest of the line as a single group
_re_text_payload = r"(.*)"
# Payload capturing "<field name>: <rest of line>" as two groups
_re_named_payload = r"(\w+):\s*(.*)"

# Tag that opens a documentation block; captures "NAME" or "NAME1,NAME2,..."
re_loggermessage = re.compile(r"@LoggerMessage\s*:\s*([\w,]+)", re.MULTILINE)
# Any line whose first non-blank characters are "//"
re_commentline = re.compile(r"\s*//")

# Whole-block attributes (one capture group: the text after the colon)
re_description = _comment_tag_re("Description", _re_text_payload)
re_url = _comment_tag_re("URL", _re_text_payload)
re_vehicles = _comment_tag_re("Vehicles", _re_text_payload)

# Per-field attributes (two capture groups: field name, text after the colon)
re_field = _comment_tag_re("Field", _re_named_payload)
re_fieldbits = _comment_tag_re("FieldBits", _re_named_payload)
re_fieldbitmaskenum = _comment_tag_re("FieldBitmaskEnum", _re_named_payload)
re_fieldvalueenum = _comment_tag_re("FieldValueEnum", _re_named_payload)
|
|
|
|
# Regular expressions for finding message definitions in structure format

# First line of a structure entry: optional "{" then a LOG_..._MSG style id and a comma
re_start_messagedef = re.compile(r"^\s*{?\s*LOG_[A-Z0-9_]+_[MSGTA]+[A-Z0-9_]*\s*,")

# One optionally-quoted token (word characters plus - # ? %), captured as a group;
# leading whitespace and line-continuation backslashes are tolerated
re_deffield = r'[\s\\]*"?([\w\-#?%]+)"?\s*'

# Complete structure entry.  Capture groups, in order, are used by
# parse_messagedef as: 1 = message name, 2 = formats, 3 = labels,
# 4 = units, 5 = multipliers.
re_full_messagedef = re.compile(
    "".join([
        r'\s*LOG_\w+\s*,\s*\w+\([^)]+\)[\s\\]*,',  # id, then a call-like token e.g. sizeof(...)
        re_deffield, ',',                           # group 1
        re_deffield, ',',                           # group 2
        r'[\s\\]*"?([\w,]+)"?[\s\\]*,',             # group 3: comma-separated list
        re_deffield, ',',                           # group 4
        re_deffield,                                # group 5
    ]),
    re.MULTILINE,
)


def _define_re(suffix):
    """Compile a matcher for ``#define <NAME>_<suffix> "<string>"``."""
    return re.compile(r'#define\s+(\w+_' + suffix + r')\s+"([\w\-#?%]+)"')


# Shared format/unit/multiplier strings declared via #define
re_fmt_define = _define_re("FMT")
re_units_define = _define_re("UNITS")
re_mults_define = _define_re("MULTS")
|
|
|
|
# Regular expressions for finding message definitions in Write calls

# Text up to and including the opening parenthesis of a logger Write call
_re_writecall_head = r"\s*[AP:]*logger[\(\)]*.Write[StreamingCrcl]*\("

# Start of a Write call (the call may continue on following lines)
re_start_writecall = re.compile(_re_writecall_head)

# One double-quoted argument, captured without its quotes
re_writefield = r'\s*"([\w\-#?%,]+)"\s*'

# Complete Write call: three quoted arguments (groups 1-3) followed by a comma,
# then optionally two more quoted arguments (groups 5 and 6; group 4 wraps the pair)
re_full_writecall = re.compile(
    _re_writecall_head
    + ",".join([re_writefield] * 3)
    + ",(" + re_writefield + "," + re_writefield + ")?",
    re.MULTILINE,
)
|
|
|
|
# Regular expression for extracting unit and multipliers from structure
# Matches one table row such as  { '2', 1e2 }  or  { 'J', "W.s" }
# group 1 = the single lookup character, group 2 = the value with any quotes removed
re_units_mults_struct = re.compile(
    "".join([
        r"^\s*{\s*'([\w\-#?%!/])',",      # opening brace and quoted key character
        r'\s*"?([\w\-#?%./]*)"?\s*}',     # optionally quoted value and closing brace
    ])
)
|
|
|
|
# TODO: validate that URLs actually return HTTP 200
|
|
|
|
# Lookup tables are populated by reading LogStructure.h
log_fmt_lookup = {}    # format character -> type description
log_units_lookup = {}  # unit character -> base unit string
log_mult_lookup = {}   # multiplier character -> multiplier text

# Lookup table to convert multiplier to prefix
_MULT_PREFIX_PAIRS = (
    (0, ""),
    (1, ""),
    (1e-1, "d"),  # deci-
    (1e-2, "c"),  # centi-
    (1e-3, "m"),  # milli-
    (1e-6, "μ"),  # micro-
    (1e-9, "n"),  # nano-
)
mult_prefix_lookup = dict(_MULT_PREFIX_PAIRS)
|
|
|
|
class LoggerDocco(object):
|
|
|
|
# Maps the vehicle name accepted by the constructor to the directory
# (relative to topdir) that run() searches for that vehicle's sources.
vehicle_map = dict([
    ("Rover", "Rover"),
    ("Sub", "ArduSub"),
    ("Copter", "ArduCopter"),
    ("Plane", "ArduPlane"),
    ("Tracker", "AntennaTracker"),
    ("Blimp", "Blimp"),
])
|
|
|
|
def __init__(self, vehicle):
    """Remember the target vehicle and create empty parse state.

    vehicle -- key into vehicle_map naming the vehicle to document.
    """
    self.vehicle = vehicle
    # Finalised documentation blocks, in the order they were parsed
    self.doccos = []
    # One emitter per output format, instantiated in this fixed order
    emitter_types = (
        emit_html.HTMLEmitter,
        emit_rst.RSTEmitter,
        emit_xml.XMLEmitter,
        emit_md.MDEmitter,
    )
    self.emitters = [make_emitter() for make_emitter in emitter_types]
    # Format / unit / multiplier strings, keyed by message name or by the
    # name of the #define that holds a shared string
    self.msg_fmts_list = {}
    self.msg_units_list = {}
    self.msg_mults_list = {}
|
|
|
|
class Docco(object):
    """Documentation collected from one ``@LoggerMessage`` comment block.

    ``name`` is a string for a single message or a list of strings when the
    block documents several messages sharing one field list; in the list
    case ``description`` is a parallel list with one entry per name.
    """

    def __init__(self, name):
        """Create an empty docco for ``name`` (a string or a list of strings)."""
        self.name = name
        self.url = None
        # One description slot per name; None means "not set yet"
        if isinstance(name, list):
            self.description = [None] * len(name)
        else:
            self.description = None
        # field name -> dict of attributes ("description", "fmt", "units", ...)
        self.fields = {}
        # field names in the order they were first seen
        self.fields_order = []
        self.vehicles = None
        # Enumerations synthesised from @FieldBits lines
        self.bits_enums = []

    def add_name(self, name):
        """Add a further message name (string or list) to this block.

        Descriptions still unset for the existing names are frozen to ""
        so that a later set_description() only applies to the new names.
        """
        # If self.name/description aren't lists, convert them
        if isinstance(self.name, str):
            self.name = [self.name]
            self.description = [self.description]
        # Replace any existing empty descriptions with empty strings
        self.description = ["" if d is None else d for d in self.description]
        # Extend the name and description lists
        if isinstance(name, list):
            self.name.extend(name)
            self.description.extend([None] * len(name))
        else:
            self.name.append(name)
            self.description.append(None)

    def set_description(self, desc):
        """Set the description; with multiple names, fill only the unset slots."""
        if isinstance(self.description, list):
            self.description = [desc if d is None else d for d in self.description]
        else:
            self.description = desc

    def set_url(self, url):
        """Record the URL given for this message."""
        self.url = url

    def ensure_field(self, field):
        """Register ``field`` (and its position) if it has not been seen before."""
        if field not in self.fields:
            self.fields[field] = {}
            self.fields_order.append(field)

    def set_field_description(self, field, description):
        """Register a new field with its description.

        Raises ValueError if the field is already known, including when it
        was first created by one of the other per-field setters.
        """
        if field in self.fields:
            raise ValueError("Already have field %s in %s" %
                             (field, self.name))
        self.ensure_field(field)
        self.fields[field]["description"] = description

    def set_field_bits(self, field, bits):
        """Build a bitmask enumeration from a comma-separated list of bit names.

        The n-th name (counting from 0) is given the value ``1 << n``.
        """
        entries = [
            EnumDocco.EnumEntry(bit, 1 << position, None)
            for position, bit in enumerate(bits.split(","))
        ]
        # NOTE(review): this concatenation requires self.name to be a string;
        # for a block with several names (a list) it raises TypeError.
        bitmask_name = self.name + field
        self.bits_enums.append(EnumDocco.Enumeration(bitmask_name, entries))
        self.ensure_field(field)
        self.fields[field]["bitmaskenum"] = bitmask_name

    def set_fieldbitmaskenum(self, field, bits):
        """Record the name of the bitmask enumeration describing ``field``."""
        self.ensure_field(field)
        self.fields[field]["bitmaskenum"] = bits

    def set_fieldvalueenum(self, field, bits):
        """Record the name of the value enumeration describing ``field``."""
        self.ensure_field(field)
        self.fields[field]["valueenum"] = bits

    def set_vehicles(self, vehicles):
        """Record the list of vehicles this message applies to."""
        self.vehicles = vehicles

    def set_fmts(self, fmts):
        """Attach a format description to each field.

        fmts -- string with one format character per field, in field order.
        Does nothing when no fields are documented; prints a warning and
        leaves the fields untouched when the lengths differ.
        """
        # If no fields are defined, do nothing
        if not self.fields_order:
            return
        # Make sure lengths match up
        if len(fmts) != len(self.fields_order):
            print(f"Number of fmts don't match fields: msg={self.name} fmts={fmts} num_fields={len(self.fields_order)}")
            return
        # Pair every field with its format character
        for field, fmt_char in zip(self.fields_order, fmts):
            if fmt_char in log_fmt_lookup:
                self.fields[field]["fmt"] = log_fmt_lookup[fmt_char]
            else:
                print(f"Unrecognised format character: {fmt_char} in message {self.name}")

    def set_units(self, units, mults):
        """Attach a derived unit string to each field that has a unit.

        units, mults -- strings with one unit / multiplier character per
        field, in field order.  Does nothing when no fields are documented;
        prints a warning and leaves the fields untouched when the lengths
        differ.  Call after set_fmts(): the field's format decides whether
        the multiplier prefix is applied.
        """
        # If no fields are defined, do nothing
        if not self.fields_order:
            return
        # Make sure lengths match up
        if len(units) != len(self.fields_order) or len(units) != len(mults):
            print(f"Number of units/mults/fields don't match: msg={self.name} units={units} mults={mults} num_fields={len(self.fields_order)}")
            return
        for f, unit_char, mult_char in zip(self.fields_order, units, mults):
            # Convert unit char to base unit
            if unit_char not in log_units_lookup:
                print(f"Unrecognised units character: {unit_char} in message {self.name}")
                continue
            baseunit = log_units_lookup[unit_char]
            # Do nothing if this field has no unit defined
            if baseunit == "":
                continue
            # Convert mult char to value
            if mult_char not in log_mult_lookup:
                print(f"Unrecognised multiplier character: {mult_char} in message {self.name}")
                continue
            mult = log_mult_lookup[mult_char]
            mult_num = float(mult)
            # Formats that contain their own multiplier ("... * 100") and
            # latitude/longitude formats are presented in the base unit directly
            fmt = self.fields[f].get("fmt", "")
            if fmt.endswith("* 100") or "latitude/longitude" in fmt:
                self.fields[f]["units"] = baseunit
            # Check if we have a defined prefix for this multiplier
            elif mult_num in mult_prefix_lookup:
                self.fields[f]["units"] = f"{mult_prefix_lookup[mult_num]}{baseunit}"
            # If all else fails, set the unit as the multiplier and base unit together
            else:
                self.fields[f]["units"] = f"{mult} {baseunit}"
|
|
|
|
def populate_lookups(self):
    """Fill the module-level lookup tables from LogStructure.h.

    Reads <topdir>/libraries/AP_Logger/LogStructure.h and populates
    log_fmt_lookup from the "Format characters" comment, and
    log_units_lookup / log_mult_lookup from the UnitStructure and
    MultiplierStructure tables.  Values are stored as the text found in
    the file.
    """
    # Read the contents of the LogStructure.h file; the with-statement
    # closes the file, so no explicit close() is needed
    structfile = os.path.join(topdir, "libraries", "AP_Logger", "LogStructure.h")
    with open(structfile) as f:
        lines = f.readlines()

    # Name of the table currently being read ("none" between tables)
    section = "none"
    for line in lines:
        # Look for the start of fmt/unit/mult info
        if line.startswith("Format characters"):
            section = "fmt"
        elif line.startswith("const struct UnitStructure"):
            section = "units"
        elif line.startswith("const struct MultiplierStructure"):
            section = "mult"
        # Read formats from code comment, e.g.:
        #    b   : int8_t
        elif section == "fmt":
            if "*/" in line:
                section = "none"
                continue
            parts = line.split(":")
            if len(parts) < 2:
                # Blank or free-text line inside the comment: nothing to record
                continue
            log_fmt_lookup[parts[0].strip()] = parts[1].strip()
        # Read units or multipliers from C struct definition, e.g.:
        #    { '2', 1e2 }, or { 'J', "W.s" },
        elif section != "none":
            if "};" in line:
                section = "none"
                continue
            u = re_units_mults_struct.search(line)
            if u is None:
                continue
            if section == "units":
                log_units_lookup[u.group(1)] = u.group(2)
            elif section == "mult":
                log_mult_lookup[u.group(1)] = u.group(2)
|
|
|
|
def search_for_files(self, dirs_to_search):
    """Append every .cpp and .h file under the given directories to self.files.

    dirs_to_search -- directory paths; relative ones are taken relative to
    topdir.  Sub-directories found at one level are collected and searched
    by a recursive call once the whole level has been listed.
    """
    wanted_extensions = [".cpp", ".h"]
    subdirs = []
    for directory in dirs_to_search:
        # os.path.join leaves an already-absolute path (as passed by the
        # recursive call) unchanged
        directory = os.path.join(topdir, directory)
        for entry in os.listdir(directory):
            filepath = os.path.join(directory, entry)
            if os.path.isdir(filepath):
                subdirs.append(filepath)
            elif os.path.splitext(filepath)[1] in wanted_extensions:
                self.files.append(filepath)
    if subdirs:
        self.search_for_files(subdirs)
|
|
|
|
def parse_messagedef(self, messagedef):
    """Record formats/units/multipliers from one complete definition.

    messagedef -- text of a structure entry or of a logger Write call,
    possibly spanning several lines.  Text that matches neither form is
    ignored.
    """
    # Merge concatenated string literals ("ab" "cd" -> "abcd") and strip // comments
    text = re.sub(r'"\s+"', '', messagedef)
    text = re.sub(r'//[^\n]*', '', text)

    # Structure entry: name, formats, labels, units, multipliers
    struct_match = re_full_messagedef.search(text)
    if struct_match is not None:
        name, fmts, _labels, units, mults = struct_match.groups()
        self.msg_fmts_list[name] = fmts
        self.msg_units_list[name] = units
        self.msg_mults_list[name] = mults
        return

    # Write call: either (name, labels, formats, ...) or
    # (name, labels, units, multipliers, formats, ...)
    call_match = re_full_writecall.search(text)
    if call_match is None:
        return
    name = call_match.group(1)
    # Information already recorded for this message takes precedence
    if name in self.msg_fmts_list:
        return
    third, fourth, fifth = call_match.group(3, 5, 6)
    if fourth is None:
        self.msg_fmts_list[name] = third
    else:
        self.msg_fmts_list[name] = fifth
        self.msg_units_list[name] = third
        self.msg_mults_list[name] = fourth
|
|
|
|
def search_messagedef_start(self, line, prevmessagedef=""):
    """Check whether ``line`` begins a structure entry or a logger Write call.

    Returns "" when the definition also ends on this line (it is parsed
    immediately), the line itself when the definition continues on later
    lines, and ``prevmessagedef`` unchanged when nothing starts here.
    """
    # (pattern that starts a definition, text that marks its end)
    starters = (
        (re_start_messagedef, "}"),  # structure entry
        (re_start_writecall, ";"),   # Write call
    )
    for start_re, terminator in starters:
        if start_re.search(line) is None:
            continue
        if terminator not in line:
            # Incomplete: the caller accumulates the following lines
            return line
        self.parse_messagedef(line)
        return ""
    # If we didn't find a new one, continue with any previous state
    return prevmessagedef
|
|
|
|
def parse_file(self, filepath):
    """Scan one source file for message definitions and documentation blocks.

    Outside a documentation block each line is checked for the start of a
    structure entry / Write call, for *_FMT, *_UNITS and *_MULTS #defines,
    and for an @LoggerMessage tag, which opens a block.  Inside a block
    every comment line must carry a recognised tag; the first non-comment
    line (or the end of the file) closes the block, which is kept only if
    it applies to self.vehicle.

    Exits the process with status 1 on a comment line inside a block that
    matches none of the known tags.
    """
    # The with-statement closes the file; no explicit close() is needed
    with open(filepath) as f:
        lines = f.readlines()

    state_outside = "outside"
    state_inside = "inside"

    # #define patterns and the table each one feeds
    define_tables = (
        (re_fmt_define, self.msg_fmts_list),
        (re_units_define, self.msg_units_list),
        (re_mults_define, self.msg_mults_list),
    )
    # Documentation tags and the action each one performs on the open docco
    tag_handlers = (
        (re_description, lambda d, m: d.set_description(m.group(1))),
        (re_url, lambda d, m: d.set_url(m.group(1))),
        (re_field, lambda d, m: d.set_field_description(m.group(1), m.group(2))),
        (re_fieldbits, lambda d, m: d.set_field_bits(m.group(1), m.group(2))),
        (re_fieldbitmaskenum, lambda d, m: d.set_fieldbitmaskenum(m.group(1), m.group(2))),
        (re_fieldvalueenum, lambda d, m: d.set_fieldvalueenum(m.group(1), m.group(2))),
        (re_vehicles, lambda d, m: d.set_vehicles([x.strip() for x in m.group(1).split(',')])),
    )

    # Text of a definition that started on an earlier line and is not yet complete
    messagedef = ""
    state = state_outside
    docco = None
    for line in lines:
        # Keep accumulating a multi-line definition until its terminator appears
        if messagedef:
            messagedef += line
            if "}" in line or ";" in line:
                self.parse_messagedef(messagedef)
                messagedef = ""

        if state == state_outside:
            # Check for start of a message definition
            messagedef = self.search_messagedef_start(line, messagedef)

            # Check for fmt/unit/mult #define
            for define_re, table in define_tables:
                u = define_re.search(line)
                if u is not None:
                    table[u.group(1)] = u.group(2)

            # Check for the @LoggerMessage tag indicating the start of the docco block
            m = re_loggermessage.search(line)
            if m is None:
                continue
            name = m.group(1)
            if "," in name:
                name = name.split(",")
            state = state_inside
            docco = LoggerDocco.Docco(name)

        elif state == state_inside:
            # If this line is not a comment, then this is the end of the docco block
            if not re_commentline.match(line):
                state = state_outside
                if docco.vehicles is None or self.vehicle in docco.vehicles:
                    self.finalise_docco(docco)
                # The closing line may itself start a definition
                messagedef = self.search_messagedef_start(line)
                continue

            # Check for additional @LoggerMessage lines in this docco block
            m = re_loggermessage.search(line)
            if m is not None:
                name = m.group(1)
                if "," in name:
                    name = name.split(",")
                docco.add_name(name)
                continue

            # Find and extract data from the various docco fields
            for tag_re, apply_tag in tag_handlers:
                m = tag_re.match(line)
                if m is not None:
                    apply_tag(docco, m)
                    break
            else:
                print("Unknown field (%s)" % str(line))
                sys.exit(1)

    # A block that runs to the end of the file has no terminating
    # non-comment line; finalise it here so it is not silently dropped
    if state == state_inside:
        if docco.vehicles is None or self.vehicle in docco.vehicles:
            self.finalise_docco(docco)
|
|
|
|
def parse_files(self):
    """Run parse_file() on every path in self.files, in list order."""
    for source_path in self.files:
        self.parse_file(source_path)
|
|
|
|
def emit_output(self):
    """Expand, sort and annotate the collected doccos, then run every emitter.

    Doccos documenting several names are split into one shallow copy per
    name (the copies share the original's fields dict).  Formats, units and
    multipliers are looked up by message name and applied via set_fmts() /
    set_units().  Each emitter receives the sorted docco list and a dict of
    enumerations keyed by name.
    """

    def resolve(table, marker, name):
        """Return the string recorded for ``name``, following a #define reference.

        A recorded value containing ``marker`` is treated as the name of a
        #define and looked up again in ``table``.  Returns None when
        ``name`` is not recorded or the referenced #define was never seen.
        """
        if name not in table:
            return None
        value = table[name]
        if marker not in value:
            return value
        return table[value] if value in table else None

    # expand things like PIDR,PIDQ,PIDA into multiple doccos
    expanded = []
    for docco in self.doccos:
        if not isinstance(docco.name, list):
            expanded.append(docco)
            continue
        for name, desc in zip(docco.name, docco.description):
            clone = copy.copy(docco)
            clone.name = name
            clone.description = desc
            expanded.append(clone)
    expanded.sort(key=lambda d: d.name)

    # Try to attach the formats/units/multipliers
    for docco in expanded:
        # Apply the Formats to the docco
        if docco.name in self.msg_fmts_list:
            fmts = resolve(self.msg_fmts_list, "FMT", docco.name)
            if fmts is not None:
                docco.set_fmts(fmts)
        else:
            print(f"No formats found for message {docco.name}")

        # Units and multipliers are only usable as a pair
        units = resolve(self.msg_units_list, "UNITS", docco.name)
        mults = resolve(self.msg_mults_list, "MULTS", docco.name)
        if units is not None and mults is not None:
            docco.set_units(units, mults)
        elif units is not None or mults is not None:
            print(f"Cannot find matching units/mults for message {docco.name}")

    enums_by_name = {enum.name: enum for enum in self.enumerations}
    for emitter in self.emitters:
        emitter.emit(expanded, enums_by_name)
|
|
|
|
def run(self):
    """Execute the whole pipeline: load lookups, parse sources, emit output."""
    self.populate_lookups()

    # Enumerations come from enum_parse for the selected vehicle
    enum_docco = enum_parse.EnumDocco(self.vehicle)
    self.enumerations = enum_docco.get_enumerations()

    # Collect source files from the vehicle's directory and the shared libraries
    self.files = []
    vehicle_dir = self.vehicle_map[self.vehicle]
    self.search_for_files([vehicle_dir, "libraries"])

    self.parse_files()
    self.emit_output()
|
|
|
|
def finalise_docco(self, docco):
    """Keep a completed docco and adopt the enumerations it defined."""
    self.doccos.append(docco)
    # Enumerations built from the docco's @FieldBits lines
    extra_enums = docco.bits_enums
    self.enumerations += extra_enums
|
|
|
|
|
|
if __name__ == '__main__':
    # Command-line entry point: generate the documentation for one vehicle
    parser = argparse.ArgumentParser(description="Generate documentation for logger messages.")
    parser.add_argument("-v", "--verbose", dest='verbose', action='store_true', default=False, help="show debugging output")
    parser.add_argument("--vehicle", required=True, help="Vehicle type to generate for")

    args = parser.parse_args()

    # Validate the vehicle before constructing LoggerDocco, so that no
    # emitters are created for a request that is about to be rejected
    if args.vehicle not in LoggerDocco.vehicle_map:
        print("Invalid vehicle (choose from: %s)" % ", ".join(LoggerDocco.vehicle_map))
        sys.exit(1)

    s = LoggerDocco(args.vehicle)
    s.run()
|