px_generate_uorb_topic_files.py: beautify

This commit is contained in:
TSC21 2018-09-11 17:28:47 +01:00 committed by Beat Küng
parent cafc2f5e61
commit 4137517d12
1 changed files with 324 additions and 291 deletions

View File

@ -51,11 +51,11 @@ sys.path.append(px4_tools_dir + "/gencpp/src")
px4_msg_dir = os.path.join(px4_tools_dir, "..")
try:
import em
import genmsg.template_tools
import em
import genmsg.template_tools
except ImportError as e:
print("python import error: ", e)
print('''
print("python import error: ", e)
print('''
Required python packages not installed.
On a Debian/Ubuntu system please run:
@ -69,7 +69,7 @@ On MacOS please run:
On Windows please run:
easy_install empy catkin_pkg
''')
exit(1)
exit(1)
__author__ = "Sergey Belash, Thomas Gubler, Beat Kueng"
__copyright__ = "Copyright (C) 2013-2016 PX4 Development Team."
@ -85,25 +85,28 @@ PACKAGE = 'px4'
TOPICS_TOKEN = '# TOPICS '
IDL_TEMPLATE_FILE = 'msg.idl.template'
class MsgScope:
NONE = 0
SEND = 1
RECEIVE = 2
def get_multi_topics(filename):
"""
Get TOPICS names from a "# TOPICS" line
"""
ofile = open(filename, 'r')
text = ofile.read()
result = []
for each_line in text.split('\n'):
if each_line.startswith (TOPICS_TOKEN):
topic_names_str = each_line.strip()
topic_names_str = topic_names_str.replace(TOPICS_TOKEN, "")
result.extend(topic_names_str.split(" "))
ofile.close()
return result
"""
Get TOPICS names from a "# TOPICS" line
"""
ofile = open(filename, 'r')
text = ofile.read()
result = []
for each_line in text.split('\n'):
if each_line.startswith(TOPICS_TOKEN):
topic_names_str = each_line.strip()
topic_names_str = topic_names_str.replace(TOPICS_TOKEN, "")
result.extend(topic_names_str.split(" "))
ofile.close()
return result
def get_msgs_list(msgdir):
"""
@ -113,136 +116,154 @@ def get_msgs_list(msgdir):
def generate_output_from_file(format_idx, filename, outputdir, package, templatedir, includepath):
"""
Converts a single .msg file to an uorb header/source file
"""
msg_context = genmsg.msg_loader.MsgContext.create_default()
full_type_name = genmsg.gentools.compute_full_type_name(package, os.path.basename(filename))
spec = genmsg.msg_loader.load_msg_from_file(msg_context, filename, full_type_name)
field_name_and_type = {}
for field in spec.parsed_fields():
field_name_and_type.update({field.name:field.type})
# assert if the timestamp field exists
try:
assert 'timestamp' in field_name_and_type
except AssertionError:
print("[ERROR] uORB topic files generator:\n\tgenerate_output_from_file:\tNo 'timestamp' field found in " + spec.short_name + " msg definition!")
exit(1)
# assert if the timestamp field is of type uint64
try:
assert field_name_and_type.get('timestamp') == 'uint64'
except AssertionError:
print("[ERROR] uORB topic files generator:\n\tgenerate_output_from_file:\t'timestamp' field in " + spec.short_name + " msg definition is not of type uint64 but rather of type " + field_name_and_type.get('timestamp') + "!")
exit(1)
topics = get_multi_topics(filename)
if includepath:
search_path = genmsg.command_line.includepath_to_dict(includepath)
else:
search_path = {}
genmsg.msg_loader.load_depends(msg_context, spec, search_path)
md5sum = genmsg.gentools.compute_md5(msg_context, spec)
if len(topics) == 0:
topics.append(spec.short_name)
em_globals = {
"file_name_in": filename,
"md5sum": md5sum,
"search_path": search_path,
"msg_context": msg_context,
"spec": spec,
"topics": topics
}
"""
Converts a single .msg file to an uorb header/source file
"""
msg_context = genmsg.msg_loader.MsgContext.create_default()
full_type_name = genmsg.gentools.compute_full_type_name(
package, os.path.basename(filename))
spec = genmsg.msg_loader.load_msg_from_file(
msg_context, filename, full_type_name)
field_name_and_type = {}
for field in spec.parsed_fields():
field_name_and_type.update({field.name: field.type})
# assert if the timestamp field exists
try:
assert 'timestamp' in field_name_and_type
except AssertionError:
print("[ERROR] uORB topic files generator:\n\tgenerate_output_from_file:\tNo 'timestamp' field found in " +
spec.short_name + " msg definition!")
exit(1)
# assert if the timestamp field is of type uint64
try:
assert field_name_and_type.get('timestamp') == 'uint64'
except AssertionError:
print("[ERROR] uORB topic files generator:\n\tgenerate_output_from_file:\t'timestamp' field in " + spec.short_name +
" msg definition is not of type uint64 but rather of type " + field_name_and_type.get('timestamp') + "!")
exit(1)
topics = get_multi_topics(filename)
if includepath:
search_path = genmsg.command_line.includepath_to_dict(includepath)
else:
search_path = {}
genmsg.msg_loader.load_depends(msg_context, spec, search_path)
md5sum = genmsg.gentools.compute_md5(msg_context, spec)
if len(topics) == 0:
topics.append(spec.short_name)
em_globals = {
"file_name_in": filename,
"md5sum": md5sum,
"search_path": search_path,
"msg_context": msg_context,
"spec": spec,
"topics": topics
}
# Make sure output directory exists:
if not os.path.isdir(outputdir):
os.makedirs(outputdir)
# Make sure output directory exists:
if not os.path.isdir(outputdir):
os.makedirs(outputdir)
template_file = os.path.join(templatedir, TEMPLATE_FILE[format_idx])
output_file = os.path.join(outputdir, spec.short_name +
OUTPUT_FILE_EXT[format_idx])
template_file = os.path.join(templatedir, TEMPLATE_FILE[format_idx])
output_file = os.path.join(outputdir, spec.short_name +
OUTPUT_FILE_EXT[format_idx])
return generate_by_template(output_file, template_file, em_globals)
return generate_by_template(output_file, template_file, em_globals)
def generate_idl_file(filename_msg, outputdir, templatedir, package, includepath):
"""
Generates an .idl from .msg file
"""
em_globals = get_em_globals(filename_msg, package, includepath, MsgScope.NONE)
spec_short_name = em_globals["spec"].short_name
"""
Generates an .idl from .msg file
"""
em_globals = get_em_globals(
filename_msg, package, includepath, MsgScope.NONE)
spec_short_name = em_globals["spec"].short_name
# Make sure output directory exists:
if not os.path.isdir(outputdir):
os.makedirs(outputdir)
# Make sure output directory exists:
if not os.path.isdir(outputdir):
os.makedirs(outputdir)
template_file = os.path.join(templatedir, IDL_TEMPLATE_FILE)
output_file = os.path.join(outputdir, IDL_TEMPLATE_FILE.replace("msg.idl.template", str(spec_short_name + "_.idl")))
template_file = os.path.join(templatedir, IDL_TEMPLATE_FILE)
output_file = os.path.join(outputdir, IDL_TEMPLATE_FILE.replace(
"msg.idl.template", str(spec_short_name + "_.idl")))
return generate_by_template(output_file, template_file, em_globals)
return generate_by_template(output_file, template_file, em_globals)
def generate_uRTPS_general(filename_send_msgs, filename_received_msgs,
outputdir, templatedir, package, includepath, template_name):
"""
Generates source file by msg content
"""
em_globals_list = []
if filename_send_msgs:
em_globals_list.extend([get_em_globals(f, package, includepath, MsgScope.SEND) for f in filename_send_msgs])
"""
Generates source file by msg content
"""
em_globals_list = []
if filename_send_msgs:
em_globals_list.extend([get_em_globals(
f, package, includepath, MsgScope.SEND) for f in filename_send_msgs])
if filename_received_msgs:
em_globals_list.extend([get_em_globals(f, package, includepath, MsgScope.RECEIVE) for f in filename_received_msgs])
if filename_received_msgs:
em_globals_list.extend([get_em_globals(
f, package, includepath, MsgScope.RECEIVE) for f in filename_received_msgs])
merged_em_globals = merge_em_globals_list(em_globals_list)
# Make sure output directory exists:
if not os.path.isdir(outputdir):
os.makedirs(outputdir)
merged_em_globals = merge_em_globals_list(em_globals_list)
# Make sure output directory exists:
if not os.path.isdir(outputdir):
os.makedirs(outputdir)
template_file = os.path.join(templatedir, template_name)
output_file = os.path.join(outputdir, template_name.replace(".template", ""))
template_file = os.path.join(templatedir, template_name)
output_file = os.path.join(
outputdir, template_name.replace(".template", ""))
return generate_by_template(output_file, template_file, merged_em_globals)
return generate_by_template(output_file, template_file, merged_em_globals)
def generate_topic_file(filename_msg, outputdir, templatedir, package, includepath, template_name):
"""
Generates an .idl from .msg file
"""
em_globals = get_em_globals(filename_msg, package, includepath, MsgScope.NONE)
spec_short_name = em_globals["spec"].short_name
"""
Generates an .idl from .msg file
"""
em_globals = get_em_globals(
filename_msg, package, includepath, MsgScope.NONE)
spec_short_name = em_globals["spec"].short_name
# Make sure output directory exists:
if not os.path.isdir(outputdir):
os.makedirs(outputdir)
# Make sure output directory exists:
if not os.path.isdir(outputdir):
os.makedirs(outputdir)
template_file = os.path.join(templatedir, template_name)
output_file = os.path.join(outputdir, spec_short_name + "_" + template_name.replace(".template", ""))
template_file = os.path.join(templatedir, template_name)
output_file = os.path.join(
outputdir, spec_short_name + "_" + template_name.replace(".template", ""))
return generate_by_template(output_file, template_file, em_globals)
return generate_by_template(output_file, template_file, em_globals)
def get_em_globals(filename_msg, package, includepath, scope):
"""
Generates em globals dictionary
"""
msg_context = genmsg.msg_loader.MsgContext.create_default()
full_type_name = genmsg.gentools.compute_full_type_name(package, os.path.basename(filename_msg))
spec = genmsg.msg_loader.load_msg_from_file(msg_context, filename_msg, full_type_name)
topics = get_multi_topics(filename_msg)
if includepath:
search_path = genmsg.command_line.includepath_to_dict(includepath)
else:
search_path = {}
genmsg.msg_loader.load_depends(msg_context, spec, search_path)
md5sum = genmsg.gentools.compute_md5(msg_context, spec)
if len(topics) == 0:
topics.append(spec.short_name)
em_globals = {
"file_name_in": filename_msg,
"md5sum": md5sum,
"search_path": search_path,
"msg_context": msg_context,
"spec": spec,
"topics": topics,
"scope": scope
}
"""
Generates em globals dictionary
"""
msg_context = genmsg.msg_loader.MsgContext.create_default()
full_type_name = genmsg.gentools.compute_full_type_name(
package, os.path.basename(filename_msg))
spec = genmsg.msg_loader.load_msg_from_file(
msg_context, filename_msg, full_type_name)
topics = get_multi_topics(filename_msg)
if includepath:
search_path = genmsg.command_line.includepath_to_dict(includepath)
else:
search_path = {}
genmsg.msg_loader.load_depends(msg_context, spec, search_path)
md5sum = genmsg.gentools.compute_md5(msg_context, spec)
if len(topics) == 0:
topics.append(spec.short_name)
em_globals = {
"file_name_in": filename_msg,
"md5sum": md5sum,
"search_path": search_path,
"msg_context": msg_context,
"spec": spec,
"topics": topics,
"scope": scope
}
return em_globals
return em_globals
def merge_em_globals_list(em_globals_list):
"""
@ -253,208 +274,220 @@ def merge_em_globals_list(em_globals_list):
merged_em_globals = {}
for name in em_globals_list[0]:
merged_em_globals[name] = [em_globals[name] for em_globals in em_globals_list]
merged_em_globals[name] = [em_globals[name]
for em_globals in em_globals_list]
return merged_em_globals
def generate_by_template(output_file, template_file, em_globals):
"""
Invokes empy intepreter to geneate output_file by the
given template_file and predefined em_globals dict
"""
# check if folder exists:
folder_name = os.path.dirname(output_file)
if not os.path.exists(folder_name):
os.makedirs(folder_name)
"""
Invokes empy intepreter to geneate output_file by the
given template_file and predefined em_globals dict
"""
# check if folder exists:
folder_name = os.path.dirname(output_file)
if not os.path.exists(folder_name):
os.makedirs(folder_name)
ofile = open(output_file, 'w')
# todo, reuse interpreter
interpreter = em.Interpreter(output=ofile, globals=em_globals, options={em.RAW_OPT:True,em.BUFFERED_OPT:True})
try:
interpreter.file(open(template_file))
except OSError as e:
ofile.close()
os.remove(output_file)
raise
interpreter.shutdown()
ofile = open(output_file, 'w')
# todo, reuse interpreter
interpreter = em.Interpreter(output=ofile, globals=em_globals, options={
em.RAW_OPT: True, em.BUFFERED_OPT: True})
try:
interpreter.file(open(template_file))
except OSError as e:
ofile.close()
return True
os.remove(output_file)
raise
interpreter.shutdown()
ofile.close()
return True
def convert_dir(format_idx, inputdir, outputdir, package, templatedir):
"""
Converts all .msg files in inputdir to uORB header/source files
"""
"""
Converts all .msg files in inputdir to uORB header/source files
"""
# Find the most recent modification time in input dir
maxinputtime = 0
for f in os.listdir(inputdir):
fni = os.path.join(inputdir, f)
if os.path.isfile(fni):
it = os.path.getmtime(fni)
if it > maxinputtime:
maxinputtime = it
# Find the most recent modification time in input dir
maxinputtime = 0
for f in os.listdir(inputdir):
fni = os.path.join(inputdir, f)
if os.path.isfile(fni):
it = os.path.getmtime(fni)
if it > maxinputtime:
maxinputtime = it
# Find the most recent modification time in output dir
maxouttime = 0
if os.path.isdir(outputdir):
for f in os.listdir(outputdir):
fni = os.path.join(outputdir, f)
if os.path.isfile(fni):
it = os.path.getmtime(fni)
if it > maxouttime:
maxouttime = it
# Find the most recent modification time in output dir
maxouttime = 0
if os.path.isdir(outputdir):
for f in os.listdir(outputdir):
fni = os.path.join(outputdir, f)
if os.path.isfile(fni):
it = os.path.getmtime(fni)
if it > maxouttime:
maxouttime = it
# Do not generate if nothing changed on the input
if (maxinputtime != 0 and maxouttime != 0 and maxinputtime < maxouttime):
return False
# Do not generate if nothing changed on the input
if (maxinputtime != 0 and maxouttime != 0 and maxinputtime < maxouttime):
return False
includepath = INCL_DEFAULT + [':'.join([package, inputdir])]
for f in os.listdir(inputdir):
# Ignore hidden files
if f.startswith("."):
continue
includepath = INCL_DEFAULT + [':'.join([package, inputdir])]
for f in os.listdir(inputdir):
# Ignore hidden files
if f.startswith("."):
continue
fn = os.path.join(inputdir, f)
# Only look at actual files
if not os.path.isfile(fn):
continue
fn = os.path.join(inputdir, f)
# Only look at actual files
if not os.path.isfile(fn):
continue
if fn[-4:].lower() != '.msg':
continue
if fn[-4:].lower() != '.msg':
continue
generate_output_from_file(format_idx, fn, outputdir, package, templatedir, includepath)
return True
generate_output_from_file(
format_idx, fn, outputdir, package, templatedir, includepath)
return True
def copy_changed(inputdir, outputdir, prefix='', quiet=False):
"""
Copies files from inputdir to outputdir if they don't exist in
ouputdir or if their content changed
"""
"""
Copies files from inputdir to outputdir if they don't exist in
ouputdir or if their content changed
"""
# Make sure output directory exists:
if not os.path.isdir(outputdir):
os.makedirs(outputdir)
# Make sure output directory exists:
if not os.path.isdir(outputdir):
os.makedirs(outputdir)
for input_file in os.listdir(inputdir):
fni = os.path.join(inputdir, input_file)
if os.path.isfile(fni):
# Check if input_file exists in outpoutdir, copy the file if not
fno = os.path.join(outputdir, prefix + input_file)
if not os.path.isfile(fno):
shutil.copy(fni, fno)
if not quiet:
print("{0}: new header file".format(fno))
continue
for input_file in os.listdir(inputdir):
fni = os.path.join(inputdir, input_file)
if os.path.isfile(fni):
# Check if input_file exists in outpoutdir, copy the file if not
fno = os.path.join(outputdir, prefix + input_file)
if not os.path.isfile(fno):
shutil.copy(fni, fno)
if not quiet:
print("{0}: new header file".format(fno))
continue
if os.path.getmtime(fni) > os.path.getmtime(fno):
# The file exists in inputdir and outputdir
# only copy if contents do not match
if not filecmp.cmp(fni, fno):
shutil.copy(fni, fno)
if not quiet:
print("{0}: updated".format(input_file))
continue
if os.path.getmtime(fni) > os.path.getmtime(fno):
# The file exists in inputdir and outputdir
# only copy if contents do not match
if not filecmp.cmp(fni, fno):
shutil.copy(fni, fno)
if not quiet:
print("{0}: updated".format(input_file))
continue
if not quiet:
print("{0}: unchanged".format(input_file))
if not quiet:
print("{0}: unchanged".format(input_file))
def convert_dir_save(format_idx, inputdir, outputdir, package, templatedir, temporarydir, prefix, quiet=False):
"""
Converts all .msg files in inputdir to uORB header files
Unchanged existing files are not overwritten.
"""
# Create new headers in temporary output directory
convert_dir(format_idx, inputdir, temporarydir, package, templatedir)
if generate_idx == 1:
generate_topics_list_file(inputdir, temporarydir, templatedir)
# Copy changed headers from temporary dir to output dir
copy_changed(temporarydir, outputdir, prefix, quiet)
"""
Converts all .msg files in inputdir to uORB header files
Unchanged existing files are not overwritten.
"""
# Create new headers in temporary output directory
convert_dir(format_idx, inputdir, temporarydir, package, templatedir)
if generate_idx == 1:
generate_topics_list_file(inputdir, temporarydir, templatedir)
# Copy changed headers from temporary dir to output dir
copy_changed(temporarydir, outputdir, prefix, quiet)
def generate_topics_list_file(msgdir, outputdir, templatedir):
# generate cpp file with topics list
msgs = get_msgs_list(msgdir)
multi_topics = []
for msg in msgs:
msg_filename = os.path.join(msgdir, msg)
multi_topics.extend(get_multi_topics(msg_filename))
tl_globals = {"msgs" : msgs, "multi_topics" : multi_topics}
tl_template_file = os.path.join(templatedir, TOPICS_LIST_TEMPLATE_FILE)
tl_out_file = os.path.join(outputdir, TOPICS_LIST_TEMPLATE_FILE.replace(".template", ""))
generate_by_template(tl_out_file, tl_template_file, tl_globals)
# generate cpp file with topics list
msgs = get_msgs_list(msgdir)
multi_topics = []
for msg in msgs:
msg_filename = os.path.join(msgdir, msg)
multi_topics.extend(get_multi_topics(msg_filename))
tl_globals = {"msgs": msgs, "multi_topics": multi_topics}
tl_template_file = os.path.join(templatedir, TOPICS_LIST_TEMPLATE_FILE)
tl_out_file = os.path.join(
outputdir, TOPICS_LIST_TEMPLATE_FILE.replace(".template", ""))
generate_by_template(tl_out_file, tl_template_file, tl_globals)
def generate_topics_list_file_from_files(files, outputdir, templatedir):
# generate cpp file with topics list
filenames = [os.path.basename(p) for p in files if os.path.basename(p).endswith(".msg")]
multi_topics = []
for msg_filename in files:
multi_topics.extend(get_multi_topics(msg_filename))
tl_globals = {"msgs" : filenames, "multi_topics" : multi_topics}
tl_template_file = os.path.join(templatedir, TOPICS_LIST_TEMPLATE_FILE)
tl_out_file = os.path.join(outputdir, TOPICS_LIST_TEMPLATE_FILE.replace(".template", ""))
generate_by_template(tl_out_file, tl_template_file, tl_globals)
# generate cpp file with topics list
filenames = [os.path.basename(
p) for p in files if os.path.basename(p).endswith(".msg")]
multi_topics = []
for msg_filename in files:
multi_topics.extend(get_multi_topics(msg_filename))
tl_globals = {"msgs": filenames, "multi_topics": multi_topics}
tl_template_file = os.path.join(templatedir, TOPICS_LIST_TEMPLATE_FILE)
tl_out_file = os.path.join(
outputdir, TOPICS_LIST_TEMPLATE_FILE.replace(".template", ""))
generate_by_template(tl_out_file, tl_template_file, tl_globals)
def append_to_include_path(path_to_append, curr_include, package):
for p in path_to_append:
curr_include.append('%s:%s' % (package, p))
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description='Convert msg files to uorb headers/sources')
parser.add_argument('--headers', help='Generate header files',
action='store_true')
parser.add_argument('--sources', help='Generate source files',
action='store_true')
parser.add_argument('-d', dest='dir', help='directory with msg files')
parser.add_argument('-f', dest='file',
help="files to convert (use only without -d)",
nargs="+")
parser.add_argument('-i', dest="include_paths",
help='Additional Include Paths', nargs="*",
default=None)
parser.add_argument('-e', dest='templatedir',
help='directory with template files',)
parser.add_argument('-k', dest='package', default=PACKAGE,
help='package name')
parser.add_argument('-o', dest='outputdir',
help='output directory for header files')
parser.add_argument('-t', dest='temporarydir',
help='temporary directory')
parser.add_argument('-p', dest='prefix', default='',
help='string added as prefix to the output file '
' name when converting directories')
parser.add_argument('-q', dest='quiet', default=False, action='store_true',
help='string added as prefix to the output file '
' name when converting directories')
args = parser.parse_args()
parser = argparse.ArgumentParser(
description='Convert msg files to uorb headers/sources')
parser.add_argument('--headers', help='Generate header files',
action='store_true')
parser.add_argument('--sources', help='Generate source files',
action='store_true')
parser.add_argument('-d', dest='dir', help='directory with msg files')
parser.add_argument('-f', dest='file',
help="files to convert (use only without -d)",
nargs="+")
parser.add_argument('-i', dest="include_paths",
help='Additional Include Paths', nargs="*",
default=None)
parser.add_argument('-e', dest='templatedir',
help='directory with template files',)
parser.add_argument('-k', dest='package', default=PACKAGE,
help='package name')
parser.add_argument('-o', dest='outputdir',
help='output directory for header files')
parser.add_argument('-t', dest='temporarydir',
help='temporary directory')
parser.add_argument('-p', dest='prefix', default='',
help='string added as prefix to the output file '
' name when converting directories')
parser.add_argument('-q', dest='quiet', default=False, action='store_true',
help='string added as prefix to the output file '
' name when converting directories')
args = parser.parse_args()
if args.include_paths:
append_to_include_path(args.include_paths, INCL_DEFAULT, args.package)
if args.include_paths:
append_to_include_path(args.include_paths, INCL_DEFAULT, args.package)
if args.headers:
generate_idx = 0
elif args.sources:
generate_idx = 1
else:
print('Error: either --headers or --sources must be specified')
exit(-1)
if args.file is not None:
for f in args.file:
generate_output_from_file(generate_idx, f, args.temporarydir, args.package, args.templatedir, INCL_DEFAULT)
if generate_idx == 1:
generate_topics_list_file_from_files(args.file, args.outputdir, args.templatedir)
copy_changed(args.temporarydir, args.outputdir, args.prefix, args.quiet)
elif args.dir is not None:
convert_dir_save(
generate_idx,
args.dir,
args.outputdir,
args.package,
args.templatedir,
args.temporarydir,
args.prefix,
args.quiet)
if args.headers:
generate_idx = 0
elif args.sources:
generate_idx = 1
else:
print('Error: either --headers or --sources must be specified')
exit(-1)
if args.file is not None:
for f in args.file:
generate_output_from_file(
generate_idx, f, args.temporarydir, args.package, args.templatedir, INCL_DEFAULT)
if generate_idx == 1:
generate_topics_list_file_from_files(
args.file, args.outputdir, args.templatedir)
copy_changed(args.temporarydir, args.outputdir,
args.prefix, args.quiet)
elif args.dir is not None:
convert_dir_save(
generate_idx,
args.dir,
args.outputdir,
args.package,
args.templatedir,
args.temporarydir,
args.prefix,
args.quiet)