Mike Verdone's checkpoint, cleaned up.

Also implemented Neal's suggestion (add fileno() to SocketIO)
and some unrelated changes, e.g. remove Google copyright
and make BytesIO a subclass of BufferedIOBase.
This commit is contained in:
Guido van Rossum 2007-02-27 17:19:33 +00:00
parent a4f9fc6494
commit 68bbcd2a71
2 changed files with 277 additions and 13 deletions

144
Lib/io.py
View File

@ -1,6 +1,3 @@
# Copyright 2006 Google, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""New I/O library. """New I/O library.
This is an early prototype; eventually some of this will be This is an early prototype; eventually some of this will be
@ -9,12 +6,17 @@ reimplemented in C and the rest may be turned into a package.
See PEP XXX; for now: http://docs.google.com/Doc?id=dfksfvqd_1cn5g5m See PEP XXX; for now: http://docs.google.com/Doc?id=dfksfvqd_1cn5g5m
""" """
__author__ = "Guido van Rossum <guido@python.org>" __author__ = ("Guido van Rossum <guido@python.org>, "
"Mike Verdone <mike.verdone@gmail.com>")
__all__ = ["open", "RawIOBase", "FileIO", "SocketIO", "BytesIO"] __all__ = ["open", "RawIOBase", "FileIO", "SocketIO", "BytesIO",
"BufferedReader", "BufferedWriter", "BufferedRWPair", "EOF"]
import os import os
DEFAULT_BUFFER_SIZE = 8 * 1024 # bytes
EOF = b""
def open(filename, mode="r", buffering=None, *, encoding=None): def open(filename, mode="r", buffering=None, *, encoding=None):
"""Replacement for the built-in open function. """Replacement for the built-in open function.
@ -71,8 +73,8 @@ def open(filename, mode="r", buffering=None, *, encoding=None):
(appending and "a" or "") + (appending and "a" or "") +
(updating and "+" or "")) (updating and "+" or ""))
if buffering is None: if buffering is None:
buffering = 8*1024 # International standard buffer size buffering = DEFAULT_BUFFER_SIZE
# Should default to line buffering if os.isatty(raw.fileno()) # XXX Should default to line buffering if os.isatty(raw.fileno())
try: try:
bs = os.fstat(raw.fileno()).st_blksize bs = os.fstat(raw.fileno()).st_blksize
except (os.error, AttributeError): except (os.error, AttributeError):
@ -219,7 +221,7 @@ class FileIO(RawIOBase):
def fileno(self): def fileno(self):
return self._fd return self._fd
class SocketIO(RawIOBase): class SocketIO(RawIOBase):
"""Raw I/O implementation for stream sockets.""" """Raw I/O implementation for stream sockets."""
@ -249,12 +251,18 @@ class SocketIO(RawIOBase):
def writable(self): def writable(self):
return "w" in self._mode return "w" in self._mode
# XXX(nnorwitz)??? def fileno(self): return self._sock.fileno() def fileno(self):
return self._sock.fileno()
class BytesIO(RawIOBase): class BufferedIOBase(RawIOBase):
"""Raw I/O implementation for bytes, like StringIO.""" """XXX Docstring."""
class BytesIO(BufferedIOBase):
"""Buffered I/O implementation using a bytes buffer, like StringIO."""
# XXX More docs # XXX More docs
@ -267,7 +275,9 @@ class BytesIO(RawIOBase):
def getvalue(self): def getvalue(self):
return self._buffer return self._buffer
def read(self, n): def read(self, n=None):
if n is None:
n = len(self._buffer)
assert n >= 0 assert n >= 0
newpos = min(len(self._buffer), self._pos + n) newpos = min(len(self._buffer), self._pos + n)
b = self._buffer[self._pos : newpos] b = self._buffer[self._pos : newpos]
@ -312,3 +322,113 @@ class BytesIO(RawIOBase):
def seekable(self): def seekable(self):
return True return True
class BufferedReader(BufferedIOBase):
"""Buffered reader.
Buffer for a readable sequential RawIO object. Does not allow
random access (seek, tell).
"""
def __init__(self, raw):
"""
Create a new buffered reader using the given readable raw IO object.
"""
assert raw.readable()
self.raw = raw
self._read_buf = b''
if hasattr(raw, 'fileno'):
self.fileno = raw.fileno
def read(self, n=None):
"""
Read n bytes. Returns exactly n bytes of data unless the underlying
raw IO stream reaches EOF of if the call would block in non-blocking
mode. If n is None, read until EOF or until read() would block.
"""
nodata_val = EOF
while (len(self._read_buf) < n) if (n is not None) else True:
current = self.raw.read(n)
if current in (EOF, None):
nodata_val = current
break
self._read_buf += current # XXX using += is bad
read = self._read_buf[:n]
if (not self._read_buf):
return nodata_val
self._read_buf = self._read_buf[n if n else 0:]
return read
def write(self, b):
raise IOError(".write() unsupported")
def readable(self):
return True
def flush(self):
# Flush is a no-op
pass
class BufferedWriter(BufferedIOBase):
"""Buffered writer.
XXX More docs.
"""
def __init__(self, raw, buffer_size=DEFAULT_BUFFER_SIZE):
assert raw.writeable()
self.raw = raw
self.buffer_size = buffer_size
self._write_buf_stack = []
self._write_buf_size = 0
if hasattr(raw, 'fileno'):
self.fileno = raw.fileno
def read(self, n=None):
raise IOError(".read() not supported")
def write(self, b):
assert issubclass(type(b), bytes)
self._write_buf_stack.append(b)
self._write_buf_size += len(b)
if (self._write_buf_size > self.buffer_size):
self.flush()
def writeable(self):
return True
def flush(self):
buf = b''.join(self._write_buf_stack)
while len(buf):
buf = buf[self.raw.write(buf):]
self._write_buf_stack = []
self._write_buf_size = 0
# XXX support flushing buffer on close, del
class BufferedRWPair(BufferedReader, BufferedWriter):
"""Buffered Read/Write Pair.
A buffered reader object and buffered writer object put together to
form a sequential IO object that can read and write.
"""
def __init__(self, bufferedReader, bufferedWriter):
assert bufferedReader.readable()
assert bufferedWriter.writeable()
self.bufferedReader = bufferedReader
self.bufferedWriter = bufferedWriter
self.read = bufferedReader.read
self.write = bufferedWriter.write
self.flush = bufferedWriter.flush
self.readable = bufferedReader.readable
self.writeable = bufferedWriter.writeable
def seekable(self):
return False

