#!/usr/bin/env python # Drive APMrover2 in SITL from __future__ import print_function import os import time from common import AutoTest from common import AutoTestTimeoutException from common import MsgRcvTimeoutException from common import NotAchievedException from common import PreconditionFailedException from pymavlink import mavutil # get location of scripts testdir = os.path.dirname(os.path.realpath(__file__)) SITL_START_LOCATION = mavutil.location(40.071374969556928, -105.22978898137808, 1583.702759, 246) class AutoTestRover(AutoTest): @staticmethod def get_not_armable_mode_list(): return [] @staticmethod def get_not_disarmed_settable_modes_list(): return ["FOLLOW"] @staticmethod def get_no_position_not_settable_modes_list(): return [] @staticmethod def get_position_armable_modes_list(): return ["GUIDED", "LOITER", "STEERING", "AUTO", "RTL", "SMART_RTL"] @staticmethod def get_normal_armable_modes_list(): return ["ACRO", "HOLD", "MANUAL"] def log_name(self): return "APMrover2" def test_filepath(self): return os.path.realpath(__file__) def sitl_start_location(self): return SITL_START_LOCATION def default_frame(self): return "rover" def is_rover(self): return True def get_stick_arming_channel(self): return int(self.get_parameter("RCMAP_ROLL")) def arming_test_mission(self): return os.path.join(testdir, "ArduRover-Missions", "test_arming.txt") ########################################################## # TESTS DRIVE ########################################################## # Drive a square in manual mode def drive_square(self, side=50): """Drive a square, Driving N then E .""" self.context_push() ex = None try: self.progress("TEST SQUARE") self.set_parameter("RC7_OPTION", 7) self.set_parameter("RC9_OPTION", 58) self.mavproxy.send('switch 5\n') self.wait_mode('MANUAL') self.wait_ready_to_arm() self.arm_vehicle() self.clear_wp(9) # first aim north self.progress("\nTurn right towards north") self.reach_heading_manual(10) # save bottom left corner of box as home AND waypoint self.progress("Save HOME") self.save_wp() self.progress("Save WP") self.save_wp() # pitch forward to fly north self.progress("\nGoing north %u meters" % side) self.reach_distance_manual(side) # save top left corner of square as waypoint self.progress("Save WP") self.save_wp() # roll right to fly east self.progress("\nGoing east %u meters" % side) self.reach_heading_manual(100) self.reach_distance_manual(side) # save top right corner of square as waypoint self.progress("Save WP") self.save_wp() # pitch back to fly south self.progress("\nGoing south %u meters" % side) self.reach_heading_manual(190) self.reach_distance_manual(side) # save bottom right corner of square as waypoint self.progress("Save WP") self.save_wp() # roll left to fly west self.progress("\nGoing west %u meters" % side) self.reach_heading_manual(280) self.reach_distance_manual(side) # save bottom left corner of square (should be near home) as waypoint self.progress("Save WP") self.save_wp() self.progress("Checking number of saved waypoints") num_wp = self.save_mission_to_file( os.path.join(testdir, "rover-ch7_mission.txt")) expected = 7 # home + 6 toggled in if num_wp != expected: raise NotAchievedException("Did not get %u waypoints; got %u" % (expected, num_wp)) # TODO: actually drive the mission self.clear_wp(9) except Exception as e: self.progress("Caught exception: %s" % self.get_exception_stacktrace(e)) ex = e self.disarm_vehicle() self.context_pop() if ex: raise ex def drive_left_circuit(self): """Drive a left circuit, 50m on a side.""" self.mavproxy.send('switch 6\n') self.wait_mode('MANUAL') self.set_rc(3, 2000) self.progress("Driving left circuit") # do 4 turns for i in range(0, 4): # hard left self.progress("Starting turn %u" % i) self.set_rc(1, 1000) self.wait_heading(270 - (90*i), accuracy=10) self.set_rc(1, 1500) self.progress("Starting leg %u" % i) self.wait_distance(50, accuracy=7) self.set_rc(3, 1500) self.progress("Circuit complete") # def test_throttle_failsafe(self, home, distance_min=10, side=60, # timeout=300): # """Fly east, Failsafe, return, land.""" # # self.mavproxy.send('switch 6\n') # manual mode # self.wait_mode('MANUAL') # self.mavproxy.send("param set FS_ACTION 1\n") # # # first aim east # self.progress("turn east") # if not self.reach_heading_manual(135): # return False # # # fly east 60 meters # self.progress("# Going forward %u meters" % side) # if not self.reach_distance_manual(side): # return False # # # pull throttle low # self.progress("# Enter Failsafe") # self.mavproxy.send('rc 3 900\n') # # tstart = self.get_sim_time() # success = False # while self.get_sim_time() < tstart + timeout and not success: # m = self.mav.recv_match(type='VFR_HUD', blocking=True) # pos = self.mav.location() # home_distance = self.get_distance(home, pos) # self.progress("Alt: %u HomeDistance: %.0f" % # (m.alt, home_distance)) # # check if we've reached home # if home_distance <= distance_min: # self.progress("RTL Complete") # success = True # # # reduce throttle # self.mavproxy.send('rc 3 1500\n') # self.mavproxy.expect('APM: Failsafe ended') # self.mavproxy.send('switch 2\n') # manual mode # self.wait_heartbeat() # self.wait_mode('MANUAL') # # if success: # self.progress("Reached failsafe home OK") # return True # else: # self.progress("Failed to reach Home on failsafe RTL - " # "timed out after %u seconds" % timeout) # return False def test_sprayer(self): """Test sprayer functionality.""" self.context_push() ex = None try: rc_ch = 5 pump_ch = 5 spinner_ch = 6 pump_ch_min = 1050 pump_ch_trim = 1520 pump_ch_max = 1950 spinner_ch_min = 975 spinner_ch_trim = 1510 spinner_ch_max = 1975 self.set_parameter("SPRAY_ENABLE", 1) self.set_parameter("SERVO%u_FUNCTION" % pump_ch, 22) self.set_parameter("SERVO%u_MIN" % pump_ch, pump_ch_min) self.set_parameter("SERVO%u_TRIM" % pump_ch, pump_ch_trim) self.set_parameter("SERVO%u_MAX" % pump_ch, pump_ch_max) self.set_parameter("SERVO%u_FUNCTION" % spinner_ch, 23) self.set_parameter("SERVO%u_MIN" % spinner_ch, spinner_ch_min) self.set_parameter("SERVO%u_TRIM" % spinner_ch, spinner_ch_trim) self.set_parameter("SERVO%u_MAX" % spinner_ch, spinner_ch_max) self.set_parameter("SIM_SPR_ENABLE", 1) self.fetch_parameters() self.set_parameter("SIM_SPR_PUMP", pump_ch) self.set_parameter("SIM_SPR_SPIN", spinner_ch) self.set_parameter("RC%u_OPTION" % rc_ch, 15) self.set_parameter("LOG_DISARMED", 1) self.reboot_sitl() self.wait_ready_to_arm() self.arm_vehicle() self.progress("test bootup state - it's zero-output!") self.wait_servo_channel_value(spinner_ch, 0) self.wait_servo_channel_value(pump_ch, 0) self.progress("Enable sprayer") self.set_rc(rc_ch, 2000) self.progress("Testing zero-speed state") self.wait_servo_channel_value(spinner_ch, spinner_ch_min) self.wait_servo_channel_value(pump_ch, pump_ch_min) self.progress("Testing turning it off") self.set_rc(rc_ch, 1000) self.wait_servo_channel_value(spinner_ch, spinner_ch_min) self.wait_servo_channel_value(pump_ch, pump_ch_min) self.progress("Testing turning it back on") self.set_rc(rc_ch, 2000) self.wait_servo_channel_value(spinner_ch, spinner_ch_min) self.wait_servo_channel_value(pump_ch, pump_ch_min) self.progress("Testing speed-ramping") self.set_rc(3, 1700) # start driving forward # this is somewhat empirical... self.wait_servo_channel_value(pump_ch, 1695, timeout=60) self.progress("Sprayer OK") except Exception as e: self.progress("Caught exception: %s" % self.get_exception_stacktrace(e)) ex = e self.context_pop() self.disarm_vehicle(force=True) self.reboot_sitl() if ex: raise ex ################################################# # AUTOTEST ALL ################################################# def drive_mission(self, filename): """Drive a mission from a file.""" self.progress("Driving mission %s" % filename) self.load_mission(filename) self.wait_ready_to_arm() self.arm_vehicle() self.mavproxy.send('switch 4\n') # auto mode self.set_rc(3, 1500) self.wait_mode('AUTO') self.wait_waypoint(1, 4, max_dist=5) self.wait_mode('HOLD', timeout=300) self.disarm_vehicle() self.progress("Mission OK") def test_gripper_mission(self): self.load_mission("rover-gripper-mission.txt") self.change_mode('AUTO') self.wait_ready_to_arm() self.arm_vehicle() self.mavproxy.expect("Gripper Grabbed") self.mavproxy.expect("Gripper Released") self.wait_mode("HOLD") self.disarm_vehicle() def do_get_banner(self): self.mavproxy.send("long DO_SEND_BANNER 1\n") start = time.time() while True: m = self.mav.recv_match(type='STATUSTEXT', blocking=True, timeout=1) if m is not None and "ArduRover" in m.text: self.progress("banner received: %s" % m.text) return if time.time() - start > 10: break raise MsgRcvTimeoutException("banner not received") def drive_brake_get_stopping_distance(self, speed): # measure our stopping distance: old_cruise_speed = self.get_parameter('CRUISE_SPEED') old_accel_max = self.get_parameter('ATC_ACCEL_MAX') # controller tends not to meet cruise speed (max of ~14 when 15 # set), thus *1.2 self.set_parameter('CRUISE_SPEED', speed*1.2) # at time of writing, the vehicle is only capable of 10m/s/s accel self.set_parameter('ATC_ACCEL_MAX', 15) self.change_mode("STEERING") self.set_rc(3, 2000) self.wait_groundspeed(15, 100) initial = self.mav.location() initial_time = time.time() while time.time() - initial_time < 2: # wait for a position update from the autopilot start = self.mav.location() if start != initial: break self.set_rc(3, 1500) self.wait_groundspeed(0, 0.2) # why do we not stop?! initial = self.mav.location() initial_time = time.time() while time.time() - initial_time < 2: # wait for a position update from the autopilot stop = self.mav.location() if stop != initial: break delta = self.get_distance(start, stop) self.set_parameter('CRUISE_SPEED', old_cruise_speed) self.set_parameter('ATC_ACCEL_MAX', old_accel_max) return delta def drive_brake(self): old_using_brake = self.get_parameter('ATC_BRAKE') old_cruise_speed = self.get_parameter('CRUISE_SPEED') self.set_parameter('CRUISE_SPEED', 15) self.set_parameter('ATC_BRAKE', 0) self.arm_vehicle() distance_without_brakes = self.drive_brake_get_stopping_distance(15) # brakes on: self.set_parameter('ATC_BRAKE', 1) distance_with_brakes = self.drive_brake_get_stopping_distance(15) # revert state: self.set_parameter('ATC_BRAKE', old_using_brake) self.set_parameter('CRUISE_SPEED', old_cruise_speed) delta = distance_without_brakes - distance_with_brakes if delta < distance_without_brakes * 0.05: # 5% isn't asking for much self.disarm_vehicle() raise NotAchievedException(""" Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm) """ % (distance_with_brakes, distance_without_brakes, delta)) self.disarm_vehicle() self.progress( "Brakes work (with=%0.2fm without=%0.2fm delta=%0.2fm)" % (distance_with_brakes, distance_without_brakes, delta)) def drive_rtl_mission_max_distance_from_home(self): '''maximum distance allowed from home at end''' return 6.5 def drive_rtl_mission(self): self.wait_ready_to_arm() self.arm_vehicle() mission_filepath = os.path.join("ArduRover-Missions", "rtl.txt") self.load_mission(mission_filepath) self.change_mode("AUTO") self.mavproxy.expect('Mission: 3 RTL') self.drain_mav(); m = self.mav.recv_match(type='NAV_CONTROLLER_OUTPUT', blocking=True, timeout=1) if m is None: raise MsgRcvTimeoutException( "Did not receive NAV_CONTROLLER_OUTPUT message") wp_dist_min = 5 if m.wp_dist < wp_dist_min: raise PreconditionFailedException( "Did not start at least %f metres from destination (is=%f)" % (wp_dist_min, m.wp_dist)) self.progress("NAV_CONTROLLER_OUTPUT.wp_dist looks good (%u >= %u)" % (m.wp_dist, wp_dist_min,)) tstart = self.get_sim_time() while True: if self.get_sim_time_cached() - tstart > 600: raise NotAchievedException("Did not get home") self.progress("Distance home: %f (mode=%s)" % (self.distance_to_home(), self.mav.flightmode)) if self.mode_is('HOLD') or self.mode_is('ACRO'): # acro for balancebot break # the EKF doesn't pull us down to 0 speed: self.wait_groundspeed(0, 0.5, timeout=600) # current Rover blows straight past the home position and ends # up ~6m past the home point. home_distance = self.distance_to_home() home_distance_min = 5.5 home_distance_max = self.drive_rtl_mission_max_distance_from_home() if home_distance > home_distance_max: raise NotAchievedException( "Did not stop near home (%f metres distant (%f > want > %f))" % (home_distance, home_distance_min, home_distance_max)) self.disarm_vehicle() self.progress("RTL Mission OK (%fm)" % home_distance) def wait_distance_home_gt(self, distance, timeout=60): home_distance = None tstart = self.get_sim_time() while self.get_sim_time_cached() - tstart < timeout: # m = self.mav.recv_match(type='VFR_HUD', blocking=True) distance_home = self.distance_to_home(use_cached_home=True) self.progress("distance_home=%f want=%f" % (distance_home, distance)) if distance_home > distance: return self.drain_mav() raise NotAchievedException("Failed to get %fm from home (now=%f)" % (distance, home_distance)) def drive_fence_ac_avoidance(self): self.context_push() ex = None try: self.load_fence("rover-fence-ac-avoid.txt") self.set_parameter("FENCE_ENABLE", 0) self.set_parameter("PRX_TYPE", 10) self.set_parameter("RC10_OPTION", 40) # proximity-enable self.reboot_sitl() # start = self.mav.location() self.wait_ready_to_arm() self.arm_vehicle() # first make sure we can breach the fence: self.set_rc(10, 1000) self.change_mode("ACRO") self.set_rc(3, 1550) self.wait_distance_home_gt(25) self.change_mode("RTL") self.mavproxy.expect("APM: Reached destination") # now enable avoidance and make sure we can't: self.set_rc(10, 2000) self.change_mode("ACRO") self.wait_groundspeed(0, 0.7, timeout=60) # watch for speed zero self.wait_groundspeed(0, 0.2, timeout=120) except Exception as e: self.progress("Caught exception: %s" % self.get_exception_stacktrace(e)) ex = e self.context_pop() self.mavproxy.send("fence clear\n") self.disarm_vehicle(force=True) self.reboot_sitl() if ex: raise ex def test_servorelayevents(self): self.do_set_relay(0, 0) off = self.get_parameter("SIM_PIN_MASK") self.do_set_relay(0, 1) on = self.get_parameter("SIM_PIN_MASK") if on == off: raise NotAchievedException( "Pin mask unchanged after relay cmd") self.progress("Pin mask changed after relay command") def test_setting_modes_via_mavproxy_switch(self): fnoo = [(1, 'MANUAL'), (2, 'MANUAL'), (3, 'RTL'), # (4, 'AUTO'), # no mission, can't set auto (5, 'RTL'), # non-existant mode, should stay in RTL (6, 'MANUAL')] for (num, expected) in fnoo: self.mavproxy.send('switch %u\n' % num) self.wait_mode(expected) def test_setting_modes_via_mavproxy_mode_command(self): fnoo = [(1, 'ACRO'), (3, 'STEERING'), (4, 'HOLD'), ] for (num, expected) in fnoo: self.mavproxy.send('mode manual\n') self.wait_mode("MANUAL") self.mavproxy.send('mode %u\n' % num) self.wait_mode(expected) self.mavproxy.send('mode manual\n') self.wait_mode("MANUAL") self.mavproxy.send('mode %s\n' % expected) self.wait_mode(expected) def test_setting_modes_via_modeswitch(self): # test setting of modes through mode switch self.context_push() ex = None try: self.set_parameter("MODE_CH", 8) self.set_rc(8, 1000) # mavutil.mavlink.ROVER_MODE_HOLD: self.set_parameter("MODE6", 4) # mavutil.mavlink.ROVER_MODE_ACRO self.set_parameter("MODE5", 1) self.set_rc(8, 1800) # PWM for mode6 self.wait_mode("HOLD") self.set_rc(8, 1700) # PWM for mode5 self.wait_mode("ACRO") self.set_rc(8, 1800) # PWM for mode6 self.wait_mode("HOLD") self.set_rc(8, 1700) # PWM for mode5 self.wait_mode("ACRO") except Exception as e: self.progress("Exception caught") ex = e self.context_pop() if ex is not None: raise ex def test_setting_modes_via_auxswitches(self): self.context_push() ex = None try: self.set_parameter("MODE5", 1) self.mavproxy.send('switch 1\n') # random mode self.wait_heartbeat() self.change_mode('MANUAL') self.mavproxy.send('switch 5\n') # acro mode self.wait_mode("ACRO") self.set_rc(9, 1000) self.set_rc(10, 1000) self.set_parameter("RC9_OPTION", 53) # steering self.set_parameter("RC10_OPTION", 54) # hold self.set_rc(9, 1900) self.wait_mode("STEERING") self.set_rc(10, 1900) self.wait_mode("HOLD") # reset both switches - should go back to ACRO self.set_rc(9, 1000) self.set_rc(10, 1000) self.wait_mode("ACRO") self.set_rc(9, 1900) self.wait_mode("STEERING") self.set_rc(10, 1900) self.wait_mode("HOLD") self.set_rc(10, 1000) # this re-polls the mode switch self.wait_mode("ACRO") self.set_rc(9, 1000) except Exception as e: self.progress("Exception caught") ex = e self.context_pop() if ex is not None: raise ex def test_rc_override_cancel(self): self.change_mode('MANUAL') self.wait_ready_to_arm() self.zero_throttle() self.arm_vehicle() # start moving forward a little: normal_rc_throttle = 1700 throttle_override = 1900 self.progress("Establishing baseline RC input") self.mavproxy.send('rc 3 %u\n' % normal_rc_throttle) tstart = self.get_sim_time_cached() while True: if self.get_sim_time_cached() - tstart > 10: raise AutoTestTimeoutException("Did not get rc change") m = self.mav.recv_match(type='RC_CHANNELS', blocking=True) if m.chan3_raw == normal_rc_throttle: break self.progress("Set override with RC_CHANNELS_OVERRIDE") tstart = self.get_sim_time_cached() while True: if self.get_sim_time_cached() - tstart > 10: raise AutoTestTimeoutException("Did not override") self.progress("Sending throttle of %u" % (throttle_override,)) self.mav.mav.rc_channels_override_send( 1, # target system 1, # targe component 65535, # chan1_raw 65535, # chan2_raw throttle_override, # chan3_raw 65535, # chan4_raw 65535, # chan5_raw 65535, # chan6_raw 65535, # chan7_raw 65535) # chan8_raw m = self.mav.recv_match(type='RC_CHANNELS', blocking=True) self.progress("chan3=%f want=%f" % (m.chan3_raw, throttle_override)) if m.chan3_raw == throttle_override: break self.progress("disabling override and making sure we revert to RC input in good time") tstart = self.get_sim_time_cached() while True: if self.get_sim_time_cached() - tstart > 0.5: raise AutoTestTimeoutException("Did not cancel override") self.progress("Sending cancel of throttle override") self.mav.mav.rc_channels_override_send( 1, # target system 1, # targe component 65535, # chan1_raw 65535, # chan2_raw 0, # chan3_raw 65535, # chan4_raw 65535, # chan5_raw 65535, # chan6_raw 65535, # chan7_raw 65535) # chan8_raw m = self.mav.recv_match(type='RC_CHANNELS', blocking=True) self.progress("chan3=%f want=%f" % (m.chan3_raw, normal_rc_throttle)) if m.chan3_raw == normal_rc_throttle: break self.disarm_vehicle() def test_rc_overrides(self): self.context_push() ex = None try: self.set_parameter("RC12_OPTION", 46) self.reboot_sitl() self.mavproxy.send('switch 6\n') # Manual mode self.wait_mode('MANUAL') self.wait_ready_to_arm() self.mavproxy.send('rc 3 1500\n') # throttle at zero self.arm_vehicle() # start moving forward a little: normal_rc_throttle = 1700 self.mavproxy.send('rc 3 %u\n' % normal_rc_throttle) self.wait_groundspeed(5, 100) # allow overrides: self.set_rc(12, 2000) # now override to stop: throttle_override = 1500 tstart = self.get_sim_time_cached() while True: if self.get_sim_time_cached() - tstart > 10: raise AutoTestTimeoutException("Did not reach speed") self.progress("Sending throttle of %u" % (throttle_override,)) self.mav.mav.rc_channels_override_send( 1, # target system 1, # targe component 65535, # chan1_raw 65535, # chan2_raw throttle_override, # chan3_raw 65535, # chan4_raw 65535, # chan5_raw 65535, # chan6_raw 65535, # chan7_raw 65535) # chan8_raw m = self.mav.recv_match(type='VFR_HUD', blocking=True) want_speed = 2.0 self.progress("Speed=%f want=<%f" % (m.groundspeed, want_speed)) if m.groundspeed < want_speed: break # now override to stop - but set the switch on the RC # transmitter to deny overrides; this should send the # speed back up to 5 metres/second: self.set_rc(12, 1000) throttle_override = 1500 tstart = self.get_sim_time_cached() while True: if self.get_sim_time_cached() - tstart > 10: raise AutoTestTimeoutException("Did not stop") print("Sending throttle of %u" % (throttle_override,)) self.mav.mav.rc_channels_override_send( 1, # target system 1, # targe component 65535, # chan1_raw 65535, # chan2_raw throttle_override, # chan3_raw 65535, # chan4_raw 65535, # chan5_raw 65535, # chan6_raw 65535, # chan7_raw 65535) # chan8_raw m = self.mav.recv_match(type='VFR_HUD', blocking=True) want_speed = 5.0 print("Speed=%f want=>%f" % (m.groundspeed, want_speed)) if m.groundspeed > want_speed: break # re-enable RC overrides self.set_rc(12, 2000) # check we revert to normal RC inputs when gcs overrides cease: self.progress("Waiting for RC to revert to normal RC input") while True: m = self.mav.recv_match(type='RC_CHANNELS', blocking=True) print("%s" % m) if m.chan3_raw == normal_rc_throttle: break except Exception as e: self.progress("Exception caught") ex = e self.context_pop() self.disarm_vehicle() self.reboot_sitl() if ex is not None: raise ex def test_manual_control(self): self.context_push() ex = None try: self.set_parameter("RC12_OPTION", 46) # enable/disable rc overrides self.reboot_sitl() self.change_mode("MANUAL") self.wait_ready_to_arm() self.zero_throttle() self.arm_vehicle() self.progress("start moving forward a little") normal_rc_throttle = 1700 self.mavproxy.send('rc 3 %u\n' % normal_rc_throttle) self.wait_groundspeed(5, 100) self.progress("allow overrides") self.set_rc(12, 2000) self.progress("now override to stop") throttle_override_normalized = 0 expected_throttle = 0 # in VFR_HUD tstart = self.get_sim_time_cached() while True: if self.get_sim_time_cached() - tstart > 10: raise AutoTestTimeoutException("Did not reach speed") self.progress("Sending normalized throttle of %d" % (throttle_override_normalized,)) self.mav.mav.manual_control_send( 1, # target system 32767, # x (pitch) 32767, # y (roll) throttle_override_normalized, # z (thrust) 32767, # r (yaw) 0) # button mask m = self.mav.recv_match(type='VFR_HUD', blocking=True) want_speed = 2.0 self.progress("Speed=%f want=<%f throttle=%u want=%u" % (m.groundspeed, want_speed, m.throttle, expected_throttle)) if m.groundspeed < want_speed and m.throttle == expected_throttle: break self.progress("now override to stop - but set the switch on the RC transmitter to deny overrides; this should send the speed back up to 5 metres/second") self.set_rc(12, 1000) throttle_override_normalized = 500 expected_throttle = 36 # in VFR_HUD, corresponding to normal_rc_throttle adjusted for channel min/max tstart = self.get_sim_time_cached() while True: if self.get_sim_time_cached() - tstart > 10: raise AutoTestTimeoutException("Did not stop") print("Sending normalized throttle of %u" % (throttle_override_normalized,)) self.mav.mav.manual_control_send( 1, # target system 32767, # x (pitch) 32767, # y (roll) throttle_override_normalized, # z (thrust) 32767, # r (yaw) 0) # button mask m = self.mav.recv_match(type='VFR_HUD', blocking=True) want_speed = 5.0 self.progress("Speed=%f want=>%f throttle=%u want=%u" % (m.groundspeed, want_speed, m.throttle, expected_throttle)) if m.groundspeed > want_speed and m.throttle == expected_throttle: break # re-enable RC overrides self.set_rc(12, 2000) # check we revert to normal RC inputs when gcs overrides cease: self.progress("Waiting for RC to revert to normal RC input") while True: m = self.mav.recv_match(type='RC_CHANNELS', blocking=True) print("%s" % m) if m.chan3_raw == normal_rc_throttle: break except Exception as e: self.progress("Exception caught") ex = e self.context_pop() self.disarm_vehicle() self.reboot_sitl() if ex is not None: raise ex def test_camera_mission_items(self): self.context_push() ex = None try: self.load_mission("rover-camera-mission.txt") self.wait_ready_to_arm() self.change_mode("AUTO") self.wait_ready_to_arm() self.arm_vehicle() prev_cf = None while True: cf = self.mav.recv_match(type='CAMERA_FEEDBACK', blocking=True) if prev_cf is None: prev_cf = cf continue dist_travelled = self.get_distance_int(prev_cf, cf) prev_cf = cf mc = self.mav.messages.get("MISSION_CURRENT", None) if mc is None: continue elif mc.seq == 2: expected_distance = 2 elif mc.seq == 4: expected_distance = 5 elif mc.seq == 5: break else: continue self.progress("Expected distance %f got %f" % (expected_distance, dist_travelled)) error = abs(expected_distance - dist_travelled) # Rover moves at ~5m/s; we appear to do something at # 5Hz, so we do see over a meter of error! max_error = 1.5 if error > max_error: raise NotAchievedException("Camera distance error: %f (%f)" % (error, max_error)) self.disarm_vehicle() except Exception as e: self.progress("Exception caught") ex = e self.context_pop() if ex is not None: raise ex def test_do_set_mode_via_command_long(self): self.do_set_mode_via_command_long("HOLD") self.do_set_mode_via_command_long("MANUAL") def test_mavproxy_do_set_mode_via_command_long(self): self.mavproxy_do_set_mode_via_command_long("HOLD") self.mavproxy_do_set_mode_via_command_long("MANUAL") def test_sysid_enforce(self): '''Run the same arming code with correct then incorrect SYSID''' self.context_push() ex = None try: # if set_parameter is ever changed to not use MAVProxy # this test is going to break horribly. Sorry. self.set_parameter("SYSID_MYGCS", 255) # assume MAVProxy does this! self.set_parameter("SYSID_ENFORCE", 1) # assume MAVProxy does this! self.change_mode('MANUAL') self.progress("make sure I can arm ATM") self.wait_ready_to_arm() self.arm_vehicle(timeout=5) self.disarm_vehicle() # temporarily set a different system ID than MAVProxy: self.progress("Attempting to arm vehicle myself") old_srcSystem = self.mav.mav.srcSystem try: self.mav.mav.srcSystem = 243 self.arm_vehicle(timeout=5) self.disarm_vehicle() success = False except AutoTestTimeoutException as e: success = True self.mav.mav.srcSystem = old_srcSystem if not success: raise NotAchievedException( "Managed to arm with SYSID_ENFORCE set") self.progress("Attempting to arm vehicle from vehicle component") old_srcSystem = self.mav.mav.srcSystem comp_arm_exception = None try: self.mav.mav.srcSystem = 1 self.arm_vehicle(timeout=5) self.disarm_vehicle() except Exception as e: comp_arm_exception = e self.mav.mav.srcSystem = old_srcSystem if comp_arm_exception is not None: raise comp_arm_exception except Exception as e: self.progress("Exception caught") ex = e self.context_pop() if ex is not None: raise ex def drain_mav_seconds(self, seconds): tstart = self.get_sim_time_cached() while self.get_sim_time_cached() - tstart < seconds: self.drain_mav(); self.delay_sim_time(0.5) def test_button(self): self.set_parameter("SIM_PIN_MASK", 0) self.set_parameter("BTN_ENABLE", 1) btn = 2 pin = 3 self.drain_mav() self.set_parameter("BTN_PIN%u" % btn, pin) m = self.mav.recv_match(type='BUTTON_CHANGE', blocking=True, timeout=1) self.progress("m: %s" % str(m)) if m is None: raise NotAchievedException("Did not get BUTTON_CHANGE event") mask = 1< 0.000001: raise NotAchievedException("Did not get correct lat in fencepoint: got=%f want=%f" % (m.lat, lat)) if abs(m.lng - lng) > 0.000001: raise NotAchievedException("Did not get correct lng in fencepoint: got=%f want=%f" % (m.lng, lng)) self.progress("Roundtrip OK") def test_gcs_fence(self): target_system = 1 target_component = 1 self.progress("Testing FENCE_POINT protocol") self.set_parameter("FENCE_TOTAL", 1) self.roundtrip_fencepoint_protocol(0, 1, 1.2345, 5.4321, target_system=target_system, target_component=target_component) lat = 2.345 lng = 4.321 self.roundtrip_fencepoint_protocol(0, 1, lat, lng, target_system=target_system, target_component=target_component) def test_offboard(self, timeout=90): self.load_mission("rover-guided-mission.txt") self.wait_ready_to_arm(require_absolute=True) self.arm_vehicle() self.change_mode("AUTO") offboard_expected_duration = 10 # see mission file if self.mav.messages.get("SET_POSITION_TARGET_GLOBAL_INT", None): raise PreconditionFailedException("Already have SET_POSITION_TARGET_GLOBAL_INT") tstart = self.get_sim_time_cached() last_heartbeat_sent = 0 got_sptgi = False magic_waypoint_tstart = 0 magic_waypoint_tstop = 0 while True: if self.mode_is("HOLD", cached=True): break now = self.get_sim_time_cached() if now - last_heartbeat_sent > 1: last_heartbeat_sent = now self.mav.mav.heartbeat_send(mavutil.mavlink.MAV_TYPE_ONBOARD_CONTROLLER, mavutil.mavlink.MAV_AUTOPILOT_INVALID, 0, 0, 0) if now - tstart > timeout: raise AutoTestTimeoutException("Didn't complete") magic_waypoint = 3 # mc = self.mav.messages.get("MISSION_CURRENT", None) mc = self.mav.recv_match(type="MISSION_CURRENT", blocking=False) if mc is not None: print("%s" % str(mc)) if mc.seq == magic_waypoint: print("At magic waypoint") if magic_waypoint_tstart == 0: magic_waypoint_tstart = self.get_sim_time_cached() sptgi = self.mav.messages.get("SET_POSITION_TARGET_GLOBAL_INT", None) if sptgi is not None: got_sptgi = True elif mc.seq > magic_waypoint: if magic_waypoint_tstop == 0: magic_waypoint_tstop = self.get_sim_time_cached() self.disarm_vehicle() offboard_duration = magic_waypoint_tstop - magic_waypoint_tstart if abs(offboard_duration - offboard_expected_duration) > 1: raise NotAchievedException("Did not stay in offboard control for correct time (want=%f got=%f)" % (offboard_expected_duration, offboard_duration)) if not got_sptgi: raise NotAchievedException("Did not get sptgi message") print("spgti: %s" % str(sptgi)) def assert_mission_count_on_link(self, mav, expected_count, target_system, target_component, mission_type): self.drain_mav(mav) self.progress("waiting for a message - any message....") m = mav.recv_match(blocking=True, timeout=1) self.progress("Received (%s)" % str(m)) if not mav.mavlink20(): raise NotAchievedException("Not doing mavlink2") tstart = self.get_sim_time() mav.mav.mission_request_list_send(target_system, target_component, mission_type) while True: if self.get_sim_time_cached() - tstart > 10: raise NotAchievedException("Did not receive MISSION_COUNT on link") m = mav.recv_match(blocking=True, timeout=1) if m is None: self.progress("No messages") continue self.progress("Received (%s)" % str(m)) if m.get_type() == "MISSION_ACK": if m.type != mavutil.mavlink.MAV_MISSION_ACCEPTED: raise NotAchievedException("Expected MISSION_COUNT, got (%s)" % m) if m.get_type() == "MISSION_COUNT": break if m.mission_type != mission_type: raise NotAchievedException("Did not get expected mission type (want=%u got=%u)" % (mission_type, m.mission_type)) if m.count != expected_count: raise NotAchievedException("Bad count received (want=%u got=%u)" % (expected_count, m.count)) def get_mission_item_int_on_link(self, item, mav, target_system, target_component, mission_type): mav.mav.mission_request_int_send(target_system, target_component, item, mission_type) m = mav.recv_match(type='MISSION_ITEM_INT', blocking=True, timeout=1) if m is None: raise NotAchievedException("Did not receive mission item int") if m.mission_type != mission_type: raise NotAchievedException("Mission item of incorrect type") if m.target_system != mav.mav.srcSystem: raise NotAchievedException("Unexpected target system %u want=%u" % (m.target_system, mav.mav.srcSystem)) if m.seq != item: raise NotAchievedException("Incorrect sequence number on received item got=%u want=%u" % (m.seq, item)) if m.mission_type != mission_type: raise NotAchievedException("Mission type incorrect on received item (want=%u got=%u)" % (mission_type, m.mission_type)) if m.target_component != mav.mav.srcComponent: raise NotAchievedException("Unexpected target component %u want=%u" % (m.target_component, mav.mav.srcComponent)) return m def get_mission_item_on_link(self, item, mav, target_system, target_component, mission_type): mav.mav.mission_request_send(target_system, target_component, item, mission_type) m = mav.recv_match(type='MISSION_ITEM', blocking=True, timeout=1) if m is None: raise NotAchievedException("Did not receive mission item int") if m.target_system != mav.mav.srcSystem: raise NotAchievedException("Unexpected target system %u want=%u" % (m.target_system, mav.mav.srcSystem)) if m.seq != item: raise NotAchievedException("Incorrect sequence number on received item_int got=%u want=%u" % (m.seq, item)) if m.mission_type != mission_type: raise NotAchievedException("Mission type incorrect on received item_int (want=%u got=%u)" % (mission_type, m.mission_type)) if m.target_component != mav.mav.srcComponent: raise NotAchievedException("Unexpected target component %u want=%u" % (m.target_component, mav.mav.srcComponent)) return m def clear_mission(self, mission_type, target_system=1, target_component=1): self.mav.mav.mission_count_send(target_system, target_component, 0, mission_type) m = self.mav.recv_match(type='MISSION_ACK', blocking=True, timeout=5) if m is None: raise NotAchievedException("Expected ACK for clearing mission") if m.target_system != self.mav.mav.srcSystem: raise NotAchievedException("ACK not targetted at correct system want=%u got=%u" % (self.mav.mav.srcSystem, m.target_system)) if m.target_component != self.mav.mav.srcComponent: raise NotAchievedException("ACK not targetted at correct component want=%u got=%u" % (self.mav.mav.srcComponent, m.target_component)) if m.type != mavutil.mavlink.MAV_MISSION_ACCEPTED: raise NotAchievedException("Expected MAV_MISSION_ACCEPTED got %s" % (mavutil.mavlink.enums["MAV_MISSION_RESULT"][m.type].name,)) def assert_receive_mission_item_request(self, mission_type, seq): self.progress("Expecting request for item %u" % seq) m = self.mav.recv_match(type='MISSION_REQUEST', blocking=True, timeout=1) if m is None: raise NotAchievedException("Did not get item request") if m.mission_type != mission_type: raise NotAchievedException("Incorrect mission type (wanted=%u got=%u)" % (mission_type, m.mission_type)) if m.seq != seq: raise NotAchievedException("Unexpected sequence number (want=%u got=%u)" % (seq, m.seq)) self.progress("Received item request OK") def assert_receive_mission_ack(self, mission_type, want_type=mavutil.mavlink.MAV_MISSION_ACCEPTED, target_system=None, target_component=None, mav=None): if mav is None: mav = self.mav if target_system is None: target_system = mav.mav.srcSystem if target_component is None: target_component = mav.mav.srcComponent self.progress("Expecting mission ack") m = mav.recv_match(type='MISSION_ACK', blocking=True, timeout=5) self.progress("Received ACK (%s)" % str(m)) if m is None: raise NotAchievedException("Expected mission ACK") if m.target_system != target_system: raise NotAchievedException("ACK not targetted at correct system want=%u got=%u" % (mav.mav.srcSystem, m.target_system)) if m.target_component != target_component: raise NotAchievedException("ACK not targetted at correct component") if m.mission_type != mission_type: raise NotAchievedException("Unexpected mission type %u want=%u" % (m.mission_type, mission_type)) if m.type != want_type: raise NotAchievedException("Expected ack type got %u got %u" % (want_type, m.type)) def test_gcs_rally(self): target_system = 1 target_component = 1 self.mavproxy.send('rally clear\n') self.delay_sim_time(1) if self.get_parameter("RALLY_TOTAL") != 0: raise NotAchievedException("Failed to clear rally points") old_srcSystem = self.mav.mav.srcSystem # stop MAVProxy poking the autopilot: self.mavproxy.send('module unload rally\n') self.mavproxy.expect("Unloaded module rally") self.mavproxy.send('module unload wp\n') self.mavproxy.expect("Unloaded module wp") try: item1_lat = int(2.0000 *1e7) items = [ self.mav.mav.mission_item_int_encode( target_system, target_component, 0, # seq mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, mavutil.mavlink.MAV_CMD_NAV_RALLY_POINT, 0, # current 0, # autocontinue 0, # p1 0, # p2 0, # p3 0, # p4 int(1.0000 *1e7), # latitude int(1.0000 *1e7), # longitude 31.0000, # altitude mavutil.mavlink.MAV_MISSION_TYPE_RALLY), self.mav.mav.mission_item_int_encode( target_system, target_component, 1, # seq mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, mavutil.mavlink.MAV_CMD_NAV_RALLY_POINT, 0, # current 0, # autocontinue 0, # p1 0, # p2 0, # p3 0, # p4 item1_lat, # latitude int(2.0000 *1e7), # longitude 32.0000, # altitude mavutil.mavlink.MAV_MISSION_TYPE_RALLY), self.mav.mav.mission_item_int_encode( target_system, target_component, 2, # seq mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, mavutil.mavlink.MAV_CMD_NAV_RALLY_POINT, 0, # current 0, # autocontinue 0, # p1 0, # p2 0, # p3 0, # p4 int(3.0000 *1e7), # latitude int(3.0000 *1e7), # longitude 33.0000, # altitude mavutil.mavlink.MAV_MISSION_TYPE_RALLY), ] self.upload_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, items) downloaded = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY) print("Got items (%s)" % str(items)) if len(downloaded) != len(items): raise NotAchievedException("Did not download correct number of items want=%u got=%u" % (len(downloaded), len(items))) rally_total = self.get_parameter("RALLY_TOTAL") if rally_total != len(downloaded): raise NotAchievedException("Unexpected rally point count: want=%u got=%u" % (len(items), rally_total)) self.progress("Pruning count by setting parameter (urgh)") self.set_parameter("RALLY_TOTAL", 2) downloaded = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY) if len(downloaded) != 2: raise NotAchievedException("Failed to prune rally points by setting parameter. want=%u got=%u" % (2, len(downloaded))) self.progress("Uploading a third item using old protocol") new_item2_lat = int(6.0 *1e7) self.set_parameter("RALLY_TOTAL", 3) self.mav.mav.rally_point_send(target_system, target_component, 2, # sequence number 3, # total count new_item2_lat, int(7.0 *1e7), 15, 0, # "break" alt?! 0, # "land dir" 0) # flags downloaded = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY) if len(downloaded) != 3: raise NotAchievedException("resetting rally point count didn't change items returned") if downloaded[2].x != new_item2_lat: raise NotAchievedException("Bad lattitude in downloaded item: want=%u got=%u" % (new_item2_lat, downloaded[2].x)) self.progress("Grabbing original item 1 using original protocol") self.mav.mav.rally_fetch_point_send(target_system, target_component, 1) m = self.mav.recv_match(type="RALLY_POINT", blocking=True, timeout=1) if m.target_system != 255: raise NotAchievedException("Bad target_system on received rally point (want=%u got=%u)" % (255, m.target_system)) if m.target_component != 250: # autotest's component ID raise NotAchievedException("Bad target_component on received rally point") if m.lat != item1_lat: raise NotAchievedException("Bad latitude on received rally point") self.start_subtest("Test upload lockout and timeout") self.progress("Starting upload from normal sysid") self.mav.mav.mission_count_send(target_system, target_component, len(items), mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.drain_mav() # throw away requests for items self.mav.mav.srcSystem = 243 self.progress("Attempting upload from sysid=%u" % (self.mav.mav.srcSystem,)) self.mav.mav.mission_count_send(target_system, target_component, len(items), mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, want_type=mavutil.mavlink.MAV_MISSION_DENIED) self.progress("Attempting download from sysid=%u" % (self.mav.mav.srcSystem,)) self.mav.mav.mission_request_list_send(target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, want_type=mavutil.mavlink.MAV_MISSION_DENIED) # wait for the upload from sysid=1 to time out: self.mavproxy.expect("upload timeout") self.progress("Now trying to upload empty mission after timeout") self.mav.mav.mission_count_send(target_system, target_component, 0, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.drain_mav() self.start_subtest("Check rally upload/download across separate links") self.upload_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, items) self.progress("ensure a mavlink1 connection can't do anything useful with new item types") mav2 = mavutil.mavlink_connection("tcp:localhost:5763", robust_parsing=True, source_system = 7, source_component=7) mav2.mav.mission_request_list_send(target_system, target_component, mission_type=mavutil.mavlink.MAV_MISSION_TYPE_RALLY) # so this looks a bit odd; the other end isn't sending # mavlink2 so can't fill in the extension here. self.assert_receive_mission_ack( mavutil.mavlink.MAV_MISSION_TYPE_MISSION, want_type=mavutil.mavlink.MAV_MISSION_UNSUPPORTED, mav=mav2, ) self.set_parameter("SERIAL2_PROTOCOL", 2) expected_count = 3 self.progress("Assert mission count on new link") self.assert_mission_count_on_link(mav2, expected_count, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.progress("Assert mission count on original link") self.assert_mission_count_on_link(self.mav, expected_count, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.progress("Get first item on new link") m2 = self.get_mission_item_int_on_link(2, mav2, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.progress("Get first item on original link") m = self.get_mission_item_int_on_link(2, self.mav, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) if m2.x != m.x: raise NotAchievedException("mission items do not match (%d vs %d)" % (m2.x, m.x)) self.get_mission_item_on_link(2, self.mav, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) # ensure we get nacks for bad mission item requests: self.mav.mav.mission_request_send(target_system, target_component, 65, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_ack( mavutil.mavlink.MAV_MISSION_TYPE_RALLY, want_type=mavutil.mavlink.MAV_MISSION_INVALID_SEQUENCE, ) self.mav.mav.mission_request_int_send(target_system, target_component, 65, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_ack( mavutil.mavlink.MAV_MISSION_TYPE_RALLY, want_type=mavutil.mavlink.MAV_MISSION_INVALID_SEQUENCE, ) self.start_subtest("Should enforce items come from correct GCS") self.mav.mav.mission_count_send(target_system, target_component, 1, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 0) self.progress("Attempting to upload from bad sysid") old_sysid = self.mav.mav.srcSystem self.mav.mav.srcSystem = 17 items[0].pack(self.mav.mav) self.mav.mav.send(items[0]) self.mav.mav.srcSystem = old_sysid self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, want_type=mavutil.mavlink.MAV_MISSION_DENIED, target_system=17) self.progress("Sending from correct sysid") items[0].pack(self.mav.mav) self.mav.mav.send(items[0]) self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.drain_mav() self.drain_all_pexpects() self.start_subtest("Attempt to send item on different link to that which we are sending requests on") self.progress("Sending count") self.mav.mav.mission_count_send(target_system, target_component, 2, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 0) old_mav2_system = mav2.mav.srcSystem old_mav2_component = mav2.mav.srcComponent mav2.mav.srcSystem = self.mav.mav.srcSystem mav2.mav.srcComponent = self.mav.mav.srcComponent self.progress("Sending item on second link") # note that the routing table in ArduPilot now will say # this sysid/compid is on both links which may cause # weirdness... items[0].pack(mav2.mav) mav2.mav.send(items[0]) mav2.mav.srcSystem = old_mav2_system mav2.mav.srcComponent = old_mav2_component # we continue to receive requests on the original link: m = self.mav.recv_match(type='MISSION_REQUEST', blocking=True, timeout=1) if m is None: raise NotAchievedException("Did not get mission request") if m.mission_type != mavutil.mavlink.MAV_MISSION_TYPE_RALLY: raise NotAchievedException("Mission request of incorrect type") if m.seq != 1: raise NotAchievedException("Unexpected sequence number (expected=%u got=%u)" % (1, m.seq)) items[1].pack(self.mav.mav) self.mav.mav.send(items[1]) self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.drain_mav() self.drain_all_pexpects() self.start_subtest("Upload mission and rally points at same time") self.progress("Sending rally count") self.mav.mav.mission_count_send(target_system, target_component, 3, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 0) self.progress("Sending wp count") self.mav.mav.mission_count_send(target_system, target_component, 3, mavutil.mavlink.MAV_MISSION_TYPE_MISSION) self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_MISSION, 0) self.progress("Answering request for mission item 0") self.mav.mav.mission_item_int_send( target_system, target_component, 0, # seq mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 0, # current 0, # autocontinue 0, # p1 0, # p2 0, # p3 0, # p4 int(1.1000 *1e7), # latitude int(1.2000 *1e7), # longitude 321.0000, # altitude mavutil.mavlink.MAV_MISSION_TYPE_MISSION), self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_MISSION, 1) self.progress("Answering request for rally point 0") items[0].pack(self.mav.mav) self.mav.mav.send(items[0]) self.progress("Expecting request for rally item 1") self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 1) self.progress("Answering request for rally point 1") items[1].pack(self.mav.mav) self.mav.mav.send(items[1]) self.progress("Expecting request for rally item 2") self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 2) self.progress("Answering request for rally point 2") items[2].pack(self.mav.mav) self.mav.mav.send(items[2]) self.progress("Expecting mission ack for rally") self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.progress("Answering request for waypoints item 1") self.mav.mav.mission_item_int_send( target_system, target_component, 1, # seq mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 0, # current 0, # autocontinue 0, # p1 0, # p2 0, # p3 0, # p4 int(1.1000 *1e7), # latitude int(1.2000 *1e7), # longitude 321.0000, # altitude mavutil.mavlink.MAV_MISSION_TYPE_MISSION), self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_MISSION, 2) self.progress("Answering request for waypoints item 2") self.mav.mav.mission_item_int_send( target_system, target_component, 2, # seq mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 0, # current 0, # autocontinue 0, # p1 0, # p2 0, # p3 0, # p4 int(1.1000 *1e7), # latitude int(1.2000 *1e7), # longitude 321.0000, # altitude mavutil.mavlink.MAV_MISSION_TYPE_MISSION), self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_MISSION) self.start_subtest("Test write-partial-list") self.progress("Clearing rally points using count-send") self.clear_mission(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, target_system=target_system, target_component=target_component) self.progress("Should not be able to set items completely past the waypoint count") self.upload_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, items) self.mav.mav.mission_write_partial_list_send( target_system, target_component, 17, 20, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, want_type=mavutil.mavlink.MAV_MISSION_ERROR) self.progress("Should not be able to set items overlapping the waypoint count") self.mav.mav.mission_write_partial_list_send( target_system, target_component, 0, 20, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, want_type=mavutil.mavlink.MAV_MISSION_ERROR) self.progress("try to overwrite items 1 and 2") self.mav.mav.mission_write_partial_list_send( target_system, target_component, 1, 2, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 1) self.progress("Try shoving up an incorrectly sequenced item") self.mav.mav.mission_item_int_send( target_system, target_component, 0, # seq mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, mavutil.mavlink.MAV_CMD_NAV_RALLY_POINT, 0, # current 0, # autocontinue 0, # p1 0, # p2 0, # p3 0, # p4 int(1.1000 *1e7), # latitude int(1.2000 *1e7), # longitude 321.0000, # altitude mavutil.mavlink.MAV_MISSION_TYPE_RALLY), self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, want_type=mavutil.mavlink.MAV_MISSION_INVALID_SEQUENCE) self.progress("Try shoving up an incorrectly sequenced item (but within band)") self.mav.mav.mission_item_int_send( target_system, target_component, 2, # seq mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, mavutil.mavlink.MAV_CMD_NAV_RALLY_POINT, 0, # current 0, # autocontinue 0, # p1 0, # p2 0, # p3 0, # p4 int(1.1000 *1e7), # latitude int(1.2000 *1e7), # longitude 321.0000, # altitude mavutil.mavlink.MAV_MISSION_TYPE_RALLY), self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, want_type=mavutil.mavlink.MAV_MISSION_INVALID_SEQUENCE) self.progress("Now provide correct item") item1_latitude = int(1.2345*1e7) self.mav.mav.mission_item_int_send( target_system, target_component, 1, # seq mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, mavutil.mavlink.MAV_CMD_NAV_RALLY_POINT, 0, # current 0, # autocontinue 0, # p1 0, # p2 0, # p3 0, # p4 item1_latitude, # latitude int(1.2000 *1e7), # longitude 321.0000, # altitude mavutil.mavlink.MAV_MISSION_TYPE_RALLY), self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 2) self.progress("Answering request for rally point 2") items[2].pack(self.mav.mav) self.mav.mav.send(items[2]) self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.progress("TODO: ensure partial mission write was good") self.start_subtest("clear mission types") self.assert_mission_count_on_link(self.mav, 3, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_mission_count_on_link(self.mav, 3, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_MISSION) self.mav.mav.mission_clear_all_send(target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_mission_count_on_link(self.mav, 0, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_mission_count_on_link(self.mav, 3, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_MISSION) self.mav.mav.mission_clear_all_send(target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_MISSION) self.assert_mission_count_on_link(self.mav, 0, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_mission_count_on_link(self.mav, 0, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_MISSION) self.start_subtest("try sending out-of-range counts") self.mav.mav.mission_count_send(target_system, target_component, 1, 230) self.assert_receive_mission_ack(230, want_type=mavutil.mavlink.MAV_MISSION_UNSUPPORTED) self.mav.mav.mission_count_send(target_system, target_component, 16000, mavutil.mavlink.MAV_MISSION_TYPE_RALLY) self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, want_type=mavutil.mavlink.MAV_MISSION_NO_SPACE) except Exception as e: self.progress("Received exception (%s)" % self.get_exception_stacktrace(e)) self.mav.mav.srcSystem = old_srcSystem raise e self.mavproxy.send('module load rally\n') self.mavproxy.expect("Loaded module rally") self.mavproxy.send('module load wp\n') self.mavproxy.expect("Loaded module wp") def wait_distance_to_home(self, distance, accuracy=5): tstart = self.get_sim_time() timeout = 30 while True: if self.get_sim_time_cached() - tstart > timeout: raise AutoTestTimeoutException("Failed to get home") self.mav.recv_match(type='VFR_HUD', blocking=True) self.progress("Dist from home: %.02f" % self.distance_to_home(use_cached_home=True)) if abs(distance-self.distance_to_home(use_cached_home=True)) <= accuracy: break def drive_smartrtl(self): self.change_mode("STEERING") self.wait_ready_to_arm() self.arm_vehicle() # drive two sides of a square, make sure we don't go back through # the middle of the square self.progress("Driving North") self.reach_heading_manual(0) self.set_rc(3, 2000) self.delay_sim_time(5) self.set_rc(3, 1000) self.wait_groundspeed(0, 1) loc = self.mav.location() self.progress("Driving East") self.set_rc(3, 2000) self.reach_heading_manual(90) self.set_rc(3, 2000) self.delay_sim_time(5) self.set_rc(3, 1000) self.progress("Entering smartrtl") self.change_mode("SMART_RTL") self.progress("Ensure we go via intermediate point") self.wait_distance_to_location(loc, 0, 5) self.progress("Ensure we get home") self.wait_distance_to_home(5, accuracy=2) self.disarm_vehicle() def test_motor_test(self): '''AKA run-rover-run''' magic_throttle_value = 1812 self.run_cmd(mavutil.mavlink.MAV_CMD_DO_MOTOR_TEST, 1, # p1 - motor instance mavutil.mavlink.MOTOR_TEST_THROTTLE_PWM, # p2 - throttle type magic_throttle_value, # p3 - throttle 5, # p4 - timeout 1, # p5 - motor count 0, # p6 - test order (see MOTOR_TEST_ORDER) 0, # p7 ) self.mav.motors_armed_wait() self.progress("Waiting for magic throttle value") self.wait_servo_channel_value(3, magic_throttle_value) self.wait_servo_channel_value(3, self.get_parameter("RC3_TRIM", 5), timeout=10) self.mav.motors_disarmed_wait() def tests(self): '''return list of all tests''' ret = super(AutoTestRover, self).tests() ret.extend([ ("MAVProxy_SetModeUsingSwitch", "Set modes via mavproxy switch", self.test_setting_modes_via_mavproxy_switch), ("MAVProxy_SetModeUsingMode", "Set modes via mavproxy mode command", self.test_setting_modes_via_mavproxy_mode_command), ("ModeSwitch", "Set modes via modeswitch", self.test_setting_modes_via_modeswitch), ("AuxModeSwitch", "Set modes via auxswitches", self.test_setting_modes_via_auxswitches), ("DriveRTL", "Drive an RTL Mission", self.drive_rtl_mission), ("SmartRTL", "Test SmartRTL", self.drive_smartrtl), ("DriveSquare", "Learn/Drive Square with Ch7 option", self.drive_square), ("DriveMission", "Drive Mission %s" % "rover1.txt", lambda: self.drive_mission("rover1.txt")), # disabled due to frequent failures in travis. This test needs re-writing # ("Drive Brake", self.drive_brake), ("GetBanner", "Get Banner", self.do_get_banner), ("GetCapabilities", "Get Capabilities", self.do_get_autopilot_capabilities), ("DO_SET_MODE", "Set mode via MAV_COMMAND_DO_SET_MODE", self.test_do_set_mode_via_command_long), ("MAVProxy_DO_SET_MODE", "Set mode via MAV_COMMAND_DO_SET_MODE with MAVProxy", self.test_mavproxy_do_set_mode_via_command_long), ("ServoRelayEvents", "Test ServoRelayEvents", self.test_servorelayevents), ("RCOverrides", "Test RC overrides", self.test_rc_overrides), ("RCOverridesCancel", "Test RC overrides Cancel", self.test_rc_override_cancel), ("MANUAL_CONTROL", "Test mavlink MANUAL_CONTROL", self.test_manual_control), ("Sprayer", "Test Sprayer", self.test_sprayer), ("AC_Avoidance", "Test AC Avoidance switch", self.drive_fence_ac_avoidance), ("CameraMission", "Test Camera Mission Items", self.test_camera_mission_items), # Gripper test ("Gripper", "Test gripper", self.test_gripper), ("GripperMission", "Test Gripper Mission Items", self.test_gripper_mission), ("SET_MESSAGE_INTERVAL", "Test MAV_CMD_SET_MESSAGE_INTERVAL", self.test_set_message_interval), ("REQUEST_MESSAGE", "Test MAV_CMD_REQUEST_MESSAGE", self.test_request_message), ("SYSID_ENFORCE", "Test enforcement of SYSID_MYGCS", self.test_sysid_enforce), ("Button", "Test Buttons", self.test_button), ("Rally", "Test Rally Points", self.test_rally_points), ("Offboard", "Test Offboard Control", self.test_offboard), ("GCSFence", "Upload and download of fence", self.test_gcs_fence), ("GCSRally", "Upload and download of rally", self.test_gcs_rally), ("MotorTest", "Motor Test triggered via mavlink", self.test_motor_test), ("DataFlashOverMAVLink", "Test DataFlash over MAVLink", self.test_dataflash_over_mavlink), ("DataFlashSITL", "Test DataFlash SITL backend", self.test_dataflash_sitl), ("DownLoadLogs", "Download logs", lambda: self.log_download( self.buildlogs_path("APMrover2-log.bin"), upload_logs=len(self.fail_list) > 0)), ]) return ret def rc_defaults(self): ret = super(AutoTestRover, self).rc_defaults() ret[3] = 1500 ret[8] = 1800 return ret def default_mode(self): return 'MANUAL'