mirror of https://github.com/python/cpython
1503 lines
51 KiB
Python
1503 lines
51 KiB
Python
"""Supporting definitions for the Python regression tests."""
|
|
|
|
# Refuse to load under any module name other than 'test.support'.
if __name__ != 'test.support':
    raise ImportError('support must be imported from the test package')
|
|
|
|
import contextlib
|
|
import errno
|
|
import functools
|
|
import gc
|
|
import socket
|
|
import sys
|
|
import os
|
|
import platform
|
|
import shutil
|
|
import warnings
|
|
import unittest
|
|
import importlib
|
|
import collections
|
|
import re
|
|
import subprocess
|
|
import imp
|
|
import time
|
|
import sysconfig
|
|
import logging.handlers
|
|
|
|
try:
|
|
import _thread
|
|
except ImportError:
|
|
_thread = None
|
|
|
|
# Names exported by "from test.support import *".
__all__ = [
    "Error", "TestFailed", "ResourceDenied", "import_module",
    "verbose", "use_resources", "max_memuse", "record_original_stdout",
    "get_original_stdout", "unload", "unlink", "rmtree", "forget",
    "is_resource_enabled", "requires", "find_unused_port", "bind_port",
    "fcmp", "is_jython", "TESTFN", "HOST", "FUZZ", "SAVEDCWD", "temp_cwd",
    "findfile", "sortdict", "check_syntax_error", "open_urlresource",
    "check_warnings", "CleanImport", "EnvironmentVarGuard",
    "TransientResource", "captured_output", "captured_stdout",
    "time_out", "socket_peer_reset", "ioerror_peer_reset",
    "run_with_locale", 'temp_umask', "transient_internet",
    "set_memlimit", "bigmemtest", "bigaddrspacetest", "BasicTestRunner",
    "run_unittest", "run_doctest", "threading_setup", "threading_cleanup",
    "reap_children", "cpython_only", "check_impl_detail", "get_attribute",
    "swap_item", "swap_attr", "requires_IEEE_754",
    "TestHandler", "Matcher", "can_symlink", "skip_unless_symlink"]
|
|
|
|
|
|
class Error(Exception):
    """Root of the exception hierarchy used by the regression tests."""
|
|
|
|
class TestFailed(Error):
    """Raised to report that a regression test has failed."""
|
|
|
|
class ResourceDenied(unittest.SkipTest):
    """Signal that a test was skipped over a disallowed resource.

    requires() raises this when the requested resource has not been
    enabled.  Having a dedicated type makes it possible to tell these
    expected skips apart from unexpected ones.
    """
|
|
|
|
@contextlib.contextmanager
def _ignore_deprecated_imports(ignore=True):
    """Silence module/package DeprecationWarnings inside the with block.

    When *ignore* is false the block runs with the warning filters left
    untouched, i.e. this context manager has no effect.
    """
    if not ignore:
        yield
        return
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", ".+ (module|package)",
                                DeprecationWarning)
        yield
|
|
|
|
|
|
def import_module(name, deprecated=False):
    """Return the imported module *name*, or skip the test if it is missing.

    An ImportError is converted into unittest.SkipTest carrying the same
    message.  Pass deprecated=True to suppress module or package
    deprecation warnings emitted during the import.
    """
    with _ignore_deprecated_imports(deprecated):
        try:
            module = importlib.import_module(name)
        except ImportError as exc:
            raise unittest.SkipTest(str(exc))
        return module
|
|
|
|
|
|
def _save_and_remove_module(name, orig_modules):
    """Stash sys.modules[name] in *orig_modules* and drop it from the cache.

    Returns True when the module was present in sys.modules and False when
    there was nothing to save.
    """
    try:
        module = sys.modules[name]
    except KeyError:
        return False
    orig_modules[name] = module
    del sys.modules[name]
    return True
|
|
|
|
|
|
def _save_and_block_module(name, orig_modules):
    """Stash sys.modules[name] in *orig_modules*, then block the module.

    Blocking means storing None under *name* in sys.modules.  Returns True
    when the module was present in sys.modules beforehand, False otherwise.
    """
    try:
        module = sys.modules[name]
    except KeyError:
        was_present = False
    else:
        orig_modules[name] = module
        was_present = True
    sys.modules[name] = None
    return was_present
|
|
|
|
|
|
def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
    """Import and return a fresh copy of *name*, bypassing sys.modules.

    The cached entry for *name* is removed before importing, and the
    sys.modules entries touched here are put back once the import is done.

    Modules listed in *fresh* are removed from the cache as well, so they
    are imported anew if the import needs them.

    Modules listed in *blocked* cannot be imported while the fresh import
    takes place.

    If *deprecated* is True, module or package deprecation messages are
    suppressed.
    """
    # NOTE: test_heapq and test_warnings include extra sanity checks to make
    # sure that this utility function is working as expected
    with _ignore_deprecated_imports(deprecated):
        saved_modules = {}     # original entries to restore afterwards
        blocking_entries = []  # names that only hold a None blocker
        _save_and_remove_module(name, saved_modules)
        try:
            for fresh_name in fresh:
                _save_and_remove_module(fresh_name, saved_modules)
            for blocked_name in blocked:
                was_loaded = _save_and_block_module(blocked_name,
                                                    saved_modules)
                if not was_loaded:
                    blocking_entries.append(blocked_name)
            module = importlib.import_module(name)
        finally:
            sys.modules.update(saved_modules)
            for blocked_name in blocking_entries:
                del sys.modules[blocked_name]
        return module
|
|
|
|
|
|
def get_attribute(obj, name):
    """Return getattr(obj, name), raising SkipTest if the attribute is missing.

    *obj* is normally a module and is named by its __name__ in the skip
    message; objects without a __name__ are described by their repr().
    """
    try:
        return getattr(obj, name)
    except AttributeError:
        # Do not assume obj has a __name__: an unguarded lookup would replace
        # the intended SkipTest with a second AttributeError.
        owner = getattr(obj, '__name__', repr(obj))
        raise unittest.SkipTest("module %s has no attribute %s" % (
            owner, name))
|
|
|
|
verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)
real_max_memuse = 0      # declared global, like max_memuse, in set_memlimit()
|
|
|
|
# _original_stdout is meant to hold stdout at the time regrtest began.
# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
# The point is to have some flavor of stdout the user can actually see.
_original_stdout = None


def record_original_stdout(stdout):
    """Remember *stdout* as the stream handed out by get_original_stdout()."""
    global _original_stdout
    _original_stdout = stdout


def get_original_stdout():
    """Return the recorded stream, or sys.stdout if nothing truthy is recorded."""
    if _original_stdout:
        return _original_stdout
    return sys.stdout
|
|
|
|
def unload(name):
    """Remove *name* from sys.modules; do nothing if it is not there."""
    sys.modules.pop(name, None)
|
|
|
|
def unlink(filename):
    """Delete *filename*, tolerating a file that is already absent.

    OSErrors whose errno is neither ENOENT nor ENOTDIR are propagated.
    """
    ignorable = (errno.ENOENT, errno.ENOTDIR)
    try:
        os.unlink(filename)
    except OSError as exc:
        if exc.errno in ignorable:
            # The filename need not exist.
            return
        raise
