Merged revisions 88610 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/branches/py3k ........ r88610 | antoine.pitrou | 2011-02-25 22:24:11 +0100 (ven., 25 févr. 2011) | 4 lines Issue #10956: Buffered I/O classes retry reading or writing after a signal has arrived and the handler returned successfully. ........
This commit is contained in:
parent
31c44031f8
commit
d843c2d86f
31
Lib/_pyio.py
31
Lib/_pyio.py
|
@ -14,6 +14,7 @@ except ImportError:
|
|||
|
||||
import io
|
||||
from io import (__all__, SEEK_SET, SEEK_CUR, SEEK_END)
|
||||
from errno import EINTR
|
||||
|
||||
# open() uses st_blksize whenever we can
|
||||
DEFAULT_BUFFER_SIZE = 8 * 1024 # bytes
|
||||
|
@ -943,7 +944,12 @@ class BufferedReader(_BufferedIOMixin):
|
|||
current_size = 0
|
||||
while True:
|
||||
# Read until EOF or until read() would block.
|
||||
chunk = self.raw.read()
|
||||
try:
|
||||
chunk = self.raw.read()
|
||||
except IOError as e:
|
||||
if e.errno != EINTR:
|
||||
raise
|
||||
continue
|
||||
if chunk in empty_values:
|
||||
nodata_val = chunk
|
||||
break
|
||||
|
@ -962,7 +968,12 @@ class BufferedReader(_BufferedIOMixin):
|
|||
chunks = [buf[pos:]]
|
||||
wanted = max(self.buffer_size, n)
|
||||
while avail < n:
|
||||
chunk = self.raw.read(wanted)
|
||||
try:
|
||||
chunk = self.raw.read(wanted)
|
||||
except IOError as e:
|
||||
if e.errno != EINTR:
|
||||
raise
|
||||
continue
|
||||
if chunk in empty_values:
|
||||
nodata_val = chunk
|
||||
break
|
||||
|
@ -991,7 +1002,14 @@ class BufferedReader(_BufferedIOMixin):
|
|||
have = len(self._read_buf) - self._read_pos
|
||||
if have < want or have <= 0:
|
||||
to_read = self.buffer_size - have
|
||||
current = self.raw.read(to_read)
|
||||
while True:
|
||||
try:
|
||||
current = self.raw.read(to_read)
|
||||
except IOError as e:
|
||||
if e.errno != EINTR:
|
||||
raise
|
||||
continue
|
||||
break
|
||||
if current:
|
||||
self._read_buf = self._read_buf[self._read_pos:] + current
|
||||
self._read_pos = 0
|
||||
|
@ -1098,7 +1116,12 @@ class BufferedWriter(_BufferedIOMixin):
|
|||
written = 0
|
||||
try:
|
||||
while self._write_buf:
|
||||
n = self.raw.write(self._write_buf)
|
||||
try:
|
||||
n = self.raw.write(self._write_buf)
|
||||
except IOError as e:
|
||||
if e.errno != EINTR:
|
||||
raise
|
||||
continue
|
||||
if n > len(self._write_buf) or n < 0:
|
||||
raise IOError("write() returned incorrect number of bytes")
|
||||
del self._write_buf[:n]
|
||||
|
|
|
@ -2622,7 +2622,8 @@ class SignalsTest(unittest.TestCase):
|
|||
@unittest.skipUnless(threading, 'Threading required for this test.')
|
||||
def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
|
||||
"""Check that a partial write, when it gets interrupted, properly
|
||||
invokes the signal handler."""
|
||||
invokes the signal handler, and bubbles up the exception raised
|
||||
in the latter."""
|
||||
read_results = []
|
||||
def _read():
|
||||
s = os.read(r, 1)
|
||||
|
@ -2701,6 +2702,98 @@ class SignalsTest(unittest.TestCase):
|
|||
def test_reentrant_write_text(self):
|
||||
self.check_reentrant_write("xy", mode="w", encoding="ascii")
|
||||
|
||||
def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
|
||||
"""Check that a buffered read, when it gets interrupted (either
|
||||
returning a partial result or EINTR), properly invokes the signal
|
||||
handler and retries if the latter returned successfully."""
|
||||
r, w = os.pipe()
|
||||
fdopen_kwargs["closefd"] = False
|
||||
def alarm_handler(sig, frame):
|
||||
os.write(w, b"bar")
|
||||
signal.signal(signal.SIGALRM, alarm_handler)
|
||||
try:
|
||||
rio = self.io.open(r, **fdopen_kwargs)
|
||||
os.write(w, b"foo")
|
||||
signal.alarm(1)
|
||||
# Expected behaviour:
|
||||
# - first raw read() returns partial b"foo"
|
||||
# - second raw read() returns EINTR
|
||||
# - third raw read() returns b"bar"
|
||||
self.assertEqual(decode(rio.read(6)), "foobar")
|
||||
finally:
|
||||
rio.close()
|
||||
os.close(w)
|
||||
os.close(r)
|
||||
|
||||
def test_interrupterd_read_retry_buffered(self):
|
||||
self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
|
||||
mode="rb")
|
||||
|
||||
def test_interrupterd_read_retry_text(self):
|
||||
self.check_interrupted_read_retry(lambda x: x,
|
||||
mode="r")
|
||||
|
||||
@unittest.skipUnless(threading, 'Threading required for this test.')
|
||||
def check_interrupted_write_retry(self, item, **fdopen_kwargs):
|
||||
"""Check that a buffered write, when it gets interrupted (either
|
||||
returning a partial result or EINTR), properly invokes the signal
|
||||
handler and retries if the latter returned successfully."""
|
||||
select = support.import_module("select")
|
||||
# A quantity that exceeds the buffer size of an anonymous pipe's
|
||||
# write end.
|
||||
N = 1024 * 1024
|
||||
r, w = os.pipe()
|
||||
fdopen_kwargs["closefd"] = False
|
||||
# We need a separate thread to read from the pipe and allow the
|
||||
# write() to finish. This thread is started after the SIGALRM is
|
||||
# received (forcing a first EINTR in write()).
|
||||
read_results = []
|
||||
write_finished = False
|
||||
def _read():
|
||||
while not write_finished:
|
||||
while r in select.select([r], [], [], 1.0)[0]:
|
||||
s = os.read(r, 1024)
|
||||
read_results.append(s)
|
||||
t = threading.Thread(target=_read)
|
||||
t.daemon = True
|
||||
def alarm1(sig, frame):
|
||||
signal.signal(signal.SIGALRM, alarm2)
|
||||
signal.alarm(1)
|
||||
def alarm2(sig, frame):
|
||||
t.start()
|
||||
signal.signal(signal.SIGALRM, alarm1)
|
||||
try:
|
||||
wio = self.io.open(w, **fdopen_kwargs)
|
||||
signal.alarm(1)
|
||||
# Expected behaviour:
|
||||
# - first raw write() is partial (because of the limited pipe buffer
|
||||
# and the first alarm)
|
||||
# - second raw write() returns EINTR (because of the second alarm)
|
||||
# - subsequent write()s are successful (either partial or complete)
|
||||
self.assertEqual(N, wio.write(item * N))
|
||||
wio.flush()
|
||||
write_finished = True
|
||||
t.join()
|
||||
self.assertEqual(N, sum(len(x) for x in read_results))
|
||||
finally:
|
||||
write_finished = True
|
||||
os.close(w)
|
||||
os.close(r)
|
||||
# This is deliberate. If we didn't close the file descriptor
|
||||
# before closing wio, wio would try to flush its internal
|
||||
# buffer, and could block (in case of failure).
|
||||
try:
|
||||
wio.close()
|
||||
except IOError as e:
|
||||
if e.errno != errno.EBADF:
|
||||
raise
|
||||
|
||||
def test_interrupterd_write_retry_buffered(self):
|
||||
self.check_interrupted_write_retry(b"x", mode="wb")
|
||||
|
||||
def test_interrupterd_write_retry_text(self):
|
||||
self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
|
||||
|
||||
|
||||
class CSignalsTest(SignalsTest):
|
||||
io = io
|
||||
|
|
|
@ -24,6 +24,9 @@ Core and Builtins
|
|||
Library
|
||||
-------
|
||||
|
||||
- Issue #10956: Buffered I/O classes retry reading or writing after a signal
|
||||
has arrived and the handler returned successfully.
|
||||
|
||||
- Issue #11224: Fixed a regression in tarfile that affected the file-like
|
||||
objects returned by TarFile.extractfile() regarding performance, memory
|
||||
consumption and failures with the stream interface.
|
||||
|
|
|
@ -714,6 +714,39 @@ _buffered_init(buffered *self)
|
|||
return 0;
|
||||
}
|
||||
|
||||
/* Return 1 if an EnvironmentError with errno == EINTR is set (and then
|
||||
clears the error indicator), 0 otherwise.
|
||||
Should only be called when PyErr_Occurred() is true.
|
||||
*/
|
||||
static int
|
||||
_trap_eintr(void)
|
||||
{
|
||||
static PyObject *eintr_int = NULL;
|
||||
PyObject *typ, *val, *tb;
|
||||
PyEnvironmentErrorObject *env_err;
|
||||
|
||||
if (eintr_int == NULL) {
|
||||
eintr_int = PyLong_FromLong(EINTR);
|
||||
assert(eintr_int != NULL);
|
||||
}
|
||||
if (!PyErr_ExceptionMatches(PyExc_EnvironmentError))
|
||||
return 0;
|
||||
PyErr_Fetch(&typ, &val, &tb);
|
||||
PyErr_NormalizeException(&typ, &val, &tb);
|
||||
env_err = (PyEnvironmentErrorObject *) val;
|
||||
assert(env_err != NULL);
|
||||
if (env_err->myerrno != NULL &&
|
||||
PyObject_RichCompareBool(env_err->myerrno, eintr_int, Py_EQ) > 0) {
|
||||
Py_DECREF(typ);
|
||||
Py_DECREF(val);
|
||||
Py_XDECREF(tb);
|
||||
return 1;
|
||||
}
|
||||
/* This silences any error set by PyObject_RichCompareBool() */
|
||||
PyErr_Restore(typ, val, tb);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Shared methods and wrappers
|
||||
*/
|
||||
|
@ -1269,7 +1302,14 @@ _bufferedreader_raw_read(buffered *self, char *start, Py_ssize_t len)
|
|||
memobj = PyMemoryView_FromBuffer(&buf);
|
||||
if (memobj == NULL)
|
||||
return -1;
|
||||
res = PyObject_CallMethodObjArgs(self->raw, _PyIO_str_readinto, memobj, NULL);
|
||||
/* NOTE: PyErr_SetFromErrno() calls PyErr_CheckSignals() when EINTR
|
||||
occurs so we needn't do it ourselves.
|
||||
We then retry reading, ignoring the signal if no handler has
|
||||
raised (see issue #10956).
|
||||
*/
|
||||
do {
|
||||
res = PyObject_CallMethodObjArgs(self->raw, _PyIO_str_readinto, memobj, NULL);
|
||||
} while (res == NULL && _trap_eintr());
|
||||
Py_DECREF(memobj);
|
||||
if (res == NULL)
|
||||
return -1;
|
||||
|
@ -1678,7 +1718,14 @@ _bufferedwriter_raw_write(buffered *self, char *start, Py_ssize_t len)
|
|||
memobj = PyMemoryView_FromBuffer(&buf);
|
||||
if (memobj == NULL)
|
||||
return -1;
|
||||
res = PyObject_CallMethodObjArgs(self->raw, _PyIO_str_write, memobj, NULL);
|
||||
/* NOTE: PyErr_SetFromErrno() calls PyErr_CheckSignals() when EINTR
|
||||
occurs so we needn't do it ourselves.
|
||||
We then retry writing, ignoring the signal if no handler has
|
||||
raised (see issue #10956).
|
||||
*/
|
||||
do {
|
||||
res = PyObject_CallMethodObjArgs(self->raw, _PyIO_str_write, memobj, NULL);
|
||||
} while (res == NULL && _trap_eintr());
|
||||
Py_DECREF(memobj);
|
||||
if (res == NULL)
|
||||
return -1;
|
||||
|
|
Loading…
Reference in New Issue