truncate() returns the new size and position.

write() returns the number of bytes/characters written/buffered.
FileIO.close() calls self.flush().
Implement readinto() for buffered readers.
Tests th check all these.
Test proper behavior of __enter__/__exit__.
This commit is contained in:
Guido van Rossum 2007-04-10 21:06:59 +00:00
parent 34d69e57e3
commit 8742977b33
3 changed files with 95 additions and 27 deletions

View File

@ -177,10 +177,11 @@ class IOBase:
"""tell() -> int. Return current stream position."""
return self.seek(0, 1)
def truncate(self, pos: int = None) -> None:
"""truncate(size: int = None) -> None. Truncate file to size bytes.
def truncate(self, pos: int = None) -> int:
"""truncate(size: int = None) -> int. Truncate file to size bytes.
Size defaults to the current IO position as reported by tell().
Returns the new size.
"""
self._unsupported("truncate")
@ -329,6 +330,10 @@ class FileIO(_fileio._FileIO, RawIOBase):
would be hard to do since _fileio.c is written in C).
"""
def close(self):
_fileio._FileIO.close(self)
RawIOBase.close(self)
class SocketIO(RawIOBase):
@ -413,7 +418,10 @@ class BufferedIOBase(IOBase):
Raises BlockingIOError if the underlying raw stream has no
data at the moment.
"""
self._unsupported("readinto")
data = self.read(len(b))
n = len(data)
b[:n] = data
return n
def write(self, b: bytes) -> int:
"""write(b: bytes) -> int. Write the given buffer to the IO stream.
@ -448,7 +456,7 @@ class _BufferedIOMixin(BufferedIOBase):
return self.raw.tell()
def truncate(self, pos=None):
self.raw.truncate(pos)
return self.raw.truncate(pos)
### Flush and close ###
@ -503,12 +511,6 @@ class _MemoryIOMixin(BufferedIOBase):
self._pos = newpos
return b
def readinto(self, b):
tmp = self.read(len(b))
n = len(tmp)
b[:n] = tmp
return n
def write(self, b):
n = len(b)
newpos = self._pos + n
@ -536,6 +538,7 @@ class _MemoryIOMixin(BufferedIOBase):
else:
self._pos = max(0, pos)
del self._buffer[pos:]
return pos
def readable(self):
return True
@ -652,7 +655,6 @@ class BufferedWriter(_BufferedIOMixin):
def write(self, b):
# XXX we can implement some more tricks to try and avoid partial writes
##assert issubclass(type(b), bytes)
if len(self._write_buf) > self.buffer_size:
# We're full, so let's pre-flush the buffer
try:
@ -672,6 +674,7 @@ class BufferedWriter(_BufferedIOMixin):
overage = len(self._write_buf) - self.max_buffer_size
self._write_buf = self._write_buf[:self.max_buffer_size]
raise BlockingIOError(e.errno, e.strerror, overage)
return len(b)
def flush(self):
written = 0

View File

