mirror of https://github.com/python/cpython
Progress on issue #1193577 by adding a polling .shutdown() method to
SocketServers. The core of the patch was written by Pedro Werneck, but any bugs are mine. I've also rearranged the code for timeouts in order to avoid interfering with the shutdown poll.
This commit is contained in:
parent
38fb9bee6c
commit
e75f59a578
|
@ -113,7 +113,8 @@ or inappropriate for the service) is to maintain an explicit table of partially
|
||||||
finished requests and to use :func:`select` to decide which request to work on
|
finished requests and to use :func:`select` to decide which request to work on
|
||||||
next (or whether to handle a new incoming request). This is particularly
|
next (or whether to handle a new incoming request). This is particularly
|
||||||
important for stream services where each client can potentially be connected for
|
important for stream services where each client can potentially be connected for
|
||||||
a long time (if threads or subprocesses cannot be used).
|
a long time (if threads or subprocesses cannot be used). See :mod:`asyncore` for
|
||||||
|
another way to manage this.
|
||||||
|
|
||||||
.. XXX should data and methods be intermingled, or separate?
|
.. XXX should data and methods be intermingled, or separate?
|
||||||
how should the distinction between class and instance variables be drawn?
|
how should the distinction between class and instance variables be drawn?
|
||||||
|
@ -132,16 +133,24 @@ Server Objects
|
||||||
|
|
||||||
.. function:: handle_request()
|
.. function:: handle_request()
|
||||||
|
|
||||||
Process a single request. This function calls the following methods in order:
|
Process a single request. This function calls the following methods in
|
||||||
:meth:`get_request`, :meth:`verify_request`, and :meth:`process_request`. If
|
order: :meth:`get_request`, :meth:`verify_request`, and
|
||||||
the user-provided :meth:`handle` method of the handler class raises an
|
:meth:`process_request`. If the user-provided :meth:`handle` method of the
|
||||||
exception, the server's :meth:`handle_error` method will be called.
|
handler class raises an exception, the server's :meth:`handle_error` method
|
||||||
|
will be called. If no request is received within :attr:`self.timeout`
|
||||||
|
seconds, :meth:`handle_timeout` will be called and :meth:`handle_request`
|
||||||
|
will return.
|
||||||
|
|
||||||
|
|
||||||
.. function:: serve_forever()
|
.. function:: serve_forever(poll_interval=0.5)
|
||||||
|
|
||||||
Handle an infinite number of requests. This simply calls :meth:`handle_request`
|
Handle requests until an explicit :meth:`shutdown` request. Polls for
|
||||||
inside an infinite loop.
|
shutdown every *poll_interval* seconds.
|
||||||
|
|
||||||
|
|
||||||
|
.. function:: shutdown()
|
||||||
|
|
||||||
|
Tells the :meth:`serve_forever` loop to stop and waits until it does.
|
||||||
|
|
||||||
|
|
||||||
.. data:: address_family
|
.. data:: address_family
|
||||||
|
@ -195,10 +204,9 @@ The server classes support the following class variables:
|
||||||
|
|
||||||
.. data:: timeout
|
.. data:: timeout
|
||||||
|
|
||||||
Timeout duration, measured in seconds, or :const:`None` if no timeout is desired.
|
Timeout duration, measured in seconds, or :const:`None` if no timeout is
|
||||||
If no incoming requests are received within the timeout period,
|
desired. If :meth:`handle_request` receives no incoming requests within the
|
||||||
the :meth:`handle_timeout` method is called and then the server resumes waiting for
|
timeout period, the :meth:`handle_timeout` method is called.
|
||||||
requests.
|
|
||||||
|
|
||||||
There are various server methods that can be overridden by subclasses of base
|
There are various server methods that can be overridden by subclasses of base
|
||||||
server classes like :class:`TCPServer`; these methods aren't useful to external
|
server classes like :class:`TCPServer`; these methods aren't useful to external
|
||||||
|
|
|
@ -130,8 +130,13 @@ __version__ = "0.4"
|
||||||
|
|
||||||
|
|
||||||
import socket
|
import socket
|
||||||
|
import select
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
|
try:
|
||||||
|
import threading
|
||||||
|
except ImportError:
|
||||||
|
import dummy_threading as threading
|
||||||
|
|
||||||
__all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer",
|
__all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer",
|
||||||
"ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler",
|
"ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler",
|
||||||
|
@ -149,7 +154,8 @@ class BaseServer:
|
||||||
Methods for the caller:
|
Methods for the caller:
|
||||||
|
|
||||||
- __init__(server_address, RequestHandlerClass)
|
- __init__(server_address, RequestHandlerClass)
|
||||||
- serve_forever()
|
- serve_forever(poll_interval=0.5)
|
||||||
|
- shutdown()
|
||||||
- handle_request() # if you do not use serve_forever()
|
- handle_request() # if you do not use serve_forever()
|
||||||
- fileno() -> int # for select()
|
- fileno() -> int # for select()
|
||||||
|
|
||||||
|
@ -190,6 +196,8 @@ class BaseServer:
|
||||||
"""Constructor. May be extended, do not override."""
|
"""Constructor. May be extended, do not override."""
|
||||||
self.server_address = server_address
|
self.server_address = server_address
|
||||||
self.RequestHandlerClass = RequestHandlerClass
|
self.RequestHandlerClass = RequestHandlerClass
|
||||||
|
self.__is_shut_down = threading.Event()
|
||||||
|
self.__serving = False
|
||||||
|
|
||||||
def server_activate(self):
|
def server_activate(self):
|
||||||
"""Called by constructor to activate the server.
|
"""Called by constructor to activate the server.
|
||||||
|
@ -199,27 +207,73 @@ class BaseServer:
|
||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def serve_forever(self):
|
def serve_forever(self, poll_interval=0.5):
|
||||||
"""Handle one request at a time until doomsday."""
|
"""Handle one request at a time until shutdown.
|
||||||
while 1:
|
|
||||||
self.handle_request()
|
Polls for shutdown every poll_interval seconds. Ignores
|
||||||
|
self.timeout. If you need to do periodic tasks, do them in
|
||||||
|
another thread.
|
||||||
|
"""
|
||||||
|
self.__serving = True
|
||||||
|
self.__is_shut_down.clear()
|
||||||
|
while self.__serving:
|
||||||
|
# XXX: Consider using another file descriptor or
|
||||||
|
# connecting to the socket to wake this up instead of
|
||||||
|
# polling. Polling reduces our responsiveness to a
|
||||||
|
# shutdown request and wastes cpu at all other times.
|
||||||
|
r, w, e = select.select([self], [], [], poll_interval)
|
||||||
|
if r:
|
||||||
|
self._handle_request_noblock()
|
||||||
|
self.__is_shut_down.set()
|
||||||
|
|
||||||
|
def shutdown(self):
|
||||||
|
"""Stops the serve_forever loop.
|
||||||
|
|
||||||
|
Blocks until the loop has finished. This must be called while
|
||||||
|
serve_forever() is running in another thread, or it will
|
||||||
|
deadlock.
|
||||||
|
"""
|
||||||
|
self.__serving = False
|
||||||
|
self.__is_shut_down.wait()
|
||||||
|
|
||||||
# The distinction between handling, getting, processing and
|
# The distinction between handling, getting, processing and
|
||||||
# finishing a request is fairly arbitrary. Remember:
|
# finishing a request is fairly arbitrary. Remember:
|
||||||
#
|
#
|
||||||
# - handle_request() is the top-level call. It calls
|
# - handle_request() is the top-level call. It calls
|
||||||
# await_request(), verify_request() and process_request()
|
# select, get_request(), verify_request() and process_request()
|
||||||
# - get_request(), called by await_request(), is different for
|
# - get_request() is different for stream or datagram sockets
|
||||||
# stream or datagram sockets
|
|
||||||
# - process_request() is the place that may fork a new process
|
# - process_request() is the place that may fork a new process
|
||||||
# or create a new thread to finish the request
|
# or create a new thread to finish the request
|
||||||
# - finish_request() instantiates the request handler class;
|
# - finish_request() instantiates the request handler class;
|
||||||
# this constructor will handle the request all by itself
|
# this constructor will handle the request all by itself
|
||||||
|
|
||||||
def handle_request(self):
|
def handle_request(self):
|
||||||
"""Handle one request, possibly blocking."""
|
"""Handle one request, possibly blocking.
|
||||||
|
|
||||||
|
Respects self.timeout.
|
||||||
|
"""
|
||||||
|
# Support people who used socket.settimeout() to escape
|
||||||
|
# handle_request before self.timeout was available.
|
||||||
|
timeout = self.socket.gettimeout()
|
||||||
|
if timeout is None:
|
||||||
|
timeout = self.timeout
|
||||||
|
elif self.timeout is not None:
|
||||||
|
timeout = min(timeout, self.timeout)
|
||||||
|
fd_sets = select.select([self], [], [], timeout)
|
||||||
|
if not fd_sets[0]:
|
||||||
|
self.handle_timeout()
|
||||||
|
return
|
||||||
|
self._handle_request_noblock()
|
||||||
|
|
||||||
|
def _handle_request_noblock(self):
|
||||||
|
"""Handle one request, without blocking.
|
||||||
|
|
||||||
|
I assume that select.select has returned that the socket is
|
||||||
|
readable before this function was called, so there should be
|
||||||
|
no risk of blocking in get_request().
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
request, client_address = self.await_request()
|
request, client_address = self.get_request()
|
||||||
except socket.error:
|
except socket.error:
|
||||||
return
|
return
|
||||||
if self.verify_request(request, client_address):
|
if self.verify_request(request, client_address):
|
||||||
|
@ -229,21 +283,6 @@ class BaseServer:
|
||||||
self.handle_error(request, client_address)
|
self.handle_error(request, client_address)
|
||||||
self.close_request(request)
|
self.close_request(request)
|
||||||
|
|
||||||
def await_request(self):
|
|
||||||
"""Call get_request or handle_timeout, observing self.timeout.
|
|
||||||
|
|
||||||
Returns value from get_request() or raises socket.timeout exception if
|
|
||||||
timeout was exceeded.
|
|
||||||
"""
|
|
||||||
if self.timeout is not None:
|
|
||||||
# If timeout == 0, you're responsible for your own fd magic.
|
|
||||||
import select
|
|
||||||
fd_sets = select.select([self], [], [], self.timeout)
|
|
||||||
if not fd_sets[0]:
|
|
||||||
self.handle_timeout()
|
|
||||||
raise socket.timeout("Listening timed out")
|
|
||||||
return self.get_request()
|
|
||||||
|
|
||||||
def handle_timeout(self):
|
def handle_timeout(self):
|
||||||
"""Called if no new request arrives within self.timeout.
|
"""Called if no new request arrives within self.timeout.
|
||||||
|
|
||||||
|
@ -307,7 +346,8 @@ class TCPServer(BaseServer):
|
||||||
Methods for the caller:
|
Methods for the caller:
|
||||||
|
|
||||||
- __init__(server_address, RequestHandlerClass, bind_and_activate=True)
|
- __init__(server_address, RequestHandlerClass, bind_and_activate=True)
|
||||||
- serve_forever()
|
- serve_forever(poll_interval=0.5)
|
||||||
|
- shutdown()
|
||||||
- handle_request() # if you don't use serve_forever()
|
- handle_request() # if you don't use serve_forever()
|
||||||
- fileno() -> int # for select()
|
- fileno() -> int # for select()
|
||||||
|
|
||||||
|
@ -523,7 +563,6 @@ class ThreadingMixIn:
|
||||||
|
|
||||||
def process_request(self, request, client_address):
|
def process_request(self, request, client_address):
|
||||||
"""Start a new thread to process the request."""
|
"""Start a new thread to process the request."""
|
||||||
import threading
|
|
||||||
t = threading.Thread(target = self.process_request_thread,
|
t = threading.Thread(target = self.process_request_thread,
|
||||||
args = (request, client_address))
|
args = (request, client_address))
|
||||||
if self.daemon_threads:
|
if self.daemon_threads:
|
||||||
|
|
|
@ -21,7 +21,6 @@ from test.test_support import TESTFN as TEST_FILE
|
||||||
|
|
||||||
test.test_support.requires("network")
|
test.test_support.requires("network")
|
||||||
|
|
||||||
NREQ = 3
|
|
||||||
TEST_STR = "hello world\n"
|
TEST_STR = "hello world\n"
|
||||||
HOST = "localhost"
|
HOST = "localhost"
|
||||||
|
|
||||||
|
@ -50,43 +49,6 @@ if HAVE_UNIX_SOCKETS:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class MyMixinServer:
|
|
||||||
def serve_a_few(self):
|
|
||||||
for i in range(NREQ):
|
|
||||||
self.handle_request()
|
|
||||||
|
|
||||||
def handle_error(self, request, client_address):
|
|
||||||
self.close_request(request)
|
|
||||||
self.server_close()
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
class ServerThread(threading.Thread):
|
|
||||||
def __init__(self, addr, svrcls, hdlrcls):
|
|
||||||
threading.Thread.__init__(self)
|
|
||||||
self.__addr = addr
|
|
||||||
self.__svrcls = svrcls
|
|
||||||
self.__hdlrcls = hdlrcls
|
|
||||||
self.ready = threading.Event()
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
class svrcls(MyMixinServer, self.__svrcls):
|
|
||||||
pass
|
|
||||||
if verbose: print "thread: creating server"
|
|
||||||
svr = svrcls(self.__addr, self.__hdlrcls)
|
|
||||||
# We had the OS pick a port, so pull the real address out of
|
|
||||||
# the server.
|
|
||||||
self.addr = svr.server_address
|
|
||||||
self.port = self.addr[1]
|
|
||||||
if self.addr != svr.socket.getsockname():
|
|
||||||
raise RuntimeError('server_address was %s, expected %s' %
|
|
||||||
(self.addr, svr.socket.getsockname()))
|
|
||||||
self.ready.set()
|
|
||||||
if verbose: print "thread: serving three times"
|
|
||||||
svr.serve_a_few()
|
|
||||||
if verbose: print "thread: done"
|
|
||||||
|
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def simple_subprocess(testcase):
|
def simple_subprocess(testcase):
|
||||||
pid = os.fork()
|
pid = os.fork()
|
||||||
|
@ -143,28 +105,48 @@ class SocketServerTest(unittest.TestCase):
|
||||||
self.test_files.append(fn)
|
self.test_files.append(fn)
|
||||||
return fn
|
return fn
|
||||||
|
|
||||||
def run_server(self, svrcls, hdlrbase, testfunc):
|
def make_server(self, addr, svrcls, hdlrbase):
|
||||||
|
class MyServer(svrcls):
|
||||||
|
def handle_error(self, request, client_address):
|
||||||
|
self.close_request(request)
|
||||||
|
self.server_close()
|
||||||
|
raise
|
||||||
|
|
||||||
class MyHandler(hdlrbase):
|
class MyHandler(hdlrbase):
|
||||||
def handle(self):
|
def handle(self):
|
||||||
line = self.rfile.readline()
|
line = self.rfile.readline()
|
||||||
self.wfile.write(line)
|
self.wfile.write(line)
|
||||||
|
|
||||||
addr = self.pickaddr(svrcls.address_family)
|
if verbose: print "creating server"
|
||||||
|
server = MyServer(addr, MyHandler)
|
||||||
|
self.assertEquals(server.server_address, server.socket.getsockname())
|
||||||
|
return server
|
||||||
|
|
||||||
|
def run_server(self, svrcls, hdlrbase, testfunc):
|
||||||
|
server = self.make_server(self.pickaddr(svrcls.address_family),
|
||||||
|
svrcls, hdlrbase)
|
||||||
|
# We had the OS pick a port, so pull the real address out of
|
||||||
|
# the server.
|
||||||
|
addr = server.server_address
|
||||||
if verbose:
|
if verbose:
|
||||||
|
print "server created"
|
||||||
print "ADDR =", addr
|
print "ADDR =", addr
|
||||||
print "CLASS =", svrcls
|
print "CLASS =", svrcls
|
||||||
t = ServerThread(addr, svrcls, MyHandler)
|
t = threading.Thread(
|
||||||
if verbose: print "server created"
|
name='%s serving' % svrcls,
|
||||||
|
target=server.serve_forever,
|
||||||
|
# Short poll interval to make the test finish quickly.
|
||||||
|
# Time between requests is short enough that we won't wake
|
||||||
|
# up spuriously too many times.
|
||||||
|
kwargs={'poll_interval':0.01})
|
||||||
|
t.setDaemon(True) # In case this function raises.
|
||||||
t.start()
|
t.start()
|
||||||
if verbose: print "server running"
|
if verbose: print "server running"
|
||||||
t.ready.wait(10)
|
for i in range(3):
|
||||||
self.assert_(t.ready.isSet(),
|
|
||||||
"%s not ready within a reasonable time" % svrcls)
|
|
||||||
addr = t.addr
|
|
||||||
for i in range(NREQ):
|
|
||||||
if verbose: print "test client", i
|
if verbose: print "test client", i
|
||||||
testfunc(svrcls.address_family, addr)
|
testfunc(svrcls.address_family, addr)
|
||||||
if verbose: print "waiting for server"
|
if verbose: print "waiting for server"
|
||||||
|
server.shutdown()
|
||||||
t.join()
|
t.join()
|
||||||
if verbose: print "done"
|
if verbose: print "done"
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,9 @@ Core and builtins
|
||||||
Library
|
Library
|
||||||
-------
|
-------
|
||||||
|
|
||||||
|
- Issue #1193577: A .shutdown() method has been added to SocketServers
|
||||||
|
which terminates the .serve_forever() loop.
|
||||||
|
|
||||||
- Bug #2220: handle rlcompleter attribute match failure more gracefully.
|
- Bug #2220: handle rlcompleter attribute match failure more gracefully.
|
||||||
|
|
||||||
- Issue #2225: py_compile, when executed as a script, now returns a non-
|
- Issue #2225: py_compile, when executed as a script, now returns a non-
|
||||||
|
|
Loading…
Reference in New Issue