|
|
|
|
def rmtree(path):
    """Recursively delete *path*, ignoring the error if it does not exist."""
    try:
        shutil.rmtree(path)
    except OSError as exc:
        # Unix returns ENOENT, Windows returns ESRCH.
        missing = exc.errno == errno.ENOENT or exc.errno == errno.ESRCH
        if not missing:
            raise
|
|
|
|
def make_legacy_pyc(source):
    """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location.

    The choice of .pyc or .pyo extension is done based on the __debug__ flag
    value.

    :param source: The file system path to the source file.  The source file
        does not need to exist, however the PEP 3147 pyc file must exist.
    :return: The file system path to the legacy pyc file.
    """
    pyc_file = imp.cache_from_source(source)
    source_dir = os.path.dirname(os.path.abspath(source))
    # Join only the file name: joining the whole of *source* would repeat its
    # directory components when *source* is a relative path like 'pkg/mod.py'.
    legacy_name = os.path.basename(source) + ('c' if __debug__ else 'o')
    legacy_pyc = os.path.join(source_dir, legacy_name)
    os.rename(pyc_file, legacy_pyc)
    return legacy_pyc
|
|
|
|
def forget(modname):
    """'Forget' a module was ever imported.

    The module is dropped from sys.modules and, for every directory on
    sys.path, its PEP 3147 and legacy .pyc/.pyo files are deleted.
    """
    unload(modname)
    for dirname in sys.path:
        source = os.path.join(dirname, modname + '.py')
        # It doesn't matter if they exist or not, unlink all possible
        # combinations of PEP 3147 and legacy pyc and pyo files.
        for suffix in ('c', 'o'):
            unlink(source + suffix)
        for debug in (True, False):
            unlink(imp.cache_from_source(source, debug_override=debug))
|
|
|
|
# On some platforms, should not run gui test even if it is allowed
# in `use_resources'.
if not sys.platform.startswith('win'):
    def _is_gui_available():
        """Always True: no GUI check is made off Windows."""
        return True
else:
    import ctypes
    import ctypes.wintypes

    def _is_gui_available():
        """Return whether the process window station has WSF_VISIBLE set.

        Raises the error built by ctypes.WinError() if either user32 call
        fails.
        """
        UOI_FLAGS = 1
        WSF_VISIBLE = 0x0001

        class USEROBJECTFLAGS(ctypes.Structure):
            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                        ("fReserved", ctypes.wintypes.BOOL),
                        ("dwFlags", ctypes.wintypes.DWORD)]

        user32 = ctypes.windll.user32
        station = user32.GetProcessWindowStation()
        if not station:
            raise ctypes.WinError()
        flags = USEROBJECTFLAGS()
        needed = ctypes.wintypes.DWORD()
        ok = user32.GetUserObjectInformationW(station,
                                              UOI_FLAGS,
                                              ctypes.byref(flags),
                                              ctypes.sizeof(flags),
                                              ctypes.byref(needed))
        if not ok:
            raise ctypes.WinError()
        return bool(flags.dwFlags & WSF_VISIBLE)
|
|
|
|
def is_resource_enabled(resource):
    """Return whether *resource* is enabled.

    Known resources are set by regrtest.py; while use_resources is still
    None, nothing counts as enabled.
    """
    if use_resources is None:
        return False
    return resource in use_resources
|
|
|
|
def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    Returns None when the resource may be used.  When the caller's module is
    __main__ the resource is treated as if it had been enabled.  For the
    'gui' resource, unittest.SkipTest is raised first if
    _is_gui_available() reports that the GUI cannot be used.

    *msg* overrides the default text of the ResourceDenied exception.
    """
    if resource == 'gui' and not _is_gui_available():
        raise unittest.SkipTest("Cannot use the 'gui' resource")
    # see if the caller's module is __main__ - if so, treat as if
    # the resource was set
    caller_globals = sys._getframe(1).f_globals
    if caller_globals.get("__name__") == "__main__":
        return
    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the `%s' resource not enabled" % resource
    raise ResourceDenied(msg)
|
|
|
|
# Default value of bind_port()'s *host* argument.
HOST = 'localhost'
|
|
|
|
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Return an unused port that should be suitable for binding.

    A temporary socket of the given *family* and *socktype* (default
    AF_INET, SOCK_STREAM) is passed to bind_port(), which binds it to port 0
    on its default host so that the OS hands out an unused ephemeral port.
    The temporary socket is then closed and the port number is returned.

    Either this function or bind_port() should be used for any tests where a
    server socket needs to be bound to a particular port for the duration of
    the test.  Which one to use depends on whether the calling code is
    creating a python socket, or if an unused port needs to be provided in a
    constructor or passed to an external program (i.e. the -accept argument
    to openssl's s_server mode).  Always prefer bind_port() over
    find_unused_port() where possible.  Hard coded ports should *NEVER* be
    used.  As soon as a server socket is bound to a hard coded port, the
    ability to run multiple instances of the test simultaneously on the same
    host is compromised, which makes the test a ticking time bomb in a
    buildbot environment.  On Unix buildbots, this may simply manifest as a
    failed test, which can be recovered from without intervention in most
    cases, but on Windows, the entire python process can completely and
    utterly wedge, requiring someone to log in to the buildbot and manually
    kill the affected process.

    (This is easy to reproduce on Windows, unfortunately, and can be traced
    to the SO_REUSEADDR socket option having different semantics on Windows
    versus Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM
    sockets bind, listen and then accept connections on identical
    host/ports.  An EADDRINUSE socket.error will be raised at some point
    (depending on the platform and the order bind and listen were called on
    each socket).

    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
    will ever be raised when attempting to bind two identical host/ports.
    When accept() is called on each socket, the second caller's process will
    steal the port from the first caller, leaving them both in an awkwardly
    wedged state where they'll no longer respond to any signals or graceful
    kills, and must be forcibly killed via OpenProcess()/TerminateProcess().

    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
    instead of SO_REUSEADDR, which effectively affords the same semantics as
    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the
    Open Source world compared to Windows ones, this is a common mistake.  A
    quick look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR
    when openssl.exe is called with the 's_server' option, for example.  See
    http://bugs.python.org/issue2550 for more info.  The following site also
    has a very thorough description about the implications of both REUSEADDR
    and EXCLUSIVEADDRUSE on Windows:
    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)

    XXX: although this approach is a vast improvement on previous attempts to
    elicit unused ports, it rests heavily on the assumption that the
    ephemeral port returned to us by the OS won't immediately be dished back
    out to some other process when we close and delete our temporary socket
    but before our calling code has a chance to bind the returned port.  We
    can deal with this issue if/when we come across it.
    """
    tempsock = socket.socket(family, socktype)
    try:
        return bind_port(tempsock)
    finally:
        # Close the probe socket even when bind_port() raises, so that a
        # failed lookup does not leak the descriptor.
        tempsock.close()
