Tools: ros2: Clean up copter takeoff

* Finish timeout implementation missing variables
* Remove unused imports

Signed-off-by: Ryan Friedman <25047695+Ryanf55@users.noreply.github.com>
This commit is contained in:
Ryan Friedman 2024-12-30 18:21:22 -07:00 committed by Andrew Tridgell
parent bb96db5466
commit 01345e5a38

View File

@ -20,27 +20,21 @@ Run takeoff test on Copter.
Warning - This is NOT production code; it's a simple demo of capability. Warning - This is NOT production code; it's a simple demo of capability.
""" """
import math
import rclpy import rclpy
import time import time
import errno
from rclpy.node import Node from rclpy.node import Node
from builtin_interfaces.msg import Time
from ardupilot_msgs.msg import GlobalPosition
from geographic_msgs.msg import GeoPoseStamped from geographic_msgs.msg import GeoPoseStamped
from geopy import distance
from geopy import point
from ardupilot_msgs.srv import ArmMotors from ardupilot_msgs.srv import ArmMotors
from ardupilot_msgs.srv import ModeSwitch from ardupilot_msgs.srv import ModeSwitch
from ardupilot_msgs.srv import Takeoff from ardupilot_msgs.srv import Takeoff
COPTER_MODE_GUIDED = 4 COPTER_MODE_GUIDED = 4
TAKEOFF_ALT = 10.5 TAKEOFF_ALT = 10.5
class CopterTakeoff(Node): class CopterTakeoff(Node):
"""Copter takeoff using guided control.""" """Copter takeoff using guided control."""
@ -65,7 +59,7 @@ class CopterTakeoff(Node):
self._client_takeoff = self.create_client(Takeoff, self._takeoff_topic) self._client_takeoff = self.create_client(Takeoff, self._takeoff_topic)
while not self._client_takeoff.wait_for_service(timeout_sec=1.0): while not self._client_takeoff.wait_for_service(timeout_sec=1.0):
self.get_logger().info('takeoff service not available, waiting again...') self.get_logger().info('takeoff service not available, waiting again...')
self.declare_parameter("geopose_topic", "/ap/geopose/filtered") self.declare_parameter("geopose_topic", "/ap/geopose/filtered")
self._geopose_topic = self.get_parameter("geopose_topic").get_parameter_value().string_value self._geopose_topic = self.get_parameter("geopose_topic").get_parameter_value().string_value
qos = rclpy.qos.QoSProfile( qos = rclpy.qos.QoSProfile(
@ -84,7 +78,6 @@ class CopterTakeoff(Node):
# Store current state # Store current state
self._cur_geopose = msg self._cur_geopose = msg
def arm(self): def arm(self):
req = ArmMotors.Request() req = ArmMotors.Request()
req.arm = True req.arm = True
@ -113,14 +106,14 @@ class CopterTakeoff(Node):
"""Try to switch mode. Returns true on success, or false if mode switch fails or times out.""" """Try to switch mode. Returns true on success, or false if mode switch fails or times out."""
is_in_desired_mode = False is_in_desired_mode = False
start = self.get_clock().now() start = self.get_clock().now()
while not is_in_desired_mode: while not is_in_desired_mode and self.get_clock().now() - start < timeout:
result = self.switch_mode(desired_mode) result = self.switch_mode(desired_mode)
# Handle successful switch or the case that the vehicle is already in expected mode # Handle successful switch or the case that the vehicle is already in expected mode
is_in_desired_mode = result.status or result.curr_mode == desired_mode is_in_desired_mode = result.status or result.curr_mode == desired_mode
time.sleep(1) time.sleep(1)
return is_in_desired_mode return is_in_desired_mode
def takeoff(self, alt): def takeoff(self, alt):
req = Takeoff.Request() req = Takeoff.Request()
req.alt = alt req.alt = alt
@ -132,17 +125,18 @@ class CopterTakeoff(Node):
"""Try to takeoff. Returns true on success, or false if takeoff fails or times out.""" """Try to takeoff. Returns true on success, or false if takeoff fails or times out."""
takeoff_success = False takeoff_success = False
start = self.get_clock().now() start = self.get_clock().now()
while not takeoff_success: while not takeoff_success and self.get_clock().now() - start < timeout:
result = self.takeoff(alt) result = self.takeoff(alt)
takeoff_success = result.status takeoff_success = result.status
time.sleep(1) time.sleep(1)
return takeoff_success return takeoff_success
def get_cur_geopose(self): def get_cur_geopose(self):
"""Return latest geopose.""" """Return latest geopose."""
return self._cur_geopose return self._cur_geopose
def main(args=None): def main(args=None):
"""Node entry point.""" """Node entry point."""
rclpy.init(args=args) rclpy.init(args=args)