From f6d991d88502ee05da7c41217331ff024c634cbc Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Thu, 15 Sep 2016 13:10:51 -0400 Subject: [PATCH 1/2] asyncio: Sync with the upstream --- Lib/asyncio/base_events.py | 60 ++++++++++++++++++++++++++++++++++++++ Lib/asyncio/events.py | 4 +++ 2 files changed, 64 insertions(+) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 99f12b40bdd..619fe3a7782 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -28,6 +28,7 @@ import time import traceback import sys import warnings +import weakref from . import compat from . import coroutines @@ -242,6 +243,17 @@ class BaseEventLoop(events.AbstractEventLoop): self._task_factory = None self._coroutine_wrapper_set = False + if hasattr(sys, 'get_asyncgen_hooks'): + # Python >= 3.6 + # A weak set of all asynchronous generators that are + # being iterated by the loop. + self._asyncgens = weakref.WeakSet() + else: + self._asyncgens = None + + # Set to True when `loop.shutdown_asyncgens` is called. + self._asyncgens_shutdown_called = False + def __repr__(self): return ('<%s running=%s closed=%s debug=%s>' % (self.__class__.__name__, self.is_running(), @@ -333,6 +345,48 @@ class BaseEventLoop(events.AbstractEventLoop): if self._closed: raise RuntimeError('Event loop is closed') + def _asyncgen_finalizer_hook(self, agen): + self._asyncgens.discard(agen) + if not self.is_closed(): + self.create_task(agen.aclose()) + + def _asyncgen_firstiter_hook(self, agen): + if self._asyncgens_shutdown_called: + warnings.warn( + "asynchronous generator {!r} was scheduled after " + "loop.shutdown_asyncgens() call".format(agen), + ResourceWarning, source=self) + + self._asyncgens.add(agen) + + @coroutine + def shutdown_asyncgens(self): + """Shutdown all active asynchronous generators.""" + self._asyncgens_shutdown_called = True + + if self._asyncgens is None or not len(self._asyncgens): + # If Python version is <3.6 or we don't have any asynchronous + # generators alive. + return + + closing_agens = list(self._asyncgens) + self._asyncgens.clear() + + shutdown_coro = tasks.gather( + *[ag.aclose() for ag in closing_agens], + return_exceptions=True, + loop=self) + + results = yield from shutdown_coro + for result, agen in zip(results, closing_agens): + if isinstance(result, Exception): + self.call_exception_handler({ + 'message': 'an error occurred during closing of ' + 'asynchronous generator {!r}'.format(agen), + 'exception': result, + 'asyncgen': agen + }) + def run_forever(self): """Run until stop() is called.""" self._check_closed() @@ -340,6 +394,10 @@ class BaseEventLoop(events.AbstractEventLoop): raise RuntimeError('Event loop is running.') self._set_coroutine_wrapper(self._debug) self._thread_id = threading.get_ident() + if self._asyncgens is not None: + old_agen_hooks = sys.get_asyncgen_hooks() + sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, + finalizer=self._asyncgen_finalizer_hook) try: while True: self._run_once() @@ -349,6 +407,8 @@ class BaseEventLoop(events.AbstractEventLoop): self._stopping = False self._thread_id = None self._set_coroutine_wrapper(False) + if self._asyncgens is not None: + sys.set_asyncgen_hooks(*old_agen_hooks) def run_until_complete(self, future): """Run until the Future is done. diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index c48c5bed736..cc9a986b994 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -248,6 +248,10 @@ class AbstractEventLoop: """ raise NotImplementedError + def shutdown_asyncgens(self): + """Shutdown all active asynchronous generators.""" + raise NotImplementedError + # Methods scheduling callbacks. All these return Handles. def _timer_handle_cancelled(self, handle): From 4357cf62028964eb1a56c503ec1296de3034b77b Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Thu, 15 Sep 2016 13:49:08 -0400 Subject: [PATCH 2/2] Another asyncio sync. --- Lib/asyncio/base_events.py | 9 +++++---- Lib/asyncio/tasks.py | 7 ++++++- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 619fe3a7782..bc3e0129398 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -13,7 +13,6 @@ conscious design decision, leaving the door open for keyword arguments to modify the meaning of the API call itself. """ - import collections import concurrent.futures import heapq @@ -1128,7 +1127,7 @@ class BaseEventLoop(events.AbstractEventLoop): transport = yield from self._make_subprocess_transport( protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs) if self._debug: - logger.info('%s: %r' % (debug_log, transport)) + logger.info('%s: %r', debug_log, transport) return transport, protocol @coroutine @@ -1158,7 +1157,7 @@ class BaseEventLoop(events.AbstractEventLoop): protocol, popen_args, False, stdin, stdout, stderr, bufsize, **kwargs) if self._debug: - logger.info('%s: %r' % (debug_log, transport)) + logger.info('%s: %r', debug_log, transport) return transport, protocol def get_exception_handler(self): @@ -1238,7 +1237,9 @@ class BaseEventLoop(events.AbstractEventLoop): - 'handle' (optional): Handle instance; - 'protocol' (optional): Protocol instance; - 'transport' (optional): Transport instance; - - 'socket' (optional): Socket instance. + - 'socket' (optional): Socket instance; + - 'asyncgen' (optional): Asynchronous generator that caused + the exception. New keys maybe introduced in the future. diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 35c945c4368..4c66546428b 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -519,7 +519,7 @@ def sleep(delay, result=None, *, loop=None): h.cancel() -def async(coro_or_future, *, loop=None): +def async_(coro_or_future, *, loop=None): """Wrap a coroutine in a future. If the argument is a Future, it is returned directly. @@ -532,6 +532,11 @@ def async(coro_or_future, *, loop=None): return ensure_future(coro_or_future, loop=loop) +# Silence DeprecationWarning: +globals()['async'] = async_ +async_.__name__ = 'async' +del async_ + def ensure_future(coro_or_future, *, loop=None): """Wrap a coroutine or an awaitable in a future.