mirror of https://github.com/python/cpython
Issue #25304: Add asyncio.run_coroutine_threadsafe(). By Vincent Michel. (Merge 3.4->3.5.)
This commit is contained in:
commit
0d9bef927b
|
@ -390,22 +390,64 @@ class Future:
|
||||||
__await__ = __iter__ # make compatible with 'await' expression
|
__await__ = __iter__ # make compatible with 'await' expression
|
||||||
|
|
||||||
|
|
||||||
def wrap_future(fut, *, loop=None):
|
def _set_concurrent_future_state(concurrent, source):
|
||||||
|
"""Copy state from a future to a concurrent.futures.Future."""
|
||||||
|
assert source.done()
|
||||||
|
if source.cancelled():
|
||||||
|
concurrent.cancel()
|
||||||
|
if not concurrent.set_running_or_notify_cancel():
|
||||||
|
return
|
||||||
|
exception = source.exception()
|
||||||
|
if exception is not None:
|
||||||
|
concurrent.set_exception(exception)
|
||||||
|
else:
|
||||||
|
result = source.result()
|
||||||
|
concurrent.set_result(result)
|
||||||
|
|
||||||
|
|
||||||
|
def _chain_future(source, destination):
|
||||||
|
"""Chain two futures so that when one completes, so does the other.
|
||||||
|
|
||||||
|
The result (or exception) of source will be copied to destination.
|
||||||
|
If destination is cancelled, source gets cancelled too.
|
||||||
|
Compatible with both asyncio.Future and concurrent.futures.Future.
|
||||||
|
"""
|
||||||
|
if not isinstance(source, (Future, concurrent.futures.Future)):
|
||||||
|
raise TypeError('A future is required for source argument')
|
||||||
|
if not isinstance(destination, (Future, concurrent.futures.Future)):
|
||||||
|
raise TypeError('A future is required for destination argument')
|
||||||
|
source_loop = source._loop if isinstance(source, Future) else None
|
||||||
|
dest_loop = destination._loop if isinstance(destination, Future) else None
|
||||||
|
|
||||||
|
def _set_state(future, other):
|
||||||
|
if isinstance(future, Future):
|
||||||
|
future._copy_state(other)
|
||||||
|
else:
|
||||||
|
_set_concurrent_future_state(future, other)
|
||||||
|
|
||||||
|
def _call_check_cancel(destination):
|
||||||
|
if destination.cancelled():
|
||||||
|
if source_loop is None or source_loop is dest_loop:
|
||||||
|
source.cancel()
|
||||||
|
else:
|
||||||
|
source_loop.call_soon_threadsafe(source.cancel)
|
||||||
|
|
||||||
|
def _call_set_state(source):
|
||||||
|
if dest_loop is None or dest_loop is source_loop:
|
||||||
|
_set_state(destination, source)
|
||||||
|
else:
|
||||||
|
dest_loop.call_soon_threadsafe(_set_state, destination, source)
|
||||||
|
|
||||||
|
destination.add_done_callback(_call_check_cancel)
|
||||||
|
source.add_done_callback(_call_set_state)
|
||||||
|
|
||||||
|
|
||||||
|
def wrap_future(future, *, loop=None):
|
||||||
"""Wrap concurrent.futures.Future object."""
|
"""Wrap concurrent.futures.Future object."""
|
||||||
if isinstance(fut, Future):
|
if isinstance(future, Future):
|
||||||
return fut
|
return future
|
||||||
assert isinstance(fut, concurrent.futures.Future), \
|
assert isinstance(future, concurrent.futures.Future), \
|
||||||
'concurrent.futures.Future is expected, got {!r}'.format(fut)
|
'concurrent.futures.Future is expected, got {!r}'.format(future)
|
||||||
if loop is None:
|
|
||||||
loop = events.get_event_loop()
|
|
||||||
new_future = Future(loop=loop)
|
new_future = Future(loop=loop)
|
||||||
|
_chain_future(future, new_future)
|
||||||
def _check_cancel_other(f):
|
|
||||||
if f.cancelled():
|
|
||||||
fut.cancel()
|
|
||||||
|
|
||||||
new_future.add_done_callback(_check_cancel_other)
|
|
||||||
fut.add_done_callback(
|
|
||||||
lambda future: loop.call_soon_threadsafe(
|
|
||||||
new_future._copy_state, future))
|
|
||||||
return new_future
|
return new_future
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
__all__ = ['Task',
|
__all__ = ['Task',
|
||||||
'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
|
'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
|
||||||
'wait', 'wait_for', 'as_completed', 'sleep', 'async',
|
'wait', 'wait_for', 'as_completed', 'sleep', 'async',
|
||||||
'gather', 'shield', 'ensure_future',
|
'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
|
||||||
]
|
]
|
||||||
|
|
||||||
import concurrent.futures
|
import concurrent.futures
|
||||||
|
@ -692,3 +692,19 @@ def shield(arg, *, loop=None):
|
||||||
|
|
||||||
inner.add_done_callback(_done_callback)
|
inner.add_done_callback(_done_callback)
|
||||||
return outer
|
return outer
|
||||||
|
|
||||||
|
|
||||||
|
def run_coroutine_threadsafe(coro, loop):
|
||||||
|
"""Submit a coroutine object to a given event loop.
|
||||||
|
|
||||||
|
Return a concurrent.futures.Future to access the result.
|
||||||
|
"""
|
||||||
|
if not coroutines.iscoroutine(coro):
|
||||||
|
raise TypeError('A coroutine object is required')
|
||||||
|
future = concurrent.futures.Future()
|
||||||
|
|
||||||
|
def callback():
|
||||||
|
futures._chain_future(ensure_future(coro, loop=loop), future)
|
||||||
|
|
||||||
|
loop.call_soon_threadsafe(callback)
|
||||||
|
return future
|
||||||
|
|
|
@ -174,8 +174,6 @@ class FutureTests(test_utils.TestCase):
|
||||||
'<Future cancelled>')
|
'<Future cancelled>')
|
||||||
|
|
||||||
def test_copy_state(self):
|
def test_copy_state(self):
|
||||||
# Test the internal _copy_state method since it's being directly
|
|
||||||
# invoked in other modules.
|
|
||||||
f = asyncio.Future(loop=self.loop)
|
f = asyncio.Future(loop=self.loop)
|
||||||
f.set_result(10)
|
f.set_result(10)
|
||||||
|
|
||||||
|
|
|
@ -2100,5 +2100,72 @@ class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
|
||||||
self.assertIsInstance(f.exception(), RuntimeError)
|
self.assertIsInstance(f.exception(), RuntimeError)
|
||||||
|
|
||||||
|
|
||||||
|
class RunCoroutineThreadsafeTests(test_utils.TestCase):
|
||||||
|
"""Test case for futures.submit_to_loop."""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.loop = self.new_test_loop(self.time_gen)
|
||||||
|
|
||||||
|
def time_gen(self):
|
||||||
|
"""Handle the timer."""
|
||||||
|
yield 0 # second
|
||||||
|
yield 1 # second
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def add(self, a, b, fail=False, cancel=False):
|
||||||
|
"""Wait 1 second and return a + b."""
|
||||||
|
yield from asyncio.sleep(1, loop=self.loop)
|
||||||
|
if fail:
|
||||||
|
raise RuntimeError("Fail!")
|
||||||
|
if cancel:
|
||||||
|
asyncio.tasks.Task.current_task(self.loop).cancel()
|
||||||
|
yield
|
||||||
|
return a + b
|
||||||
|
|
||||||
|
def target(self, fail=False, cancel=False, timeout=None):
|
||||||
|
"""Run add coroutine in the event loop."""
|
||||||
|
coro = self.add(1, 2, fail=fail, cancel=cancel)
|
||||||
|
future = asyncio.run_coroutine_threadsafe(coro, self.loop)
|
||||||
|
try:
|
||||||
|
return future.result(timeout)
|
||||||
|
finally:
|
||||||
|
future.done() or future.cancel()
|
||||||
|
|
||||||
|
def test_run_coroutine_threadsafe(self):
|
||||||
|
"""Test coroutine submission from a thread to an event loop."""
|
||||||
|
future = self.loop.run_in_executor(None, self.target)
|
||||||
|
result = self.loop.run_until_complete(future)
|
||||||
|
self.assertEqual(result, 3)
|
||||||
|
|
||||||
|
def test_run_coroutine_threadsafe_with_exception(self):
|
||||||
|
"""Test coroutine submission from a thread to an event loop
|
||||||
|
when an exception is raised."""
|
||||||
|
future = self.loop.run_in_executor(None, self.target, True)
|
||||||
|
with self.assertRaises(RuntimeError) as exc_context:
|
||||||
|
self.loop.run_until_complete(future)
|
||||||
|
self.assertIn("Fail!", exc_context.exception.args)
|
||||||
|
|
||||||
|
def test_run_coroutine_threadsafe_with_timeout(self):
|
||||||
|
"""Test coroutine submission from a thread to an event loop
|
||||||
|
when a timeout is raised."""
|
||||||
|
callback = lambda: self.target(timeout=0)
|
||||||
|
future = self.loop.run_in_executor(None, callback)
|
||||||
|
with self.assertRaises(asyncio.TimeoutError):
|
||||||
|
self.loop.run_until_complete(future)
|
||||||
|
# Clear the time generator and tasks
|
||||||
|
test_utils.run_briefly(self.loop)
|
||||||
|
# Check that there's no pending task (add has been cancelled)
|
||||||
|
for task in asyncio.Task.all_tasks(self.loop):
|
||||||
|
self.assertTrue(task.done())
|
||||||
|
|
||||||
|
def test_run_coroutine_threadsafe_task_cancelled(self):
|
||||||
|
"""Test coroutine submission from a tread to an event loop
|
||||||
|
when the task is cancelled."""
|
||||||
|
callback = lambda: self.target(cancel=True)
|
||||||
|
future = self.loop.run_in_executor(None, callback)
|
||||||
|
with self.assertRaises(asyncio.CancelledError):
|
||||||
|
self.loop.run_until_complete(future)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
|
@ -957,6 +957,7 @@ Steven Miale
|
||||||
Trent Mick
|
Trent Mick
|
||||||
Jason Michalski
|
Jason Michalski
|
||||||
Franck Michea
|
Franck Michea
|
||||||
|
Vincent Michel
|
||||||
Tom Middleton
|
Tom Middleton
|
||||||
Thomas Miedema
|
Thomas Miedema
|
||||||
Stan Mihai
|
Stan Mihai
|
||||||
|
|
|
@ -34,6 +34,10 @@ Core and Builtins
|
||||||
Library
|
Library
|
||||||
-------
|
-------
|
||||||
|
|
||||||
|
- Issue #25304: Add asyncio.run_coroutine_threadsafe(). This lets you
|
||||||
|
submit a coroutine to a loop from another thread, returning a
|
||||||
|
concurrent.futures.Future. By Vincent Michel.
|
||||||
|
|
||||||
- Issue #25232: Fix CGIRequestHandler to split the query from the URL at the
|
- Issue #25232: Fix CGIRequestHandler to split the query from the URL at the
|
||||||
first question mark (?) rather than the last. Patch from Xiang Zhang.
|
first question mark (?) rather than the last. Patch from Xiang Zhang.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue