mirror of https://github.com/python/cpython
Merge 3.5 (issue #28369)
This commit is contained in:
commit
bb8eb92f46
|
@ -11,6 +11,7 @@ import errno
|
|||
import functools
|
||||
import socket
|
||||
import warnings
|
||||
import weakref
|
||||
try:
|
||||
import ssl
|
||||
except ImportError: # pragma: no cover
|
||||
|
@ -64,6 +65,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
logger.debug('Using selector: %s', selector.__class__.__name__)
|
||||
self._selector = selector
|
||||
self._make_self_pipe()
|
||||
self._transports = weakref.WeakValueDictionary()
|
||||
|
||||
def _make_socket_transport(self, sock, protocol, waiter=None, *,
|
||||
extra=None, server=None):
|
||||
|
@ -115,7 +117,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
raise NotImplementedError
|
||||
|
||||
def _close_self_pipe(self):
|
||||
self.remove_reader(self._ssock.fileno())
|
||||
self._remove_reader(self._ssock.fileno())
|
||||
self._ssock.close()
|
||||
self._ssock = None
|
||||
self._csock.close()
|
||||
|
@ -128,7 +130,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
self._ssock.setblocking(False)
|
||||
self._csock.setblocking(False)
|
||||
self._internal_fds += 1
|
||||
self.add_reader(self._ssock.fileno(), self._read_from_self)
|
||||
self._add_reader(self._ssock.fileno(), self._read_from_self)
|
||||
|
||||
def _process_self_data(self, data):
|
||||
pass
|
||||
|
@ -163,8 +165,8 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
|
||||
def _start_serving(self, protocol_factory, sock,
|
||||
sslcontext=None, server=None, backlog=100):
|
||||
self.add_reader(sock.fileno(), self._accept_connection,
|
||||
protocol_factory, sock, sslcontext, server, backlog)
|
||||
self._add_reader(sock.fileno(), self._accept_connection,
|
||||
protocol_factory, sock, sslcontext, server, backlog)
|
||||
|
||||
def _accept_connection(self, protocol_factory, sock,
|
||||
sslcontext=None, server=None, backlog=100):
|
||||
|
@ -194,7 +196,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
'exception': exc,
|
||||
'socket': sock,
|
||||
})
|
||||
self.remove_reader(sock.fileno())
|
||||
self._remove_reader(sock.fileno())
|
||||
self.call_later(constants.ACCEPT_RETRY_DELAY,
|
||||
self._start_serving,
|
||||
protocol_factory, sock, sslcontext, server,
|
||||
|
@ -244,8 +246,18 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
context['transport'] = transport
|
||||
self.call_exception_handler(context)
|
||||
|
||||
def add_reader(self, fd, callback, *args):
|
||||
"""Add a reader callback."""
|
||||
def _ensure_fd_no_transport(self, fd):
|
||||
try:
|
||||
transport = self._transports[fd]
|
||||
except KeyError:
|
||||
pass
|
||||
else:
|
||||
if not transport.is_closing():
|
||||
raise RuntimeError(
|
||||
'File descriptor {!r} is used by transport {!r}'.format(
|
||||
fd, transport))
|
||||
|
||||
def _add_reader(self, fd, callback, *args):
|
||||
self._check_closed()
|
||||
handle = events.Handle(callback, args, self)
|
||||
try:
|
||||
|
@ -260,8 +272,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
if reader is not None:
|
||||
reader.cancel()
|
||||
|
||||
def remove_reader(self, fd):
|
||||
"""Remove a reader callback."""
|
||||
def _remove_reader(self, fd):
|
||||
if self.is_closed():
|
||||
return False
|
||||
try:
|
||||
|
@ -282,8 +293,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
else:
|
||||
return False
|
||||
|
||||
def add_writer(self, fd, callback, *args):
|
||||
"""Add a writer callback.."""
|
||||
def _add_writer(self, fd, callback, *args):
|
||||
self._check_closed()
|
||||
handle = events.Handle(callback, args, self)
|
||||
try:
|
||||
|
@ -298,7 +308,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
if writer is not None:
|
||||
writer.cancel()
|
||||
|
||||
def remove_writer(self, fd):
|
||||
def _remove_writer(self, fd):
|
||||
"""Remove a writer callback."""
|
||||
if self.is_closed():
|
||||
return False
|
||||
|
@ -321,6 +331,26 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
else:
|
||||
return False
|
||||
|
||||
def add_reader(self, fd, callback, *args):
|
||||
"""Add a reader callback."""
|
||||
self._ensure_fd_no_transport(fd)
|
||||
return self._add_reader(fd, callback, *args)
|
||||
|
||||
def remove_reader(self, fd):
|
||||
"""Remove a reader callback."""
|
||||
self._ensure_fd_no_transport(fd)
|
||||
return self._remove_reader(fd)
|
||||
|
||||
def add_writer(self, fd, callback, *args):
|
||||
"""Add a writer callback.."""
|
||||
self._ensure_fd_no_transport(fd)
|
||||
return self._add_writer(fd, callback, *args)
|
||||
|
||||
def remove_writer(self, fd):
|
||||
"""Remove a writer callback."""
|
||||
self._ensure_fd_no_transport(fd)
|
||||
return self._remove_writer(fd)
|
||||
|
||||
def sock_recv(self, sock, n):
|
||||
"""Receive data from the socket.
|
||||
|
||||
|
@ -494,17 +524,17 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
fileobj, (reader, writer) = key.fileobj, key.data
|
||||
if mask & selectors.EVENT_READ and reader is not None:
|
||||
if reader._cancelled:
|
||||
self.remove_reader(fileobj)
|
||||
self._remove_reader(fileobj)
|
||||
else:
|
||||
self._add_callback(reader)
|
||||
if mask & selectors.EVENT_WRITE and writer is not None:
|
||||
if writer._cancelled:
|
||||
self.remove_writer(fileobj)
|
||||
self._remove_writer(fileobj)
|
||||
else:
|
||||
self._add_callback(writer)
|
||||
|
||||
def _stop_serving(self, sock):
|
||||
self.remove_reader(sock.fileno())
|
||||
self._remove_reader(sock.fileno())
|
||||
sock.close()
|
||||
|
||||
|
||||
|
@ -539,6 +569,7 @@ class _SelectorTransport(transports._FlowControlMixin,
|
|||
self._closing = False # Set when close() called.
|
||||
if self._server is not None:
|
||||
self._server._attach()
|
||||
loop._transports[self._sock_fd] = self
|
||||
|
||||
def __repr__(self):
|
||||
info = [self.__class__.__name__]
|
||||
|
@ -584,10 +615,10 @@ class _SelectorTransport(transports._FlowControlMixin,
|
|||
if self._closing:
|
||||
return
|
||||
self._closing = True
|
||||
self._loop.remove_reader(self._sock_fd)
|
||||
self._loop._remove_reader(self._sock_fd)
|
||||
if not self._buffer:
|
||||
self._conn_lost += 1
|
||||
self._loop.remove_writer(self._sock_fd)
|
||||
self._loop._remove_writer(self._sock_fd)
|
||||
self._loop.call_soon(self._call_connection_lost, None)
|
||||
|
||||
# On Python 3.3 and older, objects with a destructor part of a reference
|
||||
|
@ -619,10 +650,10 @@ class _SelectorTransport(transports._FlowControlMixin,
|
|||
return
|
||||
if self._buffer:
|
||||
self._buffer.clear()
|
||||
self._loop.remove_writer(self._sock_fd)
|
||||
self._loop._remove_writer(self._sock_fd)
|
||||
if not self._closing:
|
||||
self._closing = True
|
||||
self._loop.remove_reader(self._sock_fd)
|
||||
self._loop._remove_reader(self._sock_fd)
|
||||
self._conn_lost += 1
|
||||
self._loop.call_soon(self._call_connection_lost, exc)
|
||||
|
||||
|
@ -659,7 +690,7 @@ class _SelectorSocketTransport(_SelectorTransport):
|
|||
|
||||
self._loop.call_soon(self._protocol.connection_made, self)
|
||||
# only start reading when connection_made() has been called
|
||||
self._loop.call_soon(self._loop.add_reader,
|
||||
self._loop.call_soon(self._loop._add_reader,
|
||||
self._sock_fd, self._read_ready)
|
||||
if waiter is not None:
|
||||
# only wake up the waiter when connection_made() has been called
|
||||
|
@ -672,7 +703,7 @@ class _SelectorSocketTransport(_SelectorTransport):
|
|||
if self._paused:
|
||||
raise RuntimeError('Already paused')
|
||||
self._paused = True
|
||||
self._loop.remove_reader(self._sock_fd)
|
||||
self._loop._remove_reader(self._sock_fd)
|
||||
if self._loop.get_debug():
|
||||
logger.debug("%r pauses reading", self)
|
||||
|
||||
|
@ -682,7 +713,7 @@ class _SelectorSocketTransport(_SelectorTransport):
|
|||
self._paused = False
|
||||
if self._closing:
|
||||
return
|
||||
self._loop.add_reader(self._sock_fd, self._read_ready)
|
||||
self._loop._add_reader(self._sock_fd, self._read_ready)
|
||||
if self._loop.get_debug():
|
||||
logger.debug("%r resumes reading", self)
|
||||
|
||||
|
@ -706,7 +737,7 @@ class _SelectorSocketTransport(_SelectorTransport):
|
|||
# We're keeping the connection open so the
|
||||
# protocol can write more, but we still can't
|
||||
# receive more, so remove the reader callback.
|
||||
self._loop.remove_reader(self._sock_fd)
|
||||
self._loop._remove_reader(self._sock_fd)
|
||||
else:
|
||||
self.close()
|
||||
|
||||
|
@ -739,7 +770,7 @@ class _SelectorSocketTransport(_SelectorTransport):
|
|||
if not data:
|
||||
return
|
||||
# Not all was written; register write handler.
|
||||
self._loop.add_writer(self._sock_fd, self._write_ready)
|
||||
self._loop._add_writer(self._sock_fd, self._write_ready)
|
||||
|
||||
# Add it to the buffer.
|
||||
self._buffer.extend(data)
|
||||
|
@ -755,7 +786,7 @@ class _SelectorSocketTransport(_SelectorTransport):
|
|||
except (BlockingIOError, InterruptedError):
|
||||
pass
|
||||
except Exception as exc:
|
||||
self._loop.remove_writer(self._sock_fd)
|
||||
self._loop._remove_writer(self._sock_fd)
|
||||
self._buffer.clear()
|
||||
self._fatal_error(exc, 'Fatal write error on socket transport')
|
||||
else:
|
||||
|
@ -763,7 +794,7 @@ class _SelectorSocketTransport(_SelectorTransport):
|
|||
del self._buffer[:n]
|
||||
self._maybe_resume_protocol() # May append to buffer.
|
||||
if not self._buffer:
|
||||
self._loop.remove_writer(self._sock_fd)
|
||||
self._loop._remove_writer(self._sock_fd)
|
||||
if self._closing:
|
||||
self._call_connection_lost(None)
|
||||
elif self._eof:
|
||||
|
@ -834,19 +865,19 @@ class _SelectorSslTransport(_SelectorTransport):
|
|||
try:
|
||||
self._sock.do_handshake()
|
||||
except ssl.SSLWantReadError:
|
||||
self._loop.add_reader(self._sock_fd,
|
||||
self._on_handshake, start_time)
|
||||
self._loop._add_reader(self._sock_fd,
|
||||
self._on_handshake, start_time)
|
||||
return
|
||||
except ssl.SSLWantWriteError:
|
||||
self._loop.add_writer(self._sock_fd,
|
||||
self._on_handshake, start_time)
|
||||
self._loop._add_writer(self._sock_fd,
|
||||
self._on_handshake, start_time)
|
||||
return
|
||||
except BaseException as exc:
|
||||
if self._loop.get_debug():
|
||||
logger.warning("%r: SSL handshake failed",
|
||||
self, exc_info=True)
|
||||
self._loop.remove_reader(self._sock_fd)
|
||||
self._loop.remove_writer(self._sock_fd)
|
||||
self._loop._remove_reader(self._sock_fd)
|
||||
self._loop._remove_writer(self._sock_fd)
|
||||
self._sock.close()
|
||||
self._wakeup_waiter(exc)
|
||||
if isinstance(exc, Exception):
|
||||
|
@ -854,8 +885,8 @@ class _SelectorSslTransport(_SelectorTransport):
|
|||
else:
|
||||
raise
|
||||
|
||||
self._loop.remove_reader(self._sock_fd)
|
||||
self._loop.remove_writer(self._sock_fd)
|
||||
self._loop._remove_reader(self._sock_fd)
|
||||
self._loop._remove_writer(self._sock_fd)
|
||||
|
||||
peercert = self._sock.getpeercert()
|
||||
if not hasattr(self._sslcontext, 'check_hostname'):
|
||||
|
@ -883,7 +914,7 @@ class _SelectorSslTransport(_SelectorTransport):
|
|||
|
||||
self._read_wants_write = False
|
||||
self._write_wants_read = False
|
||||
self._loop.add_reader(self._sock_fd, self._read_ready)
|
||||
self._loop._add_reader(self._sock_fd, self._read_ready)
|
||||
self._protocol_connected = True
|
||||
self._loop.call_soon(self._protocol.connection_made, self)
|
||||
# only wake up the waiter when connection_made() has been called
|
||||
|
@ -905,7 +936,7 @@ class _SelectorSslTransport(_SelectorTransport):
|
|||
if self._paused:
|
||||
raise RuntimeError('Already paused')
|
||||
self._paused = True
|
||||
self._loop.remove_reader(self._sock_fd)
|
||||
self._loop._remove_reader(self._sock_fd)
|
||||
if self._loop.get_debug():
|
||||
logger.debug("%r pauses reading", self)
|
||||
|
||||
|
@ -915,7 +946,7 @@ class _SelectorSslTransport(_SelectorTransport):
|
|||
self._paused = False
|
||||
if self._closing:
|
||||
return
|
||||
self._loop.add_reader(self._sock_fd, self._read_ready)
|
||||
self._loop._add_reader(self._sock_fd, self._read_ready)
|
||||
if self._loop.get_debug():
|
||||
logger.debug("%r resumes reading", self)
|
||||
|
||||
|
@ -927,7 +958,7 @@ class _SelectorSslTransport(_SelectorTransport):
|
|||
self._write_ready()
|
||||
|
||||
if self._buffer:
|
||||
self._loop.add_writer(self._sock_fd, self._write_ready)
|
||||
self._loop._add_writer(self._sock_fd, self._write_ready)
|
||||
|
||||
try:
|
||||
data = self._sock.recv(self.max_size)
|
||||
|
@ -935,8 +966,8 @@ class _SelectorSslTransport(_SelectorTransport):
|
|||
pass
|
||||
except ssl.SSLWantWriteError:
|
||||
self._read_wants_write = True
|
||||
self._loop.remove_reader(self._sock_fd)
|
||||
self._loop.add_writer(self._sock_fd, self._write_ready)
|
||||
self._loop._remove_reader(self._sock_fd)
|
||||
self._loop._add_writer(self._sock_fd, self._write_ready)
|
||||
except Exception as exc:
|
||||
self._fatal_error(exc, 'Fatal read error on SSL transport')
|
||||
else:
|
||||
|
@ -961,7 +992,7 @@ class _SelectorSslTransport(_SelectorTransport):
|
|||
self._read_ready()
|
||||
|
||||
if not (self._paused or self._closing):
|
||||
self._loop.add_reader(self._sock_fd, self._read_ready)
|
||||
self._loop._add_reader(self._sock_fd, self._read_ready)
|
||||
|
||||
if self._buffer:
|
||||
try:
|
||||
|
@ -970,10 +1001,10 @@ class _SelectorSslTransport(_SelectorTransport):
|
|||
n = 0
|
||||
except ssl.SSLWantReadError:
|
||||
n = 0
|
||||
self._loop.remove_writer(self._sock_fd)
|
||||
self._loop._remove_writer(self._sock_fd)
|
||||
self._write_wants_read = True
|
||||
except Exception as exc:
|
||||
self._loop.remove_writer(self._sock_fd)
|
||||
self._loop._remove_writer(self._sock_fd)
|
||||
self._buffer.clear()
|
||||
self._fatal_error(exc, 'Fatal write error on SSL transport')
|
||||
return
|
||||
|
@ -984,7 +1015,7 @@ class _SelectorSslTransport(_SelectorTransport):
|
|||
self._maybe_resume_protocol() # May append to buffer.
|
||||
|
||||
if not self._buffer:
|
||||
self._loop.remove_writer(self._sock_fd)
|
||||
self._loop._remove_writer(self._sock_fd)
|
||||
if self._closing:
|
||||
self._call_connection_lost(None)
|
||||
|
||||
|
@ -1002,7 +1033,7 @@ class _SelectorSslTransport(_SelectorTransport):
|
|||
return
|
||||
|
||||
if not self._buffer:
|
||||
self._loop.add_writer(self._sock_fd, self._write_ready)
|
||||
self._loop._add_writer(self._sock_fd, self._write_ready)
|
||||
|
||||
# Add it to the buffer.
|
||||
self._buffer.extend(data)
|
||||
|
@ -1022,7 +1053,7 @@ class _SelectorDatagramTransport(_SelectorTransport):
|
|||
self._address = address
|
||||
self._loop.call_soon(self._protocol.connection_made, self)
|
||||
# only start reading when connection_made() has been called
|
||||
self._loop.call_soon(self._loop.add_reader,
|
||||
self._loop.call_soon(self._loop._add_reader,
|
||||
self._sock_fd, self._read_ready)
|
||||
if waiter is not None:
|
||||
# only wake up the waiter when connection_made() has been called
|
||||
|
@ -1072,7 +1103,7 @@ class _SelectorDatagramTransport(_SelectorTransport):
|
|||
self._sock.sendto(data, addr)
|
||||
return
|
||||
except (BlockingIOError, InterruptedError):
|
||||
self._loop.add_writer(self._sock_fd, self._sendto_ready)
|
||||
self._loop._add_writer(self._sock_fd, self._sendto_ready)
|
||||
except OSError as exc:
|
||||
self._protocol.error_received(exc)
|
||||
return
|
||||
|
@ -1106,6 +1137,6 @@ class _SelectorDatagramTransport(_SelectorTransport):
|
|||
|
||||
self._maybe_resume_protocol() # May append to buffer.
|
||||
if not self._buffer:
|
||||
self._loop.remove_writer(self._sock_fd)
|
||||
self._loop._remove_writer(self._sock_fd)
|
||||
if self._closing:
|
||||
self._call_connection_lost(None)
|
||||
|
|
|
@ -13,6 +13,8 @@ import tempfile
|
|||
import threading
|
||||
import time
|
||||
import unittest
|
||||
import weakref
|
||||
|
||||
from unittest import mock
|
||||
|
||||
from http.server import HTTPServer
|
||||
|
@ -300,6 +302,8 @@ class TestLoop(base_events.BaseEventLoop):
|
|||
self.writers = {}
|
||||
self.reset_counters()
|
||||
|
||||
self._transports = weakref.WeakValueDictionary()
|
||||
|
||||
def time(self):
|
||||
return self._time
|
||||
|
||||
|
@ -318,10 +322,10 @@ class TestLoop(base_events.BaseEventLoop):
|
|||
else: # pragma: no cover
|
||||
raise AssertionError("Time generator is not finished")
|
||||
|
||||
def add_reader(self, fd, callback, *args):
|
||||
def _add_reader(self, fd, callback, *args):
|
||||
self.readers[fd] = events.Handle(callback, args, self)
|
||||
|
||||
def remove_reader(self, fd):
|
||||
def _remove_reader(self, fd):
|
||||
self.remove_reader_count[fd] += 1
|
||||
if fd in self.readers:
|
||||
del self.readers[fd]
|
||||
|
@ -337,10 +341,10 @@ class TestLoop(base_events.BaseEventLoop):
|
|||
assert handle._args == args, '{!r} != {!r}'.format(
|
||||
handle._args, args)
|
||||
|
||||
def add_writer(self, fd, callback, *args):
|
||||
def _add_writer(self, fd, callback, *args):
|
||||
self.writers[fd] = events.Handle(callback, args, self)
|
||||
|
||||
def remove_writer(self, fd):
|
||||
def _remove_writer(self, fd):
|
||||
self.remove_writer_count[fd] += 1
|
||||
if fd in self.writers:
|
||||
del self.writers[fd]
|
||||
|
@ -356,6 +360,36 @@ class TestLoop(base_events.BaseEventLoop):
|
|||
assert handle._args == args, '{!r} != {!r}'.format(
|
||||
handle._args, args)
|
||||
|
||||
def _ensure_fd_no_transport(self, fd):
|
||||
try:
|
||||
transport = self._transports[fd]
|
||||
except KeyError:
|
||||
pass
|
||||
else:
|
||||
raise RuntimeError(
|
||||
'File descriptor {!r} is used by transport {!r}'.format(
|
||||
fd, transport))
|
||||
|
||||
def add_reader(self, fd, callback, *args):
|
||||
"""Add a reader callback."""
|
||||
self._ensure_fd_no_transport(fd)
|
||||
return self._add_reader(fd, callback, *args)
|
||||
|
||||
def remove_reader(self, fd):
|
||||
"""Remove a reader callback."""
|
||||
self._ensure_fd_no_transport(fd)
|
||||
return self._remove_reader(fd)
|
||||
|
||||
def add_writer(self, fd, callback, *args):
|
||||
"""Add a writer callback.."""
|
||||
self._ensure_fd_no_transport(fd)
|
||||
return self._add_writer(fd, callback, *args)
|
||||
|
||||
def remove_writer(self, fd):
|
||||
"""Remove a writer callback."""
|
||||
self._ensure_fd_no_transport(fd)
|
||||
return self._remove_writer(fd)
|
||||
|
||||
def reset_counters(self):
|
||||
self.remove_reader_count = collections.defaultdict(int)
|
||||
self.remove_writer_count = collections.defaultdict(int)
|
||||
|
|
|
@ -321,7 +321,7 @@ class _UnixReadPipeTransport(transports.ReadTransport):
|
|||
|
||||
self._loop.call_soon(self._protocol.connection_made, self)
|
||||
# only start reading when connection_made() has been called
|
||||
self._loop.call_soon(self._loop.add_reader,
|
||||
self._loop.call_soon(self._loop._add_reader,
|
||||
self._fileno, self._read_ready)
|
||||
if waiter is not None:
|
||||
# only wake up the waiter when connection_made() has been called
|
||||
|
@ -364,15 +364,15 @@ class _UnixReadPipeTransport(transports.ReadTransport):
|
|||
if self._loop.get_debug():
|
||||
logger.info("%r was closed by peer", self)
|
||||
self._closing = True
|
||||
self._loop.remove_reader(self._fileno)
|
||||
self._loop._remove_reader(self._fileno)
|
||||
self._loop.call_soon(self._protocol.eof_received)
|
||||
self._loop.call_soon(self._call_connection_lost, None)
|
||||
|
||||
def pause_reading(self):
|
||||
self._loop.remove_reader(self._fileno)
|
||||
self._loop._remove_reader(self._fileno)
|
||||
|
||||
def resume_reading(self):
|
||||
self._loop.add_reader(self._fileno, self._read_ready)
|
||||
self._loop._add_reader(self._fileno, self._read_ready)
|
||||
|
||||
def set_protocol(self, protocol):
|
||||
self._protocol = protocol
|
||||
|
@ -413,7 +413,7 @@ class _UnixReadPipeTransport(transports.ReadTransport):
|
|||
|
||||
def _close(self, exc):
|
||||
self._closing = True
|
||||
self._loop.remove_reader(self._fileno)
|
||||
self._loop._remove_reader(self._fileno)
|
||||
self._loop.call_soon(self._call_connection_lost, exc)
|
||||
|
||||
def _call_connection_lost(self, exc):
|
||||
|
@ -458,7 +458,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
|
|||
# works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
|
||||
if is_socket or (is_fifo and not sys.platform.startswith("aix")):
|
||||
# only start reading when connection_made() has been called
|
||||
self._loop.call_soon(self._loop.add_reader,
|
||||
self._loop.call_soon(self._loop._add_reader,
|
||||
self._fileno, self._read_ready)
|
||||
|
||||
if waiter is not None:
|
||||
|
@ -531,7 +531,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
|
|||
return
|
||||
elif n > 0:
|
||||
data = memoryview(data)[n:]
|
||||
self._loop.add_writer(self._fileno, self._write_ready)
|
||||
self._loop._add_writer(self._fileno, self._write_ready)
|
||||
|
||||
self._buffer += data
|
||||
self._maybe_pause_protocol()
|
||||
|
@ -548,15 +548,15 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
|
|||
self._conn_lost += 1
|
||||
# Remove writer here, _fatal_error() doesn't it
|
||||
# because _buffer is empty.
|
||||
self._loop.remove_writer(self._fileno)
|
||||
self._loop._remove_writer(self._fileno)
|
||||
self._fatal_error(exc, 'Fatal write error on pipe transport')
|
||||
else:
|
||||
if n == len(self._buffer):
|
||||
self._buffer.clear()
|
||||
self._loop.remove_writer(self._fileno)
|
||||
self._loop._remove_writer(self._fileno)
|
||||
self._maybe_resume_protocol() # May append to buffer.
|
||||
if self._closing:
|
||||
self._loop.remove_reader(self._fileno)
|
||||
self._loop._remove_reader(self._fileno)
|
||||
self._call_connection_lost(None)
|
||||
return
|
||||
elif n > 0:
|
||||
|
@ -571,7 +571,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
|
|||
assert self._pipe
|
||||
self._closing = True
|
||||
if not self._buffer:
|
||||
self._loop.remove_reader(self._fileno)
|
||||
self._loop._remove_reader(self._fileno)
|
||||
self._loop.call_soon(self._call_connection_lost, None)
|
||||
|
||||
def set_protocol(self, protocol):
|
||||
|
@ -618,9 +618,9 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
|
|||
def _close(self, exc=None):
|
||||
self._closing = True
|
||||
if self._buffer:
|
||||
self._loop.remove_writer(self._fileno)
|
||||
self._loop._remove_writer(self._fileno)
|
||||
self._buffer.clear()
|
||||
self._loop.remove_reader(self._fileno)
|
||||
self._loop._remove_reader(self._fileno)
|
||||
self._loop.call_soon(self._call_connection_lost, exc)
|
||||
|
||||
def _call_connection_lost(self, exc):
|
||||
|
|
|
@ -1148,10 +1148,10 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
|
|||
m_socket.getaddrinfo = socket.getaddrinfo
|
||||
sock = m_socket.socket.return_value
|
||||
|
||||
self.loop.add_reader = mock.Mock()
|
||||
self.loop.add_reader._is_coroutine = False
|
||||
self.loop.add_writer = mock.Mock()
|
||||
self.loop.add_writer._is_coroutine = False
|
||||
self.loop._add_reader = mock.Mock()
|
||||
self.loop._add_reader._is_coroutine = False
|
||||
self.loop._add_writer = mock.Mock()
|
||||
self.loop._add_writer._is_coroutine = False
|
||||
|
||||
coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80)
|
||||
t, p = self.loop.run_until_complete(coro)
|
||||
|
@ -1194,10 +1194,10 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
|
|||
m_socket.getaddrinfo = socket.getaddrinfo
|
||||
sock = m_socket.socket.return_value
|
||||
|
||||
self.loop.add_reader = mock.Mock()
|
||||
self.loop.add_reader._is_coroutine = False
|
||||
self.loop.add_writer = mock.Mock()
|
||||
self.loop.add_writer._is_coroutine = False
|
||||
self.loop._add_reader = mock.Mock()
|
||||
self.loop._add_reader._is_coroutine = False
|
||||
self.loop._add_writer = mock.Mock()
|
||||
self.loop._add_writer._is_coroutine = False
|
||||
|
||||
for service, port in ('http', 80), (b'http', 80):
|
||||
coro = self.loop.create_connection(asyncio.Protocol,
|
||||
|
@ -1614,8 +1614,8 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
|
|||
|
||||
m_socket.getaddrinfo = getaddrinfo
|
||||
m_socket.socket.return_value.bind = bind = mock.Mock()
|
||||
self.loop.add_reader = mock.Mock()
|
||||
self.loop.add_reader._is_coroutine = False
|
||||
self.loop._add_reader = mock.Mock()
|
||||
self.loop._add_reader._is_coroutine = False
|
||||
|
||||
reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
|
||||
coro = self.loop.create_datagram_endpoint(
|
||||
|
@ -1646,13 +1646,13 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
|
|||
sock = mock.Mock()
|
||||
sock.fileno.return_value = 10
|
||||
sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files')
|
||||
self.loop.remove_reader = mock.Mock()
|
||||
self.loop._remove_reader = mock.Mock()
|
||||
self.loop.call_later = mock.Mock()
|
||||
|
||||
self.loop._accept_connection(MyProto, sock)
|
||||
self.assertTrue(m_log.error.called)
|
||||
self.assertFalse(sock.close.called)
|
||||
self.loop.remove_reader.assert_called_with(10)
|
||||
self.loop._remove_reader.assert_called_with(10)
|
||||
self.loop.call_later.assert_called_with(constants.ACCEPT_RETRY_DELAY,
|
||||
# self.loop._start_serving
|
||||
mock.ANY,
|
||||
|
|
|
@ -72,11 +72,11 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
|
|||
@unittest.skipIf(ssl is None, 'No ssl module')
|
||||
def test_make_ssl_transport(self):
|
||||
m = mock.Mock()
|
||||
self.loop.add_reader = mock.Mock()
|
||||
self.loop.add_reader._is_coroutine = False
|
||||
self.loop.add_writer = mock.Mock()
|
||||
self.loop.remove_reader = mock.Mock()
|
||||
self.loop.remove_writer = mock.Mock()
|
||||
self.loop._add_reader = mock.Mock()
|
||||
self.loop._add_reader._is_coroutine = False
|
||||
self.loop._add_writer = mock.Mock()
|
||||
self.loop._remove_reader = mock.Mock()
|
||||
self.loop._remove_writer = mock.Mock()
|
||||
waiter = asyncio.Future(loop=self.loop)
|
||||
with test_utils.disable_logger():
|
||||
transport = self.loop._make_ssl_transport(
|
||||
|
@ -119,7 +119,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
|
|||
ssock.fileno.return_value = 7
|
||||
csock = self.loop._csock
|
||||
csock.fileno.return_value = 1
|
||||
remove_reader = self.loop.remove_reader = mock.Mock()
|
||||
remove_reader = self.loop._remove_reader = mock.Mock()
|
||||
|
||||
self.loop._selector.close()
|
||||
self.loop._selector = selector = mock.Mock()
|
||||
|
@ -651,12 +651,12 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
|
|||
reader = mock.Mock()
|
||||
reader.cancelled = True
|
||||
|
||||
self.loop.remove_reader = mock.Mock()
|
||||
self.loop._remove_reader = mock.Mock()
|
||||
self.loop._process_events(
|
||||
[(selectors.SelectorKey(
|
||||
1, 1, selectors.EVENT_READ, (reader, None)),
|
||||
selectors.EVENT_READ)])
|
||||
self.loop.remove_reader.assert_called_with(1)
|
||||
self.loop._remove_reader.assert_called_with(1)
|
||||
|
||||
def test_process_events_write(self):
|
||||
writer = mock.Mock()
|
||||
|
@ -672,13 +672,13 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
|
|||
def test_process_events_write_cancelled(self):
|
||||
writer = mock.Mock()
|
||||
writer.cancelled = True
|
||||
self.loop.remove_writer = mock.Mock()
|
||||
self.loop._remove_writer = mock.Mock()
|
||||
|
||||
self.loop._process_events(
|
||||
[(selectors.SelectorKey(1, 1, selectors.EVENT_WRITE,
|
||||
(None, writer)),
|
||||
selectors.EVENT_WRITE)])
|
||||
self.loop.remove_writer.assert_called_with(1)
|
||||
self.loop._remove_writer.assert_called_with(1)
|
||||
|
||||
def test_accept_connection_multiple(self):
|
||||
sock = mock.Mock()
|
||||
|
@ -747,8 +747,8 @@ class SelectorTransportTests(test_utils.TestCase):
|
|||
def test_force_close(self):
|
||||
tr = self.create_transport()
|
||||
tr._buffer.extend(b'1')
|
||||
self.loop.add_reader(7, mock.sentinel)
|
||||
self.loop.add_writer(7, mock.sentinel)
|
||||
self.loop._add_reader(7, mock.sentinel)
|
||||
self.loop._add_writer(7, mock.sentinel)
|
||||
tr._force_close(None)
|
||||
|
||||
self.assertTrue(tr.is_closing())
|
||||
|
@ -1037,7 +1037,7 @@ class SelectorSocketTransportTests(test_utils.TestCase):
|
|||
|
||||
transport = self.socket_transport()
|
||||
transport._buffer.extend(data)
|
||||
self.loop.add_writer(7, transport._write_ready)
|
||||
self.loop._add_writer(7, transport._write_ready)
|
||||
transport._write_ready()
|
||||
self.assertTrue(self.sock.send.called)
|
||||
self.assertFalse(self.loop.writers)
|
||||
|
@ -1049,7 +1049,7 @@ class SelectorSocketTransportTests(test_utils.TestCase):
|
|||
transport = self.socket_transport()
|
||||
transport._closing = True
|
||||
transport._buffer.extend(data)
|
||||
self.loop.add_writer(7, transport._write_ready)
|
||||
self.loop._add_writer(7, transport._write_ready)
|
||||
transport._write_ready()
|
||||
self.assertTrue(self.sock.send.called)
|
||||
self.assertFalse(self.loop.writers)
|
||||
|
@ -1067,7 +1067,7 @@ class SelectorSocketTransportTests(test_utils.TestCase):
|
|||
|
||||
transport = self.socket_transport()
|
||||
transport._buffer.extend(data)
|
||||
self.loop.add_writer(7, transport._write_ready)
|
||||
self.loop._add_writer(7, transport._write_ready)
|
||||
transport._write_ready()
|
||||
self.loop.assert_writer(7, transport._write_ready)
|
||||
self.assertEqual(list_to_buffer([b'ta']), transport._buffer)
|
||||
|
@ -1078,7 +1078,7 @@ class SelectorSocketTransportTests(test_utils.TestCase):
|
|||
|
||||
transport = self.socket_transport()
|
||||
transport._buffer.extend(data)
|
||||
self.loop.add_writer(7, transport._write_ready)
|
||||
self.loop._add_writer(7, transport._write_ready)
|
||||
transport._write_ready()
|
||||
self.loop.assert_writer(7, transport._write_ready)
|
||||
self.assertEqual(list_to_buffer([b'data']), transport._buffer)
|
||||
|
@ -1088,7 +1088,7 @@ class SelectorSocketTransportTests(test_utils.TestCase):
|
|||
|
||||
transport = self.socket_transport()
|
||||
transport._buffer = list_to_buffer([b'data1', b'data2'])
|
||||
self.loop.add_writer(7, transport._write_ready)
|
||||
self.loop._add_writer(7, transport._write_ready)
|
||||
transport._write_ready()
|
||||
|
||||
self.loop.assert_writer(7, transport._write_ready)
|
||||
|
@ -1130,7 +1130,7 @@ class SelectorSocketTransportTests(test_utils.TestCase):
|
|||
|
||||
@mock.patch('asyncio.base_events.logger')
|
||||
def test_transport_close_remove_writer(self, m_log):
|
||||
remove_writer = self.loop.remove_writer = mock.Mock()
|
||||
remove_writer = self.loop._remove_writer = mock.Mock()
|
||||
|
||||
transport = self.socket_transport()
|
||||
transport.close()
|
||||
|
@ -1288,7 +1288,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
|
|||
self.assertEqual((b'data',), self.protocol.data_received.call_args[0])
|
||||
|
||||
def test_read_ready_write_wants_read(self):
|
||||
self.loop.add_writer = mock.Mock()
|
||||
self.loop._add_writer = mock.Mock()
|
||||
self.sslsock.recv.side_effect = BlockingIOError
|
||||
transport = self._make_one()
|
||||
transport._write_wants_read = True
|
||||
|
@ -1298,7 +1298,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
|
|||
|
||||
self.assertFalse(transport._write_wants_read)
|
||||
transport._write_ready.assert_called_with()
|
||||
self.loop.add_writer.assert_called_with(
|
||||
self.loop._add_writer.assert_called_with(
|
||||
transport._sock_fd, transport._write_ready)
|
||||
|
||||
def test_read_ready_recv_eof(self):
|
||||
|
@ -1333,16 +1333,16 @@ class SelectorSslTransportTests(test_utils.TestCase):
|
|||
self.assertFalse(self.protocol.data_received.called)
|
||||
|
||||
def test_read_ready_recv_write(self):
|
||||
self.loop.remove_reader = mock.Mock()
|
||||
self.loop.add_writer = mock.Mock()
|
||||
self.loop._remove_reader = mock.Mock()
|
||||
self.loop._add_writer = mock.Mock()
|
||||
self.sslsock.recv.side_effect = ssl.SSLWantWriteError
|
||||
transport = self._make_one()
|
||||
transport._read_ready()
|
||||
self.assertFalse(self.protocol.data_received.called)
|
||||
self.assertTrue(transport._read_wants_write)
|
||||
|
||||
self.loop.remove_reader.assert_called_with(transport._sock_fd)
|
||||
self.loop.add_writer.assert_called_with(
|
||||
self.loop._remove_reader.assert_called_with(transport._sock_fd)
|
||||
self.loop._add_writer.assert_called_with(
|
||||
transport._sock_fd, transport._write_ready)
|
||||
|
||||
def test_read_ready_recv_exc(self):
|
||||
|
@ -1419,12 +1419,12 @@ class SelectorSslTransportTests(test_utils.TestCase):
|
|||
transport = self._make_one()
|
||||
transport._buffer = list_to_buffer([b'data'])
|
||||
|
||||
self.loop.remove_writer = mock.Mock()
|
||||
self.loop._remove_writer = mock.Mock()
|
||||
self.sslsock.send.side_effect = ssl.SSLWantReadError
|
||||
transport._write_ready()
|
||||
self.assertFalse(self.protocol.data_received.called)
|
||||
self.assertTrue(transport._write_wants_read)
|
||||
self.loop.remove_writer.assert_called_with(transport._sock_fd)
|
||||
self.loop._remove_writer.assert_called_with(transport._sock_fd)
|
||||
|
||||
def test_write_ready_send_exc(self):
|
||||
err = self.sslsock.send.side_effect = OSError()
|
||||
|
@ -1439,7 +1439,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
|
|||
self.assertEqual(list_to_buffer(), transport._buffer)
|
||||
|
||||
def test_write_ready_read_wants_write(self):
|
||||
self.loop.add_reader = mock.Mock()
|
||||
self.loop._add_reader = mock.Mock()
|
||||
self.sslsock.send.side_effect = BlockingIOError
|
||||
transport = self._make_one()
|
||||
transport._read_wants_write = True
|
||||
|
@ -1448,7 +1448,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
|
|||
|
||||
self.assertFalse(transport._read_wants_write)
|
||||
transport._read_ready.assert_called_with()
|
||||
self.loop.add_reader.assert_called_with(
|
||||
self.loop._add_reader.assert_called_with(
|
||||
transport._sock_fd, transport._read_ready)
|
||||
|
||||
def test_write_eof(self):
|
||||
|
@ -1699,7 +1699,7 @@ class SelectorDatagramTransportTests(test_utils.TestCase):
|
|||
|
||||
transport = self.datagram_transport()
|
||||
transport._buffer.append((data, ('0.0.0.0', 12345)))
|
||||
self.loop.add_writer(7, transport._sendto_ready)
|
||||
self.loop._add_writer(7, transport._sendto_ready)
|
||||
transport._sendto_ready()
|
||||
self.assertTrue(self.sock.sendto.called)
|
||||
self.assertEqual(
|
||||
|
@ -1713,7 +1713,7 @@ class SelectorDatagramTransportTests(test_utils.TestCase):
|
|||
transport = self.datagram_transport()
|
||||
transport._closing = True
|
||||
transport._buffer.append((data, ()))
|
||||
self.loop.add_writer(7, transport._sendto_ready)
|
||||
self.loop._add_writer(7, transport._sendto_ready)
|
||||
transport._sendto_ready()
|
||||
self.sock.sendto.assert_called_with(data, ())
|
||||
self.assertFalse(self.loop.writers)
|
||||
|
@ -1722,7 +1722,7 @@ class SelectorDatagramTransportTests(test_utils.TestCase):
|
|||
|
||||
def test_sendto_ready_no_data(self):
|
||||
transport = self.datagram_transport()
|
||||
self.loop.add_writer(7, transport._sendto_ready)
|
||||
self.loop._add_writer(7, transport._sendto_ready)
|
||||
transport._sendto_ready()
|
||||
self.assertFalse(self.sock.sendto.called)
|
||||
self.assertFalse(self.loop.writers)
|
||||
|
@ -1732,7 +1732,7 @@ class SelectorDatagramTransportTests(test_utils.TestCase):
|
|||
|
||||
transport = self.datagram_transport()
|
||||
transport._buffer.extend([(b'data1', ()), (b'data2', ())])
|
||||
self.loop.add_writer(7, transport._sendto_ready)
|
||||
self.loop._add_writer(7, transport._sendto_ready)
|
||||
transport._sendto_ready()
|
||||
|
||||
self.loop.assert_writer(7, transport._sendto_ready)
|
||||
|
|
Loading…
Reference in New Issue