From 896a25ab30269369201401b50c66130911dd2238 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Tue, 8 Jul 2014 11:29:25 +0200 Subject: [PATCH 1/2] asyncio: sync with Tulip - Tulip issue 185: Add a create_task() method to event loops. The create_task() method can be overriden in custom event loop to implement their own task class. For example, greenio and Pulsar projects use their own task class. The create_task() method is now preferred over creating directly task using the Task class. - tests: fix a warning - fix typo in the name of a test function - Update AbstractEventLoop: add new event loop methods; update also the unit test --- Lib/asyncio/base_events.py | 6 ++++++ Lib/asyncio/events.py | 9 +++++++++ Lib/asyncio/streams.py | 2 +- Lib/asyncio/tasks.py | 4 +++- Lib/asyncio/test_utils.py | 2 +- Lib/test/test_asyncio/test_base_events.py | 24 +++++++++++++++++++++++ Lib/test/test_asyncio/test_events.py | 14 +++++++++++++ Lib/test/test_asyncio/test_futures.py | 4 ++-- Lib/test/test_asyncio/test_tasks.py | 3 +++ 9 files changed, 63 insertions(+), 5 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 2230dc2c9cc..52c5517b2f8 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -151,6 +151,12 @@ class BaseEventLoop(events.AbstractEventLoop): % (self.__class__.__name__, self.is_running(), self.is_closed(), self.get_debug())) + def create_task(self, coro): + """Schedule a coroutine object. + + Return a task object.""" + return tasks.Task(coro, loop=self) + def _make_socket_transport(self, sock, protocol, waiter=None, *, extra=None, server=None): """Create socket transport.""" diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index b389cfb0889..1f5e5824e51 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -200,6 +200,10 @@ class AbstractEventLoop: """Return whether the event loop is currently running.""" raise NotImplementedError + def is_closed(self): + """Returns True if the event loop was closed.""" + raise NotImplementedError + def close(self): """Close the loop. @@ -225,6 +229,11 @@ class AbstractEventLoop: def time(self): raise NotImplementedError + # Method scheduling a coroutine object: create a task. + + def create_task(self, coro): + raise NotImplementedError + # Methods for interacting with threads. def call_soon_threadsafe(self, callback, *args): diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index a10b969c990..9bde218bfa4 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -213,7 +213,7 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): res = self._client_connected_cb(self._stream_reader, self._stream_writer) if coroutines.iscoroutine(res): - tasks.Task(res, loop=self._loop) + self._loop.create_task(res) def connection_lost(self, exc): if exc is None: diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 8c7217b702b..befc2967c71 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -505,7 +505,9 @@ def async(coro_or_future, *, loop=None): raise ValueError('loop argument must agree with Future') return coro_or_future elif coroutines.iscoroutine(coro_or_future): - task = Task(coro_or_future, loop=loop) + if loop is None: + loop = events.get_event_loop() + task = loop.create_task(coro_or_future) if task._source_traceback: del task._source_traceback[-1] return task diff --git a/Lib/asyncio/test_utils.py b/Lib/asyncio/test_utils.py index ef3be236de3..6abcaf1d379 100644 --- a/Lib/asyncio/test_utils.py +++ b/Lib/asyncio/test_utils.py @@ -48,7 +48,7 @@ def run_briefly(loop): def once(): pass gen = once() - t = tasks.Task(gen, loop=loop) + t = loop.create_task(gen) # Don't log a warning if the task is not done after run_until_complete(). # It occurs if the loop is stopped or if a task raises a BaseException. t._log_destroy_pending = False diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index adba082b5e9..f6da7c375ef 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -12,6 +12,7 @@ from test.support import IPV6_ENABLED import asyncio from asyncio import base_events +from asyncio import events from asyncio import constants from asyncio import test_utils @@ -526,6 +527,29 @@ class BaseEventLoopTests(test_utils.TestCase): PYTHONASYNCIODEBUG='1') self.assertEqual(stdout.rstrip(), b'False') + def test_create_task(self): + class MyTask(asyncio.Task): + pass + + @asyncio.coroutine + def test(): + pass + + class EventLoop(base_events.BaseEventLoop): + def create_task(self, coro): + return MyTask(coro, loop=loop) + + loop = EventLoop() + self.set_event_loop(loop) + + coro = test() + task = asyncio.async(coro, loop=loop) + self.assertIsInstance(task, MyTask) + + # make warnings quiet + task._log_destroy_pending = False + coro.close() + class MyProto(asyncio.Protocol): done = None diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index beb6cecf2dd..b89416fb576 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -1968,8 +1968,12 @@ class AbstractEventLoopTests(unittest.TestCase): NotImplementedError, loop.stop) self.assertRaises( NotImplementedError, loop.is_running) + self.assertRaises( + NotImplementedError, loop.is_closed) self.assertRaises( NotImplementedError, loop.close) + self.assertRaises( + NotImplementedError, loop.create_task, None) self.assertRaises( NotImplementedError, loop.call_later, None, None) self.assertRaises( @@ -2027,6 +2031,16 @@ class AbstractEventLoopTests(unittest.TestCase): mock.sentinel) self.assertRaises( NotImplementedError, loop.subprocess_exec, f) + self.assertRaises( + NotImplementedError, loop.set_exception_handler, f) + self.assertRaises( + NotImplementedError, loop.default_exception_handler, f) + self.assertRaises( + NotImplementedError, loop.call_exception_handler, f) + self.assertRaises( + NotImplementedError, loop.get_debug) + self.assertRaises( + NotImplementedError, loop.set_debug, f) class ProtocolsAbsTests(unittest.TestCase): diff --git a/Lib/test/test_asyncio/test_futures.py b/Lib/test/test_asyncio/test_futures.py index a6071ea76ba..157adb7f7d9 100644 --- a/Lib/test/test_asyncio/test_futures.py +++ b/Lib/test/test_asyncio/test_futures.py @@ -301,12 +301,12 @@ class FutureTests(test_utils.TestCase): def test_future_exception_never_retrieved(self, m_log): self.loop.set_debug(True) - def memroy_error(): + def memory_error(): try: raise MemoryError() except BaseException as exc: return exc - exc = memroy_error() + exc = memory_error() future = asyncio.Future(loop=self.loop) source_traceback = future._source_traceback diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index eaef05b50dd..afadc7c1ed5 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -233,6 +233,9 @@ class TaskTests(test_utils.TestCase): self.assertRegex(repr(task), '' % re.escape(repr(fut))) + fut.set_result(None) + self.loop.run_until_complete(task) + def test_task_basics(self): @asyncio.coroutine def outer(): From 530ef2f0693d50435a8d62ea84d3fdcbe662d8aa Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Tue, 8 Jul 2014 12:39:10 +0200 Subject: [PATCH 2/2] Update asyncio documentation - Document the new create_task() method - "Hide" the Task class: point to the create_task() method for interoperability - Rewrite the documentation of the Task class - Document the "Pending task destroyed" - Update output in debug mode of examples in the dev section - Replace Task() with create_task() in examples --- Doc/library/asyncio-dev.rst | 95 +++++++++++++++++++++---------- Doc/library/asyncio-eventloop.rst | 23 +++++++- Doc/library/asyncio-stream.rst | 3 +- Doc/library/asyncio-task.rst | 59 +++++++++++++------ 4 files changed, 131 insertions(+), 49 deletions(-) diff --git a/Doc/library/asyncio-dev.rst b/Doc/library/asyncio-dev.rst index 2b3ad9417bc..bf6812118bb 100644 --- a/Doc/library/asyncio-dev.rst +++ b/Doc/library/asyncio-dev.rst @@ -103,20 +103,11 @@ the logger ``'asyncio'``. Detect coroutine objects never scheduled ---------------------------------------- -When a coroutine function is called but not passed to :func:`async` or to the -:class:`Task` constructor, it is not scheduled and it is probably a bug. - -To detect such bug, :ref:`enable the debug mode of asyncio -`. When the coroutine object is destroyed by the garbage -collector, a log will be emitted with the traceback where the coroutine -function was called. See the :ref:`asyncio logger `. - -The debug flag changes the behaviour of the :func:`coroutine` decorator. The -debug flag value is only used when then coroutine function is defined, not when -it is called. Coroutine functions defined before the debug flag is set to -``True`` will not be tracked. For example, it is not possible to debug -coroutines defined in the :mod:`asyncio` module, because the module must be -imported before the flag value can be changed. +When a coroutine function is called and its result is not passed to +:func:`async` or to the :meth:`BaseEventLoop.create_task` method: the execution +of the coroutine objet will never be scheduled and it is probably a bug. +:ref:`Enable the debug mode of asyncio ` to :ref:`log a +warning ` to detect it. Example with the bug:: @@ -130,20 +121,27 @@ Example with the bug:: Output in debug mode:: - Coroutine 'test' defined at test.py:4 was never yielded from + Coroutine test() at test.py:3 was never yielded from + Coroutine object created at (most recent call last): + File "test.py", line 7, in + test() -The fix is to call the :func:`async` function or create a :class:`Task` object -with this coroutine object. +The fix is to call the :func:`async` function or the +:meth:`BaseEventLoop.create_task` method with the coroutine object. + +.. seealso:: + + :ref:`Pending task destroyed `. -Detect exceptions not consumed ------------------------------- +Detect exceptions never consumed +-------------------------------- Python usually calls :func:`sys.displayhook` on unhandled exceptions. If -:meth:`Future.set_exception` is called, but the exception is not consumed, -:func:`sys.displayhook` is not called. Instead, a log is emitted when the -future is deleted by the garbage collector, with the traceback where the -exception was raised. See the :ref:`asyncio logger `. +:meth:`Future.set_exception` is called, but the exception is never consumed, +:func:`sys.displayhook` is not called. Instead, a :ref:`a log is emitted +` when the future is deleted by the garbage collector, with the +traceback where the exception was raised. Example of unhandled exception:: @@ -159,16 +157,27 @@ Example of unhandled exception:: Output:: - Future/Task exception was never retrieved: + Task exception was never retrieved + future: + source_traceback: Object created at (most recent call last): + File "test.py", line 10, in + asyncio.async(bug()) + File "asyncio/tasks.py", line 510, in async + task = loop.create_task(coro_or_future) Traceback (most recent call last): - File "/usr/lib/python3.4/asyncio/tasks.py", line 279, in _step + File "asyncio/tasks.py", line 244, in _step result = next(coro) - File "/usr/lib/python3.4/asyncio/tasks.py", line 80, in coro + File "coroutines.py", line 78, in __next__ + return next(self.gen) + File "asyncio/coroutines.py", line 141, in coro res = func(*args, **kw) - File "test.py", line 5, in bug + File "test.py", line 7, in bug raise Exception("not consumed") Exception: not consumed +:ref:`Enable the debug mode of asyncio ` to get the +traceback where the task was created. + There are different options to fix this issue. The first option is to chain to coroutine in another coroutine and use classic try/except:: @@ -195,7 +204,7 @@ function:: See also the :meth:`Future.exception` method. -Chain coroutines correctly +Chain correctly coroutines -------------------------- When a coroutine function calls other coroutine functions and tasks, they @@ -246,7 +255,9 @@ Actual output:: (3) close file (2) write into file - Pending tasks at exit: {Task()} + Pending tasks at exit: {>} + Task was destroyed but it is pending! + task: > The loop stopped before the ``create()`` finished, ``close()`` has been called before ``write()``, whereas coroutine functions were called in this order: @@ -272,3 +283,29 @@ Or without ``asyncio.async()``:: yield from asyncio.sleep(2.0) loop.stop() + +.. _asyncio-pending-task-destroyed: + +Pending task destroyed +---------------------- + +If a pending task is destroyed, the execution of its wrapped :ref:`coroutine +` did not complete. It is probably a bug and so a warning is logged. + +Example of log:: + + Task was destroyed but it is pending! + source_traceback: Object created at (most recent call last): + File "test.py", line 17, in + task = asyncio.async(coro, loop=loop) + File "asyncio/tasks.py", line 510, in async + task = loop.create_task(coro_or_future) + task: > + +:ref:`Enable the debug mode of asyncio ` to get the +traceback where the task was created. + +.. seealso:: + + :ref:`Detect coroutine objects never scheduled `. + diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index 268fa41fb00..1a80921e15a 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -102,8 +102,8 @@ Run an event loop Run until the :class:`Future` is done. - If the argument is a :ref:`coroutine `, it is wrapped - in a :class:`Task`. + If the argument is a :ref:`coroutine object `, it is wrapped by + :func:`async`. Return the Future's result, or raise its exception. @@ -205,6 +205,25 @@ a different clock than :func:`time.time`. The :func:`asyncio.sleep` function. +Coroutines +---------- + +.. method:: BaseEventLoop.create_task(coro) + + Schedule the execution of a :ref:`coroutine object `: wrap it in + a future. Return a :class:`Task` object. + + Third-party event loops can use their own subclass of :class:`Task` for + interoperability. In this case, the result type is a subclass of + :class:`Task`. + + .. seealso:: + + The :meth:`async` function. + + .. versionadded:: 3.4.2 + + Creating connections -------------------- diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst index 4543af4c2e1..f6b126de77e 100644 --- a/Doc/library/asyncio-stream.rst +++ b/Doc/library/asyncio-stream.rst @@ -41,7 +41,8 @@ Stream functions :class:`StreamReader` object, while *client_writer* is a :class:`StreamWriter` object. This parameter can either be a plain callback function or a :ref:`coroutine function `; if it is a coroutine - function, it will be automatically converted into a :class:`Task`. + function, it will be automatically wrapped in a future using the + :meth:`BaseEventLoop.create_task` method. The rest of the arguments are all the usual arguments to :meth:`~BaseEventLoop.create_server()` except *protocol_factory*; most diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index 35446577cfa..316a694ce12 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -51,8 +51,8 @@ generator, and the coroutine object returned by the call is really a generator object, which doesn't do anything until you iterate over it. In the case of a coroutine object, there are two basic ways to start it running: call ``yield from coroutine`` from another coroutine -(assuming the other coroutine is already running!), or convert it to a -:class:`Task`. +(assuming the other coroutine is already running!), or schedule its execution +using the :meth:`BaseEventLoop.create_task` method. Coroutines (and tasks) can only run when the event loop is running. @@ -256,7 +256,7 @@ Example combining a :class:`Future` and a :ref:`coroutine function loop = asyncio.get_event_loop() future = asyncio.Future() - asyncio.Task(slow_operation(future)) + loop.create_task(slow_operation(future)) loop.run_until_complete(future) print(future.result()) loop.close() @@ -292,7 +292,7 @@ flow:: loop = asyncio.get_event_loop() future = asyncio.Future() - asyncio.Task(slow_operation(future)) + loop.create_task(slow_operation(future)) future.add_done_callback(got_result) try: loop.run_forever() @@ -314,7 +314,33 @@ Task .. class:: Task(coro, \*, loop=None) - A coroutine object wrapped in a :class:`Future`. Subclass of :class:`Future`. + Schedule the execution of a :ref:`coroutine `: wrap it in a + future. A task is a subclass of :class:`Future`. + + A task is responsible to execute a coroutine object in an event loop. If + the wrapped coroutine yields from a future, the task suspends the execution + of the wrapped coroutine and waits for the completition of the future. When + the future is done, the execution of the wrapped coroutine restarts with the + result or the exception of the future. + + Event loops use cooperative scheduling: an event loop only runs one task at + the same time. Other tasks may run in parallel if other event loops are + running in different threads. While a task waits for the completion of a + future, the event loop executes a new task. + + The cancellation of a task is different than cancelling a future. Calling + :meth:`cancel` will throw a :exc:`~concurrent.futures.CancelledError` to the + wrapped coroutine. :meth:`~Future.cancelled` only returns ``True`` if the + wrapped coroutine did not catch the + :exc:`~concurrent.futures.CancelledError` exception, or raised a + :exc:`~concurrent.futures.CancelledError` exception. + + If a pending task is destroyed, the execution of its wrapped :ref:`coroutine + ` did not complete. It is probably a bug and a warning is + logged: see :ref:`Pending task destroyed `. + + Don't create directly :class:`Task` instances: use the + :meth:`BaseEventLoop.create_task` method. .. classmethod:: all_tasks(loop=None) @@ -396,12 +422,11 @@ Example executing 3 tasks (A, B, C) in parallel:: f *= i print("Task %s: factorial(%s) = %s" % (name, number, f)) - tasks = [ - asyncio.Task(factorial("A", 2)), - asyncio.Task(factorial("B", 3)), - asyncio.Task(factorial("C", 4))] - loop = asyncio.get_event_loop() + tasks = [ + loop.create_task(factorial("A", 2)), + loop.create_task(factorial("B", 3)), + loop.create_task(factorial("C", 4))] loop.run_until_complete(asyncio.wait(tasks)) loop.close() @@ -450,7 +475,8 @@ Task functions .. function:: async(coro_or_future, \*, loop=None) - Wrap a :ref:`coroutine object ` in a future. + Wrap a :ref:`coroutine object ` in a future using the + :meth:`BaseEventLoop.create_task` method. If the argument is a :class:`Future`, it is returned directly. @@ -566,18 +592,17 @@ Task functions .. function:: wait_for(fut, timeout, \*, loop=None) Wait for the single :class:`Future` or :ref:`coroutine object ` - to complete, with timeout. If *timeout* is ``None``, block until the future + to complete with timeout. If *timeout* is ``None``, block until the future completes. - Coroutine will be wrapped in :class:`Task`. + Coroutine objects are wrapped in a future using the + :meth:`BaseEventLoop.create_task` method. Returns result of the Future or coroutine. When a timeout occurs, it cancels the task and raises :exc:`asyncio.TimeoutError`. To avoid the task cancellation, wrap it in :func:`shield`. - This function is a :ref:`coroutine `. + This function is a :ref:`coroutine `, usage:: - Usage:: - - result = yield from asyncio.wait_for(fut, 60.0) + result = yield from asyncio.wait_for(fut, 60.0)