diff --git a/Lib/sqlite3/test/types.py b/Lib/sqlite3/test/types.py index 6667bc86860..0b5b3e7e3c2 100644 --- a/Lib/sqlite3/test/types.py +++ b/Lib/sqlite3/test/types.py @@ -274,6 +274,45 @@ class ColNamesTests(unittest.TestCase): self.cur.execute("select * from test where 0 = 1") self.assertEqual(self.cur.description[0][0], "x") + def CheckCursorDescriptionInsert(self): + self.cur.execute("insert into test values (1)") + self.assertIsNone(self.cur.description) + + +@unittest.skipIf(sqlite.sqlite_version_info < (3, 8, 3), "CTEs not supported") +class CommonTableExpressionTests(unittest.TestCase): + + def setUp(self): + self.con = sqlite.connect(":memory:") + self.cur = self.con.cursor() + self.cur.execute("create table test(x foo)") + + def tearDown(self): + self.cur.close() + self.con.close() + + def CheckCursorDescriptionCTESimple(self): + self.cur.execute("with one as (select 1) select * from one") + self.assertIsNotNone(self.cur.description) + self.assertEqual(self.cur.description[0][0], "1") + + def CheckCursorDescriptionCTESMultipleColumns(self): + self.cur.execute("insert into test values(1)") + self.cur.execute("insert into test values(2)") + self.cur.execute("with testCTE as (select * from test) select * from testCTE") + self.assertIsNotNone(self.cur.description) + self.assertEqual(self.cur.description[0][0], "x") + + def CheckCursorDescriptionCTE(self): + self.cur.execute("insert into test values (1)") + self.cur.execute("with bar as (select * from test) select * from test where x = 1") + self.assertIsNotNone(self.cur.description) + self.assertEqual(self.cur.description[0][0], "x") + self.cur.execute("with bar as (select * from test) select * from test where x = 2") + self.assertIsNotNone(self.cur.description) + self.assertEqual(self.cur.description[0][0], "x") + + class ObjectAdaptationTests(unittest.TestCase): def cast(obj): return float(obj) @@ -372,7 +411,8 @@ def suite(): adaptation_suite = unittest.makeSuite(ObjectAdaptationTests, "Check") bin_suite = unittest.makeSuite(BinaryConverterTests, "Check") date_suite = unittest.makeSuite(DateTimeTests, "Check") - return unittest.TestSuite((sqlite_type_suite, decltypes_type_suite, colnames_type_suite, adaptation_suite, bin_suite, date_suite)) + cte_suite = unittest.makeSuite(CommonTableExpressionTests, "Check") + return unittest.TestSuite((sqlite_type_suite, decltypes_type_suite, colnames_type_suite, adaptation_suite, bin_suite, date_suite, cte_suite)) def test(): runner = unittest.TextTestRunner() diff --git a/Misc/NEWS b/Misc/NEWS index 36af9741551..54cae1da9fb 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -46,6 +46,8 @@ Core and Builtins Library ------- +- Issue #21718: cursor.description is now available for queries using CTEs. + - Issue #2466: posixpath.ismount now correctly recognizes mount points which the user does not have permission to access. diff --git a/Modules/_sqlite/cursor.c b/Modules/_sqlite/cursor.c index 300da2878e4..e1676de9071 100644 --- a/Modules/_sqlite/cursor.c +++ b/Modules/_sqlite/cursor.c @@ -646,12 +646,11 @@ PyObject* _pysqlite_query_execute(pysqlite_Cursor* self, int multiple, PyObject* goto error; } - if (rc == SQLITE_ROW || (rc == SQLITE_DONE && statement_type == STATEMENT_SELECT)) { - if (self->description == Py_None) { - Py_BEGIN_ALLOW_THREADS - numcols = sqlite3_column_count(self->statement->st); - Py_END_ALLOW_THREADS - + if (rc == SQLITE_ROW || rc == SQLITE_DONE) { + Py_BEGIN_ALLOW_THREADS + numcols = sqlite3_column_count(self->statement->st); + Py_END_ALLOW_THREADS + if (self->description == Py_None && numcols > 0) { Py_SETREF(self->description, PyTuple_New(numcols)); if (!self->description) { goto error;