Merge 3.5 (issue #27906)
This commit is contained in:
commit
d070154fb5
|
@ -1035,7 +1035,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
for sock in sockets:
|
||||
sock.listen(backlog)
|
||||
sock.setblocking(False)
|
||||
self._start_serving(protocol_factory, sock, ssl, server)
|
||||
self._start_serving(protocol_factory, sock, ssl, server, backlog)
|
||||
if self._debug:
|
||||
logger.info("%r is serving", server)
|
||||
return server
|
||||
|
|
|
@ -495,7 +495,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
|
|||
self._csock.send(b'\0')
|
||||
|
||||
def _start_serving(self, protocol_factory, sock,
|
||||
sslcontext=None, server=None):
|
||||
sslcontext=None, server=None, backlog=100):
|
||||
|
||||
def loop(f=None):
|
||||
try:
|
||||
|
|
|
@ -162,43 +162,50 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
exc_info=True)
|
||||
|
||||
def _start_serving(self, protocol_factory, sock,
|
||||
sslcontext=None, server=None):
|
||||
sslcontext=None, server=None, backlog=100):
|
||||
self.add_reader(sock.fileno(), self._accept_connection,
|
||||
protocol_factory, sock, sslcontext, server)
|
||||
protocol_factory, sock, sslcontext, server, backlog)
|
||||
|
||||
def _accept_connection(self, protocol_factory, sock,
|
||||
sslcontext=None, server=None):
|
||||
try:
|
||||
conn, addr = sock.accept()
|
||||
if self._debug:
|
||||
logger.debug("%r got a new connection from %r: %r",
|
||||
server, addr, conn)
|
||||
conn.setblocking(False)
|
||||
except (BlockingIOError, InterruptedError, ConnectionAbortedError):
|
||||
pass # False alarm.
|
||||
except OSError as exc:
|
||||
# There's nowhere to send the error, so just log it.
|
||||
if exc.errno in (errno.EMFILE, errno.ENFILE,
|
||||
errno.ENOBUFS, errno.ENOMEM):
|
||||
# Some platforms (e.g. Linux keep reporting the FD as
|
||||
# ready, so we remove the read handler temporarily.
|
||||
# We'll try again in a while.
|
||||
self.call_exception_handler({
|
||||
'message': 'socket.accept() out of system resource',
|
||||
'exception': exc,
|
||||
'socket': sock,
|
||||
})
|
||||
self.remove_reader(sock.fileno())
|
||||
self.call_later(constants.ACCEPT_RETRY_DELAY,
|
||||
self._start_serving,
|
||||
protocol_factory, sock, sslcontext, server)
|
||||
sslcontext=None, server=None, backlog=100):
|
||||
# This method is only called once for each event loop tick where the
|
||||
# listening socket has triggered an EVENT_READ. There may be multiple
|
||||
# connections waiting for an .accept() so it is called in a loop.
|
||||
# See https://bugs.python.org/issue27906 for more details.
|
||||
for _ in range(backlog):
|
||||
try:
|
||||
conn, addr = sock.accept()
|
||||
if self._debug:
|
||||
logger.debug("%r got a new connection from %r: %r",
|
||||
server, addr, conn)
|
||||
conn.setblocking(False)
|
||||
except (BlockingIOError, InterruptedError, ConnectionAbortedError):
|
||||
# Early exit because the socket accept buffer is empty.
|
||||
return None
|
||||
except OSError as exc:
|
||||
# There's nowhere to send the error, so just log it.
|
||||
if exc.errno in (errno.EMFILE, errno.ENFILE,
|
||||
errno.ENOBUFS, errno.ENOMEM):
|
||||
# Some platforms (e.g. Linux keep reporting the FD as
|
||||
# ready, so we remove the read handler temporarily.
|
||||
# We'll try again in a while.
|
||||
self.call_exception_handler({
|
||||
'message': 'socket.accept() out of system resource',
|
||||
'exception': exc,
|
||||
'socket': sock,
|
||||
})
|
||||
self.remove_reader(sock.fileno())
|
||||
self.call_later(constants.ACCEPT_RETRY_DELAY,
|
||||
self._start_serving,
|
||||
protocol_factory, sock, sslcontext, server,
|
||||
backlog)
|
||||
else:
|
||||
raise # The event loop will catch, log and ignore it.
|
||||
else:
|
||||
raise # The event loop will catch, log and ignore it.
|
||||
else:
|
||||
extra = {'peername': addr}
|
||||
accept = self._accept_connection2(protocol_factory, conn, extra,
|
||||
sslcontext, server)
|
||||
self.create_task(accept)
|
||||
extra = {'peername': addr}
|
||||
accept = self._accept_connection2(protocol_factory, conn, extra,
|
||||
sslcontext, server)
|
||||
self.create_task(accept)
|
||||
|
||||
@coroutine
|
||||
def _accept_connection2(self, protocol_factory, conn, extra,
|
||||
|
|
|
@ -1634,7 +1634,7 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
|
|||
self.loop.call_later.assert_called_with(constants.ACCEPT_RETRY_DELAY,
|
||||
# self.loop._start_serving
|
||||
mock.ANY,
|
||||
MyProto, sock, None, None)
|
||||
MyProto, sock, None, None, mock.ANY)
|
||||
|
||||
def test_call_coroutine(self):
|
||||
@asyncio.coroutine
|
||||
|
|
|
@ -687,6 +687,20 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
|
|||
selectors.EVENT_WRITE)])
|
||||
self.loop.remove_writer.assert_called_with(1)
|
||||
|
||||
def test_accept_connection_multiple(self):
|
||||
sock = mock.Mock()
|
||||
sock.accept.return_value = (mock.Mock(), mock.Mock())
|
||||
backlog = 100
|
||||
# Mock the coroutine generation for a connection to prevent
|
||||
# warnings related to un-awaited coroutines.
|
||||
mock_obj = mock.patch.object
|
||||
with mock_obj(self.loop, '_accept_connection2') as accept2_mock:
|
||||
accept2_mock.return_value = None
|
||||
with mock_obj(self.loop, 'create_task') as task_mock:
|
||||
task_mock.return_value = None
|
||||
self.loop._accept_connection(mock.Mock(), sock, backlog=backlog)
|
||||
self.assertEqual(sock.accept.call_count, backlog)
|
||||
|
||||
|
||||
class SelectorTransportTests(test_utils.TestCase):
|
||||
|
||||
|
|
Loading…
Reference in New Issue