bpo-40275: Use new test.support helper submodules in tests (GH-21219)

This commit is contained in:
Hai Shi 2020-06-30 21:46:06 +08:00 committed by GitHub
parent 3fa4799c3f
commit 3ddc634cd5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 116 additions and 94 deletions

View File

@ -1,4 +1,5 @@
from test.support import findfile, TESTFN, unlink from test.support import findfile
from test.support.os_helper import TESTFN, unlink
import array import array
import io import io
import pickle import pickle

View File

@ -2,6 +2,7 @@ import argparse
import os import os
import sys import sys
from test import support from test import support
from test.support import os_helper
USAGE = """\ USAGE = """\
@ -291,7 +292,7 @@ def _create_parser():
def relative_filename(string): def relative_filename(string):
# CWD is replaced with a temporary dir before calling main(), so we # CWD is replaced with a temporary dir before calling main(), so we
# join it with the saved CWD so it ends up where the user expects. # join it with the saved CWD so it ends up where the user expects.
return os.path.join(support.SAVEDCWD, string) return os.path.join(os_helper.SAVEDCWD, string)
def huntrleaks(string): def huntrleaks(string):

View File

@ -216,7 +216,7 @@ class Regrtest:
# regex to match 'test_builtin' in line: # regex to match 'test_builtin' in line:
# '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec' # '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec'
regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b') regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp: with open(os.path.join(os_helper.SAVEDCWD, self.ns.fromfile)) as fp:
for line in fp: for line in fp:
line = line.split('#', 1)[0] line = line.split('#', 1)[0]
line = line.strip() line = line.strip()
@ -559,7 +559,7 @@ class Regrtest:
for k, v in totals.items(): for k, v in totals.items():
root.set(k, str(v)) root.set(k, str(v))
xmlpath = os.path.join(support.SAVEDCWD, self.ns.xmlpath) xmlpath = os.path.join(os_helper.SAVEDCWD, self.ns.xmlpath)
with open(xmlpath, 'wb') as f: with open(xmlpath, 'wb') as f:
for s in ET.tostringlist(root): for s in ET.tostringlist(root):
f.write(s) f.write(s)
@ -597,7 +597,7 @@ class Regrtest:
test_cwd = 'test_python_worker_{}'.format(pid) test_cwd = 'test_python_worker_{}'.format(pid)
else: else:
test_cwd = 'test_python_{}'.format(pid) test_cwd = 'test_python_{}'.format(pid)
test_cwd += support.FS_NONASCII test_cwd += os_helper.FS_NONASCII
test_cwd = os.path.join(self.tmp_dir, test_cwd) test_cwd = os.path.join(self.tmp_dir, test_cwd)
return test_cwd return test_cwd
@ -609,10 +609,10 @@ class Regrtest:
for name in glob.glob(path): for name in glob.glob(path):
if os.path.isdir(name): if os.path.isdir(name):
print("Remove directory: %s" % name) print("Remove directory: %s" % name)
support.rmtree(name) os_helper.rmtree(name)
else: else:
print("Remove file: %s" % name) print("Remove file: %s" % name)
support.unlink(name) os_helper.unlink(name)
def main(self, tests=None, **kwargs): def main(self, tests=None, **kwargs):
self.parse_args(kwargs) self.parse_args(kwargs)
@ -629,7 +629,7 @@ class Regrtest:
# Run the tests in a context manager that temporarily changes the CWD # Run the tests in a context manager that temporarily changes the CWD
# to a temporary and writable directory. If it's not possible to # to a temporary and writable directory. If it's not possible to
# create or change the CWD, the original CWD will be used. # create or change the CWD, the original CWD will be used.
# The original CWD is available from support.SAVEDCWD. # The original CWD is available from os_helper.SAVEDCWD.
with os_helper.temp_cwd(test_cwd, quiet=True): with os_helper.temp_cwd(test_cwd, quiet=True):
# When using multiprocessing, worker processes will use test_cwd # When using multiprocessing, worker processes will use test_cwd
# as their parent temporary directory. So when the main process # as their parent temporary directory. So when the main process

View File

@ -11,6 +11,7 @@ import time
import traceback import traceback
import types import types
from test import support from test import support
from test.support import os_helper
from test.libregrtest.runtest import ( from test.libregrtest.runtest import (
runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME, runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
@ -70,7 +71,7 @@ def run_test_in_subprocess(testname, ns):
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, universal_newlines=True,
close_fds=(os.name != 'nt'), close_fds=(os.name != 'nt'),
cwd=support.SAVEDCWD, cwd=os_helper.SAVEDCWD,
**kw) **kw)

View File

