Issue #23051: multiprocessing.Pool methods imap() and imap_unordered() now
handle exceptions raised by an iterator. Patch by Alon Diamant and Davin Potts.
This commit is contained in:
commit
63623ac252
|
@ -374,25 +374,34 @@ class Pool(object):
|
||||||
thread = threading.current_thread()
|
thread = threading.current_thread()
|
||||||
|
|
||||||
for taskseq, set_length in iter(taskqueue.get, None):
|
for taskseq, set_length in iter(taskqueue.get, None):
|
||||||
|
task = None
|
||||||
i = -1
|
i = -1
|
||||||
for i, task in enumerate(taskseq):
|
try:
|
||||||
if thread._state:
|
for i, task in enumerate(taskseq):
|
||||||
util.debug('task handler found thread._state != RUN')
|
if thread._state:
|
||||||
break
|
util.debug('task handler found thread._state != RUN')
|
||||||
try:
|
break
|
||||||
put(task)
|
|
||||||
except Exception as e:
|
|
||||||
job, ind = task[:2]
|
|
||||||
try:
|
try:
|
||||||
cache[job]._set(ind, (False, e))
|
put(task)
|
||||||
except KeyError:
|
except Exception as e:
|
||||||
pass
|
job, ind = task[:2]
|
||||||
else:
|
try:
|
||||||
|
cache[job]._set(ind, (False, e))
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
if set_length:
|
||||||
|
util.debug('doing set_length()')
|
||||||
|
set_length(i+1)
|
||||||
|
continue
|
||||||
|
break
|
||||||
|
except Exception as ex:
|
||||||
|
job, ind = task[:2] if task else (0, 0)
|
||||||
|
if job in cache:
|
||||||
|
cache[job]._set(ind + 1, (False, ex))
|
||||||
if set_length:
|
if set_length:
|
||||||
util.debug('doing set_length()')
|
util.debug('doing set_length()')
|
||||||
set_length(i+1)
|
set_length(i+1)
|
||||||
continue
|
|
||||||
break
|
|
||||||
else:
|
else:
|
||||||
util.debug('task handler got sentinel')
|
util.debug('task handler got sentinel')
|
||||||
|
|
||||||
|
|
|
@ -1660,6 +1660,14 @@ def sqr(x, wait=0.0):
|
||||||
def mul(x, y):
|
def mul(x, y):
|
||||||
return x*y
|
return x*y
|
||||||
|
|
||||||
|
class SayWhenError(ValueError): pass
|
||||||
|
|
||||||
|
def exception_throwing_generator(total, when):
|
||||||
|
for i in range(total):
|
||||||
|
if i == when:
|
||||||
|
raise SayWhenError("Somebody said when")
|
||||||
|
yield i
|
||||||
|
|
||||||
class _TestPool(BaseTestCase):
|
class _TestPool(BaseTestCase):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -1758,6 +1766,25 @@ class _TestPool(BaseTestCase):
|
||||||
self.assertEqual(next(it), i*i)
|
self.assertEqual(next(it), i*i)
|
||||||
self.assertRaises(StopIteration, it.__next__)
|
self.assertRaises(StopIteration, it.__next__)
|
||||||
|
|
||||||
|
def test_imap_handle_iterable_exception(self):
|
||||||
|
if self.TYPE == 'manager':
|
||||||
|
self.skipTest('test not appropriate for {}'.format(self.TYPE))
|
||||||
|
|
||||||
|
it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
|
||||||
|
for i in range(3):
|
||||||
|
self.assertEqual(next(it), i*i)
|
||||||
|
self.assertRaises(SayWhenError, it.__next__)
|
||||||
|
|
||||||
|
# SayWhenError seen at start of problematic chunk's results
|
||||||
|
it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
|
||||||
|
for i in range(6):
|
||||||
|
self.assertEqual(next(it), i*i)
|
||||||
|
self.assertRaises(SayWhenError, it.__next__)
|
||||||
|
it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
|
||||||
|
for i in range(4):
|
||||||
|
self.assertEqual(next(it), i*i)
|
||||||
|
self.assertRaises(SayWhenError, it.__next__)
|
||||||
|
|
||||||
def test_imap_unordered(self):
|
def test_imap_unordered(self):
|
||||||
it = self.pool.imap_unordered(sqr, list(range(1000)))
|
it = self.pool.imap_unordered(sqr, list(range(1000)))
|
||||||
self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
|
self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
|
||||||
|
@ -1765,6 +1792,25 @@ class _TestPool(BaseTestCase):
|
||||||
it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
|
it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
|
||||||
self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
|
self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
|
||||||
|
|
||||||
|
def test_imap_unordered_handle_iterable_exception(self):
|
||||||
|
if self.TYPE == 'manager':
|
||||||
|
self.skipTest('test not appropriate for {}'.format(self.TYPE))
|
||||||
|
|
||||||
|
it = self.pool.imap_unordered(sqr,
|
||||||
|
exception_throwing_generator(10, 3),
|
||||||
|
1)
|
||||||
|
with self.assertRaises(SayWhenError):
|
||||||
|
# imap_unordered makes it difficult to anticipate the SayWhenError
|
||||||
|
for i in range(10):
|
||||||
|
self.assertEqual(next(it), i*i)
|
||||||
|
|
||||||
|
it = self.pool.imap_unordered(sqr,
|
||||||
|
exception_throwing_generator(20, 7),
|
||||||
|
2)
|
||||||
|
with self.assertRaises(SayWhenError):
|
||||||
|
for i in range(20):
|
||||||
|
self.assertEqual(next(it), i*i)
|
||||||
|
|
||||||
def test_make_pool(self):
|
def test_make_pool(self):
|
||||||
self.assertRaises(ValueError, multiprocessing.Pool, -1)
|
self.assertRaises(ValueError, multiprocessing.Pool, -1)
|
||||||
self.assertRaises(ValueError, multiprocessing.Pool, 0)
|
self.assertRaises(ValueError, multiprocessing.Pool, 0)
|
||||||
|
|
|
@ -336,6 +336,7 @@ Raghuram Devarakonda
|
||||||
Caleb Deveraux
|
Caleb Deveraux
|
||||||
Catherine Devlin
|
Catherine Devlin
|
||||||
Scott Dial
|
Scott Dial
|
||||||
|
Alon Diamant
|
||||||
Toby Dickenson
|
Toby Dickenson
|
||||||
Mark Dickinson
|
Mark Dickinson
|
||||||
Jack Diederich
|
Jack Diederich
|
||||||
|
@ -1099,6 +1100,7 @@ Martin Pool
|
||||||
Iustin Pop
|
Iustin Pop
|
||||||
Claudiu Popa
|
Claudiu Popa
|
||||||
John Popplewell
|
John Popplewell
|
||||||
|
Davin Potts
|
||||||
Guillaume Pratte
|
Guillaume Pratte
|
||||||
Amrit Prem
|
Amrit Prem
|
||||||
Paul Prescod
|
Paul Prescod
|
||||||
|
|
|
@ -18,6 +18,10 @@ Core and Builtins
|
||||||
Library
|
Library
|
||||||
-------
|
-------
|
||||||
|
|
||||||
|
- Issue #23051: multiprocessing.Pool methods imap() and imap_unordered() now
|
||||||
|
handle exceptions raised by an iterator. Patch by Alon Diamant and Davin
|
||||||
|
Potts.
|
||||||
|
|
||||||
- Issue #23581: Add matmul support to MagicMock. Patch by Håkan Lövdahl.
|
- Issue #23581: Add matmul support to MagicMock. Patch by Håkan Lövdahl.
|
||||||
|
|
||||||
- Issue #23566: enable(), register(), dump_traceback() and
|
- Issue #23566: enable(), register(), dump_traceback() and
|
||||||
|
|
Loading…
Reference in New Issue