# Mimic the sqlite3 console shell's .dump command # Author: Paul Kippes # Every identifier in sql is quoted based on a comment in sqlite # documentation "SQLite adds new keywords from time to time when it # takes on new features. So to prevent your code from being broken by # future enhancements, you should normally quote any identifier that # is an English language word, even if you do not have to." def _quote_name(name): return '"{0}"'.format(name.replace('"', '""')) def _quote_value(value): return "'{0}'".format(value.replace("'", "''")) def _iterdump(connection, *, filter=None): """ Returns an iterator to the dump of the database in an SQL text format. Used to produce an SQL dump of the database. Useful to save an in-memory database for later restoration. This function should not be called directly but instead called from the Connection method, iterdump(). """ writeable_schema = False cu = connection.cursor() # Disable foreign key constraints, if there is any foreign key violation. violations = cu.execute("PRAGMA foreign_key_check").fetchall() if violations: yield('PRAGMA foreign_keys=OFF;') yield('BEGIN TRANSACTION;') if filter: # Return database objects which match the filter pattern. filter_name_clause = 'AND "name" LIKE ?' params = [filter] else: filter_name_clause = "" params = [] # sqlite_master table contains the SQL CREATE statements for the database. q = f""" SELECT "name", "type", "sql" FROM "sqlite_master" WHERE "sql" NOT NULL AND "type" == 'table' {filter_name_clause} ORDER BY "name" """ schema_res = cu.execute(q, params) sqlite_sequence = [] for table_name, type, sql in schema_res.fetchall(): if table_name == 'sqlite_sequence': rows = cu.execute('SELECT * FROM "sqlite_sequence";') sqlite_sequence = ['DELETE FROM "sqlite_sequence"'] sqlite_sequence += [ f'INSERT INTO "sqlite_sequence" VALUES({_quote_value(table_name)},{seq_value})' for table_name, seq_value in rows.fetchall() ] continue elif table_name == 'sqlite_stat1': yield('ANALYZE "sqlite_master";') elif table_name.startswith('sqlite_'): continue elif sql.startswith('CREATE VIRTUAL TABLE'): if not writeable_schema: writeable_schema = True yield('PRAGMA writable_schema=ON;') yield("INSERT INTO sqlite_master(type,name,tbl_name,rootpage,sql)" "VALUES('table',{0},{0},0,{1});".format( _quote_value(table_name), _quote_value(sql), )) else: yield('{0};'.format(sql)) # Build the insert statement for each row of the current table table_name_ident = _quote_name(table_name) res = cu.execute(f'PRAGMA table_info({table_name_ident})') column_names = [str(table_info[1]) for table_info in res.fetchall()] q = "SELECT 'INSERT INTO {0} VALUES('{1}')' FROM {0};".format( table_name_ident, "','".join( "||quote({0})||".format(_quote_name(col)) for col in column_names ) ) query_res = cu.execute(q) for row in query_res: yield("{0};".format(row[0])) # Now when the type is 'index', 'trigger', or 'view' q = f""" SELECT "name", "type", "sql" FROM "sqlite_master" WHERE "sql" NOT NULL AND "type" IN ('index', 'trigger', 'view') {filter_name_clause} """ schema_res = cu.execute(q, params) for name, type, sql in schema_res.fetchall(): yield('{0};'.format(sql)) if writeable_schema: yield('PRAGMA writable_schema=OFF;') # gh-79009: Yield statements concerning the sqlite_sequence table at the # end of the transaction. for row in sqlite_sequence: yield('{0};'.format(row)) yield('COMMIT;')