diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index ce49c4f37a8..d183f60722c 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -305,14 +305,20 @@ class _UnixReadPipeTransport(transports.ReadTransport): self._loop = loop self._pipe = pipe self._fileno = pipe.fileno() + self._protocol = protocol + self._closing = False + mode = os.fstat(self._fileno).st_mode if not (stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)): + self._pipe = None + self._fileno = None + self._protocol = None raise ValueError("Pipe transport is for pipes/sockets only.") + _set_nonblocking(self._fileno) - self._protocol = protocol - self._closing = False + self._loop.call_soon(self._protocol.connection_made, self) # only start reading when connection_made() has been called self._loop.call_soon(self._loop.add_reader, @@ -422,25 +428,30 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, self._extra['pipe'] = pipe self._pipe = pipe self._fileno = pipe.fileno() - mode = os.fstat(self._fileno).st_mode - is_socket = stat.S_ISSOCK(mode) - if not (is_socket or - stat.S_ISFIFO(mode) or - stat.S_ISCHR(mode)): - raise ValueError("Pipe transport is only for " - "pipes, sockets and character devices") - _set_nonblocking(self._fileno) self._protocol = protocol self._buffer = [] self._conn_lost = 0 self._closing = False # Set when close() or write_eof() called. + mode = os.fstat(self._fileno).st_mode + is_char = stat.S_ISCHR(mode) + is_fifo = stat.S_ISFIFO(mode) + is_socket = stat.S_ISSOCK(mode) + if not (is_char or is_fifo or is_socket): + self._pipe = None + self._fileno = None + self._protocol = None + raise ValueError("Pipe transport is only for " + "pipes, sockets and character devices") + + _set_nonblocking(self._fileno) + self._loop.call_soon(self._protocol.connection_made, self) # On AIX, the reader trick (to be notified when the read end of the # socket is closed) only works for sockets. On other platforms it # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.) - if is_socket or not sys.platform.startswith("aix"): + if is_socket or (is_fifo and not sys.platform.startswith("aix")): # only start reading when connection_made() has been called self._loop.call_soon(self._loop.add_reader, self._fileno, self._read_ready) diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index ddb0d44c23b..e742eb71e0d 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -21,6 +21,8 @@ import unittest from unittest import mock import weakref +if sys.platform != 'win32': + import tty import asyncio from asyncio import proactor_events @@ -1626,6 +1628,79 @@ class EventLoopTestsMixin: self.loop.run_until_complete(proto.done) self.assertEqual('CLOSED', proto.state) + @unittest.skipUnless(sys.platform != 'win32', + "Don't support pipes for Windows") + # select, poll and kqueue don't support character devices (PTY) on Mac OS X + # older than 10.6 (Snow Leopard) + @support.requires_mac_ver(10, 6) + def test_bidirectional_pty(self): + master, read_slave = os.openpty() + write_slave = os.dup(read_slave) + tty.setraw(read_slave) + + slave_read_obj = io.open(read_slave, 'rb', 0) + read_proto = MyReadPipeProto(loop=self.loop) + read_connect = self.loop.connect_read_pipe(lambda: read_proto, + slave_read_obj) + read_transport, p = self.loop.run_until_complete(read_connect) + self.assertIs(p, read_proto) + self.assertIs(read_transport, read_proto.transport) + self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) + self.assertEqual(0, read_proto.nbytes) + + + slave_write_obj = io.open(write_slave, 'wb', 0) + write_proto = MyWritePipeProto(loop=self.loop) + write_connect = self.loop.connect_write_pipe(lambda: write_proto, + slave_write_obj) + write_transport, p = self.loop.run_until_complete(write_connect) + self.assertIs(p, write_proto) + self.assertIs(write_transport, write_proto.transport) + self.assertEqual('CONNECTED', write_proto.state) + + data = bytearray() + def reader(data): + chunk = os.read(master, 1024) + data += chunk + return len(data) + + write_transport.write(b'1') + test_utils.run_until(self.loop, lambda: reader(data) >= 1, timeout=10) + self.assertEqual(b'1', data) + self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) + self.assertEqual('CONNECTED', write_proto.state) + + os.write(master, b'a') + test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1, + timeout=10) + self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) + self.assertEqual(1, read_proto.nbytes) + self.assertEqual('CONNECTED', write_proto.state) + + write_transport.write(b'2345') + test_utils.run_until(self.loop, lambda: reader(data) >= 5, timeout=10) + self.assertEqual(b'12345', data) + self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) + self.assertEqual('CONNECTED', write_proto.state) + + os.write(master, b'bcde') + test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5, + timeout=10) + self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) + self.assertEqual(5, read_proto.nbytes) + self.assertEqual('CONNECTED', write_proto.state) + + os.close(master) + + read_transport.close() + self.loop.run_until_complete(read_proto.done) + self.assertEqual( + ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state) + + write_transport.close() + self.loop.run_until_complete(write_proto.done) + self.assertEqual('CLOSED', write_proto.state) + def test_prompt_cancellation(self): r, w = test_utils.socketpair() r.setblocking(False)