Added fast alternate io.BytesIO implementation and its test suite.

Removed old test suite for StringIO.
Modified truncate() to imply a seek to given argument value.
This commit is contained in:
Alexandre Vassalotti 2008-05-06 19:48:38 +00:00
parent 5d8da20dd1
commit 77250f4df7
10 changed files with 1245 additions and 193 deletions

View File

@ -490,6 +490,7 @@ class IOBase(metaclass=abc.ABCMeta):
terminator(s) recognized. terminator(s) recognized.
""" """
# For backwards compatibility, a (slowish) readline(). # For backwards compatibility, a (slowish) readline().
self._checkClosed()
if hasattr(self, "peek"): if hasattr(self, "peek"):
def nreadahead(): def nreadahead():
readahead = self.peek(1) readahead = self.peek(1)
@ -531,7 +532,7 @@ class IOBase(metaclass=abc.ABCMeta):
lines will be read if the total size (in bytes/characters) of all lines will be read if the total size (in bytes/characters) of all
lines so far exceeds hint. lines so far exceeds hint.
""" """
if hint is None: if hint is None or hint <= 0:
return list(self) return list(self)
n = 0 n = 0
lines = [] lines = []
@ -726,6 +727,8 @@ class _BufferedIOMixin(BufferedIOBase):
if pos is None: if pos is None:
pos = self.tell() pos = self.tell()
# XXX: Should seek() be used, instead of passing the position
# XXX directly to truncate?
return self.raw.truncate(pos) return self.raw.truncate(pos)
### Flush and close ### ### Flush and close ###
@ -765,7 +768,7 @@ class _BufferedIOMixin(BufferedIOBase):
return self.raw.isatty() return self.raw.isatty()
class BytesIO(BufferedIOBase): class _BytesIO(BufferedIOBase):
"""Buffered I/O implementation using an in-memory bytes buffer.""" """Buffered I/O implementation using an in-memory bytes buffer."""
@ -779,13 +782,19 @@ class BytesIO(BufferedIOBase):
def getvalue(self): def getvalue(self):
"""Return the bytes value (contents) of the buffer """Return the bytes value (contents) of the buffer
""" """
if self.closed:
raise ValueError("getvalue on closed file")
return bytes(self._buffer) return bytes(self._buffer)
def read(self, n=None): def read(self, n=None):
if self.closed:
raise ValueError("read from closed file")
if n is None: if n is None:
n = -1 n = -1
if n < 0: if n < 0:
n = len(self._buffer) n = len(self._buffer)
if len(self._buffer) <= self._pos:
return self._buffer[:0]
newpos = min(len(self._buffer), self._pos + n) newpos = min(len(self._buffer), self._pos + n)
b = self._buffer[self._pos : newpos] b = self._buffer[self._pos : newpos]
self._pos = newpos self._pos = newpos
@ -802,6 +811,8 @@ class BytesIO(BufferedIOBase):
if isinstance(b, str): if isinstance(b, str):
raise TypeError("can't write str to binary stream") raise TypeError("can't write str to binary stream")
n = len(b) n = len(b)
if n == 0:
return 0
newpos = self._pos + n newpos = self._pos + n
if newpos > len(self._buffer): if newpos > len(self._buffer):
# Inserts null bytes between the current end of the file # Inserts null bytes between the current end of the file
@ -813,28 +824,38 @@ class BytesIO(BufferedIOBase):
return n return n
def seek(self, pos, whence=0): def seek(self, pos, whence=0):
if self.closed:
raise ValueError("seek on closed file")
try: try:
pos = pos.__index__() pos = pos.__index__()
except AttributeError as err: except AttributeError as err:
raise TypeError("an integer is required") from err raise TypeError("an integer is required") from err
if whence == 0: if whence == 0:
self._pos = max(0, pos) self._pos = max(0, pos)
if pos < 0:
raise ValueError("negative seek position %r" % (pos,))
elif whence == 1: elif whence == 1:
self._pos = max(0, self._pos + pos) self._pos = max(0, self._pos + pos)
elif whence == 2: elif whence == 2:
self._pos = max(0, len(self._buffer) + pos) self._pos = max(0, len(self._buffer) + pos)
else: else:
raise IOError("invalid whence value") raise ValueError("invalid whence value")
return self._pos return self._pos
def tell(self): def tell(self):
if self.closed:
raise ValueError("tell on closed file")
return self._pos return self._pos
def truncate(self, pos=None): def truncate(self, pos=None):
if self.closed:
raise ValueError("truncate on closed file")
if pos is None: if pos is None:
pos = self._pos pos = self._pos
elif pos < 0:
raise ValueError("negative truncate position %r" % (pos,))
del self._buffer[pos:] del self._buffer[pos:]
return pos return self.seek(pos)
def readable(self): def readable(self):
return True return True
@ -845,6 +866,16 @@ class BytesIO(BufferedIOBase):
def seekable(self): def seekable(self):
return True return True
# Use the faster implementation of BytesIO if available
try:
import _bytesio
class BytesIO(_bytesio._BytesIO, BufferedIOBase):
__doc__ = _bytesio._BytesIO.__doc__
except ImportError:
BytesIO = _BytesIO
class BufferedReader(_BufferedIOMixin): class BufferedReader(_BufferedIOMixin):
@ -978,6 +1009,12 @@ class BufferedWriter(_BufferedIOMixin):
raise BlockingIOError(e.errno, e.strerror, overage) raise BlockingIOError(e.errno, e.strerror, overage)
return written return written
def truncate(self, pos=None):
self.flush()
if pos is None:
pos = self.raw.tell()
return self.raw.truncate(pos)
def flush(self): def flush(self):
if self.closed: if self.closed:
raise ValueError("flush of closed file") raise ValueError("flush of closed file")
@ -1097,6 +1134,13 @@ class BufferedRandom(BufferedWriter, BufferedReader):
else: else:
return self.raw.tell() - len(self._read_buf) return self.raw.tell() - len(self._read_buf)
def truncate(self, pos=None):
if pos is None:
pos = self.tell()
# Use seek to flush the read buffer.
self.seek(pos)
return BufferedWriter.truncate(self)
def read(self, n=None): def read(self, n=None):
if n is None: if n is None:
n = -1 n = -1
@ -1145,11 +1189,7 @@ class TextIOBase(IOBase):
def truncate(self, pos: int = None) -> int: def truncate(self, pos: int = None) -> int:
"""Truncate size to pos.""" """Truncate size to pos."""
self.flush() self._unsupported("truncate")
if pos is None:
pos = self.tell()
self.seek(pos)
return self.buffer.truncate()
def readline(self) -> str: def readline(self) -> str:
"""Read until newline or EOF. """Read until newline or EOF.
@ -1346,6 +1386,12 @@ class TextIOWrapper(TextIOBase):
def seekable(self): def seekable(self):
return self._seekable return self._seekable
def readable(self):
return self.buffer.readable()
def writable(self):
return self.buffer.writable()
def flush(self): def flush(self):
self.buffer.flush() self.buffer.flush()
self._telling = self._seekable self._telling = self._seekable
@ -1539,7 +1585,16 @@ class TextIOWrapper(TextIOBase):
finally: finally:
decoder.setstate(saved_state) decoder.setstate(saved_state)
def truncate(self, pos=None):
self.flush()
if pos is None:
pos = self.tell()
self.seek(pos)
return self.buffer.truncate()
def seek(self, cookie, whence=0): def seek(self, cookie, whence=0):
if self.closed:
raise ValueError("tell on closed file")
if not self._seekable: if not self._seekable:
raise IOError("underlying stream is not seekable") raise IOError("underlying stream is not seekable")
if whence == 1: # seek relative to current position if whence == 1: # seek relative to current position
@ -1626,6 +1681,8 @@ class TextIOWrapper(TextIOBase):
return line return line
def readline(self, limit=None): def readline(self, limit=None):
if self.closed:
raise ValueError("read from closed file")
if limit is None: if limit is None:
limit = -1 limit = -1

