Tools: use methods to determine port numbers

This commit is contained in:
Peter Barker 2023-02-16 16:37:55 +11:00 committed by Peter Barker
parent d7bcfd757b
commit b90166ab97
3 changed files with 67 additions and 30 deletions

View File

@ -1653,13 +1653,27 @@ class AutoTest(ABC):
"""Allow subclasses to override SITL streamrate.""" """Allow subclasses to override SITL streamrate."""
return 10 return 10
def adjust_ardupilot_port(self, port):
'''adjust port in case we do not wish to use the default range (5760 and 5501 etc)'''
return port
def spare_network_port(self, offset=0):
'''returns a network port which should be able to be bound'''
if offset > 2:
raise ValueError("offset too large")
return 8000 + offset
def autotest_connection_string_to_ardupilot(self): def autotest_connection_string_to_ardupilot(self):
return "tcp:127.0.0.1:5760" return "tcp:127.0.0.1:%u" % self.adjust_ardupilot_port(5760)
def sitl_rcin_port(self, offset=0):
if offset > 2:
raise ValueError("offset too large")
return 5501 + offset
def mavproxy_options(self): def mavproxy_options(self):
"""Returns options to be passed to MAVProxy.""" """Returns options to be passed to MAVProxy."""
ret = [ ret = [
'--sitl=127.0.0.1:5502',
'--streamrate=%u' % self.sitl_streamrate(), '--streamrate=%u' % self.sitl_streamrate(),
'--target-system=%u' % self.sysid_thismav(), '--target-system=%u' % self.sysid_thismav(),
'--target-component=1', '--target-component=1',
@ -3466,7 +3480,7 @@ class AutoTest(ABC):
self.reboot_sitl() self.reboot_sitl()
mav2 = mavutil.mavlink_connection( mav2 = mavutil.mavlink_connection(
"tcp:localhost:5763", "tcp:localhost:%u" % self.adjust_ardupilot_port(5763),
robust_parsing=True, robust_parsing=True,
source_system=7, source_system=7,
source_component=7, source_component=7,
@ -4871,7 +4885,7 @@ class AutoTest(ABC):
def rc_thread_main(self): def rc_thread_main(self):
chan16 = [1000] * 16 chan16 = [1000] * 16
sitl_output = mavutil.mavudp("127.0.0.1:5501", input=False) sitl_output = mavutil.mavudp("127.0.0.1:%u" % self.sitl_rcin_port(), input=False)
buf = None buf = None
while True: while True:
@ -8017,7 +8031,7 @@ Also, ignores heartbeats not from our target system'''
def defaults_filepath(self): def defaults_filepath(self):
return None return None
def start_mavproxy(self): def start_mavproxy(self, sitl_rcin_port=None):
self.start_mavproxy_count += 1 self.start_mavproxy_count += 1
if self.mavproxy is not None: if self.mavproxy is not None:
return self.mavproxy return self.mavproxy
@ -8029,11 +8043,17 @@ Also, ignores heartbeats not from our target system'''
if self.valgrind or self.callgrind: if self.valgrind or self.callgrind:
pexpect_timeout *= 10 pexpect_timeout *= 10
if sitl_rcin_port is None:
sitl_rcin_port = self.sitl_rcin_port()
mavproxy = util.start_MAVProxy_SITL( mavproxy = util.start_MAVProxy_SITL(
self.vehicleinfo_key(), self.vehicleinfo_key(),
master='tcp:127.0.0.1:%u' % self.adjust_ardupilot_port(5762),
logfile=self.mavproxy_logfile, logfile=self.mavproxy_logfile,
options=self.mavproxy_options(), options=self.mavproxy_options(),
pexpect_timeout=pexpect_timeout) pexpect_timeout=pexpect_timeout,
sitl_rcin_port=sitl_rcin_port,
)
mavproxy.expect(r'Telemetry log: (\S+)\r\n') mavproxy.expect(r'Telemetry log: (\S+)\r\n')
self.logfile = mavproxy.match.group(1) self.logfile = mavproxy.match.group(1)
self.progress("LOGFILE %s" % self.logfile) self.progress("LOGFILE %s" % self.logfile)
@ -11455,9 +11475,10 @@ switch value'''
'''Test AHRS NMEA Output can be read by out NMEA GPS''' '''Test AHRS NMEA Output can be read by out NMEA GPS'''
self.set_parameter("SERIAL5_PROTOCOL", 20) # serial5 is NMEA output self.set_parameter("SERIAL5_PROTOCOL", 20) # serial5 is NMEA output
self.set_parameter("GPS_TYPE2", 5) # GPS2 is NMEA self.set_parameter("GPS_TYPE2", 5) # GPS2 is NMEA
port = self.spare_network_port()
self.customise_SITL_commandline([ self.customise_SITL_commandline([
"--uartE=tcp:6735", # GPS2 is NMEA.... "--uartE=tcp:%u" % port, # GPS2 is NMEA....
"--uartF=tcpclient:127.0.0.1:6735", # serial5 spews to localhost:6735 "--uartF=tcpclient:127.0.0.1:%u" % port, # serial5 spews to localhost port
]) ])
self.do_timesync_roundtrip() self.do_timesync_roundtrip()
self.wait_gps_fix_type_gte(3) self.wait_gps_fix_type_gte(3)
@ -12066,10 +12087,11 @@ switch value'''
"RPM1_TYPE": 10, # enable RPM output "RPM1_TYPE": 10, # enable RPM output
"TERRAIN_ENABLE": 0, "TERRAIN_ENABLE": 0,
}) })
port = self.spare_network_port()
self.customise_SITL_commandline([ self.customise_SITL_commandline([
"--uartF=tcp:6735" # serial5 spews to localhost:6735 "--uartF=tcp:%u" % port # serial5 spews to localhost port
]) ])
frsky = FRSkyPassThrough(("127.0.0.1", 6735), frsky = FRSkyPassThrough(("127.0.0.1", port),
get_time=self.get_sim_time_cached) get_time=self.get_sim_time_cached)
# waiting until we are ready to arm should ensure our wanted # waiting until we are ready to arm should ensure our wanted
@ -12160,10 +12182,11 @@ switch value'''
"SERIAL5_PROTOCOL": 10, # serial5 is FRSky passthrough "SERIAL5_PROTOCOL": 10, # serial5 is FRSky passthrough
"RPM1_TYPE": 10, # enable RPM output "RPM1_TYPE": 10, # enable RPM output
}) })
port = self.spare_network_port()
self.customise_SITL_commandline([ self.customise_SITL_commandline([
"--uartF=tcp:6735" # serial5 spews to localhost:6735 "--uartF=tcp:%u" % port # serial5 spews to localhost port
]) ])
frsky = FRSkyPassThrough(("127.0.0.1", 6735), frsky = FRSkyPassThrough(("127.0.0.1", port),
get_time=self.get_sim_time_cached) get_time=self.get_sim_time_cached)
self.wait_ready_to_arm() self.wait_ready_to_arm()
@ -12314,10 +12337,11 @@ switch value'''
def FRSkyMAVlite(self): def FRSkyMAVlite(self):
'''Test FrSky MAVlite serial output''' '''Test FrSky MAVlite serial output'''
self.set_parameter("SERIAL5_PROTOCOL", 10) # serial5 is FRSky passthrough self.set_parameter("SERIAL5_PROTOCOL", 10) # serial5 is FRSky passthrough
port = self.spare_network_port()
self.customise_SITL_commandline([ self.customise_SITL_commandline([
"--uartF=tcp:6735" # serial5 spews to localhost:6735 "--uartF=tcp:%u" % port # serial5 spews to localhost port
]) ])
frsky = FRSkyPassThrough(("127.0.0.1", 6735)) frsky = FRSkyPassThrough(("127.0.0.1", port))
frsky.connect() frsky.connect()
sport_to_mavlite = SPortToMAVlite() sport_to_mavlite = SPortToMAVlite()
@ -12588,10 +12612,11 @@ switch value'''
'''Test FrSky SPort mode''' '''Test FrSky SPort mode'''
self.set_parameter("SERIAL5_PROTOCOL", 4) # serial5 is FRSky sport self.set_parameter("SERIAL5_PROTOCOL", 4) # serial5 is FRSky sport
self.set_parameter("RPM1_TYPE", 10) # enable SITL RPM sensor self.set_parameter("RPM1_TYPE", 10) # enable SITL RPM sensor
port = self.spare_network_port()
self.customise_SITL_commandline([ self.customise_SITL_commandline([
"--uartF=tcp:6735" # serial5 spews to localhost:6735 "--uartF=tcp:%u" % port # serial5 spews to localhost port
]) ])
frsky = FRSkySPort(("127.0.0.1", 6735), verbose=True) frsky = FRSkySPort(("127.0.0.1", port), verbose=True)
self.wait_ready_to_arm() self.wait_ready_to_arm()
# we need to start the engine to get some RPM readings, we do it for plane only # we need to start the engine to get some RPM readings, we do it for plane only
@ -12660,10 +12685,11 @@ switch value'''
def FRSkyD(self): def FRSkyD(self):
'''Test FrSkyD serial output''' '''Test FrSkyD serial output'''
self.set_parameter("SERIAL5_PROTOCOL", 3) # serial5 is FRSky output self.set_parameter("SERIAL5_PROTOCOL", 3) # serial5 is FRSky output
port = self.spare_network_port()
self.customise_SITL_commandline([ self.customise_SITL_commandline([
"--uartF=tcp:6735" # serial5 spews to localhost:6735 "--uartF=tcp:%u" % port # serial5 spews to localhost port
]) ])
frsky = FRSkyD(("127.0.0.1", 6735)) frsky = FRSkyD(("127.0.0.1", port))
self.wait_ready_to_arm() self.wait_ready_to_arm()
m = self.assert_receive_message('GLOBAL_POSITION_INT', timeout=1) m = self.assert_receive_message('GLOBAL_POSITION_INT', timeout=1)
gpi_abs_alt = int((m.alt+500) / 1000) # mm -> m gpi_abs_alt = int((m.alt+500) / 1000) # mm -> m
@ -12782,10 +12808,11 @@ switch value'''
def LTM(self): def LTM(self):
'''Test LTM serial output''' '''Test LTM serial output'''
self.set_parameter("SERIAL5_PROTOCOL", 25) # serial5 is LTM output self.set_parameter("SERIAL5_PROTOCOL", 25) # serial5 is LTM output
port = self.spare_network_port()
self.customise_SITL_commandline([ self.customise_SITL_commandline([
"--uartF=tcp:6735" # serial5 spews to localhost:6735 "--uartF=tcp:%u" % port # serial5 spews to localhost port
]) ])
ltm = LTM(("127.0.0.1", 6735)) ltm = LTM(("127.0.0.1", port))
self.wait_ready_to_arm() self.wait_ready_to_arm()
wants = { wants = {
@ -12824,10 +12851,11 @@ switch value'''
'''Test DEVO serial output''' '''Test DEVO serial output'''
self.context_push() self.context_push()
self.set_parameter("SERIAL5_PROTOCOL", 17) # serial5 is DEVO output self.set_parameter("SERIAL5_PROTOCOL", 17) # serial5 is DEVO output
port = self.spare_network_port()
self.customise_SITL_commandline([ self.customise_SITL_commandline([
"--uartF=tcp:6735" # serial5 spews to localhost:6735 "--uartF=tcp:%u" % port # serial5 spews to localhost port
]) ])
devo = DEVO(("127.0.0.1", 6735)) devo = DEVO(("127.0.0.1", port))
self.wait_ready_to_arm() self.wait_ready_to_arm()
m = self.assert_receive_message('GLOBAL_POSITION_INT', timeout=1) m = self.assert_receive_message('GLOBAL_POSITION_INT', timeout=1)
@ -12896,10 +12924,11 @@ switch value'''
'''Test MSP DJI serial output''' '''Test MSP DJI serial output'''
self.set_parameter("SERIAL5_PROTOCOL", 33) # serial5 is MSP DJI output self.set_parameter("SERIAL5_PROTOCOL", 33) # serial5 is MSP DJI output
self.set_parameter("MSP_OPTIONS", 1) # telemetry (unpolled) mode self.set_parameter("MSP_OPTIONS", 1) # telemetry (unpolled) mode
port = self.spare_network_port()
self.customise_SITL_commandline([ self.customise_SITL_commandline([
"--uartF=tcp:6735" # serial5 spews to localhost:6735 "--uartF=tcp:%u" % port # serial5 spews to localhost port
]) ])
msp = MSP_DJI(("127.0.0.1", 6735)) msp = MSP_DJI(("127.0.0.1", port))
self.wait_ready_to_arm() self.wait_ready_to_arm()
tstart = self.get_sim_time() tstart = self.get_sim_time()
@ -12923,10 +12952,11 @@ switch value'''
ex = None ex = None
try: try:
self.set_parameter("SERIAL5_PROTOCOL", 23) # serial5 is RCIN input self.set_parameter("SERIAL5_PROTOCOL", 23) # serial5 is RCIN input
port = self.spare_network_port()
self.customise_SITL_commandline([ self.customise_SITL_commandline([
"--uartF=tcp:6735" # serial5 reads from to localhost:6735 "--uartF=tcp:%u" % port # serial5 reads from to localhost port
]) ])
crsf = CRSF(("127.0.0.1", 6735)) crsf = CRSF(("127.0.0.1", port))
crsf.connect() crsf.connect()
self.progress("Writing vtx_frame") self.progress("Writing vtx_frame")

