Issue #6963: Added maxtasksperchild argument to multiprocessing.Pool

This commit is contained in:
Jesse Noller 2010-01-27 03:05:57 +00:00
parent 2deb5c758a
commit 654ade3e6a
6 changed files with 130 additions and 19 deletions

View File

@ -1537,7 +1537,7 @@ Process Pools
One can create a pool of processes which will carry out tasks submitted to it
with the :class:`Pool` class.
.. class:: multiprocessing.Pool([processes[, initializer[, initargs]]])
.. class:: multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])
A process pool object which controls a pool of worker processes to which jobs
can be submitted. It supports asynchronous results with timeouts and
@ -1548,6 +1548,21 @@ with the :class:`Pool` class.
*initializer* is not ``None`` then each worker process will call
``initializer(*initargs)`` when it starts.
*maxtasksperchild* is the number of tasks a worker process can complete
before it will exit and be replaced with a fresh worker process, to enable
unused resources to be freed. The default *maxtasksperchild* is None, which
means worker processes will live as long as the pool.
.. note::
Worker processes within a :class:`Pool` typically live for the complete
duration of the Pool's work queue. A frequent pattern found in other
systems (such as Apache, mod_wsgi, etc) to free resources held by
workers is to allow a worker within a pool to complete only a set
amount of work before being exiting, being cleaned up and a new
process spawned to replace the old one. The *maxtasksperchild*
argument to the :class:`Pool` exposes this ability to the end user.
.. method:: apply(func[, args[, kwds]])
Equivalent of the :func:`apply` built-in function. It blocks till the

View File

@ -219,12 +219,12 @@ def JoinableQueue(maxsize=0):
from multiprocessing.queues import JoinableQueue
return JoinableQueue(maxsize)
def Pool(processes=None, initializer=None, initargs=()):
def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None):
'''
Returns a process pool object
'''
from multiprocessing.pool import Pool
return Pool(processes, initializer, initargs)
return Pool(processes, initializer, initargs, maxtasksperchild)
def RawValue(typecode_or_type, *args):
'''

View File

@ -42,7 +42,8 @@ def mapstar(args):
# Code run by worker processes
#
def worker(inqueue, outqueue, initializer=None, initargs=()):
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
put = outqueue.put
get = inqueue.get
if hasattr(inqueue, '_writer'):
@ -52,7 +53,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=()):
if initializer is not None:
initializer(*initargs)
while 1:
completed = 0
while maxtasks is None or (maxtasks and completed < maxtasks):
try:
task = get()
except (EOFError, IOError):
@ -69,6 +71,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=()):
except Exception, e:
result = (False, e)
put((job, i, result))
completed += 1
debug('worker exiting after %d tasks' % completed)
#
# Class representing a process pool
@ -80,11 +84,15 @@ class Pool(object):
'''
Process = Process
def __init__(self, processes=None, initializer=None, initargs=()):
def __init__(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None):
self._setup_queues()
self._taskqueue = Queue.Queue()
self._cache = {}
self._state = RUN
self._maxtasksperchild = maxtasksperchild
self._initializer = initializer
self._initargs = initargs
if processes is None:
try:
@ -95,16 +103,18 @@ class Pool(object):
if initializer is not None and not hasattr(initializer, '__call__'):
raise TypeError('initializer must be a callable')
self._processes = processes
self._pool = []
for i in range(processes):
w = self.Process(
target=worker,
args=(self._inqueue, self._outqueue, initializer, initargs)
self._repopulate_pool()
self._worker_handler = threading.Thread(
target=Pool._handle_workers,
args=(self, )
)
self._pool.append(w)
w.name = w.name.replace('Process', 'PoolWorker')
w.daemon = True
w.start()
self._worker_handler.daemon = True
self._worker_handler._state = RUN
self._worker_handler.start()
self._task_handler = threading.Thread(
target=Pool._handle_tasks,
@ -125,10 +135,48 @@ class Pool(object):
self._terminate = Finalize(
self, self._terminate_pool,
args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
self._task_handler, self._result_handler, self._cache),
self._worker_handler, self._task_handler,
self._result_handler, self._cache),
exitpriority=15
)
def _join_exited_workers(self):
"""Cleanup after any worker processes which have exited due to reaching
their specified lifetime. Returns True if any workers were cleaned up.
"""
cleaned = False
for i in reversed(range(len(self._pool))):
worker = self._pool[i]
if worker.exitcode is not None:
# worker exited
debug('cleaning up worker %d' % i)
worker.join()
cleaned = True
del self._pool[i]
return cleaned
def _repopulate_pool(self):
"""Bring the number of pool processes up to the specified number,
for use after reaping workers which have exited.
"""
for i in range(self._processes - len(self._pool)):
w = self.Process(target=worker,
args=(self._inqueue, self._outqueue,
self._initializer,
self._initargs, self._maxtasksperchild)
)
self._pool.append(w)
w.name = w.name.replace('Process', 'PoolWorker')
w.daemon = True
w.start()
debug('added worker')
def _maintain_pool(self):
"""Clean up any exited workers and start replacements for them.
"""
if self._join_exited_workers():
self._repopulate_pool()
def _setup_queues(self):
from .queues import SimpleQueue
self._inqueue = SimpleQueue()
@ -216,6 +264,13 @@ class Pool(object):
for i, x in enumerate(task_batches)), None))
return result
@staticmethod
def _handle_workers(pool):
while pool._worker_handler._state == RUN and pool._state == RUN:
pool._maintain_pool()
time.sleep(0.1)
debug('worker handler exiting')
@staticmethod
def _handle_tasks(taskqueue, put, outqueue, pool):
thread = threading.current_thread()
@ -331,16 +386,19 @@ class Pool(object):
debug('closing pool')
if self._state == RUN:
self._state = CLOSE
self._worker_handler._state = CLOSE
self._taskqueue.put(None)
def terminate(self):
debug('terminating pool')
self._state = TERMINATE
self._worker_handler._state = TERMINATE
self._terminate()
def join(self):
debug('joining pool')
assert self._state in (CLOSE, TERMINATE)
self._worker_handler.join()
self._task_handler.join()
self._result_handler.join()
for p in self._pool:
@ -357,10 +415,11 @@ class Pool(object):
@classmethod
def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
task_handler, result_handler, cache):
worker_handler, task_handler, result_handler, cache):
# this is guaranteed to only be called once
debug('finalizing pool')
worker_handler._state = TERMINATE
task_handler._state = TERMINATE
taskqueue.put(None) # sentinel
@ -372,9 +431,11 @@ class Pool(object):
result_handler._state = TERMINATE
outqueue.put(None) # sentinel
# Terminate workers which haven't already finished.
if pool and hasattr(pool[0], 'terminate'):
debug('terminating workers')
for p in pool:
if p.exitcode is None:
p.terminate()
debug('joining task handler')
@ -387,6 +448,11 @@ class Pool(object):
debug('joining pool workers')
for p in pool:
p.join()
for w in pool:
if w.exitcode is None:
# worker has not yet exited
debug('cleaning up worker %d' % w.pid)
w.join()
#
# Class whose instances are returned by `Pool.apply_async()`

