bpo-39850: Add support for abstract sockets in multiprocessing (GH-18866)
This commit is contained in:
parent
dccd41e29f
commit
6012f30bef
|
@ -73,6 +73,11 @@ def arbitrary_address(family):
|
||||||
if family == 'AF_INET':
|
if family == 'AF_INET':
|
||||||
return ('localhost', 0)
|
return ('localhost', 0)
|
||||||
elif family == 'AF_UNIX':
|
elif family == 'AF_UNIX':
|
||||||
|
# Prefer abstract sockets if possible to avoid problems with the address
|
||||||
|
# size. When coding portable applications, some implementations have
|
||||||
|
# sun_path as short as 92 bytes in the sockaddr_un struct.
|
||||||
|
if util.abstract_sockets_supported:
|
||||||
|
return f"\0listener-{os.getpid()}-{next(_mmap_counter)}"
|
||||||
return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
|
return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
|
||||||
elif family == 'AF_PIPE':
|
elif family == 'AF_PIPE':
|
||||||
return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
|
return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
|
||||||
|
@ -102,7 +107,7 @@ def address_type(address):
|
||||||
return 'AF_INET'
|
return 'AF_INET'
|
||||||
elif type(address) is str and address.startswith('\\\\'):
|
elif type(address) is str and address.startswith('\\\\'):
|
||||||
return 'AF_PIPE'
|
return 'AF_PIPE'
|
||||||
elif type(address) is str:
|
elif type(address) is str or util.is_abstract_socket_namespace(address):
|
||||||
return 'AF_UNIX'
|
return 'AF_UNIX'
|
||||||
else:
|
else:
|
||||||
raise ValueError('address type of %r unrecognized' % address)
|
raise ValueError('address type of %r unrecognized' % address)
|
||||||
|
@ -597,7 +602,8 @@ class SocketListener(object):
|
||||||
self._family = family
|
self._family = family
|
||||||
self._last_accepted = None
|
self._last_accepted = None
|
||||||
|
|
||||||
if family == 'AF_UNIX':
|
if family == 'AF_UNIX' and not util.is_abstract_socket_namespace(address):
|
||||||
|
# Linux abstract socket namespaces do not need to be explicitly unlinked
|
||||||
self._unlink = util.Finalize(
|
self._unlink = util.Finalize(
|
||||||
self, os.unlink, args=(address,), exitpriority=0
|
self, os.unlink, args=(address,), exitpriority=0
|
||||||
)
|
)
|
||||||
|
|
|
@ -55,7 +55,8 @@ class ForkServer(object):
|
||||||
os.waitpid(self._forkserver_pid, 0)
|
os.waitpid(self._forkserver_pid, 0)
|
||||||
self._forkserver_pid = None
|
self._forkserver_pid = None
|
||||||
|
|
||||||
os.unlink(self._forkserver_address)
|
if not util.is_abstract_socket_namespace(self._forkserver_address):
|
||||||
|
os.unlink(self._forkserver_address)
|
||||||
self._forkserver_address = None
|
self._forkserver_address = None
|
||||||
|
|
||||||
def set_forkserver_preload(self, modules_names):
|
def set_forkserver_preload(self, modules_names):
|
||||||
|
@ -135,7 +136,8 @@ class ForkServer(object):
|
||||||
with socket.socket(socket.AF_UNIX) as listener:
|
with socket.socket(socket.AF_UNIX) as listener:
|
||||||
address = connection.arbitrary_address('AF_UNIX')
|
address = connection.arbitrary_address('AF_UNIX')
|
||||||
listener.bind(address)
|
listener.bind(address)
|
||||||
os.chmod(address, 0o600)
|
if not util.is_abstract_socket_namespace(address):
|
||||||
|
os.chmod(address, 0o600)
|
||||||
listener.listen()
|
listener.listen()
|
||||||
|
|
||||||
# all client processes own the write end of the "alive" pipe;
|
# all client processes own the write end of the "alive" pipe;
|
||||||
|
|
|
@ -1262,8 +1262,12 @@ if HAS_SHMEM:
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
Server.__init__(self, *args, **kwargs)
|
Server.__init__(self, *args, **kwargs)
|
||||||
|
address = self.address
|
||||||
|
# The address of Linux abstract namespaces can be bytes
|
||||||
|
if isinstance(address, bytes):
|
||||||
|
address = os.fsdecode(address)
|
||||||
self.shared_memory_context = \
|
self.shared_memory_context = \
|
||||||
_SharedMemoryTracker(f"shmm_{self.address}_{getpid()}")
|
_SharedMemoryTracker(f"shm_{address}_{getpid()}")
|
||||||
util.debug(f"SharedMemoryServer started by pid {getpid()}")
|
util.debug(f"SharedMemoryServer started by pid {getpid()}")
|
||||||
|
|
||||||
def create(self, c, typeid, /, *args, **kwargs):
|
def create(self, c, typeid, /, *args, **kwargs):
|
||||||
|
|
|
@ -102,6 +102,29 @@ def log_to_stderr(level=None):
|
||||||
_log_to_stderr = True
|
_log_to_stderr = True
|
||||||
return _logger
|
return _logger
|
||||||
|
|
||||||
|
|
||||||
|
# Abstract socket support
|
||||||
|
|
||||||
|
def _platform_supports_abstract_sockets():
|
||||||
|
if sys.platform == "linux":
|
||||||
|
return True
|
||||||
|
if hasattr(sys, 'getandroidapilevel'):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def is_abstract_socket_namespace(address):
|
||||||
|
if not address:
|
||||||
|
return False
|
||||||
|
if isinstance(address, bytes):
|
||||||
|
return address[0] == 0
|
||||||
|
elif isinstance(address, str):
|
||||||
|
return address[0] == "\0"
|
||||||
|
raise TypeError('address type of {address!r} unrecognized')
|
||||||
|
|
||||||
|
|
||||||
|
abstract_sockets_supported = _platform_supports_abstract_sockets()
|
||||||
|
|
||||||
#
|
#
|
||||||
# Function returning a temp directory which will be removed on exit
|
# Function returning a temp directory which will be removed on exit
|
||||||
#
|
#
|
||||||
|
|
|
@ -3274,6 +3274,19 @@ class _TestListener(BaseTestCase):
|
||||||
if self.TYPE == 'processes':
|
if self.TYPE == 'processes':
|
||||||
self.assertRaises(OSError, l.accept)
|
self.assertRaises(OSError, l.accept)
|
||||||
|
|
||||||
|
@unittest.skipUnless(util.abstract_sockets_supported,
|
||||||
|
"test needs abstract socket support")
|
||||||
|
def test_abstract_socket(self):
|
||||||
|
with self.connection.Listener("\0something") as listener:
|
||||||
|
with self.connection.Client(listener.address) as client:
|
||||||
|
with listener.accept() as d:
|
||||||
|
client.send(1729)
|
||||||
|
self.assertEqual(d.recv(), 1729)
|
||||||
|
|
||||||
|
if self.TYPE == 'processes':
|
||||||
|
self.assertRaises(OSError, listener.accept)
|
||||||
|
|
||||||
|
|
||||||
class _TestListenerClient(BaseTestCase):
|
class _TestListenerClient(BaseTestCase):
|
||||||
|
|
||||||
ALLOWED_TYPES = ('processes', 'threads')
|
ALLOWED_TYPES = ('processes', 'threads')
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
:mod:`multiprocessing` now supports abstract socket addresses (if abstract sockets
|
||||||
|
are supported in the running platform). When creating arbitrary addresses (like when
|
||||||
|
default-constructing :class:`multiprocessing.connection.Listener` objects) abstract
|
||||||
|
sockets are preferred to avoid the case when the temporary-file-generated address is
|
||||||
|
too large for an AF_UNIX socket address. Patch by Pablo Galindo.
|
Loading…
Reference in New Issue