View File

@ -600,11 +600,15 @@ def MAVProxy_version():
def start_MAVProxy_SITL(atype, def start_MAVProxy_SITL(atype,
aircraft=None, aircraft=None,
setup=False, setup=False,
master='tcp:127.0.0.1:5762', master=None,
options=[], options=[],
sitl_rcin_port=5501,
pexpect_timeout=60, pexpect_timeout=60,
logfile=sys.stdout): logfile=sys.stdout):
"""Launch mavproxy connected to a SITL instance.""" """Launch mavproxy connected to a SITL instance."""
if master is None:
raise ValueError("Expected a master")
local_mp_modules_dir = os.path.abspath( local_mp_modules_dir = os.path.abspath(
os.path.join(__file__, '..', '..', '..', 'mavproxy_modules')) os.path.join(__file__, '..', '..', '..', 'mavproxy_modules'))
env = dict(os.environ) env = dict(os.environ)
@ -617,6 +621,7 @@ def start_MAVProxy_SITL(atype,
cmd = [] cmd = []
cmd.append(mavproxy_cmd()) cmd.append(mavproxy_cmd())
cmd.extend(['--master', master]) cmd.extend(['--master', master])
cmd.extend(['--sitl', "localhost:%u" % sitl_rcin_port])
if setup: if setup:
cmd.append('--setup') cmd.append('--setup')
if aircraft is None: if aircraft is None:

View File

@ -624,8 +624,9 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
def MAVProxy_SetModeUsingSwitch(self): def MAVProxy_SetModeUsingSwitch(self):
"""Set modes via mavproxy switch""" """Set modes via mavproxy switch"""
port = self.sitl_rcin_port(offset=1)
self.customise_SITL_commandline([ self.customise_SITL_commandline([
"--rc-in-port", "5502", "--rc-in-port", str(port),
]) ])
ex = None ex = None
try: try:
@ -637,7 +638,7 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
(4, 'AUTO'), (4, 'AUTO'),
(5, 'AUTO'), # non-existant mode, should stay in RTL (5, 'AUTO'), # non-existant mode, should stay in RTL
(6, 'MANUAL')] (6, 'MANUAL')]
mavproxy = self.start_mavproxy() mavproxy = self.start_mavproxy(sitl_rcin_port=port)
for (num, expected) in fnoo: for (num, expected) in fnoo:
mavproxy.send('switch %u\n' % num) mavproxy.send('switch %u\n' % num)
self.wait_mode(expected) self.wait_mode(expected)
@ -6181,7 +6182,8 @@ Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
def PrivateChannel(self): def PrivateChannel(self):
'''test the serial option bit specifying a mavlink channel as private''' '''test the serial option bit specifying a mavlink channel as private'''
global mav2 global mav2
mav2 = mavutil.mavlink_connection("tcp:localhost:5763", port = self.adjust_ardupilot_port(5763)
mav2 = mavutil.mavlink_connection("tcp:localhost:%u" % port,
robust_parsing=True, robust_parsing=True,
source_system=7, source_system=7,
source_component=7) source_component=7)