"""Tests for proactor_events.py""" import io import socket import unittest import sys from unittest import mock import asyncio from asyncio.proactor_events import BaseProactorEventLoop from asyncio.proactor_events import _ProactorSocketTransport from asyncio.proactor_events import _ProactorWritePipeTransport from asyncio.proactor_events import _ProactorDuplexPipeTransport from asyncio.proactor_events import _ProactorDatagramTransport from test.support import os_helper from test.support import socket_helper from test.test_asyncio import utils as test_utils def tearDownModule(): asyncio.set_event_loop_policy(None) def close_transport(transport): # Don't call transport.close() because the event loop and the IOCP proactor # are mocked if transport._sock is None: return transport._sock.close() transport._sock = None class ProactorSocketTransportTests(test_utils.TestCase): def setUp(self): super().setUp() self.loop = self.new_test_loop() self.addCleanup(self.loop.close) self.proactor = mock.Mock() self.loop._proactor = self.proactor self.protocol = test_utils.make_test_protocol(asyncio.Protocol) self.sock = mock.Mock(socket.socket) self.buffer_size = 65536 def socket_transport(self, waiter=None): transport = _ProactorSocketTransport(self.loop, self.sock, self.protocol, waiter=waiter) self.addCleanup(close_transport, transport) return transport def test_ctor(self): fut = self.loop.create_future() tr = self.socket_transport(waiter=fut) test_utils.run_briefly(self.loop) self.assertIsNone(fut.result()) self.protocol.connection_made(tr) self.proactor.recv_into.assert_called_with(self.sock, bytearray(self.buffer_size)) def test_loop_reading(self): tr = self.socket_transport() tr._loop_reading() self.loop._proactor.recv_into.assert_called_with(self.sock, bytearray(self.buffer_size)) self.assertFalse(self.protocol.data_received.called) self.assertFalse(self.protocol.eof_received.called) def test_loop_reading_data(self): buf = b'data' res = self.loop.create_future() res.set_result(len(buf)) tr = self.socket_transport() tr._read_fut = res tr._data[:len(buf)] = buf tr._loop_reading(res) called_buf = bytearray(self.buffer_size) called_buf[:len(buf)] = buf self.loop._proactor.recv_into.assert_called_with(self.sock, called_buf) self.protocol.data_received.assert_called_with(buf) # assert_called_with maps bytearray and bytes to the same thing so check manually # regression test for https://github.com/python/cpython/issues/99941 self.assertIsInstance(self.protocol.data_received.call_args.args[0], bytes) @unittest.skipIf(sys.flags.optimize, "Assertions are disabled in optimized mode") def test_loop_reading_no_data(self): res = self.loop.create_future() res.set_result(0) tr = self.socket_transport() self.assertRaises(AssertionError, tr._loop_reading, res) tr.close = mock.Mock() tr._read_fut = res tr._loop_reading(res) self.assertFalse(self.loop._proactor.recv_into.called) self.assertTrue(self.protocol.eof_received.called) self.assertTrue(tr.close.called) def test_loop_reading_aborted(self): err = self.loop._proactor.recv_into.side_effect = ConnectionAbortedError() tr = self.socket_transport() tr._fatal_error = mock.Mock() tr._loop_reading() tr._fatal_error.assert_called_with( err, 'Fatal read error on pipe transport') def test_loop_reading_aborted_closing(self): self.loop._proactor.recv_into.side_effect = ConnectionAbortedError() tr = self.socket_transport() tr._closing = True tr._fatal_error = mock.Mock() tr._loop_reading() self.assertFalse(tr._fatal_error.called) def test_loop_reading_aborted_is_fatal(self): self.loop._proactor.recv_into.side_effect = ConnectionAbortedError() tr = self.socket_transport() tr._closing = False tr._fatal_error = mock.Mock() tr._loop_reading() self.assertTrue(tr._fatal_error.called) def test_loop_reading_conn_reset_lost(self): err = self.loop._proactor.recv_into.side_effect = ConnectionResetError() tr = self.socket_transport() tr._closing = False tr._fatal_error = mock.Mock() tr._force_close = mock.Mock() tr._loop_reading() self.assertFalse(tr._fatal_error.called) tr._force_close.assert_called_with(err) def test_loop_reading_exception(self): err = self.loop._proactor.recv_into.side_effect = (OSError()) tr = self.socket_transport() tr._fatal_error = mock.Mock() tr._loop_reading() tr._fatal_error.assert_called_with( err, 'Fatal read error on pipe transport') def test_write(self): tr = self.socket_transport() tr._loop_writing = mock.Mock() tr.write(b'data') self.assertEqual(tr._buffer, None) tr._loop_writing.assert_called_with(data=b'data') def test_write_no_data(self): tr = self.socket_transport() tr.write(b'') self.assertFalse(tr._buffer) def test_write_more(self): tr = self.socket_transport() tr._write_fut = mock.Mock() tr._loop_writing = mock.Mock() tr.write(b'data') self.assertEqual(tr._buffer, b'data') self.assertFalse(tr._loop_writing.called) def test_loop_writing(self): tr = self.socket_transport() tr._buffer = bytearray(b'data') tr._loop_writing() self.loop._proactor.send.assert_called_with(self.sock, b'data') self.loop._proactor.send.return_value.add_done_callback.\ assert_called_with(tr._loop_writing) @mock.patch('asyncio.proactor_events.logger') def test_loop_writing_err(self, m_log): err = self.loop._proactor.send.side_effect = OSError() tr = self.socket_transport() tr._fatal_error = mock.Mock() tr._buffer = [b'da', b'ta'] tr._loop_writing() tr._fatal_error.assert_called_with( err, 'Fatal write error on pipe transport') tr._conn_lost = 1 tr.write(b'data') tr.write(b'data') tr.write(b'data') tr.write(b'data') tr.write(b'data') self.assertEqual(tr._buffer, None) m_log.warning.assert_called_with('socket.send() raised exception.') def test_loop_writing_stop(self): fut = self.loop.create_future() fut.set_result(b'data') tr = self.socket_transport() tr._write_fut = fut tr._loop_writing(fut) self.assertIsNone(tr._write_fut) def test_loop_writing_closing(self): fut = self.loop.create_future() fut.set_result(1) tr = self.socket_transport() tr._write_fut = fut tr.close() tr._loop_writing(fut) self.assertIsNone(tr._write_fut) test_utils.run_briefly(self.loop) self.protocol.connection_lost.assert_called_with(None) def test_abort(self): tr = self.socket_transport() tr._force_close = mock.Mock() tr.abort() tr._force_close.assert_called_with(None) def test_close(self): tr = self.socket_transport() tr.close() test_utils.run_briefly(self.loop) self.protocol.connection_lost.assert_called_with(None) self.assertTrue(tr.is_closing()) self.assertEqual(tr._conn_lost, 1) self.protocol.connection_lost.reset_mock() tr.close() test_utils.run_briefly(self.loop) self.assertFalse(self.protocol.connection_lost.called) def test_close_write_fut(self): tr = self.socket_transport() tr._write_fut = mock.Mock() tr.close() test_utils.run_briefly(self.loop) self.assertFalse(self.protocol.connection_lost.called) def test_close_buffer(self): tr = self.socket_transport() tr._buffer = [b'data'] tr.close() test_utils.run_briefly(self.loop) self.assertFalse(self.protocol.connection_lost.called) def test_close_invalid_sockobj(self): tr = self.socket_transport() self.sock.fileno.return_value = -1 tr.close() test_utils.run_briefly(self.loop) self.protocol.connection_lost.assert_called_with(None) self.assertFalse(self.sock.shutdown.called) @mock.patch('asyncio.base_events.logger') def test_fatal_error(self, m_logging): tr = self.socket_transport() tr._force_close = mock.Mock() tr._fatal_error(None) self.assertTrue(tr._force_close.called) self.assertTrue(m_logging.error.called) def test_force_close(self): tr = self.socket_transport() tr._buffer = [b'data'] read_fut = tr._read_fut = mock.Mock() write_fut = tr._write_fut = mock.Mock() tr._force_close(None) read_fut.cancel.assert_called_with() write_fut.cancel.assert_called_with() test_utils.run_briefly(self.loop) self.protocol.connection_lost.assert_called_with(None) self.assertEqual(None, tr._buffer) self.assertEqual(tr._conn_lost, 1) def test_loop_writing_force_close(self): exc_handler = mock.Mock() self.loop.set_exception_handler(exc_handler) fut = self.loop.create_future() fut.set_result(1) self.proactor.send.return_value = fut tr = self.socket_transport() tr.write(b'data') tr._force_close(None) test_utils.run_briefly(self.loop) exc_handler.assert_not_called() def test_force_close_idempotent(self): tr = self.socket_transport() tr._closing = True tr._force_close(None) test_utils.run_briefly(self.loop) # See https://github.com/python/cpython/issues/89237 # `protocol.connection_lost` should be called even if # the transport was closed forcefully otherwise # the resources held by protocol will never be freed # and waiters will never be notified leading to hang. self.assertTrue(self.protocol.connection_lost.called) def test_force_close_protocol_connection_lost_once(self): tr = self.socket_transport() self.assertFalse(self.protocol.connection_lost.called) tr._closing = True # Calling _force_close twice should not call # protocol.connection_lost twice tr._force_close(None) tr._force_close(None) test_utils.run_briefly(self.loop) self.assertEqual(1, self.protocol.connection_lost.call_count) def test_close_protocol_connection_lost_once(self): tr = self.socket_transport() self.assertFalse(self.protocol.connection_lost.called) # Calling close twice should not call # protocol.connection_lost twice tr.close() tr.close() test_utils.run_briefly(self.loop) self.assertEqual(1, self.protocol.connection_lost.call_count) def test_fatal_error_2(self): tr = self.socket_transport() tr._buffer = [b'data'] tr._force_close(None) test_utils.run_briefly(self.loop) self.protocol.connection_lost.assert_called_with(None) self.assertEqual(None, tr._buffer) def test_call_connection_lost(self): tr = self.socket_transport() tr._call_connection_lost(None) self.assertTrue(self.protocol.connection_lost.called) self.assertTrue(self.sock.close.called) def test_write_eof(self): tr = self.socket_transport() self.assertTrue(tr.can_write_eof()) tr.write_eof() self.sock.shutdown.assert_called_with(socket.SHUT_WR) tr.write_eof() self.assertEqual(self.sock.shutdown.call_count, 1) tr.close() def test_write_eof_buffer(self): tr = self.socket_transport() f = self.loop.create_future() tr._loop._proactor.send.return_value = f tr.write(b'data') tr.write_eof() self.assertTrue(tr._eof_written) self.assertFalse(self.sock.shutdown.called) tr._loop._proactor.send.assert_called_with(self.sock, b'data') f.set_result(4) self.loop._run_once() self.sock.shutdown.assert_called_with(socket.SHUT_WR) tr.close() def test_write_eof_write_pipe(self): tr = _ProactorWritePipeTransport( self.loop, self.sock, self.protocol) self.assertTrue(tr.can_write_eof()) tr.write_eof() self.assertTrue(tr.is_closing()) self.loop._run_once() self.assertTrue(self.sock.close.called) tr.close() def test_write_eof_buffer_write_pipe(self): tr = _ProactorWritePipeTransport(self.loop, self.sock, self.protocol) f = self.loop.create_future() tr._loop._proactor.send.return_value = f tr.write(b'data') tr.write_eof() self.assertTrue(tr.is_closing()) self.assertFalse(self.sock.shutdown.called) tr._loop._proactor.send.assert_called_with(self.sock, b'data') f.set_result(4) self.loop._run_once() self.loop._run_once() self.assertTrue(self.sock.close.called) tr.close() def test_write_eof_duplex_pipe(self): tr = _ProactorDuplexPipeTransport( self.loop, self.sock, self.protocol) self.assertFalse(tr.can_write_eof()) with self.assertRaises(NotImplementedError): tr.write_eof() close_transport(tr) def test_pause_resume_reading(self): tr = self.socket_transport() index = 0 msgs = [b'data1', b'data2', b'data3', b'data4', b'data5', b''] reversed_msgs = list(reversed(msgs)) def recv_into(sock, data): f = self.loop.create_future() msg = reversed_msgs.pop() result = f.result def monkey(): data[:len(msg)] = msg return result() f.result = monkey f.set_result(len(msg)) return f self.loop._proactor.recv_into.side_effect = recv_into self.loop._run_once() self.assertFalse(tr._paused) self.assertTrue(tr.is_reading()) for msg in msgs[:2]: self.loop._run_once() self.protocol.data_received.assert_called_with(bytearray(msg)) tr.pause_reading() tr.pause_reading() self.assertTrue(tr._paused) self.assertFalse(tr.is_reading()) for i in range(10): self.loop._run_once() self.protocol.data_received.assert_called_with(bytearray(msgs[1])) tr.resume_reading() tr.resume_reading() self.assertFalse(tr._paused) self.assertTrue(tr.is_reading()) for msg in msgs[2:4]: self.loop._run_once() self.protocol.data_received.assert_called_with(bytearray(msg)) tr.pause_reading() tr.resume_reading() self.loop.call_exception_handler = mock.Mock() self.loop._run_once() self.loop.call_exception_handler.assert_not_called() self.protocol.data_received.assert_called_with(bytearray(msgs[4])) tr.close() self.assertFalse(tr.is_reading()) def test_pause_reading_connection_made(self): tr = self.socket_transport() self.protocol.connection_made.side_effect = lambda _: tr.pause_reading() test_utils.run_briefly(self.loop) self.assertFalse(tr.is_reading()) self.loop.assert_no_reader(7) tr.resume_reading() self.assertTrue(tr.is_reading()) tr.close() self.assertFalse(tr.is_reading()) def pause_writing_transport(self, high): tr = self.socket_transport() tr.set_write_buffer_limits(high=high) self.assertEqual(tr.get_write_buffer_size(), 0) self.assertFalse(self.protocol.pause_writing.called) self.assertFalse(self.protocol.resume_writing.called) return tr def test_pause_resume_writing(self): tr = self.pause_writing_transport(high=4) # write a large chunk, must pause writing fut = self.loop.create_future() self.loop._proactor.send.return_value = fut tr.write(b'large data') self.loop._run_once() self.assertTrue(self.protocol.pause_writing.called) # flush the buffer fut.set_result(None) self.loop._run_once() self.assertEqual(tr.get_write_buffer_size(), 0) self.assertTrue(self.protocol.resume_writing.called) def test_pause_writing_2write(self): tr = self.pause_writing_transport(high=4) # first short write, the buffer is not full (3 <= 4) fut1 = self.loop.create_future() self.loop._proactor.send.return_value = fut1 tr.write(b'123') self.loop._run_once() self.assertEqual(tr.get_write_buffer_size(), 3) self.assertFalse(self.protocol.pause_writing.called) # fill the buffer, must pause writing (6 > 4) tr.write(b'abc') self.loop._run_once() self.assertEqual(tr.get_write_buffer_size(), 6) self.assertTrue(self.protocol.pause_writing.called) def test_pause_writing_3write(self): tr = self.pause_writing_transport(high=4) # first short write, the buffer is not full (1 <= 4) fut = self.loop.create_future() self.loop._proactor.send.return_value = fut tr.write(b'1') self.loop._run_once() self.assertEqual(tr.get_write_buffer_size(), 1) self.assertFalse(self.protocol.pause_writing.called) # second short write, the buffer is not full (3 <= 4) tr.write(b'23') self.loop._run_once() self.assertEqual(tr.get_write_buffer_size(), 3) self.assertFalse(self.protocol.pause_writing.called) # fill the buffer, must pause writing (6 > 4) tr.write(b'abc') self.loop._run_once() self.assertEqual(tr.get_write_buffer_size(), 6) self.assertTrue(self.protocol.pause_writing.called) def test_dont_pause_writing(self): tr = self.pause_writing_transport(high=4) # write a large chunk which completes immediately, # it should not pause writing fut = self.loop.create_future() fut.set_result(None) self.loop._proactor.send.return_value = fut tr.write(b'very large data') self.loop._run_once() self.assertEqual(tr.get_write_buffer_size(), 0) self.assertFalse(self.protocol.pause_writing.called) class ProactorDatagramTransportTests(test_utils.TestCase): def setUp(self): super().setUp() self.loop = self.new_test_loop() self.proactor = mock.Mock() self.loop._proactor = self.proactor self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol) self.sock = mock.Mock(spec_set=socket.socket) self.sock.fileno.return_value = 7 def datagram_transport(self, address=None): self.sock.getpeername.side_effect = None if address else OSError transport = _ProactorDatagramTransport(self.loop, self.sock, self.protocol, address=address) self.addCleanup(close_transport, transport) return transport def test_sendto(self): data = b'data' transport = self.datagram_transport() transport.sendto(data, ('0.0.0.0', 1234)) self.assertTrue(self.proactor.sendto.called) self.proactor.sendto.assert_called_with( self.sock, data, addr=('0.0.0.0', 1234)) def test_sendto_bytearray(self): data = bytearray(b'data') transport = self.datagram_transport() transport.sendto(data, ('0.0.0.0', 1234)) self.assertTrue(self.proactor.sendto.called) self.proactor.sendto.assert_called_with( self.sock, b'data', addr=('0.0.0.0', 1234)) def test_sendto_memoryview(self): data = memoryview(b'data') transport = self.datagram_transport() transport.sendto(data, ('0.0.0.0', 1234)) self.assertTrue(self.proactor.sendto.called) self.proactor.sendto.assert_called_with( self.sock, b'data', addr=('0.0.0.0', 1234)) def test_sendto_no_data(self): transport = self.datagram_transport() transport.sendto(b'', ('0.0.0.0', 1234)) self.assertTrue(self.proactor.sendto.called) self.proactor.sendto.assert_called_with( self.sock, b'', addr=('0.0.0.0', 1234)) def test_sendto_buffer(self): transport = self.datagram_transport() transport._buffer.append((b'data1', ('0.0.0.0', 12345))) transport._write_fut = object() transport.sendto(b'data2', ('0.0.0.0', 12345)) self.assertFalse(self.proactor.sendto.called) self.assertEqual( [(b'data1', ('0.0.0.0', 12345)), (b'data2', ('0.0.0.0', 12345))], list(transport._buffer)) def test_sendto_buffer_bytearray(self): data2 = bytearray(b'data2') transport = self.datagram_transport() transport._buffer.append((b'data1', ('0.0.0.0', 12345))) transport._write_fut = object() transport.sendto(data2, ('0.0.0.0', 12345)) self.assertFalse(self.proactor.sendto.called) self.assertEqual( [(b'data1', ('0.0.0.0', 12345)), (b'data2', ('0.0.0.0', 12345))], list(transport._buffer)) self.assertIsInstance(transport._buffer[1][0], bytes) def test_sendto_buffer_memoryview(self): data2 = memoryview(b'data2') transport = self.datagram_transport() transport._buffer.append((b'data1', ('0.0.0.0', 12345))) transport._write_fut = object() transport.sendto(data2, ('0.0.0.0', 12345)) self.assertFalse(self.proactor.sendto.called) self.assertEqual( [(b'data1', ('0.0.0.0', 12345)), (b'data2', ('0.0.0.0', 12345))], list(transport._buffer)) self.assertIsInstance(transport._buffer[1][0], bytes) def test_sendto_buffer_nodata(self): data2 = b'' transport = self.datagram_transport() transport._buffer.append((b'data1', ('0.0.0.0', 12345))) transport._write_fut = object() transport.sendto(data2, ('0.0.0.0', 12345)) self.assertFalse(self.proactor.sendto.called) self.assertEqual( [(b'data1', ('0.0.0.0', 12345)), (b'', ('0.0.0.0', 12345))], list(transport._buffer)) self.assertIsInstance(transport._buffer[1][0], bytes) @mock.patch('asyncio.proactor_events.logger') def test_sendto_exception(self, m_log): data = b'data' err = self.proactor.sendto.side_effect = RuntimeError() transport = self.datagram_transport() transport._fatal_error = mock.Mock() transport.sendto(data, ()) self.assertTrue(transport._fatal_error.called) transport._fatal_error.assert_called_with( err, 'Fatal write error on datagram transport') transport._conn_lost = 1 transport._address = ('123',) transport.sendto(data) transport.sendto(data) transport.sendto(data) transport.sendto(data) transport.sendto(data) m_log.warning.assert_called_with('socket.sendto() raised exception.') def test_sendto_error_received(self): data = b'data' self.sock.sendto.side_effect = ConnectionRefusedError transport = self.datagram_transport() transport._fatal_error = mock.Mock() transport.sendto(data, ()) self.assertEqual(transport._conn_lost, 0) self.assertFalse(transport._fatal_error.called) def test_sendto_error_received_connected(self): data = b'data' self.proactor.send.side_effect = ConnectionRefusedError transport = self.datagram_transport(address=('0.0.0.0', 1)) transport._fatal_error = mock.Mock() transport.sendto(data) self.assertFalse(transport._fatal_error.called) self.assertTrue(self.protocol.error_received.called) def test_sendto_str(self): transport = self.datagram_transport() self.assertRaises(TypeError, transport.sendto, 'str', ()) def test_sendto_connected_addr(self): transport = self.datagram_transport(address=('0.0.0.0', 1)) self.assertRaises( ValueError, transport.sendto, b'str', ('0.0.0.0', 2)) def test_sendto_closing(self): transport = self.datagram_transport(address=(1,)) transport.close() self.assertEqual(transport._conn_lost, 1) transport.sendto(b'data', (1,)) self.assertEqual(transport._conn_lost, 2) def test__loop_writing_closing(self): transport = self.datagram_transport() transport._closing = True transport._loop_writing() self.assertIsNone(transport._write_fut) test_utils.run_briefly(self.loop) self.sock.close.assert_called_with() self.protocol.connection_lost.assert_called_with(None) def test__loop_writing_exception(self): err = self.proactor.sendto.side_effect = RuntimeError() transport = self.datagram_transport() transport._fatal_error = mock.Mock() transport._buffer.append((b'data', ())) transport._loop_writing() transport._fatal_error.assert_called_with( err, 'Fatal write error on datagram transport') def test__loop_writing_error_received(self): self.proactor.sendto.side_effect = ConnectionRefusedError transport = self.datagram_transport() transport._fatal_error = mock.Mock() transport._buffer.append((b'data', ())) transport._loop_writing() self.assertFalse(transport._fatal_error.called) def test__loop_writing_error_received_connection(self): self.proactor.send.side_effect = ConnectionRefusedError transport = self.datagram_transport(address=('0.0.0.0', 1)) transport._fatal_error = mock.Mock() transport._buffer.append((b'data', ())) transport._loop_writing() self.assertFalse(transport._fatal_error.called) self.assertTrue(self.protocol.error_received.called) @mock.patch('asyncio.base_events.logger.error') def test_fatal_error_connected(self, m_exc): transport = self.datagram_transport(address=('0.0.0.0', 1)) err = ConnectionRefusedError() transport._fatal_error(err) self.assertFalse(self.protocol.error_received.called) m_exc.assert_not_called() class BaseProactorEventLoopTests(test_utils.TestCase): def setUp(self): super().setUp() self.sock = test_utils.mock_nonblocking_socket() self.proactor = mock.Mock() self.ssock, self.csock = mock.Mock(), mock.Mock() with mock.patch('asyncio.proactor_events.socket.socketpair', return_value=(self.ssock, self.csock)): with mock.patch('signal.set_wakeup_fd'): self.loop = BaseProactorEventLoop(self.proactor) self.set_event_loop(self.loop) @mock.patch('asyncio.proactor_events.socket.socketpair') def test_ctor(self, socketpair): ssock, csock = socketpair.return_value = ( mock.Mock(), mock.Mock()) with mock.patch('signal.set_wakeup_fd'): loop = BaseProactorEventLoop(self.proactor) self.assertIs(loop._ssock, ssock) self.assertIs(loop._csock, csock) self.assertEqual(loop._internal_fds, 1) loop.close() def test_close_self_pipe(self): self.loop._close_self_pipe() self.assertEqual(self.loop._internal_fds, 0) self.assertTrue(self.ssock.close.called) self.assertTrue(self.csock.close.called) self.assertIsNone(self.loop._ssock) self.assertIsNone(self.loop._csock) # Don't call close(): _close_self_pipe() cannot be called twice self.loop._closed = True def test_close(self): self.loop._close_self_pipe = mock.Mock() self.loop.close() self.assertTrue(self.loop._close_self_pipe.called) self.assertTrue(self.proactor.close.called) self.assertIsNone(self.loop._proactor) self.loop._close_self_pipe.reset_mock() self.loop.close() self.assertFalse(self.loop._close_self_pipe.called) def test_make_socket_transport(self): tr = self.loop._make_socket_transport(self.sock, asyncio.Protocol()) self.assertIsInstance(tr, _ProactorSocketTransport) close_transport(tr) def test_loop_self_reading(self): self.loop._loop_self_reading() self.proactor.recv.assert_called_with(self.ssock, 4096) self.proactor.recv.return_value.add_done_callback.assert_called_with( self.loop._loop_self_reading) def test_loop_self_reading_fut(self): fut = mock.Mock() self.loop._self_reading_future = fut self.loop._loop_self_reading(fut) self.assertTrue(fut.result.called) self.proactor.recv.assert_called_with(self.ssock, 4096) self.proactor.recv.return_value.add_done_callback.assert_called_with( self.loop._loop_self_reading) def test_loop_self_reading_exception(self): self.loop.call_exception_handler = mock.Mock() self.proactor.recv.side_effect = OSError() self.loop._loop_self_reading() self.assertTrue(self.loop.call_exception_handler.called) def test_write_to_self(self): self.loop._write_to_self() self.csock.send.assert_called_with(b'\0') def test_process_events(self): self.loop._process_events([]) @mock.patch('asyncio.base_events.logger') def test_create_server(self, m_log): pf = mock.Mock() call_soon = self.loop.call_soon = mock.Mock() self.loop._start_serving(pf, self.sock) self.assertTrue(call_soon.called) # callback loop = call_soon.call_args[0][0] loop() self.proactor.accept.assert_called_with(self.sock) # conn fut = mock.Mock() fut.result.return_value = (mock.Mock(), mock.Mock()) make_tr = self.loop._make_socket_transport = mock.Mock() loop(fut) self.assertTrue(fut.result.called) self.assertTrue(make_tr.called) # exception fut.result.side_effect = OSError() loop(fut) self.assertTrue(self.sock.close.called) self.assertTrue(m_log.error.called) def test_create_server_cancel(self): pf = mock.Mock() call_soon = self.loop.call_soon = mock.Mock() self.loop._start_serving(pf, self.sock) loop = call_soon.call_args[0][0] # cancelled fut = self.loop.create_future() fut.cancel() loop(fut) self.assertTrue(self.sock.close.called) def test_stop_serving(self): sock1 = mock.Mock() future1 = mock.Mock() sock2 = mock.Mock() future2 = mock.Mock() self.loop._accept_futures = { sock1.fileno(): future1, sock2.fileno(): future2 } self.loop._stop_serving(sock1) self.assertTrue(sock1.close.called) self.assertTrue(future1.cancel.called) self.proactor._stop_serving.assert_called_with(sock1) self.assertFalse(sock2.close.called) self.assertFalse(future2.cancel.called) def datagram_transport(self): self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol) return self.loop._make_datagram_transport(self.sock, self.protocol) def test_make_datagram_transport(self): tr = self.datagram_transport() self.assertIsInstance(tr, _ProactorDatagramTransport) self.assertIsInstance(tr, asyncio.DatagramTransport) close_transport(tr) def test_datagram_loop_writing(self): tr = self.datagram_transport() tr._buffer.appendleft((b'data', ('127.0.0.1', 12068))) tr._loop_writing() self.loop._proactor.sendto.assert_called_with(self.sock, b'data', addr=('127.0.0.1', 12068)) self.loop._proactor.sendto.return_value.add_done_callback.\ assert_called_with(tr._loop_writing) close_transport(tr) def test_datagram_loop_reading(self): tr = self.datagram_transport() tr._loop_reading() self.loop._proactor.recvfrom.assert_called_with(self.sock, 256 * 1024) self.assertFalse(self.protocol.datagram_received.called) self.assertFalse(self.protocol.error_received.called) close_transport(tr) def test_datagram_loop_reading_data(self): res = self.loop.create_future() res.set_result((b'data', ('127.0.0.1', 12068))) tr = self.datagram_transport() tr._read_fut = res tr._loop_reading(res) self.loop._proactor.recvfrom.assert_called_with(self.sock, 256 * 1024) self.protocol.datagram_received.assert_called_with(b'data', ('127.0.0.1', 12068)) close_transport(tr) @unittest.skipIf(sys.flags.optimize, "Assertions are disabled in optimized mode") def test_datagram_loop_reading_no_data(self): res = self.loop.create_future() res.set_result((b'', ('127.0.0.1', 12068))) tr = self.datagram_transport() self.assertRaises(AssertionError, tr._loop_reading, res) tr.close = mock.Mock() tr._read_fut = res tr._loop_reading(res) self.assertTrue(self.loop._proactor.recvfrom.called) self.assertFalse(self.protocol.error_received.called) self.assertFalse(tr.close.called) close_transport(tr) def test_datagram_loop_reading_aborted(self): err = self.loop._proactor.recvfrom.side_effect = ConnectionAbortedError() tr = self.datagram_transport() tr._fatal_error = mock.Mock() tr._protocol.error_received = mock.Mock() tr._loop_reading() tr._protocol.error_received.assert_called_with(err) close_transport(tr) def test_datagram_loop_writing_aborted(self): err = self.loop._proactor.sendto.side_effect = ConnectionAbortedError() tr = self.datagram_transport() tr._fatal_error = mock.Mock() tr._protocol.error_received = mock.Mock() tr._buffer.appendleft((b'Hello', ('127.0.0.1', 12068))) tr._loop_writing() tr._protocol.error_received.assert_called_with(err) close_transport(tr) @unittest.skipIf(sys.platform != 'win32', 'Proactor is supported on Windows only') class ProactorEventLoopUnixSockSendfileTests(test_utils.TestCase): DATA = b"12345abcde" * 16 * 1024 # 160 KiB class MyProto(asyncio.Protocol): def __init__(self, loop): self.started = False self.closed = False self.data = bytearray() self.fut = loop.create_future() self.transport = None def connection_made(self, transport): self.started = True self.transport = transport def data_received(self, data): self.data.extend(data) def connection_lost(self, exc): self.closed = True self.fut.set_result(None) async def wait_closed(self): await self.fut @classmethod def setUpClass(cls): with open(os_helper.TESTFN, 'wb') as fp: fp.write(cls.DATA) super().setUpClass() @classmethod def tearDownClass(cls): os_helper.unlink(os_helper.TESTFN) super().tearDownClass() def setUp(self): self.loop = asyncio.ProactorEventLoop() self.set_event_loop(self.loop) self.addCleanup(self.loop.close) self.file = open(os_helper.TESTFN, 'rb') self.addCleanup(self.file.close) super().setUp() def make_socket(self, cleanup=True, blocking=False): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setblocking(blocking) sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024) sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024) if cleanup: self.addCleanup(sock.close) return sock def run_loop(self, coro): return self.loop.run_until_complete(coro) def prepare(self): sock = self.make_socket() proto = self.MyProto(self.loop) port = socket_helper.find_unused_port() srv_sock = self.make_socket(cleanup=False) srv_sock.bind(('127.0.0.1', port)) server = self.run_loop(self.loop.create_server( lambda: proto, sock=srv_sock)) self.run_loop(self.loop.sock_connect(sock, srv_sock.getsockname())) def cleanup(): if proto.transport is not None: # can be None if the task was cancelled before # connection_made callback proto.transport.close() self.run_loop(proto.wait_closed()) server.close() self.run_loop(server.wait_closed()) self.addCleanup(cleanup) return sock, proto def test_sock_sendfile_not_a_file(self): sock, proto = self.prepare() f = object() with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, "not a regular file"): self.run_loop(self.loop._sock_sendfile_native(sock, f, 0, None)) self.assertEqual(self.file.tell(), 0) def test_sock_sendfile_iobuffer(self): sock, proto = self.prepare() f = io.BytesIO() with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, "not a regular file"): self.run_loop(self.loop._sock_sendfile_native(sock, f, 0, None)) self.assertEqual(self.file.tell(), 0) def test_sock_sendfile_not_regular_file(self): sock, proto = self.prepare() f = mock.Mock() f.fileno.return_value = -1 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, "not a regular file"): self.run_loop(self.loop._sock_sendfile_native(sock, f, 0, None)) self.assertEqual(self.file.tell(), 0) def test_blocking_socket(self): self.loop.set_debug(True) sock = self.make_socket(blocking=True) with self.assertRaisesRegex(ValueError, "must be non-blocking"): self.run_loop(self.loop.sock_sendfile(sock, self.file)) if __name__ == '__main__': unittest.main()