View File

@ -1,127 +0,0 @@
# Tests StringIO and cStringIO
import sys
import unittest
import io
from test import test_support
class TestGenericStringIO:
# use a class variable CLASS to define which class is being tested
CLASS = None
# Line of data to test as string
_line = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!'
# Constructor to use for the test data (._line is passed to this
# constructor)
constructor = str
def setUp(self):
self._line = self.constructor(self._line)
self._lines = self.constructor((self._line + '\n') * 5)
self._fp = self.CLASS(self._lines)
def test_reads(self):
eq = self.assertEqual
self.assertRaises(TypeError, self._fp.seek)
eq(self._fp.read(10), self._line[:10])
eq(self._fp.readline(), self._line[10:] + '\n')
eq(len(self._fp.readlines(60)), 2)
def test_writes(self):
f = self.CLASS()
self.assertRaises(TypeError, f.seek)
f.write(self._line[:6])
f.seek(3)
f.write(self._line[20:26])
f.write(self._line[52])
self.assertEqual(f.getvalue(), 'abcuvwxyz!')
def test_writelines(self):
f = self.CLASS()
f.writelines([self._line[0], self._line[1], self._line[2]])
f.seek(0)
self.assertEqual(f.getvalue(), 'abc')
def test_writelines_error(self):
def errorGen():
yield 'a'
raise KeyboardInterrupt()
f = self.CLASS()
self.assertRaises(KeyboardInterrupt, f.writelines, errorGen())
def test_truncate(self):
eq = self.assertEqual
f = self.CLASS()
f.write(self._lines)
f.seek(10)
f.truncate()
eq(f.getvalue(), 'abcdefghij')
f.truncate(5)
eq(f.getvalue(), 'abcde')
f.write('xyz')
eq(f.getvalue(), 'abcdexyz')
self.assertRaises(ValueError, f.truncate, -1)
f.close()
self.assertRaises(ValueError, f.write, 'frobnitz')
def test_closed_flag(self):
f = self.CLASS()
self.assertEqual(f.closed, False)
f.close()
self.assertEqual(f.closed, True)
f = self.CLASS(self.constructor("abc"))
self.assertEqual(f.closed, False)
f.close()
self.assertEqual(f.closed, True)
def test_isatty(self):
f = self.CLASS()
self.assertRaises(TypeError, f.isatty, None)
self.assertEqual(f.isatty(), False)
f.close()
self.assertRaises(ValueError, f.isatty)
def test_iterator(self):
eq = self.assertEqual
unless = self.failUnless
eq(iter(self._fp), self._fp)
# Does this object support the iteration protocol?
unless(hasattr(self._fp, '__iter__'))
unless(hasattr(self._fp, '__next__'))
i = 0
for line in self._fp:
eq(line, self._line + '\n')
i += 1
eq(i, 5)
self._fp.close()
self.assertRaises(StopIteration, next, self._fp)
class TestioStringIO(TestGenericStringIO, unittest.TestCase):
CLASS = io.StringIO
def test_unicode(self):
if not test_support.have_unicode: return
# The StringIO module also supports concatenating Unicode
# snippets to larger Unicode strings. This is tested by this
# method. Note that cStringIO does not support this extension.
f = self.CLASS()
f.write(self._line[:6])
f.seek(3)
f.write(str(self._line[20:26]))
f.write(str(self._line[52]))
s = f.getvalue()
self.assertEqual(s, str('abcuvwxyz!'))
self.assertEqual(type(s), str)
def test_main():
test_support.run_unittest(TestioStringIO)
if __name__ == '__main__':
test_main()

View File

@ -98,7 +98,7 @@ class IOTest(unittest.TestCase):
self.assertEqual(f.seek(-1, 2), 13) self.assertEqual(f.seek(-1, 2), 13)
self.assertEqual(f.tell(), 13) self.assertEqual(f.tell(), 13)
self.assertEqual(f.truncate(12), 12) self.assertEqual(f.truncate(12), 12)
self.assertEqual(f.tell(), 13) self.assertEqual(f.tell(), 12)
self.assertRaises(TypeError, f.seek, 0.0) self.assertRaises(TypeError, f.seek, 0.0)
def read_ops(self, f, buffered=False): def read_ops(self, f, buffered=False):
@ -143,7 +143,7 @@ class IOTest(unittest.TestCase):
self.assertEqual(f.tell(), self.LARGE + 2) self.assertEqual(f.tell(), self.LARGE + 2)
self.assertEqual(f.seek(0, 2), self.LARGE + 2) self.assertEqual(f.seek(0, 2), self.LARGE + 2)
self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1) self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
self.assertEqual(f.tell(), self.LARGE + 2) self.assertEqual(f.tell(), self.LARGE + 1)
self.assertEqual(f.seek(0, 2), self.LARGE + 1) self.assertEqual(f.seek(0, 2), self.LARGE + 1)
self.assertEqual(f.seek(-1, 2), self.LARGE) self.assertEqual(f.seek(-1, 2), self.LARGE)
self.assertEqual(f.read(2), b"x") self.assertEqual(f.read(2), b"x")
@ -727,6 +727,7 @@ class TextIOWrapperTest(unittest.TestCase):
txt.write("BB\nCCC\n") txt.write("BB\nCCC\n")
txt.write("X\rY\r\nZ") txt.write("X\rY\r\nZ")
txt.flush() txt.flush()
self.assertEquals(buf.closed, False)
self.assertEquals(buf.getvalue(), expected) self.assertEquals(buf.getvalue(), expected)
def testNewlines(self): def testNewlines(self):
@ -807,7 +808,8 @@ class TextIOWrapperTest(unittest.TestCase):
txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline) txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
txt.write(data) txt.write(data)
txt.close() txt.close()
self.assertEquals(buf.getvalue(), expected) self.assertEquals(buf.closed, True)
self.assertRaises(ValueError, buf.getvalue)
finally: finally:
os.linesep = save_linesep os.linesep = save_linesep

View File

