diff --git a/Tools/ros2/README.md b/Tools/ros2/README.md
index 7cf44f7e5c..bdd055e3f1 100644
--- a/Tools/ros2/README.md
+++ b/Tools/ros2/README.md
@@ -187,7 +187,7 @@ ros2 run micro_ros_agent micro_ros_agent serial --baudrate 115200 --dev ./dev/tt
```
```bash
-arducopter --synthetic-clock --wipe --model quad --speedup 1 --slave 0 --instance 0 --uartC uart:./dev/ttyROS1 --defaults $(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/copter.parm,$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/dds.parm --sim-address 127.0.0.1
+arducopter --synthetic-clock --wipe --model quad --speedup 1 --slave 0 --instance 0 --uartC uart:./dev/ttyROS1 --defaults $(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/copter.parm,$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/dds_serial.parm --sim-address 127.0.0.1
```
```bash
@@ -201,11 +201,11 @@ ros2 launch ardupilot_sitl virtual_ports.launch.py tty0:=./dev/ttyROS0 tty1:=./d
```
```bash
-ros2 launch ardupilot_sitl micro_ros_agent.launch.py refs:=$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/dds_xrce_profile.xml baudrate:=115200 device:=./dev/ttyROS0
+ros2 launch ardupilot_sitl micro_ros_agent.launch.py transport:=serial refs:=$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/dds_xrce_profile.xml baudrate:=115200 device:=./dev/ttyROS0
```
```bash
-ros2 launch ardupilot_sitl sitl.launch.py synthetic_clock:=True wipe:=True model:=quad speedup:=1 slave:=0 instance:=0 uartC:=uart:./dev/ttyROS1 defaults:=$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/copter.parm,$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/dds.parm sim_address:=127.0.0.1
+ros2 launch ardupilot_sitl sitl.launch.py synthetic_clock:=True wipe:=True model:=quad speedup:=1 slave:=0 instance:=0 uartC:=uart:./dev/ttyROS1 defaults:=$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/copter.parm,$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/dds_serial.parm sim_address:=127.0.0.1
```
```bash
@@ -215,11 +215,12 @@ ros2 launch ardupilot_sitl mavproxy.launch.py master:=tcp:127.0.0.1:5760 sitl:=1
Using combined launch file
```bash
-ros2 launch ardupilot_sitl sitl_dds.launch.py \
+ros2 launch ardupilot_sitl sitl_dds_serial.launch.py \
\
tty0:=./dev/ttyROS0 \
tty1:=./dev/ttyROS1 \
\
+transport:=serial \
refs:=$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/dds_xrce_profile.xml \
baudrate:=115200 \
device:=./dev/ttyROS0 \
@@ -231,9 +232,15 @@ speedup:=1 \
slave:=0 \
instance:=0 \
uartC:=uart:./dev/ttyROS1 \
-defaults:=$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/copter.parm,$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/dds.parm \
+defaults:=$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/copter.parm,$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/dds_serial.parm \
sim_address:=127.0.0.1 \
\
master:=tcp:127.0.0.1:5760 \
sitl:=127.0.0.1:5501
```
+
+UDP version
+
+```
+ros2 launch ardupilot_sitl sitl_dds_udp.launch.py transport:=udp4 refs:=$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/dds_xrce_profile.xml synthetic_clock:=True wipe:=False model:=quad speedup:=1 slave:=0 instance:=0 defaults:=$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/copter.parm,$(ros2 pkg prefix ardupilot_sitl)/share/ardupilot_sitl/config/default_params/dds_udp.parm sim_address:=127.0.0.1 master:=tcp:127.0.0.1:5760 sitl:=127.0.0.1:5501
+```
diff --git a/Tools/ros2/ardupilot_dds_tests/package.xml b/Tools/ros2/ardupilot_dds_tests/package.xml
index 64e6670881..dacbbe61e1 100644
--- a/Tools/ros2/ardupilot_dds_tests/package.xml
+++ b/Tools/ros2/ardupilot_dds_tests/package.xml
@@ -8,6 +8,7 @@
GLP-3.0
ament_index_python
+ ardupilot_sitl
launch
launch_pytest
launch_ros
@@ -18,6 +19,7 @@
ament_copyright
ament_flake8
ament_pep257
+ ardupilot_sitl
launch
launch_pytest
launch_ros
diff --git a/Tools/ros2/ardupilot_dds_tests/test/ardupilot_dds_tests/conftest.py b/Tools/ros2/ardupilot_dds_tests/test/ardupilot_dds_tests/conftest.py
index 7fa92b7e6c..66a71779fa 100644
--- a/Tools/ros2/ardupilot_dds_tests/test/ardupilot_dds_tests/conftest.py
+++ b/Tools/ros2/ardupilot_dds_tests/test/ardupilot_dds_tests/conftest.py
@@ -20,65 +20,60 @@ import pytest
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
+from launch import LaunchDescriptionSource
from launch.actions import IncludeLaunchDescription
-from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.substitutions import PathJoinSubstitution
from launch_ros.substitutions import FindPackageShare
from pathlib import Path
+from ardupilot_sitl.launch import VirtualPortsLaunch
+from ardupilot_sitl.launch import MicroRosAgentLaunch
+from ardupilot_sitl.launch import MAVProxyLaunch
+from ardupilot_sitl.launch import SITLLaunch
+
@pytest.fixture(scope="module")
def device_dir(tmp_path_factory):
"""Fixture to create a temporary directory for devices."""
path = tmp_path_factory.mktemp("devices")
- return path
+
+ # Create dev directory for virtual ports.
+ os.mkdir(Path(path, "dev"))
+
+ yield path
@pytest.fixture
-def sitl_dds(device_dir):
- """Fixture to bring up ArduPilot SITL DDS."""
- # Create directory for virtual ports.
- print(f"\ntmpdirname: {device_dir}\n")
- if not ("dev" in os.listdir(device_dir)):
- os.mkdir(Path(device_dir, "dev"))
-
- # Full path to virtual ports.
+def virtual_ports(device_dir):
+ """Fixture to create virtual ports."""
tty0 = Path(device_dir, "dev", "tty0").resolve()
tty1 = Path(device_dir, "dev", "tty1").resolve()
- virtual_ports = IncludeLaunchDescription(
- PythonLaunchDescriptionSource(
- [
- PathJoinSubstitution(
- [
- FindPackageShare("ardupilot_sitl"),
- "launch",
- "virtual_ports.launch.py",
- ]
- ),
- ]
- ),
+ vp_ld, vp_actions = VirtualPortsLaunch.generate_launch_description_with_actions()
+
+ ld = IncludeLaunchDescription(
+ LaunchDescriptionSource(vp_ld),
launch_arguments={
"tty0": str(tty0),
"tty1": str(tty1),
}.items(),
)
+ yield ld, vp_actions
- micro_ros_agent = IncludeLaunchDescription(
- PythonLaunchDescriptionSource(
- [
- PathJoinSubstitution(
- [
- FindPackageShare("ardupilot_sitl"),
- "launch",
- "micro_ros_agent.launch.py",
- ]
- ),
- ]
- ),
+
+@pytest.fixture
+def micro_ros_agent_serial(device_dir):
+ """Fixture to create a micro_ros_agent node."""
+ tty0 = Path(device_dir, "dev", "tty0").resolve()
+
+ mra_ld, mra_actions = MicroRosAgentLaunch.generate_launch_description_with_actions()
+
+ ld = IncludeLaunchDescription(
+ LaunchDescriptionSource(mra_ld),
launch_arguments={
+ "transport": "serial",
"refs": PathJoinSubstitution(
[
FindPackageShare("ardupilot_sitl"),
@@ -90,23 +85,62 @@ def sitl_dds(device_dir):
"device": str(tty0),
}.items(),
)
+ yield ld, mra_actions
- sitl = IncludeLaunchDescription(
- PythonLaunchDescriptionSource(
- [
- PathJoinSubstitution(
- [
- FindPackageShare("ardupilot_sitl"),
- "launch",
- "sitl.launch.py",
- ]
- ),
- ]
- ),
+
+@pytest.fixture
+def micro_ros_agent_udp():
+ """Fixture to create a micro_ros_agent node."""
+ mra_ld, mra_actions = MicroRosAgentLaunch.generate_launch_description_with_actions()
+
+ ld = IncludeLaunchDescription(
+ LaunchDescriptionSource(mra_ld),
+ launch_arguments={
+ "transport": "udp4",
+ "refs": PathJoinSubstitution(
+ [
+ FindPackageShare("ardupilot_sitl"),
+ "config",
+ "dds_xrce_profile.xml",
+ ]
+ ),
+ }.items(),
+ )
+ yield ld, mra_actions
+
+
+@pytest.fixture
+def mavproxy():
+ """Fixture to bring up MAVProxy."""
+ mp_ld, mp_actions = MAVProxyLaunch.generate_launch_description_with_actions()
+
+ ld = IncludeLaunchDescription(
+ LaunchDescriptionSource(mp_ld),
+ launch_arguments={
+ "master": "tcp:127.0.0.1:5760",
+ "sitl": "127.0.0.1:5501",
+ }.items(),
+ )
+ yield ld, mp_actions
+
+
+@pytest.fixture
+def sitl_copter_dds_serial(device_dir, virtual_ports, micro_ros_agent_serial, mavproxy):
+ """Fixture to bring up ArduPilot SITL DDS."""
+ tty1 = Path(device_dir, "dev", "tty1").resolve()
+
+ vp_ld, vp_actions = virtual_ports
+ mra_ld, mra_actions = micro_ros_agent_serial
+ mp_ld, mp_actions = mavproxy
+ sitl_ld, sitl_actions = SITLLaunch.generate_launch_description_with_actions()
+
+ sitl_ld_args = IncludeLaunchDescription(
+ LaunchDescriptionSource(sitl_ld),
launch_arguments={
"command": "arducopter",
"synthetic_clock": "True",
- "wipe": "True",
+ # "wipe": "True",
+ "wipe": "False",
"model": "quad",
"speedup": "10",
"slave": "0",
@@ -126,35 +160,75 @@ def sitl_dds(device_dir):
get_package_share_directory("ardupilot_sitl"),
"config",
"default_params",
- "dds.parm",
+ "dds_serial.parm",
)
),
}.items(),
)
- mavproxy = IncludeLaunchDescription(
- PythonLaunchDescriptionSource(
- [
- PathJoinSubstitution(
- [
- FindPackageShare("ardupilot_sitl"),
- "launch",
- "mavproxy.launch.py",
- ]
- ),
- ]
- ),
+ ld = LaunchDescription(
+ [
+ vp_ld,
+ mra_ld,
+ mp_ld,
+ sitl_ld_args,
+ ]
+ )
+ actions = {}
+ actions.update(vp_actions)
+ actions.update(mra_actions)
+ actions.update(mp_actions)
+ actions.update(sitl_actions)
+ yield ld, actions
+
+
+@pytest.fixture
+def sitl_copter_dds_udp(micro_ros_agent_udp, mavproxy):
+ """Fixture to bring up ArduPilot SITL DDS."""
+ mra_ld, mra_actions = micro_ros_agent_udp
+ mp_ld, mp_actions = mavproxy
+ sitl_ld, sitl_actions = SITLLaunch.generate_launch_description_with_actions()
+
+ sitl_ld_args = IncludeLaunchDescription(
+ LaunchDescriptionSource(sitl_ld),
launch_arguments={
- "master": "tcp:127.0.0.1:5760",
- "sitl": "127.0.0.1:5501",
+ "command": "arducopter",
+ "synthetic_clock": "True",
+ # "wipe": "True",
+ "wipe": "False",
+ "model": "quad",
+ "speedup": "10",
+ "slave": "0",
+ "instance": "0",
+ "defaults": str(
+ Path(
+ get_package_share_directory("ardupilot_sitl"),
+ "config",
+ "default_params",
+ "copter.parm",
+ )
+ )
+ + ","
+ + str(
+ Path(
+ get_package_share_directory("ardupilot_sitl"),
+ "config",
+ "default_params",
+ "dds_udp.parm",
+ )
+ ),
}.items(),
)
- return LaunchDescription(
+ ld = LaunchDescription(
[
- virtual_ports,
- micro_ros_agent,
- sitl,
- mavproxy,
+ mra_ld,
+ mp_ld,
+ sitl_ld_args,
]
)
+ actions = {}
+ actions.update(mra_actions)
+ actions.update(mp_actions)
+ actions.update(sitl_actions)
+ yield ld, actions
diff --git a/Tools/ros2/ardupilot_dds_tests/test/ardupilot_dds_tests/test_navsat_msg_received.py b/Tools/ros2/ardupilot_dds_tests/test/ardupilot_dds_tests/test_navsat_msg_received.py
index a9909ecaed..d4f78f80ff 100644
--- a/Tools/ros2/ardupilot_dds_tests/test/ardupilot_dds_tests/test_navsat_msg_received.py
+++ b/Tools/ros2/ardupilot_dds_tests/test/ardupilot_dds_tests/test_navsat_msg_received.py
@@ -22,22 +22,15 @@ import threading
from launch import LaunchDescription
-from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
+from launch_pytest.tools import process as process_tools
+
+from rclpy.qos import QoSProfile
+from rclpy.qos import QoSReliabilityPolicy
+from rclpy.qos import QoSHistoryPolicy
from sensor_msgs.msg import NavSatFix
-@launch_pytest.fixture
-def launch_description(sitl_dds):
- """Fixture to create the launch description."""
- return LaunchDescription(
- [
- sitl_dds,
- launch_pytest.actions.ReadyToTest(),
- ]
- )
-
-
class NavSatFixListener(rclpy.node.Node):
"""Subscribe to NavSatFix messages on /ap/navsat/navsat0."""
@@ -80,9 +73,51 @@ class NavSatFixListener(rclpy.node.Node):
self.get_logger().info("From AP : False")
-@pytest.mark.launch(fixture=launch_description)
-def test_navsat_msgs_received(sitl_dds, launch_context):
+@launch_pytest.fixture
+def launch_sitl_copter_dds_serial(sitl_copter_dds_serial):
+ """Fixture to create the launch description."""
+ sitl_ld, sitl_actions = sitl_copter_dds_serial
+
+ ld = LaunchDescription(
+ [
+ sitl_ld,
+ launch_pytest.actions.ReadyToTest(),
+ ]
+ )
+ actions = sitl_actions
+ yield ld, actions
+
+
+@launch_pytest.fixture
+def launch_sitl_copter_dds_udp(sitl_copter_dds_udp):
+ """Fixture to create the launch description."""
+ sitl_ld, sitl_actions = sitl_copter_dds_udp
+
+ ld = LaunchDescription(
+ [
+ sitl_ld,
+ launch_pytest.actions.ReadyToTest(),
+ ]
+ )
+ actions = sitl_actions
+ yield ld, actions
+
+
+@pytest.mark.launch(fixture=launch_sitl_copter_dds_serial)
+def test_dds_serial_navsat_msg_recv(launch_context, launch_sitl_copter_dds_serial):
"""Test NavSatFix messages are published by AP_DDS."""
+ _, actions = launch_sitl_copter_dds_serial
+ virtual_ports = actions["virtual_ports"].action
+ micro_ros_agent = actions["micro_ros_agent"].action
+ mavproxy = actions["mavproxy"].action
+ sitl = actions["sitl"].action
+
+ # Wait for process to start.
+ process_tools.wait_for_start_sync(launch_context, virtual_ports, timeout=2)
+ process_tools.wait_for_start_sync(launch_context, micro_ros_agent, timeout=2)
+ process_tools.wait_for_start_sync(launch_context, mavproxy, timeout=2)
+ process_tools.wait_for_start_sync(launch_context, sitl, timeout=2)
+
rclpy.init()
try:
node = NavSatFixListener()
@@ -91,8 +126,28 @@ def test_navsat_msgs_received(sitl_dds, launch_context):
assert msgs_received_flag, "Did not receive 'ap/navsat/navsat0' msgs."
finally:
rclpy.shutdown()
-
yield
- # Anything below this line is executed after launch service shutdown.
- pass
+
+@pytest.mark.launch(fixture=launch_sitl_copter_dds_udp)
+def test_dds_udp_navsat_msg_recv(launch_context, launch_sitl_copter_dds_udp):
+ """Test NavSatFix messages are published by AP_DDS."""
+ _, actions = launch_sitl_copter_dds_udp
+ micro_ros_agent = actions["micro_ros_agent"].action
+ mavproxy = actions["mavproxy"].action
+ sitl = actions["sitl"].action
+
+ # Wait for process to start.
+ process_tools.wait_for_start_sync(launch_context, micro_ros_agent, timeout=2)
+ process_tools.wait_for_start_sync(launch_context, mavproxy, timeout=2)
+ process_tools.wait_for_start_sync(launch_context, sitl, timeout=2)
+
+ rclpy.init()
+ try:
+ node = NavSatFixListener()
+ node.start_subscriber()
+ msgs_received_flag = node.msg_event_object.wait(timeout=10.0)
+ assert msgs_received_flag, "Did not receive 'ap/navsat/navsat0' msgs."
+ finally:
+ rclpy.shutdown()
+ yield
diff --git a/Tools/ros2/ardupilot_dds_tests/test/ardupilot_dds_tests/test_time_msg_received.py b/Tools/ros2/ardupilot_dds_tests/test/ardupilot_dds_tests/test_time_msg_received.py
index b32864e662..d3caa34820 100644
--- a/Tools/ros2/ardupilot_dds_tests/test/ardupilot_dds_tests/test_time_msg_received.py
+++ b/Tools/ros2/ardupilot_dds_tests/test/ardupilot_dds_tests/test_time_msg_received.py
@@ -22,20 +22,11 @@ import threading
from launch import LaunchDescription
+from launch_pytest.tools import process as process_tools
+
from builtin_interfaces.msg import Time
-@launch_pytest.fixture
-def launch_description(sitl_dds):
- """Fixture to create the launch description."""
- return LaunchDescription(
- [
- sitl_dds,
- launch_pytest.actions.ReadyToTest(),
- ]
- )
-
-
class TimeListener(rclpy.node.Node):
"""Subscribe to Time messages on /ap/clock."""
@@ -72,9 +63,51 @@ class TimeListener(rclpy.node.Node):
self.get_logger().info("From AP : False")
-@pytest.mark.launch(fixture=launch_description)
-def test_time_msgs_received(sitl_dds, launch_context):
- """Test Time messages are published by AP_DDS."""
+@launch_pytest.fixture
+def launch_sitl_copter_dds_serial(sitl_copter_dds_serial):
+ """Fixture to create the launch description."""
+ sitl_ld, sitl_actions = sitl_copter_dds_serial
+
+ ld = LaunchDescription(
+ [
+ sitl_ld,
+ launch_pytest.actions.ReadyToTest(),
+ ]
+ )
+ actions = sitl_actions
+ yield ld, actions
+
+
+@launch_pytest.fixture
+def launch_sitl_copter_dds_udp(sitl_copter_dds_udp):
+ """Fixture to create the launch description."""
+ sitl_ld, sitl_actions = sitl_copter_dds_udp
+
+ ld = LaunchDescription(
+ [
+ sitl_ld,
+ launch_pytest.actions.ReadyToTest(),
+ ]
+ )
+ actions = sitl_actions
+ yield ld, actions
+
+
+@pytest.mark.launch(fixture=launch_sitl_copter_dds_serial)
+def test_dds_serial_clock_msg_recv(launch_context, launch_sitl_copter_dds_serial):
+ """Test /ap/clock is published by AP_DDS."""
+ _, actions = launch_sitl_copter_dds_serial
+ virtual_ports = actions["virtual_ports"].action
+ micro_ros_agent = actions["micro_ros_agent"].action
+ mavproxy = actions["mavproxy"].action
+ sitl = actions["sitl"].action
+
+ # Wait for process to start.
+ process_tools.wait_for_start_sync(launch_context, virtual_ports, timeout=2)
+ process_tools.wait_for_start_sync(launch_context, micro_ros_agent, timeout=2)
+ process_tools.wait_for_start_sync(launch_context, mavproxy, timeout=2)
+ process_tools.wait_for_start_sync(launch_context, sitl, timeout=2)
+
rclpy.init()
try:
node = TimeListener()
@@ -83,8 +116,28 @@ def test_time_msgs_received(sitl_dds, launch_context):
assert msgs_received_flag, "Did not receive 'ROS_Time' msgs."
finally:
rclpy.shutdown()
-
yield
- # Anything below this line is executed after launch service shutdown.
- pass
+
+@pytest.mark.launch(fixture=launch_sitl_copter_dds_udp)
+def test_dds_udp_clock_msg_recv(launch_context, launch_sitl_copter_dds_udp):
+ """Test /ap/clock is published by AP_DDS."""
+ _, actions = launch_sitl_copter_dds_udp
+ micro_ros_agent = actions["micro_ros_agent"].action
+ mavproxy = actions["mavproxy"].action
+ sitl = actions["sitl"].action
+
+ # Wait for process to start.
+ process_tools.wait_for_start_sync(launch_context, micro_ros_agent, timeout=2)
+ process_tools.wait_for_start_sync(launch_context, mavproxy, timeout=2)
+ process_tools.wait_for_start_sync(launch_context, sitl, timeout=2)
+
+ rclpy.init()
+ try:
+ node = TimeListener()
+ node.start_subscriber()
+ msgs_received_flag = node.msg_event_object.wait(timeout=10.0)
+ assert msgs_received_flag, "Did not receive 'ROS_Time' msgs."
+ finally:
+ rclpy.shutdown()
+ yield
diff --git a/Tools/ros2/ardupilot_dds_tests/test/ardupilot_dds_tests/test_virtual_ports.py b/Tools/ros2/ardupilot_dds_tests/test/ardupilot_dds_tests/test_virtual_ports.py
index a355c8e95f..25109a21e9 100644
--- a/Tools/ros2/ardupilot_dds_tests/test/ardupilot_dds_tests/test_virtual_ports.py
+++ b/Tools/ros2/ardupilot_dds_tests/test/ardupilot_dds_tests/test_virtual_ports.py
@@ -14,123 +14,43 @@
# along with this program. If not, see .
"""Use `socat` to create virtual ports ttyROS0 and ttyRO1 and check they exist."""
-import os
import launch_pytest
import pytest
from launch import LaunchDescription
-from launch.actions import EmitEvent
-from launch.actions import ExecuteProcess
-from launch.actions import IncludeLaunchDescription
-from launch.actions import LogInfo
-from launch.actions import RegisterEventHandler
-from launch.event_handlers import OnProcessStart
-from launch.event_handlers import OnProcessExit
-from launch.events import matches_action
-from launch.events.process import ShutdownProcess
-from launch.launch_description_sources import PythonLaunchDescriptionSource
-from launch.substitutions import PathJoinSubstitution
from launch_pytest.tools import process as process_tools
-from launch_ros.substitutions import FindPackageShare
-from pathlib import Path
+@launch_pytest.fixture
+def launch_description(virtual_ports):
+ """Launch description fixture."""
+ vp_ld, vp_actions = virtual_ports
-
-@pytest.fixture()
-def dummy_proc():
- """Return a dummy process action to manage test lifetime."""
- return ExecuteProcess(
- cmd=["echo", "ardupilot_dds_tests"],
- shell=True,
- cached_output=True,
- )
-
-
-@pytest.fixture()
-def virtual_ports(device_dir):
- """Fixture that includes and configures the virtual port launch."""
- # Create directory for virtual ports.
- print(f"\ntmpdirname: {device_dir}\n")
- if not ("dev" in os.listdir(device_dir)):
- os.mkdir(Path(device_dir, "dev"))
-
- # Full path to virtual ports.
- tty0 = Path(device_dir, "dev", "tty0").resolve()
- tty1 = Path(device_dir, "dev", "tty1").resolve()
-
- virtual_ports = IncludeLaunchDescription(
- PythonLaunchDescriptionSource(
- [
- PathJoinSubstitution(
- [
- FindPackageShare("ardupilot_sitl"),
- "launch",
- "virtual_ports.launch.py",
- ]
- ),
- ]
- ),
- launch_arguments={
- "tty0": str(tty0),
- "tty1": str(tty1),
- }.items(),
- )
- return virtual_ports
-
-
-@launch_pytest.fixture()
-def launch_description(dummy_proc, virtual_ports):
- """Fixture to create the launch description."""
- on_start = RegisterEventHandler(
- event_handler=OnProcessStart(
- target_action=dummy_proc,
- on_start=[
- LogInfo(msg="Test started."),
- ],
- )
- )
-
- on_process_exit = RegisterEventHandler(
- event_handler=OnProcessExit(
- target_action=dummy_proc,
- on_exit=[
- LogInfo(msg="Test stopping."),
- EmitEvent(
- event=ShutdownProcess(process_matcher=matches_action(virtual_ports))
- ),
- ],
- )
- )
-
- return LaunchDescription(
+ ld = LaunchDescription(
[
- dummy_proc,
- virtual_ports,
- on_start,
- on_process_exit,
+ vp_ld,
launch_pytest.actions.ReadyToTest(),
]
)
+ actions = vp_actions
+ yield ld, actions
@pytest.mark.launch(fixture=launch_description)
-def test_virtual_ports(dummy_proc, virtual_ports, launch_context):
+def test_virtual_ports(launch_context, launch_description):
"""Test the virtual ports are present."""
- # TODO: check virtial port process for regex:
- # "starting data transfer loop"
+ _, actions = launch_description
+ virtual_ports = actions["virtual_ports"].action
+ # Wait for process to start.
+ process_tools.wait_for_start_sync(launch_context, virtual_ports, timeout=2)
+
+ # Assert contents of output to stderr.
def validate_output(output):
- assert output.splitlines() == [
- "ardupilot_dds_tests"
- ], "Test process had no output."
+ assert "N starting data transfer loop" in output, "Test process had no output."
- process_tools.assert_output_sync(
- launch_context, dummy_proc, validate_output, timeout=5
+ process_tools.assert_stderr_sync(
+ launch_context, virtual_ports, validate_output, timeout=5
)
-
yield
-
- # Anything below this line is executed after launch service shutdown.
- assert dummy_proc.return_code == 0
diff --git a/Tools/ros2/ardupilot_sitl/config/default_params/dds.parm b/Tools/ros2/ardupilot_sitl/config/default_params/dds_serial.parm
similarity index 74%
rename from Tools/ros2/ardupilot_sitl/config/default_params/dds.parm
rename to Tools/ros2/ardupilot_sitl/config/default_params/dds_serial.parm
index 80767ed1d7..89f2b3cf8e 100644
--- a/Tools/ros2/ardupilot_sitl/config/default_params/dds.parm
+++ b/Tools/ros2/ardupilot_sitl/config/default_params/dds_serial.parm
@@ -1,2 +1,3 @@
+DDS_ENABLE 1
SERIAL1_BAUD 115
SERIAL1_PROTOCOL 45
diff --git a/Tools/ros2/ardupilot_sitl/config/default_params/dds_udp.parm b/Tools/ros2/ardupilot_sitl/config/default_params/dds_udp.parm
new file mode 100644
index 0000000000..81c3e69cd9
--- /dev/null
+++ b/Tools/ros2/ardupilot_sitl/config/default_params/dds_udp.parm
@@ -0,0 +1,2 @@
+DDS_ENABLE 1
+DDS_PORT 2019
diff --git a/Tools/ros2/ardupilot_sitl/launch/mavproxy.launch.py b/Tools/ros2/ardupilot_sitl/launch/mavproxy.launch.py
index 4faa222abf..47c3ee1b0b 100644
--- a/Tools/ros2/ardupilot_sitl/launch/mavproxy.launch.py
+++ b/Tools/ros2/ardupilot_sitl/launch/mavproxy.launch.py
@@ -25,73 +25,9 @@ Show launch arguments:
ros2 launch ardupilot_sitl mavproxy.launch.py --show-args
"""
from launch import LaunchDescription
-from launch.actions import DeclareLaunchArgument
-from launch.actions import ExecuteProcess
-from launch.actions import OpaqueFunction
-from launch.substitutions import LaunchConfiguration
+from ardupilot_sitl.launch import MAVProxyLaunch
-def launch_mavproxy(context, *args, **kwargs):
- """Return a non-interactive MAVProxy process."""
- # Declare the command.
- command = "mavproxy.py"
-
- # Retrieve launch arguments.
- master = LaunchConfiguration("master").perform(context)
- # out = LaunchConfiguration("out").perform(context)
- sitl = LaunchConfiguration("sitl").perform(context)
-
- # Display launch arguments.
- print(f"command: {command}")
- print(f"master: {master}")
- print(f"sitl: {sitl}")
-
- # Create action.
- mavproxy_process = ExecuteProcess(
- cmd=[
- [
- f"{command} ",
- "--out ",
- "127.0.0.1:14550 ",
- "--out ",
- "127.0.0.1:14551 ",
- f"--master {master} ",
- f"--sitl {sitl} ",
- "--non-interactive ",
- # "--daemon "
- ]
- ],
- shell=True,
- output="both",
- respawn=False,
- )
-
- launch_processes = [mavproxy_process]
- return launch_processes
-
-
-def generate_launch_description():
+def generate_launch_description() -> LaunchDescription:
"""Generate a launch description for MAVProxy."""
- # Create launch description.
- return LaunchDescription(
- [
- # Launch arguments.
- DeclareLaunchArgument(
- "master",
- default_value="tcp:127.0.0.1:5760",
- description="MAVLink master port and optional baud rate.",
- ),
- DeclareLaunchArgument(
- "out",
- default_value="127.0.0.1:14550",
- description="MAVLink output port and optional baud rate.",
- ),
- DeclareLaunchArgument(
- "sitl",
- default_value="127.0.0.1:5501",
- description="SITL output port.",
- ),
- # Launch function.
- OpaqueFunction(function=launch_mavproxy),
- ]
- )
+ return MAVProxyLaunch.generate_launch_description()
diff --git a/Tools/ros2/ardupilot_sitl/launch/micro_ros_agent.launch.py b/Tools/ros2/ardupilot_sitl/launch/micro_ros_agent.launch.py
index e8013f6076..c5ab775bda 100644
--- a/Tools/ros2/ardupilot_sitl/launch/micro_ros_agent.launch.py
+++ b/Tools/ros2/ardupilot_sitl/launch/micro_ros_agent.launch.py
@@ -21,108 +21,9 @@ Run with default arguments:
ros2 launch ardupilot_sitl micro_ros_agent.launch.py
"""
from launch import LaunchDescription
-from launch.actions import DeclareLaunchArgument
-from launch.actions import OpaqueFunction
-from launch.substitutions import LaunchConfiguration
-
-import launch_ros
+from ardupilot_sitl.launch import MicroRosAgentLaunch
-def launch_micro_ros_agent(context, *args, **kwargs):
- """Launch the micro_ros_agent node."""
- # ROS arguments.
- micro_ros_agent_ns = LaunchConfiguration("micro_ros_agent_ns").perform(context)
-
- # Common arguments.
- transport = LaunchConfiguration("transport").perform(context)
- middleware = LaunchConfiguration("middleware").perform(context)
- refs = LaunchConfiguration("refs").perform(context)
- verbose = LaunchConfiguration("verbose").perform(context)
- # discovery = LaunchConfiguration("discovery").perform(context)
-
- # Serial arguments.
- device = LaunchConfiguration("device").perform(context)
- baudrate = LaunchConfiguration("baudrate").perform(context)
-
- # Display launch arguments.
- print(f"namespace: {micro_ros_agent_ns}")
- print(f"transport: {transport}")
- print(f"middleware: {middleware}")
- print(f"refs: {refs}")
- print(f"verbose: {verbose}")
- # print(f"discovery: {discovery}")
-
- print(f"baudrate: {baudrate}")
- print(f"device: {device}")
-
- # Create action.
- micro_ros_agent_node = launch_ros.actions.Node(
- package="micro_ros_agent",
- namespace=f"{micro_ros_agent_ns}",
- executable="micro_ros_agent",
- name="micro_ros_agent",
- output="both",
- arguments=[
- {transport},
- "--middleware",
- {middleware},
- "--refs",
- {refs},
- "--dev",
- {device},
- "--baudrate",
- {baudrate},
- ],
- # additional_env={"PYTHONUNBUFFERED": "1"},
- )
-
- launch_processes = [micro_ros_agent_node]
- return launch_processes
-
-
-def generate_launch_description():
+def generate_launch_description() -> LaunchDescription:
"""Generate a launch description for the micro_ros_agent."""
- return LaunchDescription(
- [
- # Launch arguments.
- DeclareLaunchArgument(
- "micro_ros_agent_ns",
- default_value="",
- description="Set the micro_ros_agent namespace.",
- ),
- DeclareLaunchArgument(
- "transport",
- default_value="serial",
- description="Set the transport.",
- choices=["serial"],
- ),
- DeclareLaunchArgument(
- "middleware",
- default_value="dds",
- description="Set the middleware.",
- choices=["ced", "dds", "rtps"],
- ),
- DeclareLaunchArgument(
- "refs",
- default_value="",
- description="Set the refs file.",
- ),
- DeclareLaunchArgument(
- "verbose",
- default_value="",
- description="Set the verbosity level.",
- ),
- DeclareLaunchArgument(
- "baudrate",
- default_value="",
- description="Set the baudrate.",
- ),
- DeclareLaunchArgument(
- "device",
- default_value="",
- description="Set the device.",
- ),
- # Launch function.
- OpaqueFunction(function=launch_micro_ros_agent),
- ]
- )
+ return MicroRosAgentLaunch.generate_launch_description()
diff --git a/Tools/ros2/ardupilot_sitl/launch/sitl.launch.py b/Tools/ros2/ardupilot_sitl/launch/sitl.launch.py
index ca5aba8172..9cbbdc269f 100644
--- a/Tools/ros2/ardupilot_sitl/launch/sitl.launch.py
+++ b/Tools/ros2/ardupilot_sitl/launch/sitl.launch.py
@@ -24,190 +24,10 @@ Show launch arguments:
ros2 launch ardupilot_sitl sitl.launch.py --show-args
"""
-
from launch import LaunchDescription
-from launch.actions import DeclareLaunchArgument
-from launch.actions import ExecuteProcess
-from launch.actions import OpaqueFunction
-from launch.substitutions import LaunchConfiguration
-from launch.substitutions import PathJoinSubstitution
-
-from launch_ros.substitutions import FindPackageShare
-
-# Labels for the optional uart launch arguments.
-uart_labels = ["A", "B", "C", "D", "E", "F", "H", "I", "J"]
-max_serial_ports = 10
+from ardupilot_sitl.launch import SITLLaunch
-def launch_sitl(context, *args, **kwargs):
- """Return a SITL process."""
- command = LaunchConfiguration("command").perform(context)
- model = LaunchConfiguration("model").perform(context)
- speedup = LaunchConfiguration("speedup").perform(context)
- slave = LaunchConfiguration("slave").perform(context)
- sim_address = LaunchConfiguration("sim_address").perform(context)
- instance = LaunchConfiguration("instance").perform(context)
- defaults = LaunchConfiguration("defaults").perform(context)
-
- # Display launch arguments.
- print(f"command: {command}")
- print(f"model: {model}")
- print(f"speedup: {speedup}")
- print(f"slave: {slave}")
- print(f"sim_address: {sim_address}")
- print(f"instance: {instance}")
- print(f"defaults: {defaults}")
-
- # Required arguments.
- cmd_args = [
- f"{command} ",
- f"--model {model} ",
- f"--speedup {speedup} ",
- f"--slave {slave} ",
- f"--sim-address={sim_address} ",
- f"--instance {instance} ",
- f"--defaults {defaults} ",
- ]
-
- # Optional arguments.
- wipe = LaunchConfiguration("wipe").perform(context)
- if wipe == "True":
- cmd_args.append("--wipe ")
- print(f"wipe: {wipe}")
-
- synthetic_clock = LaunchConfiguration("synthetic_clock").perform(context)
- if synthetic_clock == "True":
- cmd_args.append("--synthetic-clock ")
- print(f"synthetic_clock: {synthetic_clock}")
-
- home = LaunchConfiguration("home").perform(context)
- if home:
- cmd_args.append("--home {home} ")
- print(f"home: {home}")
-
- # Optional uart arguments.
- for label in uart_labels:
- arg = LaunchConfiguration(f"uart{label}").perform(context)
- if arg:
- cmd_args.append(f"--uart{label} {arg} ")
- print(f"uart{label}: {arg}")
-
- # Optional serial arguments.
- for label in range(10):
- arg = LaunchConfiguration(f"serial{label}").perform(context)
- if arg:
- cmd_args.append(f"--serial{label} {arg} ")
- print(f"serial{label}: {arg}")
-
- # Create action.
- sitl_process = ExecuteProcess(
- cmd=[cmd_args],
- shell=True,
- output="both",
- respawn=False,
- )
-
- launch_processes = [sitl_process]
- return launch_processes
-
-
-def generate_launch_description():
- """Generate a launch description for SITL."""
- # UART launch arguments.
- uart_args = []
- for label in uart_labels:
- arg = DeclareLaunchArgument(
- f"uart{label}",
- default_value="",
- description=f"set device string for UART{label}.",
- )
- uart_args.append(arg)
-
- # Serial launch arguments.
- serial_args = []
- for label in range(max_serial_ports):
- arg = DeclareLaunchArgument(
- f"serial{label}",
- default_value="",
- description=f"set device string for SERIAL{label}.",
- )
- serial_args.append(arg)
-
- return LaunchDescription(
- [
- # Launch arguments.
- DeclareLaunchArgument(
- "command",
- default_value="arducopter",
- description="Run ArduPilot SITL.",
- choices=[
- "antennatracker",
- "arducopter-heli",
- "ardurover",
- "blimp",
- "arducopter",
- "arduplane",
- "ardusub",
- ],
- ),
- DeclareLaunchArgument(
- "model",
- default_value="quad",
- description="Set simulation model.",
- ),
- DeclareLaunchArgument(
- "slave",
- default_value="0",
- description="Set the number of JSON slaves.",
- ),
- DeclareLaunchArgument(
- "sim_address",
- default_value="127.0.0.1",
- description="Set address string for simulator.",
- ),
- DeclareLaunchArgument(
- "speedup",
- default_value="1",
- description="Set simulation speedup.",
- ),
- DeclareLaunchArgument(
- "instance",
- default_value="0",
- description="Set instance of SITL "
- "(adds 10*instance to all port numbers).",
- ),
- DeclareLaunchArgument(
- "defaults",
- default_value=PathJoinSubstitution(
- [
- FindPackageShare("ardupilot_sitl"),
- "config",
- "default_params",
- "copter.parm",
- ]
- ),
- description="Set path to defaults file.",
- ),
- # Optional launch arguments.
- DeclareLaunchArgument(
- "wipe",
- default_value="False",
- description="Wipe eeprom.",
- choices=["True", "False"],
- ),
- DeclareLaunchArgument(
- "synthetic_clock",
- default_value="False",
- description="Set synthetic clock mode.",
- choices=["True", "False"],
- ),
- DeclareLaunchArgument(
- "home",
- default_value="",
- description="Set start location (lat,lng,alt,yaw) " "or location name.",
- ),
- ]
- + uart_args
- + serial_args
- + [OpaqueFunction(function=launch_sitl)]
- )
+def generate_launch_description() -> LaunchDescription:
+ """Generate a launch description for ArduPilot SITL."""
+ return SITLLaunch.generate_launch_description()
diff --git a/Tools/ros2/ardupilot_sitl/launch/sitl_dds.launch.py b/Tools/ros2/ardupilot_sitl/launch/sitl_dds_serial.launch.py
similarity index 95%
rename from Tools/ros2/ardupilot_sitl/launch/sitl_dds.launch.py
rename to Tools/ros2/ardupilot_sitl/launch/sitl_dds_serial.launch.py
index ec229603c4..2a9507e5eb 100644
--- a/Tools/ros2/ardupilot_sitl/launch/sitl_dds.launch.py
+++ b/Tools/ros2/ardupilot_sitl/launch/sitl_dds_serial.launch.py
@@ -18,7 +18,7 @@ Launch ArduPilot SITL, MAVProxy and the microROS DDS agent.
Run with default arguments:
-ros2 launch ardupilot_sitl sitl_dds.launch.py
+ros2 launch ardupilot_sitl sitl_dds_serial.launch.py
"""
from launch import LaunchDescription
from launch.actions import IncludeLaunchDescription
@@ -28,9 +28,9 @@ from launch.substitutions import PathJoinSubstitution
from launch_ros.substitutions import FindPackageShare
-def generate_launch_description():
+def generate_launch_description() -> LaunchDescription:
"""Generate a launch description to bring up ArduPilot SITL with DDS."""
- # Include component launch files.
+ # Compose launch files.
virtual_ports = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
[
diff --git a/Tools/ros2/ardupilot_sitl/launch/sitl_dds_udp.launch.py b/Tools/ros2/ardupilot_sitl/launch/sitl_dds_udp.launch.py
new file mode 100644
index 0000000000..79b1492382
--- /dev/null
+++ b/Tools/ros2/ardupilot_sitl/launch/sitl_dds_udp.launch.py
@@ -0,0 +1,80 @@
+# Copyright 2023 ArduPilot.org.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""
+Launch ArduPilot SITL, MAVProxy and the microROS DDS agent.
+
+Run with default arguments:
+
+ros2 launch ardupilot_sitl sitl_dds_udp.launch.py
+"""
+from launch import LaunchDescription
+from launch.actions import IncludeLaunchDescription
+from launch.launch_description_sources import PythonLaunchDescriptionSource
+from launch.substitutions import PathJoinSubstitution
+
+from launch_ros.substitutions import FindPackageShare
+
+
+def generate_launch_description() -> LaunchDescription:
+ """Generate a launch description to bring up ArduPilot SITL with DDS."""
+ # Compose launch files.
+ micro_ros_agent = IncludeLaunchDescription(
+ PythonLaunchDescriptionSource(
+ [
+ PathJoinSubstitution(
+ [
+ FindPackageShare("ardupilot_sitl"),
+ "launch",
+ "micro_ros_agent.launch.py",
+ ]
+ ),
+ ]
+ )
+ )
+ sitl = IncludeLaunchDescription(
+ PythonLaunchDescriptionSource(
+ [
+ PathJoinSubstitution(
+ [
+ FindPackageShare("ardupilot_sitl"),
+ "launch",
+ "sitl.launch.py",
+ ]
+ ),
+ ]
+ )
+ )
+ mavproxy = IncludeLaunchDescription(
+ PythonLaunchDescriptionSource(
+ [
+ PathJoinSubstitution(
+ [
+ FindPackageShare("ardupilot_sitl"),
+ "launch",
+ "mavproxy.launch.py",
+ ]
+ ),
+ ]
+ )
+ )
+
+ return LaunchDescription(
+ [
+ micro_ros_agent,
+ sitl,
+ mavproxy,
+ ]
+ )
diff --git a/Tools/ros2/ardupilot_sitl/launch/sitl_mavproxy.launch.py b/Tools/ros2/ardupilot_sitl/launch/sitl_mavproxy.launch.py
index 95107e6e0b..50c2237140 100644
--- a/Tools/ros2/ardupilot_sitl/launch/sitl_mavproxy.launch.py
+++ b/Tools/ros2/ardupilot_sitl/launch/sitl_mavproxy.launch.py
@@ -28,8 +28,9 @@ from launch.substitutions import PathJoinSubstitution
from launch_ros.substitutions import FindPackageShare
-def generate_launch_description():
+def generate_launch_description() -> LaunchDescription:
"""Generate a launch description for combined SITL and MAVProxy."""
+ # Compose launch files.
sitl = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
[
diff --git a/Tools/ros2/ardupilot_sitl/launch/virtual_ports.launch.py b/Tools/ros2/ardupilot_sitl/launch/virtual_ports.launch.py
index a0872ab8aa..d1b771f6bb 100644
--- a/Tools/ros2/ardupilot_sitl/launch/virtual_ports.launch.py
+++ b/Tools/ros2/ardupilot_sitl/launch/virtual_ports.launch.py
@@ -21,58 +21,9 @@ Run with default arguments:
ros2 launch ardupilot_sitl virtual_ports.launch.py
"""
from launch import LaunchDescription
-from launch.actions import DeclareLaunchArgument
-from launch.actions import ExecuteProcess
-from launch.actions import OpaqueFunction
-from launch.substitutions import LaunchConfiguration
+from ardupilot_sitl.launch import VirtualPortsLaunch
-def launch_socat(context, *args, **kwargs):
- """Launch a process to create virtual ports using `socat`."""
- command = "socat"
- tty0 = LaunchConfiguration("tty0").perform(context)
- tty1 = LaunchConfiguration("tty1").perform(context)
-
- # Display launch arguments.
- print(f"command: {command}")
- print(f"tty0: {tty0}")
- print(f"tty1: {tty1}")
-
- # Create action.
- socat_process = ExecuteProcess(
- cmd=[
- [
- "socat ",
- "-d -d ",
- f"pty,raw,echo=0,link={tty0} ",
- f"pty,raw,echo=0,link={tty1} ",
- ]
- ],
- shell=True,
- output="both",
- respawn=False,
- )
-
- launch_processes = [socat_process]
- return launch_processes
-
-
-def generate_launch_description():
+def generate_launch_description() -> LaunchDescription:
"""Generate a launch description for creating virtual ports using `socat`."""
- return LaunchDescription(
- [
- # Launch arguments.
- DeclareLaunchArgument(
- "tty0",
- default_value="tty0",
- description="Symbolic link name for tty0.",
- ),
- DeclareLaunchArgument(
- "tty1",
- default_value="tty1",
- description="Symbolic link name for tty1.",
- ),
- # Launch function.
- OpaqueFunction(function=launch_socat),
- ]
- )
+ return VirtualPortsLaunch.generate_launch_description()
diff --git a/Tools/ros2/ardupilot_sitl/src/ardupilot_sitl/actions.py b/Tools/ros2/ardupilot_sitl/src/ardupilot_sitl/actions.py
new file mode 100644
index 0000000000..928a511742
--- /dev/null
+++ b/Tools/ros2/ardupilot_sitl/src/ardupilot_sitl/actions.py
@@ -0,0 +1,107 @@
+# Copyright 2023 ArduPilot.org.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+# Adapted from launch.actions.opaque_function.py
+
+# Copyright 2018 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Module for the OpaqueFunction action."""
+
+import collections.abc
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import Iterable
+from typing import List
+from typing import Optional
+from typing import Text
+
+from launch.action import Action
+from launch.launch_context import LaunchContext
+from launch.launch_description_entity import LaunchDescriptionEntity
+from launch.utilities import ensure_argument_type
+
+
+class ExecuteFunction(Action):
+ """
+ Action that executes a Python function.
+
+ The signature of the function should be:
+
+ .. code-block:: python
+
+ def function(
+ context: LaunchContext,
+ *args,
+ **kwargs
+ ) -> Optional[Action]:
+ ...
+
+ """
+
+ def __init__(
+ self,
+ *,
+ function: Callable,
+ args: Optional[Iterable[Any]] = None,
+ kwargs: Optional[Dict[Text, Any]] = None,
+ **left_over_kwargs
+ ) -> None:
+ """Create an ExecuteFunction action."""
+ super().__init__(**left_over_kwargs)
+ if not callable(function):
+ raise TypeError(
+ "ExecuteFunction expected a callable for 'function', got '{}'".format(
+ type(function)
+ )
+ )
+ ensure_argument_type(
+ args, (collections.abc.Iterable, type(None)), "args", "ExecuteFunction"
+ )
+ ensure_argument_type(kwargs, (dict, type(None)), "kwargs", "ExecuteFunction")
+ self.__function = function
+ self.__args = [] # type: Iterable
+ if args is not None:
+ self.__args = args
+ self.__kwargs = {} # type: Dict[Text, Any]
+ if kwargs is not None:
+ self.__kwargs = kwargs
+
+ self.__action = None
+
+ @property
+ def action(self) -> Action:
+ """Return the Action obtained by executing the function."""
+ return self.__action
+
+ def execute(
+ self, context: LaunchContext
+ ) -> Optional[List[LaunchDescriptionEntity]]:
+ """Execute the function."""
+ action = self.__function(context, *self.__args, **self.__kwargs)
+ self.__action = action
+ return [action]
diff --git a/Tools/ros2/ardupilot_sitl/src/ardupilot_sitl/launch.py b/Tools/ros2/ardupilot_sitl/src/ardupilot_sitl/launch.py
new file mode 100644
index 0000000000..0d15ab4ac3
--- /dev/null
+++ b/Tools/ros2/ardupilot_sitl/src/ardupilot_sitl/launch.py
@@ -0,0 +1,657 @@
+# Copyright 2023 ArduPilot.org.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Launch actions for ArduPilot."""
+from typing import List
+from typing import Dict
+from typing import Text
+from typing import Tuple
+
+from launch import LaunchContext
+from launch import LaunchDescription
+from launch.actions import ExecuteProcess
+from launch.actions import DeclareLaunchArgument
+from launch.substitutions import LaunchConfiguration
+from launch.substitutions import PathJoinSubstitution
+
+from launch_ros.actions import Node
+from launch_ros.substitutions import FindPackageShare
+
+from .actions import ExecuteFunction
+
+
+class VirtualPortsLaunch:
+ """Launch functions for creating virtual ports using `socat`."""
+
+ @staticmethod
+ def generate_action(context: LaunchContext, *args, **kwargs) -> ExecuteProcess:
+ """Generate a launch action."""
+ # Declare commands.
+ command = "socat"
+
+ # Retrieve launch arguments.
+ tty0 = LaunchConfiguration("tty0").perform(context)
+ tty1 = LaunchConfiguration("tty1").perform(context)
+
+ # Display launch arguments.
+ print(f"command: {command}")
+ print(f"tty0: {tty0}")
+ print(f"tty1: {tty1}")
+
+ # Create action.
+ action = ExecuteProcess(
+ cmd=[
+ [
+ "socat ",
+ "-d -d ",
+ f"pty,raw,echo=0,link={tty0} ",
+ f"pty,raw,echo=0,link={tty1} ",
+ ]
+ ],
+ shell=True,
+ output="both",
+ respawn=False,
+ cached_output=True,
+ )
+ return action
+
+ @staticmethod
+ def generate_launch_description_with_actions() -> (
+ Tuple[LaunchDescription, Dict[Text, ExecuteFunction]]
+ ):
+ """Generate a launch description with actions."""
+ launch_arguments = VirtualPortsLaunch.generate_launch_arguments()
+
+ action = ExecuteFunction(function=VirtualPortsLaunch.generate_action)
+
+ ld = LaunchDescription(
+ launch_arguments
+ + [
+ action,
+ ]
+ )
+ actions = {
+ "virtual_ports": action,
+ }
+ return ld, actions
+
+ @staticmethod
+ def generate_launch_description() -> LaunchDescription:
+ """Generate a launch description."""
+ ld, _ = VirtualPortsLaunch.generate_launch_description_with_actions()
+ return ld
+
+ @staticmethod
+ def generate_launch_arguments() -> List[DeclareLaunchArgument]:
+ """Generate a list of launch arguments."""
+ return [
+ DeclareLaunchArgument(
+ "tty0",
+ default_value="tty0",
+ description="Symbolic link name for tty0.",
+ ),
+ DeclareLaunchArgument(
+ "tty1",
+ default_value="tty1",
+ description="Symbolic link name for tty1.",
+ ),
+ ]
+
+
+class MicroRosAgentLaunch:
+ """Launch functions for the micro ROS agent node."""
+
+ @staticmethod
+ def generate_action(context: LaunchContext, *args, **kwargs) -> ExecuteProcess:
+ """Launch the micro_ros_agent node."""
+ # ROS arguments.
+ micro_ros_agent_ns = LaunchConfiguration("micro_ros_agent_ns").perform(context)
+
+ # Common arguments.
+ transport = LaunchConfiguration("transport").perform(context)
+ middleware = LaunchConfiguration("middleware").perform(context)
+ verbose = LaunchConfiguration("verbose").perform(context)
+ discovery = LaunchConfiguration("discovery").perform(context)
+
+ # IPvX arguments.
+ port = LaunchConfiguration("port").perform(context)
+
+ # Serial arguments.
+ device = LaunchConfiguration("device").perform(context)
+ baudrate = LaunchConfiguration("baudrate").perform(context)
+
+ # Display launch arguments.
+ print(f"namespace: {micro_ros_agent_ns}")
+ print(f"transport: {transport}")
+ print(f"middleware: {middleware}")
+ print(f"verbose: {verbose}")
+ print(f"discovery: {discovery}")
+
+ # Required arguments
+ args = [
+ transport,
+ "--middleware",
+ middleware,
+ ]
+
+ if transport in ["udp4", "udp6", "tcp4", "tcp6"]:
+ # IPvX arguments
+ port = LaunchConfiguration("port").perform(context)
+ args.append("--port")
+ args.append(port)
+ print(f"port: {port}")
+ elif transport in ["serial", "multiserial", "pseudoterminal"]:
+ # Serial arguments
+ baudrate = LaunchConfiguration("baudrate").perform(context)
+ args.append("--baudrate")
+ args.append(baudrate)
+ print(f"baudrate: {baudrate}")
+
+ device = LaunchConfiguration("device").perform(context)
+ args.append("--dev")
+ args.append(device)
+ print(f"device: {device}")
+ else:
+ # transport must be canfd
+ pass
+
+ # Optional arguments.
+ refs = LaunchConfiguration("refs").perform(context)
+ if refs:
+ args.append("--refs")
+ args.append(refs)
+ print(f"refs: {refs}")
+
+ # Create action.
+ node = Node(
+ package="micro_ros_agent",
+ executable="micro_ros_agent",
+ name="micro_ros_agent",
+ namespace=f"{micro_ros_agent_ns}",
+ output="both",
+ arguments=args,
+ )
+ return node
+
+ @staticmethod
+ def generate_launch_description_with_actions() -> (
+ Tuple[LaunchDescription, Dict[Text, ExecuteFunction]]
+ ):
+ """Generate a launch description with actions."""
+ launch_arguments = MicroRosAgentLaunch.generate_launch_arguments()
+
+ action = ExecuteFunction(function=MicroRosAgentLaunch.generate_action)
+
+ ld = LaunchDescription(
+ launch_arguments
+ + [
+ action,
+ ]
+ )
+ actions = {
+ "micro_ros_agent": action,
+ }
+ return ld, actions
+
+ @staticmethod
+ def generate_launch_description() -> LaunchDescription:
+ """Generate a launch description."""
+ ld, _ = MicroRosAgentLaunch.generate_launch_description_with_actions()
+ return ld
+
+ @staticmethod
+ def generate_launch_arguments() -> List[DeclareLaunchArgument]:
+ """Generate a list of launch arguments."""
+ return [
+ DeclareLaunchArgument(
+ "micro_ros_agent_ns",
+ default_value="",
+ description="Set the micro_ros_agent namespace.",
+ ),
+ DeclareLaunchArgument(
+ "transport",
+ default_value="udp4",
+ description="Set the transport.",
+ choices=[
+ "udp4",
+ "udp6",
+ "tcp4",
+ "tcp6",
+ "canfd",
+ "serial",
+ "multiserial",
+ "pseudoterminal",
+ ],
+ ),
+ DeclareLaunchArgument(
+ "middleware",
+ default_value="dds",
+ description="Set the middleware.",
+ choices=["ced", "dds", "rtps"],
+ ),
+ DeclareLaunchArgument(
+ "refs",
+ default_value="",
+ description="Set the refs file.",
+ ),
+ DeclareLaunchArgument(
+ "verbose",
+ default_value="4",
+ description="Set the verbosity level.",
+ choices=["0", "1", "2", "3", "4", "5", "6"],
+ ),
+ DeclareLaunchArgument(
+ "discovery",
+ default_value="7400",
+ description="Set the dsicovery port.",
+ ),
+ DeclareLaunchArgument(
+ "port",
+ default_value="2019",
+ description="Set the port number.",
+ ),
+ DeclareLaunchArgument(
+ "baudrate",
+ default_value="115200",
+ description="Set the baudrate.",
+ ),
+ DeclareLaunchArgument(
+ "device",
+ default_value="",
+ description="Set the device.",
+ ),
+ ]
+
+
+class MAVProxyLaunch:
+ """Launch functions for MAVProxy."""
+
+ @staticmethod
+ def generate_action(context: LaunchContext, *args, **kwargs) -> ExecuteProcess:
+ """Return a non-interactive MAVProxy process."""
+ # Declare the command.
+ command = "mavproxy.py"
+
+ # Retrieve launch arguments.
+ master = LaunchConfiguration("master").perform(context)
+ # out = LaunchConfiguration("out").perform(context)
+ sitl = LaunchConfiguration("sitl").perform(context)
+
+ # Display launch arguments.
+ print(f"command: {command}")
+ print(f"master: {master}")
+ print(f"sitl: {sitl}")
+
+ # Create action.
+ mavproxy_process = ExecuteProcess(
+ cmd=[
+ [
+ f"{command} ",
+ "--out ",
+ "127.0.0.1:14550 ",
+ "--out ",
+ "127.0.0.1:14551 ",
+ f"--master {master} ",
+ f"--sitl {sitl} ",
+ "--non-interactive ",
+ ]
+ ],
+ shell=True,
+ output="both",
+ respawn=False,
+ )
+ return mavproxy_process
+
+ @staticmethod
+ def generate_launch_description_with_actions() -> (
+ Tuple[LaunchDescription, Dict[Text, ExecuteFunction]]
+ ):
+ """Generate a launch description for MAVProxy."""
+ launch_arguments = MAVProxyLaunch.generate_launch_arguments()
+
+ action = ExecuteFunction(function=MAVProxyLaunch.generate_action)
+
+ ld = LaunchDescription(
+ launch_arguments
+ + [
+ action,
+ ]
+ )
+ actions = {
+ "mavproxy": action,
+ }
+ return ld, actions
+
+ @staticmethod
+ def generate_launch_description() -> LaunchDescription:
+ """Generate a launch description."""
+ ld, _ = MAVProxyLaunch.generate_launch_description_with_actions()
+ return ld
+
+ @staticmethod
+ def generate_launch_arguments() -> List[DeclareLaunchArgument]:
+ """Generate a list of launch arguments."""
+ return [
+ DeclareLaunchArgument(
+ "master",
+ default_value="tcp:127.0.0.1:5760",
+ description="MAVLink master port and optional baud rate.",
+ ),
+ DeclareLaunchArgument(
+ "out",
+ default_value="127.0.0.1:14550",
+ description="MAVLink output port and optional baud rate.",
+ ),
+ DeclareLaunchArgument(
+ "sitl",
+ default_value="127.0.0.1:5501",
+ description="SITL output port.",
+ ),
+ ]
+
+
+class SITLLaunch:
+ """Launch functions for ArduPilot SITL."""
+
+ # Labels for the optional uart launch arguments.
+ UART_LABELS = ["A", "B", "C", "D", "E", "F", "H", "I", "J"]
+ MAX_SERIAL_PORTS = 10
+
+ @staticmethod
+ def generate_action(context: LaunchContext, *args, **kwargs) -> ExecuteProcess:
+ """Return a SITL process."""
+ # Retrieve launch arguments.
+ command = LaunchConfiguration("command").perform(context)
+ model = LaunchConfiguration("model").perform(context)
+ speedup = LaunchConfiguration("speedup").perform(context)
+ slave = LaunchConfiguration("slave").perform(context)
+ sim_address = LaunchConfiguration("sim_address").perform(context)
+ instance = LaunchConfiguration("instance").perform(context)
+ defaults = LaunchConfiguration("defaults").perform(context)
+
+ # Display launch arguments.
+ print(f"command: {command}")
+ print(f"model: {model}")
+ print(f"speedup: {speedup}")
+ print(f"slave: {slave}")
+ print(f"sim_address: {sim_address}")
+ print(f"instance: {instance}")
+ print(f"defaults: {defaults}")
+
+ # Required arguments.
+ cmd_args = [
+ f"{command} ",
+ f"--model {model} ",
+ f"--speedup {speedup} ",
+ f"--slave {slave} ",
+ f"--sim-address={sim_address} ",
+ f"--instance {instance} ",
+ f"--defaults {defaults} ",
+ ]
+
+ # Optional arguments.
+ wipe = LaunchConfiguration("wipe").perform(context)
+ if wipe == "True":
+ cmd_args.append("--wipe ")
+ print(f"wipe: {wipe}")
+
+ synthetic_clock = LaunchConfiguration("synthetic_clock").perform(context)
+ if synthetic_clock == "True":
+ cmd_args.append("--synthetic-clock ")
+ print(f"synthetic_clock: {synthetic_clock}")
+
+ home = LaunchConfiguration("home").perform(context)
+ if home:
+ cmd_args.append(f"--home {home} ")
+ print(f"home: {home}")
+
+ # Optional uart arguments.
+ for label in SITLLaunch.UART_LABELS:
+ arg = LaunchConfiguration(f"uart{label}").perform(context)
+ if arg:
+ cmd_args.append(f"--uart{label} {arg} ")
+ print(f"uart{label}: {arg}")
+
+ # Optional serial arguments.
+ for label in range(10):
+ arg = LaunchConfiguration(f"serial{label}").perform(context)
+ if arg:
+ cmd_args.append(f"--serial{label} {arg} ")
+ print(f"serial{label}: {arg}")
+
+ rate = LaunchConfiguration("rate").perform(context)
+ if rate:
+ cmd_args.append(f"--rate {rate} ")
+ print(f"rate: {rate}")
+
+ gimbal = LaunchConfiguration("gimbal").perform(context)
+ if gimbal:
+ cmd_args.append(f"--gimbal {gimbal} ")
+ print(f"gimbal: {gimbal}")
+
+ base_port = LaunchConfiguration("base_port").perform(context)
+ if base_port:
+ cmd_args.append(f"--base-port {base_port} ")
+ print(f"base_port: {base_port}")
+
+ rc_in_port = LaunchConfiguration("rc_in_port").perform(context)
+ if rc_in_port:
+ cmd_args.append(f"--rc-in-port {rc_in_port} ")
+ print(f"rc_in_port: {rc_in_port}")
+
+ sim_port_in = LaunchConfiguration("sim_port_in").perform(context)
+ if sim_port_in:
+ cmd_args.append(f"--sim-port-in {sim_port_in} ")
+ print(f"sim_port_in: {sim_port_in}")
+
+ sim_port_out = LaunchConfiguration("sim_port_out").perform(context)
+ if sim_port_out:
+ cmd_args.append(f"--sim-port-out {sim_port_out} ")
+ print(f"sim_port_out: {sim_port_out}")
+
+ irlock_port = LaunchConfiguration("irlock_port").perform(context)
+ if irlock_port:
+ cmd_args.append(f"--irlock-port {irlock_port} ")
+ print(f"irlock_port: {irlock_port}")
+
+ start_time = LaunchConfiguration("start_time").perform(context)
+ if start_time:
+ cmd_args.append(f"--start-time {start_time} ")
+ print(f"start_time: {start_time}")
+
+ sysid = LaunchConfiguration("sysid").perform(context)
+ if sysid:
+ cmd_args.append(f"--sysid {sysid} ")
+ print(f"sysid: {sysid}")
+
+ # Create action.
+ sitl_process = ExecuteProcess(
+ cmd=[cmd_args],
+ shell=True,
+ output="both",
+ respawn=False,
+ )
+ return sitl_process
+
+ @staticmethod
+ def generate_launch_description_with_actions() -> (
+ Tuple[LaunchDescription, Dict[Text, ExecuteFunction]]
+ ):
+ """Generate a launch description for SITL."""
+ launch_arguments = SITLLaunch.generate_launch_arguments()
+
+ action = ExecuteFunction(function=SITLLaunch.generate_action)
+
+ ld = LaunchDescription(
+ launch_arguments
+ + [
+ action,
+ ]
+ )
+ actions = {
+ "sitl": action,
+ }
+ return ld, actions
+
+ @staticmethod
+ def generate_launch_description() -> LaunchDescription:
+ """Generate a launch description."""
+ ld, _ = SITLLaunch.generate_launch_description_with_actions()
+ return ld
+
+ @staticmethod
+ def generate_launch_arguments() -> List[DeclareLaunchArgument]:
+ """Generate a list of launch arguments."""
+ launch_args = [
+ # Required launch arguments.
+ DeclareLaunchArgument(
+ "command",
+ default_value="arducopter",
+ description="Run ArduPilot SITL.",
+ choices=[
+ "antennatracker",
+ "arducopter-heli",
+ "ardurover",
+ "blimp",
+ "arducopter",
+ "arduplane",
+ "ardusub",
+ ],
+ ),
+ DeclareLaunchArgument(
+ "model",
+ default_value="quad",
+ description="Set simulation model.",
+ ),
+ DeclareLaunchArgument(
+ "slave",
+ default_value="0",
+ description="Set the number of JSON slaves.",
+ ),
+ DeclareLaunchArgument(
+ "sim_address",
+ default_value="127.0.0.1",
+ description="Set address string for simulator.",
+ ),
+ DeclareLaunchArgument(
+ "speedup",
+ default_value="1",
+ description="Set simulation speedup.",
+ ),
+ DeclareLaunchArgument(
+ "instance",
+ default_value="0",
+ description="Set instance of SITL "
+ "(adds 10*instance to all port numbers).",
+ ),
+ DeclareLaunchArgument(
+ "defaults",
+ default_value=PathJoinSubstitution(
+ [
+ FindPackageShare("ardupilot_sitl"),
+ "config",
+ "default_params",
+ "copter.parm",
+ ]
+ ),
+ description="Set path to defaults file.",
+ ),
+ # Optional launch arguments.
+ DeclareLaunchArgument(
+ "wipe",
+ default_value="False",
+ description="Wipe eeprom.",
+ choices=["True", "False"],
+ ),
+ DeclareLaunchArgument(
+ "synthetic_clock",
+ default_value="False",
+ description="Set synthetic clock mode.",
+ choices=["True", "False"],
+ ),
+ DeclareLaunchArgument(
+ "home",
+ default_value="",
+ description="Set start location (lat,lng,alt,yaw) or location name.",
+ ),
+ DeclareLaunchArgument(
+ "rate",
+ default_value="",
+ description="Set SITL framerate.",
+ ),
+ DeclareLaunchArgument(
+ "gimbal",
+ default_value="",
+ description="Enable simulated MAVLink gimbal.",
+ ),
+ DeclareLaunchArgument(
+ "base_port",
+ default_value="",
+ description="Set port num for base port(default 5670) "
+ "must be before -I option.",
+ ),
+ DeclareLaunchArgument(
+ "rc_in_port",
+ default_value="",
+ description="Set port num for rc in.",
+ ),
+ DeclareLaunchArgument(
+ "sim_port_in",
+ default_value="",
+ description="Set port num for simulator in.",
+ ),
+ DeclareLaunchArgument(
+ "sim_port_out",
+ default_value="",
+ description="Set port num for simulator out.",
+ ),
+ DeclareLaunchArgument(
+ "irlock_port",
+ default_value="",
+ description="Set port num for irlock.",
+ ),
+ DeclareLaunchArgument(
+ "start_time",
+ default_value="",
+ description="Set simulation start time in UNIX timestamp.",
+ ),
+ DeclareLaunchArgument(
+ "sysid",
+ default_value="",
+ description="Set SYSID_THISMAV.",
+ ),
+ ]
+
+ # UART launch arguments.
+ uart_args = []
+ for label in SITLLaunch.UART_LABELS:
+ arg = DeclareLaunchArgument(
+ f"uart{label}",
+ default_value="",
+ description=f"set device string for UART{label}.",
+ )
+ uart_args.append(arg)
+
+ # Serial launch arguments.
+ serial_args = []
+ for label in range(SITLLaunch.MAX_SERIAL_PORTS):
+ arg = DeclareLaunchArgument(
+ f"serial{label}",
+ default_value="",
+ description=f"set device string for SERIAL{label}.",
+ )
+ serial_args.append(arg)
+
+ return launch_args + uart_args + serial_args
diff --git a/Tools/ros2/ardupilot_sitl/src/ardupilot_sitl/utilities.py b/Tools/ros2/ardupilot_sitl/src/ardupilot_sitl/utilities.py
new file mode 100644
index 0000000000..10a4396e9f
--- /dev/null
+++ b/Tools/ros2/ardupilot_sitl/src/ardupilot_sitl/utilities.py
@@ -0,0 +1,37 @@
+# Copyright 2023 ArduPilot.org.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Utilities."""
+from functools import wraps
+from typing import List
+
+from launch import LaunchContext
+from launch import LaunchDescriptionEntity
+
+
+# Adapted from a SO answer by David Wolever.
+# Is there a library function in Python to turn a generator-function
+# into a function returning a list?
+# https://stackoverflow.com/questions/12377013
+def listify(fn) -> List[LaunchDescriptionEntity]:
+ """Wrap a functions's return value in a list."""
+
+ @wraps(fn)
+ def listify_helper(
+ context: LaunchContext, *args, **kwargs
+ ) -> List[LaunchDescriptionEntity]:
+ return [fn(context, args, kwargs)]
+
+ return listify_helper