From 904e34d4e6b6007986dcc585d5c553ee8ae06f95 Mon Sep 17 00:00:00 2001 From: Sean Date: Wed, 22 May 2019 14:29:58 -0700 Subject: [PATCH] bpo-24882: Let ThreadPoolExecutor reuse idle threads before creating new thread (#6375) * Fixes issue 24882 * Add news file entry for change. * Change test_concurrent_futures.ThreadPoolShutdownTest Adjust the shutdown test so that, after submitting three jobs to the executor, the test checks for less than three threads, instead of looking for exactly three threads. If idle threads are being recycled properly, then we should have less than three threads. * Switched idle count to semaphor, Updated tests As suggested by reviewer tomMoral, swapped lock-protected counter with a semaphore to track the number of unused threads. Adjusted test_threads_terminate to wait for completiton of the previous future before submitting a new one (and checking the number of threads used). Also added a new test to confirm the thread pool can be saturated. * Updates tests as requested by pitrou. * Correct minor whitespace error. * Make test_saturation faster --- Lib/concurrent/futures/thread.py | 15 +++++++-- Lib/test/test_concurrent_futures.py | 32 +++++++++++++++++-- .../2018-04-04-14-54-30.bpo-24882.urybpa.rst | 1 + 3 files changed, 43 insertions(+), 5 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2018-04-04-14-54-30.bpo-24882.urybpa.rst diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 2af31a106dd..ad6b4c20b56 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -80,7 +80,14 @@ def _worker(executor_reference, work_queue, initializer, initargs): work_item.run() # Delete references to object. See issue16284 del work_item + + # attempt to increment idle count + executor = executor_reference() + if executor is not None: + executor._idle_semaphore.release() + del executor continue + executor = executor_reference() # Exit if: # - The interpreter is shutting down OR @@ -133,6 +140,7 @@ class ThreadPoolExecutor(_base.Executor): self._max_workers = max_workers self._work_queue = queue.SimpleQueue() + self._idle_semaphore = threading.Semaphore(0) self._threads = set() self._broken = False self._shutdown = False @@ -178,12 +186,15 @@ class ThreadPoolExecutor(_base.Executor): submit.__doc__ = _base.Executor.submit.__doc__ def _adjust_thread_count(self): + # if idle threads are available, don't spin new threads + if self._idle_semaphore.acquire(timeout=0): + return + # When the executor gets lost, the weakref callback will wake up # the worker threads. def weakref_cb(_, q=self._work_queue): q.put(None) - # TODO(bquinlan): Should avoid creating new threads if there are more - # idle threads than items in the work queue. + num_threads = len(self._threads) if num_threads < self._max_workers: thread_name = '%s_%d' % (self._thread_name_prefix or self, diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 212ccd8d532..de6ad8f2aa1 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -346,10 +346,15 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase pass def test_threads_terminate(self): - self.executor.submit(mul, 21, 2) - self.executor.submit(mul, 6, 7) - self.executor.submit(mul, 3, 14) + def acquire_lock(lock): + lock.acquire() + + sem = threading.Semaphore(0) + for i in range(3): + self.executor.submit(acquire_lock, sem) self.assertEqual(len(self.executor._threads), 3) + for i in range(3): + sem.release() self.executor.shutdown() for t in self.executor._threads: t.join() @@ -753,6 +758,27 @@ class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase): self.assertEqual(executor._max_workers, (os.cpu_count() or 1) * 5) + def test_saturation(self): + executor = self.executor_type(4) + def acquire_lock(lock): + lock.acquire() + + sem = threading.Semaphore(0) + for i in range(15 * executor._max_workers): + executor.submit(acquire_lock, sem) + self.assertEqual(len(executor._threads), executor._max_workers) + for i in range(15 * executor._max_workers): + sem.release() + executor.shutdown(wait=True) + + def test_idle_thread_reuse(self): + executor = self.executor_type() + executor.submit(mul, 21, 2).result() + executor.submit(mul, 6, 7).result() + executor.submit(mul, 3, 14).result() + self.assertEqual(len(executor._threads), 1) + executor.shutdown(wait=True) + class ProcessPoolExecutorTest(ExecutorTest): diff --git a/Misc/NEWS.d/next/Library/2018-04-04-14-54-30.bpo-24882.urybpa.rst b/Misc/NEWS.d/next/Library/2018-04-04-14-54-30.bpo-24882.urybpa.rst new file mode 100644 index 00000000000..8c418824a99 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-04-04-14-54-30.bpo-24882.urybpa.rst @@ -0,0 +1 @@ +Change ThreadPoolExecutor to use existing idle threads before spinning up new ones. \ No newline at end of file