mirror of https://github.com/python/cpython
Silence spurious "broken pipe" tracebacks when shutting down a ProcessPoolExecutor.
parent d06a065a44
commit dc19c24832
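For context, a rough POSIX-only sketch of the scenario the commit message describes (the crash() helper and the surrounding script are illustrative, not part of the patch): when a worker process dies abruptly, the executor already detects and reports the dead worker, but before this change the call queue's feeder thread could additionally print an unwanted "broken pipe" traceback while the pool shut down.

# Illustrative sketch only; assumes a POSIX platform (signal.SIGKILL).
import os
import signal
from concurrent.futures import ProcessPoolExecutor

def crash():
    # The worker kills itself, standing in for an externally killed process.
    os.kill(os.getpid(), signal.SIGKILL)

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(crash)
        try:
            future.result()
        except Exception as exc:
            # The executor reports the dead worker on the future itself; the
            # problem fixed here is the extra "broken pipe" traceback that the
            # call queue's feeder thread could print during shutdown.
            print(type(exc).__name__, exc)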
Lib/concurrent/futures/process.py

@@ -205,12 +205,12 @@ def _queue_management_worker(executor_reference,
         nb_children_alive = sum(p.is_alive() for p in processes.values())
         for i in range(0, nb_children_alive):
             call_queue.put_nowait(None)
+        # Release the queue's resources as soon as possible.
+        call_queue.close()
         # If .join() is not called on the created processes then
         # some multiprocessing.Queue methods may deadlock on Mac OS X.
         for p in processes.values():
             p.join()
-        # Release resources held by the queue
-        call_queue.close()

     while True:
         _add_call_item_to_queue(pending_work_items,
@@ -241,8 +241,7 @@ def _queue_management_worker(executor_reference,
             # locks may be in a dirty state and block forever.
             for p in processes.values():
                 p.terminate()
-            for p in processes.values():
-                p.join()
+            shutdown_worker()
             return
         if isinstance(result_item, int):
             # Clean shutdown of a worker using its PID
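The two hunks above rework the executor's shutdown path: wake each live worker with a None sentinel, close the call queue as early as possible, then join the workers. A stand-alone sketch of that pattern with plain multiprocessing (the worker() function and the process bookkeeping are invented for illustration; this is not the executor's actual helper):

import multiprocessing

def worker(call_queue):
    while True:
        item = call_queue.get()
        if item is None:          # sentinel: time to exit
            return

if __name__ == '__main__':
    call_queue = multiprocessing.Queue()
    processes = [multiprocessing.Process(target=worker, args=(call_queue,))
                 for _ in range(2)]
    for p in processes:
        p.start()

    # Upper bound on the number of sentinels needed.
    nb_children_alive = sum(p.is_alive() for p in processes)
    for _ in range(nb_children_alive):
        call_queue.put_nowait(None)
    # Release the queue's resources (feeder thread, pipe) as soon as possible;
    # the buffered sentinels are still flushed to the workers.
    call_queue.close()
    # Join the workers afterwards, matching the ordering in the patch.
    for p in processes:
        p.join()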
@@ -337,6 +336,10 @@ class ProcessPoolExecutor(_base.Executor):
         # because futures in the call queue cannot be cancelled.
         self._call_queue = multiprocessing.Queue(self._max_workers +
                                                  EXTRA_QUEUED_CALLS)
+        # Killed worker processes can produce spurious "broken pipe"
+        # tracebacks in the queue's own worker thread. But we detect killed
+        # processes anyway, so silence the tracebacks.
+        self._call_queue._ignore_epipe = True
         self._result_queue = SimpleQueue()
         self._work_ids = queue.Queue()
         self._queue_management_thread = None
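The new _ignore_epipe attribute is private to the Queue instance and is picked up when the queue's feeder thread is started, which multiprocessing.Queue does lazily on the first put(). A minimal sketch of that wiring, assuming the post-patch multiprocessing.Queue (setting the flag by hand here is purely for illustration):

import multiprocessing

q = multiprocessing.Queue()
# Must be set before the feeder thread exists: _start_thread() passes the
# flag into the thread's arguments (see the _feed() hunks below).
q._ignore_epipe = True
q.put('hello')     # first put() lazily starts the feeder thread
q.close()
q.join_thread()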
Lib/multiprocessing/queues.py

@@ -41,6 +41,7 @@ import collections
 import time
 import atexit
 import weakref
+import errno

 from queue import Empty, Full
 import _multiprocessing
@@ -67,6 +68,8 @@ class Queue(object):
         else:
             self._wlock = Lock()
         self._sem = BoundedSemaphore(maxsize)
+        # For use by concurrent.futures
+        self._ignore_epipe = False

         self._after_fork()

@@ -178,7 +181,7 @@ class Queue(object):
         self._thread = threading.Thread(
             target=Queue._feed,
             args=(self._buffer, self._notempty, self._send,
-                  self._wlock, self._writer.close),
+                  self._wlock, self._writer.close, self._ignore_epipe),
             name='QueueFeederThread'
             )
         self._thread.daemon = True
@@ -229,7 +232,7 @@ class Queue(object):
             notempty.release()

     @staticmethod
-    def _feed(buffer, notempty, send, writelock, close):
+    def _feed(buffer, notempty, send, writelock, close, ignore_epipe):
         debug('starting thread to feed data to pipe')
         from .util import is_exiting

@@ -271,6 +274,8 @@ class Queue(object):
                 except IndexError:
                     pass
         except Exception as e:
+            if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
+                return
             # Since this runs in a daemon thread the resources it uses
             # may be become unusable while the process is cleaning up.
             # We ignore errors which happen after the process has
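Taken together, the queues.py hunks pass an ignore_epipe flag into the feeder thread and make it exit quietly on EPIPE instead of printing a traceback. A self-contained sketch of that pattern, with generic names and a fake send callable (not the stdlib code itself):

import errno
import threading

def feed(buffer, send, ignore_epipe):
    """Drain 'buffer' into 'send'; optionally swallow broken-pipe errors."""
    while buffer:
        obj = buffer.pop(0)
        try:
            send(obj)
        except Exception as e:
            if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
                # The reader end is gone (e.g. a killed worker); the owner
                # already detects that condition, so exit quietly instead of
                # letting the daemon thread print a traceback.
                return
            raise

def broken_send(obj):
    raise OSError(errno.EPIPE, 'Broken pipe')

t = threading.Thread(target=feed, args=([1, 2, 3], broken_send, True),
                     daemon=True)
t.start()
t.join()   # exits silently because ignore_epipe is True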