forked from Archive/PX4-Autopilot
Tools: add parser to extract event definitions in source & generate json output
Example definition: /* EVENT * @description * test description * @arg1: test */ events::send<uint8_t>(events::ID("test_event"), "test message", events::Log::Error, 0);
This commit is contained in:
parent
fef2c43395
commit
7c5838116a
|
@ -0,0 +1,58 @@
|
|||
import codecs
|
||||
import json
|
||||
import sys
|
||||
import os
|
||||
|
||||
|
||||
class JsonOutput():
|
||||
def __init__(self, groups):
|
||||
all_json = {}
|
||||
all_json['version'] = 1
|
||||
component = {}
|
||||
all_json['components'] = {1: component} #1: autopilot component
|
||||
|
||||
all_events = {}
|
||||
component['namespace'] = "px4"
|
||||
component['event_groups'] = all_events
|
||||
|
||||
for group in groups:
|
||||
current_group = {}
|
||||
current_events = {}
|
||||
current_group['events'] = current_events
|
||||
all_events[group] = current_group
|
||||
|
||||
for e in groups[group]:
|
||||
event_obj = {}
|
||||
event_obj['name'] = e.name
|
||||
event_obj['message'] = e.message
|
||||
if e.description is not None:
|
||||
event_obj['description'] = e.description
|
||||
args = []
|
||||
for i in range(len(e.arguments)):
|
||||
arg = {}
|
||||
arg['type'] = e.arguments[i][0]
|
||||
arg['name'] = e.arguments[i][1]
|
||||
args.append(arg)
|
||||
if len(args) > 0:
|
||||
event_obj['arguments'] = args
|
||||
sub_id = e.sub_id
|
||||
assert sub_id not in current_events, \
|
||||
"Duplicate event ID for {0} (message: '{1}'), other event message: '{2}'".format(
|
||||
e.name, e.message, current_events[sub_id]['message'])
|
||||
current_events[sub_id] = event_obj
|
||||
|
||||
self.json = all_json
|
||||
|
||||
def save(self, filename):
|
||||
need_to_write = True
|
||||
# only write if current file is not the same, to avoid updating the file
|
||||
# timestamp
|
||||
if os.path.isfile(filename):
|
||||
with open(filename, 'rb') as json_file:
|
||||
existing_data = json.load(json_file)
|
||||
if existing_data == self.json:
|
||||
need_to_write = False
|
||||
if need_to_write:
|
||||
with codecs.open(filename, 'w', 'utf-8') as f:
|
||||
f.write(json.dumps(self.json,indent=2))
|
||||
|
|
@ -0,0 +1,282 @@
|
|||
import sys
|
||||
import re
|
||||
import math
|
||||
|
||||
def hash_32_fnv1a(data: str):
|
||||
hash_val = 0x811c9dc5
|
||||
prime = 0x1000193
|
||||
for i in range(len(data)):
|
||||
value = ord(data[i])
|
||||
hash_val = hash_val ^ value
|
||||
hash_val *= prime
|
||||
hash_val &= 0xffffffff
|
||||
return hash_val
|
||||
|
||||
|
||||
class Event(object):
|
||||
"""
|
||||
Single event definition
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.name = None
|
||||
self.message = None
|
||||
self.description = None
|
||||
self.group = "default"
|
||||
self._arguments = []
|
||||
|
||||
@staticmethod
|
||||
def _get_id(name):
|
||||
return 0xffffff & hash_32_fnv1a(name)
|
||||
|
||||
@property
|
||||
def arguments(self):
|
||||
""" list of (type: str, name: str) tuples """
|
||||
return self._arguments
|
||||
|
||||
def set_default_arguments(self, num_args):
|
||||
""" set argument names to default (if not specified) """
|
||||
for i in range(num_args):
|
||||
self.add_argument(None, "arg"+str(i))
|
||||
|
||||
def _shift_printed_arguments(self, msg, offset):
|
||||
""" shift all {<idx> arguments by an offset """
|
||||
i = 0
|
||||
while i < len(msg):
|
||||
|
||||
if msg[i] == '\\': # escaped character
|
||||
i += 2
|
||||
continue
|
||||
|
||||
if msg[i] == '{':
|
||||
m = re.match(r"^(\d+)", msg[i+1:])
|
||||
if m:
|
||||
arg_idx = int(m.group(1)) + offset
|
||||
msg = msg[:i+1] + str(arg_idx) + msg[i+1+len(m.group(1)):]
|
||||
i += 1
|
||||
return msg
|
||||
|
||||
def prepend_arguments(self, arguments):
|
||||
""" prepend additional arguments, and shift all '{<index>}' in the
|
||||
description and message
|
||||
:param arguments: list of (type: str, name: str) tuples
|
||||
"""
|
||||
self._arguments = arguments + self._arguments
|
||||
num_added = len(arguments)
|
||||
if self.message is not None:
|
||||
self.message = self._shift_printed_arguments(self.message, num_added)
|
||||
if self.description is not None:
|
||||
self.description = self._shift_printed_arguments(self.description, num_added)
|
||||
|
||||
def add_argument(self, arg_type, name):
|
||||
self._arguments.append((arg_type, name))
|
||||
|
||||
@property
|
||||
def sub_id(self):
|
||||
return self._get_id(self.name)
|
||||
|
||||
def validate(self):
|
||||
if self.name is None: raise Exception("missing event name")
|
||||
if self.message is None: raise Exception("missing event message for {}".format(self.name))
|
||||
# just to ensure a common convention
|
||||
assert self.message[-1] != '.', "Avoid event message ending in '.' ({:})".format(self.message)
|
||||
# description is optional
|
||||
|
||||
class SourceParser(object):
|
||||
"""
|
||||
Parses provided data and stores all found events internally.
|
||||
"""
|
||||
|
||||
re_split_lines = re.compile(r'[\r\n]+')
|
||||
re_comment_start = re.compile(r'^\/\*\s*EVENT$')
|
||||
re_comment_content = re.compile(r'^\*\s*(.*)')
|
||||
re_comment_tag = re.compile(r'^@([a-zA-Z][a-zA-Z0-9_]*):?\s*(.*)')
|
||||
re_comment_end = re.compile(r'(.*?)\s*\*\/$')
|
||||
re_code_end = re.compile(r'(.*?)\s*;$')
|
||||
re_template_args = re.compile(r'([a-zA-Z0-9_:\.]+)\s*<([a-zA-Z0-9_,\s:]+)\s*>\s*\((.*)\);$')
|
||||
re_no_template_args = re.compile(r'([a-zA-Z0-9_:\.]+)\s*\((.*)\);$')
|
||||
re_event_id = re.compile(r'(events::)?ID\("([a-zA-Z0-9_]+)\"')
|
||||
|
||||
def __init__(self):
|
||||
self._events = {}
|
||||
|
||||
@property
|
||||
def events(self):
|
||||
""" dict of 'group': [Event] list """
|
||||
return self._events
|
||||
|
||||
def Parse(self, contents):
|
||||
"""
|
||||
Incrementally parse program contents and append all found events
|
||||
to the list.
|
||||
"""
|
||||
# This code is essentially a comment-parsing grammar. "state"
|
||||
# represents parser state. It contains human-readable state
|
||||
# names.
|
||||
state = None
|
||||
def finalize_current_tag(event, tag, value):
|
||||
if tag is None: return
|
||||
if tag == "description":
|
||||
descr = value.strip()
|
||||
# merge continued lines (but not e.g. enumerations)
|
||||
for i in range(1, len(descr)-1):
|
||||
if descr[i-1] != '\n' and descr[i] == '\n' and descr[i+1].isalpha():
|
||||
descr = descr[:i] + ' ' + descr[i+1:]
|
||||
event.description = descr
|
||||
elif tag == "group":
|
||||
known_groups = ["calibration", "health", "arming_check", "normal"]
|
||||
event.group = value.strip()
|
||||
if not event.group in known_groups:
|
||||
raise Exception("Unknown event group: '{}'\nKnown groups: {}\n" \
|
||||
"If this is not a typo, add the new group to the script".format(event.group, known_groups))
|
||||
elif tag.startswith("arg"):
|
||||
arg_index = int(tag[3:])-1
|
||||
arg_name = value.strip()
|
||||
assert len(event.arguments) == arg_index, "Invalid argument ordering/duplicate ({}, {})".format(tag, value)
|
||||
event.add_argument(None, arg_name)
|
||||
else:
|
||||
raise Exception("Invalid tag: {}\nvalue: {}".format(tag, value))
|
||||
|
||||
for line in self.re_split_lines.split(contents):
|
||||
line = line.strip()
|
||||
# Ignore empty lines
|
||||
if line == "":
|
||||
continue
|
||||
if self.re_comment_start.match(line):
|
||||
state = "parse-comments"
|
||||
event = Event()
|
||||
current_tag = None
|
||||
current_value = None
|
||||
current_code = ""
|
||||
continue
|
||||
if state is None:
|
||||
continue
|
||||
if state == "parse-command":
|
||||
current_code += line
|
||||
m = self.re_code_end.search(line)
|
||||
if m:
|
||||
# extract template arguments
|
||||
m = self.re_template_args.search(current_code)
|
||||
if m:
|
||||
call, template_args, args = m.group(1, 2, 3)
|
||||
template_args = template_args.split(',')
|
||||
else:
|
||||
m = self.re_no_template_args.search(current_code)
|
||||
if m:
|
||||
template_args = []
|
||||
call, args = m.group(1, 2)
|
||||
else:
|
||||
raise Exception("Failed to parse code line {:}".format(current_code))
|
||||
|
||||
# if event arguments are not specified, use default naming
|
||||
if len(event.arguments) == 0:
|
||||
event.set_default_arguments(len(template_args))
|
||||
|
||||
# get argument types from template arguments
|
||||
assert len(template_args) == len(event.arguments), \
|
||||
"Number of arguments mismatch (args: {:})".format(template_args)
|
||||
num_args = len(template_args)
|
||||
for i in range(num_args):
|
||||
arg_name = event.arguments[i][1]
|
||||
arg_type = template_args[i].strip()
|
||||
if arg_type.startswith('events::'):
|
||||
arg_type = arg_type[8:]
|
||||
arg_type = arg_type.replace('enums::', '')
|
||||
event.arguments[i] = (arg_type, arg_name)
|
||||
#print("method: {}, args: {}, template args: {}".format(call, args, event.arguments))
|
||||
|
||||
# extract function arguments
|
||||
args_split = self._parse_arguments(args)
|
||||
if call == "events::send" or call == "send":
|
||||
assert len(args_split) == num_args + 3, \
|
||||
"Unexpected Number of arguments for: {:}, {:}".format(args_split, num_args)
|
||||
m = self.re_event_id.search(args_split[0])
|
||||
if m:
|
||||
_, event_name = m.group(1, 2)
|
||||
else:
|
||||
raise Exception("Could not extract event ID from {:}".format(args_split[0]))
|
||||
event.name = event_name
|
||||
event.message = args_split[2][1:-1]
|
||||
elif call in ['reporter.healthFailure', 'reporter.armingCheckFailure']:
|
||||
assert len(args_split) == num_args + 5, \
|
||||
"Unexpected Number of arguments for: {:}, {:}".format(args_split, num_args)
|
||||
m = self.re_event_id.search(args_split[2])
|
||||
if m:
|
||||
_, event_name = m.group(1, 2)
|
||||
else:
|
||||
raise Exception("Could not extract event ID from {:}".format(args_split[2]))
|
||||
event.name = event_name
|
||||
event.message = args_split[4][1:-1]
|
||||
if 'health' in call:
|
||||
event.group = "health"
|
||||
else:
|
||||
event.group = "arming_check"
|
||||
event.prepend_arguments([('common::navigation_mode_category_t', 'modes'),
|
||||
('uint8_t', 'health_component_index')])
|
||||
else:
|
||||
raise Exception("unknown event method call: {}, args: {}".format(call, args))
|
||||
|
||||
event.validate()
|
||||
|
||||
# insert
|
||||
if not event.group in self._events:
|
||||
self._events[event.group] = []
|
||||
self._events[event.group].append(event)
|
||||
|
||||
state = None
|
||||
|
||||
else:
|
||||
m = self.re_comment_end.search(line)
|
||||
if m:
|
||||
line = m.group(1)
|
||||
last_comment_line = True
|
||||
else:
|
||||
last_comment_line = False
|
||||
m = self.re_comment_content.match(line)
|
||||
if m:
|
||||
comment_content = m.group(1)
|
||||
m = self.re_comment_tag.match(comment_content)
|
||||
if m:
|
||||
finalize_current_tag(event, current_tag, current_value)
|
||||
current_tag, current_value = m.group(1, 2)
|
||||
elif current_tag is not None:
|
||||
current_value += "\n"+comment_content
|
||||
# else: empty line before any tag
|
||||
elif not last_comment_line:
|
||||
# Invalid comment line (inside comment, but not starting with
|
||||
# "*" or "*/".
|
||||
raise Exception("Excpected a comment, got '{}'".format(line))
|
||||
if last_comment_line:
|
||||
finalize_current_tag(event, current_tag, current_value)
|
||||
state = "parse-command"
|
||||
return True
|
||||
|
||||
def _parse_arguments(self, args):
|
||||
"""
|
||||
given a string of arguments, returns a list of strings split into the
|
||||
arguments, with respecting brackets.
|
||||
args is expected to be a single line.
|
||||
Note: comments are not handled, also template arguments.
|
||||
|
||||
e.g. "32, test(4,4), \"e(c\", ab" -> ["32", "test(4,4)", "\"e(c\"", "ab"]
|
||||
"""
|
||||
args_split = []
|
||||
start = 0
|
||||
bracket = 0
|
||||
in_string = False
|
||||
for i in range(len(args)):
|
||||
if in_string and args[i] == "\"" and args[i-1] != "\\":
|
||||
in_string = False
|
||||
elif not in_string and args[i] == "\"":
|
||||
in_string = True
|
||||
if in_string:
|
||||
continue
|
||||
if args[i] in "{([":
|
||||
bracket += 1
|
||||
if args[i] in "})]":
|
||||
bracket -= 1
|
||||
if bracket == 0 and args[i] == ',':
|
||||
args_split.append(args[start:i].strip())
|
||||
start = i + 1
|
||||
args_split.append(args[start:].strip())
|
||||
return args_split
|
|
@ -0,0 +1,52 @@
|
|||
import os
|
||||
import re
|
||||
import codecs
|
||||
import sys
|
||||
|
||||
class SourceScanner(object):
|
||||
"""
|
||||
Traverses directory tree, reads all source files, and passes their contents
|
||||
to the Parser.
|
||||
"""
|
||||
|
||||
def ScanDir(self, srcdirs, parser):
|
||||
"""
|
||||
Scans provided path and passes all found contents to the parser using
|
||||
parser.Parse method.
|
||||
"""
|
||||
extensions = tuple([".cpp"])
|
||||
for srcdir in srcdirs:
|
||||
if os.path.isfile(srcdir):
|
||||
if not self.ScanFile(srcdir, parser):
|
||||
return False
|
||||
else:
|
||||
for dirname, dirnames, filenames in os.walk(srcdir):
|
||||
for filename in filenames:
|
||||
if filename.endswith(extensions):
|
||||
path = os.path.join(dirname, filename)
|
||||
try:
|
||||
if not self.ScanFile(path, parser):
|
||||
return False
|
||||
except:
|
||||
print(("Exception in file %s" % path))
|
||||
raise
|
||||
return True
|
||||
|
||||
def ScanFile(self, path, parser):
|
||||
"""
|
||||
Scans provided file and passes its contents to the parser using
|
||||
parser.Parse method.
|
||||
"""
|
||||
|
||||
with codecs.open(path, 'r', 'utf-8') as f:
|
||||
try:
|
||||
contents = f.read()
|
||||
except:
|
||||
contents = ''
|
||||
print('Failed reading file: %s, skipping content.' % path)
|
||||
pass
|
||||
try:
|
||||
return parser.Parse(contents)
|
||||
except Exception as e:
|
||||
print("Exception while parsing file {}".format(path))
|
||||
raise
|
|
@ -0,0 +1,97 @@
|
|||
#!/usr/bin/env python
|
||||
############################################################################
|
||||
#
|
||||
# Copyright (C) 2020 PX4 Development Team. All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without
|
||||
# modification, are permitted provided that the following conditions
|
||||
# are met:
|
||||
#
|
||||
# 1. Redistributions of source code must retain the above copyright
|
||||
# notice, this list of conditions and the following disclaimer.
|
||||
# 2. Redistributions in binary form must reproduce the above copyright
|
||||
# notice, this list of conditions and the following disclaimer in
|
||||
# the documentation and/or other materials provided with the
|
||||
# distribution.
|
||||
# 3. Neither the name PX4 nor the names of its contributors may be
|
||||
# used to endorse or promote products derived from this software
|
||||
# without specific prior written permission.
|
||||
#
|
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
|
||||
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
|
||||
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
|
||||
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
|
||||
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
|
||||
# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
||||
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
||||
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
||||
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
# POSSIBILITY OF SUCH DAMAGE.
|
||||
#
|
||||
############################################################################
|
||||
|
||||
#
|
||||
# PX4 events processor (main executable file)
|
||||
#
|
||||
# This tool scans the PX4 source code for definitions of events.
|
||||
#
|
||||
|
||||
import sys
|
||||
import os
|
||||
import argparse
|
||||
from px4events import srcscanner, srcparser, jsonout
|
||||
|
||||
import re
|
||||
import codecs
|
||||
|
||||
|
||||
def main():
|
||||
# Parse command line arguments
|
||||
parser = argparse.ArgumentParser(description="Process events definitions.")
|
||||
parser.add_argument("-s", "--src-path",
|
||||
default=["../src"],
|
||||
metavar="PATH",
|
||||
nargs='*',
|
||||
help="one or more paths/files to source files to scan for events")
|
||||
parser.add_argument("-j", "--json",
|
||||
nargs='?',
|
||||
const="events.json",
|
||||
metavar="FILENAME",
|
||||
help="Create Json output file"
|
||||
" (default FILENAME: events.json)")
|
||||
parser.add_argument('-v', '--verbose',
|
||||
action='store_true',
|
||||
help="verbose output")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Check for valid command
|
||||
if not (args.json):
|
||||
print("Error: You need to specify at least one output method!")
|
||||
parser.print_usage()
|
||||
sys.exit(1)
|
||||
|
||||
# Initialize source scanner and parser
|
||||
scanner = srcscanner.SourceScanner()
|
||||
parser = srcparser.SourceParser()
|
||||
|
||||
# Scan directories, and parse the files
|
||||
if args.verbose:
|
||||
print("Scanning source path " + str(args.src_path))
|
||||
|
||||
if not scanner.ScanDir(args.src_path, parser):
|
||||
sys.exit(1)
|
||||
|
||||
events = parser.events
|
||||
|
||||
# Output to JSON file
|
||||
if args.json:
|
||||
if args.verbose: print("Creating Json file " + args.json)
|
||||
cur_dir = os.path.dirname(os.path.realpath(__file__))
|
||||
out = jsonout.JsonOutput(events)
|
||||
out.save(args.json)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in New Issue