autotest: set and get parameters directly using mavlink message

This commit is contained in:
Peter Barker 2020-06-30 22:54:49 +10:00 committed by Peter Barker
parent ba91cbb270
commit b7641345e1
3 changed files with 123 additions and 19 deletions

View File

@ -447,7 +447,6 @@ class AutoTestCopter(AutoTest):
try:
self.set_parameter("PLND_ENABLED", 1)
self.fetch_parameters()
self.set_parameter("PLND_TYPE", 4)
self.set_analog_rangefinder_parameters()
@ -2524,7 +2523,6 @@ class AutoTestCopter(AutoTest):
ex = None
try:
self.set_parameter("PLND_ENABLED", 1)
self.fetch_parameters()
self.set_parameter("PLND_TYPE", 4)
self.set_analog_rangefinder_parameters()
@ -4430,7 +4428,6 @@ class AutoTestCopter(AutoTest):
ex = None
try:
self.set_parameter("PLND_ENABLED", 1)
self.fetch_parameters()
# enable companion backend:
self.set_parameter("PLND_TYPE", 1)

View File

@ -23,6 +23,7 @@ from MAVProxy.modules.lib import mp_util
from pymavlink import mavwp, mavutil, DFReader
from pymavlink import mavextra
from pymavlink import mavparm
from pysim import util, vehicleinfo
@ -904,6 +905,40 @@ class AutoTest(ABC):
return self.log_name()
def repeatedly_apply_parameter_file(self, filepath):
if False:
return self.repeatedly_apply_parameter_file_mavproxy(filepath)
parameters = mavparm.MAVParmDict()
correct_parameters = set()
parameters.load(filepath)
failfetch = None
for i in range(10):
self.progress("Apply parameter file (%s) pass %u" % (filepath, i+1,))
success = True
for p in parameters.keys():
if p in correct_parameters:
continue
try:
current = self.get_parameter(p, verbose=False)
except Exception as e:
# may still be hidden
self.progress("get_parameter(%s) failed" % p)
failfetch = p
success = False
continue
delta = current - parameters[p]
self.progress("%s: want=%f got=%f delta=%f" %
(p, parameters[p], current, delta))
if abs(delta) > 0.00001:
success = False
self.set_parameter(p, parameters[p], verbose=False)
continue
correct_parameters.add(p)
if success:
self.progress("Applied parameter file (%s)" % (filepath))
return
raise NotAchievedException("Failed to load parameter file; last failfetch was %s" % failfetch)
def repeatedly_apply_parameter_file_mavproxy(self, filepath):
'''keep applying a parameter file until no parameters changed'''
for i in range(0, 3):
self.mavproxy.send("param load %s\n" % filepath)
@ -938,7 +973,8 @@ class AutoTest(ABC):
self.set_parameter("EK3_ENABLE", 1)
self.set_parameter("AHRS_EKF_TYPE", self.force_ahrs_type)
self.reboot_sitl()
self.fetch_parameters()
if False: # FIXME: do do this if using MAVProxy:
self.fetch_parameters()
def count_lines_in_filepath(self, filepath):
return len([i for i in open(filepath)])
@ -984,11 +1020,7 @@ class AutoTest(ABC):
self.mavproxy.send("param fetch\n")
self.mavproxy.expect("Received [0-9]+ parameters")
def reboot_sitl_mav(self, required_bootcount=None):
"""Reboot SITL instance using mavlink and wait for it to reconnect."""
old_bootcount = self.get_parameter('STAT_BOOTCNT')
# ardupilot SITL may actually NAK the reboot; replace with
# run_cmd when we don't do that.
def send_reboot_command(self):
self.mav.mav.command_long_send(self.sysid_thismav(),
1,
mavutil.mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN,
@ -1000,6 +1032,13 @@ class AutoTest(ABC):
0,
0,
0)
def reboot_sitl_mav(self, required_bootcount=None):
"""Reboot SITL instance using mavlink and wait for it to reconnect."""
old_bootcount = self.get_parameter('STAT_BOOTCNT')
# ardupilot SITL may actually NAK the reboot; replace with
# run_cmd when we don't do that.
self.send_reboot_command()
self.detect_and_handle_reboot(old_bootcount, required_bootcount=required_bootcount)
def send_cmd_enter_cpu_lockup(self):
@ -1036,7 +1075,7 @@ class AutoTest(ABC):
if time.time() - tstart > timeout:
raise AutoTestTimeoutException("Did not detect reboot")
try:
current_bootcount = self.get_parameter('STAT_BOOTCNT', timeout=1)
current_bootcount = self.get_parameter('STAT_BOOTCNT', timeout=1, attempts=3)
print("current=%s required=%u" % (str(current_bootcount), required_bootcount))
if current_bootcount == required_bootcount:
break
@ -2736,7 +2775,12 @@ class AutoTest(ABC):
tstart = self.get_sim_time()
self.send_cmd_enter_cpu_lockup()
self.wait_disarmed(timeout=5, tstart=tstart)
# we're not getting SYSTEM_TIME messages at this point.... and
# we're in a weird state where the vehicle is armed but the
# motors are not, and we can't disarm further because Copter
# looks at whether its *motors* are armed as part of its
# disarm process.
self.stop_SITL()
def cpufailsafe_wait_servo_channel_value(self, channel, value, timeout=30):
'''we get restricted messages while doing cpufailsafe, this working then'''
@ -2893,6 +2937,7 @@ class AutoTest(ABC):
@staticmethod
def should_fetch_all_for_parameter_change(param_name):
return False # FIXME: if we allow MAVProxy then allow this
if fnmatch.fnmatch(param_name, "*_ENABLE") or fnmatch.fnmatch(param_name, "*_ENABLED"):
return True
if param_name in ["ARSPD_TYPE",
@ -2906,13 +2951,31 @@ class AutoTest(ABC):
return True
return False
def set_parameter(self, name, value, add_to_context=True, epsilon=0.0002, retries=10):
def send_set_parameter_direct(self, name, value):
self.mav.mav.param_set_send(self.sysid_thismav(),
1,
name.encode('ascii'),
value,
mavutil.mavlink.MAV_PARAM_TYPE_REAL32)
def send_set_parameter_mavproxy(self, name, value):
self.mavproxy.send("param set %s %s\n" % (name, str(value)))
def send_set_parameter(self, name, value, verbose=False):
if verbose:
self.progress("Send set param for (%s) (%f)" % (name, value))
if False:
return self.send_set_parameter_mavproxy(name, value)
return self.send_set_parameter_direct(name, value)
def set_parameter(self, name, value, add_to_context=True, epsilon=0.0002, retries=10, verbose=True):
"""Set parameters from vehicle."""
self.progress("Setting %s to %f" % (name, value))
if verbose:
self.progress("Setting %s to %f" % (name, value))
old_value = self.get_parameter(name, attempts=2)
for i in range(1, retries+2):
self.mavproxy.send("param set %s %s\n" % (name, str(value)))
returned_value = self.get_parameter(name)
self.send_set_parameter(name, value)
returned_value = self.get_parameter(name, verbose=verbose)
delta = float(value) - returned_value
if abs(delta) < epsilon:
# yes, near-enough-to-equal.
@ -2926,7 +2989,46 @@ class AutoTest(ABC):
raise ValueError("Param fetch returned incorrect value for (%s): (%s) vs (%s)"
% (name, returned_value, value))
def get_parameter(self, name, attempts=1, timeout=60):
def get_parameter(self, *args, **kwargs):
return self.get_parameter_direct(*args, **kwargs)
def get_parameter_direct(self, name, attempts=1, timeout=60, verbose=True):
while attempts > 0:
if verbose:
self.progress("Sending param_request_read for (%s)" % name)
self.drain_mav_unparsed(quiet=True)
tstart = self.get_sim_time()
encname = name
if sys.version_info.major >= 3 and type(encname) != bytes:
encname = bytes(encname, 'ascii')
self.mav.mav.param_request_read_send(self.sysid_thismav(),
1,
encname,
-1)
while True:
now = self.get_sim_time_cached()
if tstart > now:
self.progress("Time wrap detected")
# we're going to have to send another request...
break
delta_time = now - tstart
if delta_time > timeout:
break
m = self.mav.recv_match(type='PARAM_VALUE', blocking=True, timeout=0.1)
if verbose:
self.progress("get_parameter(%s): %s" % (name, str(m), ))
if m is None:
continue
if m.param_id == name:
if delta_time > 5:
self.progress("Long time to get parameter: %fs" % (delta_time,))
return m.param_value
if verbose:
self.progress("(%s) != (%s)" % (m.param_id, name,))
attempts -= 1
raise NotAchievedException("Failed to retrieve parameter (%s)" % name)
def get_parameter_mavproxy(self, name, attempts=1, timeout=60):
"""Get parameters from vehicle."""
for i in range(0, attempts):
# we call get_parameter while the vehicle is rebooting.
@ -5729,7 +5831,6 @@ Also, ignores heartbeats not from our target system'''
def test_gripper(self):
self.context_push()
self.set_parameter("GRIP_ENABLE", 1)
self.fetch_parameters()
self.set_parameter("GRIP_GRAB", 2000)
self.set_parameter("GRIP_RELEASE", 1000)
self.set_parameter("GRIP_TYPE", 1)
@ -6151,6 +6252,10 @@ switch value'''
if self.get_parameter("MIS_OPTIONS") != 1:
raise NotAchievedException("Failed to set MIS_OPTIONS")
from_mavproxy = self.get_parameter_mavproxy("MIS_OPTIONS")
if from_mavproxy != 1:
raise NotAchievedException("MAVProxy failed to get parameter")
def test_parameter_documentation(self):
'''ensure parameter documentation is valid'''
self.start_subsubtest("Check all parameters are documented")
@ -6370,7 +6475,10 @@ switch value'''
# wait for messages to stop coming:
self.drain_mav_seconds(15)
self.set_parameter("SIM_PIN_MASK", 0)
# NOTE: SIM_PIN_MASK is *magic*. You might set it to zero,
# but it *will* end up as 1<<btn immediately afterwards. Thus
# not attempting to fetch the value back here:
self.send_set_parameter("SIM_PIN_MASK", 0)
while True:
m3 = self.mav.recv_match(type='BUTTON_CHANGE', blocking=True, timeout=1)
self.progress("m3: %s" % str(m3))

View File

@ -253,7 +253,6 @@ class AutoTestRover(AutoTest):
self.set_parameter("SERVO%u_MAX" % spinner_ch, spinner_ch_max)
self.set_parameter("SIM_SPR_ENABLE", 1)
self.fetch_parameters()
self.set_parameter("SIM_SPR_PUMP", pump_ch)
self.set_parameter("SIM_SPR_SPIN", spinner_ch)