bpo-32309: Implement asyncio.to_thread() (GH-20143)

Implements `asyncio.to_thread`, a coroutine for asynchronously running IO-bound functions in a separate thread without blocking the event loop. See the discussion starting from [here](https://github.com/python/cpython/pull/18410#issuecomment-628930973) in GH-18410 for context.

Automerge-Triggered-By: @aeros
This commit is contained in:
Kyle Stanley 2020-05-18 23:03:28 -04:00 committed by GitHub
parent d4fe098d1e
commit cc2bbc2227
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 171 additions and 0 deletions

View File

@ -48,6 +48,9 @@ await on multiple things with timeouts.
* - :class:`Task`
- Task object.
* - :func:`to_thread`
- Asychronously run a function in a separate OS thread.
* - :func:`run_coroutine_threadsafe`
- Schedule a coroutine from another OS thread.

View File

@ -602,6 +602,62 @@ Waiting Primitives
# ...
Running in Threads
==================
.. coroutinefunction:: to_thread(func, /, \*args, \*\*kwargs)
Asynchronously run function *func* in a separate thread.
Any \*args and \*\*kwargs supplied for this function are directly passed
to *func*.
Return an :class:`asyncio.Future` which represents the eventual result of
*func*.
This coroutine function is primarily intended to be used for executing
IO-bound functions/methods that would otherwise block the event loop if
they were ran in the main thread. For example::
def blocking_io():
print(f"start blocking_io at {time.strftime('%X')}")
# Note that time.sleep() can be replaced with any blocking
# IO-bound operation, such as file operations.
time.sleep(1)
print(f"blocking_io complete at {time.strftime('%X')}")
async def main():
print(f"started main at {time.strftime('%X')}")
await asyncio.gather(
asyncio.to_thread(blocking_io),
asyncio.sleep(1))
print(f"finished main at {time.strftime('%X')}")
asyncio.run(main())
# Expected output:
#
# started main at 19:50:53
# start blocking_io at 19:50:53
# blocking_io complete at 19:50:54
# finished main at 19:50:54
Directly calling `blocking_io()` in any coroutine would block the event loop
for its duration, resulting in an additional 1 second of run time. Instead,
by using `asyncio.to_thread()`, we can run it in a separate thread without
blocking the event loop.
.. note::
Due to the :term:`GIL`, `asyncio.to_thread()` can typically only be used
to make IO-bound functions non-blocking. However, for extension modules
that release the GIL or alternative Python implementations that don't
have one, `asyncio.to_thread()` can also be used for CPU-bound functions.
Scheduling From Other Threads
=============================

View File

@ -282,6 +282,12 @@ that schedules a shutdown for the default executor that waits on the
Added :class:`asyncio.PidfdChildWatcher`, a Linux-specific child watcher
implementation that polls process file descriptors. (:issue:`38692`)
Added a new :term:`coroutine` :func:`asyncio.to_thread`. It is mainly used for
running IO-bound functions in a separate thread to avoid blocking the event
loop, and essentially works as a high-level version of
:meth:`~asyncio.loop.run_in_executor` that can directly take keyword arguments.
(Contributed by Kyle Stanley and Yury Selivanov in :issue:`32309`.)
compileall
----------

View File

@ -17,6 +17,7 @@ from .queues import *
from .streams import *
from .subprocess import *
from .tasks import *
from .threads import *
from .transports import *
# Exposed for _asynciomodule.c to implement now deprecated
@ -35,6 +36,7 @@ __all__ = (base_events.__all__ +
streams.__all__ +
subprocess.__all__ +
tasks.__all__ +
threads.__all__ +
transports.__all__)
if sys.platform == 'win32': # pragma: no cover

21
Lib/asyncio/threads.py Normal file
View File

@ -0,0 +1,21 @@
"""High-level support for working with threads in asyncio"""
import functools
from . import events
__all__ = "to_thread",
async def to_thread(func, /, *args, **kwargs):
"""Asynchronously run function *func* in a separate thread.
Any *args and **kwargs supplied for this function are directly passed
to *func*.
Return an asyncio.Future which represents the eventual result of *func*.
"""
loop = events.get_running_loop()
func_call = functools.partial(func, *args, **kwargs)
return await loop.run_in_executor(None, func_call)

View File

@ -0,0 +1,79 @@
"""Tests for asyncio/threads.py"""
import asyncio
import unittest
from unittest import mock
from test.test_asyncio import utils as test_utils
def tearDownModule():
asyncio.set_event_loop_policy(None)
class ToThreadTests(test_utils.TestCase):
def setUp(self):
super().setUp()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
def tearDown(self):
self.loop.run_until_complete(
self.loop.shutdown_default_executor())
self.loop.close()
asyncio.set_event_loop(None)
self.loop = None
super().tearDown()
def test_to_thread(self):
async def main():
return await asyncio.to_thread(sum, [40, 2])
result = self.loop.run_until_complete(main())
self.assertEqual(result, 42)
def test_to_thread_exception(self):
def raise_runtime():
raise RuntimeError("test")
async def main():
await asyncio.to_thread(raise_runtime)
with self.assertRaisesRegex(RuntimeError, "test"):
self.loop.run_until_complete(main())
def test_to_thread_once(self):
func = mock.Mock()
async def main():
await asyncio.to_thread(func)
self.loop.run_until_complete(main())
func.assert_called_once()
def test_to_thread_concurrent(self):
func = mock.Mock()
async def main():
futs = []
for _ in range(10):
fut = asyncio.to_thread(func)
futs.append(fut)
await asyncio.gather(*futs)
self.loop.run_until_complete(main())
self.assertEqual(func.call_count, 10)
def test_to_thread_args_kwargs(self):
# Unlike run_in_executor(), to_thread() should directly accept kwargs.
func = mock.Mock()
async def main():
await asyncio.to_thread(func, 'test', something=True)
self.loop.run_until_complete(main())
func.assert_called_once_with('test', something=True)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,4 @@
Added a new :term:`coroutine` :func:`asyncio.to_thread`. It is mainly used for
running IO-bound functions in a separate thread to avoid blocking the event
loop, and essentially works as a high-level version of
:meth:`~asyncio.loop.run_in_executor` that can directly take keyword arguments.