import multiprocessing import sys import time import unittest from concurrent import futures from concurrent.futures._base import ( PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, ) from concurrent.futures.process import _check_system_limits from test import support from test.support import threading_helper def create_future(state=PENDING, exception=None, result=None): f = Future() f._state = state f._exception = exception f._result = result return f PENDING_FUTURE = create_future(state=PENDING) RUNNING_FUTURE = create_future(state=RUNNING) CANCELLED_FUTURE = create_future(state=CANCELLED) CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError()) SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) class BaseTestCase(unittest.TestCase): def setUp(self): self._thread_key = threading_helper.threading_setup() def tearDown(self): support.reap_children() threading_helper.threading_cleanup(*self._thread_key) class ExecutorMixin: worker_count = 5 executor_kwargs = {} def setUp(self): super().setUp() self.t1 = time.monotonic() if hasattr(self, "ctx"): self.executor = self.executor_type( max_workers=self.worker_count, mp_context=self.get_context(), **self.executor_kwargs) else: self.executor = self.executor_type( max_workers=self.worker_count, **self.executor_kwargs) def tearDown(self): self.executor.shutdown(wait=True) self.executor = None dt = time.monotonic() - self.t1 if support.verbose: print("%.2fs" % dt, end=' ') self.assertLess(dt, 300, "synchronization issue: test lasted too long") super().tearDown() def get_context(self): return multiprocessing.get_context(self.ctx) class ThreadPoolMixin(ExecutorMixin): executor_type = futures.ThreadPoolExecutor class InterpreterPoolMixin(ExecutorMixin): executor_type = futures.InterpreterPoolExecutor class ProcessPoolForkMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor ctx = "fork" def get_context(self): try: _check_system_limits() except NotImplementedError: self.skipTest("ProcessPoolExecutor unavailable on this system") if sys.platform == "win32": self.skipTest("require unix system") if support.check_sanitizer(thread=True): self.skipTest("TSAN doesn't support threads after fork") return super().get_context() class ProcessPoolSpawnMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor ctx = "spawn" def get_context(self): try: _check_system_limits() except NotImplementedError: self.skipTest("ProcessPoolExecutor unavailable on this system") return super().get_context() class ProcessPoolForkserverMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor ctx = "forkserver" def get_context(self): try: _check_system_limits() except NotImplementedError: self.skipTest("ProcessPoolExecutor unavailable on this system") if sys.platform == "win32": self.skipTest("require unix system") if support.check_sanitizer(thread=True): self.skipTest("TSAN doesn't support threads after fork") return super().get_context() def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,), executor_mixins=(ThreadPoolMixin, InterpreterPoolMixin, ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)): def strip_mixin(name): if name.endswith(('Mixin', 'Tests')): return name[:-5] elif name.endswith('Test'): return name[:-4] else: return name module = remote_globals['__name__'] for exe in executor_mixins: name = ("%s%sTest" % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__))) cls = type(name, (mixin,) + (exe,) + bases, {'__module__': module}) remote_globals[name] = cls def setup_module(): try: _check_system_limits() except NotImplementedError: pass else: unittest.addModuleCleanup(multiprocessing.util._cleanup_tests) thread_info = threading_helper.threading_setup() unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)