From 58cffba1874f0e9a9731b25a3e11a011bfbbf95f Mon Sep 17 00:00:00 2001 From: Inada Naoki Date: Thu, 1 Apr 2021 11:25:04 +0900 Subject: [PATCH] bpo-43651: Fix EncodingWarning in test_io (GH-25097) --- Lib/test/test_io.py | 145 ++++++++++++++++++++++++-------------------- 1 file changed, 78 insertions(+), 67 deletions(-) diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index c731302a9f2..d493b19525a 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -407,10 +407,12 @@ class IOTest(unittest.TestCase): def test_invalid_operations(self): # Try writing on a file opened in read mode and vice-versa. exc = self.UnsupportedOperation - for mode in ("w", "wb"): - with self.open(os_helper.TESTFN, mode) as fp: - self.assertRaises(exc, fp.read) - self.assertRaises(exc, fp.readline) + with self.open(os_helper.TESTFN, "w", encoding="utf-8") as fp: + self.assertRaises(exc, fp.read) + self.assertRaises(exc, fp.readline) + with self.open(os_helper.TESTFN, "wb") as fp: + self.assertRaises(exc, fp.read) + self.assertRaises(exc, fp.readline) with self.open(os_helper.TESTFN, "wb", buffering=0) as fp: self.assertRaises(exc, fp.read) self.assertRaises(exc, fp.readline) @@ -420,7 +422,7 @@ class IOTest(unittest.TestCase): with self.open(os_helper.TESTFN, "rb") as fp: self.assertRaises(exc, fp.write, b"blah") self.assertRaises(exc, fp.writelines, [b"blah\n"]) - with self.open(os_helper.TESTFN, "r") as fp: + with self.open(os_helper.TESTFN, "r", encoding="utf-8") as fp: self.assertRaises(exc, fp.write, "blah") self.assertRaises(exc, fp.writelines, ["blah\n"]) # Non-zero seeking from current or end pos @@ -533,12 +535,12 @@ class IOTest(unittest.TestCase): def test_open_handles_NUL_chars(self): fn_with_NUL = 'foo\0bar' - self.assertRaises(ValueError, self.open, fn_with_NUL, 'w') + self.assertRaises(ValueError, self.open, fn_with_NUL, 'w', encoding="utf-8") bytes_fn = bytes(fn_with_NUL, 'ascii') with warnings.catch_warnings(): warnings.simplefilter("ignore", DeprecationWarning) - self.assertRaises(ValueError, self.open, bytes_fn, 'w') + self.assertRaises(ValueError, self.open, bytes_fn, 'w', encoding="utf-8") def test_raw_file_io(self): with self.open(os_helper.TESTFN, "wb", buffering=0) as f: @@ -575,7 +577,7 @@ class IOTest(unittest.TestCase): self.assertEqual(f.readline(), b"foo\x00bar\n") self.assertEqual(f.readline(None), b"another line") self.assertRaises(TypeError, f.readline, 5.3) - with self.open(os_helper.TESTFN, "r") as f: + with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: self.assertRaises(TypeError, f.readline, 5.3) def test_readline_nonsizeable(self): @@ -638,7 +640,7 @@ class IOTest(unittest.TestCase): self.assertEqual(f.tell(), 3) with self.open(os_helper.TESTFN, "ab") as f: self.assertEqual(f.tell(), 3) - with self.open(os_helper.TESTFN, "a") as f: + with self.open(os_helper.TESTFN, "a", encoding="utf-8") as f: self.assertGreater(f.tell(), 0) def test_destructor(self): @@ -730,13 +732,13 @@ class IOTest(unittest.TestCase): def test_closefd(self): self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'w', - closefd=False) + encoding="utf-8", closefd=False) def test_read_closed(self): - with self.open(os_helper.TESTFN, "w") as f: + with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: f.write("egg\n") - with self.open(os_helper.TESTFN, "r") as f: - file = self.open(f.fileno(), "r", closefd=False) + with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: + file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False) self.assertEqual(file.read(), "egg\n") file.seek(0) file.close() @@ -749,14 +751,15 @@ class IOTest(unittest.TestCase): def test_no_closefd_with_filename(self): # can't use closefd in combination with a file name - self.assertRaises(ValueError, self.open, os_helper.TESTFN, "r", closefd=False) + self.assertRaises(ValueError, self.open, os_helper.TESTFN, "r", + encoding="utf-8", closefd=False) def test_closefd_attr(self): with self.open(os_helper.TESTFN, "wb") as f: f.write(b"egg\n") - with self.open(os_helper.TESTFN, "r") as f: + with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: self.assertEqual(f.buffer.raw.closefd, True) - file = self.open(f.fileno(), "r", closefd=False) + file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False) self.assertEqual(file.buffer.raw.closefd, False) def test_garbage_collection(self): @@ -821,11 +824,11 @@ class IOTest(unittest.TestCase): self.check_flush_error_on_close(fd, 'wb', closefd=False) os.close(fd) # text io - self.check_flush_error_on_close(os_helper.TESTFN, 'w') + self.check_flush_error_on_close(os_helper.TESTFN, 'w', encoding="utf-8") fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) - self.check_flush_error_on_close(fd, 'w') + self.check_flush_error_on_close(fd, 'w', encoding="utf-8") fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) - self.check_flush_error_on_close(fd, 'w', closefd=False) + self.check_flush_error_on_close(fd, 'w', encoding="utf-8", closefd=False) os.close(fd) def test_multi_close(self): @@ -860,12 +863,12 @@ class IOTest(unittest.TestCase): self.assertTrue(hasattr(obj, "__dict__")) def test_opener(self): - with self.open(os_helper.TESTFN, "w") as f: + with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: f.write("egg\n") fd = os.open(os_helper.TESTFN, os.O_RDONLY) def opener(path, flags): return fd - with self.open("non-existent", "r", opener=opener) as f: + with self.open("non-existent", "r", encoding="utf-8", opener=opener) as f: self.assertEqual(f.read(), "egg\n") def test_bad_opener_negative_1(self): @@ -899,12 +902,12 @@ class IOTest(unittest.TestCase): def test_nonbuffered_textio(self): with warnings_helper.check_no_resource_warning(self): with self.assertRaises(ValueError): - self.open(os_helper.TESTFN, 'w', buffering=0) + self.open(os_helper.TESTFN, 'w', encoding="utf-8", buffering=0) def test_invalid_newline(self): with warnings_helper.check_no_resource_warning(self): with self.assertRaises(ValueError): - self.open(os_helper.TESTFN, 'w', newline='invalid') + self.open(os_helper.TESTFN, 'w', encoding="utf-8", newline='invalid') def test_buffered_readinto_mixin(self): # Test the implementation provided by BufferedIOBase @@ -921,31 +924,31 @@ class IOTest(unittest.TestCase): def test_fspath_support(self): def check_path_succeeds(path): - with self.open(path, "w") as f: + with self.open(path, "w", encoding="utf-8") as f: f.write("egg\n") - with self.open(path, "r") as f: + with self.open(path, "r", encoding="utf-8") as f: self.assertEqual(f.read(), "egg\n") check_path_succeeds(FakePath(os_helper.TESTFN)) check_path_succeeds(FakePath(os.fsencode(os_helper.TESTFN))) - with self.open(os_helper.TESTFN, "w") as f: + with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: bad_path = FakePath(f.fileno()) with self.assertRaises(TypeError): - self.open(bad_path, 'w') + self.open(bad_path, 'w', encoding="utf-8") bad_path = FakePath(None) with self.assertRaises(TypeError): - self.open(bad_path, 'w') + self.open(bad_path, 'w', encoding="utf-8") bad_path = FakePath(FloatingPointError) with self.assertRaises(FloatingPointError): - self.open(bad_path, 'w') + self.open(bad_path, 'w', encoding="utf-8") # ensure that refcounting is correct with some error conditions with self.assertRaisesRegex(ValueError, 'read/write/append mode'): - self.open(FakePath(os_helper.TESTFN), 'rwxa') + self.open(FakePath(os_helper.TESTFN), 'rwxa', encoding="utf-8") def test_RawIOBase_readall(self): # Exercise the default unlimited RawIOBase.read() and readall() @@ -2590,7 +2593,7 @@ class TextIOWrapperTest(unittest.TestCase): def test_constructor(self): r = self.BytesIO(b"\xc3\xa9\n\n") b = self.BufferedReader(r, 1000) - t = self.TextIOWrapper(b) + t = self.TextIOWrapper(b, encoding="utf-8") t.__init__(b, encoding="latin-1", newline="\r\n") self.assertEqual(t.encoding, "latin-1") self.assertEqual(t.line_buffering, False) @@ -2609,7 +2612,7 @@ class TextIOWrapperTest(unittest.TestCase): self.assertRaisesRegex((ValueError, AttributeError), 'uninitialized|has no attribute', t.read, 0) - t.__init__(self.MockRawIO()) + t.__init__(self.MockRawIO(), encoding="utf-8") self.assertEqual(t.read(0), '') def test_non_text_encoding_codecs_are_rejected(self): @@ -2624,7 +2627,7 @@ class TextIOWrapperTest(unittest.TestCase): def test_detach(self): r = self.BytesIO() b = self.BufferedWriter(r) - t = self.TextIOWrapper(b) + t = self.TextIOWrapper(b, encoding="ascii") self.assertIs(t.detach(), b) t = self.TextIOWrapper(b, encoding="ascii") @@ -2664,7 +2667,7 @@ class TextIOWrapperTest(unittest.TestCase): def test_recursive_repr(self): # Issue #25455 raw = self.BytesIO() - t = self.TextIOWrapper(raw) + t = self.TextIOWrapper(raw, encoding="utf-8") with support.swap_attr(raw, 'name', t): try: repr(t) # Should not crash @@ -2674,7 +2677,7 @@ class TextIOWrapperTest(unittest.TestCase): def test_line_buffering(self): r = self.BytesIO() b = self.BufferedWriter(r, 1000) - t = self.TextIOWrapper(b, newline="\n", line_buffering=True) + t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True) t.write("X") self.assertEqual(r.getvalue(), b"") # No flush happened t.write("Y\nZ") @@ -2685,7 +2688,7 @@ class TextIOWrapperTest(unittest.TestCase): def test_reconfigure_line_buffering(self): r = self.BytesIO() b = self.BufferedWriter(r, 1000) - t = self.TextIOWrapper(b, newline="\n", line_buffering=False) + t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False) t.write("AB\nC") self.assertEqual(r.getvalue(), b"") @@ -2722,7 +2725,9 @@ class TextIOWrapperTest(unittest.TestCase): current_locale_encoding = locale.getpreferredencoding(False) b = self.BytesIO() - t = self.TextIOWrapper(b) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", EncodingWarning) + t = self.TextIOWrapper(b) self.assertEqual(t.encoding, current_locale_encoding) finally: os.environ.clear() @@ -2735,16 +2740,18 @@ class TextIOWrapperTest(unittest.TestCase): import _testcapi b = self.BytesIO() b.fileno = lambda: _testcapi.INT_MAX + 1 - self.assertRaises(OverflowError, self.TextIOWrapper, b) + self.assertRaises(OverflowError, self.TextIOWrapper, b, encoding="locale") b.fileno = lambda: _testcapi.UINT_MAX + 1 - self.assertRaises(OverflowError, self.TextIOWrapper, b) + self.assertRaises(OverflowError, self.TextIOWrapper, b, encoding="locale") def test_encoding(self): # Check the encoding attribute is always set, and valid b = self.BytesIO() t = self.TextIOWrapper(b, encoding="utf-8") self.assertEqual(t.encoding, "utf-8") - t = self.TextIOWrapper(b) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", EncodingWarning) + t = self.TextIOWrapper(b) self.assertIsNotNone(t.encoding) codecs.lookup(t.encoding) @@ -2909,7 +2916,7 @@ class TextIOWrapperTest(unittest.TestCase): rawio = self.CloseFailureIO() with support.catch_unraisable_exception() as cm: with self.assertRaises(AttributeError): - self.TextIOWrapper(rawio).xyzzy + self.TextIOWrapper(rawio, encoding="utf-8").xyzzy if not IOBASE_EMITS_UNRAISABLE: self.assertIsNone(cm.unraisable) @@ -3116,11 +3123,11 @@ class TextIOWrapperTest(unittest.TestCase): class UnReadable(self.BytesIO): def readable(self): return False - txt = self.TextIOWrapper(UnReadable()) + txt = self.TextIOWrapper(UnReadable(), encoding="utf-8") self.assertRaises(OSError, txt.read) def test_read_one_by_one(self): - txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB")) + txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8") reads = "" while True: c = txt.read(1) @@ -3130,7 +3137,7 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEqual(reads, "AA\nBB") def test_readlines(self): - txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC")) + txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"), encoding="utf-8") self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"]) txt.seek(0) self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"]) @@ -3140,7 +3147,7 @@ class TextIOWrapperTest(unittest.TestCase): # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128. def test_read_by_chunk(self): # make sure "\r\n" straddles 128 char boundary. - txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB")) + txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"), encoding="utf-8") reads = "" while True: c = txt.read(128) @@ -3152,7 +3159,7 @@ class TextIOWrapperTest(unittest.TestCase): def test_writelines(self): l = ['ab', 'cd', 'ef'] buf = self.BytesIO() - txt = self.TextIOWrapper(buf) + txt = self.TextIOWrapper(buf, encoding="utf-8") txt.writelines(l) txt.flush() self.assertEqual(buf.getvalue(), b'abcdef') @@ -3160,13 +3167,13 @@ class TextIOWrapperTest(unittest.TestCase): def test_writelines_userlist(self): l = UserList(['ab', 'cd', 'ef']) buf = self.BytesIO() - txt = self.TextIOWrapper(buf) + txt = self.TextIOWrapper(buf, encoding="utf-8") txt.writelines(l) txt.flush() self.assertEqual(buf.getvalue(), b'abcdef') def test_writelines_error(self): - txt = self.TextIOWrapper(self.BytesIO()) + txt = self.TextIOWrapper(self.BytesIO(), encoding="utf-8") self.assertRaises(TypeError, txt.writelines, [1, 2, 3]) self.assertRaises(TypeError, txt.writelines, None) self.assertRaises(TypeError, txt.writelines, b'abc') @@ -3274,16 +3281,16 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) def test_errors_property(self): - with self.open(os_helper.TESTFN, "w") as f: + with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: self.assertEqual(f.errors, "strict") - with self.open(os_helper.TESTFN, "w", errors="replace") as f: + with self.open(os_helper.TESTFN, "w", encoding="utf-8", errors="replace") as f: self.assertEqual(f.errors, "replace") @support.no_tracing def test_threads_write(self): # Issue6750: concurrent writes could duplicate data event = threading.Event() - with self.open(os_helper.TESTFN, "w", buffering=1) as f: + with self.open(os_helper.TESTFN, "w", encoding="utf-8", buffering=1) as f: def run(n): text = "Thread%03d\n" % n event.wait() @@ -3292,7 +3299,7 @@ class TextIOWrapperTest(unittest.TestCase): for x in range(20)] with threading_helper.start_threads(threads, event.set): time.sleep(0.02) - with self.open(os_helper.TESTFN) as f: + with self.open(os_helper.TESTFN, encoding="utf-8") as f: content = f.read() for n in range(20): self.assertEqual(content.count("Thread%03d\n" % n), 1) @@ -3363,7 +3370,7 @@ class TextIOWrapperTest(unittest.TestCase): self.assertRaises(ValueError, txt.flush) def test_unseekable(self): - txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata)) + txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata), encoding="utf-8") self.assertRaises(self.UnsupportedOperation, txt.tell) self.assertRaises(self.UnsupportedOperation, txt.seek, 0) @@ -3452,11 +3459,11 @@ class TextIOWrapperTest(unittest.TestCase): def test_read_nonbytes(self): # Issue #17106 # Crash when underlying read() returns non-bytes - t = self.TextIOWrapper(self.StringIO('a')) + t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8") self.assertRaises(TypeError, t.read, 1) - t = self.TextIOWrapper(self.StringIO('a')) + t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8") self.assertRaises(TypeError, t.readline) - t = self.TextIOWrapper(self.StringIO('a')) + t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8") self.assertRaises(TypeError, t.read) def test_illegal_encoder(self): @@ -3724,7 +3731,7 @@ class CTextIOWrapperTest(TextIOWrapperTest): def test_initialization(self): r = self.BytesIO(b"\xc3\xa9\n\n") b = self.BufferedReader(r, 1000) - t = self.TextIOWrapper(b) + t = self.TextIOWrapper(b, encoding="utf-8") self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') self.assertRaises(ValueError, t.read) @@ -3948,7 +3955,7 @@ class MiscIOTest(unittest.TestCase): f.close() with warnings_helper.check_warnings(('', DeprecationWarning)): - f = self.open(os_helper.TESTFN, "U") + f = self.open(os_helper.TESTFN, "U", encoding="utf-8") self.assertEqual(f.name, os_helper.TESTFN) self.assertEqual(f.buffer.name, os_helper.TESTFN) self.assertEqual(f.buffer.raw.name, os_helper.TESTFN) @@ -3957,7 +3964,7 @@ class MiscIOTest(unittest.TestCase): self.assertEqual(f.buffer.raw.mode, "rb") f.close() - f = self.open(os_helper.TESTFN, "w+") + f = self.open(os_helper.TESTFN, "w+", encoding="utf-8") self.assertEqual(f.mode, "w+") self.assertEqual(f.buffer.mode, "rb+") # Does it really matter? self.assertEqual(f.buffer.raw.mode, "rb+") @@ -3974,7 +3981,7 @@ class MiscIOTest(unittest.TestCase): # bpo-27805: Ignore ESPIPE from lseek() in open(). r, w = os.pipe() self.addCleanup(os.close, r) - f = self.open(w, 'a') + f = self.open(w, 'a', encoding="utf-8") self.addCleanup(f.close) # Check that the file is marked non-seekable. On Windows, however, lseek # somehow succeeds on pipes. @@ -3999,6 +4006,8 @@ class MiscIOTest(unittest.TestCase): {"mode": "w+", "buffering": 2}, {"mode": "w+b", "buffering": 0}, ]: + if "b" not in kwargs["mode"]: + kwargs["encoding"] = "utf-8" f = self.open(os_helper.TESTFN, **kwargs) f.close() self.assertRaises(ValueError, f.flush) @@ -4059,7 +4068,7 @@ class MiscIOTest(unittest.TestCase): self.assertNotIsInstance(f, abcmodule.RawIOBase) self.assertIsInstance(f, abcmodule.BufferedIOBase) self.assertNotIsInstance(f, abcmodule.TextIOBase) - with self.open(os_helper.TESTFN, "w") as f: + with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: self.assertIsInstance(f, abcmodule.IOBase) self.assertNotIsInstance(f, abcmodule.RawIOBase) self.assertNotIsInstance(f, abcmodule.BufferedIOBase) @@ -4085,7 +4094,7 @@ class MiscIOTest(unittest.TestCase): def test_warn_on_dealloc(self): self._check_warn_on_dealloc(os_helper.TESTFN, "wb", buffering=0) self._check_warn_on_dealloc(os_helper.TESTFN, "wb") - self._check_warn_on_dealloc(os_helper.TESTFN, "w") + self._check_warn_on_dealloc(os_helper.TESTFN, "w", encoding="utf-8") def _check_warn_on_dealloc_fd(self, *args, **kwargs): fds = [] @@ -4109,7 +4118,7 @@ class MiscIOTest(unittest.TestCase): def test_warn_on_dealloc_fd(self): self._check_warn_on_dealloc_fd("rb", buffering=0) self._check_warn_on_dealloc_fd("rb") - self._check_warn_on_dealloc_fd("r") + self._check_warn_on_dealloc_fd("r", encoding="utf-8") def test_pickling(self): @@ -4125,6 +4134,8 @@ class MiscIOTest(unittest.TestCase): {"mode": "w+b"}, {"mode": "w+b", "buffering": 0}, ]: + if "b" not in kwargs["mode"]: + kwargs["encoding"] = "utf-8" for protocol in range(pickle.HIGHEST_PROTOCOL + 1): with self.open(os_helper.TESTFN, **kwargs) as f: self.assertRaises(TypeError, pickle.dumps, f, protocol) @@ -4189,9 +4200,9 @@ class MiscIOTest(unittest.TestCase): def test_create_fail(self): # 'x' mode fails if file is existing - with self.open(os_helper.TESTFN, 'w'): + with self.open(os_helper.TESTFN, 'w', encoding="utf-8"): pass - self.assertRaises(FileExistsError, self.open, os_helper.TESTFN, 'x') + self.assertRaises(FileExistsError, self.open, os_helper.TESTFN, 'x', encoding="utf-8") def test_create_writes(self): # 'x' mode opens for writing @@ -4202,7 +4213,7 @@ class MiscIOTest(unittest.TestCase): def test_open_allargs(self): # there used to be a buffer overflow in the parser for rawmode - self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'rwax+') + self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'rwax+', encoding="utf-8") def test_check_encoding_errors(self): # bpo-37388: open() and TextIOWrapper must check encoding and errors @@ -4474,7 +4485,7 @@ class SignalsTest(unittest.TestCase): def test_interrupted_read_retry_text(self): self.check_interrupted_read_retry(lambda x: x, - mode="r") + mode="r", encoding="latin1") def check_interrupted_write_retry(self, item, **fdopen_kwargs): """Check that a buffered write, when it gets interrupted (either