Merge 3.4

asyncio: sync with Tulip

- Tulip issue 185: Add a create_task() method to event loops. The create_task()
  method can be overriden in custom event loop to implement their own task
  class. For example, greenio and Pulsar projects use their own task class. The
  create_task() method is now preferred over creating directly task using the
  Task class.
- tests: fix a warning
- fix typo in the name of a test function
- Update AbstractEventLoop: add new event loop methods; update also the unit test

Update asyncio documentation

- Document the new create_task() method
- "Hide" the Task class: point to the create_task() method for interoperability
- Rewrite the documentation of the Task class
- Document the "Pending task destroyed"
- Update output in debug mode of examples in the dev section
- Replace Task() with create_task() in examples
This commit is contained in:
Victor Stinner 2014-07-08 12:43:24 +02:00
commit 4532c43e16
13 changed files with 194 additions and 54 deletions

View File

@ -103,20 +103,11 @@ the logger ``'asyncio'``.
Detect coroutine objects never scheduled Detect coroutine objects never scheduled
---------------------------------------- ----------------------------------------
When a coroutine function is called but not passed to :func:`async` or to the When a coroutine function is called and its result is not passed to
:class:`Task` constructor, it is not scheduled and it is probably a bug. :func:`async` or to the :meth:`BaseEventLoop.create_task` method: the execution
of the coroutine objet will never be scheduled and it is probably a bug.
To detect such bug, :ref:`enable the debug mode of asyncio :ref:`Enable the debug mode of asyncio <asyncio-debug-mode>` to :ref:`log a
<asyncio-debug-mode>`. When the coroutine object is destroyed by the garbage warning <asyncio-logger>` to detect it.
collector, a log will be emitted with the traceback where the coroutine
function was called. See the :ref:`asyncio logger <asyncio-logger>`.
The debug flag changes the behaviour of the :func:`coroutine` decorator. The
debug flag value is only used when then coroutine function is defined, not when
it is called. Coroutine functions defined before the debug flag is set to
``True`` will not be tracked. For example, it is not possible to debug
coroutines defined in the :mod:`asyncio` module, because the module must be
imported before the flag value can be changed.
Example with the bug:: Example with the bug::
@ -130,20 +121,27 @@ Example with the bug::
Output in debug mode:: Output in debug mode::
Coroutine 'test' defined at test.py:4 was never yielded from Coroutine test() at test.py:3 was never yielded from
Coroutine object created at (most recent call last):
File "test.py", line 7, in <module>
test()
The fix is to call the :func:`async` function or create a :class:`Task` object The fix is to call the :func:`async` function or the
with this coroutine object. :meth:`BaseEventLoop.create_task` method with the coroutine object.
.. seealso::
:ref:`Pending task destroyed <asyncio-pending-task-destroyed>`.
Detect exceptions not consumed Detect exceptions never consumed
------------------------------ --------------------------------
Python usually calls :func:`sys.displayhook` on unhandled exceptions. If Python usually calls :func:`sys.displayhook` on unhandled exceptions. If
:meth:`Future.set_exception` is called, but the exception is not consumed, :meth:`Future.set_exception` is called, but the exception is never consumed,
:func:`sys.displayhook` is not called. Instead, a log is emitted when the :func:`sys.displayhook` is not called. Instead, a :ref:`a log is emitted
future is deleted by the garbage collector, with the traceback where the <asyncio-logger>` when the future is deleted by the garbage collector, with the
exception was raised. See the :ref:`asyncio logger <asyncio-logger>`. traceback where the exception was raised.
Example of unhandled exception:: Example of unhandled exception::
@ -159,16 +157,27 @@ Example of unhandled exception::
Output:: Output::
Future/Task exception was never retrieved: Task exception was never retrieved
future: <Task finished bug() done at asyncio/coroutines.py:139 exception=Exception('not consumed',)>
source_traceback: Object created at (most recent call last):
File "test.py", line 10, in <module>
asyncio.async(bug())
File "asyncio/tasks.py", line 510, in async
task = loop.create_task(coro_or_future)
Traceback (most recent call last): Traceback (most recent call last):
File "/usr/lib/python3.4/asyncio/tasks.py", line 279, in _step File "asyncio/tasks.py", line 244, in _step
result = next(coro) result = next(coro)
File "/usr/lib/python3.4/asyncio/tasks.py", line 80, in coro File "coroutines.py", line 78, in __next__
return next(self.gen)
File "asyncio/coroutines.py", line 141, in coro
res = func(*args, **kw) res = func(*args, **kw)
File "test.py", line 5, in bug File "test.py", line 7, in bug
raise Exception("not consumed") raise Exception("not consumed")
Exception: not consumed Exception: not consumed
:ref:`Enable the debug mode of asyncio <asyncio-debug-mode>` to get the
traceback where the task was created.
There are different options to fix this issue. The first option is to chain to There are different options to fix this issue. The first option is to chain to
coroutine in another coroutine and use classic try/except:: coroutine in another coroutine and use classic try/except::
@ -195,7 +204,7 @@ function::
See also the :meth:`Future.exception` method. See also the :meth:`Future.exception` method.
Chain coroutines correctly Chain correctly coroutines
-------------------------- --------------------------
When a coroutine function calls other coroutine functions and tasks, they When a coroutine function calls other coroutine functions and tasks, they
@ -246,7 +255,9 @@ Actual output::
(3) close file (3) close file
(2) write into file (2) write into file
Pending tasks at exit: {Task(<create>)<PENDING>} Pending tasks at exit: {<Task pending create() at test.py:7 wait_for=<Future pending cb=[Task._wakeup()]>>}
Task was destroyed but it is pending!
task: <Task pending create() done at test.py:5 wait_for=<Future pending cb=[Task._wakeup()]>>
The loop stopped before the ``create()`` finished, ``close()`` has been called The loop stopped before the ``create()`` finished, ``close()`` has been called
before ``write()``, whereas coroutine functions were called in this order: before ``write()``, whereas coroutine functions were called in this order:
@ -272,3 +283,29 @@ Or without ``asyncio.async()``::
yield from asyncio.sleep(2.0) yield from asyncio.sleep(2.0)
loop.stop() loop.stop()
.. _asyncio-pending-task-destroyed:
Pending task destroyed
----------------------
If a pending task is destroyed, the execution of its wrapped :ref:`coroutine
<coroutine>` did not complete. It is probably a bug and so a warning is logged.
Example of log::
Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
File "test.py", line 17, in <module>
task = asyncio.async(coro, loop=loop)
File "asyncio/tasks.py", line 510, in async
task = loop.create_task(coro_or_future)
task: <Task pending kill_me() done at test.py:5 wait_for=<Future pending cb=[Task._wakeup()]>>
:ref:`Enable the debug mode of asyncio <asyncio-debug-mode>` to get the
traceback where the task was created.
.. seealso::
:ref:`Detect coroutine objects never scheduled <asyncio-coroutine-not-scheduled>`.

View File

@ -102,8 +102,8 @@ Run an event loop
Run until the :class:`Future` is done. Run until the :class:`Future` is done.
If the argument is a :ref:`coroutine <coroutine>`, it is wrapped If the argument is a :ref:`coroutine object <coroutine>`, it is wrapped by
in a :class:`Task`. :func:`async`.
Return the Future's result, or raise its exception. Return the Future's result, or raise its exception.
@ -205,6 +205,25 @@ a different clock than :func:`time.time`.
The :func:`asyncio.sleep` function. The :func:`asyncio.sleep` function.
Coroutines
----------
.. method:: BaseEventLoop.create_task(coro)
Schedule the execution of a :ref:`coroutine object <coroutine>`: wrap it in
a future. Return a :class:`Task` object.
Third-party event loops can use their own subclass of :class:`Task` for
interoperability. In this case, the result type is a subclass of
:class:`Task`.
.. seealso::
The :meth:`async` function.
.. versionadded:: 3.4.2
Creating connections Creating connections
-------------------- --------------------

View File

@ -41,7 +41,8 @@ Stream functions
:class:`StreamReader` object, while *client_writer* is a :class:`StreamReader` object, while *client_writer* is a
:class:`StreamWriter` object. This parameter can either be a plain callback :class:`StreamWriter` object. This parameter can either be a plain callback
function or a :ref:`coroutine function <coroutine>`; if it is a coroutine function or a :ref:`coroutine function <coroutine>`; if it is a coroutine
function, it will be automatically converted into a :class:`Task`. function, it will be automatically wrapped in a future using the
:meth:`BaseEventLoop.create_task` method.
The rest of the arguments are all the usual arguments to The rest of the arguments are all the usual arguments to
:meth:`~BaseEventLoop.create_server()` except *protocol_factory*; most :meth:`~BaseEventLoop.create_server()` except *protocol_factory*; most

