Issue #21040: socketserver: Use the selectors module.

This commit is contained in:
Charles-François Natali 2014-03-24 22:25:39 +00:00
parent e3fb80fb76
commit 1d29cc5b6c
3 changed files with 58 additions and 74 deletions

View File

@ -113,7 +113,7 @@ the request handler class :meth:`handle` method.
Another approach to handling multiple simultaneous requests in an environment Another approach to handling multiple simultaneous requests in an environment
that supports neither threads nor :func:`~os.fork` (or where these are too that supports neither threads nor :func:`~os.fork` (or where these are too
expensive or inappropriate for the service) is to maintain an explicit table of expensive or inappropriate for the service) is to maintain an explicit table of
partially finished requests and to use :func:`~select.select` to decide which partially finished requests and to use :mod:`selectors` to decide which
request to work on next (or whether to handle a new incoming request). This is request to work on next (or whether to handle a new incoming request). This is
particularly important for stream services where each client can potentially be particularly important for stream services where each client can potentially be
connected for a long time (if threads or subprocesses cannot be used). See connected for a long time (if threads or subprocesses cannot be used). See
@ -136,7 +136,7 @@ Server Objects
.. method:: BaseServer.fileno() .. method:: BaseServer.fileno()
Return an integer file descriptor for the socket on which the server is Return an integer file descriptor for the socket on which the server is
listening. This function is most commonly passed to :func:`select.select`, to listening. This function is most commonly passed to :mod:`selectors`, to
allow monitoring multiple servers in the same process. allow monitoring multiple servers in the same process.

View File

