Issue #18934: multiprocessing: use selectors module.

This commit is contained in:
Charles-François Natali 2013-09-05 20:46:49 +02:00
parent a83a022f9a
commit e241ac9283
2 changed files with 26 additions and 39 deletions

View File

@ -12,7 +12,6 @@ __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
import io
import os
import sys
import select
import socket
import struct
import errno
@ -877,28 +876,7 @@ if sys.platform == 'win32':
else:
if hasattr(select, 'poll'):
def _poll(fds, timeout):
if timeout is not None:
timeout = int(timeout * 1000) # timeout is in milliseconds
fd_map = {}
pollster = select.poll()
for fd in fds:
pollster.register(fd, select.POLLIN)
if hasattr(fd, 'fileno'):
fd_map[fd.fileno()] = fd
else:
fd_map[fd] = fd
ls = []
for fd, event in pollster.poll(timeout):
if event & select.POLLNVAL:
raise ValueError('invalid file descriptor %i' % fd)
ls.append(fd_map[fd])
return ls
else:
def _poll(fds, timeout):
return select.select(fds, [], [], timeout)[0]
import selectors
def wait(object_list, timeout=None):
'''
@ -906,19 +884,22 @@ else:
Returns list of those objects in object_list which are ready/readable.
'''
if timeout is not None:
if timeout <= 0:
return _poll(object_list, 0)
else:
deadline = time.time() + timeout
while True:
try:
return _poll(object_list, timeout)
except OSError as e:
if e.errno != errno.EINTR:
raise
with selectors.DefaultSelector() as selector:
for obj in object_list:
selector.register(obj, selectors.EVENT_READ)
if timeout is not None:
timeout = deadline - time.time()
deadline = time.time() + timeout
while True:
ready = selector.select(timeout)
if ready:
return [key.fileobj for (key, events) in ready]
else:
if timeout is not None:
timeout = deadline - time.time()
if timeout < 0:
return ready
#
# Make connection and socket objects sharable if possible

View File

@ -1,6 +1,6 @@
import errno
import os
import select
import selectors
import signal
import socket
import struct
@ -149,14 +149,20 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
# ignoring SIGCHLD means no need to reap zombie processes
handler = signal.signal(signal.SIGCHLD, signal.SIG_IGN)
with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener:
with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
selectors.DefaultSelector() as selector:
global _forkserver_address
_forkserver_address = listener.getsockname()
readers = [listener, alive_r]
selector.register(listener, selectors.EVENT_READ)
selector.register(alive_r, selectors.EVENT_READ)
while True:
try:
rfds, wfds, xfds = select.select(readers, [], [])
while True:
rfds = [key.fileobj for (key, events) in selector.select()]
if rfds:
break
if alive_r in rfds:
# EOF because no more client processes left