#!/usr/bin/env python # Drive APMrover2 in SITL from __future__ import print_function import os import pexpect import time from common import AutoTest from common import AutoTestTimeoutException from common import MsgRcvTimeoutException from common import NotAchievedException from common import PreconditionFailedException from pysim import util from pymavlink import mavutil # get location of scripts testdir = os.path.dirname(os.path.realpath(__file__)) SITL_START_LOCATION = mavutil.location(40.071374969556928, -105.22978898137808, 1583.702759, 246) class AutoTestRover(AutoTest): @staticmethod def get_not_armable_mode_list(): return [] @staticmethod def get_not_disarmed_settable_modes_list(): return ["FOLLOW"] @staticmethod def get_no_position_not_settable_modes_list(): return [] @staticmethod def get_position_armable_modes_list(): return ["GUIDED", "LOITER", "STEERING", "AUTO", "RTL", "SMART_RTL"] @staticmethod def get_normal_armable_modes_list(): return ["ACRO", "HOLD", "MANUAL"] def log_name(self): return "APMrover2" def test_filepath(self): return os.path.realpath(__file__) def sitl_start_location(self): return SITL_START_LOCATION def default_frame(self): return "rover" def is_rover(self): return True def get_stick_arming_channel(self): return int(self.get_parameter("RCMAP_ROLL")) def arming_test_mission(self): return os.path.join(testdir, "ArduRover-Missions", "test_arming.txt") ########################################################## # TESTS DRIVE ########################################################## # Drive a square in manual mode def drive_square(self, side=50): """Drive a square, Driving N then E .""" self.context_push() ex = None try: self.progress("TEST SQUARE") self.set_parameter("RC7_OPTION", 7) self.set_parameter("RC9_OPTION", 58) self.mavproxy.send('switch 5\n') self.wait_mode('MANUAL') self.wait_ready_to_arm() self.arm_vehicle() self.clear_wp(9) # first aim north self.progress("\nTurn right towards north") self.reach_heading_manual(10) # save bottom left corner of box as home AND waypoint self.progress("Save HOME") self.save_wp() self.progress("Save WP") self.save_wp() # pitch forward to fly north self.progress("\nGoing north %u meters" % side) self.reach_distance_manual(side) # save top left corner of square as waypoint self.progress("Save WP") self.save_wp() # roll right to fly east self.progress("\nGoing east %u meters" % side) self.reach_heading_manual(100) self.reach_distance_manual(side) # save top right corner of square as waypoint self.progress("Save WP") self.save_wp() # pitch back to fly south self.progress("\nGoing south %u meters" % side) self.reach_heading_manual(190) self.reach_distance_manual(side) # save bottom right corner of square as waypoint self.progress("Save WP") self.save_wp() # roll left to fly west self.progress("\nGoing west %u meters" % side) self.reach_heading_manual(280) self.reach_distance_manual(side) # save bottom left corner of square (should be near home) as waypoint self.progress("Save WP") self.save_wp() self.progress("Checking number of saved waypoints") num_wp = self.save_mission_to_file( os.path.join(testdir, "rover-ch7_mission.txt")) expected = 7 # home + 6 toggled in if num_wp != expected: raise NotAchievedException("Did not get %u waypoints; got %u" % (expected, num_wp)) # TODO: actually drive the mission self.clear_wp(9) except Exception as e: self.progress("Caught exception: %s" % self.get_exception_stacktrace(e)) ex = e self.disarm_vehicle() self.context_pop() if ex: raise ex def drive_left_circuit(self): """Drive a left circuit, 50m on a side.""" self.mavproxy.send('switch 6\n') self.wait_mode('MANUAL') self.set_rc(3, 2000) self.progress("Driving left circuit") # do 4 turns for i in range(0, 4): # hard left self.progress("Starting turn %u" % i) self.set_rc(1, 1000) self.wait_heading(270 - (90*i), accuracy=10) self.set_rc(1, 1500) self.progress("Starting leg %u" % i) self.wait_distance(50, accuracy=7) self.set_rc(3, 1500) self.progress("Circuit complete") # def test_throttle_failsafe(self, home, distance_min=10, side=60, # timeout=300): # """Fly east, Failsafe, return, land.""" # # self.mavproxy.send('switch 6\n') # manual mode # self.wait_mode('MANUAL') # self.mavproxy.send("param set FS_ACTION 1\n") # # # first aim east # self.progress("turn east") # if not self.reach_heading_manual(135): # return False # # # fly east 60 meters # self.progress("# Going forward %u meters" % side) # if not self.reach_distance_manual(side): # return False # # # pull throttle low # self.progress("# Enter Failsafe") # self.mavproxy.send('rc 3 900\n') # # tstart = self.get_sim_time() # success = False # while self.get_sim_time() < tstart + timeout and not success: # m = self.mav.recv_match(type='VFR_HUD', blocking=True) # pos = self.mav.location() # home_distance = self.get_distance(home, pos) # self.progress("Alt: %u HomeDistance: %.0f" % # (m.alt, home_distance)) # # check if we've reached home # if home_distance <= distance_min: # self.progress("RTL Complete") # success = True # # # reduce throttle # self.mavproxy.send('rc 3 1500\n') # self.mavproxy.expect('APM: Failsafe ended') # self.mavproxy.send('switch 2\n') # manual mode # self.wait_heartbeat() # self.wait_mode('MANUAL') # # if success: # self.progress("Reached failsafe home OK") # return True # else: # self.progress("Failed to reach Home on failsafe RTL - " # "timed out after %u seconds" % timeout) # return False def test_sprayer(self): """Test sprayer functionality.""" self.context_push() ex = None try: rc_ch = 5 pump_ch = 5 spinner_ch = 6 pump_ch_min = 1050 pump_ch_trim = 1520 pump_ch_max = 1950 spinner_ch_min = 975 spinner_ch_trim = 1510 spinner_ch_max = 1975 self.set_parameter("SPRAY_ENABLE", 1) self.set_parameter("SERVO%u_FUNCTION" % pump_ch, 22) self.set_parameter("SERVO%u_MIN" % pump_ch, pump_ch_min) self.set_parameter("SERVO%u_TRIM" % pump_ch, pump_ch_trim) self.set_parameter("SERVO%u_MAX" % pump_ch, pump_ch_max) self.set_parameter("SERVO%u_FUNCTION" % spinner_ch, 23) self.set_parameter("SERVO%u_MIN" % spinner_ch, spinner_ch_min) self.set_parameter("SERVO%u_TRIM" % spinner_ch, spinner_ch_trim) self.set_parameter("SERVO%u_MAX" % spinner_ch, spinner_ch_max) self.set_parameter("SIM_SPR_ENABLE", 1) self.fetch_parameters() self.set_parameter("SIM_SPR_PUMP", pump_ch) self.set_parameter("SIM_SPR_SPIN", spinner_ch) self.set_parameter("RC%u_OPTION" % rc_ch, 15) self.set_parameter("LOG_DISARMED", 1) self.reboot_sitl() self.wait_ready_to_arm() self.arm_vehicle() self.progress("test bootup state - it's zero-output!") self.wait_servo_channel_value(spinner_ch, 0) self.wait_servo_channel_value(pump_ch, 0) self.progress("Enable sprayer") self.set_rc(rc_ch, 2000) self.progress("Testing zero-speed state") self.wait_servo_channel_value(spinner_ch, spinner_ch_min) self.wait_servo_channel_value(pump_ch, pump_ch_min) self.progress("Testing turning it off") self.set_rc(rc_ch, 1000) self.wait_servo_channel_value(spinner_ch, spinner_ch_min) self.wait_servo_channel_value(pump_ch, pump_ch_min) self.progress("Testing turning it back on") self.set_rc(rc_ch, 2000) self.wait_servo_channel_value(spinner_ch, spinner_ch_min) self.wait_servo_channel_value(pump_ch, pump_ch_min) self.progress("Testing speed-ramping") self.set_rc(3, 1700) # start driving forward # this is somewhat empirical... self.wait_servo_channel_value(pump_ch, 1695, timeout=60) self.progress("Sprayer OK") except Exception as e: self.progress("Caught exception: %s" % self.get_exception_stacktrace(e)) ex = e self.context_pop() self.disarm_vehicle(force=True) self.reboot_sitl() if ex: raise ex ################################################# # AUTOTEST ALL ################################################# def drive_mission(self, filename): """Drive a mission from a file.""" self.progress("Driving mission %s" % filename) self.load_mission(filename) self.wait_ready_to_arm() self.arm_vehicle() self.mavproxy.send('switch 4\n') # auto mode self.set_rc(3, 1500) self.wait_mode('AUTO') self.wait_waypoint(1, 4, max_dist=5) self.wait_mode('HOLD', timeout=300) self.disarm_vehicle() self.progress("Mission OK") def test_gripper_mission(self): self.load_mission("rover-gripper-mission.txt") self.change_mode('AUTO') self.wait_ready_to_arm() self.arm_vehicle() self.mavproxy.expect("Gripper Grabbed") self.mavproxy.expect("Gripper Released") self.wait_mode("HOLD") self.disarm_vehicle() def do_get_banner(self): self.mavproxy.send("long DO_SEND_BANNER 1\n") start = time.time() while True: m = self.mav.recv_match(type='STATUSTEXT', blocking=True, timeout=1) if m is not None and "ArduRover" in m.text: self.progress("banner received: %s" % m.text) return if time.time() - start > 10: break raise MsgRcvTimeoutException("banner not received") def drive_brake_get_stopping_distance(self, speed): # measure our stopping distance: old_cruise_speed = self.get_parameter('CRUISE_SPEED') old_accel_max = self.get_parameter('ATC_ACCEL_MAX') # controller tends not to meet cruise speed (max of ~14 when 15 # set), thus *1.2 self.set_parameter('CRUISE_SPEED', speed*1.2) # at time of writing, the vehicle is only capable of 10m/s/s accel self.set_parameter('ATC_ACCEL_MAX', 15) self.change_mode("STEERING") self.set_rc(3, 2000) self.wait_groundspeed(15, 100) initial = self.mav.location() initial_time = time.time() while time.time() - initial_time < 2: # wait for a position update from the autopilot start = self.mav.location() if start != initial: break self.set_rc(3, 1500) self.wait_groundspeed(0, 0.2) # why do we not stop?! initial = self.mav.location() initial_time = time.time() while time.time() - initial_time < 2: # wait for a position update from the autopilot stop = self.mav.location() if stop != initial: break delta = self.get_distance(start, stop) self.set_parameter('CRUISE_SPEED', old_cruise_speed) self.set_parameter('ATC_ACCEL_MAX', old_accel_max) return delta def drive_brake(self): old_using_brake = self.get_parameter('ATC_BRAKE') old_cruise_speed = self.get_parameter('CRUISE_SPEED') self.set_parameter('CRUISE_SPEED', 15) self.set_parameter('ATC_BRAKE', 0) self.arm_vehicle() distance_without_brakes = self.drive_brake_get_stopping_distance(15) # brakes on: self.set_parameter('ATC_BRAKE', 1) distance_with_brakes = self.drive_brake_get_stopping_distance(15) # revert state: self.set_parameter('ATC_BRAKE', old_using_brake) self.set_parameter('CRUISE_SPEED', old_cruise_speed) delta = distance_without_brakes - distance_with_brakes if delta < distance_without_brakes * 0.05: # 5% isn't asking for much self.disarm_vehicle() raise NotAchievedException(""" Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm) """ % (distance_with_brakes, distance_without_brakes, delta)) self.disarm_vehicle() self.progress( "Brakes work (with=%0.2fm without=%0.2fm delta=%0.2fm)" % (distance_with_brakes, distance_without_brakes, delta)) def drive_rtl_mission_max_distance_from_home(self): '''maximum distance allowed from home at end''' return 6.5 def drive_rtl_mission(self): self.wait_ready_to_arm() self.arm_vehicle() mission_filepath = os.path.join("ArduRover-Missions", "rtl.txt") self.load_mission(mission_filepath) self.change_mode("AUTO") self.mavproxy.expect('Mission: 3 RTL') self.drain_mav(); m = self.mav.recv_match(type='NAV_CONTROLLER_OUTPUT', blocking=True, timeout=1) if m is None: raise MsgRcvTimeoutException( "Did not receive NAV_CONTROLLER_OUTPUT message") wp_dist_min = 5 if m.wp_dist < wp_dist_min: raise PreconditionFailedException( "Did not start at least %f metres from destination (is=%f)" % (wp_dist_min, m.wp_dist)) self.progress("NAV_CONTROLLER_OUTPUT.wp_dist looks good (%u >= %u)" % (m.wp_dist, wp_dist_min,)) tstart = self.get_sim_time() while True: if self.get_sim_time_cached() - tstart > 600: raise NotAchievedException("Did not get home") self.progress("Distance home: %f (mode=%s)" % (self.distance_to_home(), self.mav.flightmode)) if self.mode_is('HOLD') or self.mode_is('ACRO'): # acro for balancebot break # the EKF doesn't pull us down to 0 speed: self.wait_groundspeed(0, 0.5, timeout=600) # current Rover blows straight past the home position and ends # up ~6m past the home point. home_distance = self.distance_to_home() home_distance_min = 5.5 home_distance_max = self.drive_rtl_mission_max_distance_from_home() if home_distance > home_distance_max: raise NotAchievedException( "Did not stop near home (%f metres distant (%f > want > %f))" % (home_distance, home_distance_min, home_distance_max)) self.disarm_vehicle() self.progress("RTL Mission OK (%fm)" % home_distance) def wait_distance_home_gt(self, distance, timeout=60): home_distance = None tstart = self.get_sim_time() while self.get_sim_time_cached() - tstart < timeout: # m = self.mav.recv_match(type='VFR_HUD', blocking=True) distance_home = self.distance_to_home(use_cached_home=True) self.progress("distance_home=%f want=%f" % (distance_home, distance)) if distance_home > distance: return self.drain_mav() raise NotAchievedException("Failed to get %fm from home (now=%f)" % (distance, home_distance)) def drive_fence_ac_avoidance(self): self.context_push() ex = None try: self.load_fence("rover-fence-ac-avoid.txt") self.set_parameter("FENCE_ENABLE", 0) self.set_parameter("PRX_TYPE", 10) self.set_parameter("RC10_OPTION", 40) # proximity-enable self.reboot_sitl() # start = self.mav.location() self.wait_ready_to_arm() self.arm_vehicle() # first make sure we can breach the fence: self.set_rc(10, 1000) self.change_mode("ACRO") self.set_rc(3, 1550) self.wait_distance_home_gt(25) self.change_mode("RTL") self.mavproxy.expect("APM: Reached destination") # now enable avoidance and make sure we can't: self.set_rc(10, 2000) self.change_mode("ACRO") self.wait_groundspeed(0, 0.7, timeout=60) # watch for speed zero self.wait_groundspeed(0, 0.2, timeout=120) except Exception as e: self.progress("Caught exception: %s" % self.get_exception_stacktrace(e)) ex = e self.context_pop() self.mavproxy.send("fence clear\n") self.disarm_vehicle(force=True) self.reboot_sitl() if ex: raise ex def test_servorelayevents(self): self.do_set_relay(0, 0) off = self.get_parameter("SIM_PIN_MASK") self.do_set_relay(0, 1) on = self.get_parameter("SIM_PIN_MASK") if on == off: raise NotAchievedException( "Pin mask unchanged after relay cmd") self.progress("Pin mask changed after relay command") def test_setting_modes_via_mavproxy_switch(self): fnoo = [(1, 'MANUAL'), (2, 'MANUAL'), (3, 'RTL'), # (4, 'AUTO'), # no mission, can't set auto (5, 'RTL'), # non-existant mode, should stay in RTL (6, 'MANUAL')] for (num, expected) in fnoo: self.mavproxy.send('switch %u\n' % num) self.wait_mode(expected) def test_setting_modes_via_mavproxy_mode_command(self): fnoo = [(1, 'ACRO'), (3, 'STEERING'), (4, 'HOLD'), ] for (num, expected) in fnoo: self.mavproxy.send('mode manual\n') self.wait_mode("MANUAL") self.mavproxy.send('mode %u\n' % num) self.wait_mode(expected) self.mavproxy.send('mode manual\n') self.wait_mode("MANUAL") self.mavproxy.send('mode %s\n' % expected) self.wait_mode(expected) def test_setting_modes_via_modeswitch(self): # test setting of modes through mode switch self.context_push() ex = None try: self.set_parameter("MODE_CH", 8) self.set_rc(8, 1000) # mavutil.mavlink.ROVER_MODE_HOLD: self.set_parameter("MODE6", 4) # mavutil.mavlink.ROVER_MODE_ACRO self.set_parameter("MODE5", 1) self.set_rc(8, 1800) # PWM for mode6 self.wait_mode("HOLD") self.set_rc(8, 1700) # PWM for mode5 self.wait_mode("ACRO") self.set_rc(8, 1800) # PWM for mode6 self.wait_mode("HOLD") self.set_rc(8, 1700) # PWM for mode5 self.wait_mode("ACRO") except Exception as e: self.progress("Exception caught") ex = e self.context_pop() if ex is not None: raise ex def test_setting_modes_via_auxswitches(self): self.context_push() ex = None try: self.set_parameter("MODE5", 1) self.mavproxy.send('switch 1\n') # random mode self.wait_heartbeat() self.change_mode('MANUAL') self.mavproxy.send('switch 5\n') # acro mode self.wait_mode("ACRO") self.set_rc(9, 1000) self.set_rc(10, 1000) self.set_parameter("RC9_OPTION", 53) # steering self.set_parameter("RC10_OPTION", 54) # hold self.set_rc(9, 1900) self.wait_mode("STEERING") self.set_rc(10, 1900) self.wait_mode("HOLD") # reset both switches - should go back to ACRO self.set_rc(9, 1000) self.set_rc(10, 1000) self.wait_mode("ACRO") self.set_rc(9, 1900) self.wait_mode("STEERING") self.set_rc(10, 1900) self.wait_mode("HOLD") self.set_rc(10, 1000) # this re-polls the mode switch self.wait_mode("ACRO") self.set_rc(9, 1000) except Exception as e: self.progress("Exception caught") ex = e self.context_pop() if ex is not None: raise ex def test_rc_override_cancel(self): self.change_mode('MANUAL') self.wait_ready_to_arm() self.zero_throttle() self.arm_vehicle() # start moving forward a little: normal_rc_throttle = 1700 throttle_override = 1900 self.progress("Establishing baseline RC input") self.mavproxy.send('rc 3 %u\n' % normal_rc_throttle) tstart = self.get_sim_time_cached() while True: if self.get_sim_time_cached() - tstart > 10: raise AutoTestTimeoutException("Did not get rc change") m = self.mav.recv_match(type='RC_CHANNELS', blocking=True) if m.chan3_raw == normal_rc_throttle: break self.progress("Set override with RC_CHANNELS_OVERRIDE") tstart = self.get_sim_time_cached() while True: if self.get_sim_time_cached() - tstart > 10: raise AutoTestTimeoutException("Did not override") self.progress("Sending throttle of %u" % (throttle_override,)) self.mav.mav.rc_channels_override_send( 1, # target system 1, # targe component 65535, # chan1_raw 65535, # chan2_raw throttle_override, # chan3_raw 65535, # chan4_raw 65535, # chan5_raw 65535, # chan6_raw 65535, # chan7_raw 65535) # chan8_raw m = self.mav.recv_match(type='RC_CHANNELS', blocking=True) self.progress("chan3=%f want=%f" % (m.chan3_raw, throttle_override)) if m.chan3_raw == throttle_override: break self.progress("disabling override and making sure we revert to RC input in good time") tstart = self.get_sim_time_cached() while True: if self.get_sim_time_cached() - tstart > 0.5: raise AutoTestTimeoutException("Did not cancel override") self.progress("Sending cancel of throttle override") self.mav.mav.rc_channels_override_send( 1, # target system 1, # targe component 65535, # chan1_raw 65535, # chan2_raw 0, # chan3_raw 65535, # chan4_raw 65535, # chan5_raw 65535, # chan6_raw 65535, # chan7_raw 65535) # chan8_raw m = self.mav.recv_match(type='RC_CHANNELS', blocking=True) self.progress("chan3=%f want=%f" % (m.chan3_raw, normal_rc_throttle)) if m.chan3_raw == normal_rc_throttle: break self.disarm_vehicle() def test_rc_overrides(self): self.context_push() ex = None try: self.set_parameter("RC12_OPTION", 46) self.reboot_sitl() self.mavproxy.send('switch 6\n') # Manual mode self.wait_mode('MANUAL') self.wait_ready_to_arm() self.mavproxy.send('rc 3 1500\n') # throttle at zero self.arm_vehicle() # start moving forward a little: normal_rc_throttle = 1700 self.mavproxy.send('rc 3 %u\n' % normal_rc_throttle) self.wait_groundspeed(5, 100) # allow overrides: self.set_rc(12, 2000) # now override to stop: throttle_override = 1500 tstart = self.get_sim_time_cached() while True: if self.get_sim_time_cached() - tstart > 10: raise AutoTestTimeoutException("Did not reach speed") self.progress("Sending throttle of %u" % (throttle_override,)) self.mav.mav.rc_channels_override_send( 1, # target system 1, # targe component 65535, # chan1_raw 65535, # chan2_raw throttle_override, # chan3_raw 65535, # chan4_raw 65535, # chan5_raw 65535, # chan6_raw 65535, # chan7_raw 65535) # chan8_raw m = self.mav.recv_match(type='VFR_HUD', blocking=True) want_speed = 2.0 self.progress("Speed=%f want=<%f" % (m.groundspeed, want_speed)) if m.groundspeed < want_speed: break # now override to stop - but set the switch on the RC # transmitter to deny overrides; this should send the # speed back up to 5 metres/second: self.set_rc(12, 1000) throttle_override = 1500 tstart = self.get_sim_time_cached() while True: if self.get_sim_time_cached() - tstart > 10: raise AutoTestTimeoutException("Did not stop") print("Sending throttle of %u" % (throttle_override,)) self.mav.mav.rc_channels_override_send( 1, # target system 1, # targe component 65535, # chan1_raw 65535, # chan2_raw throttle_override, # chan3_raw 65535, # chan4_raw 65535, # chan5_raw 65535, # chan6_raw 65535, # chan7_raw 65535) # chan8_raw m = self.mav.recv_match(type='VFR_HUD', blocking=True) want_speed = 5.0 print("Speed=%f want=>%f" % (m.groundspeed, want_speed)) if m.groundspeed > want_speed: break # re-enable RC overrides self.set_rc(12, 2000) # check we revert to normal RC inputs when gcs overrides cease: self.progress("Waiting for RC to revert to normal RC input") while True: m = self.mav.recv_match(type='RC_CHANNELS', blocking=True) print("%s" % m) if m.chan3_raw == normal_rc_throttle: break except Exception as e: self.progress("Exception caught") ex = e self.context_pop() self.disarm_vehicle() self.reboot_sitl() if ex is not None: raise ex def test_manual_control(self): self.context_push() ex = None try: self.set_parameter("RC12_OPTION", 46) # enable/disable rc overrides self.reboot_sitl() self.change_mode("MANUAL") self.wait_ready_to_arm() self.zero_throttle() self.arm_vehicle() self.progress("start moving forward a little") normal_rc_throttle = 1700 self.mavproxy.send('rc 3 %u\n' % normal_rc_throttle) self.wait_groundspeed(5, 100) self.progress("allow overrides") self.set_rc(12, 2000) self.progress("now override to stop") throttle_override_normalized = 0 expected_throttle = 0 # in VFR_HUD tstart = self.get_sim_time_cached() while True: if self.get_sim_time_cached() - tstart > 10: raise AutoTestTimeoutException("Did not reach speed") self.progress("Sending normalized throttle of %d" % (throttle_override_normalized,)) self.mav.mav.manual_control_send( 1, # target system 32767, # x (pitch) 32767, # y (roll) throttle_override_normalized, # z (thrust) 32767, # r (yaw) 0) # button mask m = self.mav.recv_match(type='VFR_HUD', blocking=True) want_speed = 2.0 self.progress("Speed=%f want=<%f throttle=%u want=%u" % (m.groundspeed, want_speed, m.throttle, expected_throttle)) if m.groundspeed < want_speed and m.throttle == expected_throttle: break self.progress("now override to stop - but set the switch on the RC transmitter to deny overrides; this should send the speed back up to 5 metres/second") self.set_rc(12, 1000) throttle_override_normalized = 500 expected_throttle = 36 # in VFR_HUD, corresponding to normal_rc_throttle adjusted for channel min/max tstart = self.get_sim_time_cached() while True: if self.get_sim_time_cached() - tstart > 10: raise AutoTestTimeoutException("Did not stop") print("Sending normalized throttle of %u" % (throttle_override_normalized,)) self.mav.mav.manual_control_send( 1, # target system 32767, # x (pitch) 32767, # y (roll) throttle_override_normalized, # z (thrust) 32767, # r (yaw) 0) # button mask m = self.mav.recv_match(type='VFR_HUD', blocking=True) want_speed = 5.0 self.progress("Speed=%f want=>%f throttle=%u want=%u" % (m.groundspeed, want_speed, m.throttle, expected_throttle)) if m.groundspeed > want_speed and m.throttle == expected_throttle: break # re-enable RC overrides self.set_rc(12, 2000) # check we revert to normal RC inputs when gcs overrides cease: self.progress("Waiting for RC to revert to normal RC input") while True: m = self.mav.recv_match(type='RC_CHANNELS', blocking=True) print("%s" % m) if m.chan3_raw == normal_rc_throttle: break except Exception as e: self.progress("Exception caught") ex = e self.context_pop() self.disarm_vehicle() self.reboot_sitl() if ex is not None: raise ex def test_camera_mission_items(self): self.context_push() ex = None try: self.load_mission("rover-camera-mission.txt") self.wait_ready_to_arm() self.change_mode("AUTO") self.wait_ready_to_arm() self.arm_vehicle() prev_cf = None while True: cf = self.mav.recv_match(type='CAMERA_FEEDBACK', blocking=True) if prev_cf is None: prev_cf = cf continue dist_travelled = self.get_distance_int(prev_cf, cf) prev_cf = cf mc = self.mav.messages.get("MISSION_CURRENT", None) if mc is None: continue elif mc.seq == 2: expected_distance = 2 elif mc.seq == 4: expected_distance = 5 elif mc.seq == 5: break else: continue self.progress("Expected distance %f got %f" % (expected_distance, dist_travelled)) error = abs(expected_distance - dist_travelled) # Rover moves at ~5m/s; we appear to do something at # 5Hz, so we do see over a meter of error! max_error = 1.5 if error > max_error: raise NotAchievedException("Camera distance error: %f (%f)" % (error, max_error)) self.disarm_vehicle() except Exception as e: self.progress("Exception caught") ex = e self.context_pop() if ex is not None: raise ex def test_do_set_mode_via_command_long(self): self.do_set_mode_via_command_long("HOLD") self.do_set_mode_via_command_long("MANUAL") def test_mavproxy_do_set_mode_via_command_long(self): self.mavproxy_do_set_mode_via_command_long("HOLD") self.mavproxy_do_set_mode_via_command_long("MANUAL") def test_sysid_enforce(self): '''Run the same arming code with correct then incorrect SYSID''' self.context_push() ex = None try: # if set_parameter is ever changed to not use MAVProxy # this test is going to break horribly. Sorry. self.set_parameter("SYSID_MYGCS", 255) # assume MAVProxy does this! self.set_parameter("SYSID_ENFORCE", 1) # assume MAVProxy does this! self.change_mode('MANUAL') self.progress("make sure I can arm ATM") self.wait_ready_to_arm() self.arm_vehicle(timeout=5) self.disarm_vehicle() # temporarily set a different system ID than MAVProxy: self.progress("Attempting to arm vehicle myself") old_srcSystem = self.mav.mav.srcSystem try: self.mav.mav.srcSystem = 243 self.arm_vehicle(timeout=5) self.disarm_vehicle() success = False except AutoTestTimeoutException as e: success = True pass self.mav.mav.srcSystem = old_srcSystem if not success: raise NotAchievedException( "Managed to arm with SYSID_ENFORCE set") self.progress("Attempting to arm vehicle from vehicle component") old_srcSystem = self.mav.mav.srcSystem comp_arm_exception = None try: self.mav.mav.srcSystem = 1 self.arm_vehicle(timeout=5) self.disarm_vehicle() except Exception as e: comp_arm_exception = e pass self.mav.mav.srcSystem = old_srcSystem if comp_arm_exception is not None: raise comp_arm_exception except Exception as e: self.progress("Exception caught") ex = e self.context_pop() if ex is not None: raise ex def drain_mav_seconds(self, seconds): tstart = self.get_sim_time_cached() while self.get_sim_time_cached() - tstart < seconds: self.drain_mav(); self.delay_sim_time(0.5) def test_button(self): self.set_parameter("SIM_PIN_MASK", 0) self.set_parameter("BTN_ENABLE", 1) btn = 2 pin = 3 self.drain_mav() self.set_parameter("BTN_PIN%u" % btn, pin) m = self.mav.recv_match(type='BUTTON_CHANGE', blocking=True, timeout=1) self.progress("m: %s" % str(m)) if m is None: raise NotAchievedException("Did not get BUTTON_CHANGE event") mask = 1< 0.000001: raise NotAchievedException("Did not get correct lat in fencepoint: got=%f want=%f" % (m.lat, lat)) if abs(m.lng - lng) > 0.000001: raise NotAchievedException("Did not get correct lng in fencepoint: got=%f want=%f" % (m.lng, lng)) self.progress("Roundtrip OK") def test_gcs_fence(self): target_system = 1 target_component = 1 self.progress("Testing FENCE_POINT protocol") self.set_parameter("FENCE_TOTAL", 1) self.roundtrip_fencepoint_protocol(0, 1, 1.2345, 5.4321, target_system=target_component, target_component=target_component) lat = 2.345 lng = 4.321 self.roundtrip_fencepoint_protocol(0, 1, lat, lng, target_system=target_component, target_component=target_component) def test_offboard(self, timeout=90): self.load_mission("rover-guided-mission.txt") self.wait_ready_to_arm(require_absolute=True) self.arm_vehicle() self.change_mode("AUTO") offboard_expected_duration = 10 # see mission file if self.mav.messages.get("SET_POSITION_TARGET_GLOBAL_INT", None): raise PreconditionFailedException("Already have SET_POSITION_TARGET_GLOBAL_INT") tstart = self.get_sim_time_cached() last_heartbeat_sent = 0 got_sptgi = False magic_waypoint_tstart = 0 magic_waypoint_tstop = 0 while True: if self.mode_is("HOLD", cached=True): break now = self.get_sim_time_cached() if now - last_heartbeat_sent > 1: last_heartbeat_sent = now self.mav.mav.heartbeat_send(mavutil.mavlink.MAV_TYPE_ONBOARD_CONTROLLER, mavutil.mavlink.MAV_AUTOPILOT_INVALID, 0, 0, 0) if now - tstart > timeout: raise AutoTestTimeoutException("Didn't complete") magic_waypoint = 3 # mc = self.mav.messages.get("MISSION_CURRENT", None) mc = self.mav.recv_match(type="MISSION_CURRENT", blocking=False) if mc is not None: print("%s" % str(mc)) if mc.seq == magic_waypoint: print("At magic waypoint") if magic_waypoint_tstart == 0: magic_waypoint_tstart = self.get_sim_time_cached() sptgi = self.mav.messages.get("SET_POSITION_TARGET_GLOBAL_INT", None) if sptgi is not None: got_sptgi = True elif mc.seq > magic_waypoint: if magic_waypoint_tstop == 0: magic_waypoint_tstop = self.get_sim_time_cached() self.disarm_vehicle() offboard_duration = magic_waypoint_tstop - magic_waypoint_tstart if abs(offboard_duration - offboard_expected_duration) > 1: raise NotAchievedException("Did not stay in offboard control for correct time (want=%f got=%f)" % (offboard_expected_duration, offboard_duration)) if not got_sptgi: raise NotAchievedException("Did not get sptgi message") print("spgti: %s" % str(sptgi)) def assert_mission_count_on_link(self, mav, expected_count, target_system, target_component, mission_type): self.drain_mav(mav) self.progress("waiting for a message - any message....") m = mav.recv_match(blocking=True, timeout=1) self.progress("Received (%s)" % str(m)) if not mav.mavlink20(): raise NotAchievedException("Not doing mavlink2") tstart = self.get_sim_time() mav.mav.mission_request_list_send(target_system, target_component, mission_type) while True: if self.get_sim_time_cached() - tstart > 10: raise NotAchievedException("Did not receive MISSION_COUNT on link") m = mav.recv_match(blocking=True, timeout=1) if m is None: self.progress("No messages") continue self.progress("Received (%s)" % str(m)) if m.get_type() == "MISSION_ACK": if m.type != mavutil.mavlink.MAV_MISSION_ACCEPTED: raise NotAchievedException("Expected MISSION_COUNT, got (%s)" % m) if m.get_type() == "MISSION_COUNT": break if m.mission_type != mission_type: raise NotAchievedException("Did not get expected mission type (want=%u got=%u)" % (mission_type, m.mission_type)) if m.count != expected_count: raise NotAchievedException("Bad count received (want=%u got=%u)" % (expected_count, m.count)) def get_mission_item_int_on_link(self, item, mav, target_system, target_component, mission_type): mav.mav.mission_request_int_send(target_system, target_component, item, mission_type) m = mav.recv_match(type='MISSION_ITEM_INT', blocking=True, timeout=1) if m is None: raise NotAchievedException("Did not receive mission item int") if m.mission_type != mission_type: raise NotAchievedException("Mission item of incorrect type") if m.target_system != mav.mav.srcSystem: raise NotAchievedException("Unexpected target system %u want=%u" % (m.target_system, mav.mav.srcSystem)) if m.seq != item: raise NotAchievedException("Incorrect sequence number on received item got=%u want=%u" % (m.seq, item)) if m.mission_type != mission_type: raise NotAchievedException("Mission type incorrect on received item (want=%u got=%u)" % (mission_type, m.mission_type)) if m.target_component != mav.mav.srcComponent: raise NotAchievedException("Unexpected target component %u want=%u" % (m.target_component, mav.mav.srcComponent)) return m def get_mission_item_on_link(self, item, mav, target_system, target_component, mission_type): mav.mav.mission_request_send(target_system, target_component, item, mission_type) m = mav.recv_match(type='MISSION_ITEM', blocking=True, timeout=1) if m is None: raise NotAchievedException("Did not receive mission item int") if m.target_system != mav.mav.srcSystem: raise NotAchievedException("Unexpected target system %u want=%u" % (m.target_system, mav.mav.srcSystem)) if m.seq != item: raise NotAchievedException("Incorrect sequence number on received item_int got=%u want=%u" % (m.seq, item)) if m.mission_type != mission_type: raise NotAchievedException("Mission type incorrect on received item_int (want=%u got=%u)" % (mission_type, m.mission_type)) if m.target_component != mav.mav.srcComponent: raise NotAchievedException("Unexpected target component %u want=%u" % (m.target_component, mav.mav.srcComponent)) return m def clear_mission(self, mission_type, target_system=1, target_component=1): self.mav.mav.mission_count_send(target_system, target_component, 0, mission_type) m = self.mav.recv_match(type='MISSION_ACK', blocking=True, timeout=5) if m is None: raise NotAchievedException("Expected ACK for clearing mission") if m.target_system != self.mav.mav.srcSystem: raise NotAchievedException("ACK not targetted at correct system want=%u got=%u" % (self.mav.mav.srcSystem, m.target_system)) if m.target_component != self.mav.mav.srcComponent: raise NotAchievedException("ACK not targetted at correct component want=%u got=%u" % (self.mav.mav.srcComponent, m.target_component)) if m.type != mavutil.mavlink.MAV_MISSION_ACCEPTED: raise NotAchievedException("Expected MAV_MISSION_ACCEPTED got %s" % (mavutil.mavlink.enums["MAV_MISSION_RESULT"][m.type].name,)) def assert_receive_mission_item_request(self, mission_type, seq): self.progress("Expecting request for item %u" % seq) m = self.mav.recv_match(type='MISSION_REQUEST', blocking=True, timeout=1) if m is None: raise NotAchievedException("Did not get item request") if m.mission_type != mission_type: raise NotAchievedException("Incorrect mission type (wanted=%u got=%u)" % (mission_type, m.mission_type)) if m.seq != seq: raise NotAchievedException("Unexpected sequence number (want=%u got=%u)" % (seq, m.seq)) self.progress("Received item request OK") def assert_receive_mission_ack(self, mission_type, want_type=mavutil.mavlink.MAV_MISSION_ACCEPTED, target_system=None, target_component=None, mav=None): if mav is None: mav = self.mav if target_system is None: target_system = mav.mav.srcSystem if target_component is None: target_component = mav.mav.srcComponent self.progress("Expecting mission ack") m = mav.recv_match(type='MISSION_ACK', blocking=True, timeout=5) self.progress("Received ACK (%s)" % str(m)) if m is None: raise NotAchievedException("Expected mission ACK") if m.target_system != target_system: raise NotAchievedException("ACK not targetted at correct system want=%u got=%u" % (mav.mav.srcSystem, m.target_system)) if m.target_component != target_component: raise NotAchievedException("ACK not targetted at correct component") if m.mission_type != mission_type: raise NotAchievedException("Unexpected mission type %u want=%u" % (m.mission_type, mission_type)) if m.type != want_type: raise NotAchievedException("Expected ack type got %u got %u" % (want_type, m.type)) def test_gcs_rally(self): target_system = 1 target_component = 1 self.mavproxy.send('rally clear\n') self.delay_sim_time(1) if self.get_parameter("RALLY_TOTAL") != 0: raise NotAchievedException("Failed to clear rally points") old_srcSystem = self.mav.mav.srcSystem # stop MAVProxy poking the autopilot: self.mavproxy.send('module unload rally\n') self.mavproxy.expect("Unloaded module rally") self.mavproxy.send('module unload wp\n') self.mavproxy.expect("Unloaded module wp") try: item1_lat = int(2.0000 *1e7) items = [ self.mav.mav.mission_item_int_encode( target_system, target_component, 0, # seq mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, mavutil.mavlink.MAV_CMD_NAV_RALLY_POINT, 0, # current 0, # autocontinue 0, # p1 0, # p2 0, # p3 0, # p4 int(1.0000 *1e7), # latitude int(1.0000 *1e7), # longitude 31.0000, # altitude mavutil.mavlink.MAV_MISSION_TYPE_RALLY), self.mav.mav.mission_item_int_encode( target_system, target_component, 1, # seq mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, mavutil.mavlink.MAV_CMD_NAV_RALLY_POINT, 0, # current 0, # autocontinue 0, # p1 0, # p2 0, # p3 0, # p4 item1_lat, # latitude int(2.0000 *1e7), # longitude 32.0000, # altitude mavutil.mavlink.MAV_MISSION_TYPE_RALLY), self.mav.mav.mission_item_int_encode( target_system, target_component, 2, # seq mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, mavutil.mavlink.MAV_CMD_NAV_RALLY_POINT, 0, # current 0, # autocontinue 0, # p1 0, # p2 0, # p3 0, # p4 int(3.0000 *1e7), # latitude int(3.0000 *1e7), # longitude 33.0000, # altitude mavutil.mavlink.MAV_MISSION_TYPE_RALLY), ] self.upload_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, items) downloaded = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY) print("Got items (%s)" % str(items)) if len(downloaded) != len(items): raise NotAchievedException("Did not download correct number of items want=%u got=%u" % (len(downloaded), len(items))) rally_total = self.get_parameter("RALLY_TOTAL") if rally_total != len(downloaded): raise NotAchievedException("Unexpected rally point count: want=%u got=%u" % (len(items), rally_total)) self.progress("Pruning count by setting parameter (urgh)") self.set_parameter("RALLY_TOTAL", 2) downloaded = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY) if len(downloaded) != 2: raise NotAchievedException("Failed to prune rally points by setting parameter. want=%u got=%u" % (2, len(downloaded))) self.progress("Uploading a third item using old protocol") new_item2_lat = int(6.0 *1e7) self.set_parameter("RALLY_TOTAL", 3) self.mav.mav.rally_point_send(target_system, target_component, 2, # sequence number 3, # total count new_item2_lat, int(7.0 *1e7), 15, 0, # "break" alt?! 0, # "land dir" 0) # flags downloaded = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY) if len(downloaded) != 3: raise NotAchievedException("resetting rally point count didn't change items returned") if downloaded[2].x != new_item2_lat: raise NotAchievedException("Bad lattitude in downloaded item: want=%u got=%u" % (new_item2_lat, downloaded[2].x)) self.progress("Grabbing original item 1 using original protocol") self.mav.mav.rally_fetch_point_send(target_system, target_component, 1) m = self.mav.recv_match(type="RALLY_POINT", blocking=True, timeout=1) if m.target_system != 255: raise NotAchievedException("Bad target_system on received rally point (want=%u got=%u)" % (255, m.target_system)) if m.target_component != 250: # autotest's component ID raise NotAchievedException("Bad target_component on received rally point") if m.lat != item1_lat: raise NotAchievedException("Bad latitude on received rally point") self.start_subtest("Test upload lockout and timeout") self.progress("Starting upload from normal sysid") self.mav.mav.mission_count_send(target_system, target_component, len(items), mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.drain_mav() # throw away requests for items self.mav.mav.srcSystem = 243 self.progress("Attempting upload from sysid=%u" % (self.mav.mav.srcSystem,)) self.mav.mav.mission_count_send(target_system, target_component, len(items), mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, want_type=mavutil.mavlink.MAV_MISSION_DENIED) self.progress("Attempting download from sysid=%u" % (self.mav.mav.srcSystem,)) self.mav.mav.mission_request_list_send(target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, want_type=mavutil.mavlink.MAV_MISSION_DENIED) # wait for the upload from sysid=1 to time out: self.mavproxy.expect("upload timeout") self.progress("Now trying to upload empty mission after timeout") self.mav.mav.mission_count_send(target_system, target_component, 0, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.drain_mav() self.start_subtest("Check rally upload/download across separate links") self.upload_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, items) self.progress("ensure a mavlink1 connection can't do anything useful with new item types") mav2 = mavutil.mavlink_connection("tcp:localhost:5763", robust_parsing=True, source_system = 7, source_component=7) mav2.mav.mission_request_list_send(target_system, target_component, mission_type=mavutil.mavlink.MAV_MISSION_TYPE_RALLY) # so this looks a bit odd; the other end isn't sending # mavlink2 so can't fill in the extension here. self.assert_receive_mission_ack( mavutil.mavlink.MAV_MISSION_TYPE_MISSION, want_type=mavutil.mavlink.MAV_MISSION_UNSUPPORTED, mav=mav2, ) self.set_parameter("SERIAL2_PROTOCOL", 2) expected_count = 3 self.progress("Assert mission count on new link") self.assert_mission_count_on_link(mav2, expected_count, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.progress("Assert mission count on original link") self.assert_mission_count_on_link(self.mav, expected_count, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.progress("Get first item on new link") m2 = self.get_mission_item_int_on_link(2, mav2, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.progress("Get first item on original link") m = self.get_mission_item_int_on_link(2, self.mav, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) if m2.x != m.x: raise NotAchievedException("mission items do not match (%d vs %d)" % (m2.x, m.x)) m_nonint = self.get_mission_item_on_link(2, self.mav, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) # ensure we get nacks for bad mission item requests: self.mav.mav.mission_request_send(target_system, target_component, 65, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_ack( mavutil.mavlink.MAV_MISSION_TYPE_RALLY, want_type=mavutil.mavlink.MAV_MISSION_INVALID_SEQUENCE, ) self.mav.mav.mission_request_int_send(target_system, target_component, 65, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_ack( mavutil.mavlink.MAV_MISSION_TYPE_RALLY, want_type=mavutil.mavlink.MAV_MISSION_INVALID_SEQUENCE, ) self.start_subtest("Should enforce items come from correct GCS") self.mav.mav.mission_count_send(target_system, target_component, 1, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 0) self.progress("Attempting to upload from bad sysid") old_sysid = self.mav.mav.srcSystem self.mav.mav.srcSystem = 17 items[0].pack(self.mav.mav) self.mav.mav.send(items[0]) self.mav.mav.srcSystem = old_sysid self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, want_type=mavutil.mavlink.MAV_MISSION_DENIED, target_system=17) self.progress("Sending from correct sysid") items[0].pack(self.mav.mav) self.mav.mav.send(items[0]) self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.drain_mav() self.drain_all_pexpects() self.start_subtest("Attempt to send item on different link to that which we are sending requests on") self.progress("Sending count") self.mav.mav.mission_count_send(target_system, target_component, 2, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 0) old_mav2_system = mav2.mav.srcSystem old_mav2_component = mav2.mav.srcComponent mav2.mav.srcSystem = self.mav.mav.srcSystem mav2.mav.srcComponent = self.mav.mav.srcComponent self.progress("Sending item on second link") # note that the routing table in ArduPilot now will say # this sysid/compid is on both links which may cause # weirdness... items[0].pack(mav2.mav) mav2.mav.send(items[0]) mav2.mav.srcSystem = old_mav2_system mav2.mav.srcComponent = old_mav2_component # we continue to receive requests on the original link: m = self.mav.recv_match(type='MISSION_REQUEST', blocking=True, timeout=1) if m is None: raise NotAchievedException("Did not get mission request") if m.mission_type != mavutil.mavlink.MAV_MISSION_TYPE_RALLY: raise NotAchievedException("Mission request of incorrect type") if m.seq != 1: raise NotAchievedException("Unexpected sequence number (expected=%u got=%u)" % (1, m.seq)) items[1].pack(self.mav.mav) self.mav.mav.send(items[1]) self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.drain_mav() self.drain_all_pexpects() self.start_subtest("Upload mission and rally points at same time") self.progress("Sending rally count") self.mav.mav.mission_count_send(target_system, target_component, 3, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 0) self.progress("Sending wp count") self.mav.mav.mission_count_send(target_system, target_component, 3, mavutil.mavlink.MAV_MISSION_TYPE_MISSION) self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_MISSION, 0) self.progress("Answering request for mission item 0") wp = self.mav.mav.mission_item_int_send( target_system, target_component, 0, # seq mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 0, # current 0, # autocontinue 0, # p1 0, # p2 0, # p3 0, # p4 int(1.1000 *1e7), # latitude int(1.2000 *1e7), # longitude 321.0000, # altitude mavutil.mavlink.MAV_MISSION_TYPE_MISSION), self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_MISSION, 1) self.progress("Answering request for rally point 0") items[0].pack(self.mav.mav) self.mav.mav.send(items[0]) self.progress("Expecting request for rally item 1") self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 1) self.progress("Answering request for rally point 1") items[1].pack(self.mav.mav) self.mav.mav.send(items[1]) self.progress("Expecting request for rally item 2") self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 2) self.progress("Answering request for rally point 2") items[2].pack(self.mav.mav) self.mav.mav.send(items[2]) self.progress("Expecting mission ack for rally") self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.progress("Answering request for waypoints item 1") wp = self.mav.mav.mission_item_int_send( target_system, target_component, 1, # seq mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 0, # current 0, # autocontinue 0, # p1 0, # p2 0, # p3 0, # p4 int(1.1000 *1e7), # latitude int(1.2000 *1e7), # longitude 321.0000, # altitude mavutil.mavlink.MAV_MISSION_TYPE_MISSION), self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_MISSION, 2) self.progress("Answering request for waypoints item 2") self.mav.mav.mission_item_int_send( target_system, target_component, 2, # seq mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 0, # current 0, # autocontinue 0, # p1 0, # p2 0, # p3 0, # p4 int(1.1000 *1e7), # latitude int(1.2000 *1e7), # longitude 321.0000, # altitude mavutil.mavlink.MAV_MISSION_TYPE_MISSION), self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_MISSION) self.start_subtest("Test write-partial-list") self.progress("Clearing rally points using count-send") self.clear_mission(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, target_system=target_system, target_component=target_component) self.progress("Should not be able to set items completely past the waypoint count") self.upload_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, items) self.mav.mav.mission_write_partial_list_send( target_system, target_component, 17, 20, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, want_type=mavutil.mavlink.MAV_MISSION_ERROR) self.progress("Should not be able to set items overlapping the waypoint count") self.mav.mav.mission_write_partial_list_send( target_system, target_component, 0, 20, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, want_type=mavutil.mavlink.MAV_MISSION_ERROR) self.progress("try to overwrite items 1 and 2") self.mav.mav.mission_write_partial_list_send( target_system, target_component, 1, 2, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 1) self.progress("Try shoving up an incorrectly sequenced item") self.mav.mav.mission_item_int_send( target_system, target_component, 0, # seq mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, mavutil.mavlink.MAV_CMD_NAV_RALLY_POINT, 0, # current 0, # autocontinue 0, # p1 0, # p2 0, # p3 0, # p4 int(1.1000 *1e7), # latitude int(1.2000 *1e7), # longitude 321.0000, # altitude mavutil.mavlink.MAV_MISSION_TYPE_RALLY), self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, want_type=mavutil.mavlink.MAV_MISSION_INVALID_SEQUENCE) self.progress("Try shoving up an incorrectly sequenced item (but within band)") self.mav.mav.mission_item_int_send( target_system, target_component, 2, # seq mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, mavutil.mavlink.MAV_CMD_NAV_RALLY_POINT, 0, # current 0, # autocontinue 0, # p1 0, # p2 0, # p3 0, # p4 int(1.1000 *1e7), # latitude int(1.2000 *1e7), # longitude 321.0000, # altitude mavutil.mavlink.MAV_MISSION_TYPE_RALLY), self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, want_type=mavutil.mavlink.MAV_MISSION_INVALID_SEQUENCE) self.progress("Now provide correct item") item1_latitude = int(1.2345*1e7) self.mav.mav.mission_item_int_send( target_system, target_component, 1, # seq mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, mavutil.mavlink.MAV_CMD_NAV_RALLY_POINT, 0, # current 0, # autocontinue 0, # p1 0, # p2 0, # p3 0, # p4 item1_latitude, # latitude int(1.2000 *1e7), # longitude 321.0000, # altitude mavutil.mavlink.MAV_MISSION_TYPE_RALLY), self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 2) self.progress("Answering request for rally point 2") items[2].pack(self.mav.mav) self.mav.mav.send(items[2]) self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.progress("TODO: ensure partial mission write was good") self.start_subtest("clear mission types") self.assert_mission_count_on_link(self.mav, 3, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_mission_count_on_link(self.mav, 3, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_MISSION) self.mav.mav.mission_clear_all_send(target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_mission_count_on_link(self.mav, 0, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_mission_count_on_link(self.mav, 3, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_MISSION) self.mav.mav.mission_clear_all_send(target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_MISSION) self.assert_mission_count_on_link(self.mav, 0, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_mission_count_on_link(self.mav, 0, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_MISSION) self.start_subtest("try sending out-of-range counts") self.mav.mav.mission_count_send(target_system, target_component, 1, 230) self.assert_receive_mission_ack(230, want_type=mavutil.mavlink.MAV_MISSION_UNSUPPORTED) self.mav.mav.mission_count_send(target_system, target_component, 16000, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, want_type=mavutil.mavlink.MAV_MISSION_NO_SPACE) except Exception as e: self.progress("Received exception (%s)" % self.get_exception_stacktrace(e)) self.mav.mav.srcSystem = old_srcSystem raise e self.mavproxy.send('module load rally\n') self.mavproxy.expect("Loaded module rally") self.mavproxy.send('module load wp\n') self.mavproxy.expect("Loaded module wp") def wait_distance_to_home(self, distance, accuracy=5): tstart = self.get_sim_time() timeout = 30 while True: if self.get_sim_time_cached() - tstart > timeout: raise AutoTestTimeoutException("Failed to get home") self.mav.recv_match(type='VFR_HUD', blocking=True) self.progress("Dist from home: %.02f" % self.distance_to_home(use_cached_home=True)) if abs(distance-self.distance_to_home(use_cached_home=True)) <= accuracy: break def drive_smartrtl(self): self.change_mode("STEERING") self.wait_ready_to_arm() self.arm_vehicle() # drive two sides of a square, make sure we don't go back through # the middle of the square self.progress("Driving North") self.reach_heading_manual(0) self.set_rc(3, 2000) self.delay_sim_time(5) self.set_rc(3, 1000) self.wait_groundspeed(0, 1) loc = self.mav.location() self.progress("Driving East") self.set_rc(3, 2000) self.reach_heading_manual(90) self.set_rc(3, 2000) self.delay_sim_time(5) self.set_rc(3, 1000) self.progress("Entering smartrtl") self.change_mode("SMART_RTL") self.progress("Ensure we go via intermediate point") self.wait_distance_to_location(loc, 0, 5) self.progress("Ensure we get home") self.wait_distance_to_home(5, accuracy=2) self.disarm_vehicle() def test_motor_test(self): '''AKA run-rover-run''' magic_throttle_value = 1812 self.run_cmd(mavutil.mavlink.MAV_CMD_DO_MOTOR_TEST, 1, # p1 - motor instance mavutil.mavlink.MOTOR_TEST_THROTTLE_PWM, # p2 - throttle type magic_throttle_value, # p3 - throttle 5, # p4 - timeout 1, # p5 - motor count 0, # p6 - test order (see MOTOR_TEST_ORDER) 0, # p7 ) self.mav.motors_armed_wait() self.progress("Waiting for magic throttle value") self.wait_servo_channel_value(3, magic_throttle_value) self.wait_servo_channel_value(3, self.get_parameter("RC3_TRIM", 5), timeout=10) self.mav.motors_disarmed_wait() def tests(self): '''return list of all tests''' ret = super(AutoTestRover, self).tests() ret.extend([ ("MAVProxy_SetModeUsingSwitch", "Set modes via mavproxy switch", self.test_setting_modes_via_mavproxy_switch), ("MAVProxy_SetModeUsingMode", "Set modes via mavproxy mode command", self.test_setting_modes_via_mavproxy_mode_command), ("ModeSwitch", "Set modes via modeswitch", self.test_setting_modes_via_modeswitch), ("AuxModeSwitch", "Set modes via auxswitches", self.test_setting_modes_via_auxswitches), ("DriveRTL", "Drive an RTL Mission", self.drive_rtl_mission), ("SmartRTL", "Test SmartRTL", self.drive_smartrtl), ("DriveSquare", "Learn/Drive Square with Ch7 option", self.drive_square), ("DriveMission", "Drive Mission %s" % "rover1.txt", lambda: self.drive_mission("rover1.txt")), # disabled due to frequent failures in travis. This test needs re-writing # ("Drive Brake", self.drive_brake), ("GetBanner", "Get Banner", self.do_get_banner), ("GetCapabilities", "Get Capabilities", self.do_get_autopilot_capabilities), ("DO_SET_MODE", "Set mode via MAV_COMMAND_DO_SET_MODE", self.test_do_set_mode_via_command_long), ("MAVProxy_DO_SET_MODE", "Set mode via MAV_COMMAND_DO_SET_MODE with MAVProxy", self.test_mavproxy_do_set_mode_via_command_long), ("ServoRelayEvents", "Test ServoRelayEvents", self.test_servorelayevents), ("RCOverrides", "Test RC overrides", self.test_rc_overrides), ("RCOverridesCancel", "Test RC overrides Cancel", self.test_rc_override_cancel), ("MANUAL_CONTROL", "Test mavlink MANUAL_CONTROL", self.test_manual_control), ("Sprayer", "Test Sprayer", self.test_sprayer), ("AC_Avoidance", "Test AC Avoidance switch", self.drive_fence_ac_avoidance), ("CameraMission", "Test Camera Mission Items", self.test_camera_mission_items), # Gripper test ("Gripper", "Test gripper", self.test_gripper), ("GripperMission", "Test Gripper Mission Items", self.test_gripper_mission), ("SET_MESSAGE_INTERVAL", "Test MAV_CMD_SET_MESSAGE_INTERVAL", self.test_set_message_interval), ("REQUEST_MESSAGE", "Test MAV_CMD_REQUEST_MESSAGE", self.test_request_message), ("SYSID_ENFORCE", "Test enforcement of SYSID_MYGCS", self.test_sysid_enforce), ("Button", "Test Buttons", self.test_button), ("Rally", "Test Rally Points", self.test_rally_points), ("Offboard", "Test Offboard Control", self.test_offboard), ("GCSFence", "Upload and download of fence", self.test_gcs_fence), ("GCSRally", "Upload and download of rally", self.test_gcs_rally), ("MotorTest", "Motor Test triggered via mavlink", self.test_motor_test), ("DataFlashOverMAVLink", "Test DataFlash over MAVLink", self.test_dataflash_over_mavlink), ("DownLoadLogs", "Download logs", lambda: self.log_download( self.buildlogs_path("APMrover2-log.bin"), upload_logs=len(self.fail_list) > 0)), ]) return ret def rc_defaults(self): ret = super(AutoTestRover, self).rc_defaults() ret[3] = 1500 ret[8] = 1800 return ret def default_mode(self): return 'MANUAL'