Further integration of the documentation for the sqlite3 module. There's still

quite some content to move over from the pysqlite manual, but it's a start now.
This commit is contained in:
Gerhard Häring 2006-05-01 15:14:48 +00:00
parent e3c958c33b
commit 82560ebb8d
30 changed files with 654 additions and 3 deletions

View File

@ -86,8 +86,8 @@ int, long, float, str (UTF-8 encoded), unicode or buffer.
A \class{Connection} instance has the following attributes and methods:
\begin{memberdesc}{isolation_level}
Get or set the current isolation level. None for autocommit mode or one
of "DEFERRED", "IMMEDIATE" or "EXLUSIVE". See `5. Controlling
Get or set the current isolation level. None for autocommit mode or one
of "DEFERRED", "IMMEDIATE" or "EXLUSIVE". See `5. Controlling
Transactions`_ for a more detailed explanation.
\end{memberdesc}
@ -96,4 +96,136 @@ A \class{Connection} instance has the following attributes and methods:
This is a custom cursor class which must extend \class{sqlite3.Cursor}.
\end{methoddesc}
TODO: execute*
\begin{methoddesc}{execute}{sql, \optional{parameters}}
This is a nonstandard shortcut that creates an intermediate cursor object by
calling the cursor method, then calls the cursor's execute method with the
parameters given.
\end{methoddesc}
\begin{methoddesc}{executemany}{sql, \optional{parameters}}
This is a nonstandard shortcut that creates an intermediate cursor object by
calling the cursor method, then calls the cursor's executemany method with the
parameters given.
\end{methoddesc}
\begin{methoddesc}{executescript}{sql_script}
This is a nonstandard shortcut that creates an intermediate cursor object by
calling the cursor method, then calls the cursor's executescript method with the
parameters given.
\end{methoddesc}
\begin{memberdesc}{row_factory}
You can change this attribute to a callable that accepts the cursor and
the original row as tuple and will return the real result row. This
way, you can implement more advanced ways of returning results, like
ones that can also access columns by name.
Example:
\verbatiminput{sqlite3/row_factory.py}
If the standard tuple types don't suffice for you, and you want name-based
access to columns, you should consider setting \member{row_factory} to the
highly-optimized pysqlite2.dbapi2.Row type. It provides both
index-based and case-insensitive name-based access to columns with almost
no memory overhead. Much better than your own custom dictionary-based
approach or even a db_row based solution.
\end{memberdesc}
\begin{memberdesc}{text_factory}
Using this attribute you can control what objects pysqlite returns for the
TEXT data type. By default, this attribute is set to ``unicode`` and
pysqlite will return Unicode objects for TEXT. If you want to return
bytestrings instead, you can set it to ``str``.
For efficiency reasons, there's also a way to return Unicode objects only
for non-ASCII data, and bytestrings otherwise. To activate it, set this
attribute to ``pysqlite2.dbapi2.OptimizedUnicode``.
You can also set it to any other callable that accepts a single bytestring
parameter and returns the result object.
See the following example code for illustration:
\verbatiminput{sqlite3/text_factory.py}
\end{memberdesc}
\begin{memberdesc}{total_changes}
Returns the total number of database rows that have be modified, inserted,
or deleted since the database connection was opened.
\end{memberdesc}
\subsection{Cursor Objects \label{Cursor-Objects}}
A \class{Cursor} instance has the following attributes and methods:
\begin{methoddesc}{execute}{sql, \optional{parameters}}
Executes a SQL statement. The SQL statement may be parametrized (i. e.
placeholders instead of SQL literals). The sqlite3 module supports two kinds of
placeholders: question marks (qmark style) and named placeholders (named
style).
This example shows how to use parameters with qmark style:
\verbatiminput{sqlite3/execute_1.py}
This example shows how to use the named style:
\verbatiminput{sqlite3/execute_2.py}
\method{execute} will only execute a single SQL statement. If you try to
execute more than one statement with it, it will raise a Warning. Use
\method{executescript} if want to execute multiple SQL statements with one
call.
\end{methoddesc}
\begin{methoddesc}{executemany}{sql, seq_of_parameters}
Executes a SQL command against all parameter sequences or mappings found in the
sequence \var{sql}. The \module{sqlite3} module also allows
to use an iterator yielding parameters instead of a sequence.
\verbatiminput{sqlite3/executemany_1.py}
Here's a shorter example using a generator:
\verbatiminput{sqlite3/executemany_2.py}
\end{methoddesc}
\begin{methoddesc}{executescript}{sql_script}
This is a nonstandard convenience method for executing multiple SQL statements
at once. It issues a COMMIT statement before, then executes the SQL script it
gets as a parameter.
\var{sql_script} can be a bytestring or a Unicode string.
Example:
\verbatiminput{sqlite3/executescript.py}
\end{methoddesc}
\begin{memberdesc}{rowcount}
Although the Cursors of the \module{sqlite3} module implement this
attribute, the database engine's own support for the determination of "rows
affected"/"rows selected" is quirky.
For \code{SELECT} statements, \member{rowcount} is always None because we cannot
determine the number of rows a query produced until all rows were fetched.
For \code{DELETE} statements, SQLite reports \member{rowcount} as 0 if you make a
\code{DELETE FROM table} without any condition.
For \method{executemany} statements, pysqlite sums up the number of
modifications into \member{rowcount}.
As required by the Python DB API Spec, the \member{rowcount} attribute "is -1
in case no executeXX() has been performed on the cursor or the rowcount
of the last operation is not determinable by the interface".
\end{memberdesc}

View File

@ -0,0 +1,14 @@
import sqlite3
import datetime, time
def adapt_datetime(ts):
return time.mktime(ts.timetuple())
sqlite3.register_adapter(datetime.datetime, adapt_datetime)
con = sqlite3.connect(":memory:")
cur = con.cursor()
now = datetime.datetime.now()
cur.execute("select ?", (now,))
print cur.fetchone()[0]

View File

@ -0,0 +1,17 @@
import sqlite3
class Point(object):
def __init__(self, x, y):
self.x, self.y = x, y
def __conform__(self, protocol):
if protocol is sqlite3.PrepareProtocol:
return "%f;%f" % (self.x, self.y)
con = sqlite3.connect(":memory:")
cur = con.cursor()
p = Point(4.0, -3.2)
cur.execute("select ?", (p,))
print cur.fetchone()[0]

View File

@ -0,0 +1,18 @@
import sqlite3
class Point(object):
def __init__(self, x, y):
self.x, self.y = x, y
def adapt_point(point):
return "%f;%f" % (point.x, point.y)
sqlite3.register_adapter(Point, adapt_point)
con = sqlite3.connect(":memory:")
cur = con.cursor()
p = Point(4.0, -3.2)
cur.execute("select ?", (p,))
print cur.fetchone()[0]

View File

@ -0,0 +1,15 @@
import sqlite3
def collate_reverse(string1, string2):
return -cmp(string1, string2)
con = sqlite3.connect(":memory:")
con.create_collation("reverse", collate_reverse)
cur = con.cursor()
cur.execute("create table test(x)")
cur.executemany("insert into test(x) values (?)", [("a",), ("b",)])
cur.execute("select x from test order by x collate reverse")
for row in cur:
print row
con.close()

View File

@ -0,0 +1,30 @@
# A minimal SQLite shell for experiments
import sqlite3
con = sqlite3.connect(":memory:")
con.isolation_level = None
cur = con.cursor()
buffer = ""
print "Enter your SQL commands to execute in sqlite3."
print "Enter a blank line to exit."
while True:
line = raw_input()
if line == "":
break
buffer += line
if sqlite3.complete_statement(buffer):
try:
buffer = buffer.strip()
cur.execute(buffer)
if buffer.lstrip().upper().startswith("SELECT"):
print cur.fetchall()
except sqlite3.Error, e:
print "An error occured:", e.args[0]
buffer = ""
con.close()

View File

@ -0,0 +1,3 @@
import sqlite3
con = sqlite3.connect("mydb")

View File

@ -0,0 +1,3 @@
import sqlite3
con = sqlite3.connect(":memory:")

View File

@ -0,0 +1,47 @@
import sqlite3
class Point(object):
def __init__(self, x, y):
self.x, self.y = x, y
def __repr__(self):
return "(%f;%f)" % (self.x, self.y)
def adapt_point(point):
return "%f;%f" % (point.x, point.y)
def convert_point(s):
x, y = map(float, s.split(";"))
return Point(x, y)
# Register the adapter
sqlite3.register_adapter(Point, adapt_point)
# Register the converter
sqlite3.register_converter("point", convert_point)
p = Point(4.0, -3.2)
#########################
# 1) Using declared types
con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES)
cur = con.cursor()
cur.execute("create table test(p point)")
cur.execute("insert into test(p) values (?)", (p,))
cur.execute("select p from test")
print "with declared types:", cur.fetchone()[0]
cur.close()
con.close()
#######################
# 1) Using column names
con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_COLNAMES)
cur = con.cursor()
cur.execute("create table test(p)")
cur.execute("insert into test(p) values (?)", (p,))
cur.execute('select p as "p [point]" from test')
print "with column names:", cur.fetchone()[0]
cur.close()
con.close()

View File

@ -0,0 +1,15 @@
import sqlite3
class CountCursorsConnection(sqlite3.Connection):
def __init__(self, *args, **kwargs):
sqlite3.Connection.__init__(self, *args, **kwargs)
self.numcursors = 0
def cursor(self, *args, **kwargs):
self.numcursors += 1
return sqlite3.Connection.cursor(self, *args, **kwargs)
con = sqlite3.connect(":memory:", factory=CountCursorsConnection)
cur1 = con.cursor()
cur2 = con.cursor()
print con.numcursors

View File

@ -0,0 +1,28 @@
# Not referenced from the documentation, but builds the database file the other
# code snippets expect.
import sqlite3
import os
DB_FILE = "mydb"
if os.path.exists(DB_FILE):
os.remove(DB_FILE)
con = sqlite3.connect(DB_FILE)
cur = con.cursor()
cur.execute("""
create table people
(
name_last varchar(20),
age integer
)
""")
cur.execute("insert into people (name_last, age) values ('Yeltsin', 72)")
cur.execute("insert into people (name_last, age) values ('Putin', 51)")
con.commit()
cur.close()
con.close()

View File

@ -0,0 +1,17 @@
import sqlite3
con = sqlite3.connect("mydb")
cur = con.cursor()
SELECT = "select name_last, age from people order by age, name_last"
# 1. Iterate over the rows available from the cursor, unpacking the
# resulting sequences to yield their elements (name_last, age):
cur.execute(SELECT)
for (name_last, age) in cur:
print '%s is %d years old.' % (name_last, age)
# 2. Equivalently:
cur.execute(SELECT)
for row in cur:
print '%s is %d years old.' % (row[0], row[1])

View File

@ -0,0 +1,13 @@
import sqlite3
# Create a connection to the database file "mydb":
con = sqlite3.connect("mydb")
# Get a Cursor object that operates in the context of Connection con:
cur = con.cursor()
# Execute the SELECT statement:
cur.execute("select * from people order by age")
# Retrieve all rows as a sequence and print that sequence:
print cur.fetchall()

View File

@ -0,0 +1,11 @@
import sqlite3
con = sqlite3.connect("mydb")
cur = con.cursor()
who = "Yeltsin"
age = 72
cur.execute("select name_last, age from people where name_last=? and age=?", (who, age))
print cur.fetchone()

View File

@ -0,0 +1,13 @@
import sqlite3
con = sqlite3.connect("mydb")
cur = con.cursor()
who = "Yeltsin"
age = 72
cur.execute("select name_last, age from people where name_last=:who and age=:age",
{"who": who, "age": age})
print cur.fetchone()

View File

@ -0,0 +1,14 @@
import sqlite3
con = sqlite3.connect("mydb")
cur = con.cursor()
who = "Yeltsin"
age = 72
cur.execute("select name_last, age from people where name_last=:who and age=:age",
locals())
print cur.fetchone()

View File

@ -0,0 +1,24 @@
import sqlite3
class IterChars:
def __init__(self):
self.count = ord('a')
def __iter__(self):
return self
def next(self):
if self.count > ord('z'):
raise StopIteration
self.count += 1
return (chr(self.count - 1),) # this is a 1-tuple
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("create table characters(c)")
theIter = IterChars()
cur.executemany("insert into characters(c) values (?)", theIter)
cur.execute("select c from characters")
print cur.fetchall()

View File

@ -0,0 +1,15 @@
import sqlite3
def char_generator():
import string
for c in string.letters[:26]:
yield (c,)
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("create table characters(c)")
cur.executemany("insert into characters(c) values (?)", char_generator())
cur.execute("select c from characters")
print cur.fetchall()

View File

@ -0,0 +1,24 @@
import sqlite3
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.executescript("""
create table person(
firstname,
lastname,
age
);
create table book(
title,
author,
published
);
insert into book(title, author, published)
values (
'Dirk Gently''s Holistic Detective Agency
'Douglas Adams',
1987
);
""")

View File

@ -0,0 +1,17 @@
import sqlite3
con = sqlite3.connect("mydb")
cur = con.cursor()
newPeople = (
('Lebed' , 53),
('Zhirinovsky' , 57),
)
for person in newPeople:
cur.execute("insert into people (name_last, age) values (?, ?)", person)
# The changes will not be saved unless the transaction is committed explicitly:
con.commit()

View File

@ -0,0 +1,11 @@
import sqlite3
import md5
def md5sum(t):
return md5.md5(t).hexdigest()
con = sqlite3.connect(":memory:")
con.create_function("md5", 1, md5sum)
cur = con.cursor()
cur.execute("select md5(?)", ("foo",))
print cur.fetchone()[0]

View File

@ -0,0 +1,20 @@
import sqlite3
class MySum:
def __init__(self):
self.count = 0
def step(self, value):
self.count += value
def finalize(self):
return self.count
con = sqlite3.connect(":memory:")
con.create_aggregate("mysum", 1, MySum)
cur = con.cursor()
cur.execute("create table test(i)")
cur.execute("insert into test(i) values (1)")
cur.execute("insert into test(i) values (2)")
cur.execute("select mysum(i) from test")
print cur.fetchone()[0]

View File

@ -0,0 +1,8 @@
import sqlite3
import datetime
con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_COLNAMES)
cur = con.cursor()
cur.execute('select ? as "x [timestamp]"', (datetime.datetime.now(),))
dt = cur.fetchone()[0]
print dt, type(dt)

View File

@ -0,0 +1,20 @@
import sqlite3
import datetime
con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
cur = con.cursor()
cur.execute("create table test(d date, ts timestamp)")
today = datetime.date.today()
now = datetime.datetime.now()
cur.execute("insert into test(d, ts) values (?, ?)", (today, now))
cur.execute("select d, ts from test")
row = cur.fetchone()
print today, "=>", row[0], type(row[0])
print now, "=>", row[1], type(row[1])
cur.execute('select current_date as "d [date]", current_timestamp as "ts [timestamp]"')
row = cur.fetchone()
print "current_date", row[0], type(row[0])
print "current_timestamp", row[1], type(row[1])

View File

@ -0,0 +1,13 @@
import sqlite3
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
con = sqlite3.connect(":memory:")
con.row_factory = dict_factory
cur = con.cursor()
cur.execute("select 1 as a")
print cur.fetchone()["a"]

View File

@ -0,0 +1,12 @@
import sqlite3
con = sqlite3.connect("mydb")
con.row_factory = sqlite3.Row
cur = con.cursor()
cur.execute("select name_last, age from people")
for row in cur:
assert row[0] == row["name_last"]
assert row["name_last"] == row["nAmE_lAsT"]
assert row[1] == row["age"]
assert row[1] == row["AgE"]

View File

@ -0,0 +1,6 @@
import sqlite3
# The shared cache is only available in SQLite versions 3.3.3 or later
# See the SQLite documentaton for details.
sqlite3.enable_shared_cache(True)

View File

@ -0,0 +1,22 @@
import sqlite3
persons = [
("Hugo", "Boss"),
("Calvin", "Klein")
]
con = sqlite3.connect(":memory:")
# Create the table
con.execute("create table person(firstname, lastname)")
# Fill the table
con.executemany("insert into person(firstname, lastname) values (?, ?)", persons)
# Print the table contents
for row in con.execute("select firstname, lastname from person"):
print row
# Using a dummy WHERE clause to not let SQLite take the shortcut table deletes.
print "I just deleted", con.execute("delete from person where 1=1").rowcount, "rows"

View File

@ -0,0 +1,26 @@
import sqlite3
FIELD_MAX_WIDTH = 20
TABLE_NAME = 'people'
SELECT = 'select * from %s order by age, name_last' % TABLE_NAME
con = sqlite3.connect("mydb")
cur = con.cursor()
cur.execute(SELECT)
# Print a header.
for fieldDesc in cur.description:
print fieldDesc[0].ljust(FIELD_MAX_WIDTH) ,
print # Finish the header with a newline.
print '-' * 78
# For each row, print the value of each field left-justified within
# the maximum possible width of that field.
fieldIndices = range(len(cur.description))
for row in cur:
for fieldIndex in fieldIndices:
fieldValue = str(row[fieldIndex])
print fieldValue.ljust(FIELD_MAX_WIDTH) ,
print # Finish the row with a newline.

View File

@ -0,0 +1,43 @@
import sqlite3
con = sqlite3.connect(":memory:")
cur = con.cursor()
# Create the table
con.execute("create table person(lastname, firstname)")
AUSTRIA = u"\xd6sterreich"
# by default, rows are returned as Unicode
cur.execute("select ?", (AUSTRIA,))
row = cur.fetchone()
assert row[0] == AUSTRIA
# but we can make pysqlite always return bytestrings ...
con.text_factory = str
cur.execute("select ?", (AUSTRIA,))
row = cur.fetchone()
assert type(row[0]) == str
# the bytestrings will be encoded in UTF-8, unless you stored garbage in the
# database ...
assert row[0] == AUSTRIA.encode("utf-8")
# we can also implement a custom text_factory ...
# here we implement one that will ignore Unicode characters that cannot be
# decoded from UTF-8
con.text_factory = lambda x: unicode(x, "utf-8", "ignore")
cur.execute("select ?", ("this is latin1 and would normally create errors" + u"\xe4\xf6\xfc".encode("latin1"),))
row = cur.fetchone()
assert type(row[0]) == unicode
# pysqlite offers a builtin optimized text_factory that will return bytestring
# objects, if the data is in ASCII only, and otherwise return unicode objects
con.text_factory = sqlite3.OptimizedUnicode
cur.execute("select ?", (AUSTRIA,))
row = cur.fetchone()
assert type(row[0]) == unicode
cur.execute("select ?", ("Germany",))
row = cur.fetchone()
assert type(row[0]) == str