# pysqlite2/test/transactions.py: tests transactions # # Copyright (C) 2005-2007 Gerhard Häring # # This file is part of pysqlite. # # This software is provided 'as-is', without any express or implied # warranty. In no event will the authors be held liable for any damages # arising from the use of this software. # # Permission is granted to anyone to use this software for any purpose, # including commercial applications, and to alter it and redistribute it # freely, subject to the following restrictions: # # 1. The origin of this software must not be misrepresented; you must not # claim that you wrote the original software. If you use this software # in a product, an acknowledgment in the product documentation would be # appreciated but is not required. # 2. Altered source versions must be plainly marked as such, and must not be # misrepresented as being the original software. # 3. This notice may not be removed or altered from any source distribution. import unittest import sqlite3 as sqlite from contextlib import contextmanager from test.support.os_helper import TESTFN, unlink from test.support.script_helper import assert_python_ok from .util import memory_database from .util import MemoryDatabaseMixin class TransactionTests(unittest.TestCase): def setUp(self): # We can disable the busy handlers, since we control # the order of SQLite C API operations. self.con1 = sqlite.connect(TESTFN, timeout=0) self.cur1 = self.con1.cursor() self.con2 = sqlite.connect(TESTFN, timeout=0) self.cur2 = self.con2.cursor() def tearDown(self): try: self.cur1.close() self.con1.close() self.cur2.close() self.con2.close() finally: unlink(TESTFN) def test_dml_does_not_auto_commit_before(self): self.cur1.execute("create table test(i)") self.cur1.execute("insert into test(i) values (5)") self.cur1.execute("create table test2(j)") self.cur2.execute("select i from test") res = self.cur2.fetchall() self.assertEqual(len(res), 0) def test_insert_starts_transaction(self): self.cur1.execute("create table test(i)") self.cur1.execute("insert into test(i) values (5)") self.cur2.execute("select i from test") res = self.cur2.fetchall() self.assertEqual(len(res), 0) def test_update_starts_transaction(self): self.cur1.execute("create table test(i)") self.cur1.execute("insert into test(i) values (5)") self.con1.commit() self.cur1.execute("update test set i=6") self.cur2.execute("select i from test") res = self.cur2.fetchone()[0] self.assertEqual(res, 5) def test_delete_starts_transaction(self): self.cur1.execute("create table test(i)") self.cur1.execute("insert into test(i) values (5)") self.con1.commit() self.cur1.execute("delete from test") self.cur2.execute("select i from test") res = self.cur2.fetchall() self.assertEqual(len(res), 1) def test_replace_starts_transaction(self): self.cur1.execute("create table test(i)") self.cur1.execute("insert into test(i) values (5)") self.con1.commit() self.cur1.execute("replace into test(i) values (6)") self.cur2.execute("select i from test") res = self.cur2.fetchall() self.assertEqual(len(res), 1) self.assertEqual(res[0][0], 5) def test_toggle_auto_commit(self): self.cur1.execute("create table test(i)") self.cur1.execute("insert into test(i) values (5)") self.con1.isolation_level = None self.assertEqual(self.con1.isolation_level, None) self.cur2.execute("select i from test") res = self.cur2.fetchall() self.assertEqual(len(res), 1) self.con1.isolation_level = "DEFERRED" self.assertEqual(self.con1.isolation_level , "DEFERRED") self.cur1.execute("insert into test(i) values (5)") self.cur2.execute("select i from test") res = self.cur2.fetchall() self.assertEqual(len(res), 1) def test_raise_timeout(self): self.cur1.execute("create table test(i)") self.cur1.execute("insert into test(i) values (5)") with self.assertRaises(sqlite.OperationalError): self.cur2.execute("insert into test(i) values (5)") def test_locking(self): # This tests the improved concurrency with pysqlite 2.3.4. You needed # to roll back con2 before you could commit con1. self.cur1.execute("create table test(i)") self.cur1.execute("insert into test(i) values (5)") with self.assertRaises(sqlite.OperationalError): self.cur2.execute("insert into test(i) values (5)") # NO self.con2.rollback() HERE!!! self.con1.commit() def test_rollback_cursor_consistency(self): """Check that cursors behave correctly after rollback.""" with memory_database() as con: cur = con.cursor() cur.execute("create table test(x)") cur.execute("insert into test(x) values (5)") cur.execute("select 1 union select 2 union select 3") con.rollback() self.assertEqual(cur.fetchall(), [(1,), (2,), (3,)]) def test_multiple_cursors_and_iternext(self): # gh-94028: statements are cleared and reset in cursor iternext. # Provoke the gh-94028 by using a cursor cache. CURSORS = {} def sql(cx, sql, *args): cu = cx.cursor() cu.execute(sql, args) CURSORS[id(sql)] = cu return cu self.con1.execute("create table t(t)") sql(self.con1, "insert into t values (?), (?), (?)", "u1", "u2", "u3") self.con1.commit() # On second connection, verify rows are visible, then delete them. count = sql(self.con2, "select count(*) from t").fetchone()[0] self.assertEqual(count, 3) changes = sql(self.con2, "delete from t").rowcount self.assertEqual(changes, 3) self.con2.commit() # Back in original connection, create 2 new users. sql(self.con1, "insert into t values (?)", "u4") sql(self.con1, "insert into t values (?)", "u5") # The second connection cannot see uncommitted changes. count = sql(self.con2, "select count(*) from t").fetchone()[0] self.assertEqual(count, 0) # First connection can see its own changes. count = sql(self.con1, "select count(*) from t").fetchone()[0] self.assertEqual(count, 2) # The second connection can now see the changes. self.con1.commit() count = sql(self.con2, "select count(*) from t").fetchone()[0] self.assertEqual(count, 2) class RollbackTests(unittest.TestCase): """bpo-44092: sqlite3 now leaves it to SQLite to resolve rollback issues""" def setUp(self): self.con = sqlite.connect(":memory:") self.cur1 = self.con.cursor() self.cur2 = self.con.cursor() with self.con: self.con.execute("create table t(c)"); self.con.executemany("insert into t values(?)", [(0,), (1,), (2,)]) self.cur1.execute("begin transaction") select = "select c from t" self.cur1.execute(select) self.con.rollback() self.res = self.cur2.execute(select) # Reusing stmt from cache def tearDown(self): self.con.close() def _check_rows(self): for i, row in enumerate(self.res): self.assertEqual(row[0], i) def test_no_duplicate_rows_after_rollback_del_cursor(self): del self.cur1 self._check_rows() def test_no_duplicate_rows_after_rollback_close_cursor(self): self.cur1.close() self._check_rows() def test_no_duplicate_rows_after_rollback_new_query(self): self.cur1.execute("select c from t where c = 1") self._check_rows() class SpecialCommandTests(MemoryDatabaseMixin, unittest.TestCase): def test_drop_table(self): self.cur.execute("create table test(i)") self.cur.execute("insert into test(i) values (5)") self.cur.execute("drop table test") def test_pragma(self): self.cur.execute("create table test(i)") self.cur.execute("insert into test(i) values (5)") self.cur.execute("pragma count_changes=1") class TransactionalDDL(MemoryDatabaseMixin, unittest.TestCase): def test_ddl_does_not_autostart_transaction(self): # For backwards compatibility reasons, DDL statements should not # implicitly start a transaction. self.con.execute("create table test(i)") self.con.rollback() result = self.con.execute("select * from test").fetchall() self.assertEqual(result, []) def test_immediate_transactional_ddl(self): # You can achieve transactional DDL by issuing a BEGIN # statement manually. self.con.execute("begin immediate") self.con.execute("create table test(i)") self.con.rollback() with self.assertRaises(sqlite.OperationalError): self.con.execute("select * from test") def test_transactional_ddl(self): # You can achieve transactional DDL by issuing a BEGIN # statement manually. self.con.execute("begin") self.con.execute("create table test(i)") self.con.rollback() with self.assertRaises(sqlite.OperationalError): self.con.execute("select * from test") class IsolationLevelFromInit(unittest.TestCase): CREATE = "create table t(t)" INSERT = "insert into t values(1)" def setUp(self): self.traced = [] def _run_test(self, cx): cx.execute(self.CREATE) cx.set_trace_callback(lambda stmt: self.traced.append(stmt)) with cx: cx.execute(self.INSERT) def test_isolation_level_default(self): with memory_database() as cx: self._run_test(cx) self.assertEqual(self.traced, ["BEGIN ", self.INSERT, "COMMIT"]) def test_isolation_level_begin(self): with memory_database(isolation_level="") as cx: self._run_test(cx) self.assertEqual(self.traced, ["BEGIN ", self.INSERT, "COMMIT"]) def test_isolation_level_deferred(self): with memory_database(isolation_level="DEFERRED") as cx: self._run_test(cx) self.assertEqual(self.traced, ["BEGIN DEFERRED", self.INSERT, "COMMIT"]) def test_isolation_level_immediate(self): with memory_database(isolation_level="IMMEDIATE") as cx: self._run_test(cx) self.assertEqual(self.traced, ["BEGIN IMMEDIATE", self.INSERT, "COMMIT"]) def test_isolation_level_exclusive(self): with memory_database(isolation_level="EXCLUSIVE") as cx: self._run_test(cx) self.assertEqual(self.traced, ["BEGIN EXCLUSIVE", self.INSERT, "COMMIT"]) def test_isolation_level_none(self): with memory_database(isolation_level=None) as cx: self._run_test(cx) self.assertEqual(self.traced, [self.INSERT]) class IsolationLevelPostInit(unittest.TestCase): QUERY = "insert into t values(1)" def setUp(self): self.cx = sqlite.connect(":memory:") self.cx.execute("create table t(t)") self.traced = [] self.cx.set_trace_callback(lambda stmt: self.traced.append(stmt)) def tearDown(self): self.cx.close() def test_isolation_level_default(self): with self.cx: self.cx.execute(self.QUERY) self.assertEqual(self.traced, ["BEGIN ", self.QUERY, "COMMIT"]) def test_isolation_level_begin(self): self.cx.isolation_level = "" with self.cx: self.cx.execute(self.QUERY) self.assertEqual(self.traced, ["BEGIN ", self.QUERY, "COMMIT"]) def test_isolation_level_deferrred(self): self.cx.isolation_level = "DEFERRED" with self.cx: self.cx.execute(self.QUERY) self.assertEqual(self.traced, ["BEGIN DEFERRED", self.QUERY, "COMMIT"]) def test_isolation_level_immediate(self): self.cx.isolation_level = "IMMEDIATE" with self.cx: self.cx.execute(self.QUERY) self.assertEqual(self.traced, ["BEGIN IMMEDIATE", self.QUERY, "COMMIT"]) def test_isolation_level_exclusive(self): self.cx.isolation_level = "EXCLUSIVE" with self.cx: self.cx.execute(self.QUERY) self.assertEqual(self.traced, ["BEGIN EXCLUSIVE", self.QUERY, "COMMIT"]) def test_isolation_level_none(self): self.cx.isolation_level = None with self.cx: self.cx.execute(self.QUERY) self.assertEqual(self.traced, [self.QUERY]) class AutocommitAttribute(unittest.TestCase): """Test PEP 249-compliant autocommit behaviour.""" legacy = sqlite.LEGACY_TRANSACTION_CONTROL @contextmanager def check_stmt_trace(self, cx, expected, reset=True): try: traced = [] cx.set_trace_callback(lambda stmt: traced.append(stmt)) yield finally: self.assertEqual(traced, expected) if reset: cx.set_trace_callback(None) def test_autocommit_default(self): with memory_database() as cx: self.assertEqual(cx.autocommit, sqlite.LEGACY_TRANSACTION_CONTROL) def test_autocommit_setget(self): dataset = ( True, False, sqlite.LEGACY_TRANSACTION_CONTROL, ) for mode in dataset: with self.subTest(mode=mode): with memory_database(autocommit=mode) as cx: self.assertEqual(cx.autocommit, mode) with memory_database() as cx: cx.autocommit = mode self.assertEqual(cx.autocommit, mode) def test_autocommit_setget_invalid(self): msg = "autocommit must be True, False, or.*LEGACY" for mode in "a", 12, (), None: with self.subTest(mode=mode): with self.assertRaisesRegex(ValueError, msg): sqlite.connect(":memory:", autocommit=mode) def test_autocommit_disabled(self): expected = [ "SELECT 1", "COMMIT", "BEGIN", "ROLLBACK", "BEGIN", ] with memory_database(autocommit=False) as cx: self.assertTrue(cx.in_transaction) with self.check_stmt_trace(cx, expected): cx.execute("SELECT 1") cx.commit() cx.rollback() def test_autocommit_disabled_implicit_rollback(self): expected = ["ROLLBACK"] with memory_database(autocommit=False) as cx: self.assertTrue(cx.in_transaction) with self.check_stmt_trace(cx, expected, reset=False): cx.close() def test_autocommit_enabled(self): expected = ["CREATE TABLE t(t)", "INSERT INTO t VALUES(1)"] with memory_database(autocommit=True) as cx: self.assertFalse(cx.in_transaction) with self.check_stmt_trace(cx, expected): cx.execute("CREATE TABLE t(t)") cx.execute("INSERT INTO t VALUES(1)") self.assertFalse(cx.in_transaction) def test_autocommit_enabled_txn_ctl(self): for op in "commit", "rollback": with self.subTest(op=op): with memory_database(autocommit=True) as cx: meth = getattr(cx, op) self.assertFalse(cx.in_transaction) with self.check_stmt_trace(cx, []): meth() # expect this to pass silently self.assertFalse(cx.in_transaction) def test_autocommit_disabled_then_enabled(self): expected = ["COMMIT"] with memory_database(autocommit=False) as cx: self.assertTrue(cx.in_transaction) with self.check_stmt_trace(cx, expected): cx.autocommit = True # should commit self.assertFalse(cx.in_transaction) def test_autocommit_enabled_then_disabled(self): expected = ["BEGIN"] with memory_database(autocommit=True) as cx: self.assertFalse(cx.in_transaction) with self.check_stmt_trace(cx, expected): cx.autocommit = False # should begin self.assertTrue(cx.in_transaction) def test_autocommit_explicit_then_disabled(self): expected = ["BEGIN DEFERRED"] with memory_database(autocommit=True) as cx: self.assertFalse(cx.in_transaction) with self.check_stmt_trace(cx, expected): cx.execute("BEGIN DEFERRED") cx.autocommit = False # should now be a no-op self.assertTrue(cx.in_transaction) def test_autocommit_enabled_ctx_mgr(self): with memory_database(autocommit=True) as cx: # The context manager is a no-op if autocommit=True with self.check_stmt_trace(cx, []): with cx: self.assertFalse(cx.in_transaction) self.assertFalse(cx.in_transaction) def test_autocommit_disabled_ctx_mgr(self): expected = ["COMMIT", "BEGIN"] with memory_database(autocommit=False) as cx: with self.check_stmt_trace(cx, expected): with cx: self.assertTrue(cx.in_transaction) self.assertTrue(cx.in_transaction) def test_autocommit_compat_ctx_mgr(self): expected = ["BEGIN ", "INSERT INTO T VALUES(1)", "COMMIT"] with memory_database(autocommit=self.legacy) as cx: cx.execute("create table t(t)") with self.check_stmt_trace(cx, expected): with cx: self.assertFalse(cx.in_transaction) cx.execute("INSERT INTO T VALUES(1)") self.assertTrue(cx.in_transaction) self.assertFalse(cx.in_transaction) def test_autocommit_enabled_executescript(self): expected = ["BEGIN", "SELECT 1"] with memory_database(autocommit=True) as cx: with self.check_stmt_trace(cx, expected): self.assertFalse(cx.in_transaction) cx.execute("BEGIN") cx.executescript("SELECT 1") self.assertTrue(cx.in_transaction) def test_autocommit_disabled_executescript(self): expected = ["SELECT 1"] with memory_database(autocommit=False) as cx: with self.check_stmt_trace(cx, expected): self.assertTrue(cx.in_transaction) cx.executescript("SELECT 1") self.assertTrue(cx.in_transaction) def test_autocommit_compat_executescript(self): expected = ["BEGIN", "COMMIT", "SELECT 1"] with memory_database(autocommit=self.legacy) as cx: with self.check_stmt_trace(cx, expected): self.assertFalse(cx.in_transaction) cx.execute("BEGIN") cx.executescript("SELECT 1") self.assertFalse(cx.in_transaction) def test_autocommit_disabled_implicit_shutdown(self): # The implicit ROLLBACK should not call back into Python during # interpreter tear-down. code = """if 1: import sqlite3 cx = sqlite3.connect(":memory:", autocommit=False) cx.set_trace_callback(print) """ assert_python_ok("-c", code, PYTHONIOENCODING="utf-8") if __name__ == "__main__": unittest.main()