bpo-40275: Use new test.support helper submodules in tests (GH-21449)
This commit is contained in:
parent
488512bf49
commit
a7f5d93bb6
|
@ -1,5 +1,5 @@
|
||||||
import dis
|
import dis
|
||||||
from test.support import import_module
|
from test.support.import_helper import import_module
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
_opcode = import_module("_opcode")
|
_opcode = import_module("_opcode")
|
||||||
|
|
|
@ -10,8 +10,10 @@ import struct
|
||||||
import threading
|
import threading
|
||||||
|
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import os_helper
|
||||||
from test.support import socket_helper
|
from test.support import socket_helper
|
||||||
from test.support import threading_helper
|
from test.support import threading_helper
|
||||||
|
from test.support import warnings_helper
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
|
||||||
if support.PGO:
|
if support.PGO:
|
||||||
|
@ -92,7 +94,7 @@ def bind_af_aware(sock, addr):
|
||||||
"""Helper function to bind a socket according to its family."""
|
"""Helper function to bind a socket according to its family."""
|
||||||
if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX:
|
if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX:
|
||||||
# Make sure the path doesn't exist.
|
# Make sure the path doesn't exist.
|
||||||
support.unlink(addr)
|
os_helper.unlink(addr)
|
||||||
socket_helper.bind_unix_socket(sock, addr)
|
socket_helper.bind_unix_socket(sock, addr)
|
||||||
else:
|
else:
|
||||||
sock.bind(addr)
|
sock.bind(addr)
|
||||||
|
@ -369,14 +371,14 @@ class DispatcherWithSendTests(unittest.TestCase):
|
||||||
class FileWrapperTest(unittest.TestCase):
|
class FileWrapperTest(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.d = b"It's not dead, it's sleeping!"
|
self.d = b"It's not dead, it's sleeping!"
|
||||||
with open(support.TESTFN, 'wb') as file:
|
with open(os_helper.TESTFN, 'wb') as file:
|
||||||
file.write(self.d)
|
file.write(self.d)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
support.unlink(support.TESTFN)
|
os_helper.unlink(os_helper.TESTFN)
|
||||||
|
|
||||||
def test_recv(self):
|
def test_recv(self):
|
||||||
fd = os.open(support.TESTFN, os.O_RDONLY)
|
fd = os.open(os_helper.TESTFN, os.O_RDONLY)
|
||||||
w = asyncore.file_wrapper(fd)
|
w = asyncore.file_wrapper(fd)
|
||||||
os.close(fd)
|
os.close(fd)
|
||||||
|
|
||||||
|
@ -390,20 +392,20 @@ class FileWrapperTest(unittest.TestCase):
|
||||||
def test_send(self):
|
def test_send(self):
|
||||||
d1 = b"Come again?"
|
d1 = b"Come again?"
|
||||||
d2 = b"I want to buy some cheese."
|
d2 = b"I want to buy some cheese."
|
||||||
fd = os.open(support.TESTFN, os.O_WRONLY | os.O_APPEND)
|
fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_APPEND)
|
||||||
w = asyncore.file_wrapper(fd)
|
w = asyncore.file_wrapper(fd)
|
||||||
os.close(fd)
|
os.close(fd)
|
||||||
|
|
||||||
w.write(d1)
|
w.write(d1)
|
||||||
w.send(d2)
|
w.send(d2)
|
||||||
w.close()
|
w.close()
|
||||||
with open(support.TESTFN, 'rb') as file:
|
with open(os_helper.TESTFN, 'rb') as file:
|
||||||
self.assertEqual(file.read(), self.d + d1 + d2)
|
self.assertEqual(file.read(), self.d + d1 + d2)
|
||||||
|
|
||||||
@unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
|
@unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
|
||||||
'asyncore.file_dispatcher required')
|
'asyncore.file_dispatcher required')
|
||||||
def test_dispatcher(self):
|
def test_dispatcher(self):
|
||||||
fd = os.open(support.TESTFN, os.O_RDONLY)
|
fd = os.open(os_helper.TESTFN, os.O_RDONLY)
|
||||||
data = []
|
data = []
|
||||||
class FileDispatcher(asyncore.file_dispatcher):
|
class FileDispatcher(asyncore.file_dispatcher):
|
||||||
def handle_read(self):
|
def handle_read(self):
|
||||||
|
@ -415,16 +417,16 @@ class FileWrapperTest(unittest.TestCase):
|
||||||
|
|
||||||
def test_resource_warning(self):
|
def test_resource_warning(self):
|
||||||
# Issue #11453
|
# Issue #11453
|
||||||
fd = os.open(support.TESTFN, os.O_RDONLY)
|
fd = os.open(os_helper.TESTFN, os.O_RDONLY)
|
||||||
f = asyncore.file_wrapper(fd)
|
f = asyncore.file_wrapper(fd)
|
||||||
|
|
||||||
os.close(fd)
|
os.close(fd)
|
||||||
with support.check_warnings(('', ResourceWarning)):
|
with warnings_helper.check_warnings(('', ResourceWarning)):
|
||||||
f = None
|
f = None
|
||||||
support.gc_collect()
|
support.gc_collect()
|
||||||
|
|
||||||
def test_close_twice(self):
|
def test_close_twice(self):
|
||||||
fd = os.open(support.TESTFN, os.O_RDONLY)
|
fd = os.open(os_helper.TESTFN, os.O_RDONLY)
|
||||||
f = asyncore.file_wrapper(fd)
|
f = asyncore.file_wrapper(fd)
|
||||||
os.close(fd)
|
os.close(fd)
|
||||||
|
|
||||||
|
@ -804,10 +806,10 @@ class TestAPI_UseIPv6Sockets(BaseTestAPI):
|
||||||
class TestAPI_UseUnixSockets(BaseTestAPI):
|
class TestAPI_UseUnixSockets(BaseTestAPI):
|
||||||
if HAS_UNIX_SOCKETS:
|
if HAS_UNIX_SOCKETS:
|
||||||
family = socket.AF_UNIX
|
family = socket.AF_UNIX
|
||||||
addr = support.TESTFN
|
addr = os_helper.TESTFN
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
support.unlink(self.addr)
|
os_helper.unlink(self.addr)
|
||||||
BaseTestAPI.tearDown(self)
|
BaseTestAPI.tearDown(self)
|
||||||
|
|
||||||
class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase):
|
class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase):
|
||||||
|
|
|
@ -4,7 +4,8 @@ import unittest
|
||||||
import binascii
|
import binascii
|
||||||
import array
|
import array
|
||||||
import re
|
import re
|
||||||
from test import support
|
from test.support import warnings_helper
|
||||||
|
|
||||||
|
|
||||||
# Note: "*_hex" functions are aliases for "(un)hexlify"
|
# Note: "*_hex" functions are aliases for "(un)hexlify"
|
||||||
b2a_functions = ['b2a_base64', 'b2a_hex', 'b2a_hqx', 'b2a_qp', 'b2a_uu',
|
b2a_functions = ['b2a_base64', 'b2a_hex', 'b2a_hqx', 'b2a_qp', 'b2a_uu',
|
||||||
|
@ -37,7 +38,7 @@ class BinASCIITest(unittest.TestCase):
|
||||||
self.assertTrue(hasattr(getattr(binascii, name), '__call__'))
|
self.assertTrue(hasattr(getattr(binascii, name), '__call__'))
|
||||||
self.assertRaises(TypeError, getattr(binascii, name))
|
self.assertRaises(TypeError, getattr(binascii, name))
|
||||||
|
|
||||||
@support.ignore_warnings(category=DeprecationWarning)
|
@warnings_helper.ignore_warnings(category=DeprecationWarning)
|
||||||
def test_returned_value(self):
|
def test_returned_value(self):
|
||||||
# Limit to the minimum of all limits (b2a_uu)
|
# Limit to the minimum of all limits (b2a_uu)
|
||||||
MAX_ALL = 45
|
MAX_ALL = 45
|
||||||
|
@ -181,7 +182,7 @@ class BinASCIITest(unittest.TestCase):
|
||||||
with self.assertRaises(TypeError):
|
with self.assertRaises(TypeError):
|
||||||
binascii.b2a_uu(b"", True)
|
binascii.b2a_uu(b"", True)
|
||||||
|
|
||||||
@support.ignore_warnings(category=DeprecationWarning)
|
@warnings_helper.ignore_warnings(category=DeprecationWarning)
|
||||||
def test_crc_hqx(self):
|
def test_crc_hqx(self):
|
||||||
crc = binascii.crc_hqx(self.type2test(b"Test the CRC-32 of"), 0)
|
crc = binascii.crc_hqx(self.type2test(b"Test the CRC-32 of"), 0)
|
||||||
crc = binascii.crc_hqx(self.type2test(b" this string."), crc)
|
crc = binascii.crc_hqx(self.type2test(b" this string."), crc)
|
||||||
|
@ -201,7 +202,7 @@ class BinASCIITest(unittest.TestCase):
|
||||||
|
|
||||||
self.assertRaises(TypeError, binascii.crc32)
|
self.assertRaises(TypeError, binascii.crc32)
|
||||||
|
|
||||||
@support.ignore_warnings(category=DeprecationWarning)
|
@warnings_helper.ignore_warnings(category=DeprecationWarning)
|
||||||
def test_hqx(self):
|
def test_hqx(self):
|
||||||
# Perform binhex4 style RLE-compression
|
# Perform binhex4 style RLE-compression
|
||||||
# Then calculate the hexbin4 binary-to-ASCII translation
|
# Then calculate the hexbin4 binary-to-ASCII translation
|
||||||
|
@ -212,7 +213,7 @@ class BinASCIITest(unittest.TestCase):
|
||||||
res = binascii.rledecode_hqx(b)
|
res = binascii.rledecode_hqx(b)
|
||||||
self.assertEqual(res, self.rawdata)
|
self.assertEqual(res, self.rawdata)
|
||||||
|
|
||||||
@support.ignore_warnings(category=DeprecationWarning)
|
@warnings_helper.ignore_warnings(category=DeprecationWarning)
|
||||||
def test_rle(self):
|
def test_rle(self):
|
||||||
# test repetition with a repetition longer than the limit of 255
|
# test repetition with a repetition longer than the limit of 255
|
||||||
data = (b'a' * 100 + b'b' + b'c' * 300)
|
data = (b'a' * 100 + b'b' + b'c' * 300)
|
||||||
|
@ -359,7 +360,7 @@ class BinASCIITest(unittest.TestCase):
|
||||||
self.assertEqual(b2a_qp(type2test(b'a.\n')), b'a.\n')
|
self.assertEqual(b2a_qp(type2test(b'a.\n')), b'a.\n')
|
||||||
self.assertEqual(b2a_qp(type2test(b'.a')[:-1]), b'=2E')
|
self.assertEqual(b2a_qp(type2test(b'.a')[:-1]), b'=2E')
|
||||||
|
|
||||||
@support.ignore_warnings(category=DeprecationWarning)
|
@warnings_helper.ignore_warnings(category=DeprecationWarning)
|
||||||
def test_empty_string(self):
|
def test_empty_string(self):
|
||||||
# A test for SF bug #1022953. Make sure SystemError is not raised.
|
# A test for SF bug #1022953. Make sure SystemError is not raised.
|
||||||
empty = self.type2test(b'')
|
empty = self.type2test(b'')
|
||||||
|
@ -384,7 +385,7 @@ class BinASCIITest(unittest.TestCase):
|
||||||
# crc_hqx needs 2 arguments
|
# crc_hqx needs 2 arguments
|
||||||
self.assertRaises(TypeError, binascii.crc_hqx, "test", 0)
|
self.assertRaises(TypeError, binascii.crc_hqx, "test", 0)
|
||||||
|
|
||||||
@support.ignore_warnings(category=DeprecationWarning)
|
@warnings_helper.ignore_warnings(category=DeprecationWarning)
|
||||||
def test_unicode_a2b(self):
|
def test_unicode_a2b(self):
|
||||||
# Unicode strings are accepted by a2b_* functions.
|
# Unicode strings are accepted by a2b_* functions.
|
||||||
MAX_ALL = 45
|
MAX_ALL = 45
|
||||||
|
|
|
@ -1,10 +1,11 @@
|
||||||
import sys
|
import sys
|
||||||
import unittest
|
import unittest
|
||||||
from test import support
|
from test.support import import_helper
|
||||||
from collections import UserList
|
from collections import UserList
|
||||||
|
|
||||||
py_bisect = support.import_fresh_module('bisect', blocked=['_bisect'])
|
|
||||||
c_bisect = support.import_fresh_module('bisect', fresh=['_bisect'])
|
py_bisect = import_helper.import_fresh_module('bisect', blocked=['_bisect'])
|
||||||
|
c_bisect = import_helper.import_fresh_module('bisect', fresh=['_bisect'])
|
||||||
|
|
||||||
class Range(object):
|
class Range(object):
|
||||||
"""A trivial range()-like object that has an insert() method."""
|
"""A trivial range()-like object that has an insert() method."""
|
||||||
|
|
|
@ -26,10 +26,10 @@ from textwrap import dedent
|
||||||
from types import AsyncGeneratorType, FunctionType
|
from types import AsyncGeneratorType, FunctionType
|
||||||
from operator import neg
|
from operator import neg
|
||||||
from test import support
|
from test import support
|
||||||
from test.support import (
|
from test.support import (swap_attr, maybe_get_event_loop_policy)
|
||||||
EnvironmentVarGuard, TESTFN, check_warnings, swap_attr, unlink,
|
from test.support.os_helper import (EnvironmentVarGuard, TESTFN, unlink)
|
||||||
maybe_get_event_loop_policy)
|
|
||||||
from test.support.script_helper import assert_python_ok
|
from test.support.script_helper import assert_python_ok
|
||||||
|
from test.support.warnings_helper import check_warnings
|
||||||
from unittest.mock import MagicMock, patch
|
from unittest.mock import MagicMock, patch
|
||||||
try:
|
try:
|
||||||
import pty, signal
|
import pty, signal
|
||||||
|
|
|
@ -1,8 +1,9 @@
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import import_helper
|
||||||
from test.support import threading_helper
|
from test.support import threading_helper
|
||||||
|
|
||||||
# Skip tests if _multiprocessing wasn't built.
|
# Skip tests if _multiprocessing wasn't built.
|
||||||
support.import_module('_multiprocessing')
|
import_helper.import_module('_multiprocessing')
|
||||||
# Skip tests if sem_open implementation is broken.
|
# Skip tests if sem_open implementation is broken.
|
||||||
support.skip_if_broken_multiprocessing_synchronize()
|
support.skip_if_broken_multiprocessing_synchronize()
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,7 @@ import unittest
|
||||||
import warnings
|
import warnings
|
||||||
|
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import os_helper
|
||||||
|
|
||||||
|
|
||||||
class SortedDict(collections.UserDict):
|
class SortedDict(collections.UserDict):
|
||||||
|
@ -1063,17 +1064,17 @@ class MultilineValuesTestCase(BasicTestCase, unittest.TestCase):
|
||||||
cf.add_section(s)
|
cf.add_section(s)
|
||||||
for j in range(10):
|
for j in range(10):
|
||||||
cf.set(s, 'lovely_spam{}'.format(j), self.wonderful_spam)
|
cf.set(s, 'lovely_spam{}'.format(j), self.wonderful_spam)
|
||||||
with open(support.TESTFN, 'w') as f:
|
with open(os_helper.TESTFN, 'w') as f:
|
||||||
cf.write(f)
|
cf.write(f)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
os.unlink(support.TESTFN)
|
os.unlink(os_helper.TESTFN)
|
||||||
|
|
||||||
def test_dominating_multiline_values(self):
|
def test_dominating_multiline_values(self):
|
||||||
# We're reading from file because this is where the code changed
|
# We're reading from file because this is where the code changed
|
||||||
# during performance updates in Python 3.2
|
# during performance updates in Python 3.2
|
||||||
cf_from_file = self.newconfig()
|
cf_from_file = self.newconfig()
|
||||||
with open(support.TESTFN) as f:
|
with open(os_helper.TESTFN) as f:
|
||||||
cf_from_file.read_file(f)
|
cf_from_file.read_file(f)
|
||||||
self.assertEqual(cf_from_file.get('section8', 'lovely_spam4'),
|
self.assertEqual(cf_from_file.get('section8', 'lovely_spam4'),
|
||||||
self.wonderful_spam.replace('\t\n', '\n'))
|
self.wonderful_spam.replace('\t\n', '\n'))
|
||||||
|
|
|
@ -7,6 +7,8 @@ import types
|
||||||
import unittest
|
import unittest
|
||||||
import warnings
|
import warnings
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import import_helper
|
||||||
|
from test.support import warnings_helper
|
||||||
from test.support.script_helper import assert_python_ok
|
from test.support.script_helper import assert_python_ok
|
||||||
|
|
||||||
|
|
||||||
|
@ -2117,7 +2119,7 @@ class CoroAsyncIOCompatTest(unittest.TestCase):
|
||||||
def test_asyncio_1(self):
|
def test_asyncio_1(self):
|
||||||
# asyncio cannot be imported when Python is compiled without thread
|
# asyncio cannot be imported when Python is compiled without thread
|
||||||
# support
|
# support
|
||||||
asyncio = support.import_module('asyncio')
|
asyncio = import_helper.import_module('asyncio')
|
||||||
|
|
||||||
class MyException(Exception):
|
class MyException(Exception):
|
||||||
pass
|
pass
|
||||||
|
@ -2258,7 +2260,8 @@ class OriginTrackingTest(unittest.TestCase):
|
||||||
try:
|
try:
|
||||||
warnings._warn_unawaited_coroutine = lambda coro: 1/0
|
warnings._warn_unawaited_coroutine = lambda coro: 1/0
|
||||||
with support.catch_unraisable_exception() as cm, \
|
with support.catch_unraisable_exception() as cm, \
|
||||||
support.check_warnings((r'coroutine .* was never awaited',
|
warnings_helper.check_warnings(
|
||||||
|
(r'coroutine .* was never awaited',
|
||||||
RuntimeWarning)):
|
RuntimeWarning)):
|
||||||
# only store repr() to avoid keeping the coroutine alive
|
# only store repr() to avoid keeping the coroutine alive
|
||||||
coro = corofn()
|
coro = corofn()
|
||||||
|
@ -2272,8 +2275,8 @@ class OriginTrackingTest(unittest.TestCase):
|
||||||
self.assertEqual(cm.unraisable.exc_type, ZeroDivisionError)
|
self.assertEqual(cm.unraisable.exc_type, ZeroDivisionError)
|
||||||
|
|
||||||
del warnings._warn_unawaited_coroutine
|
del warnings._warn_unawaited_coroutine
|
||||||
with support.check_warnings((r'coroutine .* was never awaited',
|
with warnings_helper.check_warnings(
|
||||||
RuntimeWarning)):
|
(r'coroutine .* was never awaited', RuntimeWarning)):
|
||||||
corofn()
|
corofn()
|
||||||
support.gc_collect()
|
support.gc_collect()
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,8 @@ import sys
|
||||||
import tempfile
|
import tempfile
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from test.support import requires, import_module, verbose, SaveSignals
|
from test.support import requires, verbose, SaveSignals
|
||||||
|
from test.support.import_helper import import_module
|
||||||
|
|
||||||
# Optionally test curses module. This currently requires that the
|
# Optionally test curses module. This currently requires that the
|
||||||
# 'curses' resource be given on the regrtest command line using the -u
|
# 'curses' resource be given on the regrtest command line using the -u
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
import unittest
|
import unittest
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
from test.support import import_fresh_module, run_unittest
|
from test.support import run_unittest
|
||||||
|
from test.support.import_helper import import_fresh_module
|
||||||
|
|
||||||
|
|
||||||
TESTS = 'test.datetimetester'
|
TESTS = 'test.datetimetester'
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
from test.support import (TESTFN, import_module, unlink,
|
from test.support import (requires, _2G, _4G, gc_collect, cpython_only)
|
||||||
requires, _2G, _4G, gc_collect, cpython_only)
|
from test.support.import_helper import import_module
|
||||||
|
from test.support.os_helper import TESTFN, unlink
|
||||||
import unittest
|
import unittest
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
|
|
@ -9,6 +9,7 @@ import sys
|
||||||
import time
|
import time
|
||||||
import unittest
|
import unittest
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import os_helper
|
||||||
from test.support.script_helper import assert_python_ok, spawn_python
|
from test.support.script_helper import assert_python_ok, spawn_python
|
||||||
try:
|
try:
|
||||||
import _testcapi
|
import _testcapi
|
||||||
|
@ -154,7 +155,7 @@ class WakeupFDTests(unittest.TestCase):
|
||||||
signal.set_wakeup_fd(signal.SIGINT, False)
|
signal.set_wakeup_fd(signal.SIGINT, False)
|
||||||
|
|
||||||
def test_invalid_fd(self):
|
def test_invalid_fd(self):
|
||||||
fd = support.make_bad_fd()
|
fd = os_helper.make_bad_fd()
|
||||||
self.assertRaises((ValueError, OSError),
|
self.assertRaises((ValueError, OSError),
|
||||||
signal.set_wakeup_fd, fd)
|
signal.set_wakeup_fd, fd)
|
||||||
|
|
||||||
|
|
|
@ -4,8 +4,11 @@ import sys
|
||||||
import unittest
|
import unittest
|
||||||
import unittest.mock
|
import unittest.mock
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import import_helper
|
||||||
|
from test.support import os_helper
|
||||||
from test.support import socket_helper
|
from test.support import socket_helper
|
||||||
from test.support import threading_helper
|
from test.support import threading_helper
|
||||||
|
from test.support import warnings_helper
|
||||||
import socket
|
import socket
|
||||||
import select
|
import select
|
||||||
import time
|
import time
|
||||||
|
@ -27,7 +30,7 @@ try:
|
||||||
except ImportError:
|
except ImportError:
|
||||||
ctypes = None
|
ctypes = None
|
||||||
|
|
||||||
ssl = support.import_module("ssl")
|
ssl = import_helper.import_module("ssl")
|
||||||
|
|
||||||
from ssl import TLSVersion, _TLSContentType, _TLSMessageType
|
from ssl import TLSVersion, _TLSContentType, _TLSMessageType
|
||||||
|
|
||||||
|
@ -571,7 +574,7 @@ class BasicSocketTests(unittest.TestCase):
|
||||||
s = socket.socket(socket.AF_INET)
|
s = socket.socket(socket.AF_INET)
|
||||||
ss = test_wrap_socket(s)
|
ss = test_wrap_socket(s)
|
||||||
wr = weakref.ref(ss)
|
wr = weakref.ref(ss)
|
||||||
with support.check_warnings(("", ResourceWarning)):
|
with warnings_helper.check_warnings(("", ResourceWarning)):
|
||||||
del ss
|
del ss
|
||||||
self.assertEqual(wr(), None)
|
self.assertEqual(wr(), None)
|
||||||
|
|
||||||
|
@ -893,7 +896,7 @@ class BasicSocketTests(unittest.TestCase):
|
||||||
self.assertEqual(len(paths), 6)
|
self.assertEqual(len(paths), 6)
|
||||||
self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
|
self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
|
||||||
|
|
||||||
with support.EnvironmentVarGuard() as env:
|
with os_helper.EnvironmentVarGuard() as env:
|
||||||
env["SSL_CERT_DIR"] = CAPATH
|
env["SSL_CERT_DIR"] = CAPATH
|
||||||
env["SSL_CERT_FILE"] = CERTFILE
|
env["SSL_CERT_FILE"] = CERTFILE
|
||||||
paths = ssl.get_default_verify_paths()
|
paths = ssl.get_default_verify_paths()
|
||||||
|
@ -1605,7 +1608,7 @@ class ContextTests(unittest.TestCase):
|
||||||
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
|
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
|
||||||
def test_load_default_certs_env(self):
|
def test_load_default_certs_env(self):
|
||||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||||
with support.EnvironmentVarGuard() as env:
|
with os_helper.EnvironmentVarGuard() as env:
|
||||||
env["SSL_CERT_DIR"] = CAPATH
|
env["SSL_CERT_DIR"] = CAPATH
|
||||||
env["SSL_CERT_FILE"] = CERTFILE
|
env["SSL_CERT_FILE"] = CERTFILE
|
||||||
ctx.load_default_certs()
|
ctx.load_default_certs()
|
||||||
|
@ -1619,7 +1622,7 @@ class ContextTests(unittest.TestCase):
|
||||||
stats = ctx.cert_store_stats()
|
stats = ctx.cert_store_stats()
|
||||||
|
|
||||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||||
with support.EnvironmentVarGuard() as env:
|
with os_helper.EnvironmentVarGuard() as env:
|
||||||
env["SSL_CERT_DIR"] = CAPATH
|
env["SSL_CERT_DIR"] = CAPATH
|
||||||
env["SSL_CERT_FILE"] = CERTFILE
|
env["SSL_CERT_FILE"] = CERTFILE
|
||||||
ctx.load_default_certs()
|
ctx.load_default_certs()
|
||||||
|
@ -4266,9 +4269,9 @@ class ThreadedTests(unittest.TestCase):
|
||||||
|
|
||||||
def test_sendfile(self):
|
def test_sendfile(self):
|
||||||
TEST_DATA = b"x" * 512
|
TEST_DATA = b"x" * 512
|
||||||
with open(support.TESTFN, 'wb') as f:
|
with open(os_helper.TESTFN, 'wb') as f:
|
||||||
f.write(TEST_DATA)
|
f.write(TEST_DATA)
|
||||||
self.addCleanup(support.unlink, support.TESTFN)
|
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
|
||||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||||
context.verify_mode = ssl.CERT_REQUIRED
|
context.verify_mode = ssl.CERT_REQUIRED
|
||||||
context.load_verify_locations(SIGNING_CA)
|
context.load_verify_locations(SIGNING_CA)
|
||||||
|
@ -4277,7 +4280,7 @@ class ThreadedTests(unittest.TestCase):
|
||||||
with server:
|
with server:
|
||||||
with context.wrap_socket(socket.socket()) as s:
|
with context.wrap_socket(socket.socket()) as s:
|
||||||
s.connect((HOST, server.port))
|
s.connect((HOST, server.port))
|
||||||
with open(support.TESTFN, 'rb') as file:
|
with open(os_helper.TESTFN, 'rb') as file:
|
||||||
s.sendfile(file)
|
s.sendfile(file)
|
||||||
self.assertEqual(s.recv(1024), TEST_DATA)
|
self.assertEqual(s.recv(1024), TEST_DATA)
|
||||||
|
|
||||||
|
@ -4603,21 +4606,21 @@ requires_keylog = unittest.skipUnless(
|
||||||
|
|
||||||
class TestSSLDebug(unittest.TestCase):
|
class TestSSLDebug(unittest.TestCase):
|
||||||
|
|
||||||
def keylog_lines(self, fname=support.TESTFN):
|
def keylog_lines(self, fname=os_helper.TESTFN):
|
||||||
with open(fname) as f:
|
with open(fname) as f:
|
||||||
return len(list(f))
|
return len(list(f))
|
||||||
|
|
||||||
@requires_keylog
|
@requires_keylog
|
||||||
@unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
|
@unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
|
||||||
def test_keylog_defaults(self):
|
def test_keylog_defaults(self):
|
||||||
self.addCleanup(support.unlink, support.TESTFN)
|
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
|
||||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||||
self.assertEqual(ctx.keylog_filename, None)
|
self.assertEqual(ctx.keylog_filename, None)
|
||||||
|
|
||||||
self.assertFalse(os.path.isfile(support.TESTFN))
|
self.assertFalse(os.path.isfile(os_helper.TESTFN))
|
||||||
ctx.keylog_filename = support.TESTFN
|
ctx.keylog_filename = os_helper.TESTFN
|
||||||
self.assertEqual(ctx.keylog_filename, support.TESTFN)
|
self.assertEqual(ctx.keylog_filename, os_helper.TESTFN)
|
||||||
self.assertTrue(os.path.isfile(support.TESTFN))
|
self.assertTrue(os.path.isfile(os_helper.TESTFN))
|
||||||
self.assertEqual(self.keylog_lines(), 1)
|
self.assertEqual(self.keylog_lines(), 1)
|
||||||
|
|
||||||
ctx.keylog_filename = None
|
ctx.keylog_filename = None
|
||||||
|
@ -4626,7 +4629,7 @@ class TestSSLDebug(unittest.TestCase):
|
||||||
with self.assertRaises((IsADirectoryError, PermissionError)):
|
with self.assertRaises((IsADirectoryError, PermissionError)):
|
||||||
# Windows raises PermissionError
|
# Windows raises PermissionError
|
||||||
ctx.keylog_filename = os.path.dirname(
|
ctx.keylog_filename = os.path.dirname(
|
||||||
os.path.abspath(support.TESTFN))
|
os.path.abspath(os_helper.TESTFN))
|
||||||
|
|
||||||
with self.assertRaises(TypeError):
|
with self.assertRaises(TypeError):
|
||||||
ctx.keylog_filename = 1
|
ctx.keylog_filename = 1
|
||||||
|
@ -4634,10 +4637,10 @@ class TestSSLDebug(unittest.TestCase):
|
||||||
@requires_keylog
|
@requires_keylog
|
||||||
@unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
|
@unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
|
||||||
def test_keylog_filename(self):
|
def test_keylog_filename(self):
|
||||||
self.addCleanup(support.unlink, support.TESTFN)
|
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
|
||||||
client_context, server_context, hostname = testing_context()
|
client_context, server_context, hostname = testing_context()
|
||||||
|
|
||||||
client_context.keylog_filename = support.TESTFN
|
client_context.keylog_filename = os_helper.TESTFN
|
||||||
server = ThreadedEchoServer(context=server_context, chatty=False)
|
server = ThreadedEchoServer(context=server_context, chatty=False)
|
||||||
with server:
|
with server:
|
||||||
with client_context.wrap_socket(socket.socket(),
|
with client_context.wrap_socket(socket.socket(),
|
||||||
|
@ -4647,7 +4650,7 @@ class TestSSLDebug(unittest.TestCase):
|
||||||
self.assertEqual(self.keylog_lines(), 6)
|
self.assertEqual(self.keylog_lines(), 6)
|
||||||
|
|
||||||
client_context.keylog_filename = None
|
client_context.keylog_filename = None
|
||||||
server_context.keylog_filename = support.TESTFN
|
server_context.keylog_filename = os_helper.TESTFN
|
||||||
server = ThreadedEchoServer(context=server_context, chatty=False)
|
server = ThreadedEchoServer(context=server_context, chatty=False)
|
||||||
with server:
|
with server:
|
||||||
with client_context.wrap_socket(socket.socket(),
|
with client_context.wrap_socket(socket.socket(),
|
||||||
|
@ -4655,8 +4658,8 @@ class TestSSLDebug(unittest.TestCase):
|
||||||
s.connect((HOST, server.port))
|
s.connect((HOST, server.port))
|
||||||
self.assertGreaterEqual(self.keylog_lines(), 11)
|
self.assertGreaterEqual(self.keylog_lines(), 11)
|
||||||
|
|
||||||
client_context.keylog_filename = support.TESTFN
|
client_context.keylog_filename = os_helper.TESTFN
|
||||||
server_context.keylog_filename = support.TESTFN
|
server_context.keylog_filename = os_helper.TESTFN
|
||||||
server = ThreadedEchoServer(context=server_context, chatty=False)
|
server = ThreadedEchoServer(context=server_context, chatty=False)
|
||||||
with server:
|
with server:
|
||||||
with client_context.wrap_socket(socket.socket(),
|
with client_context.wrap_socket(socket.socket(),
|
||||||
|
@ -4672,19 +4675,19 @@ class TestSSLDebug(unittest.TestCase):
|
||||||
"test is not compatible with ignore_environment")
|
"test is not compatible with ignore_environment")
|
||||||
@unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
|
@unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
|
||||||
def test_keylog_env(self):
|
def test_keylog_env(self):
|
||||||
self.addCleanup(support.unlink, support.TESTFN)
|
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
|
||||||
with unittest.mock.patch.dict(os.environ):
|
with unittest.mock.patch.dict(os.environ):
|
||||||
os.environ['SSLKEYLOGFILE'] = support.TESTFN
|
os.environ['SSLKEYLOGFILE'] = os_helper.TESTFN
|
||||||
self.assertEqual(os.environ['SSLKEYLOGFILE'], support.TESTFN)
|
self.assertEqual(os.environ['SSLKEYLOGFILE'], os_helper.TESTFN)
|
||||||
|
|
||||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||||
self.assertEqual(ctx.keylog_filename, None)
|
self.assertEqual(ctx.keylog_filename, None)
|
||||||
|
|
||||||
ctx = ssl.create_default_context()
|
ctx = ssl.create_default_context()
|
||||||
self.assertEqual(ctx.keylog_filename, support.TESTFN)
|
self.assertEqual(ctx.keylog_filename, os_helper.TESTFN)
|
||||||
|
|
||||||
ctx = ssl._create_stdlib_context()
|
ctx = ssl._create_stdlib_context()
|
||||||
self.assertEqual(ctx.keylog_filename, support.TESTFN)
|
self.assertEqual(ctx.keylog_filename, os_helper.TESTFN)
|
||||||
|
|
||||||
def test_msg_callback(self):
|
def test_msg_callback(self):
|
||||||
client_context, server_context, hostname = testing_context()
|
client_context, server_context, hostname = testing_context()
|
||||||
|
|
|
@ -11,6 +11,7 @@ import unittest.mock
|
||||||
import tarfile
|
import tarfile
|
||||||
|
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import os_helper
|
||||||
from test.support import script_helper
|
from test.support import script_helper
|
||||||
|
|
||||||
# Check for our compression modules.
|
# Check for our compression modules.
|
||||||
|
@ -30,7 +31,7 @@ except ImportError:
|
||||||
def sha256sum(data):
|
def sha256sum(data):
|
||||||
return sha256(data).hexdigest()
|
return sha256(data).hexdigest()
|
||||||
|
|
||||||
TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir"
|
TEMPDIR = os.path.abspath(os_helper.TESTFN) + "-tardir"
|
||||||
tarextdir = TEMPDIR + '-extract-test'
|
tarextdir = TEMPDIR + '-extract-test'
|
||||||
tarname = support.findfile("testtar.tar")
|
tarname = support.findfile("testtar.tar")
|
||||||
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
|
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
|
||||||
|
@ -575,21 +576,21 @@ class MiscReadTestBase(CommonReadTest):
|
||||||
|
|
||||||
@unittest.skipUnless(hasattr(os, "link"),
|
@unittest.skipUnless(hasattr(os, "link"),
|
||||||
"Missing hardlink implementation")
|
"Missing hardlink implementation")
|
||||||
@support.skip_unless_symlink
|
@os_helper.skip_unless_symlink
|
||||||
def test_extract_hardlink(self):
|
def test_extract_hardlink(self):
|
||||||
# Test hardlink extraction (e.g. bug #857297).
|
# Test hardlink extraction (e.g. bug #857297).
|
||||||
with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
|
with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
|
||||||
tar.extract("ustar/regtype", TEMPDIR)
|
tar.extract("ustar/regtype", TEMPDIR)
|
||||||
self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype"))
|
self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/regtype"))
|
||||||
|
|
||||||
tar.extract("ustar/lnktype", TEMPDIR)
|
tar.extract("ustar/lnktype", TEMPDIR)
|
||||||
self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype"))
|
self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/lnktype"))
|
||||||
with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
|
with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
|
||||||
data = f.read()
|
data = f.read()
|
||||||
self.assertEqual(sha256sum(data), sha256_regtype)
|
self.assertEqual(sha256sum(data), sha256_regtype)
|
||||||
|
|
||||||
tar.extract("ustar/symtype", TEMPDIR)
|
tar.extract("ustar/symtype", TEMPDIR)
|
||||||
self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype"))
|
self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/symtype"))
|
||||||
with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
|
with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
|
||||||
data = f.read()
|
data = f.read()
|
||||||
self.assertEqual(sha256sum(data), sha256_regtype)
|
self.assertEqual(sha256sum(data), sha256_regtype)
|
||||||
|
@ -622,7 +623,7 @@ class MiscReadTestBase(CommonReadTest):
|
||||||
self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
|
self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
|
||||||
finally:
|
finally:
|
||||||
tar.close()
|
tar.close()
|
||||||
support.rmtree(DIR)
|
os_helper.rmtree(DIR)
|
||||||
|
|
||||||
def test_extract_directory(self):
|
def test_extract_directory(self):
|
||||||
dirtype = "ustar/dirtype"
|
dirtype = "ustar/dirtype"
|
||||||
|
@ -637,11 +638,11 @@ class MiscReadTestBase(CommonReadTest):
|
||||||
if sys.platform != "win32":
|
if sys.platform != "win32":
|
||||||
self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
|
self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
|
||||||
finally:
|
finally:
|
||||||
support.rmtree(DIR)
|
os_helper.rmtree(DIR)
|
||||||
|
|
||||||
def test_extractall_pathlike_name(self):
|
def test_extractall_pathlike_name(self):
|
||||||
DIR = pathlib.Path(TEMPDIR) / "extractall"
|
DIR = pathlib.Path(TEMPDIR) / "extractall"
|
||||||
with support.temp_dir(DIR), \
|
with os_helper.temp_dir(DIR), \
|
||||||
tarfile.open(tarname, encoding="iso8859-1") as tar:
|
tarfile.open(tarname, encoding="iso8859-1") as tar:
|
||||||
directories = [t for t in tar if t.isdir()]
|
directories = [t for t in tar if t.isdir()]
|
||||||
tar.extractall(DIR, directories)
|
tar.extractall(DIR, directories)
|
||||||
|
@ -652,7 +653,7 @@ class MiscReadTestBase(CommonReadTest):
|
||||||
def test_extract_pathlike_name(self):
|
def test_extract_pathlike_name(self):
|
||||||
dirtype = "ustar/dirtype"
|
dirtype = "ustar/dirtype"
|
||||||
DIR = pathlib.Path(TEMPDIR) / "extractall"
|
DIR = pathlib.Path(TEMPDIR) / "extractall"
|
||||||
with support.temp_dir(DIR), \
|
with os_helper.temp_dir(DIR), \
|
||||||
tarfile.open(tarname, encoding="iso8859-1") as tar:
|
tarfile.open(tarname, encoding="iso8859-1") as tar:
|
||||||
tarinfo = tar.getmember(dirtype)
|
tarinfo = tar.getmember(dirtype)
|
||||||
tar.extract(tarinfo, path=DIR)
|
tar.extract(tarinfo, path=DIR)
|
||||||
|
@ -676,7 +677,7 @@ class MiscReadTestBase(CommonReadTest):
|
||||||
else:
|
else:
|
||||||
self.fail("ReadError not raised")
|
self.fail("ReadError not raised")
|
||||||
finally:
|
finally:
|
||||||
support.unlink(empty)
|
os_helper.unlink(empty)
|
||||||
|
|
||||||
def test_parallel_iteration(self):
|
def test_parallel_iteration(self):
|
||||||
# Issue #16601: Restarting iteration over tarfile continued
|
# Issue #16601: Restarting iteration over tarfile continued
|
||||||
|
@ -1026,7 +1027,7 @@ class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):
|
||||||
fobj.write(b'x' * 4096)
|
fobj.write(b'x' * 4096)
|
||||||
fobj.truncate()
|
fobj.truncate()
|
||||||
s = os.stat(name)
|
s = os.stat(name)
|
||||||
support.unlink(name)
|
os_helper.unlink(name)
|
||||||
return (s.st_blocks * 512 < s.st_size)
|
return (s.st_blocks * 512 < s.st_size)
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
|
@ -1171,7 +1172,7 @@ class WriteTest(WriteTestBase, unittest.TestCase):
|
||||||
finally:
|
finally:
|
||||||
tar.close()
|
tar.close()
|
||||||
finally:
|
finally:
|
||||||
support.rmdir(path)
|
os_helper.rmdir(path)
|
||||||
|
|
||||||
# mock the following:
|
# mock the following:
|
||||||
# os.listdir: so we know that files are in the wrong order
|
# os.listdir: so we know that files are in the wrong order
|
||||||
|
@ -1193,9 +1194,9 @@ class WriteTest(WriteTestBase, unittest.TestCase):
|
||||||
finally:
|
finally:
|
||||||
tar.close()
|
tar.close()
|
||||||
finally:
|
finally:
|
||||||
support.unlink(os.path.join(path, "1"))
|
os_helper.unlink(os.path.join(path, "1"))
|
||||||
support.unlink(os.path.join(path, "2"))
|
os_helper.unlink(os.path.join(path, "2"))
|
||||||
support.rmdir(path)
|
os_helper.rmdir(path)
|
||||||
|
|
||||||
def test_gettarinfo_pathlike_name(self):
|
def test_gettarinfo_pathlike_name(self):
|
||||||
with tarfile.open(tmpname, self.mode) as tar:
|
with tarfile.open(tmpname, self.mode) as tar:
|
||||||
|
@ -1229,10 +1230,10 @@ class WriteTest(WriteTestBase, unittest.TestCase):
|
||||||
finally:
|
finally:
|
||||||
tar.close()
|
tar.close()
|
||||||
finally:
|
finally:
|
||||||
support.unlink(target)
|
os_helper.unlink(target)
|
||||||
support.unlink(link)
|
os_helper.unlink(link)
|
||||||
|
|
||||||
@support.skip_unless_symlink
|
@os_helper.skip_unless_symlink
|
||||||
def test_symlink_size(self):
|
def test_symlink_size(self):
|
||||||
path = os.path.join(TEMPDIR, "symlink")
|
path = os.path.join(TEMPDIR, "symlink")
|
||||||
os.symlink("link_target", path)
|
os.symlink("link_target", path)
|
||||||
|
@ -1244,7 +1245,7 @@ class WriteTest(WriteTestBase, unittest.TestCase):
|
||||||
finally:
|
finally:
|
||||||
tar.close()
|
tar.close()
|
||||||
finally:
|
finally:
|
||||||
support.unlink(path)
|
os_helper.unlink(path)
|
||||||
|
|
||||||
def test_add_self(self):
|
def test_add_self(self):
|
||||||
# Test for #1257255.
|
# Test for #1257255.
|
||||||
|
@ -1257,7 +1258,7 @@ class WriteTest(WriteTestBase, unittest.TestCase):
|
||||||
self.assertEqual(tar.getnames(), [],
|
self.assertEqual(tar.getnames(), [],
|
||||||
"added the archive to itself")
|
"added the archive to itself")
|
||||||
|
|
||||||
with support.change_cwd(TEMPDIR):
|
with os_helper.change_cwd(TEMPDIR):
|
||||||
tar.add(dstname)
|
tar.add(dstname)
|
||||||
self.assertEqual(tar.getnames(), [],
|
self.assertEqual(tar.getnames(), [],
|
||||||
"added the archive to itself")
|
"added the archive to itself")
|
||||||
|
@ -1270,7 +1271,7 @@ class WriteTest(WriteTestBase, unittest.TestCase):
|
||||||
try:
|
try:
|
||||||
for name in ("foo", "bar", "baz"):
|
for name in ("foo", "bar", "baz"):
|
||||||
name = os.path.join(tempdir, name)
|
name = os.path.join(tempdir, name)
|
||||||
support.create_empty_file(name)
|
os_helper.create_empty_file(name)
|
||||||
|
|
||||||
def filter(tarinfo):
|
def filter(tarinfo):
|
||||||
if os.path.basename(tarinfo.name) == "bar":
|
if os.path.basename(tarinfo.name) == "bar":
|
||||||
|
@ -1298,7 +1299,7 @@ class WriteTest(WriteTestBase, unittest.TestCase):
|
||||||
finally:
|
finally:
|
||||||
tar.close()
|
tar.close()
|
||||||
finally:
|
finally:
|
||||||
support.rmtree(tempdir)
|
os_helper.rmtree(tempdir)
|
||||||
|
|
||||||
# Guarantee that stored pathnames are not modified. Don't
|
# Guarantee that stored pathnames are not modified. Don't
|
||||||
# remove ./ or ../ or double slashes. Still make absolute
|
# remove ./ or ../ or double slashes. Still make absolute
|
||||||
|
@ -1309,7 +1310,7 @@ class WriteTest(WriteTestBase, unittest.TestCase):
|
||||||
# and compare the stored name with the original.
|
# and compare the stored name with the original.
|
||||||
foo = os.path.join(TEMPDIR, "foo")
|
foo = os.path.join(TEMPDIR, "foo")
|
||||||
if not dir:
|
if not dir:
|
||||||
support.create_empty_file(foo)
|
os_helper.create_empty_file(foo)
|
||||||
else:
|
else:
|
||||||
os.mkdir(foo)
|
os.mkdir(foo)
|
||||||
|
|
||||||
|
@ -1326,14 +1327,14 @@ class WriteTest(WriteTestBase, unittest.TestCase):
|
||||||
tar.close()
|
tar.close()
|
||||||
|
|
||||||
if not dir:
|
if not dir:
|
||||||
support.unlink(foo)
|
os_helper.unlink(foo)
|
||||||
else:
|
else:
|
||||||
support.rmdir(foo)
|
os_helper.rmdir(foo)
|
||||||
|
|
||||||
self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))
|
self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))
|
||||||
|
|
||||||
|
|
||||||
@support.skip_unless_symlink
|
@os_helper.skip_unless_symlink
|
||||||
def test_extractall_symlinks(self):
|
def test_extractall_symlinks(self):
|
||||||
# Test if extractall works properly when tarfile contains symlinks
|
# Test if extractall works properly when tarfile contains symlinks
|
||||||
tempdir = os.path.join(TEMPDIR, "testsymlinks")
|
tempdir = os.path.join(TEMPDIR, "testsymlinks")
|
||||||
|
@ -1356,8 +1357,8 @@ class WriteTest(WriteTestBase, unittest.TestCase):
|
||||||
except OSError:
|
except OSError:
|
||||||
self.fail("extractall failed with symlinked files")
|
self.fail("extractall failed with symlinked files")
|
||||||
finally:
|
finally:
|
||||||
support.unlink(temparchive)
|
os_helper.unlink(temparchive)
|
||||||
support.rmtree(tempdir)
|
os_helper.rmtree(tempdir)
|
||||||
|
|
||||||
def test_pathnames(self):
|
def test_pathnames(self):
|
||||||
self._test_pathname("foo")
|
self._test_pathname("foo")
|
||||||
|
@ -1385,7 +1386,7 @@ class WriteTest(WriteTestBase, unittest.TestCase):
|
||||||
|
|
||||||
def test_cwd(self):
|
def test_cwd(self):
|
||||||
# Test adding the current working directory.
|
# Test adding the current working directory.
|
||||||
with support.change_cwd(TEMPDIR):
|
with os_helper.change_cwd(TEMPDIR):
|
||||||
tar = tarfile.open(tmpname, self.mode)
|
tar = tarfile.open(tmpname, self.mode)
|
||||||
try:
|
try:
|
||||||
tar.add(".")
|
tar.add(".")
|
||||||
|
@ -1453,7 +1454,7 @@ class StreamWriteTest(WriteTestBase, unittest.TestCase):
|
||||||
# Test for issue #8464: Create files with correct
|
# Test for issue #8464: Create files with correct
|
||||||
# permissions.
|
# permissions.
|
||||||
if os.path.exists(tmpname):
|
if os.path.exists(tmpname):
|
||||||
support.unlink(tmpname)
|
os_helper.unlink(tmpname)
|
||||||
|
|
||||||
original_umask = os.umask(0o022)
|
original_umask = os.umask(0o022)
|
||||||
try:
|
try:
|
||||||
|
@ -1599,7 +1600,7 @@ class DeviceHeaderTest(WriteTestBase, unittest.TestCase):
|
||||||
self.assertEqual(buf_blk[device_headers], b"0000000\0" * 2)
|
self.assertEqual(buf_blk[device_headers], b"0000000\0" * 2)
|
||||||
self.assertEqual(buf_reg[device_headers], b"\0" * 16)
|
self.assertEqual(buf_reg[device_headers], b"\0" * 16)
|
||||||
finally:
|
finally:
|
||||||
support.rmtree(tempdir)
|
os_helper.rmtree(tempdir)
|
||||||
|
|
||||||
|
|
||||||
class CreateTest(WriteTestBase, unittest.TestCase):
|
class CreateTest(WriteTestBase, unittest.TestCase):
|
||||||
|
@ -1609,7 +1610,7 @@ class CreateTest(WriteTestBase, unittest.TestCase):
|
||||||
file_path = os.path.join(TEMPDIR, "spameggs42")
|
file_path = os.path.join(TEMPDIR, "spameggs42")
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
support.unlink(tmpname)
|
os_helper.unlink(tmpname)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def setUpClass(cls):
|
def setUpClass(cls):
|
||||||
|
@ -1618,7 +1619,7 @@ class CreateTest(WriteTestBase, unittest.TestCase):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def tearDownClass(cls):
|
def tearDownClass(cls):
|
||||||
support.unlink(cls.file_path)
|
os_helper.unlink(cls.file_path)
|
||||||
|
|
||||||
def test_create(self):
|
def test_create(self):
|
||||||
with tarfile.open(tmpname, self.mode) as tobj:
|
with tarfile.open(tmpname, self.mode) as tobj:
|
||||||
|
@ -1733,8 +1734,8 @@ class HardlinkTest(unittest.TestCase):
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
self.tar.close()
|
self.tar.close()
|
||||||
support.unlink(self.foo)
|
os_helper.unlink(self.foo)
|
||||||
support.unlink(self.bar)
|
os_helper.unlink(self.bar)
|
||||||
|
|
||||||
def test_add_twice(self):
|
def test_add_twice(self):
|
||||||
# The same name will be added as a REGTYPE every
|
# The same name will be added as a REGTYPE every
|
||||||
|
@ -2043,7 +2044,7 @@ class AppendTestBase:
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.tarname = tmpname
|
self.tarname = tmpname
|
||||||
if os.path.exists(self.tarname):
|
if os.path.exists(self.tarname):
|
||||||
support.unlink(self.tarname)
|
os_helper.unlink(self.tarname)
|
||||||
|
|
||||||
def _create_testtar(self, mode="w:"):
|
def _create_testtar(self, mode="w:"):
|
||||||
with tarfile.open(tarname, encoding="iso8859-1") as src:
|
with tarfile.open(tarname, encoding="iso8859-1") as src:
|
||||||
|
@ -2288,7 +2289,7 @@ class CommandLineTest(unittest.TestCase):
|
||||||
files = [support.findfile('tokenize_tests.txt'),
|
files = [support.findfile('tokenize_tests.txt'),
|
||||||
support.findfile('tokenize_tests-no-coding-cookie-'
|
support.findfile('tokenize_tests-no-coding-cookie-'
|
||||||
'and-utf8-bom-sig-only.txt')]
|
'and-utf8-bom-sig-only.txt')]
|
||||||
self.addCleanup(support.unlink, tar_name)
|
self.addCleanup(os_helper.unlink, tar_name)
|
||||||
with tarfile.open(tar_name, 'w') as tf:
|
with tarfile.open(tar_name, 'w') as tf:
|
||||||
for tardata in files:
|
for tardata in files:
|
||||||
tf.add(tardata, arcname=os.path.basename(tardata))
|
tf.add(tardata, arcname=os.path.basename(tardata))
|
||||||
|
@ -2334,7 +2335,7 @@ class CommandLineTest(unittest.TestCase):
|
||||||
self.assertEqual(out, b'')
|
self.assertEqual(out, b'')
|
||||||
self.assertEqual(rc, 1)
|
self.assertEqual(rc, 1)
|
||||||
finally:
|
finally:
|
||||||
support.unlink(tmpname)
|
os_helper.unlink(tmpname)
|
||||||
|
|
||||||
def test_list_command(self):
|
def test_list_command(self):
|
||||||
for tar_name in testtarnames:
|
for tar_name in testtarnames:
|
||||||
|
@ -2376,7 +2377,7 @@ class CommandLineTest(unittest.TestCase):
|
||||||
with tarfile.open(tmpname) as tar:
|
with tarfile.open(tmpname) as tar:
|
||||||
tar.getmembers()
|
tar.getmembers()
|
||||||
finally:
|
finally:
|
||||||
support.unlink(tmpname)
|
os_helper.unlink(tmpname)
|
||||||
|
|
||||||
def test_create_command_verbose(self):
|
def test_create_command_verbose(self):
|
||||||
files = [support.findfile('tokenize_tests.txt'),
|
files = [support.findfile('tokenize_tests.txt'),
|
||||||
|
@ -2390,7 +2391,7 @@ class CommandLineTest(unittest.TestCase):
|
||||||
with tarfile.open(tmpname) as tar:
|
with tarfile.open(tmpname) as tar:
|
||||||
tar.getmembers()
|
tar.getmembers()
|
||||||
finally:
|
finally:
|
||||||
support.unlink(tmpname)
|
os_helper.unlink(tmpname)
|
||||||
|
|
||||||
def test_create_command_dotless_filename(self):
|
def test_create_command_dotless_filename(self):
|
||||||
files = [support.findfile('tokenize_tests.txt')]
|
files = [support.findfile('tokenize_tests.txt')]
|
||||||
|
@ -2400,7 +2401,7 @@ class CommandLineTest(unittest.TestCase):
|
||||||
with tarfile.open(dotlessname) as tar:
|
with tarfile.open(dotlessname) as tar:
|
||||||
tar.getmembers()
|
tar.getmembers()
|
||||||
finally:
|
finally:
|
||||||
support.unlink(dotlessname)
|
os_helper.unlink(dotlessname)
|
||||||
|
|
||||||
def test_create_command_dot_started_filename(self):
|
def test_create_command_dot_started_filename(self):
|
||||||
tar_name = os.path.join(TEMPDIR, ".testtar")
|
tar_name = os.path.join(TEMPDIR, ".testtar")
|
||||||
|
@ -2411,7 +2412,7 @@ class CommandLineTest(unittest.TestCase):
|
||||||
with tarfile.open(tar_name) as tar:
|
with tarfile.open(tar_name) as tar:
|
||||||
tar.getmembers()
|
tar.getmembers()
|
||||||
finally:
|
finally:
|
||||||
support.unlink(tar_name)
|
os_helper.unlink(tar_name)
|
||||||
|
|
||||||
def test_create_command_compressed(self):
|
def test_create_command_compressed(self):
|
||||||
files = [support.findfile('tokenize_tests.txt'),
|
files = [support.findfile('tokenize_tests.txt'),
|
||||||
|
@ -2426,41 +2427,41 @@ class CommandLineTest(unittest.TestCase):
|
||||||
with filetype.taropen(tar_name) as tar:
|
with filetype.taropen(tar_name) as tar:
|
||||||
tar.getmembers()
|
tar.getmembers()
|
||||||
finally:
|
finally:
|
||||||
support.unlink(tar_name)
|
os_helper.unlink(tar_name)
|
||||||
|
|
||||||
def test_extract_command(self):
|
def test_extract_command(self):
|
||||||
self.make_simple_tarfile(tmpname)
|
self.make_simple_tarfile(tmpname)
|
||||||
for opt in '-e', '--extract':
|
for opt in '-e', '--extract':
|
||||||
try:
|
try:
|
||||||
with support.temp_cwd(tarextdir):
|
with os_helper.temp_cwd(tarextdir):
|
||||||
out = self.tarfilecmd(opt, tmpname)
|
out = self.tarfilecmd(opt, tmpname)
|
||||||
self.assertEqual(out, b'')
|
self.assertEqual(out, b'')
|
||||||
finally:
|
finally:
|
||||||
support.rmtree(tarextdir)
|
os_helper.rmtree(tarextdir)
|
||||||
|
|
||||||
def test_extract_command_verbose(self):
|
def test_extract_command_verbose(self):
|
||||||
self.make_simple_tarfile(tmpname)
|
self.make_simple_tarfile(tmpname)
|
||||||
for opt in '-v', '--verbose':
|
for opt in '-v', '--verbose':
|
||||||
try:
|
try:
|
||||||
with support.temp_cwd(tarextdir):
|
with os_helper.temp_cwd(tarextdir):
|
||||||
out = self.tarfilecmd(opt, '-e', tmpname,
|
out = self.tarfilecmd(opt, '-e', tmpname,
|
||||||
PYTHONIOENCODING='utf-8')
|
PYTHONIOENCODING='utf-8')
|
||||||
self.assertIn(b' file is extracted.', out)
|
self.assertIn(b' file is extracted.', out)
|
||||||
finally:
|
finally:
|
||||||
support.rmtree(tarextdir)
|
os_helper.rmtree(tarextdir)
|
||||||
|
|
||||||
def test_extract_command_different_directory(self):
|
def test_extract_command_different_directory(self):
|
||||||
self.make_simple_tarfile(tmpname)
|
self.make_simple_tarfile(tmpname)
|
||||||
try:
|
try:
|
||||||
with support.temp_cwd(tarextdir):
|
with os_helper.temp_cwd(tarextdir):
|
||||||
out = self.tarfilecmd('-e', tmpname, 'spamdir')
|
out = self.tarfilecmd('-e', tmpname, 'spamdir')
|
||||||
self.assertEqual(out, b'')
|
self.assertEqual(out, b'')
|
||||||
finally:
|
finally:
|
||||||
support.rmtree(tarextdir)
|
os_helper.rmtree(tarextdir)
|
||||||
|
|
||||||
def test_extract_command_invalid_file(self):
|
def test_extract_command_invalid_file(self):
|
||||||
zipname = support.findfile('zipdir.zip')
|
zipname = support.findfile('zipdir.zip')
|
||||||
with support.temp_cwd(tarextdir):
|
with os_helper.temp_cwd(tarextdir):
|
||||||
rc, out, err = self.tarfilecmd_failure('-e', zipname)
|
rc, out, err = self.tarfilecmd_failure('-e', zipname)
|
||||||
self.assertIn(b' is not a tar archive.', err)
|
self.assertIn(b' is not a tar archive.', err)
|
||||||
self.assertEqual(out, b'')
|
self.assertEqual(out, b'')
|
||||||
|
@ -2723,7 +2724,7 @@ class NumericOwnerTest(unittest.TestCase):
|
||||||
|
|
||||||
|
|
||||||
def setUpModule():
|
def setUpModule():
|
||||||
support.unlink(TEMPDIR)
|
os_helper.unlink(TEMPDIR)
|
||||||
os.makedirs(TEMPDIR)
|
os.makedirs(TEMPDIR)
|
||||||
|
|
||||||
global testtarnames
|
global testtarnames
|
||||||
|
@ -2734,14 +2735,14 @@ def setUpModule():
|
||||||
# Create compressed tarfiles.
|
# Create compressed tarfiles.
|
||||||
for c in GzipTest, Bz2Test, LzmaTest:
|
for c in GzipTest, Bz2Test, LzmaTest:
|
||||||
if c.open:
|
if c.open:
|
||||||
support.unlink(c.tarname)
|
os_helper.unlink(c.tarname)
|
||||||
testtarnames.append(c.tarname)
|
testtarnames.append(c.tarname)
|
||||||
with c.open(c.tarname, "wb") as tar:
|
with c.open(c.tarname, "wb") as tar:
|
||||||
tar.write(data)
|
tar.write(data)
|
||||||
|
|
||||||
def tearDownModule():
|
def tearDownModule():
|
||||||
if os.path.exists(TEMPDIR):
|
if os.path.exists(TEMPDIR):
|
||||||
support.rmtree(TEMPDIR)
|
os_helper.rmtree(TEMPDIR)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
|
@ -4,7 +4,8 @@ Tests for the threading module.
|
||||||
|
|
||||||
import test.support
|
import test.support
|
||||||
from test.support import threading_helper
|
from test.support import threading_helper
|
||||||
from test.support import verbose, import_module, cpython_only
|
from test.support import verbose, cpython_only
|
||||||
|
from test.support.import_helper import import_module
|
||||||
from test.support.script_helper import assert_python_ok, assert_python_failure
|
from test.support.script_helper import assert_python_ok, assert_python_failure
|
||||||
|
|
||||||
import random
|
import random
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
from test.support import TESTFN, TESTFN_UNICODE, FS_NONASCII, rmtree, unlink, captured_stdout
|
from test.support import captured_stdout
|
||||||
|
from test.support.os_helper import (TESTFN, rmtree, unlink)
|
||||||
from test.support.script_helper import assert_python_ok, assert_python_failure
|
from test.support.script_helper import assert_python_ok, assert_python_failure
|
||||||
import textwrap
|
import textwrap
|
||||||
import unittest
|
import unittest
|
||||||
|
|
|
@ -7,6 +7,7 @@ from unittest.mock import patch
|
||||||
from test.support.script_helper import (assert_python_ok, assert_python_failure,
|
from test.support.script_helper import (assert_python_ok, assert_python_failure,
|
||||||
interpreter_requires_environment)
|
interpreter_requires_environment)
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import os_helper
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import _testcapi
|
import _testcapi
|
||||||
|
@ -287,11 +288,11 @@ class TestTracemallocEnabled(unittest.TestCase):
|
||||||
self.assertGreater(snapshot.traces[1].traceback.total_nframe, 10)
|
self.assertGreater(snapshot.traces[1].traceback.total_nframe, 10)
|
||||||
|
|
||||||
# write on disk
|
# write on disk
|
||||||
snapshot.dump(support.TESTFN)
|
snapshot.dump(os_helper.TESTFN)
|
||||||
self.addCleanup(support.unlink, support.TESTFN)
|
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
|
||||||
|
|
||||||
# load from disk
|
# load from disk
|
||||||
snapshot2 = tracemalloc.Snapshot.load(support.TESTFN)
|
snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN)
|
||||||
self.assertEqual(snapshot2.traces, snapshot.traces)
|
self.assertEqual(snapshot2.traces, snapshot.traces)
|
||||||
|
|
||||||
# tracemalloc must be tracing memory allocations to take a snapshot
|
# tracemalloc must be tracing memory allocations to take a snapshot
|
||||||
|
@ -306,11 +307,11 @@ class TestTracemallocEnabled(unittest.TestCase):
|
||||||
# take a snapshot with a new attribute
|
# take a snapshot with a new attribute
|
||||||
snapshot = tracemalloc.take_snapshot()
|
snapshot = tracemalloc.take_snapshot()
|
||||||
snapshot.test_attr = "new"
|
snapshot.test_attr = "new"
|
||||||
snapshot.dump(support.TESTFN)
|
snapshot.dump(os_helper.TESTFN)
|
||||||
self.addCleanup(support.unlink, support.TESTFN)
|
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
|
||||||
|
|
||||||
# load() should recreate the attribute
|
# load() should recreate the attribute
|
||||||
snapshot2 = tracemalloc.Snapshot.load(support.TESTFN)
|
snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN)
|
||||||
self.assertEqual(snapshot2.test_attr, "new")
|
self.assertEqual(snapshot2.test_attr, "new")
|
||||||
|
|
||||||
def fork_child(self):
|
def fork_child(self):
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
import io
|
import io
|
||||||
import struct
|
import struct
|
||||||
from test import support
|
from test import support
|
||||||
from test.support import import_fresh_module
|
from test.support.import_helper import import_fresh_module
|
||||||
import types
|
import types
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ import sys
|
||||||
import types
|
import types
|
||||||
import pickle
|
import pickle
|
||||||
from test import support
|
from test import support
|
||||||
|
from test.support import import_helper
|
||||||
import test.test_importlib.util
|
import test.test_importlib.util
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
|
@ -848,7 +849,7 @@ class TestDiscovery(unittest.TestCase):
|
||||||
|
|
||||||
with unittest.mock.patch('builtins.__import__', _import):
|
with unittest.mock.patch('builtins.__import__', _import):
|
||||||
# Since loader.discover() can modify sys.path, restore it when done.
|
# Since loader.discover() can modify sys.path, restore it when done.
|
||||||
with support.DirsOnSysPath():
|
with import_helper.DirsOnSysPath():
|
||||||
# Make sure to remove 'package' from sys.modules when done.
|
# Make sure to remove 'package' from sys.modules when done.
|
||||||
with test.test_importlib.util.uncache('package'):
|
with test.test_importlib.util.uncache('package'):
|
||||||
suite = loader.discover('package')
|
suite = loader.discover('package')
|
||||||
|
@ -865,7 +866,7 @@ class TestDiscovery(unittest.TestCase):
|
||||||
|
|
||||||
with unittest.mock.patch('builtins.__import__', _import):
|
with unittest.mock.patch('builtins.__import__', _import):
|
||||||
# Since loader.discover() can modify sys.path, restore it when done.
|
# Since loader.discover() can modify sys.path, restore it when done.
|
||||||
with support.DirsOnSysPath():
|
with import_helper.DirsOnSysPath():
|
||||||
# Make sure to remove 'package' from sys.modules when done.
|
# Make sure to remove 'package' from sys.modules when done.
|
||||||
with test.test_importlib.util.uncache('package'):
|
with test.test_importlib.util.uncache('package'):
|
||||||
with self.assertRaises(TypeError) as cm:
|
with self.assertRaises(TypeError) as cm:
|
||||||
|
|
|
@ -2,7 +2,7 @@ import io
|
||||||
import sys
|
import sys
|
||||||
import textwrap
|
import textwrap
|
||||||
|
|
||||||
from test import support
|
from test.support import warnings_helper
|
||||||
|
|
||||||
import traceback
|
import traceback
|
||||||
import unittest
|
import unittest
|
||||||
|
@ -458,8 +458,8 @@ OldResult = type('OldResult', (object,), classDict)
|
||||||
class Test_OldTestResult(unittest.TestCase):
|
class Test_OldTestResult(unittest.TestCase):
|
||||||
|
|
||||||
def assertOldResultWarning(self, test, failures):
|
def assertOldResultWarning(self, test, failures):
|
||||||
with support.check_warnings(("TestResult has no add.+ method,",
|
with warnings_helper.check_warnings(
|
||||||
RuntimeWarning)):
|
("TestResult has no add.+ method,", RuntimeWarning)):
|
||||||
result = OldResult()
|
result = OldResult()
|
||||||
test.run(result)
|
test.run(result)
|
||||||
self.assertEqual(len(result.failures), failures)
|
self.assertEqual(len(result.failures), failures)
|
||||||
|
|
Loading…
Reference in New Issue