mirror of
https://github.com/ArduPilot/ardupilot
synced 2025-01-05 07:28:29 -04:00
515 lines
20 KiB
Python
Executable File
515 lines
20 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
'''
|
|
AP_FLAKE8_CLEAN
|
|
'''
|
|
|
|
from __future__ import print_function
|
|
|
|
import argparse
|
|
import copy
|
|
import os
|
|
import re
|
|
import sys
|
|
|
|
import emit_html
|
|
import emit_rst
|
|
import emit_xml
|
|
import emit_md
|
|
|
|
import enum_parse
|
|
from enum_parse import EnumDocco
|
|
|
|
topdir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '../../../')
|
|
topdir = os.path.realpath(topdir)
|
|
|
|
# Regular expressions for finding message information in code comments
|
|
re_loggermessage = re.compile(r"@LoggerMessage\s*:\s*([\w,]+)", re.MULTILINE)
|
|
re_commentline = re.compile(r"\s*//")
|
|
re_description = re.compile(r"\s*//\s*@Description\s*:\s*(.*)")
|
|
re_url = re.compile(r"\s*//\s*@URL\s*:\s*(.*)")
|
|
re_field = re.compile(r"\s*//\s*@Field\s*:\s*(\w+):\s*(.*)")
|
|
re_fieldbits = re.compile(r"\s*//\s*@FieldBits\s*:\s*(\w+):\s*(.*)")
|
|
re_fieldbitmaskenum = re.compile(r"\s*//\s*@FieldBitmaskEnum\s*:\s*(\w+):\s*(.*)")
|
|
re_fieldvalueenum = re.compile(r"\s*//\s*@FieldValueEnum\s*:\s*(\w+):\s*(.*)")
|
|
re_vehicles = re.compile(r"\s*//\s*@Vehicles\s*:\s*(.*)")
|
|
|
|
# Regular expressions for finding message definitions in structure format
|
|
re_start_messagedef = re.compile(r"^\s*{?\s*LOG_[A-Z0-9_]+_[MSGTA]+[A-Z0-9_]*\s*,")
|
|
re_deffield = r'[\s\\]*"?([\w\-#?%]+)"?\s*'
|
|
re_full_messagedef = re.compile(r'\s*LOG_\w+\s*,\s*\w+\([^)]+\)[\s\\]*,' +
|
|
f'{re_deffield},{re_deffield},' +
|
|
r'[\s\\]*"?([\w,]+)"?[\s\\]*,' +
|
|
f'{re_deffield},{re_deffield}',
|
|
re.MULTILINE)
|
|
re_fmt_define = re.compile(r'#define\s+(\w+_FMT)\s+"([\w\-#?%]+)"')
|
|
re_units_define = re.compile(r'#define\s+(\w+_UNITS)\s+"([\w\-#?%]+)"')
|
|
re_mults_define = re.compile(r'#define\s+(\w+_MULTS)\s+"([\w\-#?%]+)"')
|
|
|
|
# Regular expressions for finding message definitions in Write calls
|
|
re_start_writecall = re.compile(r"\s*[AP:]*logger[\(\)]*.Write[StreamingCrcl]*\(")
|
|
re_writefield = r'\s*"([\w\-#?%,]+)"\s*'
|
|
re_full_writecall = re.compile(r'\s*[AP:]*logger[\(\)]*.Write[StreamingCrcl]*\(' +
|
|
f'{re_writefield},{re_writefield},{re_writefield},({re_writefield},{re_writefield})?',
|
|
re.MULTILINE)
|
|
|
|
# Regular expression for extracting unit and multipliers from structure
|
|
re_units_mults_struct = re.compile(r"^\s*{\s*'([\w\-#?%!/])',"+r'\s*"?([\w\-#?%./]*)"?\s*}')
|
|
|
|
# TODO: validate URLS actually return 200
|
|
|
|
# Lookup tables are populated by reading LogStructure.h
|
|
log_fmt_lookup = {}
|
|
log_units_lookup = {}
|
|
log_mult_lookup = {}
|
|
|
|
# Lookup table to convert multiplier to prefix
|
|
mult_prefix_lookup = {
|
|
0: "",
|
|
1: "",
|
|
1e-1: "d", # deci-
|
|
1e-2: "c", # centi-
|
|
1e-3: "m", # milli-
|
|
1e-6: "μ", # micro-
|
|
1e-9: "n" # nano-
|
|
}
|
|
|
|
|
|
class LoggerDocco(object):
|
|
|
|
vehicle_map = {
|
|
"Rover": "Rover",
|
|
"Sub": "ArduSub",
|
|
"Copter": "ArduCopter",
|
|
"Plane": "ArduPlane",
|
|
"Tracker": "AntennaTracker",
|
|
"Blimp": "Blimp",
|
|
}
|
|
|
|
def __init__(self, vehicle):
|
|
self.vehicle = vehicle
|
|
self.doccos = []
|
|
self.emitters = [
|
|
emit_html.HTMLEmitter(),
|
|
emit_rst.RSTEmitter(),
|
|
emit_xml.XMLEmitter(),
|
|
emit_md.MDEmitter(),
|
|
]
|
|
self.msg_fmts_list = {}
|
|
self.msg_units_list = {}
|
|
self.msg_mults_list = {}
|
|
|
|
class Docco(object):
|
|
|
|
def __init__(self, name):
|
|
self.name = name
|
|
self.url = None
|
|
if isinstance(name, list):
|
|
self.description = [None] * len(name)
|
|
else:
|
|
self.description = None
|
|
self.fields = {}
|
|
self.fields_order = []
|
|
self.vehicles = None
|
|
self.bits_enums = []
|
|
|
|
def add_name(self, name):
|
|
# If self.name/description aren't lists, convert them
|
|
if isinstance(self.name, str):
|
|
self.name = [self.name]
|
|
self.description = [self.description]
|
|
# Replace any existing empty descriptions with empty strings
|
|
for i in range(0, len(self.description)):
|
|
if self.description[i] is None:
|
|
self.description[i] = ""
|
|
# Extend the name and description lists
|
|
if isinstance(name, list):
|
|
self.name.extend(name)
|
|
self.description.extend([None] * len(name))
|
|
else:
|
|
self.name.append(name)
|
|
self.description.append(None)
|
|
|
|
def set_description(self, desc):
|
|
if isinstance(self.description, list):
|
|
for i in range(0, len(self.description)):
|
|
if self.description[i] is None:
|
|
self.description[i] = desc
|
|
else:
|
|
self.description = desc
|
|
|
|
def set_url(self, url):
|
|
self.url = url
|
|
|
|
def ensure_field(self, field):
|
|
if field not in self.fields:
|
|
self.fields[field] = {}
|
|
self.fields_order.append(field)
|
|
|
|
def set_field_description(self, field, description):
|
|
if field in self.fields:
|
|
raise ValueError("Already have field %s in %s" %
|
|
(field, self.name))
|
|
self.ensure_field(field)
|
|
self.fields[field]["description"] = description
|
|
|
|
def set_field_bits(self, field, bits):
|
|
bits = bits.split(",")
|
|
count = 0
|
|
entries = []
|
|
for bit in bits:
|
|
entries.append(EnumDocco.EnumEntry(bit, 1 << count, None))
|
|
count += 1
|
|
bitmask_name = self.name + field
|
|
self.bits_enums.append(EnumDocco.Enumeration(bitmask_name, entries))
|
|
self.ensure_field(field)
|
|
self.fields[field]["bitmaskenum"] = bitmask_name
|
|
|
|
def set_fieldbitmaskenum(self, field, bits):
|
|
self.ensure_field(field)
|
|
self.fields[field]["bitmaskenum"] = bits
|
|
|
|
def set_fieldvalueenum(self, field, bits):
|
|
self.ensure_field(field)
|
|
self.fields[field]["valueenum"] = bits
|
|
|
|
def set_vehicles(self, vehicles):
|
|
self.vehicles = vehicles
|
|
|
|
def set_fmts(self, fmts):
|
|
# If no fields are defined, do nothing
|
|
if len(self.fields_order) == 0:
|
|
return
|
|
# Make sure lengths match up
|
|
if len(fmts) != len(self.fields_order):
|
|
print(f"Number of fmts don't match fields: msg={self.name} fmts={fmts} num_fields={len(self.fields_order)} {self.fields_order}") # noqa:E501
|
|
return
|
|
# Loop through the list
|
|
for idx in range(0, len(fmts)):
|
|
if fmts[idx] in log_fmt_lookup:
|
|
self.fields[self.fields_order[idx]]["fmt"] = log_fmt_lookup[fmts[idx]]
|
|
else:
|
|
print(f"Unrecognised format character: {fmts[idx]} in message {self.name}")
|
|
|
|
def set_units(self, units, mults):
|
|
# If no fields are defined, do nothing
|
|
if len(self.fields_order) == 0:
|
|
return
|
|
# Make sure lengths match up
|
|
if len(units) != len(self.fields_order) or len(units) != len(mults):
|
|
print(f"Number of units/mults/fields don't match: msg={self.name} units={units} mults={mults} num_fields={len(self.fields_order)}") # noqa:E501
|
|
return
|
|
# Loop through the list
|
|
for idx in range(0, len(units)):
|
|
# Get the index into fields from field_order
|
|
f = self.fields_order[idx]
|
|
# Convert unit char to base unit
|
|
if units[idx] in log_units_lookup:
|
|
baseunit = log_units_lookup[units[idx]]
|
|
else:
|
|
print(f"Unrecognised units character: {units[idx]} in message {self.name}")
|
|
continue
|
|
# Do nothing if this field has no unit defined
|
|
if baseunit == "":
|
|
continue
|
|
# Convert mult char to value
|
|
if mults[idx] in log_mult_lookup:
|
|
mult = log_mult_lookup[mults[idx]]
|
|
mult_num = float(mult)
|
|
else:
|
|
print(f"Unrecognised multiplier character: {mults[idx]} in message {self.name}")
|
|
continue
|
|
# Check if the defined format for this field contains its own multiplier
|
|
# If so, the presented value will be the base-unit directly
|
|
if 'fmt' in self.fields[f] and self.fields[f]['fmt'].endswith("* 100"):
|
|
self.fields[f]["units"] = baseunit
|
|
elif 'fmt' in self.fields[f] and "latitude/longitude" in self.fields[f]['fmt']:
|
|
self.fields[f]["units"] = baseunit
|
|
# Check if we have a defined prefix for this multiplier
|
|
elif mult_num in mult_prefix_lookup:
|
|
self.fields[f]["units"] = f"{mult_prefix_lookup[mult_num]}{baseunit}"
|
|
# If all else fails, set the unit as the multipler and base unit together
|
|
else:
|
|
self.fields[f]["units"] = f"{mult} {baseunit}"
|
|
|
|
def populate_lookups(self):
|
|
# Initialise the lookup tables
|
|
# Read the contents of the LogStructure.h file
|
|
structfile = os.path.join(topdir, "libraries", "AP_Logger", "LogStructure.h")
|
|
with open(structfile) as f:
|
|
lines = f.readlines()
|
|
f.close()
|
|
# Initialise current section to none
|
|
section = "none"
|
|
# Loop through the lines in the file
|
|
for line in lines:
|
|
# Look for the start of fmt/unit/mult info
|
|
if line.startswith("Format characters"):
|
|
section = "fmt"
|
|
elif line.startswith("const struct UnitStructure"):
|
|
section = "units"
|
|
elif line.startswith("const struct MultiplierStructure"):
|
|
section = "mult"
|
|
# Read formats from code comment, e.g.:
|
|
# b : int8_t
|
|
elif section == "fmt":
|
|
if "*/" in line:
|
|
section = "none"
|
|
else:
|
|
parts = line.split(":")
|
|
log_fmt_lookup[parts[0].strip()] = parts[1].strip()
|
|
# Read units or multipliers from C struct definition, e.g.:
|
|
# { '2', 1e2 }, or { 'J', "W.s" },
|
|
elif section != "none":
|
|
if "};" in line:
|
|
section = "none"
|
|
else:
|
|
u = re_units_mults_struct.search(line)
|
|
if u is not None and section == "units":
|
|
log_units_lookup[u.group(1)] = u.group(2)
|
|
if u is not None and section == "mult":
|
|
log_mult_lookup[u.group(1)] = u.group(2)
|
|
|
|
def search_for_files(self, dirs_to_search):
|
|
_next = []
|
|
for _dir in dirs_to_search:
|
|
_dir = os.path.join(topdir, _dir)
|
|
for entry in os.listdir(_dir):
|
|
filepath = os.path.join(_dir, entry)
|
|
if os.path.isdir(filepath):
|
|
_next.append(filepath)
|
|
continue
|
|
(name, extension) = os.path.splitext(filepath)
|
|
if extension not in [".cpp", ".h"]:
|
|
continue
|
|
self.files.append(filepath)
|
|
if len(_next):
|
|
self.search_for_files(_next)
|
|
|
|
def parse_messagedef(self, messagedef):
|
|
# Merge concatinated strings and remove comments
|
|
messagedef = re.sub(r'"\s+"', '', messagedef)
|
|
messagedef = re.sub(r'//[^\n]*', '', messagedef)
|
|
# Extract details from a structure definition
|
|
d = re_full_messagedef.search(messagedef)
|
|
if d is not None:
|
|
self.msg_fmts_list[d.group(1)] = d.group(2)
|
|
self.msg_units_list[d.group(1)] = d.group(4)
|
|
self.msg_mults_list[d.group(1)] = d.group(5)
|
|
return
|
|
# Extract details from a WriteStreaming call
|
|
d = re_full_writecall.search(messagedef)
|
|
if d is not None:
|
|
if d.group(1) in self.msg_fmts_list:
|
|
return
|
|
if d.group(5) is None:
|
|
self.msg_fmts_list[d.group(1)] = d.group(3)
|
|
else:
|
|
self.msg_fmts_list[d.group(1)] = d.group(6)
|
|
self.msg_units_list[d.group(1)] = d.group(3)
|
|
self.msg_mults_list[d.group(1)] = d.group(5)
|
|
return
|
|
# Didn't parse
|
|
# print(f"Unable to parse: {messagedef}")
|
|
|
|
def search_messagedef_start(self, line, prevmessagedef=""):
|
|
# Look for the start of a structure definition
|
|
d = re_start_messagedef.search(line)
|
|
if d is not None:
|
|
messagedef = line
|
|
if "}" in line:
|
|
self.parse_messagedef(messagedef)
|
|
return ""
|
|
else:
|
|
return messagedef
|
|
# Look for a new call to WriteStreaming
|
|
d = re_start_writecall.search(line)
|
|
if d is not None:
|
|
messagedef = line
|
|
if ";" in line:
|
|
self.parse_messagedef(messagedef)
|
|
return ""
|
|
else:
|
|
return messagedef
|
|
# If we didn't find a new one, continue with any previous state
|
|
return prevmessagedef
|
|
|
|
def parse_file(self, filepath):
|
|
with open(filepath) as f:
|
|
# print("Opened (%s)" % filepath)
|
|
lines = f.readlines()
|
|
f.close()
|
|
|
|
def debug(x):
|
|
pass
|
|
# if filepath == "/home/pbarker/rc/ardupilot/libraries/AP_HAL/AnalogIn.h":
|
|
# debug = print
|
|
state_outside = "outside"
|
|
state_inside = "inside"
|
|
messagedef = ""
|
|
state = state_outside
|
|
docco = None
|
|
for line in lines:
|
|
debug(f"{state}: {line}")
|
|
if messagedef:
|
|
messagedef = messagedef + line
|
|
if "}" in line or ";" in line:
|
|
self.parse_messagedef(messagedef)
|
|
messagedef = ""
|
|
if state == state_outside:
|
|
# Check for start of a message definition
|
|
messagedef = self.search_messagedef_start(line, messagedef)
|
|
|
|
# Check for fmt/unit/mult #define
|
|
u = re_fmt_define.search(line)
|
|
if u is not None:
|
|
self.msg_fmts_list[u.group(1)] = u.group(2)
|
|
u = re_units_define.search(line)
|
|
if u is not None:
|
|
self.msg_units_list[u.group(1)] = u.group(2)
|
|
u = re_mults_define.search(line)
|
|
if u is not None:
|
|
self.msg_mults_list[u.group(1)] = u.group(2)
|
|
|
|
# Check for the @LoggerMessage tag indicating the start of the docco block
|
|
m = re_loggermessage.search(line)
|
|
if m is None:
|
|
continue
|
|
name = m.group(1)
|
|
if "," in name:
|
|
name = name.split(",")
|
|
state = state_inside
|
|
docco = LoggerDocco.Docco(name)
|
|
elif state == state_inside:
|
|
# If this line is not a comment, then this is the end of the docco block
|
|
if not re_commentline.match(line):
|
|
state = state_outside
|
|
if docco.vehicles is None or self.vehicle in docco.vehicles:
|
|
self.finalise_docco(docco)
|
|
messagedef = self.search_messagedef_start(line)
|
|
continue
|
|
# Check for an multiple @LoggerMessage lines in this docco block
|
|
m = re_loggermessage.search(line)
|
|
if m is not None:
|
|
name = m.group(1)
|
|
if "," in name:
|
|
name = name.split(",")
|
|
docco.add_name(name)
|
|
continue
|
|
# Find and extract data from the various docco fields
|
|
m = re_description.match(line)
|
|
if m is not None:
|
|
docco.set_description(m.group(1))
|
|
continue
|
|
m = re_url.match(line)
|
|
if m is not None:
|
|
docco.set_url(m.group(1))
|
|
continue
|
|
m = re_field.match(line)
|
|
if m is not None:
|
|
docco.set_field_description(m.group(1), m.group(2))
|
|
continue
|
|
m = re_fieldbits.match(line)
|
|
if m is not None:
|
|
docco.set_field_bits(m.group(1), m.group(2))
|
|
continue
|
|
m = re_fieldbitmaskenum.match(line)
|
|
if m is not None:
|
|
docco.set_fieldbitmaskenum(m.group(1), m.group(2))
|
|
continue
|
|
m = re_fieldvalueenum.match(line)
|
|
if m is not None:
|
|
docco.set_fieldvalueenum(m.group(1), m.group(2))
|
|
continue
|
|
m = re_vehicles.match(line)
|
|
if m is not None:
|
|
docco.set_vehicles([x.strip() for x in m.group(1).split(',')])
|
|
continue
|
|
print("Unknown field (%s)" % str(line))
|
|
sys.exit(1)
|
|
|
|
def parse_files(self):
|
|
for _file in self.files:
|
|
self.parse_file(_file)
|
|
|
|
def emit_output(self):
|
|
# expand things like PIDR,PIDQ,PIDA into multiple doccos
|
|
new_doccos = []
|
|
for docco in self.doccos:
|
|
if isinstance(docco.name, list):
|
|
for name, desc in zip(docco.name, docco.description):
|
|
tmpdocco = copy.copy(docco)
|
|
tmpdocco.name = name
|
|
tmpdocco.description = desc
|
|
new_doccos.append(tmpdocco)
|
|
else:
|
|
new_doccos.append(docco)
|
|
new_doccos = sorted(new_doccos, key=lambda x : x.name)
|
|
|
|
# Try to attach the formats/units/multipliers
|
|
for docco in new_doccos:
|
|
# Apply the Formats to the docco
|
|
if docco.name in self.msg_fmts_list:
|
|
if "FMT" in self.msg_fmts_list[docco.name]:
|
|
if self.msg_fmts_list[docco.name] in self.msg_fmts_list:
|
|
docco.set_fmts(self.msg_fmts_list[self.msg_fmts_list[docco.name]])
|
|
else:
|
|
docco.set_fmts(self.msg_fmts_list[docco.name])
|
|
else:
|
|
print(f"No formats found for message {docco.name}")
|
|
# Get the Units
|
|
units = None
|
|
if docco.name in self.msg_units_list:
|
|
if "UNITS" in self.msg_units_list[docco.name]:
|
|
if self.msg_units_list[docco.name] in self.msg_units_list:
|
|
units = self.msg_units_list[self.msg_units_list[docco.name]]
|
|
else:
|
|
units = self.msg_units_list[docco.name]
|
|
# Get the Multipliers
|
|
mults = None
|
|
if docco.name in self.msg_mults_list:
|
|
if "MULTS" in self.msg_mults_list[docco.name]:
|
|
if self.msg_mults_list[docco.name] in self.msg_mults_list:
|
|
mults = self.msg_mults_list[self.msg_mults_list[docco.name]]
|
|
else:
|
|
mults = self.msg_mults_list[docco.name]
|
|
# Apply the units/mults to the docco
|
|
if units is not None and mults is not None:
|
|
docco.set_units(units, mults)
|
|
elif units is not None or mults is not None:
|
|
print(f"Cannot find matching units/mults for message {docco.name}")
|
|
|
|
enums_by_name = {}
|
|
for enum in self.enumerations:
|
|
enums_by_name[enum.name] = enum
|
|
for emitter in self.emitters:
|
|
emitter.emit(new_doccos, enums_by_name)
|
|
|
|
def run(self):
|
|
self.populate_lookups()
|
|
self.enumerations = enum_parse.EnumDocco(self.vehicle).get_enumerations()
|
|
self.files = []
|
|
self.search_for_files([self.vehicle_map[self.vehicle], "libraries"])
|
|
self.parse_files()
|
|
self.emit_output()
|
|
|
|
def finalise_docco(self, docco):
|
|
self.doccos.append(docco)
|
|
self.enumerations += docco.bits_enums
|
|
|
|
|
|
if __name__ == '__main__':
|
|
parser = argparse.ArgumentParser(description="Parse parameters.")
|
|
parser.add_argument("-v", "--verbose", dest='verbose', action='store_true', default=False, help="show debugging output")
|
|
parser.add_argument("--vehicle", required=True, help="Vehicle type to generate for")
|
|
|
|
args = parser.parse_args()
|
|
|
|
s = LoggerDocco(args.vehicle)
|
|
|
|
if args.vehicle not in s.vehicle_map:
|
|
print("Invalid vehicle (choose from: %s)" % str(s.vehicle_map.keys()))
|
|
sys.exit(1)
|
|
|
|
s.run()
|