diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index 512ea6037f8..2a8d3e76044 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -121,6 +121,13 @@ class Queue: await putter except: putter.cancel() # Just in case putter is not done yet. + try: + # Clean self._putters from canceled putters. + self._putters.remove(putter) + except ValueError: + # The putter could be removed from self._putters by a + # previous get_nowait call. + pass if not self.full() and not putter.cancelled(): # We were woken up by get_nowait(), but can't take # the call. Wake up the next in line. @@ -152,12 +159,13 @@ class Queue: await getter except: getter.cancel() # Just in case getter is not done yet. - try: + # Clean self._getters from canceled getters. self._getters.remove(getter) except ValueError: + # The getter could be removed from self._getters by a + # previous put_nowait call. pass - if not self.empty() and not getter.cancelled(): # We were woken up by put_nowait(), but can't take # the call. Wake up the next in line. diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 8d78546318a..efe719ed39a 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -520,6 +520,56 @@ class QueuePutTests(_QueueTestBase): self.loop.run_until_complete( asyncio.gather(getter(), t0, t1, t2, t3, loop=self.loop)) + def test_cancelled_puts_not_being_held_in_self_putters(self): + def a_generator(): + yield 0.01 + yield 0.1 + + loop = self.new_test_loop(a_generator) + + # Full queue. + queue = asyncio.Queue(loop=loop, maxsize=1) + queue.put_nowait(1) + + # Task waiting for space to put an item in the queue. + put_task = loop.create_task(queue.put(1)) + loop.run_until_complete(asyncio.sleep(0.01, loop=loop)) + + # Check that the putter is correctly removed from queue._putters when + # the task is canceled. + self.assertEqual(len(queue._putters), 1) + put_task.cancel() + with self.assertRaises(asyncio.CancelledError): + loop.run_until_complete(put_task) + self.assertEqual(len(queue._putters), 0) + + def test_cancelled_put_silence_value_error_exception(self): + def gen(): + yield 0.01 + yield 0.1 + + loop = self.new_test_loop(gen) + + # Full Queue. + queue = asyncio.Queue(1, loop=loop) + queue.put_nowait(1) + + # Task waiting for space to put a item in the queue. + put_task = loop.create_task(queue.put(1)) + loop.run_until_complete(asyncio.sleep(0.01, loop=loop)) + + # get_nowait() remove the future of put_task from queue._putters. + queue.get_nowait() + # When canceled, queue.put is going to remove its future from + # self._putters but it was removed previously by queue.get_nowait(). + put_task.cancel() + + # The ValueError exception triggered by queue._putters.remove(putter) + # inside queue.put should be silenced. + # If the ValueError is silenced we should catch a CancelledError. + with self.assertRaises(asyncio.CancelledError): + loop.run_until_complete(put_task) + class LifoQueueTests(_QueueTestBase): diff --git a/Misc/NEWS.d/next/Library/2018-01-16-20-37-28.bpo-32574.ru8eZ9.rst b/Misc/NEWS.d/next/Library/2018-01-16-20-37-28.bpo-32574.ru8eZ9.rst new file mode 100644 index 00000000000..00650d4d02e --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-01-16-20-37-28.bpo-32574.ru8eZ9.rst @@ -0,0 +1,3 @@ +Fix memory leak in asyncio.Queue, when the queue has limited size and it is +full, the cancelation of queue.put() can cause a memory leak. Patch by: José +Melero.