View File

@ -1,8 +1,43 @@
"""Unit tests for io.py."""
import unittest import unittest
from test import test_support from test import test_support
import io import io
class MockReadIO(io.RawIOBase):
def __init__(self, readStack):
self._readStack = list(readStack)
def read(self, n=None):
try:
return self._readStack.pop(0)
except:
return io.EOF
def fileno(self):
return 42
def readable(self):
return True
class MockWriteIO(io.RawIOBase):
def __init__(self):
self._writeStack = []
def write(self, b):
self._writeStack.append(b)
return len(b)
def writeable(self):
return True
def fileno(self):
return 42
class IOTest(unittest.TestCase): class IOTest(unittest.TestCase):
def write_ops(self, f): def write_ops(self, f):
@ -55,8 +90,117 @@ class IOTest(unittest.TestCase):
f = io.BytesIO(data) f = io.BytesIO(data)
self.read_ops(f) self.read_ops(f)
class BytesIOTest(unittest.TestCase):
def testInit(self):
buf = b"1234567890"
bytesIo = io.BytesIO(buf)
def testRead(self):
buf = b"1234567890"
bytesIo = io.BytesIO(buf)
self.assertEquals(buf[:1], bytesIo.read(1))
self.assertEquals(buf[1:5], bytesIo.read(4))
self.assertEquals(buf[5:], bytesIo.read(900))
self.assertEquals(io.EOF, bytesIo.read())
def testReadNoArgs(self):
buf = b"1234567890"
bytesIo = io.BytesIO(buf)
self.assertEquals(buf, bytesIo.read())
self.assertEquals(io.EOF, bytesIo.read())
def testSeek(self):
buf = b"1234567890"
bytesIo = io.BytesIO(buf)
bytesIo.read(5)
bytesIo.seek(0)
self.assertEquals(buf, bytesIo.read())
bytesIo.seek(3)
self.assertEquals(buf[3:], bytesIo.read())
def testTell(self):
buf = b"1234567890"
bytesIo = io.BytesIO(buf)
self.assertEquals(0, bytesIo.tell())
bytesIo.seek(5)
self.assertEquals(5, bytesIo.tell())
bytesIo.seek(10000)
self.assertEquals(10000, bytesIo.tell())
class BufferedReaderTest(unittest.TestCase):
def testRead(self):
rawIo = MockReadIO((b"abc", b"d", b"efg"))
bufIo = io.BufferedReader(rawIo)
self.assertEquals(b"abcdef", bufIo.read(6))
def testReadToEof(self):
rawIo = MockReadIO((b"abc", b"d", b"efg"))
bufIo = io.BufferedReader(rawIo)
self.assertEquals(b"abcdefg", bufIo.read(9000))
def testReadNoArgs(self):
rawIo = MockReadIO((b"abc", b"d", b"efg"))
bufIo = io.BufferedReader(rawIo)
self.assertEquals(b"abcdefg", bufIo.read())
def testFileno(self):
rawIo = MockReadIO((b"abc", b"d", b"efg"))
bufIo = io.BufferedReader(rawIo)
self.assertEquals(42, bufIo.fileno())
def testFilenoNoFileno(self):
# TODO will we always have fileno() function? If so, kill
# this test. Else, write it.
pass
class BufferedWriterTest(unittest.TestCase):
def testWrite(self):
# Write to the buffered IO but don't overflow the buffer.
writer = MockWriteIO()
bufIo = io.BufferedWriter(writer, 8)
bufIo.write(b"abc")
self.assertFalse(writer._writeStack)
def testWriteOverflow(self):
writer = MockWriteIO()
bufIo = io.BufferedWriter(writer, 8)
bufIo.write(b"abc")
bufIo.write(b"defghijkl")
self.assertEquals(b"abcdefghijkl", writer._writeStack[0])
def testFlush(self):
writer = MockWriteIO()
bufIo = io.BufferedWriter(writer, 8)
bufIo.write(b"abc")
bufIo.flush()
self.assertEquals(b"abc", writer._writeStack[0])
# TODO. Tests for open()
def test_main(): def test_main():
test_support.run_unittest(IOTest) test_support.run_unittest(IOTest, BytesIOTest, BufferedReaderTest,
BufferedWriterTest)
if __name__ == "__main__": if __name__ == "__main__":
test_main() test_main()