|
|
|
|
def bind_port(sock, host=HOST):
    """Bind *sock* to a free port on *host* and return the port number.

    Port 0 is requested so that the OS picks an ephemeral, unbound port.
    This matters because many tests may be running simultaneously,
    especially in a buildbot environment.

    For an AF_INET / SOCK_STREAM socket, TestFailed is raised if
    SO_REUSEADDR or SO_REUSEPORT is set on it.  Tests should *never* set
    these socket options for TCP/IP sockets; the only case for setting them
    is testing multicasting via multiple UDP sockets.

    Additionally, for such a socket, SO_EXCLUSIVEADDRUSE is set when the
    socket module defines it (i.e. on Windows).  This prevents anyone else
    from bind()'ing to our host/port for the duration of the test.
    """
    is_tcp = sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM
    if is_tcp:
        forbidden = (
            ('SO_REUSEADDR', "tests should never set the SO_REUSEADDR "
                             "socket option on TCP/IP sockets!"),
            ('SO_REUSEPORT', "tests should never set the SO_REUSEPORT "
                             "socket option on TCP/IP sockets!"),
        )
        for optname, complaint in forbidden:
            if not hasattr(socket, optname):
                continue
            option = getattr(socket, optname)
            if sock.getsockopt(socket.SOL_SOCKET, option) == 1:
                raise TestFailed(complaint)
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    sock.bind((host, 0))
    return sock.getsockname()[1]
|
|
|
|
# Relative tolerance used by fcmp() when comparing floats.
FUZZ = 1e-6


def fcmp(x, y): # fuzzy comparison function
    """Three-way compare *x* and *y*, treating nearly equal floats as equal.

    Returns 0, a negative number or a positive number, like the old cmp().
    If either argument is a float, the two compare equal when they differ
    by no more than FUZZ times the sum of their magnitudes.  Tuples and
    lists of the same type are compared element by element with fcmp(),
    then by length.  Everything else uses the ordinary < and > operators.
    """
    if isinstance(x, float) or isinstance(y, float):
        try:
            fuzz = (abs(x) + abs(y)) * FUZZ
            if abs(x-y) <= fuzz:
                return 0
        except Exception:
            # Best effort: operands that do not support the arithmetic fall
            # through to the plain comparison below.  KeyboardInterrupt and
            # SystemExit are deliberately not swallowed.
            pass
    elif type(x) is type(y) and isinstance(x, (tuple, list)):
        for left, right in zip(x, y):
            outcome = fcmp(left, right)
            if outcome != 0:
                return outcome
        return (len(x) > len(y)) - (len(x) < len(y))
    return (x > y) - (x < y)
|
|
|
|
# decorator for skipping tests on non-IEEE 754 platforms
# (decided by whether float.__getformat__("double") starts with "IEEE")
requires_IEEE_754 = unittest.skipUnless(
    float.__getformat__("double").startswith("IEEE"),
    "test requires IEEE 754 doubles")
|
|
|
|
# True when sys.platform starts with 'java'.
is_jython = sys.platform.startswith('java')
|
|
|
|
# Filename used for testing
# (Jython disallows @ in module names, hence the '$' variant there.)
TESTFN = '$test' if os.name == 'java' else '@test'

# Disambiguate TESTFN for parallel testing, while letting it remain a valid
# module name.
TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
|
|
|
|
|
|
# TESTFN_UNICODE is a non-ascii filename
TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
if sys.platform == 'darwin':
    # In Mac OS X's VFS API file names are, by definition, canonically
    # decomposed Unicode, encoded using UTF-8. See QA1173:
    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
    import unicodedata
    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
# The encoding reported by sys.getfilesystemencoding().
TESTFN_ENCODING = sys.getfilesystemencoding()
|
|
|
|
# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
# encoded by the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename.
TESTFN_UNENCODABLE = None
if os.name in ('nt', 'ce'):
    # skip win32s (0) or Windows 9x/ME (1)
    if sys.getwindowsversion().platform >= 2:
        # Different kinds of characters from various languages to minimize the
        # probability that the whole name is encodable to MBCS (issue #9819)
        TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
        try:
            TESTFN_UNENCODABLE.encode(TESTFN_ENCODING)
        except UnicodeEncodeError:
            # Expected outcome: the name really is unencodable, keep it.
            pass
        else:
            print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
                  'Unicode filename tests may not be effective'
                  % (TESTFN_UNENCODABLE, TESTFN_ENCODING))
            TESTFN_UNENCODABLE = None
# Mac OS X denies unencodable filenames (invalid utf-8)
elif sys.platform != 'darwin':
    try:
        # ascii and utf-8 cannot encode the byte 0xff
        b'\xff'.decode(TESTFN_ENCODING)
    except UnicodeDecodeError:
        # 0xff will be encoded using the surrogate character u+DCFF
        TESTFN_UNENCODABLE = TESTFN \
            + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape')
    else:
        # File system encoding (eg. ISO-8859-* encodings) can encode
        # the byte 0xff. Skip some unicode filename tests.
        pass
|
|
|
|
# Save the initial cwd (the value of os.getcwd() when this module is loaded)
SAVEDCWD = os.getcwd()
|
|
|
|
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False, path=None):
    """
    Context manager that temporarily changes the CWD.

    An existing path may be provided as *path*, in which case this
    function makes no changes to the file system.

    Otherwise, the new CWD is created in the current directory and it's
    named *name*.  If *quiet* is False (default) and it's not possible to
    create or change the CWD, an error is raised.  If it's True, only a
    warning is raised and the original CWD is used.

    The value bound by ``with ... as`` is os.getcwd() after the change.  On
    exit the previous CWD is restored and a directory created here is
    removed again.
    """
    saved_dir = os.getcwd()
    is_temporary = False
    if path is None:
        path = name
        try:
            os.mkdir(name)
            is_temporary = True
        except OSError:
            if not quiet:
                raise
            warnings.warn('tests may fail, unable to create temp CWD ' + name,
                          RuntimeWarning, stacklevel=3)
    try:
        os.chdir(path)
    except OSError:
        if not quiet:
            raise
        # Report the directory that was actually tried: with an explicit
        # *path* argument, *name* is unrelated to the failure.
        warnings.warn('tests may fail, unable to change the CWD to ' + path,
                      RuntimeWarning, stacklevel=3)
    try:
        yield os.getcwd()
    finally:
        os.chdir(saved_dir)
        if is_temporary:
            rmtree(name)
|
|
|
|
|
|
@contextlib.contextmanager
def temp_umask(umask):
    """Apply *umask* to the process for the duration of the with block."""
    previous = os.umask(umask)
    try:
        yield
    finally:
        os.umask(previous)
|
|
|
|
|
|
def findfile(file, here=__file__, subdir=None):
    """Look for *file* next to *here* and then along sys.path.

    An absolute *file* is returned unchanged.  If *subdir* is given it is
    prepended to *file* before searching.  The first existing candidate is
    returned; if there is none, the (possibly subdir-prefixed) name is
    returned as is, which does not necessarily signal failure since it
    could still be a legitimate path.
    """
    if os.path.isabs(file):
        return file
    if subdir is not None:
        file = os.path.join(subdir, file)
    search_dirs = [os.path.dirname(here)]
    search_dirs.extend(sys.path)
    for directory in search_dirs:
        candidate = os.path.join(directory, file)
        if os.path.exists(candidate):
            return candidate
    return file
|
|
|
|
def sortdict(dict):
    """Return a repr()-like string for *dict* with its items in sorted order."""
    return "{%s}" % ", ".join(
        "%r: %r" % (key, value) for key, value in sorted(dict.items()))
|
|
|
|
def make_bad_fd():
    """
    Return a file descriptor that is no longer valid.

    TESTFN is opened for writing and its descriptor is noted; the file is
    then closed and deleted before the descriptor is returned.
    """
    handle = open(TESTFN, "wb")
    try:
        fd = handle.fileno()
    finally:
        handle.close()
        unlink(TESTFN)
    return fd
