Tools: ROS 2 update launch scripts and tests

Add launch module to ardupilot_sitl package
- Move body of launch scripts into package for reuse.
- Add utilities module.

Update launch scripts
- Provide type hints.
- Add arguments to sitl.launch.py.
- Fix formatting bug in sitl.launch.py home argument.

Update micro_ros_agent launch script
- Modify import for lauch_ros.actions.Node.
- Change argument order in node initialiser.
- Add args for UDP transport and set as default.

Update ROS 2 DDS default params
- Add default params for both serial and UDP transports.
- Add DDS_ENABLE.

Rename ROS 2 sitl_dds launch script
- Rename sitl_dds launch script with serial suffix.
- Add launch script for UDP transport.

Update ROS 2 launch test fixtures
- Make common test fixtures more granular.
- Add fixtures and tests for UDP transport.
- Update ROS 2 package.xml dependencies
- Use yield rather than return in test fixtures.
- Use ardupilot_sitl launch module directly.
- Correct return type descriptions.
- Fix flake8 failure.

Update ROS 2 README
- Update launch instructions.

Signed-off-by: Rhys Mainwaring <rhys.mainwaring@me.com>
This commit is contained in:
Rhys Mainwaring 2023-03-07 18:42:53 +00:00 committed by Andrew Tridgell
parent af534b5b91
commit 21ea0d9794
18 changed files with 1217 additions and 613 deletions

View File

