Issue #18923: Update subprocess to use the new selectors module.
This commit is contained in:
parent
2ce6c44ae4
commit
3a4586a9f9
|
@ -404,15 +404,23 @@ if mswindows:
|
|||
hStdError = None
|
||||
wShowWindow = 0
|
||||
else:
|
||||
import select
|
||||
_has_poll = hasattr(select, 'poll')
|
||||
import _posixsubprocess
|
||||
import select
|
||||
import selectors
|
||||
|
||||
# When select or poll has indicated that the file is writable,
|
||||
# we can write up to _PIPE_BUF bytes without risk of blocking.
|
||||
# POSIX defines PIPE_BUF as >= 512.
|
||||
_PIPE_BUF = getattr(select, 'PIPE_BUF', 512)
|
||||
|
||||
# poll/select have the advantage of not requiring any extra file
|
||||
# descriptor, contrarily to epoll/kqueue (also, they require a single
|
||||
# syscall).
|
||||
if hasattr(selectors, 'PollSelector'):
|
||||
_PopenSelector = selectors.PollSelector
|
||||
else:
|
||||
_PopenSelector = selectors.SelectSelector
|
||||
|
||||
|
||||
__all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput",
|
||||
"getoutput", "check_output", "CalledProcessError", "DEVNULL"]
|
||||
|
@ -1530,12 +1538,65 @@ class Popen(object):
|
|||
if not input:
|
||||
self.stdin.close()
|
||||
|
||||
if _has_poll:
|
||||
stdout, stderr = self._communicate_with_poll(input, endtime,
|
||||
orig_timeout)
|
||||
else:
|
||||
stdout, stderr = self._communicate_with_select(input, endtime,
|
||||
orig_timeout)
|
||||
stdout = None
|
||||
stderr = None
|
||||
|
||||
# Only create this mapping if we haven't already.
|
||||
if not self._communication_started:
|
||||
self._fileobj2output = {}
|
||||
if self.stdout:
|
||||
self._fileobj2output[self.stdout] = []
|
||||
if self.stderr:
|
||||
self._fileobj2output[self.stderr] = []
|
||||
|
||||
if self.stdout:
|
||||
stdout = self._fileobj2output[self.stdout]
|
||||
if self.stderr:
|
||||
stderr = self._fileobj2output[self.stderr]
|
||||
|
||||
self._save_input(input)
|
||||
|
||||
with _PopenSelector() as selector:
|
||||
if self.stdin and input:
|
||||
selector.register(self.stdin, selectors.EVENT_WRITE)
|
||||
if self.stdout:
|
||||
selector.register(self.stdout, selectors.EVENT_READ)
|
||||
if self.stderr:
|
||||
selector.register(self.stderr, selectors.EVENT_READ)
|
||||
|
||||
while selector.get_map():
|
||||
timeout = self._remaining_time(endtime)
|
||||
if timeout is not None and timeout < 0:
|
||||
raise TimeoutExpired(self.args, orig_timeout)
|
||||
|
||||
ready = selector.select(timeout)
|
||||
self._check_timeout(endtime, orig_timeout)
|
||||
|
||||
# XXX Rewrite these to use non-blocking I/O on the file
|
||||
# objects; they are no longer using C stdio!
|
||||
|
||||
for key, events in ready:
|
||||
if key.fileobj is self.stdin:
|
||||
chunk = self._input[self._input_offset :
|
||||
self._input_offset + _PIPE_BUF]
|
||||
try:
|
||||
self._input_offset += os.write(key.fd, chunk)
|
||||
except OSError as e:
|
||||
if e.errno == errno.EPIPE:
|
||||
selector.unregister(key.fileobj)
|
||||
key.fileobj.close()
|
||||
else:
|
||||
raise
|
||||
else:
|
||||
if self._input_offset >= len(self._input):
|
||||
selector.unregister(key.fileobj)
|
||||
key.fileobj.close()
|
||||
elif key.fileobj in (self.stdout, self.stderr):
|
||||
data = os.read(key.fd, 4096)
|
||||
if not data:
|
||||
selector.unregister(key.fileobj)
|
||||
key.fileobj.close()
|
||||
self._fileobj2output[key.fileobj].append(data)
|
||||
|
||||
self.wait(timeout=self._remaining_time(endtime))
|
||||
|
||||
|
@ -1569,167 +1630,6 @@ class Popen(object):
|
|||
self._input = self._input.encode(self.stdin.encoding)
|
||||
|
||||
|
||||
def _communicate_with_poll(self, input, endtime, orig_timeout):
|
||||
stdout = None # Return
|
||||
stderr = None # Return
|
||||
|
||||
if not self._communication_started:
|
||||
self._fd2file = {}
|
||||
|
||||
poller = select.poll()
|
||||
def register_and_append(file_obj, eventmask):
|
||||
poller.register(file_obj.fileno(), eventmask)
|
||||
self._fd2file[file_obj.fileno()] = file_obj
|
||||
|
||||
def close_unregister_and_remove(fd):
|
||||
poller.unregister(fd)
|
||||
self._fd2file[fd].close()
|
||||
self._fd2file.pop(fd)
|
||||
|
||||
if self.stdin and input:
|
||||
register_and_append(self.stdin, select.POLLOUT)
|
||||
|
||||
# Only create this mapping if we haven't already.
|
||||
if not self._communication_started:
|
||||
self._fd2output = {}
|
||||
if self.stdout:
|
||||
self._fd2output[self.stdout.fileno()] = []
|
||||
if self.stderr:
|
||||
self._fd2output[self.stderr.fileno()] = []
|
||||
|
||||
select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI
|
||||
if self.stdout:
|
||||
register_and_append(self.stdout, select_POLLIN_POLLPRI)
|
||||
stdout = self._fd2output[self.stdout.fileno()]
|
||||
if self.stderr:
|
||||
register_and_append(self.stderr, select_POLLIN_POLLPRI)
|
||||
stderr = self._fd2output[self.stderr.fileno()]
|
||||
|
||||
self._save_input(input)
|
||||
|
||||
while self._fd2file:
|
||||
timeout = self._remaining_time(endtime)
|
||||
if timeout is not None and timeout < 0:
|
||||
raise TimeoutExpired(self.args, orig_timeout)
|
||||
try:
|
||||
ready = poller.poll(timeout)
|
||||
except OSError as e:
|
||||
if e.args[0] == errno.EINTR:
|
||||
continue
|
||||
raise
|
||||
self._check_timeout(endtime, orig_timeout)
|
||||
|
||||
# XXX Rewrite these to use non-blocking I/O on the
|
||||
# file objects; they are no longer using C stdio!
|
||||
|
||||
for fd, mode in ready:
|
||||
if mode & select.POLLOUT:
|
||||
chunk = self._input[self._input_offset :
|
||||
self._input_offset + _PIPE_BUF]
|
||||
try:
|
||||
self._input_offset += os.write(fd, chunk)
|
||||
except OSError as e:
|
||||
if e.errno == errno.EPIPE:
|
||||
close_unregister_and_remove(fd)
|
||||
else:
|
||||
raise
|
||||
else:
|
||||
if self._input_offset >= len(self._input):
|
||||
close_unregister_and_remove(fd)
|
||||
elif mode & select_POLLIN_POLLPRI:
|
||||
data = os.read(fd, 4096)
|
||||
if not data:
|
||||
close_unregister_and_remove(fd)
|
||||
self._fd2output[fd].append(data)
|
||||
else:
|
||||
# Ignore hang up or errors.
|
||||
close_unregister_and_remove(fd)
|
||||
|
||||
return (stdout, stderr)
|
||||
|
||||
|
||||
def _communicate_with_select(self, input, endtime, orig_timeout):
|
||||
if not self._communication_started:
|
||||
self._read_set = []
|
||||
self._write_set = []
|
||||
if self.stdin and input:
|
||||
self._write_set.append(self.stdin)
|
||||
if self.stdout:
|
||||
self._read_set.append(self.stdout)
|
||||
if self.stderr:
|
||||
self._read_set.append(self.stderr)
|
||||
|
||||
self._save_input(input)
|
||||
|
||||
stdout = None # Return
|
||||
stderr = None # Return
|
||||
|
||||
if self.stdout:
|
||||
if not self._communication_started:
|
||||
self._stdout_buff = []
|
||||
stdout = self._stdout_buff
|
||||
if self.stderr:
|
||||
if not self._communication_started:
|
||||
self._stderr_buff = []
|
||||
stderr = self._stderr_buff
|
||||
|
||||
while self._read_set or self._write_set:
|
||||
timeout = self._remaining_time(endtime)
|
||||
if timeout is not None and timeout < 0:
|
||||
raise TimeoutExpired(self.args, orig_timeout)
|
||||
try:
|
||||
(rlist, wlist, xlist) = \
|
||||
select.select(self._read_set, self._write_set, [],
|
||||
timeout)
|
||||
except OSError as e:
|
||||
if e.args[0] == errno.EINTR:
|
||||
continue
|
||||
raise
|
||||
|
||||
# According to the docs, returning three empty lists indicates
|
||||
# that the timeout expired.
|
||||
if not (rlist or wlist or xlist):
|
||||
raise TimeoutExpired(self.args, orig_timeout)
|
||||
# We also check what time it is ourselves for good measure.
|
||||
self._check_timeout(endtime, orig_timeout)
|
||||
|
||||
# XXX Rewrite these to use non-blocking I/O on the
|
||||
# file objects; they are no longer using C stdio!
|
||||
|
||||
if self.stdin in wlist:
|
||||
chunk = self._input[self._input_offset :
|
||||
self._input_offset + _PIPE_BUF]
|
||||
try:
|
||||
bytes_written = os.write(self.stdin.fileno(), chunk)
|
||||
except OSError as e:
|
||||
if e.errno == errno.EPIPE:
|
||||
self.stdin.close()
|
||||
self._write_set.remove(self.stdin)
|
||||
else:
|
||||
raise
|
||||
else:
|
||||
self._input_offset += bytes_written
|
||||
if self._input_offset >= len(self._input):
|
||||
self.stdin.close()
|
||||
self._write_set.remove(self.stdin)
|
||||
|
||||
if self.stdout in rlist:
|
||||
data = os.read(self.stdout.fileno(), 1024)
|
||||
if not data:
|
||||
self.stdout.close()
|
||||
self._read_set.remove(self.stdout)
|
||||
stdout.append(data)
|
||||
|
||||
if self.stderr in rlist:
|
||||
data = os.read(self.stderr.fileno(), 1024)
|
||||
if not data:
|
||||
self.stderr.close()
|
||||
self._read_set.remove(self.stderr)
|
||||
stderr.append(data)
|
||||
|
||||
return (stdout, stderr)
|
||||
|
||||
|
||||
def send_signal(self, sig):
|
||||
"""Send a signal to the process
|
||||
"""
|
||||
|
|
|
@ -11,6 +11,7 @@ import errno
|
|||
import tempfile
|
||||
import time
|
||||
import re
|
||||
import selectors
|
||||
import sysconfig
|
||||
import warnings
|
||||
import select
|
||||
|
@ -2179,15 +2180,16 @@ class CommandTests(unittest.TestCase):
|
|||
os.rmdir(dir)
|
||||
|
||||
|
||||
@unittest.skipUnless(getattr(subprocess, '_has_poll', False),
|
||||
"poll system call not supported")
|
||||
@unittest.skipUnless(hasattr(selectors, 'PollSelector'),
|
||||
"Test needs selectors.PollSelector")
|
||||
class ProcessTestCaseNoPoll(ProcessTestCase):
|
||||
def setUp(self):
|
||||
subprocess._has_poll = False
|
||||
self.orig_selector = subprocess._PopenSelector
|
||||
subprocess._PopenSelector = selectors.SelectSelector
|
||||
ProcessTestCase.setUp(self)
|
||||
|
||||
def tearDown(self):
|
||||
subprocess._has_poll = True
|
||||
subprocess._PopenSelector = self.orig_selector
|
||||
ProcessTestCase.tearDown(self)
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue