bpo-45512: Use Argument Clinic to set sqlite3 isolation level (GH-29593)

This commit is contained in:
Erlend Egeberg Aasland 2021-11-18 10:18:09 +01:00 committed by GitHub
parent df3e53d86b
commit 0920b61a0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 67 additions and 72 deletions

View File

@ -63,22 +63,7 @@ pysqlite_connection_init(PyObject *self, PyObject *args, PyObject *kwargs)
} }
} }
if (fastargs[3]) { if (fastargs[3]) {
if (fastargs[3] == Py_None) { if (!isolation_level_converter(fastargs[3], &isolation_level)) {
isolation_level = NULL;
}
else if (PyUnicode_Check(fastargs[3])) {
Py_ssize_t isolation_level_length;
isolation_level = PyUnicode_AsUTF8AndSize(fastargs[3], &isolation_level_length);
if (isolation_level == NULL) {
goto exit;
}
if (strlen(isolation_level) != (size_t)isolation_level_length) {
PyErr_SetString(PyExc_ValueError, "embedded null character");
goto exit;
}
}
else {
_PyArg_BadArgument("Connection", "argument 'isolation_level'", "str or None", fastargs[3]);
goto exit; goto exit;
} }
if (!--noptargs) { if (!--noptargs) {
@ -851,4 +836,4 @@ exit:
#ifndef PYSQLITE_CONNECTION_LOAD_EXTENSION_METHODDEF #ifndef PYSQLITE_CONNECTION_LOAD_EXTENSION_METHODDEF
#define PYSQLITE_CONNECTION_LOAD_EXTENSION_METHODDEF #define PYSQLITE_CONNECTION_LOAD_EXTENSION_METHODDEF
#endif /* !defined(PYSQLITE_CONNECTION_LOAD_EXTENSION_METHODDEF) */ #endif /* !defined(PYSQLITE_CONNECTION_LOAD_EXTENSION_METHODDEF) */
/*[clinic end generated code: output=6f267f20e77f92d0 input=a9049054013a1b77]*/ /*[clinic end generated code: output=c2faf6563397091b input=a9049054013a1b77]*/

View File

@ -33,6 +33,60 @@
#define HAVE_TRACE_V2 #define HAVE_TRACE_V2
#endif #endif
static const char *
get_isolation_level(const char *level)
{
assert(level != NULL);
static const char *const allowed_levels[] = {
"",
"DEFERRED",
"IMMEDIATE",
"EXCLUSIVE",
NULL
};
for (int i = 0; allowed_levels[i] != NULL; i++) {
const char *candidate = allowed_levels[i];
if (sqlite3_stricmp(level, candidate) == 0) {
return candidate;
}
}
PyErr_SetString(PyExc_ValueError,
"isolation_level string must be '', 'DEFERRED', "
"'IMMEDIATE', or 'EXCLUSIVE'");
return NULL;
}
static int
isolation_level_converter(PyObject *str_or_none, const char **result)
{
if (Py_IsNone(str_or_none)) {
*result = NULL;
}
else if (PyUnicode_Check(str_or_none)) {
Py_ssize_t sz;
const char *str = PyUnicode_AsUTF8AndSize(str_or_none, &sz);
if (str == NULL) {
return 0;
}
if (strlen(str) != (size_t)sz) {
PyErr_SetString(PyExc_ValueError, "embedded null character");
return 0;
}
const char *level = get_isolation_level(str);
if (level == NULL) {
return 0;
}
*result = level;
}
else {
PyErr_SetString(PyExc_TypeError,
"isolation_level must be str or None");
return 0;
}
return 1;
}
static int static int
clinic_fsconverter(PyObject *pathlike, const char **result) clinic_fsconverter(PyObject *pathlike, const char **result)
{ {
@ -100,29 +154,6 @@ new_statement_cache(pysqlite_Connection *self, pysqlite_state *state,
return res; return res;
} }
static const char *
get_isolation_level(const char *level)
{
assert(level != NULL);
static const char *const allowed_levels[] = {
"",
"DEFERRED",
"IMMEDIATE",
"EXCLUSIVE",
NULL
};
for (int i = 0; allowed_levels[i] != NULL; i++) {
const char *candidate = allowed_levels[i];
if (sqlite3_stricmp(level, candidate) == 0) {
return candidate;
}
}
PyErr_SetString(PyExc_ValueError,
"isolation_level string must be '', 'DEFERRED', "
"'IMMEDIATE', or 'EXCLUSIVE'");
return NULL;
}
/*[python input] /*[python input]
class FSConverter_converter(CConverter): class FSConverter_converter(CConverter):
type = "const char *" type = "const char *"
@ -131,8 +162,13 @@ class FSConverter_converter(CConverter):
self.c_default = "NULL" self.c_default = "NULL"
def cleanup(self): def cleanup(self):
return f"PyMem_Free((void *){self.name});\n" return f"PyMem_Free((void *){self.name});\n"
class IsolationLevel_converter(CConverter):
type = "const char *"
converter = "isolation_level_converter"
[python start generated code]*/ [python start generated code]*/
/*[python end generated code: output=da39a3ee5e6b4b0d input=7b3be538bc4058c0]*/ /*[python end generated code: output=da39a3ee5e6b4b0d input=be142323885672ab]*/
/*[clinic input] /*[clinic input]
_sqlite3.Connection.__init__ as pysqlite_connection_init _sqlite3.Connection.__init__ as pysqlite_connection_init
@ -140,7 +176,7 @@ _sqlite3.Connection.__init__ as pysqlite_connection_init
database: FSConverter database: FSConverter
timeout: double = 5.0 timeout: double = 5.0
detect_types: int = 0 detect_types: int = 0
isolation_level: str(accept={str, NoneType}) = "" isolation_level: IsolationLevel = ""
check_same_thread: bool(accept={int}) = True check_same_thread: bool(accept={int}) = True
factory: object(c_default='(PyObject*)clinic_state()->ConnectionType') = ConnectionType factory: object(c_default='(PyObject*)clinic_state()->ConnectionType') = ConnectionType
cached_statements as cache_size: int = 128 cached_statements as cache_size: int = 128
@ -153,7 +189,7 @@ pysqlite_connection_init_impl(pysqlite_Connection *self,
int detect_types, const char *isolation_level, int detect_types, const char *isolation_level,
int check_same_thread, PyObject *factory, int check_same_thread, PyObject *factory,
int cache_size, int uri) int cache_size, int uri)
/*[clinic end generated code: output=7d640ae1d83abfd4 input=35e316f66d9f70fd]*/ /*[clinic end generated code: output=7d640ae1d83abfd4 input=342173993434ba1e]*/
{ {
if (PySys_Audit("sqlite3.connect", "s", database) < 0) { if (PySys_Audit("sqlite3.connect", "s", database) < 0) {
return -1; return -1;
@ -189,15 +225,6 @@ pysqlite_connection_init_impl(pysqlite_Connection *self,
return -1; return -1;
} }
// Convert isolation level to begin statement.
const char *level = NULL;
if (isolation_level != NULL) {
level = get_isolation_level(isolation_level);
if (level == NULL) {
return -1;
}
}
// Create LRU statement cache; returns a new reference. // Create LRU statement cache; returns a new reference.
PyObject *statement_cache = new_statement_cache(self, state, cache_size); PyObject *statement_cache = new_statement_cache(self, state, cache_size);
if (statement_cache == NULL) { if (statement_cache == NULL) {
@ -215,7 +242,7 @@ pysqlite_connection_init_impl(pysqlite_Connection *self,
self->db = db; self->db = db;
self->state = state; self->state = state;
self->detect_types = detect_types; self->detect_types = detect_types;
self->isolation_level = level; self->isolation_level = isolation_level;
self->check_same_thread = check_same_thread; self->check_same_thread = check_same_thread;
self->thread_ident = PyThread_get_thread_ident(); self->thread_ident = PyThread_get_thread_ident();
self->statement_cache = statement_cache; self->statement_cache = statement_cache;
@ -1375,26 +1402,9 @@ pysqlite_connection_set_isolation_level(pysqlite_Connection* self, PyObject* iso
return -1; return -1;
} }
Py_DECREF(res); Py_DECREF(res);
return 0;
} }
else if (PyUnicode_Check(isolation_level)) { if (!isolation_level_converter(isolation_level, &self->isolation_level)) {
Py_ssize_t len;
const char *cstr_level = PyUnicode_AsUTF8AndSize(isolation_level, &len);
if (cstr_level == NULL) {
return -1;
}
if (strlen(cstr_level) != (size_t)len) {
PyErr_SetString(PyExc_ValueError, "embedded null character");
return -1;
}
const char *level = get_isolation_level(cstr_level);
if (level == NULL) {
return -1;
}
self->isolation_level = level;
}
else {
PyErr_SetString(PyExc_TypeError,
"isolation_level must be str or None");
return -1; return -1;
} }
return 0; return 0;