Issue #10798: Reject supporting concurrent.futures if the system has

too few POSIX semaphores.
This commit is contained in:
Martin v. Löwis 2011-01-03 00:07:01 +00:00
parent e10608cf5d
commit 9f6d48ba4e
3 changed files with 87 additions and 72 deletions

View File

@ -244,6 +244,31 @@ def _queue_manangement_worker(executor_reference,
else: else:
work_item.future.set_result(result_item.result) work_item.future.set_result(result_item.result)
_system_limits_checked = False
_system_limited = None
def _check_system_limits():
global _system_limits_checked, _system_limited
if _system_limits_checked:
if _system_limited:
raise NotImplementedError(_system_limited)
_system_limits_checked = True
try:
import os
nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
except (AttributeError, ValueError):
# sysconf not available or setting not available
return
if nsems_max == -1:
# indetermine limit, assume that limit is determined
# by available memory only
return
if nsems_max >= 256:
# minimum number of semaphores available
# according to POSIX
return
_system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
raise NotImplementedError(_system_limited)
class ProcessPoolExecutor(_base.Executor): class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None): def __init__(self, max_workers=None):
"""Initializes a new ProcessPoolExecutor instance. """Initializes a new ProcessPoolExecutor instance.
@ -253,6 +278,7 @@ class ProcessPoolExecutor(_base.Executor):
execute the given calls. If None or not given then as many execute the given calls. If None or not given then as many
worker processes will be created as the machine has processors. worker processes will be created as the machine has processors.
""" """
_check_system_limits()
_remove_dead_thread_references() _remove_dead_thread_references()
if max_workers is None: if max_workers is None:

View File