@ -94,7 +94,7 @@ handle() method.
Another approach to handling multiple simultaneous requests in an Another approach to handling multiple simultaneous requests in an
environment that supports neither threads nor fork (or where these are environment that supports neither threads nor fork (or where these are
too expensive or inappropriate for the service) is to maintain an too expensive or inappropriate for the service) is to maintain an
explicit table of partially finished requests and to use select() to explicit table of partially finished requests and to use a selector to
decide which request to work on next (or whether to handle a new decide which request to work on next (or whether to handle a new
incoming request). This is particularly important for stream services incoming request). This is particularly important for stream services
where each client can potentially be connected for a long time (if where each client can potentially be connected for a long time (if
@ -104,7 +104,6 @@ Future work:
- Standard classes for Sun RPC (which uses either UDP or TCP) - Standard classes for Sun RPC (which uses either UDP or TCP)
- Standard mix-in classes to implement various authentication - Standard mix-in classes to implement various authentication
and encryption schemes and encryption schemes
- Standard framework for select-based multiplexing
XXX Open problems: XXX Open problems:
- What to do with out-of-band data? - What to do with out-of-band data?
@ -130,13 +129,17 @@ __version__ = "0.4"
import socket import socket
import select import selectors
import os import os
import errno import errno
try: try:
import threading import threading
except ImportError: except ImportError:
import dummy_threading as threading import dummy_threading as threading
try:
from time import monotonic as time
except ImportError:
from time import time as time
__all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer", __all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer",
"ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler", "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler",
@ -147,14 +150,13 @@ if hasattr(socket, "AF_UNIX"):
"ThreadingUnixStreamServer", "ThreadingUnixStreamServer",
"ThreadingUnixDatagramServer"]) "ThreadingUnixDatagramServer"])
def _eintr_retry(func, *args): # poll/select have the advantage of not requiring any extra file descriptor,
"""restart a system call interrupted by EINTR""" # contrarily to epoll/kqueue (also, they require a single syscall).
while True: if hasattr(selectors, 'PollSelector'):
try: _ServerSelector = selectors.PollSelector
return func(*args) else:
except OSError as e: _ServerSelector = selectors.SelectSelector
if e.errno != errno.EINTR:
raise
class BaseServer: class BaseServer:
@ -166,7 +168,7 @@ class BaseServer:
- serve_forever(poll_interval=0.5) - serve_forever(poll_interval=0.5)
- shutdown() - shutdown()
- handle_request() # if you do not use serve_forever() - handle_request() # if you do not use serve_forever()
- fileno() -> int # for select() - fileno() -> int # for selector
Methods that may be overridden: Methods that may be overridden:
@ -227,17 +229,19 @@ class BaseServer:
""" """
self.__is_shut_down.clear() self.__is_shut_down.clear()
try: try:
while not self.__shutdown_request: # XXX: Consider using another file descriptor or connecting to the
# XXX: Consider using another file descriptor or # socket to wake this up instead of polling. Polling reduces our
# connecting to the socket to wake this up instead of # responsiveness to a shutdown request and wastes cpu at all other
# polling. Polling reduces our responsiveness to a # times.
# shutdown request and wastes cpu at all other times. with _ServerSelector() as selector:
r, w, e = _eintr_retry(select.select, [self], [], [], selector.register(self, selectors.EVENT_READ)
poll_interval)
if self in r:
self._handle_request_noblock()
self.service_actions() while not self.__shutdown_request:
ready = selector.select(poll_interval)
if ready:
self._handle_request_noblock()
self.service_actions()
finally: finally:
self.__shutdown_request = False self.__shutdown_request = False
self.__is_shut_down.set() self.__is_shut_down.set()
@ -260,16 +264,16 @@ class BaseServer:
""" """
pass pass
# The distinction between handling, getting, processing and # The distinction between handling, getting, processing and finishing a
# finishing a request is fairly arbitrary. Remember: # request is fairly arbitrary. Remember:
# #
# - handle_request() is the top-level call. It calls # - handle_request() is the top-level call. It calls selector.select(),
# select, get_request(), verify_request() and process_request() # get_request(), verify_request() and process_request()
# - get_request() is different for stream or datagram sockets # - get_request() is different for stream or datagram sockets
# - process_request() is the place that may fork a new process # - process_request() is the place that may fork a new process or create a
# or create a new thread to finish the request # new thread to finish the request
# - finish_request() instantiates the request handler class; # - finish_request() instantiates the request handler class; this
# this constructor will handle the request all by itself # constructor will handle the request all by itself
def handle_request(self): def handle_request(self):
"""Handle one request, possibly blocking. """Handle one request, possibly blocking.
@ -283,18 +287,30 @@ class BaseServer:
timeout = self.timeout timeout = self.timeout
elif self.timeout is not None: elif self.timeout is not None:
timeout = min(timeout, self.timeout) timeout = min(timeout, self.timeout)
fd_sets = _eintr_retry(select.select, [self], [], [], timeout) if timeout is not None:
if not fd_sets[0]: deadline = time() + timeout
self.handle_timeout()
return # Wait until a request arrives or the timeout expires - the loop is
self._handle_request_noblock() # necessary to accomodate early wakeups due to EINTR.
with _ServerSelector() as selector:
selector.register(self, selectors.EVENT_READ)
while True:
ready = selector.select(timeout)
if ready:
return self._handle_request_noblock()
else:
if timeout is not None:
timeout = deadline - time()
if timeout < 0:
return self.handle_timeout()
def _handle_request_noblock(self): def _handle_request_noblock(self):
"""Handle one request, without blocking. """Handle one request, without blocking.
I assume that select.select has returned that the socket is I assume that selector.select() has returned that the socket is
readable before this function was called, so there should be readable before this function was called, so there should be no risk of
no risk of blocking in get_request(). blocking in get_request().
""" """
try: try:
request, client_address = self.get_request() request, client_address = self.get_request()
@ -377,7 +393,7 @@ class TCPServer(BaseServer):
- serve_forever(poll_interval=0.5) - serve_forever(poll_interval=0.5)
- shutdown() - shutdown()
- handle_request() # if you don't use serve_forever() - handle_request() # if you don't use serve_forever()
- fileno() -> int # for select() - fileno() -> int # for selector
Methods that may be overridden: Methods that may be overridden:
@ -459,7 +475,7 @@ class TCPServer(BaseServer):
def fileno(self): def fileno(self):
"""Return socket file number. """Return socket file number.
Interface required by select(). Interface required by selector.
""" """
return self.socket.fileno() return self.socket.fileno()

View File

@ -222,38 +222,6 @@ class SocketServerTest(unittest.TestCase):
socketserver.DatagramRequestHandler, socketserver.DatagramRequestHandler,
self.dgram_examine) self.dgram_examine)
@contextlib.contextmanager
def mocked_select_module(self):
"""Mocks the select.select() call to raise EINTR for first call"""
old_select = select.select
class MockSelect:
def __init__(self):
self.called = 0
def __call__(self, *args):
self.called += 1
if self.called == 1:
# raise the exception on first call
raise OSError(errno.EINTR, os.strerror(errno.EINTR))
else:
# Return real select value for consecutive calls
return old_select(*args)
select.select = MockSelect()
try:
yield select.select
finally:
select.select = old_select
def test_InterruptServerSelectCall(self):
with self.mocked_select_module() as mock_select:
pid = self.run_server(socketserver.TCPServer,
socketserver.StreamRequestHandler,
self.stream_examine)
# Make sure select was called again:
self.assertGreater(mock_select.called, 1)
# Alas, on Linux (at least) recvfrom() doesn't return a meaningful # Alas, on Linux (at least) recvfrom() doesn't return a meaningful
# client address so this cannot work: # client address so this cannot work: