bpo-47062: Implement asyncio.Runner context manager (GH-31799)

Co-authored-by: Zachary Ware <zach@python.org>
This commit is contained in:
Andrew Svetlov 2022-03-24 21:51:16 +02:00 committed by GitHub
parent 2f49b97cc5
commit 4119d2d7c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 381 additions and 106 deletions

View File

@ -0,0 +1,121 @@
.. currentmodule:: asyncio
=======
Runners
=======
**Source code:** :source:`Lib/asyncio/runners.py`
This section outlines high-level asyncio primitives to run asyncio code.
They are built on top of an :ref:`event loop <asyncio-event-loop>` with the aim
to simplify async code usage for common wide-spread scenarios.
.. contents::
:depth: 1
:local:
Running an asyncio Program
==========================
.. function:: run(coro, *, debug=None)
Execute the :term:`coroutine` *coro* and return the result.
This function runs the passed coroutine, taking care of
managing the asyncio event loop, *finalizing asynchronous
generators*, and closing the threadpool.
This function cannot be called when another asyncio event loop is
running in the same thread.
If *debug* is ``True``, the event loop will be run in debug mode. ``False`` disables
debug mode explicitly. ``None`` is used to respect the global
:ref:`asyncio-debug-mode` settings.
This function always creates a new event loop and closes it at
the end. It should be used as a main entry point for asyncio
programs, and should ideally only be called once.
Example::
async def main():
await asyncio.sleep(1)
print('hello')
asyncio.run(main())
.. versionadded:: 3.7
.. versionchanged:: 3.9
Updated to use :meth:`loop.shutdown_default_executor`.
.. versionchanged:: 3.10
*debug* is ``None`` by default to respect the global debug mode settings.
Runner context manager
======================
.. class:: Runner(*, debug=None, factory=None)
A context manager that simplifies *multiple* async function calls in the same
context.
Sometimes several top-level async functions should be called in the same :ref:`event
loop <asyncio-event-loop>` and :class:`contextvars.Context`.
If *debug* is ``True``, the event loop will be run in debug mode. ``False`` disables
debug mode explicitly. ``None`` is used to respect the global
:ref:`asyncio-debug-mode` settings.
*factory* could be used for overriding the loop creation.
:func:`asyncio.new_event_loop` is used if ``None``.
Basically, :func:`asyncio.run()` example can be rewritten with the runner usage::
async def main():
await asyncio.sleep(1)
print('hello')
with asyncio.Runner() as runner:
runner.run(main())
.. versionadded:: 3.11
.. method:: run(coro, *, context=None)
Run a :term:`coroutine <coroutine>` *coro* in the embedded loop.
Return the coroutine's result or raise its exception.
An optional keyword-only *context* argument allows specifying a
custom :class:`contextvars.Context` for the *coro* to run in.
The runner's default context is used if ``None``.
This function cannot be called when another asyncio event loop is
running in the same thread.
.. method:: close()
Close the runner.
Finalize asynchronous generators, shutdown default executor, close the event loop
and release embedded :class:`contextvars.Context`.
.. method:: get_loop()
Return the event loop associated with the runner instance.
.. note::
:class:`Runner` uses the lazy initialization strategy, its constructor doesn't
initialize underlying low-level structures.
Embedded *loop* and *context* are created at the :keyword:`with` body entering
or the first call of :meth:`run` or :meth:`get_loop`.

View File

@ -204,43 +204,6 @@ A good example of a low-level function that returns a Future object
is :meth:`loop.run_in_executor`.
Running an asyncio Program
==========================
.. function:: run(coro, *, debug=False)
Execute the :term:`coroutine` *coro* and return the result.
This function runs the passed coroutine, taking care of
managing the asyncio event loop, *finalizing asynchronous
generators*, and closing the threadpool.
This function cannot be called when another asyncio event loop is
running in the same thread.
If *debug* is ``True``, the event loop will be run in debug mode.
This function always creates a new event loop and closes it at
the end. It should be used as a main entry point for asyncio
programs, and should ideally only be called once.
Example::
async def main():
await asyncio.sleep(1)
print('hello')
asyncio.run(main())
.. versionadded:: 3.7
.. versionchanged:: 3.9
Updated to use :meth:`loop.shutdown_default_executor`.
.. note::
The source code for ``asyncio.run()`` can be found in
:source:`Lib/asyncio/runners.py`.
Creating Tasks
==============

View File

@ -67,6 +67,7 @@ Additionally, there are **low-level** APIs for
:caption: High-level APIs
:maxdepth: 1
asyncio-runner.rst
asyncio-task.rst
asyncio-stream.rst
asyncio-sync.rst

View File

@ -1,10 +1,112 @@
__all__ = 'run',
__all__ = ('Runner', 'run')
import contextvars
import enum
from . import coroutines
from . import events
from . import tasks
class _State(enum.Enum):
CREATED = "created"
INITIALIZED = "initialized"
CLOSED = "closed"
class Runner:
"""A context manager that controls event loop life cycle.
The context manager always creates a new event loop,
allows to run async functions inside it,
and properly finalizes the loop at the context manager exit.
If debug is True, the event loop will be run in debug mode.
If factory is passed, it is used for new event loop creation.
asyncio.run(main(), debug=True)
is a shortcut for
with asyncio.Runner(debug=True) as runner:
runner.run(main())
The run() method can be called multiple times within the runner's context.
This can be useful for interactive console (e.g. IPython),
unittest runners, console tools, -- everywhere when async code
is called from existing sync framework and where the preferred single
asyncio.run() call doesn't work.
"""
# Note: the class is final, it is not intended for inheritance.
def __init__(self, *, debug=None, factory=None):
self._state = _State.CREATED
self._debug = debug
self._factory = factory
self._loop = None
self._context = None
def __enter__(self):
self._lazy_init()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def close(self):
"""Shutdown and close event loop."""
if self._state is not _State.INITIALIZED:
return
try:
loop = self._loop
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
loop.run_until_complete(loop.shutdown_default_executor())
finally:
loop.close()
self._loop = None
self._state = _State.CLOSED
def get_loop(self):
"""Return embedded event loop."""
self._lazy_init()
return self._loop
def run(self, coro, *, context=None):
"""Run a coroutine inside the embedded event loop."""
if not coroutines.iscoroutine(coro):
raise ValueError("a coroutine was expected, got {!r}".format(coro))
if events._get_running_loop() is not None:
# fail fast with short traceback
raise RuntimeError(
"Runner.run() cannot be called from a running event loop")
self._lazy_init()
if context is None:
context = self._context
task = self._loop.create_task(coro, context=context)
return self._loop.run_until_complete(task)
def _lazy_init(self):
if self._state is _State.CLOSED:
raise RuntimeError("Runner is closed")
if self._state is _State.INITIALIZED:
return
if self._factory is None:
self._loop = events.new_event_loop()
else:
self._loop = self._factory()
if self._debug is not None:
self._loop.set_debug(self._debug)
self._context = contextvars.copy_context()
self._state = _State.INITIALIZED
def run(main, *, debug=None):
"""Execute the coroutine and return the result.
@ -30,26 +132,12 @@ def run(main, *, debug=None):
asyncio.run(main())
"""
if events._get_running_loop() is not None:
# fail fast with short traceback
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")
if not coroutines.iscoroutine(main):
raise ValueError("a coroutine was expected, got {!r}".format(main))
loop = events.new_event_loop()
try:
events.set_event_loop(loop)
if debug is not None:
loop.set_debug(debug)
return loop.run_until_complete(main)
finally:
try:
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
loop.run_until_complete(loop.shutdown_default_executor())
finally:
events.set_event_loop(None)
loop.close()
with Runner(debug=debug) as runner:
return runner.run(main)
def _cancel_all_tasks(loop):

View File

@ -1,4 +1,7 @@
import asyncio
import contextvars
import gc
import re
import unittest
from unittest import mock
@ -186,5 +189,135 @@ class RunTests(BaseTest):
self.assertFalse(spinner.ag_running)
class RunnerTests(BaseTest):
def test_non_debug(self):
with asyncio.Runner(debug=False) as runner:
self.assertFalse(runner.get_loop().get_debug())
def test_debug(self):
with asyncio.Runner(debug=True) as runner:
self.assertTrue(runner.get_loop().get_debug())
def test_custom_factory(self):
loop = mock.Mock()
with asyncio.Runner(factory=lambda: loop) as runner:
self.assertIs(runner.get_loop(), loop)
def test_run(self):
async def f():
await asyncio.sleep(0)
return 'done'
with asyncio.Runner() as runner:
self.assertEqual('done', runner.run(f()))
loop = runner.get_loop()
with self.assertRaisesRegex(
RuntimeError,
"Runner is closed"
):
runner.get_loop()
self.assertTrue(loop.is_closed())
def test_run_non_coro(self):
with asyncio.Runner() as runner:
with self.assertRaisesRegex(
ValueError,
"a coroutine was expected"
):
runner.run(123)
def test_run_future(self):
with asyncio.Runner() as runner:
with self.assertRaisesRegex(
ValueError,
"a coroutine was expected"
):
fut = runner.get_loop().create_future()
runner.run(fut)
def test_explicit_close(self):
runner = asyncio.Runner()
loop = runner.get_loop()
runner.close()
with self.assertRaisesRegex(
RuntimeError,
"Runner is closed"
):
runner.get_loop()
self.assertTrue(loop.is_closed())
def test_double_close(self):
runner = asyncio.Runner()
loop = runner.get_loop()
runner.close()
self.assertTrue(loop.is_closed())
# the second call is no-op
runner.close()
self.assertTrue(loop.is_closed())
def test_second_with_block_raises(self):
ret = []
async def f(arg):
ret.append(arg)
runner = asyncio.Runner()
with runner:
runner.run(f(1))
with self.assertRaisesRegex(
RuntimeError,
"Runner is closed"
):
with runner:
runner.run(f(2))
self.assertEqual([1], ret)
def test_run_keeps_context(self):
cvar = contextvars.ContextVar("cvar", default=-1)
async def f(val):
old = cvar.get()
await asyncio.sleep(0)
cvar.set(val)
return old
async def get_context():
return contextvars.copy_context()
with asyncio.Runner() as runner:
self.assertEqual(-1, runner.run(f(1)))
self.assertEqual(1, runner.run(f(2)))
self.assertEqual({cvar: 2}, dict(runner.run(get_context())))
def test_recursine_run(self):
async def g():
pass
async def f():
runner.run(g())
with asyncio.Runner() as runner:
with self.assertWarnsRegex(
RuntimeWarning,
"coroutine .+ was never awaited",
):
with self.assertRaisesRegex(
RuntimeError,
re.escape(
"Runner.run() cannot be called from a running event loop"
),
):
runner.run(f())
if __name__ == '__main__':
unittest.main()