@ -1,4 +1,6 @@
from test.support import check_no_resource_warning, findfile, TESTFN, unlink from test.support import findfile
from test.support.os_helper import TESTFN, unlink
from test.support.warnings_helper import check_no_resource_warning
import unittest import unittest
from unittest import mock from unittest import mock
from test import audiotests from test import audiotests

View File

@ -12,14 +12,15 @@ import random
import shutil import shutil
import subprocess import subprocess
import threading import threading
from test.support import import_helper
from test.support import threading_helper from test.support import threading_helper
from test.support import unlink from test.support.os_helper import unlink
import _compression import _compression
import sys import sys
# Skip tests if the bz2 module doesn't exist. # Skip tests if the bz2 module doesn't exist.
bz2 = support.import_module('bz2') bz2 = import_helper.import_module('bz2')
from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor
has_cmdline_bunzip2 = None has_cmdline_bunzip2 = None

View File

@ -19,15 +19,18 @@ import gc
from weakref import proxy from weakref import proxy
import contextlib import contextlib
from test.support import import_helper
from test.support import threading_helper from test.support import threading_helper
from test.support.script_helper import assert_python_ok from test.support.script_helper import assert_python_ok
import functools import functools
py_functools = support.import_fresh_module('functools', blocked=['_functools']) py_functools = import_helper.import_fresh_module('functools',
c_functools = support.import_fresh_module('functools', fresh=['_functools']) blocked=['_functools'])
c_functools = import_helper.import_fresh_module('functools',
fresh=['_functools'])
decimal = support.import_fresh_module('decimal', fresh=['_decimal']) decimal = import_helper.import_fresh_module('decimal', fresh=['_decimal'])
@contextlib.contextmanager @contextlib.contextmanager
def replaced_module(name, replacement): def replaced_module(name, replacement):

View File

@ -4,6 +4,7 @@ import __future__
import ast import ast
import unittest import unittest
from test import support from test import support
from test.support import import_helper
from textwrap import dedent from textwrap import dedent
import os import os
import re import re
@ -24,17 +25,17 @@ class FutureTest(unittest.TestCase):
self.assertEqual(err.offset, offset) self.assertEqual(err.offset, offset)
def test_future1(self): def test_future1(self):
with support.CleanImport('future_test1'): with import_helper.CleanImport('future_test1'):
from test import future_test1 from test import future_test1
self.assertEqual(future_test1.result, 6) self.assertEqual(future_test1.result, 6)
def test_future2(self): def test_future2(self):
with support.CleanImport('future_test2'): with import_helper.CleanImport('future_test2'):
from test import future_test2 from test import future_test2
self.assertEqual(future_test2.result, 6) self.assertEqual(future_test2.result, 6)
def test_future3(self): def test_future3(self):
with support.CleanImport('test_future3'): with import_helper.CleanImport('test_future3'):
from test import test_future3 from test import test_future3
def test_badfuture3(self): def test_badfuture3(self):
@ -113,7 +114,7 @@ class FutureTest(unittest.TestCase):
self.fail("syntax error didn't occur") self.fail("syntax error didn't occur")
def test_multiple_features(self): def test_multiple_features(self):
with support.CleanImport("test.test_future5"): with import_helper.CleanImport("test.test_future5"):
from test import test_future5 from test import test_future5
def test_unicode_literals_exec(self): def test_unicode_literals_exec(self):

View File

