bpo-43651: Fix EncodingWarning in test_io (GH-25097)

This commit is contained in:
Inada Naoki 2021-04-01 11:25:04 +09:00 committed by GitHub
parent 55f31be44b
commit 58cffba187
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 78 additions and 67 deletions

View File

@ -407,10 +407,12 @@ class IOTest(unittest.TestCase):
def test_invalid_operations(self):
# Try writing on a file opened in read mode and vice-versa.
exc = self.UnsupportedOperation
for mode in ("w", "wb"):
with self.open(os_helper.TESTFN, mode) as fp:
self.assertRaises(exc, fp.read)
self.assertRaises(exc, fp.readline)
with self.open(os_helper.TESTFN, "w", encoding="utf-8") as fp:
self.assertRaises(exc, fp.read)
self.assertRaises(exc, fp.readline)
with self.open(os_helper.TESTFN, "wb") as fp:
self.assertRaises(exc, fp.read)
self.assertRaises(exc, fp.readline)
with self.open(os_helper.TESTFN, "wb", buffering=0) as fp:
self.assertRaises(exc, fp.read)
self.assertRaises(exc, fp.readline)
@ -420,7 +422,7 @@ class IOTest(unittest.TestCase):
with self.open(os_helper.TESTFN, "rb") as fp:
self.assertRaises(exc, fp.write, b"blah")
self.assertRaises(exc, fp.writelines, [b"blah\n"])
with self.open(os_helper.TESTFN, "r") as fp:
with self.open(os_helper.TESTFN, "r", encoding="utf-8") as fp:
self.assertRaises(exc, fp.write, "blah")
self.assertRaises(exc, fp.writelines, ["blah\n"])
# Non-zero seeking from current or end pos
@ -533,12 +535,12 @@ class IOTest(unittest.TestCase):
def test_open_handles_NUL_chars(self):
fn_with_NUL = 'foo\0bar'
self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
self.assertRaises(ValueError, self.open, fn_with_NUL, 'w', encoding="utf-8")
bytes_fn = bytes(fn_with_NUL, 'ascii')
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
self.assertRaises(ValueError, self.open, bytes_fn, 'w')
self.assertRaises(ValueError, self.open, bytes_fn, 'w', encoding="utf-8")
def test_raw_file_io(self):
with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
@ -575,7 +577,7 @@ class IOTest(unittest.TestCase):
self.assertEqual(f.readline(), b"foo\x00bar\n")
self.assertEqual(f.readline(None), b"another line")
self.assertRaises(TypeError, f.readline, 5.3)
with self.open(os_helper.TESTFN, "r") as f:
with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
self.assertRaises(TypeError, f.readline, 5.3)
def test_readline_nonsizeable(self):
@ -638,7 +640,7 @@ class IOTest(unittest.TestCase):
self.assertEqual(f.tell(), 3)
with self.open(os_helper.TESTFN, "ab") as f:
self.assertEqual(f.tell(), 3)
with self.open(os_helper.TESTFN, "a") as f:
with self.open(os_helper.TESTFN, "a", encoding="utf-8") as f:
self.assertGreater(f.tell(), 0)
def test_destructor(self):
@ -730,13 +732,13 @@ class IOTest(unittest.TestCase):
def test_closefd(self):
self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'w',
closefd=False)
encoding="utf-8", closefd=False)
def test_read_closed(self):
with self.open(os_helper.TESTFN, "w") as f:
with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
f.write("egg\n")
with self.open(os_helper.TESTFN, "r") as f:
file = self.open(f.fileno(), "r", closefd=False)
with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False)
self.assertEqual(file.read(), "egg\n")
file.seek(0)
file.close()
@ -749,14 +751,15 @@ class IOTest(unittest.TestCase):
def test_no_closefd_with_filename(self):
# can't use closefd in combination with a file name
self.assertRaises(ValueError, self.open, os_helper.TESTFN, "r", closefd=False)
self.assertRaises(ValueError, self.open, os_helper.TESTFN, "r",
encoding="utf-8", closefd=False)
def test_closefd_attr(self):
with self.open(os_helper.TESTFN, "wb") as f:
f.write(b"egg\n")
with self.open(os_helper.TESTFN, "r") as f:
with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
self.assertEqual(f.buffer.raw.closefd, True)
file = self.open(f.fileno(), "r", closefd=False)
file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False)
self.assertEqual(file.buffer.raw.closefd, False)
def test_garbage_collection(self):
@ -821,11 +824,11 @@ class IOTest(unittest.TestCase):
self.check_flush_error_on_close(fd, 'wb', closefd=False)
os.close(fd)
# text io
self.check_flush_error_on_close(os_helper.TESTFN, 'w')
self.check_flush_error_on_close(os_helper.TESTFN, 'w', encoding="utf-8")
fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
self.check_flush_error_on_close(fd, 'w')
self.check_flush_error_on_close(fd, 'w', encoding="utf-8")
fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
self.check_flush_error_on_close(fd, 'w', closefd=False)
self.check_flush_error_on_close(fd, 'w', encoding="utf-8", closefd=False)
os.close(fd)
def test_multi_close(self):
@ -860,12 +863,12 @@ class IOTest(unittest.TestCase):
self.assertTrue(hasattr(obj, "__dict__"))
def test_opener(self):
with self.open(os_helper.TESTFN, "w") as f:
with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
f.write("egg\n")
fd = os.open(os_helper.TESTFN, os.O_RDONLY)
def opener(path, flags):
return fd
with self.open("non-existent", "r", opener=opener) as f:
with self.open("non-existent", "r", encoding="utf-8", opener=opener) as f:
self.assertEqual(f.read(), "egg\n")
def test_bad_opener_negative_1(self):
@ -899,12 +902,12 @@ class IOTest(unittest.TestCase):
def test_nonbuffered_textio(self):
with warnings_helper.check_no_resource_warning(self):
with self.assertRaises(ValueError):
self.open(os_helper.TESTFN, 'w', buffering=0)
self.open(os_helper.TESTFN, 'w', encoding="utf-8", buffering=0)
def test_invalid_newline(self):
with warnings_helper.check_no_resource_warning(self):
with self.assertRaises(ValueError):
self.open(os_helper.TESTFN, 'w', newline='invalid')
self.open(os_helper.TESTFN, 'w', encoding="utf-8", newline='invalid')
def test_buffered_readinto_mixin(self):
# Test the implementation provided by BufferedIOBase
@ -921,31 +924,31 @@ class IOTest(unittest.TestCase):
def test_fspath_support(self):
def check_path_succeeds(path):
with self.open(path, "w") as f:
with self.open(path, "w", encoding="utf-8") as f:
f.write("egg\n")
with self.open(path, "r") as f:
with self.open(path, "r", encoding="utf-8") as f:
self.assertEqual(f.read(), "egg\n")
check_path_succeeds(FakePath(os_helper.TESTFN))
check_path_succeeds(FakePath(os.fsencode(os_helper.TESTFN)))
with self.open(os_helper.TESTFN, "w") as f:
with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
bad_path = FakePath(f.fileno())
with self.assertRaises(TypeError):
self.open(bad_path, 'w')
self.open(bad_path, 'w', encoding="utf-8")
bad_path = FakePath(None)
with self.assertRaises(TypeError):
self.open(bad_path, 'w')
self.open(bad_path, 'w', encoding="utf-8")
bad_path = FakePath(FloatingPointError)
with self.assertRaises(FloatingPointError):
self.open(bad_path, 'w')
self.open(bad_path, 'w', encoding="utf-8")
# ensure that refcounting is correct with some error conditions
with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
self.open(FakePath(os_helper.TESTFN), 'rwxa')
self.open(FakePath(os_helper.TESTFN), 'rwxa', encoding="utf-8")
def test_RawIOBase_readall(self):
# Exercise the default unlimited RawIOBase.read() and readall()
@ -2590,7 +2593,7 @@ class TextIOWrapperTest(unittest.TestCase):
def test_constructor(self):
r = self.BytesIO(b"\xc3\xa9\n\n")
b = self.BufferedReader(r, 1000)
t = self.TextIOWrapper(b)
t = self.TextIOWrapper(b, encoding="utf-8")
t.__init__(b, encoding="latin-1", newline="\r\n")
self.assertEqual(t.encoding, "latin-1")
self.assertEqual(t.line_buffering, False)
@ -2609,7 +2612,7 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertRaisesRegex((ValueError, AttributeError),
'uninitialized|has no attribute',
t.read, 0)
t.__init__(self.MockRawIO())
t.__init__(self.MockRawIO(), encoding="utf-8")
self.assertEqual(t.read(0), '')
def test_non_text_encoding_codecs_are_rejected(self):
@ -2624,7 +2627,7 @@ class TextIOWrapperTest(unittest.TestCase):
def test_detach(self):
r = self.BytesIO()
b = self.BufferedWriter(r)
t = self.TextIOWrapper(b)
t = self.TextIOWrapper(b, encoding="ascii")
self.assertIs(t.detach(), b)
t = self.TextIOWrapper(b, encoding="ascii")
@ -2664,7 +2667,7 @@ class TextIOWrapperTest(unittest.TestCase):
def test_recursive_repr(self):
# Issue #25455
raw = self.BytesIO()
t = self.TextIOWrapper(raw)
t = self.TextIOWrapper(raw, encoding="utf-8")
with support.swap_attr(raw, 'name', t):
try:
repr(t) # Should not crash
@ -2674,7 +2677,7 @@ class TextIOWrapperTest(unittest.TestCase):
def test_line_buffering(self):
r = self.BytesIO()
b = self.BufferedWriter(r, 1000)
t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True)
t.write("X")
self.assertEqual(r.getvalue(), b"") # No flush happened
t.write("Y\nZ")
@ -2685,7 +2688,7 @@ class TextIOWrapperTest(unittest.TestCase):
def test_reconfigure_line_buffering(self):
r = self.BytesIO()
b = self.BufferedWriter(r, 1000)
t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False)
t.write("AB\nC")
self.assertEqual(r.getvalue(), b"")
@ -2722,7 +2725,9 @@ class TextIOWrapperTest(unittest.TestCase):
current_locale_encoding = locale.getpreferredencoding(False)
b = self.BytesIO()
t = self.TextIOWrapper(b)
with warnings.catch_warnings():
warnings.simplefilter("ignore", EncodingWarning)
t = self.TextIOWrapper(b)
self.assertEqual(t.encoding, current_locale_encoding)
finally:
os.environ.clear()
@ -2735,16 +2740,18 @@ class TextIOWrapperTest(unittest.TestCase):
import _testcapi
b = self.BytesIO()
b.fileno = lambda: _testcapi.INT_MAX + 1
self.assertRaises(OverflowError, self.TextIOWrapper, b)
self.assertRaises(OverflowError, self.TextIOWrapper, b, encoding="locale")
b.fileno = lambda: _testcapi.UINT_MAX + 1
self.assertRaises(OverflowError, self.TextIOWrapper, b)
self.assertRaises(OverflowError, self.TextIOWrapper, b, encoding="locale")
def test_encoding(self):
# Check the encoding attribute is always set, and valid
b = self.BytesIO()
t = self.TextIOWrapper(b, encoding="utf-8")
self.assertEqual(t.encoding, "utf-8")
t = self.TextIOWrapper(b)
with warnings.catch_warnings():
warnings.simplefilter("ignore", EncodingWarning)
t = self.TextIOWrapper(b)
self.assertIsNotNone(t.encoding)
codecs.lookup(t.encoding)
@ -2909,7 +2916,7 @@ class TextIOWrapperTest(unittest.TestCase):
rawio = self.CloseFailureIO()
with support.catch_unraisable_exception() as cm:
with self.assertRaises(AttributeError):
self.TextIOWrapper(rawio).xyzzy
self.TextIOWrapper(rawio, encoding="utf-8").xyzzy
if not IOBASE_EMITS_UNRAISABLE:
self.assertIsNone(cm.unraisable)
@ -3116,11 +3123,11 @@ class TextIOWrapperTest(unittest.TestCase):
class UnReadable(self.BytesIO):
def readable(self):
return False
txt = self.TextIOWrapper(UnReadable())
txt = self.TextIOWrapper(UnReadable(), encoding="utf-8")
self.assertRaises(OSError, txt.read)
def test_read_one_by_one(self):
txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8")
reads = ""
while True:
c = txt.read(1)
@ -3130,7 +3137,7 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertEqual(reads, "AA\nBB")
def test_readlines(self):
txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"), encoding="utf-8")
self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
txt.seek(0)
self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
@ -3140,7 +3147,7 @@ class TextIOWrapperTest(unittest.TestCase):
# read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
# make sure "\r\n" straddles 128 char boundary.
txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"), encoding="utf-8")
reads = ""
while True:
c = txt.read(128)
@ -3152,7 +3159,7 @@ class TextIOWrapperTest(unittest.TestCase):
def test_writelines(self):
l = ['ab', 'cd', 'ef']
buf = self.BytesIO()
txt = self.TextIOWrapper(buf)
txt = self.TextIOWrapper(buf, encoding="utf-8")
txt.writelines(l)
txt.flush()
self.assertEqual(buf.getvalue(), b'abcdef')
@ -3160,13 +3167,13 @@ class TextIOWrapperTest(unittest.TestCase):
def test_writelines_userlist(self):
l = UserList(['ab', 'cd', 'ef'])
buf = self.BytesIO()
txt = self.TextIOWrapper(buf)
txt = self.TextIOWrapper(buf, encoding="utf-8")
txt.writelines(l)
txt.flush()
self.assertEqual(buf.getvalue(), b'abcdef')
def test_writelines_error(self):
txt = self.TextIOWrapper(self.BytesIO())
txt = self.TextIOWrapper(self.BytesIO(), encoding="utf-8")
self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
self.assertRaises(TypeError, txt.writelines, None)
self.assertRaises(TypeError, txt.writelines, b'abc')
@ -3274,16 +3281,16 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
def test_errors_property(self):
with self.open(os_helper.TESTFN, "w") as f:
with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
self.assertEqual(f.errors, "strict")
with self.open(os_helper.TESTFN, "w", errors="replace") as f:
with self.open(os_helper.TESTFN, "w", encoding="utf-8", errors="replace") as f:
self.assertEqual(f.errors, "replace")
@support.no_tracing
def test_threads_write(self):
# Issue6750: concurrent writes could duplicate data
event = threading.Event()
with self.open(os_helper.TESTFN, "w", buffering=1) as f:
with self.open(os_helper.TESTFN, "w", encoding="utf-8", buffering=1) as f:
def run(n):
text = "Thread%03d\n" % n
event.wait()
@ -3292,7 +3299,7 @@ class TextIOWrapperTest(unittest.TestCase):
for x in range(20)]
with threading_helper.start_threads(threads, event.set):
time.sleep(0.02)
with self.open(os_helper.TESTFN) as f:
with self.open(os_helper.TESTFN, encoding="utf-8") as f:
content = f.read()
for n in range(20):
self.assertEqual(content.count("Thread%03d\n" % n), 1)
@ -3363,7 +3370,7 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertRaises(ValueError, txt.flush)
def test_unseekable(self):
txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata), encoding="utf-8")
self.assertRaises(self.UnsupportedOperation, txt.tell)
self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
@ -3452,11 +3459,11 @@ class TextIOWrapperTest(unittest.TestCase):
def test_read_nonbytes(self):
# Issue #17106
# Crash when underlying read() returns non-bytes
t = self.TextIOWrapper(self.StringIO('a'))
t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
self.assertRaises(TypeError, t.read, 1)
t = self.TextIOWrapper(self.StringIO('a'))
t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
self.assertRaises(TypeError, t.readline)
t = self.TextIOWrapper(self.StringIO('a'))
t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
self.assertRaises(TypeError, t.read)
def test_illegal_encoder(self):
@ -3724,7 +3731,7 @@ class CTextIOWrapperTest(TextIOWrapperTest):
def test_initialization(self):
r = self.BytesIO(b"\xc3\xa9\n\n")
b = self.BufferedReader(r, 1000)
t = self.TextIOWrapper(b)
t = self.TextIOWrapper(b, encoding="utf-8")
self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
self.assertRaises(ValueError, t.read)
@ -3948,7 +3955,7 @@ class MiscIOTest(unittest.TestCase):
f.close()
with warnings_helper.check_warnings(('', DeprecationWarning)):
f = self.open(os_helper.TESTFN, "U")
f = self.open(os_helper.TESTFN, "U", encoding="utf-8")
self.assertEqual(f.name, os_helper.TESTFN)
self.assertEqual(f.buffer.name, os_helper.TESTFN)
self.assertEqual(f.buffer.raw.name, os_helper.TESTFN)
@ -3957,7 +3964,7 @@ class MiscIOTest(unittest.TestCase):
self.assertEqual(f.buffer.raw.mode, "rb")
f.close()
f = self.open(os_helper.TESTFN, "w+")
f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
self.assertEqual(f.mode, "w+")
self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
self.assertEqual(f.buffer.raw.mode, "rb+")
@ -3974,7 +3981,7 @@ class MiscIOTest(unittest.TestCase):
# bpo-27805: Ignore ESPIPE from lseek() in open().
r, w = os.pipe()
self.addCleanup(os.close, r)
f = self.open(w, 'a')
f = self.open(w, 'a', encoding="utf-8")
self.addCleanup(f.close)
# Check that the file is marked non-seekable. On Windows, however, lseek
# somehow succeeds on pipes.
@ -3999,6 +4006,8 @@ class MiscIOTest(unittest.TestCase):
{"mode": "w+", "buffering": 2},
{"mode": "w+b", "buffering": 0},
]:
if "b" not in kwargs["mode"]:
kwargs["encoding"] = "utf-8"
f = self.open(os_helper.TESTFN, **kwargs)
f.close()
self.assertRaises(ValueError, f.flush)
@ -4059,7 +4068,7 @@ class MiscIOTest(unittest.TestCase):
self.assertNotIsInstance(f, abcmodule.RawIOBase)
self.assertIsInstance(f, abcmodule.BufferedIOBase)
self.assertNotIsInstance(f, abcmodule.TextIOBase)
with self.open(os_helper.TESTFN, "w") as f:
with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
self.assertIsInstance(f, abcmodule.IOBase)
self.assertNotIsInstance(f, abcmodule.RawIOBase)
self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
@ -4085,7 +4094,7 @@ class MiscIOTest(unittest.TestCase):
def test_warn_on_dealloc(self):
self._check_warn_on_dealloc(os_helper.TESTFN, "wb", buffering=0)
self._check_warn_on_dealloc(os_helper.TESTFN, "wb")
self._check_warn_on_dealloc(os_helper.TESTFN, "w")
self._check_warn_on_dealloc(os_helper.TESTFN, "w", encoding="utf-8")
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
fds = []
@ -4109,7 +4118,7 @@ class MiscIOTest(unittest.TestCase):
def test_warn_on_dealloc_fd(self):
self._check_warn_on_dealloc_fd("rb", buffering=0)
self._check_warn_on_dealloc_fd("rb")
self._check_warn_on_dealloc_fd("r")
self._check_warn_on_dealloc_fd("r", encoding="utf-8")
def test_pickling(self):
@ -4125,6 +4134,8 @@ class MiscIOTest(unittest.TestCase):
{"mode": "w+b"},
{"mode": "w+b", "buffering": 0},
]:
if "b" not in kwargs["mode"]:
kwargs["encoding"] = "utf-8"
for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
with self.open(os_helper.TESTFN, **kwargs) as f:
self.assertRaises(TypeError, pickle.dumps, f, protocol)
@ -4189,9 +4200,9 @@ class MiscIOTest(unittest.TestCase):
def test_create_fail(self):
# 'x' mode fails if file is existing
with self.open(os_helper.TESTFN, 'w'):
with self.open(os_helper.TESTFN, 'w', encoding="utf-8"):
pass
self.assertRaises(FileExistsError, self.open, os_helper.TESTFN, 'x')
self.assertRaises(FileExistsError, self.open, os_helper.TESTFN, 'x', encoding="utf-8")
def test_create_writes(self):
# 'x' mode opens for writing
@ -4202,7 +4213,7 @@ class MiscIOTest(unittest.TestCase):
def test_open_allargs(self):
# there used to be a buffer overflow in the parser for rawmode
self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'rwax+')
self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'rwax+', encoding="utf-8")
def test_check_encoding_errors(self):
# bpo-37388: open() and TextIOWrapper must check encoding and errors
@ -4474,7 +4485,7 @@ class SignalsTest(unittest.TestCase):
def test_interrupted_read_retry_text(self):
self.check_interrupted_read_retry(lambda x: x,
mode="r")
mode="r", encoding="latin1")
def check_interrupted_write_retry(self, item, **fdopen_kwargs):
"""Check that a buffered write, when it gets interrupted (either