mirror of https://github.com/ArduPilot/ardupilot
Tools: Script to fetch online ArduPilot parameter documentation (if not cached) and
add it to the specified file or to all *.param and *.parm files in the specified directory. 1. Checks if a local cache of the XML file exists in the target directory or on the directory of the target file: - If it does, the script loads the file content. - If it doesn't, the script sends a GET request to the URL to fetch the XML data for the requested vehicle type. 2. Parses the XML data and creates a dictionary of parameter documentation. 3. DELETES all comments that start at the beginning of a line 4. Adds the parameter documentation to the target file or to all *.param,*.parm files in the target directory. Supports AP_Periph, AntennaTracker, ArduCopter, ArduPlane, ArduSub, Blimp, Heli, Rover and SITL vehicle types Supports both Mission Planner and MAVProxy file formats Supports sorting the parameters Has unit tests with 88% coverage AP_FLAKE8_CLEAN Author: Amilcar do Carmo Lucas, IAV GmbH # Conflicts: # .github/workflows/test_unit_tests.yml
This commit is contained in:
parent
feff639c15
commit
b6b4c972f9
|
@ -147,16 +147,22 @@ jobs:
|
|||
fi
|
||||
PATH="/github/home/.local/bin:$PATH"
|
||||
Tools/scripts/build_ci.sh
|
||||
|
||||
- name: Unittest extract_param_defaults
|
||||
env:
|
||||
PYTHONPATH: "${{ github.workspace }}/Tools/scripts:${{ github.workspace }}/modules/mavlink/pymavlink"
|
||||
shell: 'script -q -e -c "bash {0}"'
|
||||
if: matrix.config == 'unit-tests'
|
||||
run: |
|
||||
if [[ ${{ matrix.config }} == "unit-tests" ]]; then
|
||||
Tools/autotest/unittest/extract_param_defaults_unittest.py
|
||||
else
|
||||
echo ${{matrix.config}} does not have pymavlink
|
||||
fi
|
||||
Tools/autotest/unittest/extract_param_defaults_unittest.py
|
||||
|
||||
- name: Unittest annotate_params
|
||||
env:
|
||||
PYTHONPATH: "${{ github.workspace }}/Tools/scripts"
|
||||
if: matrix.config == 'unit-tests'
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
pip install requests mock
|
||||
Tools/autotest/unittest/annotate_params_unittest.py
|
||||
|
||||
- name: Archive buildlog artifacts
|
||||
uses: actions/upload-artifact@v3
|
||||
|
|
|
@ -0,0 +1,539 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
'''
|
||||
These are the unit tests for the python script that fetches online ArduPilot
|
||||
parameter documentation (if not cached) and adds it to the specified file or
|
||||
to all *.param and *.parm files in the specified directory.
|
||||
|
||||
AP_FLAKE8_CLEAN
|
||||
|
||||
Author: Amilcar do Carmo Lucas, IAV GmbH
|
||||
'''
|
||||
|
||||
import tempfile
|
||||
from unittest.mock import patch, mock_open
|
||||
import os
|
||||
import unittest
|
||||
import xml.etree.ElementTree as ET
|
||||
import requests
|
||||
import mock
|
||||
from annotate_params import get_xml_data, remove_prefix, split_into_lines, create_doc_dict, \
|
||||
format_columns, update_parameter_documentation, print_read_only_params, \
|
||||
BASE_URL, PARAM_DEFINITION_XML_FILE
|
||||
|
||||
|
||||
class TestParamDocsUpdate(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
# Create a temporary directory
|
||||
self.temp_dir = tempfile.mkdtemp()
|
||||
|
||||
# Create a temporary file
|
||||
self.temp_file = tempfile.NamedTemporaryFile(delete=False)
|
||||
|
||||
# Create a dictionary of parameter documentation
|
||||
self.doc_dict = {
|
||||
"PARAM1": {
|
||||
"humanName": "Param 1",
|
||||
"documentation": ["Documentation for Param 1"],
|
||||
"fields": {"Field1": "Value1", "Field2": "Value2"},
|
||||
"values": {"Code1": "Value1", "Code2": "Value2"}
|
||||
},
|
||||
"PARAM2": {
|
||||
"humanName": "Param 2",
|
||||
"documentation": ["Documentation for Param 2"],
|
||||
"fields": {"Field3": "Value3", "Field4": "Value4"},
|
||||
"values": {"Code3": "Value3", "Code4": "Value4"}
|
||||
},
|
||||
"PARAM_1": {
|
||||
"humanName": "Param _ 1",
|
||||
"documentation": ["Documentation for Param_1"],
|
||||
"fields": {"Field_1": "Value_1", "Field_2": "Value_2"},
|
||||
"values": {"Code_1": "Value_1", "Code_2": "Value_2"}
|
||||
},
|
||||
}
|
||||
|
||||
@patch('builtins.open', new_callable=mock_open, read_data='<root></root>')
|
||||
@patch('os.path.isfile')
|
||||
def test_get_xml_data_local_file(self, mock_isfile, mock_open):
|
||||
# Mock the isfile function to return True
|
||||
mock_isfile.return_value = True
|
||||
|
||||
# Call the function with a local file
|
||||
result = get_xml_data("/path/to/local/file/", ".", "test.xml")
|
||||
|
||||
# Check the result
|
||||
self.assertIsInstance(result, ET.Element)
|
||||
|
||||
# Assert that the file was opened correctly
|
||||
mock_open.assert_called_once_with('./test.xml', 'r', encoding='utf-8')
|
||||
|
||||
@patch('requests.get')
|
||||
def test_get_xml_data_remote_file(self, mock_get):
|
||||
# Mock the response
|
||||
mock_get.return_value.status_code = 200
|
||||
mock_get.return_value.text = "<root></root>"
|
||||
|
||||
# Remove the test.xml file if it exists
|
||||
try:
|
||||
os.remove("test.xml")
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
# Call the function with a remote file
|
||||
result = get_xml_data("http://example.com/", ".", "test.xml")
|
||||
|
||||
# Check the result
|
||||
self.assertIsInstance(result, ET.Element)
|
||||
|
||||
# Assert that the requests.get function was called once
|
||||
mock_get.assert_called_once_with("http://example.com/test.xml", timeout=5)
|
||||
|
||||
@patch('os.path.isfile')
|
||||
def test_get_xml_data_script_dir_file(self, mock_isfile):
|
||||
# Mock the isfile function to return False for the current directory and True for the script directory
|
||||
def side_effect(filename):
|
||||
return True
|
||||
mock_isfile.side_effect = side_effect
|
||||
|
||||
# Mock the open function to return a dummy XML string
|
||||
mock_open = mock.mock_open(read_data='<root></root>')
|
||||
with patch('builtins.open', mock_open):
|
||||
# Call the function with a filename that exists in the script directory
|
||||
result = get_xml_data(BASE_URL, ".", PARAM_DEFINITION_XML_FILE)
|
||||
|
||||
# Check the result
|
||||
self.assertIsInstance(result, ET.Element)
|
||||
|
||||
# Assert that the file was opened correctly
|
||||
mock_open.assert_called_once_with(os.path.join('.', PARAM_DEFINITION_XML_FILE), 'r', encoding='utf-8')
|
||||
|
||||
def test_get_xml_data_no_requests_package(self):
|
||||
# Temporarily remove the requests module
|
||||
with patch.dict('sys.modules', {'requests': None}):
|
||||
|
||||
# Remove the test.xml file if it exists
|
||||
try:
|
||||
os.remove("test.xml")
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
# Call the function with a remote file
|
||||
with self.assertRaises(SystemExit):
|
||||
get_xml_data("http://example.com/", ".", "test.xml")
|
||||
|
||||
@patch('requests.get')
|
||||
def test_get_xml_data_request_failure(self, mock_get):
|
||||
# Mock the response
|
||||
mock_get.side_effect = requests.exceptions.RequestException
|
||||
|
||||
# Remove the test.xml file if it exists
|
||||
try:
|
||||
os.remove("test.xml")
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
# Call the function with a remote file
|
||||
with self.assertRaises(SystemExit):
|
||||
get_xml_data("http://example.com/", ".", "test.xml")
|
||||
|
||||
@patch('requests.get')
|
||||
def test_get_xml_data_valid_xml(self, mock_get):
|
||||
# Mock the response
|
||||
mock_get.return_value.status_code = 200
|
||||
mock_get.return_value.text = "<root></root>"
|
||||
|
||||
# Call the function with a remote file
|
||||
result = get_xml_data("http://example.com/", ".", "test.xml")
|
||||
|
||||
# Check the result
|
||||
self.assertIsInstance(result, ET.Element)
|
||||
|
||||
@patch('requests.get')
|
||||
def test_get_xml_data_invalid_xml(self, mock_get):
|
||||
# Mock the response
|
||||
mock_get.return_value.status_code = 200
|
||||
mock_get.return_value.text = "<root><invalid></root>"
|
||||
|
||||
# Remove the test.xml file if it exists
|
||||
try:
|
||||
os.remove("test.xml")
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
# Call the function with a remote file
|
||||
with self.assertRaises(ET.ParseError):
|
||||
get_xml_data("http://example.com/", ".", "test.xml")
|
||||
|
||||
@patch('requests.get')
|
||||
@patch('os.path.isfile')
|
||||
def test_get_xml_data_missing_file(self, mock_isfile, mock_get):
|
||||
# Mock the isfile function to return False
|
||||
mock_isfile.return_value = False
|
||||
# Mock the requests.get call to raise FileNotFoundError
|
||||
mock_get.side_effect = FileNotFoundError
|
||||
|
||||
# Remove the test.xml file if it exists
|
||||
try:
|
||||
os.remove("test.xml")
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
# Call the function with a local file
|
||||
with self.assertRaises(FileNotFoundError):
|
||||
get_xml_data("/path/to/local/file/", ".", "test.xml")
|
||||
|
||||
@patch('requests.get')
|
||||
def test_get_xml_data_network_issue(self, mock_get):
|
||||
# Mock the response
|
||||
mock_get.side_effect = requests.exceptions.ConnectionError
|
||||
|
||||
# Call the function with a remote file
|
||||
with self.assertRaises(SystemExit):
|
||||
get_xml_data("http://example.com/", ".", "test.xml")
|
||||
|
||||
def test_remove_prefix(self):
|
||||
# Test case 1: Normal operation
|
||||
self.assertEqual(remove_prefix("prefix_test", "prefix_"), "test")
|
||||
|
||||
# Test case 2: Prefix not present
|
||||
self.assertEqual(remove_prefix("test", "prefix_"), "test")
|
||||
|
||||
# Test case 3: Empty string
|
||||
self.assertEqual(remove_prefix("", "prefix_"), "")
|
||||
|
||||
def test_split_into_lines(self):
|
||||
# Test case 1: Normal operation
|
||||
string_to_split = "This is a test string. It should be split into several lines."
|
||||
maximum_line_length = 12
|
||||
expected_output = ["This is a", "test string.", "It should be", "split into", "several", "lines."]
|
||||
self.assertEqual(split_into_lines(string_to_split, maximum_line_length), expected_output)
|
||||
|
||||
# Test case 2: String shorter than maximum line length
|
||||
string_to_split = "Short"
|
||||
maximum_line_length = 10
|
||||
expected_output = ["Short"]
|
||||
self.assertEqual(split_into_lines(string_to_split, maximum_line_length), expected_output)
|
||||
|
||||
# Test case 3: Empty string
|
||||
string_to_split = ""
|
||||
maximum_line_length = 10
|
||||
expected_output = []
|
||||
self.assertEqual(split_into_lines(string_to_split, maximum_line_length), expected_output)
|
||||
|
||||
def test_create_doc_dict(self):
|
||||
# Mock XML data
|
||||
xml_data = '''
|
||||
<root>
|
||||
<param name="PARAM1" humanName="Param 1" documentation="Documentation for Param 1">
|
||||
<field name="Field1">Value1</field>
|
||||
<field name="Field2">Value2</field>
|
||||
<values>
|
||||
<value code="Code1">Value1</value>
|
||||
<value code="Code2">Value2</value>
|
||||
</values>
|
||||
</param>
|
||||
<param name="PARAM2" humanName="Param 2" documentation="Documentation for Param 2">
|
||||
<field name="Units">m/s</field>
|
||||
<field name="UnitText">meters per second</field>
|
||||
<values>
|
||||
<value code="Code3">Value3</value>
|
||||
<value code="Code4">Value4</value>
|
||||
</values>
|
||||
</param>
|
||||
</root>
|
||||
'''
|
||||
root = ET.fromstring(xml_data)
|
||||
|
||||
# Expected output
|
||||
expected_output = {
|
||||
"PARAM1": {
|
||||
"humanName": "Param 1",
|
||||
"documentation": ["Documentation for Param 1"],
|
||||
"fields": {"Field1": "Value1", "Field2": "Value2"},
|
||||
"values": {"Code1": "Value1", "Code2": "Value2"}
|
||||
},
|
||||
"PARAM2": {
|
||||
"humanName": "Param 2",
|
||||
"documentation": ["Documentation for Param 2"],
|
||||
"fields": {"Units": "m/s (meters per second)"},
|
||||
"values": {"Code3": "Value3", "Code4": "Value4"}
|
||||
}
|
||||
}
|
||||
|
||||
# Call the function with the mock XML data
|
||||
result = create_doc_dict(root, "VehicleType")
|
||||
|
||||
# Check the result
|
||||
self.assertEqual(result, expected_output)
|
||||
|
||||
def test_format_columns(self):
|
||||
# Define the input
|
||||
values = {
|
||||
"Key1": "Value1",
|
||||
"Key2": "Value2",
|
||||
"Key3": "Value3",
|
||||
"Key4": "Value4",
|
||||
"Key5": "Value5",
|
||||
"Key6": "Value6",
|
||||
"Key7": "Value7",
|
||||
"Key8": "Value8",
|
||||
"Key9": "Value9",
|
||||
"Key10": "Value10",
|
||||
"Key11": "Value11",
|
||||
"Key12": "Value12",
|
||||
}
|
||||
|
||||
# Define the expected output
|
||||
expected_output = [
|
||||
'Key1: Value1 Key7: Value7',
|
||||
'Key2: Value2 Key8: Value8',
|
||||
'Key3: Value3 Key9: Value9',
|
||||
'Key4: Value4 Key10: Value10',
|
||||
'Key5: Value5 Key11: Value11',
|
||||
'Key6: Value6 Key12: Value12',
|
||||
]
|
||||
|
||||
# Call the function with the input
|
||||
result = format_columns(values)
|
||||
|
||||
# Check the result
|
||||
self.assertEqual(result, expected_output)
|
||||
|
||||
self.assertEqual(format_columns({}), [])
|
||||
|
||||
def test_update_parameter_documentation(self):
|
||||
# Write some initial content to the temporary file
|
||||
with open(self.temp_file.name, "w", encoding="utf-8") as file:
|
||||
file.write("PARAM1 100\n")
|
||||
|
||||
# Call the function with the temporary file
|
||||
update_parameter_documentation(self.doc_dict, self.temp_file.name)
|
||||
|
||||
# Read the updated content from the temporary file
|
||||
with open(self.temp_file.name, "r", encoding="utf-8") as file:
|
||||
updated_content = file.read()
|
||||
|
||||
# Check if the file has been updated correctly
|
||||
self.assertIn("Param 1", updated_content)
|
||||
self.assertIn("Documentation for Param 1", updated_content)
|
||||
self.assertIn("Field1: Value1", updated_content)
|
||||
self.assertIn("Field2: Value2", updated_content)
|
||||
self.assertIn("Code1: Value1", updated_content)
|
||||
self.assertIn("Code2: Value2", updated_content)
|
||||
|
||||
def test_update_parameter_documentation_sorting_none(self):
|
||||
# Write some initial content to the temporary file
|
||||
# With stray leading and trailing whitespaces
|
||||
with open(self.temp_file.name, "w", encoding="utf-8") as file:
|
||||
file.write("PARAM2 100\n PARAM_1 100 \nPARAM3 3\nPARAM4 4\nPARAM5 5\nPARAM1 100\n")
|
||||
|
||||
# Call the function with the temporary file
|
||||
update_parameter_documentation(self.doc_dict, self.temp_file.name)
|
||||
|
||||
# Read the updated content from the temporary file
|
||||
with open(self.temp_file.name, "r", encoding="utf-8") as file:
|
||||
updated_content = file.read()
|
||||
|
||||
expected_content = '''# Param 2
|
||||
# Documentation for Param 2
|
||||
# Field3: Value3
|
||||
# Field4: Value4
|
||||
# Code3: Value3
|
||||
# Code4: Value4
|
||||
PARAM2 100
|
||||
|
||||
# Param _ 1
|
||||
# Documentation for Param_1
|
||||
# Field_1: Value_1
|
||||
# Field_2: Value_2
|
||||
# Code_1: Value_1
|
||||
# Code_2: Value_2
|
||||
PARAM_1 100
|
||||
PARAM3 3
|
||||
PARAM4 4
|
||||
PARAM5 5
|
||||
|
||||
# Param 1
|
||||
# Documentation for Param 1
|
||||
# Field1: Value1
|
||||
# Field2: Value2
|
||||
# Code1: Value1
|
||||
# Code2: Value2
|
||||
PARAM1 100
|
||||
'''
|
||||
self.assertEqual(updated_content, expected_content)
|
||||
|
||||
def test_update_parameter_documentation_sorting_missionplanner(self):
|
||||
# Write some initial content to the temporary file
|
||||
with open(self.temp_file.name, "w", encoding="utf-8") as file:
|
||||
file.write("PARAM2 100 # ignore, me\nPARAM_1\t100\nPARAM1,100\n")
|
||||
|
||||
# Call the function with the temporary file
|
||||
update_parameter_documentation(self.doc_dict, self.temp_file.name, "missionplanner")
|
||||
|
||||
# Read the updated content from the temporary file
|
||||
with open(self.temp_file.name, "r", encoding="utf-8") as file:
|
||||
updated_content = file.read()
|
||||
|
||||
expected_content = '''# Param _ 1
|
||||
# Documentation for Param_1
|
||||
# Field_1: Value_1
|
||||
# Field_2: Value_2
|
||||
# Code_1: Value_1
|
||||
# Code_2: Value_2
|
||||
PARAM_1\t100
|
||||
|
||||
# Param 1
|
||||
# Documentation for Param 1
|
||||
# Field1: Value1
|
||||
# Field2: Value2
|
||||
# Code1: Value1
|
||||
# Code2: Value2
|
||||
PARAM1,100
|
||||
|
||||
# Param 2
|
||||
# Documentation for Param 2
|
||||
# Field3: Value3
|
||||
# Field4: Value4
|
||||
# Code3: Value3
|
||||
# Code4: Value4
|
||||
PARAM2 100 # ignore, me
|
||||
'''
|
||||
self.assertEqual(updated_content, expected_content)
|
||||
|
||||
def test_update_parameter_documentation_sorting_mavproxy(self):
|
||||
# Write some initial content to the temporary file
|
||||
with open(self.temp_file.name, "w", encoding="utf-8") as file:
|
||||
file.write("PARAM2 100\nPARAM_1\t100\nPARAM1,100\n")
|
||||
|
||||
# Call the function with the temporary file
|
||||
update_parameter_documentation(self.doc_dict, self.temp_file.name, "mavproxy")
|
||||
|
||||
# Read the updated content from the temporary file
|
||||
with open(self.temp_file.name, "r", encoding="utf-8") as file:
|
||||
updated_content = file.read()
|
||||
|
||||
expected_content = '''# Param 1
|
||||
# Documentation for Param 1
|
||||
# Field1: Value1
|
||||
# Field2: Value2
|
||||
# Code1: Value1
|
||||
# Code2: Value2
|
||||
PARAM1,100
|
||||
|
||||
# Param 2
|
||||
# Documentation for Param 2
|
||||
# Field3: Value3
|
||||
# Field4: Value4
|
||||
# Code3: Value3
|
||||
# Code4: Value4
|
||||
PARAM2 100
|
||||
|
||||
# Param _ 1
|
||||
# Documentation for Param_1
|
||||
# Field_1: Value_1
|
||||
# Field_2: Value_2
|
||||
# Code_1: Value_1
|
||||
# Code_2: Value_2
|
||||
PARAM_1\t100
|
||||
'''
|
||||
self.assertEqual(updated_content, expected_content)
|
||||
|
||||
def test_update_parameter_documentation_invalid_line_format(self):
|
||||
# Write some initial content to the temporary file with an invalid line format
|
||||
with open(self.temp_file.name, "w", encoding="utf-8") as file:
|
||||
file.write("%INVALID_LINE_FORMAT\n")
|
||||
|
||||
# Call the function with the temporary file
|
||||
with self.assertRaises(SystemExit) as cm:
|
||||
update_parameter_documentation(self.doc_dict, self.temp_file.name)
|
||||
|
||||
# Check if the SystemExit exception contains the expected message
|
||||
self.assertEqual(cm.exception.code, "Invalid line in input file")
|
||||
|
||||
@patch('logging.Logger.info')
|
||||
def test_print_read_only_params(self, mock_info):
|
||||
# Mock XML data
|
||||
xml_data = '''
|
||||
<root>
|
||||
<param name="PARAM1" humanName="Param 1" documentation="Documentation for Param 1">
|
||||
<field name="ReadOnly">True</field>
|
||||
<field name="Field1">Value1</field>
|
||||
<field name="Field2">Value2</field>
|
||||
<values>
|
||||
<value code="Code1">Value1</value>
|
||||
<value code="Code2">Value2</value>
|
||||
</values>
|
||||
</param>
|
||||
<param name="PARAM2" humanName="Param 2" documentation="Documentation for Param 2">
|
||||
<field name="Field3">Value3</field>
|
||||
<field name="Field4">Value4</field>
|
||||
<values>
|
||||
<value code="Code3">Value3</value>
|
||||
<value code="Code4">Value4</value>
|
||||
</values>
|
||||
</param>
|
||||
</root>
|
||||
'''
|
||||
root = ET.fromstring(xml_data)
|
||||
doc_dict = create_doc_dict(root, "VehicleType")
|
||||
|
||||
# Call the function with the mock XML data
|
||||
print_read_only_params(doc_dict)
|
||||
|
||||
# Check if the parameter name was logged
|
||||
mock_info.assert_has_calls([mock.call('ReadOnly parameters:'), mock.call('PARAM1')])
|
||||
|
||||
def test_update_parameter_documentation_invalid_target(self):
|
||||
# Call the function with an invalid target
|
||||
with self.assertRaises(ValueError):
|
||||
update_parameter_documentation(self.doc_dict, "invalid_target")
|
||||
|
||||
def test_invalid_parameter_name(self):
|
||||
# Write some initial content to the temporary file
|
||||
with open(self.temp_file.name, "w", encoding="utf-8") as file:
|
||||
file.write("INVALID_$PARAM 100\n")
|
||||
|
||||
# Call the function with the temporary file
|
||||
with self.assertRaises(SystemExit):
|
||||
update_parameter_documentation(self.doc_dict, self.temp_file.name)
|
||||
|
||||
def test_update_parameter_documentation_too_long_parameter_name(self):
|
||||
# Write some initial content to the temporary file
|
||||
with open(self.temp_file.name, "w", encoding="utf-8") as file:
|
||||
file.write("TOO_LONG_PARAMETER_NAME 100\n")
|
||||
|
||||
# Call the function with the temporary file
|
||||
with self.assertRaises(SystemExit):
|
||||
update_parameter_documentation(self.doc_dict, self.temp_file.name)
|
||||
|
||||
@patch('logging.Logger.warning')
|
||||
def test_missing_parameter_documentation(self, mock_warning):
|
||||
# Write some initial content to the temporary file
|
||||
with open(self.temp_file.name, "w", encoding="utf-8") as file:
|
||||
file.write("MISSING_DOC_PARA 100\n")
|
||||
|
||||
# Call the function with the temporary file
|
||||
update_parameter_documentation(self.doc_dict, self.temp_file.name)
|
||||
|
||||
# Check if the warnings were logged
|
||||
mock_warning.assert_has_calls([
|
||||
mock.call('Read file %s with %d parameters, but only %s of which got documented', self.temp_file.name, 1, 0),
|
||||
mock.call('No documentation found for: %s', 'MISSING_DOC_PARA')
|
||||
])
|
||||
|
||||
def test_empty_parameter_file(self):
|
||||
# Call the function with the temporary file
|
||||
update_parameter_documentation(self.doc_dict, self.temp_file.name)
|
||||
|
||||
# Read the updated content from the temporary file
|
||||
with open(self.temp_file.name, "r", encoding="utf-8") as file:
|
||||
updated_content = file.read()
|
||||
|
||||
# Check if the file is still empty
|
||||
self.assertEqual(updated_content, "")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
|
@ -0,0 +1,410 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
"""
|
||||
This script fetches online ArduPilot parameter documentation (if not cached) and adds it to the specified file
|
||||
or to all *.param and *.parm files in the specified directory.
|
||||
|
||||
1. Checks if a local cache of the XML file exists in the target directory or on the directory of the target file:
|
||||
- If it does, the script loads the file content.
|
||||
- If it doesn't, the script sends a GET request to the URL to fetch the XML data for the requested vehicle type.
|
||||
2. Parses the XML data and creates a dictionary of parameter documentation.
|
||||
3. DELETES all comments that start at the beginning of a line
|
||||
4. Adds the parameter documentation to the target file or to all *.param,*.parm files in the target directory.
|
||||
|
||||
Supports AP_Periph, AntennaTracker, ArduCopter, ArduPlane, ArduSub, Blimp, Heli, Rover and SITL vehicle types
|
||||
Supports both Mission Planner and MAVProxy file formats
|
||||
Supports sorting the parameters
|
||||
Has unit tests with 88% coverage
|
||||
|
||||
AP_FLAKE8_CLEAN
|
||||
|
||||
Author: Amilcar do Carmo Lucas, IAV GmbH
|
||||
"""
|
||||
|
||||
import os
|
||||
import glob
|
||||
import re
|
||||
from typing import Any, Dict, List, Tuple
|
||||
import xml.etree.ElementTree as ET
|
||||
import argparse
|
||||
import logging
|
||||
|
||||
# URL of the XML file
|
||||
BASE_URL = "https://autotest.ardupilot.org/Parameters/"
|
||||
|
||||
PARAM_DEFINITION_XML_FILE = "apm.pdef.xml"
|
||||
|
||||
# ArduPilot parameter names start with a capital letter and can have capital letters, numbers and _
|
||||
PARAM_NAME_REGEX = r'^[A-Z][A-Z_0-9]*'
|
||||
PARAM_NAME_MAX_LEN = 16
|
||||
VERSION = '1.0'
|
||||
|
||||
|
||||
def arg_parser():
|
||||
parser = argparse.ArgumentParser(description='Fetches on-line ArduPilot parameter documentation and adds it to the '
|
||||
'specified file or to all *.param and *.parm files in the specified directory.')
|
||||
parser.add_argument('target',
|
||||
help='The target file or directory.',
|
||||
)
|
||||
parser.add_argument('-s', '--sort',
|
||||
choices=['none', 'missionplanner', 'mavproxy'],
|
||||
default='none',
|
||||
help='Sort the parameters in the file. Defaults to not sorting.',
|
||||
)
|
||||
parser.add_argument('-t', '--vehicle-type',
|
||||
choices=['AP_Periph', 'AntennaTracker', 'ArduCopter', 'ArduPlane',
|
||||
'ArduSub', 'Blimp', 'Heli', 'Rover', 'SITL'],
|
||||
default='ArduCopter',
|
||||
help='The type of the vehicle. Defaults to ArduCopter',
|
||||
)
|
||||
parser.add_argument('--verbose', action='store_true',
|
||||
help='Increase output verbosity, print ReadOnly parameter list. Defaults to false',
|
||||
)
|
||||
parser.add_argument('-v', '--version', action='version', version=f'%(prog)s {VERSION}',
|
||||
help='Display version information and exit.',
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.verbose:
|
||||
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
|
||||
else:
|
||||
logging.basicConfig(level=logging.WARNING, format='%(levelname)s: %(message)s')
|
||||
|
||||
return args
|
||||
|
||||
|
||||
def get_xml_data(base_url: str, directory: str, filename: str) -> ET.Element:
|
||||
"""
|
||||
Fetches XML data from a local file or a URL.
|
||||
|
||||
Args:
|
||||
base_url (str): The base URL for fetching the XML file.
|
||||
directory (str): The directory where the XML file is expected.
|
||||
filename (str): The name of the XML file.
|
||||
|
||||
Returns:
|
||||
ET.Element: The root element of the parsed XML data.
|
||||
"""
|
||||
file_path = os.path.join(directory, filename)
|
||||
# Check if the locally cached file exists
|
||||
if os.path.isfile(file_path):
|
||||
# Load the file content relative to the script location
|
||||
with open(file_path, "r", encoding="utf-8") as file:
|
||||
xml_data = file.read()
|
||||
else:
|
||||
# No locally cached file exists, get it from the internet
|
||||
try:
|
||||
import requests # pylint: disable=C0415
|
||||
except ImportError:
|
||||
logging.error("The requests package was not found")
|
||||
logging.error("Please install it by running 'pip install requests' in your terminal.")
|
||||
raise SystemExit("requests package is not installed") # pylint: disable=W0707
|
||||
try:
|
||||
# Send a GET request to the URL
|
||||
response = requests.get(base_url + filename, timeout=5)
|
||||
except requests.exceptions.RequestException as e:
|
||||
logging.error("Unable to fetch XML data: %s", e)
|
||||
raise SystemExit("unable to fetch online XML documentation") # pylint: disable=W0707
|
||||
# Get the text content of the response
|
||||
xml_data = response.text
|
||||
# Write the content to a file
|
||||
with open(os.path.join(directory, filename), "w", encoding="utf-8") as file:
|
||||
file.write(xml_data)
|
||||
|
||||
# Parse the XML data
|
||||
root = ET.fromstring(xml_data)
|
||||
return root
|
||||
|
||||
|
||||
def remove_prefix(text: str, prefix: str) -> str:
|
||||
"""
|
||||
Removes a prefix from a string.
|
||||
|
||||
Args:
|
||||
text (str): The original string.
|
||||
prefix (str): The prefix to remove.
|
||||
|
||||
Returns:
|
||||
str: The string without the prefix.
|
||||
"""
|
||||
if text.startswith(prefix):
|
||||
return text[len(prefix):]
|
||||
return text
|
||||
|
||||
|
||||
def split_into_lines(string_to_split: str, maximum_line_length: int) -> List[str]:
|
||||
"""
|
||||
Splits a string into lines of a maximum length.
|
||||
|
||||
Args:
|
||||
string_to_split (str): The string to split.
|
||||
maximum_line_length (int): The maximum length of a line.
|
||||
|
||||
Returns:
|
||||
List[str]: The list of lines.
|
||||
"""
|
||||
doc_lines = re.findall(
|
||||
r".{1," + str(maximum_line_length) + r"}(?:\s|$)", string_to_split
|
||||
)
|
||||
# Remove trailing whitespace from each line
|
||||
return [line.rstrip() for line in doc_lines]
|
||||
|
||||
|
||||
def create_doc_dict(root: ET.Element, vehicle_type: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Create a dictionary of parameter documentation from the root element of the parsed XML data.
|
||||
|
||||
Args:
|
||||
root (ET.Element): The root element of the parsed XML data.
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: A dictionary of parameter documentation.
|
||||
"""
|
||||
# Dictionary to store the parameter documentation
|
||||
doc = {}
|
||||
|
||||
# Use the findall method with an XPath expression to find all "param" elements
|
||||
for param in root.findall(".//param"):
|
||||
name = param.get("name")
|
||||
# Remove the <vehicle_type>: prefix from the name if it exists
|
||||
name = remove_prefix(name, vehicle_type + ":")
|
||||
|
||||
human_name = param.get("humanName")
|
||||
documentation = split_into_lines(param.get("documentation"), 100)
|
||||
# the keys are the "name" attribute of the "field" sub-elements
|
||||
# the values are the text content of the "field" sub-elements
|
||||
fields = {field.get("name"): field.text for field in param.findall("field")}
|
||||
# if Units and UnitText exist, combine them into a single element
|
||||
delete_unit_text = False
|
||||
for key, value in fields.items():
|
||||
if key == "Units" and "UnitText" in fields:
|
||||
fields[key] = f"{value} ({fields['UnitText']})"
|
||||
delete_unit_text = True
|
||||
if delete_unit_text:
|
||||
del fields['UnitText']
|
||||
# the keys are the "code" attribute of the "values/value" sub-elements
|
||||
# the values are the text content of the "values/value" sub-elements
|
||||
values = {value.get("code"): value.text for value in param.findall("values/value")}
|
||||
|
||||
# Dictionary with "Parameter names" as keys and the values is a
|
||||
# dictionary with "humanName", "documentation" attributes and
|
||||
# "fields", "values" sub-elements.
|
||||
doc[name] = {
|
||||
"humanName": human_name,
|
||||
"documentation": documentation,
|
||||
"fields": fields,
|
||||
"values": values,
|
||||
}
|
||||
|
||||
return doc
|
||||
|
||||
|
||||
def format_columns(values: Dict[str, Any], max_width: int = 105, max_columns: int = 4) -> List[str]:
|
||||
"""
|
||||
Formats a dictionary of values into column-major horizontally aligned columns.
|
||||
It uses at most max_columns columns
|
||||
|
||||
Args:
|
||||
values (Dict[str, Any]): The dictionary of values to format.
|
||||
max_width (int, optional): The maximum number of characters on all columns. Defaults to 105.
|
||||
|
||||
Returns:
|
||||
List[str]: The list of formatted strings.
|
||||
"""
|
||||
# Convert the dictionary into a list of strings
|
||||
strings = [f"{k}: {v}" for k, v in values.items()]
|
||||
|
||||
if (not strings) or (len(strings) == 0):
|
||||
return []
|
||||
|
||||
# Calculate the maximum length of the strings
|
||||
max_len = max(len(s) for s in strings)
|
||||
|
||||
# Determine the number of columns
|
||||
# Column distribution will only happen if it results in more than 5 rows
|
||||
# The strings will be distributed evenly across up-to max_columns columns.
|
||||
for num_cols in range(max_columns, 0, -1):
|
||||
if len(strings) // num_cols > 5 and (max_len + 2) * num_cols < max_width:
|
||||
break
|
||||
|
||||
# Calculate the column width
|
||||
col_width = max_width // num_cols
|
||||
|
||||
num_rows = (len(strings) + num_cols - 1) // num_cols
|
||||
|
||||
formatted_strings = []
|
||||
for j in range(num_rows):
|
||||
row = []
|
||||
for i in range(num_cols):
|
||||
if i*num_rows + j < len(strings):
|
||||
if i < num_cols - 1 and ((i+1)*num_rows + j < len(strings)):
|
||||
row.append(strings[i*num_rows + j].ljust(col_width))
|
||||
else:
|
||||
row.append(strings[i*num_rows + j])
|
||||
formatted_strings.append(" ".join(row))
|
||||
|
||||
return formatted_strings
|
||||
|
||||
|
||||
def extract_parameter_name(item: str) -> str:
|
||||
"""
|
||||
Extract the parameter name from a line. Very simple to be used in sorting
|
||||
"""
|
||||
item = item.strip()
|
||||
match = re.match(PARAM_NAME_REGEX, item)
|
||||
return match.group(0) if match else item
|
||||
|
||||
|
||||
def missionplanner_sort(item: str) -> Tuple[str, ...]:
|
||||
"""
|
||||
MissionPlanner parameter sorting function
|
||||
"""
|
||||
# Split the parameter name by underscore
|
||||
parts = extract_parameter_name(item).split("_")
|
||||
# Compare the parts separately
|
||||
return tuple(parts)
|
||||
|
||||
|
||||
def extract_parameter_name_and_validate(line: str, filename: str, line_nr: int) -> str:
|
||||
"""
|
||||
Extracts the parameter name from a line and validates it.
|
||||
Args:
|
||||
line (str): The line to extract the parameter name from.
|
||||
Returns:
|
||||
str: The extracted parameter name.
|
||||
Raises:
|
||||
SystemExit: If the line is invalid or the parameter name is too long or invalid.
|
||||
"""
|
||||
# Extract the parameter name
|
||||
match = re.match(PARAM_NAME_REGEX, line)
|
||||
if match:
|
||||
param_name = match.group(0)
|
||||
else:
|
||||
logging.error("Invalid line %d in file %s: %s", line_nr, filename, line)
|
||||
raise SystemExit("Invalid line in input file")
|
||||
param_len = len(param_name)
|
||||
param_sep = line[param_len] # the character following the parameter name must be a separator
|
||||
if param_sep not in {',', ' ', '\t'}:
|
||||
logging.error("Invalid parameter name %s on line %d in file %s", param_name, line_nr,
|
||||
filename)
|
||||
raise SystemExit("Invalid parameter name")
|
||||
if param_len > PARAM_NAME_MAX_LEN:
|
||||
logging.error("Too long parameter name on line %d in file %s", line_nr, filename)
|
||||
raise SystemExit("Too long parameter name")
|
||||
return param_name
|
||||
|
||||
|
||||
def update_parameter_documentation(doc: Dict[str, Any], target: str = '.', sort_type: str = 'none') -> None:
|
||||
"""
|
||||
Updates the parameter documentation in the target file or in all *.param,*.parm files of the target directory.
|
||||
|
||||
This function iterates over all the ArduPilot parameter files in the target directory or file.
|
||||
For each file, it DELETES all comments that start at the beginning of a line, optionally sorts the
|
||||
parameter names and checks if the parameter name is in the dictionary of parameter documentation.
|
||||
If it is, it prefixes the line with comment derived from the dictionary element.
|
||||
If it's not, it copies the parameter line 1-to-1.
|
||||
After processing all the parameters in a file, it writes the new lines back to the file.
|
||||
|
||||
Args:
|
||||
doc (Dict[str, Any]): A dictionary of parameter documentation.
|
||||
target (str, optional): The target directory or file. Defaults to '.'.
|
||||
sort_type (str, optional): The type of sorting to apply to the parameters.
|
||||
Can be 'none', 'missionplanner', or 'mavproxy'. Defaults to 'none'.
|
||||
"""
|
||||
# Check if the target is a file or a directory
|
||||
if os.path.isfile(target):
|
||||
# If it's a file, process only that file
|
||||
param_files = [target]
|
||||
elif os.path.isdir(target):
|
||||
# If it's a directory, process all .param and .parm files in that directory
|
||||
param_files = glob.glob(os.path.join(target, "*.param")) \
|
||||
+ glob.glob(os.path.join(target, "*.parm"))
|
||||
else:
|
||||
raise ValueError(f"Target '{target}' is neither a file nor a directory.")
|
||||
|
||||
# Iterate over all the target ArduPilot parameter files
|
||||
for param_file in param_files:
|
||||
|
||||
# Read the entire file contents
|
||||
with open(param_file, "r", encoding="utf-8") as file:
|
||||
lines = file.readlines()
|
||||
|
||||
new_lines = []
|
||||
total_params = 0
|
||||
documented_params = 0
|
||||
undocumented_params = []
|
||||
is_first_param_in_file = True # pylint: disable=C0103
|
||||
if sort_type == "missionplanner":
|
||||
lines.sort(key=missionplanner_sort)
|
||||
if sort_type == "mavproxy":
|
||||
lines.sort(key=extract_parameter_name)
|
||||
for n, line in enumerate(lines, start=1):
|
||||
line = line.strip()
|
||||
if not line.startswith("#") and line:
|
||||
param_name = extract_parameter_name_and_validate(line, param_file, n)
|
||||
|
||||
if param_name in doc:
|
||||
# If the parameter name is in the dictionary,
|
||||
# prefix the line with comment derived from the dictionary element
|
||||
data = doc[param_name]
|
||||
prefix_parts = [
|
||||
f"{data['humanName']}",
|
||||
]
|
||||
prefix_parts += data["documentation"]
|
||||
for key, value in data["fields"].items():
|
||||
prefix_parts.append(f"{key}: {value}")
|
||||
prefix_parts += format_columns(data["values"])
|
||||
doc_text = "\n# ".join(prefix_parts) # pylint: disable=C0103
|
||||
if not is_first_param_in_file:
|
||||
new_lines.append("\n")
|
||||
new_lines.append(f"# {doc_text}\n{line}\n")
|
||||
documented_params += 1
|
||||
else:
|
||||
# If the parameter name is in not the dictionary, copy the parameter line 1-to-1
|
||||
new_lines.append(f"{line}\n")
|
||||
undocumented_params.append(param_name)
|
||||
total_params += 1
|
||||
is_first_param_in_file = False
|
||||
|
||||
if total_params == documented_params:
|
||||
logging.info("Read file %s with %d parameters, all got documented",
|
||||
param_file, total_params)
|
||||
else:
|
||||
logging.warning("Read file %s with %d parameters, but only %s of which got documented",
|
||||
param_file, total_params, documented_params)
|
||||
logging.warning("No documentation found for: %s", ", ".join(undocumented_params))
|
||||
|
||||
# Write the new file contents to the file
|
||||
with open(param_file, "w", encoding="utf-8") as file:
|
||||
file.writelines(new_lines)
|
||||
|
||||
|
||||
def print_read_only_params(doc):
|
||||
"""
|
||||
Print the names of read-only parameters.
|
||||
|
||||
Args:
|
||||
doc (dict): A dictionary of parameter documentation.
|
||||
"""
|
||||
logging.info("ReadOnly parameters:")
|
||||
for param_name, param_value in doc.items():
|
||||
if 'ReadOnly' in param_value['fields'] and param_value['fields']['ReadOnly']:
|
||||
logging.info(param_name)
|
||||
|
||||
|
||||
def main():
|
||||
args = arg_parser()
|
||||
try:
|
||||
xml_dir = args.target if os.path.isdir(args.target) else os.path.dirname(os.path.realpath(args.target))
|
||||
xml_root = get_xml_data(BASE_URL + args.vehicle_type + "/", xml_dir, PARAM_DEFINITION_XML_FILE)
|
||||
doc_dict = create_doc_dict(xml_root, args.vehicle_type)
|
||||
update_parameter_documentation(doc_dict, args.target, args.sort)
|
||||
if args.verbose:
|
||||
print_read_only_params(doc_dict)
|
||||
except Exception as exp: # pylint: disable=W0718
|
||||
logging.fatal(exp)
|
||||
exit(1) # pylint: disable=R1722
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in New Issue