ardupilot/Tools/scripts/mavlink_parse.py

503 lines
21 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python
import re
from enum import StrEnum # requires Python >= 3.11
from pathlib import Path
from itertools import chain
from dataclasses import dataclass, astuple
from pymavlink.dialects.v20 import (
common, icarous, cubepilot, uAvionix, ardupilotmega
)
class MAVLinkDialect(StrEnum):
# in subset, superset, unknown order, for correct links
# supported values must match imported dialect names
COMMON = 'common'
ICAROUS = 'icarous'
CUBEPILOT = 'cubepilot'
UAVIONIX = 'uAvionix'
ARDUPILOTMEGA = 'ardupilotmega'
UNKNOWN = 'UNKNOWN'
@dataclass(slots=True, order=True)
class MAVLinkMessage:
name: str
source: str
dialect: MAVLinkDialect = MAVLinkDialect.UNKNOWN
PREFIX = 'MAVLINK_MSG_ID_'
# Get message sets, for quick containment checks.
# Function is required because of Python's class scoping rules.
# See https://stackoverflow.com/questions/13905741.
def _get_known_messages(prefix):
''' Returns a dictionary of {dialect: {messages}} given 'prefix'. '''
return {
dialect: set(m for m in dir(globals()[dialect])
if m.startswith(prefix))
for dialect in MAVLinkDialect
if dialect != MAVLinkDialect.UNKNOWN
}
KNOWN_DIALECTS = _get_known_messages(PREFIX)
# Try to determine dialect if initialised without one specified.
def __post_init__(self):
if self.dialect == MAVLinkDialect.UNKNOWN:
self.determine_dialect()
@property
def id_name(self):
return self.PREFIX + self.name
def determine_dialect(self):
for dialect, message_set in self.KNOWN_DIALECTS.items():
if self.id_name in message_set:
self.dialect = dialect
break # dialect found, no need to continue searching
else:
self.dialect = MAVLinkDialect.UNKNOWN
def as_tuple(self):
return astuple(self)
def __str__(self):
return f'{self.name:<45}{self.source:<55}{self.dialect}'
@classmethod
def get_unsupported(cls, supported: set, remove_prefix=True):
''' Yields known messages that are not included in 'supported'. '''
offset = len(cls.PREFIX) if remove_prefix else 0
known_missing = set() # don't double-count for supersets
for dialect, message_set in cls.KNOWN_DIALECTS.items():
missing_names = message_set - supported - known_missing
for name in missing_names:
yield cls(name[offset:], 'UNSUPPORTED', dialect)
known_missing |= missing_names
class MAVLinkCommand(MAVLinkMessage):
PREFIX = 'MAV_CMD_'
KNOWN_DIALECTS = MAVLinkMessage._get_known_messages(PREFIX)
@property
def id_name(self):
return self.name # commands are registered with their prefix
@classmethod
def get_unsupported(cls, supported: set, remove_prefix=False):
''' Yields known commands that are not included in 'supported'. '''
# avoid accidentally treating enum values as commands
enums = [f'{e}_' for e in ardupilotmega.enums
if e.startswith(cls.PREFIX)]
for command in super().get_unsupported(supported, remove_prefix):
if not any(command.name.startswith(e) for e in enums):
yield command
class MAVLinkDetector:
# file paths
BASE_DIR = Path(__file__).parent / '../..'
COMMON_FILE = BASE_DIR / 'libraries/GCS_MAVLink/GCS_Common.cpp'
STREAM_GROUP_FILE = 'GCS_MAVLink.cpp'
# regex for messages handled by the autopilot
INCOMING_MESSAGES = re.compile(r'case MAVLINK_MSG_ID_([A-Z0-9_]*)')
# regex for commands handled by the autopilot
INCOMING_COMMANDS = re.compile(r'case (MAV_CMD_[A-Z0-9_]*)')
# regex for messages that can be requested from the autopilot
REQUESTABLE_REGION = re.compile(' map\[\]([^;]*);')
REQUESTABLE_MAP = re.compile(r'MAVLINK_MSG_ID_([A-Z0-9_]*),\s*MSG_([A-Z0-9_]*)')
# regex for messages the autopilot might send, but cannot be requested
OUTGOING_MESSAGES = re.compile(r'mavlink_msg_([a-z0-9_]*)_send\(')
# regex for extracting messages in stream groups
STREAM_GROUPS = re.compile(r'ap_message STREAM_([A-Z0-9_]*)_msgs\[\] = \{([^\}]*)')
AP_MESSAGE = re.compile(r'MSG_([A-Z0-9_]*)')
# regex for named values
NAMED_FLOAT = re.compile(r'send_named_float\("([\w]*)"')
NAMED_INT = re.compile(r'send_named_int\("([\w]*)"')
TYPE_DESCRIPTIONS = {
'incoming_messages':
'Messages the autopilot handles when received.',
'requestable_messages':
'Messages that can be requested/streamed from the autopilot.',
'outgoing_messages':
'Messages the autopilot will send automatically (unrequested).',
'named_floats':
'Breakout of named floating-point (numerical) values sent by the autopilot.',
'named_ints':
'Breakout of named integer values sent by the autopilot.',
}
EXTRA_DESCRIPTIONS = {
'stream_groups':
'Message groups with stream rates requestable by `SRn_*` parameters.'
' Messages in a group are only sent if the corresponding feature'
' is active.',
'missing_messages':
'Unsupported / unhandled messages.',
'incoming_commands':
TYPE_DESCRIPTIONS['incoming_messages'].replace('Messages','Commands'),
}
EXTRA_DESCRIPTIONS['missing_commands'] = \
EXTRA_DESCRIPTIONS['missing_messages'].replace('messages', 'commands')
TYPE_OPTIONS = {
'messages': MAVLinkMessage,
'commands': MAVLinkCommand,
}
MAVLINK_URL = 'https://mavlink.io/en/messages/{dialect}.html#{message_name}'
ARDUPILOT_URL = 'https://github.com/ArduPilot/ardupilot/tree/{branch}/{source}'
EXPORT_FILETYPES = {
'csv': 'csv',
'markdown': 'md'
}
MARKDOWN_INTRO = (
'The [MAVLink](https://mavlink.io/en/) protocol supports a variety'
' of features and functionalities, but not all'
' [messages](https://mavlink.io/en/messages/) or'
' [commands](https://mavlink.io/en/services/command.html)'
' are implemented by the ArduPilot ecosystem, or relevant to a'
' particular autopilot firmware.\n\n'
'This page is auto-generated from analysing the {vehicle} source'
' code, and provides an indication of which messages{commands} are'
' handled by, requestable from, and sent from the firmware. '
'A message being handled does not guarantee full support, but at'
' least shows that the autopilot is aware it exists, and will try'
' to do something meaningful with it.{unsupported}{stream_groups}'
)
VEHICLES = ('AntennaTracker', 'ArduCopter', 'ArduPlane', 'ArduSub', 'Rover')
def __init__(self, common_files, vehicle='ALL',
exclude_libraries=['SITL', 'AP_Scripting']):
self.vehicle = vehicle
vehicles = [vehicle] if vehicle != 'ALL' else self.VEHICLES
files = chain(*((self.BASE_DIR / vehicle).glob('**/*.cpp')
for vehicle in vehicles),
common_files)
self.incoming_messages = {}
self.incoming_commands = {}
self.outgoing_messages = {}
self.requestable_messages = {}
self._ap_to_mavlink = {
'NAMED_FLOAT': 'NAMED_VALUE_FLOAT', # manual inclusion
}
self.named_floats = {}
self.named_ints = {}
for file in files:
folder = file.parent.stem
if folder in exclude_libraries:
continue
text = file.read_text()
source = f'{folder}/{file.name}'
if file == self.COMMON_FILE:
for mavlink, ap_message in self.find_requestable_messages(text):
self.requestable_messages[mavlink] = \
MAVLinkMessage(mavlink, source)
if ap_message != mavlink:
self._ap_to_mavlink[ap_message] = mavlink
named_types = ('float', 'int') if folder in vehicles else ()
for type_ in named_types:
substring = f'named_{type_}s'
method = getattr(self, f'find_{substring}')
names = getattr(self, substring)
new_names = set(method(text)) - names.keys()
for name in new_names:
names[name] = MAVLinkMessage(f'NAMED_VALUE_{type_.upper()}:{name}',
source, MAVLinkDialect.COMMON)
for method, data, type_ in (
(self.find_incoming_messages, self.incoming_messages, 'messages'),
(self.find_incoming_commands, self.incoming_commands, 'commands'),
(self.find_outgoing_messages, self.outgoing_messages, 'messages'),
):
new_data = set(method(text)) - data.keys()
cls = self.TYPE_OPTIONS[type_]
for datum in new_data:
data[datum] = cls(datum, source)
self._supported_names = {'messages': None, 'commands': None}
self._unsupported = self._supported_names.copy()
self._stream_groups = self.get_stream_groups(vehicle) if len(vehicles) == 1 else []
@classmethod
def get_description(cls, query):
return cls.TYPE_DESCRIPTIONS.get(query,
cls.EXTRA_DESCRIPTIONS.get(query, '')
)
@classmethod
def find_incoming_messages(cls, text: str):
return cls.INCOMING_MESSAGES.findall(text)
@classmethod
def find_incoming_commands(cls, text: str):
return cls.INCOMING_COMMANDS.findall(text)
@classmethod
def find_outgoing_messages(cls, text: str):
return (msg.upper() for msg in
cls.OUTGOING_MESSAGES.findall(text))
@classmethod
def find_requestable_messages(cls, text: str):
region = cls.REQUESTABLE_REGION.search(text).group()
return cls.REQUESTABLE_MAP.findall(region)
@classmethod
def find_named_floats(cls, text: str):
return cls.NAMED_FLOAT.findall(text)
@classmethod
def find_named_ints(cls, text: str):
return cls.NAMED_INT.findall(text)
def get_stream_groups(self, vehicle):
stream_groups = ['stream_groups']
text = (self.BASE_DIR / vehicle / self.STREAM_GROUP_FILE).read_text()
for group_name, message_data in self.STREAM_GROUPS.findall(text):
stream_groups.extend(sorted(
MAVLinkMessage(self._ap_to_mavlink.get(ap_message, ap_message),
f'SRn_{group_name}')
for ap_message in self.AP_MESSAGE.findall(message_data)
))
return stream_groups
def get_supported(self, type: str, inject_commands=False):
if type == 'messages':
for message_type in self.TYPE_DESCRIPTIONS:
values = getattr(self, message_type).values()
if not values:
continue
yield message_type
yield from sorted(values)
# add in incoming_commands right after incoming_messages
if inject_commands and message_type == 'incoming_messages':
yield from self.get_supported('commands')
elif type == 'commands':
yield 'incoming_commands'
yield from sorted(self.incoming_commands.values())
def get_supported_names(self, type: str):
if self._supported_names[type] is None:
self._supported_names[type] = set(
m.id_name for m in self.get_supported(type)
if isinstance(m, MAVLinkMessage)
)
return self._supported_names[type]
def get_unsupported(self, type='messages'):
if self._unsupported[type] is None:
supported_messages = self.get_supported_names(type)
cls = self.TYPE_OPTIONS[type]
self._unsupported[type] = sorted(
cls.get_unsupported(supported_messages)
)
if self._unsupported[type]:
yield f'missing_{type}'
yield from self._unsupported[type]
def get_iterable(self, include_commands=False, include_stream_groups=False,
include_unsupported=False):
iterables = [self.get_supported('messages', include_commands)]
if include_stream_groups:
iterables.append(self._stream_groups)
if include_unsupported:
iterables.append(self.get_unsupported('messages'))
if include_commands:
iterables.append(self.get_unsupported('commands'))
return chain(*iterables)
def printout(self, **iter_options):
for data in self.get_iterable(**iter_options):
match data: # requires Python >= 3.10
case str() as type_:
print(f'\n{type_}:',
self.get_description(type_),
sep='\n')
case MAVLinkMessage() as message:
print(message)
def export(self, filename: Path, type='csv', include_commands=False,
include_stream_groups=False, include_unsupported=False,
**export_options):
export_method = getattr(self, f'export_{type}')
# ensure export_method and get_iterable have the same options specified
iter_options = dict(
include_commands = include_commands,
include_stream_groups = include_stream_groups,
include_unsupported = include_unsupported,
)
with open(filename, 'w') as file:
export_method(file, self.get_iterable(**iter_options),
**export_options, **iter_options)
def export_csv(self, file, iterable, **ignore):
file.write('MAVLinkMessage,CodeSource,MAVLinkDialect,MessageType\n')
for data in iterable:
match data:
case str():
current_type = data
case MAVLinkMessage() as message:
print(*message.as_tuple(), current_type, sep=',', file=file)
def export_markdown(self, file, iterable, branch='master', header=None,
use_intro=True, **extra_kwargs):
if header == 'ArduSub':
import time
now = time.strftime('%Y-%m-%dT%H:%M:%S%z')
date = f'{now[:-2]}:{now[-2:]}' # add colon to the timezone
header = '\n'.join((
'+++',
'title = "MAVLink Support"',
'description = "MAVLink message support details."',
f'{date = }', 'template = "docs/page.html"',
'sort_by = "weight"', 'weight = 20', 'draft = false',
'[extra]', 'toc = true', 'top = false',
'+++'
))
if header:
print(header, file=file)
if use_intro:
commands = stream_groups = unsupported = ''
if extra_kwargs['include_commands']:
commands = ' (and commands)'
if extra_kwargs['include_unsupported']:
unsupported = (
'\n\nKnown [unsupported messages](#missing-messages)'
f'{commands} are shown at the end.'
)
if extra_kwargs['include_stream_groups']:
stream_groups = (
'\n\nThe autopilot includes a set of [stream groups]'
'(#stream-groups) for convenience, which allow'
' configuring the stream rates of groups of'
' requestable messages by setting parameter values. '
'It is also possible to manually request messages,'
' and request individual messages be streamed at a'
' specified rate.'
)
vehicle = self.vehicle.replace('ALL', 'ArduPilot')
print(self.MARKDOWN_INTRO.format(
vehicle=vehicle, commands=commands,
stream_groups=stream_groups, unsupported=unsupported
), file=file)
for data in iterable:
match data:
case str() as type_:
heading = type_.title().replace('_', ' ')
source_header = (
'Code Source' if type_ != 'stream_groups' else
'Stream Group Parameter'
)
print(f'## {heading}',
self.get_description(type_),
f'\nMAVLink Message | {source_header} | MAVLink Dialect',
'--- | --- | ---', sep='\n', file=file)
case MAVLinkMessage() as message:
name, source, dialect = message.as_tuple()
if dialect != MAVLinkDialect.UNKNOWN:
msg_url = self.MAVLINK_URL.format(dialect=dialect,
message_name=name.split(':')[0])
name = f'[{name}]({msg_url})'
if source != 'UNSUPPORTED' and not source.startswith('SRn'):
folder = source.split('/')[0]
base = 'libraries/' if folder not in self.VEHICLES else ''
code_url = self.ARDUPILOT_URL.format(branch=branch,
source=base+source)
source = f'[{source}]({code_url})'
print(name, source, dialect, sep=' | ', file=file)
if __name__ == '__main__':
from inspect import signature
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
detector_init_params = signature(MAVLinkDetector.__init__).parameters
default_vehicle = detector_init_params['vehicle'].default
vehicle_options = [default_vehicle, *MAVLinkDetector.VEHICLES]
default_exclusions = detector_init_params['exclude_libraries'].default
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parse_opts = parser.add_argument_group('parsing options')
parse_opts.add_argument('-v', '--vehicle', default=default_vehicle,
choices=vehicle_options, help='Vehicle folder, or ALL.')
parse_opts.add_argument('-e', '--exclude-library', action='append',
default=default_exclusions,
help='Libraries to exclude from the search.')
parse_opts.add_argument('-c', '--include-commands', action='store_true',
help='Include MAVLink commands as well as messages.')
parse_opts.add_argument('-g', '--include-stream-groups', action='store_true',
help='Include stream group message sets in the output.')
parse_opts.add_argument('-u', '--include-unsupported', action='store_true',
help='Include unsupported messages in the output.')
export_opts = parser.add_argument_group('export options')
export_opts.add_argument('-q', '--quiet', action='store_true',
help='Disable printout, only export a file.')
export_opts.add_argument('-f', '--format', default='markdown',
choices=['csv', 'markdown', 'none'],
help='Desired format for the exported file.')
export_opts.add_argument('-b', '--branch',
help=('The branch to link to in markdown mode.'
' Defaults to the branch in the working directory.'))
export_opts.add_argument('--filename', help='Override default filename.')
export_opts.add_argument('--header', help='Header for the markdown file.')
export_opts.add_argument('--no-intro', action='store_true',
help="Flag to not use the automatic markdown intro.")
args = parser.parse_args()
assert (args.vehicle in MAVLinkDetector.VEHICLES
or not args.include_stream_groups), \
'Determining stream groups requires a single vehicle to be specified.'
common_files = (MAVLinkDetector.BASE_DIR / 'libraries').glob('**/*.cpp')
messages = MAVLinkDetector(common_files, args.vehicle, args.exclude_library)
include_options = dict(
include_commands = args.include_commands,
include_stream_groups = args.include_stream_groups,
include_unsupported = args.include_unsupported,
)
if not args.quiet:
messages.printout(**include_options)
if args.format != 'none':
ext = messages.EXPORT_FILETYPES[args.format]
branch = args.branch
if not branch:
import subprocess
pattern = re.compile(r'On branch ([\S]*)')
result = subprocess.run(['git', 'status'], capture_output=True).stdout
try:
branch, = pattern.search(result.decode()).groups()
except AttributeError as e:
raise Exception(
'No --branch specified, and "git status" failed to find one.'
'Please manually specify an ardupilot firmware branch for '
'code source hyperlinks (e.g. Sub-4.1) or ensure this '
'repository copy is managed by git.'
)
filename = (
args.filename or
f'{args.vehicle}_{branch}_MAVLink_Messages.{ext}'
)
messages.export(filename, type=args.format, branch=branch, header=args.header,
use_intro=not args.no_intro, **include_options)