Issue #17097: Make multiprocessing ignore EINTR.
This commit is contained in:
parent
8087879349
commit
cca8c53d6a
|
@ -366,7 +366,10 @@ class Connection(_ConnectionBase):
|
|||
def _send(self, buf, write=_write):
|
||||
remaining = len(buf)
|
||||
while True:
|
||||
n = write(self._handle, buf)
|
||||
try:
|
||||
n = write(self._handle, buf)
|
||||
except InterruptedError:
|
||||
continue
|
||||
remaining -= n
|
||||
if remaining == 0:
|
||||
break
|
||||
|
@ -377,7 +380,10 @@ class Connection(_ConnectionBase):
|
|||
handle = self._handle
|
||||
remaining = size
|
||||
while remaining > 0:
|
||||
chunk = read(handle, remaining)
|
||||
try:
|
||||
chunk = read(handle, remaining)
|
||||
except InterruptedError:
|
||||
continue
|
||||
n = len(chunk)
|
||||
if n == 0:
|
||||
if remaining == size:
|
||||
|
@ -581,7 +587,13 @@ class SocketListener(object):
|
|||
self._unlink = None
|
||||
|
||||
def accept(self):
|
||||
s, self._last_accepted = self._socket.accept()
|
||||
while True:
|
||||
try:
|
||||
s, self._last_accepted = self._socket.accept()
|
||||
except InterruptedError:
|
||||
pass
|
||||
else:
|
||||
break
|
||||
s.setblocking(True)
|
||||
return Connection(s.detach())
|
||||
|
||||
|
|
|
@ -3460,6 +3460,74 @@ class TestForkAwareThreadLock(unittest.TestCase):
|
|||
p.join()
|
||||
self.assertLessEqual(new_size, old_size)
|
||||
|
||||
#
|
||||
# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
|
||||
#
|
||||
|
||||
class TestIgnoreEINTR(unittest.TestCase):
|
||||
|
||||
@classmethod
|
||||
def _test_ignore(cls, conn):
|
||||
def handler(signum, frame):
|
||||
pass
|
||||
signal.signal(signal.SIGUSR1, handler)
|
||||
conn.send('ready')
|
||||
x = conn.recv()
|
||||
conn.send(x)
|
||||
conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
|
||||
def test_ignore(self):
|
||||
conn, child_conn = multiprocessing.Pipe()
|
||||
try:
|
||||
p = multiprocessing.Process(target=self._test_ignore,
|
||||
args=(child_conn,))
|
||||
p.daemon = True
|
||||
p.start()
|
||||
child_conn.close()
|
||||
self.assertEqual(conn.recv(), 'ready')
|
||||
time.sleep(0.1)
|
||||
os.kill(p.pid, signal.SIGUSR1)
|
||||
time.sleep(0.1)
|
||||
conn.send(1234)
|
||||
self.assertEqual(conn.recv(), 1234)
|
||||
time.sleep(0.1)
|
||||
os.kill(p.pid, signal.SIGUSR1)
|
||||
self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
|
||||
time.sleep(0.1)
|
||||
p.join()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
@classmethod
|
||||
def _test_ignore_listener(cls, conn):
|
||||
def handler(signum, frame):
|
||||
pass
|
||||
signal.signal(signal.SIGUSR1, handler)
|
||||
l = multiprocessing.connection.Listener()
|
||||
conn.send(l.address)
|
||||
a = l.accept()
|
||||
a.send('welcome')
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
|
||||
def test_ignore_listener(self):
|
||||
conn, child_conn = multiprocessing.Pipe()
|
||||
try:
|
||||
p = multiprocessing.Process(target=self._test_ignore_listener,
|
||||
args=(child_conn,))
|
||||
p.daemon = True
|
||||
p.start()
|
||||
child_conn.close()
|
||||
address = conn.recv()
|
||||
time.sleep(0.1)
|
||||
os.kill(p.pid, signal.SIGUSR1)
|
||||
time.sleep(0.1)
|
||||
client = multiprocessing.connection.Client(address)
|
||||
self.assertEqual(client.recv(), 'welcome')
|
||||
p.join()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
#
|
||||
#
|
||||
#
|
||||
|
@ -3467,7 +3535,7 @@ class TestForkAwareThreadLock(unittest.TestCase):
|
|||
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
|
||||
TestStdinBadfiledescriptor, TestWait, TestInvalidFamily,
|
||||
TestFlags, TestTimeouts, TestNoForkBomb,
|
||||
TestForkAwareThreadLock]
|
||||
TestForkAwareThreadLock, TestIgnoreEINTR]
|
||||
|
||||
#
|
||||
#
|
||||
|
|
Loading…
Reference in New Issue