Merge 3.4

This commit is contained in:
Victor Stinner 2014-07-11 23:48:10 +02:00
commit 4fee7aab90
5 changed files with 58 additions and 43 deletions

View File

@ -263,8 +263,9 @@ Creating listening connections
.. method:: BaseEventLoop.create_server(protocol_factory, host=None, port=None, \*, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None) .. method:: BaseEventLoop.create_server(protocol_factory, host=None, port=None, \*, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None)
Create a TCP server bound to host and port. Return an Create a TCP server bound to host and port. Return a :class:`Server` object,
:class:`AbstractServer` object which can be used to stop the service. its :attr:`~Server.sockets` attribute contains created sockets. Use the
:meth:`Server.close` method to stop the server: close listening sockets.
This method is a :ref:`coroutine <coroutine>`. This method is a :ref:`coroutine <coroutine>`.
@ -557,17 +558,31 @@ Debug mode
Server Server
------ ------
.. class:: AbstractServer .. class:: Server
Abstract server returned by :func:`BaseEventLoop.create_server`. Server listening on sockets.
Object created by the :meth:`BaseEventLoop.create_server` method and the
:func:`start_server` function. Don't instanciate the class directly.
.. method:: close() .. method:: close()
Stop serving. This leaves existing connections open. Stop serving: close all sockets and set the :attr:`sockets` attribute to
``None``.
The server is closed asynchonously, use the :meth:`wait_closed` coroutine
to wait until the server is closed.
.. method:: wait_closed() .. method:: wait_closed()
A :ref:`coroutine <coroutine>` to wait until service is closed. Wait until the :meth:`close` method completes.
This method is a :ref:`coroutine <coroutine>`.
.. attribute:: sockets
List of :class:`socket.socket` objects the server is listening to, or
``None`` if the server is closed.
Handle Handle

View File

@ -34,29 +34,26 @@ Stream functions
.. function:: start_server(client_connected_cb, host=None, port=None, \*, loop=None, limit=None, **kwds) .. function:: start_server(client_connected_cb, host=None, port=None, \*, loop=None, limit=None, **kwds)
Start a socket server, with a callback for each client connected. Start a socket server, with a callback for each client connected. The return
value is the same as :meth:`~BaseEventLoop.create_server()`.
The first parameter, *client_connected_cb*, takes two parameters: The *client_connected_cb* parameter is called with two parameters:
*client_reader*, *client_writer*. *client_reader* is a *client_reader*, *client_writer*. *client_reader* is a
:class:`StreamReader` object, while *client_writer* is a :class:`StreamReader` object, while *client_writer* is a
:class:`StreamWriter` object. This parameter can either be a plain callback :class:`StreamWriter` object. The *client_connected_cb* parameter can
function or a :ref:`coroutine function <coroutine>`; if it is a coroutine either be a plain callback function or a :ref:`coroutine function
function, it will be automatically wrapped in a future using the <coroutine>`; if it is a coroutine function, it will be automatically
:meth:`BaseEventLoop.create_task` method. wrapped in a future using the :meth:`BaseEventLoop.create_task` method.
The rest of the arguments are all the usual arguments to The rest of the arguments are all the usual arguments to
:meth:`~BaseEventLoop.create_server()` except *protocol_factory*; most :meth:`~BaseEventLoop.create_server()` except *protocol_factory*; most
common are positional host and port, with various optional keyword arguments common are positional *host* and *port*, with various optional keyword
following. The return value is the same as arguments following.
:meth:`~BaseEventLoop.create_server()`.
Additional optional keyword arguments are *loop* (to set the event loop Additional optional keyword arguments are *loop* (to set the event loop
instance to use) and *limit* (to set the buffer limit passed to the instance to use) and *limit* (to set the buffer limit passed to the
:class:`StreamReader`). :class:`StreamReader`).
The return value is the same as :meth:`~BaseEventLoop.create_server()`, i.e.
a :class:`AbstractServer` object which can be used to stop the service.
This function is a :ref:`coroutine <coroutine>`. This function is a :ref:`coroutine <coroutine>`.
.. function:: open_unix_connection(path=None, \*, loop=None, limit=None, **kwds) .. function:: open_unix_connection(path=None, \*, loop=None, limit=None, **kwds)

View File

@ -89,43 +89,46 @@ def _raise_stop_error(*args):
class Server(events.AbstractServer): class Server(events.AbstractServer):
def __init__(self, loop, sockets): def __init__(self, loop, sockets):
self.loop = loop self._loop = loop
self.sockets = sockets self.sockets = sockets
self.active_count = 0 self._active_count = 0
self.waiters = [] self._waiters = []
def attach(self, transport): def _attach(self):
assert self.sockets is not None assert self.sockets is not None
self.active_count += 1 self._active_count += 1
def detach(self, transport): def _detach(self):
assert self.active_count > 0 assert self._active_count > 0
self.active_count -= 1 self._active_count -= 1
if self.active_count == 0 and self.sockets is None: if self._active_count == 0 and self.sockets is None:
self._wakeup() self._wakeup()
def close(self): def close(self):
sockets = self.sockets sockets = self.sockets
if sockets is not None: if sockets is None:
self.sockets = None return
for sock in sockets: self.sockets = None
self.loop._stop_serving(sock) for sock in sockets:
if self.active_count == 0: # closing sockets will call asynchronously the _detach() method
self._wakeup() # which calls _wakeup() for the last socket
self._loop._stop_serving(sock)
if self._active_count == 0:
self._wakeup()
def _wakeup(self): def _wakeup(self):
waiters = self.waiters waiters = self._waiters
self.waiters = None self._waiters = None
for waiter in waiters: for waiter in waiters:
if not waiter.done(): if not waiter.done():
waiter.set_result(waiter) waiter.set_result(waiter)
@coroutine @coroutine
def wait_closed(self): def wait_closed(self):
if self.sockets is None or self.waiters is None: if self.sockets is None or self._waiters is None:
return return
waiter = futures.Future(loop=self.loop) waiter = futures.Future(loop=self._loop)
self.waiters.append(waiter) self._waiters.append(waiter)
yield from waiter yield from waiter
@ -625,7 +628,7 @@ class BaseEventLoop(events.AbstractEventLoop):
reuse_address=None): reuse_address=None):
"""Create a TCP server bound to host and port. """Create a TCP server bound to host and port.
Return an AbstractServer object which can be used to stop the service. Return an Server object which can be used to stop the service.
This method is a coroutine. This method is a coroutine.
""" """

View File

@ -35,7 +35,7 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
self._closing = False # Set when close() called. self._closing = False # Set when close() called.
self._eof_written = False self._eof_written = False
if self._server is not None: if self._server is not None:
self._server.attach(self) self._server._attach()
self._loop.call_soon(self._protocol.connection_made, self) self._loop.call_soon(self._protocol.connection_made, self)
if waiter is not None: if waiter is not None:
# wait until protocol.connection_made() has been called # wait until protocol.connection_made() has been called
@ -91,7 +91,7 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
self._sock.close() self._sock.close()
server = self._server server = self._server
if server is not None: if server is not None:
server.detach(self) server._detach()
self._server = None self._server = None
def get_write_buffer_size(self): def get_write_buffer_size(self):

View File

@ -417,7 +417,7 @@ class _SelectorTransport(transports._FlowControlMixin,
self._conn_lost = 0 # Set when call to connection_lost scheduled. self._conn_lost = 0 # Set when call to connection_lost scheduled.
self._closing = False # Set when close() called. self._closing = False # Set when close() called.
if self._server is not None: if self._server is not None:
self._server.attach(self) self._server._attach()
def abort(self): def abort(self):
self._force_close(None) self._force_close(None)
@ -464,7 +464,7 @@ class _SelectorTransport(transports._FlowControlMixin,
self._loop = None self._loop = None
server = self._server server = self._server
if server is not None: if server is not None:
server.detach(self) server._detach()
self._server = None self._server = None
def get_write_buffer_size(self): def get_write_buffer_size(self):