diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index e425b06e42f..a00f861a9e5 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -652,6 +652,7 @@ def get_running_loop(): This function is thread-specific. """ + # NOTE: this function is implemented in C (see _asynciomodule.c) loop = _get_running_loop() if loop is None: raise RuntimeError('no running event loop') @@ -664,6 +665,7 @@ def _get_running_loop(): This is a low-level function intended to be used by event loops. This function is thread-specific. """ + # NOTE: this function is implemented in C (see _asynciomodule.c) running_loop, pid = _running_loop.loop_pid if running_loop is not None and pid == os.getpid(): return running_loop @@ -675,6 +677,7 @@ def _set_running_loop(loop): This is a low-level function intended to be used by event loops. This function is thread-specific. """ + # NOTE: this function is implemented in C (see _asynciomodule.c) _running_loop.loop_pid = (loop, os.getpid()) @@ -711,6 +714,7 @@ def get_event_loop(): If there is no running event loop set, the function will return the result of `get_event_loop_policy().get_event_loop()` call. """ + # NOTE: this function is implemented in C (see _asynciomodule.c) current_loop = _get_running_loop() if current_loop is not None: return current_loop @@ -736,3 +740,26 @@ def set_child_watcher(watcher): """Equivalent to calling get_event_loop_policy().set_child_watcher(watcher).""" return get_event_loop_policy().set_child_watcher(watcher) + + +# Alias pure-Python implementations for testing purposes. +_py__get_running_loop = _get_running_loop +_py__set_running_loop = _set_running_loop +_py_get_running_loop = get_running_loop +_py_get_event_loop = get_event_loop + + +try: + # get_event_loop() is one of the most frequently called + # functions in asyncio. Pure Python implementation is + # about 4 times slower than C-accelerated. + from _asyncio import (_get_running_loop, _set_running_loop, + get_running_loop, get_event_loop) +except ImportError: + pass +else: + # Alias C implementations for testing purposes. + _c__get_running_loop = _get_running_loop + _c__set_running_loop = _set_running_loop + _c_get_running_loop = get_running_loop + _c_get_event_loop = get_event_loop diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 1315febe403..144921ad0ec 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -27,6 +27,7 @@ if sys.platform != 'win32': import asyncio from asyncio import coroutines +from asyncio import events from asyncio import proactor_events from asyncio import selector_events from test.test_asyncio import utils as test_utils @@ -2145,23 +2146,6 @@ else: asyncio.set_child_watcher(None) super().tearDown() - def test_get_event_loop_new_process(self): - # Issue bpo-32126: The multiprocessing module used by - # ProcessPoolExecutor is not functional when the - # multiprocessing.synchronize module cannot be imported. - support.import_module('multiprocessing.synchronize') - async def main(): - pool = concurrent.futures.ProcessPoolExecutor() - result = await self.loop.run_in_executor( - pool, _test_get_event_loop_new_process__sub_proc) - pool.shutdown() - return result - - self.unpatch_get_running_loop() - - self.assertEqual( - self.loop.run_until_complete(main()), - 'hello') if hasattr(selectors, 'KqueueSelector'): class KqueueEventLoopTests(UnixEventLoopTestsMixin, @@ -2722,17 +2706,95 @@ class PolicyTests(unittest.TestCase): self.assertIs(policy, asyncio.get_event_loop_policy()) self.assertIsNot(policy, old_policy) + +class GetEventLoopTestsMixin: + + _get_running_loop_impl = None + _set_running_loop_impl = None + get_running_loop_impl = None + get_event_loop_impl = None + + def setUp(self): + self._get_running_loop_saved = events._get_running_loop + self._set_running_loop_saved = events._set_running_loop + self.get_running_loop_saved = events.get_running_loop + self.get_event_loop_saved = events.get_event_loop + + events._get_running_loop = type(self)._get_running_loop_impl + events._set_running_loop = type(self)._set_running_loop_impl + events.get_running_loop = type(self).get_running_loop_impl + events.get_event_loop = type(self).get_event_loop_impl + + asyncio._get_running_loop = type(self)._get_running_loop_impl + asyncio._set_running_loop = type(self)._set_running_loop_impl + asyncio.get_running_loop = type(self).get_running_loop_impl + asyncio.get_event_loop = type(self).get_event_loop_impl + + super().setUp() + + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + watcher = asyncio.SafeChildWatcher() + watcher.attach_loop(self.loop) + asyncio.set_child_watcher(watcher) + + def tearDown(self): + try: + asyncio.set_child_watcher(None) + super().tearDown() + finally: + self.loop.close() + asyncio.set_event_loop(None) + + events._get_running_loop = self._get_running_loop_saved + events._set_running_loop = self._set_running_loop_saved + events.get_running_loop = self.get_running_loop_saved + events.get_event_loop = self.get_event_loop_saved + + asyncio._get_running_loop = self._get_running_loop_saved + asyncio._set_running_loop = self._set_running_loop_saved + asyncio.get_running_loop = self.get_running_loop_saved + asyncio.get_event_loop = self.get_event_loop_saved + + if sys.platform != 'win32': + + def test_get_event_loop_new_process(self): + # Issue bpo-32126: The multiprocessing module used by + # ProcessPoolExecutor is not functional when the + # multiprocessing.synchronize module cannot be imported. + support.import_module('multiprocessing.synchronize') + + async def main(): + pool = concurrent.futures.ProcessPoolExecutor() + result = await self.loop.run_in_executor( + pool, _test_get_event_loop_new_process__sub_proc) + pool.shutdown() + return result + + self.assertEqual( + self.loop.run_until_complete(main()), + 'hello') + def test_get_event_loop_returns_running_loop(self): + class TestError(Exception): + pass + class Policy(asyncio.DefaultEventLoopPolicy): def get_event_loop(self): - raise NotImplementedError - - loop = None + raise TestError old_policy = asyncio.get_event_loop_policy() try: asyncio.set_event_loop_policy(Policy()) loop = asyncio.new_event_loop() + + with self.assertRaises(TestError): + asyncio.get_event_loop() + asyncio.set_event_loop(None) + with self.assertRaises(TestError): + asyncio.get_event_loop() + with self.assertRaisesRegex(RuntimeError, 'no running'): self.assertIs(asyncio.get_running_loop(), None) self.assertIs(asyncio._get_running_loop(), None) @@ -2743,6 +2805,15 @@ class PolicyTests(unittest.TestCase): self.assertIs(asyncio._get_running_loop(), loop) loop.run_until_complete(func()) + + asyncio.set_event_loop(loop) + with self.assertRaises(TestError): + asyncio.get_event_loop() + + asyncio.set_event_loop(None) + with self.assertRaises(TestError): + asyncio.get_event_loop() + finally: asyncio.set_event_loop_policy(old_policy) if loop is not None: @@ -2754,5 +2825,27 @@ class PolicyTests(unittest.TestCase): self.assertIs(asyncio._get_running_loop(), None) +class TestPyGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase): + + _get_running_loop_impl = events._py__get_running_loop + _set_running_loop_impl = events._py__set_running_loop + get_running_loop_impl = events._py_get_running_loop + get_event_loop_impl = events._py_get_event_loop + + +try: + import _asyncio # NoQA +except ImportError: + pass +else: + + class TestCGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase): + + _get_running_loop_impl = events._c__get_running_loop + _set_running_loop_impl = events._c__set_running_loop + get_running_loop_impl = events._c_get_running_loop + get_event_loop_impl = events._c_get_event_loop + + if __name__ == '__main__': unittest.main() diff --git a/Misc/NEWS.d/next/Library/2017-12-12-18-01-01.bpo-32296.bwscHz.rst b/Misc/NEWS.d/next/Library/2017-12-12-18-01-01.bpo-32296.bwscHz.rst new file mode 100644 index 00000000000..4100d48518e --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-12-12-18-01-01.bpo-32296.bwscHz.rst @@ -0,0 +1,2 @@ +Implement asyncio._get_running_loop() and get_event_loop() in C. This makes +them 4x faster. diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index 2c64c55e7e3..01c38b80b95 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -9,9 +9,11 @@ module _asyncio /* identifiers used from some functions */ +_Py_IDENTIFIER(__asyncio_running_event_loop__); _Py_IDENTIFIER(add_done_callback); _Py_IDENTIFIER(call_soon); _Py_IDENTIFIER(cancel); +_Py_IDENTIFIER(get_event_loop); _Py_IDENTIFIER(send); _Py_IDENTIFIER(throw); _Py_IDENTIFIER(_step); @@ -23,7 +25,7 @@ _Py_IDENTIFIER(_wakeup); static PyObject *all_tasks; static PyObject *current_tasks; static PyObject *traceback_extract_stack; -static PyObject *asyncio_get_event_loop; +static PyObject *asyncio_get_event_loop_policy; static PyObject *asyncio_future_repr_info_func; static PyObject *asyncio_task_repr_info_func; static PyObject *asyncio_task_get_stack_func; @@ -31,6 +33,7 @@ static PyObject *asyncio_task_print_stack_func; static PyObject *asyncio_InvalidStateError; static PyObject *asyncio_CancelledError; static PyObject *inspect_isgenerator; +static PyObject *os_getpid; typedef enum { @@ -88,6 +91,134 @@ class _asyncio.Future "FutureObj *" "&Future_Type" static PyObject* future_new_iter(PyObject *); static inline int future_call_schedule_callbacks(FutureObj *); + +static int +get_running_loop(PyObject **loop) +{ + PyObject *ts_dict; + PyObject *running_tuple; + PyObject *running_loop; + PyObject *running_loop_pid; + PyObject *current_pid; + int same_pid; + + ts_dict = PyThreadState_GetDict(); // borrowed + if (ts_dict == NULL) { + PyErr_SetString( + PyExc_RuntimeError, "thread-local storage is not available"); + goto error; + } + + running_tuple = _PyDict_GetItemId( + ts_dict, &PyId___asyncio_running_event_loop__); // borrowed + if (running_tuple == NULL) { + /* _PyDict_GetItemId doesn't set an error if key is not found */ + goto not_found; + } + + assert(PyTuple_CheckExact(running_tuple)); + assert(PyTuple_Size(running_tuple) == 2); + running_loop = PyTuple_GET_ITEM(running_tuple, 0); // borrowed + running_loop_pid = PyTuple_GET_ITEM(running_tuple, 1); // borrowed + + if (running_loop == Py_None) { + goto not_found; + } + + current_pid = _PyObject_CallNoArg(os_getpid); + if (current_pid == NULL) { + goto error; + } + same_pid = PyObject_RichCompareBool(current_pid, running_loop_pid, Py_EQ); + Py_DECREF(current_pid); + if (same_pid == -1) { + goto error; + } + + if (same_pid) { + // current_pid == running_loop_pid + goto found; + } + +not_found: + *loop = NULL; + return 0; + +found: + Py_INCREF(running_loop); + *loop = running_loop; + return 0; + +error: + *loop = NULL; + return -1; +} + + +static int +set_running_loop(PyObject *loop) +{ + PyObject *ts_dict; + PyObject *running_tuple; + PyObject *current_pid; + + ts_dict = PyThreadState_GetDict(); // borrowed + if (ts_dict == NULL) { + PyErr_SetString( + PyExc_RuntimeError, "thread-local storage is not available"); + return -1; + } + + current_pid = _PyObject_CallNoArg(os_getpid); + if (current_pid == NULL) { + return -1; + } + + running_tuple = PyTuple_New(2); + if (running_tuple == NULL) { + Py_DECREF(current_pid); + return -1; + } + + Py_INCREF(loop); + PyTuple_SET_ITEM(running_tuple, 0, loop); + PyTuple_SET_ITEM(running_tuple, 1, current_pid); // borrowed + + if (_PyDict_SetItemId( + ts_dict, &PyId___asyncio_running_event_loop__, running_tuple)) { + Py_DECREF(running_tuple); // will cleanup loop & current_pid + return -1; + } + Py_DECREF(running_tuple); + + return 0; +} + + +static PyObject * +get_event_loop(void) +{ + PyObject *loop; + PyObject *policy; + + if (get_running_loop(&loop)) { + return NULL; + } + if (loop != NULL) { + return loop; + } + + policy = _PyObject_CallNoArg(asyncio_get_event_loop_policy); + if (policy == NULL) { + return NULL; + } + + loop = _PyObject_CallMethodId(policy, &PyId_get_event_loop, NULL); + Py_DECREF(policy); + return loop; +} + + static int future_schedule_callbacks(FutureObj *fut) { @@ -140,7 +271,7 @@ future_init(FutureObj *fut, PyObject *loop) _Py_IDENTIFIER(get_debug); if (loop == Py_None) { - loop = _PyObject_CallNoArg(asyncio_get_event_loop); + loop = get_event_loop(); if (loop == NULL) { return -1; } @@ -1449,7 +1580,7 @@ _asyncio_Task_current_task_impl(PyTypeObject *type, PyObject *loop) PyObject *res; if (loop == Py_None) { - loop = _PyObject_CallNoArg(asyncio_get_event_loop); + loop = get_event_loop(); if (loop == NULL) { return NULL; } @@ -1536,7 +1667,7 @@ _asyncio_Task_all_tasks_impl(PyTypeObject *type, PyObject *loop) PyObject *res; if (loop == Py_None) { - loop = _PyObject_CallNoArg(asyncio_get_event_loop); + loop = get_event_loop(); if (loop == NULL) { return NULL; } @@ -2368,6 +2499,100 @@ task_wakeup(TaskObj *task, PyObject *o) } +/*********************** Functions **************************/ + + +/*[clinic input] +_asyncio._get_running_loop + +Return the running event loop or None. + +This is a low-level function intended to be used by event loops. +This function is thread-specific. + +[clinic start generated code]*/ + +static PyObject * +_asyncio__get_running_loop_impl(PyObject *module) +/*[clinic end generated code: output=b4390af721411a0a input=0a21627e25a4bd43]*/ +{ + PyObject *loop; + if (get_running_loop(&loop)) { + return NULL; + } + if (loop == NULL) { + /* There's no currently running event loop */ + Py_RETURN_NONE; + } + return loop; +} + +/*[clinic input] +_asyncio._set_running_loop + loop: 'O' + / + +Set the running event loop. + +This is a low-level function intended to be used by event loops. +This function is thread-specific. +[clinic start generated code]*/ + +static PyObject * +_asyncio__set_running_loop(PyObject *module, PyObject *loop) +/*[clinic end generated code: output=ae56bf7a28ca189a input=4c9720233d606604]*/ +{ + if (set_running_loop(loop)) { + return NULL; + } + Py_RETURN_NONE; +} + +/*[clinic input] +_asyncio.get_event_loop + +Return an asyncio event loop. + +When called from a coroutine or a callback (e.g. scheduled with +call_soon or similar API), this function will always return the +running event loop. + +If there is no running event loop set, the function will return +the result of `get_event_loop_policy().get_event_loop()` call. +[clinic start generated code]*/ + +static PyObject * +_asyncio_get_event_loop_impl(PyObject *module) +/*[clinic end generated code: output=2a2d8b2f824c648b input=9364bf2916c8655d]*/ +{ + return get_event_loop(); +} + +/*[clinic input] +_asyncio.get_running_loop + +Return the running event loop. Raise a RuntimeError if there is none. + +This function is thread-specific. +[clinic start generated code]*/ + +static PyObject * +_asyncio_get_running_loop_impl(PyObject *module) +/*[clinic end generated code: output=c247b5f9e529530e input=2a3bf02ba39f173d]*/ +{ + PyObject *loop; + if (get_running_loop(&loop)) { + return NULL; + } + if (loop == NULL) { + /* There's no currently running event loop */ + PyErr_SetString( + PyExc_RuntimeError, "no running event loop"); + } + return loop; +} + + /*********************** Module **************************/ @@ -2377,7 +2602,7 @@ module_free(void *m) Py_CLEAR(current_tasks); Py_CLEAR(all_tasks); Py_CLEAR(traceback_extract_stack); - Py_CLEAR(asyncio_get_event_loop); + Py_CLEAR(asyncio_get_event_loop_policy); Py_CLEAR(asyncio_future_repr_info_func); Py_CLEAR(asyncio_task_repr_info_func); Py_CLEAR(asyncio_task_get_stack_func); @@ -2385,6 +2610,7 @@ module_free(void *m) Py_CLEAR(asyncio_InvalidStateError); Py_CLEAR(asyncio_CancelledError); Py_CLEAR(inspect_isgenerator); + Py_CLEAR(os_getpid); } static int @@ -2407,7 +2633,7 @@ module_init(void) } WITH_MOD("asyncio.events") - GET_MOD_ATTR(asyncio_get_event_loop, "get_event_loop") + GET_MOD_ATTR(asyncio_get_event_loop_policy, "get_event_loop_policy") WITH_MOD("asyncio.base_futures") GET_MOD_ATTR(asyncio_future_repr_info_func, "_future_repr_info") @@ -2422,6 +2648,9 @@ module_init(void) WITH_MOD("inspect") GET_MOD_ATTR(inspect_isgenerator, "isgenerator") + WITH_MOD("os") + GET_MOD_ATTR(os_getpid, "getpid") + WITH_MOD("traceback") GET_MOD_ATTR(traceback_extract_stack, "extract_stack") @@ -2452,12 +2681,20 @@ fail: PyDoc_STRVAR(module_doc, "Accelerator module for asyncio"); +static PyMethodDef asyncio_methods[] = { + _ASYNCIO_GET_EVENT_LOOP_METHODDEF + _ASYNCIO_GET_RUNNING_LOOP_METHODDEF + _ASYNCIO__GET_RUNNING_LOOP_METHODDEF + _ASYNCIO__SET_RUNNING_LOOP_METHODDEF + {NULL, NULL} +}; + static struct PyModuleDef _asynciomodule = { PyModuleDef_HEAD_INIT, /* m_base */ "_asyncio", /* m_name */ module_doc, /* m_doc */ -1, /* m_size */ - NULL, /* m_methods */ + asyncio_methods, /* m_methods */ NULL, /* m_slots */ NULL, /* m_traverse */ NULL, /* m_clear */ diff --git a/Modules/clinic/_asynciomodule.c.h b/Modules/clinic/_asynciomodule.c.h index 7627849bb88..8022d1c6f50 100644 --- a/Modules/clinic/_asynciomodule.c.h +++ b/Modules/clinic/_asynciomodule.c.h @@ -517,4 +517,82 @@ _asyncio_Task__wakeup(TaskObj *self, PyObject **args, Py_ssize_t nargs, PyObject exit: return return_value; } -/*[clinic end generated code: output=b92f9cd2b9fb37ef input=a9049054013a1b77]*/ + +PyDoc_STRVAR(_asyncio__get_running_loop__doc__, +"_get_running_loop($module, /)\n" +"--\n" +"\n" +"Return the running event loop or None.\n" +"\n" +"This is a low-level function intended to be used by event loops.\n" +"This function is thread-specific."); + +#define _ASYNCIO__GET_RUNNING_LOOP_METHODDEF \ + {"_get_running_loop", (PyCFunction)_asyncio__get_running_loop, METH_NOARGS, _asyncio__get_running_loop__doc__}, + +static PyObject * +_asyncio__get_running_loop_impl(PyObject *module); + +static PyObject * +_asyncio__get_running_loop(PyObject *module, PyObject *Py_UNUSED(ignored)) +{ + return _asyncio__get_running_loop_impl(module); +} + +PyDoc_STRVAR(_asyncio__set_running_loop__doc__, +"_set_running_loop($module, loop, /)\n" +"--\n" +"\n" +"Set the running event loop.\n" +"\n" +"This is a low-level function intended to be used by event loops.\n" +"This function is thread-specific."); + +#define _ASYNCIO__SET_RUNNING_LOOP_METHODDEF \ + {"_set_running_loop", (PyCFunction)_asyncio__set_running_loop, METH_O, _asyncio__set_running_loop__doc__}, + +PyDoc_STRVAR(_asyncio_get_event_loop__doc__, +"get_event_loop($module, /)\n" +"--\n" +"\n" +"Return an asyncio event loop.\n" +"\n" +"When called from a coroutine or a callback (e.g. scheduled with\n" +"call_soon or similar API), this function will always return the\n" +"running event loop.\n" +"\n" +"If there is no running event loop set, the function will return\n" +"the result of `get_event_loop_policy().get_event_loop()` call."); + +#define _ASYNCIO_GET_EVENT_LOOP_METHODDEF \ + {"get_event_loop", (PyCFunction)_asyncio_get_event_loop, METH_NOARGS, _asyncio_get_event_loop__doc__}, + +static PyObject * +_asyncio_get_event_loop_impl(PyObject *module); + +static PyObject * +_asyncio_get_event_loop(PyObject *module, PyObject *Py_UNUSED(ignored)) +{ + return _asyncio_get_event_loop_impl(module); +} + +PyDoc_STRVAR(_asyncio_get_running_loop__doc__, +"get_running_loop($module, /)\n" +"--\n" +"\n" +"Return the running event loop. Raise a RuntimeError if there is none.\n" +"\n" +"This function is thread-specific."); + +#define _ASYNCIO_GET_RUNNING_LOOP_METHODDEF \ + {"get_running_loop", (PyCFunction)_asyncio_get_running_loop, METH_NOARGS, _asyncio_get_running_loop__doc__}, + +static PyObject * +_asyncio_get_running_loop_impl(PyObject *module); + +static PyObject * +_asyncio_get_running_loop(PyObject *module, PyObject *Py_UNUSED(ignored)) +{ + return _asyncio_get_running_loop_impl(module); +} +/*[clinic end generated code: output=d40b94e629571d48 input=a9049054013a1b77]*/