@ -42,8 +42,10 @@ import sys
import tempfile import tempfile
from test.support.script_helper import assert_python_ok, assert_python_failure from test.support.script_helper import assert_python_ok, assert_python_failure
from test import support from test import support
from test.support import os_helper
from test.support import socket_helper from test.support import socket_helper
from test.support import threading_helper from test.support import threading_helper
from test.support import warnings_helper
from test.support.logging_helper import TestHandler from test.support.logging_helper import TestHandler
import textwrap import textwrap
import threading import threading
@ -1169,7 +1171,7 @@ class ConfigFileTest(BaseTest):
"""Reading logging config from a .ini-style config file.""" """Reading logging config from a .ini-style config file."""
check_no_resource_warning = support.check_no_resource_warning check_no_resource_warning = warnings_helper.check_no_resource_warning
expected_log_pat = r"^(\w+) \+\+ (\w+)$" expected_log_pat = r"^(\w+) \+\+ (\w+)$"
# config0 is a standard configuration. # config0 is a standard configuration.
@ -1756,7 +1758,7 @@ class UnixSocketHandlerTest(SocketHandlerTest):
def tearDown(self): def tearDown(self):
SocketHandlerTest.tearDown(self) SocketHandlerTest.tearDown(self)
support.unlink(self.address) os_helper.unlink(self.address)
class DatagramHandlerTest(BaseTest): class DatagramHandlerTest(BaseTest):
@ -1837,7 +1839,7 @@ class UnixDatagramHandlerTest(DatagramHandlerTest):
def tearDown(self): def tearDown(self):
DatagramHandlerTest.tearDown(self) DatagramHandlerTest.tearDown(self)
support.unlink(self.address) os_helper.unlink(self.address)
class SysLogHandlerTest(BaseTest): class SysLogHandlerTest(BaseTest):
@ -1921,7 +1923,7 @@ class UnixSysLogHandlerTest(SysLogHandlerTest):
def tearDown(self): def tearDown(self):
SysLogHandlerTest.tearDown(self) SysLogHandlerTest.tearDown(self)
support.unlink(self.address) os_helper.unlink(self.address)
@unittest.skipUnless(socket_helper.IPV6_ENABLED, @unittest.skipUnless(socket_helper.IPV6_ENABLED,
'IPv6 support required for this test.') 'IPv6 support required for this test.')
@ -2175,7 +2177,7 @@ class ConfigDictTest(BaseTest):
"""Reading logging config from a dictionary.""" """Reading logging config from a dictionary."""
check_no_resource_warning = support.check_no_resource_warning check_no_resource_warning = warnings_helper.check_no_resource_warning
expected_log_pat = r"^(\w+) \+\+ (\w+)$" expected_log_pat = r"^(\w+) \+\+ (\w+)$"
# config0 is a standard configuration. # config0 is a standard configuration.

View File

@ -7,8 +7,10 @@ import select
import threading import threading
import time import time
import unittest import unittest
from test.support import TESTFN, run_unittest, cpython_only from test.support import run_unittest, cpython_only
from test.support import threading_helper from test.support import threading_helper
from test.support.os_helper import TESTFN
try: try:
select.poll select.poll

View File

