bpo-40221: Update multiprocessing to use _at_fork_reinit (GH-19511)

This commit is contained in:
Dong-hee Na 2020-04-15 01:35:36 +09:00 committed by GitHub
parent e560f90602
commit a5900ecf9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 10 additions and 9 deletions

View File

@ -49,8 +49,7 @@ class Queue(object):
self._sem = ctx.BoundedSemaphore(maxsize) self._sem = ctx.BoundedSemaphore(maxsize)
# For use by concurrent.futures # For use by concurrent.futures
self._ignore_epipe = False self._ignore_epipe = False
self._reset()
self._after_fork()
if sys.platform != 'win32': if sys.platform != 'win32':
register_after_fork(self, Queue._after_fork) register_after_fork(self, Queue._after_fork)
@ -63,11 +62,17 @@ class Queue(object):
def __setstate__(self, state): def __setstate__(self, state):
(self._ignore_epipe, self._maxsize, self._reader, self._writer, (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state self._rlock, self._wlock, self._sem, self._opid) = state
self._after_fork() self._reset()
def _after_fork(self): def _after_fork(self):
debug('Queue._after_fork()') debug('Queue._after_fork()')
self._notempty = threading.Condition(threading.Lock()) self._reset(after_fork=True)
def _reset(self, after_fork=False):
if after_fork:
self._notempty._at_fork_reinit()
else:
self._notempty = threading.Condition(threading.Lock())
self._buffer = collections.deque() self._buffer = collections.deque()
self._thread = None self._thread = None
self._jointhread = None self._jointhread = None

View File

@ -63,7 +63,6 @@ class _ResourceSharer(object):
def __init__(self): def __init__(self):
self._key = 0 self._key = 0
self._cache = {} self._cache = {}
self._old_locks = []
self._lock = threading.Lock() self._lock = threading.Lock()
self._listener = None self._listener = None
self._address = None self._address = None
@ -113,10 +112,7 @@ class _ResourceSharer(object):
for key, (send, close) in self._cache.items(): for key, (send, close) in self._cache.items():
close() close()
self._cache.clear() self._cache.clear()
# If self._lock was locked at the time of the fork, it may be broken self._lock._at_fork_reinit()
# -- see issue 6721. Replace it without letting it be gc'ed.
self._old_locks.append(self._lock)
self._lock = threading.Lock()
if self._listener is not None: if self._listener is not None:
self._listener.close() self._listener.close()
self._listener = None self._listener = None