@ -79,17 +79,19 @@ class IOTest(unittest.TestCase):
test_support.unlink(test_support.TESTFN)
def write_ops(self, f):
f.write(b"blah.")
f.seek(0)
f.write(b"Hello.")
self.assertEqual(f.write(b"blah."), 5)
self.assertEqual(f.seek(0), 0)
self.assertEqual(f.write(b"Hello."), 6)
self.assertEqual(f.tell(), 6)
f.seek(-1, 1)
self.assertEqual(f.seek(-1, 1), 5)
self.assertEqual(f.tell(), 5)
f.write(" world\n\n\n")
f.seek(0)
f.write("h")
f.seek(-2, 2)
f.truncate()
self.assertEqual(f.write(" world\n\n\n"), 9)
self.assertEqual(f.seek(0), 0)
self.assertEqual(f.write("h"), 1)
self.assertEqual(f.seek(-1, 2), 13)
self.assertEqual(f.tell(), 13)
self.assertEqual(f.truncate(12), 12)
self.assertEqual(f.tell(), 12)
LARGE = 2**31
@ -101,10 +103,10 @@ class IOTest(unittest.TestCase):
self.assertEqual(f.write(b"xxx"), 3)
self.assertEqual(f.tell(), self.LARGE + 3)
self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
f.truncate()
self.assertEqual(f.truncate(), self.LARGE + 2)
self.assertEqual(f.tell(), self.LARGE + 2)
self.assertEqual(f.seek(0, 2), self.LARGE + 2)
f.truncate(self.LARGE + 1)
self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
self.assertEqual(f.tell(), self.LARGE + 1)
self.assertEqual(f.seek(0, 2), self.LARGE + 1)
self.assertEqual(f.seek(-1, 2), self.LARGE)
@ -142,6 +144,20 @@ class IOTest(unittest.TestCase):
self.read_ops(f)
f.close()
def test_buffered_file_io(self):
f = io.open(test_support.TESTFN, "wb")
self.assertEqual(f.readable(), False)
self.assertEqual(f.writable(), True)
self.assertEqual(f.seekable(), True)
self.write_ops(f)
f.close()
f = io.open(test_support.TESTFN, "rb")
self.assertEqual(f.readable(), True)
self.assertEqual(f.writable(), False)
self.assertEqual(f.seekable(), True)
self.read_ops(f)
f.close()
def test_raw_bytes_io(self):
f = io.BytesIO()
self.write_ops(f)
@ -163,9 +179,52 @@ class IOTest(unittest.TestCase):
print("Use 'regrtest.py -u largefile test_io' to run it.",
file=sys.stderr)
return
f = io.open(test_support.TESTFN, "w+b", buffering=0)
f = io.open(test_support.TESTFN, "w+b", 0)
self.large_file_ops(f)
f.close()
f = io.open(test_support.TESTFN, "w+b")
self.large_file_ops(f)
f.close()
def test_with_open(self):
for bufsize in (0, 1, 100):
f = None
with open(test_support.TESTFN, "wb", bufsize) as f:
f.write("xxx")
self.assertEqual(f.closed, True)
f = None
try:
with open(test_support.TESTFN, "wb", bufsize) as f:
1/0
except ZeroDivisionError:
self.assertEqual(f.closed, True)
else:
self.fail("1/0 didn't raise an exception")
def test_destructor(self):
record = []
class MyFileIO(io.FileIO):
def __del__(self):
record.append(1)
io.FileIO.__del__(self)
def close(self):
record.append(2)
io.FileIO.close(self)
def flush(self):
record.append(3)
io.FileIO.flush(self)
f = MyFileIO(test_support.TESTFN, "w")
f.write("xxx")
del f
self.assertEqual(record, [1, 2, 3])
def test_close_flushes(self):
f = io.open(test_support.TESTFN, "wb")
f.write("xxx")
f.close()
f = io.open(test_support.TESTFN, "rb")
self.assertEqual(f.read(), b"xxx")
f.close()
class MemorySeekTestMixin:

View File

@ -530,6 +530,9 @@ fileio_truncate(PyFileIOObject *self, PyObject *args)
if (!PyArg_ParseTuple(args, "|O", &posobj))
return NULL;
if (posobj == Py_None)
posobj = NULL;
if (posobj == NULL)
whence = 1;
else
@ -545,19 +548,22 @@ fileio_truncate(PyFileIOObject *self, PyObject *args)
pos = PyLong_Check(posobj) ?
PyLong_AsLongLong(posobj) : PyInt_AsLong(posobj);
#endif
Py_DECREF(posobj);
if (PyErr_Occurred())
if (PyErr_Occurred()) {
Py_DECREF(posobj);
return NULL;
}
Py_BEGIN_ALLOW_THREADS
errno = 0;
pos = ftruncate(fd, pos);
Py_END_ALLOW_THREADS
if (errno < 0)
if (pos < 0) {
Py_DECREF(posobj);
PyErr_SetFromErrno(PyExc_IOError);
}
Py_RETURN_NONE;
return posobj;
}
static char *