gh-97514: Authenticate the forkserver control socket. (GH-99309)

This adds authentication to the forkserver control socket. In the past only filesystem permissions protected this socket from code injection into the forkserver process by limiting access to the same UID, which didn't exist when Linux abstract namespace sockets were used (see issue) meaning that any process in the same system network namespace could inject code. We've since stopped using abstract namespace sockets by default, but protecting our control sockets regardless of type is a good idea.

This reuses the HMAC based shared key auth already used by `multiprocessing.connection` sockets for other purposes.

Doing this is useful so that filesystem permissions are not relied upon and trust isn't implied by default between all processes running as the same UID with access to the unix socket.

### pyperformance benchmarks

No significant changes. Including `concurrent_imap` which exercises `multiprocessing.Pool.imap` in that suite.

### Microbenchmarks

This does _slightly_ slow down forkserver use. How much so appears to depend on the platform. Modern platforms and simple platforms are less impacted. This PR adds additional IPC round trips to the control socket to tell forkserver to spawn a new process. Systems with potentially high latency IPC are naturally impacted more.

Typically a 1-4% slowdown on a very targeted process creation microbenchmark, with a worst case overloaded system slowdown of 20%.  No evidence that these slowdowns appear in practical sense.  See the PR for details.
This commit is contained in:
Gregory P. Smith 2024-11-20 08:18:58 -08:00 committed by GitHub
parent 48c50ff1a2
commit 7191b7662e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 141 additions and 16 deletions

View File

@ -421,6 +421,11 @@ multiprocessing
:func:`multiprocessing.get_context` (preferred) or change the default via :func:`multiprocessing.get_context` (preferred) or change the default via
:func:`multiprocessing.set_start_method`. :func:`multiprocessing.set_start_method`.
(Contributed by Gregory P. Smith in :gh:`84559`.) (Contributed by Gregory P. Smith in :gh:`84559`.)
* :mod:`multiprocessing`'s ``"forkserver"`` start method now authenticates
its control socket to avoid solely relying on filesystem permissions
to restrict what other processes could cause the forkserver to spawn workers
and run code.
(Contributed by Gregory P. Smith for :gh:`97514`.)
* The :ref:`multiprocessing proxy objects <multiprocessing-proxy_objects>` * The :ref:`multiprocessing proxy objects <multiprocessing-proxy_objects>`
for *list* and *dict* types gain previously overlooked missing methods: for *list* and *dict* types gain previously overlooked missing methods:

View File

@ -181,6 +181,10 @@ class _ConnectionBase:
finally: finally:
self._handle = None self._handle = None
def _detach(self):
"""Stop managing the underlying file descriptor or handle."""
self._handle = None
def send_bytes(self, buf, offset=0, size=None): def send_bytes(self, buf, offset=0, size=None):
"""Send the bytes data from a bytes-like object""" """Send the bytes data from a bytes-like object"""
self._check_closed() self._check_closed()

View File

