diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 8c0e09ad12d..3807680f8d7 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -213,6 +213,9 @@ class _UnixReadPipeTransport(transports.ReadTransport): self._loop = loop self._pipe = pipe self._fileno = pipe.fileno() + mode = os.fstat(self._fileno).st_mode + if not (stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode)): + raise ValueError("Pipe transport is for pipes/sockets only.") _set_nonblocking(self._fileno) self._protocol = protocol self._closing = False @@ -275,21 +278,29 @@ class _UnixWritePipeTransport(transports.WriteTransport): self._loop = loop self._pipe = pipe self._fileno = pipe.fileno() - if not stat.S_ISFIFO(os.fstat(self._fileno).st_mode): - raise ValueError("Pipe transport is for pipes only.") + mode = os.fstat(self._fileno).st_mode + is_socket = stat.S_ISSOCK(mode) + is_pipe = stat.S_ISFIFO(mode) + if not (is_socket or is_pipe): + raise ValueError("Pipe transport is for pipes/sockets only.") _set_nonblocking(self._fileno) self._protocol = protocol self._buffer = [] self._conn_lost = 0 self._closing = False # Set when close() or write_eof() called. - self._loop.add_reader(self._fileno, self._read_ready) + + # On AIX, the reader trick only works for sockets. + # On other platforms it works for pipes and sockets. + # (Exception: OS X 10.4? Issue #19294.) + if is_socket or not sys.platform.startswith("aix"): + self._loop.add_reader(self._fileno, self._read_ready) self._loop.call_soon(self._protocol.connection_made, self) if waiter is not None: self._loop.call_soon(waiter.set_result, None) def _read_ready(self): - # pipe was closed by peer + # Pipe was closed by peer. self._close() def write(self, data): @@ -435,8 +446,15 @@ class _UnixSubprocessTransport(transports.SubprocessTransport): self._loop = loop self._pipes = {} + stdin_w = None if stdin == subprocess.PIPE: self._pipes[STDIN] = None + # Use a socket pair for stdin, since not all platforms + # support selecting read events on the write end of a + # socket (which we use in order to detect closing of the + # other end). Notably this is needed on AIX, and works + # just fine on other platforms. + stdin, stdin_w = self._loop._socketpair() if stdout == subprocess.PIPE: self._pipes[STDOUT] = None if stderr == subprocess.PIPE: @@ -448,6 +466,9 @@ class _UnixSubprocessTransport(transports.SubprocessTransport): self._proc = subprocess.Popen( args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, universal_newlines=False, bufsize=bufsize, **kwargs) + if stdin_w is not None: + stdin.close() + self._proc.stdin = open(stdin_w.detach(), 'rb', buffering=bufsize) self._extra['subprocess'] = self._proc def close(self): diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 27e70c6d2b1..f29e7afec15 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -312,6 +312,13 @@ class UnixReadPipeTransportTests(unittest.TestCase): fcntl_patcher.start() self.addCleanup(fcntl_patcher.stop) + fstat_patcher = unittest.mock.patch('os.fstat') + m_fstat = fstat_patcher.start() + st = unittest.mock.Mock() + st.st_mode = stat.S_IFIFO + m_fstat.return_value = st + self.addCleanup(fstat_patcher.stop) + def test_ctor(self): tr = unix_events._UnixReadPipeTransport( self.loop, self.pipe, self.protocol)