View File

@ -51,8 +51,8 @@ generator, and the coroutine object returned by the call is really a
generator object, which doesn't do anything until you iterate over it. generator object, which doesn't do anything until you iterate over it.
In the case of a coroutine object, there are two basic ways to start In the case of a coroutine object, there are two basic ways to start
it running: call ``yield from coroutine`` from another coroutine it running: call ``yield from coroutine`` from another coroutine
(assuming the other coroutine is already running!), or convert it to a (assuming the other coroutine is already running!), or schedule its execution
:class:`Task`. using the :meth:`BaseEventLoop.create_task` method.
Coroutines (and tasks) can only run when the event loop is running. Coroutines (and tasks) can only run when the event loop is running.
@ -256,7 +256,7 @@ Example combining a :class:`Future` and a :ref:`coroutine function
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
future = asyncio.Future() future = asyncio.Future()
asyncio.Task(slow_operation(future)) loop.create_task(slow_operation(future))
loop.run_until_complete(future) loop.run_until_complete(future)
print(future.result()) print(future.result())
loop.close() loop.close()
@ -292,7 +292,7 @@ flow::
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
future = asyncio.Future() future = asyncio.Future()
asyncio.Task(slow_operation(future)) loop.create_task(slow_operation(future))
future.add_done_callback(got_result) future.add_done_callback(got_result)
try: try:
loop.run_forever() loop.run_forever()
@ -314,7 +314,33 @@ Task
.. class:: Task(coro, \*, loop=None) .. class:: Task(coro, \*, loop=None)
A coroutine object wrapped in a :class:`Future`. Subclass of :class:`Future`. Schedule the execution of a :ref:`coroutine <coroutine>`: wrap it in a
future. A task is a subclass of :class:`Future`.
A task is responsible to execute a coroutine object in an event loop. If
the wrapped coroutine yields from a future, the task suspends the execution
of the wrapped coroutine and waits for the completition of the future. When
the future is done, the execution of the wrapped coroutine restarts with the
result or the exception of the future.
Event loops use cooperative scheduling: an event loop only runs one task at
the same time. Other tasks may run in parallel if other event loops are
running in different threads. While a task waits for the completion of a
future, the event loop executes a new task.
The cancellation of a task is different than cancelling a future. Calling
:meth:`cancel` will throw a :exc:`~concurrent.futures.CancelledError` to the
wrapped coroutine. :meth:`~Future.cancelled` only returns ``True`` if the
wrapped coroutine did not catch the
:exc:`~concurrent.futures.CancelledError` exception, or raised a
:exc:`~concurrent.futures.CancelledError` exception.
If a pending task is destroyed, the execution of its wrapped :ref:`coroutine
<coroutine>` did not complete. It is probably a bug and a warning is
logged: see :ref:`Pending task destroyed <asyncio-pending-task-destroyed>`.
Don't create directly :class:`Task` instances: use the
:meth:`BaseEventLoop.create_task` method.
.. classmethod:: all_tasks(loop=None) .. classmethod:: all_tasks(loop=None)
@ -396,12 +422,11 @@ Example executing 3 tasks (A, B, C) in parallel::
f *= i f *= i
print("Task %s: factorial(%s) = %s" % (name, number, f)) print("Task %s: factorial(%s) = %s" % (name, number, f))
tasks = [
asyncio.Task(factorial("A", 2)),
asyncio.Task(factorial("B", 3)),
asyncio.Task(factorial("C", 4))]
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
tasks = [
loop.create_task(factorial("A", 2)),
loop.create_task(factorial("B", 3)),
loop.create_task(factorial("C", 4))]
loop.run_until_complete(asyncio.wait(tasks)) loop.run_until_complete(asyncio.wait(tasks))
loop.close() loop.close()
@ -450,7 +475,8 @@ Task functions
.. function:: async(coro_or_future, \*, loop=None) .. function:: async(coro_or_future, \*, loop=None)
Wrap a :ref:`coroutine object <coroutine>` in a future. Wrap a :ref:`coroutine object <coroutine>` in a future using the
:meth:`BaseEventLoop.create_task` method.
If the argument is a :class:`Future`, it is returned directly. If the argument is a :class:`Future`, it is returned directly.
@ -566,18 +592,17 @@ Task functions
.. function:: wait_for(fut, timeout, \*, loop=None) .. function:: wait_for(fut, timeout, \*, loop=None)
Wait for the single :class:`Future` or :ref:`coroutine object <coroutine>` Wait for the single :class:`Future` or :ref:`coroutine object <coroutine>`
to complete, with timeout. If *timeout* is ``None``, block until the future to complete with timeout. If *timeout* is ``None``, block until the future
completes. completes.
Coroutine will be wrapped in :class:`Task`. Coroutine objects are wrapped in a future using the
:meth:`BaseEventLoop.create_task` method.
Returns result of the Future or coroutine. When a timeout occurs, it Returns result of the Future or coroutine. When a timeout occurs, it
cancels the task and raises :exc:`asyncio.TimeoutError`. To avoid the task cancels the task and raises :exc:`asyncio.TimeoutError`. To avoid the task
cancellation, wrap it in :func:`shield`. cancellation, wrap it in :func:`shield`.
This function is a :ref:`coroutine <coroutine>`. This function is a :ref:`coroutine <coroutine>`, usage::
Usage:: result = yield from asyncio.wait_for(fut, 60.0)
result = yield from asyncio.wait_for(fut, 60.0)

View File

@ -151,6 +151,12 @@ class BaseEventLoop(events.AbstractEventLoop):
% (self.__class__.__name__, self.is_running(), % (self.__class__.__name__, self.is_running(),
self.is_closed(), self.get_debug())) self.is_closed(), self.get_debug()))
def create_task(self, coro):
"""Schedule a coroutine object.
Return a task object."""
return tasks.Task(coro, loop=self)
def _make_socket_transport(self, sock, protocol, waiter=None, *, def _make_socket_transport(self, sock, protocol, waiter=None, *,
extra=None, server=None): extra=None, server=None):
"""Create socket transport.""" """Create socket transport."""

View File

@ -200,6 +200,10 @@ class AbstractEventLoop:
"""Return whether the event loop is currently running.""" """Return whether the event loop is currently running."""
raise NotImplementedError raise NotImplementedError
def is_closed(self):
"""Returns True if the event loop was closed."""
raise NotImplementedError
def close(self): def close(self):
"""Close the loop. """Close the loop.
@ -225,6 +229,11 @@ class AbstractEventLoop:
def time(self): def time(self):
raise NotImplementedError raise NotImplementedError
# Method scheduling a coroutine object: create a task.
def create_task(self, coro):
raise NotImplementedError
# Methods for interacting with threads. # Methods for interacting with threads.
def call_soon_threadsafe(self, callback, *args): def call_soon_threadsafe(self, callback, *args):

View File

@ -213,7 +213,7 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
res = self._client_connected_cb(self._stream_reader, res = self._client_connected_cb(self._stream_reader,
self._stream_writer) self._stream_writer)
if coroutines.iscoroutine(res): if coroutines.iscoroutine(res):
tasks.Task(res, loop=self._loop) self._loop.create_task(res)
def connection_lost(self, exc): def connection_lost(self, exc):
if exc is None: if exc is None:

View File

@ -505,7 +505,9 @@ def async(coro_or_future, *, loop=None):
raise ValueError('loop argument must agree with Future') raise ValueError('loop argument must agree with Future')
return coro_or_future return coro_or_future
elif coroutines.iscoroutine(coro_or_future): elif coroutines.iscoroutine(coro_or_future):
task = Task(coro_or_future, loop=loop) if loop is None:
loop = events.get_event_loop()
task = loop.create_task(coro_or_future)
if task._source_traceback: if task._source_traceback:
del task._source_traceback[-1] del task._source_traceback[-1]
return task return task

View File

@ -48,7 +48,7 @@ def run_briefly(loop):
def once(): def once():
pass pass
gen = once() gen = once()
t = tasks.Task(gen, loop=loop) t = loop.create_task(gen)
# Don't log a warning if the task is not done after run_until_complete(). # Don't log a warning if the task is not done after run_until_complete().
# It occurs if the loop is stopped or if a task raises a BaseException. # It occurs if the loop is stopped or if a task raises a BaseException.
t._log_destroy_pending = False t._log_destroy_pending = False

View File

@ -12,6 +12,7 @@ from test.support import IPV6_ENABLED
import asyncio import asyncio
from asyncio import base_events from asyncio import base_events
from asyncio import events
from asyncio import constants from asyncio import constants
from asyncio import test_utils from asyncio import test_utils
@ -526,6 +527,29 @@ class BaseEventLoopTests(test_utils.TestCase):
PYTHONASYNCIODEBUG='1') PYTHONASYNCIODEBUG='1')
self.assertEqual(stdout.rstrip(), b'False') self.assertEqual(stdout.rstrip(), b'False')
def test_create_task(self):
class MyTask(asyncio.Task):
pass
@asyncio.coroutine
def test():
pass
class EventLoop(base_events.BaseEventLoop):
def create_task(self, coro):
return MyTask(coro, loop=loop)
loop = EventLoop()
self.set_event_loop(loop)
coro = test()
task = asyncio.async(coro, loop=loop)
self.assertIsInstance(task, MyTask)
# make warnings quiet
task._log_destroy_pending = False
coro.close()
class MyProto(asyncio.Protocol): class MyProto(asyncio.Protocol):
done = None done = None

View File

@ -1968,8 +1968,12 @@ class AbstractEventLoopTests(unittest.TestCase):
NotImplementedError, loop.stop) NotImplementedError, loop.stop)
self.assertRaises( self.assertRaises(
NotImplementedError, loop.is_running) NotImplementedError, loop.is_running)
self.assertRaises(
NotImplementedError, loop.is_closed)
self.assertRaises( self.assertRaises(
NotImplementedError, loop.close) NotImplementedError, loop.close)
self.assertRaises(
NotImplementedError, loop.create_task, None)
self.assertRaises( self.assertRaises(
NotImplementedError, loop.call_later, None, None) NotImplementedError, loop.call_later, None, None)
self.assertRaises( self.assertRaises(
@ -2027,6 +2031,16 @@ class AbstractEventLoopTests(unittest.TestCase):
mock.sentinel) mock.sentinel)
self.assertRaises( self.assertRaises(
NotImplementedError, loop.subprocess_exec, f) NotImplementedError, loop.subprocess_exec, f)
self.assertRaises(
NotImplementedError, loop.set_exception_handler, f)
self.assertRaises(
NotImplementedError, loop.default_exception_handler, f)
self.assertRaises(
NotImplementedError, loop.call_exception_handler, f)
self.assertRaises(
NotImplementedError, loop.get_debug)
self.assertRaises(
NotImplementedError, loop.set_debug, f)
class ProtocolsAbsTests(unittest.TestCase): class ProtocolsAbsTests(unittest.TestCase):

View File

@ -301,12 +301,12 @@ class FutureTests(test_utils.TestCase):
def test_future_exception_never_retrieved(self, m_log): def test_future_exception_never_retrieved(self, m_log):
self.loop.set_debug(True) self.loop.set_debug(True)
def memroy_error(): def memory_error():
try: try:
raise MemoryError() raise MemoryError()
except BaseException as exc: except BaseException as exc:
return exc return exc
exc = memroy_error() exc = memory_error()
future = asyncio.Future(loop=self.loop) future = asyncio.Future(loop=self.loop)
source_traceback = future._source_traceback source_traceback = future._source_traceback

View File

@ -233,6 +233,9 @@ class TaskTests(test_utils.TestCase):
self.assertRegex(repr(task), self.assertRegex(repr(task),
'<Task .* wait_for=%s>' % re.escape(repr(fut))) '<Task .* wait_for=%s>' % re.escape(repr(fut)))
fut.set_result(None)
self.loop.run_until_complete(task)
def test_task_basics(self): def test_task_basics(self):
@asyncio.coroutine @asyncio.coroutine
def outer(): def outer():