bpo-39207: Spawn workers on demand in ProcessPoolExecutor (GH-19453)

Roughly based on 904e34d4e6, but with a few substantial differences.

/cc @pitrou @brianquinlan
This commit is contained in:
Kyle Stanley 2020-04-19 10:00:59 -04:00 committed by GitHub
parent c12375aa0b
commit 1ac6e37929
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 63 additions and 6 deletions

View File

@@ -206,6 +206,11 @@ and :class:`~concurrent.futures.ProcessPoolExecutor`. This improves
 compatibility with subinterpreters and predictability in their shutdown
 processes. (Contributed by Kyle Stanley in :issue:`39812`.)

+Workers in :class:`~concurrent.futures.ProcessPoolExecutor` are now spawned on
+demand, only when there are no available idle workers to reuse. This optimizes
+startup overhead and reduces the amount of lost CPU time to idle workers.
+(Contributed by Kyle Stanley in :issue:`39207`.)
+
 curses
 ------

View File

@@ -318,6 +318,12 @@ class _ExecutorManagerThread(threading.Thread):
             # while waiting on new results.
             del result_item

+            # attempt to increment idle process count
+            executor = self.executor_reference()
+            if executor is not None:
+                executor._idle_worker_semaphore.release()
+            del executor
+
         if self.is_shutting_down():
             self.flag_executor_shutting_down()
@@ -601,6 +607,7 @@ class ProcessPoolExecutor(_base.Executor):
         # Shutdown is a two-step process.
         self._shutdown_thread = False
         self._shutdown_lock = threading.Lock()
+        self._idle_worker_semaphore = threading.Semaphore(0)
         self._broken = False
         self._queue_count = 0
         self._pending_work_items = {}
@@ -633,14 +640,18 @@ class ProcessPoolExecutor(_base.Executor):
     def _start_executor_manager_thread(self):
         if self._executor_manager_thread is None:
             # Start the processes so that their sentinels are known.
-            self._adjust_process_count()
             self._executor_manager_thread = _ExecutorManagerThread(self)
             self._executor_manager_thread.start()
             _threads_wakeups[self._executor_manager_thread] = \
                 self._executor_manager_thread_wakeup

     def _adjust_process_count(self):
-        for _ in range(len(self._processes), self._max_workers):
+        # if there's an idle process, we don't need to spawn a new one.
+        if self._idle_worker_semaphore.acquire(blocking=False):
+            return
+
+        process_count = len(self._processes)
+        if process_count < self._max_workers:
             p = self._mp_context.Process(
                 target=_process_worker,
                 args=(self._call_queue,
@@ -669,6 +680,7 @@ class ProcessPoolExecutor(_base.Executor):
             # Wake up queue management thread
             self._executor_manager_thread_wakeup.wakeup()

+            self._adjust_process_count()
             self._start_executor_manager_thread()
             return f
     submit.__doc__ = _base.Executor.submit.__doc__

View File

@@ -486,10 +486,16 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
         pass

     def test_processes_terminate(self):
-        self.executor.submit(mul, 21, 2)
-        self.executor.submit(mul, 6, 7)
-        self.executor.submit(mul, 3, 14)
-        self.assertEqual(len(self.executor._processes), 5)
+        def acquire_lock(lock):
+            lock.acquire()
+
+        mp_context = get_context()
+        sem = mp_context.Semaphore(0)
+        for _ in range(3):
+            self.executor.submit(acquire_lock, sem)
+        self.assertEqual(len(self.executor._processes), 3)
+        for _ in range(3):
+            sem.release()

         processes = self.executor._processes
         self.executor.shutdown()
@@ -964,6 +970,36 @@ class ProcessPoolExecutorTest(ExecutorTest):
             mgr.shutdown()
             mgr.join()
def test_saturation(self):
executor = self.executor_type(4)
mp_context = get_context()
sem = mp_context.Semaphore(0)
job_count = 15 * executor._max_workers
try:
for _ in range(job_count):
executor.submit(sem.acquire)
self.assertEqual(len(executor._processes), executor._max_workers)
for _ in range(job_count):
sem.release()
finally:
executor.shutdown()
def test_idle_process_reuse_one(self):
    # Every submit waits for its result, so the lone spawned worker is
    # idle again before the next job arrives and should be reused
    # instead of spawning a second process.
    executor = self.executor_type(4)
    for left, right in ((21, 2), (6, 7), (3, 14)):
        executor.submit(mul, left, right).result()
    self.assertEqual(len(executor._processes), 1)
    executor.shutdown()
def test_idle_process_reuse_multiple(self):
    # Interleave blocking and fire-and-forget submissions; with idle
    # workers being reused, at most two processes should be spawned.
    executor = self.executor_type(4)
    jobs = ((12, 7, True), (33, 25, False), (25, 26, True), (18, 29, False))
    for left, right, wait in jobs:
        future = executor.submit(mul, left, right)
        if wait:
            future.result()
    self.assertLessEqual(len(executor._processes), 2)
    executor.shutdown()
create_executor_tests(ProcessPoolExecutorTest,
                      executor_mixins=(ProcessPoolForkMixin,

View File

@@ -0,0 +1,4 @@
Workers in :class:`~concurrent.futures.ProcessPoolExecutor` are now spawned on
demand, only when there are no available idle workers to reuse. This optimizes
startup overhead and reduces the amount of lost CPU time to idle workers.
Patch by Kyle Stanley.