bpo-32309: Implement asyncio.to_thread() (GH-20143)
Implements `asyncio.to_thread`, a coroutine for asynchronously running IO-bound functions in a separate thread without blocking the event loop. See the discussion starting from [here](https://github.com/python/cpython/pull/18410GH-issuecomment-628930973) in GH-18410 for context.
Automerge-Triggered-By: @aeros
(cherry picked from commit cc2bbc2227
)
Co-authored-by: Kyle Stanley <aeros167@gmail.com>
This commit is contained in:
parent
3d062829de
commit
e2991308c9
|
@ -48,6 +48,9 @@ await on multiple things with timeouts.
|
|||
* - :class:`Task`
|
||||
- Task object.
|
||||
|
||||
* - :func:`to_thread`
|
||||
- Asychronously run a function in a separate OS thread.
|
||||
|
||||
* - :func:`run_coroutine_threadsafe`
|
||||
- Schedule a coroutine from another OS thread.
|
||||
|
||||
|
|
|
@ -602,6 +602,62 @@ Waiting Primitives
|
|||
# ...
|
||||
|
||||
|
||||
Running in Threads
|
||||
==================
|
||||
|
||||
.. coroutinefunction:: to_thread(func, /, \*args, \*\*kwargs)
|
||||
|
||||
Asynchronously run function *func* in a separate thread.
|
||||
|
||||
Any \*args and \*\*kwargs supplied for this function are directly passed
|
||||
to *func*.
|
||||
|
||||
Return an :class:`asyncio.Future` which represents the eventual result of
|
||||
*func*.
|
||||
|
||||
This coroutine function is primarily intended to be used for executing
|
||||
IO-bound functions/methods that would otherwise block the event loop if
|
||||
they were ran in the main thread. For example::
|
||||
|
||||
def blocking_io():
|
||||
print(f"start blocking_io at {time.strftime('%X')}")
|
||||
# Note that time.sleep() can be replaced with any blocking
|
||||
# IO-bound operation, such as file operations.
|
||||
time.sleep(1)
|
||||
print(f"blocking_io complete at {time.strftime('%X')}")
|
||||
|
||||
async def main():
|
||||
print(f"started main at {time.strftime('%X')}")
|
||||
|
||||
await asyncio.gather(
|
||||
asyncio.to_thread(blocking_io),
|
||||
asyncio.sleep(1))
|
||||
|
||||
print(f"finished main at {time.strftime('%X')}")
|
||||
|
||||
|
||||
asyncio.run(main())
|
||||
|
||||
# Expected output:
|
||||
#
|
||||
# started main at 19:50:53
|
||||
# start blocking_io at 19:50:53
|
||||
# blocking_io complete at 19:50:54
|
||||
# finished main at 19:50:54
|
||||
|
||||
Directly calling `blocking_io()` in any coroutine would block the event loop
|
||||
for its duration, resulting in an additional 1 second of run time. Instead,
|
||||
by using `asyncio.to_thread()`, we can run it in a separate thread without
|
||||
blocking the event loop.
|
||||
|
||||
.. note::
|
||||
|
||||
Due to the :term:`GIL`, `asyncio.to_thread()` can typically only be used
|
||||
to make IO-bound functions non-blocking. However, for extension modules
|
||||
that release the GIL or alternative Python implementations that don't
|
||||
have one, `asyncio.to_thread()` can also be used for CPU-bound functions.
|
||||
|
||||
|
||||
Scheduling From Other Threads
|
||||
=============================
|
||||
|
||||
|
|
|
@ -282,6 +282,12 @@ that schedules a shutdown for the default executor that waits on the
|
|||
Added :class:`asyncio.PidfdChildWatcher`, a Linux-specific child watcher
|
||||
implementation that polls process file descriptors. (:issue:`38692`)
|
||||
|
||||
Added a new :term:`coroutine` :func:`asyncio.to_thread`. It is mainly used for
|
||||
running IO-bound functions in a separate thread to avoid blocking the event
|
||||
loop, and essentially works as a high-level version of
|
||||
:meth:`~asyncio.loop.run_in_executor` that can directly take keyword arguments.
|
||||
(Contributed by Kyle Stanley and Yury Selivanov in :issue:`32309`.)
|
||||
|
||||
compileall
|
||||
----------
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ from .queues import *
|
|||
from .streams import *
|
||||
from .subprocess import *
|
||||
from .tasks import *
|
||||
from .threads import *
|
||||
from .transports import *
|
||||
|
||||
# Exposed for _asynciomodule.c to implement now deprecated
|
||||
|
@ -35,6 +36,7 @@ __all__ = (base_events.__all__ +
|
|||
streams.__all__ +
|
||||
subprocess.__all__ +
|
||||
tasks.__all__ +
|
||||
threads.__all__ +
|
||||
transports.__all__)
|
||||
|
||||
if sys.platform == 'win32': # pragma: no cover
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
"""High-level support for working with threads in asyncio"""
|
||||
|
||||
import functools
|
||||
|
||||
from . import events
|
||||
|
||||
|
||||
__all__ = "to_thread",
|
||||
|
||||
|
||||
async def to_thread(func, /, *args, **kwargs):
|
||||
"""Asynchronously run function *func* in a separate thread.
|
||||
|
||||
Any *args and **kwargs supplied for this function are directly passed
|
||||
to *func*.
|
||||
|
||||
Return an asyncio.Future which represents the eventual result of *func*.
|
||||
"""
|
||||
loop = events.get_running_loop()
|
||||
func_call = functools.partial(func, *args, **kwargs)
|
||||
return await loop.run_in_executor(None, func_call)
|
|
@ -0,0 +1,79 @@
|
|||
"""Tests for asyncio/threads.py"""
|
||||
|
||||
import asyncio
|
||||
import unittest
|
||||
|
||||
from unittest import mock
|
||||
from test.test_asyncio import utils as test_utils
|
||||
|
||||
|
||||
def tearDownModule():
|
||||
asyncio.set_event_loop_policy(None)
|
||||
|
||||
|
||||
class ToThreadTests(test_utils.TestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(self.loop)
|
||||
|
||||
def tearDown(self):
|
||||
self.loop.run_until_complete(
|
||||
self.loop.shutdown_default_executor())
|
||||
self.loop.close()
|
||||
asyncio.set_event_loop(None)
|
||||
self.loop = None
|
||||
super().tearDown()
|
||||
|
||||
def test_to_thread(self):
|
||||
async def main():
|
||||
return await asyncio.to_thread(sum, [40, 2])
|
||||
|
||||
result = self.loop.run_until_complete(main())
|
||||
self.assertEqual(result, 42)
|
||||
|
||||
def test_to_thread_exception(self):
|
||||
def raise_runtime():
|
||||
raise RuntimeError("test")
|
||||
|
||||
async def main():
|
||||
await asyncio.to_thread(raise_runtime)
|
||||
|
||||
with self.assertRaisesRegex(RuntimeError, "test"):
|
||||
self.loop.run_until_complete(main())
|
||||
|
||||
def test_to_thread_once(self):
|
||||
func = mock.Mock()
|
||||
|
||||
async def main():
|
||||
await asyncio.to_thread(func)
|
||||
|
||||
self.loop.run_until_complete(main())
|
||||
func.assert_called_once()
|
||||
|
||||
def test_to_thread_concurrent(self):
|
||||
func = mock.Mock()
|
||||
|
||||
async def main():
|
||||
futs = []
|
||||
for _ in range(10):
|
||||
fut = asyncio.to_thread(func)
|
||||
futs.append(fut)
|
||||
await asyncio.gather(*futs)
|
||||
|
||||
self.loop.run_until_complete(main())
|
||||
self.assertEqual(func.call_count, 10)
|
||||
|
||||
def test_to_thread_args_kwargs(self):
|
||||
# Unlike run_in_executor(), to_thread() should directly accept kwargs.
|
||||
func = mock.Mock()
|
||||
|
||||
async def main():
|
||||
await asyncio.to_thread(func, 'test', something=True)
|
||||
|
||||
self.loop.run_until_complete(main())
|
||||
func.assert_called_once_with('test', something=True)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
|
@ -0,0 +1,4 @@
|
|||
Added a new :term:`coroutine` :func:`asyncio.to_thread`. It is mainly used for
|
||||
running IO-bound functions in a separate thread to avoid blocking the event
|
||||
loop, and essentially works as a high-level version of
|
||||
:meth:`~asyncio.loop.run_in_executor` that can directly take keyword arguments.
|
Loading…
Reference in New Issue