@ -69,7 +69,7 @@ class Call(object):
assert handle is not None assert handle is not None
return handle return handle
else: else:
event = multiprocessing.Event() event = self.Event[0]()
self.CALL_LOCKS[id(event)] = event self.CALL_LOCKS[id(event)] = event
return id(event) return id(event)
@ -99,7 +99,8 @@ class Call(object):
else: else:
self.CALL_LOCKS[handle].set() self.CALL_LOCKS[handle].set()
def __init__(self, manual_finish=False, result=42): def __init__(self, Event, manual_finish=False, result=42):
self.Event = Event
self._called_event = self._create_event() self._called_event = self._create_event()
self._can_finish = self._create_event() self._can_finish = self._create_event()
@ -138,8 +139,8 @@ class ExceptionCall(Call):
raise ZeroDivisionError() raise ZeroDivisionError()
class MapCall(Call): class MapCall(Call):
def __init__(self, result=42): def __init__(self, Event, result=42):
super().__init__(manual_finish=True, result=result) super().__init__(Event, manual_finish=True, result=result)
def __call__(self, manual_finish): def __call__(self, manual_finish):
if manual_finish: if manual_finish:
@ -155,9 +156,9 @@ class ExecutorShutdownTest(unittest.TestCase):
def _start_some_futures(self): def _start_some_futures(self):
call1 = Call(manual_finish=True) call1 = Call(self.Event, manual_finish=True)
call2 = Call(manual_finish=True) call2 = Call(self.Event, manual_finish=True)
call3 = Call(manual_finish=True) call3 = Call(self.Event, manual_finish=True)
try: try:
self.executor.submit(call1) self.executor.submit(call1)
@ -176,13 +177,28 @@ class ExecutorShutdownTest(unittest.TestCase):
call2.close() call2.close()
call3.close() call3.close()
class ThreadPoolShutdownTest(ExecutorShutdownTest): class ThreadPoolMixin:
# wrap in tuple to prevent creation of instance methods
Event = (threading.Event,)
def setUp(self): def setUp(self):
self.executor = futures.ThreadPoolExecutor(max_workers=5) self.executor = futures.ThreadPoolExecutor(max_workers=5)
def tearDown(self): def tearDown(self):
self.executor.shutdown(wait=True) self.executor.shutdown(wait=True)
class ProcessPoolMixin:
# wrap in tuple to prevent creation of instance methods
Event = (multiprocessing.Event,)
def setUp(self):
try:
self.executor = futures.ProcessPoolExecutor(max_workers=5)
except NotImplementedError as e:
self.skipTest(str(e))
def tearDown(self):
self.executor.shutdown(wait=True)
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
def test_threads_terminate(self): def test_threads_terminate(self):
self._start_some_futures() self._start_some_futures()
self.assertEqual(len(self.executor._threads), 3) self.assertEqual(len(self.executor._threads), 3)
@ -208,13 +224,7 @@ class ThreadPoolShutdownTest(ExecutorShutdownTest):
for t in threads: for t in threads:
t.join() t.join()
class ProcessPoolShutdownTest(ExecutorShutdownTest): class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
def setUp(self):
self.executor = futures.ProcessPoolExecutor(max_workers=5)
def tearDown(self):
self.executor.shutdown(wait=True)
def test_processes_terminate(self): def test_processes_terminate(self):
self._start_some_futures() self._start_some_futures()
self.assertEqual(len(self.executor._processes), 5) self.assertEqual(len(self.executor._processes), 5)
@ -251,8 +261,8 @@ class WaitTests(unittest.TestCase):
pass pass
call1.set_can() call1.set_can()
call1 = Call(manual_finish=True) call1 = Call(self.Event, manual_finish=True)
call2 = Call(manual_finish=True) call2 = Call(self.Event, manual_finish=True)
try: try:
future1 = self.executor.submit(call1) future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2) future2 = self.executor.submit(call2)
@ -270,7 +280,7 @@ class WaitTests(unittest.TestCase):
call2.close() call2.close()
def test_first_completed_one_already_completed(self): def test_first_completed_one_already_completed(self):
call1 = Call(manual_finish=True) call1 = Call(self.Event, manual_finish=True)
try: try:
future1 = self.executor.submit(call1) future1 = self.executor.submit(call1)
@ -290,9 +300,9 @@ class WaitTests(unittest.TestCase):
call1.set_can() call1.set_can()
call2.set_can() call2.set_can()
call1 = Call(manual_finish=True) call1 = Call(self.Event, manual_finish=True)
call2 = ExceptionCall(manual_finish=True) call2 = ExceptionCall(self.Event, manual_finish=True)
call3 = Call(manual_finish=True) call3 = Call(self.Event, manual_finish=True)
try: try:
future1 = self.executor.submit(call1) future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2) future2 = self.executor.submit(call2)
@ -317,8 +327,8 @@ class WaitTests(unittest.TestCase):
pass pass
call1.set_can() call1.set_can()
call1 = ExceptionCall(manual_finish=True) call1 = ExceptionCall(self.Event, manual_finish=True)
call2 = Call(manual_finish=True) call2 = Call(self.Event, manual_finish=True)
try: try:
future1 = self.executor.submit(call1) future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2) future2 = self.executor.submit(call2)
@ -343,7 +353,7 @@ class WaitTests(unittest.TestCase):
call2.close() call2.close()
def test_first_exception_one_already_failed(self): def test_first_exception_one_already_failed(self):
call1 = Call(manual_finish=True) call1 = Call(self.Event, manual_finish=True)
try: try:
future1 = self.executor.submit(call1) future1 = self.executor.submit(call1)
@ -363,8 +373,8 @@ class WaitTests(unittest.TestCase):
call1.set_can() call1.set_can()
call2.set_can() call2.set_can()
call1 = Call(manual_finish=True) call1 = Call(self.Event, manual_finish=True)
call2 = Call(manual_finish=True) call2 = Call(self.Event, manual_finish=True)
try: try:
future1 = self.executor.submit(call1) future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2) future2 = self.executor.submit(call2)
@ -397,10 +407,10 @@ class WaitTests(unittest.TestCase):
'this test assumes that future4 will be cancelled before it is ' 'this test assumes that future4 will be cancelled before it is '
'queued to run - which might not be the case if ' 'queued to run - which might not be the case if '
'ProcessPoolExecutor is too aggresive in scheduling futures') 'ProcessPoolExecutor is too aggresive in scheduling futures')
call1 = Call(manual_finish=True) call1 = Call(self.Event, manual_finish=True)
call2 = Call(manual_finish=True) call2 = Call(self.Event, manual_finish=True)
call3 = Call(manual_finish=True) call3 = Call(self.Event, manual_finish=True)
call4 = Call(manual_finish=True) call4 = Call(self.Event, manual_finish=True)
try: try:
future1 = self.executor.submit(call1) future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2) future2 = self.executor.submit(call2)
@ -432,8 +442,8 @@ class WaitTests(unittest.TestCase):
pass pass
call1.set_can() call1.set_can()
call1 = Call(manual_finish=True) call1 = Call(self.Event, manual_finish=True)
call2 = Call(manual_finish=True) call2 = Call(self.Event, manual_finish=True)
try: try:
future1 = self.executor.submit(call1) future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2) future2 = self.executor.submit(call2)
@ -460,19 +470,11 @@ class WaitTests(unittest.TestCase):
call2.close() call2.close()
class ThreadPoolWaitTests(WaitTests): class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
def setUp(self): pass
self.executor = futures.ThreadPoolExecutor(max_workers=1)
def tearDown(self): class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
self.executor.shutdown(wait=True) pass
class ProcessPoolWaitTests(WaitTests):
def setUp(self):
self.executor = futures.ProcessPoolExecutor(max_workers=1)
def tearDown(self):
self.executor.shutdown(wait=True)
class AsCompletedTests(unittest.TestCase): class AsCompletedTests(unittest.TestCase):
# TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
@ -483,8 +485,8 @@ class AsCompletedTests(unittest.TestCase):
call1.set_can() call1.set_can()
call2.set_can() call2.set_can()
call1 = Call(manual_finish=True) call1 = Call(self.Event, manual_finish=True)
call2 = Call(manual_finish=True) call2 = Call(self.Event, manual_finish=True)
try: try:
future1 = self.executor.submit(call1) future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2) future2 = self.executor.submit(call2)
@ -507,7 +509,7 @@ class AsCompletedTests(unittest.TestCase):
call2.close() call2.close()
def test_zero_timeout(self): def test_zero_timeout(self):
call1 = Call(manual_finish=True) call1 = Call(self.Event, manual_finish=True)
try: try:
future1 = self.executor.submit(call1) future1 = self.executor.submit(call1)
completed_futures = set() completed_futures = set()
@ -529,19 +531,11 @@ class AsCompletedTests(unittest.TestCase):
finally: finally:
call1.close() call1.close()
class ThreadPoolAsCompletedTests(AsCompletedTests): class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
def setUp(self): pass
self.executor = futures.ThreadPoolExecutor(max_workers=1)
def tearDown(self): class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
self.executor.shutdown(wait=True) pass
class ProcessPoolAsCompletedTests(AsCompletedTests):
def setUp(self):
self.executor = futures.ProcessPoolExecutor(max_workers=1)
def tearDown(self):
self.executor.shutdown(wait=True)
class ExecutorTest(unittest.TestCase): class ExecutorTest(unittest.TestCase):
# Executor.shutdown() and context manager usage is tested by # Executor.shutdown() and context manager usage is tested by
@ -567,7 +561,7 @@ class ExecutorTest(unittest.TestCase):
def test_map_timeout(self): def test_map_timeout(self):
results = [] results = []
timeout_call = MapCall() timeout_call = MapCall(self.Event)
try: try:
try: try:
for i in self.executor.map(timeout_call, for i in self.executor.map(timeout_call,
@ -583,19 +577,11 @@ class ExecutorTest(unittest.TestCase):
self.assertEqual([42, 42], results) self.assertEqual([42, 42], results)
class ThreadPoolExecutorTest(ExecutorTest): class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
def setUp(self): pass
self.executor = futures.ThreadPoolExecutor(max_workers=1)
def tearDown(self): class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
self.executor.shutdown(wait=True) pass
class ProcessPoolExecutorTest(ExecutorTest):
def setUp(self):
self.executor = futures.ProcessPoolExecutor(max_workers=1)
def tearDown(self):
self.executor.shutdown(wait=True)
class FutureTests(unittest.TestCase): class FutureTests(unittest.TestCase):
def test_done_callback_with_result(self): def test_done_callback_with_result(self):

View File

@ -20,6 +20,9 @@ Core and Builtins
Library Library
------- -------
- Issue #10798: Reject supporting concurrent.futures if the system has too
few POSIX semaphores.
- Issue #10807: Remove base64, bz2, hex, quopri, rot13, uu and zlib codecs from - Issue #10807: Remove base64, bz2, hex, quopri, rot13, uu and zlib codecs from
the codec aliases. They are still accessible via codecs.lookup(). the codec aliases. They are still accessible via codecs.lookup().