#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
#

__all__ = ['Pool']

#
# Imports
#

import threading
import Queue
import itertools
import collections
import time

from multiprocessing import Process, cpu_count, TimeoutError
from multiprocessing.util import Finalize, debug

#
# Constants representing the state of a pool
#

RUN = 0
CLOSE = 1
TERMINATE = 2

#
# Miscellaneous
#

job_counter = itertools.count()

def mapstar(args):
    return map(*args)

#
# Code run by worker processes
#

def worker(inqueue, outqueue, initializer=None, initargs=()):
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    while 1:
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        put((job, i, result))
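
# A task received by a worker is a tuple `(job, i, func, args, kwds)`; the
# result sent back is `(job, i, (success, value))`, where `success` is a
# bool and `value` is either the return value or the exception raised.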

#
# Class representing a process pool
#

class Pool(object):
    '''
    Class which supports an async version of the `apply()` builtin
    '''
    Process = Process

    def __init__(self, processes=None, initializer=None, initargs=()):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        self._state = RUN

        if processes is None:
            try:
                processes = cpu_count()
            except NotImplementedError:
                processes = 1

        self._pool = []
        for i in range(processes):
            w = self.Process(
                target=worker,
                args=(self._inqueue, self._outqueue, initializer, initargs)
                )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()

        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._task_handler, self._result_handler, self._cache),
            exitpriority=15
            )

    def _setup_queues(self):
        from .queues import SimpleQueue
        self._inqueue = SimpleQueue()
        self._outqueue = SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv
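        # NB: `_quick_put`/`_quick_get` presumably exist so the handler
        # threads can talk to the underlying pipe connections directly
        # instead of going through the SimpleQueue wrappers.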

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        '''
        Equivalent of `map()` builtin
        '''
        assert self._state == RUN
        return self.map_async(func, iterable, chunksize).get()
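
    # Illustrative usage sketch (`double` is a hypothetical example function;
    # for a process pool it must be defined at module level so that it can
    # be pickled):
    #
    #     def double(x):
    #         return 2*x
    #
    #     pool = Pool(processes=4)
    #     print pool.apply(double, (10,))          # 20
    #     print pool.map(double, range(5))         # [0, 2, 4, 6, 8]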

    def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                         for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)
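
    # Design note: with chunksize > 1 the iterable is cut into tuples of
    # `chunksize` items which are run through `mapstar`, and the generator
    # expression above flattens the resulting chunks back into a stream of
    # individual results.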

    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                         for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

    def apply_async(self, func, args=(), kwds={}, callback=None):
        '''
        Asynchronous equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        result = ApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result
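
    # Illustrative sketch of asynchronous use (`double` as above is a
    # hypothetical function; `TimeoutError` is imported at the top of this
    # module):
    #
    #     res = pool.apply_async(double, (10,))
    #     try:
    #         print res.get(timeout=1)             # 20
    #     except TimeoutError:
    #         print 'call took longer than 1 second'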

    def map_async(self, func, iterable, chunksize=None, callback=None):
        '''
        Asynchronous equivalent of `map()` builtin
        '''
        assert self._state == RUN
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), callback)
        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                              for i, x in enumerate(task_batches)), None))
        return result
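
    # The default chunksize above works out to ceil(len(iterable) /
    # (4 * len(self._pool))), i.e. roughly four chunks per worker process.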

    @staticmethod
    def _handle_tasks(taskqueue, put, outqueue, pool):
        thread = threading.current_thread()

        for taskseq, set_length in iter(taskqueue.get, None):
            i = -1
            for i, task in enumerate(taskseq):
                if thread._state:
                    debug('task handler found thread._state != RUN')
                    break
                try:
                    put(task)
                except IOError:
                    debug('could not put task on queue')
                    break
            else:
                if set_length:
                    debug('doing set_length()')
                    set_length(i+1)
                continue
            break
        else:
            debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except IOError:
            debug('task handler got IOError when sending sentinels')

        debug('task handler exiting')

    @staticmethod
    def _handle_results(outqueue, get, cache):
        thread = threading.current_thread()

        while 1:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if thread._state:
                assert thread._state == TERMINATE
                debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        while cache and thread._state != TERMINATE:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if task is None:
                debug('result handler ignoring extra sentinel')
                continue
            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        if hasattr(outqueue, '_reader'):
            debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block.  There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (IOError, EOFError):
                pass

        debug('result handler exiting: len(cache)=%s, thread._state=%s',
              len(cache), thread._state)

    @staticmethod
    def _get_tasks(func, it, size):
        it = iter(it)
        while 1:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield (func, x)
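
    # For example, `_get_tasks(f, range(5), 2)` yields
    # (f, (0, 1)), (f, (2, 3)) and (f, (4,)).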

    def __reduce__(self):
        raise NotImplementedError(
              'pool objects cannot be passed between processes or pickled'
              )

    def close(self):
        debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._taskqueue.put(None)

    def terminate(self):
        debug('terminating pool')
        self._state = TERMINATE
        self._terminate()

    def join(self):
        debug('joining pool')
        assert self._state in (CLOSE, TERMINATE)
        self._task_handler.join()
        self._result_handler.join()
        for p in self._pool:
            p.join()

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # task_handler may be blocked trying to put items on inqueue
        debug('removing tasks from inqueue until task handler finished')
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                        task_handler, result_handler, cache):
        # this is guaranteed to only be called once
        debug('finalizing pool')

        task_handler._state = TERMINATE
        taskqueue.put(None)                 # sentinel

        debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        assert result_handler.is_alive() or len(cache) == 0

        result_handler._state = TERMINATE
        outqueue.put(None)                  # sentinel

        if pool and hasattr(pool[0], 'terminate'):
            debug('terminating workers')
            for p in pool:
                p.terminate()

        debug('joining task handler')
        task_handler.join(1e100)

        debug('joining result handler')
        result_handler.join(1e100)

        if pool and hasattr(pool[0], 'terminate'):
            debug('joining pool workers')
            for p in pool:
                p.join()

#
# Class whose instances are returned by `Pool.apply_async()`
#

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self

    def ready(self):
        return self._ready

    def successful(self):
        assert self._ready
        return self._success

    def wait(self, timeout=None):
        self._cond.acquire()
        try:
            if not self._ready:
                self._cond.wait(timeout)
        finally:
            self._cond.release()

    def get(self, timeout=None):
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

#
# Class whose instances are returned by `Pool.map_async()`
#

class MapResult(ApplyResult):

    def __init__(self, cache, chunksize, length, callback):
        ApplyResult.__init__(self, cache, callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            self._number_left = 0
            self._ready = True
        else:
            self._number_left = length//chunksize + bool(length % chunksize)
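            # i.e. ceil(length / chunksize) chunks are still expected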

    def _set(self, i, success_result):
        success, result = success_result
        if success:
            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
            self._number_left -= 1
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                del self._cache[self._job]
                self._cond.acquire()
                try:
                    self._ready = True
                    self._cond.notify()
                finally:
                    self._cond.release()

        else:
            self._success = False
            self._value = result
            del self._cache[self._job]
            self._cond.acquire()
            try:
                self._ready = True
                self._cond.notify()
            finally:
                self._cond.release()

#
# Class whose instances are returned by `Pool.imap()`
#

class IMapIterator(object):

    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        self._cond.acquire()
        try:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration
                    raise TimeoutError
        finally:
            self._cond.release()

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()

    def _set_length(self, length):
        self._cond.acquire()
        try:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]
        finally:
            self._cond.release()

#
# Class whose instances are returned by `Pool.imap_unordered()`
#

class IMapUnorderedIterator(IMapIterator):

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()

#
# Class representing a pool of threads rather than processes
#

class ThreadPool(Pool):

    from .dummy import Process

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        self._inqueue = Queue.Queue()
        self._outqueue = Queue.Queue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # put sentinels at head of inqueue to make workers finish
        inqueue.not_empty.acquire()
        try:
            inqueue.queue.clear()
            inqueue.queue.extend([None] * size)
            inqueue.not_empty.notify_all()
        finally:
            inqueue.not_empty.release()
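
# Illustrative sketch: `ThreadPool` exposes the same interface as `Pool` but
# runs tasks in threads rather than processes, so arguments need not be
# picklable:
#
#     tp = ThreadPool(4)
#     print tp.map(lambda x: x*x, range(5))        # [0, 1, 4, 9, 16]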