gh-122133: Rework pure Python socketpair tests to avoid use of importlib.reload. (#122493)

Co-authored-by: Gregory P. Smith <greg@krypto.org>
This commit is contained in:
Russell Keith-Magee 2024-07-31 16:24:15 +08:00 committed by GitHub
parent d01fd24051
commit f071f01b7b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 64 additions and 77 deletions

View File

@ -592,30 +592,10 @@ if hasattr(_socket.socket, "share"):
return socket(0, 0, 0, info)
__all__.append("fromshare")
if hasattr(_socket, "socketpair"):
def socketpair(family=None, type=SOCK_STREAM, proto=0):
"""socketpair([family[, type[, proto]]]) -> (socket object, socket object)
Create a pair of socket objects from the sockets returned by the platform
socketpair() function.
The arguments are the same as for socket() except the default family is
AF_UNIX if defined on the platform; otherwise, the default is AF_INET.
"""
if family is None:
try:
family = AF_UNIX
except NameError:
family = AF_INET
a, b = _socket.socketpair(family, type, proto)
a = socket(family, type, proto, a.detach())
b = socket(family, type, proto, b.detach())
return a, b
else:
# Origin: https://gist.github.com/4325783, by Geert Jansen. Public domain.
def socketpair(family=AF_INET, type=SOCK_STREAM, proto=0):
# Origin: https://gist.github.com/4325783, by Geert Jansen. Public domain.
# This is used if _socket doesn't natively provide socketpair. It's
# always defined so that it can be patched in for testing purposes.
def _fallback_socketpair(family=AF_INET, type=SOCK_STREAM, proto=0):
if family == AF_INET:
host = _LOCALHOST
elif family == AF_INET6:
@ -668,6 +648,21 @@ else:
raise
return (ssock, csock)
if hasattr(_socket, "socketpair"):
def socketpair(family=None, type=SOCK_STREAM, proto=0):
if family is None:
try:
family = AF_UNIX
except NameError:
family = AF_INET
a, b = _socket.socketpair(family, type, proto)
a = socket(family, type, proto, a.detach())
b = socket(family, type, proto, b.detach())
return a, b
else:
socketpair = _fallback_socketpair
__all__.append("socketpair")
socketpair.__doc__ = """socketpair([family[, type[, proto]]]) -> (socket object, socket object)

View File

@ -4861,7 +4861,6 @@ class BasicSocketPairTest(SocketPairTest):
class PurePythonSocketPairTest(SocketPairTest):
# Explicitly use socketpair AF_INET or AF_INET6 to ensure that is the
# code path we're using regardless platform is the pure python one where
# `_socket.socketpair` does not exist. (AF_INET does not work with
@ -4876,28 +4875,21 @@ class PurePythonSocketPairTest(SocketPairTest):
# Local imports in this class make for easy security fix backporting.
def setUp(self):
import _socket
self._orig_sp = getattr(_socket, 'socketpair', None)
if self._orig_sp is not None:
if hasattr(_socket, "socketpair"):
self._orig_sp = socket.socketpair
# This forces the version using the non-OS provided socketpair
# emulation via an AF_INET socket in Lib/socket.py.
del _socket.socketpair
import importlib
global socket
socket = importlib.reload(socket)
socket.socketpair = socket._fallback_socketpair
else:
pass # This platform already uses the non-OS provided version.
# This platform already uses the non-OS provided version.
self._orig_sp = None
super().setUp()
def tearDown(self):
super().tearDown()
import _socket
if self._orig_sp is not None:
# Restore the default socket.socketpair definition.
_socket.socketpair = self._orig_sp
import importlib
global socket
socket = importlib.reload(socket)
socket.socketpair = self._orig_sp
def test_recv(self):
msg = self.serv.recv(1024)