diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst index c7dfc35925b..ed408f424f2 100644 --- a/Doc/library/queue.rst +++ b/Doc/library/queue.rst @@ -23,6 +23,8 @@ the first retrieved (operating like a stack). With a priority queue, the entries are kept sorted (using the :mod:`heapq` module) and the lowest valued entry is retrieved first. +Internally, the module uses locks to temporarily block competing threads; +however, it is not designed to handle reentrancy within a thread. The :mod:`queue` module defines the following classes and exceptions: @@ -189,11 +191,6 @@ Example of how to wait for enqueued tasks to be completed:: t.join() -.. note:: - - The :mod:`queue` module is not safe for use from :mod:`signal` handlers as - it uses :mod:`threading` locks. - .. seealso:: Class :class:`multiprocessing.Queue` diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index b420586a139..9c2fa12486f 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -414,7 +414,7 @@ class BaseEventLoop(events.AbstractEventLoop): """ self._check_closed() - new_task = not isinstance(future, futures.Future) + new_task = not futures.isfuture(future) future = tasks.ensure_future(future, loop=self) if new_task: # An exception is raised if the future didn't complete, so there diff --git a/Lib/asyncio/coroutines.py b/Lib/asyncio/coroutines.py index 9c338b0c326..d92f67d590e 100644 --- a/Lib/asyncio/coroutines.py +++ b/Lib/asyncio/coroutines.py @@ -204,8 +204,8 @@ def coroutine(func): @functools.wraps(func) def coro(*args, **kw): res = func(*args, **kw) - if isinstance(res, futures.Future) or inspect.isgenerator(res) or \ - isinstance(res, CoroWrapper): + if (futures.isfuture(res) or inspect.isgenerator(res) or + isinstance(res, CoroWrapper)): res = yield from res elif _AwaitableABC is not None: # If 'func' returns an Awaitable (new in 3.5) we diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py index edc13dcc474..bcd4d16b9d1 100644 --- a/Lib/asyncio/futures.py +++ b/Lib/asyncio/futures.py @@ -110,6 +110,16 @@ class _TracebackLogger: self.loop.call_exception_handler({'message': msg}) +def isfuture(obj): + """Check for a Future. + + This returns True when obj is a Future instance or is advertising + itself as duck-type compatible by setting _asyncio_future_blocking. + See comment in Future for more details. + """ + return getattr(obj, '_asyncio_future_blocking', None) is not None + + class Future: """This class is *almost* compatible with concurrent.futures.Future. @@ -423,15 +433,17 @@ def _chain_future(source, destination): If destination is cancelled, source gets cancelled too. Compatible with both asyncio.Future and concurrent.futures.Future. """ - if not isinstance(source, (Future, concurrent.futures.Future)): + if not isfuture(source) and not isinstance(source, + concurrent.futures.Future): raise TypeError('A future is required for source argument') - if not isinstance(destination, (Future, concurrent.futures.Future)): + if not isfuture(destination) and not isinstance(destination, + concurrent.futures.Future): raise TypeError('A future is required for destination argument') - source_loop = source._loop if isinstance(source, Future) else None - dest_loop = destination._loop if isinstance(destination, Future) else None + source_loop = source._loop if isfuture(source) else None + dest_loop = destination._loop if isfuture(destination) else None def _set_state(future, other): - if isinstance(future, Future): + if isfuture(future): _copy_future_state(other, future) else: _set_concurrent_future_state(future, other) @@ -455,7 +467,7 @@ def _chain_future(source, destination): def wrap_future(future, *, loop=None): """Wrap concurrent.futures.Future object.""" - if isinstance(future, Future): + if isfuture(future): return future assert isinstance(future, concurrent.futures.Future), \ 'concurrent.futures.Future is expected, got {!r}'.format(future) diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 3e200f6338d..35c945c4368 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -333,7 +333,7 @@ def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): Note: This does not raise TimeoutError! Futures that aren't done when the timeout occurs are returned in the second set. """ - if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs): + if futures.isfuture(fs) or coroutines.iscoroutine(fs): raise TypeError("expect a list of futures, not %s" % type(fs).__name__) if not fs: raise ValueError('Set of coroutines/Futures is empty.') @@ -462,7 +462,7 @@ def as_completed(fs, *, loop=None, timeout=None): Note: The futures 'f' are not necessarily members of fs. """ - if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs): + if futures.isfuture(fs) or coroutines.iscoroutine(fs): raise TypeError("expect a list of futures, not %s" % type(fs).__name__) loop = loop if loop is not None else events.get_event_loop() todo = {ensure_future(f, loop=loop) for f in set(fs)} @@ -538,7 +538,7 @@ def ensure_future(coro_or_future, *, loop=None): If the argument is a Future, it is returned directly. """ - if isinstance(coro_or_future, futures.Future): + if futures.isfuture(coro_or_future): if loop is not None and loop is not coro_or_future._loop: raise ValueError('loop argument must agree with Future') return coro_or_future @@ -614,7 +614,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False): arg_to_fut = {} for arg in set(coros_or_futures): - if not isinstance(arg, futures.Future): + if not futures.isfuture(arg): fut = ensure_future(arg, loop=loop) if loop is None: loop = fut._loop diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index e742eb71e0d..7c901f27aae 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -793,7 +793,7 @@ class EventLoopTestsMixin: loop.connect_accepted_socket( (lambda : proto), conn, ssl=server_ssl)) loop.run_forever() - conn.close() + proto.transport.close() lsock.close() thread.join(1) diff --git a/Lib/test/test_asyncio/test_futures.py b/Lib/test/test_asyncio/test_futures.py index c38c1f29e7f..d20eb687f9a 100644 --- a/Lib/test/test_asyncio/test_futures.py +++ b/Lib/test/test_asyncio/test_futures.py @@ -25,6 +25,74 @@ def last_cb(): pass +class DuckFuture: + # Class that does not inherit from Future but aims to be duck-type + # compatible with it. + + _asyncio_future_blocking = False + __cancelled = False + __result = None + __exception = None + + def cancel(self): + if self.done(): + return False + self.__cancelled = True + return True + + def cancelled(self): + return self.__cancelled + + def done(self): + return (self.__cancelled + or self.__result is not None + or self.__exception is not None) + + def result(self): + assert not self.cancelled() + if self.__exception is not None: + raise self.__exception + return self.__result + + def exception(self): + assert not self.cancelled() + return self.__exception + + def set_result(self, result): + assert not self.done() + assert result is not None + self.__result = result + + def set_exception(self, exception): + assert not self.done() + assert exception is not None + self.__exception = exception + + def __iter__(self): + if not self.done(): + self._asyncio_future_blocking = True + yield self + assert self.done() + return self.result() + + +class DuckTests(test_utils.TestCase): + + def setUp(self): + self.loop = self.new_test_loop() + self.addCleanup(self.loop.close) + + def test_wrap_future(self): + f = DuckFuture() + g = asyncio.wrap_future(f) + assert g is f + + def test_ensure_future(self): + f = DuckFuture() + g = asyncio.ensure_future(f) + assert g is f + + class FutureTests(test_utils.TestCase): def setUp(self):