Tools: autotest: tweak output messages, loosen camera trigger timeout

This commit is contained in:
Peter Barker 2018-09-08 09:48:45 +10:00 committed by Peter Barker
parent 6e44ea13d4
commit a7042e3847
3 changed files with 96 additions and 27 deletions

View File

@ -1270,12 +1270,23 @@ class AutoTestCopter(AutoTest):
count_start = -1
count_stop = -1
tstart = self.get_sim_time()
last_mission_current_msg = 0
last_seq = None
while self.armed(): # we RTL at end of mission
now = self.get_sim_time()
now = self.get_sim_time_cached()
if now - tstart > 120:
raise AutoTestTimeoutException()
m = self.mav.recv_match(type='MISSION_CURRENT', blocking=True)
self.progress("MISSION_CURRENT.seq=%u" % (m.seq,))
if ((now - last_mission_current_msg) > 1 or
m.seq != last_seq):
dist = None
x = self.mav.messages.get("NAV_CONTROLLER_OUTPUT", None)
if x is not None:
dist = x.wp_dist
self.progress("MISSION_CURRENT.seq=%u dist=%s" %
(m.seq, dist))
last_mission_current_msg = self.get_sim_time_cached()
last_seq = m.seq
if m.seq == 3:
self.progress("At delay item")
if count_start == -1:

View File

@ -509,9 +509,23 @@ class AutoTestPlane(AutoTest):
self.wait_mode('AUTO')
self.wait_ready_to_arm()
self.arm_vehicle()
tstart = self.get_sim_time_cached()
last_mission_current_msg = 0
last_seq = None
while self.armed():
m = self.mav.recv_match(type='MISSION_CURRENT', blocking=True)
self.progress("MISSION_CURRENT.seq=%u" % (m.seq,))
time_delta = (self.get_sim_time_cached() -
last_mission_current_msg)
if (time_delta >1 or
m.seq != last_seq):
dist = None
x = self.mav.messages.get("NAV_CONTROLLER_OUTPUT", None)
if x is not None:
dist = x.wp_dist
self.progress("MISSION_CURRENT.seq=%u (dist=%s)" %
(m.seq,str(dist)))
last_mission_current_msg = self.get_sim_time_cached()
last_seq = m.seq
# flaps should undeploy at the end
self.wait_servo_channel_value(servo_ch, servo_ch_min, timeout=30)
@ -557,7 +571,7 @@ class AutoTestPlane(AutoTest):
raise PreconditionFailedException()
self.set_rc(12, 2000)
tstart = self.get_sim_time()
while self.get_sim_time() - tstart < 1:
while self.get_sim_time() - tstart < 10:
x = self.mav.messages.get("CAMERA_FEEDBACK", None)
if x is not None:
break
@ -648,4 +662,7 @@ class AutoTestPlane(AutoTest):
if len(self.fail_list):
self.progress("FAILED: %s" % self.fail_list)
return False
self.progress("Max set_rc_timeout=%s" % self.max_set_rc_timeout);
return True

View File

@ -121,6 +121,7 @@ class AutoTest(ABC):
self.buildlog = None
self.copy_tlog = False
self.logfile = None
self.max_set_rc_timeout = 0
@staticmethod
def progress(text):
@ -292,6 +293,13 @@ class AutoTest(ABC):
continue
util.pexpect_drain(p)
def drain_mav(self):
count = 0
while self.mav.recv_match(type='SYSTEM_TIME', blocking=False) is not None:
count += 1
self.progress("Drained %u messages from mav" % count)
#################################################
# SIM UTILITIES
#################################################
@ -401,15 +409,31 @@ class AutoTest(ABC):
for chan in range(1, 16):
self.mavproxy.send('rc %u 1500\n' % chan)
def set_rc(self, chan, pwm, timeout=20):
def set_rc(self, chan, pwm, timeout=2000):
"""Setup a simulated RC control to a PWM value"""
self.drain_mav()
tstart = self.get_sim_time()
wclock = time.time()
while self.get_sim_time_cached() < tstart + timeout:
self.mavproxy.send('rc %u %u\n' % (chan, pwm))
m = self.mav.recv_match(type='RC_CHANNELS', blocking=True)
chan_pwm = getattr(m, "chan" + str(chan) + "_raw")
self.progress("set_rc: want=%fu got=%u" % (pwm, chan_pwm))
wclock_delta = time.time() - wclock
sim_time_delta = self.get_sim_time_cached()-tstart
if sim_time_delta == 0:
time_ratio = None
else:
time_ratio = wclock_delta / sim_time_delta
self.progress("set_rc (wc=%s st=%s r=%s): want=%u got=%u" %
(wclock_delta,
sim_time_delta,
time_ratio,
pwm,
chan_pwm))
if chan_pwm == pwm:
delta = self.get_sim_time_cached() - tstart
if delta > self.max_set_rc_timeout:
self.max_set_rc_timeout = delta
return True
self.progress("Failed to send RC commands to channel %s" % str(chan))
raise SetRCTimeout()
@ -818,7 +842,8 @@ class AutoTest(ABC):
tstart = self.get_sim_time()
self.progress("Waiting for altitude between %u and %u" %
(alt_min, alt_max))
while self.get_sim_time() < tstart + timeout:
last_wait_alt_msg = 0
while self.get_sim_time_cached() < tstart + timeout:
m = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True)
if m is None:
continue
@ -829,8 +854,10 @@ class AutoTest(ABC):
climb_rate = alt - previous_alt
previous_alt = alt
self.progress("Wait Altitude: Cur:%u, min_alt:%u, climb_rate: %u"
% (alt, alt_min, climb_rate))
if self.get_sim_time_cached() - last_wait_alt_msg > 1:
self.progress("Wait Altitude: Cur:%u, min_alt:%u, climb_rate: %u"
% (alt, alt_min, climb_rate))
last_wait_alt_msg = self.get_sim_time_cached()
if alt >= alt_min and alt <= alt_max:
self.progress("Altitude OK")
return True
@ -839,13 +866,16 @@ class AutoTest(ABC):
def wait_groundspeed(self, gs_min, gs_max, timeout=30):
"""Wait for a given ground speed range."""
tstart = self.get_sim_time()
self.progress("Waiting for groundspeed between %.1f and %.1f" %
(gs_min, gs_max))
while self.get_sim_time() < tstart + timeout:
last_print = 0
tstart = self.get_sim_time()
while self.get_sim_time_cached() < tstart + timeout:
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
self.progress("Wait groundspeed %.1f, target:%.1f" %
(m.groundspeed, gs_min))
if self.get_sim_time_cached() - last_print > 1:
self.progress("Wait groundspeed %.1f, target:%.1f" %
(m.groundspeed, gs_min))
last_print = self.get_sim_time_cached();
if m.groundspeed >= gs_min and m.groundspeed <= gs_max:
return True
self.progress("Failed to attain groundspeed range")
@ -886,19 +916,16 @@ class AutoTest(ABC):
tstart = self.get_sim_time()
self.progress("Waiting for heading %u with accuracy %u" %
(heading, accuracy))
last_heading = -1
last_print_time = 0
while True:
now = self.get_sim_time_cached()
if now - tstart >= timeout:
break
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
if (last_heading != m.heading or
now - last_print_time > 1):
if now - last_print_time > 1:
self.progress("Heading %u (want %f +- %f)" % (
m.heading, heading, accuracy))
last_print_time = now
last_heading = heading
if math.fabs(m.heading - heading) <= accuracy:
self.progress("Attained heading %u" % heading)
return True
@ -909,10 +936,14 @@ class AutoTest(ABC):
"""Wait for flight of a given distance."""
tstart = self.get_sim_time()
start = self.mav.location()
last_distance_message = 0
while self.get_sim_time() < tstart + timeout:
pos = self.mav.location()
delta = self.get_distance(start, pos)
self.progress("Distance %.2f meters" % delta)
if self.get_sim_time_cached() - last_distance_message >= 1:
self.progress("Distance=%.2f meters want=%.2f" %
(delta, distance))
last_distance_message = self.get_sim_time_cached()
if math.fabs(delta - distance) <= accuracy:
self.progress("Attained distance %.2f meters OK" % delta)
return True
@ -955,10 +986,13 @@ class AutoTest(ABC):
self.progress("Waiting for location"
"%.4f,%.4f at altitude %.1f height_accuracy=%.1f" %
(loc.lat, loc.lng, target_altitude, height_accuracy))
last_distance_message = 0
while self.get_sim_time() < tstart + timeout:
pos = self.mav.location()
delta = self.get_distance(loc, pos)
self.progress("Distance %.2f meters alt %.1f" % (delta, pos.alt))
if self.get_sim_time_cached() - last_distance_message >= 1:
self.progress("Distance %.2f meters alt %.1f" % (delta, pos.alt))
last_distance_message = self.get_sim_time_cached()
if delta <= accuracy:
height_delta = math.fabs(pos.alt - target_altitude)
if height_accuracy != -1 and height_delta > height_accuracy:
@ -981,14 +1015,15 @@ class AutoTest(ABC):
current_wp = start_wp
mode = self.mav.flightmode
self.progress("\ntest: wait for waypoint ranges start=%u end=%u\n\n"
self.progress("wait for waypoint ranges start=%u end=%u"
% (wpnum_start, wpnum_end))
# if start_wp != wpnum_start:
# self.progress("test: Expected start waypoint %u but got %u" %
# (wpnum_start, start_wp))
# raise WaitWaypointTimeout()
while self.get_sim_time() < tstart + timeout:
last_wp_msg = 0
while self.get_sim_time_cached() < tstart + timeout:
seq = self.mav.waypoint_current()
m = self.mav.recv_match(type='NAV_CONTROLLER_OUTPUT',
blocking=True)
@ -1000,9 +1035,11 @@ class AutoTest(ABC):
self.progress('Exited %s mode' % mode)
raise WaitWaypointTimeout()
self.progress("test: WP %u (wp_dist=%u Alt=%d), current_wp: %u,"
"wpnum_end: %u" %
(seq, wp_dist, m.alt, current_wp, wpnum_end))
if self.get_sim_time_cached() - last_wp_msg > 1:
self.progress("WP %u (wp_dist=%u Alt=%d), current_wp: %u,"
"wpnum_end: %u" %
(seq, wp_dist, m.alt, current_wp, wpnum_end))
last_wp_msg = self.get_sim_time_cached()
if seq == current_wp+1 or (seq > current_wp+1 and allow_skip):
self.progress("test: Starting new waypoint %u" % seq)
tstart = self.get_sim_time()
@ -1071,15 +1108,19 @@ class AutoTest(ABC):
mavutil.mavlink.ESTIMATOR_GPS_GLITCH |
mavutil.mavlink.ESTIMATOR_ACCEL_ERROR)
self.progress("Waiting for EKF value %u" % required_value)
while timeout is None or self.get_sim_time() < tstart + timeout:
last_err_print_time = 0
last_print_time = 0
while timeout is None or self.get_sim_time_cached() < tstart + timeout:
m = self.mav.recv_match(type='EKF_STATUS_REPORT', blocking=True)
current = m.flags
if (tstart - self.get_sim_time()) % 5 == 0:
if self.get_sim_time_cached() - last_print_time > 1:
self.progress("Wait EKF.flags: required:%u current:%u" %
(required_value, current))
last_print_time = self.get_sim_time_cached()
errors = current & error_bits
if errors:
if errors and self.get_sim_time_cached() - last_err_print_time > 1:
self.progress("Wait EKF.flags: errors=%u" % errors)
last_err_print_time = self.get_sim_time_cached()
continue
if (current & required_value == required_value):
self.progress("EKF Flags OK")