Issue #21859: Added Python implementation of io.FileIO.

This commit is contained in:
Serhiy Storchaka 2015-04-10 16:16:16 +03:00
parent cd092efb16
commit 71fd224af0
4 changed files with 500 additions and 66 deletions

View File

@ -7,11 +7,16 @@ import abc
import codecs
import errno
import array
import stat
# Import _thread instead of threading to reduce startup cost
try:
from _thread import allocate_lock as Lock
except ImportError:
from _dummy_thread import allocate_lock as Lock
if os.name == 'win32':
from msvcrt import setmode as _setmode
else:
_setmode = None
import io
from io import (__all__, SEEK_SET, SEEK_CUR, SEEK_END)
@ -1378,6 +1383,345 @@ class BufferedRandom(BufferedWriter, BufferedReader):
return BufferedWriter.write(self, b)
class FileIO(RawIOBase):
_fd = -1
_created = False
_readable = False
_writable = False
_appending = False
_seekable = None
_closefd = True
def __init__(self, file, mode='r', closefd=True, opener=None):
"""Open a file. The mode can be 'r' (default), 'w', 'x' or 'a' for reading,
writing, exclusive creation or appending. The file will be created if it
doesn't exist when opened for writing or appending; it will be truncated
when opened for writing. A FileExistsError will be raised if it already
exists when opened for creating. Opening a file for creating implies
writing so this mode behaves in a similar way to 'w'. Add a '+' to the mode
to allow simultaneous reading and writing. A custom opener can be used by
passing a callable as *opener*. The underlying file descriptor for the file
object is then obtained by calling opener with (*name*, *flags*).
*opener* must return an open file descriptor (passing os.open as *opener*
results in functionality similar to passing None).
"""
if self._fd >= 0:
# Have to close the existing file first.
try:
if self._closefd:
os.close(self._fd)
finally:
self._fd = -1
if isinstance(file, float):
raise TypeError('integer argument expected, got float')
if isinstance(file, int):
fd = file
if fd < 0:
raise ValueError('negative file descriptor')
else:
fd = -1
if not isinstance(mode, str):
raise TypeError('invalid mode: %s' % (mode,))
if not set(mode) <= set('xrwab+'):
raise ValueError('invalid mode: %s' % (mode,))
if sum(c in 'rwax' for c in mode) != 1 or mode.count('+') > 1:
raise ValueError('Must have exactly one of create/read/write/append '
'mode and at most one plus')
if 'x' in mode:
self._created = True
self._writable = True
flags = os.O_EXCL | os.O_CREAT
elif 'r' in mode:
self._readable = True
flags = 0
elif 'w' in mode:
self._writable = True
flags = os.O_CREAT | os.O_TRUNC
elif 'a' in mode:
self._writable = True
self._appending = True
flags = os.O_APPEND | os.O_CREAT
if '+' in mode:
self._readable = True
self._writable = True
if self._readable and self._writable:
flags |= os.O_RDWR
elif self._readable:
flags |= os.O_RDONLY
else:
flags |= os.O_WRONLY
flags |= getattr(os, 'O_BINARY', 0)
noinherit_flag = (getattr(os, 'O_NOINHERIT', 0) or
getattr(os, 'O_CLOEXEC', 0))
flags |= noinherit_flag
owned_fd = None
try:
if fd < 0:
if not closefd:
raise ValueError('Cannot use closefd=False with file name')
if opener is None:
fd = os.open(file, flags, 0o666)
else:
fd = opener(file, flags)
if not isinstance(fd, int):
raise TypeError('expected integer from opener')
if fd < 0:
raise OSError('Negative file descriptor')
owned_fd = fd
if not noinherit_flag:
os.set_inheritable(fd, False)
self._closefd = closefd
fdfstat = os.fstat(fd)
try:
if stat.S_ISDIR(fdfstat.st_mode):
raise IsADirectoryError(errno.EISDIR,
os.strerror(errno.EISDIR), file)
except AttributeError:
# Ignore the AttribueError if stat.S_ISDIR or errno.EISDIR
# don't exist.
pass
self._blksize = getattr(fdfstat, 'st_blksize', 0)
if self._blksize <= 1:
self._blksize = DEFAULT_BUFFER_SIZE
if _setmode:
# don't translate newlines (\r\n <=> \n)
_setmode(fd, os.O_BINARY)
self.name = file
if self._appending:
# For consistent behaviour, we explicitly seek to the
# end of file (otherwise, it might be done only on the
# first write()).
os.lseek(fd, 0, SEEK_END)
except:
if owned_fd is not None:
os.close(owned_fd)
raise
self._fd = fd
def __del__(self):
if self._fd >= 0 and self._closefd and not self.closed:
import warnings
warnings.warn('unclosed file %r' % (self,), ResourceWarning,
stacklevel=2)
self.close()
def __getstate__(self):
raise TypeError("cannot serialize '%s' object", self.__class__.__name__)
def __repr__(self):
class_name = '%s.%s' % (self.__class__.__module__,
self.__class__.__qualname__)
if self.closed:
return '<%s [closed]>' % class_name
try:
name = self.name
except AttributeError:
return ('<%s fd=%d mode=%r closefd=%r>' %
(class_name, self._fd, self.mode, self._closefd))
else:
return ('<%s name=%r mode=%r closefd=%r>' %
(class_name, name, self.mode, self._closefd))
def _checkReadable(self):
if not self._readable:
raise UnsupportedOperation('File not open for reading')
def _checkWritable(self, msg=None):
if not self._writable:
raise UnsupportedOperation('File not open for writing')
def read(self, size=None):
"""Read at most size bytes, returned as bytes.
Only makes one system call, so less data may be returned than requested
In non-blocking mode, returns None if no data is available.
Return an empty bytes object at EOF.
"""
self._checkClosed()
self._checkReadable()
if size is None or size < 0:
return self.readall()
try:
return os.read(self._fd, size)
except BlockingIOError:
return None
def readall(self):
"""Read all data from the file, returned as bytes.
In non-blocking mode, returns as much as is immediately available,
or None if no data is available. Return an empty bytes object at EOF.
"""
self._checkClosed()
self._checkReadable()
bufsize = DEFAULT_BUFFER_SIZE
try:
pos = os.lseek(self._fd, 0, SEEK_CUR)
end = os.fstat(self._fd).st_size
if end >= pos:
bufsize = end - pos + 1
except OSError:
pass
result = bytearray()
while True:
if len(result) >= bufsize:
bufsize = len(result)
bufsize += max(bufsize, DEFAULT_BUFFER_SIZE)
n = bufsize - len(result)
try:
chunk = os.read(self._fd, n)
except BlockingIOError:
if result:
break
return None
if not chunk: # reached the end of the file
break
result += chunk
return bytes(result)
def readinto(self, b):
"""Same as RawIOBase.readinto()."""
m = memoryview(b).cast('B')
data = self.read(len(m))
n = len(data)
m[:n] = data
return n
def write(self, b):
"""Write bytes b to file, return number written.
Only makes one system call, so not all of the data may be written.
The number of bytes actually written is returned. In non-blocking mode,
returns None if the write would block.
"""
self._checkClosed()
self._checkWritable()
try:
return os.write(self._fd, b)
except BlockingIOError:
return None
def seek(self, pos, whence=SEEK_SET):
"""Move to new file position.
Argument offset is a byte count. Optional argument whence defaults to
SEEK_SET or 0 (offset from start of file, offset should be >= 0); other values
are SEEK_CUR or 1 (move relative to current position, positive or negative),
and SEEK_END or 2 (move relative to end of file, usually negative, although
many platforms allow seeking beyond the end of a file).
Note that not all file objects are seekable.
"""
if isinstance(pos, float):
raise TypeError('an integer is required')
self._checkClosed()
return os.lseek(self._fd, pos, whence)
def tell(self):
"""tell() -> int. Current file position.
Can raise OSError for non seekable files."""
self._checkClosed()
return os.lseek(self._fd, 0, SEEK_CUR)
def truncate(self, size=None):
"""Truncate the file to at most size bytes.
Size defaults to the current file position, as returned by tell().
The current file position is changed to the value of size.
"""
self._checkClosed()
self._checkWritable()
if size is None:
size = self.tell()
os.ftruncate(self._fd, size)
return size
def close(self):
"""Close the file.
A closed file cannot be used for further I/O operations. close() may be
called more than once without error.
"""
if not self.closed:
try:
if self._closefd:
os.close(self._fd)
finally:
super().close()
def seekable(self):
"""True if file supports random-access."""
self._checkClosed()
if self._seekable is None:
try:
self.tell()
except OSError:
self._seekable = False
else:
self._seekable = True
return self._seekable
def readable(self):
"""True if file was opened in a read mode."""
self._checkClosed()
return self._readable
def writable(self):
"""True if file was opened in a write mode."""
self._checkClosed()
return self._writable
def fileno(self):
"""Return the underlying file descriptor (an integer)."""
self._checkClosed()
return self._fd
def isatty(self):
"""True if the file is connected to a TTY device."""
self._checkClosed()
return os.isatty(self._fd)
@property
def closefd(self):
"""True if the file descriptor will be closed by close()."""
return self._closefd
@property
def mode(self):
"""String giving the file mode"""
if self._created:
if self._readable:
return 'xb+'
else:
return 'xb'
elif self._appending:
if self._readable:
return 'ab+'
else:
return 'ab'
elif self._readable:
if self._writable:
return 'rb+'
else:
return 'rb'
else:
return 'wb'
class TextIOBase(IOBase):
"""Base class for text I/O.

View File

@ -18,11 +18,12 @@ import time
import unittest
# Test import all of the things we're about to try testing up front.
from _io import FileIO
import _io
import _pyio
@unittest.skipUnless(os.name == 'posix', 'tests requires a posix system.')
class TestFileIOSignalInterrupt(unittest.TestCase):
class TestFileIOSignalInterrupt:
def setUp(self):
self._process = None
@ -38,8 +39,9 @@ class TestFileIOSignalInterrupt(unittest.TestCase):
subclasseses should override this to test different IO objects.
"""
return ('import _io ;'
'infile = _io.FileIO(sys.stdin.fileno(), "rb")')
return ('import %s as io ;'
'infile = io.FileIO(sys.stdin.fileno(), "rb")' %
self.modname)
def fail_with_process_info(self, why, stdout=b'', stderr=b'',
communicate=True):
@ -179,11 +181,19 @@ class TestFileIOSignalInterrupt(unittest.TestCase):
expected=b'hello\nworld!\n'))
class CTestFileIOSignalInterrupt(TestFileIOSignalInterrupt, unittest.TestCase):
modname = '_io'
class PyTestFileIOSignalInterrupt(TestFileIOSignalInterrupt, unittest.TestCase):
modname = '_pyio'
class TestBufferedIOSignalInterrupt(TestFileIOSignalInterrupt):
def _generate_infile_setup_code(self):
"""Returns the infile = ... line of code to make a BufferedReader."""
return ('infile = open(sys.stdin.fileno(), "rb") ;'
'import _io ;assert isinstance(infile, _io.BufferedReader)')
return ('import %s as io ;infile = io.open(sys.stdin.fileno(), "rb") ;'
'assert isinstance(infile, io.BufferedReader)' %
self.modname)
def test_readall(self):
"""BufferedReader.read() must handle signals and not lose data."""
@ -193,12 +203,20 @@ class TestBufferedIOSignalInterrupt(TestFileIOSignalInterrupt):
read_method_name='read',
expected=b'hello\nworld!\n'))
class CTestBufferedIOSignalInterrupt(TestBufferedIOSignalInterrupt, unittest.TestCase):
modname = '_io'
class PyTestBufferedIOSignalInterrupt(TestBufferedIOSignalInterrupt, unittest.TestCase):
modname = '_pyio'
class TestTextIOSignalInterrupt(TestFileIOSignalInterrupt):
def _generate_infile_setup_code(self):
"""Returns the infile = ... line of code to make a TextIOWrapper."""
return ('infile = open(sys.stdin.fileno(), "rt", newline=None) ;'
'import _io ;assert isinstance(infile, _io.TextIOWrapper)')
return ('import %s as io ;'
'infile = io.open(sys.stdin.fileno(), "rt", newline=None) ;'
'assert isinstance(infile, io.TextIOWrapper)' %
self.modname)
def test_readline(self):
"""readline() must handle signals and not lose data."""
@ -224,6 +242,12 @@ class TestTextIOSignalInterrupt(TestFileIOSignalInterrupt):
read_method_name='read',
expected="hello\nworld!\n"))
class CTestTextIOSignalInterrupt(TestTextIOSignalInterrupt, unittest.TestCase):
modname = '_io'
class PyTestTextIOSignalInterrupt(TestTextIOSignalInterrupt, unittest.TestCase):
modname = '_pyio'
def test_main():
test_cases = [

View File

@ -12,13 +12,15 @@ from functools import wraps
from test.support import TESTFN, check_warnings, run_unittest, make_bad_fd, cpython_only
from collections import UserList
from _io import FileIO as _FileIO
import _io # C implementation of io
import _pyio # Python implementation of io
class AutoFileTests(unittest.TestCase):
class AutoFileTests:
# file tests for which a test file is automatically set up
def setUp(self):
self.f = _FileIO(TESTFN, 'w')
self.f = self.FileIO(TESTFN, 'w')
def tearDown(self):
if self.f:
@ -69,20 +71,60 @@ class AutoFileTests(unittest.TestCase):
blksize = getattr(fst, 'st_blksize', blksize)
self.assertEqual(self.f._blksize, blksize)
def testReadinto(self):
# verify readinto
self.f.write(bytes([1, 2]))
def testReadintoByteArray(self):
self.f.write(bytes([1, 2, 0, 255]))
self.f.close()
a = array('b', b'x'*10)
self.f = _FileIO(TESTFN, 'r')
n = self.f.readinto(a)
self.assertEqual(array('b', [1, 2]), a[:n])
ba = bytearray(b'abcdefgh')
with self.FileIO(TESTFN, 'r') as f:
n = f.readinto(ba)
self.assertEqual(ba, b'\x01\x02\x00\xffefgh')
self.assertEqual(n, 4)
def _testReadintoMemoryview(self):
self.f.write(bytes([1, 2, 0, 255]))
self.f.close()
m = memoryview(bytearray(b'abcdefgh'))
with self.FileIO(TESTFN, 'r') as f:
n = f.readinto(m)
self.assertEqual(m, b'\x01\x02\x00\xffefgh')
self.assertEqual(n, 4)
m = memoryview(bytearray(b'abcdefgh')).cast('H', shape=[2, 2])
with self.FileIO(TESTFN, 'r') as f:
n = f.readinto(m)
self.assertEqual(bytes(m), b'\x01\x02\x00\xffefgh')
self.assertEqual(n, 4)
def _testReadintoArray(self):
self.f.write(bytes([1, 2, 0, 255]))
self.f.close()
a = array('B', b'abcdefgh')
with self.FileIO(TESTFN, 'r') as f:
n = f.readinto(a)
self.assertEqual(a, array('B', [1, 2, 0, 255, 101, 102, 103, 104]))
self.assertEqual(n, 4)
a = array('b', b'abcdefgh')
with self.FileIO(TESTFN, 'r') as f:
n = f.readinto(a)
self.assertEqual(a, array('b', [1, 2, 0, -1, 101, 102, 103, 104]))
self.assertEqual(n, 4)
a = array('I', b'abcdefgh')
with self.FileIO(TESTFN, 'r') as f:
n = f.readinto(a)
self.assertEqual(a, array('I', b'\x01\x02\x00\xffefgh'))
self.assertEqual(n, 4)
def testWritelinesList(self):
l = [b'123', b'456']
self.f.writelines(l)
self.f.close()
self.f = _FileIO(TESTFN, 'rb')
self.f = self.FileIO(TESTFN, 'rb')
buf = self.f.read()
self.assertEqual(buf, b'123456')
@ -90,7 +132,7 @@ class AutoFileTests(unittest.TestCase):
l = UserList([b'123', b'456'])
self.f.writelines(l)
self.f.close()
self.f = _FileIO(TESTFN, 'rb')
self.f = self.FileIO(TESTFN, 'rb')
buf = self.f.read()
self.assertEqual(buf, b'123456')
@ -102,7 +144,7 @@ class AutoFileTests(unittest.TestCase):
def test_none_args(self):
self.f.write(b"hi\nbye\nabc")
self.f.close()
self.f = _FileIO(TESTFN, 'r')
self.f = self.FileIO(TESTFN, 'r')
self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
self.f.seek(0)
self.assertEqual(self.f.readline(None), b"hi\n")
@ -112,23 +154,24 @@ class AutoFileTests(unittest.TestCase):
self.assertRaises(TypeError, self.f.write, "Hello!")
def testRepr(self):
self.assertEqual(
repr(self.f), "<_io.FileIO name=%r mode=%r closefd=True>"
% (self.f.name, self.f.mode))
self.assertEqual(repr(self.f),
"<%s.FileIO name=%r mode=%r closefd=True>" %
(self.modulename, self.f.name, self.f.mode))
del self.f.name
self.assertEqual(
repr(self.f), "<_io.FileIO fd=%r mode=%r closefd=True>"
% (self.f.fileno(), self.f.mode))
self.assertEqual(repr(self.f),
"<%s.FileIO fd=%r mode=%r closefd=True>" %
(self.modulename, self.f.fileno(), self.f.mode))
self.f.close()
self.assertEqual(repr(self.f), "<_io.FileIO [closed]>")
self.assertEqual(repr(self.f),
"<%s.FileIO [closed]>" % (self.modulename,))
def testReprNoCloseFD(self):
fd = os.open(TESTFN, os.O_RDONLY)
try:
with _FileIO(fd, 'r', closefd=False) as f:
with self.FileIO(fd, 'r', closefd=False) as f:
self.assertEqual(repr(f),
"<_io.FileIO name=%r mode=%r closefd=False>"
% (f.name, f.mode))
"<%s.FileIO name=%r mode=%r closefd=False>" %
(self.modulename, f.name, f.mode))
finally:
os.close(fd)
@ -140,15 +183,15 @@ class AutoFileTests(unittest.TestCase):
self.assertRaises(ValueError, f.read, 10) # Open for reading
f.close()
self.assertTrue(f.closed)
f = _FileIO(TESTFN, 'r')
f = self.FileIO(TESTFN, 'r')
self.assertRaises(TypeError, f.readinto, "")
self.assertTrue(not f.closed)
f.close()
self.assertTrue(f.closed)
def testMethods(self):
methods = ['fileno', 'isatty', 'read', 'readinto',
'seek', 'tell', 'truncate', 'write', 'seekable',
methods = ['fileno', 'isatty', 'read',
'tell', 'truncate', 'seekable',
'readable', 'writable']
self.f.close()
@ -158,13 +201,16 @@ class AutoFileTests(unittest.TestCase):
method = getattr(self.f, methodname)
# should raise on closed file
self.assertRaises(ValueError, method)
self.assertRaises(ValueError, self.f.readinto, bytearray())
self.assertRaises(ValueError, self.f.seek, 0, os.SEEK_CUR)
self.assertRaises(ValueError, self.f.write, b'')
def testOpendir(self):
# Issue 3703: opening a directory should fill the errno
# Windows always returns "[Errno 13]: Permission denied
# Unix uses fstat and returns "[Errno 21]: Is a directory"
try:
_FileIO('.', 'r')
self.FileIO('.', 'r')
except OSError as e:
self.assertNotEqual(e.errno, 0)
self.assertEqual(e.filename, ".")
@ -175,7 +221,7 @@ class AutoFileTests(unittest.TestCase):
def testOpenDirFD(self):
fd = os.open('.', os.O_RDONLY)
with self.assertRaises(OSError) as cm:
_FileIO(fd, 'r')
self.FileIO(fd, 'r')
os.close(fd)
self.assertEqual(cm.exception.errno, errno.EISDIR)
@ -260,7 +306,7 @@ class AutoFileTests(unittest.TestCase):
self.f.close()
except OSError:
pass
self.f = _FileIO(TESTFN, 'r')
self.f = self.FileIO(TESTFN, 'r')
os.close(self.f.fileno())
return self.f
@ -280,23 +326,32 @@ class AutoFileTests(unittest.TestCase):
a = array('b', b'x'*10)
f.readinto(a)
class OtherFileTests(unittest.TestCase):
class CAutoFileTests(AutoFileTests, unittest.TestCase):
FileIO = _io.FileIO
modulename = '_io'
class PyAutoFileTests(AutoFileTests, unittest.TestCase):
FileIO = _pyio.FileIO
modulename = '_pyio'
class OtherFileTests:
def testAbles(self):
try:
f = _FileIO(TESTFN, "w")
f = self.FileIO(TESTFN, "w")
self.assertEqual(f.readable(), False)
self.assertEqual(f.writable(), True)
self.assertEqual(f.seekable(), True)
f.close()
f = _FileIO(TESTFN, "r")
f = self.FileIO(TESTFN, "r")
self.assertEqual(f.readable(), True)
self.assertEqual(f.writable(), False)
self.assertEqual(f.seekable(), True)
f.close()
f = _FileIO(TESTFN, "a+")
f = self.FileIO(TESTFN, "a+")
self.assertEqual(f.readable(), True)
self.assertEqual(f.writable(), True)
self.assertEqual(f.seekable(), True)
@ -305,7 +360,7 @@ class OtherFileTests(unittest.TestCase):
if sys.platform != "win32":
try:
f = _FileIO("/dev/tty", "a")
f = self.FileIO("/dev/tty", "a")
except OSError:
# When run in a cron job there just aren't any
# ttys, so skip the test. This also handles other
@ -328,7 +383,7 @@ class OtherFileTests(unittest.TestCase):
# check invalid mode strings
for mode in ("", "aU", "wU+", "rw", "rt"):
try:
f = _FileIO(TESTFN, mode)
f = self.FileIO(TESTFN, mode)
except ValueError:
pass
else:
@ -344,7 +399,7 @@ class OtherFileTests(unittest.TestCase):
('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'),
('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]:
# read modes are last so that TESTFN will exist first
with _FileIO(TESTFN, modes[0]) as f:
with self.FileIO(TESTFN, modes[0]) as f:
self.assertEqual(f.mode, modes[1])
finally:
if os.path.exists(TESTFN):
@ -352,7 +407,7 @@ class OtherFileTests(unittest.TestCase):
def testUnicodeOpen(self):
# verify repr works for unicode too
f = _FileIO(str(TESTFN), "w")
f = self.FileIO(str(TESTFN), "w")
f.close()
os.unlink(TESTFN)
@ -362,7 +417,7 @@ class OtherFileTests(unittest.TestCase):
fn = TESTFN.encode("ascii")
except UnicodeEncodeError:
self.skipTest('could not encode %r to ascii' % TESTFN)
f = _FileIO(fn, "w")
f = self.FileIO(fn, "w")
try:
f.write(b"abc")
f.close()
@ -373,28 +428,21 @@ class OtherFileTests(unittest.TestCase):
def testConstructorHandlesNULChars(self):
fn_with_NUL = 'foo\0bar'
self.assertRaises(ValueError, _FileIO, fn_with_NUL, 'w')
self.assertRaises(ValueError, _FileIO, bytes(fn_with_NUL, 'ascii'), 'w')
self.assertRaises(ValueError, self.FileIO, fn_with_NUL, 'w')
self.assertRaises(ValueError, self.FileIO, bytes(fn_with_NUL, 'ascii'), 'w')
def testInvalidFd(self):
self.assertRaises(ValueError, _FileIO, -10)
self.assertRaises(OSError, _FileIO, make_bad_fd())
self.assertRaises(ValueError, self.FileIO, -10)
self.assertRaises(OSError, self.FileIO, make_bad_fd())
if sys.platform == 'win32':
import msvcrt
self.assertRaises(OSError, msvcrt.get_osfhandle, make_bad_fd())
@cpython_only
def testInvalidFd_overflow(self):
# Issue 15989
import _testcapi
self.assertRaises(TypeError, _FileIO, _testcapi.INT_MAX + 1)
self.assertRaises(TypeError, _FileIO, _testcapi.INT_MIN - 1)
def testBadModeArgument(self):
# verify that we get a sensible error message for bad mode argument
bad_mode = "qwerty"
try:
f = _FileIO(TESTFN, bad_mode)
f = self.FileIO(TESTFN, bad_mode)
except ValueError as msg:
if msg.args[0] != 0:
s = str(msg)
@ -407,7 +455,7 @@ class OtherFileTests(unittest.TestCase):
self.fail("no error for invalid mode: %s" % bad_mode)
def testTruncate(self):
f = _FileIO(TESTFN, 'w')
f = self.FileIO(TESTFN, 'w')
f.write(bytes(bytearray(range(10))))
self.assertEqual(f.tell(), 10)
f.truncate(5)
@ -422,11 +470,11 @@ class OtherFileTests(unittest.TestCase):
def bug801631():
# SF bug <http://www.python.org/sf/801631>
# "file.truncate fault on windows"
f = _FileIO(TESTFN, 'w')
f = self.FileIO(TESTFN, 'w')
f.write(bytes(range(11)))
f.close()
f = _FileIO(TESTFN,'r+')
f = self.FileIO(TESTFN,'r+')
data = f.read(5)
if data != bytes(range(5)):
self.fail("Read on file opened for update failed %r" % data)
@ -466,19 +514,19 @@ class OtherFileTests(unittest.TestCase):
pass
def testInvalidInit(self):
self.assertRaises(TypeError, _FileIO, "1", 0, 0)
self.assertRaises(TypeError, self.FileIO, "1", 0, 0)
def testWarnings(self):
with check_warnings(quiet=True) as w:
self.assertEqual(w.warnings, [])
self.assertRaises(TypeError, _FileIO, [])
self.assertRaises(TypeError, self.FileIO, [])
self.assertEqual(w.warnings, [])
self.assertRaises(ValueError, _FileIO, "/some/invalid/name", "rt")
self.assertRaises(ValueError, self.FileIO, "/some/invalid/name", "rt")
self.assertEqual(w.warnings, [])
def testUnclosedFDOnException(self):
class MyException(Exception): pass
class MyFileIO(_FileIO):
class MyFileIO(self.FileIO):
def __setattr__(self, name, value):
if name == "name":
raise MyException("blocked setting name")
@ -487,12 +535,28 @@ class OtherFileTests(unittest.TestCase):
self.assertRaises(MyException, MyFileIO, fd)
os.close(fd) # should not raise OSError(EBADF)
class COtherFileTests(OtherFileTests, unittest.TestCase):
FileIO = _io.FileIO
modulename = '_io'
@cpython_only
def testInvalidFd_overflow(self):
# Issue 15989
import _testcapi
self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MAX + 1)
self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MIN - 1)
class PyOtherFileTests(OtherFileTests, unittest.TestCase):
FileIO = _pyio.FileIO
modulename = '_pyio'
def test_main():
# Historically, these tests have been sloppy about removing TESTFN.
# So get rid of it no matter what.
try:
run_unittest(AutoFileTests, OtherFileTests)
run_unittest(CAutoFileTests, PyAutoFileTests,
COtherFileTests, PyOtherFileTests)
finally:
if os.path.exists(TESTFN):
os.unlink(TESTFN)

View File

@ -19,6 +19,8 @@ Core and Builtins
Library
-------
- Issue #21859: Added Python implementation of io.FileIO.
- Issue #23865: close() methods in multiple modules now are idempotent and more
robust at shutdown. If needs to release multiple resources, they are released
even if errors are occured.