Tools: create and use WaitAndMaintainServoChannelValue

allows for checking for "maintain" on servo channel values
This commit is contained in:
Peter Barker 2024-09-06 23:50:19 +10:00 committed by Peter Barker
parent d74050c5e3
commit ab77509551
2 changed files with 64 additions and 6 deletions

View File

@ -4697,22 +4697,40 @@ class AutoTestPlane(vehicle_test_suite.TestSuite):
# Throw the catapult.
self.set_servo(7, 2000)
# we expect to maintain this throttle level past the takeoff
# altitude through to our takeoff altitude:
expected_takeoff_throttle = 1000+10*self.get_parameter("TKOFF_THR_MAX")
# Check whether we're at max throttle below TKOFF_LVL_ALT.
test_alt = self.get_parameter("TKOFF_LVL_ALT")-10
self.wait_altitude(test_alt, test_alt+2, relative=True)
self.assert_servo_channel_value(3, 1000+10*self.get_parameter("TKOFF_THR_MAX"))
w = vehicle_test_suite.WaitAndMaintainServoChannelValue(
self,
3, # throttle
expected_takeoff_throttle,
epsilon=1,
minimum_duration=1,
)
w.run()
# Check whether we're still at max throttle past TKOFF_LVL_ALT.
test_alt = self.get_parameter("TKOFF_LVL_ALT")+10
self.wait_altitude(test_alt, test_alt+2, relative=True)
self.assert_servo_channel_value(3, 1000+10*(self.get_parameter("TKOFF_THR_MAX")), operator.le)
self.assert_servo_channel_value(3, 1000+10*(self.get_parameter("TKOFF_THR_MAX"))-1, operator.ge)
w = vehicle_test_suite.WaitAndMaintainServoChannelValue(
self,
3, # throttle
expected_takeoff_throttle,
epsilon=1,
minimum_duration=1,
)
w.run()
# Wait for the takeoff to complete.
target_alt = self.get_parameter("TKOFF_ALT")
self.wait_altitude(target_alt-5, target_alt, relative=True)
self.wait_altitude(target_alt-2.5, target_alt+2.5, relative=True, minimum_duration=10, timeout=30)
self.fly_home_land_and_disarm()
self.reboot_sitl(force=True)
def TakeoffTakeoff4(self):
'''Test the behaviour of a takeoff in TAKEOFF mode, pt4.'''

View File

@ -472,13 +472,17 @@ class WaitAndMaintain(object):
minimum_duration=None,
progress_print_interval=1,
timeout=30,
epsilon=None,
comparator=None,
):
self.test_suite = test_suite
self.minimum_duration = minimum_duration
self.achieving_duration_start = None
self.timeout = timeout
self.epsilon = epsilon
self.last_progress_print = 0
self.progress_print_interval = progress_print_interval
self.comparator = comparator
def run(self):
self.announce_test_start()
@ -533,7 +537,14 @@ class WaitAndMaintain(object):
return f"want={self.get_target_value()} got={value}"
def validate_value(self, value):
return value == self.get_target_value()
target_value = self.get_target_value()
if self.comparator is not None:
return self.comparator(value, target_value)
if self.epsilon is not None:
return (abs(value - target_value) <= self.epsilon)
return value == target_value
def timeoutexception(self):
return AutoTestTimeoutException("Failed to attain or maintain value")
@ -675,6 +686,35 @@ class WaitAndMaintainArmed(WaitAndMaintain):
return "Ensuring vehicle remains armed"
class WaitAndMaintainServoChannelValue(WaitAndMaintain):
def __init__(self, test_suite, channel, value, **kwargs):
super(WaitAndMaintainServoChannelValue, self).__init__(test_suite, **kwargs)
self.channel = channel
self.value = value
def announce_start_text(self):
str_operator = ""
if self.comparator == operator.lt:
str_operator = "less than "
elif self.comparator == operator.gt:
str_operator = "more than "
return f"Waiting for SERVO_OUTPUT_RAW.servo{self.channel}_value value {str_operator}{self.value}"
def get_target_value(self):
return self.value
def get_current_value(self):
m = self.test_suite.assert_receive_message('SERVO_OUTPUT_RAW', timeout=10)
channel_field = "servo%u_raw" % self.channel
m_value = getattr(m, channel_field, None)
if m_value is None:
raise ValueError(f"message ({str(m)}) has no field {channel_field}")
self.last_SERVO_OUTPUT_RAW = m
return m_value
class MSP_Generic(Telem):
def __init__(self, destination_address):
super(MSP_Generic, self).__init__(destination_address)