bpo-40275: Avoid importing socket in test.support (GH-19603)

* Move socket related functions from test.support to socket_helper.
* Import socket, nntplib and urllib.error lazily in transient_internet().
* Remove importing multiprocess.
This commit is contained in:
Serhiy Storchaka 2020-04-25 10:06:29 +03:00 committed by GitHub
parent 3c8a5b459d
commit 16994912c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 472 additions and 429 deletions

View File

@ -348,11 +348,6 @@ The :mod:`test.support` module defines the following constants:
:data:`SHORT_TIMEOUT`. :data:`SHORT_TIMEOUT`.
.. data:: IPV6_ENABLED
Set to ``True`` if IPV6 is enabled on this host, ``False`` otherwise.
.. data:: SAVEDCWD .. data:: SAVEDCWD
Set to :func:`os.getcwd`. Set to :func:`os.getcwd`.
@ -901,12 +896,6 @@ The :mod:`test.support` module defines the following functions:
A decorator for running tests that require support for xattr. A decorator for running tests that require support for xattr.
.. decorator:: skip_unless_bind_unix_socket
A decorator for running tests that require a functional bind() for Unix
sockets.
.. decorator:: anticipate_failure(condition) .. decorator:: anticipate_failure(condition)
A decorator to conditionally mark tests with A decorator to conditionally mark tests with
@ -1157,31 +1146,6 @@ The :mod:`test.support` module defines the following functions:
is raised. is raised.
.. function:: bind_port(sock, host=HOST)
Bind the socket to a free port and return the port number. Relies on
ephemeral ports in order to ensure we are using an unbound port. This is
important as many tests may be running simultaneously, especially in a
buildbot environment. This method raises an exception if the
``sock.family`` is :const:`~socket.AF_INET` and ``sock.type`` is
:const:`~socket.SOCK_STREAM`, and the socket has
:const:`~socket.SO_REUSEADDR` or :const:`~socket.SO_REUSEPORT` set on it.
Tests should never set these socket options for TCP/IP sockets.
The only case for setting these options is testing multicasting via
multiple UDP sockets.
Additionally, if the :const:`~socket.SO_EXCLUSIVEADDRUSE` socket option is
available (i.e. on Windows), it will be set on the socket. This will
prevent anyone else from binding to our host/port for the duration of the
test.
.. function:: bind_unix_socket(sock, addr)
Bind a unix socket, raising :exc:`unittest.SkipTest` if
:exc:`PermissionError` is raised.
.. function:: catch_threading_exception() .. function:: catch_threading_exception()
Context manager catching :class:`threading.Thread` exception using Context manager catching :class:`threading.Thread` exception using
@ -1243,29 +1207,6 @@ The :mod:`test.support` module defines the following functions:
.. versionadded:: 3.8 .. versionadded:: 3.8
.. function:: find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM)
Returns an unused port that should be suitable for binding. This is
achieved by creating a temporary socket with the same family and type as
the ``sock`` parameter (default is :const:`~socket.AF_INET`,
:const:`~socket.SOCK_STREAM`),
and binding it to the specified host address (defaults to ``0.0.0.0``)
with the port set to 0, eliciting an unused ephemeral port from the OS.
The temporary socket is then closed and deleted, and the ephemeral port is
returned.
Either this method or :func:`bind_port` should be used for any tests
where a server socket needs to be bound to a particular port for the
duration of the test.
Which one to use depends on whether the calling code is creating a Python
socket, or if an unused port needs to be provided in a constructor
or passed to an external program (i.e. the ``-accept`` argument to
openssl's s_server mode). Always prefer :func:`bind_port` over
:func:`find_unused_port` where possible. Using a hard coded port is
discouraged since it can make multiple instances of the test impossible to
run simultaneously, which is a problem for buildbots.
.. function:: load_package_tests(pkg_dir, loader, standard_tests, pattern) .. function:: load_package_tests(pkg_dir, loader, standard_tests, pattern)
Generic implementation of the :mod:`unittest` ``load_tests`` protocol for Generic implementation of the :mod:`unittest` ``load_tests`` protocol for
@ -1481,6 +1422,77 @@ The :mod:`test.support` module defines the following classes:
it will be raised in :meth:`!__fspath__`. it will be raised in :meth:`!__fspath__`.
:mod:`test.support.socket_helper` --- Utilities for socket tests
================================================================
.. module:: test.support.socket_helper
:synopsis: Support for socket tests.
The :mod:`test.support.socket_helper` module provides support for socket tests.
.. versionadded:: 3.9
.. data:: IPV6_ENABLED
Set to ``True`` if IPv6 is enabled on this host, ``False`` otherwise.
.. function:: find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM)
Returns an unused port that should be suitable for binding. This is
achieved by creating a temporary socket with the same family and type as
the ``sock`` parameter (default is :const:`~socket.AF_INET`,
:const:`~socket.SOCK_STREAM`),
and binding it to the specified host address (defaults to ``0.0.0.0``)
with the port set to 0, eliciting an unused ephemeral port from the OS.
The temporary socket is then closed and deleted, and the ephemeral port is
returned.
Either this method or :func:`bind_port` should be used for any tests
where a server socket needs to be bound to a particular port for the
duration of the test.
Which one to use depends on whether the calling code is creating a Python
socket, or if an unused port needs to be provided in a constructor
or passed to an external program (i.e. the ``-accept`` argument to
openssl's s_server mode). Always prefer :func:`bind_port` over
:func:`find_unused_port` where possible. Using a hard coded port is
discouraged since it can make multiple instances of the test impossible to
run simultaneously, which is a problem for buildbots.
.. function:: bind_port(sock, host=HOST)
Bind the socket to a free port and return the port number. Relies on
ephemeral ports in order to ensure we are using an unbound port. This is
important as many tests may be running simultaneously, especially in a
buildbot environment. This method raises an exception if the
``sock.family`` is :const:`~socket.AF_INET` and ``sock.type`` is
:const:`~socket.SOCK_STREAM`, and the socket has
:const:`~socket.SO_REUSEADDR` or :const:`~socket.SO_REUSEPORT` set on it.
Tests should never set these socket options for TCP/IP sockets.
The only case for setting these options is testing multicasting via
multiple UDP sockets.
Additionally, if the :const:`~socket.SO_EXCLUSIVEADDRUSE` socket option is
available (i.e. on Windows), it will be set on the socket. This will
prevent anyone else from binding to our host/port for the duration of the
test.
.. function:: bind_unix_socket(sock, addr)
Bind a unix socket, raising :exc:`unittest.SkipTest` if
:exc:`PermissionError` is raised.
.. decorator:: skip_unless_bind_unix_socket
A decorator for running tests that require a functional ``bind()`` for Unix
sockets.
:mod:`test.support.script_helper` --- Utilities for the Python execution tests :mod:`test.support.script_helper` --- Utilities for the Python execution tests
============================================================================== ==============================================================================

View File

@ -26,6 +26,7 @@ import warnings
import test.support import test.support
import test.support.script_helper import test.support.script_helper
from test import support from test import support
from test.support import socket_helper
# Skip tests if _multiprocessing wasn't built. # Skip tests if _multiprocessing wasn't built.
@ -2928,7 +2929,7 @@ class _TestRemoteManager(BaseTestCase):
authkey = os.urandom(32) authkey = os.urandom(32)
manager = QueueManager( manager = QueueManager(
address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER
) )
manager.start() manager.start()
self.addCleanup(manager.shutdown) self.addCleanup(manager.shutdown)
@ -2965,7 +2966,7 @@ class _TestManagerRestart(BaseTestCase):
def test_rapid_restart(self): def test_rapid_restart(self):
authkey = os.urandom(32) authkey = os.urandom(32)
manager = QueueManager( manager = QueueManager(
address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER) address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER)
try: try:
srvr = manager.get_server() srvr = manager.get_server()
addr = srvr.address addr = srvr.address
@ -3455,7 +3456,7 @@ class _TestPicklingConnections(BaseTestCase):
new_conn.close() new_conn.close()
l.close() l.close()
l = socket.create_server((test.support.HOST, 0)) l = socket.create_server((socket_helper.HOST, 0))
conn.send(l.getsockname()) conn.send(l.getsockname())
new_conn, addr = l.accept() new_conn, addr = l.accept()
conn.send(new_conn) conn.send(new_conn)
@ -4593,7 +4594,7 @@ class TestWait(unittest.TestCase):
def test_wait_socket(self, slow=False): def test_wait_socket(self, slow=False):
from multiprocessing.connection import wait from multiprocessing.connection import wait
l = socket.create_server((test.support.HOST, 0)) l = socket.create_server((socket_helper.HOST, 0))
addr = l.getsockname() addr = l.getsockname()
readers = [] readers = []
procs = [] procs = []

View File

@ -22,6 +22,7 @@ import time
import unittest import unittest
from test import support from test import support
from test.support import socket_helper
@contextlib.contextmanager @contextlib.contextmanager
def kill_on_error(proc): def kill_on_error(proc):
@ -283,14 +284,14 @@ class SocketEINTRTest(EINTRBaseTest):
self._test_send(lambda sock, data: sock.sendmsg([data])) self._test_send(lambda sock, data: sock.sendmsg([data]))
def test_accept(self): def test_accept(self):
sock = socket.create_server((support.HOST, 0)) sock = socket.create_server((socket_helper.HOST, 0))
self.addCleanup(sock.close) self.addCleanup(sock.close)
port = sock.getsockname()[1] port = sock.getsockname()[1]
code = '\n'.join(( code = '\n'.join((
'import socket, time', 'import socket, time',
'', '',
'host = %r' % support.HOST, 'host = %r' % socket_helper.HOST,
'port = %s' % port, 'port = %s' % port,
'sleep_time = %r' % self.sleep_time, 'sleep_time = %r' % self.sleep_time,
'', '',

View File

@ -9,10 +9,11 @@ from http.server import (HTTPServer as _HTTPServer,
SimpleHTTPRequestHandler, BaseHTTPRequestHandler) SimpleHTTPRequestHandler, BaseHTTPRequestHandler)
from test import support from test import support
from test.support import socket_helper
here = os.path.dirname(__file__) here = os.path.dirname(__file__)
HOST = support.HOST HOST = socket_helper.HOST
CERTFILE = os.path.join(here, 'keycert.pem') CERTFILE = os.path.join(here, 'keycert.pem')
# This one's based on HTTPServer, which is based on socketserver # This one's based on HTTPServer, which is based on socketserver

View File

@ -16,12 +16,10 @@ import importlib
import importlib.util import importlib.util
import locale import locale
import logging.handlers import logging.handlers
import nntplib
import os import os
import platform import platform
import re import re
import shutil import shutil
import socket
import stat import stat
import struct import struct
import subprocess import subprocess
@ -33,16 +31,10 @@ import threading
import time import time
import types import types
import unittest import unittest
import urllib.error
import warnings import warnings
from .testresult import get_test_runner from .testresult import get_test_runner
try:
import multiprocessing.process
except ImportError:
multiprocessing = None
try: try:
import zlib import zlib
except ImportError: except ImportError:
@ -98,14 +90,13 @@ __all__ = [
"bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute", "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
"requires_IEEE_754", "skip_unless_xattr", "requires_zlib", "requires_IEEE_754", "skip_unless_xattr", "requires_zlib",
"anticipate_failure", "load_package_tests", "detect_api_mismatch", "anticipate_failure", "load_package_tests", "detect_api_mismatch",
"check__all__", "skip_unless_bind_unix_socket", "skip_if_buggy_ucrt_strfptime", "check__all__", "skip_if_buggy_ucrt_strfptime",
"ignore_warnings", "ignore_warnings",
# sys # sys
"is_jython", "is_android", "check_impl_detail", "unix_shell", "is_jython", "is_android", "check_impl_detail", "unix_shell",
"setswitchinterval", "setswitchinterval",
# network # network
"HOST", "IPV6_ENABLED", "find_unused_port", "bind_port", "open_urlresource", "open_urlresource",
"bind_unix_socket",
# processes # processes
'temp_umask', "reap_children", 'temp_umask', "reap_children",
# logging # logging
@ -727,135 +718,6 @@ def requires_hashdigest(digestname, openssl=None, usedforsecurity=True):
return decorator return decorator
HOST = "localhost"
HOSTv4 = "127.0.0.1"
HOSTv6 = "::1"
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
"""Returns an unused port that should be suitable for binding. This is
achieved by creating a temporary socket with the same family and type as
the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
the specified host address (defaults to 0.0.0.0) with the port set to 0,
eliciting an unused ephemeral port from the OS. The temporary socket is
then closed and deleted, and the ephemeral port is returned.
Either this method or bind_port() should be used for any tests where a
server socket needs to be bound to a particular port for the duration of
the test. Which one to use depends on whether the calling code is creating
a python socket, or if an unused port needs to be provided in a constructor
or passed to an external program (i.e. the -accept argument to openssl's
s_server mode). Always prefer bind_port() over find_unused_port() where
possible. Hard coded ports should *NEVER* be used. As soon as a server
socket is bound to a hard coded port, the ability to run multiple instances
of the test simultaneously on the same host is compromised, which makes the
test a ticking time bomb in a buildbot environment. On Unix buildbots, this
may simply manifest as a failed test, which can be recovered from without
intervention in most cases, but on Windows, the entire python process can
completely and utterly wedge, requiring someone to log in to the buildbot
and manually kill the affected process.
(This is easy to reproduce on Windows, unfortunately, and can be traced to
the SO_REUSEADDR socket option having different semantics on Windows versus
Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
listen and then accept connections on identical host/ports. An EADDRINUSE
OSError will be raised at some point (depending on the platform and
the order bind and listen were called on each socket).
However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
will ever be raised when attempting to bind two identical host/ports. When
accept() is called on each socket, the second caller's process will steal
the port from the first caller, leaving them both in an awkwardly wedged
state where they'll no longer respond to any signals or graceful kills, and
must be forcibly killed via OpenProcess()/TerminateProcess().
The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
instead of SO_REUSEADDR, which effectively affords the same semantics as
SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open
Source world compared to Windows ones, this is a common mistake. A quick
look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
openssl.exe is called with the 's_server' option, for example. See
http://bugs.python.org/issue2550 for more info. The following site also
has a very thorough description about the implications of both REUSEADDR
and EXCLUSIVEADDRUSE on Windows:
http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
XXX: although this approach is a vast improvement on previous attempts to
elicit unused ports, it rests heavily on the assumption that the ephemeral
port returned to us by the OS won't immediately be dished back out to some
other process when we close and delete our temporary socket but before our
calling code has a chance to bind the returned port. We can deal with this
issue if/when we come across it.
"""
with socket.socket(family, socktype) as tempsock:
port = bind_port(tempsock)
del tempsock
return port
def bind_port(sock, host=HOST):
"""Bind the socket to a free port and return the port number. Relies on
ephemeral ports in order to ensure we are using an unbound port. This is
important as many tests may be running simultaneously, especially in a
buildbot environment. This method raises an exception if the sock.family
is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
or SO_REUSEPORT set on it. Tests should *never* set these socket options
for TCP/IP sockets. The only case for setting these options is testing
multicasting via multiple UDP sockets.
Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
on Windows), it will be set on the socket. This will prevent anyone else
from bind()'ing to our host/port for the duration of the test.
"""
if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
if hasattr(socket, 'SO_REUSEADDR'):
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
raise TestFailed("tests should never set the SO_REUSEADDR " \
"socket option on TCP/IP sockets!")
if hasattr(socket, 'SO_REUSEPORT'):
try:
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
raise TestFailed("tests should never set the SO_REUSEPORT " \
"socket option on TCP/IP sockets!")
except OSError:
# Python's socket module was compiled using modern headers
# thus defining SO_REUSEPORT but this process is running
# under an older kernel that does not support SO_REUSEPORT.
pass
if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
sock.bind((host, 0))
port = sock.getsockname()[1]
return port
def bind_unix_socket(sock, addr):
"""Bind a unix socket, raising SkipTest if PermissionError is raised."""
assert sock.family == socket.AF_UNIX
try:
sock.bind(addr)
except PermissionError:
sock.close()
raise unittest.SkipTest('cannot bind AF_UNIX sockets')
def _is_ipv6_enabled():
"""Check whether IPv6 is enabled on this host."""
if socket.has_ipv6:
sock = None
try:
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.bind((HOSTv6, 0))
return True
except OSError:
pass
finally:
if sock:
sock.close()
return False
IPV6_ENABLED = _is_ipv6_enabled()
def system_must_validate_cert(f): def system_must_validate_cert(f):
"""Skip the test on TLS certificate validation failures.""" """Skip the test on TLS certificate validation failures."""
@functools.wraps(f) @functools.wraps(f)
@ -1563,31 +1425,13 @@ socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET) ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
def get_socket_conn_refused_errs():
"""
Get the different socket error numbers ('errno') which can be received
when a connection is refused.
"""
errors = [errno.ECONNREFUSED]
if hasattr(errno, 'ENETUNREACH'):
# On Solaris, ENETUNREACH is returned sometimes instead of ECONNREFUSED
errors.append(errno.ENETUNREACH)
if hasattr(errno, 'EADDRNOTAVAIL'):
# bpo-31910: socket.create_connection() fails randomly
# with EADDRNOTAVAIL on Travis CI
errors.append(errno.EADDRNOTAVAIL)
if hasattr(errno, 'EHOSTUNREACH'):
# bpo-37583: The destination host cannot be reached
errors.append(errno.EHOSTUNREACH)
if not IPV6_ENABLED:
errors.append(errno.EAFNOSUPPORT)
return errors
@contextlib.contextmanager @contextlib.contextmanager
def transient_internet(resource_name, *, timeout=_NOT_SET, errnos=()): def transient_internet(resource_name, *, timeout=_NOT_SET, errnos=()):
"""Return a context manager that raises ResourceDenied when various issues """Return a context manager that raises ResourceDenied when various issues
with the Internet connection manifest themselves as exceptions.""" with the Internet connection manifest themselves as exceptions."""
import socket
import nntplib
import urllib.error
if timeout is _NOT_SET: if timeout is _NOT_SET:
timeout = INTERNET_TIMEOUT timeout = INTERNET_TIMEOUT
@ -2754,28 +2598,6 @@ def skip_if_pgo_task(test):
msg = "Not run for (non-extended) PGO task" msg = "Not run for (non-extended) PGO task"
return test if ok else unittest.skip(msg)(test) return test if ok else unittest.skip(msg)(test)
_bind_nix_socket_error = None
def skip_unless_bind_unix_socket(test):
"""Decorator for tests requiring a functional bind() for unix sockets."""
if not hasattr(socket, 'AF_UNIX'):
return unittest.skip('No UNIX Sockets')(test)
global _bind_nix_socket_error
if _bind_nix_socket_error is None:
path = TESTFN + "can_bind_unix_socket"
with socket.socket(socket.AF_UNIX) as sock:
try:
sock.bind(path)
_bind_nix_socket_error = False
except OSError as e:
_bind_nix_socket_error = e
finally:
unlink(path)
if _bind_nix_socket_error:
msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error
return unittest.skip(msg)(test)
else:
return test
def fs_is_case_insensitive(directory): def fs_is_case_insensitive(directory):
"""Detects if the file system for the specified directory is case-insensitive.""" """Detects if the file system for the specified directory is case-insensitive."""

View File

@ -0,0 +1,177 @@
import errno
import socket
import unittest
HOST = "localhost"
HOSTv4 = "127.0.0.1"
HOSTv6 = "::1"
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
"""Returns an unused port that should be suitable for binding. This is
achieved by creating a temporary socket with the same family and type as
the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
the specified host address (defaults to 0.0.0.0) with the port set to 0,
eliciting an unused ephemeral port from the OS. The temporary socket is
then closed and deleted, and the ephemeral port is returned.
Either this method or bind_port() should be used for any tests where a
server socket needs to be bound to a particular port for the duration of
the test. Which one to use depends on whether the calling code is creating
a python socket, or if an unused port needs to be provided in a constructor
or passed to an external program (i.e. the -accept argument to openssl's
s_server mode). Always prefer bind_port() over find_unused_port() where
possible. Hard coded ports should *NEVER* be used. As soon as a server
socket is bound to a hard coded port, the ability to run multiple instances
of the test simultaneously on the same host is compromised, which makes the
test a ticking time bomb in a buildbot environment. On Unix buildbots, this
may simply manifest as a failed test, which can be recovered from without
intervention in most cases, but on Windows, the entire python process can
completely and utterly wedge, requiring someone to log in to the buildbot
and manually kill the affected process.
(This is easy to reproduce on Windows, unfortunately, and can be traced to
the SO_REUSEADDR socket option having different semantics on Windows versus
Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
listen and then accept connections on identical host/ports. An EADDRINUSE
OSError will be raised at some point (depending on the platform and
the order bind and listen were called on each socket).
However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
will ever be raised when attempting to bind two identical host/ports. When
accept() is called on each socket, the second caller's process will steal
the port from the first caller, leaving them both in an awkwardly wedged
state where they'll no longer respond to any signals or graceful kills, and
must be forcibly killed via OpenProcess()/TerminateProcess().
The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
instead of SO_REUSEADDR, which effectively affords the same semantics as
SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open
Source world compared to Windows ones, this is a common mistake. A quick
look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
openssl.exe is called with the 's_server' option, for example. See
http://bugs.python.org/issue2550 for more info. The following site also
has a very thorough description about the implications of both REUSEADDR
and EXCLUSIVEADDRUSE on Windows:
http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
XXX: although this approach is a vast improvement on previous attempts to
elicit unused ports, it rests heavily on the assumption that the ephemeral
port returned to us by the OS won't immediately be dished back out to some
other process when we close and delete our temporary socket but before our
calling code has a chance to bind the returned port. We can deal with this
issue if/when we come across it.
"""
with socket.socket(family, socktype) as tempsock:
port = bind_port(tempsock)
del tempsock
return port
def bind_port(sock, host=HOST):
"""Bind the socket to a free port and return the port number. Relies on
ephemeral ports in order to ensure we are using an unbound port. This is
important as many tests may be running simultaneously, especially in a
buildbot environment. This method raises an exception if the sock.family
is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
or SO_REUSEPORT set on it. Tests should *never* set these socket options
for TCP/IP sockets. The only case for setting these options is testing
multicasting via multiple UDP sockets.
Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
on Windows), it will be set on the socket. This will prevent anyone else
from bind()'ing to our host/port for the duration of the test.
"""
if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
if hasattr(socket, 'SO_REUSEADDR'):
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
raise TestFailed("tests should never set the SO_REUSEADDR " \
"socket option on TCP/IP sockets!")
if hasattr(socket, 'SO_REUSEPORT'):
try:
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
raise TestFailed("tests should never set the SO_REUSEPORT " \
"socket option on TCP/IP sockets!")
except OSError:
# Python's socket module was compiled using modern headers
# thus defining SO_REUSEPORT but this process is running
# under an older kernel that does not support SO_REUSEPORT.
pass
if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
sock.bind((host, 0))
port = sock.getsockname()[1]
return port
def bind_unix_socket(sock, addr):
"""Bind a unix socket, raising SkipTest if PermissionError is raised."""
assert sock.family == socket.AF_UNIX
try:
sock.bind(addr)
except PermissionError:
sock.close()
raise unittest.SkipTest('cannot bind AF_UNIX sockets')
def _is_ipv6_enabled():
"""Check whether IPv6 is enabled on this host."""
if socket.has_ipv6:
sock = None
try:
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.bind((HOSTv6, 0))
return True
except OSError:
pass
finally:
if sock:
sock.close()
return False
IPV6_ENABLED = _is_ipv6_enabled()
_bind_nix_socket_error = None
def skip_unless_bind_unix_socket(test):
"""Decorator for tests requiring a functional bind() for unix sockets."""
if not hasattr(socket, 'AF_UNIX'):
return unittest.skip('No UNIX Sockets')(test)
global _bind_nix_socket_error
if _bind_nix_socket_error is None:
from test.support import TESTFN, unlink
path = TESTFN + "can_bind_unix_socket"
with socket.socket(socket.AF_UNIX) as sock:
try:
sock.bind(path)
_bind_nix_socket_error = False
except OSError as e:
_bind_nix_socket_error = e
finally:
unlink(path)
if _bind_nix_socket_error:
msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error
return unittest.skip(msg)(test)
else:
return test
def get_socket_conn_refused_errs():
"""
Get the different socket error numbers ('errno') which can be received
when a connection is refused.
"""
errors = [errno.ECONNREFUSED]
if hasattr(errno, 'ENETUNREACH'):
# On Solaris, ENETUNREACH is returned sometimes instead of ECONNREFUSED
errors.append(errno.ENETUNREACH)
if hasattr(errno, 'EADDRNOTAVAIL'):
# bpo-31910: socket.create_connection() fails randomly
# with EADDRNOTAVAIL on Travis CI
errors.append(errno.EADDRNOTAVAIL)
if hasattr(errno, 'EHOSTUNREACH'):
# bpo-37583: The destination host cannot be reached
errors.append(errno.EHOSTUNREACH)
if not IPV6_ENABLED:
errors.append(errno.EAFNOSUPPORT)
return errors

View File

@ -1,6 +1,7 @@
# test asynchat # test asynchat
from test import support from test import support
from test.support import socket_helper
import asynchat import asynchat
import asyncore import asyncore
@ -12,7 +13,7 @@ import time
import unittest import unittest
import unittest.mock import unittest.mock
HOST = support.HOST HOST = socket_helper.HOST
SERVER_QUIT = b'QUIT\n' SERVER_QUIT = b'QUIT\n'
@ -25,7 +26,7 @@ class echo_server(threading.Thread):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.event = event self.event = event
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.port = support.bind_port(self.sock) self.port = socket_helper.bind_port(self.sock)
# This will be set if the client wants us to wait before echoing # This will be set if the client wants us to wait before echoing
# data back. # data back.
self.start_resend_event = None self.start_resend_event = None

View File

@ -17,6 +17,7 @@ from asyncio import constants
from test.test_asyncio import utils as test_utils from test.test_asyncio import utils as test_utils
from test import support from test import support
from test.support.script_helper import assert_python_ok from test.support.script_helper import assert_python_ok
from test.support import socket_helper
MOCK_ANY = mock.ANY MOCK_ANY = mock.ANY
@ -91,7 +92,7 @@ class BaseEventTests(test_utils.TestCase):
self.assertIsNone( self.assertIsNone(
base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, 0, 0)) base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, 0, 0))
if support.IPV6_ENABLED: if socket_helper.IPV6_ENABLED:
# IPv4 address with family IPv6. # IPv4 address with family IPv6.
self.assertIsNone( self.assertIsNone(
base_events._ipaddr_info('1.2.3.4', 1, INET6, STREAM, TCP)) base_events._ipaddr_info('1.2.3.4', 1, INET6, STREAM, TCP))
@ -1156,7 +1157,7 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
srv.close() srv.close()
self.loop.run_until_complete(srv.wait_closed()) self.loop.run_until_complete(srv.wait_closed())
@unittest.skipUnless(support.IPV6_ENABLED, 'no IPv6 support') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support')
def test_create_server_ipv6(self): def test_create_server_ipv6(self):
async def main(): async def main():
with self.assertWarns(DeprecationWarning): with self.assertWarns(DeprecationWarning):
@ -1288,7 +1289,7 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
t.close() t.close()
test_utils.run_briefly(self.loop) # allow transport to close test_utils.run_briefly(self.loop) # allow transport to close
if support.IPV6_ENABLED: if socket_helper.IPV6_ENABLED:
sock.family = socket.AF_INET6 sock.family = socket.AF_INET6
coro = self.loop.create_connection(asyncio.Protocol, '::1', 80) coro = self.loop.create_connection(asyncio.Protocol, '::1', 80)
t, p = self.loop.run_until_complete(coro) t, p = self.loop.run_until_complete(coro)
@ -1307,7 +1308,7 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
t.close() t.close()
test_utils.run_briefly(self.loop) # allow transport to close test_utils.run_briefly(self.loop) # allow transport to close
@unittest.skipUnless(support.IPV6_ENABLED, 'no IPv6 support') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support')
@unittest.skipIf(sys.platform.startswith('aix'), @unittest.skipIf(sys.platform.startswith('aix'),
"bpo-25545: IPv6 scope id and getaddrinfo() behave differently on AIX") "bpo-25545: IPv6 scope id and getaddrinfo() behave differently on AIX")
@patch_socket @patch_socket
@ -1639,7 +1640,7 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
self.assertRaises( self.assertRaises(
OSError, self.loop.run_until_complete, coro) OSError, self.loop.run_until_complete, coro)
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
def test_create_datagram_endpoint_no_matching_family(self): def test_create_datagram_endpoint_no_matching_family(self):
coro = self.loop.create_datagram_endpoint( coro = self.loop.create_datagram_endpoint(
asyncio.DatagramProtocol, asyncio.DatagramProtocol,
@ -1700,7 +1701,7 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
self.loop.run_until_complete(protocol.done) self.loop.run_until_complete(protocol.done)
self.assertEqual('CLOSED', protocol.state) self.assertEqual('CLOSED', protocol.state)
@support.skip_unless_bind_unix_socket @socket_helper.skip_unless_bind_unix_socket
def test_create_datagram_endpoint_existing_sock_unix(self): def test_create_datagram_endpoint_existing_sock_unix(self):
with test_utils.unix_socket_path() as path: with test_utils.unix_socket_path() as path:
sock = socket.socket(socket.AF_UNIX, type=socket.SOCK_DGRAM) sock = socket.socket(socket.AF_UNIX, type=socket.SOCK_DGRAM)
@ -2015,7 +2016,7 @@ class BaseLoopSockSendfileTests(test_utils.TestCase):
sock = self.make_socket() sock = self.make_socket()
proto = self.MyProto(self.loop) proto = self.MyProto(self.loop)
server = self.run_loop(self.loop.create_server( server = self.run_loop(self.loop.create_server(
lambda: proto, support.HOST, 0, family=socket.AF_INET)) lambda: proto, socket_helper.HOST, 0, family=socket.AF_INET))
addr = server.sockets[0].getsockname() addr = server.sockets[0].getsockname()
for _ in range(10): for _ in range(10):

View File

@ -32,6 +32,7 @@ from asyncio import proactor_events
from asyncio import selector_events from asyncio import selector_events
from test.test_asyncio import utils as test_utils from test.test_asyncio import utils as test_utils
from test import support from test import support
from test.support import socket_helper
from test.support import ALWAYS_EQ, LARGEST, SMALLEST from test.support import ALWAYS_EQ, LARGEST, SMALLEST
@ -517,7 +518,7 @@ class EventLoopTestsMixin:
lambda: MyProto(loop=self.loop), *httpd.address) lambda: MyProto(loop=self.loop), *httpd.address)
self._basetest_create_connection(conn_fut) self._basetest_create_connection(conn_fut)
@support.skip_unless_bind_unix_socket @socket_helper.skip_unless_bind_unix_socket
def test_create_unix_connection(self): def test_create_unix_connection(self):
# Issue #20682: On Mac OS X Tiger, getsockname() returns a # Issue #20682: On Mac OS X Tiger, getsockname() returns a
# zero-length address for UNIX socket. # zero-length address for UNIX socket.
@ -619,7 +620,7 @@ class EventLoopTestsMixin:
self._test_create_ssl_connection(httpd, create_connection, self._test_create_ssl_connection(httpd, create_connection,
peername=httpd.address) peername=httpd.address)
@support.skip_unless_bind_unix_socket @socket_helper.skip_unless_bind_unix_socket
@unittest.skipIf(ssl is None, 'No ssl module') @unittest.skipIf(ssl is None, 'No ssl module')
def test_create_ssl_unix_connection(self): def test_create_ssl_unix_connection(self):
# Issue #20682: On Mac OS X Tiger, getsockname() returns a # Issue #20682: On Mac OS X Tiger, getsockname() returns a
@ -638,7 +639,7 @@ class EventLoopTestsMixin:
def test_create_connection_local_addr(self): def test_create_connection_local_addr(self):
with test_utils.run_test_server() as httpd: with test_utils.run_test_server() as httpd:
port = support.find_unused_port() port = socket_helper.find_unused_port()
f = self.loop.create_connection( f = self.loop.create_connection(
lambda: MyProto(loop=self.loop), lambda: MyProto(loop=self.loop),
*httpd.address, local_addr=(httpd.address[0], port)) *httpd.address, local_addr=(httpd.address[0], port))
@ -843,7 +844,7 @@ class EventLoopTestsMixin:
return server, path return server, path
@support.skip_unless_bind_unix_socket @socket_helper.skip_unless_bind_unix_socket
def test_create_unix_server(self): def test_create_unix_server(self):
proto = MyProto(loop=self.loop) proto = MyProto(loop=self.loop)
server, path = self._make_unix_server(lambda: proto) server, path = self._make_unix_server(lambda: proto)
@ -935,7 +936,7 @@ class EventLoopTestsMixin:
# stop serving # stop serving
server.close() server.close()
@support.skip_unless_bind_unix_socket @socket_helper.skip_unless_bind_unix_socket
@unittest.skipIf(ssl is None, 'No ssl module') @unittest.skipIf(ssl is None, 'No ssl module')
def test_create_unix_server_ssl(self): def test_create_unix_server_ssl(self):
proto = MyProto(loop=self.loop) proto = MyProto(loop=self.loop)
@ -995,7 +996,7 @@ class EventLoopTestsMixin:
self.assertIsNone(proto.transport) self.assertIsNone(proto.transport)
server.close() server.close()
@support.skip_unless_bind_unix_socket @socket_helper.skip_unless_bind_unix_socket
@unittest.skipIf(ssl is None, 'No ssl module') @unittest.skipIf(ssl is None, 'No ssl module')
def test_create_unix_server_ssl_verify_failed(self): def test_create_unix_server_ssl_verify_failed(self):
proto = MyProto(loop=self.loop) proto = MyProto(loop=self.loop)
@ -1055,7 +1056,7 @@ class EventLoopTestsMixin:
self.assertIsNone(proto.transport) self.assertIsNone(proto.transport)
server.close() server.close()
@support.skip_unless_bind_unix_socket @socket_helper.skip_unless_bind_unix_socket
@unittest.skipIf(ssl is None, 'No ssl module') @unittest.skipIf(ssl is None, 'No ssl module')
def test_create_unix_server_ssl_verified(self): def test_create_unix_server_ssl_verified(self):
proto = MyProto(loop=self.loop) proto = MyProto(loop=self.loop)
@ -1148,7 +1149,7 @@ class EventLoopTestsMixin:
server.close() server.close()
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
def test_create_server_dual_stack(self): def test_create_server_dual_stack(self):
f_proto = self.loop.create_future() f_proto = self.loop.create_future()
@ -1160,7 +1161,7 @@ class EventLoopTestsMixin:
try_count = 0 try_count = 0
while True: while True:
try: try:
port = support.find_unused_port() port = socket_helper.find_unused_port()
f = self.loop.create_server(TestMyProto, host=None, port=port) f = self.loop.create_server(TestMyProto, host=None, port=port)
server = self.loop.run_until_complete(f) server = self.loop.run_until_complete(f)
except OSError as ex: except OSError as ex:

View File

@ -13,6 +13,7 @@ from asyncio.proactor_events import _ProactorWritePipeTransport
from asyncio.proactor_events import _ProactorDuplexPipeTransport from asyncio.proactor_events import _ProactorDuplexPipeTransport
from asyncio.proactor_events import _ProactorDatagramTransport from asyncio.proactor_events import _ProactorDatagramTransport
from test import support from test import support
from test.support import socket_helper
from test.test_asyncio import utils as test_utils from test.test_asyncio import utils as test_utils
@ -950,7 +951,7 @@ class ProactorEventLoopUnixSockSendfileTests(test_utils.TestCase):
def prepare(self): def prepare(self):
sock = self.make_socket() sock = self.make_socket()
proto = self.MyProto(self.loop) proto = self.MyProto(self.loop)
port = support.find_unused_port() port = socket_helper.find_unused_port()
srv_sock = self.make_socket(cleanup=False) srv_sock = self.make_socket(cleanup=False)
srv_sock.bind(('127.0.0.1', port)) srv_sock.bind(('127.0.0.1', port))
server = self.run_loop(self.loop.create_server( server = self.run_loop(self.loop.create_server(

View File

@ -10,6 +10,7 @@ from asyncio import base_events
from asyncio import constants from asyncio import constants
from unittest import mock from unittest import mock
from test import support from test import support
from test.support import socket_helper
from test.test_asyncio import utils as test_utils from test.test_asyncio import utils as test_utils
try: try:
@ -163,9 +164,9 @@ class SockSendfileMixin(SendfileBase):
def prepare_socksendfile(self): def prepare_socksendfile(self):
proto = MyProto(self.loop) proto = MyProto(self.loop)
port = support.find_unused_port() port = socket_helper.find_unused_port()
srv_sock = self.make_socket(cleanup=False) srv_sock = self.make_socket(cleanup=False)
srv_sock.bind((support.HOST, port)) srv_sock.bind((socket_helper.HOST, port))
server = self.run_loop(self.loop.create_server( server = self.run_loop(self.loop.create_server(
lambda: proto, sock=srv_sock)) lambda: proto, sock=srv_sock))
self.reduce_receive_buffer_size(srv_sock) self.reduce_receive_buffer_size(srv_sock)
@ -240,7 +241,7 @@ class SendfileMixin(SendfileBase):
# Note: sendfile via SSL transport is equal to sendfile fallback # Note: sendfile via SSL transport is equal to sendfile fallback
def prepare_sendfile(self, *, is_ssl=False, close_after=0): def prepare_sendfile(self, *, is_ssl=False, close_after=0):
port = support.find_unused_port() port = socket_helper.find_unused_port()
srv_proto = MySendfileProto(loop=self.loop, srv_proto = MySendfileProto(loop=self.loop,
close_after=close_after) close_after=close_after)
if is_ssl: if is_ssl:
@ -252,17 +253,17 @@ class SendfileMixin(SendfileBase):
srv_ctx = None srv_ctx = None
cli_ctx = None cli_ctx = None
srv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) srv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv_sock.bind((support.HOST, port)) srv_sock.bind((socket_helper.HOST, port))
server = self.run_loop(self.loop.create_server( server = self.run_loop(self.loop.create_server(
lambda: srv_proto, sock=srv_sock, ssl=srv_ctx)) lambda: srv_proto, sock=srv_sock, ssl=srv_ctx))
self.reduce_receive_buffer_size(srv_sock) self.reduce_receive_buffer_size(srv_sock)
if is_ssl: if is_ssl:
server_hostname = support.HOST server_hostname = socket_helper.HOST
else: else:
server_hostname = None server_hostname = None
cli_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) cli_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
cli_sock.connect((support.HOST, port)) cli_sock.connect((socket_helper.HOST, port))
cli_proto = MySendfileProto(loop=self.loop) cli_proto = MySendfileProto(loop=self.loop)
tr, pr = self.run_loop(self.loop.create_connection( tr, pr = self.run_loop(self.loop.create_connection(

View File

@ -4,6 +4,7 @@ import threading
import unittest import unittest
from test import support from test import support
from test.support import socket_helper
from test.test_asyncio import utils as test_utils from test.test_asyncio import utils as test_utils
from test.test_asyncio import functional as func_tests from test.test_asyncio import functional as func_tests
@ -47,7 +48,7 @@ class BaseStartServer(func_tests.FunctionalTestCaseMixin):
with self.assertWarns(DeprecationWarning): with self.assertWarns(DeprecationWarning):
srv = self.loop.run_until_complete(asyncio.start_server( srv = self.loop.run_until_complete(asyncio.start_server(
serve, support.HOSTv4, 0, loop=self.loop, start_serving=False)) serve, socket_helper.HOSTv4, 0, loop=self.loop, start_serving=False))
self.assertFalse(srv.is_serving()) self.assertFalse(srv.is_serving())
@ -73,7 +74,7 @@ class SelectorStartServerTests(BaseStartServer, unittest.TestCase):
def new_loop(self): def new_loop(self):
return asyncio.SelectorEventLoop() return asyncio.SelectorEventLoop()
@support.skip_unless_bind_unix_socket @socket_helper.skip_unless_bind_unix_socket
def test_start_unix_server_1(self): def test_start_unix_server_1(self):
HELLO_MSG = b'1' * 1024 * 5 + b'\n' HELLO_MSG = b'1' * 1024 * 5 + b'\n'
started = threading.Event() started = threading.Event()

View File

@ -5,6 +5,7 @@ from asyncio import proactor_events
from itertools import cycle, islice from itertools import cycle, islice
from test.test_asyncio import utils as test_utils from test.test_asyncio import utils as test_utils
from test import support from test import support
from test.support import socket_helper
class MyProto(asyncio.Protocol): class MyProto(asyncio.Protocol):
@ -225,7 +226,7 @@ class BaseSockTestsMixin:
self.loop.run_until_complete( self.loop.run_until_complete(
self._basetest_huge_content_recvinto(httpd.address)) self._basetest_huge_content_recvinto(httpd.address))
@support.skip_unless_bind_unix_socket @socket_helper.skip_unless_bind_unix_socket
def test_unix_sock_client_ops(self): def test_unix_sock_client_ops(self):
with test_utils.run_test_unix_server() as httpd: with test_utils.run_test_unix_server() as httpd:
sock = socket.socket(socket.AF_UNIX) sock = socket.socket(socket.AF_UNIX)

View File

@ -9,7 +9,7 @@ import sys
import threading import threading
import unittest import unittest
from unittest import mock from unittest import mock
from test import support from test.support import socket_helper
try: try:
import ssl import ssl
except ImportError: except ImportError:
@ -66,7 +66,7 @@ class StreamTests(test_utils.TestCase):
loop=self.loop) loop=self.loop)
self._basetest_open_connection(conn_fut) self._basetest_open_connection(conn_fut)
@support.skip_unless_bind_unix_socket @socket_helper.skip_unless_bind_unix_socket
def test_open_unix_connection(self): def test_open_unix_connection(self):
with test_utils.run_test_unix_server() as httpd: with test_utils.run_test_unix_server() as httpd:
conn_fut = asyncio.open_unix_connection(httpd.address, conn_fut = asyncio.open_unix_connection(httpd.address,
@ -99,7 +99,7 @@ class StreamTests(test_utils.TestCase):
self._basetest_open_connection_no_loop_ssl(conn_fut) self._basetest_open_connection_no_loop_ssl(conn_fut)
@support.skip_unless_bind_unix_socket @socket_helper.skip_unless_bind_unix_socket
@unittest.skipIf(ssl is None, 'No ssl module') @unittest.skipIf(ssl is None, 'No ssl module')
def test_open_unix_connection_no_loop_ssl(self): def test_open_unix_connection_no_loop_ssl(self):
with test_utils.run_test_unix_server(use_ssl=True) as httpd: with test_utils.run_test_unix_server(use_ssl=True) as httpd:
@ -130,7 +130,7 @@ class StreamTests(test_utils.TestCase):
loop=self.loop) loop=self.loop)
self._basetest_open_connection_error(conn_fut) self._basetest_open_connection_error(conn_fut)
@support.skip_unless_bind_unix_socket @socket_helper.skip_unless_bind_unix_socket
def test_open_unix_connection_error(self): def test_open_unix_connection_error(self):
with test_utils.run_test_unix_server() as httpd: with test_utils.run_test_unix_server() as httpd:
conn_fut = asyncio.open_unix_connection(httpd.address, conn_fut = asyncio.open_unix_connection(httpd.address,
@ -653,7 +653,7 @@ class StreamTests(test_utils.TestCase):
self.assertEqual(messages, []) self.assertEqual(messages, [])
@support.skip_unless_bind_unix_socket @socket_helper.skip_unless_bind_unix_socket
def test_start_unix_server(self): def test_start_unix_server(self):
class MyServer: class MyServer:

View File

@ -15,6 +15,7 @@ import threading
import unittest import unittest
from unittest import mock from unittest import mock
from test import support from test import support
from test.support import socket_helper
if sys.platform == 'win32': if sys.platform == 'win32':
raise unittest.SkipTest('UNIX only') raise unittest.SkipTest('UNIX only')
@ -273,7 +274,7 @@ class SelectorEventLoopUnixSocketTests(test_utils.TestCase):
self.loop = asyncio.SelectorEventLoop() self.loop = asyncio.SelectorEventLoop()
self.set_event_loop(self.loop) self.set_event_loop(self.loop)
@support.skip_unless_bind_unix_socket @socket_helper.skip_unless_bind_unix_socket
def test_create_unix_server_existing_path_sock(self): def test_create_unix_server_existing_path_sock(self):
with test_utils.unix_socket_path() as path: with test_utils.unix_socket_path() as path:
sock = socket.socket(socket.AF_UNIX) sock = socket.socket(socket.AF_UNIX)
@ -286,7 +287,7 @@ class SelectorEventLoopUnixSocketTests(test_utils.TestCase):
srv.close() srv.close()
self.loop.run_until_complete(srv.wait_closed()) self.loop.run_until_complete(srv.wait_closed())
@support.skip_unless_bind_unix_socket @socket_helper.skip_unless_bind_unix_socket
def test_create_unix_server_pathlib(self): def test_create_unix_server_pathlib(self):
with test_utils.unix_socket_path() as path: with test_utils.unix_socket_path() as path:
path = pathlib.Path(path) path = pathlib.Path(path)
@ -344,7 +345,7 @@ class SelectorEventLoopUnixSocketTests(test_utils.TestCase):
@unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
'no socket.SOCK_NONBLOCK (linux only)') 'no socket.SOCK_NONBLOCK (linux only)')
@support.skip_unless_bind_unix_socket @socket_helper.skip_unless_bind_unix_socket
def test_create_unix_server_path_stream_bittype(self): def test_create_unix_server_path_stream_bittype(self):
sock = socket.socket( sock = socket.socket(
socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK) socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
@ -497,12 +498,12 @@ class SelectorEventLoopUnixSockSendfileTests(test_utils.TestCase):
def prepare(self): def prepare(self):
sock = self.make_socket() sock = self.make_socket()
proto = self.MyProto(self.loop) proto = self.MyProto(self.loop)
port = support.find_unused_port() port = socket_helper.find_unused_port()
srv_sock = self.make_socket(cleanup=False) srv_sock = self.make_socket(cleanup=False)
srv_sock.bind((support.HOST, port)) srv_sock.bind((socket_helper.HOST, port))
server = self.run_loop(self.loop.create_server( server = self.run_loop(self.loop.create_server(
lambda: proto, sock=srv_sock)) lambda: proto, sock=srv_sock))
self.run_loop(self.loop.sock_connect(sock, (support.HOST, port))) self.run_loop(self.loop.sock_connect(sock, (socket_helper.HOST, port)))
self.run_loop(proto._ready) self.run_loop(proto._ready)
def cleanup(): def cleanup():

View File

@ -10,6 +10,7 @@ import struct
import threading import threading
from test import support from test import support
from test.support import socket_helper
from io import BytesIO from io import BytesIO
if support.PGO: if support.PGO:
@ -91,7 +92,7 @@ def bind_af_aware(sock, addr):
if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX: if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX:
# Make sure the path doesn't exist. # Make sure the path doesn't exist.
support.unlink(addr) support.unlink(addr)
support.bind_unix_socket(sock, addr) socket_helper.bind_unix_socket(sock, addr)
else: else:
sock.bind(addr) sock.bind(addr)
@ -327,7 +328,7 @@ class DispatcherWithSendTests(unittest.TestCase):
evt = threading.Event() evt = threading.Event()
sock = socket.socket() sock = socket.socket()
sock.settimeout(3) sock.settimeout(3)
port = support.bind_port(sock) port = socket_helper.bind_port(sock)
cap = BytesIO() cap = BytesIO()
args = (evt, cap, sock) args = (evt, cap, sock)
@ -341,7 +342,7 @@ class DispatcherWithSendTests(unittest.TestCase):
data = b"Suppose there isn't a 16-ton weight?" data = b"Suppose there isn't a 16-ton weight?"
d = dispatcherwithsend_noread() d = dispatcherwithsend_noread()
d.create_socket() d.create_socket()
d.connect((support.HOST, port)) d.connect((socket_helper.HOST, port))
# give time for socket to connect # give time for socket to connect
time.sleep(0.1) time.sleep(0.1)
@ -791,12 +792,12 @@ class BaseTestAPI:
class TestAPI_UseIPv4Sockets(BaseTestAPI): class TestAPI_UseIPv4Sockets(BaseTestAPI):
family = socket.AF_INET family = socket.AF_INET
addr = (support.HOST, 0) addr = (socket_helper.HOST, 0)
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 support required') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 support required')
class TestAPI_UseIPv6Sockets(BaseTestAPI): class TestAPI_UseIPv6Sockets(BaseTestAPI):
family = socket.AF_INET6 family = socket.AF_INET6
addr = (support.HOSTv6, 0) addr = (socket_helper.HOSTv6, 0)
@unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required') @unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required')
class TestAPI_UseUnixSockets(BaseTestAPI): class TestAPI_UseUnixSockets(BaseTestAPI):

View File

@ -19,7 +19,8 @@ except ImportError:
from unittest import TestCase, skipUnless from unittest import TestCase, skipUnless
from test import support from test import support
from test.support import HOST, HOSTv6 from test.support import socket_helper
from test.support.socket_helper import HOST, HOSTv6
TIMEOUT = support.LOOPBACK_TIMEOUT TIMEOUT = support.LOOPBACK_TIMEOUT
DEFAULT_ENCODING = 'utf-8' DEFAULT_ENCODING = 'utf-8'
@ -751,7 +752,7 @@ class TestFTPClass(TestCase):
def test_source_address(self): def test_source_address(self):
self.client.quit() self.client.quit()
port = support.find_unused_port() port = socket_helper.find_unused_port()
try: try:
self.client.connect(self.server.host, self.server.port, self.client.connect(self.server.host, self.server.port,
source_address=(HOST, port)) source_address=(HOST, port))
@ -763,7 +764,7 @@ class TestFTPClass(TestCase):
raise raise
def test_source_address_passive_connection(self): def test_source_address_passive_connection(self):
port = support.find_unused_port() port = socket_helper.find_unused_port()
self.client.source_address = (HOST, port) self.client.source_address = (HOST, port)
try: try:
with self.client.transfercmd('list') as sock: with self.client.transfercmd('list') as sock:
@ -816,7 +817,7 @@ class TestFTPClass(TestCase):
self.assertEqual(DEFAULT_ENCODING, client.encoding) self.assertEqual(DEFAULT_ENCODING, client.encoding)
@skipUnless(support.IPV6_ENABLED, "IPv6 not enabled") @skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
class TestIPv6Environment(TestCase): class TestIPv6Environment(TestCase):
def setUp(self): def setUp(self):
@ -1007,7 +1008,7 @@ class TestTimeouts(TestCase):
self.evt = threading.Event() self.evt = threading.Event()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(20) self.sock.settimeout(20)
self.port = support.bind_port(self.sock) self.port = socket_helper.bind_port(self.sock)
self.server_thread = threading.Thread(target=self.server) self.server_thread = threading.Thread(target=self.server)
self.server_thread.daemon = True self.server_thread.daemon = True
self.server_thread.start() self.server_thread.start()

View File

@ -13,6 +13,7 @@ import unittest
TestCase = unittest.TestCase TestCase = unittest.TestCase
from test import support from test import support
from test.support import socket_helper
here = os.path.dirname(__file__) here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost' # Self-signed cert file for 'localhost'
@ -42,7 +43,7 @@ last_chunk_extended = "0" + chunk_extension + "\r\n"
trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n" trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n"
chunked_end = "\r\n" chunked_end = "\r\n"
HOST = support.HOST HOST = socket_helper.HOST
class FakeSocket: class FakeSocket:
def __init__(self, text, fileclass=io.BytesIO, host=None, port=None): def __init__(self, text, fileclass=io.BytesIO, host=None, port=None):
@ -1463,8 +1464,8 @@ class OfflineTest(TestCase):
class SourceAddressTest(TestCase): class SourceAddressTest(TestCase):
def setUp(self): def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.port = support.bind_port(self.serv) self.port = socket_helper.bind_port(self.serv)
self.source_port = support.find_unused_port() self.source_port = socket_helper.find_unused_port()
self.serv.listen() self.serv.listen()
self.conn = None self.conn = None
@ -1496,7 +1497,7 @@ class TimeoutTest(TestCase):
def setUp(self): def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
TimeoutTest.PORT = support.bind_port(self.serv) TimeoutTest.PORT = socket_helper.bind_port(self.serv)
self.serv.listen() self.serv.listen()
def tearDown(self): def tearDown(self):

View File

@ -1,4 +1,5 @@
from test import support from test import support
from test.support import socket_helper
from contextlib import contextmanager from contextlib import contextmanager
import imaplib import imaplib
@ -82,7 +83,7 @@ class TestImaplib(unittest.TestCase):
pass pass
# This is the exception that should be raised. # This is the exception that should be raised.
expected_errnos = support.get_socket_conn_refused_errs() expected_errnos = socket_helper.get_socket_conn_refused_errs()
with self.assertRaises(OSError) as cm: with self.assertRaises(OSError) as cm:
imaplib.IMAP4() imaplib.IMAP4()
self.assertIn(cm.exception.errno, expected_errnos) self.assertIn(cm.exception.errno, expected_errnos)
@ -210,7 +211,7 @@ class NewIMAPTestsMixin():
raise raise
self.addCleanup(self._cleanup) self.addCleanup(self._cleanup)
self.server = self.server_class((support.HOST, 0), imap_handler) self.server = self.server_class((socket_helper.HOST, 0), imap_handler)
self.thread = threading.Thread( self.thread = threading.Thread(
name=self._testMethodName+'-server', name=self._testMethodName+'-server',
target=self.server.serve_forever, target=self.server.serve_forever,
@ -600,7 +601,7 @@ class ThreadedNetworkedTests(unittest.TestCase):
@contextmanager @contextmanager
def reaped_server(self, hdlr): def reaped_server(self, hdlr):
server, thread = self.make_server((support.HOST, 0), hdlr) server, thread = self.make_server((socket_helper.HOST, 0), hdlr)
try: try:
yield server yield server
finally: finally:

View File

@ -8,8 +8,9 @@ import unittest
import socket import socket
import shutil import shutil
import threading import threading
from test.support import TESTFN, requires, unlink, bigmemtest, find_unused_port from test.support import TESTFN, requires, unlink, bigmemtest
from test.support import SHORT_TIMEOUT from test.support import SHORT_TIMEOUT
from test.support import socket_helper
import io # C implementation of io import io # C implementation of io
import _pyio as pyio # Python implementation of io import _pyio as pyio # Python implementation of io
@ -219,7 +220,7 @@ class TestSocketSendfile(LargeFileTest, unittest.TestCase):
# bit more tolerance. # bit more tolerance.
@skip_no_disk_space(TESTFN, size * 2.5) @skip_no_disk_space(TESTFN, size * 2.5)
def test_it(self): def test_it(self):
port = find_unused_port() port = socket_helper.find_unused_port()
with socket.create_server(("", port)) as sock: with socket.create_server(("", port)) as sock:
self.tcp_server(sock) self.tcp_server(sock)
with socket.create_connection(("127.0.0.1", port)) as client: with socket.create_connection(("127.0.0.1", port)) as client:

View File

@ -43,6 +43,7 @@ import sys
import tempfile import tempfile
from test.support.script_helper import assert_python_ok, assert_python_failure from test.support.script_helper import assert_python_ok, assert_python_failure
from test import support from test import support
from test.support import socket_helper
import textwrap import textwrap
import threading import threading
import time import time
@ -1053,10 +1054,10 @@ class SMTPHandlerTest(BaseTest):
def test_basic(self): def test_basic(self):
sockmap = {} sockmap = {}
server = TestSMTPServer((support.HOST, 0), self.process_message, 0.001, server = TestSMTPServer((socket_helper.HOST, 0), self.process_message, 0.001,
sockmap) sockmap)
server.start() server.start()
addr = (support.HOST, server.port) addr = (socket_helper.HOST, server.port)
h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log', h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log',
timeout=self.TIMEOUT) timeout=self.TIMEOUT)
self.assertEqual(h.toaddrs, ['you']) self.assertEqual(h.toaddrs, ['you'])
@ -1921,7 +1922,7 @@ class UnixSysLogHandlerTest(SysLogHandlerTest):
SysLogHandlerTest.tearDown(self) SysLogHandlerTest.tearDown(self)
support.unlink(self.address) support.unlink(self.address)
@unittest.skipUnless(support.IPV6_ENABLED, @unittest.skipUnless(socket_helper.IPV6_ENABLED,
'IPv6 support required for this test.') 'IPv6 support required for this test.')
class IPv6SysLogHandlerTest(SysLogHandlerTest): class IPv6SysLogHandlerTest(SysLogHandlerTest):

View File

@ -10,6 +10,7 @@ import re
import threading import threading
from test import support from test import support
from test.support import socket_helper
from nntplib import NNTP, GroupInfo from nntplib import NNTP, GroupInfo
import nntplib import nntplib
from unittest.mock import patch from unittest.mock import patch
@ -1555,14 +1556,14 @@ class MockSslTests(MockSocketTests):
class LocalServerTests(unittest.TestCase): class LocalServerTests(unittest.TestCase):
def setUp(self): def setUp(self):
sock = socket.socket() sock = socket.socket()
port = support.bind_port(sock) port = socket_helper.bind_port(sock)
sock.listen() sock.listen()
self.background = threading.Thread( self.background = threading.Thread(
target=self.run_server, args=(sock,)) target=self.run_server, args=(sock,))
self.background.start() self.background.start()
self.addCleanup(self.background.join) self.addCleanup(self.background.join)
self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__() self.nntp = NNTP(socket_helper.HOST, port, usenetrc=False).__enter__()
self.addCleanup(self.nntp.__exit__, None, None, None) self.addCleanup(self.nntp.__exit__, None, None, None)
def run_server(self, sock): def run_server(self, sock):

View File

@ -30,6 +30,7 @@ import unittest
import uuid import uuid
import warnings import warnings
from test import support from test import support
from test.support import socket_helper
from platform import win32_is_iot from platform import win32_is_iot
try: try:
@ -3171,7 +3172,7 @@ class TestSendfile(unittest.TestCase):
support.unlink(support.TESTFN) support.unlink(support.TESTFN)
def setUp(self): def setUp(self):
self.server = SendfileTestServer((support.HOST, 0)) self.server = SendfileTestServer((socket_helper.HOST, 0))
self.server.start() self.server.start()
self.client = socket.socket() self.client = socket.socket()
self.client.connect((self.server.host, self.server.port)) self.client.connect((self.server.host, self.server.port))

View File

@ -13,8 +13,9 @@ import threading
from unittest import TestCase, skipUnless from unittest import TestCase, skipUnless
from test import support as test_support from test import support as test_support
from test.support import socket_helper
HOST = test_support.HOST HOST = socket_helper.HOST
PORT = 0 PORT = 0
SUPPORTS_SSL = False SUPPORTS_SSL = False
@ -480,7 +481,7 @@ class TestTimeouts(TestCase):
self.evt = threading.Event() self.evt = threading.Event()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(60) # Safety net. Look issue 11812 self.sock.settimeout(60) # Safety net. Look issue 11812
self.port = test_support.bind_port(self.sock) self.port = socket_helper.bind_port(self.sock)
self.thread = threading.Thread(target=self.server, args=(self.evt, self.sock)) self.thread = threading.Thread(target=self.server, args=(self.evt, self.sock))
self.thread.daemon = True self.thread.daemon = True
self.thread.start() self.thread.start()

View File

@ -4,6 +4,7 @@ import threading
import unittest import unittest
import urllib.robotparser import urllib.robotparser
from test import support from test import support
from test.support import socket_helper
from http.server import BaseHTTPRequestHandler, HTTPServer from http.server import BaseHTTPRequestHandler, HTTPServer
@ -312,7 +313,7 @@ class PasswordProtectedSiteTestCase(unittest.TestCase):
# clear _opener global variable # clear _opener global variable
self.addCleanup(urllib.request.urlcleanup) self.addCleanup(urllib.request.urlcleanup)
self.server = HTTPServer((support.HOST, 0), RobotHandler) self.server = HTTPServer((socket_helper.HOST, 0), RobotHandler)
self.t = threading.Thread( self.t = threading.Thread(
name='HTTPServer serving', name='HTTPServer serving',
@ -332,7 +333,7 @@ class PasswordProtectedSiteTestCase(unittest.TestCase):
@support.reap_threads @support.reap_threads
def testPasswordProtectedSite(self): def testPasswordProtectedSite(self):
addr = self.server.server_address addr = self.server.server_address
url = 'http://' + support.HOST + ':' + str(addr[1]) url = 'http://' + socket_helper.HOST + ':' + str(addr[1])
robots_url = url + "/robots.txt" robots_url = url + "/robots.txt"
parser = urllib.robotparser.RobotFileParser() parser = urllib.robotparser.RobotFileParser()
parser.set_url(url) parser.set_url(url)

View File

@ -6,6 +6,7 @@ import signal
import socket import socket
import sys import sys
from test import support from test import support
from test.support import socket_helper
from time import sleep from time import sleep
import unittest import unittest
import unittest.mock import unittest.mock
@ -22,7 +23,7 @@ if hasattr(socket, 'socketpair'):
else: else:
def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0): def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
with socket.socket(family, type, proto) as l: with socket.socket(family, type, proto) as l:
l.bind((support.HOST, 0)) l.bind((socket_helper.HOST, 0))
l.listen() l.listen()
c = socket.socket(family, type, proto) c = socket.socket(family, type, proto)
try: try:

View File

@ -1,6 +1,7 @@
import unittest import unittest
import textwrap import textwrap
from test import support, mock_socket from test import support, mock_socket
from test.support import socket_helper
import socket import socket
import io import io
import smtpd import smtpd
@ -38,7 +39,7 @@ class SMTPDServerTest(unittest.TestCase):
smtpd.socket = asyncore.socket = mock_socket smtpd.socket = asyncore.socket = mock_socket
def test_process_message_unimplemented(self): def test_process_message_unimplemented(self):
server = smtpd.SMTPServer((support.HOST, 0), ('b', 0), server = smtpd.SMTPServer((socket_helper.HOST, 0), ('b', 0),
decode_data=True) decode_data=True)
conn, addr = server.accept() conn, addr = server.accept()
channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
@ -57,7 +58,7 @@ class SMTPDServerTest(unittest.TestCase):
self.assertRaises( self.assertRaises(
ValueError, ValueError,
smtpd.SMTPServer, smtpd.SMTPServer,
(support.HOST, 0), (socket_helper.HOST, 0),
('b', 0), ('b', 0),
enable_SMTPUTF8=True, enable_SMTPUTF8=True,
decode_data=True) decode_data=True)
@ -87,7 +88,7 @@ class DebuggingServerTest(unittest.TestCase):
write_line(b'.') write_line(b'.')
def test_process_message_with_decode_data_true(self): def test_process_message_with_decode_data_true(self):
server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0), server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0),
decode_data=True) decode_data=True)
conn, addr = server.accept() conn, addr = server.accept()
channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
@ -104,7 +105,7 @@ class DebuggingServerTest(unittest.TestCase):
""")) """))
def test_process_message_with_decode_data_false(self): def test_process_message_with_decode_data_false(self):
server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0)) server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0))
conn, addr = server.accept() conn, addr = server.accept()
channel = smtpd.SMTPChannel(server, conn, addr) channel = smtpd.SMTPChannel(server, conn, addr)
with support.captured_stdout() as s: with support.captured_stdout() as s:
@ -120,7 +121,7 @@ class DebuggingServerTest(unittest.TestCase):
""")) """))
def test_process_message_with_enable_SMTPUTF8_true(self): def test_process_message_with_enable_SMTPUTF8_true(self):
server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0), server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0),
enable_SMTPUTF8=True) enable_SMTPUTF8=True)
conn, addr = server.accept() conn, addr = server.accept()
channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
@ -137,7 +138,7 @@ class DebuggingServerTest(unittest.TestCase):
""")) """))
def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self): def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self):
server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0), server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0),
enable_SMTPUTF8=True) enable_SMTPUTF8=True)
conn, addr = server.accept() conn, addr = server.accept()
channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
@ -168,13 +169,13 @@ class TestFamilyDetection(unittest.TestCase):
asyncore.close_all() asyncore.close_all()
asyncore.socket = smtpd.socket = socket asyncore.socket = smtpd.socket = socket
@unittest.skipUnless(support.IPV6_ENABLED, "IPv6 not enabled") @unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
def test_socket_uses_IPv6(self): def test_socket_uses_IPv6(self):
server = smtpd.SMTPServer((support.HOSTv6, 0), (support.HOSTv4, 0)) server = smtpd.SMTPServer((socket_helper.HOSTv6, 0), (socket_helper.HOSTv4, 0))
self.assertEqual(server.socket.family, socket.AF_INET6) self.assertEqual(server.socket.family, socket.AF_INET6)
def test_socket_uses_IPv4(self): def test_socket_uses_IPv4(self):
server = smtpd.SMTPServer((support.HOSTv4, 0), (support.HOSTv6, 0)) server = smtpd.SMTPServer((socket_helper.HOSTv4, 0), (socket_helper.HOSTv6, 0))
self.assertEqual(server.socket.family, socket.AF_INET) self.assertEqual(server.socket.family, socket.AF_INET)
@ -197,7 +198,7 @@ class TestRcptOptionParsing(unittest.TestCase):
channel.handle_read() channel.handle_read()
def test_params_rejected(self): def test_params_rejected(self):
server = DummyServer((support.HOST, 0), ('b', 0)) server = DummyServer((socket_helper.HOST, 0), ('b', 0))
conn, addr = server.accept() conn, addr = server.accept()
channel = smtpd.SMTPChannel(server, conn, addr) channel = smtpd.SMTPChannel(server, conn, addr)
self.write_line(channel, b'EHLO example') self.write_line(channel, b'EHLO example')
@ -206,7 +207,7 @@ class TestRcptOptionParsing(unittest.TestCase):
self.assertEqual(channel.socket.last, self.error_response) self.assertEqual(channel.socket.last, self.error_response)
def test_nothing_accepted(self): def test_nothing_accepted(self):
server = DummyServer((support.HOST, 0), ('b', 0)) server = DummyServer((socket_helper.HOST, 0), ('b', 0))
conn, addr = server.accept() conn, addr = server.accept()
channel = smtpd.SMTPChannel(server, conn, addr) channel = smtpd.SMTPChannel(server, conn, addr)
self.write_line(channel, b'EHLO example') self.write_line(channel, b'EHLO example')
@ -234,7 +235,7 @@ class TestMailOptionParsing(unittest.TestCase):
channel.handle_read() channel.handle_read()
def test_with_decode_data_true(self): def test_with_decode_data_true(self):
server = DummyServer((support.HOST, 0), ('b', 0), decode_data=True) server = DummyServer((socket_helper.HOST, 0), ('b', 0), decode_data=True)
conn, addr = server.accept() conn, addr = server.accept()
channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
self.write_line(channel, b'EHLO example') self.write_line(channel, b'EHLO example')
@ -250,7 +251,7 @@ class TestMailOptionParsing(unittest.TestCase):
self.assertEqual(channel.socket.last, b'250 OK\r\n') self.assertEqual(channel.socket.last, b'250 OK\r\n')
def test_with_decode_data_false(self): def test_with_decode_data_false(self):
server = DummyServer((support.HOST, 0), ('b', 0)) server = DummyServer((socket_helper.HOST, 0), ('b', 0))
conn, addr = server.accept() conn, addr = server.accept()
channel = smtpd.SMTPChannel(server, conn, addr) channel = smtpd.SMTPChannel(server, conn, addr)
self.write_line(channel, b'EHLO example') self.write_line(channel, b'EHLO example')
@ -271,7 +272,7 @@ class TestMailOptionParsing(unittest.TestCase):
self.assertEqual(channel.socket.last, b'250 OK\r\n') self.assertEqual(channel.socket.last, b'250 OK\r\n')
def test_with_enable_smtputf8_true(self): def test_with_enable_smtputf8_true(self):
server = DummyServer((support.HOST, 0), ('b', 0), enable_SMTPUTF8=True) server = DummyServer((socket_helper.HOST, 0), ('b', 0), enable_SMTPUTF8=True)
conn, addr = server.accept() conn, addr = server.accept()
channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
self.write_line(channel, b'EHLO example') self.write_line(channel, b'EHLO example')
@ -286,7 +287,7 @@ class SMTPDChannelTest(unittest.TestCase):
smtpd.socket = asyncore.socket = mock_socket smtpd.socket = asyncore.socket = mock_socket
self.old_debugstream = smtpd.DEBUGSTREAM self.old_debugstream = smtpd.DEBUGSTREAM
self.debug = smtpd.DEBUGSTREAM = io.StringIO() self.debug = smtpd.DEBUGSTREAM = io.StringIO()
self.server = DummyServer((support.HOST, 0), ('b', 0), self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
decode_data=True) decode_data=True)
conn, addr = self.server.accept() conn, addr = self.server.accept()
self.channel = smtpd.SMTPChannel(self.server, conn, addr, self.channel = smtpd.SMTPChannel(self.server, conn, addr,
@ -304,7 +305,7 @@ class SMTPDChannelTest(unittest.TestCase):
def test_broken_connect(self): def test_broken_connect(self):
self.assertRaises( self.assertRaises(
DummyDispatcherBroken, BrokenDummyServer, DummyDispatcherBroken, BrokenDummyServer,
(support.HOST, 0), ('b', 0), decode_data=True) (socket_helper.HOST, 0), ('b', 0), decode_data=True)
def test_decode_data_and_enable_SMTPUTF8_raises(self): def test_decode_data_and_enable_SMTPUTF8_raises(self):
self.assertRaises( self.assertRaises(
@ -758,13 +759,13 @@ class SMTPDChannelTest(unittest.TestCase):
with support.check_warnings(('', DeprecationWarning)): with support.check_warnings(('', DeprecationWarning)):
self.channel._SMTPChannel__addr = 'spam' self.channel._SMTPChannel__addr = 'spam'
@unittest.skipUnless(support.IPV6_ENABLED, "IPv6 not enabled") @unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
class SMTPDChannelIPv6Test(SMTPDChannelTest): class SMTPDChannelIPv6Test(SMTPDChannelTest):
def setUp(self): def setUp(self):
smtpd.socket = asyncore.socket = mock_socket smtpd.socket = asyncore.socket = mock_socket
self.old_debugstream = smtpd.DEBUGSTREAM self.old_debugstream = smtpd.DEBUGSTREAM
self.debug = smtpd.DEBUGSTREAM = io.StringIO() self.debug = smtpd.DEBUGSTREAM = io.StringIO()
self.server = DummyServer((support.HOSTv6, 0), ('b', 0), self.server = DummyServer((socket_helper.HOSTv6, 0), ('b', 0),
decode_data=True) decode_data=True)
conn, addr = self.server.accept() conn, addr = self.server.accept()
self.channel = smtpd.SMTPChannel(self.server, conn, addr, self.channel = smtpd.SMTPChannel(self.server, conn, addr,
@ -776,7 +777,7 @@ class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase):
smtpd.socket = asyncore.socket = mock_socket smtpd.socket = asyncore.socket = mock_socket
self.old_debugstream = smtpd.DEBUGSTREAM self.old_debugstream = smtpd.DEBUGSTREAM
self.debug = smtpd.DEBUGSTREAM = io.StringIO() self.debug = smtpd.DEBUGSTREAM = io.StringIO()
self.server = DummyServer((support.HOST, 0), ('b', 0), self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
decode_data=True) decode_data=True)
conn, addr = self.server.accept() conn, addr = self.server.accept()
# Set DATA size limit to 32 bytes for easy testing # Set DATA size limit to 32 bytes for easy testing
@ -831,7 +832,7 @@ class SMTPDChannelWithDecodeDataFalse(unittest.TestCase):
smtpd.socket = asyncore.socket = mock_socket smtpd.socket = asyncore.socket = mock_socket
self.old_debugstream = smtpd.DEBUGSTREAM self.old_debugstream = smtpd.DEBUGSTREAM
self.debug = smtpd.DEBUGSTREAM = io.StringIO() self.debug = smtpd.DEBUGSTREAM = io.StringIO()
self.server = DummyServer((support.HOST, 0), ('b', 0)) self.server = DummyServer((socket_helper.HOST, 0), ('b', 0))
conn, addr = self.server.accept() conn, addr = self.server.accept()
self.channel = smtpd.SMTPChannel(self.server, conn, addr) self.channel = smtpd.SMTPChannel(self.server, conn, addr)
@ -873,7 +874,7 @@ class SMTPDChannelWithDecodeDataTrue(unittest.TestCase):
smtpd.socket = asyncore.socket = mock_socket smtpd.socket = asyncore.socket = mock_socket
self.old_debugstream = smtpd.DEBUGSTREAM self.old_debugstream = smtpd.DEBUGSTREAM
self.debug = smtpd.DEBUGSTREAM = io.StringIO() self.debug = smtpd.DEBUGSTREAM = io.StringIO()
self.server = DummyServer((support.HOST, 0), ('b', 0), self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
decode_data=True) decode_data=True)
conn, addr = self.server.accept() conn, addr = self.server.accept()
# Set decode_data to True # Set decode_data to True
@ -916,7 +917,7 @@ class SMTPDChannelTestWithEnableSMTPUTF8True(unittest.TestCase):
smtpd.socket = asyncore.socket = mock_socket smtpd.socket = asyncore.socket = mock_socket
self.old_debugstream = smtpd.DEBUGSTREAM self.old_debugstream = smtpd.DEBUGSTREAM
self.debug = smtpd.DEBUGSTREAM = io.StringIO() self.debug = smtpd.DEBUGSTREAM = io.StringIO()
self.server = DummyServer((support.HOST, 0), ('b', 0), self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
enable_SMTPUTF8=True) enable_SMTPUTF8=True)
conn, addr = self.server.accept() conn, addr = self.server.accept()
self.channel = smtpd.SMTPChannel(self.server, conn, addr, self.channel = smtpd.SMTPChannel(self.server, conn, addr,

View File

@ -20,11 +20,12 @@ import threading
import unittest import unittest
from test import support, mock_socket from test import support, mock_socket
from test.support import HOST from test.support import socket_helper
from test.support import threading_setup, threading_cleanup, join_thread from test.support import threading_setup, threading_cleanup, join_thread
from test.support import requires_hashdigest from test.support import requires_hashdigest
from unittest.mock import Mock from unittest.mock import Mock
HOST = socket_helper.HOST
if sys.platform == 'darwin': if sys.platform == 'darwin':
# select.poll returns a select.POLLHUP at the end of the tests # select.poll returns a select.POLLHUP at the end of the tests
@ -271,7 +272,7 @@ class DebuggingServerTests(unittest.TestCase):
def testSourceAddress(self): def testSourceAddress(self):
# connect # connect
src_port = support.find_unused_port() src_port = socket_helper.find_unused_port()
try: try:
smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost', smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost',
timeout=support.LOOPBACK_TIMEOUT, timeout=support.LOOPBACK_TIMEOUT,
@ -711,7 +712,7 @@ class TooLongLineTests(unittest.TestCase):
self.evt = threading.Event() self.evt = threading.Event()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(15) self.sock.settimeout(15)
self.port = support.bind_port(self.sock) self.port = socket_helper.bind_port(self.sock)
servargs = (self.evt, self.respdata, self.sock) servargs = (self.evt, self.respdata, self.sock)
self.thread = threading.Thread(target=server, args=servargs) self.thread = threading.Thread(target=server, args=servargs)
self.thread.start() self.thread.start()

View File

@ -1,5 +1,6 @@
import unittest import unittest
from test import support from test import support
from test.support import socket_helper
import errno import errno
import io import io
@ -34,7 +35,7 @@ try:
except ImportError: except ImportError:
fcntl = None fcntl = None
HOST = support.HOST HOST = socket_helper.HOST
# test unicode string and carriage return # test unicode string and carriage return
MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8') MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8')
@ -161,7 +162,7 @@ class SocketTCPTest(unittest.TestCase):
def setUp(self): def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.port = support.bind_port(self.serv) self.port = socket_helper.bind_port(self.serv)
self.serv.listen() self.serv.listen()
def tearDown(self): def tearDown(self):
@ -172,7 +173,7 @@ class SocketUDPTest(unittest.TestCase):
def setUp(self): def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.port = support.bind_port(self.serv) self.port = socket_helper.bind_port(self.serv)
def tearDown(self): def tearDown(self):
self.serv.close() self.serv.close()
@ -182,7 +183,7 @@ class SocketUDPLITETest(SocketUDPTest):
def setUp(self): def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE) self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
self.port = support.bind_port(self.serv) self.port = socket_helper.bind_port(self.serv)
class ThreadSafeCleanupTestCase(unittest.TestCase): class ThreadSafeCleanupTestCase(unittest.TestCase):
"""Subclass of unittest.TestCase with thread-safe cleanup methods. """Subclass of unittest.TestCase with thread-safe cleanup methods.
@ -265,7 +266,7 @@ class SocketRDSTest(unittest.TestCase):
self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
self.addCleanup(self.serv.close) self.addCleanup(self.serv.close)
try: try:
self.port = support.bind_port(self.serv) self.port = socket_helper.bind_port(self.serv)
except OSError: except OSError:
self.skipTest('unable to bind RDS socket') self.skipTest('unable to bind RDS socket')
@ -682,7 +683,7 @@ class UnixSocketTestBase(SocketTestBase):
def bindSock(self, sock): def bindSock(self, sock):
path = tempfile.mktemp(dir=self.dir_path) path = tempfile.mktemp(dir=self.dir_path)
support.bind_unix_socket(sock, path) socket_helper.bind_unix_socket(sock, path)
self.addCleanup(support.unlink, path) self.addCleanup(support.unlink, path)
class UnixStreamBase(UnixSocketTestBase): class UnixStreamBase(UnixSocketTestBase):
@ -702,7 +703,7 @@ class InetTestBase(SocketTestBase):
self.port = self.serv_addr[1] self.port = self.serv_addr[1]
def bindSock(self, sock): def bindSock(self, sock):
support.bind_port(sock, host=self.host) socket_helper.bind_port(sock, host=self.host)
class TCPTestBase(InetTestBase): class TCPTestBase(InetTestBase):
"""Base class for TCP-over-IPv4 tests.""" """Base class for TCP-over-IPv4 tests."""
@ -733,7 +734,7 @@ class SCTPStreamBase(InetTestBase):
class Inet6TestBase(InetTestBase): class Inet6TestBase(InetTestBase):
"""Base class for IPv6 socket tests.""" """Base class for IPv6 socket tests."""
host = support.HOSTv6 host = socket_helper.HOSTv6
class UDP6TestBase(Inet6TestBase): class UDP6TestBase(Inet6TestBase):
"""Base class for UDP-over-IPv6 tests.""" """Base class for UDP-over-IPv6 tests."""
@ -966,12 +967,12 @@ class GeneralModuleTests(unittest.TestCase):
self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names))) self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
def test_host_resolution(self): def test_host_resolution(self):
for addr in [support.HOSTv4, '10.0.0.1', '255.255.255.255']: for addr in [socket_helper.HOSTv4, '10.0.0.1', '255.255.255.255']:
self.assertEqual(socket.gethostbyname(addr), addr) self.assertEqual(socket.gethostbyname(addr), addr)
# we don't test support.HOSTv6 because there's a chance it doesn't have # we don't test socket_helper.HOSTv6 because there's a chance it doesn't have
# a matching name entry (e.g. 'ip6-localhost') # a matching name entry (e.g. 'ip6-localhost')
for host in [support.HOSTv4]: for host in [socket_helper.HOSTv4]:
self.assertIn(host, socket.gethostbyaddr(host)[2]) self.assertIn(host, socket.gethostbyaddr(host)[2])
def test_host_resolution_bad_address(self): def test_host_resolution_bad_address(self):
@ -1334,7 +1335,7 @@ class GeneralModuleTests(unittest.TestCase):
def testSockName(self): def testSockName(self):
# Testing getsockname() # Testing getsockname()
port = support.find_unused_port() port = socket_helper.find_unused_port()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.addCleanup(sock.close) self.addCleanup(sock.close)
sock.bind(("0.0.0.0", port)) sock.bind(("0.0.0.0", port))
@ -1400,7 +1401,7 @@ class GeneralModuleTests(unittest.TestCase):
def test_getsockaddrarg(self): def test_getsockaddrarg(self):
sock = socket.socket() sock = socket.socket()
self.addCleanup(sock.close) self.addCleanup(sock.close)
port = support.find_unused_port() port = socket_helper.find_unused_port()
big_port = port + 65536 big_port = port + 65536
neg_port = port - 65536 neg_port = port - 65536
self.assertRaises(OverflowError, sock.bind, (HOST, big_port)) self.assertRaises(OverflowError, sock.bind, (HOST, big_port))
@ -1408,7 +1409,7 @@ class GeneralModuleTests(unittest.TestCase):
# Since find_unused_port() is inherently subject to race conditions, we # Since find_unused_port() is inherently subject to race conditions, we
# call it a couple times if necessary. # call it a couple times if necessary.
for i in itertools.count(): for i in itertools.count():
port = support.find_unused_port() port = socket_helper.find_unused_port()
try: try:
sock.bind((HOST, port)) sock.bind((HOST, port))
except OSError as e: except OSError as e:
@ -1461,7 +1462,7 @@ class GeneralModuleTests(unittest.TestCase):
socket.getaddrinfo('localhost', 80) socket.getaddrinfo('localhost', 80)
socket.getaddrinfo('127.0.0.1', 80) socket.getaddrinfo('127.0.0.1', 80)
socket.getaddrinfo(None, 80) socket.getaddrinfo(None, 80)
if support.IPV6_ENABLED: if socket_helper.IPV6_ENABLED:
socket.getaddrinfo('::1', 80) socket.getaddrinfo('::1', 80)
# port can be a string service name such as "http", a numeric # port can be a string service name such as "http", a numeric
# port number or None # port number or None
@ -1674,14 +1675,14 @@ class GeneralModuleTests(unittest.TestCase):
srv.bind((HOST, 0)) srv.bind((HOST, 0))
self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1) self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1)
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
def test_flowinfo(self): def test_flowinfo(self):
self.assertRaises(OverflowError, socket.getnameinfo, self.assertRaises(OverflowError, socket.getnameinfo,
(support.HOSTv6, 0, 0xffffffff), 0) (socket_helper.HOSTv6, 0, 0xffffffff), 0)
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s: with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
self.assertRaises(OverflowError, s.bind, (support.HOSTv6, 0, -10)) self.assertRaises(OverflowError, s.bind, (socket_helper.HOSTv6, 0, -10))
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
def test_getaddrinfo_ipv6_basic(self): def test_getaddrinfo_ipv6_basic(self):
((*_, sockaddr),) = socket.getaddrinfo( ((*_, sockaddr),) = socket.getaddrinfo(
'ff02::1de:c0:face:8D', # Note capital letter `D`. 'ff02::1de:c0:face:8D', # Note capital letter `D`.
@ -1691,7 +1692,7 @@ class GeneralModuleTests(unittest.TestCase):
) )
self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, 0)) self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, 0))
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipIf(sys.platform == 'win32', 'does not work on Windows') @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
@unittest.skipIf(AIX, 'Symbolic scope id does not work') @unittest.skipIf(AIX, 'Symbolic scope id does not work')
def test_getaddrinfo_ipv6_scopeid_symbolic(self): def test_getaddrinfo_ipv6_scopeid_symbolic(self):
@ -1706,7 +1707,7 @@ class GeneralModuleTests(unittest.TestCase):
# Note missing interface name part in IPv6 address # Note missing interface name part in IPv6 address
self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex)) self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipUnless( @unittest.skipUnless(
sys.platform == 'win32', sys.platform == 'win32',
'Numeric scope id does not work or undocumented') 'Numeric scope id does not work or undocumented')
@ -1723,7 +1724,7 @@ class GeneralModuleTests(unittest.TestCase):
# Note missing interface name part in IPv6 address # Note missing interface name part in IPv6 address
self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex)) self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipIf(sys.platform == 'win32', 'does not work on Windows') @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
@unittest.skipIf(AIX, 'Symbolic scope id does not work') @unittest.skipIf(AIX, 'Symbolic scope id does not work')
def test_getnameinfo_ipv6_scopeid_symbolic(self): def test_getnameinfo_ipv6_scopeid_symbolic(self):
@ -1733,7 +1734,7 @@ class GeneralModuleTests(unittest.TestCase):
nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV) nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + test_interface, '1234')) self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + test_interface, '1234'))
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipUnless( sys.platform == 'win32', @unittest.skipUnless( sys.platform == 'win32',
'Numeric scope id does not work or undocumented') 'Numeric scope id does not work or undocumented')
def test_getnameinfo_ipv6_scopeid_numeric(self): def test_getnameinfo_ipv6_scopeid_numeric(self):
@ -1826,19 +1827,19 @@ class GeneralModuleTests(unittest.TestCase):
def test_socket_fileno(self): def test_socket_fileno(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.addCleanup(s.close) self.addCleanup(s.close)
s.bind((support.HOST, 0)) s.bind((socket_helper.HOST, 0))
self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_STREAM) self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_STREAM)
if hasattr(socket, "SOCK_DGRAM"): if hasattr(socket, "SOCK_DGRAM"):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.addCleanup(s.close) self.addCleanup(s.close)
s.bind((support.HOST, 0)) s.bind((socket_helper.HOST, 0))
self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_DGRAM) self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_DGRAM)
if support.IPV6_ENABLED: if socket_helper.IPV6_ENABLED:
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.addCleanup(s.close) self.addCleanup(s.close)
s.bind((support.HOSTv6, 0, 0, 0)) s.bind((socket_helper.HOSTv6, 0, 0, 0))
self._test_socket_fileno(s, socket.AF_INET6, socket.SOCK_STREAM) self._test_socket_fileno(s, socket.AF_INET6, socket.SOCK_STREAM)
if hasattr(socket, "AF_UNIX"): if hasattr(socket, "AF_UNIX"):
@ -2214,12 +2215,12 @@ class BasicQIPCRTRTest(unittest.TestCase):
def testBindSock(self): def testBindSock(self):
with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
support.bind_port(s, host=s.getsockname()[0]) socket_helper.bind_port(s, host=s.getsockname()[0])
self.assertNotEqual(s.getsockname()[1], 0) self.assertNotEqual(s.getsockname()[1], 0)
def testInvalidBindSock(self): def testInvalidBindSock(self):
with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
self.assertRaises(OSError, support.bind_port, s, host=-2) self.assertRaises(OSError, socket_helper.bind_port, s, host=-2)
def testAutoBindSock(self): def testAutoBindSock(self):
with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
@ -4094,25 +4095,25 @@ class SendrecvmsgUDP6TestBase(SendrecvmsgDgramFlagsBase,
self.assertEqual(addr1[:-1], addr2[:-1]) self.assertEqual(addr1[:-1], addr2[:-1])
@requireAttrs(socket.socket, "sendmsg") @requireAttrs(socket.socket, "sendmsg")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@requireSocket("AF_INET6", "SOCK_DGRAM") @requireSocket("AF_INET6", "SOCK_DGRAM")
class SendmsgUDP6Test(SendmsgConnectionlessTests, SendrecvmsgUDP6TestBase): class SendmsgUDP6Test(SendmsgConnectionlessTests, SendrecvmsgUDP6TestBase):
pass pass
@requireAttrs(socket.socket, "recvmsg") @requireAttrs(socket.socket, "recvmsg")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@requireSocket("AF_INET6", "SOCK_DGRAM") @requireSocket("AF_INET6", "SOCK_DGRAM")
class RecvmsgUDP6Test(RecvmsgTests, SendrecvmsgUDP6TestBase): class RecvmsgUDP6Test(RecvmsgTests, SendrecvmsgUDP6TestBase):
pass pass
@requireAttrs(socket.socket, "recvmsg_into") @requireAttrs(socket.socket, "recvmsg_into")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@requireSocket("AF_INET6", "SOCK_DGRAM") @requireSocket("AF_INET6", "SOCK_DGRAM")
class RecvmsgIntoUDP6Test(RecvmsgIntoTests, SendrecvmsgUDP6TestBase): class RecvmsgIntoUDP6Test(RecvmsgIntoTests, SendrecvmsgUDP6TestBase):
pass pass
@requireAttrs(socket.socket, "recvmsg") @requireAttrs(socket.socket, "recvmsg")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@requireAttrs(socket, "IPPROTO_IPV6") @requireAttrs(socket, "IPPROTO_IPV6")
@requireSocket("AF_INET6", "SOCK_DGRAM") @requireSocket("AF_INET6", "SOCK_DGRAM")
class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest, class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest,
@ -4120,7 +4121,7 @@ class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest,
pass pass
@requireAttrs(socket.socket, "recvmsg_into") @requireAttrs(socket.socket, "recvmsg_into")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@requireAttrs(socket, "IPPROTO_IPV6") @requireAttrs(socket, "IPPROTO_IPV6")
@requireSocket("AF_INET6", "SOCK_DGRAM") @requireSocket("AF_INET6", "SOCK_DGRAM")
class RecvmsgIntoRFC3542AncillaryUDP6Test(RecvmsgIntoMixin, class RecvmsgIntoRFC3542AncillaryUDP6Test(RecvmsgIntoMixin,
@ -4167,7 +4168,7 @@ class SendrecvmsgUDPLITE6TestBase(SendrecvmsgDgramFlagsBase,
self.assertEqual(addr1[:-1], addr2[:-1]) self.assertEqual(addr1[:-1], addr2[:-1])
@requireAttrs(socket.socket, "sendmsg") @requireAttrs(socket.socket, "sendmsg")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipUnless(HAVE_SOCKET_UDPLITE, @unittest.skipUnless(HAVE_SOCKET_UDPLITE,
'UDPLITE sockets required for this test.') 'UDPLITE sockets required for this test.')
@requireSocket("AF_INET6", "SOCK_DGRAM") @requireSocket("AF_INET6", "SOCK_DGRAM")
@ -4175,7 +4176,7 @@ class SendmsgUDPLITE6Test(SendmsgConnectionlessTests, SendrecvmsgUDPLITE6TestBas
pass pass
@requireAttrs(socket.socket, "recvmsg") @requireAttrs(socket.socket, "recvmsg")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipUnless(HAVE_SOCKET_UDPLITE, @unittest.skipUnless(HAVE_SOCKET_UDPLITE,
'UDPLITE sockets required for this test.') 'UDPLITE sockets required for this test.')
@requireSocket("AF_INET6", "SOCK_DGRAM") @requireSocket("AF_INET6", "SOCK_DGRAM")
@ -4183,7 +4184,7 @@ class RecvmsgUDPLITE6Test(RecvmsgTests, SendrecvmsgUDPLITE6TestBase):
pass pass
@requireAttrs(socket.socket, "recvmsg_into") @requireAttrs(socket.socket, "recvmsg_into")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipUnless(HAVE_SOCKET_UDPLITE, @unittest.skipUnless(HAVE_SOCKET_UDPLITE,
'UDPLITE sockets required for this test.') 'UDPLITE sockets required for this test.')
@requireSocket("AF_INET6", "SOCK_DGRAM") @requireSocket("AF_INET6", "SOCK_DGRAM")
@ -4191,7 +4192,7 @@ class RecvmsgIntoUDPLITE6Test(RecvmsgIntoTests, SendrecvmsgUDPLITE6TestBase):
pass pass
@requireAttrs(socket.socket, "recvmsg") @requireAttrs(socket.socket, "recvmsg")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipUnless(HAVE_SOCKET_UDPLITE, @unittest.skipUnless(HAVE_SOCKET_UDPLITE,
'UDPLITE sockets required for this test.') 'UDPLITE sockets required for this test.')
@requireAttrs(socket, "IPPROTO_IPV6") @requireAttrs(socket, "IPPROTO_IPV6")
@ -4201,7 +4202,7 @@ class RecvmsgRFC3542AncillaryUDPLITE6Test(RFC3542AncillaryTest,
pass pass
@requireAttrs(socket.socket, "recvmsg_into") @requireAttrs(socket.socket, "recvmsg_into")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipUnless(HAVE_SOCKET_UDPLITE, @unittest.skipUnless(HAVE_SOCKET_UDPLITE,
'UDPLITE sockets required for this test.') 'UDPLITE sockets required for this test.')
@requireAttrs(socket, "IPPROTO_IPV6") @requireAttrs(socket, "IPPROTO_IPV6")
@ -4999,7 +5000,7 @@ class NetworkConnectionNoServer(unittest.TestCase):
socket.socket = old_socket socket.socket = old_socket
def test_connect(self): def test_connect(self):
port = support.find_unused_port() port = socket_helper.find_unused_port()
cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.addCleanup(cli.close) self.addCleanup(cli.close)
with self.assertRaises(OSError) as cm: with self.assertRaises(OSError) as cm:
@ -5009,7 +5010,7 @@ class NetworkConnectionNoServer(unittest.TestCase):
def test_create_connection(self): def test_create_connection(self):
# Issue #9792: errors raised by create_connection() should have # Issue #9792: errors raised by create_connection() should have
# a proper errno attribute. # a proper errno attribute.
port = support.find_unused_port() port = socket_helper.find_unused_port()
with self.assertRaises(OSError) as cm: with self.assertRaises(OSError) as cm:
socket.create_connection((HOST, port)) socket.create_connection((HOST, port))
@ -5027,7 +5028,7 @@ class NetworkConnectionNoServer(unittest.TestCase):
# On Solaris, ENETUNREACH is returned in this circumstance instead # On Solaris, ENETUNREACH is returned in this circumstance instead
# of ECONNREFUSED. So, if that errno exists, add it to our list of # of ECONNREFUSED. So, if that errno exists, add it to our list of
# expected errnos. # expected errnos.
expected_errnos = support.get_socket_conn_refused_errs() expected_errnos = socket_helper.get_socket_conn_refused_errs()
self.assertIn(cm.exception.errno, expected_errnos) self.assertIn(cm.exception.errno, expected_errnos)
def test_create_connection_timeout(self): def test_create_connection_timeout(self):
@ -5039,7 +5040,7 @@ class NetworkConnectionNoServer(unittest.TestCase):
except socket.timeout: except socket.timeout:
pass pass
except OSError as exc: except OSError as exc:
if support.IPV6_ENABLED or exc.errno != errno.EAFNOSUPPORT: if socket_helper.IPV6_ENABLED or exc.errno != errno.EAFNOSUPPORT:
raise raise
else: else:
self.fail('socket.timeout not raised') self.fail('socket.timeout not raised')
@ -5052,7 +5053,7 @@ class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
ThreadableTest.__init__(self) ThreadableTest.__init__(self)
def clientSetUp(self): def clientSetUp(self):
self.source_port = support.find_unused_port() self.source_port = socket_helper.find_unused_port()
def clientTearDown(self): def clientTearDown(self):
self.cli.close() self.cli.close()
@ -5338,7 +5339,7 @@ class TestUnixDomain(unittest.TestCase):
def bind(self, sock, path): def bind(self, sock, path):
# Bind the socket # Bind the socket
try: try:
support.bind_unix_socket(sock, path) socket_helper.bind_unix_socket(sock, path)
except OSError as e: except OSError as e:
if str(e) == "AF_UNIX path too long": if str(e) == "AF_UNIX path too long":
self.skipTest( self.skipTest(
@ -6328,11 +6329,11 @@ class TestMSWindowsTCPFlags(unittest.TestCase):
class CreateServerTest(unittest.TestCase): class CreateServerTest(unittest.TestCase):
def test_address(self): def test_address(self):
port = support.find_unused_port() port = socket_helper.find_unused_port()
with socket.create_server(("127.0.0.1", port)) as sock: with socket.create_server(("127.0.0.1", port)) as sock:
self.assertEqual(sock.getsockname()[0], "127.0.0.1") self.assertEqual(sock.getsockname()[0], "127.0.0.1")
self.assertEqual(sock.getsockname()[1], port) self.assertEqual(sock.getsockname()[1], port)
if support.IPV6_ENABLED: if socket_helper.IPV6_ENABLED:
with socket.create_server(("::1", port), with socket.create_server(("::1", port),
family=socket.AF_INET6) as sock: family=socket.AF_INET6) as sock:
self.assertEqual(sock.getsockname()[0], "::1") self.assertEqual(sock.getsockname()[0], "::1")
@ -6342,7 +6343,7 @@ class CreateServerTest(unittest.TestCase):
with socket.create_server(("127.0.0.1", 0)) as sock: with socket.create_server(("127.0.0.1", 0)) as sock:
self.assertEqual(sock.family, socket.AF_INET) self.assertEqual(sock.family, socket.AF_INET)
self.assertEqual(sock.type, socket.SOCK_STREAM) self.assertEqual(sock.type, socket.SOCK_STREAM)
if support.IPV6_ENABLED: if socket_helper.IPV6_ENABLED:
with socket.create_server(("::1", 0), family=socket.AF_INET6) as s: with socket.create_server(("::1", 0), family=socket.AF_INET6) as s:
self.assertEqual(s.family, socket.AF_INET6) self.assertEqual(s.family, socket.AF_INET6)
self.assertEqual(sock.type, socket.SOCK_STREAM) self.assertEqual(sock.type, socket.SOCK_STREAM)
@ -6362,14 +6363,14 @@ class CreateServerTest(unittest.TestCase):
@unittest.skipIf(not hasattr(_socket, 'IPPROTO_IPV6') or @unittest.skipIf(not hasattr(_socket, 'IPPROTO_IPV6') or
not hasattr(_socket, 'IPV6_V6ONLY'), not hasattr(_socket, 'IPV6_V6ONLY'),
"IPV6_V6ONLY option not supported") "IPV6_V6ONLY option not supported")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
def test_ipv6_only_default(self): def test_ipv6_only_default(self):
with socket.create_server(("::1", 0), family=socket.AF_INET6) as sock: with socket.create_server(("::1", 0), family=socket.AF_INET6) as sock:
assert sock.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY) assert sock.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY)
@unittest.skipIf(not socket.has_dualstack_ipv6(), @unittest.skipIf(not socket.has_dualstack_ipv6(),
"dualstack_ipv6 not supported") "dualstack_ipv6 not supported")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
def test_dualstack_ipv6_family(self): def test_dualstack_ipv6_family(self):
with socket.create_server(("::1", 0), family=socket.AF_INET6, with socket.create_server(("::1", 0), family=socket.AF_INET6,
dualstack_ipv6=True) as sock: dualstack_ipv6=True) as sock:
@ -6411,14 +6412,14 @@ class CreateServerFunctionalTest(unittest.TestCase):
self.assertEqual(sock.recv(1024), b'foo') self.assertEqual(sock.recv(1024), b'foo')
def test_tcp4(self): def test_tcp4(self):
port = support.find_unused_port() port = socket_helper.find_unused_port()
with socket.create_server(("", port)) as sock: with socket.create_server(("", port)) as sock:
self.echo_server(sock) self.echo_server(sock)
self.echo_client(("127.0.0.1", port), socket.AF_INET) self.echo_client(("127.0.0.1", port), socket.AF_INET)
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
def test_tcp6(self): def test_tcp6(self):
port = support.find_unused_port() port = socket_helper.find_unused_port()
with socket.create_server(("", port), with socket.create_server(("", port),
family=socket.AF_INET6) as sock: family=socket.AF_INET6) as sock:
self.echo_server(sock) self.echo_server(sock)
@ -6428,9 +6429,9 @@ class CreateServerFunctionalTest(unittest.TestCase):
@unittest.skipIf(not socket.has_dualstack_ipv6(), @unittest.skipIf(not socket.has_dualstack_ipv6(),
"dualstack_ipv6 not supported") "dualstack_ipv6 not supported")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
def test_dual_stack_client_v4(self): def test_dual_stack_client_v4(self):
port = support.find_unused_port() port = socket_helper.find_unused_port()
with socket.create_server(("", port), family=socket.AF_INET6, with socket.create_server(("", port), family=socket.AF_INET6,
dualstack_ipv6=True) as sock: dualstack_ipv6=True) as sock:
self.echo_server(sock) self.echo_server(sock)
@ -6438,9 +6439,9 @@ class CreateServerFunctionalTest(unittest.TestCase):
@unittest.skipIf(not socket.has_dualstack_ipv6(), @unittest.skipIf(not socket.has_dualstack_ipv6(),
"dualstack_ipv6 not supported") "dualstack_ipv6 not supported")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
def test_dual_stack_client_v6(self): def test_dual_stack_client_v6(self):
port = support.find_unused_port() port = socket_helper.find_unused_port()
with socket.create_server(("", port), family=socket.AF_INET6, with socket.create_server(("", port), family=socket.AF_INET6,
dualstack_ipv6=True) as sock: dualstack_ipv6=True) as sock:
self.echo_server(sock) self.echo_server(sock)

View File

@ -15,12 +15,13 @@ import socketserver
import test.support import test.support
from test.support import reap_children, reap_threads, verbose from test.support import reap_children, reap_threads, verbose
from test.support import socket_helper
test.support.requires("network") test.support.requires("network")
TEST_STR = b"hello world\n" TEST_STR = b"hello world\n"
HOST = test.support.HOST HOST = socket_helper.HOST
HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX") HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
requires_unix_sockets = unittest.skipUnless(HAVE_UNIX_SOCKETS, requires_unix_sockets = unittest.skipUnless(HAVE_UNIX_SOCKETS,

View File

@ -4,6 +4,7 @@ import sys
import unittest import unittest
import unittest.mock import unittest.mock
from test import support from test import support
from test.support import socket_helper
import socket import socket
import select import select
import time import time
@ -33,7 +34,7 @@ Py_DEBUG = hasattr(sys, 'gettotalrefcount')
Py_DEBUG_WIN32 = Py_DEBUG and sys.platform == 'win32' Py_DEBUG_WIN32 = Py_DEBUG and sys.platform == 'win32'
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES) PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST HOST = socket_helper.HOST
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL') IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0) IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1) IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
@ -762,7 +763,7 @@ class BasicSocketTests(unittest.TestCase):
fail(cert, 'example.net') fail(cert, 'example.net')
# -- IPv6 matching -- # -- IPv6 matching --
if support.IPV6_ENABLED: if socket_helper.IPV6_ENABLED:
cert = {'subject': ((('commonName', 'example.com'),),), cert = {'subject': ((('commonName', 'example.com'),),),
'subjectAltName': ( 'subjectAltName': (
('DNS', 'example.com'), ('DNS', 'example.com'),
@ -845,7 +846,7 @@ class BasicSocketTests(unittest.TestCase):
ssl._inet_paton(invalid) ssl._inet_paton(invalid)
for ipaddr in ['127.0.0.1', '192.168.0.1']: for ipaddr in ['127.0.0.1', '192.168.0.1']:
self.assertTrue(ssl._inet_paton(ipaddr)) self.assertTrue(ssl._inet_paton(ipaddr))
if support.IPV6_ENABLED: if socket_helper.IPV6_ENABLED:
for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']: for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
self.assertTrue(ssl._inet_paton(ipaddr)) self.assertTrue(ssl._inet_paton(ipaddr))
@ -1073,7 +1074,7 @@ class BasicSocketTests(unittest.TestCase):
def test_connect_ex_error(self): def test_connect_ex_error(self):
server = socket.socket(socket.AF_INET) server = socket.socket(socket.AF_INET)
self.addCleanup(server.close) self.addCleanup(server.close)
port = support.bind_port(server) # Reserve port but don't listen port = socket_helper.bind_port(server) # Reserve port but don't listen
s = test_wrap_socket(socket.socket(socket.AF_INET), s = test_wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED) cert_reqs=ssl.CERT_REQUIRED)
self.addCleanup(s.close) self.addCleanup(s.close)
@ -2256,7 +2257,7 @@ class NetworkedTests(unittest.TestCase):
self.skipTest("REMOTE_HOST responded too quickly") self.skipTest("REMOTE_HOST responded too quickly")
self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK)) self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
@unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6') @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'Needs IPv6')
def test_get_server_certificate_ipv6(self): def test_get_server_certificate_ipv6(self):
with support.transient_internet('ipv6.google.com'): with support.transient_internet('ipv6.google.com'):
_test_get_server_certificate(self, 'ipv6.google.com', 443) _test_get_server_certificate(self, 'ipv6.google.com', 443)
@ -2511,7 +2512,7 @@ class ThreadedEchoServer(threading.Thread):
self.connectionchatty = connectionchatty self.connectionchatty = connectionchatty
self.starttls_server = starttls_server self.starttls_server = starttls_server
self.sock = socket.socket() self.sock = socket.socket()
self.port = support.bind_port(self.sock) self.port = socket_helper.bind_port(self.sock)
self.flag = None self.flag = None
self.active = False self.active = False
self.selected_npn_protocols = [] self.selected_npn_protocols = []
@ -2624,7 +2625,7 @@ class AsyncoreEchoServer(threading.Thread):
def __init__(self, certfile): def __init__(self, certfile):
self.certfile = certfile self.certfile = certfile
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.port = support.bind_port(sock, '') self.port = socket_helper.bind_port(sock, '')
asyncore.dispatcher.__init__(self, sock) asyncore.dispatcher.__init__(self, sock)
self.listen(5) self.listen(5)
@ -3144,7 +3145,7 @@ class ThreadedTests(unittest.TestCase):
listener_gone = threading.Event() listener_gone = threading.Event()
s = socket.socket() s = socket.socket()
port = support.bind_port(s, HOST) port = socket_helper.bind_port(s, HOST)
# `listener` runs in a thread. It sits in an accept() until # `listener` runs in a thread. It sits in an accept() until
# the main thread connects. Then it rudely closes the socket, # the main thread connects. Then it rudely closes the socket,
@ -3640,7 +3641,7 @@ class ThreadedTests(unittest.TestCase):
# Issue #5103: SSL handshake must respect the socket timeout # Issue #5103: SSL handshake must respect the socket timeout
server = socket.socket(socket.AF_INET) server = socket.socket(socket.AF_INET)
host = "127.0.0.1" host = "127.0.0.1"
port = support.bind_port(server) port = socket_helper.bind_port(server)
started = threading.Event() started = threading.Event()
finish = False finish = False
@ -3694,7 +3695,7 @@ class ThreadedTests(unittest.TestCase):
context.load_cert_chain(SIGNED_CERTFILE) context.load_cert_chain(SIGNED_CERTFILE)
server = socket.socket(socket.AF_INET) server = socket.socket(socket.AF_INET)
host = "127.0.0.1" host = "127.0.0.1"
port = support.bind_port(server) port = socket_helper.bind_port(server)
server = context.wrap_socket(server, server_side=True) server = context.wrap_socket(server, server_side=True)
self.assertTrue(server.server_side) self.assertTrue(server.server_side)

View File

@ -2,8 +2,8 @@ import unittest
import os import os
import socket import socket
import sys import sys
from test.support import (TESTFN, import_fresh_module, from test.support import socket_helper
skip_unless_bind_unix_socket) from test.support import TESTFN, import_fresh_module
c_stat = import_fresh_module('stat', fresh=['_stat']) c_stat = import_fresh_module('stat', fresh=['_stat'])
py_stat = import_fresh_module('stat', blocked=['_stat']) py_stat = import_fresh_module('stat', blocked=['_stat'])
@ -193,7 +193,7 @@ class TestFilemode:
self.assertS_IS("BLK", st_mode) self.assertS_IS("BLK", st_mode)
break break
@skip_unless_bind_unix_socket @socket_helper.skip_unless_bind_unix_socket
def test_socket(self): def test_socket(self):
with socket.socket(socket.AF_UNIX) as s: with socket.socket(socket.AF_UNIX) as s:
s.bind(TESTFN) s.bind(TESTFN)

View File

@ -14,6 +14,7 @@ import time
import unittest import unittest
from test import support from test import support
from test.support import script_helper from test.support import script_helper
from test.support import socket_helper
TESTFN = support.TESTFN TESTFN = support.TESTFN
@ -91,17 +92,17 @@ class TestSupport(unittest.TestCase):
support.rmtree('__pycache__') support.rmtree('__pycache__')
def test_HOST(self): def test_HOST(self):
s = socket.create_server((support.HOST, 0)) s = socket.create_server((socket_helper.HOST, 0))
s.close() s.close()
def test_find_unused_port(self): def test_find_unused_port(self):
port = support.find_unused_port() port = socket_helper.find_unused_port()
s = socket.create_server((support.HOST, port)) s = socket.create_server((socket_helper.HOST, port))
s.close() s.close()
def test_bind_port(self): def test_bind_port(self):
s = socket.socket() s = socket.socket()
support.bind_port(s) socket_helper.bind_port(s)
s.listen() s.listen()
s.close() s.close()

View File

@ -5,9 +5,10 @@ import threading
import contextlib import contextlib
from test import support from test import support
from test.support import socket_helper
import unittest import unittest
HOST = support.HOST HOST = socket_helper.HOST
def server(evt, serv): def server(evt, serv):
serv.listen() serv.listen()
@ -26,7 +27,7 @@ class GeneralTests(unittest.TestCase):
self.evt = threading.Event() self.evt = threading.Event()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(60) # Safety net. Look issue 11812 self.sock.settimeout(60) # Safety net. Look issue 11812
self.port = support.bind_port(self.sock) self.port = socket_helper.bind_port(self.sock)
self.thread = threading.Thread(target=server, args=(self.evt,self.sock)) self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
self.thread.setDaemon(True) self.thread.setDaemon(True)
self.thread.start() self.thread.start()

View File

@ -3,6 +3,7 @@
import functools import functools
import unittest import unittest
from test import support from test import support
from test.support import socket_helper
# This requires the 'network' resource as given on the regrtest command line. # This requires the 'network' resource as given on the regrtest command line.
skip_expected = not support.is_resource_enabled('network') skip_expected = not support.is_resource_enabled('network')
@ -110,7 +111,7 @@ class TimeoutTestCase(unittest.TestCase):
# solution. # solution.
fuzz = 2.0 fuzz = 2.0
localhost = support.HOST localhost = socket_helper.HOST
def setUp(self): def setUp(self):
raise NotImplementedError() raise NotImplementedError()
@ -240,14 +241,14 @@ class TCPTimeoutTestCase(TimeoutTestCase):
def testAcceptTimeout(self): def testAcceptTimeout(self):
# Test accept() timeout # Test accept() timeout
support.bind_port(self.sock, self.localhost) socket_helper.bind_port(self.sock, self.localhost)
self.sock.listen() self.sock.listen()
self._sock_operation(1, 1.5, 'accept') self._sock_operation(1, 1.5, 'accept')
def testSend(self): def testSend(self):
# Test send() timeout # Test send() timeout
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv:
support.bind_port(serv, self.localhost) socket_helper.bind_port(serv, self.localhost)
serv.listen() serv.listen()
self.sock.connect(serv.getsockname()) self.sock.connect(serv.getsockname())
# Send a lot of data in order to bypass buffering in the TCP stack. # Send a lot of data in order to bypass buffering in the TCP stack.
@ -256,7 +257,7 @@ class TCPTimeoutTestCase(TimeoutTestCase):
def testSendto(self): def testSendto(self):
# Test sendto() timeout # Test sendto() timeout
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv:
support.bind_port(serv, self.localhost) socket_helper.bind_port(serv, self.localhost)
serv.listen() serv.listen()
self.sock.connect(serv.getsockname()) self.sock.connect(serv.getsockname())
# The address argument is ignored since we already connected. # The address argument is ignored since we already connected.
@ -266,7 +267,7 @@ class TCPTimeoutTestCase(TimeoutTestCase):
def testSendall(self): def testSendall(self):
# Test sendall() timeout # Test sendall() timeout
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv:
support.bind_port(serv, self.localhost) socket_helper.bind_port(serv, self.localhost)
serv.listen() serv.listen()
self.sock.connect(serv.getsockname()) self.sock.connect(serv.getsockname())
# Send a lot of data in order to bypass buffering in the TCP stack. # Send a lot of data in order to bypass buffering in the TCP stack.
@ -285,7 +286,7 @@ class UDPTimeoutTestCase(TimeoutTestCase):
def testRecvfromTimeout(self): def testRecvfromTimeout(self):
# Test recvfrom() timeout # Test recvfrom() timeout
# Prevent "Address already in use" socket exceptions # Prevent "Address already in use" socket exceptions
support.bind_port(self.sock, self.localhost) socket_helper.bind_port(self.sock, self.localhost)
self._sock_operation(1, 1.5, 'recvfrom', 1024) self._sock_operation(1, 1.5, 'recvfrom', 1024)

View File

@ -1,5 +1,6 @@
from unittest import mock from unittest import mock
from test import support from test import support
from test.support import socket_helper
from test.test_httpservers import NoLogRequestHandler from test.test_httpservers import NoLogRequestHandler
from unittest import TestCase from unittest import TestCase
from wsgiref.util import setup_testing_defaults from wsgiref.util import setup_testing_defaults
@ -263,7 +264,7 @@ class IntegrationTests(TestCase):
class WsgiHandler(NoLogRequestHandler, WSGIRequestHandler): class WsgiHandler(NoLogRequestHandler, WSGIRequestHandler):
pass pass
server = make_server(support.HOST, 0, app, handler_class=WsgiHandler) server = make_server(socket_helper.HOST, 0, app, handler_class=WsgiHandler)
self.addCleanup(server.server_close) self.addCleanup(server.server_close)
interrupted = threading.Event() interrupted = threading.Event()

View File

@ -15,6 +15,7 @@ import re
import io import io
import contextlib import contextlib
from test import support from test import support
from test.support import socket_helper
from test.support import ALWAYS_EQ, LARGEST, SMALLEST from test.support import ALWAYS_EQ, LARGEST, SMALLEST
try: try:
@ -334,7 +335,7 @@ class XMLRPCTestCase(unittest.TestCase):
server.handle_request() # First request and attempt at second server.handle_request() # First request and attempt at second
server.handle_request() # Retried second request server.handle_request() # Retried second request
server = http.server.HTTPServer((support.HOST, 0), RequestHandler) server = http.server.HTTPServer((socket_helper.HOST, 0), RequestHandler)
self.addCleanup(server.server_close) self.addCleanup(server.server_close)
thread = threading.Thread(target=run_server) thread = threading.Thread(target=run_server)
thread.start() thread.start()