From 1de298ce8bd2db288b451a597e90e9aa5edcdbc0 Mon Sep 17 00:00:00 2001 From: Tony Solomonik Date: Sun, 19 Jul 2020 23:52:13 +0300 Subject: [PATCH] bpo-41279: Add BufferedProtocol to unix read pipe transport tests (GH-21446) --- Lib/test/test_asyncio/test_unix_events.py | 180 +++++++++++++++------- 1 file changed, 128 insertions(+), 52 deletions(-) diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 2c7d52a15bb..f22165fb0b5 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -642,12 +642,14 @@ class SelectorEventLoopUnixSockSendfileTests(test_utils.TestCase): self.assertEqual(1000, self.file.tell()) -class UnixReadPipeTransportTests(test_utils.TestCase): +class UnixReadPipeTransportTestsBase: - def setUp(self): - super().setUp() + def make_test_protocol(self): + raise NotImplementedError + + def setup_globals(self): self.loop = self.new_test_loop() - self.protocol = test_utils.make_test_protocol(asyncio.Protocol) + self.protocol = None self.pipe = mock.Mock(spec_set=io.RawIOBase) self.pipe.fileno.return_value = 5 @@ -662,6 +664,8 @@ class UnixReadPipeTransportTests(test_utils.TestCase): m_fstat.return_value = st self.addCleanup(fstat_patcher.stop) + self.make_test_protocol() + def read_pipe_transport(self, waiter=None): transport = unix_events._UnixReadPipeTransport(self.loop, self.pipe, self.protocol, @@ -678,54 +682,6 @@ class UnixReadPipeTransportTests(test_utils.TestCase): self.loop.assert_reader(5, tr._read_ready) self.assertIsNone(waiter.result()) - @mock.patch('os.read') - def test__read_ready(self, m_read): - tr = self.read_pipe_transport() - m_read.return_value = b'data' - tr._read_ready() - - m_read.assert_called_with(5, tr.max_size) - self.protocol.data_received.assert_called_with(b'data') - - @mock.patch('os.read') - def test__read_ready_eof(self, m_read): - tr = self.read_pipe_transport() - m_read.return_value = b'' - tr._read_ready() - - m_read.assert_called_with(5, tr.max_size) - self.assertFalse(self.loop.readers) - test_utils.run_briefly(self.loop) - self.protocol.eof_received.assert_called_with() - self.protocol.connection_lost.assert_called_with(None) - - @mock.patch('os.read') - def test__read_ready_blocked(self, m_read): - tr = self.read_pipe_transport() - m_read.side_effect = BlockingIOError - tr._read_ready() - - m_read.assert_called_with(5, tr.max_size) - test_utils.run_briefly(self.loop) - self.assertFalse(self.protocol.data_received.called) - - @mock.patch('asyncio.log.logger.error') - @mock.patch('os.read') - def test__read_ready_error(self, m_read, m_logexc): - tr = self.read_pipe_transport() - err = OSError() - m_read.side_effect = err - tr._close = mock.Mock() - tr._read_ready() - - m_read.assert_called_with(5, tr.max_size) - tr._close.assert_called_with(err) - m_logexc.assert_called_with( - test_utils.MockPattern( - 'Fatal read error on pipe transport' - '\nprotocol:.*\ntransport:.*'), - exc_info=(OSError, MOCK_ANY, MOCK_ANY)) - @mock.patch('os.read') def test_pause_reading(self, m_read): tr = self.read_pipe_transport() @@ -819,6 +775,126 @@ class UnixReadPipeTransportTests(test_utils.TestCase): tr.resume_reading() +class UnixReadPipeTransportWithProtocolTests(test_utils.TestCase, UnixReadPipeTransportTestsBase): + + def make_test_protocol(self): + self.protocol = test_utils.make_test_protocol(asyncio.Protocol) + + def setUp(self): + super().setUp() + self.setup_globals() + + @mock.patch('os.read') + def test__read_ready(self, m_read): + tr = self.read_pipe_transport() + m_read.return_value = b'data' + tr._read_ready() + + m_read.assert_called_with(5, tr.max_size) + self.protocol.data_received.assert_called_with(b'data') + + @mock.patch('os.read') + def test__read_ready_eof(self, m_read): + tr = self.read_pipe_transport() + m_read.return_value = b'' + tr._read_ready() + + m_read.assert_called_with(5, tr.max_size) + self.assertFalse(self.loop.readers) + test_utils.run_briefly(self.loop) + self.protocol.eof_received.assert_called_with() + self.protocol.connection_lost.assert_called_with(None) + + @mock.patch('os.read') + def test__read_ready_blocked(self, m_read): + tr = self.read_pipe_transport() + m_read.side_effect = BlockingIOError + tr._read_ready() + + m_read.assert_called_with(5, tr.max_size) + test_utils.run_briefly(self.loop) + self.assertFalse(self.protocol.data_received.called) + + @mock.patch('asyncio.log.logger.error') + @mock.patch('os.read') + def test__read_ready_error(self, m_read, m_logexc): + tr = self.read_pipe_transport() + err = OSError() + m_read.side_effect = err + tr._close = mock.Mock() + tr._read_ready() + + m_read.assert_called_with(5, tr.max_size) + tr._close.assert_called_with(err) + m_logexc.assert_called_with( + test_utils.MockPattern( + 'Fatal read error on pipe transport' + '\nprotocol:.*\ntransport:.*'), + exc_info=(OSError, MOCK_ANY, MOCK_ANY)) + + +class UnixReadPipeTransportWithBufferedProtocolTests(test_utils.TestCase, UnixReadPipeTransportTestsBase): + + def make_test_protocol(self): + self.protocol = test_utils.make_test_buffered_protocol( + asyncio.BufferedProtocol, 65536) + + def setUp(self): + super().setUp() + self.setup_globals() + + def set_next_buffered_read(self, data): + data_length = len(data) + buf = self.protocol.get_buffer(data_length) + buf[:data_length] = data + self.pipe.readinto.return_value = data_length + + def test__read_ready(self): + tr = self.read_pipe_transport() + data = b'data' + self.set_next_buffered_read(data) + tr._read_ready() + + self.pipe.readinto.assert_called_once() + self.assertTrue(self.protocol.buffer_updated.called) + self.assertEqual(self.protocol._last_called_buffer[:len(data)], bytearray(data)) + + def test__read_ready_eof(self): + tr = self.read_pipe_transport() + self.set_next_buffered_read(b'') + tr._read_ready() + + self.pipe.readinto.assert_called_once() + self.assertFalse(self.loop.readers) + test_utils.run_briefly(self.loop) + self.protocol.eof_received.assert_called_with() + + def test__read_ready_blocked(self): + tr = self.read_pipe_transport() + self.pipe.readinto.side_effect = BlockingIOError + tr._read_ready() + + self.pipe.readinto.assert_called_once() + test_utils.run_briefly(self.loop) + self.assertFalse(self.protocol.buffer_updated.called) + + @mock.patch('asyncio.log.logger.error') + def test__read_ready_error(self, m_logexc): + tr = self.read_pipe_transport() + err = OSError() + self.pipe.readinto.side_effect = err + tr._close = mock.Mock() + tr._read_ready() + + self.pipe.readinto.assert_called_once() + tr._close.assert_called_with(err) + m_logexc.assert_called_with( + test_utils.MockPattern( + 'Fatal read error on pipe transport' + '\nprotocol:.*\ntransport:.*'), + exc_info=(OSError, MOCK_ANY, MOCK_ANY)) + + class UnixWritePipeTransportTests(test_utils.TestCase): def setUp(self):