@ -9,6 +9,7 @@ import sys
import threading import threading
import warnings import warnings
from . import AuthenticationError
from . import connection from . import connection
from . import process from . import process
from .context import reduction from .context import reduction
@ -25,6 +26,7 @@ __all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
MAXFDS_TO_SEND = 256 MAXFDS_TO_SEND = 256
SIGNED_STRUCT = struct.Struct('q') # large enough for pid_t SIGNED_STRUCT = struct.Struct('q') # large enough for pid_t
_AUTHKEY_LEN = 32 # <= PIPEBUF so it fits a single write to an empty pipe.
# #
# Forkserver class # Forkserver class
@ -33,6 +35,7 @@ SIGNED_STRUCT = struct.Struct('q') # large enough for pid_t
class ForkServer(object): class ForkServer(object):
def __init__(self): def __init__(self):
self._forkserver_authkey = None
self._forkserver_address = None self._forkserver_address = None
self._forkserver_alive_fd = None self._forkserver_alive_fd = None
self._forkserver_pid = None self._forkserver_pid = None
@ -59,6 +62,7 @@ class ForkServer(object):
if not util.is_abstract_socket_namespace(self._forkserver_address): if not util.is_abstract_socket_namespace(self._forkserver_address):
os.unlink(self._forkserver_address) os.unlink(self._forkserver_address)
self._forkserver_address = None self._forkserver_address = None
self._forkserver_authkey = None
def set_forkserver_preload(self, modules_names): def set_forkserver_preload(self, modules_names):
'''Set list of module names to try to load in forkserver process.''' '''Set list of module names to try to load in forkserver process.'''
@ -83,6 +87,7 @@ class ForkServer(object):
process data. process data.
''' '''
self.ensure_running() self.ensure_running()
assert self._forkserver_authkey
if len(fds) + 4 >= MAXFDS_TO_SEND: if len(fds) + 4 >= MAXFDS_TO_SEND:
raise ValueError('too many fds') raise ValueError('too many fds')
with socket.socket(socket.AF_UNIX) as client: with socket.socket(socket.AF_UNIX) as client:
@ -93,6 +98,18 @@ class ForkServer(object):
resource_tracker.getfd()] resource_tracker.getfd()]
allfds += fds allfds += fds
try: try:
client.setblocking(True)
wrapped_client = connection.Connection(client.fileno())
# The other side of this exchange happens in the child as
# implemented in main().
try:
connection.answer_challenge(
wrapped_client, self._forkserver_authkey)
connection.deliver_challenge(
wrapped_client, self._forkserver_authkey)
finally:
wrapped_client._detach()
del wrapped_client
reduction.sendfds(client, allfds) reduction.sendfds(client, allfds)
return parent_r, parent_w return parent_r, parent_w
except: except:
@ -120,6 +137,7 @@ class ForkServer(object):
return return
# dead, launch it again # dead, launch it again
os.close(self._forkserver_alive_fd) os.close(self._forkserver_alive_fd)
self._forkserver_authkey = None
self._forkserver_address = None self._forkserver_address = None
self._forkserver_alive_fd = None self._forkserver_alive_fd = None
self._forkserver_pid = None self._forkserver_pid = None
@ -130,9 +148,9 @@ class ForkServer(object):
if self._preload_modules: if self._preload_modules:
desired_keys = {'main_path', 'sys_path'} desired_keys = {'main_path', 'sys_path'}
data = spawn.get_preparation_data('ignore') data = spawn.get_preparation_data('ignore')
data = {x: y for x, y in data.items() if x in desired_keys} main_kws = {x: y for x, y in data.items() if x in desired_keys}
else: else:
data = {} main_kws = {}
with socket.socket(socket.AF_UNIX) as listener: with socket.socket(socket.AF_UNIX) as listener:
address = connection.arbitrary_address('AF_UNIX') address = connection.arbitrary_address('AF_UNIX')
@ -144,19 +162,31 @@ class ForkServer(object):
# all client processes own the write end of the "alive" pipe; # all client processes own the write end of the "alive" pipe;
# when they all terminate the read end becomes ready. # when they all terminate the read end becomes ready.
alive_r, alive_w = os.pipe() alive_r, alive_w = os.pipe()
# A short lived pipe to initialize the forkserver authkey.
authkey_r, authkey_w = os.pipe()
try: try:
fds_to_pass = [listener.fileno(), alive_r] fds_to_pass = [listener.fileno(), alive_r, authkey_r]
main_kws['authkey_r'] = authkey_r
cmd %= (listener.fileno(), alive_r, self._preload_modules, cmd %= (listener.fileno(), alive_r, self._preload_modules,
data) main_kws)
exe = spawn.get_executable() exe = spawn.get_executable()
args = [exe] + util._args_from_interpreter_flags() args = [exe] + util._args_from_interpreter_flags()
args += ['-c', cmd] args += ['-c', cmd]
pid = util.spawnv_passfds(exe, args, fds_to_pass) pid = util.spawnv_passfds(exe, args, fds_to_pass)
except: except:
os.close(alive_w) os.close(alive_w)
os.close(authkey_w)
raise raise
finally: finally:
os.close(alive_r) os.close(alive_r)
os.close(authkey_r)
# Authenticate our control socket to prevent access from
# processes we have not shared this key with.
try:
self._forkserver_authkey = os.urandom(_AUTHKEY_LEN)
os.write(authkey_w, self._forkserver_authkey)
finally:
os.close(authkey_w)
self._forkserver_address = address self._forkserver_address = address
self._forkserver_alive_fd = alive_w self._forkserver_alive_fd = alive_w
self._forkserver_pid = pid self._forkserver_pid = pid
@ -165,8 +195,18 @@ class ForkServer(object):
# #
# #
def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): def main(listener_fd, alive_r, preload, main_path=None, sys_path=None,
'''Run forkserver.''' *, authkey_r=None):
"""Run forkserver."""
if authkey_r is not None:
try:
authkey = os.read(authkey_r, _AUTHKEY_LEN)
assert len(authkey) == _AUTHKEY_LEN, f'{len(authkey)} < {_AUTHKEY_LEN}'
finally:
os.close(authkey_r)
else:
authkey = b''
if preload: if preload:
if sys_path is not None: if sys_path is not None:
sys.path[:] = sys_path sys.path[:] = sys_path
@ -257,8 +297,24 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
if listener in rfds: if listener in rfds:
# Incoming fork request # Incoming fork request
with listener.accept()[0] as s: with listener.accept()[0] as s:
# Receive fds from client try:
fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1) if authkey:
wrapped_s = connection.Connection(s.fileno())
# The other side of this exchange happens in
# in connect_to_new_process().
try:
connection.deliver_challenge(
wrapped_s, authkey)
connection.answer_challenge(
wrapped_s, authkey)
finally:
wrapped_s._detach()
del wrapped_s
# Receive fds from client
fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
except (EOFError, BrokenPipeError, AuthenticationError):
s.close()
continue
if len(fds) > MAXFDS_TO_SEND: if len(fds) > MAXFDS_TO_SEND:
raise RuntimeError( raise RuntimeError(
"Too many ({0:n}) fds to send".format( "Too many ({0:n}) fds to send".format(

View File

@ -139,15 +139,12 @@ else:
__all__ += ['DupFd', 'sendfds', 'recvfds'] __all__ += ['DupFd', 'sendfds', 'recvfds']
import array import array
# On MacOSX we should acknowledge receipt of fds -- see Issue14669
ACKNOWLEDGE = sys.platform == 'darwin'
def sendfds(sock, fds): def sendfds(sock, fds):
'''Send an array of fds over an AF_UNIX socket.''' '''Send an array of fds over an AF_UNIX socket.'''
fds = array.array('i', fds) fds = array.array('i', fds)
msg = bytes([len(fds) % 256]) msg = bytes([len(fds) % 256])
sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)]) sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
if ACKNOWLEDGE and sock.recv(1) != b'A': if sock.recv(1) != b'A':
raise RuntimeError('did not receive acknowledgement of fd') raise RuntimeError('did not receive acknowledgement of fd')
def recvfds(sock, size): def recvfds(sock, size):
@ -158,8 +155,11 @@ else:
if not msg and not ancdata: if not msg and not ancdata:
raise EOFError raise EOFError
try: try:
if ACKNOWLEDGE: # We send/recv an Ack byte after the fds to work around an old
sock.send(b'A') # macOS bug; it isn't clear if this is still required but it
# makes unit testing fd sending easier.
# See: https://github.com/python/cpython/issues/58874
sock.send(b'A') # Acknowledge
if len(ancdata) != 1: if len(ancdata) != 1:
raise RuntimeError('received %d items of ancdata' % raise RuntimeError('received %d items of ancdata' %
len(ancdata)) len(ancdata))

View File

@ -846,8 +846,8 @@ class _TestProcess(BaseTestCase):
finally: finally:
setattr(sys, stream_name, old_stream) setattr(sys, stream_name, old_stream)
@classmethod @staticmethod
def _sleep_and_set_event(self, evt, delay=0.0): def _sleep_and_set_event(evt, delay=0.0):
time.sleep(delay) time.sleep(delay)
evt.set() evt.set()
@ -898,6 +898,56 @@ class _TestProcess(BaseTestCase):
if os.name != 'nt': if os.name != 'nt':
self.check_forkserver_death(signal.SIGKILL) self.check_forkserver_death(signal.SIGKILL)
def test_forkserver_auth_is_enabled(self):
if self.TYPE == "threads":
self.skipTest(f"test not appropriate for {self.TYPE}")
if multiprocessing.get_start_method() != "forkserver":
self.skipTest("forkserver start method specific")
forkserver = multiprocessing.forkserver._forkserver
forkserver.ensure_running()
self.assertTrue(forkserver._forkserver_pid)
authkey = forkserver._forkserver_authkey
self.assertTrue(authkey)
self.assertGreater(len(authkey), 15)
addr = forkserver._forkserver_address
self.assertTrue(addr)
# Demonstrate that a raw auth handshake, as Client performs, does not
# raise an error.
client = multiprocessing.connection.Client(addr, authkey=authkey)
client.close()
# That worked, now launch a quick process.
proc = self.Process(target=sys.exit)
proc.start()
proc.join()
self.assertEqual(proc.exitcode, 0)
def test_forkserver_without_auth_fails(self):
if self.TYPE == "threads":
self.skipTest(f"test not appropriate for {self.TYPE}")
if multiprocessing.get_start_method() != "forkserver":
self.skipTest("forkserver start method specific")
forkserver = multiprocessing.forkserver._forkserver
forkserver.ensure_running()
self.assertTrue(forkserver._forkserver_pid)
authkey_len = len(forkserver._forkserver_authkey)
with unittest.mock.patch.object(
forkserver, '_forkserver_authkey', None):
# With an incorrect authkey we should get an auth rejection
# rather than the above protocol error.
forkserver._forkserver_authkey = b'T' * authkey_len
proc = self.Process(target=sys.exit)
with self.assertRaises(multiprocessing.AuthenticationError):
proc.start()
del proc
# authkey restored, launching processes should work again.
proc = self.Process(target=sys.exit)
proc.start()
proc.join()
# #
# #

View File

@ -0,0 +1,10 @@
Authentication was added to the :mod:`multiprocessing` forkserver start
method control socket so that only processes with the authentication key
generated by the process that spawned the forkserver can control it. This
is an enhancement over the other :gh:`97514` fixes so that access is no
longer limited only by filesystem permissions.
The file descriptor exchange of control pipes with the forked worker process
now requires an explicit acknowledgement byte to be sent over the socket after
the exchange on all forkserver supporting platforms. That makes testing the
above much easier.