asyncio, tulip issue 202: Add unit test of pause/resume writing for proactor
socket transport
This commit is contained in:
parent
049882e561
commit
df75d5b402
|
@ -230,10 +230,6 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
|
|||
assert self._buffer is None
|
||||
# Pass a copy, except if it's already immutable.
|
||||
self._loop_writing(data=bytes(data))
|
||||
# XXX Should we pause the protocol at this point
|
||||
# if len(data) > self._high_water? (That would
|
||||
# require keeping track of the number of bytes passed
|
||||
# to a send() that hasn't finished yet.)
|
||||
elif not self._buffer: # WRITING -> BACKED UP
|
||||
# Make a mutable copy which we can extend.
|
||||
self._buffer = bytearray(data)
|
||||
|
|
|
@ -343,6 +343,88 @@ class ProactorSocketTransportTests(test_utils.TestCase):
|
|||
tr.close()
|
||||
|
||||
|
||||
def pause_writing_transport(self, high):
|
||||
tr = _ProactorSocketTransport(
|
||||
self.loop, self.sock, self.protocol)
|
||||
self.addCleanup(tr.close)
|
||||
|
||||
tr.set_write_buffer_limits(high=high)
|
||||
|
||||
self.assertEqual(tr.get_write_buffer_size(), 0)
|
||||
self.assertFalse(self.protocol.pause_writing.called)
|
||||
self.assertFalse(self.protocol.resume_writing.called)
|
||||
return tr
|
||||
|
||||
def test_pause_resume_writing(self):
|
||||
tr = self.pause_writing_transport(high=4)
|
||||
|
||||
# write a large chunk, must pause writing
|
||||
fut = asyncio.Future(loop=self.loop)
|
||||
self.loop._proactor.send.return_value = fut
|
||||
tr.write(b'large data')
|
||||
self.loop._run_once()
|
||||
self.assertTrue(self.protocol.pause_writing.called)
|
||||
|
||||
# flush the buffer
|
||||
fut.set_result(None)
|
||||
self.loop._run_once()
|
||||
self.assertEqual(tr.get_write_buffer_size(), 0)
|
||||
self.assertTrue(self.protocol.resume_writing.called)
|
||||
|
||||
def test_pause_writing_2write(self):
|
||||
tr = self.pause_writing_transport(high=4)
|
||||
|
||||
# first short write, the buffer is not full (3 <= 4)
|
||||
fut1 = asyncio.Future(loop=self.loop)
|
||||
self.loop._proactor.send.return_value = fut1
|
||||
tr.write(b'123')
|
||||
self.loop._run_once()
|
||||
self.assertEqual(tr.get_write_buffer_size(), 3)
|
||||
self.assertFalse(self.protocol.pause_writing.called)
|
||||
|
||||
# fill the buffer, must pause writing (6 > 4)
|
||||
tr.write(b'abc')
|
||||
self.loop._run_once()
|
||||
self.assertEqual(tr.get_write_buffer_size(), 6)
|
||||
self.assertTrue(self.protocol.pause_writing.called)
|
||||
|
||||
def test_pause_writing_3write(self):
|
||||
tr = self.pause_writing_transport(high=4)
|
||||
|
||||
# first short write, the buffer is not full (1 <= 4)
|
||||
fut = asyncio.Future(loop=self.loop)
|
||||
self.loop._proactor.send.return_value = fut
|
||||
tr.write(b'1')
|
||||
self.loop._run_once()
|
||||
self.assertEqual(tr.get_write_buffer_size(), 1)
|
||||
self.assertFalse(self.protocol.pause_writing.called)
|
||||
|
||||
# second short write, the buffer is not full (3 <= 4)
|
||||
tr.write(b'23')
|
||||
self.loop._run_once()
|
||||
self.assertEqual(tr.get_write_buffer_size(), 3)
|
||||
self.assertFalse(self.protocol.pause_writing.called)
|
||||
|
||||
# fill the buffer, must pause writing (6 > 4)
|
||||
tr.write(b'abc')
|
||||
self.loop._run_once()
|
||||
self.assertEqual(tr.get_write_buffer_size(), 6)
|
||||
self.assertTrue(self.protocol.pause_writing.called)
|
||||
|
||||
def test_dont_pause_writing(self):
|
||||
tr = self.pause_writing_transport(high=4)
|
||||
|
||||
# write a large chunk which completes immedialty,
|
||||
# it should not pause writing
|
||||
fut = asyncio.Future(loop=self.loop)
|
||||
fut.set_result(None)
|
||||
self.loop._proactor.send.return_value = fut
|
||||
tr.write(b'very large data')
|
||||
self.loop._run_once()
|
||||
self.assertEqual(tr.get_write_buffer_size(), 0)
|
||||
self.assertFalse(self.protocol.pause_writing.called)
|
||||
|
||||
|
||||
class BaseProactorEventLoopTests(test_utils.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
|
Loading…
Reference in New Issue