|
|
|
|
def check_syntax_error(testcase, statement):
    """Assert through *testcase* that compiling *statement* is a SyntaxError."""
    with testcase.assertRaises(SyntaxError):
        compile(statement, '<test string>', 'exec')
|
|
|
|
def open_urlresource(url, *args, **kw):
    """Open the local copy of *url*, downloading it first if necessary.

    The copy lives in the "data" directory next to this file, under the last
    component of the URL path.  *args* and the remaining *kw* are passed to
    open().  The optional keyword *check* is a callable taking the opened
    file; a copy it rejects is treated as invalid.

    A valid cached copy is returned straight away.  Otherwise the
    'urlfetch' resource is required, the file is downloaded, and the new
    copy is validated; TestFailed is raised if it is still invalid.
    """
    import urllib.request, urllib.parse

    check = kw.pop('check', None)

    url_path = urllib.parse.urlparse(url)[2]
    filename = url_path.split('/')[-1] # '/': it's URL!

    fn = os.path.join(os.path.dirname(__file__), "data", filename)

    def check_valid_file(fn):
        # Return the opened file if it passes *check*, otherwise None.
        f = open(fn, *args, **kw)
        if check is None:
            return f
        if not check(f):
            f.close()
            return None
        f.seek(0)
        return f

    if os.path.exists(fn):
        cached = check_valid_file(fn)
        if cached is not None:
            return cached
        unlink(fn)

    # Verify the requirement before downloading the file
    requires('urlfetch')

    print('\tfetching %s ...' % url, file=get_original_stdout())
    response = urllib.request.urlopen(url, timeout=15)
    try:
        with open(fn, "wb") as out:
            data = response.read()
            while data:
                out.write(data)
                data = response.read()
    finally:
        response.close()

    downloaded = check_valid_file(fn)
    if downloaded is not None:
        return downloaded
    raise TestFailed('invalid resource "%s"' % fn)
|
|
|
|
|
|
class WarningsRecorder(object):
    """Convenience wrapper for the warnings list returned on
    entry to the warnings.catch_warnings() context manager.

    Unknown attributes are looked up on the most recent warning.  After
    reset(), and until a new warning arrives, the standard warning detail
    attributes read as None.
    """

    def __init__(self, warnings_list):
        self._warnings = warnings_list
        # Index of the first warning not yet discarded by reset().
        self._last = 0

    def __getattr__(self, attr):
        has_new = len(self._warnings) > self._last
        if has_new:
            return getattr(self._warnings[-1], attr)
        if attr in warnings.WarningMessage._WARNING_DETAILS:
            return None
        raise AttributeError("%r has no attribute %r" % (self, attr))

    @property
    def warnings(self):
        """The warnings recorded since the last reset()."""
        return self._warnings[self._last:]

    def reset(self):
        """Discard every warning recorded so far."""
        self._last = len(self._warnings)
|
|
|
|
|
|
def _filterwarnings(filters, quiet=False):
    """Generator that records warnings and checks them against *filters*.

    *filters* is an iterable of (message regexp, category) pairs.  After the
    yield, every recorded warning matched by a filter is dropped.  An
    AssertionError is raised for the first warning left unmatched, or,
    unless *quiet* is true, for the first filter that matched nothing.
    """
    # Clear the warning registry of the calling module
    # in order to re-raise the warnings.
    # (The frame depth of 2 must stay as it is: the lookup happens in this
    # generator's own frame.)
    caller = sys._getframe(2)
    registry = caller.f_globals.get('__warningregistry__')
    if registry:
        registry.clear()
    with warnings.catch_warnings(record=True) as recorded:
        # Set filter "always" to record all warnings.  Because
        # test_warnings swap the module, we need to look up in
        # the sys.modules dictionary.
        sys.modules['warnings'].simplefilter("always")
        yield WarningsRecorder(recorded)
    # Filter the recorded warnings
    unmatched = list(recorded)
    missing = []
    for msg, cat in filters:
        seen = False
        remaining = []
        for entry in unmatched:
            warning = entry.message
            # Filter out the matching messages
            if (re.match(msg, str(warning), re.I) and
                    issubclass(warning.__class__, cat)):
                seen = True
            else:
                remaining.append(entry)
        unmatched = remaining
        if not seen and not quiet:
            # This filter caught nothing
            missing.append((msg, cat.__name__))
    if unmatched:
        raise AssertionError("unhandled warning %s" % unmatched[0])
    if missing:
        raise AssertionError("filter (%r, %s) did not catch any warning" %
                             missing[0])
|
|
|
|
|
|
@contextlib.contextmanager
def check_warnings(*filters, **kwargs):
    """Context manager to silence warnings.

    Accept 2-tuples as positional arguments:
        ("message regexp", WarningCategory)

    Optional argument:
     - if 'quiet' is True, it does not fail if a filter catches nothing
        (default True without argument,
         default False if some filters are defined)

    Without argument, it defaults to:
        check_warnings(("", Warning), quiet=True)
    """
    quiet = kwargs.get('quiet')
    # The generator from _filterwarnings() is returned rather than iterated
    # here, so it runs directly under the context manager machinery.
    if filters:
        return _filterwarnings(filters, quiet)
    # Preserve backward compatibility
    if quiet is None:
        quiet = True
    return _filterwarnings((("", Warning),), quiet)
|
|
|
|
|
|
class CleanImport(object):
    """Context manager to force import to return a new module reference.

    This is useful for testing module-level behaviours, such as
    the emission of a DeprecationWarning on import.

    Use like this:

        with CleanImport("foo"):
            importlib.import_module("foo") # new reference

    The named modules are dropped from sys.modules when the object is
    created; on exit, the entries saved at creation time are written back.
    """

    def __init__(self, *module_names):
        self.original_modules = sys.modules.copy()
        for module_name in module_names:
            try:
                module = sys.modules[module_name]
            except KeyError:
                continue
            # It is possible that module_name is just an alias for
            # another module (e.g. stub for modules renamed in 3.x).
            # In that case, we also need delete the real module to clear
            # the import cache.
            real_name = module.__name__
            if real_name != module_name:
                del sys.modules[real_name]
            del sys.modules[module_name]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        sys.modules.update(self.original_modules)
|
|
|
|
|
|
try:
    # The mapping ABCs live in collections.abc; the bare
    # collections.MutableMapping alias is not available on newer Pythons.
    from collections.abc import MutableMapping as _MutableMapping
except ImportError:
    _MutableMapping = collections.MutableMapping