@ -187,7 +187,7 @@ ros2 run micro_ros_agent micro_ros_agent serial --baudrate 115200 --dev ./dev/tt
```
```bash
arducopter --synthetic-clock --wipe --model quad --speedup 1 --slave 0 --instance 0 --uartC uart:./dev/ttyROS1 --defaults $(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/copter.parm,$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/dds.parm --sim-address 127.0.0.1
arducopter --synthetic-clock --wipe --model quad --speedup 1 --slave 0 --instance 0 --uartC uart:./dev/ttyROS1 --defaults $(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/copter.parm,$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/dds_serial.parm --sim-address 127.0.0.1
```
```bash
@ -201,11 +201,11 @@ ros2 launch ardupilot_sitl virtual_ports.launch.py tty0:=./dev/ttyROS0 tty1:=./d
```
```bash
ros2 launch ardupilot_sitl micro_ros_agent.launch.py refs:=$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/dds_xrce_profile.xml baudrate:=115200 device:=./dev/ttyROS0
ros2 launch ardupilot_sitl micro_ros_agent.launch.py transport:=serial refs:=$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/dds_xrce_profile.xml baudrate:=115200 device:=./dev/ttyROS0
```
```bash
ros2 launch ardupilot_sitl sitl.launch.py synthetic_clock:=True wipe:=True model:=quad speedup:=1 slave:=0 instance:=0 uartC:=uart:./dev/ttyROS1 defaults:=$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/copter.parm,$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/dds.parm sim_address:=127.0.0.1
ros2 launch ardupilot_sitl sitl.launch.py synthetic_clock:=True wipe:=True model:=quad speedup:=1 slave:=0 instance:=0 uartC:=uart:./dev/ttyROS1 defaults:=$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/copter.parm,$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/dds_serial.parm sim_address:=127.0.0.1
```
```bash
@ -215,11 +215,12 @@ ros2 launch ardupilot_sitl mavproxy.launch.py master:=tcp:127.0.0.1:5760 sitl:=1
Using combined launch file
```bash
ros2 launch ardupilot_sitl sitl_dds.launch.py \
ros2 launch ardupilot_sitl sitl_dds_serial.launch.py \
\
tty0:=./dev/ttyROS0 \
tty1:=./dev/ttyROS1 \
\
transport:=serial \
refs:=$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/dds_xrce_profile.xml \
baudrate:=115200 \
device:=./dev/ttyROS0 \
@ -231,9 +232,15 @@ speedup:=1 \
slave:=0 \
instance:=0 \
uartC:=uart:./dev/ttyROS1 \
defaults:=$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/copter.parm,$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/dds.parm \
defaults:=$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/copter.parm,$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/dds_serial.parm \
sim_address:=127.0.0.1 \
\
master:=tcp:127.0.0.1:5760 \
sitl:=127.0.0.1:5501
```
UDP version
```
ros2 launch ardupilot_sitl sitl_dds_udp.launch.py transport:=udp4 refs:=$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/dds_xrce_profile.xml synthetic_clock:=True wipe:=False model:=quad speedup:=1 slave:=0 instance:=0 defaults:=$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/copter.parm,$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/dds_udp.parm sim_address:=127.0.0.1 master:=tcp:127.0.0.1:5760 sitl:=127.0.0.1:5501
```

View File

@ -8,6 +8,7 @@
<license>GLP-3.0</license>
<exec_depend>ament_index_python</exec_depend>
<exec_depend>ardupilot_sitl</exec_depend>
<exec_depend>launch</exec_depend>
<exec_depend>launch_pytest</exec_depend>
<exec_depend>launch_ros</exec_depend>
@ -18,6 +19,7 @@
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>ardupilot_sitl</test_depend>
<test_depend>launch</test_depend>
<test_depend>launch_pytest</test_depend>
<test_depend>launch_ros</test_depend>

View File

@ -20,65 +20,60 @@ import pytest
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch import LaunchDescriptionSource
from launch.actions import IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.substitutions import PathJoinSubstitution
from launch_ros.substitutions import FindPackageShare
from pathlib import Path
from ardupilot_sitl.launch import VirtualPortsLaunch
from ardupilot_sitl.launch import MicroRosAgentLaunch
from ardupilot_sitl.launch import MAVProxyLaunch
from ardupilot_sitl.launch import SITLLaunch
@pytest.fixture(scope="module")
def device_dir(tmp_path_factory):
"""Fixture to create a temporary directory for devices."""
path = tmp_path_factory.mktemp("devices")
return path
# Create dev directory for virtual ports.
os.mkdir(Path(path, "dev"))
yield path
@pytest.fixture
def sitl_dds(device_dir):
"""Fixture to bring up ArduPilot SITL DDS."""
# Create directory for virtual ports.
print(f"\ntmpdirname: {device_dir}\n")
if not ("dev" in os.listdir(device_dir)):
os.mkdir(Path(device_dir, "dev"))
# Full path to virtual ports.
def virtual_ports(device_dir):
"""Fixture to create virtual ports."""
tty0 = Path(device_dir, "dev", "tty0").resolve()
tty1 = Path(device_dir, "dev", "tty1").resolve()
virtual_ports = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
[
PathJoinSubstitution(
[
FindPackageShare("ardupilot_sitl"),
"launch",
"virtual_ports.launch.py",
]
),
]
),
vp_ld, vp_actions = VirtualPortsLaunch.generate_launch_description_with_actions()
ld = IncludeLaunchDescription(
LaunchDescriptionSource(vp_ld),
launch_arguments={
"tty0": str(tty0),
"tty1": str(tty1),
}.items(),
)
yield ld, vp_actions
micro_ros_agent = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
[
PathJoinSubstitution(
[
FindPackageShare("ardupilot_sitl"),
"launch",
"micro_ros_agent.launch.py",
]
),
]
),
@pytest.fixture
def micro_ros_agent_serial(device_dir):
"""Fixture to create a micro_ros_agent node."""
tty0 = Path(device_dir, "dev", "tty0").resolve()
mra_ld, mra_actions = MicroRosAgentLaunch.generate_launch_description_with_actions()
ld = IncludeLaunchDescription(
LaunchDescriptionSource(mra_ld),
launch_arguments={
"transport": "serial",
"refs": PathJoinSubstitution(
[
FindPackageShare("ardupilot_sitl"),
@ -90,23 +85,62 @@ def sitl_dds(device_dir):
"device": str(tty0),
}.items(),
)
yield ld, mra_actions
sitl = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
[
PathJoinSubstitution(
[
FindPackageShare("ardupilot_sitl"),
"launch",
"sitl.launch.py",
]
),
]
),
@pytest.fixture
def micro_ros_agent_udp():
"""Fixture to create a micro_ros_agent node."""
mra_ld, mra_actions = MicroRosAgentLaunch.generate_launch_description_with_actions()
ld = IncludeLaunchDescription(
LaunchDescriptionSource(mra_ld),
launch_arguments={
"transport": "udp4",
"refs": PathJoinSubstitution(
[
FindPackageShare("ardupilot_sitl"),
"config",
"dds_xrce_profile.xml",
]
),
}.items(),
)
yield ld, mra_actions
@pytest.fixture
def mavproxy():
"""Fixture to bring up MAVProxy."""
mp_ld, mp_actions = MAVProxyLaunch.generate_launch_description_with_actions()
ld = IncludeLaunchDescription(
LaunchDescriptionSource(mp_ld),
launch_arguments={
"master": "tcp:127.0.0.1:5760",
"sitl": "127.0.0.1:5501",
}.items(),
)
yield ld, mp_actions
@pytest.fixture
def sitl_copter_dds_serial(device_dir, virtual_ports, micro_ros_agent_serial, mavproxy):
"""Fixture to bring up ArduPilot SITL DDS."""
tty1 = Path(device_dir, "dev", "tty1").resolve()
vp_ld, vp_actions = virtual_ports
mra_ld, mra_actions = micro_ros_agent_serial
mp_ld, mp_actions = mavproxy
sitl_ld, sitl_actions = SITLLaunch.generate_launch_description_with_actions()
sitl_ld_args = IncludeLaunchDescription(
LaunchDescriptionSource(sitl_ld),
launch_arguments={
"command": "arducopter",
"synthetic_clock": "True",
"wipe": "True",
# "wipe": "True",
"wipe": "False",
"model": "quad",
"speedup": "10",
"slave": "0",
@ -126,35 +160,75 @@ def sitl_dds(device_dir):
get_package_share_directory("ardupilot_sitl"),
"config",
"default_params",
"dds.parm",
"dds_serial.parm",
)
),
}.items(),
)
mavproxy = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
[
PathJoinSubstitution(
[
FindPackageShare("ardupilot_sitl"),
"launch",
"mavproxy.launch.py",
]
),
]
),
ld = LaunchDescription(
[
vp_ld,
mra_ld,
mp_ld,
sitl_ld_args,
]
)
actions = {}
actions.update(vp_actions)
actions.update(mra_actions)
actions.update(mp_actions)
actions.update(sitl_actions)
yield ld, actions
@pytest.fixture
def sitl_copter_dds_udp(micro_ros_agent_udp, mavproxy):
"""Fixture to bring up ArduPilot SITL DDS."""
mra_ld, mra_actions = micro_ros_agent_udp
mp_ld, mp_actions = mavproxy
sitl_ld, sitl_actions = SITLLaunch.generate_launch_description_with_actions()
sitl_ld_args = IncludeLaunchDescription(
LaunchDescriptionSource(sitl_ld),
launch_arguments={
"master": "tcp:127.0.0.1:5760",
"sitl": "127.0.0.1:5501",
"command": "arducopter",
"synthetic_clock": "True",
# "wipe": "True",
"wipe": "False",
"model": "quad",
"speedup": "10",
"slave": "0",
"instance": "0",
"defaults": str(
Path(
get_package_share_directory("ardupilot_sitl"),
"config",
"default_params",
"copter.parm",
)
)
+ ","
+ str(
Path(
get_package_share_directory("ardupilot_sitl"),
"config",
"default_params",
"dds_udp.parm",
)
),
}.items(),
)
return LaunchDescription(
ld = LaunchDescription(
[
virtual_ports,
micro_ros_agent,
sitl,
mavproxy,
mra_ld,
mp_ld,
sitl_ld_args,
]
)
actions = {}
actions.update(mra_actions)
actions.update(mp_actions)
actions.update(sitl_actions)
yield ld, actions

View File

@ -22,22 +22,15 @@ import threading
from launch import LaunchDescription
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
from launch_pytest.tools import process as process_tools
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from sensor_msgs.msg import NavSatFix
@launch_pytest.fixture
def launch_description(sitl_dds):
"""Fixture to create the launch description."""
return LaunchDescription(
[
sitl_dds,
launch_pytest.actions.ReadyToTest(),
]
)
class NavSatFixListener(rclpy.node.Node):
"""Subscribe to NavSatFix messages on /ap/navsat/navsat0."""
@ -80,9 +73,51 @@ class NavSatFixListener(rclpy.node.Node):
self.get_logger().info("From AP : False")
@pytest.mark.launch(fixture=launch_description)
def test_navsat_msgs_received(sitl_dds, launch_context):
@launch_pytest.fixture
def launch_sitl_copter_dds_serial(sitl_copter_dds_serial):
"""Fixture to create the launch description."""
sitl_ld, sitl_actions = sitl_copter_dds_serial
ld = LaunchDescription(
[
sitl_ld,
launch_pytest.actions.ReadyToTest(),
]
)
actions = sitl_actions
yield ld, actions
@launch_pytest.fixture
def launch_sitl_copter_dds_udp(sitl_copter_dds_udp):
"""Fixture to create the launch description."""
sitl_ld, sitl_actions = sitl_copter_dds_udp
ld = LaunchDescription(
[
sitl_ld,
launch_pytest.actions.ReadyToTest(),
]
)
actions = sitl_actions
yield ld, actions
@pytest.mark.launch(fixture=launch_sitl_copter_dds_serial)
def test_dds_serial_navsat_msg_recv(launch_context, launch_sitl_copter_dds_serial):
"""Test NavSatFix messages are published by AP_DDS."""
_, actions = launch_sitl_copter_dds_serial
virtual_ports = actions["virtual_ports"].action
micro_ros_agent = actions["micro_ros_agent"].action
mavproxy = actions["mavproxy"].action
sitl = actions["sitl"].action
# Wait for process to start.
process_tools.wait_for_start_sync(launch_context, virtual_ports, timeout=2)
process_tools.wait_for_start_sync(launch_context, micro_ros_agent, timeout=2)
process_tools.wait_for_start_sync(launch_context, mavproxy, timeout=2)
process_tools.wait_for_start_sync(launch_context, sitl, timeout=2)
rclpy.init()
try:
node = NavSatFixListener()
@ -91,8 +126,28 @@ def test_navsat_msgs_received(sitl_dds, launch_context):
assert msgs_received_flag, "Did not receive 'ap/navsat/navsat0' msgs."
finally:
rclpy.shutdown()
yield
# Anything below this line is executed after launch service shutdown.
pass
@pytest.mark.launch(fixture=launch_sitl_copter_dds_udp)
def test_dds_udp_navsat_msg_recv(launch_context, launch_sitl_copter_dds_udp):
"""Test NavSatFix messages are published by AP_DDS."""
_, actions = launch_sitl_copter_dds_udp
micro_ros_agent = actions["micro_ros_agent"].action
mavproxy = actions["mavproxy"].action
sitl = actions["sitl"].action
# Wait for process to start.
process_tools.wait_for_start_sync(launch_context, micro_ros_agent, timeout=2)
process_tools.wait_for_start_sync(launch_context, mavproxy, timeout=2)
process_tools.wait_for_start_sync(launch_context, sitl, timeout=2)
rclpy.init()
try:
node = NavSatFixListener()
node.start_subscriber()
msgs_received_flag = node.msg_event_object.wait(timeout=10.0)
assert msgs_received_flag, "Did not receive 'ap/navsat/navsat0' msgs."
finally:
rclpy.shutdown()
yield

View File

@ -22,20 +22,11 @@ import threading
from launch import LaunchDescription
from launch_pytest.tools import process as process_tools
from builtin_interfaces.msg import Time
@launch_pytest.fixture
def launch_description(sitl_dds):
"""Fixture to create the launch description."""
return LaunchDescription(
[
sitl_dds,
launch_pytest.actions.ReadyToTest(),
]
)
class TimeListener(rclpy.node.Node):
"""Subscribe to Time messages on /ap/clock."""
@ -72,9 +63,51 @@ class TimeListener(rclpy.node.Node):
self.get_logger().info("From AP : False")
@pytest.mark.launch(fixture=launch_description)
def test_time_msgs_received(sitl_dds, launch_context):
"""Test Time messages are published by AP_DDS."""
@launch_pytest.fixture
def launch_sitl_copter_dds_serial(sitl_copter_dds_serial):
"""Fixture to create the launch description."""
sitl_ld, sitl_actions = sitl_copter_dds_serial
ld = LaunchDescription(
[
sitl_ld,
launch_pytest.actions.ReadyToTest(),
]
)
actions = sitl_actions
yield ld, actions
@launch_pytest.fixture
def launch_sitl_copter_dds_udp(sitl_copter_dds_udp):
"""Fixture to create the launch description."""
sitl_ld, sitl_actions = sitl_copter_dds_udp
ld = LaunchDescription(
[
sitl_ld,
launch_pytest.actions.ReadyToTest(),
]
)
actions = sitl_actions
yield ld, actions
@pytest.mark.launch(fixture=launch_sitl_copter_dds_serial)
def test_dds_serial_clock_msg_recv(launch_context, launch_sitl_copter_dds_serial):
"""Test /ap/clock is published by AP_DDS."""
_, actions = launch_sitl_copter_dds_serial
virtual_ports = actions["virtual_ports"].action
micro_ros_agent = actions["micro_ros_agent"].action
mavproxy = actions["mavproxy"].action
sitl = actions["sitl"].action
# Wait for process to start.
process_tools.wait_for_start_sync(launch_context, virtual_ports, timeout=2)
process_tools.wait_for_start_sync(launch_context, micro_ros_agent, timeout=2)
process_tools.wait_for_start_sync(launch_context, mavproxy, timeout=2)
process_tools.wait_for_start_sync(launch_context, sitl, timeout=2)
rclpy.init()
try:
node = TimeListener()
@ -83,8 +116,28 @@ def test_time_msgs_received(sitl_dds, launch_context):
assert msgs_received_flag, "Did not receive 'ROS_Time' msgs."
finally:
rclpy.shutdown()
yield
# Anything below this line is executed after launch service shutdown.
pass
@pytest.mark.launch(fixture=launch_sitl_copter_dds_udp)
def test_dds_udp_clock_msg_recv(launch_context, launch_sitl_copter_dds_udp):
"""Test /ap/clock is published by AP_DDS."""
_, actions = launch_sitl_copter_dds_udp
micro_ros_agent = actions["micro_ros_agent"].action
mavproxy = actions["mavproxy"].action
sitl = actions["sitl"].action
# Wait for process to start.
process_tools.wait_for_start_sync(launch_context, micro_ros_agent, timeout=2)
process_tools.wait_for_start_sync(launch_context, mavproxy, timeout=2)
process_tools.wait_for_start_sync(launch_context, sitl, timeout=2)
rclpy.init()
try:
node = TimeListener()
node.start_subscriber()
msgs_received_flag = node.msg_event_object.wait(timeout=10.0)
assert msgs_received_flag, "Did not receive 'ROS_Time' msgs."
finally:
rclpy.shutdown()
yield

View File

@ -14,123 +14,43 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Use `socat` to create virtual ports ttyROS0 and ttyRO1 and check they exist."""
import os
import launch_pytest
import pytest
from launch import LaunchDescription
from launch.actions import EmitEvent
from launch.actions import ExecuteProcess
from launch.actions import IncludeLaunchDescription
from launch.actions import LogInfo
from launch.actions import RegisterEventHandler
from launch.event_handlers import OnProcessStart
from launch.event_handlers import OnProcessExit
from launch.events import matches_action
from launch.events.process import ShutdownProcess
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.substitutions import PathJoinSubstitution
from launch_pytest.tools import process as process_tools
from launch_ros.substitutions import FindPackageShare
from pathlib import Path
@launch_pytest.fixture
def launch_description(virtual_ports):
"""Launch description fixture."""
vp_ld, vp_actions = virtual_ports
@pytest.fixture()
def dummy_proc():
"""Return a dummy process action to manage test lifetime."""
return ExecuteProcess(
cmd=["echo", "ardupilot_dds_tests"],
shell=True,
cached_output=True,
)
@pytest.fixture()
def virtual_ports(device_dir):
"""Fixture that includes and configures the virtual port launch."""
# Create directory for virtual ports.
print(f"\ntmpdirname: {device_dir}\n")
if not ("dev" in os.listdir(device_dir)):
os.mkdir(Path(device_dir, "dev"))
# Full path to virtual ports.
tty0 = Path(device_dir, "dev", "tty0").resolve()
tty1 = Path(device_dir, "dev", "tty1").resolve()
virtual_ports = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
[
PathJoinSubstitution(
[
FindPackageShare("ardupilot_sitl"),
"launch",
"virtual_ports.launch.py",
]
),
]
),
launch_arguments={
"tty0": str(tty0),
"tty1": str(tty1),
}.items(),
)
return virtual_ports
@launch_pytest.fixture()
def launch_description(dummy_proc, virtual_ports):
"""Fixture to create the launch description."""
on_start = RegisterEventHandler(
event_handler=OnProcessStart(
target_action=dummy_proc,
on_start=[
LogInfo(msg="Test started."),
],
)
)
on_process_exit = RegisterEventHandler(
event_handler=OnProcessExit(
target_action=dummy_proc,
on_exit=[
LogInfo(msg="Test stopping."),
EmitEvent(
event=ShutdownProcess(process_matcher=matches_action(virtual_ports))
),
],
)
)
return LaunchDescription(
ld = LaunchDescription(
[
dummy_proc,
virtual_ports,
on_start,
on_process_exit,
vp_ld,
launch_pytest.actions.ReadyToTest(),
]
)
actions = vp_actions
yield ld, actions
@pytest.mark.launch(fixture=launch_description)
def test_virtual_ports(dummy_proc, virtual_ports, launch_context):
def test_virtual_ports(launch_context, launch_description):
"""Test the virtual ports are present."""
# TODO: check virtial port process for regex:
# "starting data transfer loop"
_, actions = launch_description
virtual_ports = actions["virtual_ports"].action
# Wait for process to start.
process_tools.wait_for_start_sync(launch_context, virtual_ports, timeout=2)
# Assert contents of output to stderr.
def validate_output(output):
assert output.splitlines() == [
"ardupilot_dds_tests"
], "Test process had no output."
assert "N starting data transfer loop" in output, "Test process had no output."
process_tools.assert_output_sync(
launch_context, dummy_proc, validate_output, timeout=5
process_tools.assert_stderr_sync(
launch_context, virtual_ports, validate_output, timeout=5
)
yield
# Anything below this line is executed after launch service shutdown.
assert dummy_proc.return_code == 0

View File

@ -0,0 +1,2 @@
DDS_ENABLE 1
DDS_PORT 2019

View File

@ -25,73 +25,9 @@ Show launch arguments:
ros2 launch ardupilot_sitl mavproxy.launch.py --show-args
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.actions import ExecuteProcess
from launch.actions import OpaqueFunction
from launch.substitutions import LaunchConfiguration
from ardupilot_sitl.launch import MAVProxyLaunch
def launch_mavproxy(context, *args, **kwargs):
"""Return a non-interactive MAVProxy process."""
# Declare the command.
command = "mavproxy.py"
# Retrieve launch arguments.
master = LaunchConfiguration("master").perform(context)
# out = LaunchConfiguration("out").perform(context)
sitl = LaunchConfiguration("sitl").perform(context)
# Display launch arguments.
print(f"command: {command}")
print(f"master: {master}")
print(f"sitl: {sitl}")
# Create action.
mavproxy_process = ExecuteProcess(
cmd=[
[
f"{command} ",
"--out ",
"127.0.0.1:14550 ",
"--out ",
"127.0.0.1:14551 ",
f"--master {master} ",
f"--sitl {sitl} ",
"--non-interactive ",
# "--daemon "
]
],
shell=True,
output="both",
respawn=False,
)
launch_processes = [mavproxy_process]
return launch_processes
def generate_launch_description():
def generate_launch_description() -> LaunchDescription:
"""Generate a launch description for MAVProxy."""
# Create launch description.
return LaunchDescription(
[
# Launch arguments.
DeclareLaunchArgument(
"master",
default_value="tcp:127.0.0.1:5760",
description="MAVLink master port and optional baud rate.",
),
DeclareLaunchArgument(
"out",
default_value="127.0.0.1:14550",
description="MAVLink output port and optional baud rate.",
),
DeclareLaunchArgument(
"sitl",
default_value="127.0.0.1:5501",
description="SITL output port.",
),
# Launch function.
OpaqueFunction(function=launch_mavproxy),
]
)
return MAVProxyLaunch.generate_launch_description()

View File

@ -21,108 +21,9 @@ Run with default arguments:
ros2 launch ardupilot_sitl micro_ros_agent.launch.py
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.actions import OpaqueFunction
from launch.substitutions import LaunchConfiguration
import launch_ros
from ardupilot_sitl.launch import MicroRosAgentLaunch
def launch_micro_ros_agent(context, *args, **kwargs):
"""Launch the micro_ros_agent node."""
# ROS arguments.
micro_ros_agent_ns = LaunchConfiguration("micro_ros_agent_ns").perform(context)
# Common arguments.
transport = LaunchConfiguration("transport").perform(context)
middleware = LaunchConfiguration("middleware").perform(context)
refs = LaunchConfiguration("refs").perform(context)
verbose = LaunchConfiguration("verbose").perform(context)
# discovery = LaunchConfiguration("discovery").perform(context)
# Serial arguments.
device = LaunchConfiguration("device").perform(context)
baudrate = LaunchConfiguration("baudrate").perform(context)
# Display launch arguments.
print(f"namespace: {micro_ros_agent_ns}")
print(f"transport: {transport}")
print(f"middleware: {middleware}")
print(f"refs: {refs}")
print(f"verbose: {verbose}")
# print(f"discovery: {discovery}")
print(f"baudrate: {baudrate}")
print(f"device: {device}")
# Create action.
micro_ros_agent_node = launch_ros.actions.Node(
package="micro_ros_agent",
namespace=f"{micro_ros_agent_ns}",
executable="micro_ros_agent",
name="micro_ros_agent",
output="both",
arguments=[
{transport},
"--middleware",
{middleware},
"--refs",
{refs},
"--dev",
{device},
"--baudrate",
{baudrate},
],
# additional_env={"PYTHONUNBUFFERED": "1"},
)
launch_processes = [micro_ros_agent_node]
return launch_processes
def generate_launch_description():
def generate_launch_description() -> LaunchDescription:
"""Generate a launch description for the micro_ros_agent."""
return LaunchDescription(
[
# Launch arguments.
DeclareLaunchArgument(
"micro_ros_agent_ns",
default_value="",
description="Set the micro_ros_agent namespace.",
),
DeclareLaunchArgument(
"transport",
default_value="serial",
description="Set the transport.",
choices=["serial"],
),
DeclareLaunchArgument(
"middleware",
default_value="dds",
description="Set the middleware.",
choices=["ced", "dds", "rtps"],
),
DeclareLaunchArgument(
"refs",
default_value="",
description="Set the refs file.",
),
DeclareLaunchArgument(
"verbose",
default_value="",
description="Set the verbosity level.",
),
DeclareLaunchArgument(
"baudrate",
default_value="",
description="Set the baudrate.",
),
DeclareLaunchArgument(
"device",
default_value="",
description="Set the device.",
),
# Launch function.
OpaqueFunction(function=launch_micro_ros_agent),
]
)
return MicroRosAgentLaunch.generate_launch_description()

View File

@ -24,190 +24,10 @@ Show launch arguments:
ros2 launch ardupilot_sitl sitl.launch.py --show-args
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.actions import ExecuteProcess
from launch.actions import OpaqueFunction
from launch.substitutions import LaunchConfiguration
from launch.substitutions import PathJoinSubstitution
from launch_ros.substitutions import FindPackageShare
# Labels for the optional uart launch arguments.
uart_labels = ["A", "B", "C", "D", "E", "F", "H", "I", "J"]
max_serial_ports = 10
from ardupilot_sitl.launch import SITLLaunch
def launch_sitl(context, *args, **kwargs):
"""Return a SITL process."""
command = LaunchConfiguration("command").perform(context)
model = LaunchConfiguration("model").perform(context)
speedup = LaunchConfiguration("speedup").perform(context)
slave = LaunchConfiguration("slave").perform(context)
sim_address = LaunchConfiguration("sim_address").perform(context)
instance = LaunchConfiguration("instance").perform(context)
defaults = LaunchConfiguration("defaults").perform(context)
# Display launch arguments.
print(f"command: {command}")
print(f"model: {model}")
print(f"speedup: {speedup}")
print(f"slave: {slave}")
print(f"sim_address: {sim_address}")
print(f"instance: {instance}")
print(f"defaults: {defaults}")
# Required arguments.
cmd_args = [
f"{command} ",
f"--model {model} ",
f"--speedup {speedup} ",
f"--slave {slave} ",
f"--sim-address={sim_address} ",
f"--instance {instance} ",
f"--defaults {defaults} ",
]
# Optional arguments.
wipe = LaunchConfiguration("wipe").perform(context)
if wipe == "True":
cmd_args.append("--wipe ")
print(f"wipe: {wipe}")
synthetic_clock = LaunchConfiguration("synthetic_clock").perform(context)
if synthetic_clock == "True":
cmd_args.append("--synthetic-clock ")
print(f"synthetic_clock: {synthetic_clock}")
home = LaunchConfiguration("home").perform(context)
if home:
cmd_args.append("--home {home} ")
print(f"home: {home}")
# Optional uart arguments.
for label in uart_labels:
arg = LaunchConfiguration(f"uart{label}").perform(context)
if arg:
cmd_args.append(f"--uart{label} {arg} ")
print(f"uart{label}: {arg}")
# Optional serial arguments.
for label in range(10):
arg = LaunchConfiguration(f"serial{label}").perform(context)
if arg:
cmd_args.append(f"--serial{label} {arg} ")
print(f"serial{label}: {arg}")
# Create action.
sitl_process = ExecuteProcess(
cmd=[cmd_args],
shell=True,
output="both",
respawn=False,
)
launch_processes = [sitl_process]
return launch_processes
def generate_launch_description():
"""Generate a launch description for SITL."""
# UART launch arguments.
uart_args = []
for label in uart_labels:
arg = DeclareLaunchArgument(
f"uart{label}",
default_value="",
description=f"set device string for UART{label}.",
)
uart_args.append(arg)
# Serial launch arguments.
serial_args = []
for label in range(max_serial_ports):
arg = DeclareLaunchArgument(
f"serial{label}",
default_value="",
description=f"set device string for SERIAL{label}.",
)
serial_args.append(arg)
return LaunchDescription(
[
# Launch arguments.
DeclareLaunchArgument(
"command",
default_value="arducopter",
description="Run ArduPilot SITL.",
choices=[
"antennatracker",
"arducopter-heli",
"ardurover",
"blimp",
"arducopter",
"arduplane",
"ardusub",
],
),
DeclareLaunchArgument(
"model",
default_value="quad",
description="Set simulation model.",
),
DeclareLaunchArgument(
"slave",
default_value="0",
description="Set the number of JSON slaves.",
),
DeclareLaunchArgument(
"sim_address",
default_value="127.0.0.1",
description="Set address string for simulator.",
),
DeclareLaunchArgument(
"speedup",
default_value="1",
description="Set simulation speedup.",
),
DeclareLaunchArgument(
"instance",
default_value="0",
description="Set instance of SITL "
"(adds 10*instance to all port numbers).",
),
DeclareLaunchArgument(
"defaults",
default_value=PathJoinSubstitution(
[
FindPackageShare("ardupilot_sitl"),
"config",
"default_params",
"copter.parm",
]
),
description="Set path to defaults file.",
),
# Optional launch arguments.
DeclareLaunchArgument(
"wipe",
default_value="False",
description="Wipe eeprom.",
choices=["True", "False"],
),
DeclareLaunchArgument(
"synthetic_clock",
default_value="False",
description="Set synthetic clock mode.",
choices=["True", "False"],
),
DeclareLaunchArgument(
"home",
default_value="",
description="Set start location (lat,lng,alt,yaw) " "or location name.",
),
]
+ uart_args
+ serial_args
+ [OpaqueFunction(function=launch_sitl)]
)
def generate_launch_description() -> LaunchDescription:
"""Generate a launch description for ArduPilot SITL."""
return SITLLaunch.generate_launch_description()

View File

@ -18,7 +18,7 @@ Launch ArduPilot SITL, MAVProxy and the microROS DDS agent.
Run with default arguments:
ros2 launch ardupilot_sitl sitl_dds.launch.py
ros2 launch ardupilot_sitl sitl_dds_serial.launch.py
"""
from launch import LaunchDescription
from launch.actions import IncludeLaunchDescription
@ -28,9 +28,9 @@ from launch.substitutions import PathJoinSubstitution
from launch_ros.substitutions import FindPackageShare
def generate_launch_description():
def generate_launch_description() -> LaunchDescription:
"""Generate a launch description to bring up ArduPilot SITL with DDS."""
# Include component launch files.
# Compose launch files.
virtual_ports = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
[

View File

@ -0,0 +1,80 @@
# Copyright 2023 ArduPilot.org.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Launch ArduPilot SITL, MAVProxy and the microROS DDS agent.
Run with default arguments:
ros2 launch ardupilot_sitl sitl_dds_udp.launch.py
"""
from launch import LaunchDescription
from launch.actions import IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.substitutions import PathJoinSubstitution
from launch_ros.substitutions import FindPackageShare
def generate_launch_description() -> LaunchDescription:
"""Generate a launch description to bring up ArduPilot SITL with DDS."""
# Compose launch files.
micro_ros_agent = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
[
PathJoinSubstitution(
[
FindPackageShare("ardupilot_sitl"),
"launch",
"micro_ros_agent.launch.py",
]
),
]
)
)
sitl = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
[
PathJoinSubstitution(
[
FindPackageShare("ardupilot_sitl"),
"launch",
"sitl.launch.py",
]
),
]
)
)
mavproxy = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
[
PathJoinSubstitution(
[
FindPackageShare("ardupilot_sitl"),
"launch",
"mavproxy.launch.py",
]
),
]
)
)
return LaunchDescription(
[
micro_ros_agent,
sitl,
mavproxy,
]
)

View File

@ -28,8 +28,9 @@ from launch.substitutions import PathJoinSubstitution
from launch_ros.substitutions import FindPackageShare
def generate_launch_description():
def generate_launch_description() -> LaunchDescription:
"""Generate a launch description for combined SITL and MAVProxy."""
# Compose launch files.
sitl = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
[

View File

@ -21,58 +21,9 @@ Run with default arguments:
ros2 launch ardupilot_sitl virtual_ports.launch.py
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.actions import ExecuteProcess
from launch.actions import OpaqueFunction
from launch.substitutions import LaunchConfiguration
from ardupilot_sitl.launch import VirtualPortsLaunch
def launch_socat(context, *args, **kwargs):
"""Launch a process to create virtual ports using `socat`."""
command = "socat"
tty0 = LaunchConfiguration("tty0").perform(context)
tty1 = LaunchConfiguration("tty1").perform(context)
# Display launch arguments.
print(f"command: {command}")
print(f"tty0: {tty0}")
print(f"tty1: {tty1}")
# Create action.
socat_process = ExecuteProcess(
cmd=[
[
"socat ",
"-d -d ",
f"pty,raw,echo=0,link={tty0} ",
f"pty,raw,echo=0,link={tty1} ",
]
],
shell=True,
output="both",
respawn=False,
)
launch_processes = [socat_process]
return launch_processes
def generate_launch_description():
def generate_launch_description() -> LaunchDescription:
"""Generate a launch description for creating virtual ports using `socat`."""
return LaunchDescription(
[
# Launch arguments.
DeclareLaunchArgument(
"tty0",
default_value="tty0",
description="Symbolic link name for tty0.",
),
DeclareLaunchArgument(
"tty1",
default_value="tty1",
description="Symbolic link name for tty1.",
),
# Launch function.
OpaqueFunction(function=launch_socat),
]
)
return VirtualPortsLaunch.generate_launch_description()

View File

@ -0,0 +1,107 @@
# Copyright 2023 ArduPilot.org.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# Adapted from launch.actions.opaque_function.py
# Copyright 2018 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for the OpaqueFunction action."""
import collections.abc
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Text
from launch.action import Action
from launch.launch_context import LaunchContext
from launch.launch_description_entity import LaunchDescriptionEntity
from launch.utilities import ensure_argument_type
class ExecuteFunction(Action):
"""
Action that executes a Python function.
The signature of the function should be:
.. code-block:: python
def function(
context: LaunchContext,
*args,
**kwargs
) -> Optional[Action]:
...
"""
def __init__(
self,
*,
function: Callable,
args: Optional[Iterable[Any]] = None,
kwargs: Optional[Dict[Text, Any]] = None,
**left_over_kwargs
) -> None:
"""Create an ExecuteFunction action."""
super().__init__(**left_over_kwargs)
if not callable(function):
raise TypeError(
"ExecuteFunction expected a callable for 'function', got '{}'".format(
type(function)
)
)
ensure_argument_type(
args, (collections.abc.Iterable, type(None)), "args", "ExecuteFunction"
)
ensure_argument_type(kwargs, (dict, type(None)), "kwargs", "ExecuteFunction")
self.__function = function
self.__args = [] # type: Iterable
if args is not None:
self.__args = args
self.__kwargs = {} # type: Dict[Text, Any]
if kwargs is not None:
self.__kwargs = kwargs
self.__action = None
@property
def action(self) -> Action:
"""Return the Action obtained by executing the function."""
return self.__action
def execute(
self, context: LaunchContext
) -> Optional[List[LaunchDescriptionEntity]]:
"""Execute the function."""
action = self.__function(context, *self.__args, **self.__kwargs)
self.__action = action
return [action]

View File

@ -0,0 +1,657 @@
# Copyright 2023 ArduPilot.org.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Launch actions for ArduPilot."""
from typing import List
from typing import Dict
from typing import Text
from typing import Tuple
from launch import LaunchContext
from launch import LaunchDescription
from launch.actions import ExecuteProcess
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch.substitutions import PathJoinSubstitution
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
from .actions import ExecuteFunction
class VirtualPortsLaunch:
"""Launch functions for creating virtual ports using `socat`."""
@staticmethod
def generate_action(context: LaunchContext, *args, **kwargs) -> ExecuteProcess:
"""Generate a launch action."""
# Declare commands.
command = "socat"
# Retrieve launch arguments.
tty0 = LaunchConfiguration("tty0").perform(context)
tty1 = LaunchConfiguration("tty1").perform(context)
# Display launch arguments.
print(f"command: {command}")
print(f"tty0: {tty0}")
print(f"tty1: {tty1}")
# Create action.
action = ExecuteProcess(
cmd=[
[
"socat ",
"-d -d ",
f"pty,raw,echo=0,link={tty0} ",
f"pty,raw,echo=0,link={tty1} ",
]
],
shell=True,
output="both",
respawn=False,
cached_output=True,
)
return action
@staticmethod
def generate_launch_description_with_actions() -> (
Tuple[LaunchDescription, Dict[Text, ExecuteFunction]]
):
"""Generate a launch description with actions."""
launch_arguments = VirtualPortsLaunch.generate_launch_arguments()
action = ExecuteFunction(function=VirtualPortsLaunch.generate_action)
ld = LaunchDescription(
launch_arguments
+ [
action,
]
)
actions = {
"virtual_ports": action,
}
return ld, actions
@staticmethod
def generate_launch_description() -> LaunchDescription:
"""Generate a launch description."""
ld, _ = VirtualPortsLaunch.generate_launch_description_with_actions()
return ld
@staticmethod
def generate_launch_arguments() -> List[DeclareLaunchArgument]:
"""Generate a list of launch arguments."""
return [
DeclareLaunchArgument(
"tty0",
default_value="tty0",
description="Symbolic link name for tty0.",
),
DeclareLaunchArgument(
"tty1",
default_value="tty1",
description="Symbolic link name for tty1.",
),
]
class MicroRosAgentLaunch:
"""Launch functions for the micro ROS agent node."""
@staticmethod
def generate_action(context: LaunchContext, *args, **kwargs) -> ExecuteProcess:
"""Launch the micro_ros_agent node."""
# ROS arguments.
micro_ros_agent_ns = LaunchConfiguration("micro_ros_agent_ns").perform(context)
# Common arguments.
transport = LaunchConfiguration("transport").perform(context)
middleware = LaunchConfiguration("middleware").perform(context)
verbose = LaunchConfiguration("verbose").perform(context)
discovery = LaunchConfiguration("discovery").perform(context)
# IPvX arguments.
port = LaunchConfiguration("port").perform(context)
# Serial arguments.
device = LaunchConfiguration("device").perform(context)
baudrate = LaunchConfiguration("baudrate").perform(context)
# Display launch arguments.
print(f"namespace: {micro_ros_agent_ns}")
print(f"transport: {transport}")
print(f"middleware: {middleware}")
print(f"verbose: {verbose}")
print(f"discovery: {discovery}")
# Required arguments
args = [
transport,
"--middleware",
middleware,
]
if transport in ["udp4", "udp6", "tcp4", "tcp6"]:
# IPvX arguments
port = LaunchConfiguration("port").perform(context)
args.append("--port")
args.append(port)
print(f"port: {port}")
elif transport in ["serial", "multiserial", "pseudoterminal"]:
# Serial arguments
baudrate = LaunchConfiguration("baudrate").perform(context)
args.append("--baudrate")
args.append(baudrate)
print(f"baudrate: {baudrate}")
device = LaunchConfiguration("device").perform(context)
args.append("--dev")
args.append(device)
print(f"device: {device}")
else:
# transport must be canfd
pass
# Optional arguments.
refs = LaunchConfiguration("refs").perform(context)
if refs:
args.append("--refs")
args.append(refs)
print(f"refs: {refs}")
# Create action.
node = Node(
package="micro_ros_agent",
executable="micro_ros_agent",
name="micro_ros_agent",
namespace=f"{micro_ros_agent_ns}",
output="both",
arguments=args,
)
return node
@staticmethod
def generate_launch_description_with_actions() -> (
Tuple[LaunchDescription, Dict[Text, ExecuteFunction]]
):
"""Generate a launch description with actions."""
launch_arguments = MicroRosAgentLaunch.generate_launch_arguments()
action = ExecuteFunction(function=MicroRosAgentLaunch.generate_action)
ld = LaunchDescription(
launch_arguments
+ [
action,
]
)
actions = {
"micro_ros_agent": action,
}
return ld, actions
@staticmethod
def generate_launch_description() -> LaunchDescription:
"""Generate a launch description."""
ld, _ = MicroRosAgentLaunch.generate_launch_description_with_actions()
return ld
@staticmethod
def generate_launch_arguments() -> List[DeclareLaunchArgument]:
"""Generate a list of launch arguments."""
return [
DeclareLaunchArgument(
"micro_ros_agent_ns",
default_value="",
description="Set the micro_ros_agent namespace.",
),
DeclareLaunchArgument(
"transport",
default_value="udp4",
description="Set the transport.",
choices=[
"udp4",
"udp6",
"tcp4",
"tcp6",
"canfd",
"serial",
"multiserial",
"pseudoterminal",
],
),
DeclareLaunchArgument(
"middleware",
default_value="dds",
description="Set the middleware.",
choices=["ced", "dds", "rtps"],
),
DeclareLaunchArgument(
"refs",
default_value="",
description="Set the refs file.",
),
DeclareLaunchArgument(
"verbose",
default_value="4",
description="Set the verbosity level.",
choices=["0", "1", "2", "3", "4", "5", "6"],
),
DeclareLaunchArgument(
"discovery",
default_value="7400",
description="Set the dsicovery port.",
),
DeclareLaunchArgument(
"port",
default_value="2019",
description="Set the port number.",
),
DeclareLaunchArgument(
"baudrate",
default_value="115200",
description="Set the baudrate.",
),
DeclareLaunchArgument(
"device",
default_value="",
description="Set the device.",
),
]
class MAVProxyLaunch:
"""Launch functions for MAVProxy."""
@staticmethod
def generate_action(context: LaunchContext, *args, **kwargs) -> ExecuteProcess:
"""Return a non-interactive MAVProxy process."""
# Declare the command.
command = "mavproxy.py"
# Retrieve launch arguments.
master = LaunchConfiguration("master").perform(context)
# out = LaunchConfiguration("out").perform(context)
sitl = LaunchConfiguration("sitl").perform(context)
# Display launch arguments.
print(f"command: {command}")
print(f"master: {master}")
print(f"sitl: {sitl}")
# Create action.
mavproxy_process = ExecuteProcess(
cmd=[
[
f"{command} ",
"--out ",
"127.0.0.1:14550 ",
"--out ",
"127.0.0.1:14551 ",
f"--master {master} ",
f"--sitl {sitl} ",
"--non-interactive ",
]
],
shell=True,
output="both",
respawn=False,
)
return mavproxy_process
@staticmethod
def generate_launch_description_with_actions() -> (
Tuple[LaunchDescription, Dict[Text, ExecuteFunction]]
):
"""Generate a launch description for MAVProxy."""
launch_arguments = MAVProxyLaunch.generate_launch_arguments()
action = ExecuteFunction(function=MAVProxyLaunch.generate_action)
ld = LaunchDescription(
launch_arguments
+ [
action,
]
)
actions = {
"mavproxy": action,
}
return ld, actions
@staticmethod
def generate_launch_description() -> LaunchDescription:
"""Generate a launch description."""
ld, _ = MAVProxyLaunch.generate_launch_description_with_actions()
return ld
@staticmethod
def generate_launch_arguments() -> List[DeclareLaunchArgument]:
"""Generate a list of launch arguments."""
return [
DeclareLaunchArgument(
"master",
default_value="tcp:127.0.0.1:5760",
description="MAVLink master port and optional baud rate.",
),
DeclareLaunchArgument(
"out",
default_value="127.0.0.1:14550",
description="MAVLink output port and optional baud rate.",
),
DeclareLaunchArgument(
"sitl",
default_value="127.0.0.1:5501",
description="SITL output port.",
),
]
class SITLLaunch:
"""Launch functions for ArduPilot SITL."""
# Labels for the optional uart launch arguments.
UART_LABELS = ["A", "B", "C", "D", "E", "F", "H", "I", "J"]
MAX_SERIAL_PORTS = 10
@staticmethod
def generate_action(context: LaunchContext, *args, **kwargs) -> ExecuteProcess:
"""Return a SITL process."""
# Retrieve launch arguments.
command = LaunchConfiguration("command").perform(context)
model = LaunchConfiguration("model").perform(context)
speedup = LaunchConfiguration("speedup").perform(context)
slave = LaunchConfiguration("slave").perform(context)
sim_address = LaunchConfiguration("sim_address").perform(context)
instance = LaunchConfiguration("instance").perform(context)
defaults = LaunchConfiguration("defaults").perform(context)
# Display launch arguments.
print(f"command: {command}")
print(f"model: {model}")
print(f"speedup: {speedup}")
print(f"slave: {slave}")
print(f"sim_address: {sim_address}")
print(f"instance: {instance}")
print(f"defaults: {defaults}")
# Required arguments.
cmd_args = [
f"{command} ",
f"--model {model} ",
f"--speedup {speedup} ",
f"--slave {slave} ",
f"--sim-address={sim_address} ",
f"--instance {instance} ",
f"--defaults {defaults} ",
]
# Optional arguments.
wipe = LaunchConfiguration("wipe").perform(context)
if wipe == "True":
cmd_args.append("--wipe ")
print(f"wipe: {wipe}")
synthetic_clock = LaunchConfiguration("synthetic_clock").perform(context)
if synthetic_clock == "True":
cmd_args.append("--synthetic-clock ")
print(f"synthetic_clock: {synthetic_clock}")
home = LaunchConfiguration("home").perform(context)
if home:
cmd_args.append(f"--home {home} ")
print(f"home: {home}")
# Optional uart arguments.
for label in SITLLaunch.UART_LABELS:
arg = LaunchConfiguration(f"uart{label}").perform(context)
if arg:
cmd_args.append(f"--uart{label} {arg} ")
print(f"uart{label}: {arg}")
# Optional serial arguments.
for label in range(10):
arg = LaunchConfiguration(f"serial{label}").perform(context)
if arg:
cmd_args.append(f"--serial{label} {arg} ")
print(f"serial{label}: {arg}")
rate = LaunchConfiguration("rate").perform(context)
if rate:
cmd_args.append(f"--rate {rate} ")
print(f"rate: {rate}")
gimbal = LaunchConfiguration("gimbal").perform(context)
if gimbal:
cmd_args.append(f"--gimbal {gimbal} ")
print(f"gimbal: {gimbal}")
base_port = LaunchConfiguration("base_port").perform(context)
if base_port:
cmd_args.append(f"--base-port {base_port} ")
print(f"base_port: {base_port}")
rc_in_port = LaunchConfiguration("rc_in_port").perform(context)
if rc_in_port:
cmd_args.append(f"--rc-in-port {rc_in_port} ")
print(f"rc_in_port: {rc_in_port}")
sim_port_in = LaunchConfiguration("sim_port_in").perform(context)
if sim_port_in:
cmd_args.append(f"--sim-port-in {sim_port_in} ")
print(f"sim_port_in: {sim_port_in}")
sim_port_out = LaunchConfiguration("sim_port_out").perform(context)
if sim_port_out:
cmd_args.append(f"--sim-port-out {sim_port_out} ")
print(f"sim_port_out: {sim_port_out}")
irlock_port = LaunchConfiguration("irlock_port").perform(context)
if irlock_port:
cmd_args.append(f"--irlock-port {irlock_port} ")
print(f"irlock_port: {irlock_port}")
start_time = LaunchConfiguration("start_time").perform(context)
if start_time:
cmd_args.append(f"--start-time {start_time} ")
print(f"start_time: {start_time}")
sysid = LaunchConfiguration("sysid").perform(context)
if sysid:
cmd_args.append(f"--sysid {sysid} ")
print(f"sysid: {sysid}")
# Create action.
sitl_process = ExecuteProcess(
cmd=[cmd_args],
shell=True,
output="both",
respawn=False,
)
return sitl_process
@staticmethod
def generate_launch_description_with_actions() -> (
Tuple[LaunchDescription, Dict[Text, ExecuteFunction]]
):
"""Generate a launch description for SITL."""
launch_arguments = SITLLaunch.generate_launch_arguments()
action = ExecuteFunction(function=SITLLaunch.generate_action)
ld = LaunchDescription(
launch_arguments
+ [
action,
]
)
actions = {
"sitl": action,
}
return ld, actions
@staticmethod
def generate_launch_description() -> LaunchDescription:
"""Generate a launch description."""
ld, _ = SITLLaunch.generate_launch_description_with_actions()
return ld
@staticmethod
def generate_launch_arguments() -> List[DeclareLaunchArgument]:
"""Generate a list of launch arguments."""
launch_args = [
# Required launch arguments.
DeclareLaunchArgument(
"command",
default_value="arducopter",
description="Run ArduPilot SITL.",
choices=[
"antennatracker",
"arducopter-heli",
"ardurover",
"blimp",
"arducopter",
"arduplane",
"ardusub",
],
),
DeclareLaunchArgument(
"model",
default_value="quad",
description="Set simulation model.",
),
DeclareLaunchArgument(
"slave",
default_value="0",
description="Set the number of JSON slaves.",
),
DeclareLaunchArgument(
"sim_address",
default_value="127.0.0.1",
description="Set address string for simulator.",
),
DeclareLaunchArgument(
"speedup",
default_value="1",
description="Set simulation speedup.",
),
DeclareLaunchArgument(
"instance",
default_value="0",
description="Set instance of SITL "
"(adds 10*instance to all port numbers).",
),
DeclareLaunchArgument(
"defaults",
default_value=PathJoinSubstitution(
[
FindPackageShare("ardupilot_sitl"),
"config",
"default_params",
"copter.parm",
]
),
description="Set path to defaults file.",
),
# Optional launch arguments.
DeclareLaunchArgument(
"wipe",
default_value="False",
description="Wipe eeprom.",
choices=["True", "False"],
),
DeclareLaunchArgument(
"synthetic_clock",
default_value="False",
description="Set synthetic clock mode.",
choices=["True", "False"],
),
DeclareLaunchArgument(
"home",
default_value="",
description="Set start location (lat,lng,alt,yaw) or location name.",
),
DeclareLaunchArgument(
"rate",
default_value="",
description="Set SITL framerate.",
),
DeclareLaunchArgument(
"gimbal",
default_value="",
description="Enable simulated MAVLink gimbal.",
),
DeclareLaunchArgument(
"base_port",
default_value="",
description="Set port num for base port(default 5670) "
"must be before -I option.",
),
DeclareLaunchArgument(
"rc_in_port",
default_value="",
description="Set port num for rc in.",
),
DeclareLaunchArgument(
"sim_port_in",
default_value="",
description="Set port num for simulator in.",
),
DeclareLaunchArgument(
"sim_port_out",
default_value="",
description="Set port num for simulator out.",
),
DeclareLaunchArgument(
"irlock_port",
default_value="",
description="Set port num for irlock.",
),
DeclareLaunchArgument(
"start_time",
default_value="",
description="Set simulation start time in UNIX timestamp.",
),
DeclareLaunchArgument(
"sysid",
default_value="",
description="Set SYSID_THISMAV.",
),
]
# UART launch arguments.
uart_args = []
for label in SITLLaunch.UART_LABELS:
arg = DeclareLaunchArgument(
f"uart{label}",
default_value="",
description=f"set device string for UART{label}.",
)
uart_args.append(arg)
# Serial launch arguments.
serial_args = []
for label in range(SITLLaunch.MAX_SERIAL_PORTS):
arg = DeclareLaunchArgument(
f"serial{label}",
default_value="",
description=f"set device string for SERIAL{label}.",
)
serial_args.append(arg)
return launch_args + uart_args + serial_args

View File

@ -0,0 +1,37 @@
# Copyright 2023 ArduPilot.org.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Utilities."""
from functools import wraps
from typing import List
from launch import LaunchContext
from launch import LaunchDescriptionEntity
# Adapted from a SO answer by David Wolever.
# Is there a library function in Python to turn a generator-function
# into a function returning a list?
# https://stackoverflow.com/questions/12377013
def listify(fn) -> List[LaunchDescriptionEntity]:
"""Wrap a functions's return value in a list."""
@wraps(fn)
def listify_helper(
context: LaunchContext, *args, **kwargs
) -> List[LaunchDescriptionEntity]:
return [fn(context, args, kwargs)]
return listify_helper