mirror of https://github.com/python/cpython
gh-94440: Fix issue of ProcessPoolExecutor shutdown hanging (#94468)
Fix an issue of concurrent.futures ProcessPoolExecutor shutdown hanging. Co-authored-by: Alex Waygood <Alex.Waygood@Gmail.com>
This commit is contained in:
parent
a44553ea9f
commit
2dc94634b5
|
@ -366,6 +366,11 @@ class _ExecutorManagerThread(threading.Thread):
|
||||||
if self.is_shutting_down():
|
if self.is_shutting_down():
|
||||||
self.flag_executor_shutting_down()
|
self.flag_executor_shutting_down()
|
||||||
|
|
||||||
|
# When only canceled futures remain in pending_work_items, our
|
||||||
|
# next call to wait_result_broken_or_wakeup would hang forever.
|
||||||
|
# This makes sure we have some running futures or none at all.
|
||||||
|
self.add_call_item_to_queue()
|
||||||
|
|
||||||
# Since no new work items can be added, it is safe to shutdown
|
# Since no new work items can be added, it is safe to shutdown
|
||||||
# this thread if there are no pending work items.
|
# this thread if there are no pending work items.
|
||||||
if not self.pending_work_items:
|
if not self.pending_work_items:
|
||||||
|
|
|
@ -14,6 +14,7 @@ import logging
|
||||||
from logging.handlers import QueueHandler
|
from logging.handlers import QueueHandler
|
||||||
import os
|
import os
|
||||||
import queue
|
import queue
|
||||||
|
import signal
|
||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
@ -397,6 +398,33 @@ class ExecutorShutdownTest:
|
||||||
self.assertFalse(err)
|
self.assertFalse(err)
|
||||||
self.assertEqual(out.strip(), b"apple")
|
self.assertEqual(out.strip(), b"apple")
|
||||||
|
|
||||||
|
def test_hang_gh94440(self):
|
||||||
|
"""shutdown(wait=True) doesn't hang when a future was submitted and
|
||||||
|
quickly canceled right before shutdown.
|
||||||
|
|
||||||
|
See https://github.com/python/cpython/issues/94440.
|
||||||
|
"""
|
||||||
|
if not hasattr(signal, 'alarm'):
|
||||||
|
raise unittest.SkipTest(
|
||||||
|
"Tested platform does not support the alarm signal")
|
||||||
|
|
||||||
|
def timeout(_signum, _frame):
|
||||||
|
raise RuntimeError("timed out waiting for shutdown")
|
||||||
|
|
||||||
|
kwargs = {}
|
||||||
|
if getattr(self, 'ctx', None):
|
||||||
|
kwargs['mp_context'] = self.get_context()
|
||||||
|
executor = self.executor_type(max_workers=1, **kwargs)
|
||||||
|
executor.submit(int).result()
|
||||||
|
old_handler = signal.signal(signal.SIGALRM, timeout)
|
||||||
|
try:
|
||||||
|
signal.alarm(5)
|
||||||
|
executor.submit(int).cancel()
|
||||||
|
executor.shutdown(wait=True)
|
||||||
|
finally:
|
||||||
|
signal.alarm(0)
|
||||||
|
signal.signal(signal.SIGALRM, old_handler)
|
||||||
|
|
||||||
|
|
||||||
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
|
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
|
||||||
def test_threads_terminate(self):
|
def test_threads_terminate(self):
|
||||||
|
|
|
@ -1385,6 +1385,7 @@ Thomas Perl
|
||||||
Mathieu Perreault
|
Mathieu Perreault
|
||||||
Mark Perrego
|
Mark Perrego
|
||||||
Trevor Perrin
|
Trevor Perrin
|
||||||
|
Yonatan Perry
|
||||||
Gabriel de Perthuis
|
Gabriel de Perthuis
|
||||||
Tim Peters
|
Tim Peters
|
||||||
Benjamin Peterson
|
Benjamin Peterson
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
Fix a :mod:`concurrent.futures.process` bug where ``ProcessPoolExecutor`` shutdown
|
||||||
|
could hang after a future has been quickly submitted and canceled.
|
Loading…
Reference in New Issue