mirror of https://github.com/python/cpython
Merge 3.5 (asyncio, issue #26909)
This commit is contained in:
commit
b76968be0d
|
@ -435,7 +435,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
|
||||||
self._pipe = pipe
|
self._pipe = pipe
|
||||||
self._fileno = pipe.fileno()
|
self._fileno = pipe.fileno()
|
||||||
self._protocol = protocol
|
self._protocol = protocol
|
||||||
self._buffer = []
|
self._buffer = bytearray()
|
||||||
self._conn_lost = 0
|
self._conn_lost = 0
|
||||||
self._closing = False # Set when close() or write_eof() called.
|
self._closing = False # Set when close() or write_eof() called.
|
||||||
|
|
||||||
|
@ -451,7 +451,6 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
|
||||||
"pipes, sockets and character devices")
|
"pipes, sockets and character devices")
|
||||||
|
|
||||||
_set_nonblocking(self._fileno)
|
_set_nonblocking(self._fileno)
|
||||||
|
|
||||||
self._loop.call_soon(self._protocol.connection_made, self)
|
self._loop.call_soon(self._protocol.connection_made, self)
|
||||||
|
|
||||||
# On AIX, the reader trick (to be notified when the read end of the
|
# On AIX, the reader trick (to be notified when the read end of the
|
||||||
|
@ -493,7 +492,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
|
||||||
return '<%s>' % ' '.join(info)
|
return '<%s>' % ' '.join(info)
|
||||||
|
|
||||||
def get_write_buffer_size(self):
|
def get_write_buffer_size(self):
|
||||||
return sum(len(data) for data in self._buffer)
|
return len(self._buffer)
|
||||||
|
|
||||||
def _read_ready(self):
|
def _read_ready(self):
|
||||||
# Pipe was closed by peer.
|
# Pipe was closed by peer.
|
||||||
|
@ -531,39 +530,37 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
|
||||||
if n == len(data):
|
if n == len(data):
|
||||||
return
|
return
|
||||||
elif n > 0:
|
elif n > 0:
|
||||||
data = data[n:]
|
data = memoryview(data)[n:]
|
||||||
self._loop.add_writer(self._fileno, self._write_ready)
|
self._loop.add_writer(self._fileno, self._write_ready)
|
||||||
|
|
||||||
self._buffer.append(data)
|
self._buffer += data
|
||||||
self._maybe_pause_protocol()
|
self._maybe_pause_protocol()
|
||||||
|
|
||||||
def _write_ready(self):
|
def _write_ready(self):
|
||||||
data = b''.join(self._buffer)
|
assert self._buffer, 'Data should not be empty'
|
||||||
assert data, 'Data should not be empty'
|
|
||||||
|
|
||||||
self._buffer.clear()
|
|
||||||
try:
|
try:
|
||||||
n = os.write(self._fileno, data)
|
n = os.write(self._fileno, self._buffer)
|
||||||
except (BlockingIOError, InterruptedError):
|
except (BlockingIOError, InterruptedError):
|
||||||
self._buffer.append(data)
|
pass
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
|
self._buffer.clear()
|
||||||
self._conn_lost += 1
|
self._conn_lost += 1
|
||||||
# Remove writer here, _fatal_error() doesn't it
|
# Remove writer here, _fatal_error() doesn't it
|
||||||
# because _buffer is empty.
|
# because _buffer is empty.
|
||||||
self._loop.remove_writer(self._fileno)
|
self._loop.remove_writer(self._fileno)
|
||||||
self._fatal_error(exc, 'Fatal write error on pipe transport')
|
self._fatal_error(exc, 'Fatal write error on pipe transport')
|
||||||
else:
|
else:
|
||||||
if n == len(data):
|
if n == len(self._buffer):
|
||||||
|
self._buffer.clear()
|
||||||
self._loop.remove_writer(self._fileno)
|
self._loop.remove_writer(self._fileno)
|
||||||
self._maybe_resume_protocol() # May append to buffer.
|
self._maybe_resume_protocol() # May append to buffer.
|
||||||
if not self._buffer and self._closing:
|
if self._closing:
|
||||||
self._loop.remove_reader(self._fileno)
|
self._loop.remove_reader(self._fileno)
|
||||||
self._call_connection_lost(None)
|
self._call_connection_lost(None)
|
||||||
return
|
return
|
||||||
elif n > 0:
|
elif n > 0:
|
||||||
data = data[n:]
|
del self._buffer[:n]
|
||||||
|
|
||||||
self._buffer.append(data) # Try again later.
|
|
||||||
|
|
||||||
def can_write_eof(self):
|
def can_write_eof(self):
|
||||||
return True
|
return True
|
||||||
|
|
|
@ -518,7 +518,7 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
|
||||||
tr.write(b'data')
|
tr.write(b'data')
|
||||||
m_write.assert_called_with(5, b'data')
|
m_write.assert_called_with(5, b'data')
|
||||||
self.assertFalse(self.loop.writers)
|
self.assertFalse(self.loop.writers)
|
||||||
self.assertEqual([], tr._buffer)
|
self.assertEqual(bytearray(), tr._buffer)
|
||||||
|
|
||||||
@mock.patch('os.write')
|
@mock.patch('os.write')
|
||||||
def test_write_no_data(self, m_write):
|
def test_write_no_data(self, m_write):
|
||||||
|
@ -526,35 +526,34 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
|
||||||
tr.write(b'')
|
tr.write(b'')
|
||||||
self.assertFalse(m_write.called)
|
self.assertFalse(m_write.called)
|
||||||
self.assertFalse(self.loop.writers)
|
self.assertFalse(self.loop.writers)
|
||||||
self.assertEqual([], tr._buffer)
|
self.assertEqual(bytearray(b''), tr._buffer)
|
||||||
|
|
||||||
@mock.patch('os.write')
|
@mock.patch('os.write')
|
||||||
def test_write_partial(self, m_write):
|
def test_write_partial(self, m_write):
|
||||||
tr = self.write_pipe_transport()
|
tr = self.write_pipe_transport()
|
||||||
m_write.return_value = 2
|
m_write.return_value = 2
|
||||||
tr.write(b'data')
|
tr.write(b'data')
|
||||||
m_write.assert_called_with(5, b'data')
|
|
||||||
self.loop.assert_writer(5, tr._write_ready)
|
self.loop.assert_writer(5, tr._write_ready)
|
||||||
self.assertEqual([b'ta'], tr._buffer)
|
self.assertEqual(bytearray(b'ta'), tr._buffer)
|
||||||
|
|
||||||
@mock.patch('os.write')
|
@mock.patch('os.write')
|
||||||
def test_write_buffer(self, m_write):
|
def test_write_buffer(self, m_write):
|
||||||
tr = self.write_pipe_transport()
|
tr = self.write_pipe_transport()
|
||||||
self.loop.add_writer(5, tr._write_ready)
|
self.loop.add_writer(5, tr._write_ready)
|
||||||
tr._buffer = [b'previous']
|
tr._buffer = bytearray(b'previous')
|
||||||
tr.write(b'data')
|
tr.write(b'data')
|
||||||
self.assertFalse(m_write.called)
|
self.assertFalse(m_write.called)
|
||||||
self.loop.assert_writer(5, tr._write_ready)
|
self.loop.assert_writer(5, tr._write_ready)
|
||||||
self.assertEqual([b'previous', b'data'], tr._buffer)
|
self.assertEqual(bytearray(b'previousdata'), tr._buffer)
|
||||||
|
|
||||||
@mock.patch('os.write')
|
@mock.patch('os.write')
|
||||||
def test_write_again(self, m_write):
|
def test_write_again(self, m_write):
|
||||||
tr = self.write_pipe_transport()
|
tr = self.write_pipe_transport()
|
||||||
m_write.side_effect = BlockingIOError()
|
m_write.side_effect = BlockingIOError()
|
||||||
tr.write(b'data')
|
tr.write(b'data')
|
||||||
m_write.assert_called_with(5, b'data')
|
m_write.assert_called_with(5, bytearray(b'data'))
|
||||||
self.loop.assert_writer(5, tr._write_ready)
|
self.loop.assert_writer(5, tr._write_ready)
|
||||||
self.assertEqual([b'data'], tr._buffer)
|
self.assertEqual(bytearray(b'data'), tr._buffer)
|
||||||
|
|
||||||
@mock.patch('asyncio.unix_events.logger')
|
@mock.patch('asyncio.unix_events.logger')
|
||||||
@mock.patch('os.write')
|
@mock.patch('os.write')
|
||||||
|
@ -566,7 +565,7 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
|
||||||
tr.write(b'data')
|
tr.write(b'data')
|
||||||
m_write.assert_called_with(5, b'data')
|
m_write.assert_called_with(5, b'data')
|
||||||
self.assertFalse(self.loop.writers)
|
self.assertFalse(self.loop.writers)
|
||||||
self.assertEqual([], tr._buffer)
|
self.assertEqual(bytearray(), tr._buffer)
|
||||||
tr._fatal_error.assert_called_with(
|
tr._fatal_error.assert_called_with(
|
||||||
err,
|
err,
|
||||||
'Fatal write error on pipe transport')
|
'Fatal write error on pipe transport')
|
||||||
|
@ -606,58 +605,55 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
|
||||||
def test__write_ready(self, m_write):
|
def test__write_ready(self, m_write):
|
||||||
tr = self.write_pipe_transport()
|
tr = self.write_pipe_transport()
|
||||||
self.loop.add_writer(5, tr._write_ready)
|
self.loop.add_writer(5, tr._write_ready)
|
||||||
tr._buffer = [b'da', b'ta']
|
tr._buffer = bytearray(b'data')
|
||||||
m_write.return_value = 4
|
m_write.return_value = 4
|
||||||
tr._write_ready()
|
tr._write_ready()
|
||||||
m_write.assert_called_with(5, b'data')
|
|
||||||
self.assertFalse(self.loop.writers)
|
self.assertFalse(self.loop.writers)
|
||||||
self.assertEqual([], tr._buffer)
|
self.assertEqual(bytearray(), tr._buffer)
|
||||||
|
|
||||||
@mock.patch('os.write')
|
@mock.patch('os.write')
|
||||||
def test__write_ready_partial(self, m_write):
|
def test__write_ready_partial(self, m_write):
|
||||||
tr = self.write_pipe_transport()
|
tr = self.write_pipe_transport()
|
||||||
self.loop.add_writer(5, tr._write_ready)
|
self.loop.add_writer(5, tr._write_ready)
|
||||||
tr._buffer = [b'da', b'ta']
|
tr._buffer = bytearray(b'data')
|
||||||
m_write.return_value = 3
|
m_write.return_value = 3
|
||||||
tr._write_ready()
|
tr._write_ready()
|
||||||
m_write.assert_called_with(5, b'data')
|
|
||||||
self.loop.assert_writer(5, tr._write_ready)
|
self.loop.assert_writer(5, tr._write_ready)
|
||||||
self.assertEqual([b'a'], tr._buffer)
|
self.assertEqual(bytearray(b'a'), tr._buffer)
|
||||||
|
|
||||||
@mock.patch('os.write')
|
@mock.patch('os.write')
|
||||||
def test__write_ready_again(self, m_write):
|
def test__write_ready_again(self, m_write):
|
||||||
tr = self.write_pipe_transport()
|
tr = self.write_pipe_transport()
|
||||||
self.loop.add_writer(5, tr._write_ready)
|
self.loop.add_writer(5, tr._write_ready)
|
||||||
tr._buffer = [b'da', b'ta']
|
tr._buffer = bytearray(b'data')
|
||||||
m_write.side_effect = BlockingIOError()
|
m_write.side_effect = BlockingIOError()
|
||||||
tr._write_ready()
|
tr._write_ready()
|
||||||
m_write.assert_called_with(5, b'data')
|
m_write.assert_called_with(5, bytearray(b'data'))
|
||||||
self.loop.assert_writer(5, tr._write_ready)
|
self.loop.assert_writer(5, tr._write_ready)
|
||||||
self.assertEqual([b'data'], tr._buffer)
|
self.assertEqual(bytearray(b'data'), tr._buffer)
|
||||||
|
|
||||||
@mock.patch('os.write')
|
@mock.patch('os.write')
|
||||||
def test__write_ready_empty(self, m_write):
|
def test__write_ready_empty(self, m_write):
|
||||||
tr = self.write_pipe_transport()
|
tr = self.write_pipe_transport()
|
||||||
self.loop.add_writer(5, tr._write_ready)
|
self.loop.add_writer(5, tr._write_ready)
|
||||||
tr._buffer = [b'da', b'ta']
|
tr._buffer = bytearray(b'data')
|
||||||
m_write.return_value = 0
|
m_write.return_value = 0
|
||||||
tr._write_ready()
|
tr._write_ready()
|
||||||
m_write.assert_called_with(5, b'data')
|
m_write.assert_called_with(5, bytearray(b'data'))
|
||||||
self.loop.assert_writer(5, tr._write_ready)
|
self.loop.assert_writer(5, tr._write_ready)
|
||||||
self.assertEqual([b'data'], tr._buffer)
|
self.assertEqual(bytearray(b'data'), tr._buffer)
|
||||||
|
|
||||||
@mock.patch('asyncio.log.logger.error')
|
@mock.patch('asyncio.log.logger.error')
|
||||||
@mock.patch('os.write')
|
@mock.patch('os.write')
|
||||||
def test__write_ready_err(self, m_write, m_logexc):
|
def test__write_ready_err(self, m_write, m_logexc):
|
||||||
tr = self.write_pipe_transport()
|
tr = self.write_pipe_transport()
|
||||||
self.loop.add_writer(5, tr._write_ready)
|
self.loop.add_writer(5, tr._write_ready)
|
||||||
tr._buffer = [b'da', b'ta']
|
tr._buffer = bytearray(b'data')
|
||||||
m_write.side_effect = err = OSError()
|
m_write.side_effect = err = OSError()
|
||||||
tr._write_ready()
|
tr._write_ready()
|
||||||
m_write.assert_called_with(5, b'data')
|
|
||||||
self.assertFalse(self.loop.writers)
|
self.assertFalse(self.loop.writers)
|
||||||
self.assertFalse(self.loop.readers)
|
self.assertFalse(self.loop.readers)
|
||||||
self.assertEqual([], tr._buffer)
|
self.assertEqual(bytearray(), tr._buffer)
|
||||||
self.assertTrue(tr.is_closing())
|
self.assertTrue(tr.is_closing())
|
||||||
m_logexc.assert_called_with(
|
m_logexc.assert_called_with(
|
||||||
test_utils.MockPattern(
|
test_utils.MockPattern(
|
||||||
|
@ -673,13 +669,12 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
|
||||||
tr = self.write_pipe_transport()
|
tr = self.write_pipe_transport()
|
||||||
self.loop.add_writer(5, tr._write_ready)
|
self.loop.add_writer(5, tr._write_ready)
|
||||||
tr._closing = True
|
tr._closing = True
|
||||||
tr._buffer = [b'da', b'ta']
|
tr._buffer = bytearray(b'data')
|
||||||
m_write.return_value = 4
|
m_write.return_value = 4
|
||||||
tr._write_ready()
|
tr._write_ready()
|
||||||
m_write.assert_called_with(5, b'data')
|
|
||||||
self.assertFalse(self.loop.writers)
|
self.assertFalse(self.loop.writers)
|
||||||
self.assertFalse(self.loop.readers)
|
self.assertFalse(self.loop.readers)
|
||||||
self.assertEqual([], tr._buffer)
|
self.assertEqual(bytearray(), tr._buffer)
|
||||||
self.protocol.connection_lost.assert_called_with(None)
|
self.protocol.connection_lost.assert_called_with(None)
|
||||||
self.pipe.close.assert_called_with()
|
self.pipe.close.assert_called_with()
|
||||||
|
|
||||||
|
|
|
@ -427,6 +427,9 @@ Library
|
||||||
- Issue #26654: Inspect functools.partial in asyncio.Handle.__repr__.
|
- Issue #26654: Inspect functools.partial in asyncio.Handle.__repr__.
|
||||||
Patch by iceboy.
|
Patch by iceboy.
|
||||||
|
|
||||||
|
- Issue #26909: Fix slow pipes IO in asyncio.
|
||||||
|
Patch by INADA Naoki.
|
||||||
|
|
||||||
IDLE
|
IDLE
|
||||||
----
|
----
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue