
#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
#
__all__ = ['Pool']
#
# Imports
#
import threading
import Queue
import itertools
import collections
import time
from multiprocessing import Process, cpu_count, TimeoutError
from multiprocessing.util import Finalize, debug
#
# Constants representing the state of a pool
#
RUN = 0
CLOSE = 1
TERMINATE = 2
#
# Miscellaneous
#
job_counter = itertools.count()
def mapstar(args):
return map(*args)
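# Illustrative note (not part of the original module): for some function
# f, mapstar((f, (1, 2))) == map(f, (1, 2)) == [f(1), f(2)], which lets
# map_async() push a whole chunk of inputs through the task queue as a
# single task.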
#
# Code run by worker processes
#
def worker(inqueue, outqueue, initializer=None, initargs=()):
put = outqueue.put
get = inqueue.get
if hasattr(inqueue, '_writer'):
inqueue._writer.close()
outqueue._reader.close()
if initializer is not None:
initializer(*initargs)
while 1:
try:
task = get()
except (EOFError, IOError):
debug('worker got EOFError or IOError -- exiting')
break
if task is None:
debug('worker got sentinel -- exiting')
break
job, i, func, args, kwds = task
try:
result = (True, func(*args, **kwds))
except Exception, e:
result = (False, e)
put((job, i, result))
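# An illustrative sketch of the task protocol handled above (the field
# names are descriptive only, not part of the module):
#
#   task = (job, i, func, args, kwds)
#     job        -- key of the pending result object in Pool._cache
#     i          -- chunk index within that job (None for apply_async)
#     func       -- callable to run in the worker process
#     args, kwds -- positional and keyword arguments for func
#
# The worker replies with (job, i, (True, value)) on success and
# (job, i, (False, exception)) on failure.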
#
# Class representing a process pool
#
class Pool(object):
'''
Class which supports an async version of the `apply()` builtin
'''
Process = Process
def __init__(self, processes=None, initializer=None, initargs=()):
self._setup_queues()
self._taskqueue = Queue.Queue()
self._cache = {}
self._state = RUN
if processes is None:
try:
processes = cpu_count()
except NotImplementedError:
processes = 1
self._pool = []
for i in range(processes):
w = self.Process(
target=worker,
args=(self._inqueue, self._outqueue, initializer, initargs)
)
self._pool.append(w)
w.name = w.name.replace('Process', 'PoolWorker')
w.daemon = True
w.start()
self._task_handler = threading.Thread(
target=Pool._handle_tasks,
args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
)
self._task_handler.daemon = True
self._task_handler._state = RUN
self._task_handler.start()
self._result_handler = threading.Thread(
target=Pool._handle_results,
args=(self._outqueue, self._quick_get, self._cache)
)
self._result_handler.daemon = True
self._result_handler._state = RUN
self._result_handler.start()
self._terminate = Finalize(
self, self._terminate_pool,
args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
self._task_handler, self._result_handler, self._cache),
exitpriority=15
)
def _setup_queues(self):
from .queues import SimpleQueue
self._inqueue = SimpleQueue()
self._outqueue = SimpleQueue()
self._quick_put = self._inqueue._writer.send
self._quick_get = self._outqueue._reader.recv
def apply(self, func, args=(), kwds={}):
'''
Equivalent of `apply()` builtin
'''
assert self._state == RUN
return self.apply_async(func, args, kwds).get()
def map(self, func, iterable, chunksize=None):
'''
Equivalent of `map()` builtin
'''
assert self._state == RUN
return self.map_async(func, iterable, chunksize).get()
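    # Example usage (a minimal sketch, not part of the module; f must be
    # a picklable module-level function):
    #
    #   def f(x):
    #       return x*x
    #
    #   pool = Pool(processes=4)
    #   pool.apply(f, (10,))        # 100, runs on a single worker
    #   pool.map(f, range(5))       # [0, 1, 4, 9, 16], split across workers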
def imap(self, func, iterable, chunksize=1):
'''
Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
'''
assert self._state == RUN
if chunksize == 1:
result = IMapIterator(self._cache)
self._taskqueue.put((((result._job, i, func, (x,), {})
for i, x in enumerate(iterable)), result._set_length))
return result
else:
assert chunksize > 1
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = IMapIterator(self._cache)
self._taskqueue.put((((result._job, i, mapstar, (x,), {})
for i, x in enumerate(task_batches)), result._set_length))
return (item for chunk in result for item in chunk)
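    # Example (illustrative, reusing the hypothetical function f from the
    # sketch above): results arrive lazily and in input order --
    #
    #   for y in pool.imap(f, xrange(1000)):
    #       ...
    #
    # With chunksize > 1 a generator is returned that flattens each chunk
    # of results as it completes.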
def imap_unordered(self, func, iterable, chunksize=1):
'''
Like `imap()` method but ordering of results is arbitrary
'''
assert self._state == RUN
if chunksize == 1:
result = IMapUnorderedIterator(self._cache)
self._taskqueue.put((((result._job, i, func, (x,), {})
for i, x in enumerate(iterable)), result._set_length))
return result
else:
assert chunksize > 1
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = IMapUnorderedIterator(self._cache)
self._taskqueue.put((((result._job, i, mapstar, (x,), {})
for i, x in enumerate(task_batches)), result._set_length))
return (item for chunk in result for item in chunk)
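    # Example (illustrative): here results are yielded in completion
    # order, which improves throughput when per-item cost varies --
    #
    #   for y in pool.imap_unordered(f, xrange(1000)):
    #       ...                     # arbitrary order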
def apply_async(self, func, args=(), kwds={}, callback=None):
'''
Asynchronous equivalent of `apply()` builtin
'''
assert self._state == RUN
result = ApplyResult(self._cache, callback)
self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
return result
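    # Example usage (a minimal sketch; f and log_result are hypothetical
    # module-level functions):
    #
    #   res = pool.apply_async(f, (10,), callback=log_result)
    #   ...                         # do other work while f(10) runs
    #   res.get(timeout=5)          # 100, or raises TimeoutError
    #
    # Note that the callback runs in the result-handler thread, so it
    # should return quickly.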
def map_async(self, func, iterable, chunksize=None, callback=None):
'''
Asynchronous equivalent of `map()` builtin
'''
assert self._state == RUN
if not hasattr(iterable, '__len__'):
iterable = list(iterable)
if chunksize is None:
chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra:
chunksize += 1
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = MapResult(self._cache, chunksize, len(iterable), callback)
self._taskqueue.put((((result._job, i, mapstar, (x,), {})
for i, x in enumerate(task_batches)), None))
return result
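    # Worked example of the default chunksize heuristic above: with
    # len(iterable) == 100 and 4 workers, divmod(100, 4*4) == (6, 4);
    # the nonzero remainder bumps chunksize to 7, giving
    # ceil(100/7) == 15 task batches.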
@staticmethod
def _handle_tasks(taskqueue, put, outqueue, pool):
thread = threading.current_thread()
for taskseq, set_length in iter(taskqueue.get, None):
i = -1
for i, task in enumerate(taskseq):
if thread._state:
debug('task handler found thread._state != RUN')
break
try:
put(task)
except IOError:
debug('could not put task on queue')
break
else:
if set_length:
debug('doing set_length()')
set_length(i+1)
continue
break
else:
debug('task handler got sentinel')
try:
# tell result handler to finish when cache is empty
debug('task handler sending sentinel to result handler')
outqueue.put(None)
# tell workers there is no more work
debug('task handler sending sentinel to workers')
for p in pool:
put(None)
except IOError:
debug('task handler got IOError when sending sentinels')
debug('task handler exiting')
@staticmethod
def _handle_results(outqueue, get, cache):
thread = threading.current_thread()
while 1:
try:
task = get()
except (IOError, EOFError):
debug('result handler got EOFError/IOError -- exiting')
return
if thread._state:
assert thread._state == TERMINATE
debug('result handler found thread._state=TERMINATE')
break
if task is None:
debug('result handler got sentinel')
break
job, i, obj = task
try:
cache[job]._set(i, obj)
except KeyError:
pass
while cache and thread._state != TERMINATE:
try:
task = get()
except (IOError, EOFError):
debug('result handler got EOFError/IOError -- exiting')
return
if task is None:
debug('result handler ignoring extra sentinel')
continue
job, i, obj = task
try:
cache[job]._set(i, obj)
except KeyError:
pass
if hasattr(outqueue, '_reader'):
debug('ensuring that outqueue is not full')
# If we don't make room available in outqueue then
# attempts to add the sentinel (None) to outqueue may
# block. There is guaranteed to be no more than 2 sentinels.
try:
for i in range(10):
if not outqueue._reader.poll():
break
get()
except (IOError, EOFError):
pass
debug('result handler exiting: len(cache)=%s, thread._state=%s',
len(cache), thread._state)
@staticmethod
def _get_tasks(func, it, size):
it = iter(it)
while 1:
x = tuple(itertools.islice(it, size))
if not x:
return
yield (func, x)
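    # Illustrative example: for some function f,
    # list(Pool._get_tasks(f, range(5), 2)) == [(f, (0, 1)), (f, (2, 3)),
    # (f, (4,))] -- each tuple becomes one mapstar() task covering at
    # most `size` inputs.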
def __reduce__(self):
raise NotImplementedError(
'pool objects cannot be passed between processes or pickled'
)
def close(self):
debug('closing pool')
if self._state == RUN:
self._state = CLOSE
self._taskqueue.put(None)
def terminate(self):
debug('terminating pool')
self._state = TERMINATE
self._terminate()
def join(self):
debug('joining pool')
assert self._state in (CLOSE, TERMINATE)
self._task_handler.join()
self._result_handler.join()
for p in self._pool:
p.join()
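    # Typical shutdown sequence (illustrative sketch):
    #
    #   pool.close()      # stop accepting tasks
    #   pool.join()       # wait for handlers and workers to finish
    #
    # or pool.terminate() to stop the workers immediately without
    # draining outstanding tasks.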
@staticmethod
def _help_stuff_finish(inqueue, task_handler, size):
# task_handler may be blocked trying to put items on inqueue
debug('removing tasks from inqueue until task handler finished')
inqueue._rlock.acquire()
while task_handler.is_alive() and inqueue._reader.poll():
inqueue._reader.recv()
time.sleep(0)
@classmethod
def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
task_handler, result_handler, cache):
# this is guaranteed to only be called once
debug('finalizing pool')
task_handler._state = TERMINATE
taskqueue.put(None) # sentinel
debug('helping task handler/workers to finish')
cls._help_stuff_finish(inqueue, task_handler, len(pool))
assert result_handler.is_alive() or len(cache) == 0
result_handler._state = TERMINATE
outqueue.put(None) # sentinel
if pool and hasattr(pool[0], 'terminate'):
debug('terminating workers')
for p in pool:
p.terminate()
debug('joining task handler')
task_handler.join(1e100)
debug('joining result handler')
result_handler.join(1e100)
if pool and hasattr(pool[0], 'terminate'):
debug('joining pool workers')
for p in pool:
p.join()
#
# Class whose instances are returned by `Pool.apply_async()`
#
class ApplyResult(object):
def __init__(self, cache, callback):
self._cond = threading.Condition(threading.Lock())
self._job = job_counter.next()
self._cache = cache
self._ready = False
self._callback = callback
cache[self._job] = self
def ready(self):
return self._ready
def successful(self):
assert self._ready
return self._success
def wait(self, timeout=None):
self._cond.acquire()
try:
if not self._ready:
self._cond.wait(timeout)
finally:
self._cond.release()
def get(self, timeout=None):
self.wait(timeout)
if not self._ready:
raise TimeoutError
if self._success:
return self._value
else:
raise self._value
def _set(self, i, obj):
self._success, self._value = obj
if self._callback and self._success:
self._callback(self._value)
self._cond.acquire()
try:
self._ready = True
self._cond.notify()
finally:
self._cond.release()
del self._cache[self._job]
#
# Class whose instances are returned by `Pool.map_async()`
#
class MapResult(ApplyResult):
def __init__(self, cache, chunksize, length, callback):
ApplyResult.__init__(self, cache, callback)
self._success = True
self._value = [None] * length
self._chunksize = chunksize
if chunksize <= 0:
self._number_left = 0
self._ready = True
else:
self._number_left = length//chunksize + bool(length % chunksize)
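        # e.g. length == 10, chunksize == 3 gives 10//3 + bool(10 % 3)
        # == 3 + 1 == 4 chunks -- ceiling division without floats.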
def _set(self, i, success_result):
success, result = success_result
if success:
self._value[i*self._chunksize:(i+1)*self._chunksize] = result
self._number_left -= 1
if self._number_left == 0:
if self._callback:
self._callback(self._value)
del self._cache[self._job]
self._cond.acquire()
try:
self._ready = True
self._cond.notify()
finally:
self._cond.release()
else:
self._success = False
self._value = result
del self._cache[self._job]
self._cond.acquire()
try:
self._ready = True
self._cond.notify()
finally:
self._cond.release()
#
# Class whose instances are returned by `Pool.imap()`
#
class IMapIterator(object):
def __init__(self, cache):
self._cond = threading.Condition(threading.Lock())
self._job = job_counter.next()
self._cache = cache
self._items = collections.deque()
self._index = 0
self._length = None
self._unsorted = {}
cache[self._job] = self
def __iter__(self):
return self
def next(self, timeout=None):
self._cond.acquire()
try:
try:
item = self._items.popleft()
except IndexError:
if self._index == self._length:
raise StopIteration
self._cond.wait(timeout)
try:
item = self._items.popleft()
except IndexError:
if self._index == self._length:
raise StopIteration
raise TimeoutError
finally:
self._cond.release()
success, value = item
if success:
return value
raise value
__next__ = next # XXX Python 3 name for next(); kept as an alias
def _set(self, i, obj):
self._cond.acquire()
try:
if self._index == i:
self._items.append(obj)
self._index += 1
while self._index in self._unsorted:
obj = self._unsorted.pop(self._index)
self._items.append(obj)
self._index += 1
self._cond.notify()
else:
self._unsorted[i] = obj
if self._index == self._length:
del self._cache[self._job]
finally:
self._cond.release()
def _set_length(self, length):
self._cond.acquire()
try:
self._length = length
if self._index == self._length:
self._cond.notify()
del self._cache[self._job]
finally:
self._cond.release()
#
# Class whose instances are returned by `Pool.imap_unordered()`
#
class IMapUnorderedIterator(IMapIterator):
def _set(self, i, obj):
self._cond.acquire()
try:
self._items.append(obj)
self._index += 1
self._cond.notify()
if self._index == self._length:
del self._cache[self._job]
finally:
self._cond.release()
#
#
#
class ThreadPool(Pool):
from .dummy import Process
def __init__(self, processes=None, initializer=None, initargs=()):
Pool.__init__(self, processes, initializer, initargs)
def _setup_queues(self):
self._inqueue = Queue.Queue()
self._outqueue = Queue.Queue()
self._quick_put = self._inqueue.put
self._quick_get = self._outqueue.get
@staticmethod
def _help_stuff_finish(inqueue, task_handler, size):
# put sentinels at head of inqueue to make workers finish
inqueue.not_empty.acquire()
try:
inqueue.queue.clear()
inqueue.queue.extend([None] * size)
inqueue.not_empty.notify_all()
finally:
inqueue.not_empty.release()
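# Example usage (a minimal sketch, not part of the module): ThreadPool
# exposes the same interface as Pool but runs tasks in threads of the
# current process, so functions and arguments need not be picklable:
#
#   tp = ThreadPool(4)
#   tp.map(str.upper, ['a', 'b'])   # ['A', 'B']
#   tp.close(); tp.join()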