Tools: ros2: Added message and dds test for copter takeoff

This commit is contained in:
snktshrma 2024-11-21 13:29:36 +05:30 committed by Randy Mackay
parent 20aeae1a09
commit fcd61c38ae
4 changed files with 192 additions and 0 deletions

View File

@ -0,0 +1,180 @@
#!/usr/bin/env python3
# Copyright 2023 ArduPilot.org.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Run takeoff test on Copter.
Warning - This is NOT production code; it's a simple demo of capability.
"""
import math
import rclpy
import time
import errno
from rclpy.node import Node
from builtin_interfaces.msg import Time
from ardupilot_msgs.msg import GlobalPosition
from geographic_msgs.msg import GeoPoseStamped
from geopy import distance
from geopy import point
from ardupilot_msgs.srv import ArmMotors
from ardupilot_msgs.srv import ModeSwitch
from ardupilot_msgs.srv import Takeoff
COPTER_MODE_GUIDED = 4
TAKEOFF_ALT = 10.5
class CopterTakeoff(Node):
"""Copter takeoff using guided control."""
def __init__(self):
"""Initialise the node."""
super().__init__("copter_takeoff")
self.declare_parameter("arm_topic", "/ap/arm_motors")
self._arm_topic = self.get_parameter("arm_topic").get_parameter_value().string_value
self._client_arm = self.create_client(ArmMotors, self._arm_topic)
while not self._client_arm.wait_for_service(timeout_sec=1.0):
self.get_logger().info('arm service not available, waiting again...')
self.declare_parameter("mode_topic", "/ap/mode_switch")
self._mode_topic = self.get_parameter("mode_topic").get_parameter_value().string_value
self._client_mode_switch = self.create_client(ModeSwitch, self._mode_topic)
while not self._client_mode_switch.wait_for_service(timeout_sec=1.0):
self.get_logger().info('mode switch service not available, waiting again...')
self.declare_parameter("takeoff_service", "/ap/experimental/takeoff")
self._takeoff_topic = self.get_parameter("takeoff_service").get_parameter_value().string_value
self._client_takeoff = self.create_client(Takeoff, self._takeoff_topic)
while not self._client_takeoff.wait_for_service(timeout_sec=1.0):
self.get_logger().info('takeoff service not available, waiting again...')
self.declare_parameter("geopose_topic", "/ap/geopose/filtered")
self._geopose_topic = self.get_parameter("geopose_topic").get_parameter_value().string_value
qos = rclpy.qos.QoSProfile(
reliability=rclpy.qos.ReliabilityPolicy.BEST_EFFORT, durability=rclpy.qos.DurabilityPolicy.VOLATILE, depth=1
)
self._subscription_geopose = self.create_subscription(GeoPoseStamped, self._geopose_topic, self.geopose_cb, qos)
self._cur_geopose = GeoPoseStamped()
def geopose_cb(self, msg: GeoPoseStamped):
"""Process a GeoPose message."""
stamp = msg.header.stamp
if stamp.sec:
self.get_logger().info("From AP : Geopose [sec:{}, nsec: {}]".format(stamp.sec, stamp.nanosec))
# Store current state
self._cur_geopose = msg
def arm(self):
req = ArmMotors.Request()
req.arm = True
future = self._client_arm.call_async(req)
rclpy.spin_until_future_complete(self, future)
return future.result()
def arm_with_timeout(self, timeout: rclpy.duration.Duration):
"""Try to arm. Returns true on success, or false if arming fails or times out."""
armed = False
start = self.get_clock().now()
while not armed and self.get_clock().now() - start < timeout:
armed = self.arm().result
time.sleep(1)
return armed
def switch_mode(self, mode):
req = ModeSwitch.Request()
assert mode in [COPTER_MODE_GUIDED]
req.mode = mode
future = self._client_mode_switch.call_async(req)
rclpy.spin_until_future_complete(self, future)
return future.result()
def switch_mode_with_timeout(self, desired_mode: int, timeout: rclpy.duration.Duration):
"""Try to switch mode. Returns true on success, or false if mode switch fails or times out."""
is_in_desired_mode = False
start = self.get_clock().now()
while not is_in_desired_mode:
result = self.switch_mode(desired_mode)
# Handle successful switch or the case that the vehicle is already in expected mode
is_in_desired_mode = result.status or result.curr_mode == desired_mode
time.sleep(1)
return is_in_desired_mode
def takeoff(self, alt):
req = Takeoff.Request()
req.alt = alt
future = self._client_takeoff.call_async(req)
rclpy.spin_until_future_complete(self, future)
return future.result()
def takeoff_with_timeout(self, alt: int, timeout: rclpy.duration.Duration):
"""Try to takeoff. Returns true on success, or false if takeoff fails or times out."""
takeoff_success = False
start = self.get_clock().now()
while not takeoff_success:
result = self.takeoff(alt)
takeoff_success = result.status
time.sleep(1)
return takeoff_success
def get_cur_geopose(self):
"""Return latest geopose."""
return self._cur_geopose
def main(args=None):
"""Node entry point."""
rclpy.init(args=args)
node = CopterTakeoff()
try:
if not node.switch_mode_with_timeout(COPTER_MODE_GUIDED, rclpy.duration.Duration(seconds=20)):
raise RuntimeError("Unable to switch to guided mode")
# Block till armed, which will wait for EKF3 to initialize
if not node.arm_with_timeout(rclpy.duration.Duration(seconds=30)):
raise RuntimeError("Unable to arm")
# Block till in takeoff
if not node.takeoff_with_timeout(TAKEOFF_ALT, rclpy.duration.Duration(seconds=20)):
raise RuntimeError("Unable to takeoff")
is_ascending_to_takeoff_alt = True
while is_ascending_to_takeoff_alt:
rclpy.spin_once(node)
time.sleep(1.0)
is_ascending_to_takeoff_alt = node.get_cur_geopose().pose.position.altitude < TAKEOFF_ALT
if is_ascending_to_takeoff_alt:
raise RuntimeError("Failed to reach takeoff altitude")
except KeyboardInterrupt:
pass
# Destroy the node explicitly.
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -27,6 +27,7 @@ setup(
"time_listener = ardupilot_dds_tests.time_listener:main",
"plane_waypoint_follower = ardupilot_dds_tests.plane_waypoint_follower:main",
"pre_arm_check = ardupilot_dds_tests.pre_arm_check_service:main",
"copter_takeoff = ardupilot_dds_tests.copter_takeoff:main",
],
},
)

View File

@ -17,6 +17,7 @@ rosidl_generate_interfaces(${PROJECT_NAME}
"msg/Status.msg"
"srv/ArmMotors.srv"
"srv/ModeSwitch.srv"
"srv/Takeoff.srv"
DEPENDENCIES geometry_msgs std_msgs
ADD_LINTER_TESTS
)

View File

@ -0,0 +1,10 @@
# This service requests the vehicle to takeoff
# alt : Set the takeoff altitude above home or above terrain(in case of rangefinder)
float32 alt
---
# status : True if the request for mode switch was successful, False otherwise
bool status