gh-102519: Add os.listdrives, os.listvolumes and os.listmounts on Windows (GH-102544)

This commit is contained in:
Steve Dower 2023-03-10 12:21:37 +00:00 committed by GitHub
parent 2999e02836
commit cb35882773
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 375 additions and 1 deletions

View File

@ -294,6 +294,10 @@ os
method to check if the entry is a junction.
(Contributed by Charles Machalow in :gh:`99547`.)
* Add :func:`os.listdrives`, :func:`os.listvolumes` and :func:`os.listmounts`
functions on Windows for enumerating drives, volumes and mount points.
(Contributed by Steve Dower in :gh:`102519`.)
os.path
-------

View File

@ -1216,6 +1216,7 @@ _PyStaticObjects_CheckRefcnt(PyInterpreterState *interp) {
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(value));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(values));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(version));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(volume));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(warnings));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(warnoptions));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(wbits));

View File

@ -702,6 +702,7 @@ struct _Py_global_strings {
STRUCT_FOR_ID(value)
STRUCT_FOR_ID(values)
STRUCT_FOR_ID(version)
STRUCT_FOR_ID(volume)
STRUCT_FOR_ID(warnings)
STRUCT_FOR_ID(warnoptions)
STRUCT_FOR_ID(wbits)

View File

@ -1208,6 +1208,7 @@ extern "C" {
INIT_ID(value), \
INIT_ID(values), \
INIT_ID(version), \
INIT_ID(volume), \
INIT_ID(warnings), \
INIT_ID(warnoptions), \
INIT_ID(wbits), \

View File

@ -1310,6 +1310,8 @@ _PyUnicode_InitStaticStrings(void) {
PyUnicode_InternInPlace(&string);
string = &_Py_ID(version);
PyUnicode_InternInPlace(&string);
string = &_Py_ID(volume);
PyUnicode_InternInPlace(&string);
string = &_Py_ID(warnings);
PyUnicode_InternInPlace(&string);
string = &_Py_ID(warnoptions);

View File

@ -2648,6 +2648,49 @@ class Win32ListdirTests(unittest.TestCase):
[os.fsencode(path) for path in self.created_paths])
@unittest.skipUnless(os.name == "nt", "NT specific tests")
class Win32ListdriveTests(unittest.TestCase):
"""Test listdrive, listmounts and listvolume on Windows."""
def setUp(self):
# Get drives and volumes from fsutil
out = subprocess.check_output(
["fsutil.exe", "volume", "list"],
cwd=os.path.join(os.getenv("SystemRoot", "\\Windows"), "System32"),
encoding="mbcs",
errors="ignore",
)
lines = out.splitlines()
self.known_volumes = {l for l in lines if l.startswith('\\\\?\\')}
self.known_drives = {l for l in lines if l[1:] == ':\\'}
self.known_mounts = {l for l in lines if l[1:3] == ':\\'}
def test_listdrives(self):
drives = os.listdrives()
self.assertIsInstance(drives, list)
self.assertSetEqual(
self.known_drives,
self.known_drives & set(drives),
)
def test_listvolumes(self):
volumes = os.listvolumes()
self.assertIsInstance(volumes, list)
self.assertSetEqual(
self.known_volumes,
self.known_volumes & set(volumes),
)
def test_listmounts(self):
for volume in os.listvolumes():
mounts = os.listmounts(volume)
self.assertIsInstance(mounts, list)
self.assertSetEqual(
set(mounts),
self.known_mounts & set(mounts),
)
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
filelink = 'readlinktest'

View File

@ -0,0 +1,2 @@
Add :func:`os.listdrives`, :func:`os.listvolumes` and :func:`os.listmounts`
functions on Windows for enumerating drives, volumes and mount points

View File

@ -1601,6 +1601,120 @@ exit:
#if defined(MS_WINDOWS)
PyDoc_STRVAR(os_listdrives__doc__,
"listdrives($module, /)\n"
"--\n"
"\n"
"Return a list containing the names of drives in the system.\n"
"\n"
"A drive name typically looks like \'C:\\\\\'.");
#define OS_LISTDRIVES_METHODDEF \
{"listdrives", (PyCFunction)os_listdrives, METH_NOARGS, os_listdrives__doc__},
static PyObject *
os_listdrives_impl(PyObject *module);
static PyObject *
os_listdrives(PyObject *module, PyObject *Py_UNUSED(ignored))
{
return os_listdrives_impl(module);
}
#endif /* defined(MS_WINDOWS) */
#if defined(MS_WINDOWS)
PyDoc_STRVAR(os_listvolumes__doc__,
"listvolumes($module, /)\n"
"--\n"
"\n"
"Return a list containing the volumes in the system.\n"
"\n"
"Volumes are typically represented as a GUID path.");
#define OS_LISTVOLUMES_METHODDEF \
{"listvolumes", (PyCFunction)os_listvolumes, METH_NOARGS, os_listvolumes__doc__},
static PyObject *
os_listvolumes_impl(PyObject *module);
static PyObject *
os_listvolumes(PyObject *module, PyObject *Py_UNUSED(ignored))
{
return os_listvolumes_impl(module);
}
#endif /* defined(MS_WINDOWS) */
#if defined(MS_WINDOWS)
PyDoc_STRVAR(os_listmounts__doc__,
"listmounts($module, /, volume)\n"
"--\n"
"\n"
"Return a list containing mount points for a particular volume.\n"
"\n"
"\'volume\' should be a GUID path as returned from os.listvolumes.");
#define OS_LISTMOUNTS_METHODDEF \
{"listmounts", _PyCFunction_CAST(os_listmounts), METH_FASTCALL|METH_KEYWORDS, os_listmounts__doc__},
static PyObject *
os_listmounts_impl(PyObject *module, path_t *volume);
static PyObject *
os_listmounts(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
{
PyObject *return_value = NULL;
#if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE)
#define NUM_KEYWORDS 1
static struct {
PyGC_Head _this_is_not_used;
PyObject_VAR_HEAD
PyObject *ob_item[NUM_KEYWORDS];
} _kwtuple = {
.ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS)
.ob_item = { &_Py_ID(volume), },
};
#undef NUM_KEYWORDS
#define KWTUPLE (&_kwtuple.ob_base.ob_base)
#else // !Py_BUILD_CORE
# define KWTUPLE NULL
#endif // !Py_BUILD_CORE
static const char * const _keywords[] = {"volume", NULL};
static _PyArg_Parser _parser = {
.keywords = _keywords,
.fname = "listmounts",
.kwtuple = KWTUPLE,
};
#undef KWTUPLE
PyObject *argsbuf[1];
path_t volume = PATH_T_INITIALIZE("listmounts", "volume", 0, 0);
args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf);
if (!args) {
goto exit;
}
if (!path_converter(args[0], &volume)) {
goto exit;
}
return_value = os_listmounts_impl(module, &volume);
exit:
/* Cleanup for volume */
path_cleanup(&volume);
return return_value;
}
#endif /* defined(MS_WINDOWS) */
#if defined(MS_WINDOWS)
PyDoc_STRVAR(os__getfullpathname__doc__,
"_getfullpathname($module, path, /)\n"
"--\n"
@ -11253,6 +11367,18 @@ exit:
#define OS_LINK_METHODDEF
#endif /* !defined(OS_LINK_METHODDEF) */
#ifndef OS_LISTDRIVES_METHODDEF
#define OS_LISTDRIVES_METHODDEF
#endif /* !defined(OS_LISTDRIVES_METHODDEF) */
#ifndef OS_LISTVOLUMES_METHODDEF
#define OS_LISTVOLUMES_METHODDEF
#endif /* !defined(OS_LISTVOLUMES_METHODDEF) */
#ifndef OS_LISTMOUNTS_METHODDEF
#define OS_LISTMOUNTS_METHODDEF
#endif /* !defined(OS_LISTMOUNTS_METHODDEF) */
#ifndef OS__GETFULLPATHNAME_METHODDEF
#define OS__GETFULLPATHNAME_METHODDEF
#endif /* !defined(OS__GETFULLPATHNAME_METHODDEF) */
@ -11796,4 +11922,4 @@ exit:
#ifndef OS_WAITSTATUS_TO_EXITCODE_METHODDEF
#define OS_WAITSTATUS_TO_EXITCODE_METHODDEF
#endif /* !defined(OS_WAITSTATUS_TO_EXITCODE_METHODDEF) */
/*[clinic end generated code: output=9495478e51701b8a input=a9049054013a1b77]*/
/*[clinic end generated code: output=47750e0e29c8d707 input=a9049054013a1b77]*/

View File

@ -4229,7 +4229,198 @@ os_listdir_impl(PyObject *module, path_t *path)
#endif
}
#ifdef MS_WINDOWS
/*[clinic input]
os.listdrives
Return a list containing the names of drives in the system.
A drive name typically looks like 'C:\\'.
[clinic start generated code]*/
static PyObject *
os_listdrives_impl(PyObject *module)
/*[clinic end generated code: output=aaece9dacdf682b5 input=1af9ccc9e583798e]*/
{
/* Number of possible drives is limited, so 256 should always be enough.
On the day when it is not, listmounts() will have to be used. */
wchar_t buffer[256];
DWORD buflen = Py_ARRAY_LENGTH(buffer);
PyObject *result = NULL;
if (PySys_Audit("os.listdrives", NULL) < 0) {
return NULL;
}
Py_BEGIN_ALLOW_THREADS;
buflen = GetLogicalDriveStringsW(buflen, buffer);
Py_END_ALLOW_THREADS;
if (!buflen) {
PyErr_SetFromWindowsErr(0);
return NULL;
} else if (buflen >= Py_ARRAY_LENGTH(buffer)) {
PyErr_SetFromWindowsErr(ERROR_MORE_DATA);
return NULL;
}
/* buflen includes a null terminator, so remove it */
PyObject *str = PyUnicode_FromWideChar(buffer, buflen - 1);
if (str) {
PyObject *nullchar = PyUnicode_FromStringAndSize("\0", 1);
if (nullchar) {
result = PyUnicode_Split(str, nullchar, -1);
Py_DECREF(nullchar);
}
Py_DECREF(str);
}
return result;
}
/*[clinic input]
os.listvolumes
Return a list containing the volumes in the system.
Volumes are typically represented as a GUID path.
[clinic start generated code]*/
static PyObject *
os_listvolumes_impl(PyObject *module)
/*[clinic end generated code: output=534e10ea2bf9d386 input=f6e4e70371f11e99]*/
{
PyObject *result = PyList_New(0);
HANDLE find = INVALID_HANDLE_VALUE;
wchar_t buffer[MAX_PATH + 1];
if (!result) {
return NULL;
}
if (PySys_Audit("os.listvolumes", NULL) < 0) {
Py_DECREF(result);
return NULL;
}
int err = 0;
Py_BEGIN_ALLOW_THREADS;
find = FindFirstVolumeW(buffer, Py_ARRAY_LENGTH(buffer));
if (find == INVALID_HANDLE_VALUE) {
err = GetLastError();
}
Py_END_ALLOW_THREADS;
while (!err) {
PyObject *s = PyUnicode_FromWideChar(buffer, -1);
if (!s || PyList_Append(result, s) < 0) {
Py_XDECREF(s);
Py_CLEAR(result);
break;
}
Py_DECREF(s);
Py_BEGIN_ALLOW_THREADS;
if (!FindNextVolumeW(find, buffer, Py_ARRAY_LENGTH(buffer))) {
err = GetLastError();
}
Py_END_ALLOW_THREADS;
}
if (find != INVALID_HANDLE_VALUE) {
Py_BEGIN_ALLOW_THREADS;
FindVolumeClose(find);
Py_END_ALLOW_THREADS;
}
if (err && err != ERROR_NO_MORE_FILES) {
PyErr_SetFromWindowsErr(err);
Py_XDECREF(result);
result = NULL;
}
return result;
}
/*[clinic input]
os.listmounts
volume: path_t
Return a list containing mount points for a particular volume.
'volume' should be a GUID path as returned from os.listvolumes.
[clinic start generated code]*/
static PyObject *
os_listmounts_impl(PyObject *module, path_t *volume)
/*[clinic end generated code: output=06da49679de4512e input=a8a27178e3f67845]*/
{
wchar_t default_buffer[MAX_PATH + 1];
DWORD buflen = Py_ARRAY_LENGTH(default_buffer);
LPWSTR buffer = default_buffer;
DWORD attributes;
PyObject *str = NULL;
PyObject *nullchar = NULL;
PyObject *result = NULL;
/* Ensure we have a valid volume path before continuing */
Py_BEGIN_ALLOW_THREADS
attributes = GetFileAttributesW(volume->wide);
Py_END_ALLOW_THREADS
if (attributes == INVALID_FILE_ATTRIBUTES &&
GetLastError() == ERROR_UNRECOGNIZED_VOLUME)
{
return PyErr_SetFromWindowsErr(ERROR_UNRECOGNIZED_VOLUME);
}
if (PySys_Audit("os.listmounts", "O", volume->object) < 0) {
return NULL;
}
while (1) {
BOOL success;
Py_BEGIN_ALLOW_THREADS
success = GetVolumePathNamesForVolumeNameW(volume->wide, buffer,
buflen, &buflen);
Py_END_ALLOW_THREADS
if (success) {
break;
}
if (GetLastError() != ERROR_MORE_DATA) {
PyErr_SetFromWindowsErr(0);
goto exit;
}
if (buffer != default_buffer) {
PyMem_Free((void *)buffer);
}
buffer = (wchar_t*)PyMem_Malloc(sizeof(wchar_t) * buflen);
if (!buffer) {
PyErr_NoMemory();
goto exit;
}
}
if (buflen < 2) {
result = PyList_New(0);
goto exit;
}
// buflen includes two null terminators, one for the last string
// and one for the array of strings.
str = PyUnicode_FromWideChar(buffer, buflen - 2);
nullchar = PyUnicode_FromStringAndSize("\0", 1);
if (str && nullchar) {
result = PyUnicode_Split(str, nullchar, -1);
}
exit:
if (buffer != default_buffer) {
PyMem_Free(buffer);
}
Py_XDECREF(nullchar);
Py_XDECREF(str);
return result;
}
int
_PyOS_getfullpathname(const wchar_t *path, wchar_t **abspath_p)
{
@ -15252,6 +15443,9 @@ static PyMethodDef posix_methods[] = {
OS_GETCWDB_METHODDEF
OS_LINK_METHODDEF
OS_LISTDIR_METHODDEF
OS_LISTDRIVES_METHODDEF
OS_LISTMOUNTS_METHODDEF
OS_LISTVOLUMES_METHODDEF
OS_LSTAT_METHODDEF
OS_MKDIR_METHODDEF
OS_NICE_METHODDEF