@ -120,14 +120,15 @@ class TestCase(unittest.TestCase):
newsize -= 1 newsize -= 1
f.seek(42) f.seek(42)
f.truncate(newsize) f.truncate(newsize)
self.assertEqual(f.tell(), 42) # else pointer moved
f.seek(0, 2)
self.assertEqual(f.tell(), newsize) # else wasn't truncated self.assertEqual(f.tell(), newsize) # else wasn't truncated
f.seek(0, 2)
self.assertEqual(f.tell(), newsize)
# XXX truncate(larger than true size) is ill-defined # XXX truncate(larger than true size) is ill-defined
# across platform; cut it waaaaay back # across platform; cut it waaaaay back
f.seek(0) f.seek(0)
f.truncate(1) f.truncate(1)
self.assertEqual(f.tell(), 0) # else pointer moved self.assertEqual(f.tell(), 1) # else pointer moved
f.seek(0)
self.assertEqual(len(f.read()), 1) # else wasn't truncated self.assertEqual(len(f.read()), 1) # else wasn't truncated
def test_main(): def test_main():

405
Lib/test/test_memoryio.py Normal file
View File

@ -0,0 +1,405 @@
"""Unit tests for memory-based file-like objects.
StringIO -- for unicode strings
BytesIO -- for bytes
"""
import unittest
from test import test_support
import io
try:
import _bytesio
has_c_implementation = True
except ImportError:
has_c_implementation = False
class MemoryTestMixin:
def write_ops(self, f, t):
self.assertEqual(f.write(t("blah.")), 5)
self.assertEqual(f.seek(0), 0)
self.assertEqual(f.write(t("Hello.")), 6)
self.assertEqual(f.tell(), 6)
self.assertEqual(f.seek(5), 5)
self.assertEqual(f.tell(), 5)
self.assertEqual(f.write(t(" world\n\n\n")), 9)
self.assertEqual(f.seek(0), 0)
self.assertEqual(f.write(t("h")), 1)
self.assertEqual(f.truncate(12), 12)
self.assertEqual(f.tell(), 12)
def test_write(self):
buf = self.buftype("hello world\n")
memio = self.ioclass(buf)
self.write_ops(memio, self.buftype)
self.assertEqual(memio.getvalue(), buf)
memio = self.ioclass()
self.write_ops(memio, self.buftype)
self.assertEqual(memio.getvalue(), buf)
self.assertRaises(TypeError, memio.write, None)
memio.close()
self.assertRaises(ValueError, memio.write, self.buftype(""))
def test_writelines(self):
buf = self.buftype("1234567890")
memio = self.ioclass()
self.assertEqual(memio.writelines([buf] * 100), None)
self.assertEqual(memio.getvalue(), buf * 100)
memio.writelines([])
self.assertEqual(memio.getvalue(), buf * 100)
memio = self.ioclass()
self.assertRaises(TypeError, memio.writelines, [buf] + [1])
self.assertEqual(memio.getvalue(), buf)
self.assertRaises(TypeError, memio.writelines, None)
memio.close()
self.assertRaises(ValueError, memio.writelines, [])
def test_writelines_error(self):
memio = self.ioclass()
def error_gen():
yield self.buftype('spam')
raise KeyboardInterrupt
self.assertRaises(KeyboardInterrupt, memio.writelines, error_gen())
def test_truncate(self):
buf = self.buftype("1234567890")
memio = self.ioclass(buf)
self.assertRaises(ValueError, memio.truncate, -1)
memio.seek(6)
self.assertEqual(memio.truncate(), 6)
self.assertEqual(memio.getvalue(), buf[:6])
self.assertEqual(memio.truncate(4), 4)
self.assertEqual(memio.getvalue(), buf[:4])
self.assertEqual(memio.tell(), 4)
memio.write(buf)
self.assertEqual(memio.getvalue(), buf[:4] + buf)
pos = memio.tell()
self.assertEqual(memio.truncate(None), pos)
self.assertEqual(memio.tell(), pos)
self.assertRaises(TypeError, memio.truncate, '0')
memio.close()
self.assertRaises(ValueError, memio.truncate, 0)
def test_init(self):
buf = self.buftype("1234567890")
memio = self.ioclass(buf)
self.assertEqual(memio.getvalue(), buf)
memio = self.ioclass(None)
self.assertEqual(memio.getvalue(), self.EOF)
memio.__init__(buf * 2)
self.assertEqual(memio.getvalue(), buf * 2)
memio.__init__(buf)
self.assertEqual(memio.getvalue(), buf)
def test_read(self):
buf = self.buftype("1234567890")
memio = self.ioclass(buf)
self.assertEqual(memio.read(0), self.EOF)
self.assertEqual(memio.read(1), buf[:1])
self.assertEqual(memio.read(4), buf[1:5])
self.assertEqual(memio.read(900), buf[5:])
self.assertEqual(memio.read(), self.EOF)
memio.seek(0)
self.assertEqual(memio.read(), buf)
self.assertEqual(memio.read(), self.EOF)
self.assertEqual(memio.tell(), 10)
memio.seek(0)
self.assertEqual(memio.read(-1), buf)
memio.seek(0)
self.assertEqual(memio.read(None), buf)
self.assertRaises(TypeError, memio.read, '')
memio.close()
self.assertRaises(ValueError, memio.read)
def test_readline(self):
buf = self.buftype("1234567890\n")
memio = self.ioclass(buf * 2)
self.assertEqual(memio.readline(0), self.EOF)
self.assertEqual(memio.readline(), buf)
self.assertEqual(memio.readline(), buf)
self.assertEqual(memio.readline(), self.EOF)
memio.seek(0)
self.assertEqual(memio.readline(5), buf[:5])
self.assertEqual(memio.readline(5), buf[5:10])
self.assertEqual(memio.readline(5), buf[10:15])
memio.seek(0)
self.assertEqual(memio.readline(-1), buf)
memio.seek(0)
self.assertEqual(memio.readline(0), self.EOF)
buf = self.buftype("1234567890\n")
memio = self.ioclass((buf * 3)[:-1])
self.assertEqual(memio.readline(), buf)
self.assertEqual(memio.readline(), buf)
self.assertEqual(memio.readline(), buf[:-1])
self.assertEqual(memio.readline(), self.EOF)
memio.seek(0)
self.assertEqual(memio.readline(None), buf)
self.assertRaises(TypeError, memio.readline, '')
memio.close()
self.assertRaises(ValueError, memio.readline)
def test_readlines(self):
buf = self.buftype("1234567890\n")
memio = self.ioclass(buf * 10)
self.assertEqual(memio.readlines(), [buf] * 10)
memio.seek(5)
self.assertEqual(memio.readlines(), [buf[5:]] + [buf] * 9)
memio.seek(0)
self.assertEqual(memio.readlines(15), [buf] * 2)
memio.seek(0)
self.assertEqual(memio.readlines(-1), [buf] * 10)
memio.seek(0)
self.assertEqual(memio.readlines(0), [buf] * 10)
memio.seek(0)
self.assertEqual(memio.readlines(None), [buf] * 10)
self.assertRaises(TypeError, memio.readlines, '')
memio.close()
self.assertRaises(ValueError, memio.readlines)
def test_iterator(self):
buf = self.buftype("1234567890\n")
memio = self.ioclass(buf * 10)
self.assertEqual(iter(memio), memio)
self.failUnless(hasattr(memio, '__iter__'))
self.failUnless(hasattr(memio, '__next__'))
i = 0
for line in memio:
self.assertEqual(line, buf)
i += 1
self.assertEqual(i, 10)
memio.seek(0)
i = 0
for line in memio:
self.assertEqual(line, buf)
i += 1
self.assertEqual(i, 10)
memio = self.ioclass(buf * 2)
memio.close()
self.assertRaises(ValueError, memio.__next__)
def test_getvalue(self):
buf = self.buftype("1234567890")
memio = self.ioclass(buf)
self.assertEqual(memio.getvalue(), buf)
memio.read()
self.assertEqual(memio.getvalue(), buf)
memio = self.ioclass(buf * 1000)
self.assertEqual(memio.getvalue()[-3:], self.buftype("890"))
memio = self.ioclass(buf)
memio.close()
self.assertRaises(ValueError, memio.getvalue)
def test_seek(self):
buf = self.buftype("1234567890")
memio = self.ioclass(buf)
memio.read(5)
self.assertRaises(ValueError, memio.seek, -1)
self.assertRaises(ValueError, memio.seek, 1, -1)
self.assertRaises(ValueError, memio.seek, 1, 3)
self.assertEqual(memio.seek(0), 0)
self.assertEqual(memio.seek(0, 0), 0)
self.assertEqual(memio.read(), buf)
self.assertEqual(memio.seek(3), 3)
self.assertEqual(memio.seek(0, 1), 3)
self.assertEqual(memio.read(), buf[3:])
self.assertEqual(memio.seek(len(buf)), len(buf))
self.assertEqual(memio.read(), self.EOF)
memio.seek(len(buf) + 1)
self.assertEqual(memio.read(), self.EOF)
self.assertEqual(memio.seek(0, 2), len(buf))
self.assertEqual(memio.read(), self.EOF)
memio.close()
self.assertRaises(ValueError, memio.seek, 0)
def test_overseek(self):
buf = self.buftype("1234567890")
memio = self.ioclass(buf)
self.assertEqual(memio.seek(len(buf) + 1), 11)
self.assertEqual(memio.read(), self.EOF)
self.assertEqual(memio.tell(), 11)
self.assertEqual(memio.getvalue(), buf)
memio.write(self.EOF)
self.assertEqual(memio.getvalue(), buf)
memio.write(buf)
self.assertEqual(memio.getvalue(), buf + self.buftype('\0') + buf)
def test_tell(self):
buf = self.buftype("1234567890")
memio = self.ioclass(buf)
self.assertEqual(memio.tell(), 0)
memio.seek(5)
self.assertEqual(memio.tell(), 5)
memio.seek(10000)
self.assertEqual(memio.tell(), 10000)
memio.close()
self.assertRaises(ValueError, memio.tell)
def test_flush(self):
buf = self.buftype("1234567890")
memio = self.ioclass(buf)
self.assertEqual(memio.flush(), None)
def test_flags(self):
memio = self.ioclass()
self.assertEqual(memio.writable(), True)
self.assertEqual(memio.readable(), True)
self.assertEqual(memio.seekable(), True)
self.assertEqual(memio.isatty(), False)
self.assertEqual(memio.closed, False)
memio.close()
self.assertEqual(memio.writable(), True)
self.assertEqual(memio.readable(), True)
self.assertEqual(memio.seekable(), True)
self.assertRaises(ValueError, memio.isatty)
self.assertEqual(memio.closed, True)
def test_subclassing(self):
buf = self.buftype("1234567890")
def test1():
class MemIO(self.ioclass):
pass
m = MemIO(buf)
return m.getvalue()
def test2():
class MemIO(self.ioclass):
def __init__(me, a, b):
self.ioclass.__init__(me, a)
m = MemIO(buf, None)
return m.getvalue()
self.assertEqual(test1(), buf)
self.assertEqual(test2(), buf)
class PyBytesIOTest(MemoryTestMixin, unittest.TestCase):
@staticmethod
def buftype(s):
return s.encode("ascii")
ioclass = io._BytesIO
EOF = b""
def test_read1(self):
buf = self.buftype("1234567890")
memio = self.ioclass(buf)
self.assertRaises(TypeError, memio.read1)
self.assertEqual(memio.read(), buf)
def test_readinto(self):
buf = self.buftype("1234567890")
memio = self.ioclass(buf)
b = bytearray(b"hello")
self.assertEqual(memio.readinto(b), 5)
self.assertEqual(b, b"12345")
self.assertEqual(memio.readinto(b), 5)
self.assertEqual(b, b"67890")
self.assertEqual(memio.readinto(b), 0)
self.assertEqual(b, b"67890")
b = bytearray(b"hello world")
memio.seek(0)
self.assertEqual(memio.readinto(b), 10)
self.assertEqual(b, b"1234567890d")
b = bytearray(b"")
memio.seek(0)
self.assertEqual(memio.readinto(b), 0)
self.assertEqual(b, b"")
self.assertRaises(TypeError, memio.readinto, '')
import array
a = array.array('b', b"hello world")
memio = self.ioclass(buf)
memio.readinto(a)
self.assertEqual(a.tostring(), b"1234567890d")
memio.close()
self.assertRaises(ValueError, memio.readinto, b)
def test_relative_seek(self):
buf = self.buftype("1234567890")
memio = self.ioclass(buf)
self.assertEqual(memio.seek(-1, 1), 0)
self.assertEqual(memio.seek(3, 1), 3)
self.assertEqual(memio.seek(-4, 1), 0)
self.assertEqual(memio.seek(-1, 2), 9)
self.assertEqual(memio.seek(1, 1), 10)
self.assertEqual(memio.seek(1, 2), 11)
memio.seek(-3, 2)
self.assertEqual(memio.read(), buf[-3:])
memio.seek(0)
memio.seek(1, 1)
self.assertEqual(memio.read(), buf[1:])
def test_unicode(self):
memio = self.ioclass()
self.assertRaises(TypeError, self.ioclass, "1234567890")
self.assertRaises(TypeError, memio.write, "1234567890")
self.assertRaises(TypeError, memio.writelines, ["1234567890"])
def test_bytes_array(self):
buf = b"1234567890"
import array
a = array.array('b', list(buf))
memio = self.ioclass(a)
self.assertEqual(memio.getvalue(), buf)
self.assertEqual(memio.write(a), 10)
self.assertEqual(memio.getvalue(), buf)
class PyStringIOTest(MemoryTestMixin, unittest.TestCase):
buftype = str
ioclass = io.StringIO
EOF = ""
def test_relative_seek(self):
memio = self.ioclass()
self.assertRaises(IOError, memio.seek, -1, 1)
self.assertRaises(IOError, memio.seek, 3, 1)
self.assertRaises(IOError, memio.seek, -3, 1)
self.assertRaises(IOError, memio.seek, -1, 2)
self.assertRaises(IOError, memio.seek, 1, 1)
self.assertRaises(IOError, memio.seek, 1, 2)
# XXX: For the Python version of io.StringIO, this is highly
# dependent on the encoding used for the underlying buffer.
# def test_widechar(self):
# buf = self.buftype("\U0002030a\U00020347")
# memio = self.ioclass(buf)
#
# self.assertEqual(memio.getvalue(), buf)
# self.assertEqual(memio.write(buf), len(buf))
# self.assertEqual(memio.tell(), len(buf))
# self.assertEqual(memio.getvalue(), buf)
# self.assertEqual(memio.write(buf), len(buf))
# self.assertEqual(memio.tell(), len(buf) * 2)
# self.assertEqual(memio.getvalue(), buf + buf)
if has_c_implementation:
class CBytesIOTest(PyBytesIOTest):
ioclass = io.BytesIO
def test_main():
tests = [PyBytesIOTest, PyStringIOTest]
if has_c_implementation:
tests.extend([CBytesIOTest])
test_support.run_unittest(*tests)
if __name__ == '__main__':
test_main()

