Patch # 1033 by Adam Hupp:
1) tempfile.TemporaryFile, NamedTemporaryFile, and SpooledTemporaryFile now pass newline and encoding to the underlying io.open call. 2) test_tempfile is updated 3) test_csv is updated to use the new arguments.
This commit is contained in:
parent
a1a68521db
commit
f0c7416157
|
@ -406,13 +406,16 @@ class _TemporaryFileWrapper:
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
self.close()
|
self.close()
|
||||||
|
|
||||||
def NamedTemporaryFile(mode='w+b', bufsize=-1, suffix="",
|
def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None,
|
||||||
prefix=template, dir=None, delete=True):
|
newline=None, suffix="", prefix=template,
|
||||||
|
dir=None, delete=True):
|
||||||
"""Create and return a temporary file.
|
"""Create and return a temporary file.
|
||||||
Arguments:
|
Arguments:
|
||||||
'prefix', 'suffix', 'dir' -- as for mkstemp.
|
'prefix', 'suffix', 'dir' -- as for mkstemp.
|
||||||
'mode' -- the mode argument to os.fdopen (default "w+b").
|
'mode' -- the mode argument to io.open (default "w+b").
|
||||||
'bufsize' -- the buffer size argument to os.fdopen (default -1).
|
'buffering' -- the buffer size argument to io.open (default -1).
|
||||||
|
'encoding' -- the encoding argument to io.open (default None)
|
||||||
|
'newline' -- the newline argument to io.open (default None)
|
||||||
'delete' -- whether the file is deleted on close (default True).
|
'delete' -- whether the file is deleted on close (default True).
|
||||||
The file is created as mkstemp() would do it.
|
The file is created as mkstemp() would do it.
|
||||||
|
|
||||||
|
@ -435,7 +438,9 @@ def NamedTemporaryFile(mode='w+b', bufsize=-1, suffix="",
|
||||||
flags |= _os.O_TEMPORARY
|
flags |= _os.O_TEMPORARY
|
||||||
|
|
||||||
(fd, name) = _mkstemp_inner(dir, prefix, suffix, flags)
|
(fd, name) = _mkstemp_inner(dir, prefix, suffix, flags)
|
||||||
file = _io.open(fd, mode, bufsize)
|
file = _io.open(fd, mode, buffering=buffering,
|
||||||
|
newline=newline, encoding=encoding)
|
||||||
|
|
||||||
return _TemporaryFileWrapper(file, name, delete)
|
return _TemporaryFileWrapper(file, name, delete)
|
||||||
|
|
||||||
if _os.name != 'posix' or _os.sys.platform == 'cygwin':
|
if _os.name != 'posix' or _os.sys.platform == 'cygwin':
|
||||||
|
@ -444,13 +449,16 @@ if _os.name != 'posix' or _os.sys.platform == 'cygwin':
|
||||||
TemporaryFile = NamedTemporaryFile
|
TemporaryFile = NamedTemporaryFile
|
||||||
|
|
||||||
else:
|
else:
|
||||||
def TemporaryFile(mode='w+b', bufsize=-1, suffix="",
|
def TemporaryFile(mode='w+b', buffering=-1, encoding=None,
|
||||||
prefix=template, dir=None):
|
newline=None, suffix="", prefix=template,
|
||||||
|
dir=None):
|
||||||
"""Create and return a temporary file.
|
"""Create and return a temporary file.
|
||||||
Arguments:
|
Arguments:
|
||||||
'prefix', 'suffix', 'dir' -- as for mkstemp.
|
'prefix', 'suffix', 'dir' -- as for mkstemp.
|
||||||
'mode' -- the mode argument to os.fdopen (default "w+b").
|
'mode' -- the mode argument to io.open (default "w+b").
|
||||||
'bufsize' -- the buffer size argument to os.fdopen (default -1).
|
'buffering' -- the buffer size argument to io.open (default -1).
|
||||||
|
'encoding' -- the encoding argument to io.open (default None)
|
||||||
|
'newline' -- the newline argument to io.open (default None)
|
||||||
The file is created as mkstemp() would do it.
|
The file is created as mkstemp() would do it.
|
||||||
|
|
||||||
Returns an object with a file-like interface. The file has no
|
Returns an object with a file-like interface. The file has no
|
||||||
|
@ -468,7 +476,8 @@ else:
|
||||||
(fd, name) = _mkstemp_inner(dir, prefix, suffix, flags)
|
(fd, name) = _mkstemp_inner(dir, prefix, suffix, flags)
|
||||||
try:
|
try:
|
||||||
_os.unlink(name)
|
_os.unlink(name)
|
||||||
return _io.open(fd, mode, bufsize)
|
return _io.open(fd, mode, buffering=buffering,
|
||||||
|
newline=newline, encoding=encoding)
|
||||||
except:
|
except:
|
||||||
_os.close(fd)
|
_os.close(fd)
|
||||||
raise
|
raise
|
||||||
|
@ -480,15 +489,19 @@ class SpooledTemporaryFile:
|
||||||
"""
|
"""
|
||||||
_rolled = False
|
_rolled = False
|
||||||
|
|
||||||
def __init__(self, max_size=0, mode='w+b', bufsize=-1,
|
def __init__(self, max_size=0, mode='w+b', buffering=-1,
|
||||||
|
encoding=None, newline=None,
|
||||||
suffix="", prefix=template, dir=None):
|
suffix="", prefix=template, dir=None):
|
||||||
if 'b' in mode:
|
if 'b' in mode:
|
||||||
self._file = _io.BytesIO()
|
self._file = _io.BytesIO()
|
||||||
else:
|
else:
|
||||||
self._file = _io.StringIO()
|
self._file = _io.StringIO(encoding=encoding, newline=newline)
|
||||||
self._max_size = max_size
|
self._max_size = max_size
|
||||||
self._rolled = False
|
self._rolled = False
|
||||||
self._TemporaryFileArgs = (mode, bufsize, suffix, prefix, dir)
|
self._TemporaryFileArgs = {'mode': mode, 'buffering': buffering,
|
||||||
|
'suffix': suffix, 'prefix': prefix,
|
||||||
|
'encoding': encoding, 'newline': newline,
|
||||||
|
'dir': dir}
|
||||||
|
|
||||||
def _check(self, file):
|
def _check(self, file):
|
||||||
if self._rolled: return
|
if self._rolled: return
|
||||||
|
@ -499,7 +512,7 @@ class SpooledTemporaryFile:
|
||||||
def rollover(self):
|
def rollover(self):
|
||||||
if self._rolled: return
|
if self._rolled: return
|
||||||
file = self._file
|
file = self._file
|
||||||
newfile = self._file = TemporaryFile(*self._TemporaryFileArgs)
|
newfile = self._file = TemporaryFile(**self._TemporaryFileArgs)
|
||||||
del self._TemporaryFileArgs
|
del self._TemporaryFileArgs
|
||||||
|
|
||||||
newfile.write(file.getvalue())
|
newfile.write(file.getvalue())
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
# Copyright (C) 2001,2002 Python Software Foundation
|
# Copyright (C) 2001,2002 Python Software Foundation
|
||||||
# csv package unit tests
|
# csv package unit tests
|
||||||
|
|
||||||
|
import io
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
import unittest
|
import unittest
|
||||||
|
@ -117,11 +118,11 @@ class Test_Csv(unittest.TestCase):
|
||||||
|
|
||||||
|
|
||||||
def _write_test(self, fields, expect, **kwargs):
|
def _write_test(self, fields, expect, **kwargs):
|
||||||
with TemporaryFile("w+b") as fileobj:
|
with TemporaryFile("w+", newline='') as fileobj:
|
||||||
writer = csv.writer(fileobj, **kwargs)
|
writer = csv.writer(fileobj, **kwargs)
|
||||||
writer.writerow(fields)
|
writer.writerow(fields)
|
||||||
fileobj.seek(0)
|
fileobj.seek(0)
|
||||||
self.assertEqual(str(fileobj.read()),
|
self.assertEqual(fileobj.read(),
|
||||||
expect + writer.dialect.lineterminator)
|
expect + writer.dialect.lineterminator)
|
||||||
|
|
||||||
def test_write_arg_valid(self):
|
def test_write_arg_valid(self):
|
||||||
|
@ -188,12 +189,12 @@ class Test_Csv(unittest.TestCase):
|
||||||
writer = csv.writer(BrokenFile())
|
writer = csv.writer(BrokenFile())
|
||||||
self.assertRaises(IOError, writer.writerows, [['a']])
|
self.assertRaises(IOError, writer.writerows, [['a']])
|
||||||
|
|
||||||
with TemporaryFile("w+b") as fileobj:
|
with TemporaryFile("w+", newline='') as fileobj:
|
||||||
writer = csv.writer(fileobj)
|
writer = csv.writer(fileobj)
|
||||||
self.assertRaises(TypeError, writer.writerows, None)
|
self.assertRaises(TypeError, writer.writerows, None)
|
||||||
writer.writerows([['a','b'],['c','d']])
|
writer.writerows([['a','b'],['c','d']])
|
||||||
fileobj.seek(0)
|
fileobj.seek(0)
|
||||||
self.assertEqual(fileobj.read(), b"a,b\r\nc,d\r\n")
|
self.assertEqual(fileobj.read(), "a,b\r\nc,d\r\n")
|
||||||
|
|
||||||
def _read_test(self, input, expect, **kwargs):
|
def _read_test(self, input, expect, **kwargs):
|
||||||
reader = csv.reader(input, **kwargs)
|
reader = csv.reader(input, **kwargs)
|
||||||
|
@ -332,11 +333,13 @@ class TestDialectRegistry(unittest.TestCase):
|
||||||
self.assertEqual(next(reader), ["c1ccccc1", "benzene"])
|
self.assertEqual(next(reader), ["c1ccccc1", "benzene"])
|
||||||
|
|
||||||
def compare_dialect_123(self, expected, *writeargs, **kwwriteargs):
|
def compare_dialect_123(self, expected, *writeargs, **kwwriteargs):
|
||||||
with TemporaryFile("w+b") as fileobj:
|
|
||||||
|
with TemporaryFile("w+", newline='', encoding="utf-8") as fileobj:
|
||||||
|
|
||||||
writer = csv.writer(fileobj, *writeargs, **kwwriteargs)
|
writer = csv.writer(fileobj, *writeargs, **kwwriteargs)
|
||||||
writer.writerow([1,2,3])
|
writer.writerow([1,2,3])
|
||||||
fileobj.seek(0)
|
fileobj.seek(0)
|
||||||
self.assertEqual(str(fileobj.read()), expected)
|
self.assertEqual(fileobj.read(), expected)
|
||||||
|
|
||||||
def test_dialect_apply(self):
|
def test_dialect_apply(self):
|
||||||
class testA(csv.excel):
|
class testA(csv.excel):
|
||||||
|
@ -380,11 +383,11 @@ class TestCsvBase(unittest.TestCase):
|
||||||
self.assertEqual(fields, expected_result)
|
self.assertEqual(fields, expected_result)
|
||||||
|
|
||||||
def writerAssertEqual(self, input, expected_result):
|
def writerAssertEqual(self, input, expected_result):
|
||||||
with TemporaryFile("w+b") as fileobj:
|
with TemporaryFile("w+", newline='') as fileobj:
|
||||||
writer = csv.writer(fileobj, dialect = self.dialect)
|
writer = csv.writer(fileobj, dialect = self.dialect)
|
||||||
writer.writerows(input)
|
writer.writerows(input)
|
||||||
fileobj.seek(0)
|
fileobj.seek(0)
|
||||||
self.assertEqual(str(fileobj.read()), expected_result)
|
self.assertEqual(fileobj.read(), expected_result)
|
||||||
|
|
||||||
class TestDialectExcel(TestCsvBase):
|
class TestDialectExcel(TestCsvBase):
|
||||||
dialect = 'excel'
|
dialect = 'excel'
|
||||||
|
@ -513,11 +516,11 @@ class TestDictFields(unittest.TestCase):
|
||||||
### "long" means the row is longer than the number of fieldnames
|
### "long" means the row is longer than the number of fieldnames
|
||||||
### "short" means there are fewer elements in the row than fieldnames
|
### "short" means there are fewer elements in the row than fieldnames
|
||||||
def test_write_simple_dict(self):
|
def test_write_simple_dict(self):
|
||||||
with TemporaryFile("w+b") as fileobj:
|
with TemporaryFile("w+", newline='') as fileobj:
|
||||||
writer = csv.DictWriter(fileobj, fieldnames = ["f1", "f2", "f3"])
|
writer = csv.DictWriter(fileobj, fieldnames = ["f1", "f2", "f3"])
|
||||||
writer.writerow({"f1": 10, "f3": "abc"})
|
writer.writerow({"f1": 10, "f3": "abc"})
|
||||||
fileobj.seek(0)
|
fileobj.seek(0)
|
||||||
self.assertEqual(str(fileobj.read()), "10,,abc\r\n")
|
self.assertEqual(fileobj.read(), "10,,abc\r\n")
|
||||||
|
|
||||||
def test_write_no_fields(self):
|
def test_write_no_fields(self):
|
||||||
fileobj = StringIO()
|
fileobj = StringIO()
|
||||||
|
@ -614,45 +617,45 @@ class TestArrayWrites(unittest.TestCase):
|
||||||
contents = [(20-i) for i in range(20)]
|
contents = [(20-i) for i in range(20)]
|
||||||
a = array.array('i', contents)
|
a = array.array('i', contents)
|
||||||
|
|
||||||
with TemporaryFile("w+b") as fileobj:
|
with TemporaryFile("w+", newline='') as fileobj:
|
||||||
writer = csv.writer(fileobj, dialect="excel")
|
writer = csv.writer(fileobj, dialect="excel")
|
||||||
writer.writerow(a)
|
writer.writerow(a)
|
||||||
expected = ",".join([str(i) for i in a])+"\r\n"
|
expected = ",".join([str(i) for i in a])+"\r\n"
|
||||||
fileobj.seek(0)
|
fileobj.seek(0)
|
||||||
self.assertEqual(str(fileobj.read()), expected)
|
self.assertEqual(fileobj.read(), expected)
|
||||||
|
|
||||||
def test_double_write(self):
|
def test_double_write(self):
|
||||||
import array
|
import array
|
||||||
contents = [(20-i)*0.1 for i in range(20)]
|
contents = [(20-i)*0.1 for i in range(20)]
|
||||||
a = array.array('d', contents)
|
a = array.array('d', contents)
|
||||||
with TemporaryFile("w+b") as fileobj:
|
with TemporaryFile("w+", newline='') as fileobj:
|
||||||
writer = csv.writer(fileobj, dialect="excel")
|
writer = csv.writer(fileobj, dialect="excel")
|
||||||
writer.writerow(a)
|
writer.writerow(a)
|
||||||
expected = ",".join([str(i) for i in a])+"\r\n"
|
expected = ",".join([str(i) for i in a])+"\r\n"
|
||||||
fileobj.seek(0)
|
fileobj.seek(0)
|
||||||
self.assertEqual(str(fileobj.read()), expected)
|
self.assertEqual(fileobj.read(), expected)
|
||||||
|
|
||||||
def test_float_write(self):
|
def test_float_write(self):
|
||||||
import array
|
import array
|
||||||
contents = [(20-i)*0.1 for i in range(20)]
|
contents = [(20-i)*0.1 for i in range(20)]
|
||||||
a = array.array('f', contents)
|
a = array.array('f', contents)
|
||||||
with TemporaryFile("w+b") as fileobj:
|
with TemporaryFile("w+", newline='') as fileobj:
|
||||||
writer = csv.writer(fileobj, dialect="excel")
|
writer = csv.writer(fileobj, dialect="excel")
|
||||||
writer.writerow(a)
|
writer.writerow(a)
|
||||||
expected = ",".join([str(i) for i in a])+"\r\n"
|
expected = ",".join([str(i) for i in a])+"\r\n"
|
||||||
fileobj.seek(0)
|
fileobj.seek(0)
|
||||||
self.assertEqual(str(fileobj.read()), expected)
|
self.assertEqual(fileobj.read(), expected)
|
||||||
|
|
||||||
def test_char_write(self):
|
def test_char_write(self):
|
||||||
import array, string
|
import array, string
|
||||||
a = array.array('u', string.ascii_letters)
|
a = array.array('u', string.ascii_letters)
|
||||||
|
|
||||||
with TemporaryFile("w+b") as fileobj:
|
with TemporaryFile("w+", newline='') as fileobj:
|
||||||
writer = csv.writer(fileobj, dialect="excel")
|
writer = csv.writer(fileobj, dialect="excel")
|
||||||
writer.writerow(a)
|
writer.writerow(a)
|
||||||
expected = ",".join(a)+"\r\n"
|
expected = ",".join(a)+"\r\n"
|
||||||
fileobj.seek(0)
|
fileobj.seek(0)
|
||||||
self.assertEqual(str(fileobj.read()), expected)
|
self.assertEqual(fileobj.read(), expected)
|
||||||
|
|
||||||
class TestDialectValidity(unittest.TestCase):
|
class TestDialectValidity(unittest.TestCase):
|
||||||
def test_quoting(self):
|
def test_quoting(self):
|
||||||
|
@ -864,10 +867,8 @@ class TestUnicode(unittest.TestCase):
|
||||||
|
|
||||||
def test_unicode_read(self):
|
def test_unicode_read(self):
|
||||||
import io
|
import io
|
||||||
fileobj = io.TextIOWrapper(TemporaryFile("w+b"), encoding="utf-16")
|
with TemporaryFile("w+", newline='', encoding="utf-8") as fileobj:
|
||||||
with fileobj as fileobj:
|
|
||||||
fileobj.write(",".join(self.names) + "\r\n")
|
fileobj.write(",".join(self.names) + "\r\n")
|
||||||
|
|
||||||
fileobj.seek(0)
|
fileobj.seek(0)
|
||||||
reader = csv.reader(fileobj)
|
reader = csv.reader(fileobj)
|
||||||
self.assertEqual(list(reader), [self.names])
|
self.assertEqual(list(reader), [self.names])
|
||||||
|
@ -875,14 +876,12 @@ class TestUnicode(unittest.TestCase):
|
||||||
|
|
||||||
def test_unicode_write(self):
|
def test_unicode_write(self):
|
||||||
import io
|
import io
|
||||||
with TemporaryFile("w+b") as fileobj:
|
with TemporaryFile("w+", newline='', encoding="utf-8") as fileobj:
|
||||||
encwriter = io.TextIOWrapper(fileobj, encoding="utf-8")
|
writer = csv.writer(fileobj)
|
||||||
writer = csv.writer(encwriter)
|
|
||||||
writer.writerow(self.names)
|
writer.writerow(self.names)
|
||||||
expected = ",".join(self.names)+"\r\n"
|
expected = ",".join(self.names)+"\r\n"
|
||||||
fileobj.seek(0)
|
fileobj.seek(0)
|
||||||
self.assertEqual(str(fileobj.read()), expected)
|
self.assertEqual(fileobj.read(), expected)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -746,6 +746,18 @@ class test_SpooledTemporaryFile(TC):
|
||||||
f.seek(0)
|
f.seek(0)
|
||||||
self.assertEqual(f.read(), "abc\ndef\nxyzzy\n")
|
self.assertEqual(f.read(), "abc\ndef\nxyzzy\n")
|
||||||
|
|
||||||
|
def test_text_newline_and_encoding(self):
|
||||||
|
f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
|
||||||
|
newline='', encoding='utf-8')
|
||||||
|
f.write("\u039B\r\n")
|
||||||
|
f.seek(0)
|
||||||
|
self.assertEqual(f.read(), "\u039B\r\n")
|
||||||
|
self.failIf(f._rolled)
|
||||||
|
|
||||||
|
f.write("\u039B" * 20 + "\r\n")
|
||||||
|
f.seek(0)
|
||||||
|
self.assertEqual(f.read(), "\u039B\r\n" + ("\u039B" * 20) + "\r\n")
|
||||||
|
self.failUnless(f._rolled)
|
||||||
|
|
||||||
test_classes.append(test_SpooledTemporaryFile)
|
test_classes.append(test_SpooledTemporaryFile)
|
||||||
|
|
||||||
|
@ -790,6 +802,18 @@ class test_TemporaryFile(TC):
|
||||||
self.failOnException("close")
|
self.failOnException("close")
|
||||||
|
|
||||||
# How to test the mode and bufsize parameters?
|
# How to test the mode and bufsize parameters?
|
||||||
|
def test_mode_and_encoding(self):
|
||||||
|
|
||||||
|
def roundtrip(input, *args, **kwargs):
|
||||||
|
with tempfile.TemporaryFile(*args, **kwargs) as fileobj:
|
||||||
|
fileobj.write(input)
|
||||||
|
fileobj.seek(0)
|
||||||
|
self.assertEquals(input, fileobj.read())
|
||||||
|
|
||||||
|
roundtrip(b"1234", "w+b")
|
||||||
|
roundtrip("abdc\n", "w+")
|
||||||
|
roundtrip("\u039B", "w+", encoding="utf-16")
|
||||||
|
roundtrip("foo\r\n", "w+", newline="")
|
||||||
|
|
||||||
|
|
||||||
if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile:
|
if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile:
|
||||||
|
|
Loading…
Reference in New Issue