mirror of https://github.com/python/cpython
156 lines
5.5 KiB
Python
156 lines
5.5 KiB
Python
"""sqlite3 CLI tests."""
|
|
import sqlite3
|
|
import unittest
|
|
|
|
from sqlite3.__main__ import main as cli
|
|
from test.support.os_helper import TESTFN, unlink
|
|
from test.support import captured_stdout, captured_stderr, captured_stdin
|
|
|
|
|
|
class CommandLineInterface(unittest.TestCase):
|
|
|
|
def _do_test(self, *args, expect_success=True):
|
|
with (
|
|
captured_stdout() as out,
|
|
captured_stderr() as err,
|
|
self.assertRaises(SystemExit) as cm
|
|
):
|
|
cli(args)
|
|
return out.getvalue(), err.getvalue(), cm.exception.code
|
|
|
|
def expect_success(self, *args):
|
|
out, err, code = self._do_test(*args)
|
|
self.assertEqual(code, 0,
|
|
"\n".join([f"Unexpected failure: {args=}", out, err]))
|
|
self.assertEqual(err, "")
|
|
return out
|
|
|
|
def expect_failure(self, *args):
|
|
out, err, code = self._do_test(*args, expect_success=False)
|
|
self.assertNotEqual(code, 0,
|
|
"\n".join([f"Unexpected failure: {args=}", out, err]))
|
|
self.assertEqual(out, "")
|
|
return err
|
|
|
|
def test_cli_help(self):
|
|
out = self.expect_success("-h")
|
|
self.assertIn("usage: python -m sqlite3", out)
|
|
|
|
def test_cli_version(self):
|
|
out = self.expect_success("-v")
|
|
self.assertIn(sqlite3.sqlite_version, out)
|
|
|
|
def test_cli_execute_sql(self):
|
|
out = self.expect_success(":memory:", "select 1")
|
|
self.assertIn("(1,)", out)
|
|
|
|
def test_cli_execute_too_much_sql(self):
|
|
stderr = self.expect_failure(":memory:", "select 1; select 2")
|
|
err = "ProgrammingError: You can only execute one statement at a time"
|
|
self.assertIn(err, stderr)
|
|
|
|
def test_cli_execute_incomplete_sql(self):
|
|
stderr = self.expect_failure(":memory:", "sel")
|
|
self.assertIn("OperationalError (SQLITE_ERROR)", stderr)
|
|
|
|
def test_cli_on_disk_db(self):
|
|
self.addCleanup(unlink, TESTFN)
|
|
out = self.expect_success(TESTFN, "create table t(t)")
|
|
self.assertEqual(out, "")
|
|
out = self.expect_success(TESTFN, "select count(t) from t")
|
|
self.assertIn("(0,)", out)
|
|
|
|
|
|
class InteractiveSession(unittest.TestCase):
|
|
MEMORY_DB_MSG = "Connected to a transient in-memory database"
|
|
PS1 = "sqlite> "
|
|
PS2 = "... "
|
|
|
|
def run_cli(self, *args, commands=()):
|
|
with (
|
|
captured_stdin() as stdin,
|
|
captured_stdout() as stdout,
|
|
captured_stderr() as stderr,
|
|
self.assertRaises(SystemExit) as cm
|
|
):
|
|
for cmd in commands:
|
|
stdin.write(cmd + "\n")
|
|
stdin.seek(0)
|
|
cli(args)
|
|
|
|
out = stdout.getvalue()
|
|
err = stderr.getvalue()
|
|
self.assertEqual(cm.exception.code, 0,
|
|
f"Unexpected failure: {args=}\n{out}\n{err}")
|
|
return out, err
|
|
|
|
def test_interact(self):
|
|
out, err = self.run_cli()
|
|
self.assertIn(self.MEMORY_DB_MSG, err)
|
|
self.assertIn(self.MEMORY_DB_MSG, err)
|
|
self.assertTrue(out.endswith(self.PS1))
|
|
self.assertEqual(out.count(self.PS1), 1)
|
|
self.assertEqual(out.count(self.PS2), 0)
|
|
|
|
def test_interact_quit(self):
|
|
out, err = self.run_cli(commands=(".quit",))
|
|
self.assertIn(self.MEMORY_DB_MSG, err)
|
|
self.assertTrue(out.endswith(self.PS1))
|
|
self.assertEqual(out.count(self.PS1), 1)
|
|
self.assertEqual(out.count(self.PS2), 0)
|
|
|
|
def test_interact_version(self):
|
|
out, err = self.run_cli(commands=(".version",))
|
|
self.assertIn(self.MEMORY_DB_MSG, err)
|
|
self.assertIn(sqlite3.sqlite_version + "\n", out)
|
|
self.assertTrue(out.endswith(self.PS1))
|
|
self.assertEqual(out.count(self.PS1), 2)
|
|
self.assertEqual(out.count(self.PS2), 0)
|
|
self.assertIn(sqlite3.sqlite_version, out)
|
|
|
|
def test_interact_valid_sql(self):
|
|
out, err = self.run_cli(commands=("SELECT 1;",))
|
|
self.assertIn(self.MEMORY_DB_MSG, err)
|
|
self.assertIn("(1,)\n", out)
|
|
self.assertTrue(out.endswith(self.PS1))
|
|
self.assertEqual(out.count(self.PS1), 2)
|
|
self.assertEqual(out.count(self.PS2), 0)
|
|
|
|
def test_interact_incomplete_multiline_sql(self):
|
|
out, err = self.run_cli(commands=("SELECT 1",))
|
|
self.assertIn(self.MEMORY_DB_MSG, err)
|
|
self.assertTrue(out.endswith(self.PS2))
|
|
self.assertEqual(out.count(self.PS1), 1)
|
|
self.assertEqual(out.count(self.PS2), 1)
|
|
|
|
def test_interact_valid_multiline_sql(self):
|
|
out, err = self.run_cli(commands=("SELECT 1\n;",))
|
|
self.assertIn(self.MEMORY_DB_MSG, err)
|
|
self.assertIn(self.PS2, out)
|
|
self.assertIn("(1,)\n", out)
|
|
self.assertTrue(out.endswith(self.PS1))
|
|
self.assertEqual(out.count(self.PS1), 2)
|
|
self.assertEqual(out.count(self.PS2), 1)
|
|
|
|
def test_interact_invalid_sql(self):
|
|
out, err = self.run_cli(commands=("sel;",))
|
|
self.assertIn(self.MEMORY_DB_MSG, err)
|
|
self.assertIn("OperationalError (SQLITE_ERROR)", err)
|
|
self.assertTrue(out.endswith(self.PS1))
|
|
self.assertEqual(out.count(self.PS1), 2)
|
|
self.assertEqual(out.count(self.PS2), 0)
|
|
|
|
def test_interact_on_disk_file(self):
|
|
self.addCleanup(unlink, TESTFN)
|
|
|
|
out, err = self.run_cli(TESTFN, commands=("CREATE TABLE t(t);",))
|
|
self.assertIn(TESTFN, err)
|
|
self.assertTrue(out.endswith(self.PS1))
|
|
|
|
out, _ = self.run_cli(TESTFN, commands=("SELECT count(t) FROM t;",))
|
|
self.assertIn("(0,)\n", out)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|