View File

@ -58,7 +58,7 @@ class MimeToolsTest(unittest.TestCase):
s.add(nb) s.add(nb)
def test_message(self): def test_message(self):
msg = mimetools.Message(io.StringIO(msgtext1)) msg = mimetools.Message(io.StringIO(str(msgtext1)))
self.assertEqual(msg.gettype(), "text/plain") self.assertEqual(msg.gettype(), "text/plain")
self.assertEqual(msg.getmaintype(), "text") self.assertEqual(msg.getmaintype(), "text")
self.assertEqual(msg.getsubtype(), "plain") self.assertEqual(msg.getsubtype(), "plain")

744
Modules/_bytesio.c Normal file
View File

@ -0,0 +1,744 @@
#include "Python.h"
typedef struct {
PyObject_HEAD
char *buf;
Py_ssize_t pos;
Py_ssize_t string_size;
size_t buf_size;
} BytesIOObject;
#define CHECK_CLOSED(self) \
if ((self)->buf == NULL) { \
PyErr_SetString(PyExc_ValueError, \
"I/O operation on closed file."); \
return NULL; \
}
/* Internal routine to get a line from the buffer of a BytesIO
object. Returns the length between the current position to the
next newline character. */
static Py_ssize_t
get_line(BytesIOObject *self, char **output)
{
char *n;
const char *str_end;
Py_ssize_t len;
assert(self->buf != NULL);
/* Move to the end of the line, up to the end of the string, s. */
str_end = self->buf + self->string_size;
for (n = self->buf + self->pos;
n < str_end && *n != '\n';
n++);
/* Skip the newline character */
if (n < str_end)
n++;
/* Get the length from the current position to the end of the line. */
len = n - (self->buf + self->pos);
*output = self->buf + self->pos;
assert(len >= 0);
assert(self->pos < PY_SSIZE_T_MAX - len);
self->pos += len;
return len;
}
/* Internal routine for changing the size of the buffer of BytesIO objects.
The caller should ensure that the 'size' argument is non-negative. Returns
0 on success, -1 otherwise. */
static int
resize_buffer(BytesIOObject *self, size_t size)
{
/* Here, unsigned types are used to avoid dealing with signed integer
overflow, which is undefined in C. */
size_t alloc = self->buf_size;
char *new_buf = NULL;
assert(self->buf != NULL);
/* For simplicity, stay in the range of the signed type. Anyway, Python
doesn't allow strings to be longer than this. */
if (size > PY_SSIZE_T_MAX)
goto overflow;
if (size < alloc / 2) {
/* Major downsize; resize down to exact size. */
alloc = size + 1;
}
else if (size < alloc) {
/* Within allocated size; quick exit */
return 0;
}
else if (size <= alloc * 1.125) {
/* Moderate upsize; overallocate similar to list_resize() */
alloc = size + (size >> 3) + (size < 9 ? 3 : 6);
}
else {
/* Major upsize; resize up to exact size */
alloc = size + 1;
}
if (alloc > ((size_t)-1) / sizeof(char))
goto overflow;
new_buf = (char *)PyMem_Realloc(self->buf, alloc * sizeof(char));
if (new_buf == NULL) {
PyErr_NoMemory();
return -1;
}
self->buf_size = alloc;
self->buf = new_buf;
return 0;
overflow:
PyErr_SetString(PyExc_OverflowError,
"new buffer size too large");
return -1;
}
/* Internal routine for writing a string of bytes to the buffer of a BytesIO
object. Returns the number of bytes wrote, or -1 on error. */
static Py_ssize_t
write_bytes(BytesIOObject *self, const char *bytes, Py_ssize_t len)
{
assert(self->buf != NULL);
assert(self->pos >= 0);
assert(len >= 0);
/* This overflow check is not strictly necessary. However, it avoids us to
deal with funky things like comparing an unsigned and a signed
integer. */
if (self->pos > PY_SSIZE_T_MAX - len) {
PyErr_SetString(PyExc_OverflowError,
"new position too large");
return -1;
}
if (self->pos + len > self->buf_size) {
if (resize_buffer(self, self->pos + len) < 0)
return -1;
}
if (self->pos > self->string_size) {
/* In case of overseek, pad with null bytes the buffer region between
the end of stream and the current position.
0 lo string_size hi
| |<---used--->|<----------available----------->|
| | <--to pad-->|<---to write---> |
0 buf position
*/
memset(self->buf + self->string_size, '\0',
(self->pos - self->string_size) * sizeof(char));
}
/* Copy the data to the internal buffer, overwriting some of the existing
data if self->pos < self->string_size. */
memcpy(self->buf + self->pos, bytes, len);
self->pos += len;
/* Set the new length of the internal string if it has changed. */
if (self->string_size < self->pos) {
self->string_size = self->pos;
}
return len;
}
static PyObject *
bytesio_get_closed(BytesIOObject *self)
{
if (self->buf == NULL)
Py_RETURN_TRUE;
else
Py_RETURN_FALSE;
}
/* Generic getter for the writable, readable and seekable properties */
static PyObject *
return_true(BytesIOObject *self)
{
Py_RETURN_TRUE;
}
PyDoc_STRVAR(flush_doc,
"flush() -> None. Does nothing.");
static PyObject *
bytesio_flush(BytesIOObject *self)
{
Py_RETURN_NONE;
}
PyDoc_STRVAR(getval_doc,
"getvalue() -> string.\n"
"\n"
"Retrieve the entire contents of the BytesIO object.");
static PyObject *
bytesio_getvalue(BytesIOObject *self)
{
CHECK_CLOSED(self);
return PyString_FromStringAndSize(self->buf, self->string_size);
}
PyDoc_STRVAR(isatty_doc,
"isatty() -> False.\n"
"\n"
"Always returns False since BytesIO objects are not connected\n"
"to a tty-like device.");
static PyObject *
bytesio_isatty(BytesIOObject *self)
{
CHECK_CLOSED(self);
Py_RETURN_FALSE;
}
PyDoc_STRVAR(tell_doc,
"tell() -> current file position, an integer\n");
static PyObject *
bytesio_tell(BytesIOObject *self)
{
CHECK_CLOSED(self);
return PyLong_FromSsize_t(self->pos);
}
PyDoc_STRVAR(read_doc,
"read([size]) -> read at most size bytes, returned as a string.\n"
"\n"
"If the size argument is negative, read until EOF is reached.\n"
"Return an empty string at EOF.");
static PyObject *
bytesio_read(BytesIOObject *self, PyObject *args)
{
Py_ssize_t size, n;
char *output;
PyObject *arg = Py_None;
CHECK_CLOSED(self);
if (!PyArg_ParseTuple(args, "|O:read", &arg))
return NULL;
if (PyLong_Check(arg)) {
size = PyLong_AsSsize_t(arg);
}
else if (arg == Py_None) {
/* Read until EOF is reached, by default. */
size = -1;
}
else {
PyErr_Format(PyExc_TypeError, "integer argument expected, got '%s'",
Py_TYPE(arg)->tp_name);
return NULL;
}
/* adjust invalid sizes */
n = self->string_size - self->pos;
if (size < 0 || size > n) {
size = n;
if (size < 0)
size = 0;
}
assert(self->buf != NULL);
output = self->buf + self->pos;
self->pos += size;
return PyString_FromStringAndSize(output, size);
}
PyDoc_STRVAR(read1_doc,
"read1(size) -> read at most size bytes, returned as a string.\n"
"\n"
"If the size argument is negative or omitted, read until EOF is reached.\n"
"Return an empty string at EOF.");
static PyObject *
bytesio_read1(BytesIOObject *self, PyObject *n)
{
PyObject *arg, *res;
arg = PyTuple_Pack(1, n);
if (arg == NULL)
return NULL;
res = bytesio_read(self, arg);
Py_DECREF(arg);
return res;
}
PyDoc_STRVAR(readline_doc,
"readline([size]) -> next line from the file, as a string.\n"
"\n"
"Retain newline. A non-negative size argument limits the maximum\n"
"number of bytes to return (an incomplete line may be returned then).\n"
"Return an empty string at EOF.\n");
static PyObject *
bytesio_readline(BytesIOObject *self, PyObject *args)
{
Py_ssize_t size, n;
char *output;
PyObject *arg = Py_None;
CHECK_CLOSED(self);
if (!PyArg_ParseTuple(args, "|O:readline", &arg))
return NULL;
if (PyLong_Check(arg)) {
size = PyLong_AsSsize_t(arg);
}
else if (arg == Py_None) {
/* No size limit, by default. */
size = -1;
}
else {
PyErr_Format(PyExc_TypeError, "integer argument expected, got '%s'",
Py_TYPE(arg)->tp_name);
return NULL;
}
n = get_line(self, &output);
if (size >= 0 && size < n) {
size = n - size;
n -= size;
self->pos -= size;
}
return PyString_FromStringAndSize(output, n);
}
PyDoc_STRVAR(readlines_doc,
"readlines([size]) -> list of strings, each a line from the file.\n"
"\n"
"Call readline() repeatedly and return a list of the lines so read.\n"
"The optional size argument, if given, is an approximate bound on the\n"
"total number of bytes in the lines returned.\n");
static PyObject *
bytesio_readlines(BytesIOObject *self, PyObject *args)
{
Py_ssize_t maxsize, size, n;
PyObject *result, *line;
char *output;
PyObject *arg = Py_None;
CHECK_CLOSED(self);
if (!PyArg_ParseTuple(args, "|O:readlines", &arg))
return NULL;
if (PyLong_Check(arg)) {
maxsize = PyLong_AsSsize_t(arg);
}
else if (arg == Py_None) {
/* No size limit, by default. */
maxsize = -1;
}
else {
PyErr_Format(PyExc_TypeError, "integer argument expected, got '%s'",
Py_TYPE(arg)->tp_name);
return NULL;
}
size = 0;
result = PyList_New(0);
if (!result)
return NULL;
while ((n = get_line(self, &output)) != 0) {
line = PyString_FromStringAndSize(output, n);
if (!line)
goto on_error;
if (PyList_Append(result, line) == -1) {
Py_DECREF(line);
goto on_error;
}
Py_DECREF(line);
size += n;
if (maxsize > 0 && size >= maxsize)
break;
}
return result;
on_error:
Py_DECREF(result);
return NULL;
}
PyDoc_STRVAR(readinto_doc,
"readinto(bytes) -> int. Read up to len(b) bytes into b.\n"
"\n"
"Returns number of bytes read (0 for EOF), or None if the object\n"
"is set not to block as has no data to read.");
static PyObject *
bytesio_readinto(BytesIOObject *self, PyObject *buffer)
{
void *raw_buffer;
Py_ssize_t len;
CHECK_CLOSED(self);
if (PyObject_AsWriteBuffer(buffer, &raw_buffer, &len) == -1)
return NULL;
if (self->pos + len > self->string_size)
len = self->string_size - self->pos;
memcpy(raw_buffer, self->buf + self->pos, len);
assert(self->pos + len < PY_SSIZE_T_MAX);
assert(len >= 0);
self->pos += len;
return PyLong_FromSsize_t(len);
}
PyDoc_STRVAR(truncate_doc,
"truncate([size]) -> int. Truncate the file to at most size bytes.\n"
"\n"
"Size defaults to the current file position, as returned by tell().\n"
"Returns the new size. Imply an absolute seek to the position size.");
static PyObject *
bytesio_truncate(BytesIOObject *self, PyObject *args)
{
Py_ssize_t size;
PyObject *arg = Py_None;
CHECK_CLOSED(self);
if (!PyArg_ParseTuple(args, "|O:truncate", &arg))
return NULL;
if (PyLong_Check(arg)) {
size = PyLong_AsSsize_t(arg);
}
else if (arg == Py_None) {
/* Truncate to current position if no argument is passed. */
size = self->pos;
}
else {
PyErr_Format(PyExc_TypeError, "integer argument expected, got '%s'",
Py_TYPE(arg)->tp_name);
return NULL;
}
if (size < 0) {
PyErr_Format(PyExc_ValueError,
"negative size value %zd", size);
return NULL;
}
if (size < self->string_size) {
self->string_size = size;
if (resize_buffer(self, size) < 0)
return NULL;
}
self->pos = size;
return PyLong_FromSsize_t(size);
}
static PyObject *
bytesio_iternext(BytesIOObject *self)
{
char *next;
Py_ssize_t n;
CHECK_CLOSED(self);
n = get_line(self, &next);
if (!next || n == 0)
return NULL;
return PyString_FromStringAndSize(next, n);
}
PyDoc_STRVAR(seek_doc,
"seek(pos, whence=0) -> int. Change stream position.\n"
"\n"
"Seek to byte offset pos relative to position indicated by whence:\n"
" 0 Start of stream (the default). pos should be >= 0;\n"
" 1 Current position - pos may be negative;\n"
" 2 End of stream - pos usually negative.\n"
"Returns the new absolute position.");
static PyObject *
bytesio_seek(BytesIOObject *self, PyObject *args)
{
Py_ssize_t pos;
int mode = 0;
CHECK_CLOSED(self);
if (!PyArg_ParseTuple(args, "n|i:seek", &pos, &mode))
return NULL;
if (pos < 0 && mode == 0) {
PyErr_Format(PyExc_ValueError,
"negative seek value %zd", pos);
return NULL;
}
/* mode 0: offset relative to beginning of the string.
mode 1: offset relative to current position.
mode 2: offset relative the end of the string. */
if (mode == 1) {
if (pos > PY_SSIZE_T_MAX - self->pos) {
PyErr_SetString(PyExc_OverflowError,
"new position too large");
return NULL;
}
pos += self->pos;
}
else if (mode == 2) {
if (pos > PY_SSIZE_T_MAX - self->string_size) {
PyErr_SetString(PyExc_OverflowError,
"new position too large");
return NULL;
}
pos += self->string_size;
}
else if (mode != 0) {
PyErr_Format(PyExc_ValueError,
"invalid whence (%i, should be 0, 1 or 2)", mode);
return NULL;
}
if (pos < 0)
pos = 0;
self->pos = pos;
return PyLong_FromSsize_t(self->pos);
}
PyDoc_STRVAR(write_doc,
"write(str) -> int. Write string str to file.\n"
"\n"
"Return the number of bytes written.");
static PyObject *
bytesio_write(BytesIOObject *self, PyObject *obj)
{
const char *bytes;
Py_ssize_t size;
Py_ssize_t n = 0;
CHECK_CLOSED(self);
if (PyObject_AsReadBuffer(obj, (void *)&bytes, &size) < 0)
return NULL;
if (size != 0) {
n = write_bytes(self, bytes, size);
if (n < 0)
return NULL;
}
return PyLong_FromSsize_t(n);
}
PyDoc_STRVAR(writelines_doc,
"writelines(sequence_of_strings) -> None. Write the strings to the file.\n"
"\n"
"Note that newlines are not added. The sequence can be any iterable object\n"
"producing strings. This is equivalent to calling write() for each string.");
static PyObject *
bytesio_writelines(BytesIOObject *self, PyObject *v)
{
PyObject *it, *item;
PyObject *ret;
CHECK_CLOSED(self);
it = PyObject_GetIter(v);
if (it == NULL)
return NULL;
while ((item = PyIter_Next(it)) != NULL) {
ret = bytesio_write(self, item);
Py_DECREF(item);
if (ret == NULL) {
Py_DECREF(it);
return NULL;
}
Py_DECREF(ret);
}
Py_DECREF(it);
/* See if PyIter_Next failed */
if (PyErr_Occurred())
return NULL;
Py_RETURN_NONE;
}
PyDoc_STRVAR(close_doc,
"close() -> None. Disable all I/O operations.");
static PyObject *
bytesio_close(BytesIOObject *self)
{
if (self->buf != NULL) {
PyMem_Free(self->buf);
self->buf = NULL;
}
Py_RETURN_NONE;
}
static void
bytesio_dealloc(BytesIOObject *self)
{
if (self->buf != NULL) {
PyMem_Free(self->buf);
self->buf = NULL;
}
Py_TYPE(self)->tp_free(self);
}
static PyObject *
bytesio_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
BytesIOObject *self;
assert(type != NULL && type->tp_alloc != NULL);
self = (BytesIOObject *)type->tp_alloc(type, 0);
if (self == NULL)
return NULL;
self->string_size = 0;
self->pos = 0;
self->buf_size = 0;
self->buf = (char *)PyMem_Malloc(0);
if (self->buf == NULL) {
Py_DECREF(self);
return PyErr_NoMemory();
}
return (PyObject *)self;
}
static int
bytesio_init(BytesIOObject *self, PyObject *args, PyObject *kwds)
{
PyObject *initvalue = NULL;
if (!PyArg_ParseTuple(args, "|O:BytesIO", &initvalue))
return -1;
/* In case, __init__ is called multiple times. */
self->string_size = 0;
self->pos = 0;
if (initvalue && initvalue != Py_None) {
PyObject *res;
res = bytesio_write(self, initvalue);
if (res == NULL)
return -1;
Py_DECREF(res);
self->pos = 0;
}
return 0;
}
static PyGetSetDef bytesio_getsetlist[] = {
{"closed", (getter)bytesio_get_closed, NULL,
"True if the file is closed."},
{0}, /* sentinel */
};
static struct PyMethodDef bytesio_methods[] = {
{"readable", (PyCFunction)return_true, METH_NOARGS, NULL},
{"seekable", (PyCFunction)return_true, METH_NOARGS, NULL},
{"writable", (PyCFunction)return_true, METH_NOARGS, NULL},
{"close", (PyCFunction)bytesio_close, METH_NOARGS, close_doc},
{"flush", (PyCFunction)bytesio_flush, METH_NOARGS, flush_doc},
{"isatty", (PyCFunction)bytesio_isatty, METH_NOARGS, isatty_doc},
{"tell", (PyCFunction)bytesio_tell, METH_NOARGS, tell_doc},
{"write", (PyCFunction)bytesio_write, METH_O, write_doc},
{"writelines", (PyCFunction)bytesio_writelines, METH_O, writelines_doc},
{"read1", (PyCFunction)bytesio_read1, METH_O, read1_doc},
{"readinto", (PyCFunction)bytesio_readinto, METH_O, readinto_doc},
{"readline", (PyCFunction)bytesio_readline, METH_VARARGS, readline_doc},
{"readlines", (PyCFunction)bytesio_readlines, METH_VARARGS, readlines_doc},
{"read", (PyCFunction)bytesio_read, METH_VARARGS, read_doc},
{"getvalue", (PyCFunction)bytesio_getvalue, METH_VARARGS, getval_doc},
{"seek", (PyCFunction)bytesio_seek, METH_VARARGS, seek_doc},
{"truncate", (PyCFunction)bytesio_truncate, METH_VARARGS, truncate_doc},
{NULL, NULL} /* sentinel */
};
PyDoc_STRVAR(bytesio_doc,
"BytesIO([buffer]) -> object\n"
"\n"
"Create a buffered I/O implementation using an in-memory bytes\n"
"buffer, ready for reading and writing.");
static PyTypeObject BytesIO_Type = {
PyVarObject_HEAD_INIT(NULL, 0)
"_bytesio._BytesIO", /*tp_name*/
sizeof(BytesIOObject), /*tp_basicsize*/
0, /*tp_itemsize*/
(destructor)bytesio_dealloc, /*tp_dealloc*/
0, /*tp_print*/
0, /*tp_getattr*/
0, /*tp_setattr*/
0, /*tp_compare*/
0, /*tp_repr*/
0, /*tp_as_number*/
0, /*tp_as_sequence*/
0, /*tp_as_mapping*/
0, /*tp_hash*/
0, /*tp_call*/
0, /*tp_str*/
0, /*tp_getattro*/
0, /*tp_setattro*/
0, /*tp_as_buffer*/
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /*tp_flags*/
bytesio_doc, /*tp_doc*/
0, /*tp_traverse*/
0, /*tp_clear*/
0, /*tp_richcompare*/
0, /*tp_weaklistoffset*/
PyObject_SelfIter, /*tp_iter*/
(iternextfunc)bytesio_iternext, /*tp_iternext*/
bytesio_methods, /*tp_methods*/
0, /*tp_members*/
bytesio_getsetlist, /*tp_getset*/
0, /*tp_base*/
0, /*tp_dict*/
0, /*tp_descr_get*/
0, /*tp_descr_set*/
0, /*tp_dictoffset*/
(initproc)bytesio_init, /*tp_init*/
0, /*tp_alloc*/
bytesio_new, /*tp_new*/
};
PyMODINIT_FUNC
init_bytesio(void)
{
PyObject *m;
if (PyType_Ready(&BytesIO_Type) < 0)
return;
m = Py_InitModule("_bytesio", NULL);
if (m == NULL)
return;
Py_INCREF(&BytesIO_Type);
PyModule_AddObject(m, "_BytesIO", (PyObject *)&BytesIO_Type);
}

