diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 4c527aa262a..e67cf65a10f 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -230,10 +230,6 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, assert self._buffer is None # Pass a copy, except if it's already immutable. self._loop_writing(data=bytes(data)) - # XXX Should we pause the protocol at this point - # if len(data) > self._high_water? (That would - # require keeping track of the number of bytes passed - # to a send() that hasn't finished yet.) elif not self._buffer: # WRITING -> BACKED UP # Make a mutable copy which we can extend. self._buffer = bytearray(data) diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index 0c536986ff7..9e9b41a47fd 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -343,6 +343,88 @@ class ProactorSocketTransportTests(test_utils.TestCase): tr.close() + def pause_writing_transport(self, high): + tr = _ProactorSocketTransport( + self.loop, self.sock, self.protocol) + self.addCleanup(tr.close) + + tr.set_write_buffer_limits(high=high) + + self.assertEqual(tr.get_write_buffer_size(), 0) + self.assertFalse(self.protocol.pause_writing.called) + self.assertFalse(self.protocol.resume_writing.called) + return tr + + def test_pause_resume_writing(self): + tr = self.pause_writing_transport(high=4) + + # write a large chunk, must pause writing + fut = asyncio.Future(loop=self.loop) + self.loop._proactor.send.return_value = fut + tr.write(b'large data') + self.loop._run_once() + self.assertTrue(self.protocol.pause_writing.called) + + # flush the buffer + fut.set_result(None) + self.loop._run_once() + self.assertEqual(tr.get_write_buffer_size(), 0) + self.assertTrue(self.protocol.resume_writing.called) + + def test_pause_writing_2write(self): + tr = self.pause_writing_transport(high=4) + + # first short write, the buffer is not full (3 <= 4) + fut1 = asyncio.Future(loop=self.loop) + self.loop._proactor.send.return_value = fut1 + tr.write(b'123') + self.loop._run_once() + self.assertEqual(tr.get_write_buffer_size(), 3) + self.assertFalse(self.protocol.pause_writing.called) + + # fill the buffer, must pause writing (6 > 4) + tr.write(b'abc') + self.loop._run_once() + self.assertEqual(tr.get_write_buffer_size(), 6) + self.assertTrue(self.protocol.pause_writing.called) + + def test_pause_writing_3write(self): + tr = self.pause_writing_transport(high=4) + + # first short write, the buffer is not full (1 <= 4) + fut = asyncio.Future(loop=self.loop) + self.loop._proactor.send.return_value = fut + tr.write(b'1') + self.loop._run_once() + self.assertEqual(tr.get_write_buffer_size(), 1) + self.assertFalse(self.protocol.pause_writing.called) + + # second short write, the buffer is not full (3 <= 4) + tr.write(b'23') + self.loop._run_once() + self.assertEqual(tr.get_write_buffer_size(), 3) + self.assertFalse(self.protocol.pause_writing.called) + + # fill the buffer, must pause writing (6 > 4) + tr.write(b'abc') + self.loop._run_once() + self.assertEqual(tr.get_write_buffer_size(), 6) + self.assertTrue(self.protocol.pause_writing.called) + + def test_dont_pause_writing(self): + tr = self.pause_writing_transport(high=4) + + # write a large chunk which completes immedialty, + # it should not pause writing + fut = asyncio.Future(loop=self.loop) + fut.set_result(None) + self.loop._proactor.send.return_value = fut + tr.write(b'very large data') + self.loop._run_once() + self.assertEqual(tr.get_write_buffer_size(), 0) + self.assertFalse(self.protocol.pause_writing.called) + + class BaseProactorEventLoopTests(test_utils.TestCase): def setUp(self):