diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index c838c5ce85a..007039a7cdf 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -472,259 +472,6 @@ class ProactorSocketTransportTests(test_utils.TestCase): self.assertFalse(self.protocol.pause_writing.called) -@unittest.skip('FIXME: bpo-33694: these tests are too close ' - 'to the implementation and should be refactored or removed') -class ProactorSocketTransportBufferedProtoTests(test_utils.TestCase): - - def setUp(self): - super().setUp() - self.loop = self.new_test_loop() - self.addCleanup(self.loop.close) - self.proactor = mock.Mock() - self.loop._proactor = self.proactor - - self.protocol = test_utils.make_test_protocol(asyncio.BufferedProtocol) - self.buf = bytearray(1) - self.protocol.get_buffer.side_effect = lambda hint: self.buf - - self.sock = mock.Mock(socket.socket) - - def socket_transport(self, waiter=None): - transport = _ProactorSocketTransport(self.loop, self.sock, - self.protocol, waiter=waiter) - self.addCleanup(close_transport, transport) - return transport - - def test_ctor(self): - fut = self.loop.create_future() - tr = self.socket_transport(waiter=fut) - test_utils.run_briefly(self.loop) - self.assertIsNone(fut.result()) - self.protocol.connection_made(tr) - self.proactor.recv_into.assert_called_with(self.sock, self.buf) - - def test_loop_reading(self): - tr = self.socket_transport() - tr._loop_reading() - self.loop._proactor.recv_into.assert_called_with(self.sock, self.buf) - self.assertTrue(self.protocol.get_buffer.called) - self.assertFalse(self.protocol.buffer_updated.called) - self.assertFalse(self.protocol.eof_received.called) - - def test_get_buffer_error(self): - transport = self.socket_transport() - transport._fatal_error = mock.Mock() - - self.loop.call_exception_handler = mock.Mock() - self.protocol.get_buffer.side_effect = LookupError() - - transport._loop_reading() - - self.assertTrue(transport._fatal_error.called) - self.assertTrue(self.protocol.get_buffer.called) - self.assertFalse(self.protocol.buffer_updated.called) - - def test_get_buffer_zerosized(self): - transport = self.socket_transport() - transport._fatal_error = mock.Mock() - - self.loop.call_exception_handler = mock.Mock() - self.protocol.get_buffer.side_effect = lambda hint: bytearray(0) - - transport._loop_reading() - - self.assertTrue(transport._fatal_error.called) - self.assertTrue(self.protocol.get_buffer.called) - self.assertFalse(self.protocol.buffer_updated.called) - - def test_proto_type_switch(self): - self.protocol = test_utils.make_test_protocol(asyncio.Protocol) - tr = self.socket_transport() - - res = self.loop.create_future() - res.set_result(b'data') - - tr = self.socket_transport() - tr._read_fut = res - tr._loop_reading(res) - self.loop._proactor.recv.assert_called_with(self.sock, 32768) - self.protocol.data_received.assert_called_with(b'data') - - # switch protocol to a BufferedProtocol - - buf_proto = test_utils.make_test_protocol(asyncio.BufferedProtocol) - buf = bytearray(4) - buf_proto.get_buffer.side_effect = lambda hint: buf - - tr.set_protocol(buf_proto) - test_utils.run_briefly(self.loop) - res = self.loop.create_future() - res.set_result(4) - - tr._read_fut = res - tr._loop_reading(res) - self.loop._proactor.recv_into.assert_called_with(self.sock, buf) - buf_proto.buffer_updated.assert_called_with(4) - - @unittest.skip('FIXME: bpo-33694: this test is too close to the ' - 'implementation and should be refactored or removed') - def test_proto_buf_switch(self): - tr = self.socket_transport() - test_utils.run_briefly(self.loop) - self.protocol.get_buffer.assert_called_with(-1) - - # switch protocol to *another* BufferedProtocol - - buf_proto = test_utils.make_test_protocol(asyncio.BufferedProtocol) - buf = bytearray(4) - buf_proto.get_buffer.side_effect = lambda hint: buf - tr._read_fut.done.side_effect = lambda: False - tr.set_protocol(buf_proto) - self.assertFalse(buf_proto.get_buffer.called) - test_utils.run_briefly(self.loop) - buf_proto.get_buffer.assert_called_with(-1) - - def test_buffer_updated_error(self): - transport = self.socket_transport() - transport._fatal_error = mock.Mock() - - self.loop.call_exception_handler = mock.Mock() - self.protocol.buffer_updated.side_effect = LookupError() - - res = self.loop.create_future() - res.set_result(10) - transport._read_fut = res - transport._loop_reading(res) - - self.assertTrue(transport._fatal_error.called) - self.assertFalse(self.protocol.get_buffer.called) - self.assertTrue(self.protocol.buffer_updated.called) - - def test_loop_eof_received_error(self): - res = self.loop.create_future() - res.set_result(0) - - self.protocol.eof_received.side_effect = LookupError() - - tr = self.socket_transport() - tr._fatal_error = mock.Mock() - - tr.close = mock.Mock() - tr._read_fut = res - tr._loop_reading(res) - self.assertFalse(self.loop._proactor.recv_into.called) - self.assertTrue(self.protocol.eof_received.called) - self.assertTrue(tr._fatal_error.called) - - def test_loop_reading_data(self): - res = self.loop.create_future() - res.set_result(4) - - tr = self.socket_transport() - tr._read_fut = res - tr._loop_reading(res) - self.loop._proactor.recv_into.assert_called_with(self.sock, self.buf) - self.protocol.buffer_updated.assert_called_with(4) - - def test_loop_reading_no_data(self): - res = self.loop.create_future() - res.set_result(0) - - tr = self.socket_transport() - self.assertRaises(AssertionError, tr._loop_reading, res) - - tr.close = mock.Mock() - tr._read_fut = res - tr._loop_reading(res) - self.assertFalse(self.loop._proactor.recv_into.called) - self.assertTrue(self.protocol.eof_received.called) - self.assertTrue(tr.close.called) - - def test_loop_reading_aborted(self): - err = self.loop._proactor.recv_into.side_effect = \ - ConnectionAbortedError() - - tr = self.socket_transport() - tr._fatal_error = mock.Mock() - tr._loop_reading() - tr._fatal_error.assert_called_with( - err, 'Fatal read error on pipe transport') - - def test_loop_reading_aborted_closing(self): - self.loop._proactor.recv.side_effect = ConnectionAbortedError() - - tr = self.socket_transport() - tr._closing = True - tr._fatal_error = mock.Mock() - tr._loop_reading() - self.assertFalse(tr._fatal_error.called) - - def test_loop_reading_aborted_is_fatal(self): - self.loop._proactor.recv_into.side_effect = ConnectionAbortedError() - tr = self.socket_transport() - tr._closing = False - tr._fatal_error = mock.Mock() - tr._loop_reading() - self.assertTrue(tr._fatal_error.called) - - def test_loop_reading_conn_reset_lost(self): - err = self.loop._proactor.recv_into.side_effect = ConnectionResetError() - - tr = self.socket_transport() - tr._closing = False - tr._fatal_error = mock.Mock() - tr._force_close = mock.Mock() - tr._loop_reading() - self.assertFalse(tr._fatal_error.called) - tr._force_close.assert_called_with(err) - - def test_loop_reading_exception(self): - err = self.loop._proactor.recv_into.side_effect = OSError() - - tr = self.socket_transport() - tr._fatal_error = mock.Mock() - tr._loop_reading() - tr._fatal_error.assert_called_with( - err, 'Fatal read error on pipe transport') - - def test_pause_resume_reading(self): - tr = self.socket_transport() - futures = [] - for msg in [10, 20, 30, 40, 0]: - f = self.loop.create_future() - f.set_result(msg) - futures.append(f) - - self.loop._proactor.recv_into.side_effect = futures - self.loop._run_once() - self.assertFalse(tr._paused) - self.assertTrue(tr.is_reading()) - self.loop._run_once() - self.protocol.buffer_updated.assert_called_with(10) - self.loop._run_once() - self.protocol.buffer_updated.assert_called_with(20) - - tr.pause_reading() - tr.pause_reading() - self.assertTrue(tr._paused) - self.assertFalse(tr.is_reading()) - for i in range(10): - self.loop._run_once() - self.protocol.buffer_updated.assert_called_with(20) - - tr.resume_reading() - tr.resume_reading() - self.assertFalse(tr._paused) - self.assertTrue(tr.is_reading()) - self.loop._run_once() - self.protocol.buffer_updated.assert_called_with(30) - self.loop._run_once() - self.protocol.buffer_updated.assert_called_with(40) - tr.close() - - self.assertFalse(tr.is_reading()) - - class ProactorDatagramTransportTests(test_utils.TestCase): def setUp(self):