# Author: Paul Kippes import unittest from .util import memory_database from .util import MemoryDatabaseMixin class DumpTests(MemoryDatabaseMixin, unittest.TestCase): def test_table_dump(self): expected_sqls = [ "PRAGMA foreign_keys=OFF;", """CREATE TABLE "index"("index" blob);""" , """INSERT INTO "index" VALUES(X'01');""" , """CREATE TABLE "quoted""table"("quoted""field" text);""" , """INSERT INTO "quoted""table" VALUES('quoted''value');""" , "CREATE TABLE t1(id integer primary key, s1 text, " \ "t1_i1 integer not null, i2 integer, unique (s1), " \ "constraint t1_idx1 unique (i2), " \ "constraint t1_i1_idx1 unique (t1_i1));" , "INSERT INTO \"t1\" VALUES(1,'foo',10,20);" , "INSERT INTO \"t1\" VALUES(2,'foo2',30,30);" , "CREATE TABLE t2(id integer, t2_i1 integer, " \ "t2_i2 integer, primary key (id)," \ "foreign key(t2_i1) references t1(t1_i1));" , # Foreign key violation. "INSERT INTO \"t2\" VALUES(1,2,3);" , "CREATE TRIGGER trigger_1 update of t1_i1 on t1 " \ "begin " \ "update t2 set t2_i1 = new.t1_i1 where t2_i1 = old.t1_i1; " \ "end;" , "CREATE VIEW v1 as select * from t1 left join t2 " \ "using (id);" ] [self.cu.execute(s) for s in expected_sqls] i = self.cx.iterdump() actual_sqls = [s for s in i] expected_sqls = [ "PRAGMA foreign_keys=OFF;", "BEGIN TRANSACTION;", *expected_sqls[1:], "COMMIT;", ] [self.assertEqual(expected_sqls[i], actual_sqls[i]) for i in range(len(expected_sqls))] def test_table_dump_filter(self): all_table_sqls = [ """CREATE TABLE "some_table_2" ("id_1" INTEGER);""", """INSERT INTO "some_table_2" VALUES(3);""", """INSERT INTO "some_table_2" VALUES(4);""", """CREATE TABLE "test_table_1" ("id_2" INTEGER);""", """INSERT INTO "test_table_1" VALUES(1);""", """INSERT INTO "test_table_1" VALUES(2);""", ] all_views_sqls = [ """CREATE VIEW "view_1" AS SELECT * FROM "some_table_2";""", """CREATE VIEW "view_2" AS SELECT * FROM "test_table_1";""", ] # Create database structure. for sql in [*all_table_sqls, *all_views_sqls]: self.cu.execute(sql) # %_table_% matches all tables. dump_sqls = list(self.cx.iterdump(filter="%_table_%")) self.assertEqual( dump_sqls, ["BEGIN TRANSACTION;", *all_table_sqls, "COMMIT;"], ) # view_% matches all views. dump_sqls = list(self.cx.iterdump(filter="view_%")) self.assertEqual( dump_sqls, ["BEGIN TRANSACTION;", *all_views_sqls, "COMMIT;"], ) # %_1 matches tables and views with the _1 suffix. dump_sqls = list(self.cx.iterdump(filter="%_1")) self.assertEqual( dump_sqls, [ "BEGIN TRANSACTION;", """CREATE TABLE "test_table_1" ("id_2" INTEGER);""", """INSERT INTO "test_table_1" VALUES(1);""", """INSERT INTO "test_table_1" VALUES(2);""", """CREATE VIEW "view_1" AS SELECT * FROM "some_table_2";""", "COMMIT;" ], ) # some_% matches some_table_2. dump_sqls = list(self.cx.iterdump(filter="some_%")) self.assertEqual( dump_sqls, [ "BEGIN TRANSACTION;", """CREATE TABLE "some_table_2" ("id_1" INTEGER);""", """INSERT INTO "some_table_2" VALUES(3);""", """INSERT INTO "some_table_2" VALUES(4);""", "COMMIT;" ], ) # Only single object. dump_sqls = list(self.cx.iterdump(filter="view_2")) self.assertEqual( dump_sqls, [ "BEGIN TRANSACTION;", """CREATE VIEW "view_2" AS SELECT * FROM "test_table_1";""", "COMMIT;" ], ) # % matches all objects. dump_sqls = list(self.cx.iterdump(filter="%")) self.assertEqual( dump_sqls, ["BEGIN TRANSACTION;", *all_table_sqls, *all_views_sqls, "COMMIT;"], ) def test_dump_autoincrement(self): expected = [ 'CREATE TABLE "t1" (id integer primary key autoincrement);', 'INSERT INTO "t1" VALUES(NULL);', 'CREATE TABLE "t2" (id integer primary key autoincrement);', ] self.cu.executescript("".join(expected)) # the NULL value should now be automatically be set to 1 expected[1] = expected[1].replace("NULL", "1") expected.insert(0, "BEGIN TRANSACTION;") expected.extend([ 'DELETE FROM "sqlite_sequence";', 'INSERT INTO "sqlite_sequence" VALUES(\'t1\',1);', 'COMMIT;', ]) actual = [stmt for stmt in self.cx.iterdump()] self.assertEqual(expected, actual) def test_dump_autoincrement_create_new_db(self): self.cu.execute("BEGIN TRANSACTION") self.cu.execute("CREATE TABLE t1 (id integer primary key autoincrement)") self.cu.execute("CREATE TABLE t2 (id integer primary key autoincrement)") self.cu.executemany("INSERT INTO t1 VALUES(?)", ((None,) for _ in range(9))) self.cu.executemany("INSERT INTO t2 VALUES(?)", ((None,) for _ in range(4))) self.cx.commit() with memory_database() as cx2: query = "".join(self.cx.iterdump()) cx2.executescript(query) cu2 = cx2.cursor() dataset = ( ("t1", 9), ("t2", 4), ) for table, seq in dataset: with self.subTest(table=table, seq=seq): res = cu2.execute(""" SELECT "seq" FROM "sqlite_sequence" WHERE "name" == ? """, (table,)) rows = res.fetchall() self.assertEqual(rows[0][0], seq) def test_unorderable_row(self): # iterdump() should be able to cope with unorderable row types (issue #15545) class UnorderableRow: def __init__(self, cursor, row): self.row = row def __getitem__(self, index): return self.row[index] self.cx.row_factory = UnorderableRow CREATE_ALPHA = """CREATE TABLE "alpha" ("one");""" CREATE_BETA = """CREATE TABLE "beta" ("two");""" expected = [ "BEGIN TRANSACTION;", CREATE_ALPHA, CREATE_BETA, "COMMIT;" ] self.cu.execute(CREATE_BETA) self.cu.execute(CREATE_ALPHA) got = list(self.cx.iterdump()) self.assertEqual(expected, got) def test_dump_custom_row_factory(self): # gh-118221: iterdump should be able to cope with custom row factories. def dict_factory(cu, row): fields = [col[0] for col in cu.description] return dict(zip(fields, row)) self.cx.row_factory = dict_factory CREATE_TABLE = "CREATE TABLE test(t);" expected = ["BEGIN TRANSACTION;", CREATE_TABLE, "COMMIT;"] self.cu.execute(CREATE_TABLE) actual = list(self.cx.iterdump()) self.assertEqual(expected, actual) self.assertEqual(self.cx.row_factory, dict_factory) def test_dump_virtual_tables(self): # gh-64662 expected = [ "BEGIN TRANSACTION;", "PRAGMA writable_schema=ON;", ("INSERT INTO sqlite_master(type,name,tbl_name,rootpage,sql)" "VALUES('table','test','test',0,'CREATE VIRTUAL TABLE test USING fts4(example)');"), "CREATE TABLE 'test_content'(docid INTEGER PRIMARY KEY, 'c0example');", "CREATE TABLE 'test_docsize'(docid INTEGER PRIMARY KEY, size BLOB);", ("CREATE TABLE 'test_segdir'(level INTEGER,idx INTEGER,start_block INTEGER," "leaves_end_block INTEGER,end_block INTEGER,root BLOB,PRIMARY KEY(level, idx));"), "CREATE TABLE 'test_segments'(blockid INTEGER PRIMARY KEY, block BLOB);", "CREATE TABLE 'test_stat'(id INTEGER PRIMARY KEY, value BLOB);", "PRAGMA writable_schema=OFF;", "COMMIT;" ] self.cu.execute("CREATE VIRTUAL TABLE test USING fts4(example)") actual = list(self.cx.iterdump()) self.assertEqual(expected, actual) if __name__ == "__main__": unittest.main()