Merge.
This commit is contained in:
commit
33aaa73cad
|
@ -174,7 +174,8 @@ class Pool(object):
|
||||||
|
|
||||||
self._task_handler = threading.Thread(
|
self._task_handler = threading.Thread(
|
||||||
target=Pool._handle_tasks,
|
target=Pool._handle_tasks,
|
||||||
args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
|
args=(self._taskqueue, self._quick_put, self._outqueue,
|
||||||
|
self._pool, self._cache)
|
||||||
)
|
)
|
||||||
self._task_handler.daemon = True
|
self._task_handler.daemon = True
|
||||||
self._task_handler._state = RUN
|
self._task_handler._state = RUN
|
||||||
|
@ -364,7 +365,7 @@ class Pool(object):
|
||||||
util.debug('worker handler exiting')
|
util.debug('worker handler exiting')
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _handle_tasks(taskqueue, put, outqueue, pool):
|
def _handle_tasks(taskqueue, put, outqueue, pool, cache):
|
||||||
thread = threading.current_thread()
|
thread = threading.current_thread()
|
||||||
|
|
||||||
for taskseq, set_length in iter(taskqueue.get, None):
|
for taskseq, set_length in iter(taskqueue.get, None):
|
||||||
|
@ -375,9 +376,12 @@ class Pool(object):
|
||||||
break
|
break
|
||||||
try:
|
try:
|
||||||
put(task)
|
put(task)
|
||||||
except OSError:
|
except Exception as e:
|
||||||
util.debug('could not put task on queue')
|
job, ind = task[:2]
|
||||||
break
|
try:
|
||||||
|
cache[job]._set(ind, (False, e))
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
else:
|
else:
|
||||||
if set_length:
|
if set_length:
|
||||||
util.debug('doing set_length()')
|
util.debug('doing set_length()')
|
||||||
|
|
|
@ -1698,6 +1698,16 @@ class _TestPool(BaseTestCase):
|
||||||
self.assertEqual(2, len(call_args))
|
self.assertEqual(2, len(call_args))
|
||||||
self.assertIsInstance(call_args[1], ValueError)
|
self.assertIsInstance(call_args[1], ValueError)
|
||||||
|
|
||||||
|
def test_map_unplicklable(self):
|
||||||
|
# Issue #19425 -- failure to pickle should not cause a hang
|
||||||
|
if self.TYPE == 'threads':
|
||||||
|
return
|
||||||
|
class A(object):
|
||||||
|
def __reduce__(self):
|
||||||
|
raise RuntimeError('cannot pickle')
|
||||||
|
with self.assertRaises(RuntimeError):
|
||||||
|
self.pool.map(sqr, [A()]*10)
|
||||||
|
|
||||||
def test_map_chunksize(self):
|
def test_map_chunksize(self):
|
||||||
try:
|
try:
|
||||||
self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
|
self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
|
||||||
|
|
Loading…
Reference in New Issue