bpo-40275: Use new test.support helper submodules in tests (GH-21314)
This commit is contained in:
parent
9ce8132e1f
commit
883bc63833
|
@ -22,10 +22,13 @@ except ImportError:
|
||||||
_testbuffer = None
|
_testbuffer = None
|
||||||
|
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import os_helper
|
||||||
from test.support import (
|
from test.support import (
|
||||||
TestFailed, TESTFN, run_with_locale, no_tracing,
|
TestFailed, run_with_locale, no_tracing,
|
||||||
_2G, _4G, bigmemtest, forget,
|
_2G, _4G, bigmemtest
|
||||||
)
|
)
|
||||||
|
from test.support.import_helper import forget
|
||||||
|
from test.support.os_helper import TESTFN
|
||||||
from test.support import threading_helper
|
from test.support import threading_helper
|
||||||
from test.support.warnings_helper import save_restore_warnings_filters
|
from test.support.warnings_helper import save_restore_warnings_filters
|
||||||
|
|
||||||
|
@ -3100,7 +3103,7 @@ class AbstractPickleModuleTests(unittest.TestCase):
|
||||||
f.close()
|
f.close()
|
||||||
self.assertRaises(ValueError, self.dump, 123, f)
|
self.assertRaises(ValueError, self.dump, 123, f)
|
||||||
finally:
|
finally:
|
||||||
support.unlink(TESTFN)
|
os_helper.unlink(TESTFN)
|
||||||
|
|
||||||
def test_load_closed_file(self):
|
def test_load_closed_file(self):
|
||||||
f = open(TESTFN, "wb")
|
f = open(TESTFN, "wb")
|
||||||
|
@ -3108,7 +3111,7 @@ class AbstractPickleModuleTests(unittest.TestCase):
|
||||||
f.close()
|
f.close()
|
||||||
self.assertRaises(ValueError, self.dump, 123, f)
|
self.assertRaises(ValueError, self.dump, 123, f)
|
||||||
finally:
|
finally:
|
||||||
support.unlink(TESTFN)
|
os_helper.unlink(TESTFN)
|
||||||
|
|
||||||
def test_load_from_and_dump_to_file(self):
|
def test_load_from_and_dump_to_file(self):
|
||||||
stream = io.BytesIO()
|
stream = io.BytesIO()
|
||||||
|
@ -3139,7 +3142,7 @@ class AbstractPickleModuleTests(unittest.TestCase):
|
||||||
self.assertRaises(TypeError, self.dump, 123, f, proto)
|
self.assertRaises(TypeError, self.dump, 123, f, proto)
|
||||||
finally:
|
finally:
|
||||||
f.close()
|
f.close()
|
||||||
support.unlink(TESTFN)
|
os_helper.unlink(TESTFN)
|
||||||
|
|
||||||
def test_incomplete_input(self):
|
def test_incomplete_input(self):
|
||||||
s = io.BytesIO(b"X''.")
|
s = io.BytesIO(b"X''.")
|
||||||
|
|
|
@ -5,23 +5,27 @@
|
||||||
"""
|
"""
|
||||||
import unittest
|
import unittest
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import import_helper
|
||||||
|
from test.support import os_helper
|
||||||
|
from test.support import warnings_helper
|
||||||
|
|
||||||
with support.check_warnings(('', DeprecationWarning)):
|
|
||||||
binhex = support.import_fresh_module('binhex')
|
with warnings_helper.check_warnings(('', DeprecationWarning)):
|
||||||
|
binhex = import_helper.import_fresh_module('binhex')
|
||||||
|
|
||||||
|
|
||||||
class BinHexTestCase(unittest.TestCase):
|
class BinHexTestCase(unittest.TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
# binhex supports only file names encodable to Latin1
|
# binhex supports only file names encodable to Latin1
|
||||||
self.fname1 = support.TESTFN_ASCII + "1"
|
self.fname1 = os_helper.TESTFN_ASCII + "1"
|
||||||
self.fname2 = support.TESTFN_ASCII + "2"
|
self.fname2 = os_helper.TESTFN_ASCII + "2"
|
||||||
self.fname3 = support.TESTFN_ASCII + "very_long_filename__very_long_filename__very_long_filename__very_long_filename__"
|
self.fname3 = os_helper.TESTFN_ASCII + "very_long_filename__very_long_filename__very_long_filename__very_long_filename__"
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
support.unlink(self.fname1)
|
os_helper.unlink(self.fname1)
|
||||||
support.unlink(self.fname2)
|
os_helper.unlink(self.fname2)
|
||||||
support.unlink(self.fname3)
|
os_helper.unlink(self.fname3)
|
||||||
|
|
||||||
DATA = b'Jack is my hero'
|
DATA = b'Jack is my hero'
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
import unittest
|
import unittest
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import os_helper
|
||||||
|
|
||||||
import io # C implementation.
|
import io # C implementation.
|
||||||
import _pyio as pyio # Python implementation.
|
import _pyio as pyio # Python implementation.
|
||||||
|
@ -17,18 +18,18 @@ class BufferSizeTest:
|
||||||
# .readline()s deliver what we wrote.
|
# .readline()s deliver what we wrote.
|
||||||
|
|
||||||
# Ensure we can open TESTFN for writing.
|
# Ensure we can open TESTFN for writing.
|
||||||
support.unlink(support.TESTFN)
|
os_helper.unlink(os_helper.TESTFN)
|
||||||
|
|
||||||
# Since C doesn't guarantee we can write/read arbitrary bytes in text
|
# Since C doesn't guarantee we can write/read arbitrary bytes in text
|
||||||
# files, use binary mode.
|
# files, use binary mode.
|
||||||
f = self.open(support.TESTFN, "wb")
|
f = self.open(os_helper.TESTFN, "wb")
|
||||||
try:
|
try:
|
||||||
# write once with \n and once without
|
# write once with \n and once without
|
||||||
f.write(s)
|
f.write(s)
|
||||||
f.write(b"\n")
|
f.write(b"\n")
|
||||||
f.write(s)
|
f.write(s)
|
||||||
f.close()
|
f.close()
|
||||||
f = open(support.TESTFN, "rb")
|
f = open(os_helper.TESTFN, "rb")
|
||||||
line = f.readline()
|
line = f.readline()
|
||||||
self.assertEqual(line, s + b"\n")
|
self.assertEqual(line, s + b"\n")
|
||||||
line = f.readline()
|
line = f.readline()
|
||||||
|
@ -37,7 +38,7 @@ class BufferSizeTest:
|
||||||
self.assertFalse(line) # Must be at EOF
|
self.assertFalse(line) # Must be at EOF
|
||||||
f.close()
|
f.close()
|
||||||
finally:
|
finally:
|
||||||
support.unlink(support.TESTFN)
|
os_helper.unlink(os_helper.TESTFN)
|
||||||
|
|
||||||
def drive_one(self, pattern):
|
def drive_one(self, pattern):
|
||||||
for length in lengths:
|
for length in lengths:
|
||||||
|
|
|
@ -17,6 +17,7 @@ import importlib.machinery
|
||||||
import importlib.util
|
import importlib.util
|
||||||
from test import support
|
from test import support
|
||||||
from test.support import MISSING_C_DOCSTRINGS
|
from test.support import MISSING_C_DOCSTRINGS
|
||||||
|
from test.support import import_helper
|
||||||
from test.support import threading_helper
|
from test.support import threading_helper
|
||||||
from test.support.script_helper import assert_python_failure, assert_python_ok
|
from test.support.script_helper import assert_python_failure, assert_python_ok
|
||||||
try:
|
try:
|
||||||
|
@ -25,7 +26,7 @@ except ImportError:
|
||||||
_posixsubprocess = None
|
_posixsubprocess = None
|
||||||
|
|
||||||
# Skip this test if the _testcapi module isn't available.
|
# Skip this test if the _testcapi module isn't available.
|
||||||
_testcapi = support.import_module('_testcapi')
|
_testcapi = import_helper.import_module('_testcapi')
|
||||||
|
|
||||||
import _testinternalcapi
|
import _testinternalcapi
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,7 @@ except ImportError:
|
||||||
_have_multiprocessing = False
|
_have_multiprocessing = False
|
||||||
|
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import os_helper
|
||||||
from test.support import script_helper
|
from test.support import script_helper
|
||||||
|
|
||||||
from .test_py_compile import without_source_date_epoch
|
from .test_py_compile import without_source_date_epoch
|
||||||
|
@ -356,7 +357,7 @@ class CompileallTestsBase:
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@support.skip_unless_symlink
|
@os_helper.skip_unless_symlink
|
||||||
def test_ignore_symlink_destination(self):
|
def test_ignore_symlink_destination(self):
|
||||||
# Create folders for allowed files, symlinks and prohibited area
|
# Create folders for allowed files, symlinks and prohibited area
|
||||||
allowed_path = os.path.join(self.directory, "test", "dir", "allowed")
|
allowed_path = os.path.join(self.directory, "test", "dir", "allowed")
|
||||||
|
@ -438,7 +439,7 @@ class CommandLineTestsBase:
|
||||||
sys_path_writable = False
|
sys_path_writable = False
|
||||||
break
|
break
|
||||||
finally:
|
finally:
|
||||||
support.unlink(str(path))
|
os_helper.unlink(str(path))
|
||||||
if directory_created:
|
if directory_created:
|
||||||
directory.rmdir()
|
directory.rmdir()
|
||||||
else:
|
else:
|
||||||
|
@ -477,7 +478,7 @@ class CommandLineTestsBase:
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.directory = tempfile.mkdtemp()
|
self.directory = tempfile.mkdtemp()
|
||||||
self.addCleanup(support.rmtree, self.directory)
|
self.addCleanup(os_helper.rmtree, self.directory)
|
||||||
self.pkgdir = os.path.join(self.directory, 'foo')
|
self.pkgdir = os.path.join(self.directory, 'foo')
|
||||||
os.mkdir(self.pkgdir)
|
os.mkdir(self.pkgdir)
|
||||||
self.pkgdir_cachedir = os.path.join(self.pkgdir, '__pycache__')
|
self.pkgdir_cachedir = os.path.join(self.pkgdir, '__pycache__')
|
||||||
|
@ -625,7 +626,7 @@ class CommandLineTestsBase:
|
||||||
self.assertCompiled(spamfn)
|
self.assertCompiled(spamfn)
|
||||||
self.assertCompiled(eggfn)
|
self.assertCompiled(eggfn)
|
||||||
|
|
||||||
@support.skip_unless_symlink
|
@os_helper.skip_unless_symlink
|
||||||
def test_symlink_loop(self):
|
def test_symlink_loop(self):
|
||||||
# Currently, compileall ignores symlinks to directories.
|
# Currently, compileall ignores symlinks to directories.
|
||||||
# If that limitation is ever lifted, it should protect against
|
# If that limitation is ever lifted, it should protect against
|
||||||
|
@ -823,7 +824,7 @@ class CommandLineTestsBase:
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@support.skip_unless_symlink
|
@os_helper.skip_unless_symlink
|
||||||
def test_ignore_symlink_destination(self):
|
def test_ignore_symlink_destination(self):
|
||||||
# Create folders for allowed files, symlinks and prohibited area
|
# Create folders for allowed files, symlinks and prohibited area
|
||||||
allowed_path = os.path.join(self.directory, "test", "dir", "allowed")
|
allowed_path = os.path.join(self.directory, "test", "dir", "allowed")
|
||||||
|
|
|
@ -9,8 +9,9 @@ from array import array
|
||||||
from weakref import proxy
|
from weakref import proxy
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
|
|
||||||
from test.support import (TESTFN, TESTFN_UNICODE, check_warnings, run_unittest,
|
from test.support import (run_unittest, cpython_only, swap_attr)
|
||||||
make_bad_fd, cpython_only, swap_attr)
|
from test.support.os_helper import (TESTFN, TESTFN_UNICODE, make_bad_fd)
|
||||||
|
from test.support.warnings_helper import check_warnings
|
||||||
from collections import UserList
|
from collections import UserList
|
||||||
|
|
||||||
import _io # C implementation of io
|
import _io # C implementation of io
|
||||||
|
|
|
@ -5,11 +5,12 @@ import unittest
|
||||||
import doctest
|
import doctest
|
||||||
|
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import import_helper
|
||||||
from unittest import TestCase, skipUnless
|
from unittest import TestCase, skipUnless
|
||||||
from operator import itemgetter
|
from operator import itemgetter
|
||||||
|
|
||||||
py_heapq = support.import_fresh_module('heapq', blocked=['_heapq'])
|
py_heapq = import_helper.import_fresh_module('heapq', blocked=['_heapq'])
|
||||||
c_heapq = support.import_fresh_module('heapq', fresh=['_heapq'])
|
c_heapq = import_helper.import_fresh_module('heapq', fresh=['_heapq'])
|
||||||
|
|
||||||
# _heapq.nlargest/nsmallest are saved in heapq._nlargest/_smallest when
|
# _heapq.nlargest/nsmallest are saved in heapq._nlargest/_smallest when
|
||||||
# _heapq is imported, so check them there
|
# _heapq is imported, so check them there
|
||||||
|
|
|
@ -13,7 +13,10 @@ import unittest
|
||||||
TestCase = unittest.TestCase
|
TestCase = unittest.TestCase
|
||||||
|
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import os_helper
|
||||||
from test.support import socket_helper
|
from test.support import socket_helper
|
||||||
|
from test.support import warnings_helper
|
||||||
|
|
||||||
|
|
||||||
here = os.path.dirname(__file__)
|
here = os.path.dirname(__file__)
|
||||||
# Self-signed cert file for 'localhost'
|
# Self-signed cert file for 'localhost'
|
||||||
|
@ -1766,14 +1769,14 @@ class HTTPSTest(TestCase):
|
||||||
with self.assertRaises(ssl.CertificateError):
|
with self.assertRaises(ssl.CertificateError):
|
||||||
h.request('GET', '/')
|
h.request('GET', '/')
|
||||||
# Same with explicit check_hostname=True
|
# Same with explicit check_hostname=True
|
||||||
with support.check_warnings(('', DeprecationWarning)):
|
with warnings_helper.check_warnings(('', DeprecationWarning)):
|
||||||
h = client.HTTPSConnection('localhost', server.port,
|
h = client.HTTPSConnection('localhost', server.port,
|
||||||
context=context, check_hostname=True)
|
context=context, check_hostname=True)
|
||||||
with self.assertRaises(ssl.CertificateError):
|
with self.assertRaises(ssl.CertificateError):
|
||||||
h.request('GET', '/')
|
h.request('GET', '/')
|
||||||
# With check_hostname=False, the mismatching is ignored
|
# With check_hostname=False, the mismatching is ignored
|
||||||
context.check_hostname = False
|
context.check_hostname = False
|
||||||
with support.check_warnings(('', DeprecationWarning)):
|
with warnings_helper.check_warnings(('', DeprecationWarning)):
|
||||||
h = client.HTTPSConnection('localhost', server.port,
|
h = client.HTTPSConnection('localhost', server.port,
|
||||||
context=context, check_hostname=False)
|
context=context, check_hostname=False)
|
||||||
h.request('GET', '/nonexistent')
|
h.request('GET', '/nonexistent')
|
||||||
|
@ -1792,7 +1795,7 @@ class HTTPSTest(TestCase):
|
||||||
h.close()
|
h.close()
|
||||||
# Passing check_hostname to HTTPSConnection should override the
|
# Passing check_hostname to HTTPSConnection should override the
|
||||||
# context's setting.
|
# context's setting.
|
||||||
with support.check_warnings(('', DeprecationWarning)):
|
with warnings_helper.check_warnings(('', DeprecationWarning)):
|
||||||
h = client.HTTPSConnection('localhost', server.port,
|
h = client.HTTPSConnection('localhost', server.port,
|
||||||
context=context, check_hostname=True)
|
context=context, check_hostname=True)
|
||||||
with self.assertRaises(ssl.CertificateError):
|
with self.assertRaises(ssl.CertificateError):
|
||||||
|
@ -1908,10 +1911,10 @@ class RequestBodyTest(TestCase):
|
||||||
self.assertEqual(b'body\xc1', f.read())
|
self.assertEqual(b'body\xc1', f.read())
|
||||||
|
|
||||||
def test_text_file_body(self):
|
def test_text_file_body(self):
|
||||||
self.addCleanup(support.unlink, support.TESTFN)
|
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
|
||||||
with open(support.TESTFN, "w") as f:
|
with open(os_helper.TESTFN, "w") as f:
|
||||||
f.write("body")
|
f.write("body")
|
||||||
with open(support.TESTFN) as f:
|
with open(os_helper.TESTFN) as f:
|
||||||
self.conn.request("PUT", "/url", f)
|
self.conn.request("PUT", "/url", f)
|
||||||
message, f = self.get_headers_and_fp()
|
message, f = self.get_headers_and_fp()
|
||||||
self.assertEqual("text/plain", message.get_content_type())
|
self.assertEqual("text/plain", message.get_content_type())
|
||||||
|
@ -1923,10 +1926,10 @@ class RequestBodyTest(TestCase):
|
||||||
self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read())
|
self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read())
|
||||||
|
|
||||||
def test_binary_file_body(self):
|
def test_binary_file_body(self):
|
||||||
self.addCleanup(support.unlink, support.TESTFN)
|
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
|
||||||
with open(support.TESTFN, "wb") as f:
|
with open(os_helper.TESTFN, "wb") as f:
|
||||||
f.write(b"body\xc1")
|
f.write(b"body\xc1")
|
||||||
with open(support.TESTFN, "rb") as f:
|
with open(os_helper.TESTFN, "rb") as f:
|
||||||
self.conn.request("PUT", "/url", f)
|
self.conn.request("PUT", "/url", f)
|
||||||
message, f = self.get_headers_and_fp()
|
message, f = self.get_headers_and_fp()
|
||||||
self.assertEqual("text/plain", message.get_content_type())
|
self.assertEqual("text/plain", message.get_content_type())
|
||||||
|
|
|
@ -12,6 +12,7 @@ import os
|
||||||
import os.path
|
import os.path
|
||||||
from pathlib import Path, PurePath
|
from pathlib import Path, PurePath
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import import_helper
|
||||||
import unittest
|
import unittest
|
||||||
import sys
|
import sys
|
||||||
import tempfile
|
import tempfile
|
||||||
|
@ -55,8 +56,8 @@ _extension_details()
|
||||||
def import_importlib(module_name):
|
def import_importlib(module_name):
|
||||||
"""Import a module from importlib both w/ and w/o _frozen_importlib."""
|
"""Import a module from importlib both w/ and w/o _frozen_importlib."""
|
||||||
fresh = ('importlib',) if '.' in module_name else ()
|
fresh = ('importlib',) if '.' in module_name else ()
|
||||||
frozen = support.import_fresh_module(module_name)
|
frozen = import_helper.import_fresh_module(module_name)
|
||||||
source = support.import_fresh_module(module_name, fresh=fresh,
|
source = import_helper.import_fresh_module(module_name, fresh=fresh,
|
||||||
blocked=('_frozen_importlib', '_frozen_importlib_external'))
|
blocked=('_frozen_importlib', '_frozen_importlib_external'))
|
||||||
return {'Frozen': frozen, 'Source': source}
|
return {'Frozen': frozen, 'Source': source}
|
||||||
|
|
||||||
|
|
|
@ -40,8 +40,11 @@ from itertools import cycle, count
|
||||||
from test import support
|
from test import support
|
||||||
from test.support.script_helper import (
|
from test.support.script_helper import (
|
||||||
assert_python_ok, assert_python_failure, run_python_until_end)
|
assert_python_ok, assert_python_failure, run_python_until_end)
|
||||||
|
from test.support import import_helper
|
||||||
|
from test.support import os_helper
|
||||||
from test.support import threading_helper
|
from test.support import threading_helper
|
||||||
from test.support import FakePath
|
from test.support import warnings_helper
|
||||||
|
from test.support.os_helper import FakePath
|
||||||
|
|
||||||
import codecs
|
import codecs
|
||||||
import io # C implementation of io
|
import io # C implementation of io
|
||||||
|
@ -317,10 +320,10 @@ class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
|
||||||
class IOTest(unittest.TestCase):
|
class IOTest(unittest.TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
support.unlink(support.TESTFN)
|
os_helper.unlink(os_helper.TESTFN)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
support.unlink(support.TESTFN)
|
os_helper.unlink(os_helper.TESTFN)
|
||||||
|
|
||||||
def write_ops(self, f):
|
def write_ops(self, f):
|
||||||
self.assertEqual(f.write(b"blah."), 5)
|
self.assertEqual(f.write(b"blah."), 5)
|
||||||
|
@ -405,19 +408,19 @@ class IOTest(unittest.TestCase):
|
||||||
# Try writing on a file opened in read mode and vice-versa.
|
# Try writing on a file opened in read mode and vice-versa.
|
||||||
exc = self.UnsupportedOperation
|
exc = self.UnsupportedOperation
|
||||||
for mode in ("w", "wb"):
|
for mode in ("w", "wb"):
|
||||||
with self.open(support.TESTFN, mode) as fp:
|
with self.open(os_helper.TESTFN, mode) as fp:
|
||||||
self.assertRaises(exc, fp.read)
|
self.assertRaises(exc, fp.read)
|
||||||
self.assertRaises(exc, fp.readline)
|
self.assertRaises(exc, fp.readline)
|
||||||
with self.open(support.TESTFN, "wb", buffering=0) as fp:
|
with self.open(os_helper.TESTFN, "wb", buffering=0) as fp:
|
||||||
self.assertRaises(exc, fp.read)
|
self.assertRaises(exc, fp.read)
|
||||||
self.assertRaises(exc, fp.readline)
|
self.assertRaises(exc, fp.readline)
|
||||||
with self.open(support.TESTFN, "rb", buffering=0) as fp:
|
with self.open(os_helper.TESTFN, "rb", buffering=0) as fp:
|
||||||
self.assertRaises(exc, fp.write, b"blah")
|
self.assertRaises(exc, fp.write, b"blah")
|
||||||
self.assertRaises(exc, fp.writelines, [b"blah\n"])
|
self.assertRaises(exc, fp.writelines, [b"blah\n"])
|
||||||
with self.open(support.TESTFN, "rb") as fp:
|
with self.open(os_helper.TESTFN, "rb") as fp:
|
||||||
self.assertRaises(exc, fp.write, b"blah")
|
self.assertRaises(exc, fp.write, b"blah")
|
||||||
self.assertRaises(exc, fp.writelines, [b"blah\n"])
|
self.assertRaises(exc, fp.writelines, [b"blah\n"])
|
||||||
with self.open(support.TESTFN, "r") as fp:
|
with self.open(os_helper.TESTFN, "r") as fp:
|
||||||
self.assertRaises(exc, fp.write, "blah")
|
self.assertRaises(exc, fp.write, "blah")
|
||||||
self.assertRaises(exc, fp.writelines, ["blah\n"])
|
self.assertRaises(exc, fp.writelines, ["blah\n"])
|
||||||
# Non-zero seeking from current or end pos
|
# Non-zero seeking from current or end pos
|
||||||
|
@ -538,33 +541,33 @@ class IOTest(unittest.TestCase):
|
||||||
self.assertRaises(ValueError, self.open, bytes_fn, 'w')
|
self.assertRaises(ValueError, self.open, bytes_fn, 'w')
|
||||||
|
|
||||||
def test_raw_file_io(self):
|
def test_raw_file_io(self):
|
||||||
with self.open(support.TESTFN, "wb", buffering=0) as f:
|
with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
|
||||||
self.assertEqual(f.readable(), False)
|
self.assertEqual(f.readable(), False)
|
||||||
self.assertEqual(f.writable(), True)
|
self.assertEqual(f.writable(), True)
|
||||||
self.assertEqual(f.seekable(), True)
|
self.assertEqual(f.seekable(), True)
|
||||||
self.write_ops(f)
|
self.write_ops(f)
|
||||||
with self.open(support.TESTFN, "rb", buffering=0) as f:
|
with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
|
||||||
self.assertEqual(f.readable(), True)
|
self.assertEqual(f.readable(), True)
|
||||||
self.assertEqual(f.writable(), False)
|
self.assertEqual(f.writable(), False)
|
||||||
self.assertEqual(f.seekable(), True)
|
self.assertEqual(f.seekable(), True)
|
||||||
self.read_ops(f)
|
self.read_ops(f)
|
||||||
|
|
||||||
def test_buffered_file_io(self):
|
def test_buffered_file_io(self):
|
||||||
with self.open(support.TESTFN, "wb") as f:
|
with self.open(os_helper.TESTFN, "wb") as f:
|
||||||
self.assertEqual(f.readable(), False)
|
self.assertEqual(f.readable(), False)
|
||||||
self.assertEqual(f.writable(), True)
|
self.assertEqual(f.writable(), True)
|
||||||
self.assertEqual(f.seekable(), True)
|
self.assertEqual(f.seekable(), True)
|
||||||
self.write_ops(f)
|
self.write_ops(f)
|
||||||
with self.open(support.TESTFN, "rb") as f:
|
with self.open(os_helper.TESTFN, "rb") as f:
|
||||||
self.assertEqual(f.readable(), True)
|
self.assertEqual(f.readable(), True)
|
||||||
self.assertEqual(f.writable(), False)
|
self.assertEqual(f.writable(), False)
|
||||||
self.assertEqual(f.seekable(), True)
|
self.assertEqual(f.seekable(), True)
|
||||||
self.read_ops(f, True)
|
self.read_ops(f, True)
|
||||||
|
|
||||||
def test_readline(self):
|
def test_readline(self):
|
||||||
with self.open(support.TESTFN, "wb") as f:
|
with self.open(os_helper.TESTFN, "wb") as f:
|
||||||
f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
|
f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
|
||||||
with self.open(support.TESTFN, "rb") as f:
|
with self.open(os_helper.TESTFN, "rb") as f:
|
||||||
self.assertEqual(f.readline(), b"abc\n")
|
self.assertEqual(f.readline(), b"abc\n")
|
||||||
self.assertEqual(f.readline(10), b"def\n")
|
self.assertEqual(f.readline(10), b"def\n")
|
||||||
self.assertEqual(f.readline(2), b"xy")
|
self.assertEqual(f.readline(2), b"xy")
|
||||||
|
@ -572,7 +575,7 @@ class IOTest(unittest.TestCase):
|
||||||
self.assertEqual(f.readline(), b"foo\x00bar\n")
|
self.assertEqual(f.readline(), b"foo\x00bar\n")
|
||||||
self.assertEqual(f.readline(None), b"another line")
|
self.assertEqual(f.readline(None), b"another line")
|
||||||
self.assertRaises(TypeError, f.readline, 5.3)
|
self.assertRaises(TypeError, f.readline, 5.3)
|
||||||
with self.open(support.TESTFN, "r") as f:
|
with self.open(os_helper.TESTFN, "r") as f:
|
||||||
self.assertRaises(TypeError, f.readline, 5.3)
|
self.assertRaises(TypeError, f.readline, 5.3)
|
||||||
|
|
||||||
def test_readline_nonsizeable(self):
|
def test_readline_nonsizeable(self):
|
||||||
|
@ -607,20 +610,20 @@ class IOTest(unittest.TestCase):
|
||||||
support.requires(
|
support.requires(
|
||||||
'largefile',
|
'largefile',
|
||||||
'test requires %s bytes and a long time to run' % self.LARGE)
|
'test requires %s bytes and a long time to run' % self.LARGE)
|
||||||
with self.open(support.TESTFN, "w+b", 0) as f:
|
with self.open(os_helper.TESTFN, "w+b", 0) as f:
|
||||||
self.large_file_ops(f)
|
self.large_file_ops(f)
|
||||||
with self.open(support.TESTFN, "w+b") as f:
|
with self.open(os_helper.TESTFN, "w+b") as f:
|
||||||
self.large_file_ops(f)
|
self.large_file_ops(f)
|
||||||
|
|
||||||
def test_with_open(self):
|
def test_with_open(self):
|
||||||
for bufsize in (0, 100):
|
for bufsize in (0, 100):
|
||||||
f = None
|
f = None
|
||||||
with self.open(support.TESTFN, "wb", bufsize) as f:
|
with self.open(os_helper.TESTFN, "wb", bufsize) as f:
|
||||||
f.write(b"xxx")
|
f.write(b"xxx")
|
||||||
self.assertEqual(f.closed, True)
|
self.assertEqual(f.closed, True)
|
||||||
f = None
|
f = None
|
||||||
try:
|
try:
|
||||||
with self.open(support.TESTFN, "wb", bufsize) as f:
|
with self.open(os_helper.TESTFN, "wb", bufsize) as f:
|
||||||
1/0
|
1/0
|
||||||
except ZeroDivisionError:
|
except ZeroDivisionError:
|
||||||
self.assertEqual(f.closed, True)
|
self.assertEqual(f.closed, True)
|
||||||
|
@ -629,13 +632,13 @@ class IOTest(unittest.TestCase):
|
||||||
|
|
||||||
# issue 5008
|
# issue 5008
|
||||||
def test_append_mode_tell(self):
|
def test_append_mode_tell(self):
|
||||||
with self.open(support.TESTFN, "wb") as f:
|
with self.open(os_helper.TESTFN, "wb") as f:
|
||||||
f.write(b"xxx")
|
f.write(b"xxx")
|
||||||
with self.open(support.TESTFN, "ab", buffering=0) as f:
|
with self.open(os_helper.TESTFN, "ab", buffering=0) as f:
|
||||||
self.assertEqual(f.tell(), 3)
|
self.assertEqual(f.tell(), 3)
|
||||||
with self.open(support.TESTFN, "ab") as f:
|
with self.open(os_helper.TESTFN, "ab") as f:
|
||||||
self.assertEqual(f.tell(), 3)
|
self.assertEqual(f.tell(), 3)
|
||||||
with self.open(support.TESTFN, "a") as f:
|
with self.open(os_helper.TESTFN, "a") as f:
|
||||||
self.assertGreater(f.tell(), 0)
|
self.assertGreater(f.tell(), 0)
|
||||||
|
|
||||||
def test_destructor(self):
|
def test_destructor(self):
|
||||||
|
@ -655,13 +658,13 @@ class IOTest(unittest.TestCase):
|
||||||
def flush(self):
|
def flush(self):
|
||||||
record.append(3)
|
record.append(3)
|
||||||
super().flush()
|
super().flush()
|
||||||
with support.check_warnings(('', ResourceWarning)):
|
with warnings_helper.check_warnings(('', ResourceWarning)):
|
||||||
f = MyFileIO(support.TESTFN, "wb")
|
f = MyFileIO(os_helper.TESTFN, "wb")
|
||||||
f.write(b"xxx")
|
f.write(b"xxx")
|
||||||
del f
|
del f
|
||||||
support.gc_collect()
|
support.gc_collect()
|
||||||
self.assertEqual(record, [1, 2, 3])
|
self.assertEqual(record, [1, 2, 3])
|
||||||
with self.open(support.TESTFN, "rb") as f:
|
with self.open(os_helper.TESTFN, "rb") as f:
|
||||||
self.assertEqual(f.read(), b"xxx")
|
self.assertEqual(f.read(), b"xxx")
|
||||||
|
|
||||||
def _check_base_destructor(self, base):
|
def _check_base_destructor(self, base):
|
||||||
|
@ -707,9 +710,9 @@ class IOTest(unittest.TestCase):
|
||||||
self._check_base_destructor(self.TextIOBase)
|
self._check_base_destructor(self.TextIOBase)
|
||||||
|
|
||||||
def test_close_flushes(self):
|
def test_close_flushes(self):
|
||||||
with self.open(support.TESTFN, "wb") as f:
|
with self.open(os_helper.TESTFN, "wb") as f:
|
||||||
f.write(b"xxx")
|
f.write(b"xxx")
|
||||||
with self.open(support.TESTFN, "rb") as f:
|
with self.open(os_helper.TESTFN, "rb") as f:
|
||||||
self.assertEqual(f.read(), b"xxx")
|
self.assertEqual(f.read(), b"xxx")
|
||||||
|
|
||||||
def test_array_writes(self):
|
def test_array_writes(self):
|
||||||
|
@ -720,25 +723,25 @@ class IOTest(unittest.TestCase):
|
||||||
self.assertEqual(f.write(a), n)
|
self.assertEqual(f.write(a), n)
|
||||||
f.writelines((a,))
|
f.writelines((a,))
|
||||||
check(self.BytesIO())
|
check(self.BytesIO())
|
||||||
check(self.FileIO(support.TESTFN, "w"))
|
check(self.FileIO(os_helper.TESTFN, "w"))
|
||||||
check(self.BufferedWriter(self.MockRawIO()))
|
check(self.BufferedWriter(self.MockRawIO()))
|
||||||
check(self.BufferedRandom(self.MockRawIO()))
|
check(self.BufferedRandom(self.MockRawIO()))
|
||||||
check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
|
check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
|
||||||
|
|
||||||
def test_closefd(self):
|
def test_closefd(self):
|
||||||
self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
|
self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'w',
|
||||||
closefd=False)
|
closefd=False)
|
||||||
|
|
||||||
def test_read_closed(self):
|
def test_read_closed(self):
|
||||||
with self.open(support.TESTFN, "w") as f:
|
with self.open(os_helper.TESTFN, "w") as f:
|
||||||
f.write("egg\n")
|
f.write("egg\n")
|
||||||
with self.open(support.TESTFN, "r") as f:
|
with self.open(os_helper.TESTFN, "r") as f:
|
||||||
file = self.open(f.fileno(), "r", closefd=False)
|
file = self.open(f.fileno(), "r", closefd=False)
|
||||||
self.assertEqual(file.read(), "egg\n")
|
self.assertEqual(file.read(), "egg\n")
|
||||||
file.seek(0)
|
file.seek(0)
|
||||||
file.close()
|
file.close()
|
||||||
self.assertRaises(ValueError, file.read)
|
self.assertRaises(ValueError, file.read)
|
||||||
with self.open(support.TESTFN, "rb") as f:
|
with self.open(os_helper.TESTFN, "rb") as f:
|
||||||
file = self.open(f.fileno(), "rb", closefd=False)
|
file = self.open(f.fileno(), "rb", closefd=False)
|
||||||
self.assertEqual(file.read()[:3], b"egg")
|
self.assertEqual(file.read()[:3], b"egg")
|
||||||
file.close()
|
file.close()
|
||||||
|
@ -746,12 +749,12 @@ class IOTest(unittest.TestCase):
|
||||||
|
|
||||||
def test_no_closefd_with_filename(self):
|
def test_no_closefd_with_filename(self):
|
||||||
# can't use closefd in combination with a file name
|
# can't use closefd in combination with a file name
|
||||||
self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
|
self.assertRaises(ValueError, self.open, os_helper.TESTFN, "r", closefd=False)
|
||||||
|
|
||||||
def test_closefd_attr(self):
|
def test_closefd_attr(self):
|
||||||
with self.open(support.TESTFN, "wb") as f:
|
with self.open(os_helper.TESTFN, "wb") as f:
|
||||||
f.write(b"egg\n")
|
f.write(b"egg\n")
|
||||||
with self.open(support.TESTFN, "r") as f:
|
with self.open(os_helper.TESTFN, "r") as f:
|
||||||
self.assertEqual(f.buffer.raw.closefd, True)
|
self.assertEqual(f.buffer.raw.closefd, True)
|
||||||
file = self.open(f.fileno(), "r", closefd=False)
|
file = self.open(f.fileno(), "r", closefd=False)
|
||||||
self.assertEqual(file.buffer.raw.closefd, False)
|
self.assertEqual(file.buffer.raw.closefd, False)
|
||||||
|
@ -759,15 +762,15 @@ class IOTest(unittest.TestCase):
|
||||||
def test_garbage_collection(self):
|
def test_garbage_collection(self):
|
||||||
# FileIO objects are collected, and collecting them flushes
|
# FileIO objects are collected, and collecting them flushes
|
||||||
# all data to disk.
|
# all data to disk.
|
||||||
with support.check_warnings(('', ResourceWarning)):
|
with warnings_helper.check_warnings(('', ResourceWarning)):
|
||||||
f = self.FileIO(support.TESTFN, "wb")
|
f = self.FileIO(os_helper.TESTFN, "wb")
|
||||||
f.write(b"abcxxx")
|
f.write(b"abcxxx")
|
||||||
f.f = f
|
f.f = f
|
||||||
wr = weakref.ref(f)
|
wr = weakref.ref(f)
|
||||||
del f
|
del f
|
||||||
support.gc_collect()
|
support.gc_collect()
|
||||||
self.assertIsNone(wr(), wr)
|
self.assertIsNone(wr(), wr)
|
||||||
with self.open(support.TESTFN, "rb") as f:
|
with self.open(os_helper.TESTFN, "rb") as f:
|
||||||
self.assertEqual(f.read(), b"abcxxx")
|
self.assertEqual(f.read(), b"abcxxx")
|
||||||
|
|
||||||
def test_unbounded_file(self):
|
def test_unbounded_file(self):
|
||||||
|
@ -804,29 +807,29 @@ class IOTest(unittest.TestCase):
|
||||||
def test_flush_error_on_close(self):
|
def test_flush_error_on_close(self):
|
||||||
# raw file
|
# raw file
|
||||||
# Issue #5700: io.FileIO calls flush() after file closed
|
# Issue #5700: io.FileIO calls flush() after file closed
|
||||||
self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
|
self.check_flush_error_on_close(os_helper.TESTFN, 'wb', buffering=0)
|
||||||
fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
|
fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
|
||||||
self.check_flush_error_on_close(fd, 'wb', buffering=0)
|
self.check_flush_error_on_close(fd, 'wb', buffering=0)
|
||||||
fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
|
fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
|
||||||
self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
|
self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
|
||||||
os.close(fd)
|
os.close(fd)
|
||||||
# buffered io
|
# buffered io
|
||||||
self.check_flush_error_on_close(support.TESTFN, 'wb')
|
self.check_flush_error_on_close(os_helper.TESTFN, 'wb')
|
||||||
fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
|
fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
|
||||||
self.check_flush_error_on_close(fd, 'wb')
|
self.check_flush_error_on_close(fd, 'wb')
|
||||||
fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
|
fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
|
||||||
self.check_flush_error_on_close(fd, 'wb', closefd=False)
|
self.check_flush_error_on_close(fd, 'wb', closefd=False)
|
||||||
os.close(fd)
|
os.close(fd)
|
||||||
# text io
|
# text io
|
||||||
self.check_flush_error_on_close(support.TESTFN, 'w')
|
self.check_flush_error_on_close(os_helper.TESTFN, 'w')
|
||||||
fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
|
fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
|
||||||
self.check_flush_error_on_close(fd, 'w')
|
self.check_flush_error_on_close(fd, 'w')
|
||||||
fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
|
fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
|
||||||
self.check_flush_error_on_close(fd, 'w', closefd=False)
|
self.check_flush_error_on_close(fd, 'w', closefd=False)
|
||||||
os.close(fd)
|
os.close(fd)
|
||||||
|
|
||||||
def test_multi_close(self):
|
def test_multi_close(self):
|
||||||
f = self.open(support.TESTFN, "wb", buffering=0)
|
f = self.open(os_helper.TESTFN, "wb", buffering=0)
|
||||||
f.close()
|
f.close()
|
||||||
f.close()
|
f.close()
|
||||||
f.close()
|
f.close()
|
||||||
|
@ -857,9 +860,9 @@ class IOTest(unittest.TestCase):
|
||||||
self.assertTrue(hasattr(obj, "__dict__"))
|
self.assertTrue(hasattr(obj, "__dict__"))
|
||||||
|
|
||||||
def test_opener(self):
|
def test_opener(self):
|
||||||
with self.open(support.TESTFN, "w") as f:
|
with self.open(os_helper.TESTFN, "w") as f:
|
||||||
f.write("egg\n")
|
f.write("egg\n")
|
||||||
fd = os.open(support.TESTFN, os.O_RDONLY)
|
fd = os.open(os_helper.TESTFN, os.O_RDONLY)
|
||||||
def opener(path, flags):
|
def opener(path, flags):
|
||||||
return fd
|
return fd
|
||||||
with self.open("non-existent", "r", opener=opener) as f:
|
with self.open("non-existent", "r", opener=opener) as f:
|
||||||
|
@ -894,14 +897,14 @@ class IOTest(unittest.TestCase):
|
||||||
f2.readline()
|
f2.readline()
|
||||||
|
|
||||||
def test_nonbuffered_textio(self):
|
def test_nonbuffered_textio(self):
|
||||||
with support.check_no_resource_warning(self):
|
with warnings_helper.check_no_resource_warning(self):
|
||||||
with self.assertRaises(ValueError):
|
with self.assertRaises(ValueError):
|
||||||
self.open(support.TESTFN, 'w', buffering=0)
|
self.open(os_helper.TESTFN, 'w', buffering=0)
|
||||||
|
|
||||||
def test_invalid_newline(self):
|
def test_invalid_newline(self):
|
||||||
with support.check_no_resource_warning(self):
|
with warnings_helper.check_no_resource_warning(self):
|
||||||
with self.assertRaises(ValueError):
|
with self.assertRaises(ValueError):
|
||||||
self.open(support.TESTFN, 'w', newline='invalid')
|
self.open(os_helper.TESTFN, 'w', newline='invalid')
|
||||||
|
|
||||||
def test_buffered_readinto_mixin(self):
|
def test_buffered_readinto_mixin(self):
|
||||||
# Test the implementation provided by BufferedIOBase
|
# Test the implementation provided by BufferedIOBase
|
||||||
|
@ -924,10 +927,10 @@ class IOTest(unittest.TestCase):
|
||||||
with self.open(path, "r") as f:
|
with self.open(path, "r") as f:
|
||||||
self.assertEqual(f.read(), "egg\n")
|
self.assertEqual(f.read(), "egg\n")
|
||||||
|
|
||||||
check_path_succeeds(FakePath(support.TESTFN))
|
check_path_succeeds(FakePath(os_helper.TESTFN))
|
||||||
check_path_succeeds(FakePath(support.TESTFN.encode('utf-8')))
|
check_path_succeeds(FakePath(os_helper.TESTFN.encode('utf-8')))
|
||||||
|
|
||||||
with self.open(support.TESTFN, "w") as f:
|
with self.open(os_helper.TESTFN, "w") as f:
|
||||||
bad_path = FakePath(f.fileno())
|
bad_path = FakePath(f.fileno())
|
||||||
with self.assertRaises(TypeError):
|
with self.assertRaises(TypeError):
|
||||||
self.open(bad_path, 'w')
|
self.open(bad_path, 'w')
|
||||||
|
@ -942,7 +945,7 @@ class IOTest(unittest.TestCase):
|
||||||
|
|
||||||
# ensure that refcounting is correct with some error conditions
|
# ensure that refcounting is correct with some error conditions
|
||||||
with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
|
with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
|
||||||
self.open(FakePath(support.TESTFN), 'rwxa')
|
self.open(FakePath(os_helper.TESTFN), 'rwxa')
|
||||||
|
|
||||||
def test_RawIOBase_readall(self):
|
def test_RawIOBase_readall(self):
|
||||||
# Exercise the default unlimited RawIOBase.read() and readall()
|
# Exercise the default unlimited RawIOBase.read() and readall()
|
||||||
|
@ -1454,9 +1457,9 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
|
||||||
l = list(range(256)) * N
|
l = list(range(256)) * N
|
||||||
random.shuffle(l)
|
random.shuffle(l)
|
||||||
s = bytes(bytearray(l))
|
s = bytes(bytearray(l))
|
||||||
with self.open(support.TESTFN, "wb") as f:
|
with self.open(os_helper.TESTFN, "wb") as f:
|
||||||
f.write(s)
|
f.write(s)
|
||||||
with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
|
with self.open(os_helper.TESTFN, self.read_mode, buffering=0) as raw:
|
||||||
bufio = self.tp(raw, 8)
|
bufio = self.tp(raw, 8)
|
||||||
errors = []
|
errors = []
|
||||||
results = []
|
results = []
|
||||||
|
@ -1482,7 +1485,7 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
|
||||||
c = bytes(bytearray([i]))
|
c = bytes(bytearray([i]))
|
||||||
self.assertEqual(s.count(c), N)
|
self.assertEqual(s.count(c), N)
|
||||||
finally:
|
finally:
|
||||||
support.unlink(support.TESTFN)
|
os_helper.unlink(os_helper.TESTFN)
|
||||||
|
|
||||||
def test_unseekable(self):
|
def test_unseekable(self):
|
||||||
bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
|
bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
|
||||||
|
@ -1572,9 +1575,9 @@ class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
|
||||||
def test_garbage_collection(self):
|
def test_garbage_collection(self):
|
||||||
# C BufferedReader objects are collected.
|
# C BufferedReader objects are collected.
|
||||||
# The Python version has __del__, so it ends into gc.garbage instead
|
# The Python version has __del__, so it ends into gc.garbage instead
|
||||||
self.addCleanup(support.unlink, support.TESTFN)
|
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
|
||||||
with support.check_warnings(('', ResourceWarning)):
|
with warnings_helper.check_warnings(('', ResourceWarning)):
|
||||||
rawio = self.FileIO(support.TESTFN, "w+b")
|
rawio = self.FileIO(os_helper.TESTFN, "w+b")
|
||||||
f = self.tp(rawio)
|
f = self.tp(rawio)
|
||||||
f.f = f
|
f.f = f
|
||||||
wr = weakref.ref(f)
|
wr = weakref.ref(f)
|
||||||
|
@ -1791,26 +1794,26 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
|
||||||
|
|
||||||
def test_truncate(self):
|
def test_truncate(self):
|
||||||
# Truncate implicitly flushes the buffer.
|
# Truncate implicitly flushes the buffer.
|
||||||
self.addCleanup(support.unlink, support.TESTFN)
|
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
|
||||||
with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
|
with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
|
||||||
bufio = self.tp(raw, 8)
|
bufio = self.tp(raw, 8)
|
||||||
bufio.write(b"abcdef")
|
bufio.write(b"abcdef")
|
||||||
self.assertEqual(bufio.truncate(3), 3)
|
self.assertEqual(bufio.truncate(3), 3)
|
||||||
self.assertEqual(bufio.tell(), 6)
|
self.assertEqual(bufio.tell(), 6)
|
||||||
with self.open(support.TESTFN, "rb", buffering=0) as f:
|
with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
|
||||||
self.assertEqual(f.read(), b"abc")
|
self.assertEqual(f.read(), b"abc")
|
||||||
|
|
||||||
def test_truncate_after_write(self):
|
def test_truncate_after_write(self):
|
||||||
# Ensure that truncate preserves the file position after
|
# Ensure that truncate preserves the file position after
|
||||||
# writes longer than the buffer size.
|
# writes longer than the buffer size.
|
||||||
# Issue: https://bugs.python.org/issue32228
|
# Issue: https://bugs.python.org/issue32228
|
||||||
self.addCleanup(support.unlink, support.TESTFN)
|
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
|
||||||
with self.open(support.TESTFN, "wb") as f:
|
with self.open(os_helper.TESTFN, "wb") as f:
|
||||||
# Fill with some buffer
|
# Fill with some buffer
|
||||||
f.write(b'\x00' * 10000)
|
f.write(b'\x00' * 10000)
|
||||||
buffer_sizes = [8192, 4096, 200]
|
buffer_sizes = [8192, 4096, 200]
|
||||||
for buffer_size in buffer_sizes:
|
for buffer_size in buffer_sizes:
|
||||||
with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
|
with self.open(os_helper.TESTFN, "r+b", buffering=buffer_size) as f:
|
||||||
f.write(b'\x00' * (buffer_size + 1))
|
f.write(b'\x00' * (buffer_size + 1))
|
||||||
# After write write_pos and write_end are set to 0
|
# After write write_pos and write_end are set to 0
|
||||||
f.read(1)
|
f.read(1)
|
||||||
|
@ -1838,7 +1841,7 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
|
||||||
# writing the buffer to the raw streams. This is in addition
|
# writing the buffer to the raw streams. This is in addition
|
||||||
# to concurrency issues due to switching threads in the middle
|
# to concurrency issues due to switching threads in the middle
|
||||||
# of Python code.
|
# of Python code.
|
||||||
with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
|
with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
|
||||||
bufio = self.tp(raw, 8)
|
bufio = self.tp(raw, 8)
|
||||||
errors = []
|
errors = []
|
||||||
def f():
|
def f():
|
||||||
|
@ -1858,12 +1861,12 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
|
||||||
self.assertFalse(errors,
|
self.assertFalse(errors,
|
||||||
"the following exceptions were caught: %r" % errors)
|
"the following exceptions were caught: %r" % errors)
|
||||||
bufio.close()
|
bufio.close()
|
||||||
with self.open(support.TESTFN, "rb") as f:
|
with self.open(os_helper.TESTFN, "rb") as f:
|
||||||
s = f.read()
|
s = f.read()
|
||||||
for i in range(256):
|
for i in range(256):
|
||||||
self.assertEqual(s.count(bytes([i])), N)
|
self.assertEqual(s.count(bytes([i])), N)
|
||||||
finally:
|
finally:
|
||||||
support.unlink(support.TESTFN)
|
os_helper.unlink(os_helper.TESTFN)
|
||||||
|
|
||||||
def test_misbehaved_io(self):
|
def test_misbehaved_io(self):
|
||||||
rawio = self.MisbehavedRawIO()
|
rawio = self.MisbehavedRawIO()
|
||||||
|
@ -1931,9 +1934,9 @@ class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
|
||||||
# C BufferedWriter objects are collected, and collecting them flushes
|
# C BufferedWriter objects are collected, and collecting them flushes
|
||||||
# all data to disk.
|
# all data to disk.
|
||||||
# The Python version has __del__, so it ends into gc.garbage instead
|
# The Python version has __del__, so it ends into gc.garbage instead
|
||||||
self.addCleanup(support.unlink, support.TESTFN)
|
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
|
||||||
with support.check_warnings(('', ResourceWarning)):
|
with warnings_helper.check_warnings(('', ResourceWarning)):
|
||||||
rawio = self.FileIO(support.TESTFN, "w+b")
|
rawio = self.FileIO(os_helper.TESTFN, "w+b")
|
||||||
f = self.tp(rawio)
|
f = self.tp(rawio)
|
||||||
f.write(b"123xxx")
|
f.write(b"123xxx")
|
||||||
f.x = f
|
f.x = f
|
||||||
|
@ -1941,7 +1944,7 @@ class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
|
||||||
del f
|
del f
|
||||||
support.gc_collect()
|
support.gc_collect()
|
||||||
self.assertIsNone(wr(), wr)
|
self.assertIsNone(wr(), wr)
|
||||||
with self.open(support.TESTFN, "rb") as f:
|
with self.open(os_helper.TESTFN, "rb") as f:
|
||||||
self.assertEqual(f.read(), b"123xxx")
|
self.assertEqual(f.read(), b"123xxx")
|
||||||
|
|
||||||
def test_args_error(self):
|
def test_args_error(self):
|
||||||
|
@ -2579,10 +2582,10 @@ class TextIOWrapperTest(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
|
self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
|
||||||
self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
|
self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
|
||||||
support.unlink(support.TESTFN)
|
os_helper.unlink(os_helper.TESTFN)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
support.unlink(support.TESTFN)
|
os_helper.unlink(os_helper.TESTFN)
|
||||||
|
|
||||||
def test_constructor(self):
|
def test_constructor(self):
|
||||||
r = self.BytesIO(b"\xc3\xa9\n\n")
|
r = self.BytesIO(b"\xc3\xa9\n\n")
|
||||||
|
@ -2918,11 +2921,11 @@ class TextIOWrapperTest(unittest.TestCase):
|
||||||
def test_basic_io(self):
|
def test_basic_io(self):
|
||||||
for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
|
for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
|
||||||
for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
|
for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
|
||||||
f = self.open(support.TESTFN, "w+", encoding=enc)
|
f = self.open(os_helper.TESTFN, "w+", encoding=enc)
|
||||||
f._CHUNK_SIZE = chunksize
|
f._CHUNK_SIZE = chunksize
|
||||||
self.assertEqual(f.write("abc"), 3)
|
self.assertEqual(f.write("abc"), 3)
|
||||||
f.close()
|
f.close()
|
||||||
f = self.open(support.TESTFN, "r+", encoding=enc)
|
f = self.open(os_helper.TESTFN, "r+", encoding=enc)
|
||||||
f._CHUNK_SIZE = chunksize
|
f._CHUNK_SIZE = chunksize
|
||||||
self.assertEqual(f.tell(), 0)
|
self.assertEqual(f.tell(), 0)
|
||||||
self.assertEqual(f.read(), "abc")
|
self.assertEqual(f.read(), "abc")
|
||||||
|
@ -2967,7 +2970,7 @@ class TextIOWrapperTest(unittest.TestCase):
|
||||||
self.assertEqual(rlines, wlines)
|
self.assertEqual(rlines, wlines)
|
||||||
|
|
||||||
def test_telling(self):
|
def test_telling(self):
|
||||||
f = self.open(support.TESTFN, "w+", encoding="utf-8")
|
f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
|
||||||
p0 = f.tell()
|
p0 = f.tell()
|
||||||
f.write("\xff\n")
|
f.write("\xff\n")
|
||||||
p1 = f.tell()
|
p1 = f.tell()
|
||||||
|
@ -2995,9 +2998,9 @@ class TextIOWrapperTest(unittest.TestCase):
|
||||||
u_suffix = "\u8888\n"
|
u_suffix = "\u8888\n"
|
||||||
suffix = bytes(u_suffix.encode("utf-8"))
|
suffix = bytes(u_suffix.encode("utf-8"))
|
||||||
line = prefix + suffix
|
line = prefix + suffix
|
||||||
with self.open(support.TESTFN, "wb") as f:
|
with self.open(os_helper.TESTFN, "wb") as f:
|
||||||
f.write(line*2)
|
f.write(line*2)
|
||||||
with self.open(support.TESTFN, "r", encoding="utf-8") as f:
|
with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
|
||||||
s = f.read(prefix_size)
|
s = f.read(prefix_size)
|
||||||
self.assertEqual(s, str(prefix, "ascii"))
|
self.assertEqual(s, str(prefix, "ascii"))
|
||||||
self.assertEqual(f.tell(), prefix_size)
|
self.assertEqual(f.tell(), prefix_size)
|
||||||
|
@ -3006,9 +3009,9 @@ class TextIOWrapperTest(unittest.TestCase):
|
||||||
def test_seeking_too(self):
|
def test_seeking_too(self):
|
||||||
# Regression test for a specific bug
|
# Regression test for a specific bug
|
||||||
data = b'\xe0\xbf\xbf\n'
|
data = b'\xe0\xbf\xbf\n'
|
||||||
with self.open(support.TESTFN, "wb") as f:
|
with self.open(os_helper.TESTFN, "wb") as f:
|
||||||
f.write(data)
|
f.write(data)
|
||||||
with self.open(support.TESTFN, "r", encoding="utf-8") as f:
|
with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
|
||||||
f._CHUNK_SIZE # Just test that it exists
|
f._CHUNK_SIZE # Just test that it exists
|
||||||
f._CHUNK_SIZE = 2
|
f._CHUNK_SIZE = 2
|
||||||
f.readline()
|
f.readline()
|
||||||
|
@ -3022,17 +3025,17 @@ class TextIOWrapperTest(unittest.TestCase):
|
||||||
def test_seek_and_tell_with_data(data, min_pos=0):
|
def test_seek_and_tell_with_data(data, min_pos=0):
|
||||||
"""Tell/seek to various points within a data stream and ensure
|
"""Tell/seek to various points within a data stream and ensure
|
||||||
that the decoded data returned by read() is consistent."""
|
that the decoded data returned by read() is consistent."""
|
||||||
f = self.open(support.TESTFN, 'wb')
|
f = self.open(os_helper.TESTFN, 'wb')
|
||||||
f.write(data)
|
f.write(data)
|
||||||
f.close()
|
f.close()
|
||||||
f = self.open(support.TESTFN, encoding='test_decoder')
|
f = self.open(os_helper.TESTFN, encoding='test_decoder')
|
||||||
f._CHUNK_SIZE = CHUNK_SIZE
|
f._CHUNK_SIZE = CHUNK_SIZE
|
||||||
decoded = f.read()
|
decoded = f.read()
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
for i in range(min_pos, len(decoded) + 1): # seek positions
|
for i in range(min_pos, len(decoded) + 1): # seek positions
|
||||||
for j in [1, 5, len(decoded) - i]: # read lengths
|
for j in [1, 5, len(decoded) - i]: # read lengths
|
||||||
f = self.open(support.TESTFN, encoding='test_decoder')
|
f = self.open(os_helper.TESTFN, encoding='test_decoder')
|
||||||
self.assertEqual(f.read(i), decoded[:i])
|
self.assertEqual(f.read(i), decoded[:i])
|
||||||
cookie = f.tell()
|
cookie = f.tell()
|
||||||
self.assertEqual(f.read(j), decoded[i:i + j])
|
self.assertEqual(f.read(j), decoded[i:i + j])
|
||||||
|
@ -3062,11 +3065,11 @@ class TextIOWrapperTest(unittest.TestCase):
|
||||||
StatefulIncrementalDecoder.codecEnabled = 0
|
StatefulIncrementalDecoder.codecEnabled = 0
|
||||||
|
|
||||||
def test_multibyte_seek_and_tell(self):
|
def test_multibyte_seek_and_tell(self):
|
||||||
f = self.open(support.TESTFN, "w", encoding="euc_jp")
|
f = self.open(os_helper.TESTFN, "w", encoding="euc_jp")
|
||||||
f.write("AB\n\u3046\u3048\n")
|
f.write("AB\n\u3046\u3048\n")
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
f = self.open(support.TESTFN, "r", encoding="euc_jp")
|
f = self.open(os_helper.TESTFN, "r", encoding="euc_jp")
|
||||||
self.assertEqual(f.readline(), "AB\n")
|
self.assertEqual(f.readline(), "AB\n")
|
||||||
p0 = f.tell()
|
p0 = f.tell()
|
||||||
self.assertEqual(f.readline(), "\u3046\u3048\n")
|
self.assertEqual(f.readline(), "\u3046\u3048\n")
|
||||||
|
@ -3077,7 +3080,7 @@ class TextIOWrapperTest(unittest.TestCase):
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
def test_seek_with_encoder_state(self):
|
def test_seek_with_encoder_state(self):
|
||||||
f = self.open(support.TESTFN, "w", encoding="euc_jis_2004")
|
f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004")
|
||||||
f.write("\u00e6\u0300")
|
f.write("\u00e6\u0300")
|
||||||
p0 = f.tell()
|
p0 = f.tell()
|
||||||
f.write("\u00e6")
|
f.write("\u00e6")
|
||||||
|
@ -3085,7 +3088,7 @@ class TextIOWrapperTest(unittest.TestCase):
|
||||||
f.write("\u0300")
|
f.write("\u0300")
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
f = self.open(support.TESTFN, "r", encoding="euc_jis_2004")
|
f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004")
|
||||||
self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
|
self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
|
@ -3229,7 +3232,7 @@ class TextIOWrapperTest(unittest.TestCase):
|
||||||
|
|
||||||
def test_append_bom(self):
|
def test_append_bom(self):
|
||||||
# The BOM is not written again when appending to a non-empty file
|
# The BOM is not written again when appending to a non-empty file
|
||||||
filename = support.TESTFN
|
filename = os_helper.TESTFN
|
||||||
for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
|
for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
|
||||||
with self.open(filename, 'w', encoding=charset) as f:
|
with self.open(filename, 'w', encoding=charset) as f:
|
||||||
f.write('aaa')
|
f.write('aaa')
|
||||||
|
@ -3244,7 +3247,7 @@ class TextIOWrapperTest(unittest.TestCase):
|
||||||
|
|
||||||
def test_seek_bom(self):
|
def test_seek_bom(self):
|
||||||
# Same test, but when seeking manually
|
# Same test, but when seeking manually
|
||||||
filename = support.TESTFN
|
filename = os_helper.TESTFN
|
||||||
for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
|
for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
|
||||||
with self.open(filename, 'w', encoding=charset) as f:
|
with self.open(filename, 'w', encoding=charset) as f:
|
||||||
f.write('aaa')
|
f.write('aaa')
|
||||||
|
@ -3259,7 +3262,7 @@ class TextIOWrapperTest(unittest.TestCase):
|
||||||
|
|
||||||
def test_seek_append_bom(self):
|
def test_seek_append_bom(self):
|
||||||
# Same test, but first seek to the start and then to the end
|
# Same test, but first seek to the start and then to the end
|
||||||
filename = support.TESTFN
|
filename = os_helper.TESTFN
|
||||||
for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
|
for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
|
||||||
with self.open(filename, 'w', encoding=charset) as f:
|
with self.open(filename, 'w', encoding=charset) as f:
|
||||||
f.write('aaa')
|
f.write('aaa')
|
||||||
|
@ -3271,16 +3274,16 @@ class TextIOWrapperTest(unittest.TestCase):
|
||||||
self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
|
self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
|
||||||
|
|
||||||
def test_errors_property(self):
|
def test_errors_property(self):
|
||||||
with self.open(support.TESTFN, "w") as f:
|
with self.open(os_helper.TESTFN, "w") as f:
|
||||||
self.assertEqual(f.errors, "strict")
|
self.assertEqual(f.errors, "strict")
|
||||||
with self.open(support.TESTFN, "w", errors="replace") as f:
|
with self.open(os_helper.TESTFN, "w", errors="replace") as f:
|
||||||
self.assertEqual(f.errors, "replace")
|
self.assertEqual(f.errors, "replace")
|
||||||
|
|
||||||
@support.no_tracing
|
@support.no_tracing
|
||||||
def test_threads_write(self):
|
def test_threads_write(self):
|
||||||
# Issue6750: concurrent writes could duplicate data
|
# Issue6750: concurrent writes could duplicate data
|
||||||
event = threading.Event()
|
event = threading.Event()
|
||||||
with self.open(support.TESTFN, "w", buffering=1) as f:
|
with self.open(os_helper.TESTFN, "w", buffering=1) as f:
|
||||||
def run(n):
|
def run(n):
|
||||||
text = "Thread%03d\n" % n
|
text = "Thread%03d\n" % n
|
||||||
event.wait()
|
event.wait()
|
||||||
|
@ -3289,7 +3292,7 @@ class TextIOWrapperTest(unittest.TestCase):
|
||||||
for x in range(20)]
|
for x in range(20)]
|
||||||
with threading_helper.start_threads(threads, event.set):
|
with threading_helper.start_threads(threads, event.set):
|
||||||
time.sleep(0.02)
|
time.sleep(0.02)
|
||||||
with self.open(support.TESTFN) as f:
|
with self.open(os_helper.TESTFN) as f:
|
||||||
content = f.read()
|
content = f.read()
|
||||||
for n in range(20):
|
for n in range(20):
|
||||||
self.assertEqual(content.count("Thread%03d\n" % n), 1)
|
self.assertEqual(content.count("Thread%03d\n" % n), 1)
|
||||||
|
@ -3732,8 +3735,8 @@ class CTextIOWrapperTest(TextIOWrapperTest):
|
||||||
# C TextIOWrapper objects are collected, and collecting them flushes
|
# C TextIOWrapper objects are collected, and collecting them flushes
|
||||||
# all data to disk.
|
# all data to disk.
|
||||||
# The Python version has __del__, so it ends in gc.garbage instead.
|
# The Python version has __del__, so it ends in gc.garbage instead.
|
||||||
with support.check_warnings(('', ResourceWarning)):
|
with warnings_helper.check_warnings(('', ResourceWarning)):
|
||||||
rawio = io.FileIO(support.TESTFN, "wb")
|
rawio = io.FileIO(os_helper.TESTFN, "wb")
|
||||||
b = self.BufferedWriter(rawio)
|
b = self.BufferedWriter(rawio)
|
||||||
t = self.TextIOWrapper(b, encoding="ascii")
|
t = self.TextIOWrapper(b, encoding="ascii")
|
||||||
t.write("456def")
|
t.write("456def")
|
||||||
|
@ -3742,7 +3745,7 @@ class CTextIOWrapperTest(TextIOWrapperTest):
|
||||||
del t
|
del t
|
||||||
support.gc_collect()
|
support.gc_collect()
|
||||||
self.assertIsNone(wr(), wr)
|
self.assertIsNone(wr(), wr)
|
||||||
with self.open(support.TESTFN, "rb") as f:
|
with self.open(os_helper.TESTFN, "rb") as f:
|
||||||
self.assertEqual(f.read(), b"456def")
|
self.assertEqual(f.read(), b"456def")
|
||||||
|
|
||||||
def test_rwpair_cleared_before_textio(self):
|
def test_rwpair_cleared_before_textio(self):
|
||||||
|
@ -3899,7 +3902,7 @@ class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
|
||||||
class MiscIOTest(unittest.TestCase):
|
class MiscIOTest(unittest.TestCase):
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
support.unlink(support.TESTFN)
|
os_helper.unlink(os_helper.TESTFN)
|
||||||
|
|
||||||
def test___all__(self):
|
def test___all__(self):
|
||||||
for name in self.io.__all__:
|
for name in self.io.__all__:
|
||||||
|
@ -3913,21 +3916,21 @@ class MiscIOTest(unittest.TestCase):
|
||||||
self.assertTrue(issubclass(obj, self.IOBase))
|
self.assertTrue(issubclass(obj, self.IOBase))
|
||||||
|
|
||||||
def test_attributes(self):
|
def test_attributes(self):
|
||||||
f = self.open(support.TESTFN, "wb", buffering=0)
|
f = self.open(os_helper.TESTFN, "wb", buffering=0)
|
||||||
self.assertEqual(f.mode, "wb")
|
self.assertEqual(f.mode, "wb")
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
with support.check_warnings(('', DeprecationWarning)):
|
with warnings_helper.check_warnings(('', DeprecationWarning)):
|
||||||
f = self.open(support.TESTFN, "U")
|
f = self.open(os_helper.TESTFN, "U")
|
||||||
self.assertEqual(f.name, support.TESTFN)
|
self.assertEqual(f.name, os_helper.TESTFN)
|
||||||
self.assertEqual(f.buffer.name, support.TESTFN)
|
self.assertEqual(f.buffer.name, os_helper.TESTFN)
|
||||||
self.assertEqual(f.buffer.raw.name, support.TESTFN)
|
self.assertEqual(f.buffer.raw.name, os_helper.TESTFN)
|
||||||
self.assertEqual(f.mode, "U")
|
self.assertEqual(f.mode, "U")
|
||||||
self.assertEqual(f.buffer.mode, "rb")
|
self.assertEqual(f.buffer.mode, "rb")
|
||||||
self.assertEqual(f.buffer.raw.mode, "rb")
|
self.assertEqual(f.buffer.raw.mode, "rb")
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
f = self.open(support.TESTFN, "w+")
|
f = self.open(os_helper.TESTFN, "w+")
|
||||||
self.assertEqual(f.mode, "w+")
|
self.assertEqual(f.mode, "w+")
|
||||||
self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
|
self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
|
||||||
self.assertEqual(f.buffer.raw.mode, "rb+")
|
self.assertEqual(f.buffer.raw.mode, "rb+")
|
||||||
|
@ -3969,7 +3972,7 @@ class MiscIOTest(unittest.TestCase):
|
||||||
{"mode": "w+", "buffering": 2},
|
{"mode": "w+", "buffering": 2},
|
||||||
{"mode": "w+b", "buffering": 0},
|
{"mode": "w+b", "buffering": 0},
|
||||||
]:
|
]:
|
||||||
f = self.open(support.TESTFN, **kwargs)
|
f = self.open(os_helper.TESTFN, **kwargs)
|
||||||
f.close()
|
f.close()
|
||||||
self.assertRaises(ValueError, f.flush)
|
self.assertRaises(ValueError, f.flush)
|
||||||
self.assertRaises(ValueError, f.fileno)
|
self.assertRaises(ValueError, f.fileno)
|
||||||
|
@ -4019,17 +4022,17 @@ class MiscIOTest(unittest.TestCase):
|
||||||
self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
|
self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
|
||||||
|
|
||||||
def _check_abc_inheritance(self, abcmodule):
|
def _check_abc_inheritance(self, abcmodule):
|
||||||
with self.open(support.TESTFN, "wb", buffering=0) as f:
|
with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
|
||||||
self.assertIsInstance(f, abcmodule.IOBase)
|
self.assertIsInstance(f, abcmodule.IOBase)
|
||||||
self.assertIsInstance(f, abcmodule.RawIOBase)
|
self.assertIsInstance(f, abcmodule.RawIOBase)
|
||||||
self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
|
self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
|
||||||
self.assertNotIsInstance(f, abcmodule.TextIOBase)
|
self.assertNotIsInstance(f, abcmodule.TextIOBase)
|
||||||
with self.open(support.TESTFN, "wb") as f:
|
with self.open(os_helper.TESTFN, "wb") as f:
|
||||||
self.assertIsInstance(f, abcmodule.IOBase)
|
self.assertIsInstance(f, abcmodule.IOBase)
|
||||||
self.assertNotIsInstance(f, abcmodule.RawIOBase)
|
self.assertNotIsInstance(f, abcmodule.RawIOBase)
|
||||||
self.assertIsInstance(f, abcmodule.BufferedIOBase)
|
self.assertIsInstance(f, abcmodule.BufferedIOBase)
|
||||||
self.assertNotIsInstance(f, abcmodule.TextIOBase)
|
self.assertNotIsInstance(f, abcmodule.TextIOBase)
|
||||||
with self.open(support.TESTFN, "w") as f:
|
with self.open(os_helper.TESTFN, "w") as f:
|
||||||
self.assertIsInstance(f, abcmodule.IOBase)
|
self.assertIsInstance(f, abcmodule.IOBase)
|
||||||
self.assertNotIsInstance(f, abcmodule.RawIOBase)
|
self.assertNotIsInstance(f, abcmodule.RawIOBase)
|
||||||
self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
|
self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
|
||||||
|
@ -4053,9 +4056,9 @@ class MiscIOTest(unittest.TestCase):
|
||||||
self.assertIn(r, str(cm.warning.args[0]))
|
self.assertIn(r, str(cm.warning.args[0]))
|
||||||
|
|
||||||
def test_warn_on_dealloc(self):
|
def test_warn_on_dealloc(self):
|
||||||
self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
|
self._check_warn_on_dealloc(os_helper.TESTFN, "wb", buffering=0)
|
||||||
self._check_warn_on_dealloc(support.TESTFN, "wb")
|
self._check_warn_on_dealloc(os_helper.TESTFN, "wb")
|
||||||
self._check_warn_on_dealloc(support.TESTFN, "w")
|
self._check_warn_on_dealloc(os_helper.TESTFN, "w")
|
||||||
|
|
||||||
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
|
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
|
||||||
fds = []
|
fds = []
|
||||||
|
@ -4073,7 +4076,7 @@ class MiscIOTest(unittest.TestCase):
|
||||||
# When using closefd=False, there's no warning
|
# When using closefd=False, there's no warning
|
||||||
r, w = os.pipe()
|
r, w = os.pipe()
|
||||||
fds += r, w
|
fds += r, w
|
||||||
with support.check_no_resource_warning(self):
|
with warnings_helper.check_no_resource_warning(self):
|
||||||
open(r, *args, closefd=False, **kwargs)
|
open(r, *args, closefd=False, **kwargs)
|
||||||
|
|
||||||
def test_warn_on_dealloc_fd(self):
|
def test_warn_on_dealloc_fd(self):
|
||||||
|
@ -4096,7 +4099,7 @@ class MiscIOTest(unittest.TestCase):
|
||||||
{"mode": "w+b", "buffering": 0},
|
{"mode": "w+b", "buffering": 0},
|
||||||
]:
|
]:
|
||||||
for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
|
for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
|
||||||
with self.open(support.TESTFN, **kwargs) as f:
|
with self.open(os_helper.TESTFN, **kwargs) as f:
|
||||||
self.assertRaises(TypeError, pickle.dumps, f, protocol)
|
self.assertRaises(TypeError, pickle.dumps, f, protocol)
|
||||||
|
|
||||||
def test_nonblock_pipe_write_bigbuf(self):
|
def test_nonblock_pipe_write_bigbuf(self):
|
||||||
|
@ -4159,20 +4162,20 @@ class MiscIOTest(unittest.TestCase):
|
||||||
|
|
||||||
def test_create_fail(self):
|
def test_create_fail(self):
|
||||||
# 'x' mode fails if file is existing
|
# 'x' mode fails if file is existing
|
||||||
with self.open(support.TESTFN, 'w'):
|
with self.open(os_helper.TESTFN, 'w'):
|
||||||
pass
|
pass
|
||||||
self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
|
self.assertRaises(FileExistsError, self.open, os_helper.TESTFN, 'x')
|
||||||
|
|
||||||
def test_create_writes(self):
|
def test_create_writes(self):
|
||||||
# 'x' mode opens for writing
|
# 'x' mode opens for writing
|
||||||
with self.open(support.TESTFN, 'xb') as f:
|
with self.open(os_helper.TESTFN, 'xb') as f:
|
||||||
f.write(b"spam")
|
f.write(b"spam")
|
||||||
with self.open(support.TESTFN, 'rb') as f:
|
with self.open(os_helper.TESTFN, 'rb') as f:
|
||||||
self.assertEqual(b"spam", f.read())
|
self.assertEqual(b"spam", f.read())
|
||||||
|
|
||||||
def test_open_allargs(self):
|
def test_open_allargs(self):
|
||||||
# there used to be a buffer overflow in the parser for rawmode
|
# there used to be a buffer overflow in the parser for rawmode
|
||||||
self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
|
self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'rwax+')
|
||||||
|
|
||||||
def test_check_encoding_errors(self):
|
def test_check_encoding_errors(self):
|
||||||
# bpo-37388: open() and TextIOWrapper must check encoding and errors
|
# bpo-37388: open() and TextIOWrapper must check encoding and errors
|
||||||
|
@ -4427,7 +4430,7 @@ class SignalsTest(unittest.TestCase):
|
||||||
"""Check that a buffered write, when it gets interrupted (either
|
"""Check that a buffered write, when it gets interrupted (either
|
||||||
returning a partial result or EINTR), properly invokes the signal
|
returning a partial result or EINTR), properly invokes the signal
|
||||||
handler and retries if the latter returned successfully."""
|
handler and retries if the latter returned successfully."""
|
||||||
select = support.import_module("select")
|
select = import_helper.import_module("select")
|
||||||
|
|
||||||
# A quantity that exceeds the buffer size of an anonymous pipe's
|
# A quantity that exceeds the buffer size of an anonymous pipe's
|
||||||
# write end.
|
# write end.
|
||||||
|
|
|
@ -4,10 +4,12 @@ import doctest
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import import_helper
|
||||||
|
|
||||||
|
|
||||||
# import json with and without accelerations
|
# import json with and without accelerations
|
||||||
cjson = support.import_fresh_module('json', fresh=['_json'])
|
cjson = import_helper.import_fresh_module('json', fresh=['_json'])
|
||||||
pyjson = support.import_fresh_module('json', blocked=['_json'])
|
pyjson = import_helper.import_fresh_module('json', blocked=['_json'])
|
||||||
# JSONDecodeError is cached inside the _json module
|
# JSONDecodeError is cached inside the _json module
|
||||||
cjson.JSONDecodeError = cjson.decoder.JSONDecodeError = json.JSONDecodeError
|
cjson.JSONDecodeError = cjson.decoder.JSONDecodeError = json.JSONDecodeError
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ import unittest
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import os_helper
|
||||||
from test.support.script_helper import assert_python_ok
|
from test.support.script_helper import assert_python_ok
|
||||||
|
|
||||||
|
|
||||||
|
@ -91,7 +92,7 @@ class TestTool(unittest.TestCase):
|
||||||
self.assertEqual(process.stderr, '')
|
self.assertEqual(process.stderr, '')
|
||||||
|
|
||||||
def _create_infile(self, data=None):
|
def _create_infile(self, data=None):
|
||||||
infile = support.TESTFN
|
infile = os_helper.TESTFN
|
||||||
with open(infile, "w", encoding="utf-8") as fp:
|
with open(infile, "w", encoding="utf-8") as fp:
|
||||||
self.addCleanup(os.remove, infile)
|
self.addCleanup(os.remove, infile)
|
||||||
fp.write(data or self.data)
|
fp.write(data or self.data)
|
||||||
|
@ -121,7 +122,7 @@ class TestTool(unittest.TestCase):
|
||||||
|
|
||||||
def test_infile_outfile(self):
|
def test_infile_outfile(self):
|
||||||
infile = self._create_infile()
|
infile = self._create_infile()
|
||||||
outfile = support.TESTFN + '.out'
|
outfile = os_helper.TESTFN + '.out'
|
||||||
rc, out, err = assert_python_ok('-m', 'json.tool', infile, outfile)
|
rc, out, err = assert_python_ok('-m', 'json.tool', infile, outfile)
|
||||||
self.addCleanup(os.remove, outfile)
|
self.addCleanup(os.remove, outfile)
|
||||||
with open(outfile, "r") as fp:
|
with open(outfile, "r") as fp:
|
||||||
|
@ -189,7 +190,7 @@ class TestTool(unittest.TestCase):
|
||||||
|
|
||||||
def test_no_ensure_ascii_flag(self):
|
def test_no_ensure_ascii_flag(self):
|
||||||
infile = self._create_infile('{"key":"💩"}')
|
infile = self._create_infile('{"key":"💩"}')
|
||||||
outfile = support.TESTFN + '.out'
|
outfile = os_helper.TESTFN + '.out'
|
||||||
self.addCleanup(os.remove, outfile)
|
self.addCleanup(os.remove, outfile)
|
||||||
assert_python_ok('-m', 'json.tool', '--no-ensure-ascii', infile, outfile)
|
assert_python_ok('-m', 'json.tool', '--no-ensure-ascii', infile, outfile)
|
||||||
with open(outfile, "rb") as f:
|
with open(outfile, "rb") as f:
|
||||||
|
@ -200,7 +201,7 @@ class TestTool(unittest.TestCase):
|
||||||
|
|
||||||
def test_ensure_ascii_default(self):
|
def test_ensure_ascii_default(self):
|
||||||
infile = self._create_infile('{"key":"💩"}')
|
infile = self._create_infile('{"key":"💩"}')
|
||||||
outfile = support.TESTFN + '.out'
|
outfile = os_helper.TESTFN + '.out'
|
||||||
self.addCleanup(os.remove, outfile)
|
self.addCleanup(os.remove, outfile)
|
||||||
assert_python_ok('-m', 'json.tool', infile, outfile)
|
assert_python_ok('-m', 'json.tool', infile, outfile)
|
||||||
with open(outfile, "rb") as f:
|
with open(outfile, "rb") as f:
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
import netrc, os, unittest, sys, tempfile, textwrap
|
import netrc, os, unittest, sys, tempfile, textwrap
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import os_helper
|
||||||
|
|
||||||
|
|
||||||
class NetrcTestCase(unittest.TestCase):
|
class NetrcTestCase(unittest.TestCase):
|
||||||
|
@ -108,16 +109,16 @@ class NetrcTestCase(unittest.TestCase):
|
||||||
def test_security(self):
|
def test_security(self):
|
||||||
# This test is incomplete since we are normally not run as root and
|
# This test is incomplete since we are normally not run as root and
|
||||||
# therefore can't test the file ownership being wrong.
|
# therefore can't test the file ownership being wrong.
|
||||||
d = support.TESTFN
|
d = os_helper.TESTFN
|
||||||
os.mkdir(d)
|
os.mkdir(d)
|
||||||
self.addCleanup(support.rmtree, d)
|
self.addCleanup(os_helper.rmtree, d)
|
||||||
fn = os.path.join(d, '.netrc')
|
fn = os.path.join(d, '.netrc')
|
||||||
with open(fn, 'wt') as f:
|
with open(fn, 'wt') as f:
|
||||||
f.write("""\
|
f.write("""\
|
||||||
machine foo.domain.com login bar password pass
|
machine foo.domain.com login bar password pass
|
||||||
default login foo password pass
|
default login foo password pass
|
||||||
""")
|
""")
|
||||||
with support.EnvironmentVarGuard() as environ:
|
with os_helper.EnvironmentVarGuard() as environ:
|
||||||
environ.set('HOME', d)
|
environ.set('HOME', d)
|
||||||
os.chmod(fn, 0o600)
|
os.chmod(fn, 0o600)
|
||||||
nrc = netrc.netrc()
|
nrc = netrc.netrc()
|
||||||
|
@ -127,10 +128,10 @@ class NetrcTestCase(unittest.TestCase):
|
||||||
self.assertRaises(netrc.NetrcParseError, netrc.netrc)
|
self.assertRaises(netrc.NetrcParseError, netrc.netrc)
|
||||||
|
|
||||||
def test_file_not_found_in_home(self):
|
def test_file_not_found_in_home(self):
|
||||||
d = support.TESTFN
|
d = os_helper.TESTFN
|
||||||
os.mkdir(d)
|
os.mkdir(d)
|
||||||
self.addCleanup(support.rmtree, d)
|
self.addCleanup(os_helper.rmtree, d)
|
||||||
with support.EnvironmentVarGuard() as environ:
|
with os_helper.EnvironmentVarGuard() as environ:
|
||||||
environ.set('HOME', d)
|
environ.set('HOME', d)
|
||||||
self.assertRaises(FileNotFoundError, netrc.netrc)
|
self.assertRaises(FileNotFoundError, netrc.netrc)
|
||||||
|
|
||||||
|
@ -139,9 +140,9 @@ class NetrcTestCase(unittest.TestCase):
|
||||||
file='unlikely_netrc')
|
file='unlikely_netrc')
|
||||||
|
|
||||||
def test_home_not_set(self):
|
def test_home_not_set(self):
|
||||||
fake_home = support.TESTFN
|
fake_home = os_helper.TESTFN
|
||||||
os.mkdir(fake_home)
|
os.mkdir(fake_home)
|
||||||
self.addCleanup(support.rmtree, fake_home)
|
self.addCleanup(os_helper.rmtree, fake_home)
|
||||||
fake_netrc_path = os.path.join(fake_home, '.netrc')
|
fake_netrc_path = os.path.join(fake_home, '.netrc')
|
||||||
with open(fake_netrc_path, 'w') as f:
|
with open(fake_netrc_path, 'w') as f:
|
||||||
f.write('machine foo.domain.com login bar password pass')
|
f.write('machine foo.domain.com login bar password pass')
|
||||||
|
@ -152,7 +153,7 @@ class NetrcTestCase(unittest.TestCase):
|
||||||
|
|
||||||
def fake_expanduser(s):
|
def fake_expanduser(s):
|
||||||
called.append(s)
|
called.append(s)
|
||||||
with support.EnvironmentVarGuard() as environ:
|
with os_helper.EnvironmentVarGuard() as environ:
|
||||||
environ.set('HOME', fake_home)
|
environ.set('HOME', fake_home)
|
||||||
environ.set('USERPROFILE', fake_home)
|
environ.set('USERPROFILE', fake_home)
|
||||||
result = orig_expanduser(s)
|
result = orig_expanduser(s)
|
||||||
|
|
|
@ -1,8 +1,10 @@
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import import_helper
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
|
|
||||||
# Skip test if nis module does not exist.
|
# Skip test if nis module does not exist.
|
||||||
nis = support.import_module('nis')
|
nis = import_helper.import_module('nis')
|
||||||
|
|
||||||
|
|
||||||
class NisTests(unittest.TestCase):
|
class NisTests(unittest.TestCase):
|
||||||
|
|
|
@ -10,6 +10,7 @@ import weakref
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import import_helper
|
||||||
|
|
||||||
from test.pickletester import AbstractHookTests
|
from test.pickletester import AbstractHookTests
|
||||||
from test.pickletester import AbstractUnpickleTests
|
from test.pickletester import AbstractUnpickleTests
|
||||||
|
@ -499,7 +500,7 @@ class CompatPickleTests(unittest.TestCase):
|
||||||
('builtins', name))
|
('builtins', name))
|
||||||
|
|
||||||
def test_multiprocessing_exceptions(self):
|
def test_multiprocessing_exceptions(self):
|
||||||
module = support.import_module('multiprocessing.context')
|
module = import_helper.import_module('multiprocessing.context')
|
||||||
for name, exc in get_exceptions(module):
|
for name, exc in get_exceptions(module):
|
||||||
with self.subTest(name):
|
with self.subTest(name):
|
||||||
self.assertEqual(reverse_mapping('multiprocessing.context', name),
|
self.assertEqual(reverse_mapping('multiprocessing.context', name),
|
||||||
|
|
|
@ -9,13 +9,14 @@ import tempfile
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import os_helper
|
||||||
|
|
||||||
|
|
||||||
def without_source_date_epoch(fxn):
|
def without_source_date_epoch(fxn):
|
||||||
"""Runs function with SOURCE_DATE_EPOCH unset."""
|
"""Runs function with SOURCE_DATE_EPOCH unset."""
|
||||||
@functools.wraps(fxn)
|
@functools.wraps(fxn)
|
||||||
def wrapper(*args, **kwargs):
|
def wrapper(*args, **kwargs):
|
||||||
with support.EnvironmentVarGuard() as env:
|
with os_helper.EnvironmentVarGuard() as env:
|
||||||
env.unset('SOURCE_DATE_EPOCH')
|
env.unset('SOURCE_DATE_EPOCH')
|
||||||
return fxn(*args, **kwargs)
|
return fxn(*args, **kwargs)
|
||||||
return wrapper
|
return wrapper
|
||||||
|
@ -25,7 +26,7 @@ def with_source_date_epoch(fxn):
|
||||||
"""Runs function with SOURCE_DATE_EPOCH set."""
|
"""Runs function with SOURCE_DATE_EPOCH set."""
|
||||||
@functools.wraps(fxn)
|
@functools.wraps(fxn)
|
||||||
def wrapper(*args, **kwargs):
|
def wrapper(*args, **kwargs):
|
||||||
with support.EnvironmentVarGuard() as env:
|
with os_helper.EnvironmentVarGuard() as env:
|
||||||
env['SOURCE_DATE_EPOCH'] = '123456789'
|
env['SOURCE_DATE_EPOCH'] = '123456789'
|
||||||
return fxn(*args, **kwargs)
|
return fxn(*args, **kwargs)
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
import test.support
|
import test.support
|
||||||
|
from test.support import import_helper
|
||||||
|
|
||||||
# Skip test if _sqlite3 module not installed
|
# Skip test if _sqlite3 module not installed
|
||||||
test.support.import_module('_sqlite3')
|
import_helper.import_module('_sqlite3')
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
import sqlite3
|
import sqlite3
|
||||||
|
|
|
@ -5,6 +5,8 @@ import unittest
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import os_helper
|
||||||
|
|
||||||
|
|
||||||
if not hasattr(sys.stdin, 'newlines'):
|
if not hasattr(sys.stdin, 'newlines'):
|
||||||
raise unittest.SkipTest(
|
raise unittest.SkipTest(
|
||||||
|
@ -46,29 +48,29 @@ class TestGenericUnivNewlines:
|
||||||
data = self.DATA
|
data = self.DATA
|
||||||
if "b" in self.WRITEMODE:
|
if "b" in self.WRITEMODE:
|
||||||
data = data.encode("ascii")
|
data = data.encode("ascii")
|
||||||
with self.open(support.TESTFN, self.WRITEMODE) as fp:
|
with self.open(os_helper.TESTFN, self.WRITEMODE) as fp:
|
||||||
fp.write(data)
|
fp.write(data)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
try:
|
try:
|
||||||
os.unlink(support.TESTFN)
|
os.unlink(os_helper.TESTFN)
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def test_read(self):
|
def test_read(self):
|
||||||
with self.open(support.TESTFN, self.READMODE) as fp:
|
with self.open(os_helper.TESTFN, self.READMODE) as fp:
|
||||||
data = fp.read()
|
data = fp.read()
|
||||||
self.assertEqual(data, DATA_LF)
|
self.assertEqual(data, DATA_LF)
|
||||||
self.assertEqual(repr(fp.newlines), repr(self.NEWLINE))
|
self.assertEqual(repr(fp.newlines), repr(self.NEWLINE))
|
||||||
|
|
||||||
def test_readlines(self):
|
def test_readlines(self):
|
||||||
with self.open(support.TESTFN, self.READMODE) as fp:
|
with self.open(os_helper.TESTFN, self.READMODE) as fp:
|
||||||
data = fp.readlines()
|
data = fp.readlines()
|
||||||
self.assertEqual(data, DATA_SPLIT)
|
self.assertEqual(data, DATA_SPLIT)
|
||||||
self.assertEqual(repr(fp.newlines), repr(self.NEWLINE))
|
self.assertEqual(repr(fp.newlines), repr(self.NEWLINE))
|
||||||
|
|
||||||
def test_readline(self):
|
def test_readline(self):
|
||||||
with self.open(support.TESTFN, self.READMODE) as fp:
|
with self.open(os_helper.TESTFN, self.READMODE) as fp:
|
||||||
data = []
|
data = []
|
||||||
d = fp.readline()
|
d = fp.readline()
|
||||||
while d:
|
while d:
|
||||||
|
@ -78,7 +80,7 @@ class TestGenericUnivNewlines:
|
||||||
self.assertEqual(repr(fp.newlines), repr(self.NEWLINE))
|
self.assertEqual(repr(fp.newlines), repr(self.NEWLINE))
|
||||||
|
|
||||||
def test_seek(self):
|
def test_seek(self):
|
||||||
with self.open(support.TESTFN, self.READMODE) as fp:
|
with self.open(os_helper.TESTFN, self.READMODE) as fp:
|
||||||
fp.readline()
|
fp.readline()
|
||||||
pos = fp.tell()
|
pos = fp.tell()
|
||||||
data = fp.readlines()
|
data = fp.readlines()
|
||||||
|
@ -105,7 +107,7 @@ class TestCRLFNewlines(TestGenericUnivNewlines):
|
||||||
DATA = DATA_CRLF
|
DATA = DATA_CRLF
|
||||||
|
|
||||||
def test_tell(self):
|
def test_tell(self):
|
||||||
with self.open(support.TESTFN, self.READMODE) as fp:
|
with self.open(os_helper.TESTFN, self.READMODE) as fp:
|
||||||
self.assertEqual(repr(fp.newlines), repr(None))
|
self.assertEqual(repr(fp.newlines), repr(None))
|
||||||
data = fp.readline()
|
data = fp.readline()
|
||||||
pos = fp.tell()
|
pos = fp.tell()
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
import unittest
|
import unittest
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import import_helper
|
||||||
import builtins
|
import builtins
|
||||||
import contextlib
|
import contextlib
|
||||||
import copy
|
import copy
|
||||||
|
@ -10,8 +11,8 @@ import sys
|
||||||
import weakref
|
import weakref
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
py_uuid = support.import_fresh_module('uuid', blocked=['_uuid'])
|
py_uuid = import_helper.import_fresh_module('uuid', blocked=['_uuid'])
|
||||||
c_uuid = support.import_fresh_module('uuid', fresh=['_uuid'])
|
c_uuid = import_helper.import_fresh_module('uuid', fresh=['_uuid'])
|
||||||
|
|
||||||
def importable(name):
|
def importable(name):
|
||||||
try:
|
try:
|
||||||
|
|
Loading…
Reference in New Issue