View File

@ -45,7 +45,7 @@ latin = str
#
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING
#LOG_LEVEL = logging.DEBUG
DELTA = 0.1
CHECK_TIMINGS = False # making true makes tests take a lot longer
@ -1052,6 +1052,30 @@ class _TestPool(BaseTestCase):
join = TimingWrapper(self.pool.join)
join()
self.assertTrue(join.elapsed < 0.2)
class _TestPoolWorkerLifetime(BaseTestCase):
ALLOWED_TYPES = ('processes', )
def test_pool_worker_lifetime(self):
p = multiprocessing.Pool(3, maxtasksperchild=10)
self.assertEqual(3, len(p._pool))
origworkerpids = [w.pid for w in p._pool]
# Run many tasks so each worker gets replaced (hopefully)
results = []
for i in range(100):
results.append(p.apply_async(sqr, (i, )))
# Fetch the results and verify we got the right answers,
# also ensuring all the tasks have completed.
for (j, res) in enumerate(results):
self.assertEqual(res.get(), sqr(j))
# Refill the pool
p._repopulate_pool()
# Finally, check that the worker pids have changed
finalworkerpids = [w.pid for w in p._pool]
self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
p.close()
p.join()
#
# Test that manager has expected number of shared objects left
#

View File

@ -121,6 +121,7 @@ Brett Cannon
Mike Carlton
Terry Carroll
Donn Cave
Charles Cazabon
Per Cederqvist
Octavian Cerna
Hye-Shik Chang

View File

@ -173,6 +173,11 @@ Core and Builtins
Library
-------
- Issue #6963: Added "maxtasksperchild" argument to multiprocessing.Pool,
allowing for a maximum number of tasks within the pool to be completed by
the worker before that worker is terminated, and a new one created to
replace it.
- Issue #7617: Make sure distutils.unixccompiler.UnixCCompiler recognizes
gcc when it has a fully qualified configuration prefix. Initial patch
by Arfrever.