bpo-32251: Implement asyncio.BufferedProtocol. (#4755)

This commit is contained in:
Yury Selivanov 2018-01-28 16:30:26 -05:00 committed by GitHub
parent 0ceb717689
commit 631fd38dbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 763 additions and 44 deletions

View File

@ -333,6 +333,16 @@ Protocol classes
The base class for implementing streaming protocols (for use with
e.g. TCP and SSL transports).
.. class:: BufferedProtocol
A base class for implementing streaming protocols with manual
control of the receive buffer.
.. versionadded:: 3.7
**Important:** this has been been added to asyncio in Python 3.7
*on a provisional basis*! Treat it as an experimental API that
might be changed or removed in Python 3.8.
.. class:: DatagramProtocol
The base class for implementing datagram protocols (for use with
@ -428,10 +438,67 @@ and, if called, :meth:`data_received` won't be called after it.
State machine:
start -> :meth:`~BaseProtocol.connection_made`
[-> :meth:`~Protocol.data_received` \*]
[-> :meth:`~Protocol.eof_received` ?]
-> :meth:`~BaseProtocol.connection_lost` -> end
.. code-block:: none
start -> connection_made
[-> data_received]*
[-> eof_received]?
-> connection_lost -> end
Streaming protocols with manual receive buffer control
------------------------------------------------------
.. versionadded:: 3.7
**Important:** :class:`BufferedProtocol` has been been added to
asyncio in Python 3.7 *on a provisional basis*! Treat it as an
experimental API that might be changed or removed in Python 3.8.
Event methods, such as :meth:`AbstractEventLoop.create_server` and
:meth:`AbstractEventLoop.create_connection`, accept factories that
return protocols that implement this interface.
The idea of BufferedProtocol is that it allows to manually allocate
and control the receive buffer. Event loops can then use the buffer
provided by the protocol to avoid unnecessary data copies. This
can result in noticeable performance improvement for protocols that
receive big amounts of data. Sophisticated protocols can allocate
the buffer only once at creation time.
The following callbacks are called on :class:`BufferedProtocol`
instances:
.. method:: BufferedProtocol.get_buffer()
Called to allocate a new receive buffer. Must return an object
that implements the :ref:`buffer protocol <bufferobjects>`.
.. method:: BufferedProtocol.buffer_updated(nbytes)
Called when the buffer was updated with the received data.
*nbytes* is the total number of bytes that were written to the buffer.
.. method:: BufferedProtocol.eof_received()
See the documentation of the :meth:`Protocol.eof_received` method.
:meth:`get_buffer` can be called an arbitrary number of times during
a connection. However, :meth:`eof_received` is called at most once
and, if called, :meth:`data_received` won't be called after it.
State machine:
.. code-block:: none
start -> connection_made
[-> get_buffer
[-> buffer_updated]?
]*
[-> eof_received]?
-> connection_lost -> end
Datagram protocols

View File

@ -12,6 +12,7 @@ import warnings
from . import base_events
from . import constants
from . import futures
from . import protocols
from . import sslproto
from . import transports
from .log import logger
@ -91,17 +92,19 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
self.close()
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
if self._loop.get_debug():
logger.debug("%r: %s", self, message, exc_info=True)
else:
self._loop.call_exception_handler({
'message': message,
'exception': exc,
'transport': self,
'protocol': self._protocol,
})
self._force_close(exc)
try:
if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
if self._loop.get_debug():
logger.debug("%r: %s", self, message, exc_info=True)
else:
self._loop.call_exception_handler({
'message': message,
'exception': exc,
'transport': self,
'protocol': self._protocol,
})
finally:
self._force_close(exc)
def _force_close(self, exc):
if self._closing:
@ -150,6 +153,12 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
extra=None, server=None):
super().__init__(loop, sock, protocol, waiter, extra, server)
self._paused = False
if protocols._is_buffered_protocol(protocol):
self._loop_reading = self._loop_reading__get_buffer
else:
self._loop_reading = self._loop_reading__data_received
self._loop.call_soon(self._loop_reading)
def is_reading(self):
@ -159,6 +168,11 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
if self._closing or self._paused:
return
self._paused = True
if self._read_fut is not None and not self._read_fut.done():
self._read_fut.cancel()
self._read_fut = None
if self._loop.get_debug():
logger.debug("%r pauses reading", self)
@ -170,11 +184,25 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
def _loop_reading(self, fut=None):
def _loop_reading__on_eof(self):
if self._loop.get_debug():
logger.debug("%r received EOF", self)
try:
keep_open = self._protocol.eof_received()
except Exception as exc:
self._fatal_error(
exc, 'Fatal error: protocol.eof_received() call failed.')
return
if not keep_open:
self.close()
def _loop_reading__data_received(self, fut=None):
if self._paused:
return
data = None
data = None
try:
if fut is not None:
assert self._read_fut is fut or (self._read_fut is None and
@ -197,7 +225,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
return
# reschedule a new read
self._read_fut = self._loop._proactor.recv(self._sock, 4096)
self._read_fut = self._loop._proactor.recv(self._sock, 32768)
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(exc, 'Fatal read error on pipe transport')
@ -216,12 +244,81 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
finally:
if data:
self._protocol.data_received(data)
elif data is not None:
if self._loop.get_debug():
logger.debug("%r received EOF", self)
keep_open = self._protocol.eof_received()
if not keep_open:
self.close()
elif data == b'':
self._loop_reading__on_eof()
def _loop_reading__get_buffer(self, fut=None):
if self._paused:
return
nbytes = None
if fut is not None:
assert self._read_fut is fut or (self._read_fut is None and
self._closing)
self._read_fut = None
try:
if fut.done():
nbytes = fut.result()
else:
# the future will be replaced by next proactor.recv call
fut.cancel()
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(
exc, 'Fatal read error on pipe transport')
elif self._loop.get_debug():
logger.debug("Read error on pipe transport while closing",
exc_info=True)
except ConnectionResetError as exc:
self._force_close(exc)
except OSError as exc:
self._fatal_error(exc, 'Fatal read error on pipe transport')
except futures.CancelledError:
if not self._closing:
raise
if nbytes is not None:
if nbytes == 0:
# we got end-of-file so no need to reschedule a new read
self._loop_reading__on_eof()
else:
try:
self._protocol.buffer_updated(nbytes)
except Exception as exc:
self._fatal_error(
exc,
'Fatal error: '
'protocol.buffer_updated() call failed.')
return
if self._closing or nbytes == 0:
# since close() has been called we ignore any read data
return
try:
buf = self._protocol.get_buffer()
except Exception as exc:
self._fatal_error(
exc, 'Fatal error: protocol.get_buffer() call failed.')
return
try:
# schedule a new read
self._read_fut = self._loop._proactor.recv_into(self._sock, buf)
self._read_fut.add_done_callback(self._loop_reading)
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(exc, 'Fatal read error on pipe transport')
elif self._loop.get_debug():
logger.debug("Read error on pipe transport while closing",
exc_info=True)
except ConnectionResetError as exc:
self._force_close(exc)
except OSError as exc:
self._fatal_error(exc, 'Fatal read error on pipe transport')
except futures.CancelledError:
if not self._closing:
raise
class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,

View File

@ -2,7 +2,7 @@
__all__ = (
'BaseProtocol', 'Protocol', 'DatagramProtocol',
'SubprocessProtocol',
'SubprocessProtocol', 'BufferedProtocol',
)
@ -102,6 +102,57 @@ class Protocol(BaseProtocol):
"""
class BufferedProtocol(BaseProtocol):
"""Interface for stream protocol with manual buffer control.
Important: this has been been added to asyncio in Python 3.7
*on a provisional basis*! Treat it as an experimental API that
might be changed or removed in Python 3.8.
Event methods, such as `create_server` and `create_connection`,
accept factories that return protocols that implement this interface.
The idea of BufferedProtocol is that it allows to manually allocate
and control the receive buffer. Event loops can then use the buffer
provided by the protocol to avoid unnecessary data copies. This
can result in noticeable performance improvement for protocols that
receive big amounts of data. Sophisticated protocols can allocate
the buffer only once at creation time.
State machine of calls:
start -> CM [-> GB [-> BU?]]* [-> ER?] -> CL -> end
* CM: connection_made()
* GB: get_buffer()
* BU: buffer_updated()
* ER: eof_received()
* CL: connection_lost()
"""
def get_buffer(self):
"""Called to allocate a new receive buffer.
Must return an object that implements the
:ref:`buffer protocol <bufferobjects>`.
"""
def buffer_updated(self, nbytes):
"""Called when the buffer was updated with the received data.
*nbytes* is the total number of bytes that were written to
the buffer.
"""
def eof_received(self):
"""Called when the other end calls write_eof() or equivalent.
If this returns a false value (including None), the transport
will close itself. If it returns a true value, closing the
transport is up to the protocol.
"""
class DatagramProtocol(BaseProtocol):
"""Interface for datagram protocol."""
@ -134,3 +185,7 @@ class SubprocessProtocol(BaseProtocol):
def process_exited(self):
"""Called when subprocess has exited."""
def _is_buffered_protocol(proto):
return hasattr(proto, 'get_buffer') and not hasattr(proto, 'data_received')

View File

@ -22,8 +22,9 @@ from . import base_events
from . import constants
from . import events
from . import futures
from . import transports
from . import protocols
from . import sslproto
from . import transports
from .log import logger
@ -713,6 +714,12 @@ class _SelectorSocketTransport(_SelectorTransport):
def __init__(self, loop, sock, protocol, waiter=None,
extra=None, server=None):
if protocols._is_buffered_protocol(protocol):
self._read_ready = self._read_ready__get_buffer
else:
self._read_ready = self._read_ready__data_received
super().__init__(loop, sock, protocol, extra, server)
self._eof = False
self._paused = False
@ -751,29 +758,74 @@ class _SelectorSocketTransport(_SelectorTransport):
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
def _read_ready(self):
def _read_ready__get_buffer(self):
if self._conn_lost:
return
try:
buf = self._protocol.get_buffer()
except Exception as exc:
self._fatal_error(
exc, 'Fatal error: protocol.get_buffer() call failed.')
return
try:
nbytes = self._sock.recv_into(buf)
except (BlockingIOError, InterruptedError):
return
except Exception as exc:
self._fatal_error(exc, 'Fatal read error on socket transport')
return
if not nbytes:
self._read_ready__on_eof()
return
try:
self._protocol.buffer_updated(nbytes)
except Exception as exc:
self._fatal_error(
exc, 'Fatal error: protocol.buffer_updated() call failed.')
def _read_ready__data_received(self):
if self._conn_lost:
return
try:
data = self._sock.recv(self.max_size)
except (BlockingIOError, InterruptedError):
pass
return
except Exception as exc:
self._fatal_error(exc, 'Fatal read error on socket transport')
return
if not data:
self._read_ready__on_eof()
return
try:
self._protocol.data_received(data)
except Exception as exc:
self._fatal_error(
exc, 'Fatal error: protocol.data_received() call failed.')
def _read_ready__on_eof(self):
if self._loop.get_debug():
logger.debug("%r received EOF", self)
try:
keep_open = self._protocol.eof_received()
except Exception as exc:
self._fatal_error(
exc, 'Fatal error: protocol.eof_received() call failed.')
return
if keep_open:
# We're keeping the connection open so the
# protocol can write more, but we still can't
# receive more, so remove the reader callback.
self._loop._remove_reader(self._sock_fd)
else:
if data:
self._protocol.data_received(data)
else:
if self._loop.get_debug():
logger.debug("%r received EOF", self)
keep_open = self._protocol.eof_received()
if keep_open:
# We're keeping the connection open so the
# protocol can write more, but we still can't
# receive more, so remove the reader callback.
self._loop._remove_reader(self._sock_fd)
else:
self.close()
self.close()
def write(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):

View File

@ -0,0 +1,85 @@
import asyncio
import unittest
from test.test_asyncio import functional as func_tests
class ReceiveStuffProto(asyncio.BufferedProtocol):
def __init__(self, cb, con_lost_fut):
self.cb = cb
self.con_lost_fut = con_lost_fut
def get_buffer(self):
self.buffer = bytearray(100)
return self.buffer
def buffer_updated(self, nbytes):
self.cb(self.buffer[:nbytes])
def connection_lost(self, exc):
if exc is None:
self.con_lost_fut.set_result(None)
else:
self.con_lost_fut.set_exception(exc)
class BaseTestBufferedProtocol(func_tests.FunctionalTestCaseMixin):
def new_loop(self):
raise NotImplementedError
def test_buffered_proto_create_connection(self):
NOISE = b'12345678+' * 1024
async def client(addr):
data = b''
def on_buf(buf):
nonlocal data
data += buf
if data == NOISE:
tr.write(b'1')
conn_lost_fut = self.loop.create_future()
tr, pr = await self.loop.create_connection(
lambda: ReceiveStuffProto(on_buf, conn_lost_fut), *addr)
await conn_lost_fut
async def on_server_client(reader, writer):
writer.write(NOISE)
await reader.readexactly(1)
writer.close()
await writer.wait_closed()
srv = self.loop.run_until_complete(
asyncio.start_server(
on_server_client, '127.0.0.1', 0))
addr = srv.sockets[0].getsockname()
self.loop.run_until_complete(
asyncio.wait_for(client(addr), 5, loop=self.loop))
srv.close()
self.loop.run_until_complete(srv.wait_closed())
class BufferedProtocolSelectorTests(BaseTestBufferedProtocol,
unittest.TestCase):
def new_loop(self):
return asyncio.SelectorEventLoop()
@unittest.skipUnless(hasattr(asyncio, 'ProactorEventLoop'), 'Windows only')
class BufferedProtocolProactorTests(BaseTestBufferedProtocol,
unittest.TestCase):
def new_loop(self):
return asyncio.ProactorEventLoop()
if __name__ == '__main__':
unittest.main()

View File

@ -44,12 +44,12 @@ class ProactorSocketTransportTests(test_utils.TestCase):
test_utils.run_briefly(self.loop)
self.assertIsNone(fut.result())
self.protocol.connection_made(tr)
self.proactor.recv.assert_called_with(self.sock, 4096)
self.proactor.recv.assert_called_with(self.sock, 32768)
def test_loop_reading(self):
tr = self.socket_transport()
tr._loop_reading()
self.loop._proactor.recv.assert_called_with(self.sock, 4096)
self.loop._proactor.recv.assert_called_with(self.sock, 32768)
self.assertFalse(self.protocol.data_received.called)
self.assertFalse(self.protocol.eof_received.called)
@ -60,7 +60,7 @@ class ProactorSocketTransportTests(test_utils.TestCase):
tr = self.socket_transport()
tr._read_fut = res
tr._loop_reading(res)
self.loop._proactor.recv.assert_called_with(self.sock, 4096)
self.loop._proactor.recv.assert_called_with(self.sock, 32768)
self.protocol.data_received.assert_called_with(b'data')
def test_loop_reading_no_data(self):
@ -444,6 +444,197 @@ class ProactorSocketTransportTests(test_utils.TestCase):
self.assertFalse(self.protocol.pause_writing.called)
class ProactorSocketTransportBufferedProtoTests(test_utils.TestCase):
def setUp(self):
super().setUp()
self.loop = self.new_test_loop()
self.addCleanup(self.loop.close)
self.proactor = mock.Mock()
self.loop._proactor = self.proactor
self.protocol = test_utils.make_test_protocol(asyncio.BufferedProtocol)
self.buf = mock.Mock()
self.protocol.get_buffer.side_effect = lambda: self.buf
self.sock = mock.Mock(socket.socket)
def socket_transport(self, waiter=None):
transport = _ProactorSocketTransport(self.loop, self.sock,
self.protocol, waiter=waiter)
self.addCleanup(close_transport, transport)
return transport
def test_ctor(self):
fut = asyncio.Future(loop=self.loop)
tr = self.socket_transport(waiter=fut)
test_utils.run_briefly(self.loop)
self.assertIsNone(fut.result())
self.protocol.connection_made(tr)
self.proactor.recv_into.assert_called_with(self.sock, self.buf)
def test_loop_reading(self):
tr = self.socket_transport()
tr._loop_reading()
self.loop._proactor.recv_into.assert_called_with(self.sock, self.buf)
self.assertTrue(self.protocol.get_buffer.called)
self.assertFalse(self.protocol.buffer_updated.called)
self.assertFalse(self.protocol.eof_received.called)
def test_get_buffer_error(self):
transport = self.socket_transport()
transport._fatal_error = mock.Mock()
self.loop.call_exception_handler = mock.Mock()
self.protocol.get_buffer.side_effect = LookupError()
transport._loop_reading()
self.assertTrue(transport._fatal_error.called)
self.assertTrue(self.protocol.get_buffer.called)
self.assertFalse(self.protocol.buffer_updated.called)
def test_buffer_updated_error(self):
transport = self.socket_transport()
transport._fatal_error = mock.Mock()
self.loop.call_exception_handler = mock.Mock()
self.protocol.buffer_updated.side_effect = LookupError()
res = asyncio.Future(loop=self.loop)
res.set_result(10)
transport._read_fut = res
transport._loop_reading(res)
self.assertTrue(transport._fatal_error.called)
self.assertFalse(self.protocol.get_buffer.called)
self.assertTrue(self.protocol.buffer_updated.called)
def test_loop_eof_received_error(self):
res = asyncio.Future(loop=self.loop)
res.set_result(0)
self.protocol.eof_received.side_effect = LookupError()
tr = self.socket_transport()
tr._fatal_error = mock.Mock()
tr.close = mock.Mock()
tr._read_fut = res
tr._loop_reading(res)
self.assertFalse(self.loop._proactor.recv_into.called)
self.assertTrue(self.protocol.eof_received.called)
self.assertTrue(tr._fatal_error.called)
def test_loop_reading_data(self):
res = asyncio.Future(loop=self.loop)
res.set_result(4)
tr = self.socket_transport()
tr._read_fut = res
tr._loop_reading(res)
self.loop._proactor.recv_into.assert_called_with(self.sock, self.buf)
self.protocol.buffer_updated.assert_called_with(4)
def test_loop_reading_no_data(self):
res = asyncio.Future(loop=self.loop)
res.set_result(0)
tr = self.socket_transport()
self.assertRaises(AssertionError, tr._loop_reading, res)
tr.close = mock.Mock()
tr._read_fut = res
tr._loop_reading(res)
self.assertFalse(self.loop._proactor.recv_into.called)
self.assertTrue(self.protocol.eof_received.called)
self.assertTrue(tr.close.called)
def test_loop_reading_aborted(self):
err = self.loop._proactor.recv_into.side_effect = \
ConnectionAbortedError()
tr = self.socket_transport()
tr._fatal_error = mock.Mock()
tr._loop_reading()
tr._fatal_error.assert_called_with(
err, 'Fatal read error on pipe transport')
def test_loop_reading_aborted_closing(self):
self.loop._proactor.recv.side_effect = ConnectionAbortedError()
tr = self.socket_transport()
tr._closing = True
tr._fatal_error = mock.Mock()
tr._loop_reading()
self.assertFalse(tr._fatal_error.called)
def test_loop_reading_aborted_is_fatal(self):
self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()
tr = self.socket_transport()
tr._closing = False
tr._fatal_error = mock.Mock()
tr._loop_reading()
self.assertTrue(tr._fatal_error.called)
def test_loop_reading_conn_reset_lost(self):
err = self.loop._proactor.recv_into.side_effect = ConnectionResetError()
tr = self.socket_transport()
tr._closing = False
tr._fatal_error = mock.Mock()
tr._force_close = mock.Mock()
tr._loop_reading()
self.assertFalse(tr._fatal_error.called)
tr._force_close.assert_called_with(err)
def test_loop_reading_exception(self):
err = self.loop._proactor.recv_into.side_effect = OSError()
tr = self.socket_transport()
tr._fatal_error = mock.Mock()
tr._loop_reading()
tr._fatal_error.assert_called_with(
err, 'Fatal read error on pipe transport')
def test_pause_resume_reading(self):
tr = self.socket_transport()
futures = []
for msg in [10, 20, 30, 40, 0]:
f = asyncio.Future(loop=self.loop)
f.set_result(msg)
futures.append(f)
self.loop._proactor.recv_into.side_effect = futures
self.loop._run_once()
self.assertFalse(tr._paused)
self.assertTrue(tr.is_reading())
self.loop._run_once()
self.protocol.buffer_updated.assert_called_with(10)
self.loop._run_once()
self.protocol.buffer_updated.assert_called_with(20)
tr.pause_reading()
tr.pause_reading()
self.assertTrue(tr._paused)
self.assertFalse(tr.is_reading())
for i in range(10):
self.loop._run_once()
self.protocol.buffer_updated.assert_called_with(20)
tr.resume_reading()
tr.resume_reading()
self.assertFalse(tr._paused)
self.assertTrue(tr.is_reading())
self.loop._run_once()
self.protocol.buffer_updated.assert_called_with(30)
self.loop._run_once()
self.protocol.buffer_updated.assert_called_with(40)
tr.close()
self.assertFalse(tr.is_reading())
class BaseProactorEventLoopTests(test_utils.TestCase):
def setUp(self):

View File

@ -926,6 +926,34 @@ class SelectorSocketTransportTests(test_utils.TestCase):
self.assertFalse(tr.is_reading())
self.loop.assert_no_reader(7)
def test_read_eof_received_error(self):
transport = self.socket_transport()
transport.close = mock.Mock()
transport._fatal_error = mock.Mock()
self.loop.call_exception_handler = mock.Mock()
self.protocol.eof_received.side_effect = LookupError()
self.sock.recv.return_value = b''
transport._read_ready()
self.protocol.eof_received.assert_called_with()
self.assertTrue(transport._fatal_error.called)
def test_data_received_error(self):
transport = self.socket_transport()
transport._fatal_error = mock.Mock()
self.loop.call_exception_handler = mock.Mock()
self.protocol.data_received.side_effect = LookupError()
self.sock.recv.return_value = b'data'
transport._read_ready()
self.assertTrue(transport._fatal_error.called)
self.assertTrue(self.protocol.data_received.called)
def test_read_ready(self):
transport = self.socket_transport()
@ -1229,6 +1257,149 @@ class SelectorSocketTransportTests(test_utils.TestCase):
remove_writer.assert_called_with(self.sock_fd)
class SelectorSocketTransportBufferedProtocolTests(test_utils.TestCase):
def setUp(self):
super().setUp()
self.loop = self.new_test_loop()
self.protocol = test_utils.make_test_protocol(asyncio.BufferedProtocol)
self.buf = mock.Mock()
self.protocol.get_buffer.side_effect = lambda: self.buf
self.sock = mock.Mock(socket.socket)
self.sock_fd = self.sock.fileno.return_value = 7
def socket_transport(self, waiter=None):
transport = _SelectorSocketTransport(self.loop, self.sock,
self.protocol, waiter=waiter)
self.addCleanup(close_transport, transport)
return transport
def test_ctor(self):
waiter = asyncio.Future(loop=self.loop)
tr = self.socket_transport(waiter=waiter)
self.loop.run_until_complete(waiter)
self.loop.assert_reader(7, tr._read_ready)
test_utils.run_briefly(self.loop)
self.protocol.connection_made.assert_called_with(tr)
def test_get_buffer_error(self):
transport = self.socket_transport()
transport._fatal_error = mock.Mock()
self.loop.call_exception_handler = mock.Mock()
self.protocol.get_buffer.side_effect = LookupError()
transport._read_ready()
self.assertTrue(transport._fatal_error.called)
self.assertTrue(self.protocol.get_buffer.called)
self.assertFalse(self.protocol.buffer_updated.called)
def test_buffer_updated_error(self):
transport = self.socket_transport()
transport._fatal_error = mock.Mock()
self.loop.call_exception_handler = mock.Mock()
self.protocol.buffer_updated.side_effect = LookupError()
self.sock.recv_into.return_value = 10
transport._read_ready()
self.assertTrue(transport._fatal_error.called)
self.assertTrue(self.protocol.get_buffer.called)
self.assertTrue(self.protocol.buffer_updated.called)
def test_read_eof_received_error(self):
transport = self.socket_transport()
transport.close = mock.Mock()
transport._fatal_error = mock.Mock()
self.loop.call_exception_handler = mock.Mock()
self.protocol.eof_received.side_effect = LookupError()
self.sock.recv_into.return_value = 0
transport._read_ready()
self.protocol.eof_received.assert_called_with()
self.assertTrue(transport._fatal_error.called)
def test_read_ready(self):
transport = self.socket_transport()
self.sock.recv_into.return_value = 10
transport._read_ready()
self.protocol.get_buffer.assert_called_with()
self.protocol.buffer_updated.assert_called_with(10)
def test_read_ready_eof(self):
transport = self.socket_transport()
transport.close = mock.Mock()
self.sock.recv_into.return_value = 0
transport._read_ready()
self.protocol.eof_received.assert_called_with()
transport.close.assert_called_with()
def test_read_ready_eof_keep_open(self):
transport = self.socket_transport()
transport.close = mock.Mock()
self.sock.recv_into.return_value = 0
self.protocol.eof_received.return_value = True
transport._read_ready()
self.protocol.eof_received.assert_called_with()
self.assertFalse(transport.close.called)
@mock.patch('logging.exception')
def test_read_ready_tryagain(self, m_exc):
self.sock.recv_into.side_effect = BlockingIOError
transport = self.socket_transport()
transport._fatal_error = mock.Mock()
transport._read_ready()
self.assertFalse(transport._fatal_error.called)
@mock.patch('logging.exception')
def test_read_ready_tryagain_interrupted(self, m_exc):
self.sock.recv_into.side_effect = InterruptedError
transport = self.socket_transport()
transport._fatal_error = mock.Mock()
transport._read_ready()
self.assertFalse(transport._fatal_error.called)
@mock.patch('logging.exception')
def test_read_ready_conn_reset(self, m_exc):
err = self.sock.recv_into.side_effect = ConnectionResetError()
transport = self.socket_transport()
transport._force_close = mock.Mock()
with test_utils.disable_logger():
transport._read_ready()
transport._force_close.assert_called_with(err)
@mock.patch('logging.exception')
def test_read_ready_err(self, m_exc):
err = self.sock.recv_into.side_effect = OSError()
transport = self.socket_transport()
transport._fatal_error = mock.Mock()
transport._read_ready()
transport._fatal_error.assert_called_with(
err,
'Fatal read error on socket transport')
class SelectorDatagramTransportTests(test_utils.TestCase):
def setUp(self):

View File

@ -0,0 +1 @@
Implement asyncio.BufferedProtocol (provisional API).