""" A module built to test if the current process has the privilege to create symlinks on Windows. """ # allow script to run natively under python 2.6+ from __future__ import print_function import ctypes from ctypes import wintypes GetCurrentProcess = ctypes.windll.kernel32.GetCurrentProcess GetCurrentProcess.restype = wintypes.HANDLE OpenProcessToken = ctypes.windll.advapi32.OpenProcessToken OpenProcessToken.argtypes = (wintypes.HANDLE, wintypes.DWORD, ctypes.POINTER(wintypes.HANDLE)) OpenProcessToken.restype = wintypes.BOOL class LUID(ctypes.Structure): _fields_ = [ ('low_part', wintypes.DWORD), ('high_part', wintypes.LONG), ] def __eq__(self, other): return ( self.high_part == other.high_part and self.low_part == other.low_part ) def __ne__(self, other): return not (self==other) LookupPrivilegeValue = ctypes.windll.advapi32.LookupPrivilegeValueW LookupPrivilegeValue.argtypes = ( wintypes.LPWSTR, # system name wintypes.LPWSTR, # name ctypes.POINTER(LUID), ) LookupPrivilegeValue.restype = wintypes.BOOL class TOKEN_INFORMATION_CLASS: TokenUser = 1 TokenGroups = 2 TokenPrivileges = 3 # ... see http://msdn.microsoft.com/en-us/library/aa379626%28VS.85%29.aspx SE_PRIVILEGE_ENABLED_BY_DEFAULT = (0x00000001) SE_PRIVILEGE_ENABLED = (0x00000002) SE_PRIVILEGE_REMOVED = (0x00000004) SE_PRIVILEGE_USED_FOR_ACCESS = (0x80000000) class LUID_AND_ATTRIBUTES(ctypes.Structure): _fields_ = [ ('LUID', LUID), ('attributes', wintypes.DWORD), ] def is_enabled(self): return bool(self.attributes & SE_PRIVILEGE_ENABLED) def enable(self): self.attributes |= SE_PRIVILEGE_ENABLED def get_name(self): size = wintypes.DWORD(10240) buf = ctypes.create_unicode_buffer(size.value) res = LookupPrivilegeName(None, self.LUID, buf, size) if res == 0: raise RuntimeError return buf[:size.value] def __str__(self): name = self.name fmt = ['{name}', '{name} (enabled)'][self.is_enabled()] return fmt.format(**vars()) LookupPrivilegeName = ctypes.windll.advapi32.LookupPrivilegeNameW LookupPrivilegeName.argtypes = ( wintypes.LPWSTR, # lpSystemName ctypes.POINTER(LUID), # lpLuid wintypes.LPWSTR, # lpName ctypes.POINTER(wintypes.DWORD), #cchName ) LookupPrivilegeName.restype = wintypes.BOOL class TOKEN_PRIVILEGES(ctypes.Structure): _fields_ = [ ('count', wintypes.DWORD), ('privileges', LUID_AND_ATTRIBUTES*0), ] def get_array(self): array_type = LUID_AND_ATTRIBUTES*self.count privileges = ctypes.cast(self.privileges, ctypes.POINTER(array_type)).contents return privileges def __iter__(self): return iter(self.get_array()) PTOKEN_PRIVILEGES = ctypes.POINTER(TOKEN_PRIVILEGES) GetTokenInformation = ctypes.windll.advapi32.GetTokenInformation GetTokenInformation.argtypes = [ wintypes.HANDLE, # TokenHandle ctypes.c_uint, # TOKEN_INFORMATION_CLASS value ctypes.c_void_p, # TokenInformation wintypes.DWORD, # TokenInformationLength ctypes.POINTER(wintypes.DWORD), # ReturnLength ] GetTokenInformation.restype = wintypes.BOOL # http://msdn.microsoft.com/en-us/library/aa375202%28VS.85%29.aspx AdjustTokenPrivileges = ctypes.windll.advapi32.AdjustTokenPrivileges AdjustTokenPrivileges.restype = wintypes.BOOL AdjustTokenPrivileges.argtypes = [ wintypes.HANDLE, # TokenHandle wintypes.BOOL, # DisableAllPrivileges PTOKEN_PRIVILEGES, # NewState (optional) wintypes.DWORD, # BufferLength of PreviousState PTOKEN_PRIVILEGES, # PreviousState (out, optional) ctypes.POINTER(wintypes.DWORD), # ReturnLength ] def get_process_token(): "Get the current process token" token = wintypes.HANDLE() TOKEN_ALL_ACCESS = 0xf01ff res = OpenProcessToken(GetCurrentProcess(), TOKEN_ALL_ACCESS, token) if not res > 0: raise RuntimeError("Couldn't get process token") return token def get_symlink_luid(): "Get the LUID for the SeCreateSymbolicLinkPrivilege" symlink_luid = LUID() res = LookupPrivilegeValue(None, "SeCreateSymbolicLinkPrivilege", symlink_luid) if not res > 0: raise RuntimeError("Couldn't lookup privilege value") return symlink_luid def get_privilege_information(): "Get all privileges associated with the current process." # first call with zero length to determine what size buffer we need return_length = wintypes.DWORD() params = [ get_process_token(), TOKEN_INFORMATION_CLASS.TokenPrivileges, None, 0, return_length, ] res = GetTokenInformation(*params) # assume we now have the necessary length in return_length buffer = ctypes.create_string_buffer(return_length.value) params[2] = buffer params[3] = return_length.value res = GetTokenInformation(*params) assert res > 0, "Error in second GetTokenInformation (%d)" % res privileges = ctypes.cast(buffer, ctypes.POINTER(TOKEN_PRIVILEGES)).contents return privileges def report_privilege_information(): "Report all privilege information assigned to the current process." privileges = get_privilege_information() print("found {0} privileges".format(privileges.count)) tuple(map(print, privileges)) def enable_symlink_privilege(): """ Try to assign the symlink privilege to the current process token. Return True if the assignment is successful. """ # create a space in memory for a TOKEN_PRIVILEGES structure # with one element size = ctypes.sizeof(TOKEN_PRIVILEGES) size += ctypes.sizeof(LUID_AND_ATTRIBUTES) buffer = ctypes.create_string_buffer(size) tp = ctypes.cast(buffer, ctypes.POINTER(TOKEN_PRIVILEGES)).contents tp.count = 1 tp.get_array()[0].enable() tp.get_array()[0].LUID = get_symlink_luid() token = get_process_token() res = AdjustTokenPrivileges(token, False, tp, 0, None, None) if res == 0: raise RuntimeError("Error in AdjustTokenPrivileges") ERROR_NOT_ALL_ASSIGNED = 1300 return ctypes.windll.kernel32.GetLastError() != ERROR_NOT_ALL_ASSIGNED def main(): assigned = enable_symlink_privilege() msg = ['failure', 'success'][assigned] print("Symlink privilege assignment completed with {0}".format(msg)) if __name__ == '__main__': main()