@ -18,6 +18,7 @@ import textwrap
import unittest import unittest
from test import libregrtest from test import libregrtest
from test import support from test import support
from test.support import os_helper
from test.libregrtest import utils from test.libregrtest import utils
@ -161,12 +162,12 @@ class ParseArgsTestCase(unittest.TestCase):
self.assertEqual(ns.ignore_tests, ['pattern']) self.assertEqual(ns.ignore_tests, ['pattern'])
self.checkError([opt], 'expected one argument') self.checkError([opt], 'expected one argument')
self.addCleanup(support.unlink, support.TESTFN) self.addCleanup(os_helper.unlink, os_helper.TESTFN)
with open(support.TESTFN, "w") as fp: with open(os_helper.TESTFN, "w") as fp:
print('matchfile1', file=fp) print('matchfile1', file=fp)
print('matchfile2', file=fp) print('matchfile2', file=fp)
filename = os.path.abspath(support.TESTFN) filename = os.path.abspath(os_helper.TESTFN)
ns = libregrtest._parse_args(['-m', 'match', ns = libregrtest._parse_args(['-m', 'match',
'--ignorefile', filename]) '--ignorefile', filename])
self.assertEqual(ns.ignore_tests, self.assertEqual(ns.ignore_tests,
@ -183,12 +184,12 @@ class ParseArgsTestCase(unittest.TestCase):
'-m', 'pattern2']) '-m', 'pattern2'])
self.assertEqual(ns.match_tests, ['pattern1', 'pattern2']) self.assertEqual(ns.match_tests, ['pattern1', 'pattern2'])
self.addCleanup(support.unlink, support.TESTFN) self.addCleanup(os_helper.unlink, os_helper.TESTFN)
with open(support.TESTFN, "w") as fp: with open(os_helper.TESTFN, "w") as fp:
print('matchfile1', file=fp) print('matchfile1', file=fp)
print('matchfile2', file=fp) print('matchfile2', file=fp)
filename = os.path.abspath(support.TESTFN) filename = os.path.abspath(os_helper.TESTFN)
ns = libregrtest._parse_args(['-m', 'match', ns = libregrtest._parse_args(['-m', 'match',
'--matchfile', filename]) '--matchfile', filename])
self.assertEqual(ns.match_tests, self.assertEqual(ns.match_tests,
@ -237,7 +238,7 @@ class ParseArgsTestCase(unittest.TestCase):
def test_testdir(self): def test_testdir(self):
ns = libregrtest._parse_args(['--testdir', 'foo']) ns = libregrtest._parse_args(['--testdir', 'foo'])
self.assertEqual(ns.testdir, os.path.join(support.SAVEDCWD, 'foo')) self.assertEqual(ns.testdir, os.path.join(os_helper.SAVEDCWD, 'foo'))
self.checkError(['--testdir'], 'expected one argument') self.checkError(['--testdir'], 'expected one argument')
def test_runleaks(self): def test_runleaks(self):
@ -284,7 +285,7 @@ class ParseArgsTestCase(unittest.TestCase):
with self.subTest(opt=opt): with self.subTest(opt=opt):
ns = libregrtest._parse_args([opt, 'foo']) ns = libregrtest._parse_args([opt, 'foo'])
self.assertEqual(ns.coverdir, self.assertEqual(ns.coverdir,
os.path.join(support.SAVEDCWD, 'foo')) os.path.join(os_helper.SAVEDCWD, 'foo'))
self.checkError([opt], 'expected one argument') self.checkError([opt], 'expected one argument')
def test_nocoverdir(self): def test_nocoverdir(self):
@ -363,7 +364,7 @@ class BaseTestCase(unittest.TestCase):
self.testdir = os.path.realpath(os.path.dirname(__file__)) self.testdir = os.path.realpath(os.path.dirname(__file__))
self.tmptestdir = tempfile.mkdtemp() self.tmptestdir = tempfile.mkdtemp()
self.addCleanup(support.rmtree, self.tmptestdir) self.addCleanup(os_helper.rmtree, self.tmptestdir)
def create_test(self, name=None, code=None): def create_test(self, name=None, code=None):
if not name: if not name:
@ -384,7 +385,7 @@ class BaseTestCase(unittest.TestCase):
name = self.TESTNAME_PREFIX + name name = self.TESTNAME_PREFIX + name
path = os.path.join(self.tmptestdir, name + '.py') path = os.path.join(self.tmptestdir, name + '.py')
self.addCleanup(support.unlink, path) self.addCleanup(os_helper.unlink, path)
# Use 'x' mode to ensure that we do not override existing tests # Use 'x' mode to ensure that we do not override existing tests
try: try:
with open(path, 'x', encoding='utf-8') as fp: with open(path, 'x', encoding='utf-8') as fp:
@ -770,8 +771,8 @@ class ArgsTestCase(BaseTestCase):
# Write the list of files using a format similar to regrtest output: # Write the list of files using a format similar to regrtest output:
# [1/2] test_1 # [1/2] test_1
# [2/2] test_2 # [2/2] test_2
filename = support.TESTFN filename = os_helper.TESTFN
self.addCleanup(support.unlink, filename) self.addCleanup(os_helper.unlink, filename)
# test format '0:00:00 [2/7] test_opcodes -- test_grammar took 0 sec' # test format '0:00:00 [2/7] test_opcodes -- test_grammar took 0 sec'
with open(filename, "w") as fp: with open(filename, "w") as fp:
@ -886,7 +887,7 @@ class ArgsTestCase(BaseTestCase):
test = self.create_test('huntrleaks', code=code) test = self.create_test('huntrleaks', code=code)
filename = 'reflog.txt' filename = 'reflog.txt'
self.addCleanup(support.unlink, filename) self.addCleanup(os_helper.unlink, filename)
output = self.run_tests('--huntrleaks', '3:3:', test, output = self.run_tests('--huntrleaks', '3:3:', test,
exitcode=2, exitcode=2,
stderr=subprocess.STDOUT) stderr=subprocess.STDOUT)
@ -997,8 +998,8 @@ class ArgsTestCase(BaseTestCase):
testname = self.create_test(code=code) testname = self.create_test(code=code)
# only run a subset # only run a subset
filename = support.TESTFN filename = os_helper.TESTFN
self.addCleanup(support.unlink, filename) self.addCleanup(os_helper.unlink, filename)
subset = [ subset = [
# only ignore the method name # only ignore the method name
@ -1038,8 +1039,8 @@ class ArgsTestCase(BaseTestCase):
self.assertEqual(methods, all_methods) self.assertEqual(methods, all_methods)
# only run a subset # only run a subset
filename = support.TESTFN filename = os_helper.TESTFN
self.addCleanup(support.unlink, filename) self.addCleanup(os_helper.unlink, filename)
subset = [ subset = [
# only match the method name # only match the method name

View File

@ -2,6 +2,7 @@ import unittest
import textwrap import textwrap
from test import support, mock_socket from test import support, mock_socket
from test.support import socket_helper from test.support import socket_helper
from test.support import warnings_helper
import socket import socket
import io import io
import smtpd import smtpd
@ -714,49 +715,49 @@ class SMTPDChannelTest(unittest.TestCase):
b'recognized\r\n') b'recognized\r\n')
def test_attribute_deprecations(self): def test_attribute_deprecations(self):
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
spam = self.channel._SMTPChannel__server spam = self.channel._SMTPChannel__server
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
self.channel._SMTPChannel__server = 'spam' self.channel._SMTPChannel__server = 'spam'
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
spam = self.channel._SMTPChannel__line spam = self.channel._SMTPChannel__line
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
self.channel._SMTPChannel__line = 'spam' self.channel._SMTPChannel__line = 'spam'
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
spam = self.channel._SMTPChannel__state spam = self.channel._SMTPChannel__state
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
self.channel._SMTPChannel__state = 'spam' self.channel._SMTPChannel__state = 'spam'
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
spam = self.channel._SMTPChannel__greeting spam = self.channel._SMTPChannel__greeting
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
self.channel._SMTPChannel__greeting = 'spam' self.channel._SMTPChannel__greeting = 'spam'
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
spam = self.channel._SMTPChannel__mailfrom spam = self.channel._SMTPChannel__mailfrom
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
self.channel._SMTPChannel__mailfrom = 'spam' self.channel._SMTPChannel__mailfrom = 'spam'
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
spam = self.channel._SMTPChannel__rcpttos spam = self.channel._SMTPChannel__rcpttos
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
self.channel._SMTPChannel__rcpttos = 'spam' self.channel._SMTPChannel__rcpttos = 'spam'
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
spam = self.channel._SMTPChannel__data spam = self.channel._SMTPChannel__data
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
self.channel._SMTPChannel__data = 'spam' self.channel._SMTPChannel__data = 'spam'
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
spam = self.channel._SMTPChannel__fqdn spam = self.channel._SMTPChannel__fqdn
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
self.channel._SMTPChannel__fqdn = 'spam' self.channel._SMTPChannel__fqdn = 'spam'
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
spam = self.channel._SMTPChannel__peer spam = self.channel._SMTPChannel__peer
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
self.channel._SMTPChannel__peer = 'spam' self.channel._SMTPChannel__peer = 'spam'
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
spam = self.channel._SMTPChannel__conn spam = self.channel._SMTPChannel__conn
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
self.channel._SMTPChannel__conn = 'spam' self.channel._SMTPChannel__conn = 'spam'
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
spam = self.channel._SMTPChannel__addr spam = self.channel._SMTPChannel__addr
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
self.channel._SMTPChannel__addr = 'spam' self.channel._SMTPChannel__addr = 'spam'
@unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled") @unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")

View File

@ -1,4 +1,5 @@
from test import support from test import support
from test.support import warnings_helper
import decimal import decimal
import enum import enum
import locale import locale
@ -247,7 +248,7 @@ class TimeTestCase(unittest.TestCase):
# not change output based on its value and no test for year # not change output based on its value and no test for year
# because systems vary in their support for year 0. # because systems vary in their support for year 0.
expected = "2000 01 01 00 00 00 1 001" expected = "2000 01 01 00 00 00 1 001"
with support.check_warnings(): with warnings_helper.check_warnings():
result = time.strftime("%Y %m %d %H %M %S %w %j", (2000,)+(0,)*8) result = time.strftime("%Y %m %d %H %M %S %w %j", (2000,)+(0,)*8)
self.assertEqual(expected, result) self.assertEqual(expected, result)

View File

@ -6,8 +6,10 @@ import sys
import unicodedata import unicodedata
import unittest import unittest
from test.support import (run_unittest, rmtree, change_cwd, from test.support import run_unittest
TESTFN_UNICODE, TESTFN_UNENCODABLE, create_empty_file) from test.support.os_helper import (rmtree, change_cwd, TESTFN_UNICODE,
TESTFN_UNENCODABLE, create_empty_file)
if not os.path.supports_unicode_filenames: if not os.path.supports_unicode_filenames:
try: try:

View File

@ -9,6 +9,8 @@ import io
import unittest import unittest
from unittest.mock import patch from unittest.mock import patch
from test import support from test import support
from test.support import os_helper
from test.support import warnings_helper
import os import os
try: try:
import ssl import ssl
@ -50,7 +52,7 @@ def urlopen(url, data=None, proxies=None):
def FancyURLopener(): def FancyURLopener():
with support.check_warnings( with warnings_helper.check_warnings(
('FancyURLopener style of invoking requests is deprecated.', ('FancyURLopener style of invoking requests is deprecated.',
DeprecationWarning)): DeprecationWarning)):
return urllib.request.FancyURLopener() return urllib.request.FancyURLopener()
@ -145,19 +147,19 @@ class urlopen_FileTests(unittest.TestCase):
# Create a temp file to use for testing # Create a temp file to use for testing
self.text = bytes("test_urllib: %s\n" % self.__class__.__name__, self.text = bytes("test_urllib: %s\n" % self.__class__.__name__,
"ascii") "ascii")
f = open(support.TESTFN, 'wb') f = open(os_helper.TESTFN, 'wb')
try: try:
f.write(self.text) f.write(self.text)
finally: finally:
f.close() f.close()
self.pathname = support.TESTFN self.pathname = os_helper.TESTFN
self.quoted_pathname = urllib.parse.quote(self.pathname) self.quoted_pathname = urllib.parse.quote(self.pathname)
self.returned_obj = urlopen("file:%s" % self.quoted_pathname) self.returned_obj = urlopen("file:%s" % self.quoted_pathname)
def tearDown(self): def tearDown(self):
"""Shut down the open object""" """Shut down the open object"""
self.returned_obj.close() self.returned_obj.close()
os.remove(support.TESTFN) os.remove(os_helper.TESTFN)
def test_interface(self): def test_interface(self):
# Make sure object returned by urlopen() has the specified methods # Make sure object returned by urlopen() has the specified methods
@ -230,7 +232,7 @@ class ProxyTests(unittest.TestCase):
def setUp(self): def setUp(self):
# Records changes to env vars # Records changes to env vars
self.env = support.EnvironmentVarGuard() self.env = os_helper.EnvironmentVarGuard()
# Delete all proxy related env vars # Delete all proxy related env vars
for k in list(os.environ): for k in list(os.environ):
if 'proxy' in k.lower(): if 'proxy' in k.lower():
@ -592,13 +594,13 @@ Connection: close
self.unfakehttp() self.unfakehttp()
def test_URLopener_deprecation(self): def test_URLopener_deprecation(self):
with support.check_warnings(('',DeprecationWarning)): with warnings_helper.check_warnings(('',DeprecationWarning)):
urllib.request.URLopener() urllib.request.URLopener()
@unittest.skipUnless(ssl, "ssl module required") @unittest.skipUnless(ssl, "ssl module required")
def test_cafile_and_context(self): def test_cafile_and_context(self):
context = ssl.create_default_context() context = ssl.create_default_context()
with support.check_warnings(('', DeprecationWarning)): with warnings_helper.check_warnings(('', DeprecationWarning)):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
urllib.request.urlopen( urllib.request.urlopen(
"https://localhost", cafile="/nonexistent/path", context=context "https://localhost", cafile="/nonexistent/path", context=context
@ -699,10 +701,10 @@ class urlretrieve_FileTests(unittest.TestCase):
self.tempFiles = [] self.tempFiles = []
# Create a temporary file. # Create a temporary file.
self.registerFileForCleanUp(support.TESTFN) self.registerFileForCleanUp(os_helper.TESTFN)
self.text = b'testing urllib.urlretrieve' self.text = b'testing urllib.urlretrieve'
try: try:
FILE = open(support.TESTFN, 'wb') FILE = open(os_helper.TESTFN, 'wb')
FILE.write(self.text) FILE.write(self.text)
FILE.close() FILE.close()
finally: finally:
@ -745,18 +747,18 @@ class urlretrieve_FileTests(unittest.TestCase):
def test_basic(self): def test_basic(self):
# Make sure that a local file just gets its own location returned and # Make sure that a local file just gets its own location returned and
# a headers value is returned. # a headers value is returned.
result = urllib.request.urlretrieve("file:%s" % support.TESTFN) result = urllib.request.urlretrieve("file:%s" % os_helper.TESTFN)
self.assertEqual(result[0], support.TESTFN) self.assertEqual(result[0], os_helper.TESTFN)
self.assertIsInstance(result[1], email.message.Message, self.assertIsInstance(result[1], email.message.Message,
"did not get an email.message.Message instance " "did not get an email.message.Message instance "
"as second returned value") "as second returned value")
def test_copy(self): def test_copy(self):
# Test that setting the filename argument works. # Test that setting the filename argument works.
second_temp = "%s.2" % support.TESTFN second_temp = "%s.2" % os_helper.TESTFN
self.registerFileForCleanUp(second_temp) self.registerFileForCleanUp(second_temp)
result = urllib.request.urlretrieve(self.constructLocalFileUrl( result = urllib.request.urlretrieve(self.constructLocalFileUrl(
support.TESTFN), second_temp) os_helper.TESTFN), second_temp)
self.assertEqual(second_temp, result[0]) self.assertEqual(second_temp, result[0])
self.assertTrue(os.path.exists(second_temp), "copy of the file was not " self.assertTrue(os.path.exists(second_temp), "copy of the file was not "
"made") "made")
@ -777,10 +779,10 @@ class urlretrieve_FileTests(unittest.TestCase):
self.assertIsInstance(file_size, int) self.assertIsInstance(file_size, int)
self.assertEqual(block_count, count_holder[0]) self.assertEqual(block_count, count_holder[0])
count_holder[0] = count_holder[0] + 1 count_holder[0] = count_holder[0] + 1
second_temp = "%s.2" % support.TESTFN second_temp = "%s.2" % os_helper.TESTFN
self.registerFileForCleanUp(second_temp) self.registerFileForCleanUp(second_temp)
urllib.request.urlretrieve( urllib.request.urlretrieve(
self.constructLocalFileUrl(support.TESTFN), self.constructLocalFileUrl(os_helper.TESTFN),
second_temp, hooktester) second_temp, hooktester)
def test_reporthook_0_bytes(self): def test_reporthook_0_bytes(self):
@ -790,7 +792,7 @@ class urlretrieve_FileTests(unittest.TestCase):
_report.append((block_count, block_read_size, file_size)) _report.append((block_count, block_read_size, file_size))
srcFileName = self.createNewTempFile() srcFileName = self.createNewTempFile()
urllib.request.urlretrieve(self.constructLocalFileUrl(srcFileName), urllib.request.urlretrieve(self.constructLocalFileUrl(srcFileName),
support.TESTFN, hooktester) os_helper.TESTFN, hooktester)
self.assertEqual(len(report), 1) self.assertEqual(len(report), 1)
self.assertEqual(report[0][2], 0) self.assertEqual(report[0][2], 0)
@ -803,7 +805,7 @@ class urlretrieve_FileTests(unittest.TestCase):
_report.append((block_count, block_read_size, file_size)) _report.append((block_count, block_read_size, file_size))
srcFileName = self.createNewTempFile(b"x" * 5) srcFileName = self.createNewTempFile(b"x" * 5)
urllib.request.urlretrieve(self.constructLocalFileUrl(srcFileName), urllib.request.urlretrieve(self.constructLocalFileUrl(srcFileName),
support.TESTFN, hooktester) os_helper.TESTFN, hooktester)
self.assertEqual(len(report), 2) self.assertEqual(len(report), 2)
self.assertEqual(report[0][2], 5) self.assertEqual(report[0][2], 5)
self.assertEqual(report[1][2], 5) self.assertEqual(report[1][2], 5)
@ -817,7 +819,7 @@ class urlretrieve_FileTests(unittest.TestCase):
_report.append((block_count, block_read_size, file_size)) _report.append((block_count, block_read_size, file_size))
srcFileName = self.createNewTempFile(b"x" * 8193) srcFileName = self.createNewTempFile(b"x" * 8193)
urllib.request.urlretrieve(self.constructLocalFileUrl(srcFileName), urllib.request.urlretrieve(self.constructLocalFileUrl(srcFileName),
support.TESTFN, hooktester) os_helper.TESTFN, hooktester)
self.assertEqual(len(report), 3) self.assertEqual(len(report), 3)
self.assertEqual(report[0][2], 8193) self.assertEqual(report[0][2], 8193)
self.assertEqual(report[0][1], 8192) self.assertEqual(report[0][1], 8192)
@ -1556,7 +1558,7 @@ class URLopener_Tests(FakeHTTPMixin, unittest.TestCase):
class DummyURLopener(urllib.request.URLopener): class DummyURLopener(urllib.request.URLopener):
def open_spam(self, url): def open_spam(self, url):
return url return url
with support.check_warnings( with warnings_helper.check_warnings(
('DummyURLopener style of invoking requests is deprecated.', ('DummyURLopener style of invoking requests is deprecated.',
DeprecationWarning)): DeprecationWarning)):
self.assertEqual(DummyURLopener().open( self.assertEqual(DummyURLopener().open(
@ -1567,9 +1569,9 @@ class URLopener_Tests(FakeHTTPMixin, unittest.TestCase):
"spam://c:|windows%/:=&?~#+!$,;'@()*[]|/path/"), "spam://c:|windows%/:=&?~#+!$,;'@()*[]|/path/"),
"//c:|windows%/:=&?~#+!$,;'@()*[]|/path/") "//c:|windows%/:=&?~#+!$,;'@()*[]|/path/")
@support.ignore_warnings(category=DeprecationWarning) @warnings_helper.ignore_warnings(category=DeprecationWarning)
def test_urlopener_retrieve_file(self): def test_urlopener_retrieve_file(self):
with support.temp_dir() as tmpdir: with os_helper.temp_dir() as tmpdir:
fd, tmpfile = tempfile.mkstemp(dir=tmpdir) fd, tmpfile = tempfile.mkstemp(dir=tmpdir)
os.close(fd) os.close(fd)
fileurl = "file:" + urllib.request.pathname2url(tmpfile) fileurl = "file:" + urllib.request.pathname2url(tmpfile)
@ -1577,7 +1579,7 @@ class URLopener_Tests(FakeHTTPMixin, unittest.TestCase):
# Some buildbots have TEMP folder that uses a lowercase drive letter. # Some buildbots have TEMP folder that uses a lowercase drive letter.
self.assertEqual(os.path.normcase(filename), os.path.normcase(tmpfile)) self.assertEqual(os.path.normcase(filename), os.path.normcase(tmpfile))
@support.ignore_warnings(category=DeprecationWarning) @warnings_helper.ignore_warnings(category=DeprecationWarning)
def test_urlopener_retrieve_remote(self): def test_urlopener_retrieve_remote(self):
url = "http://www.python.org/file.txt" url = "http://www.python.org/file.txt"
self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello!") self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello!")
@ -1585,7 +1587,7 @@ class URLopener_Tests(FakeHTTPMixin, unittest.TestCase):
filename, _ = urllib.request.URLopener().retrieve(url) filename, _ = urllib.request.URLopener().retrieve(url)
self.assertEqual(os.path.splitext(filename)[1], ".txt") self.assertEqual(os.path.splitext(filename)[1], ".txt")
@support.ignore_warnings(category=DeprecationWarning) @warnings_helper.ignore_warnings(category=DeprecationWarning)
def test_local_file_open(self): def test_local_file_open(self):
# bpo-35907, CVE-2019-9948: urllib must reject local_file:// scheme # bpo-35907, CVE-2019-9948: urllib must reject local_file:// scheme
class DummyURLopener(urllib.request.URLopener): class DummyURLopener(urllib.request.URLopener):

View File

@ -13,6 +13,7 @@ import doctest
import inspect import inspect
import linecache import linecache
import unittest import unittest
from test.support import os_helper
from test.support.script_helper import (spawn_python, kill_python, assert_python_ok, from test.support.script_helper import (spawn_python, kill_python, assert_python_ok,
make_script, make_zip_script) make_script, make_zip_script)
@ -77,7 +78,7 @@ class ZipSupportTests(unittest.TestCase):
def test_inspect_getsource_issue4223(self): def test_inspect_getsource_issue4223(self):
test_src = "def foo(): pass\n" test_src = "def foo(): pass\n"
with test.support.temp_dir() as d: with os_helper.temp_dir() as d:
init_name = make_script(d, '__init__', test_src) init_name = make_script(d, '__init__', test_src)
name_in_zip = os.path.join('zip_pkg', name_in_zip = os.path.join('zip_pkg',
os.path.basename(init_name)) os.path.basename(init_name))
@ -117,7 +118,7 @@ class ZipSupportTests(unittest.TestCase):
mod_name = mod_name.replace("sample_", "sample_zipped_") mod_name = mod_name.replace("sample_", "sample_zipped_")
sample_sources[mod_name] = src sample_sources[mod_name] = src
with test.support.temp_dir() as d: with os_helper.temp_dir() as d:
script_name = make_script(d, 'test_zipped_doctest', script_name = make_script(d, 'test_zipped_doctest',
test_src) test_src)
zip_name, run_name = make_zip_script(d, 'test_zip', zip_name, run_name = make_zip_script(d, 'test_zip',
@ -192,7 +193,7 @@ class ZipSupportTests(unittest.TestCase):
doctest.testmod() doctest.testmod()
""") """)
pattern = 'File "%s", line 2, in %s' pattern = 'File "%s", line 2, in %s'
with test.support.temp_dir() as d: with os_helper.temp_dir() as d:
script_name = make_script(d, 'script', test_src) script_name = make_script(d, 'script', test_src)
rc, out, err = assert_python_ok(script_name) rc, out, err = assert_python_ok(script_name)
expected = pattern % (script_name, "__main__.Test") expected = pattern % (script_name, "__main__.Test")
@ -219,7 +220,7 @@ class ZipSupportTests(unittest.TestCase):
import pdb import pdb
pdb.Pdb(nosigint=True).runcall(f) pdb.Pdb(nosigint=True).runcall(f)
""") """)
with test.support.temp_dir() as d: with os_helper.temp_dir() as d:
script_name = make_script(d, 'script', test_src) script_name = make_script(d, 'script', test_src)
p = spawn_python(script_name) p = spawn_python(script_name)
p.stdin.write(b'l\n') p.stdin.write(b'l\n')