px4-firmware/Tools/module_config/generate_actuators_metadata.py

507 lines
20 KiB
Python
Executable File

#!/usr/bin/env python3
""" Script to generate actuators.json metadata from module.yaml config file(s)
"""
import argparse
import lzma #to create .xz file
import json
import os
import sys
from output_groups_from_timer_config import get_timer_groups, get_output_groups
try:
import yaml
except ImportError as e:
print("Failed to import yaml: " + str(e))
print("")
print("You may need to install it using:")
print(" pip3 install --user pyyaml")
print("")
sys.exit(1)
parser = argparse.ArgumentParser(description='Generate actuators.json from module.yaml file(s)')
parser.add_argument('--config-files', type=str, nargs='*', default=[],
help='YAML module config file(s)')
parser.add_argument('--output-file', type=str, action='store',
help='JSON output file', required=True)
parser.add_argument('--compress', action='store_true', help='Add a compressed output file')
parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
help='Verbose Output')
parser.add_argument('--timer-config', type=str, action='store',
help='board-specific timer_config.cpp file')
parser.add_argument('--board', type=str, action='store',
help='board name, e.g. ')
parser.add_argument('--board-with-io', dest='board_with_io', action='store_true',
help='Indicate that the board as an IO for extra PWM',
default=False)
args = parser.parse_args()
compress = args.compress
verbose = args.verbose
output_file = args.output_file
timer_config_file = args.timer_config
board_with_io = args.board_with_io
board = args.board
root_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)),"../..")
output_functions_file = os.path.join(root_dir,"src/lib/mixer_module/output_functions.yaml")
def save_compressed(filename):
#create lzma compressed version
xz_filename=filename+'.xz'
with lzma.open(xz_filename, 'wt', preset=9) as f:
with open(filename, 'r') as content_file:
f.write(content_file.read())
def load_yaml_file(file_name):
with open(file_name, 'r') as stream:
try:
return yaml.safe_load(stream)
except yaml.YAMLError as exc:
print(exc)
raise
# functions
output_functions_yaml = load_yaml_file(output_functions_file)
output_functions = output_functions_yaml['functions']
functions = {}
def add_function(functions, index, name, function_obj=None):
functions[index] = {
"label": name
}
if function_obj is not None:
if function_obj.get('exclude_from_actuator_testing', False):
functions[index]['exclude-from-actuator-testing'] = True
if 'note' in function_obj:
functions[index]['note'] = function_obj['note']
for group_key in output_functions:
group = output_functions[group_key]
for function_name in group:
function_name_label = function_name.replace('_', ' ')
if isinstance(group[function_name], int):
add_function(functions, group[function_name], function_name_label)
elif not 'count' in group[function_name]:
add_function(functions, group[function_name]['start'], function_name_label, group[function_name])
else:
start = group[function_name]['start']
count = group[function_name]['count']
for i in range(count):
add_function(functions, start+i, function_name_label+' '+str(i+1), group[function_name])
# outputs
outputs = []
def process_module_name(module_name):
if module_name == '${PWM_MAIN_OR_AUX}':
if board_with_io: return 'PWM AUX'
return 'PWM MAIN'
if '${' in module_name:
raise Exception('unhandled variable in {:}'.format(module_name))
return module_name
def process_param_prefix(param_prefix):
if param_prefix == '${PWM_MAIN_OR_AUX}':
if board_with_io: return 'PWM_AUX'
return 'PWM_MAIN'
if '${' in param_prefix:
raise Exception('unhandled variable in {:}'.format(param_prefix))
return param_prefix
def process_channel_label(module_name, channel_label, no_prefix):
if channel_label == '${PWM_MAIN_OR_AUX_CAP}':
return 'CAP'
if channel_label == '${PWM_MAIN_OR_AUX}':
if board_with_io: return 'AUX'
return 'MAIN'
if '${' in channel_label:
raise Exception('unhandled variable in {:}'.format(channel_label))
if no_prefix: return channel_label
return channel_label
def get_actuator_output(yaml_config, output_functions, timer_config_file, verbose):
""" parse the actuator_output section from the yaml config file
"""
if not 'actuator_output' in yaml_config:
return None
actuator_output_yaml = yaml_config['actuator_output']
output_groups = actuator_output_yaml['output_groups']
module_name = process_module_name(yaml_config['module_name'])
group_idx = 0
if verbose: print('processing module: {}'.format(module_name))
actuator_output = {
'label': module_name
}
if 'show_subgroups_if' in actuator_output_yaml:
actuator_output['show-subgroups-if'] = actuator_output_yaml['show_subgroups_if']
# config parameters
def get_config_params(param_list):
""" convert config parameter list (per group or per subgroup) """
parameters = []
for config_param in param_list:
if verbose:
print('config param: {}'.format(config_param))
param = {
'name': config_param['param'],
}
if 'label' in config_param:
param['label'] = config_param['label']
if 'function' in config_param:
param['function'] = config_param['function']
parameters.append(param)
return parameters
parameters = get_config_params(actuator_output_yaml.get('config_parameters', []))
if len(parameters) > 0:
actuator_output['parameters'] = parameters
subgroups = []
while group_idx < len(output_groups):
group = output_groups[group_idx]
group_idx += 1
if verbose: print("processing group: {:}".format(group))
# Check for generator and generate additional data.
if 'generator' in group:
if group['generator'] == 'pwm':
param_prefix = process_param_prefix(group['param_prefix'])
no_prefix = not group.get('channel_label_module_name_prefix', True)
channel_labels = [process_channel_label(module_name, label, no_prefix)
for label in group['channel_labels']]
standard_params = group.get('standard_params', [])
extra_function_groups = group.get('extra_function_groups', [])
pwm_timer_param = group.get('pwm_timer_param', None)
if 'timer_config_file' in group:
timer_config_file = os.path.join(root_dir, group['timer_config_file'])
if timer_config_file is None:
raise Exception('trying to generate pwm outputs, but --timer-config not set')
timer_groups = get_timer_groups(timer_config_file, verbose)
timer_output_groups, timer_params = get_output_groups(timer_groups,
param_prefix, channel_labels,
standard_params, extra_function_groups, pwm_timer_param,
verbose=verbose)
output_groups.extend(timer_output_groups)
else:
raise Exception('unknown generator {:}'.format(group['generator']))
continue
subgroup = {}
# supported actions
if 'supported_actions' in group:
actions = {}
for action_name in group['supported_actions']:
action = group['supported_actions'][action_name]
action_name = action_name.replace('_', '-')
actions[action_name] = {}
if 'supported_if' in action:
actions[action_name]['supported-if'] = action['supported_if']
if 'actuator_types' in action:
actions[action_name]['actuator-types'] = action['actuator_types']
subgroup['supported-actions'] = actions
# channels
num_channels = group['num_channels']
no_prefix = not group.get('channel_label_module_name_prefix', True)
channel_label = process_channel_label(module_name, group['channel_label'], no_prefix)
instance_start = group.get('instance_start', 1)
instance_start_label = group.get('instance_start_label', instance_start)
channels = []
for channel in range(num_channels):
channels.append({
'label': channel_label + ' ' +str(channel+instance_start_label),
'param-index': channel+instance_start
})
subgroup['channels'] = channels
if 'group_label' in group:
subgroup['label'] = group['group_label']
# per-channel-params
per_channel_params = []
param_prefix = process_param_prefix(group['param_prefix'])
standard_params = group.get('standard_params', {})
standard_params_array = [
( 'function', 'Function', 'FUNC', False ),
( 'disarmed', 'Disarmed', 'DIS', False ),
( 'min', 'Minimum', 'MIN', False ),
( 'max', 'Maximum', 'MAX', False ),
( 'failsafe', 'Failsafe', 'FAIL', True ),
]
for key, label, param_suffix, advanced in standard_params_array:
show_if = None
if key in standard_params and 'show_if' in standard_params[key]:
show_if = standard_params[key]['show_if']
if key in standard_params or key == 'function':
param = {
'label': label,
'name': param_prefix+'_'+param_suffix+'${i}',
'function': key,
}
if advanced: param['advanced'] = True
if show_if: param['show-if'] = show_if
per_channel_params.append(param)
param = {
'label': 'Rev Range\n(for Servos)',
'name': param_prefix+'_REV',
'index-offset': -1,
'show-as': 'bitset',
}
per_channel_params.append(param)
custom_params = group.get('custom_params', [])
for custom_param in custom_params:
# Simply pulls all custom params, assuming they are valid ones
param = {
'name': param_prefix+'_'+custom_param['name'],
}
# TODO: check and match the custom params in output_groups with module-level parameters
del custom_param['name']
for param_key in custom_param:
# '-' is used in actuators.schema.json, while '_' is used in module_schema.yml
param[param_key.replace('_', '-')] = custom_param[param_key]
per_channel_params.append(param)
subgroup['per-channel-parameters'] = per_channel_params
# group config params
parameters = get_config_params(group.get('config_parameters', []))
if len(parameters) > 0:
subgroup['parameters'] = parameters
subgroups.append(subgroup)
actuator_output['subgroups'] = subgroups
return actuator_output
# Mixers
mixers = None
def get_mixers(yaml_config, output_functions, verbose):
if not 'mixer' in yaml_config:
return None
actuator_types = {}
for actuator_type_key in yaml_config['mixer']['actuator_types']:
actuator_type_conf = yaml_config['mixer']['actuator_types'][actuator_type_key]
actuator_type = { }
if actuator_type_key != 'DEFAULT':
actuator_type['label-index-offset'] = 1 # always 1
if 'functions' in actuator_type_conf:
function_name = actuator_type_conf['functions']
# we expect the function to be in 'common' (this is not a requirement, just simplicity)
output_function = output_functions['common'][function_name]
actuator_type['function-min'] = output_function['start']
actuator_type['function-max'] = output_function['start'] + output_function['count'] - 1
values = actuator_type_conf['actuator_testing_values']
actuator_type['values'] = {
'min': values['min'],
'max': values['max'],
}
if values.get('default_is_nan', False):
actuator_type['values']['default-is-nan'] = True
else:
actuator_type['values']['default'] = values['default']
if values.get('reversible', False):
actuator_type['values']['reversible'] = True
# per item params
per_item_params = []
for per_item_param in actuator_type_conf.get('per_item_parameters', []):
per_item_params.append({k.replace('_','-'): v for k, v in per_item_param.items()})
if len(per_item_params) > 0:
actuator_type['per-item-parameters'] = per_item_params
actuator_types[actuator_type_key] = actuator_type
if verbose:
print('Actuator types: {}'.format(actuator_types))
config = []
yaml_mixer_config = yaml_config['mixer']['config']
select_param = yaml_mixer_config['param']
types = yaml_mixer_config['types']
for type_index in types:
current_type = types[type_index]
option = select_param + '==' + str(type_index)
mixer_config = {
'option': option,
'help-url': 'https://docs.px4.io/main/en/config/actuators.html',
}
for optional in ['type', 'title']:
if optional in current_type:
mixer_config[optional] = current_type[optional]
actuators = []
for actuator_conf in current_type['actuators']:
actuator = {
'actuator-type': actuator_conf['actuator_type'],
'required': True, # for now always set as required
}
# sanity check that actuator type exists
if actuator_conf['actuator_type'] not in actuator_types:
raise Exception('actuator type "{}" does not exist (valid: {})'.format(actuator_conf['actuator_type'], actuator_types.keys()))
if 'group_label' in actuator_conf:
actuator['group-label'] = actuator_conf['group_label']
else:
# infer from actuator type
if actuator_conf['actuator_type'] == 'motor':
actuator['group-label'] = 'Motors'
elif actuator_conf['actuator_type'] == 'servo':
actuator['group-label'] = 'Servos'
else:
raise Exception('Missing group label for actuator type "{}"'.format(actuator_conf['actuator_type']))
if 'count' in actuator_conf: # possibly dynamic size
actuator['count'] = actuator_conf['count']
per_item_params = actuator_conf.get('per_item_parameters', {})
params = []
if 'standard' in per_item_params:
standard_params = per_item_params['standard']
index_offset = standard_params.get('index_offset', 0)
if 'position' in standard_params:
params.extend([
{
'label': 'Position X',
'function': 'posx',
'name': standard_params['position'][0],
'index-offset': index_offset,
},
{
'label': 'Position Y',
'function': 'posy',
'name': standard_params['position'][1],
'index-offset': index_offset,
},
{
'label': 'Position Z',
'function': 'posz',
'name': standard_params['position'][2],
'advanced': True,
'index-offset': index_offset,
},
])
if 'extra' in per_item_params:
for extra_param in per_item_params['extra']:
params.append({k.replace('_','-'): v for k, v in extra_param.items()})
actuator['per-item-parameters'] = params
if 'item_label_prefix' in actuator_conf:
actuator['item-label-prefix'] = actuator_conf['item_label_prefix']
else: # fixed size
labels = []
pos_x = []
pos_y = []
pos_z = []
for instance in actuator_conf['instances']:
labels.append(instance['name'])
pos_x.append(instance['position'][0])
pos_y.append(instance['position'][1])
pos_z.append(instance['position'][2])
actuator['count'] = len(labels)
actuator['item-label-prefix'] = labels
actuator['per-item-parameters'] = [
{
'label': 'Position X',
'function': 'posx',
'value': pos_x,
},
{
'label': 'Position Y',
'function': 'posy',
'value': pos_y,
},
{
'label': 'Position Z',
'function': 'posz',
'value': pos_z,
'advanced': True,
},
]
# actuator parameters
parameters = []
for param in actuator_conf.get('parameters', []):
parameters.append({k.replace('_','-'): v for k, v in param.items()})
actuator['parameters'] = parameters
actuators.append(actuator)
mixer_config['actuators'] = actuators
config.append(mixer_config)
if verbose:
print('Mixer configs: {}'.format(config))
rules = []
for rule in yaml_config['mixer'].get('rules', []):
rules.append({k.replace('_','-'): v for k, v in rule.items()})
if verbose:
print('Mixer rules: {}'.format(rules))
mixers = {
'actuator-types': actuator_types,
'config': config,
'rules': rules,
}
return mixers
for yaml_file in args.config_files:
yaml_config = load_yaml_file(yaml_file)
try:
actuator_output = get_actuator_output(yaml_config,
output_functions, timer_config_file, verbose)
if actuator_output:
outputs.append(actuator_output)
parsed_mixers = get_mixers(yaml_config, output_functions, verbose)
if parsed_mixers is not None:
if mixers is not None:
# only expected to be configured in one module
raise Exception('multiple "mixer" sections in module config files')
mixers = parsed_mixers
except Exception as e:
print('Exception while parsing {:}:'.format(yaml_file))
raise e
if mixers is None:
if len(outputs) > 0:
raise Exception('Missing "mixer" section in yaml configs (CONFIG_MODULES_CONTROL_ALLOCATOR not added to the build?)')
else:
# set a minimal default
mixers = {
'actuator-types': { 'DEFAULT': { 'values': { 'min': 0, 'max': 1 } } },
'config': [],
}
actuators = {
'version': 1,
'outputs_v1': outputs,
'functions_v1': functions,
'mixer_v1': mixers,
}
with open(output_file, 'w') as outfile:
indent = 2 if verbose else None
json.dump(actuators, outfile, indent=indent, sort_keys=True)
if compress:
save_compressed(output_file)