mirror of https://github.com/python/cpython
Resolves issues 5155, 5313, 5331 - bad file descriptor error with processes in processes
This commit is contained in:
parent
0c9eb43149
commit
1b90efbdc5
|
@ -2101,6 +2101,38 @@ Explicitly pass resources to child processes
|
|||
for i in range(10):
|
||||
Process(target=f, args=(lock,)).start()
|
||||
|
||||
Beware replacing sys.stdin with a "file like object"
|
||||
|
||||
:mod:`multiprocessing` originally unconditionally called::
|
||||
|
||||
os.close(sys.stdin.fileno())
|
||||
|
||||
In the :meth:`multiprocessing.Process._bootstrap` method of - this resulted
|
||||
in issues with processes-in-processes. This has been changed to::
|
||||
|
||||
sys.stdin.close()
|
||||
sys.stdin = open(os.devnull)
|
||||
|
||||
Which solves the fundamental issue of processes colliding with each other
|
||||
resulting in a bad file descriptor error, but introduces a potential danger
|
||||
to applications which replace :func:`sys.stdin` with a "file-like object"
|
||||
with output buffering, this danger is that if multiple processes call
|
||||
:func:`close()` on this file-like object, it could result in the same
|
||||
data being flushed to the object multiple times, resulting in corruption.
|
||||
|
||||
If you write a file-like object and implement your own caching, you can
|
||||
make it fork-safe by storing the pid whenever you append to the cache,
|
||||
and discarding the cache when the pid changes. For example::
|
||||
|
||||
@property
|
||||
def cache(self):
|
||||
pid = os.getpid()
|
||||
if pid != self._pid:
|
||||
self._pid = pid
|
||||
self._cache = []
|
||||
return self._cache
|
||||
|
||||
For more information, see :issue:`5155`, :issue:`5313` and :issue:`5331`
|
||||
|
||||
Windows
|
||||
~~~~~~~
|
||||
|
|
|
@ -220,7 +220,8 @@ class Process(object):
|
|||
self._children = set()
|
||||
self._counter = itertools.count(1)
|
||||
try:
|
||||
os.close(sys.stdin.fileno())
|
||||
sys.stdin.close()
|
||||
sys.stdin = open(os.devnull)
|
||||
except (OSError, ValueError):
|
||||
pass
|
||||
_current_process = self
|
||||
|
|
|
@ -18,6 +18,7 @@ import socket
|
|||
import random
|
||||
import logging
|
||||
import test_support
|
||||
from StringIO import StringIO
|
||||
|
||||
|
||||
_multiprocessing = test_support.import_module('_multiprocessing')
|
||||
|
@ -1861,7 +1862,74 @@ class TestInitializers(unittest.TestCase):
|
|||
p.join()
|
||||
self.assertEqual(self.ns.test, 1)
|
||||
|
||||
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers]
|
||||
#
|
||||
# Issue 5155, 5313, 5331: Test process in processes
|
||||
# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
|
||||
#
|
||||
|
||||
def _ThisSubProcess(q):
|
||||
try:
|
||||
item = q.get(block=False)
|
||||
except Queue.Empty:
|
||||
pass
|
||||
|
||||
def _TestProcess(q):
|
||||
queue = multiprocessing.Queue()
|
||||
subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
|
||||
subProc.start()
|
||||
subProc.join()
|
||||
|
||||
def _afunc(x):
|
||||
return x*x
|
||||
|
||||
def pool_in_process():
|
||||
pool = multiprocessing.Pool(processes=4)
|
||||
x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
|
||||
|
||||
class _file_like(object):
|
||||
def __init__(self, delegate):
|
||||
self._delegate = delegate
|
||||
self._pid = None
|
||||
|
||||
@property
|
||||
def cache(self):
|
||||
pid = os.getpid()
|
||||
# There are no race conditions since fork keeps only the running thread
|
||||
if pid != self._pid:
|
||||
self._pid = pid
|
||||
self._cache = []
|
||||
return self._cache
|
||||
|
||||
def write(self, data):
|
||||
self.cache.append(data)
|
||||
|
||||
def flush(self):
|
||||
self._delegate.write(''.join(self.cache))
|
||||
self._cache = []
|
||||
|
||||
class TestStdinBadfiledescriptor(unittest.TestCase):
|
||||
|
||||
def test_queue_in_process(self):
|
||||
queue = multiprocessing.Queue()
|
||||
proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
|
||||
proc.start()
|
||||
proc.join()
|
||||
|
||||
def test_pool_in_process(self):
|
||||
p = multiprocessing.Process(target=pool_in_process)
|
||||
p.start()
|
||||
p.join()
|
||||
|
||||
def test_flushing(self):
|
||||
sio = StringIO()
|
||||
flike = _file_like(sio)
|
||||
flike.write('foo')
|
||||
proc = multiprocessing.Process(target=lambda: flike.flush())
|
||||
flike.flush()
|
||||
assert sio.getvalue() == 'foo'
|
||||
|
||||
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
|
||||
TestStdinBadfiledescriptor]
|
||||
|
||||
#
|
||||
#
|
||||
|
|
|
@ -46,6 +46,7 @@ Des Barry
|
|||
Ulf Bartelt
|
||||
Nick Bastin
|
||||
Jeff Bauer
|
||||
Mike Bayer
|
||||
Michael R Bax
|
||||
Anthony Baxter
|
||||
Samuel L. Bayer
|
||||
|
@ -183,6 +184,7 @@ Cesar Douady
|
|||
Dean Draayer
|
||||
John DuBois
|
||||
Paul Dubois
|
||||
Graham Dumpleton
|
||||
Quinn Dunkan
|
||||
Robin Dunn
|
||||
Luke Dunstan
|
||||
|
@ -553,6 +555,7 @@ Steven Pemberton
|
|||
Santiago Peresón
|
||||
Mark Perrego
|
||||
Trevor Perrin
|
||||
Gabriel de Perthuis
|
||||
Tim Peters
|
||||
Benjamin Peterson
|
||||
Chris Petrilli
|
||||
|
|
|
@ -341,6 +341,10 @@ Core and Builtins
|
|||
Library
|
||||
-------
|
||||
|
||||
- Issues #5155, 5313, 5331: multiprocessing.Process._bootstrap was
|
||||
unconditionally calling "os.close(sys.stdin.fileno())" resulting in file
|
||||
descriptor errors
|
||||
|
||||
- Issue #6365: Distutils build_ext inplace mode was copying the compiled
|
||||
extension in a subdirectory if the extension name had dots.
|
||||
|
||||
|
|
Loading…
Reference in New Issue