class EnvironmentVarGuard(_MutableMapping):

    """Class to help protect the environment variable properly.  Can be used as
    a context manager.

    Reads and writes go straight to os.environ.  The value each variable had
    before its first change is remembered, and leaving the with block puts
    those values back (deleting variables that did not exist before).
    """

    def __init__(self):
        self._environ = os.environ
        # Maps each changed variable to its original value (None = unset).
        self._changed = {}

    def _remember(self, envvar):
        # Remember the initial value on the first access
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)

    def __getitem__(self, envvar):
        return self._environ[envvar]

    def __setitem__(self, envvar, value):
        self._remember(envvar)
        self._environ[envvar] = value

    def __delitem__(self, envvar):
        self._remember(envvar)
        # Deleting a variable that is not set is not an error.
        if envvar in self._environ:
            del self._environ[envvar]

    def keys(self):
        return self._environ.keys()

    def __iter__(self):
        return iter(self._environ)

    def __len__(self):
        return len(self._environ)

    def set(self, envvar, value):
        """Set *envvar* to *value*; same as ``self[envvar] = value``."""
        self[envvar] = value

    def unset(self, envvar):
        """Remove *envvar* if it is set; same as ``del self[envvar]``."""
        del self[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        for (k, v) in self._changed.items():
            if v is None:
                if k in self._environ:
                    del self._environ[k]
            else:
                self._environ[k] = v
        # Rebind os.environ to the mapping captured in __init__.
        os.environ = self._environ
|
|
|
|
|
|
class DirsOnSysPath(object):
    """Context manager to temporarily add directories to sys.path.

    A copy of sys.path is taken, the directories given as positional
    arguments are appended, and sys.path is reverted to the copied
    contents when the context ends.

    Note that *all* sys.path modifications made in the body of the
    context manager, including replacement of the list object itself,
    are reverted at the end of the block.
    """

    def __init__(self, *paths):
        path_list = sys.path
        self.original_value = path_list[:]
        self.original_object = path_list
        path_list.extend(paths)

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        # Put the original list object back first, then its contents.
        restored = self.original_object
        sys.path = restored
        restored[:] = self.original_value
|
|
|
|
|
|
class TransientResource(object):

    """Raise ResourceDenied if an exception is raised while the context manager
    is in effect that matches the specified exception and attributes.

    *exc* is the exception class to match; each keyword argument names an
    attribute the raised exception must have, with exactly that value.
    """

    def __init__(self, exc, **kwargs):
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        # The raised type must be the configured class or one of its
        # subclasses (the arguments used to be the other way round, so
        # subclasses were never matched).
        if type_ is None or not issubclass(type_, self.exc):
            return
        for attr, attr_value in self.attrs.items():
            if not hasattr(value, attr):
                return
            if getattr(value, attr) != attr_value:
                return
        raise ResourceDenied("an optional resource is not available")
|
|
|
|
# Context managers that raise ResourceDenied when various issues
# with the Internet connection manifest themselves as exceptions.
# Each one matches on an exception class plus the value of its errno
# attribute.
# XXX deprecate these and use transient_internet() instead
time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
|
|
|
|
|
|
@contextlib.contextmanager
def transient_internet(resource_name, *, timeout=30.0, errnos=()):
    """Return a context manager that raises ResourceDenied when various issues
    with the Internet connection manifest themselves as exceptions.

    *resource_name* appears in the ResourceDenied message.  While the block
    runs, *timeout* is installed as the default socket timeout unless it is
    None; the previous default is restored afterwards.  *errnos* replaces
    the built-in list of errno values that count as transient; when it is
    given, no getaddrinfo error codes are considered.
    """
    default_errnos = [
        ('ECONNREFUSED', 111),
        ('ECONNRESET', 104),
        ('EHOSTUNREACH', 113),
        ('ENETUNREACH', 101),
        ('ETIMEDOUT', 110),
    ]
    default_gai_errnos = [
        ('EAI_NONAME', -2),
        ('EAI_NODATA', -5),
    ]

    denied = ResourceDenied("Resource '%s' is not available" % resource_name)
    captured_errnos = errnos
    gai_errnos = []
    if not captured_errnos:
        # Use the platform's constants, falling back to the numbers above
        # where a name is not defined.
        captured_errnos = [getattr(errno, name, num)
                           for (name, num) in default_errnos]
        gai_errnos = [getattr(socket, name, num)
                      for (name, num) in default_gai_errnos]

    def filter_error(err):
        # Raise `denied` if err is one of the transient failures.
        n = getattr(err, 'errno', None)
        transient = (isinstance(err, socket.timeout) or
                     (isinstance(err, socket.gaierror) and n in gai_errnos) or
                     n in captured_errnos)
        if not transient:
            return
        if not verbose:
            sys.stderr.write(denied.args[0] + "\n")
        raise denied from err

    old_timeout = socket.getdefaulttimeout()
    try:
        if timeout is not None:
            socket.setdefaulttimeout(timeout)
        yield
    except IOError as exc:
        # urllib can wrap original socket errors multiple times (!), we must
        # unwrap to get at the original error.
        innermost = exc
        while True:
            args = innermost.args
            if args and isinstance(args[0], IOError):
                innermost = args[0]
            # The error can also be wrapped as args[1]:
            #    except socket.error as msg:
            #        raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
            elif len(args) >= 2 and isinstance(args[1], IOError):
                innermost = args[1]
            else:
                break
        filter_error(innermost)
        raise
    # XXX should we catch generic exceptions and look for their
    # __cause__ or __context__?
    finally:
        socket.setdefaulttimeout(old_timeout)
|
|
|
|
|
|
@contextlib.contextmanager
def captured_output(stream_name):
    """Run the 'with' statement body using a StringIO object in place of a
    specific attribute on the sys module.

    The StringIO object is the value bound by ``with ... as``; the original
    attribute is put back when the block ends.
    Example use (with 'stream_name=stdout')::

       with captured_stdout() as s:
           print("hello")
       assert s.getvalue() == "hello\\n"
    """
    import io
    saved_stream = getattr(sys, stream_name)
    setattr(sys, stream_name, io.StringIO())
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, saved_stream)
|
|
|
|
def captured_stdout():
    """Shorthand for captured_output("stdout")."""
    return captured_output("stdout")
|
|
|
|
def captured_stderr():
    """Shorthand for captured_output("stderr")."""
    return captured_output("stderr")
|
|
|
|
def captured_stdin():
    """Shorthand for captured_output("stdin")."""
    return captured_output("stdin")
|
|
|
|
def gc_collect():
    """Force as many objects as possible to be collected.

    In non-CPython implementations of Python, this is needed because timely
    deallocation is not guaranteed by the garbage collector.  (Even in CPython
    this can be the case in case of reference cycles.)  This means that __del__
    methods may be called later than expected and weakrefs may remain alive for
    longer than expected.  This function tries its best to force all garbage
    objects to disappear.
    """
    # Three collection passes; on Jython, pause briefly after the first.
    for attempt in range(3):
        gc.collect()
        if attempt == 0 and is_jython:
            time.sleep(0.1)
|
|
|
|
|
|
def python_is_optimized():
    """Find if Python was built with optimizations.

    The PY_CFLAGS build variable is scanned for options starting with
    '-O'; when several are present the last one is the one considered.

    Returns:
        True if such an option exists and is not '-O0', otherwise False.
        The result is always a real bool (never the empty string).
    """
    cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
    final_opt = ''
    for opt in cflags.split():
        if opt.startswith('-O'):
            final_opt = opt
    return final_opt not in ('', '-O0')
