From ca9b36cd1a384e5ecb56d9df9a59144240353ef0 Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Sat, 23 Dec 2017 15:04:15 -0500 Subject: [PATCH] bpo-32415: Add asyncio.Task.get_loop() and Future.get_loop() (#4992) --- Doc/library/asyncio-task.rst | 6 ++ Lib/asyncio/base_events.py | 2 +- Lib/asyncio/futures.py | 20 +++- Lib/asyncio/tasks.py | 39 ++++---- Lib/test/test_asyncio/test_futures.py | 1 + Lib/test/test_asyncio/test_tasks.py | 13 ++- .../2017-12-23-12-45-00.bpo-32415.YufXTU.rst | 1 + Modules/_asynciomodule.c | 98 +++++++++++++------ Modules/clinic/_asynciomodule.c.h | 48 +++++---- 9 files changed, 152 insertions(+), 76 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2017-12-23-12-45-00.bpo-32415.YufXTU.rst diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index d85dddfa02e..71dbe06c899 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -306,6 +306,12 @@ Future If the future is already done when this method is called, raises :exc:`InvalidStateError`. + .. method:: get_loop() + + Return the event loop the future object is bound to. + + .. versionadded:: 3.7 + Example: Future with run_until_complete() ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 2ab8a76e0c5..96cc4f02588 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -145,7 +145,7 @@ def _run_until_complete_cb(fut): # Issue #22429: run_forever() already finished, no need to # stop it. return - fut._loop.stop() + futures._get_loop(fut).stop() class Server(events.AbstractServer): diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py index b310962f9fe..24843c016a7 100644 --- a/Lib/asyncio/futures.py +++ b/Lib/asyncio/futures.py @@ -105,6 +105,10 @@ class Future: context['source_traceback'] = self._source_traceback self._loop.call_exception_handler(context) + def get_loop(self): + """Return the event loop the Future is bound to.""" + return self._loop + def cancel(self): """Cancel the future and schedule callbacks. @@ -249,6 +253,18 @@ class Future: _PyFuture = Future +def _get_loop(fut): + # Tries to call Future.get_loop() if it's available. + # Otherwise fallbacks to using the old '_loop' property. + try: + get_loop = fut.get_loop + except AttributeError: + pass + else: + return get_loop() + return fut._loop + + def _set_result_unless_cancelled(fut, result): """Helper setting the result only if the future was not cancelled.""" if fut.cancelled(): @@ -304,8 +320,8 @@ def _chain_future(source, destination): if not isfuture(destination) and not isinstance(destination, concurrent.futures.Future): raise TypeError('A future is required for destination argument') - source_loop = source._loop if isfuture(source) else None - dest_loop = destination._loop if isfuture(destination) else None + source_loop = _get_loop(source) if isfuture(source) else None + dest_loop = _get_loop(destination) if isfuture(destination) else None def _set_state(future, other): if isfuture(future): diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index ff8a486b544..572e7073338 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -34,7 +34,7 @@ def all_tasks(loop=None): """Return a set of all tasks for the loop.""" if loop is None: loop = events.get_event_loop() - return {t for t, l in _all_tasks.items() if l is loop} + return {t for t in _all_tasks if futures._get_loop(t) is loop} class Task(futures.Future): @@ -96,7 +96,7 @@ class Task(futures.Future): self._coro = coro self._loop.call_soon(self._step) - _register_task(self._loop, self) + _register_task(self) def __del__(self): if self._state == futures._PENDING and self._log_destroy_pending: @@ -215,7 +215,7 @@ class Task(futures.Future): blocking = getattr(result, '_asyncio_future_blocking', None) if blocking is not None: # Yielded Future must come from Future.__iter__(). - if result._loop is not self._loop: + if futures._get_loop(result) is not self._loop: new_exc = RuntimeError( f'Task {self!r} got Future ' f'{result!r} attached to a different loop') @@ -510,9 +510,9 @@ async def sleep(delay, result=None, *, loop=None): if loop is None: loop = events.get_event_loop() future = loop.create_future() - h = future._loop.call_later(delay, - futures._set_result_unless_cancelled, - future, result) + h = loop.call_later(delay, + futures._set_result_unless_cancelled, + future, result) try: return await future finally: @@ -525,7 +525,7 @@ def ensure_future(coro_or_future, *, loop=None): If the argument is a Future, it is returned directly. """ if futures.isfuture(coro_or_future): - if loop is not None and loop is not coro_or_future._loop: + if loop is not None and loop is not futures._get_loop(coro_or_future): raise ValueError('loop argument must agree with Future') return coro_or_future elif coroutines.iscoroutine(coro_or_future): @@ -655,7 +655,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False): if arg not in arg_to_fut: fut = ensure_future(arg, loop=loop) if loop is None: - loop = fut._loop + loop = futures._get_loop(fut) if fut is not arg: # 'arg' was not a Future, therefore, 'fut' is a new # Future created specifically for 'arg'. Since the caller @@ -707,7 +707,7 @@ def shield(arg, *, loop=None): if inner.done(): # Shortcut. return inner - loop = inner._loop + loop = futures._get_loop(inner) outer = loop.create_future() def _done_callback(inner): @@ -751,23 +751,17 @@ def run_coroutine_threadsafe(coro, loop): return future -# WeakKeyDictionary of {Task: EventLoop} containing all tasks alive. -# Task should be a weak reference to remove entry on task garbage -# collection, EventLoop is required -# to not access to private task._loop attribute. -_all_tasks = weakref.WeakKeyDictionary() +# WeakSet containing all alive tasks. +_all_tasks = weakref.WeakSet() # Dictionary containing tasks that are currently active in # all running event loops. {EventLoop: Task} _current_tasks = {} -def _register_task(loop, task): - """Register a new task in asyncio as executed by loop. - - Returns None. - """ - _all_tasks[task] = loop +def _register_task(task): + """Register a new task in asyncio as executed by loop.""" + _all_tasks.add(task) def _enter_task(loop, task): @@ -786,8 +780,9 @@ def _leave_task(loop, task): del _current_tasks[loop] -def _unregister_task(loop, task): - _all_tasks.pop(task, None) +def _unregister_task(task): + """Unregister a task.""" + _all_tasks.discard(task) _py_register_task = _register_task diff --git a/Lib/test/test_asyncio/test_futures.py b/Lib/test/test_asyncio/test_futures.py index 5652a42690e..f777a420b29 100644 --- a/Lib/test/test_asyncio/test_futures.py +++ b/Lib/test/test_asyncio/test_futures.py @@ -139,6 +139,7 @@ class BaseFutureTests: asyncio.set_event_loop(self.loop) f = self._new_future() self.assertIs(f._loop, self.loop) + self.assertIs(f.get_loop(), self.loop) def test_constructor_positional(self): # Make sure Future doesn't accept a positional argument diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index f1dbb99d4fc..84669cd6c7e 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -141,6 +141,7 @@ class BaseTaskTests: self.assertTrue(t.done()) self.assertEqual(t.result(), 'ok') self.assertIs(t._loop, self.loop) + self.assertIs(t.get_loop(), self.loop) loop = asyncio.new_event_loop() self.set_event_loop(loop) @@ -2310,10 +2311,11 @@ class BaseTaskIntrospectionTests: def test__register_task(self): task = mock.Mock() loop = mock.Mock() + task.get_loop = lambda: loop self.assertEqual(asyncio.all_tasks(loop), set()) - self._register_task(loop, task) + self._register_task(task) self.assertEqual(asyncio.all_tasks(loop), {task}) - self._unregister_task(loop, task) + self._unregister_task(task) def test__enter_task(self): task = mock.Mock() @@ -2360,14 +2362,15 @@ class BaseTaskIntrospectionTests: def test__unregister_task(self): task = mock.Mock() loop = mock.Mock() - self._register_task(loop, task) - self._unregister_task(loop, task) + task.get_loop = lambda: loop + self._register_task(task) + self._unregister_task(task) self.assertEqual(asyncio.all_tasks(loop), set()) def test__unregister_task_not_registered(self): task = mock.Mock() loop = mock.Mock() - self._unregister_task(loop, task) + self._unregister_task(task) self.assertEqual(asyncio.all_tasks(loop), set()) diff --git a/Misc/NEWS.d/next/Library/2017-12-23-12-45-00.bpo-32415.YufXTU.rst b/Misc/NEWS.d/next/Library/2017-12-23-12-45-00.bpo-32415.YufXTU.rst new file mode 100644 index 00000000000..f1f57376241 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-12-23-12-45-00.bpo-32415.YufXTU.rst @@ -0,0 +1 @@ +asyncio: Add Task.get_loop() and Future.get_loop() diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index f52297d33f2..25acd552b18 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -16,7 +16,6 @@ _Py_IDENTIFIER(call_soon); _Py_IDENTIFIER(cancel); _Py_IDENTIFIER(current_task); _Py_IDENTIFIER(get_event_loop); -_Py_IDENTIFIER(pop); _Py_IDENTIFIER(send); _Py_IDENTIFIER(throw); _Py_IDENTIFIER(_step); @@ -39,15 +38,12 @@ static PyObject *asyncio_InvalidStateError; static PyObject *asyncio_CancelledError; -/* WeakKeyDictionary of {Task: EventLoop} containing all tasks alive. - Task should be a weak reference to remove entry on task garbage - collection, EventLoop is required - to not access to private task._loop attribute. */ -static PyObject *current_tasks; +/* WeakSet containing all alive tasks. */ +static PyObject *all_tasks; /* Dictionary containing tasks that are currently active in all running event loops. {EventLoop: Task} */ -static PyObject *all_tasks; +static PyObject *current_tasks; /* An isinstance type cache for the 'is_coroutine()' function. */ static PyObject *iscoroutine_typecache; @@ -186,6 +182,31 @@ is_coroutine(PyObject *coro) } +static PyObject * +get_future_loop(PyObject *fut) +{ + /* Implementation of `asyncio.futures._get_loop` */ + + _Py_IDENTIFIER(get_loop); + _Py_IDENTIFIER(_loop); + + if (Future_CheckExact(fut) || Task_CheckExact(fut)) { + PyObject *loop = ((FutureObj *)fut)->fut_loop; + Py_INCREF(loop); + return loop; + } + + PyObject *getloop = _PyObject_GetAttrId(fut, &PyId_get_loop); + if (getloop != NULL) { + PyObject *res = _PyObject_CallNoArg(getloop); + Py_DECREF(getloop); + return res; + } + + return _PyObject_GetAttrId(fut, &PyId__loop); +} + + static int get_running_loop(PyObject **loop) { @@ -977,6 +998,20 @@ _asyncio_Future_done_impl(FutureObj *self) } } +/*[clinic input] +_asyncio.Future.get_loop + +Return the event loop the Future is bound to. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Future_get_loop_impl(FutureObj *self) +/*[clinic end generated code: output=119b6ea0c9816c3f input=cba48c2136c79d1f]*/ +{ + Py_INCREF(self->fut_loop); + return self->fut_loop; +} + static PyObject * FutureObj_get_blocking(FutureObj *fut) { @@ -1295,6 +1330,7 @@ static PyMethodDef FutureType_methods[] = { _ASYNCIO_FUTURE_CANCEL_METHODDEF _ASYNCIO_FUTURE_CANCELLED_METHODDEF _ASYNCIO_FUTURE_DONE_METHODDEF + _ASYNCIO_FUTURE_GET_LOOP_METHODDEF _ASYNCIO_FUTURE__REPR_INFO_METHODDEF _ASYNCIO_FUTURE__SCHEDULE_CALLBACKS_METHODDEF {NULL, NULL} /* Sentinel */ @@ -1759,19 +1795,27 @@ TaskWakeupMethWrapper_new(TaskObj *task) /* ----- Task introspection helpers */ static int -register_task(PyObject *loop, PyObject *task) +register_task(PyObject *task) { - return PyObject_SetItem(all_tasks, task, loop); + _Py_IDENTIFIER(add); + + PyObject *res = _PyObject_CallMethodIdObjArgs( + all_tasks, &PyId_add, task, NULL); + if (res == NULL) { + return -1; + } + Py_DECREF(res); + return 0; } static int -unregister_task(PyObject *loop, PyObject *task) +unregister_task(PyObject *task) { - PyObject *res; + _Py_IDENTIFIER(discard); - res = _PyObject_CallMethodIdObjArgs(all_tasks, &PyId_pop, - task, Py_None, NULL); + PyObject *res = _PyObject_CallMethodIdObjArgs( + all_tasks, &PyId_discard, task, NULL); if (res == NULL) { return -1; } @@ -1871,7 +1915,7 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop) if (task_call_step_soon(self, NULL)) { return -1; } - return register_task(self->task_loop, (PyObject*)self); + return register_task((PyObject*)self); } static int @@ -2622,7 +2666,7 @@ set_exception: } /* Check if `result` future is attached to a different loop */ - PyObject *oloop = PyObject_GetAttrString(result, "_loop"); + PyObject *oloop = get_future_loop(result); if (oloop == NULL) { goto fail; } @@ -2928,7 +2972,6 @@ _asyncio_get_running_loop_impl(PyObject *module) /*[clinic input] _asyncio._register_task - loop: object task: object Register a new task in asyncio as executed by loop. @@ -2937,11 +2980,10 @@ Returns None. [clinic start generated code]*/ static PyObject * -_asyncio__register_task_impl(PyObject *module, PyObject *loop, - PyObject *task) -/*[clinic end generated code: output=54c5cb733dbe0f38 input=9b5fee38fcb2c288]*/ +_asyncio__register_task_impl(PyObject *module, PyObject *task) +/*[clinic end generated code: output=8672dadd69a7d4e2 input=21075aaea14dfbad]*/ { - if (register_task(loop, task) < 0) { + if (register_task(task) < 0) { return NULL; } Py_RETURN_NONE; @@ -2951,7 +2993,6 @@ _asyncio__register_task_impl(PyObject *module, PyObject *loop, /*[clinic input] _asyncio._unregister_task - loop: object task: object Unregister a task. @@ -2960,11 +3001,10 @@ Returns None. [clinic start generated code]*/ static PyObject * -_asyncio__unregister_task_impl(PyObject *module, PyObject *loop, - PyObject *task) -/*[clinic end generated code: output=f634743a76b84ebc input=51fa1820634ef331]*/ +_asyncio__unregister_task_impl(PyObject *module, PyObject *task) +/*[clinic end generated code: output=6e5585706d568a46 input=28fb98c3975f7bdc]*/ { - if (unregister_task(loop, task) < 0) { + if (unregister_task(task) < 0) { return NULL; } Py_RETURN_NONE; @@ -3123,11 +3163,11 @@ module_init(void) WITH_MOD("traceback") GET_MOD_ATTR(traceback_extract_stack, "extract_stack") - PyObject *weak_key_dict; + PyObject *weak_set; WITH_MOD("weakref") - GET_MOD_ATTR(weak_key_dict, "WeakKeyDictionary"); - all_tasks = _PyObject_CallNoArg(weak_key_dict); - Py_CLEAR(weak_key_dict); + GET_MOD_ATTR(weak_set, "WeakSet"); + all_tasks = _PyObject_CallNoArg(weak_set); + Py_CLEAR(weak_set); if (all_tasks == NULL) { goto fail; } diff --git a/Modules/clinic/_asynciomodule.c.h b/Modules/clinic/_asynciomodule.c.h index 9d5dea52c8e..6a35434ce3b 100644 --- a/Modules/clinic/_asynciomodule.c.h +++ b/Modules/clinic/_asynciomodule.c.h @@ -194,6 +194,24 @@ _asyncio_Future_done(FutureObj *self, PyObject *Py_UNUSED(ignored)) return _asyncio_Future_done_impl(self); } +PyDoc_STRVAR(_asyncio_Future_get_loop__doc__, +"get_loop($self, /)\n" +"--\n" +"\n" +"Return the event loop the Future is bound to."); + +#define _ASYNCIO_FUTURE_GET_LOOP_METHODDEF \ + {"get_loop", (PyCFunction)_asyncio_Future_get_loop, METH_NOARGS, _asyncio_Future_get_loop__doc__}, + +static PyObject * +_asyncio_Future_get_loop_impl(FutureObj *self); + +static PyObject * +_asyncio_Future_get_loop(FutureObj *self, PyObject *Py_UNUSED(ignored)) +{ + return _asyncio_Future_get_loop_impl(self); +} + PyDoc_STRVAR(_asyncio_Future__repr_info__doc__, "_repr_info($self, /)\n" "--\n" @@ -597,7 +615,7 @@ _asyncio_get_running_loop(PyObject *module, PyObject *Py_UNUSED(ignored)) } PyDoc_STRVAR(_asyncio__register_task__doc__, -"_register_task($module, /, loop, task)\n" +"_register_task($module, /, task)\n" "--\n" "\n" "Register a new task in asyncio as executed by loop.\n" @@ -608,30 +626,28 @@ PyDoc_STRVAR(_asyncio__register_task__doc__, {"_register_task", (PyCFunction)_asyncio__register_task, METH_FASTCALL|METH_KEYWORDS, _asyncio__register_task__doc__}, static PyObject * -_asyncio__register_task_impl(PyObject *module, PyObject *loop, - PyObject *task); +_asyncio__register_task_impl(PyObject *module, PyObject *task); static PyObject * _asyncio__register_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) { PyObject *return_value = NULL; - static const char * const _keywords[] = {"loop", "task", NULL}; - static _PyArg_Parser _parser = {"OO:_register_task", _keywords, 0}; - PyObject *loop; + static const char * const _keywords[] = {"task", NULL}; + static _PyArg_Parser _parser = {"O:_register_task", _keywords, 0}; PyObject *task; if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser, - &loop, &task)) { + &task)) { goto exit; } - return_value = _asyncio__register_task_impl(module, loop, task); + return_value = _asyncio__register_task_impl(module, task); exit: return return_value; } PyDoc_STRVAR(_asyncio__unregister_task__doc__, -"_unregister_task($module, /, loop, task)\n" +"_unregister_task($module, /, task)\n" "--\n" "\n" "Unregister a task.\n" @@ -642,23 +658,21 @@ PyDoc_STRVAR(_asyncio__unregister_task__doc__, {"_unregister_task", (PyCFunction)_asyncio__unregister_task, METH_FASTCALL|METH_KEYWORDS, _asyncio__unregister_task__doc__}, static PyObject * -_asyncio__unregister_task_impl(PyObject *module, PyObject *loop, - PyObject *task); +_asyncio__unregister_task_impl(PyObject *module, PyObject *task); static PyObject * _asyncio__unregister_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) { PyObject *return_value = NULL; - static const char * const _keywords[] = {"loop", "task", NULL}; - static _PyArg_Parser _parser = {"OO:_unregister_task", _keywords, 0}; - PyObject *loop; + static const char * const _keywords[] = {"task", NULL}; + static _PyArg_Parser _parser = {"O:_unregister_task", _keywords, 0}; PyObject *task; if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser, - &loop, &task)) { + &task)) { goto exit; } - return_value = _asyncio__unregister_task_impl(module, loop, task); + return_value = _asyncio__unregister_task_impl(module, task); exit: return return_value; @@ -733,4 +747,4 @@ _asyncio__leave_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, exit: return return_value; } -/*[clinic end generated code: output=0033af17965b51b4 input=a9049054013a1b77]*/ +/*[clinic end generated code: output=5d100b3d74f2a0f4 input=a9049054013a1b77]*/