Issue #18762: Fix EBADF error when using forkserver.

This commit is contained in:
Richard Oudkerk 2013-08-22 11:38:55 +01:00
parent b8c537094d
commit 0718f70131
1 changed files with 25 additions and 30 deletions

View File

@ -23,11 +23,12 @@ __all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
MAXFDS_TO_SEND = 256
UNSIGNED_STRUCT = struct.Struct('Q') # large enough for pid_t
_forkserver_address = None
_forkserver_alive_fd = None
_inherited_fds = None
_lock = threading.Lock()
_preload_modules = ['__main__']
#
# Public function
#
@ -56,31 +57,15 @@ def connect_to_new_process(fds):
'''
if len(fds) + 3 >= MAXFDS_TO_SEND:
raise ValueError('too many fds')
address, alive_w = process.current_process()._config['forkserver_info']
with socket.socket(socket.AF_UNIX) as client:
client.connect(address)
client.connect(_forkserver_address)
parent_r, child_w = util.pipe()
child_r, parent_w = util.pipe()
allfds = [child_r, child_w, alive_w]
allfds = [child_r, child_w, _forkserver_alive_fd]
allfds += fds
try:
reduction.sendfds(client, allfds)
return parent_r, parent_w
except OSError:
# XXX This is debugging info for Issue #18762
import fcntl
L = []
for fd in allfds:
try:
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
except OSError as e:
L.append((fd, e))
else:
L.append((fd, flags))
print('*** connect_to_new_process: %r' % L, file=sys.stderr)
os.close(parent_r)
os.close(parent_w)
raise
except:
os.close(parent_r)
os.close(parent_w)
@ -97,12 +82,13 @@ def ensure_running():
process will just reuse the forkserver started by its parent, so
ensure_running() will do nothing.
'''
global _forkserver_address, _forkserver_alive_fd
with _lock:
config = process.current_process()._config
if config.get('forkserver_info') is not None:
if _forkserver_alive_fd is not None:
return
assert all(type(mod) is str for mod in _preload_modules)
config = process.current_process()._config
semaphore_tracker_fd = config['semaphore_tracker_fd']
cmd = ('from multiprocessing.forkserver import main; ' +
'main(%d, %d, %r, **%r)')
@ -122,13 +108,20 @@ def ensure_running():
# all client processes own the write end of the "alive" pipe;
# when they all terminate the read end becomes ready.
alive_r, alive_w = os.pipe()
config['forkserver_info'] = (address, alive_w)
fds_to_pass = [listener.fileno(), alive_r, semaphore_tracker_fd]
cmd %= (listener.fileno(), alive_r, _preload_modules, data)
exe = spawn.get_executable()
args = [exe] + util._args_from_interpreter_flags() + ['-c', cmd]
pid = util.spawnv_passfds(exe, args, fds_to_pass)
alive_r, alive_w = util.pipe()
try:
fds_to_pass = [listener.fileno(), alive_r, semaphore_tracker_fd]
cmd %= (listener.fileno(), alive_r, _preload_modules, data)
exe = spawn.get_executable()
args = [exe] + util._args_from_interpreter_flags() + ['-c', cmd]
pid = util.spawnv_passfds(exe, args, fds_to_pass)
except:
os.close(alive_w)
raise
finally:
os.close(alive_r)
_forkserver_address = address
_forkserver_alive_fd = alive_w
def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
@ -157,6 +150,8 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
# ignoring SIGCHLD means no need to reap zombie processes
handler = signal.signal(signal.SIGCHLD, signal.SIG_IGN)
with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener:
global _forkserver_address
_forkserver_address = listener.getsockname()
readers = [listener, alive_r]
while True:
@ -191,7 +186,7 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
#
def _serve_one(s, listener, alive_r, handler):
global _inherited_fds
global _inherited_fds, _forkserver_alive_fd
# close unnecessary stuff and reset SIGCHLD handler
listener.close()
@ -202,7 +197,7 @@ def _serve_one(s, listener, alive_r, handler):
fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
s.close()
assert len(fds) <= MAXFDS_TO_SEND
child_r, child_w, alive_w, *_inherited_fds = fds
child_r, child_w, _forkserver_alive_fd, *_inherited_fds = fds
# send pid to client processes
write_unsigned(child_w, os.getpid())