diff --git a/Tools/scripts/mavlink_parse.py b/Tools/scripts/mavlink_parse.py new file mode 100755 index 0000000000..165111125f --- /dev/null +++ b/Tools/scripts/mavlink_parse.py @@ -0,0 +1,502 @@ +#!/usr/bin/env python + +import re +from enum import StrEnum # requires Python >= 3.11 +from pathlib import Path +from itertools import chain +from dataclasses import dataclass, astuple + +from pymavlink.dialects.v20 import ( + common, icarous, cubepilot, uAvionix, ardupilotmega +) + +class MAVLinkDialect(StrEnum): + # in subset, superset, unknown order, for correct links + # supported values must match imported dialect names + COMMON = 'common' + ICAROUS = 'icarous' + CUBEPILOT = 'cubepilot' + UAVIONIX = 'uAvionix' + ARDUPILOTMEGA = 'ardupilotmega' + UNKNOWN = 'UNKNOWN' + + +@dataclass(slots=True, order=True) +class MAVLinkMessage: + name: str + source: str + dialect: MAVLinkDialect = MAVLinkDialect.UNKNOWN + + PREFIX = 'MAVLINK_MSG_ID_' + + # Get message sets, for quick containment checks. + # Function is required because of Python's class scoping rules. + # See https://stackoverflow.com/questions/13905741. + def _get_known_messages(prefix): + ''' Returns a dictionary of {dialect: {messages}} given 'prefix'. ''' + return { + dialect: set(m for m in dir(globals()[dialect]) + if m.startswith(prefix)) + for dialect in MAVLinkDialect + if dialect != MAVLinkDialect.UNKNOWN + } + KNOWN_DIALECTS = _get_known_messages(PREFIX) + + # Try to determine dialect if initialised without one specified. + def __post_init__(self): + if self.dialect == MAVLinkDialect.UNKNOWN: + self.determine_dialect() + + @property + def id_name(self): + return self.PREFIX + self.name + + def determine_dialect(self): + for dialect, message_set in self.KNOWN_DIALECTS.items(): + if self.id_name in message_set: + self.dialect = dialect + break # dialect found, no need to continue searching + else: + self.dialect = MAVLinkDialect.UNKNOWN + + def as_tuple(self): + return astuple(self) + + def __str__(self): + return f'{self.name:<45}{self.source:<55}{self.dialect}' + + @classmethod + def get_unsupported(cls, supported: set, remove_prefix=True): + ''' Yields known messages that are not included in 'supported'. ''' + offset = len(cls.PREFIX) if remove_prefix else 0 + known_missing = set() # don't double-count for supersets + for dialect, message_set in cls.KNOWN_DIALECTS.items(): + missing_names = message_set - supported - known_missing + for name in missing_names: + yield cls(name[offset:], 'UNSUPPORTED', dialect) + known_missing |= missing_names + + +class MAVLinkCommand(MAVLinkMessage): + PREFIX = 'MAV_CMD_' + KNOWN_DIALECTS = MAVLinkMessage._get_known_messages(PREFIX) + + @property + def id_name(self): + return self.name # commands are registered with their prefix + + @classmethod + def get_unsupported(cls, supported: set, remove_prefix=False): + ''' Yields known commands that are not included in 'supported'. ''' + # avoid accidentally treating enum values as commands + enums = [f'{e}_' for e in ardupilotmega.enums + if e.startswith(cls.PREFIX)] + for command in super().get_unsupported(supported, remove_prefix): + if not any(command.name.startswith(e) for e in enums): + yield command + + +class MAVLinkDetector: + # file paths + BASE_DIR = Path(__file__).parent / '../..' + COMMON_FILE = BASE_DIR / 'libraries/GCS_MAVLink/GCS_Common.cpp' + STREAM_GROUP_FILE = 'GCS_MAVLink.cpp' + + # regex for messages handled by the autopilot + INCOMING_MESSAGES = re.compile(r'case MAVLINK_MSG_ID_([A-Z0-9_]*)') + # regex for commands handled by the autopilot + INCOMING_COMMANDS = re.compile(r'case (MAV_CMD_[A-Z0-9_]*)') + # regex for messages that can be requested from the autopilot + REQUESTABLE_REGION = re.compile(' map\[\]([^;]*);') + REQUESTABLE_MAP = re.compile(r'MAVLINK_MSG_ID_([A-Z0-9_]*),\s*MSG_([A-Z0-9_]*)') + # regex for messages the autopilot might send, but cannot be requested + OUTGOING_MESSAGES = re.compile(r'mavlink_msg_([a-z0-9_]*)_send\(') + # regex for extracting messages in stream groups + STREAM_GROUPS = re.compile(r'ap_message STREAM_([A-Z0-9_]*)_msgs\[\] = \{([^\}]*)') + AP_MESSAGE = re.compile(r'MSG_([A-Z0-9_]*)') + # regex for named values + NAMED_FLOAT = re.compile(r'send_named_float\("([\w]*)"') + NAMED_INT = re.compile(r'send_named_int\("([\w]*)"') + + TYPE_DESCRIPTIONS = { + 'incoming_messages': + 'Messages the autopilot handles when received.', + 'requestable_messages': + 'Messages that can be requested/streamed from the autopilot.', + 'outgoing_messages': + 'Messages the autopilot will send automatically (unrequested).', + 'named_floats': + 'Breakout of named floating-point (numerical) values sent by the autopilot.', + 'named_ints': + 'Breakout of named integer values sent by the autopilot.', + } + EXTRA_DESCRIPTIONS = { + 'stream_groups': + 'Message groups with stream rates requestable by `SRn_*` parameters.' + ' Messages in a group are only sent if the corresponding feature' + ' is active.', + 'missing_messages': + 'Unsupported / unhandled messages.', + 'incoming_commands': + TYPE_DESCRIPTIONS['incoming_messages'].replace('Messages','Commands'), + } + EXTRA_DESCRIPTIONS['missing_commands'] = \ + EXTRA_DESCRIPTIONS['missing_messages'].replace('messages', 'commands') + + TYPE_OPTIONS = { + 'messages': MAVLinkMessage, + 'commands': MAVLinkCommand, + } + + MAVLINK_URL = 'https://mavlink.io/en/messages/{dialect}.html#{message_name}' + ARDUPILOT_URL = 'https://github.com/ArduPilot/ardupilot/tree/{branch}/{source}' + EXPORT_FILETYPES = { + 'csv': 'csv', + 'markdown': 'md' + } + + MARKDOWN_INTRO = ( + 'The [MAVLink](https://mavlink.io/en/) protocol supports a variety' + ' of features and functionalities, but not all' + ' [messages](https://mavlink.io/en/messages/) or' + ' [commands](https://mavlink.io/en/services/command.html)' + ' are implemented by the ArduPilot ecosystem, or relevant to a' + ' particular autopilot firmware.\n\n' + 'This page is auto-generated from analysing the {vehicle} source' + ' code, and provides an indication of which messages{commands} are' + ' handled by, requestable from, and sent from the firmware. ' + 'A message being handled does not guarantee full support, but at' + ' least shows that the autopilot is aware it exists, and will try' + ' to do something meaningful with it.{unsupported}{stream_groups}' + ) + + VEHICLES = ('AntennaTracker', 'ArduCopter', 'ArduPlane', 'ArduSub', 'Rover') + + def __init__(self, common_files, vehicle='ALL', + exclude_libraries=['SITL', 'AP_Scripting']): + self.vehicle = vehicle + vehicles = [vehicle] if vehicle != 'ALL' else self.VEHICLES + files = chain(*((self.BASE_DIR / vehicle).glob('**/*.cpp') + for vehicle in vehicles), + common_files) + self.incoming_messages = {} + self.incoming_commands = {} + self.outgoing_messages = {} + self.requestable_messages = {} + self._ap_to_mavlink = { + 'NAMED_FLOAT': 'NAMED_VALUE_FLOAT', # manual inclusion + } + self.named_floats = {} + self.named_ints = {} + + for file in files: + folder = file.parent.stem + if folder in exclude_libraries: + continue + text = file.read_text() + source = f'{folder}/{file.name}' + if file == self.COMMON_FILE: + for mavlink, ap_message in self.find_requestable_messages(text): + self.requestable_messages[mavlink] = \ + MAVLinkMessage(mavlink, source) + if ap_message != mavlink: + self._ap_to_mavlink[ap_message] = mavlink + + named_types = ('float', 'int') if folder in vehicles else () + for type_ in named_types: + substring = f'named_{type_}s' + method = getattr(self, f'find_{substring}') + names = getattr(self, substring) + new_names = set(method(text)) - names.keys() + for name in new_names: + names[name] = MAVLinkMessage(f'NAMED_VALUE_{type_.upper()}:{name}', + source, MAVLinkDialect.COMMON) + + for method, data, type_ in ( + (self.find_incoming_messages, self.incoming_messages, 'messages'), + (self.find_incoming_commands, self.incoming_commands, 'commands'), + (self.find_outgoing_messages, self.outgoing_messages, 'messages'), + ): + new_data = set(method(text)) - data.keys() + cls = self.TYPE_OPTIONS[type_] + for datum in new_data: + data[datum] = cls(datum, source) + + self._supported_names = {'messages': None, 'commands': None} + self._unsupported = self._supported_names.copy() + + self._stream_groups = self.get_stream_groups(vehicle) if len(vehicles) == 1 else [] + + @classmethod + def get_description(cls, query): + return cls.TYPE_DESCRIPTIONS.get(query, + cls.EXTRA_DESCRIPTIONS.get(query, '') + ) + + @classmethod + def find_incoming_messages(cls, text: str): + return cls.INCOMING_MESSAGES.findall(text) + + @classmethod + def find_incoming_commands(cls, text: str): + return cls.INCOMING_COMMANDS.findall(text) + + @classmethod + def find_outgoing_messages(cls, text: str): + return (msg.upper() for msg in + cls.OUTGOING_MESSAGES.findall(text)) + + @classmethod + def find_requestable_messages(cls, text: str): + region = cls.REQUESTABLE_REGION.search(text).group() + return cls.REQUESTABLE_MAP.findall(region) + + @classmethod + def find_named_floats(cls, text: str): + return cls.NAMED_FLOAT.findall(text) + + @classmethod + def find_named_ints(cls, text: str): + return cls.NAMED_INT.findall(text) + + def get_stream_groups(self, vehicle): + stream_groups = ['stream_groups'] + + text = (self.BASE_DIR / vehicle / self.STREAM_GROUP_FILE).read_text() + for group_name, message_data in self.STREAM_GROUPS.findall(text): + stream_groups.extend(sorted( + MAVLinkMessage(self._ap_to_mavlink.get(ap_message, ap_message), + f'SRn_{group_name}') + for ap_message in self.AP_MESSAGE.findall(message_data) + )) + + return stream_groups + + def get_supported(self, type: str, inject_commands=False): + if type == 'messages': + for message_type in self.TYPE_DESCRIPTIONS: + values = getattr(self, message_type).values() + if not values: + continue + yield message_type + yield from sorted(values) + # add in incoming_commands right after incoming_messages + if inject_commands and message_type == 'incoming_messages': + yield from self.get_supported('commands') + elif type == 'commands': + yield 'incoming_commands' + yield from sorted(self.incoming_commands.values()) + + def get_supported_names(self, type: str): + if self._supported_names[type] is None: + self._supported_names[type] = set( + m.id_name for m in self.get_supported(type) + if isinstance(m, MAVLinkMessage) + ) + return self._supported_names[type] + + def get_unsupported(self, type='messages'): + if self._unsupported[type] is None: + supported_messages = self.get_supported_names(type) + cls = self.TYPE_OPTIONS[type] + self._unsupported[type] = sorted( + cls.get_unsupported(supported_messages) + ) + + if self._unsupported[type]: + yield f'missing_{type}' + yield from self._unsupported[type] + + def get_iterable(self, include_commands=False, include_stream_groups=False, + include_unsupported=False): + iterables = [self.get_supported('messages', include_commands)] + if include_stream_groups: + iterables.append(self._stream_groups) + if include_unsupported: + iterables.append(self.get_unsupported('messages')) + if include_commands: + iterables.append(self.get_unsupported('commands')) + return chain(*iterables) + + def printout(self, **iter_options): + for data in self.get_iterable(**iter_options): + match data: # requires Python >= 3.10 + case str() as type_: + print(f'\n{type_}:', + self.get_description(type_), + sep='\n') + case MAVLinkMessage() as message: + print(message) + + def export(self, filename: Path, type='csv', include_commands=False, + include_stream_groups=False, include_unsupported=False, + **export_options): + export_method = getattr(self, f'export_{type}') + # ensure export_method and get_iterable have the same options specified + iter_options = dict( + include_commands = include_commands, + include_stream_groups = include_stream_groups, + include_unsupported = include_unsupported, + ) + with open(filename, 'w') as file: + export_method(file, self.get_iterable(**iter_options), + **export_options, **iter_options) + + def export_csv(self, file, iterable, **ignore): + file.write('MAVLinkMessage,CodeSource,MAVLinkDialect,MessageType\n') + for data in iterable: + match data: + case str(): + current_type = data + case MAVLinkMessage() as message: + print(*message.as_tuple(), current_type, sep=',', file=file) + + def export_markdown(self, file, iterable, branch='master', header=None, + use_intro=True, **extra_kwargs): + if header == 'ArduSub': + import time + now = time.strftime('%Y-%m-%dT%H:%M:%S%z') + date = f'{now[:-2]}:{now[-2:]}' # add colon to the timezone + header = '\n'.join(( + '+++', + 'title = "MAVLink Support"', + 'description = "MAVLink message support details."', + f'{date = }', 'template = "docs/page.html"', + 'sort_by = "weight"', 'weight = 20', 'draft = false', + '[extra]', 'toc = true', 'top = false', + '+++' + )) + if header: + print(header, file=file) + + if use_intro: + commands = stream_groups = unsupported = '' + if extra_kwargs['include_commands']: + commands = ' (and commands)' + if extra_kwargs['include_unsupported']: + unsupported = ( + '\n\nKnown [unsupported messages](#missing-messages)' + f'{commands} are shown at the end.' + ) + if extra_kwargs['include_stream_groups']: + stream_groups = ( + '\n\nThe autopilot includes a set of [stream groups]' + '(#stream-groups) for convenience, which allow' + ' configuring the stream rates of groups of' + ' requestable messages by setting parameter values. ' + 'It is also possible to manually request messages,' + ' and request individual messages be streamed at a' + ' specified rate.' + ) + vehicle = self.vehicle.replace('ALL', 'ArduPilot') + + print(self.MARKDOWN_INTRO.format( + vehicle=vehicle, commands=commands, + stream_groups=stream_groups, unsupported=unsupported + ), file=file) + + for data in iterable: + match data: + case str() as type_: + heading = type_.title().replace('_', ' ') + source_header = ( + 'Code Source' if type_ != 'stream_groups' else + 'Stream Group Parameter' + ) + print(f'## {heading}', + self.get_description(type_), + f'\nMAVLink Message | {source_header} | MAVLink Dialect', + '--- | --- | ---', sep='\n', file=file) + case MAVLinkMessage() as message: + name, source, dialect = message.as_tuple() + if dialect != MAVLinkDialect.UNKNOWN: + msg_url = self.MAVLINK_URL.format(dialect=dialect, + message_name=name.split(':')[0]) + name = f'[{name}]({msg_url})' + if source != 'UNSUPPORTED' and not source.startswith('SRn'): + folder = source.split('/')[0] + base = 'libraries/' if folder not in self.VEHICLES else '' + code_url = self.ARDUPILOT_URL.format(branch=branch, + source=base+source) + source = f'[{source}]({code_url})' + + print(name, source, dialect, sep=' | ', file=file) + + +if __name__ == '__main__': + from inspect import signature + from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter + + detector_init_params = signature(MAVLinkDetector.__init__).parameters + default_vehicle = detector_init_params['vehicle'].default + vehicle_options = [default_vehicle, *MAVLinkDetector.VEHICLES] + default_exclusions = detector_init_params['exclude_libraries'].default + + parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) + parse_opts = parser.add_argument_group('parsing options') + parse_opts.add_argument('-v', '--vehicle', default=default_vehicle, + choices=vehicle_options, help='Vehicle folder, or ALL.') + parse_opts.add_argument('-e', '--exclude-library', action='append', + default=default_exclusions, + help='Libraries to exclude from the search.') + parse_opts.add_argument('-c', '--include-commands', action='store_true', + help='Include MAVLink commands as well as messages.') + parse_opts.add_argument('-g', '--include-stream-groups', action='store_true', + help='Include stream group message sets in the output.') + parse_opts.add_argument('-u', '--include-unsupported', action='store_true', + help='Include unsupported messages in the output.') + export_opts = parser.add_argument_group('export options') + export_opts.add_argument('-q', '--quiet', action='store_true', + help='Disable printout, only export a file.') + export_opts.add_argument('-f', '--format', default='markdown', + choices=['csv', 'markdown', 'none'], + help='Desired format for the exported file.') + export_opts.add_argument('-b', '--branch', + help=('The branch to link to in markdown mode.' + ' Defaults to the branch in the working directory.')) + export_opts.add_argument('--filename', help='Override default filename.') + export_opts.add_argument('--header', help='Header for the markdown file.') + export_opts.add_argument('--no-intro', action='store_true', + help="Flag to not use the automatic markdown intro.") + + args = parser.parse_args() + + assert (args.vehicle in MAVLinkDetector.VEHICLES + or not args.include_stream_groups), \ + 'Determining stream groups requires a single vehicle to be specified.' + + common_files = (MAVLinkDetector.BASE_DIR / 'libraries').glob('**/*.cpp') + messages = MAVLinkDetector(common_files, args.vehicle, args.exclude_library) + + include_options = dict( + include_commands = args.include_commands, + include_stream_groups = args.include_stream_groups, + include_unsupported = args.include_unsupported, + ) + + if not args.quiet: + messages.printout(**include_options) + if args.format != 'none': + ext = messages.EXPORT_FILETYPES[args.format] + branch = args.branch + if not branch: + import subprocess + pattern = re.compile(r'On branch ([\S]*)') + result = subprocess.run(['git', 'status'], capture_output=True).stdout + try: + branch, = pattern.search(result.decode()).groups() + except AttributeError as e: + raise Exception( + 'No --branch specified, and "git status" failed to find one.' + 'Please manually specify an ardupilot firmware branch for ' + 'code source hyperlinks (e.g. Sub-4.1) or ensure this ' + 'repository copy is managed by git.' + ) + + filename = ( + args.filename or + f'{args.vehicle}_{branch}_MAVLink_Messages.{ext}' + ) + + messages.export(filename, type=args.format, branch=branch, header=args.header, + use_intro=not args.no_intro, **include_options)