mirror of https://github.com/python/cpython
153 lines
4.7 KiB
Python
153 lines
4.7 KiB
Python
import contextlib
|
|
import logging
|
|
import queue
|
|
import time
|
|
import unittest
|
|
import sys
|
|
import io
|
|
from concurrent.futures._base import BrokenExecutor
|
|
from concurrent.futures.process import _check_system_limits
|
|
|
|
from logging.handlers import QueueHandler
|
|
|
|
from test import support
|
|
|
|
from .util import ExecutorMixin, create_executor_tests, setup_module
|
|
|
|
|
|
INITIALIZER_STATUS = 'uninitialized'
|
|
|
|
def init(x):
|
|
global INITIALIZER_STATUS
|
|
INITIALIZER_STATUS = x
|
|
|
|
def get_init_status():
|
|
return INITIALIZER_STATUS
|
|
|
|
def init_fail(log_queue=None):
|
|
if log_queue is not None:
|
|
logger = logging.getLogger('concurrent.futures')
|
|
logger.addHandler(QueueHandler(log_queue))
|
|
logger.setLevel('CRITICAL')
|
|
logger.propagate = False
|
|
time.sleep(0.1) # let some futures be scheduled
|
|
raise ValueError('error in initializer')
|
|
|
|
|
|
class InitializerMixin(ExecutorMixin):
|
|
worker_count = 2
|
|
|
|
def setUp(self):
|
|
global INITIALIZER_STATUS
|
|
INITIALIZER_STATUS = 'uninitialized'
|
|
self.executor_kwargs = dict(initializer=init,
|
|
initargs=('initialized',))
|
|
super().setUp()
|
|
|
|
def test_initializer(self):
|
|
futures = [self.executor.submit(get_init_status)
|
|
for _ in range(self.worker_count)]
|
|
|
|
for f in futures:
|
|
self.assertEqual(f.result(), 'initialized')
|
|
|
|
|
|
class FailingInitializerMixin(ExecutorMixin):
|
|
worker_count = 2
|
|
|
|
def setUp(self):
|
|
if hasattr(self, "ctx"):
|
|
# Pass a queue to redirect the child's logging output
|
|
self.mp_context = self.get_context()
|
|
self.log_queue = self.mp_context.Queue()
|
|
self.executor_kwargs = dict(initializer=init_fail,
|
|
initargs=(self.log_queue,))
|
|
else:
|
|
# In a thread pool, the child shares our logging setup
|
|
# (see _assert_logged())
|
|
self.mp_context = None
|
|
self.log_queue = None
|
|
self.executor_kwargs = dict(initializer=init_fail)
|
|
super().setUp()
|
|
|
|
def test_initializer(self):
|
|
with self._assert_logged('ValueError: error in initializer'):
|
|
try:
|
|
future = self.executor.submit(get_init_status)
|
|
except BrokenExecutor:
|
|
# Perhaps the executor is already broken
|
|
pass
|
|
else:
|
|
with self.assertRaises(BrokenExecutor):
|
|
future.result()
|
|
|
|
# At some point, the executor should break
|
|
for _ in support.sleeping_retry(support.SHORT_TIMEOUT,
|
|
"executor not broken"):
|
|
if self.executor._broken:
|
|
break
|
|
|
|
# ... and from this point submit() is guaranteed to fail
|
|
with self.assertRaises(BrokenExecutor):
|
|
self.executor.submit(get_init_status)
|
|
|
|
@contextlib.contextmanager
|
|
def _assert_logged(self, msg):
|
|
if self.log_queue is not None:
|
|
yield
|
|
output = []
|
|
try:
|
|
while True:
|
|
output.append(self.log_queue.get_nowait().getMessage())
|
|
except queue.Empty:
|
|
pass
|
|
else:
|
|
with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
|
|
yield
|
|
output = cm.output
|
|
self.assertTrue(any(msg in line for line in output),
|
|
output)
|
|
|
|
|
|
create_executor_tests(globals(), InitializerMixin)
|
|
create_executor_tests(globals(), FailingInitializerMixin)
|
|
|
|
|
|
@unittest.skipIf(sys.platform == "win32", "Resource Tracker doesn't run on Windows")
|
|
class FailingInitializerResourcesTest(unittest.TestCase):
|
|
"""
|
|
Source: https://github.com/python/cpython/issues/104090
|
|
"""
|
|
|
|
def _test(self, test_class):
|
|
try:
|
|
_check_system_limits()
|
|
except NotImplementedError:
|
|
self.skipTest("ProcessPoolExecutor unavailable on this system")
|
|
|
|
runner = unittest.TextTestRunner(stream=io.StringIO())
|
|
runner.run(test_class('test_initializer'))
|
|
|
|
# GH-104090:
|
|
# Stop resource tracker manually now, so we can verify there are not leaked resources by checking
|
|
# the process exit code
|
|
from multiprocessing.resource_tracker import _resource_tracker
|
|
_resource_tracker._stop()
|
|
|
|
self.assertEqual(_resource_tracker._exitcode, 0)
|
|
|
|
def test_spawn(self):
|
|
self._test(ProcessPoolSpawnFailingInitializerTest)
|
|
|
|
@support.skip_if_sanitizer("TSAN doesn't support threads after fork", thread=True)
|
|
def test_forkserver(self):
|
|
self._test(ProcessPoolForkserverFailingInitializerTest)
|
|
|
|
|
|
def setUpModule():
|
|
setup_module()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|