Tools: scripts: create mavlink_parse.py

A parser that finds incoming, requestable, and outgoing MAVLink messages for each vehicle.
May not indicate full support, but at least shows the messages which are handled in the code.

Requires Python >= 3.11
This commit is contained in:
ES-Alexander 2022-11-25 01:08:39 +11:00 committed by Willian Galvani
parent 66c674c192
commit 9b24dd20c4
1 changed files with 502 additions and 0 deletions

502
Tools/scripts/mavlink_parse.py Executable file
View File

@ -0,0 +1,502 @@
#!/usr/bin/env python
import re
from enum import StrEnum # requires Python >= 3.11
from pathlib import Path
from itertools import chain
from dataclasses import dataclass, astuple
from pymavlink.dialects.v20 import (
common, icarous, cubepilot, uAvionix, ardupilotmega
)
class MAVLinkDialect(StrEnum):
# in subset, superset, unknown order, for correct links
# supported values must match imported dialect names
COMMON = 'common'
ICAROUS = 'icarous'
CUBEPILOT = 'cubepilot'
UAVIONIX = 'uAvionix'
ARDUPILOTMEGA = 'ardupilotmega'
UNKNOWN = 'UNKNOWN'
@dataclass(slots=True, order=True)
class MAVLinkMessage:
name: str
source: str
dialect: MAVLinkDialect = MAVLinkDialect.UNKNOWN
PREFIX = 'MAVLINK_MSG_ID_'
# Get message sets, for quick containment checks.
# Function is required because of Python's class scoping rules.
# See https://stackoverflow.com/questions/13905741.
def _get_known_messages(prefix):
''' Returns a dictionary of {dialect: {messages}} given 'prefix'. '''
return {
dialect: set(m for m in dir(globals()[dialect])
if m.startswith(prefix))
for dialect in MAVLinkDialect
if dialect != MAVLinkDialect.UNKNOWN
}
KNOWN_DIALECTS = _get_known_messages(PREFIX)
# Try to determine dialect if initialised without one specified.
def __post_init__(self):
if self.dialect == MAVLinkDialect.UNKNOWN:
self.determine_dialect()
@property
def id_name(self):
return self.PREFIX + self.name
def determine_dialect(self):
for dialect, message_set in self.KNOWN_DIALECTS.items():
if self.id_name in message_set:
self.dialect = dialect
break # dialect found, no need to continue searching
else:
self.dialect = MAVLinkDialect.UNKNOWN
def as_tuple(self):
return astuple(self)
def __str__(self):
return f'{self.name:<45}{self.source:<55}{self.dialect}'
@classmethod
def get_unsupported(cls, supported: set, remove_prefix=True):
''' Yields known messages that are not included in 'supported'. '''
offset = len(cls.PREFIX) if remove_prefix else 0
known_missing = set() # don't double-count for supersets
for dialect, message_set in cls.KNOWN_DIALECTS.items():
missing_names = message_set - supported - known_missing
for name in missing_names:
yield cls(name[offset:], 'UNSUPPORTED', dialect)
known_missing |= missing_names
class MAVLinkCommand(MAVLinkMessage):
PREFIX = 'MAV_CMD_'
KNOWN_DIALECTS = MAVLinkMessage._get_known_messages(PREFIX)
@property
def id_name(self):
return self.name # commands are registered with their prefix
@classmethod
def get_unsupported(cls, supported: set, remove_prefix=False):
''' Yields known commands that are not included in 'supported'. '''
# avoid accidentally treating enum values as commands
enums = [f'{e}_' for e in ardupilotmega.enums
if e.startswith(cls.PREFIX)]
for command in super().get_unsupported(supported, remove_prefix):
if not any(command.name.startswith(e) for e in enums):
yield command
class MAVLinkDetector:
# file paths
BASE_DIR = Path(__file__).parent / '../..'
COMMON_FILE = BASE_DIR / 'libraries/GCS_MAVLink/GCS_Common.cpp'
STREAM_GROUP_FILE = 'GCS_MAVLink.cpp'
# regex for messages handled by the autopilot
INCOMING_MESSAGES = re.compile(r'case MAVLINK_MSG_ID_([A-Z0-9_]*)')
# regex for commands handled by the autopilot
INCOMING_COMMANDS = re.compile(r'case (MAV_CMD_[A-Z0-9_]*)')
# regex for messages that can be requested from the autopilot
REQUESTABLE_REGION = re.compile(' map\[\]([^;]*);')
REQUESTABLE_MAP = re.compile(r'MAVLINK_MSG_ID_([A-Z0-9_]*),\s*MSG_([A-Z0-9_]*)')
# regex for messages the autopilot might send, but cannot be requested
OUTGOING_MESSAGES = re.compile(r'mavlink_msg_([a-z0-9_]*)_send\(')
# regex for extracting messages in stream groups
STREAM_GROUPS = re.compile(r'ap_message STREAM_([A-Z0-9_]*)_msgs\[\] = \{([^\}]*)')
AP_MESSAGE = re.compile(r'MSG_([A-Z0-9_]*)')
# regex for named values
NAMED_FLOAT = re.compile(r'send_named_float\("([\w]*)"')
NAMED_INT = re.compile(r'send_named_int\("([\w]*)"')
TYPE_DESCRIPTIONS = {
'incoming_messages':
'Messages the autopilot handles when received.',
'requestable_messages':
'Messages that can be requested/streamed from the autopilot.',
'outgoing_messages':
'Messages the autopilot will send automatically (unrequested).',
'named_floats':
'Breakout of named floating-point (numerical) values sent by the autopilot.',
'named_ints':
'Breakout of named integer values sent by the autopilot.',
}
EXTRA_DESCRIPTIONS = {
'stream_groups':
'Message groups with stream rates requestable by `SRn_*` parameters.'
' Messages in a group are only sent if the corresponding feature'
' is active.',
'missing_messages':
'Unsupported / unhandled messages.',
'incoming_commands':
TYPE_DESCRIPTIONS['incoming_messages'].replace('Messages','Commands'),
}
EXTRA_DESCRIPTIONS['missing_commands'] = \
EXTRA_DESCRIPTIONS['missing_messages'].replace('messages', 'commands')
TYPE_OPTIONS = {
'messages': MAVLinkMessage,
'commands': MAVLinkCommand,
}
MAVLINK_URL = 'https://mavlink.io/en/messages/{dialect}.html#{message_name}'
ARDUPILOT_URL = 'https://github.com/ArduPilot/ardupilot/tree/{branch}/{source}'
EXPORT_FILETYPES = {
'csv': 'csv',
'markdown': 'md'
}
MARKDOWN_INTRO = (
'The [MAVLink](https://mavlink.io/en/) protocol supports a variety'
' of features and functionalities, but not all'
' [messages](https://mavlink.io/en/messages/) or'
' [commands](https://mavlink.io/en/services/command.html)'
' are implemented by the ArduPilot ecosystem, or relevant to a'
' particular autopilot firmware.\n\n'
'This page is auto-generated from analysing the {vehicle} source'
' code, and provides an indication of which messages{commands} are'
' handled by, requestable from, and sent from the firmware. '
'A message being handled does not guarantee full support, but at'
' least shows that the autopilot is aware it exists, and will try'
' to do something meaningful with it.{unsupported}{stream_groups}'
)
VEHICLES = ('AntennaTracker', 'ArduCopter', 'ArduPlane', 'ArduSub', 'Rover')
def __init__(self, common_files, vehicle='ALL',
exclude_libraries=['SITL', 'AP_Scripting']):
self.vehicle = vehicle
vehicles = [vehicle] if vehicle != 'ALL' else self.VEHICLES
files = chain(*((self.BASE_DIR / vehicle).glob('**/*.cpp')
for vehicle in vehicles),
common_files)
self.incoming_messages = {}
self.incoming_commands = {}
self.outgoing_messages = {}
self.requestable_messages = {}
self._ap_to_mavlink = {
'NAMED_FLOAT': 'NAMED_VALUE_FLOAT', # manual inclusion
}
self.named_floats = {}
self.named_ints = {}
for file in files:
folder = file.parent.stem
if folder in exclude_libraries:
continue
text = file.read_text()
source = f'{folder}/{file.name}'
if file == self.COMMON_FILE:
for mavlink, ap_message in self.find_requestable_messages(text):
self.requestable_messages[mavlink] = \
MAVLinkMessage(mavlink, source)
if ap_message != mavlink:
self._ap_to_mavlink[ap_message] = mavlink
named_types = ('float', 'int') if folder in vehicles else ()
for type_ in named_types:
substring = f'named_{type_}s'
method = getattr(self, f'find_{substring}')
names = getattr(self, substring)
new_names = set(method(text)) - names.keys()
for name in new_names:
names[name] = MAVLinkMessage(f'NAMED_VALUE_{type_.upper()}:{name}',
source, MAVLinkDialect.COMMON)
for method, data, type_ in (
(self.find_incoming_messages, self.incoming_messages, 'messages'),
(self.find_incoming_commands, self.incoming_commands, 'commands'),
(self.find_outgoing_messages, self.outgoing_messages, 'messages'),
):
new_data = set(method(text)) - data.keys()
cls = self.TYPE_OPTIONS[type_]
for datum in new_data:
data[datum] = cls(datum, source)
self._supported_names = {'messages': None, 'commands': None}
self._unsupported = self._supported_names.copy()
self._stream_groups = self.get_stream_groups(vehicle) if len(vehicles) == 1 else []
@classmethod
def get_description(cls, query):
return cls.TYPE_DESCRIPTIONS.get(query,
cls.EXTRA_DESCRIPTIONS.get(query, '')
)
@classmethod
def find_incoming_messages(cls, text: str):
return cls.INCOMING_MESSAGES.findall(text)
@classmethod
def find_incoming_commands(cls, text: str):
return cls.INCOMING_COMMANDS.findall(text)
@classmethod
def find_outgoing_messages(cls, text: str):
return (msg.upper() for msg in
cls.OUTGOING_MESSAGES.findall(text))
@classmethod
def find_requestable_messages(cls, text: str):
region = cls.REQUESTABLE_REGION.search(text).group()
return cls.REQUESTABLE_MAP.findall(region)
@classmethod
def find_named_floats(cls, text: str):
return cls.NAMED_FLOAT.findall(text)
@classmethod
def find_named_ints(cls, text: str):
return cls.NAMED_INT.findall(text)
def get_stream_groups(self, vehicle):
stream_groups = ['stream_groups']
text = (self.BASE_DIR / vehicle / self.STREAM_GROUP_FILE).read_text()
for group_name, message_data in self.STREAM_GROUPS.findall(text):
stream_groups.extend(sorted(
MAVLinkMessage(self._ap_to_mavlink.get(ap_message, ap_message),
f'SRn_{group_name}')
for ap_message in self.AP_MESSAGE.findall(message_data)
))
return stream_groups
def get_supported(self, type: str, inject_commands=False):
if type == 'messages':
for message_type in self.TYPE_DESCRIPTIONS:
values = getattr(self, message_type).values()
if not values:
continue
yield message_type
yield from sorted(values)
# add in incoming_commands right after incoming_messages
if inject_commands and message_type == 'incoming_messages':
yield from self.get_supported('commands')
elif type == 'commands':
yield 'incoming_commands'
yield from sorted(self.incoming_commands.values())
def get_supported_names(self, type: str):
if self._supported_names[type] is None:
self._supported_names[type] = set(
m.id_name for m in self.get_supported(type)
if isinstance(m, MAVLinkMessage)
)
return self._supported_names[type]
def get_unsupported(self, type='messages'):
if self._unsupported[type] is None:
supported_messages = self.get_supported_names(type)
cls = self.TYPE_OPTIONS[type]
self._unsupported[type] = sorted(
cls.get_unsupported(supported_messages)
)
if self._unsupported[type]:
yield f'missing_{type}'
yield from self._unsupported[type]
def get_iterable(self, include_commands=False, include_stream_groups=False,
include_unsupported=False):
iterables = [self.get_supported('messages', include_commands)]
if include_stream_groups:
iterables.append(self._stream_groups)
if include_unsupported:
iterables.append(self.get_unsupported('messages'))
if include_commands:
iterables.append(self.get_unsupported('commands'))
return chain(*iterables)
def printout(self, **iter_options):
for data in self.get_iterable(**iter_options):
match data: # requires Python >= 3.10
case str() as type_:
print(f'\n{type_}:',
self.get_description(type_),
sep='\n')
case MAVLinkMessage() as message:
print(message)
def export(self, filename: Path, type='csv', include_commands=False,
include_stream_groups=False, include_unsupported=False,
**export_options):
export_method = getattr(self, f'export_{type}')
# ensure export_method and get_iterable have the same options specified
iter_options = dict(
include_commands = include_commands,
include_stream_groups = include_stream_groups,
include_unsupported = include_unsupported,
)
with open(filename, 'w') as file:
export_method(file, self.get_iterable(**iter_options),
**export_options, **iter_options)
def export_csv(self, file, iterable, **ignore):
file.write('MAVLinkMessage,CodeSource,MAVLinkDialect,MessageType\n')
for data in iterable:
match data:
case str():
current_type = data
case MAVLinkMessage() as message:
print(*message.as_tuple(), current_type, sep=',', file=file)
def export_markdown(self, file, iterable, branch='master', header=None,
use_intro=True, **extra_kwargs):
if header == 'ArduSub':
import time
now = time.strftime('%Y-%m-%dT%H:%M:%S%z')
date = f'{now[:-2]}:{now[-2:]}' # add colon to the timezone
header = '\n'.join((
'+++',
'title = "MAVLink Support"',
'description = "MAVLink message support details."',
f'{date = }', 'template = "docs/page.html"',
'sort_by = "weight"', 'weight = 20', 'draft = false',
'[extra]', 'toc = true', 'top = false',
'+++'
))
if header:
print(header, file=file)
if use_intro:
commands = stream_groups = unsupported = ''
if extra_kwargs['include_commands']:
commands = ' (and commands)'
if extra_kwargs['include_unsupported']:
unsupported = (
'\n\nKnown [unsupported messages](#missing-messages)'
f'{commands} are shown at the end.'
)
if extra_kwargs['include_stream_groups']:
stream_groups = (
'\n\nThe autopilot includes a set of [stream groups]'
'(#stream-groups) for convenience, which allow'
' configuring the stream rates of groups of'
' requestable messages by setting parameter values. '
'It is also possible to manually request messages,'
' and request individual messages be streamed at a'
' specified rate.'
)
vehicle = self.vehicle.replace('ALL', 'ArduPilot')
print(self.MARKDOWN_INTRO.format(
vehicle=vehicle, commands=commands,
stream_groups=stream_groups, unsupported=unsupported
), file=file)
for data in iterable:
match data:
case str() as type_:
heading = type_.title().replace('_', ' ')
source_header = (
'Code Source' if type_ != 'stream_groups' else
'Stream Group Parameter'
)
print(f'## {heading}',
self.get_description(type_),
f'\nMAVLink Message | {source_header} | MAVLink Dialect',
'--- | --- | ---', sep='\n', file=file)
case MAVLinkMessage() as message:
name, source, dialect = message.as_tuple()
if dialect != MAVLinkDialect.UNKNOWN:
msg_url = self.MAVLINK_URL.format(dialect=dialect,
message_name=name.split(':')[0])
name = f'[{name}]({msg_url})'
if source != 'UNSUPPORTED' and not source.startswith('SRn'):
folder = source.split('/')[0]
base = 'libraries/' if folder not in self.VEHICLES else ''
code_url = self.ARDUPILOT_URL.format(branch=branch,
source=base+source)
source = f'[{source}]({code_url})'
print(name, source, dialect, sep=' | ', file=file)
if __name__ == '__main__':
from inspect import signature
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
detector_init_params = signature(MAVLinkDetector.__init__).parameters
default_vehicle = detector_init_params['vehicle'].default
vehicle_options = [default_vehicle, *MAVLinkDetector.VEHICLES]
default_exclusions = detector_init_params['exclude_libraries'].default
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parse_opts = parser.add_argument_group('parsing options')
parse_opts.add_argument('-v', '--vehicle', default=default_vehicle,
choices=vehicle_options, help='Vehicle folder, or ALL.')
parse_opts.add_argument('-e', '--exclude-library', action='append',
default=default_exclusions,
help='Libraries to exclude from the search.')
parse_opts.add_argument('-c', '--include-commands', action='store_true',
help='Include MAVLink commands as well as messages.')
parse_opts.add_argument('-g', '--include-stream-groups', action='store_true',
help='Include stream group message sets in the output.')
parse_opts.add_argument('-u', '--include-unsupported', action='store_true',
help='Include unsupported messages in the output.')
export_opts = parser.add_argument_group('export options')
export_opts.add_argument('-q', '--quiet', action='store_true',
help='Disable printout, only export a file.')
export_opts.add_argument('-f', '--format', default='markdown',
choices=['csv', 'markdown', 'none'],
help='Desired format for the exported file.')
export_opts.add_argument('-b', '--branch',
help=('The branch to link to in markdown mode.'
' Defaults to the branch in the working directory.'))
export_opts.add_argument('--filename', help='Override default filename.')
export_opts.add_argument('--header', help='Header for the markdown file.')
export_opts.add_argument('--no-intro', action='store_true',
help="Flag to not use the automatic markdown intro.")
args = parser.parse_args()
assert (args.vehicle in MAVLinkDetector.VEHICLES
or not args.include_stream_groups), \
'Determining stream groups requires a single vehicle to be specified.'
common_files = (MAVLinkDetector.BASE_DIR / 'libraries').glob('**/*.cpp')
messages = MAVLinkDetector(common_files, args.vehicle, args.exclude_library)
include_options = dict(
include_commands = args.include_commands,
include_stream_groups = args.include_stream_groups,
include_unsupported = args.include_unsupported,
)
if not args.quiet:
messages.printout(**include_options)
if args.format != 'none':
ext = messages.EXPORT_FILETYPES[args.format]
branch = args.branch
if not branch:
import subprocess
pattern = re.compile(r'On branch ([\S]*)')
result = subprocess.run(['git', 'status'], capture_output=True).stdout
try:
branch, = pattern.search(result.decode()).groups()
except AttributeError as e:
raise Exception(
'No --branch specified, and "git status" failed to find one.'
'Please manually specify an ardupilot firmware branch for '
'code source hyperlinks (e.g. Sub-4.1) or ensure this '
'repository copy is managed by git.'
)
filename = (
args.filename or
f'{args.vehicle}_{branch}_MAVLink_Messages.{ext}'
)
messages.export(filename, type=args.format, branch=branch, header=args.header,
use_intro=not args.no_intro, **include_options)