diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 9fbe46deff4..59fb6640d5b 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -12,7 +12,6 @@ __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ] import io import os import sys -import select import socket import struct import errno @@ -877,28 +876,7 @@ if sys.platform == 'win32': else: - if hasattr(select, 'poll'): - def _poll(fds, timeout): - if timeout is not None: - timeout = int(timeout * 1000) # timeout is in milliseconds - fd_map = {} - pollster = select.poll() - for fd in fds: - pollster.register(fd, select.POLLIN) - if hasattr(fd, 'fileno'): - fd_map[fd.fileno()] = fd - else: - fd_map[fd] = fd - ls = [] - for fd, event in pollster.poll(timeout): - if event & select.POLLNVAL: - raise ValueError('invalid file descriptor %i' % fd) - ls.append(fd_map[fd]) - return ls - else: - def _poll(fds, timeout): - return select.select(fds, [], [], timeout)[0] - + import selectors def wait(object_list, timeout=None): ''' @@ -906,19 +884,22 @@ else: Returns list of those objects in object_list which are ready/readable. ''' - if timeout is not None: - if timeout <= 0: - return _poll(object_list, 0) - else: - deadline = time.time() + timeout - while True: - try: - return _poll(object_list, timeout) - except OSError as e: - if e.errno != errno.EINTR: - raise + with selectors.DefaultSelector() as selector: + for obj in object_list: + selector.register(obj, selectors.EVENT_READ) + if timeout is not None: - timeout = deadline - time.time() + deadline = time.time() + timeout + + while True: + ready = selector.select(timeout) + if ready: + return [key.fileobj for (key, events) in ready] + else: + if timeout is not None: + timeout = deadline - time.time() + if timeout < 0: + return ready # # Make connection and socket objects sharable if possible diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py index 975b15aef8f..0a237079f15 100644 --- a/Lib/multiprocessing/forkserver.py +++ b/Lib/multiprocessing/forkserver.py @@ -1,6 +1,6 @@ import errno import os -import select +import selectors import signal import socket import struct @@ -149,14 +149,20 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): # ignoring SIGCHLD means no need to reap zombie processes handler = signal.signal(signal.SIGCHLD, signal.SIG_IGN) - with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener: + with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \ + selectors.DefaultSelector() as selector: global _forkserver_address _forkserver_address = listener.getsockname() - readers = [listener, alive_r] + + selector.register(listener, selectors.EVENT_READ) + selector.register(alive_r, selectors.EVENT_READ) while True: try: - rfds, wfds, xfds = select.select(readers, [], []) + while True: + rfds = [key.fileobj for (key, events) in selector.select()] + if rfds: + break if alive_r in rfds: # EOF because no more client processes left