Issue #23812: Fix asyncio.Queue.get() to avoid loosing items on cancellation.
Patch by Gustavo J. A. M. Carneiro.
This commit is contained in:
parent
91e561aa77
commit
3fc0f2d288
|
@ -47,7 +47,7 @@ class Queue:
|
|||
|
||||
# Futures.
|
||||
self._getters = collections.deque()
|
||||
# Pairs of (item, Future).
|
||||
# Futures
|
||||
self._putters = collections.deque()
|
||||
self._unfinished_tasks = 0
|
||||
self._finished = locks.Event(loop=self._loop)
|
||||
|
@ -98,7 +98,7 @@ class Queue:
|
|||
|
||||
def _consume_done_putters(self):
|
||||
# Delete waiters at the head of the put() queue who've timed out.
|
||||
while self._putters and self._putters[0][1].done():
|
||||
while self._putters and self._putters[0].done():
|
||||
self._putters.popleft()
|
||||
|
||||
def qsize(self):
|
||||
|
@ -148,8 +148,9 @@ class Queue:
|
|||
elif self._maxsize > 0 and self._maxsize <= self.qsize():
|
||||
waiter = futures.Future(loop=self._loop)
|
||||
|
||||
self._putters.append((item, waiter))
|
||||
self._putters.append(waiter)
|
||||
yield from waiter
|
||||
self._put(item)
|
||||
|
||||
else:
|
||||
self.__put_internal(item)
|
||||
|
@ -186,8 +187,7 @@ class Queue:
|
|||
self._consume_done_putters()
|
||||
if self._putters:
|
||||
assert self.full(), 'queue not full, why are putters waiting?'
|
||||
item, putter = self._putters.popleft()
|
||||
self.__put_internal(item)
|
||||
putter = self._putters.popleft()
|
||||
|
||||
# When a getter runs and frees up a slot so this putter can
|
||||
# run, we need to defer the put for a tick to ensure that
|
||||
|
@ -201,9 +201,39 @@ class Queue:
|
|||
return self._get()
|
||||
else:
|
||||
waiter = futures.Future(loop=self._loop)
|
||||
|
||||
self._getters.append(waiter)
|
||||
try:
|
||||
return (yield from waiter)
|
||||
except futures.CancelledError:
|
||||
# if we get CancelledError, it means someone cancelled this
|
||||
# get() coroutine. But there is a chance that the waiter
|
||||
# already is ready and contains an item that has just been
|
||||
# removed from the queue. In this case, we need to put the item
|
||||
# back into the front of the queue. This get() must either
|
||||
# succeed without fault or, if it gets cancelled, it must be as
|
||||
# if it never happened.
|
||||
if waiter.done():
|
||||
self._put_it_back(waiter.result())
|
||||
raise
|
||||
|
||||
def _put_it_back(self, item):
|
||||
"""
|
||||
This is called when we have a waiter to get() an item and this waiter
|
||||
gets cancelled. In this case, we put the item back: wake up another
|
||||
waiter or put it in the _queue.
|
||||
"""
|
||||
self._consume_done_getters()
|
||||
if self._getters:
|
||||
assert not self._queue, (
|
||||
'queue non-empty, why are getters waiting?')
|
||||
|
||||
getter = self._getters.popleft()
|
||||
self._put_internal(item)
|
||||
|
||||
# getter cannot be cancelled, we just removed done getters
|
||||
getter.set_result(item)
|
||||
else:
|
||||
self._queue.appendleft(item)
|
||||
|
||||
def get_nowait(self):
|
||||
"""Remove and return an item from the queue.
|
||||
|
@ -213,8 +243,7 @@ class Queue:
|
|||
self._consume_done_putters()
|
||||
if self._putters:
|
||||
assert self.full(), 'queue not full, why are putters waiting?'
|
||||
item, putter = self._putters.popleft()
|
||||
self.__put_internal(item)
|
||||
putter = self._putters.popleft()
|
||||
# Wake putter on next tick.
|
||||
|
||||
# getter cannot be cancelled, we just removed done putters
|
||||
|
|
|
@ -171,7 +171,7 @@ class QueueGetTests(_QueueTestBase):
|
|||
q.put_nowait(1)
|
||||
|
||||
waiter = asyncio.Future(loop=self.loop)
|
||||
q._putters.append((2, waiter))
|
||||
q._putters.append(waiter)
|
||||
|
||||
res = self.loop.run_until_complete(q.get())
|
||||
self.assertEqual(1, res)
|
||||
|
@ -322,6 +322,64 @@ class QueuePutTests(_QueueTestBase):
|
|||
q.put_nowait(1)
|
||||
self.assertEqual(1, q.get_nowait())
|
||||
|
||||
def test_get_cancel_drop(self):
|
||||
def gen():
|
||||
yield 0.01
|
||||
yield 0.1
|
||||
|
||||
loop = self.new_test_loop(gen)
|
||||
|
||||
q = asyncio.Queue(loop=loop)
|
||||
|
||||
reader = loop.create_task(q.get())
|
||||
|
||||
loop.run_until_complete(asyncio.sleep(0.01, loop=loop))
|
||||
|
||||
q.put_nowait(1)
|
||||
q.put_nowait(2)
|
||||
reader.cancel()
|
||||
|
||||
try:
|
||||
loop.run_until_complete(reader)
|
||||
except asyncio.CancelledError:
|
||||
# try again
|
||||
reader = loop.create_task(q.get())
|
||||
loop.run_until_complete(reader)
|
||||
|
||||
result = reader.result()
|
||||
# if we get 2, it means 1 got dropped!
|
||||
self.assertEqual(1, result)
|
||||
|
||||
def test_put_cancel_drop(self):
|
||||
|
||||
def gen():
|
||||
yield 0.01
|
||||
yield 0.1
|
||||
|
||||
loop = self.new_test_loop(gen)
|
||||
q = asyncio.Queue(1, loop=loop)
|
||||
|
||||
q.put_nowait(1)
|
||||
|
||||
# putting a second item in the queue has to block (qsize=1)
|
||||
writer = loop.create_task(q.put(2))
|
||||
loop.run_until_complete(asyncio.sleep(0.01, loop=loop))
|
||||
|
||||
value1 = q.get_nowait()
|
||||
self.assertEqual(value1, 1)
|
||||
|
||||
writer.cancel()
|
||||
try:
|
||||
loop.run_until_complete(writer)
|
||||
except asyncio.CancelledError:
|
||||
# try again
|
||||
writer = loop.create_task(q.put(2))
|
||||
loop.run_until_complete(writer)
|
||||
|
||||
value2 = q.get_nowait()
|
||||
self.assertEqual(value2, 2)
|
||||
self.assertEqual(q.qsize(), 0)
|
||||
|
||||
def test_nonblocking_put_exception(self):
|
||||
q = asyncio.Queue(maxsize=1, loop=self.loop)
|
||||
q.put_nowait(1)
|
||||
|
@ -374,6 +432,7 @@ class QueuePutTests(_QueueTestBase):
|
|||
test_utils.run_briefly(self.loop)
|
||||
self.assertTrue(put_c.done())
|
||||
self.assertEqual(q.get_nowait(), 'a')
|
||||
test_utils.run_briefly(self.loop)
|
||||
self.assertEqual(q.get_nowait(), 'b')
|
||||
|
||||
self.loop.run_until_complete(put_b)
|
||||
|
|
Loading…
Reference in New Issue