bpo-27144: concurrent.futures as_complete and map iterators do not keep reference to returned object (#1560)

* bpo-27144: concurrent.futures as_complie and map iterators do not keep
reference to returned object

* Some nits.  Improve wordings in docstrings and comments, and avoid relying on
sys.getrefcount() in tests.
This commit is contained in:
Grzegorz Grzywacz 2017-09-01 18:54:00 +02:00 committed by Antoine Pitrou
parent 16432beadb
commit 97e1b1c814
4 changed files with 91 additions and 10 deletions

View File

@ -170,6 +170,20 @@ def _create_and_install_waiters(fs, return_when):
return waiter return waiter
def _yield_and_decref(fs, ref_collect):
"""
Iterate on the list *fs*, yielding objects one by one in reverse order.
Before yielding an object, it is removed from each set in
the collection of sets *ref_collect*.
"""
while fs:
for futures_set in ref_collect:
futures_set.remove(fs[-1])
# Careful not to keep a reference to the popped value
yield fs.pop()
def as_completed(fs, timeout=None): def as_completed(fs, timeout=None):
"""An iterator over the given futures that yields each as it completes. """An iterator over the given futures that yields each as it completes.
@ -191,6 +205,8 @@ def as_completed(fs, timeout=None):
if timeout is not None: if timeout is not None:
end_time = timeout + time.time() end_time = timeout + time.time()
total_futures = len(fs)
fs = set(fs) fs = set(fs)
with _AcquireFutures(fs): with _AcquireFutures(fs):
finished = set( finished = set(
@ -198,9 +214,9 @@ def as_completed(fs, timeout=None):
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = fs - finished pending = fs - finished
waiter = _create_and_install_waiters(fs, _AS_COMPLETED) waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
finished = list(finished)
try: try:
yield from finished yield from _yield_and_decref(finished, ref_collect=(fs,))
while pending: while pending:
if timeout is None: if timeout is None:
@ -210,7 +226,7 @@ def as_completed(fs, timeout=None):
if wait_timeout < 0: if wait_timeout < 0:
raise TimeoutError( raise TimeoutError(
'%d (of %d) futures unfinished' % ( '%d (of %d) futures unfinished' % (
len(pending), len(fs))) len(pending), total_futures))
waiter.event.wait(wait_timeout) waiter.event.wait(wait_timeout)
@ -219,9 +235,9 @@ def as_completed(fs, timeout=None):
waiter.finished_futures = [] waiter.finished_futures = []
waiter.event.clear() waiter.event.clear()
for future in finished: # reverse to keep finishing order
yield future finished.reverse()
pending.remove(future) yield from _yield_and_decref(finished, ref_collect=(fs, pending))
finally: finally:
for f in fs: for f in fs:
@ -551,11 +567,14 @@ class Executor(object):
# before the first iterator value is required. # before the first iterator value is required.
def result_iterator(): def result_iterator():
try: try:
for future in fs: # reverse to keep finishing order
fs.reverse()
while fs:
# Careful not to keep a reference to the popped future
if timeout is None: if timeout is None:
yield future.result() yield fs.pop().result()
else: else:
yield future.result(end_time - time.time()) yield fs.pop().result(end_time - time.time())
finally: finally:
for future in fs: for future in fs:
future.cancel() future.cancel()

View File

@ -357,6 +357,18 @@ def _check_system_limits():
raise NotImplementedError(_system_limited) raise NotImplementedError(_system_limited)
def _chain_from_iterable_of_lists(iterable):
"""
Specialized implementation of itertools.chain.from_iterable.
Each item in *iterable* should be a list. This function is
careful not to keep references to yielded objects.
"""
for element in iterable:
element.reverse()
while element:
yield element.pop()
class BrokenProcessPool(RuntimeError): class BrokenProcessPool(RuntimeError):
""" """
Raised when a process in a ProcessPoolExecutor terminated abruptly Raised when a process in a ProcessPoolExecutor terminated abruptly
@ -482,7 +494,7 @@ class ProcessPoolExecutor(_base.Executor):
results = super().map(partial(_process_chunk, fn), results = super().map(partial(_process_chunk, fn),
_get_chunks(*iterables, chunksize=chunksize), _get_chunks(*iterables, chunksize=chunksize),
timeout=timeout) timeout=timeout)
return itertools.chain.from_iterable(results) return _chain_from_iterable_of_lists(results)
def shutdown(self, wait=True): def shutdown(self, wait=True):
with self._shutdown_lock: with self._shutdown_lock:

View File

@ -59,6 +59,10 @@ class MyObject(object):
pass pass
def make_dummy_object(_):
return MyObject()
class BaseTestCase(unittest.TestCase): class BaseTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
self._thread_key = test.support.threading_setup() self._thread_key = test.support.threading_setup()
@ -396,6 +400,38 @@ class AsCompletedTests:
completed = [f for f in futures.as_completed([future1,future1])] completed = [f for f in futures.as_completed([future1,future1])]
self.assertEqual(len(completed), 1) self.assertEqual(len(completed), 1)
def test_free_reference_yielded_future(self):
# Issue #14406: Generator should not keep references
# to finished futures.
futures_list = [Future() for _ in range(8)]
futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
futures_list.append(create_future(state=SUCCESSFUL_FUTURE))
with self.assertRaises(futures.TimeoutError):
for future in futures.as_completed(futures_list, timeout=0):
futures_list.remove(future)
wr = weakref.ref(future)
del future
self.assertIsNone(wr())
futures_list[0].set_result("test")
for future in futures.as_completed(futures_list):
futures_list.remove(future)
wr = weakref.ref(future)
del future
self.assertIsNone(wr())
if futures_list:
futures_list[0].set_result("test")
def test_correct_timeout_exception_msg(self):
futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
RUNNING_FUTURE, SUCCESSFUL_FUTURE]
with self.assertRaises(futures.TimeoutError) as cm:
list(futures.as_completed(futures_list, timeout=0))
self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')
class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, BaseTestCase): class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, BaseTestCase):
pass pass
@ -421,6 +457,10 @@ class ExecutorTest:
list(self.executor.map(pow, range(10), range(10))), list(self.executor.map(pow, range(10), range(10))),
list(map(pow, range(10), range(10)))) list(map(pow, range(10), range(10))))
self.assertEqual(
list(self.executor.map(pow, range(10), range(10), chunksize=3)),
list(map(pow, range(10), range(10))))
def test_map_exception(self): def test_map_exception(self):
i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
self.assertEqual(i.__next__(), (0, 1)) self.assertEqual(i.__next__(), (0, 1))
@ -471,6 +511,14 @@ class ExecutorTest:
"than 0"): "than 0"):
self.executor_type(max_workers=number) self.executor_type(max_workers=number)
def test_free_reference(self):
# Issue #14406: Result iterator should not keep an internal
# reference to result objects.
for obj in self.executor.map(make_dummy_object, range(10)):
wr = weakref.ref(obj)
del obj
self.assertIsNone(wr())
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase): class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
def test_map_submits_without_iteration(self): def test_map_submits_without_iteration(self):

View File

@ -0,0 +1,2 @@
The ``map()`` and ``as_completed()`` iterators in ``concurrent.futures``
now avoid keeping a reference to yielded objects.