|
|
|
|
|
|
#=======================================================================
|
|
# Decorator for running a function in a different locale, correctly resetting
|
|
# it afterwards.
|
|
|
|
def run_with_locale(catstr, *locales):
    """Decorator factory: run a function under one of the given locales.

    Arguments:
        catstr: name of a locale category attribute of the locale module
            (looked up with getattr, e.g. 'LC_ALL').
        *locales: candidate locale names, tried in order; the first one
            that setlocale() accepts is used.  If none is accepted the
            function simply runs under the current locale.

    The original locale for the category is restored after the call, even
    if the decorated function raises.  An invalid category name raises
    AttributeError when the decorated function is called.
    """
    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except Exception:
                # cannot retrieve original locale, so do nothing
                # (Exception rather than a bare except, so that
                # KeyboardInterrupt and SystemExit still propagate)
                locale = orig_locale = None
            else:
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except Exception:
                        # best effort: this candidate is unusable here,
                        # try the next one
                        continue

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        return inner
    return decorator
|
|
|
|
#=======================================================================
|
|
# Big-memory-test support. Separate from 'resources' because memory use
|
|
# should be configurable.
|
|
|
|
# Some handy shorthands. Note that these are used for byte-limits as well
|
|
# as size-limits, in the various bigmem tests
|
|
_1M = 1024*1024
|
|
_1G = 1024 * _1M
|
|
_2G = 2 * _1G
|
|
_4G = 4 * _1G
|
|
|
|
MAX_Py_ssize_t = sys.maxsize
|
|
|
|
def set_memlimit(limit):
    """Parse a memory limit string and store it in the module globals.

    'limit' is a number (optionally with a fractional part) followed by a
    unit letter K, M, G or T and an optional 'b', case-insensitively; for
    example '2.5Gb'.  Whitespace inside the pattern is ignored because the
    regular expression is compiled with re.VERBOSE.

    On success, real_max_memuse receives the requested number of bytes and
    max_memuse the same value clamped to MAX_Py_ssize_t.

    Raises:
        ValueError: if 'limit' cannot be parsed, or if the clamped value
            is below 2G - 1 bytes.  In both cases neither global is
            modified.
    """
    global max_memuse
    global real_max_memuse
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    requested = int(float(m.group(1)) * sizes[m.group(3).lower()])
    usable = min(requested, MAX_Py_ssize_t)
    if usable < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    # Only publish the new limits once they are known to be acceptable, so
    # a rejected value cannot leave the two globals out of step.
    real_max_memuse = requested
    max_memuse = usable
|
|
|
|
def bigmemtest(minsize, memuse):
    """Decorator for bigmem tests.

    'minsize' is the minimum useful size for the test (in arbitrary,
    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
    the test, or a good estimate of it.

    The decorator tries to guess a good value for 'size' and passes it to
    the decorated test function. If minsize * memuse is more than the
    allowed memory use (as defined by max_memuse), the test is skipped.
    Otherwise, minsize is adjusted upward to use up to max_memuse.

    The values are stored on the wrapper as the 'minsize' and 'memuse'
    attributes and are re-read on every call, so they can be adjusted
    after decoration.  The wrapper keeps the decorated function's name and
    docstring.
    """
    def decorator(f):
        @functools.wraps(f)
        def wrapper(self):
            # Retrieve values in case someone decided to adjust them
            minsize = wrapper.minsize
            memuse = wrapper.memuse
            if not max_memuse:
                # If max_memuse is 0 (the default),
                # we still want to run the tests with size set to a few kb,
                # to make sure they work. We still want to avoid using
                # too much memory, though, but we do that noisily.
                maxsize = 5147
                self.assertFalse(maxsize * memuse > 20 * _1M)
            else:
                maxsize = int(max_memuse / memuse)
                if maxsize < minsize:
                    raise unittest.SkipTest(
                        "not enough memory: %.1fG minimum needed"
                        % (minsize * memuse / (1024 ** 3)))
            return f(self, maxsize)
        wrapper.minsize = minsize
        wrapper.memuse = memuse
        return wrapper
    return decorator
|
|
|
|
def precisionbigmemtest(size, memuse):
    """Decorator for bigmem tests that need one exact size.

    'size' is the size handed to the test when a memory limit is set and
    'memuse' is the number of bytes used per unit of size.

    When real_max_memuse is 0 the test runs with a small size of 5147.
    Otherwise the test runs with exactly 'size', or is skipped when
    size * memuse exceeds real_max_memuse.

    The values are stored on the wrapper as the 'size' and 'memuse'
    attributes and are re-read on every call.  The wrapper keeps the
    decorated function's name and docstring.
    """
    def decorator(f):
        @functools.wraps(f)
        def wrapper(self):
            # Retrieve values in case someone decided to adjust them
            size = wrapper.size
            memuse = wrapper.memuse
            if not real_max_memuse:
                # No limit configured: run with a small size.
                return f(self, 5147)

            if real_max_memuse < size * memuse:
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (size * memuse / (1024 ** 3)))

            return f(self, size)
        wrapper.size = size
        wrapper.memuse = memuse
        return wrapper
    return decorator
|
|
|
|
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The test only runs when max_memuse is at least MAX_Py_ssize_t;
    otherwise unittest.SkipTest is raised.  When MAX_Py_ssize_t is at
    least 2**63 - 1 and max_memuse is at least 2**31, the skip message
    suggests trying a 32-bit build instead.

    The wrapper keeps the decorated function's name and docstring.
    """
    @functools.wraps(f)
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
            raise unittest.SkipTest(
                "not enough memory: try a 32-bit build instead")
        raise unittest.SkipTest(
            "not enough memory: %.1fG minimum needed"
            % (MAX_Py_ssize_t / (1024 ** 3)))
    return wrapper
|
|
|
|
#=======================================================================
|
|
# unittest integration.
|
|
|
|
class BasicTestRunner:
|
|
def run(self, test):
|
|
result = unittest.TestResult()
|
|
test(result)
|
|
return result
|
|
|
|
def _id(obj):
|
|
return obj
|
|
|
|
def requires_resource(resource):
    """Return a decorator that skips the test unless 'resource' is usable.

    For the 'gui' resource the test is skipped when _is_gui_available()
    reports false.  For any resource the test is skipped when
    is_resource_enabled() reports false.  Otherwise the identity decorator
    is returned and the test runs unchanged.
    """
    if resource == 'gui' and not _is_gui_available():
        return unittest.skip("resource 'gui' is not available")
    if not is_resource_enabled(resource):
        return unittest.skip("resource {0!r} is not enabled".format(resource))
    return _id
|
|
|
|
def cpython_only(test):
    """Decorator for tests only applicable on CPython.

    Equivalent to applying impl_detail(cpython=True) to 'test'.
    """
    decorate = impl_detail(cpython=True)
    return decorate(test)
|
|
|
|
def impl_detail(msg=None, **guards):
    """Return a decorator that skips tests not meant for this implementation.

    'guards' has the same meaning as for check_impl_detail().  When the
    check passes, the identity decorator is returned; otherwise a
    unittest.skip decorator is returned.  If 'msg' is not given, a skip
    message naming the guarded implementations is generated.
    """
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        guardnames, default = _parse_guards(guards)
        if default:
            template = "implementation detail not available on {0}"
        else:
            template = "implementation detail specific to {0}"
        names = sorted(guardnames.keys())
        msg = template.format(' or '.join(names))
    return unittest.skip(msg)
|
|
|
|
def _parse_guards(guards):
|
|
# Returns a tuple ({platform_name: run_me}, default_value)
|
|
if not guards:
|
|
return ({'cpython': True}, False)
|
|
is_true = list(guards.values())[0]
|
|
assert list(guards.values()) == [is_true] * len(guards) # all True or all False
|
|
return (guards, not is_true)
|
|
|
|
# Use the following check to guard CPython's implementation-specific tests --
|
|
# or to run them only on the implementation(s) guarded by the arguments.
|
|
def check_impl_detail(**guards):
    """This function returns True or False depending on the host platform.

    The lower-cased platform.python_implementation() name is looked up in
    the guards; implementations that are not mentioned get the default
    computed by _parse_guards().

       Examples:
          if check_impl_detail():               # only on CPython (default)
          if check_impl_detail(jython=True):    # only on Jython
          if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    platform_guards, fallback = _parse_guards(guards)
    impl_name = platform.python_implementation().lower()
    return platform_guards.get(impl_name, fallback)