View File

@ -552,11 +552,10 @@ portable_lseek(int fd, PyObject *posobj, int whence)
PyErr_SetString(PyExc_TypeError, "an integer is required"); PyErr_SetString(PyExc_TypeError, "an integer is required");
return NULL; return NULL;
} }
#if !defined(HAVE_LARGEFILE_SUPPORT) #if defined(HAVE_LARGEFILE_SUPPORT)
pos = PyLong_AsLong(posobj); pos = PyLong_AsLongLong(posobj);
#else #else
pos = PyLong_Check(posobj) ? pos = PyLong_AsLong(posobj);
PyLong_AsLongLong(posobj) : PyLong_AsLong(posobj);
#endif #endif
if (PyErr_Occurred()) if (PyErr_Occurred())
return NULL; return NULL;
@ -572,10 +571,10 @@ portable_lseek(int fd, PyObject *posobj, int whence)
if (res < 0) if (res < 0)
return PyErr_SetFromErrno(PyExc_IOError); return PyErr_SetFromErrno(PyExc_IOError);
#if !defined(HAVE_LARGEFILE_SUPPORT) #if defined(HAVE_LARGEFILE_SUPPORT)
return PyLong_FromLong(res);
#else
return PyLong_FromLongLong(res); return PyLong_FromLongLong(res);
#else
return PyLong_FromLong(res);
#endif #endif
} }
@ -622,48 +621,29 @@ fileio_truncate(PyFileIOObject *self, PyObject *args)
return NULL; return NULL;
if (posobj == Py_None || posobj == NULL) { if (posobj == Py_None || posobj == NULL) {
/* Get the current position. */
posobj = portable_lseek(fd, NULL, 1); posobj = portable_lseek(fd, NULL, 1);
if (posobj == NULL) if (posobj == NULL)
return NULL; return NULL;
} }
else { else {
Py_INCREF(posobj); /* Move to the position to be truncated. */
posobj = portable_lseek(fd, posobj, 0);
} }
#if !defined(HAVE_LARGEFILE_SUPPORT) #if defined(HAVE_LARGEFILE_SUPPORT)
pos = PyLong_AsLong(posobj); pos = PyLong_AsLongLong(posobj);
#else #else
pos = PyLong_Check(posobj) ? pos = PyLong_AsLong(posobj);
PyLong_AsLongLong(posobj) : PyLong_AsLong(posobj);
#endif #endif
if (PyErr_Occurred()) { if (PyErr_Occurred())
Py_DECREF(posobj);
return NULL; return NULL;
}
#ifdef MS_WINDOWS #ifdef MS_WINDOWS
/* MS _chsize doesn't work if newsize doesn't fit in 32 bits, /* MS _chsize doesn't work if newsize doesn't fit in 32 bits,
so don't even try using it. */ so don't even try using it. */
{ {
HANDLE hFile; HANDLE hFile;
PyObject *pos2, *oldposobj;
/* store the current position */
oldposobj = portable_lseek(self->fd, NULL, 1);
if (oldposobj == NULL) {
Py_DECREF(posobj);
return NULL;
}
/* Have to move current pos to desired endpoint on Windows. */
errno = 0;
pos2 = portable_lseek(fd, posobj, SEEK_SET);
if (pos2 == NULL) {
Py_DECREF(posobj);
Py_DECREF(oldposobj);
return NULL;
}
Py_DECREF(pos2);
/* Truncate. Note that this may grow the file! */ /* Truncate. Note that this may grow the file! */
Py_BEGIN_ALLOW_THREADS Py_BEGIN_ALLOW_THREADS
@ -676,18 +656,6 @@ fileio_truncate(PyFileIOObject *self, PyObject *args)
errno = EACCES; errno = EACCES;
} }
Py_END_ALLOW_THREADS Py_END_ALLOW_THREADS
if (ret == 0) {
/* Move to the previous position in the file */
pos2 = portable_lseek(fd, oldposobj, SEEK_SET);
if (pos2 == NULL) {
Py_DECREF(posobj);
Py_DECREF(oldposobj);
return NULL;
}
}
Py_DECREF(pos2);
Py_DECREF(oldposobj);
} }
#else #else
Py_BEGIN_ALLOW_THREADS Py_BEGIN_ALLOW_THREADS
@ -697,7 +665,6 @@ fileio_truncate(PyFileIOObject *self, PyObject *args)
#endif /* !MS_WINDOWS */ #endif /* !MS_WINDOWS */
if (ret != 0) { if (ret != 0) {
Py_DECREF(posobj);
PyErr_SetFromErrno(PyExc_IOError); PyErr_SetFromErrno(PyExc_IOError);
return NULL; return NULL;
} }
@ -791,7 +758,8 @@ PyDoc_STRVAR(seek_doc,
PyDoc_STRVAR(truncate_doc, PyDoc_STRVAR(truncate_doc,
"truncate([size: int]) -> None. Truncate the file to at most size bytes.\n" "truncate([size: int]) -> None. Truncate the file to at most size bytes.\n"
"\n" "\n"
"Size defaults to the current file position, as returned by tell()."); "Size defaults to the current file position, as returned by tell()."
"The current file position is changed to the value of size.");
#endif #endif
PyDoc_STRVAR(tell_doc, PyDoc_STRVAR(tell_doc,

View File

@ -240,11 +240,11 @@ Py_InitializeEx(int install_sigs)
} }
initmain(); /* Module __main__ */ initmain(); /* Module __main__ */
if (!Py_NoSiteFlag)
initsite(); /* Module site */
if (initstdio() < 0) if (initstdio() < 0)
Py_FatalError( Py_FatalError(
"Py_Initialize: can't initialize sys standard streams"); "Py_Initialize: can't initialize sys standard streams");
if (!Py_NoSiteFlag)
initsite(); /* Module site */
/* auto-thread-state API, if available */ /* auto-thread-state API, if available */
#ifdef WITH_THREAD #ifdef WITH_THREAD

View File

@ -426,6 +426,8 @@ class PyBuildExt(build_ext):
exts.append( Extension('operator', ['operator.c']) ) exts.append( Extension('operator', ['operator.c']) )
# _functools # _functools
exts.append( Extension("_functools", ["_functoolsmodule.c"]) ) exts.append( Extension("_functools", ["_functoolsmodule.c"]) )
# Memory-based IO accelerator modules
exts.append( Extension("_bytesio", ["_bytesio.c"]) )
# atexit # atexit
exts.append( Extension("atexit", ["atexitmodule.c"]) ) exts.append( Extension("atexit", ["atexitmodule.c"]) )
# Python C API test module # Python C API test module