View File

@ -34,7 +34,7 @@ class IsolatedAsyncioTestCase(TestCase):
def __init__(self, methodName='runTest'):
super().__init__(methodName)
self._asyncioTestLoop = None
self._asyncioRunner = None
self._asyncioTestContext = contextvars.copy_context()
async def asyncSetUp(self):
@ -75,76 +75,44 @@ class IsolatedAsyncioTestCase(TestCase):
self._callMaybeAsync(function, *args, **kwargs)
def _callAsync(self, func, /, *args, **kwargs):
assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
assert self._asyncioRunner is not None, 'asyncio runner is not initialized'
assert inspect.iscoroutinefunction(func), f'{func!r} is not an async function'
task = self._asyncioTestLoop.create_task(
return self._asyncioRunner.run(
func(*args, **kwargs),
context=self._asyncioTestContext,
context=self._asyncioTestContext
)
return self._asyncioTestLoop.run_until_complete(task)
def _callMaybeAsync(self, func, /, *args, **kwargs):
assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
assert self._asyncioRunner is not None, 'asyncio runner is not initialized'
if inspect.iscoroutinefunction(func):
task = self._asyncioTestLoop.create_task(
return self._asyncioRunner.run(
func(*args, **kwargs),
context=self._asyncioTestContext,
)
return self._asyncioTestLoop.run_until_complete(task)
else:
return self._asyncioTestContext.run(func, *args, **kwargs)
def _setupAsyncioLoop(self):
assert self._asyncioTestLoop is None, 'asyncio test loop already initialized'
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.set_debug(True)
self._asyncioTestLoop = loop
def _setupAsyncioRunner(self):
assert self._asyncioRunner is None, 'asyncio runner is already initialized'
runner = asyncio.Runner(debug=True)
self._asyncioRunner = runner
def _tearDownAsyncioLoop(self):
assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
loop = self._asyncioTestLoop
self._asyncioTestLoop = None
try:
# cancel all tasks
to_cancel = asyncio.all_tasks(loop)
if not to_cancel:
return
for task in to_cancel:
task.cancel()
loop.run_until_complete(
asyncio.gather(*to_cancel, return_exceptions=True))
for task in to_cancel:
if task.cancelled():
continue
if task.exception() is not None:
loop.call_exception_handler({
'message': 'unhandled exception during test shutdown',
'exception': task.exception(),
'task': task,
})
# shutdown asyncgens
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
asyncio.set_event_loop(None)
loop.close()
def _tearDownAsyncioRunner(self):
runner = self._asyncioRunner
runner.close()
def run(self, result=None):
self._setupAsyncioLoop()
self._setupAsyncioRunner()
try:
return super().run(result)
finally:
self._tearDownAsyncioLoop()
self._tearDownAsyncioRunner()
def debug(self):
self._setupAsyncioLoop()
self._setupAsyncioRunner()
super().debug()
self._tearDownAsyncioLoop()
self._tearDownAsyncioRunner()
def __del__(self):
if self._asyncioTestLoop is not None:
self._tearDownAsyncioLoop()
if self._asyncioRunner is not None:
self._tearDownAsyncioRunner()

View File

@ -0,0 +1 @@
Implement :class:`asyncio.Runner` context manager.