Implement #1578269. Patch by Jason R. Coombs.
Added Windows support for os.symlink when run on Windows 6.0 or greater, aka Vista. Previous Windows versions will raise NotImplementedError when trying to symlink. Includes numerous test updates and additions to test_os, including a symlink_support module because of the fact that privilege escalation is required in order to run the tests to ensure that the user is able to create symlinks. By default, accounts do not have the required privilege, so the escalation code will have to be exposed later (or documented on how to do so). I'll be following up with that work next. Note that the tests use ctypes, which was agreed on during the PyCon language summit.
This commit is contained in:
parent
0dd8f7890a
commit
d40e6f70a5
|
@ -231,11 +231,15 @@ applications should use string objects to access all files.
|
||||||
|
|
||||||
.. function:: samefile(path1, path2)
|
.. function:: samefile(path1, path2)
|
||||||
|
|
||||||
Return ``True`` if both pathname arguments refer to the same file or directory
|
Return ``True`` if both pathname arguments refer to the same file or directory.
|
||||||
(as indicated by device number and i-node number). Raise an exception if a
|
On Unix, this is determined by the device number and i-node number and raises an
|
||||||
:func:`os.stat` call on either pathname fails.
|
exception if a :func:`os.stat` call on either pathname fails.
|
||||||
|
|
||||||
Availability: Unix.
|
On Windows, two files are the same if they resolve to the same final path
|
||||||
|
name using the Windows API call GetFinalPathNameByHandle and this function
|
||||||
|
raises an exception if handles cannot be obtained to either file.
|
||||||
|
|
||||||
|
Availability: Windows, Unix.
|
||||||
|
|
||||||
|
|
||||||
.. function:: sameopenfile(fp1, fp2)
|
.. function:: sameopenfile(fp1, fp2)
|
||||||
|
|
|
@ -1065,7 +1065,7 @@ Files and Directories
|
||||||
|
|
||||||
Like :func:`stat`, but do not follow symbolic links. This is an alias for
|
Like :func:`stat`, but do not follow symbolic links. This is an alias for
|
||||||
:func:`stat` on platforms that do not support symbolic links, such as
|
:func:`stat` on platforms that do not support symbolic links, such as
|
||||||
Windows.
|
Windows prior to 6.0 (Vista).
|
||||||
|
|
||||||
|
|
||||||
.. function:: mkfifo(path[, mode])
|
.. function:: mkfifo(path[, mode])
|
||||||
|
@ -1181,7 +1181,7 @@ Files and Directories
|
||||||
and the call may raise an UnicodeDecodeError. If the *path* is a bytes
|
and the call may raise an UnicodeDecodeError. If the *path* is a bytes
|
||||||
object, the result will be a bytes object.
|
object, the result will be a bytes object.
|
||||||
|
|
||||||
Availability: Unix.
|
Availability: Unix, Windows.
|
||||||
|
|
||||||
|
|
||||||
.. function:: remove(path)
|
.. function:: remove(path)
|
||||||
|
@ -1341,9 +1341,25 @@ Files and Directories
|
||||||
|
|
||||||
.. function:: symlink(source, link_name)
|
.. function:: symlink(source, link_name)
|
||||||
|
|
||||||
Create a symbolic link pointing to *source* named *link_name*.
|
Create a symbolic link pointing to *source* named *link_name*. On Windows,
|
||||||
|
symlink version takes an additional, optional parameter,
|
||||||
|
*target_is_directory*, which defaults to False.
|
||||||
|
|
||||||
Availability: Unix.
|
symlink(source, link_name, target_is_directory=False)
|
||||||
|
|
||||||
|
On Windows, a symlink represents a file or a directory, and does not
|
||||||
|
morph to the target dynamically. For this reason, when creating a
|
||||||
|
symlink on Windows, if the target is not already present, the symlink
|
||||||
|
will default to being a file symlink. If *target_is_directory* is set to
|
||||||
|
True, the symlink will be created as a directory symlink. This
|
||||||
|
parameter is ignored if the target exists (and the symlink is created
|
||||||
|
with the same type as the target).
|
||||||
|
|
||||||
|
Symbolic link support was introduced in Windows 6.0 (Vista). *symlink*
|
||||||
|
will raise a NotImplementedError on Windows versions earlier than 6.0. The
|
||||||
|
SeCreateSymbolicLinkPrivilege is required in order to create symlinks.
|
||||||
|
|
||||||
|
Availability: Unix, Windows 6.0.
|
||||||
|
|
||||||
|
|
||||||
.. function:: unlink(path)
|
.. function:: unlink(path)
|
||||||
|
|
|
@ -16,7 +16,8 @@ __all__ = ["normcase","isabs","join","splitdrive","split","splitext",
|
||||||
"getatime","getctime", "islink","exists","lexists","isdir","isfile",
|
"getatime","getctime", "islink","exists","lexists","isdir","isfile",
|
||||||
"ismount", "expanduser","expandvars","normpath","abspath",
|
"ismount", "expanduser","expandvars","normpath","abspath",
|
||||||
"splitunc","curdir","pardir","sep","pathsep","defpath","altsep",
|
"splitunc","curdir","pardir","sep","pathsep","defpath","altsep",
|
||||||
"extsep","devnull","realpath","supports_unicode_filenames","relpath"]
|
"extsep","devnull","realpath","supports_unicode_filenames","relpath",
|
||||||
|
"samefile",]
|
||||||
|
|
||||||
# strings representing various path-related bits and pieces
|
# strings representing various path-related bits and pieces
|
||||||
# These are primarily for export; internally, they are hardcoded.
|
# These are primarily for export; internally, they are hardcoded.
|
||||||
|
@ -309,16 +310,28 @@ def dirname(p):
|
||||||
return split(p)[0]
|
return split(p)[0]
|
||||||
|
|
||||||
# Is a path a symbolic link?
|
# Is a path a symbolic link?
|
||||||
# This will always return false on systems where posix.lstat doesn't exist.
|
# This will always return false on systems where os.lstat doesn't exist.
|
||||||
|
|
||||||
def islink(path):
|
def islink(path):
|
||||||
"""Test for symbolic link.
|
"""Test whether a path is a symbolic link.
|
||||||
On WindowsNT/95 and OS/2 always returns false
|
This will always return false for Windows prior to 6.0
|
||||||
|
and for OS/2.
|
||||||
"""
|
"""
|
||||||
return False
|
try:
|
||||||
|
st = os.lstat(path)
|
||||||
|
except (os.error, AttributeError):
|
||||||
|
return False
|
||||||
|
return stat.S_ISLNK(st.st_mode)
|
||||||
|
|
||||||
# alias exists to lexists
|
# Being true for dangling symbolic links is also useful.
|
||||||
lexists = exists
|
|
||||||
|
def lexists(path):
|
||||||
|
"""Test whether a path exists. Returns True for broken symbolic links"""
|
||||||
|
try:
|
||||||
|
st = os.lstat(path)
|
||||||
|
except (os.error, WindowsError):
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
# Is a path a mount point? Either a root (with or without drive letter)
|
# Is a path a mount point? Either a root (with or without drive letter)
|
||||||
# or an UNC path with at most a / or \ after the mount point.
|
# or an UNC path with at most a / or \ after the mount point.
|
||||||
|
@ -612,3 +625,17 @@ def relpath(path, start=curdir):
|
||||||
if not rel_list:
|
if not rel_list:
|
||||||
return _get_dot(path)
|
return _get_dot(path)
|
||||||
return join(*rel_list)
|
return join(*rel_list)
|
||||||
|
|
||||||
|
|
||||||
|
# determine if two files are in fact the same file
|
||||||
|
def samefile(f1, f2):
|
||||||
|
"Test whether two pathnames reference the same actual file"
|
||||||
|
try:
|
||||||
|
from nt import _getfinalpathname
|
||||||
|
return _getfinalpathname(f1) == _getfinalpathname(f2)
|
||||||
|
except (NotImplementedError, ImportError):
|
||||||
|
# On Windows XP and earlier, two files are the same if their
|
||||||
|
# absolute pathnames are the same.
|
||||||
|
# Also, on other operating systems, fake this method with a
|
||||||
|
# Windows-XP approximation.
|
||||||
|
return abspath(f1) == abspath(f2)
|
||||||
|
|
|
@ -2273,7 +2273,7 @@ class TarFile(object):
|
||||||
(platform limitation), we try to make a copy of the referenced file
|
(platform limitation), we try to make a copy of the referenced file
|
||||||
instead of a link.
|
instead of a link.
|
||||||
"""
|
"""
|
||||||
if hasattr(os, "symlink") and hasattr(os, "link"):
|
try:
|
||||||
# For systems that support symbolic and hard links.
|
# For systems that support symbolic and hard links.
|
||||||
if tarinfo.issym():
|
if tarinfo.issym():
|
||||||
os.symlink(tarinfo.linkname, targetpath)
|
os.symlink(tarinfo.linkname, targetpath)
|
||||||
|
@ -2282,7 +2282,15 @@ class TarFile(object):
|
||||||
if os.path.exists(tarinfo._link_target):
|
if os.path.exists(tarinfo._link_target):
|
||||||
os.link(tarinfo._link_target, targetpath)
|
os.link(tarinfo._link_target, targetpath)
|
||||||
else:
|
else:
|
||||||
self._extract_member(self._find_link_target(tarinfo), targetpath)
|
self._extract_mem
|
||||||
|
except (AttributeError, NotImplementedError, WindowsError):
|
||||||
|
# AttributeError if no os.symlink
|
||||||
|
# NotImplementedError if on Windows XP
|
||||||
|
# WindowsError (1314) if the required privilege is not held by the client
|
||||||
|
if tarinfo.issym():
|
||||||
|
linkpath = os.path.join(os.path.dirname(tarinfo.name),tarinfo.linkname)
|
||||||
|
else:
|
||||||
|
linkpath = tarinfo.linkname
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
self._extract_member(self._find_link_target(tarinfo), targetpath)
|
self._extract_member(self._find_link_target(tarinfo), targetpath)
|
||||||
|
|
|
@ -17,6 +17,7 @@ import unittest
|
||||||
import importlib
|
import importlib
|
||||||
import collections
|
import collections
|
||||||
import re
|
import re
|
||||||
|
import subprocess
|
||||||
import imp
|
import imp
|
||||||
import time
|
import time
|
||||||
try:
|
try:
|
||||||
|
@ -38,8 +39,7 @@ __all__ = [
|
||||||
"set_memlimit", "bigmemtest", "bigaddrspacetest", "BasicTestRunner",
|
"set_memlimit", "bigmemtest", "bigaddrspacetest", "BasicTestRunner",
|
||||||
"run_unittest", "run_doctest", "threading_setup", "threading_cleanup",
|
"run_unittest", "run_doctest", "threading_setup", "threading_cleanup",
|
||||||
"reap_children", "cpython_only", "check_impl_detail", "get_attribute",
|
"reap_children", "cpython_only", "check_impl_detail", "get_attribute",
|
||||||
"swap_item", "swap_attr",
|
"swap_item", "swap_attr", "can_symlink", "skip_unless_symlink"]
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
class Error(Exception):
|
class Error(Exception):
|
||||||
|
@ -1169,6 +1169,27 @@ def reap_children():
|
||||||
except:
|
except:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
try:
|
||||||
|
from .symlink_support import enable_symlink_privilege
|
||||||
|
except:
|
||||||
|
enable_symlink_privilege = lambda: True
|
||||||
|
|
||||||
|
def can_symlink():
|
||||||
|
"""It's no longer sufficient to test for the presence of symlink in the
|
||||||
|
os module - on Windows XP and earlier, os.symlink exists but a
|
||||||
|
NotImplementedError is thrown.
|
||||||
|
"""
|
||||||
|
has_symlink = hasattr(os, 'symlink')
|
||||||
|
is_old_windows = sys.platform == "win32" and sys.getwindowsversion().major < 6
|
||||||
|
has_privilege = False if is_old_windows else enable_symlink_privilege()
|
||||||
|
return has_symlink and (not is_old_windows) and has_privilege
|
||||||
|
|
||||||
|
def skip_unless_symlink(test):
|
||||||
|
"""Skip decorator for tests that require functional symlink"""
|
||||||
|
selector = can_symlink()
|
||||||
|
msg = "Requires functional symlink implementation"
|
||||||
|
return [unittest.skip(msg)(test), test][selector]
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def swap_attr(obj, attr, new_val):
|
def swap_attr(obj, attr, new_val):
|
||||||
"""Temporary swap out an attribute with a new object.
|
"""Temporary swap out an attribute with a new object.
|
||||||
|
|
|
@ -0,0 +1,203 @@
|
||||||
|
"""
|
||||||
|
A module built to test if the current process has the privilege to
|
||||||
|
create symlinks on Windows.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# allow script to run natively under python 2.6+
|
||||||
|
from __future__ import print_function
|
||||||
|
|
||||||
|
import ctypes
|
||||||
|
from ctypes import wintypes
|
||||||
|
|
||||||
|
GetCurrentProcess = ctypes.windll.kernel32.GetCurrentProcess
|
||||||
|
GetCurrentProcess.restype = wintypes.HANDLE
|
||||||
|
OpenProcessToken = ctypes.windll.advapi32.OpenProcessToken
|
||||||
|
OpenProcessToken.argtypes = (wintypes.HANDLE, wintypes.DWORD, ctypes.POINTER(wintypes.HANDLE))
|
||||||
|
OpenProcessToken.restype = wintypes.BOOL
|
||||||
|
|
||||||
|
class LUID(ctypes.Structure):
|
||||||
|
_fields_ = [
|
||||||
|
('low_part', wintypes.DWORD),
|
||||||
|
('high_part', wintypes.LONG),
|
||||||
|
]
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
return (
|
||||||
|
self.high_part == other.high_part and
|
||||||
|
self.low_part == other.low_part
|
||||||
|
)
|
||||||
|
|
||||||
|
def __ne__(self, other):
|
||||||
|
return not (self==other)
|
||||||
|
|
||||||
|
LookupPrivilegeValue = ctypes.windll.advapi32.LookupPrivilegeValueW
|
||||||
|
LookupPrivilegeValue.argtypes = (
|
||||||
|
wintypes.LPWSTR, # system name
|
||||||
|
wintypes.LPWSTR, # name
|
||||||
|
ctypes.POINTER(LUID),
|
||||||
|
)
|
||||||
|
LookupPrivilegeValue.restype = wintypes.BOOL
|
||||||
|
|
||||||
|
class TOKEN_INFORMATION_CLASS:
|
||||||
|
TokenUser = 1
|
||||||
|
TokenGroups = 2
|
||||||
|
TokenPrivileges = 3
|
||||||
|
# ... see http://msdn.microsoft.com/en-us/library/aa379626%28VS.85%29.aspx
|
||||||
|
|
||||||
|
SE_PRIVILEGE_ENABLED_BY_DEFAULT = (0x00000001)
|
||||||
|
SE_PRIVILEGE_ENABLED = (0x00000002)
|
||||||
|
SE_PRIVILEGE_REMOVED = (0x00000004)
|
||||||
|
SE_PRIVILEGE_USED_FOR_ACCESS = (0x80000000)
|
||||||
|
|
||||||
|
class LUID_AND_ATTRIBUTES(ctypes.Structure):
|
||||||
|
_fields_ = [
|
||||||
|
('LUID', LUID),
|
||||||
|
('attributes', wintypes.DWORD),
|
||||||
|
]
|
||||||
|
|
||||||
|
def is_enabled(self):
|
||||||
|
return bool(self.attributes & SE_PRIVILEGE_ENABLED)
|
||||||
|
|
||||||
|
def enable(self):
|
||||||
|
self.attributes |= SE_PRIVILEGE_ENABLED
|
||||||
|
|
||||||
|
def get_name(self):
|
||||||
|
size = wintypes.DWORD(10240)
|
||||||
|
buf = ctypes.create_unicode_buffer(size.value)
|
||||||
|
res = LookupPrivilegeName(None, self.LUID, buf, size)
|
||||||
|
if res == 0:
|
||||||
|
raise RuntimeError
|
||||||
|
return buf[:size.value]
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
name = self.name
|
||||||
|
fmt = ['{name}', '{name} (enabled)'][self.is_enabled()]
|
||||||
|
return fmt.format(**vars())
|
||||||
|
|
||||||
|
LookupPrivilegeName = ctypes.windll.advapi32.LookupPrivilegeNameW
|
||||||
|
LookupPrivilegeName.argtypes = (
|
||||||
|
wintypes.LPWSTR, # lpSystemName
|
||||||
|
ctypes.POINTER(LUID), # lpLuid
|
||||||
|
wintypes.LPWSTR, # lpName
|
||||||
|
ctypes.POINTER(wintypes.DWORD), #cchName
|
||||||
|
)
|
||||||
|
LookupPrivilegeName.restype = wintypes.BOOL
|
||||||
|
|
||||||
|
class TOKEN_PRIVILEGES(ctypes.Structure):
|
||||||
|
_fields_ = [
|
||||||
|
('count', wintypes.DWORD),
|
||||||
|
('privileges', LUID_AND_ATTRIBUTES*0),
|
||||||
|
]
|
||||||
|
|
||||||
|
def get_array(self):
|
||||||
|
array_type = LUID_AND_ATTRIBUTES*self.count
|
||||||
|
privileges = ctypes.cast(self.privileges, ctypes.POINTER(array_type)).contents
|
||||||
|
return privileges
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return iter(self.get_array())
|
||||||
|
|
||||||
|
PTOKEN_PRIVILEGES = ctypes.POINTER(TOKEN_PRIVILEGES)
|
||||||
|
|
||||||
|
GetTokenInformation = ctypes.windll.advapi32.GetTokenInformation
|
||||||
|
GetTokenInformation.argtypes = [
|
||||||
|
wintypes.HANDLE, # TokenHandle
|
||||||
|
ctypes.c_uint, # TOKEN_INFORMATION_CLASS value
|
||||||
|
ctypes.c_void_p, # TokenInformation
|
||||||
|
wintypes.DWORD, # TokenInformationLength
|
||||||
|
ctypes.POINTER(wintypes.DWORD), # ReturnLength
|
||||||
|
]
|
||||||
|
GetTokenInformation.restype = wintypes.BOOL
|
||||||
|
|
||||||
|
# http://msdn.microsoft.com/en-us/library/aa375202%28VS.85%29.aspx
|
||||||
|
AdjustTokenPrivileges = ctypes.windll.advapi32.AdjustTokenPrivileges
|
||||||
|
AdjustTokenPrivileges.restype = wintypes.BOOL
|
||||||
|
AdjustTokenPrivileges.argtypes = [
|
||||||
|
wintypes.HANDLE, # TokenHandle
|
||||||
|
wintypes.BOOL, # DisableAllPrivileges
|
||||||
|
PTOKEN_PRIVILEGES, # NewState (optional)
|
||||||
|
wintypes.DWORD, # BufferLength of PreviousState
|
||||||
|
PTOKEN_PRIVILEGES, # PreviousState (out, optional)
|
||||||
|
ctypes.POINTER(wintypes.DWORD), # ReturnLength
|
||||||
|
]
|
||||||
|
|
||||||
|
def get_process_token():
|
||||||
|
"Get the current process token"
|
||||||
|
token = wintypes.HANDLE()
|
||||||
|
TOKEN_ALL_ACCESS = 0xf01ff
|
||||||
|
res = OpenProcessToken(GetCurrentProcess(), TOKEN_ALL_ACCESS, token)
|
||||||
|
if not res > 0:
|
||||||
|
raise RuntimeError("Couldn't get process token")
|
||||||
|
return token
|
||||||
|
|
||||||
|
def get_symlink_luid():
|
||||||
|
"Get the LUID for the SeCreateSymbolicLinkPrivilege"
|
||||||
|
symlink_luid = LUID()
|
||||||
|
res = LookupPrivilegeValue(None, "SeCreateSymbolicLinkPrivilege", symlink_luid)
|
||||||
|
if not res > 0:
|
||||||
|
raise RuntimeError("Couldn't lookup privilege value")
|
||||||
|
return symlink_luid
|
||||||
|
|
||||||
|
def get_privilege_information():
|
||||||
|
"Get all privileges associated with the current process."
|
||||||
|
# first call with zero length to determine what size buffer we need
|
||||||
|
|
||||||
|
return_length = wintypes.DWORD()
|
||||||
|
params = [
|
||||||
|
get_process_token(),
|
||||||
|
TOKEN_INFORMATION_CLASS.TokenPrivileges,
|
||||||
|
None,
|
||||||
|
0,
|
||||||
|
return_length,
|
||||||
|
]
|
||||||
|
|
||||||
|
res = GetTokenInformation(*params)
|
||||||
|
|
||||||
|
# assume we now have the necessary length in return_length
|
||||||
|
|
||||||
|
buffer = ctypes.create_string_buffer(return_length.value)
|
||||||
|
params[2] = buffer
|
||||||
|
params[3] = return_length.value
|
||||||
|
|
||||||
|
res = GetTokenInformation(*params)
|
||||||
|
assert res > 0, "Error in second GetTokenInformation (%d)" % res
|
||||||
|
|
||||||
|
privileges = ctypes.cast(buffer, ctypes.POINTER(TOKEN_PRIVILEGES)).contents
|
||||||
|
return privileges
|
||||||
|
|
||||||
|
def report_privilege_information():
|
||||||
|
"Report all privilege information assigned to the current process."
|
||||||
|
privileges = get_privilege_information()
|
||||||
|
print("found {0} privileges".format(privileges.count))
|
||||||
|
tuple(map(print, privileges))
|
||||||
|
|
||||||
|
def enable_symlink_privilege():
|
||||||
|
"""
|
||||||
|
Try to assign the symlink privilege to the current process token.
|
||||||
|
Return True if the assignment is successful.
|
||||||
|
"""
|
||||||
|
# create a space in memory for a TOKEN_PRIVILEGES structure
|
||||||
|
# with one element
|
||||||
|
size = ctypes.sizeof(TOKEN_PRIVILEGES)
|
||||||
|
size += ctypes.sizeof(LUID_AND_ATTRIBUTES)
|
||||||
|
buffer = ctypes.create_string_buffer(size)
|
||||||
|
tp = ctypes.cast(buffer, ctypes.POINTER(TOKEN_PRIVILEGES)).contents
|
||||||
|
tp.count = 1
|
||||||
|
tp.get_array()[0].enable()
|
||||||
|
tp.get_array()[0].LUID = get_symlink_luid()
|
||||||
|
token = get_process_token()
|
||||||
|
res = AdjustTokenPrivileges(token, False, tp, 0, None, None)
|
||||||
|
if res == 0:
|
||||||
|
raise RuntimeError("Error in AdjustTokenPrivileges")
|
||||||
|
|
||||||
|
ERROR_NOT_ALL_ASSIGNED = 1300
|
||||||
|
return ctypes.windll.kernel32.GetLastError() != ERROR_NOT_ALL_ASSIGNED
|
||||||
|
|
||||||
|
def main():
|
||||||
|
assigned = enable_symlink_privilege()
|
||||||
|
msg = ['failure', 'success'][assigned]
|
||||||
|
|
||||||
|
print("Symlink privilege assignment completed with {0}".format(msg))
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
|
@ -1,5 +1,5 @@
|
||||||
import unittest
|
import unittest
|
||||||
from test.support import run_unittest, TESTFN
|
from test.support import run_unittest, TESTFN, skip_unless_symlink, can_symlink
|
||||||
import glob
|
import glob
|
||||||
import os
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
|
@ -25,7 +25,7 @@ class GlobTests(unittest.TestCase):
|
||||||
self.mktemp('ZZZ')
|
self.mktemp('ZZZ')
|
||||||
self.mktemp('a', 'bcd', 'EF')
|
self.mktemp('a', 'bcd', 'EF')
|
||||||
self.mktemp('a', 'bcd', 'efg', 'ha')
|
self.mktemp('a', 'bcd', 'efg', 'ha')
|
||||||
if hasattr(os, 'symlink'):
|
if can_symlink():
|
||||||
os.symlink(self.norm('broken'), self.norm('sym1'))
|
os.symlink(self.norm('broken'), self.norm('sym1'))
|
||||||
os.symlink(self.norm('broken'), self.norm('sym2'))
|
os.symlink(self.norm('broken'), self.norm('sym2'))
|
||||||
|
|
||||||
|
@ -98,12 +98,12 @@ class GlobTests(unittest.TestCase):
|
||||||
# either of these results are reasonable
|
# either of these results are reasonable
|
||||||
self.assertIn(res[0], [self.tempdir, self.tempdir + os.sep])
|
self.assertIn(res[0], [self.tempdir, self.tempdir + os.sep])
|
||||||
|
|
||||||
|
@skip_unless_symlink
|
||||||
def test_glob_broken_symlinks(self):
|
def test_glob_broken_symlinks(self):
|
||||||
if hasattr(os, 'symlink'):
|
eq = self.assertSequencesEqual_noorder
|
||||||
eq = self.assertSequencesEqual_noorder
|
eq(self.glob('sym*'), [self.norm('sym1'), self.norm('sym2')])
|
||||||
eq(self.glob('sym*'), [self.norm('sym1'), self.norm('sym2')])
|
eq(self.glob('sym1'), [self.norm('sym1')])
|
||||||
eq(self.glob('sym1'), [self.norm('sym1')])
|
eq(self.glob('sym2'), [self.norm('sym2')])
|
||||||
eq(self.glob('sym2'), [self.norm('sym2')])
|
|
||||||
|
|
||||||
|
|
||||||
def test_main():
|
def test_main():
|
||||||
|
|
|
@ -299,7 +299,7 @@ class CGIHTTPServerTestCase(BaseTestCase):
|
||||||
|
|
||||||
# The shebang line should be pure ASCII: use symlink if possible.
|
# The shebang line should be pure ASCII: use symlink if possible.
|
||||||
# See issue #7668.
|
# See issue #7668.
|
||||||
if hasattr(os, 'symlink'):
|
if support.can_symlink():
|
||||||
self.pythonexe = os.path.join(self.parent_dir, 'python')
|
self.pythonexe = os.path.join(self.parent_dir, 'python')
|
||||||
os.symlink(sys.executable, self.pythonexe)
|
os.symlink(sys.executable, self.pythonexe)
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -520,7 +520,7 @@ class WalkTests(unittest.TestCase):
|
||||||
f = open(path, "w")
|
f = open(path, "w")
|
||||||
f.write("I'm " + path + " and proud of it. Blame test_os.\n")
|
f.write("I'm " + path + " and proud of it. Blame test_os.\n")
|
||||||
f.close()
|
f.close()
|
||||||
if hasattr(os, "symlink"):
|
if support.can_symlink():
|
||||||
os.symlink(os.path.abspath(t2_path), link_path)
|
os.symlink(os.path.abspath(t2_path), link_path)
|
||||||
sub2_tree = (sub2_path, ["link"], ["tmp3"])
|
sub2_tree = (sub2_path, ["link"], ["tmp3"])
|
||||||
else:
|
else:
|
||||||
|
@ -564,7 +564,7 @@ class WalkTests(unittest.TestCase):
|
||||||
self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
|
self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
|
||||||
self.assertEqual(all[2 - 2 * flipped], sub2_tree)
|
self.assertEqual(all[2 - 2 * flipped], sub2_tree)
|
||||||
|
|
||||||
if hasattr(os, "symlink"):
|
if support.can_symlink():
|
||||||
# Walk, following symlinks.
|
# Walk, following symlinks.
|
||||||
for root, dirs, files in os.walk(walk_path, followlinks=True):
|
for root, dirs, files in os.walk(walk_path, followlinks=True):
|
||||||
if root == link_path:
|
if root == link_path:
|
||||||
|
@ -1033,6 +1033,83 @@ class Win32KillTests(unittest.TestCase):
|
||||||
self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
|
self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
|
||||||
|
|
||||||
|
|
||||||
|
def skipUnlessWindows6(test):
|
||||||
|
if hasattr(sys, 'getwindowsversion') and sys.getwindowsversion().major >= 6:
|
||||||
|
return test
|
||||||
|
return unittest.skip("Requires Windows Vista or later")(test)
|
||||||
|
|
||||||
|
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
|
||||||
|
@support.skip_unless_symlink
|
||||||
|
class Win32SymlinkTests(unittest.TestCase):
|
||||||
|
filelink = 'filelinktest'
|
||||||
|
filelink_target = os.path.abspath(__file__)
|
||||||
|
dirlink = 'dirlinktest'
|
||||||
|
dirlink_target = os.path.dirname(filelink_target)
|
||||||
|
missing_link = 'missing link'
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
assert os.path.exists(self.dirlink_target)
|
||||||
|
assert os.path.exists(self.filelink_target)
|
||||||
|
assert not os.path.exists(self.dirlink)
|
||||||
|
assert not os.path.exists(self.filelink)
|
||||||
|
assert not os.path.exists(self.missing_link)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
if os.path.exists(self.filelink):
|
||||||
|
os.remove(self.filelink)
|
||||||
|
if os.path.exists(self.dirlink):
|
||||||
|
os.rmdir(self.dirlink)
|
||||||
|
if os.path.lexists(self.missing_link):
|
||||||
|
os.remove(self.missing_link)
|
||||||
|
|
||||||
|
def test_directory_link(self):
|
||||||
|
os.symlink(self.dirlink_target, self.dirlink)
|
||||||
|
self.assertTrue(os.path.exists(self.dirlink))
|
||||||
|
self.assertTrue(os.path.isdir(self.dirlink))
|
||||||
|
self.assertTrue(os.path.islink(self.dirlink))
|
||||||
|
self.check_stat(self.dirlink, self.dirlink_target)
|
||||||
|
|
||||||
|
def test_file_link(self):
|
||||||
|
os.symlink(self.filelink_target, self.filelink)
|
||||||
|
self.assertTrue(os.path.exists(self.filelink))
|
||||||
|
self.assertTrue(os.path.isfile(self.filelink))
|
||||||
|
self.assertTrue(os.path.islink(self.filelink))
|
||||||
|
self.check_stat(self.filelink, self.filelink_target)
|
||||||
|
|
||||||
|
def _create_missing_dir_link(self):
|
||||||
|
'Create a "directory" link to a non-existent target'
|
||||||
|
linkname = self.missing_link
|
||||||
|
if os.path.lexists(linkname):
|
||||||
|
os.remove(linkname)
|
||||||
|
target = r'c:\\target does not exist.29r3c740'
|
||||||
|
assert not os.path.exists(target)
|
||||||
|
target_is_dir = True
|
||||||
|
os.symlink(target, linkname, target_is_dir)
|
||||||
|
|
||||||
|
def test_remove_directory_link_to_missing_target(self):
|
||||||
|
self._create_missing_dir_link()
|
||||||
|
# For compatibility with Unix, os.remove will check the
|
||||||
|
# directory status and call RemoveDirectory if the symlink
|
||||||
|
# was created with target_is_dir==True.
|
||||||
|
os.remove(self.missing_link)
|
||||||
|
|
||||||
|
@unittest.skip("currently fails; consider for improvement")
|
||||||
|
def test_isdir_on_directory_link_to_missing_target(self):
|
||||||
|
self._create_missing_dir_link()
|
||||||
|
# consider having isdir return true for directory links
|
||||||
|
self.assertTrue(os.path.isdir(self.missing_link))
|
||||||
|
|
||||||
|
@unittest.skip("currently fails; consider for improvement")
|
||||||
|
def test_rmdir_on_directory_link_to_missing_target(self):
|
||||||
|
self._create_missing_dir_link()
|
||||||
|
# consider allowing rmdir to remove directory links
|
||||||
|
os.rmdir(self.missing_link)
|
||||||
|
|
||||||
|
def check_stat(self, link, target):
|
||||||
|
self.assertEqual(os.stat(link), os.stat(target))
|
||||||
|
self.assertNotEqual(os.lstat(link), os.stat(link))
|
||||||
|
|
||||||
|
|
||||||
class MiscTests(unittest.TestCase):
|
class MiscTests(unittest.TestCase):
|
||||||
|
|
||||||
@unittest.skipIf(os.name == "nt", "POSIX specific test")
|
@unittest.skipIf(os.name == "nt", "POSIX specific test")
|
||||||
|
@ -1056,6 +1133,7 @@ def test_main():
|
||||||
PosixUidGidTests,
|
PosixUidGidTests,
|
||||||
Pep383Tests,
|
Pep383Tests,
|
||||||
Win32KillTests,
|
Win32KillTests,
|
||||||
|
Win32SymlinkTests,
|
||||||
MiscTests,
|
MiscTests,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -10,20 +10,26 @@ class PlatformTest(unittest.TestCase):
|
||||||
def test_architecture(self):
|
def test_architecture(self):
|
||||||
res = platform.architecture()
|
res = platform.architecture()
|
||||||
|
|
||||||
if hasattr(os, "symlink"):
|
@support.skip_unless_symlink
|
||||||
def test_architecture_via_symlink(self): # issue3762
|
def test_architecture_via_symlink(self): # issue3762
|
||||||
def get(python):
|
# On Windows, the EXE needs to know where pythonXY.dll is at so we have
|
||||||
cmd = [python, '-c',
|
# to add the directory to the path.
|
||||||
'import platform; print(platform.architecture())']
|
if sys.platform == "win32":
|
||||||
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
|
os.environ["Path"] = "{};{}".format(os.path.dirname(sys.executable),
|
||||||
return p.communicate()
|
os.environ["Path"])
|
||||||
real = os.path.realpath(sys.executable)
|
|
||||||
link = os.path.abspath(support.TESTFN)
|
def get(python):
|
||||||
os.symlink(real, link)
|
cmd = [python, '-c',
|
||||||
try:
|
'import platform; print(platform.architecture())']
|
||||||
self.assertEqual(get(real), get(link))
|
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
|
||||||
finally:
|
return p.communicate()
|
||||||
os.remove(link)
|
real = os.path.realpath(sys.executable)
|
||||||
|
link = os.path.abspath(support.TESTFN)
|
||||||
|
os.symlink(real, link)
|
||||||
|
try:
|
||||||
|
self.assertEqual(get(real), get(link))
|
||||||
|
finally:
|
||||||
|
os.remove(link)
|
||||||
|
|
||||||
def test_platform(self):
|
def test_platform(self):
|
||||||
for aliased in (False, True):
|
for aliased in (False, True):
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
import unittest
|
import unittest
|
||||||
from test import support, test_genericpath
|
from test import support, test_genericpath
|
||||||
|
|
||||||
import posixpath, os
|
import posixpath
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
from posixpath import realpath, abspath, dirname, basename
|
from posixpath import realpath, abspath, dirname, basename
|
||||||
|
|
||||||
# An absolute path to a temporary filename for testing. We can't rely on TESTFN
|
# An absolute path to a temporary filename for testing. We can't rely on TESTFN
|
||||||
|
@ -9,6 +11,16 @@ from posixpath import realpath, abspath, dirname, basename
|
||||||
|
|
||||||
ABSTFN = abspath(support.TESTFN)
|
ABSTFN = abspath(support.TESTFN)
|
||||||
|
|
||||||
|
def skip_if_ABSTFN_contains_backslash(test):
|
||||||
|
"""
|
||||||
|
On Windows, posixpath.abspath still returns paths with backslashes
|
||||||
|
instead of posix forward slashes. If this is the case, several tests
|
||||||
|
fail, so skip them.
|
||||||
|
"""
|
||||||
|
found_backslash = '\\' in ABSTFN
|
||||||
|
msg = "ABSTFN is not a posix path - tests fail"
|
||||||
|
return [test, unittest.skip(msg)(test)][found_backslash]
|
||||||
|
|
||||||
def safe_rmdir(dirname):
|
def safe_rmdir(dirname):
|
||||||
try:
|
try:
|
||||||
os.rmdir(dirname)
|
os.rmdir(dirname)
|
||||||
|
@ -143,7 +155,7 @@ class PosixPathTest(unittest.TestCase):
|
||||||
f.write(b"foo")
|
f.write(b"foo")
|
||||||
f.close()
|
f.close()
|
||||||
self.assertIs(posixpath.islink(support.TESTFN + "1"), False)
|
self.assertIs(posixpath.islink(support.TESTFN + "1"), False)
|
||||||
if hasattr(os, "symlink"):
|
if support.can_symlink():
|
||||||
os.symlink(support.TESTFN + "1", support.TESTFN + "2")
|
os.symlink(support.TESTFN + "1", support.TESTFN + "2")
|
||||||
self.assertIs(posixpath.islink(support.TESTFN + "2"), True)
|
self.assertIs(posixpath.islink(support.TESTFN + "2"), True)
|
||||||
os.remove(support.TESTFN + "1")
|
os.remove(support.TESTFN + "1")
|
||||||
|
@ -154,85 +166,59 @@ class PosixPathTest(unittest.TestCase):
|
||||||
if not f.close():
|
if not f.close():
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _create_file(filename):
|
||||||
|
with open(filename, 'wb') as f:
|
||||||
|
f.write(b'foo')
|
||||||
|
|
||||||
def test_samefile(self):
|
def test_samefile(self):
|
||||||
f = open(support.TESTFN + "1", "wb")
|
test_fn = support.TESTFN + "1"
|
||||||
try:
|
self._create_file(test_fn)
|
||||||
f.write(b"foo")
|
self.assertTrue(posixpath.samefile(test_fn, test_fn))
|
||||||
f.close()
|
self.assertRaises(TypeError, posixpath.samefile)
|
||||||
self.assertIs(
|
|
||||||
posixpath.samefile(
|
@unittest.skipIf(
|
||||||
support.TESTFN + "1",
|
sys.platform.startswith('win'),
|
||||||
support.TESTFN + "1"
|
"posixpath.samefile does not work on links in Windows")
|
||||||
),
|
@support.skip_unless_symlink
|
||||||
True
|
def test_samefile_on_links(self):
|
||||||
)
|
test_fn1 = support.TESTFN + "1"
|
||||||
# If we don't have links, assume that os.stat doesn't return resonable
|
test_fn2 = support.TESTFN + "2"
|
||||||
# inode information and thus, that samefile() doesn't work
|
self._create_file(test_fn1)
|
||||||
if hasattr(os, "symlink"):
|
|
||||||
os.symlink(
|
os.symlink(test_fn1, test_fn2)
|
||||||
support.TESTFN + "1",
|
self.assertTrue(posixpath.samefile(test_fn1, test_fn2))
|
||||||
support.TESTFN + "2"
|
os.remove(test_fn2)
|
||||||
)
|
|
||||||
self.assertIs(
|
self._create_file(test_fn2)
|
||||||
posixpath.samefile(
|
self.assertFalse(posixpath.samefile(test_fn1, test_fn2))
|
||||||
support.TESTFN + "1",
|
|
||||||
support.TESTFN + "2"
|
|
||||||
),
|
|
||||||
True
|
|
||||||
)
|
|
||||||
os.remove(support.TESTFN + "2")
|
|
||||||
f = open(support.TESTFN + "2", "wb")
|
|
||||||
f.write(b"bar")
|
|
||||||
f.close()
|
|
||||||
self.assertIs(
|
|
||||||
posixpath.samefile(
|
|
||||||
support.TESTFN + "1",
|
|
||||||
support.TESTFN + "2"
|
|
||||||
),
|
|
||||||
False
|
|
||||||
)
|
|
||||||
finally:
|
|
||||||
if not f.close():
|
|
||||||
f.close()
|
|
||||||
|
|
||||||
def test_samestat(self):
|
def test_samestat(self):
|
||||||
f = open(support.TESTFN + "1", "wb")
|
test_fn = support.TESTFN + "1"
|
||||||
try:
|
self._create_file(test_fn)
|
||||||
f.write(b"foo")
|
test_fns = [test_fn]*2
|
||||||
f.close()
|
stats = map(os.stat, test_fns)
|
||||||
self.assertIs(
|
self.assertTrue(posixpath.samestat(*stats))
|
||||||
posixpath.samestat(
|
|
||||||
os.stat(support.TESTFN + "1"),
|
@unittest.skipIf(
|
||||||
os.stat(support.TESTFN + "1")
|
sys.platform.startswith('win'),
|
||||||
),
|
"posixpath.samestat does not work on links in Windows")
|
||||||
True
|
@support.skip_unless_symlink
|
||||||
)
|
def test_samestat_on_links(self):
|
||||||
# If we don't have links, assume that os.stat() doesn't return resonable
|
test_fn1 = support.TESTFN + "1"
|
||||||
# inode information and thus, that samefile() doesn't work
|
test_fn2 = support.TESTFN + "2"
|
||||||
if hasattr(os, "symlink"):
|
test_fns = (test_fn1, test_fn2)
|
||||||
if hasattr(os, "symlink"):
|
os.symlink(*test_fns)
|
||||||
os.symlink(support.TESTFN + "1", support.TESTFN + "2")
|
stats = map(os.stat, test_fns)
|
||||||
self.assertIs(
|
self.assertTrue(posixpath.samestat(*stats))
|
||||||
posixpath.samestat(
|
os.remove(test_fn2)
|
||||||
os.stat(support.TESTFN + "1"),
|
|
||||||
os.stat(support.TESTFN + "2")
|
self._create_file(test_fn2)
|
||||||
),
|
stats = map(os.stat, test_fns)
|
||||||
True
|
self.assertFalse(posixpath.samestat(*stats))
|
||||||
)
|
|
||||||
os.remove(support.TESTFN + "2")
|
self.assertRaises(TypeError, posixpath.samestat)
|
||||||
f = open(support.TESTFN + "2", "wb")
|
|
||||||
f.write(b"bar")
|
|
||||||
f.close()
|
|
||||||
self.assertIs(
|
|
||||||
posixpath.samestat(
|
|
||||||
os.stat(support.TESTFN + "1"),
|
|
||||||
os.stat(support.TESTFN + "2")
|
|
||||||
),
|
|
||||||
False
|
|
||||||
)
|
|
||||||
finally:
|
|
||||||
if not f.close():
|
|
||||||
f.close()
|
|
||||||
|
|
||||||
def test_ismount(self):
|
def test_ismount(self):
|
||||||
self.assertIs(posixpath.ismount("/"), True)
|
self.assertIs(posixpath.ismount("/"), True)
|
||||||
|
@ -286,103 +272,112 @@ class PosixPathTest(unittest.TestCase):
|
||||||
self.assertEqual(posixpath.normpath(b"///..//./foo/.//bar"),
|
self.assertEqual(posixpath.normpath(b"///..//./foo/.//bar"),
|
||||||
b"/foo/bar")
|
b"/foo/bar")
|
||||||
|
|
||||||
if hasattr(os, "symlink"):
|
@support.skip_unless_symlink
|
||||||
def test_realpath_basic(self):
|
@skip_if_ABSTFN_contains_backslash
|
||||||
# Basic operation.
|
def test_realpath_basic(self):
|
||||||
try:
|
# Basic operation.
|
||||||
os.symlink(ABSTFN+"1", ABSTFN)
|
try:
|
||||||
self.assertEqual(realpath(ABSTFN), ABSTFN+"1")
|
os.symlink(ABSTFN+"1", ABSTFN)
|
||||||
finally:
|
self.assertEqual(realpath(ABSTFN), ABSTFN+"1")
|
||||||
support.unlink(ABSTFN)
|
finally:
|
||||||
|
support.unlink(ABSTFN)
|
||||||
|
|
||||||
def test_realpath_symlink_loops(self):
|
@support.skip_unless_symlink
|
||||||
# Bug #930024, return the path unchanged if we get into an infinite
|
@skip_if_ABSTFN_contains_backslash
|
||||||
# symlink loop.
|
def test_realpath_symlink_loops(self):
|
||||||
try:
|
# Bug #930024, return the path unchanged if we get into an infinite
|
||||||
old_path = abspath('.')
|
# symlink loop.
|
||||||
os.symlink(ABSTFN, ABSTFN)
|
try:
|
||||||
self.assertEqual(realpath(ABSTFN), ABSTFN)
|
old_path = abspath('.')
|
||||||
|
os.symlink(ABSTFN, ABSTFN)
|
||||||
|
self.assertEqual(realpath(ABSTFN), ABSTFN)
|
||||||
|
|
||||||
os.symlink(ABSTFN+"1", ABSTFN+"2")
|
os.symlink(ABSTFN+"1", ABSTFN+"2")
|
||||||
os.symlink(ABSTFN+"2", ABSTFN+"1")
|
os.symlink(ABSTFN+"2", ABSTFN+"1")
|
||||||
self.assertEqual(realpath(ABSTFN+"1"), ABSTFN+"1")
|
self.assertEqual(realpath(ABSTFN+"1"), ABSTFN+"1")
|
||||||
self.assertEqual(realpath(ABSTFN+"2"), ABSTFN+"2")
|
self.assertEqual(realpath(ABSTFN+"2"), ABSTFN+"2")
|
||||||
|
|
||||||
# Test using relative path as well.
|
# Test using relative path as well.
|
||||||
os.chdir(dirname(ABSTFN))
|
os.chdir(dirname(ABSTFN))
|
||||||
self.assertEqual(realpath(basename(ABSTFN)), ABSTFN)
|
self.assertEqual(realpath(basename(ABSTFN)), ABSTFN)
|
||||||
finally:
|
finally:
|
||||||
os.chdir(old_path)
|
os.chdir(old_path)
|
||||||
support.unlink(ABSTFN)
|
support.unlink(ABSTFN)
|
||||||
support.unlink(ABSTFN+"1")
|
support.unlink(ABSTFN+"1")
|
||||||
support.unlink(ABSTFN+"2")
|
support.unlink(ABSTFN+"2")
|
||||||
|
|
||||||
def test_realpath_resolve_parents(self):
|
@support.skip_unless_symlink
|
||||||
# We also need to resolve any symlinks in the parents of a relative
|
@skip_if_ABSTFN_contains_backslash
|
||||||
# path passed to realpath. E.g.: current working directory is
|
def test_realpath_resolve_parents(self):
|
||||||
# /usr/doc with 'doc' being a symlink to /usr/share/doc. We call
|
# We also need to resolve any symlinks in the parents of a relative
|
||||||
# realpath("a"). This should return /usr/share/doc/a/.
|
# path passed to realpath. E.g.: current working directory is
|
||||||
try:
|
# /usr/doc with 'doc' being a symlink to /usr/share/doc. We call
|
||||||
old_path = abspath('.')
|
# realpath("a"). This should return /usr/share/doc/a/.
|
||||||
os.mkdir(ABSTFN)
|
try:
|
||||||
os.mkdir(ABSTFN + "/y")
|
old_path = abspath('.')
|
||||||
os.symlink(ABSTFN + "/y", ABSTFN + "/k")
|
os.mkdir(ABSTFN)
|
||||||
|
os.mkdir(ABSTFN + "/y")
|
||||||
|
os.symlink(ABSTFN + "/y", ABSTFN + "/k")
|
||||||
|
|
||||||
os.chdir(ABSTFN + "/k")
|
os.chdir(ABSTFN + "/k")
|
||||||
self.assertEqual(realpath("a"), ABSTFN + "/y/a")
|
self.assertEqual(realpath("a"), ABSTFN + "/y/a")
|
||||||
finally:
|
finally:
|
||||||
os.chdir(old_path)
|
os.chdir(old_path)
|
||||||
support.unlink(ABSTFN + "/k")
|
support.unlink(ABSTFN + "/k")
|
||||||
safe_rmdir(ABSTFN + "/y")
|
safe_rmdir(ABSTFN + "/y")
|
||||||
safe_rmdir(ABSTFN)
|
safe_rmdir(ABSTFN)
|
||||||
|
|
||||||
def test_realpath_resolve_before_normalizing(self):
|
@support.skip_unless_symlink
|
||||||
# Bug #990669: Symbolic links should be resolved before we
|
@skip_if_ABSTFN_contains_backslash
|
||||||
# normalize the path. E.g.: if we have directories 'a', 'k' and 'y'
|
def test_realpath_resolve_before_normalizing(self):
|
||||||
# in the following hierarchy:
|
# Bug #990669: Symbolic links should be resolved before we
|
||||||
# a/k/y
|
# normalize the path. E.g.: if we have directories 'a', 'k' and 'y'
|
||||||
#
|
# in the following hierarchy:
|
||||||
# and a symbolic link 'link-y' pointing to 'y' in directory 'a',
|
# a/k/y
|
||||||
# then realpath("link-y/..") should return 'k', not 'a'.
|
#
|
||||||
try:
|
# and a symbolic link 'link-y' pointing to 'y' in directory 'a',
|
||||||
old_path = abspath('.')
|
# then realpath("link-y/..") should return 'k', not 'a'.
|
||||||
os.mkdir(ABSTFN)
|
try:
|
||||||
os.mkdir(ABSTFN + "/k")
|
old_path = abspath('.')
|
||||||
os.mkdir(ABSTFN + "/k/y")
|
os.mkdir(ABSTFN)
|
||||||
os.symlink(ABSTFN + "/k/y", ABSTFN + "/link-y")
|
os.mkdir(ABSTFN + "/k")
|
||||||
|
os.mkdir(ABSTFN + "/k/y")
|
||||||
|
os.symlink(ABSTFN + "/k/y", ABSTFN + "/link-y")
|
||||||
|
|
||||||
# Absolute path.
|
# Absolute path.
|
||||||
self.assertEqual(realpath(ABSTFN + "/link-y/.."), ABSTFN + "/k")
|
self.assertEqual(realpath(ABSTFN + "/link-y/.."), ABSTFN + "/k")
|
||||||
# Relative path.
|
# Relative path.
|
||||||
os.chdir(dirname(ABSTFN))
|
os.chdir(dirname(ABSTFN))
|
||||||
self.assertEqual(realpath(basename(ABSTFN) + "/link-y/.."),
|
self.assertEqual(realpath(basename(ABSTFN) + "/link-y/.."),
|
||||||
ABSTFN + "/k")
|
ABSTFN + "/k")
|
||||||
finally:
|
finally:
|
||||||
os.chdir(old_path)
|
os.chdir(old_path)
|
||||||
support.unlink(ABSTFN + "/link-y")
|
support.unlink(ABSTFN + "/link-y")
|
||||||
safe_rmdir(ABSTFN + "/k/y")
|
safe_rmdir(ABSTFN + "/k/y")
|
||||||
safe_rmdir(ABSTFN + "/k")
|
safe_rmdir(ABSTFN + "/k")
|
||||||
safe_rmdir(ABSTFN)
|
safe_rmdir(ABSTFN)
|
||||||
|
|
||||||
def test_realpath_resolve_first(self):
|
@support.skip_unless_symlink
|
||||||
# Bug #1213894: The first component of the path, if not absolute,
|
@skip_if_ABSTFN_contains_backslash
|
||||||
# must be resolved too.
|
def test_realpath_resolve_first(self):
|
||||||
|
# Bug #1213894: The first component of the path, if not absolute,
|
||||||
|
# must be resolved too.
|
||||||
|
|
||||||
try:
|
try:
|
||||||
old_path = abspath('.')
|
old_path = abspath('.')
|
||||||
os.mkdir(ABSTFN)
|
os.mkdir(ABSTFN)
|
||||||
os.mkdir(ABSTFN + "/k")
|
os.mkdir(ABSTFN + "/k")
|
||||||
os.symlink(ABSTFN, ABSTFN + "link")
|
os.symlink(ABSTFN, ABSTFN + "link")
|
||||||
os.chdir(dirname(ABSTFN))
|
os.chdir(dirname(ABSTFN))
|
||||||
|
|
||||||
base = basename(ABSTFN)
|
base = basename(ABSTFN)
|
||||||
self.assertEqual(realpath(base + "link"), ABSTFN)
|
self.assertEqual(realpath(base + "link"), ABSTFN)
|
||||||
self.assertEqual(realpath(base + "link/k"), ABSTFN + "/k")
|
self.assertEqual(realpath(base + "link/k"), ABSTFN + "/k")
|
||||||
finally:
|
finally:
|
||||||
os.chdir(old_path)
|
os.chdir(old_path)
|
||||||
support.unlink(ABSTFN + "link")
|
support.unlink(ABSTFN + "link")
|
||||||
safe_rmdir(ABSTFN + "/k")
|
safe_rmdir(ABSTFN + "/k")
|
||||||
safe_rmdir(ABSTFN)
|
safe_rmdir(ABSTFN)
|
||||||
|
|
||||||
def test_relpath(self):
|
def test_relpath(self):
|
||||||
(real_getcwd, os.getcwd) = (os.getcwd, lambda: r"/home/user/bar")
|
(real_getcwd, os.getcwd) = (os.getcwd, lambda: r"/home/user/bar")
|
||||||
|
|
|
@ -271,7 +271,7 @@ class TestShutil(unittest.TestCase):
|
||||||
shutil.rmtree(src_dir)
|
shutil.rmtree(src_dir)
|
||||||
shutil.rmtree(os.path.dirname(dst_dir))
|
shutil.rmtree(os.path.dirname(dst_dir))
|
||||||
|
|
||||||
@unittest.skipUnless(hasattr(os, 'symlink'), 'requires os.symlink')
|
@support.skip_unless_symlink
|
||||||
def test_dont_copy_file_onto_link_to_itself(self):
|
def test_dont_copy_file_onto_link_to_itself(self):
|
||||||
# bug 851123.
|
# bug 851123.
|
||||||
os.mkdir(TESTFN)
|
os.mkdir(TESTFN)
|
||||||
|
@ -282,10 +282,11 @@ class TestShutil(unittest.TestCase):
|
||||||
f.write('cheddar')
|
f.write('cheddar')
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
os.link(src, dst)
|
if hasattr(os, "link"):
|
||||||
self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
|
os.link(src, dst)
|
||||||
self.assertEqual(open(src,'r').read(), 'cheddar')
|
self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
|
||||||
os.remove(dst)
|
self.assertEqual(open(src,'r').read(), 'cheddar')
|
||||||
|
os.remove(dst)
|
||||||
|
|
||||||
# Using `src` here would mean we end up with a symlink pointing
|
# Using `src` here would mean we end up with a symlink pointing
|
||||||
# to TESTFN/TESTFN/cheese, while it should point at
|
# to TESTFN/TESTFN/cheese, while it should point at
|
||||||
|
@ -300,30 +301,30 @@ class TestShutil(unittest.TestCase):
|
||||||
except OSError:
|
except OSError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@unittest.skipUnless(hasattr(os, 'symlink'), 'requires os.symlink')
|
@support.skip_unless_symlink
|
||||||
def test_rmtree_on_symlink(self):
|
def test_rmtree_on_symlink(self):
|
||||||
# bug 1669.
|
# bug 1669.
|
||||||
os.mkdir(TESTFN)
|
os.mkdir(TESTFN)
|
||||||
try:
|
|
||||||
src = os.path.join(TESTFN, 'cheese')
|
|
||||||
dst = os.path.join(TESTFN, 'shop')
|
|
||||||
os.mkdir(src)
|
|
||||||
os.symlink(src, dst)
|
|
||||||
self.assertRaises(OSError, shutil.rmtree, dst)
|
|
||||||
finally:
|
|
||||||
shutil.rmtree(TESTFN, ignore_errors=True)
|
|
||||||
|
|
||||||
@unittest.skipUnless(hasattr(os, 'mkfifo'), 'requires os.mkfifo')
|
|
||||||
# Issue #3002: copyfile and copytree block indefinitely on named pipes
|
|
||||||
def test_copyfile_named_pipe(self):
|
|
||||||
os.mkfifo(TESTFN)
|
|
||||||
try:
|
try:
|
||||||
self.assertRaises(shutil.SpecialFileError,
|
src = os.path.join(TESTFN, 'cheese')
|
||||||
shutil.copyfile, TESTFN, TESTFN2)
|
dst = os.path.join(TESTFN, 'shop')
|
||||||
self.assertRaises(shutil.SpecialFileError,
|
os.mkdir(src)
|
||||||
shutil.copyfile, __file__, TESTFN)
|
os.symlink(src, dst)
|
||||||
|
self.assertRaises(OSError, shutil.rmtree, dst)
|
||||||
finally:
|
finally:
|
||||||
os.remove(TESTFN)
|
shutil.rmtree(TESTFN, ignore_errors=True)
|
||||||
|
|
||||||
|
if hasattr(os, "mkfifo"):
|
||||||
|
# Issue #3002: copyfile and copytree block indefinitely on named pipes
|
||||||
|
def test_copyfile_named_pipe(self):
|
||||||
|
os.mkfifo(TESTFN)
|
||||||
|
try:
|
||||||
|
self.assertRaises(shutil.SpecialFileError,
|
||||||
|
shutil.copyfile, TESTFN, TESTFN2)
|
||||||
|
self.assertRaises(shutil.SpecialFileError,
|
||||||
|
shutil.copyfile, __file__, TESTFN)
|
||||||
|
finally:
|
||||||
|
os.remove(TESTFN)
|
||||||
|
|
||||||
@unittest.skipUnless(hasattr(os, 'mkfifo'), 'requires os.mkfifo')
|
@unittest.skipUnless(hasattr(os, 'mkfifo'), 'requires os.mkfifo')
|
||||||
def test_copytree_named_pipe(self):
|
def test_copytree_named_pipe(self):
|
||||||
|
@ -361,7 +362,7 @@ class TestShutil(unittest.TestCase):
|
||||||
shutil.copytree(src_dir, dst_dir, copy_function=_copy)
|
shutil.copytree(src_dir, dst_dir, copy_function=_copy)
|
||||||
self.assertEquals(len(copied), 2)
|
self.assertEquals(len(copied), 2)
|
||||||
|
|
||||||
@unittest.skipUnless(hasattr(os, 'symlink'), 'requires os.symlink')
|
@support.skip_unless_symlink
|
||||||
def test_copytree_dangling_symlinks(self):
|
def test_copytree_dangling_symlinks(self):
|
||||||
|
|
||||||
# a dangling symlink raises an error at the end
|
# a dangling symlink raises an error at the end
|
||||||
|
|
|
@ -263,7 +263,7 @@ class SysModuleTest(unittest.TestCase):
|
||||||
# Raise SkipTest if sys doesn't have getwindowsversion attribute
|
# Raise SkipTest if sys doesn't have getwindowsversion attribute
|
||||||
test.support.get_attribute(sys, "getwindowsversion")
|
test.support.get_attribute(sys, "getwindowsversion")
|
||||||
v = sys.getwindowsversion()
|
v = sys.getwindowsversion()
|
||||||
self.assertEqual(len(v), 5)
|
self.assertEqual(len(v), 9)
|
||||||
self.assertIsInstance(v[0], int)
|
self.assertIsInstance(v[0], int)
|
||||||
self.assertIsInstance(v[1], int)
|
self.assertIsInstance(v[1], int)
|
||||||
self.assertIsInstance(v[2], int)
|
self.assertIsInstance(v[2], int)
|
||||||
|
|
|
@ -12,7 +12,7 @@ import shutil
|
||||||
from copy import copy, deepcopy
|
from copy import copy, deepcopy
|
||||||
|
|
||||||
from test.support import (run_unittest, TESTFN, unlink, get_attribute,
|
from test.support import (run_unittest, TESTFN, unlink, get_attribute,
|
||||||
captured_stdout)
|
captured_stdout, skip_unless_symlink)
|
||||||
|
|
||||||
import sysconfig
|
import sysconfig
|
||||||
from sysconfig import (get_paths, get_platform, get_config_vars,
|
from sysconfig import (get_paths, get_platform, get_config_vars,
|
||||||
|
@ -239,17 +239,23 @@ class TestSysConfig(unittest.TestCase):
|
||||||
'posix_home', 'posix_prefix', 'posix_user')
|
'posix_home', 'posix_prefix', 'posix_user')
|
||||||
self.assertEquals(get_scheme_names(), wanted)
|
self.assertEquals(get_scheme_names(), wanted)
|
||||||
|
|
||||||
|
@skip_unless_symlink
|
||||||
def test_symlink(self):
|
def test_symlink(self):
|
||||||
|
# On Windows, the EXE needs to know where pythonXY.dll is at so we have
|
||||||
|
# to add the directory to the path.
|
||||||
|
if sys.platform == "win32":
|
||||||
|
os.environ["Path"] = "{};{}".format(os.path.dirname(sys.executable),
|
||||||
|
os.environ["Path"])
|
||||||
|
|
||||||
# Issue 7880
|
# Issue 7880
|
||||||
symlink = get_attribute(os, "symlink")
|
|
||||||
def get(python):
|
def get(python):
|
||||||
cmd = [python, '-c',
|
cmd = [python, '-c',
|
||||||
'import sysconfig; print(sysconfig.get_platform())']
|
'import sysconfig; print(sysconfig.get_platform())']
|
||||||
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
|
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, env=os.environ)
|
||||||
return p.communicate()
|
return p.communicate()
|
||||||
real = os.path.realpath(sys.executable)
|
real = os.path.realpath(sys.executable)
|
||||||
link = os.path.abspath(TESTFN)
|
link = os.path.abspath(TESTFN)
|
||||||
symlink(real, link)
|
os.symlink(real, link)
|
||||||
try:
|
try:
|
||||||
self.assertEqual(get(real), get(link))
|
self.assertEqual(get(real), get(link))
|
||||||
finally:
|
finally:
|
||||||
|
|
|
@ -291,6 +291,8 @@ class MiscReadTest(CommonReadTest):
|
||||||
self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof",
|
self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof",
|
||||||
"could not find all members")
|
"could not find all members")
|
||||||
|
|
||||||
|
@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
|
||||||
|
@support.skip_unless_symlink
|
||||||
def test_extract_hardlink(self):
|
def test_extract_hardlink(self):
|
||||||
# Test hardlink extraction (e.g. bug #857297).
|
# Test hardlink extraction (e.g. bug #857297).
|
||||||
tar = tarfile.open(tarname, errorlevel=1, encoding="iso8859-1")
|
tar = tarfile.open(tarname, errorlevel=1, encoding="iso8859-1")
|
||||||
|
@ -695,16 +697,16 @@ class WriteTest(WriteTestBase):
|
||||||
os.remove(target)
|
os.remove(target)
|
||||||
os.remove(link)
|
os.remove(link)
|
||||||
|
|
||||||
|
@support.skip_unless_symlink
|
||||||
def test_symlink_size(self):
|
def test_symlink_size(self):
|
||||||
if hasattr(os, "symlink"):
|
path = os.path.join(TEMPDIR, "symlink")
|
||||||
path = os.path.join(TEMPDIR, "symlink")
|
os.symlink("link_target", path)
|
||||||
os.symlink("link_target", path)
|
try:
|
||||||
try:
|
tar = tarfile.open(tmpname, self.mode)
|
||||||
tar = tarfile.open(tmpname, self.mode)
|
tarinfo = tar.gettarinfo(path)
|
||||||
tarinfo = tar.gettarinfo(path)
|
self.assertEqual(tarinfo.size, 0)
|
||||||
self.assertEqual(tarinfo.size, 0)
|
finally:
|
||||||
finally:
|
os.remove(path)
|
||||||
os.remove(path)
|
|
||||||
|
|
||||||
def test_add_self(self):
|
def test_add_self(self):
|
||||||
# Test for #1257255.
|
# Test for #1257255.
|
||||||
|
@ -1408,15 +1410,24 @@ class LinkEmulationTest(ReadTest):
|
||||||
data = open(os.path.join(TEMPDIR, name), "rb").read()
|
data = open(os.path.join(TEMPDIR, name), "rb").read()
|
||||||
self.assertEqual(md5sum(data), md5_regtype)
|
self.assertEqual(md5sum(data), md5_regtype)
|
||||||
|
|
||||||
|
# When 8879 gets fixed, this will need to change. Currently on Windows
|
||||||
|
# we have os.path.islink but no os.link, so these tests fail without the
|
||||||
|
# following skip until link is completed.
|
||||||
|
@unittest.skipIf(hasattr(os.path, "islink"),
|
||||||
|
"Skip emulation - has os.path.islink but not os.link")
|
||||||
def test_hardlink_extraction1(self):
|
def test_hardlink_extraction1(self):
|
||||||
self._test_link_extraction("ustar/lnktype")
|
self._test_link_extraction("ustar/lnktype")
|
||||||
|
|
||||||
|
@unittest.skipIf(hasattr(os.path, "islink"),
|
||||||
|
"Skip emulation - has os.path.islink but not os.link")
|
||||||
def test_hardlink_extraction2(self):
|
def test_hardlink_extraction2(self):
|
||||||
self._test_link_extraction("./ustar/linktest2/lnktype")
|
self._test_link_extraction("./ustar/linktest2/lnktype")
|
||||||
|
|
||||||
|
@unittest.skipIf(hasattr(os, "symlink"), "Skip emulation if symlink exists")
|
||||||
def test_symlink_extraction1(self):
|
def test_symlink_extraction1(self):
|
||||||
self._test_link_extraction("ustar/symtype")
|
self._test_link_extraction("ustar/symtype")
|
||||||
|
|
||||||
|
@unittest.skipIf(hasattr(os, "symlink"), "Skip emulation if symlink exists")
|
||||||
def test_symlink_extraction2(self):
|
def test_symlink_extraction2(self):
|
||||||
self._test_link_extraction("./ustar/linktest2/symtype")
|
self._test_link_extraction("./ustar/linktest2/symtype")
|
||||||
|
|
||||||
|
|
|
@ -154,6 +154,7 @@ Benjamin Collar
|
||||||
Jeffery Collins
|
Jeffery Collins
|
||||||
Robert Collins
|
Robert Collins
|
||||||
Paul Colomiets
|
Paul Colomiets
|
||||||
|
Jason R. Coombs
|
||||||
Geremy Condra
|
Geremy Condra
|
||||||
Juan José Conti
|
Juan José Conti
|
||||||
Matt Conway
|
Matt Conway
|
||||||
|
|
|
@ -1406,6 +1406,9 @@ Library
|
||||||
Extension Modules
|
Extension Modules
|
||||||
-----------------
|
-----------------
|
||||||
|
|
||||||
|
- Issue #1578269: Implement os.symlink for Windows 6.0+. Patch by
|
||||||
|
Jason R. Coombs
|
||||||
|
|
||||||
- In struct.pack, correctly propogate exceptions from computing the truth of an
|
- In struct.pack, correctly propogate exceptions from computing the truth of an
|
||||||
object in the '?' format.
|
object in the '?' format.
|
||||||
|
|
||||||
|
|
|
@ -571,7 +571,7 @@ posix_error_with_allocated_filename(PyObject* name)
|
||||||
|
|
||||||
#ifdef MS_WINDOWS
|
#ifdef MS_WINDOWS
|
||||||
static PyObject *
|
static PyObject *
|
||||||
win32_error(char* function, char* filename)
|
win32_error(char* function, const char* filename)
|
||||||
{
|
{
|
||||||
/* XXX We should pass the function name along in the future.
|
/* XXX We should pass the function name along in the future.
|
||||||
(winreg.c also wants to pass the function name.)
|
(winreg.c also wants to pass the function name.)
|
||||||
|
@ -978,12 +978,28 @@ attributes_from_dir_w(LPCWSTR pszFile, LPWIN32_FILE_ATTRIBUTE_DATA pfad)
|
||||||
return TRUE;
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
/* About the following functions: win32_lstat, win32_lstat_w, win32_stat,
|
||||||
win32_stat(const char* path, struct win32_stat *result)
|
win32_stat_w
|
||||||
|
|
||||||
|
In Posix, stat automatically traverses symlinks and returns the stat
|
||||||
|
structure for the target. In Windows, the equivalent GetFileAttributes by
|
||||||
|
default does not traverse symlinks and instead returns attributes for
|
||||||
|
the symlink.
|
||||||
|
|
||||||
|
Therefore, win32_lstat will get the attributes traditionally, and
|
||||||
|
win32_stat will first explicitly resolve the symlink target and then will
|
||||||
|
call win32_lstat on that result.
|
||||||
|
|
||||||
|
The _w represent Unicode equivalents of the aformentioned ANSI functions. */
|
||||||
|
|
||||||
|
static int
|
||||||
|
win32_lstat(const char* path, struct win32_stat *result)
|
||||||
{
|
{
|
||||||
WIN32_FILE_ATTRIBUTE_DATA info;
|
WIN32_FILE_ATTRIBUTE_DATA info;
|
||||||
int code;
|
int code;
|
||||||
char *dot;
|
char *dot;
|
||||||
|
WIN32_FIND_DATAA find_data;
|
||||||
|
HANDLE find_data_handle;
|
||||||
if (!GetFileAttributesExA(path, GetFileExInfoStandard, &info)) {
|
if (!GetFileAttributesExA(path, GetFileExInfoStandard, &info)) {
|
||||||
if (GetLastError() != ERROR_SHARING_VIOLATION) {
|
if (GetLastError() != ERROR_SHARING_VIOLATION) {
|
||||||
/* Protocol violation: we explicitly clear errno, instead of
|
/* Protocol violation: we explicitly clear errno, instead of
|
||||||
|
@ -1000,28 +1016,214 @@ win32_stat(const char* path, struct win32_stat *result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = attribute_data_to_stat(&info, result);
|
code = attribute_data_to_stat(&info, result);
|
||||||
if (code != 0)
|
if (code != 0)
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
|
/* Get WIN32_FIND_DATA structure for the path to determine if
|
||||||
|
it is a symlink */
|
||||||
|
if(info.dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) {
|
||||||
|
find_data_handle = FindFirstFileA(path, &find_data);
|
||||||
|
if(find_data_handle != INVALID_HANDLE_VALUE) {
|
||||||
|
if(find_data.dwReserved0 == IO_REPARSE_TAG_SYMLINK) {
|
||||||
|
/* first clear the S_IFMT bits */
|
||||||
|
result->st_mode ^= (result->st_mode & 0170000);
|
||||||
|
/* now set the bits that make this a symlink */
|
||||||
|
result->st_mode |= 0120000;
|
||||||
|
}
|
||||||
|
FindClose(find_data_handle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Set S_IFEXEC if it is an .exe, .bat, ... */
|
/* Set S_IFEXEC if it is an .exe, .bat, ... */
|
||||||
dot = strrchr(path, '.');
|
dot = strrchr(path, '.');
|
||||||
if (dot) {
|
if (dot) {
|
||||||
if (stricmp(dot, ".bat") == 0 ||
|
if (stricmp(dot, ".bat") == 0 || stricmp(dot, ".cmd") == 0 ||
|
||||||
stricmp(dot, ".cmd") == 0 ||
|
stricmp(dot, ".exe") == 0 || stricmp(dot, ".com") == 0)
|
||||||
stricmp(dot, ".exe") == 0 ||
|
|
||||||
stricmp(dot, ".com") == 0)
|
|
||||||
result->st_mode |= 0111;
|
result->st_mode |= 0111;
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
win32_wstat(const wchar_t* path, struct win32_stat *result)
|
win32_lstat_w(const wchar_t* path, struct win32_stat *result)
|
||||||
{
|
{
|
||||||
int code;
|
int code;
|
||||||
const wchar_t *dot;
|
const wchar_t *dot;
|
||||||
WIN32_FILE_ATTRIBUTE_DATA info;
|
WIN32_FILE_ATTRIBUTE_DATA info;
|
||||||
|
WIN32_FIND_DATAW find_data;
|
||||||
|
HANDLE find_data_handle;
|
||||||
if (!GetFileAttributesExW(path, GetFileExInfoStandard, &info)) {
|
if (!GetFileAttributesExW(path, GetFileExInfoStandard, &info)) {
|
||||||
|
if (GetLastError() != ERROR_SHARING_VIOLATION) {
|
||||||
|
/* Protocol violation: we explicitly clear errno, instead of
|
||||||
|
setting it to a POSIX error. Callers should use GetLastError. */
|
||||||
|
errno = 0;
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
/* Could not get attributes on open file. Fall back to reading
|
||||||
|
the directory. */
|
||||||
|
if (!attributes_from_dir_w(path, &info)) {
|
||||||
|
/* Very strange. This should not fail now */
|
||||||
|
errno = 0;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
code = attribute_data_to_stat(&info, result);
|
||||||
|
if (code < 0)
|
||||||
|
return code;
|
||||||
|
|
||||||
|
/* Get WIN32_FIND_DATA structure for the path to determine if
|
||||||
|
it is a symlink */
|
||||||
|
if(info.dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) {
|
||||||
|
find_data_handle = FindFirstFileW(path, &find_data);
|
||||||
|
if(find_data_handle != INVALID_HANDLE_VALUE) {
|
||||||
|
if(find_data.dwReserved0 == IO_REPARSE_TAG_SYMLINK) {
|
||||||
|
/* first clear the S_IFMT bits */
|
||||||
|
result->st_mode ^= (result->st_mode & 0170000);
|
||||||
|
/* now set the bits that make this a symlink */
|
||||||
|
result->st_mode |= 0120000;
|
||||||
|
}
|
||||||
|
FindClose(find_data_handle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Set IFEXEC if it is an .exe, .bat, ... */
|
||||||
|
dot = wcsrchr(path, '.');
|
||||||
|
if (dot) {
|
||||||
|
if (_wcsicmp(dot, L".bat") == 0 || _wcsicmp(dot, L".cmd") == 0 ||
|
||||||
|
_wcsicmp(dot, L".exe") == 0 || _wcsicmp(dot, L".com") == 0)
|
||||||
|
result->st_mode |= 0111;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Grab GetFinalPathNameByHandle dynamically from kernel32 */
|
||||||
|
static int has_GetFinalPathNameByHandle = 0;
|
||||||
|
static DWORD (CALLBACK *Py_GetFinalPathNameByHandleA)(HANDLE, LPSTR, DWORD,
|
||||||
|
DWORD);
|
||||||
|
static DWORD (CALLBACK *Py_GetFinalPathNameByHandleW)(HANDLE, LPWSTR, DWORD,
|
||||||
|
DWORD);
|
||||||
|
static int
|
||||||
|
check_GetFinalPathNameByHandle()
|
||||||
|
{
|
||||||
|
HINSTANCE hKernel32;
|
||||||
|
/* only recheck */
|
||||||
|
if (!has_GetFinalPathNameByHandle)
|
||||||
|
{
|
||||||
|
hKernel32 = GetModuleHandle("KERNEL32");
|
||||||
|
*(FARPROC*)&Py_GetFinalPathNameByHandleA = GetProcAddress(hKernel32,
|
||||||
|
"GetFinalPathNameByHandleA");
|
||||||
|
*(FARPROC*)&Py_GetFinalPathNameByHandleW = GetProcAddress(hKernel32,
|
||||||
|
"GetFinalPathNameByHandleW");
|
||||||
|
has_GetFinalPathNameByHandle = Py_GetFinalPathNameByHandleA && Py_GetFinalPathNameByHandleW;
|
||||||
|
}
|
||||||
|
return has_GetFinalPathNameByHandle;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
win32_stat(const char* path, struct win32_stat *result)
|
||||||
|
{
|
||||||
|
/* Traverse the symlink to the target using
|
||||||
|
GetFinalPathNameByHandle()
|
||||||
|
*/
|
||||||
|
int code;
|
||||||
|
HANDLE hFile;
|
||||||
|
int buf_size;
|
||||||
|
char *target_path;
|
||||||
|
int result_length;
|
||||||
|
WIN32_FILE_ATTRIBUTE_DATA info;
|
||||||
|
|
||||||
|
if(!check_GetFinalPathNameByHandle()) {
|
||||||
|
/* if the OS doesn't have GetFinalPathNameByHandle, it doesn't
|
||||||
|
have symlinks, so just fall back to the traditional behavior
|
||||||
|
found in lstat. */
|
||||||
|
return win32_lstat(path, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
hFile = CreateFileA(
|
||||||
|
path,
|
||||||
|
0, /* desired access */
|
||||||
|
0, /* share mode */
|
||||||
|
NULL, /* security attributes */
|
||||||
|
OPEN_EXISTING,
|
||||||
|
/* FILE_FLAG_BACKUP_SEMANTICS is required to open a directory */
|
||||||
|
FILE_FLAG_BACKUP_SEMANTICS,
|
||||||
|
NULL);
|
||||||
|
|
||||||
|
if(hFile == INVALID_HANDLE_VALUE) {
|
||||||
|
/* Either the target doesn't exist, or we don't have access to
|
||||||
|
get a handle to it. If the former, we need to return an error.
|
||||||
|
If the latter, we can use attributes_from_dir. */
|
||||||
|
if (GetLastError() != ERROR_SHARING_VIOLATION) {
|
||||||
|
/* Protocol violation: we explicitly clear errno, instead of
|
||||||
|
setting it to a POSIX error. Callers should use GetLastError. */
|
||||||
|
errno = 0;
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
/* Could not get attributes on open file. Fall back to
|
||||||
|
reading the directory. */
|
||||||
|
if (!attributes_from_dir(path, &info)) {
|
||||||
|
/* Very strange. This should not fail now */
|
||||||
|
errno = 0;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
code = attribute_data_to_stat(&info, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
buf_size = Py_GetFinalPathNameByHandleA(hFile, 0, 0, VOLUME_NAME_DOS);
|
||||||
|
if(!buf_size) return -1;
|
||||||
|
target_path = (char *)malloc((buf_size+1)*sizeof(char));
|
||||||
|
result_length = Py_GetFinalPathNameByHandleA(hFile, target_path,
|
||||||
|
buf_size, VOLUME_NAME_DOS);
|
||||||
|
|
||||||
|
if(!result_length)
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
if(!CloseHandle(hFile))
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
target_path[result_length] = 0;
|
||||||
|
code = win32_lstat(target_path, result);
|
||||||
|
free(target_path);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
win32_stat_w(const wchar_t* path, struct win32_stat *result)
|
||||||
|
{
|
||||||
|
/* Traverse the symlink to the target using GetFinalPathNameByHandle() */
|
||||||
|
int code;
|
||||||
|
HANDLE hFile;
|
||||||
|
int buf_size;
|
||||||
|
wchar_t *target_path;
|
||||||
|
int result_length;
|
||||||
|
WIN32_FILE_ATTRIBUTE_DATA info;
|
||||||
|
|
||||||
|
if(!check_GetFinalPathNameByHandle()) {
|
||||||
|
/* If the OS doesn't have GetFinalPathNameByHandle, it doesn't have
|
||||||
|
symlinks, so just fall back to the traditional behavior found
|
||||||
|
in lstat. */
|
||||||
|
return win32_lstat_w(path, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
hFile = CreateFileW(
|
||||||
|
path,
|
||||||
|
0, /* desired access */
|
||||||
|
0, /* share mode */
|
||||||
|
NULL, /* security attributes */
|
||||||
|
OPEN_EXISTING,
|
||||||
|
/* FILE_FLAG_BACKUP_SEMANTICS is required to open a directory */
|
||||||
|
FILE_ATTRIBUTE_NORMAL|FILE_FLAG_BACKUP_SEMANTICS,
|
||||||
|
NULL);
|
||||||
|
|
||||||
|
if(hFile == INVALID_HANDLE_VALUE) {
|
||||||
|
/* Either the target doesn't exist, or we don't have access to
|
||||||
|
get a handle to it. If the former, we need to return an error.
|
||||||
|
If the latter, we can use attributes_from_dir. */
|
||||||
if (GetLastError() != ERROR_SHARING_VIOLATION) {
|
if (GetLastError() != ERROR_SHARING_VIOLATION) {
|
||||||
/* Protocol violation: we explicitly clear errno, instead of
|
/* Protocol violation: we explicitly clear errno, instead of
|
||||||
setting it to a POSIX error. Callers should use GetLastError. */
|
setting it to a POSIX error. Callers should use GetLastError. */
|
||||||
|
@ -1036,19 +1238,30 @@ win32_wstat(const wchar_t* path, struct win32_stat *result)
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
code = attribute_data_to_stat(&info, result);
|
||||||
}
|
}
|
||||||
code = attribute_data_to_stat(&info, result);
|
else {
|
||||||
if (code < 0)
|
/* We have a good handle to the target, use it to determine the target
|
||||||
return code;
|
path name (then we'll call lstat on it). */
|
||||||
/* Set IFEXEC if it is an .exe, .bat, ... */
|
buf_size = Py_GetFinalPathNameByHandleW(hFile, 0, 0, VOLUME_NAME_DOS);
|
||||||
dot = wcsrchr(path, '.');
|
if(!buf_size)
|
||||||
if (dot) {
|
return -1;
|
||||||
if (_wcsicmp(dot, L".bat") == 0 ||
|
|
||||||
_wcsicmp(dot, L".cmd") == 0 ||
|
target_path = (wchar_t *)malloc((buf_size+1)*sizeof(wchar_t));
|
||||||
_wcsicmp(dot, L".exe") == 0 ||
|
result_length = Py_GetFinalPathNameByHandleW(hFile, target_path,
|
||||||
_wcsicmp(dot, L".com") == 0)
|
buf_size, VOLUME_NAME_DOS);
|
||||||
result->st_mode |= 0111;
|
|
||||||
|
if(!result_length)
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
if(!CloseHandle(hFile))
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
target_path[result_length] = 0;
|
||||||
|
code = win32_lstat_w(target_path, result);
|
||||||
|
free(target_path);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2443,6 +2656,69 @@ posix__getfullpathname(PyObject *self, PyObject *args)
|
||||||
}
|
}
|
||||||
return PyBytes_FromString(outbuf);
|
return PyBytes_FromString(outbuf);
|
||||||
} /* end of posix__getfullpathname */
|
} /* end of posix__getfullpathname */
|
||||||
|
|
||||||
|
/* A helper function for samepath on windows */
|
||||||
|
static PyObject *
|
||||||
|
posix__getfinalpathname(PyObject *self, PyObject *args)
|
||||||
|
{
|
||||||
|
HANDLE hFile;
|
||||||
|
int buf_size;
|
||||||
|
wchar_t *target_path;
|
||||||
|
int result_length;
|
||||||
|
PyObject *result;
|
||||||
|
wchar_t *path;
|
||||||
|
|
||||||
|
if (!PyArg_ParseTuple(args, "u|:_getfullpathname", &path)) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(!check_GetFinalPathNameByHandle()) {
|
||||||
|
/* If the OS doesn't have GetFinalPathNameByHandle, return a
|
||||||
|
NotImplementedError. */
|
||||||
|
return PyErr_Format(PyExc_NotImplementedError,
|
||||||
|
"GetFinalPathNameByHandle not available on this platform");
|
||||||
|
}
|
||||||
|
|
||||||
|
hFile = CreateFileW(
|
||||||
|
path,
|
||||||
|
0, /* desired access */
|
||||||
|
0, /* share mode */
|
||||||
|
NULL, /* security attributes */
|
||||||
|
OPEN_EXISTING,
|
||||||
|
/* FILE_FLAG_BACKUP_SEMANTICS is required to open a directory */
|
||||||
|
FILE_FLAG_BACKUP_SEMANTICS,
|
||||||
|
NULL);
|
||||||
|
|
||||||
|
if(hFile == INVALID_HANDLE_VALUE) {
|
||||||
|
return win32_error_unicode("GetFinalPathNamyByHandle", path);
|
||||||
|
return PyErr_Format(PyExc_RuntimeError, "Could not get a handle to file.");
|
||||||
|
}
|
||||||
|
|
||||||
|
/* We have a good handle to the target, use it to determine the
|
||||||
|
target path name. */
|
||||||
|
buf_size = Py_GetFinalPathNameByHandleW(hFile, 0, 0, VOLUME_NAME_NT);
|
||||||
|
|
||||||
|
if(!buf_size)
|
||||||
|
return win32_error_unicode("GetFinalPathNameByHandle", path);
|
||||||
|
|
||||||
|
target_path = (wchar_t *)malloc((buf_size+1)*sizeof(wchar_t));
|
||||||
|
if(!target_path)
|
||||||
|
return PyErr_NoMemory();
|
||||||
|
|
||||||
|
result_length = Py_GetFinalPathNameByHandleW(hFile, target_path,
|
||||||
|
buf_size, VOLUME_NAME_DOS);
|
||||||
|
if(!result_length)
|
||||||
|
return win32_error_unicode("GetFinalPathNamyByHandle", path);
|
||||||
|
|
||||||
|
if(!CloseHandle(hFile))
|
||||||
|
return win32_error_unicode("GetFinalPathNameByHandle", path);
|
||||||
|
|
||||||
|
target_path[result_length] = 0;
|
||||||
|
result = PyUnicode_FromUnicode(target_path, result_length);
|
||||||
|
free(target_path);
|
||||||
|
return result;
|
||||||
|
|
||||||
|
} /* end of posix__getfinalpathname */
|
||||||
#endif /* MS_WINDOWS */
|
#endif /* MS_WINDOWS */
|
||||||
|
|
||||||
PyDoc_STRVAR(posix_mkdir__doc__,
|
PyDoc_STRVAR(posix_mkdir__doc__,
|
||||||
|
@ -2623,7 +2899,7 @@ static PyObject *
|
||||||
posix_stat(PyObject *self, PyObject *args)
|
posix_stat(PyObject *self, PyObject *args)
|
||||||
{
|
{
|
||||||
#ifdef MS_WINDOWS
|
#ifdef MS_WINDOWS
|
||||||
return posix_do_stat(self, args, "O&:stat", STAT, "U:stat", win32_wstat);
|
return posix_do_stat(self, args, "O&:stat", STAT, "U:stat", win32_stat_w);
|
||||||
#else
|
#else
|
||||||
return posix_do_stat(self, args, "O&:stat", STAT, NULL, NULL);
|
return posix_do_stat(self, args, "O&:stat", STAT, NULL, NULL);
|
||||||
#endif
|
#endif
|
||||||
|
@ -2681,6 +2957,41 @@ posix_umask(PyObject *self, PyObject *args)
|
||||||
return PyLong_FromLong((long)i);
|
return PyLong_FromLong((long)i);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef MS_WINDOWS
|
||||||
|
|
||||||
|
/* override the default DeleteFileW behavior so that directory
|
||||||
|
symlinks can be removed with this function, the same as with
|
||||||
|
Unix symlinks */
|
||||||
|
BOOL WINAPI Py_DeleteFileW(LPCWSTR lpFileName)
|
||||||
|
{
|
||||||
|
WIN32_FILE_ATTRIBUTE_DATA info;
|
||||||
|
WIN32_FIND_DATAW find_data;
|
||||||
|
HANDLE find_data_handle;
|
||||||
|
int is_directory = 0;
|
||||||
|
int is_link = 0;
|
||||||
|
|
||||||
|
if (GetFileAttributesExW(lpFileName, GetFileExInfoStandard, &info)) {
|
||||||
|
is_directory = info.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY;
|
||||||
|
|
||||||
|
/* Get WIN32_FIND_DATA structure for the path to determine if
|
||||||
|
it is a symlink */
|
||||||
|
if(is_directory &&
|
||||||
|
info.dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) {
|
||||||
|
find_data_handle = FindFirstFileW(lpFileName, &find_data);
|
||||||
|
|
||||||
|
if(find_data_handle != INVALID_HANDLE_VALUE) {
|
||||||
|
is_link = find_data.dwReserved0 == IO_REPARSE_TAG_SYMLINK;
|
||||||
|
FindClose(find_data_handle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (is_directory && is_link)
|
||||||
|
return RemoveDirectoryW(lpFileName);
|
||||||
|
|
||||||
|
return DeleteFileW(lpFileName);
|
||||||
|
}
|
||||||
|
#endif /* MS_WINDOWS */
|
||||||
|
|
||||||
PyDoc_STRVAR(posix_unlink__doc__,
|
PyDoc_STRVAR(posix_unlink__doc__,
|
||||||
"unlink(path)\n\n\
|
"unlink(path)\n\n\
|
||||||
|
@ -2694,7 +3005,7 @@ static PyObject *
|
||||||
posix_unlink(PyObject *self, PyObject *args)
|
posix_unlink(PyObject *self, PyObject *args)
|
||||||
{
|
{
|
||||||
#ifdef MS_WINDOWS
|
#ifdef MS_WINDOWS
|
||||||
return win32_1str(args, "remove", "y:remove", DeleteFileA, "U:remove", DeleteFileW);
|
return win32_1str(args, "remove", "y:remove", DeleteFileA, "U:remove", Py_DeleteFileW);
|
||||||
#else
|
#else
|
||||||
return posix_1str(args, "O&:remove", unlink);
|
return posix_1str(args, "O&:remove", unlink);
|
||||||
#endif
|
#endif
|
||||||
|
@ -4544,7 +4855,8 @@ posix_lstat(PyObject *self, PyObject *args)
|
||||||
return posix_do_stat(self, args, "O&:lstat", lstat, NULL, NULL);
|
return posix_do_stat(self, args, "O&:lstat", lstat, NULL, NULL);
|
||||||
#else /* !HAVE_LSTAT */
|
#else /* !HAVE_LSTAT */
|
||||||
#ifdef MS_WINDOWS
|
#ifdef MS_WINDOWS
|
||||||
return posix_do_stat(self, args, "O&:lstat", STAT, "U:lstat", win32_wstat);
|
return posix_do_stat(self, args, "O&:lstat", STAT, "U:lstat",
|
||||||
|
win32_lstat_w);
|
||||||
#else
|
#else
|
||||||
return posix_do_stat(self, args, "O&:lstat", STAT, NULL, NULL);
|
return posix_do_stat(self, args, "O&:lstat", STAT, NULL, NULL);
|
||||||
#endif
|
#endif
|
||||||
|
@ -4609,6 +4921,193 @@ posix_symlink(PyObject *self, PyObject *args)
|
||||||
}
|
}
|
||||||
#endif /* HAVE_SYMLINK */
|
#endif /* HAVE_SYMLINK */
|
||||||
|
|
||||||
|
#if !defined(HAVE_READLINK) && defined(MS_WINDOWS)
|
||||||
|
|
||||||
|
PyDoc_STRVAR(win_readlink__doc__,
|
||||||
|
"readlink(path) -> path\n\n\
|
||||||
|
Return a string representing the path to which the symbolic link points.");
|
||||||
|
|
||||||
|
/* The following structure was copied from
|
||||||
|
http://msdn.microsoft.com/en-us/library/ms791514.aspx as the required
|
||||||
|
include doesn't seem to be present in the Windows SDK (at least as included
|
||||||
|
with Visual Studio Express). */
|
||||||
|
typedef struct _REPARSE_DATA_BUFFER {
|
||||||
|
ULONG ReparseTag;
|
||||||
|
USHORT ReparseDataLength;
|
||||||
|
USHORT Reserved;
|
||||||
|
union {
|
||||||
|
struct {
|
||||||
|
USHORT SubstituteNameOffset;
|
||||||
|
USHORT SubstituteNameLength;
|
||||||
|
USHORT PrintNameOffset;
|
||||||
|
USHORT PrintNameLength;
|
||||||
|
ULONG Flags;
|
||||||
|
WCHAR PathBuffer[1];
|
||||||
|
} SymbolicLinkReparseBuffer;
|
||||||
|
|
||||||
|
struct {
|
||||||
|
USHORT SubstituteNameOffset;
|
||||||
|
USHORT SubstituteNameLength;
|
||||||
|
USHORT PrintNameOffset;
|
||||||
|
USHORT PrintNameLength;
|
||||||
|
WCHAR PathBuffer[1];
|
||||||
|
} MountPointReparseBuffer;
|
||||||
|
|
||||||
|
struct {
|
||||||
|
UCHAR DataBuffer[1];
|
||||||
|
} GenericReparseBuffer;
|
||||||
|
};
|
||||||
|
} REPARSE_DATA_BUFFER, *PREPARSE_DATA_BUFFER;
|
||||||
|
|
||||||
|
#define REPARSE_DATA_BUFFER_HEADER_SIZE FIELD_OFFSET(REPARSE_DATA_BUFFER, GenericReparseBuffer)
|
||||||
|
|
||||||
|
#define MAXIMUM_REPARSE_DATA_BUFFER_SIZE ( 16 * 1024 )
|
||||||
|
|
||||||
|
/* Windows readlink implementation */
|
||||||
|
static PyObject *
|
||||||
|
win_readlink(PyObject *self, PyObject *args)
|
||||||
|
{
|
||||||
|
wchar_t *path;
|
||||||
|
DWORD n_bytes_returned;
|
||||||
|
DWORD io_result;
|
||||||
|
PyObject *result;
|
||||||
|
HANDLE reparse_point_handle;
|
||||||
|
|
||||||
|
char target_buffer[MAXIMUM_REPARSE_DATA_BUFFER_SIZE];
|
||||||
|
REPARSE_DATA_BUFFER *rdb = (REPARSE_DATA_BUFFER *)target_buffer;
|
||||||
|
wchar_t *print_name;
|
||||||
|
|
||||||
|
if (!PyArg_ParseTuple(args,
|
||||||
|
"u:readlink",
|
||||||
|
&path))
|
||||||
|
return NULL;
|
||||||
|
|
||||||
|
/* First get a handle to the reparse point */
|
||||||
|
Py_BEGIN_ALLOW_THREADS
|
||||||
|
reparse_point_handle = CreateFileW(
|
||||||
|
path,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
OPEN_EXISTING,
|
||||||
|
FILE_FLAG_OPEN_REPARSE_POINT|FILE_FLAG_BACKUP_SEMANTICS,
|
||||||
|
0);
|
||||||
|
Py_END_ALLOW_THREADS
|
||||||
|
|
||||||
|
if (reparse_point_handle==INVALID_HANDLE_VALUE)
|
||||||
|
{
|
||||||
|
return win32_error_unicode("readlink", path);
|
||||||
|
}
|
||||||
|
|
||||||
|
Py_BEGIN_ALLOW_THREADS
|
||||||
|
/* New call DeviceIoControl to read the reparse point */
|
||||||
|
io_result = DeviceIoControl(
|
||||||
|
reparse_point_handle,
|
||||||
|
FSCTL_GET_REPARSE_POINT,
|
||||||
|
0, 0, /* in buffer */
|
||||||
|
target_buffer, sizeof(target_buffer),
|
||||||
|
&n_bytes_returned,
|
||||||
|
0 /* we're not using OVERLAPPED_IO */
|
||||||
|
);
|
||||||
|
CloseHandle(reparse_point_handle);
|
||||||
|
Py_END_ALLOW_THREADS
|
||||||
|
|
||||||
|
if (io_result==0)
|
||||||
|
{
|
||||||
|
return win32_error_unicode("readlink", path);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rdb->ReparseTag != IO_REPARSE_TAG_SYMLINK)
|
||||||
|
{
|
||||||
|
PyErr_SetString(PyExc_ValueError,
|
||||||
|
"not a symbolic link");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
print_name = rdb->SymbolicLinkReparseBuffer.PathBuffer + rdb->SymbolicLinkReparseBuffer.PrintNameOffset;
|
||||||
|
result = PyUnicode_FromWideChar(print_name, rdb->SymbolicLinkReparseBuffer.PrintNameLength/2);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif /* !defined(HAVE_READLINK) && defined(MS_WINDOWS) */
|
||||||
|
|
||||||
|
#if !defined(HAVE_SYMLINK) && defined(MS_WINDOWS)
|
||||||
|
|
||||||
|
/* Grab CreateSymbolicLinkW dynamically from kernel32 */
|
||||||
|
static int has_CreateSymbolicLinkW = 0;
|
||||||
|
static DWORD (CALLBACK *Py_CreateSymbolicLinkW)(LPWSTR, LPWSTR, DWORD);
|
||||||
|
static int
|
||||||
|
check_CreateSymbolicLinkW()
|
||||||
|
{
|
||||||
|
HINSTANCE hKernel32;
|
||||||
|
/* only recheck */
|
||||||
|
if (has_CreateSymbolicLinkW)
|
||||||
|
return has_CreateSymbolicLinkW;
|
||||||
|
hKernel32 = GetModuleHandle("KERNEL32");
|
||||||
|
*(FARPROC*)&Py_CreateSymbolicLinkW = GetProcAddress(hKernel32, "CreateSymbolicLinkW");
|
||||||
|
if (Py_CreateSymbolicLinkW)
|
||||||
|
has_CreateSymbolicLinkW = 1;
|
||||||
|
return has_CreateSymbolicLinkW;
|
||||||
|
}
|
||||||
|
|
||||||
|
PyDoc_STRVAR(win_symlink__doc__,
|
||||||
|
"symlink(src, dst, target_is_directory=False)\n\n\
|
||||||
|
Create a symbolic link pointing to src named dst.\n\
|
||||||
|
target_is_directory is required if the target is to be interpreted as\n\
|
||||||
|
a directory.\n\
|
||||||
|
This function requires Windows 6.0 or greater, and raises a\n\
|
||||||
|
NotImplementedError otherwise.");
|
||||||
|
|
||||||
|
static PyObject *
|
||||||
|
win_symlink(PyObject *self, PyObject *args, PyObject *kwargs)
|
||||||
|
{
|
||||||
|
static char *kwlist[] = {"src", "dest", "target_is_directory", NULL};
|
||||||
|
PyObject *src, *dest;
|
||||||
|
int target_is_directory = 0;
|
||||||
|
DWORD res;
|
||||||
|
WIN32_FILE_ATTRIBUTE_DATA src_info;
|
||||||
|
|
||||||
|
if (!check_CreateSymbolicLinkW())
|
||||||
|
{
|
||||||
|
/* raise NotImplementedError */
|
||||||
|
return PyErr_Format(PyExc_NotImplementedError,
|
||||||
|
"CreateSymbolicLinkW not found");
|
||||||
|
}
|
||||||
|
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|i:symlink",
|
||||||
|
kwlist, &src, &dest, &target_is_directory))
|
||||||
|
return NULL;
|
||||||
|
if (!convert_to_unicode(&src)) { return NULL; }
|
||||||
|
if (!convert_to_unicode(&dest)) {
|
||||||
|
Py_DECREF(src);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* if src is a directory, ensure target_is_directory==1 */
|
||||||
|
if(
|
||||||
|
GetFileAttributesExW(
|
||||||
|
PyUnicode_AsUnicode(src), GetFileExInfoStandard, &src_info
|
||||||
|
))
|
||||||
|
{
|
||||||
|
target_is_directory = target_is_directory ||
|
||||||
|
(src_info.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
Py_BEGIN_ALLOW_THREADS
|
||||||
|
res = Py_CreateSymbolicLinkW(
|
||||||
|
PyUnicode_AsUnicode(dest),
|
||||||
|
PyUnicode_AsUnicode(src),
|
||||||
|
target_is_directory);
|
||||||
|
Py_END_ALLOW_THREADS
|
||||||
|
Py_DECREF(src);
|
||||||
|
Py_DECREF(dest);
|
||||||
|
if (!res)
|
||||||
|
{
|
||||||
|
return win32_error_unicode("symlink", PyUnicode_AsUnicode(src));
|
||||||
|
}
|
||||||
|
|
||||||
|
Py_INCREF(Py_None);
|
||||||
|
return Py_None;
|
||||||
|
}
|
||||||
|
#endif /* !defined(HAVE_SYMLINK) && defined(MS_WINDOWS) */
|
||||||
|
|
||||||
#ifdef HAVE_TIMES
|
#ifdef HAVE_TIMES
|
||||||
#if defined(PYCC_VACPP) && defined(PYOS_OS2)
|
#if defined(PYCC_VACPP) && defined(PYOS_OS2)
|
||||||
|
@ -7076,13 +7575,19 @@ static PyMethodDef posix_methods[] = {
|
||||||
#ifdef HAVE_READLINK
|
#ifdef HAVE_READLINK
|
||||||
{"readlink", posix_readlink, METH_VARARGS, posix_readlink__doc__},
|
{"readlink", posix_readlink, METH_VARARGS, posix_readlink__doc__},
|
||||||
#endif /* HAVE_READLINK */
|
#endif /* HAVE_READLINK */
|
||||||
{"rename", posix_rename, METH_VARARGS, posix_rename__doc__},
|
#if !defined(HAVE_READLINK) && defined(MS_WINDOWS)
|
||||||
{"rmdir", posix_rmdir, METH_VARARGS, posix_rmdir__doc__},
|
{"readlink", win_readlink, METH_VARARGS, win_readlink__doc__},
|
||||||
{"stat", posix_stat, METH_VARARGS, posix_stat__doc__},
|
#endif /* !defined(HAVE_READLINK) && defined(MS_WINDOWS) */
|
||||||
|
{"rename", posix_rename, METH_VARARGS, posix_rename__doc__},
|
||||||
|
{"rmdir", posix_rmdir, METH_VARARGS, posix_rmdir__doc__},
|
||||||
|
{"stat", posix_stat, METH_VARARGS, posix_stat__doc__},
|
||||||
{"stat_float_times", stat_float_times, METH_VARARGS, stat_float_times__doc__},
|
{"stat_float_times", stat_float_times, METH_VARARGS, stat_float_times__doc__},
|
||||||
#ifdef HAVE_SYMLINK
|
#ifdef HAVE_SYMLINK
|
||||||
{"symlink", posix_symlink, METH_VARARGS, posix_symlink__doc__},
|
{"symlink", posix_symlink, METH_VARARGS, posix_symlink__doc__},
|
||||||
#endif /* HAVE_SYMLINK */
|
#endif /* HAVE_SYMLINK */
|
||||||
|
#if !defined(HAVE_SYMLINK) && defined(MS_WINDOWS)
|
||||||
|
{"symlink", (PyCFunction)win_symlink, METH_VARARGS | METH_KEYWORDS, win_symlink__doc__},
|
||||||
|
#endif /* !defined(HAVE_SYMLINK) && defined(MS_WINDOWS) */
|
||||||
#ifdef HAVE_SYSTEM
|
#ifdef HAVE_SYSTEM
|
||||||
{"system", posix_system, METH_VARARGS, posix_system__doc__},
|
{"system", posix_system, METH_VARARGS, posix_system__doc__},
|
||||||
#endif
|
#endif
|
||||||
|
@ -7307,6 +7812,7 @@ static PyMethodDef posix_methods[] = {
|
||||||
{"abort", posix_abort, METH_NOARGS, posix_abort__doc__},
|
{"abort", posix_abort, METH_NOARGS, posix_abort__doc__},
|
||||||
#ifdef MS_WINDOWS
|
#ifdef MS_WINDOWS
|
||||||
{"_getfullpathname", posix__getfullpathname, METH_VARARGS, NULL},
|
{"_getfullpathname", posix__getfullpathname, METH_VARARGS, NULL},
|
||||||
|
{"_getfinalpathname", posix__getfinalpathname, METH_VARARGS, NULL},
|
||||||
#endif
|
#endif
|
||||||
#ifdef HAVE_GETLOADAVG
|
#ifdef HAVE_GETLOADAVG
|
||||||
{"getloadavg", posix_getloadavg, METH_NOARGS, posix_getloadavg__doc__},
|
{"getloadavg", posix_getloadavg, METH_NOARGS, posix_getloadavg__doc__},
|
||||||
|
|
Loading…
Reference in New Issue