#!/usr/bin/env python import re from enum import StrEnum # requires Python >= 3.11 from pathlib import Path from itertools import chain from dataclasses import dataclass, astuple from pymavlink.dialects.v20 import ( common, icarous, cubepilot, uAvionix, ardupilotmega ) class MAVLinkDialect(StrEnum): # in subset, superset, unknown order, for correct links # supported values must match imported dialect names COMMON = 'common' ICAROUS = 'icarous' CUBEPILOT = 'cubepilot' UAVIONIX = 'uAvionix' ARDUPILOTMEGA = 'ardupilotmega' UNKNOWN = 'UNKNOWN' @dataclass(slots=True, order=True) class MAVLinkMessage: name: str source: str dialect: MAVLinkDialect = MAVLinkDialect.UNKNOWN PREFIX = 'MAVLINK_MSG_ID_' # Get message sets, for quick containment checks. # Function is required because of Python's class scoping rules. # See https://stackoverflow.com/questions/13905741. def _get_known_messages(prefix): ''' Returns a dictionary of {dialect: {messages}} given 'prefix'. ''' return { dialect: set(m for m in dir(globals()[dialect]) if m.startswith(prefix)) for dialect in MAVLinkDialect if dialect != MAVLinkDialect.UNKNOWN } KNOWN_DIALECTS = _get_known_messages(PREFIX) # Try to determine dialect if initialised without one specified. def __post_init__(self): if self.dialect == MAVLinkDialect.UNKNOWN: self.determine_dialect() @property def id_name(self): return self.PREFIX + self.name def determine_dialect(self): for dialect, message_set in self.KNOWN_DIALECTS.items(): if self.id_name in message_set: self.dialect = dialect break # dialect found, no need to continue searching else: self.dialect = MAVLinkDialect.UNKNOWN def as_tuple(self): return astuple(self) def __str__(self): return f'{self.name:<45}{self.source:<55}{self.dialect}' @classmethod def get_unsupported(cls, supported: set, remove_prefix=True): ''' Yields known messages that are not included in 'supported'. ''' offset = len(cls.PREFIX) if remove_prefix else 0 known_missing = set() # don't double-count for supersets for dialect, message_set in cls.KNOWN_DIALECTS.items(): missing_names = message_set - supported - known_missing for name in missing_names: yield cls(name[offset:], 'UNSUPPORTED', dialect) known_missing |= missing_names class MAVLinkCommand(MAVLinkMessage): PREFIX = 'MAV_CMD_' KNOWN_DIALECTS = MAVLinkMessage._get_known_messages(PREFIX) @property def id_name(self): return self.name # commands are registered with their prefix @classmethod def get_unsupported(cls, supported: set, remove_prefix=False): ''' Yields known commands that are not included in 'supported'. ''' # avoid accidentally treating enum values as commands enums = [f'{e}_' for e in ardupilotmega.enums if e.startswith(cls.PREFIX)] for command in super().get_unsupported(supported, remove_prefix): if not any(command.name.startswith(e) for e in enums): yield command class MAVLinkDetector: # file paths BASE_DIR = Path(__file__).parent / '../..' COMMON_FILE = BASE_DIR / 'libraries/GCS_MAVLink/GCS_Common.cpp' STREAM_GROUP_FILE = 'GCS_MAVLink.cpp' # regex for messages handled by the autopilot INCOMING_MESSAGES = re.compile(r'case MAVLINK_MSG_ID_([A-Z0-9_]*)') # regex for commands handled by the autopilot INCOMING_COMMANDS = re.compile(r'case (MAV_CMD_[A-Z0-9_]*)') # regex for messages that can be requested from the autopilot REQUESTABLE_REGION = re.compile(' map\[\]([^;]*);') REQUESTABLE_MAP = re.compile(r'MAVLINK_MSG_ID_([A-Z0-9_]*),\s*MSG_([A-Z0-9_]*)') # regex for messages the autopilot might send, but cannot be requested OUTGOING_MESSAGES = re.compile(r'mavlink_msg_([a-z0-9_]*)_send\(') # regex for extracting messages in stream groups STREAM_GROUPS = re.compile(r'ap_message STREAM_([A-Z0-9_]*)_msgs\[\] = \{([^\}]*)') AP_MESSAGE = re.compile(r'MSG_([A-Z0-9_]*)') # regex for named values NAMED_FLOAT = re.compile(r'send_named_float\("([\w]*)"') NAMED_INT = re.compile(r'send_named_int\("([\w]*)"') TYPE_DESCRIPTIONS = { 'incoming_messages': 'Messages the autopilot handles when received.', 'requestable_messages': 'Messages that can be requested/streamed from the autopilot.', 'outgoing_messages': 'Messages the autopilot will send automatically (unrequested).', 'named_floats': 'Breakout of named floating-point (numerical) values sent by the autopilot.', 'named_ints': 'Breakout of named integer values sent by the autopilot.', } EXTRA_DESCRIPTIONS = { 'stream_groups': 'Message groups with stream rates requestable by `SRn_*` parameters.' ' Messages in a group are only sent if the corresponding feature' ' is active.', 'missing_messages': 'Unsupported / unhandled messages.', 'incoming_commands': TYPE_DESCRIPTIONS['incoming_messages'].replace('Messages','Commands'), } EXTRA_DESCRIPTIONS['missing_commands'] = \ EXTRA_DESCRIPTIONS['missing_messages'].replace('messages', 'commands') TYPE_OPTIONS = { 'messages': MAVLinkMessage, 'commands': MAVLinkCommand, } MAVLINK_URL = 'https://mavlink.io/en/messages/{dialect}.html#{message_name}' ARDUPILOT_URL = 'https://github.com/ArduPilot/ardupilot/tree/{branch}/{source}' EXPORT_FILETYPES = { 'csv': 'csv', 'markdown': 'md' } MARKDOWN_INTRO = ( 'The [MAVLink](https://mavlink.io/en/) protocol supports a variety' ' of features and functionalities, but not all' ' [messages](https://mavlink.io/en/messages/) or' ' [commands](https://mavlink.io/en/services/command.html)' ' are implemented by the ArduPilot ecosystem, or relevant to a' ' particular autopilot firmware.\n\n' 'This page is auto-generated from analysing the {vehicle} source' ' code, and provides an indication of which messages{commands} are' ' handled by, requestable from, and sent from the firmware. ' 'A message being handled does not guarantee full support, but at' ' least shows that the autopilot is aware it exists, and will try' ' to do something meaningful with it.{unsupported}{stream_groups}' ) VEHICLES = ('AntennaTracker', 'ArduCopter', 'ArduPlane', 'ArduSub', 'Rover') def __init__(self, common_files, vehicle='ALL', exclude_libraries=['SITL', 'AP_Scripting']): self.vehicle = vehicle vehicles = [vehicle] if vehicle != 'ALL' else self.VEHICLES files = chain(*((self.BASE_DIR / vehicle).glob('**/*.cpp') for vehicle in vehicles), common_files) self.incoming_messages = {} self.incoming_commands = {} self.outgoing_messages = {} self.requestable_messages = {} self._ap_to_mavlink = { 'NAMED_FLOAT': 'NAMED_VALUE_FLOAT', # manual inclusion } self.named_floats = {} self.named_ints = {} for file in files: folder = file.parent.stem if folder in exclude_libraries: continue text = file.read_text() source = f'{folder}/{file.name}' if file == self.COMMON_FILE: for mavlink, ap_message in self.find_requestable_messages(text): self.requestable_messages[mavlink] = \ MAVLinkMessage(mavlink, source) if ap_message != mavlink: self._ap_to_mavlink[ap_message] = mavlink named_types = ('float', 'int') if folder in vehicles else () for type_ in named_types: substring = f'named_{type_}s' method = getattr(self, f'find_{substring}') names = getattr(self, substring) new_names = set(method(text)) - names.keys() for name in new_names: names[name] = MAVLinkMessage(f'NAMED_VALUE_{type_.upper()}:{name}', source, MAVLinkDialect.COMMON) for method, data, type_ in ( (self.find_incoming_messages, self.incoming_messages, 'messages'), (self.find_incoming_commands, self.incoming_commands, 'commands'), (self.find_outgoing_messages, self.outgoing_messages, 'messages'), ): new_data = set(method(text)) - data.keys() cls = self.TYPE_OPTIONS[type_] for datum in new_data: data[datum] = cls(datum, source) self._supported_names = {'messages': None, 'commands': None} self._unsupported = self._supported_names.copy() self._stream_groups = self.get_stream_groups(vehicle) if len(vehicles) == 1 else [] @classmethod def get_description(cls, query): return cls.TYPE_DESCRIPTIONS.get(query, cls.EXTRA_DESCRIPTIONS.get(query, '') ) @classmethod def find_incoming_messages(cls, text: str): return cls.INCOMING_MESSAGES.findall(text) @classmethod def find_incoming_commands(cls, text: str): return cls.INCOMING_COMMANDS.findall(text) @classmethod def find_outgoing_messages(cls, text: str): return (msg.upper() for msg in cls.OUTGOING_MESSAGES.findall(text)) @classmethod def find_requestable_messages(cls, text: str): region = cls.REQUESTABLE_REGION.search(text).group() return cls.REQUESTABLE_MAP.findall(region) @classmethod def find_named_floats(cls, text: str): return cls.NAMED_FLOAT.findall(text) @classmethod def find_named_ints(cls, text: str): return cls.NAMED_INT.findall(text) def get_stream_groups(self, vehicle): stream_groups = ['stream_groups'] text = (self.BASE_DIR / vehicle / self.STREAM_GROUP_FILE).read_text() for group_name, message_data in self.STREAM_GROUPS.findall(text): stream_groups.extend(sorted( MAVLinkMessage(self._ap_to_mavlink.get(ap_message, ap_message), f'SRn_{group_name}') for ap_message in self.AP_MESSAGE.findall(message_data) )) return stream_groups def get_supported(self, type: str, inject_commands=False): if type == 'messages': for message_type in self.TYPE_DESCRIPTIONS: values = getattr(self, message_type).values() if not values: continue yield message_type yield from sorted(values) # add in incoming_commands right after incoming_messages if inject_commands and message_type == 'incoming_messages': yield from self.get_supported('commands') elif type == 'commands': yield 'incoming_commands' yield from sorted(self.incoming_commands.values()) def get_supported_names(self, type: str): if self._supported_names[type] is None: self._supported_names[type] = set( m.id_name for m in self.get_supported(type) if isinstance(m, MAVLinkMessage) ) return self._supported_names[type] def get_unsupported(self, type='messages'): if self._unsupported[type] is None: supported_messages = self.get_supported_names(type) cls = self.TYPE_OPTIONS[type] self._unsupported[type] = sorted( cls.get_unsupported(supported_messages) ) if self._unsupported[type]: yield f'missing_{type}' yield from self._unsupported[type] def get_iterable(self, include_commands=False, include_stream_groups=False, include_unsupported=False): iterables = [self.get_supported('messages', include_commands)] if include_stream_groups: iterables.append(self._stream_groups) if include_unsupported: iterables.append(self.get_unsupported('messages')) if include_commands: iterables.append(self.get_unsupported('commands')) return chain(*iterables) def printout(self, **iter_options): for data in self.get_iterable(**iter_options): match data: # requires Python >= 3.10 case str() as type_: print(f'\n{type_}:', self.get_description(type_), sep='\n') case MAVLinkMessage() as message: print(message) def export(self, filename: Path, type='csv', include_commands=False, include_stream_groups=False, include_unsupported=False, **export_options): export_method = getattr(self, f'export_{type}') # ensure export_method and get_iterable have the same options specified iter_options = dict( include_commands = include_commands, include_stream_groups = include_stream_groups, include_unsupported = include_unsupported, ) with open(filename, 'w') as file: export_method(file, self.get_iterable(**iter_options), **export_options, **iter_options) def export_csv(self, file, iterable, **ignore): file.write('MAVLinkMessage,CodeSource,MAVLinkDialect,MessageType\n') for data in iterable: match data: case str(): current_type = data case MAVLinkMessage() as message: print(*message.as_tuple(), current_type, sep=',', file=file) def export_markdown(self, file, iterable, branch='master', header=None, use_intro=True, **extra_kwargs): if header == 'ArduSub': import time now = time.strftime('%Y-%m-%dT%H:%M:%S%z') date = f'{now[:-2]}:{now[-2:]}' # add colon to the timezone header = '\n'.join(( '+++', 'title = "MAVLink Support"', 'description = "MAVLink message support details."', f'{date = }', 'template = "docs/page.html"', 'sort_by = "weight"', 'weight = 20', 'draft = false', '[extra]', 'toc = true', 'top = false', '+++' )) if header: print(header, file=file) if use_intro: commands = stream_groups = unsupported = '' if extra_kwargs['include_commands']: commands = ' (and commands)' if extra_kwargs['include_unsupported']: unsupported = ( '\n\nKnown [unsupported messages](#missing-messages)' f'{commands} are shown at the end.' ) if extra_kwargs['include_stream_groups']: stream_groups = ( '\n\nThe autopilot includes a set of [stream groups]' '(#stream-groups) for convenience, which allow' ' configuring the stream rates of groups of' ' requestable messages by setting parameter values. ' 'It is also possible to manually request messages,' ' and request individual messages be streamed at a' ' specified rate.' ) vehicle = self.vehicle.replace('ALL', 'ArduPilot') print(self.MARKDOWN_INTRO.format( vehicle=vehicle, commands=commands, stream_groups=stream_groups, unsupported=unsupported ), file=file) for data in iterable: match data: case str() as type_: heading = type_.title().replace('_', ' ') source_header = ( 'Code Source' if type_ != 'stream_groups' else 'Stream Group Parameter' ) print(f'## {heading}', self.get_description(type_), f'\nMAVLink Message | {source_header} | MAVLink Dialect', '--- | --- | ---', sep='\n', file=file) case MAVLinkMessage() as message: name, source, dialect = message.as_tuple() if dialect != MAVLinkDialect.UNKNOWN: msg_url = self.MAVLINK_URL.format(dialect=dialect, message_name=name.split(':')[0]) name = f'[{name}]({msg_url})' if source != 'UNSUPPORTED' and not source.startswith('SRn'): folder = source.split('/')[0] base = 'libraries/' if folder not in self.VEHICLES else '' code_url = self.ARDUPILOT_URL.format(branch=branch, source=base+source) source = f'[{source}]({code_url})' print(name, source, dialect, sep=' | ', file=file) if __name__ == '__main__': from inspect import signature from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter detector_init_params = signature(MAVLinkDetector.__init__).parameters default_vehicle = detector_init_params['vehicle'].default vehicle_options = [default_vehicle, *MAVLinkDetector.VEHICLES] default_exclusions = detector_init_params['exclude_libraries'].default parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) parse_opts = parser.add_argument_group('parsing options') parse_opts.add_argument('-v', '--vehicle', default=default_vehicle, choices=vehicle_options, help='Vehicle folder, or ALL.') parse_opts.add_argument('-e', '--exclude-library', action='append', default=default_exclusions, help='Libraries to exclude from the search.') parse_opts.add_argument('-c', '--include-commands', action='store_true', help='Include MAVLink commands as well as messages.') parse_opts.add_argument('-g', '--include-stream-groups', action='store_true', help='Include stream group message sets in the output.') parse_opts.add_argument('-u', '--include-unsupported', action='store_true', help='Include unsupported messages in the output.') export_opts = parser.add_argument_group('export options') export_opts.add_argument('-q', '--quiet', action='store_true', help='Disable printout, only export a file.') export_opts.add_argument('-f', '--format', default='markdown', choices=['csv', 'markdown', 'none'], help='Desired format for the exported file.') export_opts.add_argument('-b', '--branch', help=('The branch to link to in markdown mode.' ' Defaults to the branch in the working directory.')) export_opts.add_argument('--filename', help='Override default filename.') export_opts.add_argument('--header', help='Header for the markdown file.') export_opts.add_argument('--no-intro', action='store_true', help="Flag to not use the automatic markdown intro.") args = parser.parse_args() assert (args.vehicle in MAVLinkDetector.VEHICLES or not args.include_stream_groups), \ 'Determining stream groups requires a single vehicle to be specified.' common_files = (MAVLinkDetector.BASE_DIR / 'libraries').glob('**/*.cpp') messages = MAVLinkDetector(common_files, args.vehicle, args.exclude_library) include_options = dict( include_commands = args.include_commands, include_stream_groups = args.include_stream_groups, include_unsupported = args.include_unsupported, ) if not args.quiet: messages.printout(**include_options) if args.format != 'none': ext = messages.EXPORT_FILETYPES[args.format] branch = args.branch if not branch: import subprocess pattern = re.compile(r'On branch ([\S]*)') result = subprocess.run(['git', 'status'], capture_output=True).stdout try: branch, = pattern.search(result.decode()).groups() except AttributeError as e: raise Exception( 'No --branch specified, and "git status" failed to find one.' 'Please manually specify an ardupilot firmware branch for ' 'code source hyperlinks (e.g. Sub-4.1) or ensure this ' 'repository copy is managed by git.' ) filename = ( args.filename or f'{args.vehicle}_{branch}_MAVLink_Messages.{ext}' ) messages.export(filename, type=args.format, branch=branch, header=args.header, use_intro=not args.no_intro, **include_options)