Issue #23812: Fix asyncio.Queue.get() to avoid loosing items on cancellation.
Patch by Gustavo J. A. M. Carneiro.
This commit is contained in:
commit
22506d24ee
|
@ -47,7 +47,7 @@ class Queue:
|
||||||
|
|
||||||
# Futures.
|
# Futures.
|
||||||
self._getters = collections.deque()
|
self._getters = collections.deque()
|
||||||
# Pairs of (item, Future).
|
# Futures
|
||||||
self._putters = collections.deque()
|
self._putters = collections.deque()
|
||||||
self._unfinished_tasks = 0
|
self._unfinished_tasks = 0
|
||||||
self._finished = locks.Event(loop=self._loop)
|
self._finished = locks.Event(loop=self._loop)
|
||||||
|
@ -98,7 +98,7 @@ class Queue:
|
||||||
|
|
||||||
def _consume_done_putters(self):
|
def _consume_done_putters(self):
|
||||||
# Delete waiters at the head of the put() queue who've timed out.
|
# Delete waiters at the head of the put() queue who've timed out.
|
||||||
while self._putters and self._putters[0][1].done():
|
while self._putters and self._putters[0].done():
|
||||||
self._putters.popleft()
|
self._putters.popleft()
|
||||||
|
|
||||||
def qsize(self):
|
def qsize(self):
|
||||||
|
@ -148,8 +148,9 @@ class Queue:
|
||||||
elif self._maxsize > 0 and self._maxsize <= self.qsize():
|
elif self._maxsize > 0 and self._maxsize <= self.qsize():
|
||||||
waiter = futures.Future(loop=self._loop)
|
waiter = futures.Future(loop=self._loop)
|
||||||
|
|
||||||
self._putters.append((item, waiter))
|
self._putters.append(waiter)
|
||||||
yield from waiter
|
yield from waiter
|
||||||
|
self._put(item)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
self.__put_internal(item)
|
self.__put_internal(item)
|
||||||
|
@ -186,8 +187,7 @@ class Queue:
|
||||||
self._consume_done_putters()
|
self._consume_done_putters()
|
||||||
if self._putters:
|
if self._putters:
|
||||||
assert self.full(), 'queue not full, why are putters waiting?'
|
assert self.full(), 'queue not full, why are putters waiting?'
|
||||||
item, putter = self._putters.popleft()
|
putter = self._putters.popleft()
|
||||||
self.__put_internal(item)
|
|
||||||
|
|
||||||
# When a getter runs and frees up a slot so this putter can
|
# When a getter runs and frees up a slot so this putter can
|
||||||
# run, we need to defer the put for a tick to ensure that
|
# run, we need to defer the put for a tick to ensure that
|
||||||
|
@ -201,9 +201,39 @@ class Queue:
|
||||||
return self._get()
|
return self._get()
|
||||||
else:
|
else:
|
||||||
waiter = futures.Future(loop=self._loop)
|
waiter = futures.Future(loop=self._loop)
|
||||||
|
|
||||||
self._getters.append(waiter)
|
self._getters.append(waiter)
|
||||||
|
try:
|
||||||
return (yield from waiter)
|
return (yield from waiter)
|
||||||
|
except futures.CancelledError:
|
||||||
|
# if we get CancelledError, it means someone cancelled this
|
||||||
|
# get() coroutine. But there is a chance that the waiter
|
||||||
|
# already is ready and contains an item that has just been
|
||||||
|
# removed from the queue. In this case, we need to put the item
|
||||||
|
# back into the front of the queue. This get() must either
|
||||||
|
# succeed without fault or, if it gets cancelled, it must be as
|
||||||
|
# if it never happened.
|
||||||
|
if waiter.done():
|
||||||
|
self._put_it_back(waiter.result())
|
||||||
|
raise
|
||||||
|
|
||||||
|
def _put_it_back(self, item):
|
||||||
|
"""
|
||||||
|
This is called when we have a waiter to get() an item and this waiter
|
||||||
|
gets cancelled. In this case, we put the item back: wake up another
|
||||||
|
waiter or put it in the _queue.
|
||||||
|
"""
|
||||||
|
self._consume_done_getters()
|
||||||
|
if self._getters:
|
||||||
|
assert not self._queue, (
|
||||||
|
'queue non-empty, why are getters waiting?')
|
||||||
|
|
||||||
|
getter = self._getters.popleft()
|
||||||
|
self._put_internal(item)
|
||||||
|
|
||||||
|
# getter cannot be cancelled, we just removed done getters
|
||||||
|
getter.set_result(item)
|
||||||
|
else:
|
||||||
|
self._queue.appendleft(item)
|
||||||
|
|
||||||
def get_nowait(self):
|
def get_nowait(self):
|
||||||
"""Remove and return an item from the queue.
|
"""Remove and return an item from the queue.
|
||||||
|
@ -213,8 +243,7 @@ class Queue:
|
||||||
self._consume_done_putters()
|
self._consume_done_putters()
|
||||||
if self._putters:
|
if self._putters:
|
||||||
assert self.full(), 'queue not full, why are putters waiting?'
|
assert self.full(), 'queue not full, why are putters waiting?'
|
||||||
item, putter = self._putters.popleft()
|
putter = self._putters.popleft()
|
||||||
self.__put_internal(item)
|
|
||||||
# Wake putter on next tick.
|
# Wake putter on next tick.
|
||||||
|
|
||||||
# getter cannot be cancelled, we just removed done putters
|
# getter cannot be cancelled, we just removed done putters
|
||||||
|
|
|
@ -171,7 +171,7 @@ class QueueGetTests(_QueueTestBase):
|
||||||
q.put_nowait(1)
|
q.put_nowait(1)
|
||||||
|
|
||||||
waiter = asyncio.Future(loop=self.loop)
|
waiter = asyncio.Future(loop=self.loop)
|
||||||
q._putters.append((2, waiter))
|
q._putters.append(waiter)
|
||||||
|
|
||||||
res = self.loop.run_until_complete(q.get())
|
res = self.loop.run_until_complete(q.get())
|
||||||
self.assertEqual(1, res)
|
self.assertEqual(1, res)
|
||||||
|
@ -322,6 +322,64 @@ class QueuePutTests(_QueueTestBase):
|
||||||
q.put_nowait(1)
|
q.put_nowait(1)
|
||||||
self.assertEqual(1, q.get_nowait())
|
self.assertEqual(1, q.get_nowait())
|
||||||
|
|
||||||
|
def test_get_cancel_drop(self):
|
||||||
|
def gen():
|
||||||
|
yield 0.01
|
||||||
|
yield 0.1
|
||||||
|
|
||||||
|
loop = self.new_test_loop(gen)
|
||||||
|
|
||||||
|
q = asyncio.Queue(loop=loop)
|
||||||
|
|
||||||
|
reader = loop.create_task(q.get())
|
||||||
|
|
||||||
|
loop.run_until_complete(asyncio.sleep(0.01, loop=loop))
|
||||||
|
|
||||||
|
q.put_nowait(1)
|
||||||
|
q.put_nowait(2)
|
||||||
|
reader.cancel()
|
||||||
|
|
||||||
|
try:
|
||||||
|
loop.run_until_complete(reader)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
# try again
|
||||||
|
reader = loop.create_task(q.get())
|
||||||
|
loop.run_until_complete(reader)
|
||||||
|
|
||||||
|
result = reader.result()
|
||||||
|
# if we get 2, it means 1 got dropped!
|
||||||
|
self.assertEqual(1, result)
|
||||||
|
|
||||||
|
def test_put_cancel_drop(self):
|
||||||
|
|
||||||
|
def gen():
|
||||||
|
yield 0.01
|
||||||
|
yield 0.1
|
||||||
|
|
||||||
|
loop = self.new_test_loop(gen)
|
||||||
|
q = asyncio.Queue(1, loop=loop)
|
||||||
|
|
||||||
|
q.put_nowait(1)
|
||||||
|
|
||||||
|
# putting a second item in the queue has to block (qsize=1)
|
||||||
|
writer = loop.create_task(q.put(2))
|
||||||
|
loop.run_until_complete(asyncio.sleep(0.01, loop=loop))
|
||||||
|
|
||||||
|
value1 = q.get_nowait()
|
||||||
|
self.assertEqual(value1, 1)
|
||||||
|
|
||||||
|
writer.cancel()
|
||||||
|
try:
|
||||||
|
loop.run_until_complete(writer)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
# try again
|
||||||
|
writer = loop.create_task(q.put(2))
|
||||||
|
loop.run_until_complete(writer)
|
||||||
|
|
||||||
|
value2 = q.get_nowait()
|
||||||
|
self.assertEqual(value2, 2)
|
||||||
|
self.assertEqual(q.qsize(), 0)
|
||||||
|
|
||||||
def test_nonblocking_put_exception(self):
|
def test_nonblocking_put_exception(self):
|
||||||
q = asyncio.Queue(maxsize=1, loop=self.loop)
|
q = asyncio.Queue(maxsize=1, loop=self.loop)
|
||||||
q.put_nowait(1)
|
q.put_nowait(1)
|
||||||
|
@ -374,6 +432,7 @@ class QueuePutTests(_QueueTestBase):
|
||||||
test_utils.run_briefly(self.loop)
|
test_utils.run_briefly(self.loop)
|
||||||
self.assertTrue(put_c.done())
|
self.assertTrue(put_c.done())
|
||||||
self.assertEqual(q.get_nowait(), 'a')
|
self.assertEqual(q.get_nowait(), 'a')
|
||||||
|
test_utils.run_briefly(self.loop)
|
||||||
self.assertEqual(q.get_nowait(), 'b')
|
self.assertEqual(q.get_nowait(), 'b')
|
||||||
|
|
||||||
self.loop.run_until_complete(put_b)
|
self.loop.run_until_complete(put_b)
|
||||||
|
|
|
@ -37,6 +37,9 @@ Library
|
||||||
|
|
||||||
- Issue #17527: Add PATCH to wsgiref.validator. Patch from Luca Sbardella.
|
- Issue #17527: Add PATCH to wsgiref.validator. Patch from Luca Sbardella.
|
||||||
|
|
||||||
|
- Issue #23812: Fix asyncio.Queue.get() to avoid loosing items on cancellation.
|
||||||
|
Patch by Gustavo J. A. M. Carneiro.
|
||||||
|
|
||||||
Documentation
|
Documentation
|
||||||
-------------
|
-------------
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue