bpo-40275: Add os_helper submodule in test.support (GH-20765)

This commit is contained in:
Hai Shi 2020-06-10 20:29:02 +08:00 committed by GitHub
parent 07d8112812
commit 0d00b2a5d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 813 additions and 758 deletions

View File

@ -247,41 +247,6 @@ The :mod:`test.support` module defines the following constants:
Path for shell if not on Windows; otherwise ``None``. Path for shell if not on Windows; otherwise ``None``.
.. data:: FS_NONASCII
A non-ASCII character encodable by :func:`os.fsencode`.
.. data:: TESTFN
Set to a name that is safe to use as the name of a temporary file. Any
temporary file that is created should be closed and unlinked (removed).
.. data:: TESTFN_UNICODE
Set to a non-ASCII name for a temporary file.
.. data:: TESTFN_UNENCODABLE
Set to a filename (str type) that should not be able to be encoded by file
system encoding in strict mode. It may be ``None`` if it's not possible to
generate such a filename.
.. data:: TESTFN_UNDECODABLE
Set to a filename (bytes type) that should not be able to be decoded by
file system encoding in strict mode. It may be ``None`` if it's not
possible to generate such a filename.
.. data:: TESTFN_NONASCII
Set to a filename containing the :data:`FS_NONASCII` character.
.. data:: LOOPBACK_TIMEOUT .. data:: LOOPBACK_TIMEOUT
Timeout in seconds for tests using a network server listening on the network Timeout in seconds for tests using a network server listening on the network
@ -343,11 +308,6 @@ The :mod:`test.support` module defines the following constants:
:data:`SHORT_TIMEOUT`. :data:`SHORT_TIMEOUT`.
.. data:: SAVEDCWD
Set to :func:`os.getcwd`.
.. data:: PGO .. data:: PGO
Set when tests can be skipped when they are not useful for PGO. Set when tests can be skipped when they are not useful for PGO.
@ -449,25 +409,6 @@ The :mod:`test.support` module defines the following functions:
Delete *name* from ``sys.modules``. Delete *name* from ``sys.modules``.
.. function:: unlink(filename)
Call :func:`os.unlink` on *filename*. On Windows platforms, this is
wrapped with a wait loop that checks for the existence fo the file.
.. function:: rmdir(filename)
Call :func:`os.rmdir` on *filename*. On Windows platforms, this is
wrapped with a wait loop that checks for the existence of the file.
.. function:: rmtree(path)
Call :func:`shutil.rmtree` on *path* or call :func:`os.lstat` and
:func:`os.rmdir` to remove a path and its contents. On Windows platforms,
this is wrapped with a wait loop that checks for the existence of the files.
.. function:: make_legacy_pyc(source) .. function:: make_legacy_pyc(source)
Move a :pep:`3147`/:pep:`488` pyc file to its legacy pyc location and return the file Move a :pep:`3147`/:pep:`488` pyc file to its legacy pyc location and return the file
@ -521,16 +462,6 @@ The :mod:`test.support` module defines the following functions:
rather than looking directly in the path directories. rather than looking directly in the path directories.
.. function:: create_empty_file(filename)
Create an empty file with *filename*. If it already exists, truncate it.
.. function:: fd_count()
Count the number of open file descriptors.
.. function:: match_test(test) .. function:: match_test(test)
Match *test* to patterns set in :func:`set_match_tests`. Match *test* to patterns set in :func:`set_match_tests`.
@ -713,47 +644,6 @@ The :mod:`test.support` module defines the following functions:
self.assertEqual(captured, "hello") self.assertEqual(captured, "hello")
.. function:: temp_dir(path=None, quiet=False)
A context manager that creates a temporary directory at *path* and
yields the directory.
If *path* is ``None``, the temporary directory is created using
:func:`tempfile.mkdtemp`. If *quiet* is ``False``, the context manager
raises an exception on error. Otherwise, if *path* is specified and
cannot be created, only a warning is issued.
.. function:: change_cwd(path, quiet=False)
A context manager that temporarily changes the current working
directory to *path* and yields the directory.
If *quiet* is ``False``, the context manager raises an exception
on error. Otherwise, it issues only a warning and keeps the current
working directory the same.
.. function:: temp_cwd(name='tempcwd', quiet=False)
A context manager that temporarily creates a new directory and
changes the current working directory (CWD).
The context manager creates a temporary directory in the current
directory with name *name* before temporarily changing the current
working directory. If *name* is ``None``, the temporary directory is
created using :func:`tempfile.mkdtemp`.
If *quiet* is ``False`` and it is not possible to create or change
the CWD, an error is raised. Otherwise, only a warning is raised
and the original CWD is used.
.. function:: temp_umask(umask)
A context manager that temporarily sets the process umask.
.. function:: disable_faulthandler() .. function:: disable_faulthandler()
A context manager that replaces ``sys.stderr`` with ``sys.__stderr__``. A context manager that replaces ``sys.stderr`` with ``sys.__stderr__``.
@ -851,28 +741,6 @@ The :mod:`test.support` module defines the following functions:
header size equals *size*. header size equals *size*.
.. function:: can_symlink()
Return ``True`` if the OS supports symbolic links, ``False``
otherwise.
.. function:: can_xattr()
Return ``True`` if the OS supports xattr, ``False``
otherwise.
.. decorator:: skip_unless_symlink
A decorator for running tests that require support for symbolic links.
.. decorator:: skip_unless_xattr
A decorator for running tests that require support for xattr.
.. decorator:: anticipate_failure(condition) .. decorator:: anticipate_failure(condition)
A decorator to conditionally mark tests with A decorator to conditionally mark tests with
@ -992,12 +860,6 @@ The :mod:`test.support` module defines the following functions:
wrap. wrap.
.. function:: make_bad_fd()
Create an invalid file descriptor by opening and closing a temporary file,
and returning its descriptor.
.. function:: check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None) .. function:: check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None)
Test for syntax errors in *statement* by attempting to compile *statement*. Test for syntax errors in *statement* by attempting to compile *statement*.
@ -1144,11 +1006,6 @@ The :mod:`test.support` module defines the following functions:
return load_package_tests(os.path.dirname(__file__), *args) return load_package_tests(os.path.dirname(__file__), *args)
.. function:: fs_is_case_insensitive(directory)
Return ``True`` if the file system for *directory* is case-insensitive.
.. function:: detect_api_mismatch(ref_api, other_api, *, ignore=()) .. function:: detect_api_mismatch(ref_api, other_api, *, ignore=())
Returns the set of attributes, functions or methods of *ref_api* not Returns the set of attributes, functions or methods of *ref_api* not
@ -1241,28 +1098,6 @@ The :mod:`test.support` module defines the following classes:
attributes on the exception is :exc:`ResourceDenied` raised. attributes on the exception is :exc:`ResourceDenied` raised.
.. class:: EnvironmentVarGuard()
Class used to temporarily set or unset environment variables. Instances can
be used as a context manager and have a complete dictionary interface for
querying/modifying the underlying ``os.environ``. After exit from the
context manager all changes to environment variables done through this
instance will be rolled back.
.. versionchanged:: 3.1
Added dictionary interface.
.. method:: EnvironmentVarGuard.set(envvar, value)
Temporarily set the environment variable ``envvar`` to the value of
``value``.
.. method:: EnvironmentVarGuard.unset(envvar)
Temporarily unset the environment variable ``envvar``.
.. class:: SuppressCrashReport() .. class:: SuppressCrashReport()
A context manager used to try to prevent crash dialog popups on tests that A context manager used to try to prevent crash dialog popups on tests that
@ -1332,13 +1167,6 @@ The :mod:`test.support` module defines the following classes:
Run *test* and return the result. Run *test* and return the result.
.. class:: FakePath(path)
Simple :term:`path-like object`. It implements the :meth:`__fspath__`
method which just returns the *path* argument. If *path* is an exception,
it will be raised in :meth:`!__fspath__`.
:mod:`test.support.socket_helper` --- Utilities for socket tests :mod:`test.support.socket_helper` --- Utilities for socket tests
================================================================ ================================================================
@ -1634,3 +1462,187 @@ The :mod:`test.support.threading_helper` module provides support for threading t
# (to avoid reference cycles) # (to avoid reference cycles)
.. versionadded:: 3.8 .. versionadded:: 3.8
:mod:`test.support.os_helper` --- Utilities for os tests
========================================================================
.. module:: test.support.os_helper
:synopsis: Support for os tests.
The :mod:`test.support.os_helper` module provides support for os tests.
.. versionadded:: 3.10
.. data:: FS_NONASCII
A non-ASCII character encodable by :func:`os.fsencode`.
.. data:: SAVEDCWD
Set to :func:`os.getcwd`.
.. data:: TESTFN
Set to a name that is safe to use as the name of a temporary file. Any
temporary file that is created should be closed and unlinked (removed).
.. data:: TESTFN_NONASCII
Set to a filename containing the :data:`FS_NONASCII` character.
.. data:: TESTFN_UNENCODABLE
Set to a filename (str type) that should not be able to be encoded by file
system encoding in strict mode. It may be ``None`` if it's not possible to
generate such a filename.
.. data:: TESTFN_UNDECODABLE
Set to a filename (bytes type) that should not be able to be decoded by
file system encoding in strict mode. It may be ``None`` if it's not
possible to generate such a filename.
.. data:: TESTFN_UNICODE
Set to a non-ASCII name for a temporary file.
.. class:: EnvironmentVarGuard()
Class used to temporarily set or unset environment variables. Instances can
be used as a context manager and have a complete dictionary interface for
querying/modifying the underlying ``os.environ``. After exit from the
context manager all changes to environment variables done through this
instance will be rolled back.
.. versionchanged:: 3.1
Added dictionary interface.
.. class:: FakePath(path)
Simple :term:`path-like object`. It implements the :meth:`__fspath__`
method which just returns the *path* argument. If *path* is an exception,
it will be raised in :meth:`!__fspath__`.
.. method:: EnvironmentVarGuard.set(envvar, value)
Temporarily set the environment variable ``envvar`` to the value of
``value``.
.. method:: EnvironmentVarGuard.unset(envvar)
Temporarily unset the environment variable ``envvar``.
.. function:: can_symlink()
Return ``True`` if the OS supports symbolic links, ``False``
otherwise.
.. function:: can_xattr()
Return ``True`` if the OS supports xattr, ``False``
otherwise.
.. function:: change_cwd(path, quiet=False)
A context manager that temporarily changes the current working
directory to *path* and yields the directory.
If *quiet* is ``False``, the context manager raises an exception
on error. Otherwise, it issues only a warning and keeps the current
working directory the same.
.. function:: create_empty_file(filename)
Create an empty file with *filename*. If it already exists, truncate it.
.. function:: fd_count()
Count the number of open file descriptors.
.. function:: fs_is_case_insensitive(directory)
Return ``True`` if the file system for *directory* is case-insensitive.
.. function:: make_bad_fd()
Create an invalid file descriptor by opening and closing a temporary file,
and returning its descriptor.
.. function:: rmdir(filename)
Call :func:`os.rmdir` on *filename*. On Windows platforms, this is
wrapped with a wait loop that checks for the existence of the file.
.. function:: rmtree(path)
Call :func:`shutil.rmtree` on *path* or call :func:`os.lstat` and
:func:`os.rmdir` to remove a path and its contents. On Windows platforms,
this is wrapped with a wait loop that checks for the existence of the files.
.. decorator:: skip_unless_symlink
A decorator for running tests that require support for symbolic links.
.. decorator:: skip_unless_xattr
A decorator for running tests that require support for xattr.
.. function:: temp_cwd(name='tempcwd', quiet=False)
A context manager that temporarily creates a new directory and
changes the current working directory (CWD).
The context manager creates a temporary directory in the current
directory with name *name* before temporarily changing the current
working directory. If *name* is ``None``, the temporary directory is
created using :func:`tempfile.mkdtemp`.
If *quiet* is ``False`` and it is not possible to create or change
the CWD, an error is raised. Otherwise, only a warning is raised
and the original CWD is used.
.. function:: temp_dir(path=None, quiet=False)
A context manager that creates a temporary directory at *path* and
yields the directory.
If *path* is ``None``, the temporary directory is created using
:func:`tempfile.mkdtemp`. If *quiet* is ``False``, the context manager
raises an exception on error. Otherwise, if *path* is specified and
cannot be created, only a warning is issued.
.. function:: temp_umask(umask)
A context manager that temporarily sets the process umask.
.. function:: unlink(filename)
Call :func:`os.unlink` on *filename*. On Windows platforms, this is
wrapped with a wait loop that checks for the existence fo the file.

View File

@ -3,7 +3,6 @@
if __name__ != 'test.support': if __name__ != 'test.support':
raise ImportError('support must be imported from the test package') raise ImportError('support must be imported from the test package')
import collections.abc
import contextlib import contextlib
import errno import errno
import fnmatch import fnmatch
@ -22,8 +21,19 @@ import types
import unittest import unittest
import warnings import warnings
from .os_helper import (
FS_NONASCII, SAVEDCWD, TESTFN, TESTFN_NONASCII,
TESTFN_UNENCODABLE, TESTFN_UNDECODABLE,
TESTFN_UNICODE, can_symlink, can_xattr,
change_cwd, create_empty_file, fd_count,
fs_is_case_insensitive, make_bad_fd, rmdir,
rmtree, skip_unless_symlink, skip_unless_xattr,
temp_cwd, temp_dir, temp_umask, unlink,
EnvironmentVarGuard, FakePath, _longpath)
from .testresult import get_test_runner from .testresult import get_test_runner
__all__ = [ __all__ = [
# globals # globals
"PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast", "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
@ -36,18 +46,15 @@ __all__ = [
# io # io
"record_original_stdout", "get_original_stdout", "captured_stdout", "record_original_stdout", "get_original_stdout", "captured_stdout",
"captured_stdin", "captured_stderr", "captured_stdin", "captured_stderr",
# filesystem
"TESTFN", "SAVEDCWD", "unlink", "rmtree", "temp_cwd", "findfile",
"create_empty_file", "can_symlink", "fs_is_case_insensitive",
# unittest # unittest
"is_resource_enabled", "requires", "requires_freebsd_version", "is_resource_enabled", "requires", "requires_freebsd_version",
"requires_linux_version", "requires_mac_ver", "requires_linux_version", "requires_mac_ver",
"check_syntax_error", "check_syntax_warning", "check_syntax_error", "check_syntax_warning",
"TransientResource", "time_out", "socket_peer_reset", "ioerror_peer_reset", "TransientResource", "time_out", "socket_peer_reset", "ioerror_peer_reset",
"BasicTestRunner", "run_unittest", "run_doctest", "BasicTestRunner", "run_unittest", "run_doctest",
"skip_unless_symlink", "requires_gzip", "requires_bz2", "requires_lzma", "requires_gzip", "requires_bz2", "requires_lzma",
"bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute", "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
"requires_IEEE_754", "skip_unless_xattr", "requires_zlib", "requires_IEEE_754", "requires_zlib",
"anticipate_failure", "load_package_tests", "detect_api_mismatch", "anticipate_failure", "load_package_tests", "detect_api_mismatch",
"check__all__", "skip_if_buggy_ucrt_strfptime", "check__all__", "skip_if_buggy_ucrt_strfptime",
"ignore_warnings", "ignore_warnings",
@ -57,13 +64,12 @@ __all__ = [
# network # network
"open_urlresource", "open_urlresource",
# processes # processes
'temp_umask', "reap_children", "reap_children",
# miscellaneous # miscellaneous
"check_warnings", "check_no_resource_warning", "check_no_warnings", "check_warnings", "check_no_resource_warning", "check_no_warnings",
"EnvironmentVarGuard", "run_with_locale", "swap_item", "findfile",
"run_with_locale", "swap_item",
"swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict", "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
"run_with_tz", "PGO", "missing_compiler_executable", "fd_count", "run_with_tz", "PGO", "missing_compiler_executable",
"ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST", "ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST",
"LOOPBACK_TIMEOUT", "INTERNET_TIMEOUT", "SHORT_TIMEOUT", "LONG_TIMEOUT", "LOOPBACK_TIMEOUT", "INTERNET_TIMEOUT", "SHORT_TIMEOUT", "LONG_TIMEOUT",
] ]
@ -318,6 +324,7 @@ def unload(name):
except KeyError: except KeyError:
pass pass
def _force_run(path, func, *args): def _force_run(path, func, *args):
try: try:
return func(*args) return func(*args)
@ -328,124 +335,6 @@ def _force_run(path, func, *args):
os.chmod(path, stat.S_IRWXU) os.chmod(path, stat.S_IRWXU)
return func(*args) return func(*args)
if sys.platform.startswith("win"):
def _waitfor(func, pathname, waitall=False):
# Perform the operation
func(pathname)
# Now setup the wait loop
if waitall:
dirname = pathname
else:
dirname, name = os.path.split(pathname)
dirname = dirname or '.'
# Check for `pathname` to be removed from the filesystem.
# The exponential backoff of the timeout amounts to a total
# of ~1 second after which the deletion is probably an error
# anyway.
# Testing on an i7@4.3GHz shows that usually only 1 iteration is
# required when contention occurs.
timeout = 0.001
while timeout < 1.0:
# Note we are only testing for the existence of the file(s) in
# the contents of the directory regardless of any security or
# access rights. If we have made it this far, we have sufficient
# permissions to do that much using Python's equivalent of the
# Windows API FindFirstFile.
# Other Windows APIs can fail or give incorrect results when
# dealing with files that are pending deletion.
L = os.listdir(dirname)
if not (L if waitall else name in L):
return
# Increase the timeout and try again
time.sleep(timeout)
timeout *= 2
warnings.warn('tests may fail, delete still pending for ' + pathname,
RuntimeWarning, stacklevel=4)
def _unlink(filename):
_waitfor(os.unlink, filename)
def _rmdir(dirname):
_waitfor(os.rmdir, dirname)
def _rmtree(path):
def _rmtree_inner(path):
for name in _force_run(path, os.listdir, path):
fullname = os.path.join(path, name)
try:
mode = os.lstat(fullname).st_mode
except OSError as exc:
print("support.rmtree(): os.lstat(%r) failed with %s" % (fullname, exc),
file=sys.__stderr__)
mode = 0
if stat.S_ISDIR(mode):
_waitfor(_rmtree_inner, fullname, waitall=True)
_force_run(fullname, os.rmdir, fullname)
else:
_force_run(fullname, os.unlink, fullname)
_waitfor(_rmtree_inner, path, waitall=True)
_waitfor(lambda p: _force_run(p, os.rmdir, p), path)
def _longpath(path):
try:
import ctypes
except ImportError:
# No ctypes means we can't expands paths.
pass
else:
buffer = ctypes.create_unicode_buffer(len(path) * 2)
length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
len(buffer))
if length:
return buffer[:length]
return path
else:
_unlink = os.unlink
_rmdir = os.rmdir
def _rmtree(path):
import shutil
try:
shutil.rmtree(path)
return
except OSError:
pass
def _rmtree_inner(path):
for name in _force_run(path, os.listdir, path):
fullname = os.path.join(path, name)
try:
mode = os.lstat(fullname).st_mode
except OSError:
mode = 0
if stat.S_ISDIR(mode):
_rmtree_inner(fullname)
_force_run(path, os.rmdir, fullname)
else:
_force_run(path, os.unlink, fullname)
_rmtree_inner(path)
os.rmdir(path)
def _longpath(path):
return path
def unlink(filename):
try:
_unlink(filename)
except (FileNotFoundError, NotADirectoryError):
pass
def rmdir(dirname):
try:
_rmdir(dirname)
except FileNotFoundError:
pass
def rmtree(path):
try:
_rmtree(path)
except FileNotFoundError:
pass
def make_legacy_pyc(source): def make_legacy_pyc(source):
"""Move a PEP 3147/488 pyc file to its legacy pyc location. """Move a PEP 3147/488 pyc file to its legacy pyc location.
@ -714,149 +603,10 @@ if sys.platform != 'win32':
else: else:
unix_shell = None unix_shell = None
# Filename used for testing
if os.name == 'java':
# Jython disallows @ in module names
TESTFN = '$test'
else:
TESTFN = '@test'
# Disambiguate TESTFN for parallel testing, while letting it remain a valid
# module name.
TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
# Define the URL of a dedicated HTTP server for the network tests. # Define the URL of a dedicated HTTP server for the network tests.
# The URL must use clear-text HTTP: no redirection to encrypted HTTPS. # The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
TEST_HTTP_URL = "http://www.pythontest.net" TEST_HTTP_URL = "http://www.pythontest.net"
# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
# or None if there is no such character.
FS_NONASCII = None
for character in (
# First try printable and common characters to have a readable filename.
# For each character, the encoding list are just example of encodings able
# to encode the character (the list is not exhaustive).
# U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
'\u00E6',
# U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
'\u0130',
# U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
'\u0141',
# U+03C6 (Greek Small Letter Phi): cp1253
'\u03C6',
# U+041A (Cyrillic Capital Letter Ka): cp1251
'\u041A',
# U+05D0 (Hebrew Letter Alef): Encodable to cp424
'\u05D0',
# U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
'\u060C',
# U+062A (Arabic Letter Teh): cp720
'\u062A',
# U+0E01 (Thai Character Ko Kai): cp874
'\u0E01',
# Then try more "special" characters. "special" because they may be
# interpreted or displayed differently depending on the exact locale
# encoding and the font.
# U+00A0 (No-Break Space)
'\u00A0',
# U+20AC (Euro Sign)
'\u20AC',
):
try:
# If Python is set up to use the legacy 'mbcs' in Windows,
# 'replace' error mode is used, and encode() returns b'?'
# for characters missing in the ANSI codepage
if os.fsdecode(os.fsencode(character)) != character:
raise UnicodeError
except UnicodeError:
pass
else:
FS_NONASCII = character
break
# TESTFN_UNICODE is a non-ascii filename
TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
if sys.platform == 'darwin':
# In Mac OS X's VFS API file names are, by definition, canonically
# decomposed Unicode, encoded using UTF-8. See QA1173:
# http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
import unicodedata
TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
# encoded by the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename.
TESTFN_UNENCODABLE = None
if os.name == 'nt':
# skip win32s (0) or Windows 9x/ME (1)
if sys.getwindowsversion().platform >= 2:
# Different kinds of characters from various languages to minimize the
# probability that the whole name is encodable to MBCS (issue #9819)
TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
try:
TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding())
except UnicodeEncodeError:
pass
else:
print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
'Unicode filename tests may not be effective'
% (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
TESTFN_UNENCODABLE = None
# Mac OS X denies unencodable filenames (invalid utf-8)
elif sys.platform != 'darwin':
try:
# ascii and utf-8 cannot encode the byte 0xff
b'\xff'.decode(sys.getfilesystemencoding())
except UnicodeDecodeError:
# 0xff will be encoded using the surrogate character u+DCFF
TESTFN_UNENCODABLE = TESTFN \
+ b'-\xff'.decode(sys.getfilesystemencoding(), 'surrogateescape')
else:
# File system encoding (eg. ISO-8859-* encodings) can encode
# the byte 0xff. Skip some unicode filename tests.
pass
# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
# decoded from the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename (ex: the latin1 encoding can decode any byte
# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
# to the surrogateescape error handler (PEP 383), but not from the filesystem
# encoding in strict mode.
TESTFN_UNDECODABLE = None
for name in (
# b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
# accepts it to create a file or a directory, or don't accept to enter to
# such directory (when the bytes name is used). So test b'\xe7' first: it is
# not decodable from cp932.
b'\xe7w\xf0',
# undecodable from ASCII, UTF-8
b'\xff',
# undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
# and cp857
b'\xae\xd5'
# undecodable from UTF-8 (UNIX and Mac OS X)
b'\xed\xb2\x80', b'\xed\xb4\x80',
# undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
# cp1253, cp1254, cp1255, cp1257, cp1258
b'\x81\x98',
):
try:
name.decode(sys.getfilesystemencoding())
except UnicodeDecodeError:
TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name
break
if FS_NONASCII:
TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII
else:
TESTFN_NONASCII = None
# Save the initial cwd
SAVEDCWD = os.getcwd()
# Set by libregrtest/main.py so we can skip tests that are not # Set by libregrtest/main.py so we can skip tests that are not
# useful for PGO # useful for PGO
PGO = False PGO = False
@ -865,103 +615,6 @@ PGO = False
# PGO task. If this is True, PGO is also True. # PGO task. If this is True, PGO is also True.
PGO_EXTENDED = False PGO_EXTENDED = False
@contextlib.contextmanager
def temp_dir(path=None, quiet=False):
"""Return a context manager that creates a temporary directory.
Arguments:
path: the directory to create temporarily. If omitted or None,
defaults to creating a temporary directory using tempfile.mkdtemp.
quiet: if False (the default), the context manager raises an exception
on error. Otherwise, if the path is specified and cannot be
created, only a warning is issued.
"""
import tempfile
dir_created = False
if path is None:
path = tempfile.mkdtemp()
dir_created = True
path = os.path.realpath(path)
else:
try:
os.mkdir(path)
dir_created = True
except OSError as exc:
if not quiet:
raise
warnings.warn(f'tests may fail, unable to create '
f'temporary directory {path!r}: {exc}',
RuntimeWarning, stacklevel=3)
if dir_created:
pid = os.getpid()
try:
yield path
finally:
# In case the process forks, let only the parent remove the
# directory. The child has a different process id. (bpo-30028)
if dir_created and pid == os.getpid():
rmtree(path)
@contextlib.contextmanager
def change_cwd(path, quiet=False):
"""Return a context manager that changes the current working directory.
Arguments:
path: the directory to use as the temporary current working directory.
quiet: if False (the default), the context manager raises an exception
on error. Otherwise, it issues only a warning and keeps the current
working directory the same.
"""
saved_dir = os.getcwd()
try:
os.chdir(os.path.realpath(path))
except OSError as exc:
if not quiet:
raise
warnings.warn(f'tests may fail, unable to change the current working '
f'directory to {path!r}: {exc}',
RuntimeWarning, stacklevel=3)
try:
yield os.getcwd()
finally:
os.chdir(saved_dir)
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
"""
Context manager that temporarily creates and changes the CWD.
The function temporarily changes the current working directory
after creating a temporary directory in the current directory with
name *name*. If *name* is None, the temporary directory is
created using tempfile.mkdtemp.
If *quiet* is False (default) and it is not possible to
create or change the CWD, an error is raised. If *quiet* is True,
only a warning is raised and the original CWD is used.
"""
with temp_dir(path=name, quiet=quiet) as temp_path:
with change_cwd(temp_path, quiet=quiet) as cwd_dir:
yield cwd_dir
if hasattr(os, "umask"):
@contextlib.contextmanager
def temp_umask(umask):
"""Context manager that temporarily sets the process umask."""
oldmask = os.umask(umask)
try:
yield
finally:
os.umask(oldmask)
# TEST_HOME_DIR refers to the top level directory of the "test" package # TEST_HOME_DIR refers to the top level directory of the "test" package
# that contains Python's regression test suite # that contains Python's regression test suite
TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__)) TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
@ -970,6 +623,7 @@ TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
# TEST_DATA_DIR is used as a target download location for remote resources # TEST_DATA_DIR is used as a target download location for remote resources
TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data") TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
def findfile(filename, subdir=None): def findfile(filename, subdir=None):
"""Try to find a file on sys.path or in the test directory. If it is not """Try to find a file on sys.path or in the test directory. If it is not
found the argument passed to the function is returned (this does not found the argument passed to the function is returned (this does not
@ -988,10 +642,6 @@ def findfile(filename, subdir=None):
if os.path.exists(fn): return fn if os.path.exists(fn): return fn
return filename return filename
def create_empty_file(filename):
"""Create an empty file. If the file already exists, truncate it."""
fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
os.close(fd)
def sortdict(dict): def sortdict(dict):
"Like repr(dict), but in sorted order." "Like repr(dict), but in sorted order."
@ -1000,19 +650,6 @@ def sortdict(dict):
withcommas = ", ".join(reprpairs) withcommas = ", ".join(reprpairs)
return "{%s}" % withcommas return "{%s}" % withcommas
def make_bad_fd():
"""
Create an invalid file descriptor by opening and closing a file and return
its fd.
"""
file = open(TESTFN, "wb")
try:
return file.fileno()
finally:
file.close()
unlink(TESTFN)
def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None): def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None):
with testcase.assertRaisesRegex(SyntaxError, errtext) as cm: with testcase.assertRaisesRegex(SyntaxError, errtext) as cm:
compile(statement, '<test string>', 'exec') compile(statement, '<test string>', 'exec')
@ -1265,59 +902,6 @@ class CleanImport(object):
sys.modules.update(self.original_modules) sys.modules.update(self.original_modules)
class EnvironmentVarGuard(collections.abc.MutableMapping):
"""Class to help protect the environment variable properly. Can be used as
a context manager."""
def __init__(self):
self._environ = os.environ
self._changed = {}
def __getitem__(self, envvar):
return self._environ[envvar]
def __setitem__(self, envvar, value):
# Remember the initial value on the first access
if envvar not in self._changed:
self._changed[envvar] = self._environ.get(envvar)
self._environ[envvar] = value
def __delitem__(self, envvar):
# Remember the initial value on the first access
if envvar not in self._changed:
self._changed[envvar] = self._environ.get(envvar)
if envvar in self._environ:
del self._environ[envvar]
def keys(self):
return self._environ.keys()
def __iter__(self):
return iter(self._environ)
def __len__(self):
return len(self._environ)
def set(self, envvar, value):
self[envvar] = value
def unset(self, envvar):
del self[envvar]
def __enter__(self):
return self
def __exit__(self, *ignore_exc):
for (k, v) in self._changed.items():
if v is None:
if k in self._environ:
del self._environ[k]
else:
self._environ[k] = v
os.environ = self._environ
class DirsOnSysPath(object): class DirsOnSysPath(object):
"""Context manager to temporarily add directories to sys.path. """Context manager to temporarily add directories to sys.path.
@ -2133,28 +1717,6 @@ class Matcher(object):
return result return result
_can_symlink = None
def can_symlink():
global _can_symlink
if _can_symlink is not None:
return _can_symlink
symlink_path = TESTFN + "can_symlink"
try:
os.symlink(TESTFN, symlink_path)
can = True
except (OSError, NotImplementedError, AttributeError):
can = False
else:
os.remove(symlink_path)
_can_symlink = can
return can
def skip_unless_symlink(test):
"""Skip decorator for tests that require functional symlink"""
ok = can_symlink()
msg = "Requires functional symlink implementation"
return test if ok else unittest.skip(msg)(test)
_buggy_ucrt = None _buggy_ucrt = None
def skip_if_buggy_ucrt_strfptime(test): def skip_if_buggy_ucrt_strfptime(test):
""" """
@ -2256,45 +1818,6 @@ class PythonSymlink:
return self._call(self.link, args, self._env, returncode) return self._call(self.link, args, self._env, returncode)
_can_xattr = None
def can_xattr():
import tempfile
global _can_xattr
if _can_xattr is not None:
return _can_xattr
if not hasattr(os, "setxattr"):
can = False
else:
import platform
tmp_dir = tempfile.mkdtemp()
tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
try:
with open(TESTFN, "wb") as fp:
try:
# TESTFN & tempfile may use different file systems with
# different capabilities
os.setxattr(tmp_fp, b"user.test", b"")
os.setxattr(tmp_name, b"trusted.foo", b"42")
os.setxattr(fp.fileno(), b"user.test", b"")
# Kernels < 2.6.39 don't respect setxattr flags.
kernel_version = platform.release()
m = re.match(r"2.6.(\d{1,2})", kernel_version)
can = m is None or int(m.group(1)) >= 39
except OSError:
can = False
finally:
unlink(TESTFN)
unlink(tmp_name)
rmdir(tmp_dir)
_can_xattr = can
return can
def skip_unless_xattr(test):
"""Skip decorator for tests that require functional extended attributes"""
ok = can_xattr()
msg = "no non-broken extended attribute support"
return test if ok else unittest.skip(msg)(test)
def skip_if_pgo_task(test): def skip_if_pgo_task(test):
"""Skip decorator for tests not run in (non-extended) PGO task""" """Skip decorator for tests not run in (non-extended) PGO task"""
ok = not PGO or PGO_EXTENDED ok = not PGO or PGO_EXTENDED
@ -2302,20 +1825,6 @@ def skip_if_pgo_task(test):
return test if ok else unittest.skip(msg)(test) return test if ok else unittest.skip(msg)(test)
def fs_is_case_insensitive(directory):
"""Detects if the file system for the specified directory is case-insensitive."""
import tempfile
with tempfile.NamedTemporaryFile(dir=directory) as base:
base_path = base.name
case_path = base_path.upper()
if case_path == base_path:
case_path = base_path.lower()
try:
return os.path.samefile(base_path, case_path)
except FileNotFoundError:
return False
def detect_api_mismatch(ref_api, other_api, *, ignore=()): def detect_api_mismatch(ref_api, other_api, *, ignore=()):
"""Returns the set of items in ref_api not in other_api, except for a """Returns the set of items in ref_api not in other_api, except for a
defined list of items to be ignored in this check. defined list of items to be ignored in this check.
@ -2623,65 +2132,6 @@ def disable_faulthandler():
faulthandler.enable(file=fd, all_threads=True) faulthandler.enable(file=fd, all_threads=True)
def fd_count():
"""Count the number of open file descriptors.
"""
if sys.platform.startswith(('linux', 'freebsd')):
try:
names = os.listdir("/proc/self/fd")
# Subtract one because listdir() internally opens a file
# descriptor to list the content of the /proc/self/fd/ directory.
return len(names) - 1
except FileNotFoundError:
pass
MAXFD = 256
if hasattr(os, 'sysconf'):
try:
MAXFD = os.sysconf("SC_OPEN_MAX")
except OSError:
pass
old_modes = None
if sys.platform == 'win32':
# bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
# on invalid file descriptor if Python is compiled in debug mode
try:
import msvcrt
msvcrt.CrtSetReportMode
except (AttributeError, ImportError):
# no msvcrt or a release build
pass
else:
old_modes = {}
for report_type in (msvcrt.CRT_WARN,
msvcrt.CRT_ERROR,
msvcrt.CRT_ASSERT):
old_modes[report_type] = msvcrt.CrtSetReportMode(report_type, 0)
try:
count = 0
for fd in range(MAXFD):
try:
# Prefer dup() over fstat(). fstat() can require input/output
# whereas dup() doesn't.
fd2 = os.dup(fd)
except OSError as e:
if e.errno != errno.EBADF:
raise
else:
os.close(fd2)
count += 1
finally:
if old_modes is not None:
for report_type in (msvcrt.CRT_WARN,
msvcrt.CRT_ERROR,
msvcrt.CRT_ASSERT):
msvcrt.CrtSetReportMode(report_type, old_modes[report_type])
return count
class SaveSignals: class SaveSignals:
""" """
Save and restore signal handlers. Save and restore signal handlers.
@ -2726,24 +2176,6 @@ def with_pymalloc():
return _testcapi.WITH_PYMALLOC return _testcapi.WITH_PYMALLOC
class FakePath:
"""Simple implementing of the path protocol.
"""
def __init__(self, path):
self.path = path
def __repr__(self):
return f'<FakePath {self.path!r}>'
def __fspath__(self):
if (isinstance(self.path, BaseException) or
isinstance(self.path, type) and
issubclass(self.path, BaseException)):
raise self.path
else:
return self.path
class _ALWAYS_EQ: class _ALWAYS_EQ:
""" """
Object that is equal to anything. Object that is equal to anything.

View File

@ -0,0 +1,611 @@
import collections.abc
import contextlib
import errno
import os
import re
import stat
import sys
import time
import unittest
import warnings
# Filename used for testing
if os.name == 'java':
# Jython disallows @ in module names
TESTFN = '$test'
else:
TESTFN = '@test'
# Disambiguate TESTFN for parallel testing, while letting it remain a valid
# module name.
TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
# TESTFN_UNICODE is a non-ascii filename
TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
if sys.platform == 'darwin':
# In Mac OS X's VFS API file names are, by definition, canonically
# decomposed Unicode, encoded using UTF-8. See QA1173:
# http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
import unicodedata
TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
# encoded by the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename.
TESTFN_UNENCODABLE = None
if os.name == 'nt':
# skip win32s (0) or Windows 9x/ME (1)
if sys.getwindowsversion().platform >= 2:
# Different kinds of characters from various languages to minimize the
# probability that the whole name is encodable to MBCS (issue #9819)
TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
try:
TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding())
except UnicodeEncodeError:
pass
else:
print('WARNING: The filename %r CAN be encoded by the filesystem '
'encoding (%s). Unicode filename tests may not be effective'
% (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
TESTFN_UNENCODABLE = None
# Mac OS X denies unencodable filenames (invalid utf-8)
elif sys.platform != 'darwin':
try:
# ascii and utf-8 cannot encode the byte 0xff
b'\xff'.decode(sys.getfilesystemencoding())
except UnicodeDecodeError:
# 0xff will be encoded using the surrogate character u+DCFF
TESTFN_UNENCODABLE = TESTFN \
+ b'-\xff'.decode(sys.getfilesystemencoding(), 'surrogateescape')
else:
# File system encoding (eg. ISO-8859-* encodings) can encode
# the byte 0xff. Skip some unicode filename tests.
pass
# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
# or None if there is no such character.
FS_NONASCII = None
for character in (
# First try printable and common characters to have a readable filename.
# For each character, the encoding list are just example of encodings able
# to encode the character (the list is not exhaustive).
# U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
'\u00E6',
# U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
'\u0130',
# U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
'\u0141',
# U+03C6 (Greek Small Letter Phi): cp1253
'\u03C6',
# U+041A (Cyrillic Capital Letter Ka): cp1251
'\u041A',
# U+05D0 (Hebrew Letter Alef): Encodable to cp424
'\u05D0',
# U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
'\u060C',
# U+062A (Arabic Letter Teh): cp720
'\u062A',
# U+0E01 (Thai Character Ko Kai): cp874
'\u0E01',
# Then try more "special" characters. "special" because they may be
# interpreted or displayed differently depending on the exact locale
# encoding and the font.
# U+00A0 (No-Break Space)
'\u00A0',
# U+20AC (Euro Sign)
'\u20AC',
):
try:
# If Python is set up to use the legacy 'mbcs' in Windows,
# 'replace' error mode is used, and encode() returns b'?'
# for characters missing in the ANSI codepage
if os.fsdecode(os.fsencode(character)) != character:
raise UnicodeError
except UnicodeError:
pass
else:
FS_NONASCII = character
break
# Save the initial cwd
SAVEDCWD = os.getcwd()
# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
# decoded from the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename (ex: the latin1 encoding can decode any byte
# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
# to the surrogateescape error handler (PEP 383), but not from the filesystem
# encoding in strict mode.
TESTFN_UNDECODABLE = None
for name in (
# b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
# accepts it to create a file or a directory, or don't accept to enter to
# such directory (when the bytes name is used). So test b'\xe7' first:
# it is not decodable from cp932.
b'\xe7w\xf0',
# undecodable from ASCII, UTF-8
b'\xff',
# undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
# and cp857
b'\xae\xd5'
# undecodable from UTF-8 (UNIX and Mac OS X)
b'\xed\xb2\x80', b'\xed\xb4\x80',
# undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
# cp1253, cp1254, cp1255, cp1257, cp1258
b'\x81\x98',
):
try:
name.decode(sys.getfilesystemencoding())
except UnicodeDecodeError:
TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name
break
if FS_NONASCII:
TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII
else:
TESTFN_NONASCII = None
def make_bad_fd():
"""
Create an invalid file descriptor by opening and closing a file and return
its fd.
"""
file = open(TESTFN, "wb")
try:
return file.fileno()
finally:
file.close()
unlink(TESTFN)
_can_symlink = None
def can_symlink():
global _can_symlink
if _can_symlink is not None:
return _can_symlink
symlink_path = TESTFN + "can_symlink"
try:
os.symlink(TESTFN, symlink_path)
can = True
except (OSError, NotImplementedError, AttributeError):
can = False
else:
os.remove(symlink_path)
_can_symlink = can
return can
def skip_unless_symlink(test):
"""Skip decorator for tests that require functional symlink"""
ok = can_symlink()
msg = "Requires functional symlink implementation"
return test if ok else unittest.skip(msg)(test)
_can_xattr = None
def can_xattr():
import tempfile
global _can_xattr
if _can_xattr is not None:
return _can_xattr
if not hasattr(os, "setxattr"):
can = False
else:
import platform
tmp_dir = tempfile.mkdtemp()
tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
try:
with open(TESTFN, "wb") as fp:
try:
# TESTFN & tempfile may use different file systems with
# different capabilities
os.setxattr(tmp_fp, b"user.test", b"")
os.setxattr(tmp_name, b"trusted.foo", b"42")
os.setxattr(fp.fileno(), b"user.test", b"")
# Kernels < 2.6.39 don't respect setxattr flags.
kernel_version = platform.release()
m = re.match(r"2.6.(\d{1,2})", kernel_version)
can = m is None or int(m.group(1)) >= 39
except OSError:
can = False
finally:
unlink(TESTFN)
unlink(tmp_name)
rmdir(tmp_dir)
_can_xattr = can
return can
def skip_unless_xattr(test):
"""Skip decorator for tests that require functional extended attributes"""
ok = can_xattr()
msg = "no non-broken extended attribute support"
return test if ok else unittest.skip(msg)(test)
def unlink(filename):
try:
_unlink(filename)
except (FileNotFoundError, NotADirectoryError):
pass
if sys.platform.startswith("win"):
def _waitfor(func, pathname, waitall=False):
# Perform the operation
func(pathname)
# Now setup the wait loop
if waitall:
dirname = pathname
else:
dirname, name = os.path.split(pathname)
dirname = dirname or '.'
# Check for `pathname` to be removed from the filesystem.
# The exponential backoff of the timeout amounts to a total
# of ~1 second after which the deletion is probably an error
# anyway.
# Testing on an i7@4.3GHz shows that usually only 1 iteration is
# required when contention occurs.
timeout = 0.001
while timeout < 1.0:
# Note we are only testing for the existence of the file(s) in
# the contents of the directory regardless of any security or
# access rights. If we have made it this far, we have sufficient
# permissions to do that much using Python's equivalent of the
# Windows API FindFirstFile.
# Other Windows APIs can fail or give incorrect results when
# dealing with files that are pending deletion.
L = os.listdir(dirname)
if not (L if waitall else name in L):
return
# Increase the timeout and try again
time.sleep(timeout)
timeout *= 2
warnings.warn('tests may fail, delete still pending for ' + pathname,
RuntimeWarning, stacklevel=4)
def _unlink(filename):
_waitfor(os.unlink, filename)
def _rmdir(dirname):
_waitfor(os.rmdir, dirname)
def _rmtree(path):
from test.support import _force_run
def _rmtree_inner(path):
for name in _force_run(path, os.listdir, path):
fullname = os.path.join(path, name)
try:
mode = os.lstat(fullname).st_mode
except OSError as exc:
print("support.rmtree(): os.lstat(%r) failed with %s"
% (fullname, exc),
file=sys.__stderr__)
mode = 0
if stat.S_ISDIR(mode):
_waitfor(_rmtree_inner, fullname, waitall=True)
_force_run(fullname, os.rmdir, fullname)
else:
_force_run(fullname, os.unlink, fullname)
_waitfor(_rmtree_inner, path, waitall=True)
_waitfor(lambda p: _force_run(p, os.rmdir, p), path)
def _longpath(path):
try:
import ctypes
except ImportError:
# No ctypes means we can't expands paths.
pass
else:
buffer = ctypes.create_unicode_buffer(len(path) * 2)
length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
len(buffer))
if length:
return buffer[:length]
return path
else:
_unlink = os.unlink
_rmdir = os.rmdir
def _rmtree(path):
import shutil
try:
shutil.rmtree(path)
return
except OSError:
pass
def _rmtree_inner(path):
from test.support import _force_run
for name in _force_run(path, os.listdir, path):
fullname = os.path.join(path, name)
try:
mode = os.lstat(fullname).st_mode
except OSError:
mode = 0
if stat.S_ISDIR(mode):
_rmtree_inner(fullname)
_force_run(path, os.rmdir, fullname)
else:
_force_run(path, os.unlink, fullname)
_rmtree_inner(path)
os.rmdir(path)
def _longpath(path):
return path
def rmdir(dirname):
try:
_rmdir(dirname)
except FileNotFoundError:
pass
def rmtree(path):
try:
_rmtree(path)
except FileNotFoundError:
pass
@contextlib.contextmanager
def temp_dir(path=None, quiet=False):
"""Return a context manager that creates a temporary directory.
Arguments:
path: the directory to create temporarily. If omitted or None,
defaults to creating a temporary directory using tempfile.mkdtemp.
quiet: if False (the default), the context manager raises an exception
on error. Otherwise, if the path is specified and cannot be
created, only a warning is issued.
"""
import tempfile
dir_created = False
if path is None:
path = tempfile.mkdtemp()
dir_created = True
path = os.path.realpath(path)
else:
try:
os.mkdir(path)
dir_created = True
except OSError as exc:
if not quiet:
raise
warnings.warn(f'tests may fail, unable to create '
f'temporary directory {path!r}: {exc}',
RuntimeWarning, stacklevel=3)
if dir_created:
pid = os.getpid()
try:
yield path
finally:
# In case the process forks, let only the parent remove the
# directory. The child has a different process id. (bpo-30028)
if dir_created and pid == os.getpid():
rmtree(path)
@contextlib.contextmanager
def change_cwd(path, quiet=False):
"""Return a context manager that changes the current working directory.
Arguments:
path: the directory to use as the temporary current working directory.
quiet: if False (the default), the context manager raises an exception
on error. Otherwise, it issues only a warning and keeps the current
working directory the same.
"""
saved_dir = os.getcwd()
try:
os.chdir(os.path.realpath(path))
except OSError as exc:
if not quiet:
raise
warnings.warn(f'tests may fail, unable to change the current working '
f'directory to {path!r}: {exc}',
RuntimeWarning, stacklevel=3)
try:
yield os.getcwd()
finally:
os.chdir(saved_dir)
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
"""
Context manager that temporarily creates and changes the CWD.
The function temporarily changes the current working directory
after creating a temporary directory in the current directory with
name *name*. If *name* is None, the temporary directory is
created using tempfile.mkdtemp.
If *quiet* is False (default) and it is not possible to
create or change the CWD, an error is raised. If *quiet* is True,
only a warning is raised and the original CWD is used.
"""
with temp_dir(path=name, quiet=quiet) as temp_path:
with change_cwd(temp_path, quiet=quiet) as cwd_dir:
yield cwd_dir
def create_empty_file(filename):
"""Create an empty file. If the file already exists, truncate it."""
fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
os.close(fd)
def fs_is_case_insensitive(directory):
"""Detects if the file system for the specified directory
is case-insensitive."""
import tempfile
with tempfile.NamedTemporaryFile(dir=directory) as base:
base_path = base.name
case_path = base_path.upper()
if case_path == base_path:
case_path = base_path.lower()
try:
return os.path.samefile(base_path, case_path)
except FileNotFoundError:
return False
class FakePath:
"""Simple implementing of the path protocol.
"""
def __init__(self, path):
self.path = path
def __repr__(self):
return f'<FakePath {self.path!r}>'
def __fspath__(self):
if (isinstance(self.path, BaseException) or
isinstance(self.path, type) and
issubclass(self.path, BaseException)):
raise self.path
else:
return self.path
def fd_count():
"""Count the number of open file descriptors.
"""
if sys.platform.startswith(('linux', 'freebsd')):
try:
names = os.listdir("/proc/self/fd")
# Subtract one because listdir() internally opens a file
# descriptor to list the content of the /proc/self/fd/ directory.
return len(names) - 1
except FileNotFoundError:
pass
MAXFD = 256
if hasattr(os, 'sysconf'):
try:
MAXFD = os.sysconf("SC_OPEN_MAX")
except OSError:
pass
old_modes = None
if sys.platform == 'win32':
# bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
# on invalid file descriptor if Python is compiled in debug mode
try:
import msvcrt
msvcrt.CrtSetReportMode
except (AttributeError, ImportError):
# no msvcrt or a release build
pass
else:
old_modes = {}
for report_type in (msvcrt.CRT_WARN,
msvcrt.CRT_ERROR,
msvcrt.CRT_ASSERT):
old_modes[report_type] = msvcrt.CrtSetReportMode(report_type,
0)
try:
count = 0
for fd in range(MAXFD):
try:
# Prefer dup() over fstat(). fstat() can require input/output
# whereas dup() doesn't.
fd2 = os.dup(fd)
except OSError as e:
if e.errno != errno.EBADF:
raise
else:
os.close(fd2)
count += 1
finally:
if old_modes is not None:
for report_type in (msvcrt.CRT_WARN,
msvcrt.CRT_ERROR,
msvcrt.CRT_ASSERT):
msvcrt.CrtSetReportMode(report_type, old_modes[report_type])
return count
if hasattr(os, "umask"):
@contextlib.contextmanager
def temp_umask(umask):
"""Context manager that temporarily sets the process umask."""
oldmask = os.umask(umask)
try:
yield
finally:
os.umask(oldmask)
class EnvironmentVarGuard(collections.abc.MutableMapping):
"""Class to help protect the environment variable properly. Can be used as
a context manager."""
def __init__(self):
self._environ = os.environ
self._changed = {}
def __getitem__(self, envvar):
return self._environ[envvar]
def __setitem__(self, envvar, value):
# Remember the initial value on the first access
if envvar not in self._changed:
self._changed[envvar] = self._environ.get(envvar)
self._environ[envvar] = value
def __delitem__(self, envvar):
# Remember the initial value on the first access
if envvar not in self._changed:
self._changed[envvar] = self._environ.get(envvar)
if envvar in self._environ:
del self._environ[envvar]
def keys(self):
return self._environ.keys()
def __iter__(self):
return iter(self._environ)
def __len__(self):
return len(self._environ)
def set(self, envvar, value):
self[envvar] = value
def unset(self, envvar):
del self[envvar]
def __enter__(self):
return self
def __exit__(self, *ignore_exc):
for (k, v) in self._changed.items():
if v is None:
if k in self._environ:
del self._environ[k]
else:
self._environ[k] = v
os.environ = self._environ