|
|
|
|
|
|
|
|
def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class.

    A verbose TextTestRunner writing to sys.stdout is used when 'verbose'
    is set, a silent BasicTestRunner otherwise.  If the run was not
    successful, TestFailed is raised: with the traceback text of the
    single error or failure when there is exactly one, and with a generic
    message otherwise.
    """
    if verbose:
        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
    else:
        runner = BasicTestRunner()

    result = runner.run(suite)
    if result.wasSuccessful():
        return

    errors, failures = result.errors, result.failures
    if len(errors) == 1 and not failures:
        err = errors[0][1]
    elif len(failures) == 1 and not errors:
        err = failures[0][1]
    else:
        err = "multiple errors occurred"
        if not verbose:
            err += "; run in verbose mode for details"
    raise TestFailed(err)
|
|
|
|
|
|
def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes.

    Each argument may be:
      * a str naming a module present in sys.modules, whose test cases
        are loaded;
      * a unittest.TestSuite or unittest.TestCase instance, added as is;
      * anything else, which is treated as a TestCase class whose test
        methods are loaded.

    The collected suite is handed to _run_suite().

    Raises:
        ValueError: if a str argument is not a key of sys.modules.
    """
    valid_types = (unittest.TestSuite, unittest.TestCase)
    # A default TestLoader does what the legacy unittest.findTestCases()
    # and unittest.makeSuite() helpers did, without relying on functions
    # that newer Python versions no longer provide.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    for cls in classes:
        if isinstance(cls, str):
            if cls not in sys.modules:
                raise ValueError("str arguments must be keys in sys.modules")
            suite.addTest(loader.loadTestsFromModule(sys.modules[cls]))
        elif isinstance(cls, valid_types):
            suite.addTest(cls)
        else:
            suite.addTest(loader.loadTestsFromTestCase(cls))
    _run_suite(suite)
|
|
|
|
|
|
#=======================================================================
|
|
# doctest driver.
|
|
|
|
def run_doctest(module, verbosity=None):
    """Run doctest on the given module.  Return (#failures, #tests).

    If optional argument verbosity is not specified (or is None), pass
    support's belief about verbosity on to doctest.  Else doctest's
    usual behavior is used (it searches sys.argv for -v).

    Raises TestFailed if any doctest fails.
    """

    import doctest

    # Either forward our own verbosity, or pass None so that doctest
    # decides for itself.
    doctest_verbose = verbose if verbosity is None else None

    # Direct doctest output (normally just errors) to real stdout; doctest
    # output shouldn't be compared by regrtest.
    save_stdout = sys.stdout
    sys.stdout = get_original_stdout()
    try:
        failures, tests = doctest.testmod(module, verbose=doctest_verbose)
        if failures:
            raise TestFailed("%d of %d doctests failed" % (failures, tests))
    finally:
        sys.stdout = save_stdout
    if verbose:
        print('doctest (%s) ... %d tests with zero failures' %
              (module.__name__, tests))
    return failures, tests
|
|
|
|
|
|
#=======================================================================
|
|
# Support for saving and restoring the imported modules.
|
|
|
|
def modules_setup():
    """Snapshot sys.modules.

    Returns a 1-tuple holding a shallow copy of sys.modules, suitable for
    unpacking into modules_cleanup().
    """
    snapshot = dict(sys.modules)
    return (snapshot,)
|
|
|
|
def modules_cleanup(oldmodules):
    """Reset sys.modules to 'oldmodules', keeping loaded codec modules.

    sys.modules is emptied, the 'encodings.*' entries that were present
    are put back, and then the entries of 'oldmodules' are merged in.
    """
    # Encoders/decoders are registered permanently within the internal
    # codec cache. If we destroy the corresponding modules their
    # globals will be set to None which will trip up the cached functions.
    codec_modules = {name: module
                     for name, module in sys.modules.items()
                     if name.startswith('encodings.')}
    sys.modules.clear()
    sys.modules.update(codec_modules)
    # XXX: This kind of problem can affect more than just encodings. In
    # particular extension modules (such as _ssl) don't cope with reloading
    # properly.  Really, test modules should be cleaning out the test
    # specific modules they know they added (ala test_runpy) rather than
    # relying on this function (as test_importhooks and test_pkg do
    # currently).  Implicitly imported *real* modules should be left alone
    # (see issue 10556).
    sys.modules.update(oldmodules)
|
|
|
|
#=======================================================================
|
|
# Threading support to prevent reporting refleaks when running regrtest.py -R
|
|
|
|
# NOTE: we use _thread._count() rather than threading.enumerate() (or the
|
|
# moral equivalent thereof) because a threading.Thread object is still alive
|
|
# until its __bootstrap() method has returned, even after it has been
|
|
# unregistered from the threading module.
|
|
# _thread._count(), on the other hand, only gets decremented *after* the
|
|
# __bootstrap() method has returned, which gives us reliable reference counts
|
|
# at the end of a test run.
|
|
|
|
def threading_setup():
    """Record the current thread count.

    Returns a 1-tuple holding _thread._count(), or (1,) when the _thread
    module is unavailable.  The tuple is meant to be unpacked into
    threading_cleanup().
    """
    count = _thread._count() if _thread else 1
    return (count,)
|
|
|
|
def threading_cleanup(nb_threads):
    """Wait for the thread count to return to 'nb_threads'.

    Polls _thread._count() up to 10 times, sleeping 0.1 second after each
    mismatch, and returns as soon as the count matches.  Does nothing when
    the _thread module is unavailable.
    """
    if not _thread:
        return
    max_polls = 10
    for _ in range(max_polls):
        if _thread._count() == nb_threads:
            break
        time.sleep(0.1)
    # XXX print a warning in case of failure?
|
|
|
|
def reap_threads(func):
    """Use this function when threads are being used.  This will
    ensure that the threads are cleaned up even when the test fails.
    If threading is unavailable this function does nothing.

    The returned wrapper records the thread count with threading_setup()
    before calling 'func' and passes it to threading_cleanup() afterwards.
    Positional and keyword arguments are both forwarded to 'func', and its
    return value is returned unchanged.
    """
    if not _thread:
        return func

    @functools.wraps(func)
    def decorator(*args, **kwargs):
        key = threading_setup()
        try:
            return func(*args, **kwargs)
        finally:
            threading_cleanup(*key)
    return decorator
