Fix a c.f.as_completed() refleak previously introduced in bpo-27144 (#3270)
This commit is contained in:
parent
bca4939d80
commit
2ef37607b7
|
@ -171,15 +171,24 @@ def _create_and_install_waiters(fs, return_when):
|
||||||
return waiter
|
return waiter
|
||||||
|
|
||||||
|
|
||||||
def _yield_and_decref(fs, ref_collect):
|
def _yield_finished_futures(fs, waiter, ref_collect):
|
||||||
"""
|
"""
|
||||||
Iterate on the list *fs*, yielding objects one by one in reverse order.
|
Iterate on the list *fs*, yielding finished futures one by one in
|
||||||
Before yielding an object, it is removed from each set in
|
reverse order.
|
||||||
the collection of sets *ref_collect*.
|
Before yielding a future, *waiter* is removed from its waiters
|
||||||
|
and the future is removed from each set in the collection of sets
|
||||||
|
*ref_collect*.
|
||||||
|
|
||||||
|
The aim of this function is to avoid keeping stale references after
|
||||||
|
the future is yielded and before the iterator resumes.
|
||||||
"""
|
"""
|
||||||
while fs:
|
while fs:
|
||||||
|
f = fs[-1]
|
||||||
for futures_set in ref_collect:
|
for futures_set in ref_collect:
|
||||||
futures_set.remove(fs[-1])
|
futures_set.remove(f)
|
||||||
|
with f._condition:
|
||||||
|
f._waiters.remove(waiter)
|
||||||
|
del f
|
||||||
# Careful not to keep a reference to the popped value
|
# Careful not to keep a reference to the popped value
|
||||||
yield fs.pop()
|
yield fs.pop()
|
||||||
|
|
||||||
|
@ -216,7 +225,8 @@ def as_completed(fs, timeout=None):
|
||||||
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
|
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
|
||||||
finished = list(finished)
|
finished = list(finished)
|
||||||
try:
|
try:
|
||||||
yield from _yield_and_decref(finished, ref_collect=(fs,))
|
yield from _yield_finished_futures(finished, waiter,
|
||||||
|
ref_collect=(fs,))
|
||||||
|
|
||||||
while pending:
|
while pending:
|
||||||
if timeout is None:
|
if timeout is None:
|
||||||
|
@ -237,9 +247,11 @@ def as_completed(fs, timeout=None):
|
||||||
|
|
||||||
# reverse to keep finishing order
|
# reverse to keep finishing order
|
||||||
finished.reverse()
|
finished.reverse()
|
||||||
yield from _yield_and_decref(finished, ref_collect=(fs, pending))
|
yield from _yield_finished_futures(finished, waiter,
|
||||||
|
ref_collect=(fs, pending))
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
# Remove waiter from unfinished futures
|
||||||
for f in fs:
|
for f in fs:
|
||||||
with f._condition:
|
with f._condition:
|
||||||
f._waiters.remove(waiter)
|
f._waiters.remove(waiter)
|
||||||
|
|
|
@ -405,7 +405,7 @@ class AsCompletedTests:
|
||||||
# to finished futures.
|
# to finished futures.
|
||||||
futures_list = [Future() for _ in range(8)]
|
futures_list = [Future() for _ in range(8)]
|
||||||
futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
|
futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
|
||||||
futures_list.append(create_future(state=SUCCESSFUL_FUTURE))
|
futures_list.append(create_future(state=FINISHED, result=42))
|
||||||
|
|
||||||
with self.assertRaises(futures.TimeoutError):
|
with self.assertRaises(futures.TimeoutError):
|
||||||
for future in futures.as_completed(futures_list, timeout=0):
|
for future in futures.as_completed(futures_list, timeout=0):
|
||||||
|
|
Loading…
Reference in New Issue