cpython/Lib/test/test_sqlite3/test_dump.py

231 lines
8.8 KiB
Python

# Author: Paul Kippes <kippesp@gmail.com>
import unittest
from .util import memory_database
from .util import MemoryDatabaseMixin
class DumpTests(MemoryDatabaseMixin, unittest.TestCase):
def test_table_dump(self):
expected_sqls = [
"""CREATE TABLE "index"("index" blob);"""
,
"""INSERT INTO "index" VALUES(X'01');"""
,
"""CREATE TABLE "quoted""table"("quoted""field" text);"""
,
"""INSERT INTO "quoted""table" VALUES('quoted''value');"""
,
"CREATE TABLE t1(id integer primary key, s1 text, " \
"t1_i1 integer not null, i2 integer, unique (s1), " \
"constraint t1_idx1 unique (i2), " \
"constraint t1_i1_idx1 unique (t1_i1));"
,
"INSERT INTO \"t1\" VALUES(1,'foo',10,20);"
,
"INSERT INTO \"t1\" VALUES(2,'foo2',30,30);"
,
"CREATE TABLE t2(id integer, t2_i1 integer, " \
"t2_i2 integer, primary key (id)," \
"foreign key(t2_i1) references t1(t1_i1));"
,
# Foreign key violation.
"INSERT INTO \"t2\" VALUES(1,2,3);"
,
"CREATE TRIGGER trigger_1 update of t1_i1 on t1 " \
"begin " \
"update t2 set t2_i1 = new.t1_i1 where t2_i1 = old.t1_i1; " \
"end;"
,
"CREATE VIEW v1 as select * from t1 left join t2 " \
"using (id);"
]
[self.cu.execute(s) for s in expected_sqls]
i = self.cx.iterdump()
actual_sqls = [s for s in i]
expected_sqls = [
"PRAGMA foreign_keys=OFF;",
"BEGIN TRANSACTION;",
*expected_sqls,
"COMMIT;",
]
[self.assertEqual(expected_sqls[i], actual_sqls[i])
for i in range(len(expected_sqls))]
def test_table_dump_filter(self):
all_table_sqls = [
"""CREATE TABLE "some_table_2" ("id_1" INTEGER);""",
"""INSERT INTO "some_table_2" VALUES(3);""",
"""INSERT INTO "some_table_2" VALUES(4);""",
"""CREATE TABLE "test_table_1" ("id_2" INTEGER);""",
"""INSERT INTO "test_table_1" VALUES(1);""",
"""INSERT INTO "test_table_1" VALUES(2);""",
]
all_views_sqls = [
"""CREATE VIEW "view_1" AS SELECT * FROM "some_table_2";""",
"""CREATE VIEW "view_2" AS SELECT * FROM "test_table_1";""",
]
# Create database structure.
for sql in [*all_table_sqls, *all_views_sqls]:
self.cu.execute(sql)
# %_table_% matches all tables.
dump_sqls = list(self.cx.iterdump(filter="%_table_%"))
self.assertEqual(
dump_sqls,
["BEGIN TRANSACTION;", *all_table_sqls, "COMMIT;"],
)
# view_% matches all views.
dump_sqls = list(self.cx.iterdump(filter="view_%"))
self.assertEqual(
dump_sqls,
["BEGIN TRANSACTION;", *all_views_sqls, "COMMIT;"],
)
# %_1 matches tables and views with the _1 suffix.
dump_sqls = list(self.cx.iterdump(filter="%_1"))
self.assertEqual(
dump_sqls,
[
"BEGIN TRANSACTION;",
"""CREATE TABLE "test_table_1" ("id_2" INTEGER);""",
"""INSERT INTO "test_table_1" VALUES(1);""",
"""INSERT INTO "test_table_1" VALUES(2);""",
"""CREATE VIEW "view_1" AS SELECT * FROM "some_table_2";""",
"COMMIT;"
],
)
# some_% matches some_table_2.
dump_sqls = list(self.cx.iterdump(filter="some_%"))
self.assertEqual(
dump_sqls,
[
"BEGIN TRANSACTION;",
"""CREATE TABLE "some_table_2" ("id_1" INTEGER);""",
"""INSERT INTO "some_table_2" VALUES(3);""",
"""INSERT INTO "some_table_2" VALUES(4);""",
"COMMIT;"
],
)
# Only single object.
dump_sqls = list(self.cx.iterdump(filter="view_2"))
self.assertEqual(
dump_sqls,
[
"BEGIN TRANSACTION;",
"""CREATE VIEW "view_2" AS SELECT * FROM "test_table_1";""",
"COMMIT;"
],
)
# % matches all objects.
dump_sqls = list(self.cx.iterdump(filter="%"))
self.assertEqual(
dump_sqls,
["BEGIN TRANSACTION;", *all_table_sqls, *all_views_sqls, "COMMIT;"],
)
def test_dump_autoincrement(self):
expected = [
'CREATE TABLE "t1" (id integer primary key autoincrement);',
'INSERT INTO "t1" VALUES(NULL);',
'CREATE TABLE "t2" (id integer primary key autoincrement);',
]
self.cu.executescript("".join(expected))
# the NULL value should now be automatically be set to 1
expected[1] = expected[1].replace("NULL", "1")
expected.insert(0, "BEGIN TRANSACTION;")
expected.extend([
'DELETE FROM "sqlite_sequence";',
'INSERT INTO "sqlite_sequence" VALUES(\'t1\',1);',
'COMMIT;',
])
actual = [stmt for stmt in self.cx.iterdump()]
self.assertEqual(expected, actual)
def test_dump_autoincrement_create_new_db(self):
self.cu.execute("BEGIN TRANSACTION")
self.cu.execute("CREATE TABLE t1 (id integer primary key autoincrement)")
self.cu.execute("CREATE TABLE t2 (id integer primary key autoincrement)")
self.cu.executemany("INSERT INTO t1 VALUES(?)", ((None,) for _ in range(9)))
self.cu.executemany("INSERT INTO t2 VALUES(?)", ((None,) for _ in range(4)))
self.cx.commit()
with memory_database() as cx2:
query = "".join(self.cx.iterdump())
cx2.executescript(query)
cu2 = cx2.cursor()
dataset = (
("t1", 9),
("t2", 4),
)
for table, seq in dataset:
with self.subTest(table=table, seq=seq):
res = cu2.execute("""
SELECT "seq" FROM "sqlite_sequence" WHERE "name" == ?
""", (table,))
rows = res.fetchall()
self.assertEqual(rows[0][0], seq)
def test_unorderable_row(self):
# iterdump() should be able to cope with unorderable row types (issue #15545)
class UnorderableRow:
def __init__(self, cursor, row):
self.row = row
def __getitem__(self, index):
return self.row[index]
self.cx.row_factory = UnorderableRow
CREATE_ALPHA = """CREATE TABLE "alpha" ("one");"""
CREATE_BETA = """CREATE TABLE "beta" ("two");"""
expected = [
"BEGIN TRANSACTION;",
CREATE_ALPHA,
CREATE_BETA,
"COMMIT;"
]
self.cu.execute(CREATE_BETA)
self.cu.execute(CREATE_ALPHA)
got = list(self.cx.iterdump())
self.assertEqual(expected, got)
def test_dump_custom_row_factory(self):
# gh-118221: iterdump should be able to cope with custom row factories.
def dict_factory(cu, row):
fields = [col[0] for col in cu.description]
return dict(zip(fields, row))
self.cx.row_factory = dict_factory
CREATE_TABLE = "CREATE TABLE test(t);"
expected = ["BEGIN TRANSACTION;", CREATE_TABLE, "COMMIT;"]
self.cu.execute(CREATE_TABLE)
actual = list(self.cx.iterdump())
self.assertEqual(expected, actual)
self.assertEqual(self.cx.row_factory, dict_factory)
def test_dump_virtual_tables(self):
# gh-64662
expected = [
"BEGIN TRANSACTION;",
"PRAGMA writable_schema=ON;",
("INSERT INTO sqlite_master(type,name,tbl_name,rootpage,sql)"
"VALUES('table','test','test',0,'CREATE VIRTUAL TABLE test USING fts4(example)');"),
"CREATE TABLE 'test_content'(docid INTEGER PRIMARY KEY, 'c0example');",
"CREATE TABLE 'test_docsize'(docid INTEGER PRIMARY KEY, size BLOB);",
("CREATE TABLE 'test_segdir'(level INTEGER,idx INTEGER,start_block INTEGER,"
"leaves_end_block INTEGER,end_block INTEGER,root BLOB,PRIMARY KEY(level, idx));"),
"CREATE TABLE 'test_segments'(blockid INTEGER PRIMARY KEY, block BLOB);",
"CREATE TABLE 'test_stat'(id INTEGER PRIMARY KEY, value BLOB);",
"PRAGMA writable_schema=OFF;",
"COMMIT;"
]
self.cu.execute("CREATE VIRTUAL TABLE test USING fts4(example)")
actual = list(self.cx.iterdump())
self.assertEqual(expected, actual)
if __name__ == "__main__":
unittest.main()