mirror of https://github.com/python/cpython
Merge asyncio from 3.5
This commit is contained in:
commit
898ac56fc2
|
@ -305,14 +305,20 @@ class _UnixReadPipeTransport(transports.ReadTransport):
|
|||
self._loop = loop
|
||||
self._pipe = pipe
|
||||
self._fileno = pipe.fileno()
|
||||
self._protocol = protocol
|
||||
self._closing = False
|
||||
|
||||
mode = os.fstat(self._fileno).st_mode
|
||||
if not (stat.S_ISFIFO(mode) or
|
||||
stat.S_ISSOCK(mode) or
|
||||
stat.S_ISCHR(mode)):
|
||||
self._pipe = None
|
||||
self._fileno = None
|
||||
self._protocol = None
|
||||
raise ValueError("Pipe transport is for pipes/sockets only.")
|
||||
|
||||
_set_nonblocking(self._fileno)
|
||||
self._protocol = protocol
|
||||
self._closing = False
|
||||
|
||||
self._loop.call_soon(self._protocol.connection_made, self)
|
||||
# only start reading when connection_made() has been called
|
||||
self._loop.call_soon(self._loop.add_reader,
|
||||
|
@ -422,25 +428,30 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
|
|||
self._extra['pipe'] = pipe
|
||||
self._pipe = pipe
|
||||
self._fileno = pipe.fileno()
|
||||
mode = os.fstat(self._fileno).st_mode
|
||||
is_socket = stat.S_ISSOCK(mode)
|
||||
if not (is_socket or
|
||||
stat.S_ISFIFO(mode) or
|
||||
stat.S_ISCHR(mode)):
|
||||
raise ValueError("Pipe transport is only for "
|
||||
"pipes, sockets and character devices")
|
||||
_set_nonblocking(self._fileno)
|
||||
self._protocol = protocol
|
||||
self._buffer = []
|
||||
self._conn_lost = 0
|
||||
self._closing = False # Set when close() or write_eof() called.
|
||||
|
||||
mode = os.fstat(self._fileno).st_mode
|
||||
is_char = stat.S_ISCHR(mode)
|
||||
is_fifo = stat.S_ISFIFO(mode)
|
||||
is_socket = stat.S_ISSOCK(mode)
|
||||
if not (is_char or is_fifo or is_socket):
|
||||
self._pipe = None
|
||||
self._fileno = None
|
||||
self._protocol = None
|
||||
raise ValueError("Pipe transport is only for "
|
||||
"pipes, sockets and character devices")
|
||||
|
||||
_set_nonblocking(self._fileno)
|
||||
|
||||
self._loop.call_soon(self._protocol.connection_made, self)
|
||||
|
||||
# On AIX, the reader trick (to be notified when the read end of the
|
||||
# socket is closed) only works for sockets. On other platforms it
|
||||
# works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
|
||||
if is_socket or not sys.platform.startswith("aix"):
|
||||
if is_socket or (is_fifo and not sys.platform.startswith("aix")):
|
||||
# only start reading when connection_made() has been called
|
||||
self._loop.call_soon(self._loop.add_reader,
|
||||
self._fileno, self._read_ready)
|
||||
|
|
|
@ -21,6 +21,8 @@ import unittest
|
|||
from unittest import mock
|
||||
import weakref
|
||||
|
||||
if sys.platform != 'win32':
|
||||
import tty
|
||||
|
||||
import asyncio
|
||||
from asyncio import proactor_events
|
||||
|
@ -1626,6 +1628,79 @@ class EventLoopTestsMixin:
|
|||
self.loop.run_until_complete(proto.done)
|
||||
self.assertEqual('CLOSED', proto.state)
|
||||
|
||||
@unittest.skipUnless(sys.platform != 'win32',
|
||||
"Don't support pipes for Windows")
|
||||
# select, poll and kqueue don't support character devices (PTY) on Mac OS X
|
||||
# older than 10.6 (Snow Leopard)
|
||||
@support.requires_mac_ver(10, 6)
|
||||
def test_bidirectional_pty(self):
|
||||
master, read_slave = os.openpty()
|
||||
write_slave = os.dup(read_slave)
|
||||
tty.setraw(read_slave)
|
||||
|
||||
slave_read_obj = io.open(read_slave, 'rb', 0)
|
||||
read_proto = MyReadPipeProto(loop=self.loop)
|
||||
read_connect = self.loop.connect_read_pipe(lambda: read_proto,
|
||||
slave_read_obj)
|
||||
read_transport, p = self.loop.run_until_complete(read_connect)
|
||||
self.assertIs(p, read_proto)
|
||||
self.assertIs(read_transport, read_proto.transport)
|
||||
self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
|
||||
self.assertEqual(0, read_proto.nbytes)
|
||||
|
||||
|
||||
slave_write_obj = io.open(write_slave, 'wb', 0)
|
||||
write_proto = MyWritePipeProto(loop=self.loop)
|
||||
write_connect = self.loop.connect_write_pipe(lambda: write_proto,
|
||||
slave_write_obj)
|
||||
write_transport, p = self.loop.run_until_complete(write_connect)
|
||||
self.assertIs(p, write_proto)
|
||||
self.assertIs(write_transport, write_proto.transport)
|
||||
self.assertEqual('CONNECTED', write_proto.state)
|
||||
|
||||
data = bytearray()
|
||||
def reader(data):
|
||||
chunk = os.read(master, 1024)
|
||||
data += chunk
|
||||
return len(data)
|
||||
|
||||
write_transport.write(b'1')
|
||||
test_utils.run_until(self.loop, lambda: reader(data) >= 1, timeout=10)
|
||||
self.assertEqual(b'1', data)
|
||||
self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
|
||||
self.assertEqual('CONNECTED', write_proto.state)
|
||||
|
||||
os.write(master, b'a')
|
||||
test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1,
|
||||
timeout=10)
|
||||
self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
|
||||
self.assertEqual(1, read_proto.nbytes)
|
||||
self.assertEqual('CONNECTED', write_proto.state)
|
||||
|
||||
write_transport.write(b'2345')
|
||||
test_utils.run_until(self.loop, lambda: reader(data) >= 5, timeout=10)
|
||||
self.assertEqual(b'12345', data)
|
||||
self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
|
||||
self.assertEqual('CONNECTED', write_proto.state)
|
||||
|
||||
os.write(master, b'bcde')
|
||||
test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5,
|
||||
timeout=10)
|
||||
self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
|
||||
self.assertEqual(5, read_proto.nbytes)
|
||||
self.assertEqual('CONNECTED', write_proto.state)
|
||||
|
||||
os.close(master)
|
||||
|
||||
read_transport.close()
|
||||
self.loop.run_until_complete(read_proto.done)
|
||||
self.assertEqual(
|
||||
['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state)
|
||||
|
||||
write_transport.close()
|
||||
self.loop.run_until_complete(write_proto.done)
|
||||
self.assertEqual('CLOSED', write_proto.state)
|
||||
|
||||
def test_prompt_cancellation(self):
|
||||
r, w = test_utils.socketpair()
|
||||
r.setblocking(False)
|
||||
|
|
Loading…
Reference in New Issue