|
|
|
|
def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.

    Does nothing on platforms where os.waitpid or os.WNOHANG is missing.
    """

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')):
        return

    any_process = -1
    while True:
        try:
            pid, _ = os.waitpid(any_process, os.WNOHANG)
        except OSError:
            # Typically: there are no child processes left to wait for.
            break
        if pid == 0:
            # Children exist, but none of them has exited yet.
            break
|
|
|
|
@contextlib.contextmanager
def swap_attr(obj, attr, new_val):
    """Temporary swap out an attribute with a new object.

    Usage:
        with swap_attr(obj, "attr", 5):
            ...

        This will set obj.attr to 5 for the duration of the with: block,
        restoring the old value at the end of the block. If `attr` doesn't
        exist on `obj`, it will be created and then deleted at the end of the
        block.
    """
    existed = hasattr(obj, attr)
    if existed:
        saved = getattr(obj, attr)
    setattr(obj, attr, new_val)
    try:
        yield
    finally:
        if existed:
            setattr(obj, attr, saved)
        else:
            delattr(obj, attr)
|
|
|
|
@contextlib.contextmanager
def swap_item(obj, item, new_val):
    """Temporary swap out an item with a new object.

    Usage:
        with swap_item(obj, "item", 5):
            ...

        This will set obj["item"] to 5 for the duration of the with: block,
        restoring the old value at the end of the block. If `item` doesn't
        exist on `obj`, it will be created and then deleted at the end of the
        block.
    """
    existed = item in obj
    if existed:
        saved = obj[item]
    obj[item] = new_val
    try:
        yield
    finally:
        if existed:
            obj[item] = saved
        else:
            del obj[item]
|
|
|
|
def strip_python_stderr(stderr):
    """Strip the stderr of a Python process from potential debug output
    emitted by the interpreter.

    A trailing "[<digits> refs]" marker (with optional line ending) is
    removed from the bytes object, then surrounding whitespace is
    stripped.

    This will typically be run on the result of the communicate() method
    of a subprocess.Popen object.
    """
    refs_trailer = re.compile(br"\[\d+ refs\]\r?\n?$")
    cleaned = refs_trailer.sub(b"", stderr)
    return cleaned.strip()
|
|
|
|
def args_from_interpreter_flags():
    """Return a list of command-line arguments reproducing the current
    settings in sys.flags.

    Each flag with a positive value N contributes one argument made of a
    dash followed by the option letter repeated N times.
    """
    flag_options = (
        ('bytes_warning', 'b'),
        ('dont_write_bytecode', 'B'),
        ('ignore_environment', 'E'),
        ('no_user_site', 's'),
        ('no_site', 'S'),
        ('optimize', 'O'),
        ('verbose', 'v'),
    )
    args = []
    for flag, letter in flag_options:
        level = getattr(sys.flags, flag)
        if level > 0:
            args.append('-' + letter * level)
    return args
|
|
|
|
#============================================================
|
|
# Support for assertions about logging.
|
|
#============================================================
|
|
|
|
class TestHandler(logging.handlers.BufferingHandler):
    """Logging handler that keeps every record for later inspection.

    Records are stored in self.buffer as their attribute dictionaries and
    are never flushed automatically.  'matcher' is an object providing a
    matches(d, **kwargs) method, used by matches() to compare stored
    dictionaries against expected values.
    """

    def __init__(self, matcher):
        # BufferingHandler takes a "capacity" argument
        # so as to know when to flush. As we're overriding
        # shouldFlush anyway, we can set a capacity of zero.
        # You can call flush() manually to clear out the
        # buffer.
        logging.handlers.BufferingHandler.__init__(self, 0)
        self.matcher = matcher

    def shouldFlush(self, record=None):
        """Never flush automatically.

        'record' is accepted (and ignored) so the override can be called
        the same way as BufferingHandler.shouldFlush(record).
        """
        return False

    def emit(self, record):
        """Format 'record' and store its attribute dictionary."""
        self.format(record)
        self.buffer.append(record.__dict__)

    def matches(self, **kwargs):
        """
        Look for a saved dict whose keys/values match the supplied arguments.

        Returns True if the matcher accepts at least one stored dict,
        False otherwise.
        """
        return any(self.matcher.matches(d, **kwargs) for d in self.buffer)
|
|
|
|
class Matcher(object):
    """Compare stored dictionaries against expected key/value pairs."""

    # Keys whose string values only need to contain the expected text.
    _partial_matches = ('msg', 'message')

    def matches(self, d, **kwargs):
        """
        Try to match a single dict with the supplied arguments.

        Keys whose values are strings and which are in self._partial_matches
        will be checked for partial (i.e. substring) matches. You can extend
        this scheme to (for example) do regular expression matching, etc.
        """
        for key, expected in kwargs.items():
            if not self.match_value(key, d.get(key), expected):
                return False
        return True

    def match_value(self, k, dv, v):
        """
        Try to match a single stored value (dv) with a supplied value (v).

        Values of different types never match.  String values stored under
        a key listed in self._partial_matches match when v is a substring
        of dv; everything else is compared for equality.
        """
        if type(v) != type(dv):
            return False
        if type(dv) is str and k in self._partial_matches:
            return v in dv
        return v == dv
|
|
|
|
|
|
# Cached answer of can_symlink(); None until the first probe has run.
_can_symlink = None
def can_symlink():
    """Return True if a symbolic link can be created, False otherwise.

    The first call probes by creating a link named TESTFN + "can_symlink"
    that points at TESTFN; the link is removed again when the probe
    succeeds.  The answer is cached in the module-level _can_symlink, so
    later calls do not touch the filesystem.
    """
    global _can_symlink
    if _can_symlink is not None:
        return _can_symlink
    symlink_path = TESTFN + "can_symlink"
    try:
        os.symlink(TESTFN, symlink_path)
        can = True
    except (OSError, NotImplementedError, AttributeError):
        # AttributeError covers builds where os.symlink does not exist.
        can = False
    else:
        # Do not leave the probe link behind.
        os.remove(symlink_path)
    _can_symlink = can
    return can
|
|
|
|
def skip_unless_symlink(test):
    """Skip decorator for tests that require functional symlink.

    Returns 'test' unchanged when can_symlink() is true, otherwise 'test'
    wrapped with unittest.skip.
    """
    if can_symlink():
        return test
    return unittest.skip("Requires functional symlink implementation")(test)
|
|
|
|
def patch(test_instance, object_to_patch, attr_name, new_value):
    """Override 'object_to_patch'.'attr_name' with 'new_value'.

    Also, add a cleanup procedure to 'test_instance' to restore
    'object_to_patch' value for 'attr_name'.
    The 'attr_name' should be a valid attribute for 'object_to_patch'.

    If the attribute was stored in the object's own __dict__, cleanup sets
    it back to the old value; otherwise cleanup deletes the override.
    Raises AttributeError if 'attr_name' does not exist on the object.
    """
    # check that 'attr_name' is a real attribute for 'object_to_patch'
    # will raise AttributeError if it does not exist
    getattr(object_to_patch, attr_name)

    # keep a copy of the old value, and note whether it lived on the
    # object itself
    try:
        previous = object_to_patch.__dict__[attr_name]
        stored_locally = True
    except (AttributeError, KeyError):
        previous = getattr(object_to_patch, attr_name, None)
        stored_locally = False

    # restore the value when the test is done
    def restore():
        if not stored_locally:
            delattr(object_to_patch, attr_name)
        else:
            setattr(object_to_patch, attr_name, previous)

    test_instance.addCleanup(restore)

    # actually override the attribute
    setattr(object_to_patch, attr_name, new_value)
|