diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index d66d37a5c3e..715a9b0e129 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -161,7 +161,7 @@ class Queue(object): target=Queue._feed, args=(self._buffer, self._notempty, self._send_bytes, self._wlock, self._writer.close, self._ignore_epipe, - self._on_queue_feeder_error), + self._on_queue_feeder_error, self._sem), name='QueueFeederThread' ) self._thread.daemon = True @@ -203,7 +203,7 @@ class Queue(object): @staticmethod def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe, - onerror): + onerror, queue_sem): debug('starting thread to feed data to pipe') nacquire = notempty.acquire nrelease = notempty.release @@ -255,6 +255,12 @@ class Queue(object): info('error in queue thread: %s', e) return else: + # Since the object has not been sent in the queue, we need + # to decrease the size of the queue. The error acts as + # if the object had been silently removed from the queue + # and this step is necessary to have a properly working + # queue. + queue_sem.release() onerror(e, obj) @staticmethod diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 940fe584e75..c787702f1d4 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -1056,6 +1056,19 @@ class _TestQueue(BaseTestCase): self.assertTrue(q.get(timeout=1.0)) close_queue(q) + with test.support.captured_stderr(): + # bpo-33078: verify that the queue size is correctly handled + # on errors. + q = self.Queue(maxsize=1) + q.put(NotSerializable()) + q.put(True) + self.assertEqual(q.qsize(), 1) + # bpo-30595: use a timeout of 1 second for slow buildbots + self.assertTrue(q.get(timeout=1.0)) + # Check that the size of the queue is correct + self.assertEqual(q.qsize(), 0) + close_queue(q) + def test_queue_feeder_on_queue_feeder_error(self): # bpo-30006: verify feeder handles exceptions using the # _on_queue_feeder_error hook. diff --git a/Misc/NEWS.d/next/Library/2018-03-15-07-38-00.bpo-33078.RmjUF5.rst b/Misc/NEWS.d/next/Library/2018-03-15-07-38-00.bpo-33078.RmjUF5.rst new file mode 100644 index 00000000000..55c2b1de866 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-03-15-07-38-00.bpo-33078.RmjUF5.rst @@ -0,0 +1,2 @@ +Fix the size handling in multiprocessing.Queue when a pickling error +occurs.