import sys import re import math def hash_32_fnv1a(data: str): hash_val = 0x811c9dc5 prime = 0x1000193 for i in range(len(data)): value = ord(data[i]) hash_val = hash_val ^ value hash_val *= prime hash_val &= 0xffffffff return hash_val class Event(object): """ Single event definition """ def __init__(self): self.name = None self.message = None self.description = None self.group = "default" self.type = None self._arguments = [] @staticmethod def _get_id(name): return 0xffffff & hash_32_fnv1a(name) @property def arguments(self): """ list of (type: str, name: str) tuples """ return self._arguments def set_default_arguments(self, num_args): """ set argument names to default (if not specified) """ for i in range(num_args): self.add_argument(None, "arg"+str(i)) def _shift_printed_arguments(self, msg, offset): """ shift all { arguments by an offset """ i = 0 while i < len(msg): if msg[i] == '\\': # escaped character i += 2 continue if msg[i] == '{': m = re.match(r"^(\d+)", msg[i+1:]) if m: arg_idx = int(m.group(1)) + offset msg = msg[:i+1] + str(arg_idx) + msg[i+1+len(m.group(1)):] i += 1 return msg def prepend_arguments(self, arguments): """ prepend additional arguments, and shift all '{}' in the description and message :param arguments: list of (type: str, name: str) tuples """ self._arguments = arguments + self._arguments num_added = len(arguments) if self.message is not None: self.message = self._shift_printed_arguments(self.message, num_added) if self.description is not None: self.description = self._shift_printed_arguments(self.description, num_added) def add_argument(self, arg_type, name): self._arguments.append((arg_type, name)) @property def sub_id(self): return self._get_id(self.name) def validate(self): if self.name is None: raise Exception("missing event name") if self.message is None: raise Exception("missing event message for {}".format(self.name)) # just to ensure a common convention assert self.message[-1] != '.', "Avoid event message ending in '.' ({:})".format(self.message) # description is optional class SourceParser(object): """ Parses provided data and stores all found events internally. """ re_split_lines = re.compile(r'[\r\n]+') re_comment_start = re.compile(r'^\/\*\s*EVENT$') re_events_send = re.compile(r'^events::send[<\(]') re_comment_content = re.compile(r'^\*\s*(.*)') re_comment_tag = re.compile(r'^@([a-zA-Z][a-zA-Z0-9_-]*):?\s*(.*)') re_comment_end = re.compile(r'(.*?)\s*\*\/$') re_code_end = re.compile(r'(.*?)\s*;$') re_template_args = re.compile(r'([a-zA-Z0-9_:\.]+)\s*<([a-zA-Z0-9_,\s:]+)\s*>\s*\((.*)\);$') re_no_template_args = re.compile(r'([a-zA-Z0-9_:\.]+)\s*\((.*)\);$') re_event_id = re.compile(r'(events::)?ID\("([a-zA-Z0-9_]+)\"') def __init__(self): self._events = {} @property def events(self): """ dict of 'group': [Event] list """ return self._events def Parse(self, contents, path): """ Incrementally parse program contents and append all found events to the list. """ # This code is essentially a comment-parsing grammar. "state" # represents parser state. It contains human-readable state # names. state = None def finalize_current_tag(event, tag, value): if tag is None: return True if tag == "description": descr = value.strip() # merge continued lines (but not e.g. enumerations) for i in range(1, len(descr)-1): if descr[i-1] != '\n' and descr[i] == '\n' and descr[i+1].isalpha(): descr = descr[:i] + ' ' + descr[i+1:] event.description = descr elif tag == "group": known_groups = ["calibration", "health", "arming_check", "default"] event.group = value.strip() if not event.group in known_groups: raise Exception("Unknown event group: '{}'\nKnown groups: {}\n" \ "If this is not a typo, add the new group to the script".format(event.group, known_groups)) elif tag == "type": event.type = value.strip() elif tag == "skip-file": print("Skipping file: {:}".format(path)) return False elif tag.startswith("arg"): arg_index = int(tag[3:])-1 arg_name = value.strip() assert len(event.arguments) == arg_index, "Invalid argument ordering/duplicate ({}, {})".format(tag, value) event.add_argument(None, arg_name) else: raise Exception("Invalid tag: {}\nvalue: {}".format(tag, value)) return True for line in self.re_split_lines.split(contents): line = line.strip() # Ignore empty lines if line == "": continue assert not line.startswith("using namespace events;"), "Avoid 'using namespace events;', as it prevents proper events extraction" # Check for '/* EVENT' if self.re_comment_start.match(line): state = "parse-comments" event = Event() current_tag = None current_value = None current_code = "" continue # Check for events::send (allow '/* EVENT' to be optional) if state is None and self.re_events_send.match(line): state = "parse-command" event = Event() current_tag = None current_value = None current_code = "" if state is None: assert 'events::ID(' not in line or line.startswith('//'), \ "unmatched 'events::ID(' found in line '{:}'".format(line) continue if state == "parse-command": current_code += line m = self.re_code_end.search(line) if m: # extract template arguments m = self.re_template_args.search(current_code) if m: call, template_args, args = m.group(1, 2, 3) template_args = template_args.split(',') else: m = self.re_no_template_args.search(current_code) if m: template_args = [] call, args = m.group(1, 2) else: raise Exception("Failed to parse code line {:}".format(current_code)) # if event arguments are not specified, use default naming if len(event.arguments) == 0: event.set_default_arguments(len(template_args)) # get argument types from template arguments assert len(template_args) == len(event.arguments), \ "Number of arguments mismatch (args: {:})".format(template_args) num_args = len(template_args) for i in range(num_args): arg_name = event.arguments[i][1] arg_type = template_args[i].strip() if arg_type.startswith('events::'): arg_type = arg_type[8:] arg_type = arg_type.replace('enums::', '') event.arguments[i] = (arg_type, arg_name) #print("method: {}, args: {}, template args: {}".format(call, args, event.arguments)) ignore_event = False def parse_message(s): assert s[0] == '"', "Argument must be a string literal: {:}".format(s) # unescape \x, to treat the string the same as the C++ compiler return s[1:-1].encode("utf-8").decode('unicode_escape') # extract function arguments args_split = self._parse_arguments(args) if call == "events::send" or call == "send": if len(args_split) == 1: # This is a send call for a generated event ignore_event = True else: assert len(args_split) == num_args + 3, \ "Unexpected Number of arguments for: {:}, " \ "num template args: {:} (missing template args?)" \ .format(args_split, num_args) m = self.re_event_id.search(args_split[0]) if m: _, event_name = m.group(1, 2) else: raise Exception("Could not extract event ID from {:}".format(args_split[0])) event.name = event_name event.message = parse_message(args_split[2]) elif call in ['reporter.healthFailure', 'reporter.armingCheckFailure']: assert len(args_split) == num_args + 5, \ "Unexpected Number of arguments for: {:}, {:}".format(args_split, num_args) m = self.re_event_id.search(args_split[2]) if m: _, event_name = m.group(1, 2) else: raise Exception("Could not extract event ID from {:}".format(args_split[2])) event.name = event_name event.message = parse_message(args_split[4]) if 'health' in call: event.group = "health" else: event.group = "arming_check" event.prepend_arguments([('navigation_mode_group_t', 'modes'), ('uint8_t', 'health_component_index')]) elif call in ['reporter.healthFailureExt', 'reporter.armingCheckFailureExt']: # from ROS2 assert len(args_split) == num_args + 3, \ "Unexpected Number of arguments for: {:}, {:}".format(args_split, num_args) m = self.re_event_id.search(args_split[0]) if m: _, event_name = m.group(1, 2) else: raise Exception("Could not extract event ID from {:}".format(args_split[0])) event.name = event_name event.message = args_split[2][1:-1] if 'health' in call: event.group = "health" else: event.group = "arming_check" event.prepend_arguments([('navigation_mode_group_t', 'modes'), ('uint8_t', 'health_component_index')]) else: raise Exception("unknown event method call: {}, args: {}".format(call, args)) if not ignore_event: event.validate() # insert if not event.group in self._events: self._events[event.group] = [] self._events[event.group].append(event) state = None else: # parse-comments m = self.re_comment_end.search(line) if m: line = m.group(1) last_comment_line = True else: last_comment_line = False m = self.re_comment_content.match(line) if m: comment_content = m.group(1) m = self.re_comment_tag.match(comment_content) if m: if not finalize_current_tag(event, current_tag, current_value): return True # skip file current_tag, current_value = m.group(1, 2) elif current_tag is not None: current_value += "\n"+comment_content # else: empty line before any tag elif not last_comment_line: # Invalid comment line (inside comment, but not starting with # "*" or "*/". raise Exception("Excpected a comment, got '{}'".format(line)) if last_comment_line: if not finalize_current_tag(event, current_tag, current_value): return True # skip file state = "parse-command" return True def _parse_arguments(self, args): """ given a string of arguments, returns a list of strings split into the arguments, with respecting brackets. args is expected to be a single line. Note: comments are not handled, also template arguments. e.g. "32, test(4,4), \"e(c\", ab" -> ["32", "test(4,4)", "\"e(c\"", "ab"] """ args_split = [] start = 0 bracket = 0 in_string = False for i in range(len(args)): if in_string and args[i] == "\"" and args[i-1] != "\\": in_string = False elif not in_string and args[i] == "\"": in_string = True if in_string: continue if args[i] in "{([": bracket += 1 if args[i] in "})]": bracket -= 1 if bracket == 0 and args[i] == ',': args_split.append(args[start:i].strip()) start = i + 1 args_split.append(args[start:].strip()) return args_split