cpython/Lib/test/test_contextlib.py

369 lines
10 KiB
Python
Raw Normal View History

"""Unit tests for contextlib.py, and other context managers."""
from __future__ import with_statement
import sys
import os
import decimal
import tempfile
import unittest
import threading
from contextlib import * # Tests __all__
from test.test_support import run_suite
class ContextManagerTestCase(unittest.TestCase):
def test_contextfactory_plain(self):
state = []
@contextfactory
def woohoo():
state.append(1)
yield 42
state.append(999)
with woohoo() as x:
self.assertEqual(state, [1])
self.assertEqual(x, 42)
state.append(x)
self.assertEqual(state, [1, 42, 999])
def test_contextfactory_finally(self):
state = []
@contextfactory
def woohoo():
state.append(1)
try:
yield 42
finally:
state.append(999)
try:
with woohoo() as x:
self.assertEqual(state, [1])
self.assertEqual(x, 42)
state.append(x)
raise ZeroDivisionError()
except ZeroDivisionError:
pass
else:
self.fail("Expected ZeroDivisionError")
self.assertEqual(state, [1, 42, 999])
def test_contextfactory_no_reraise(self):
@contextfactory
def whee():
yield
ctx = whee().__context__()
ctx.__enter__()
# Calling __exit__ should not result in an exception
self.failIf(ctx.__exit__(TypeError, TypeError("foo"), None))
def test_contextfactory_trap_yield_after_throw(self):
@contextfactory
def whoo():
try:
yield
except:
yield
ctx = whoo().__context__()
ctx.__enter__()
self.assertRaises(
RuntimeError, ctx.__exit__, TypeError, TypeError("foo"), None
)
def test_contextfactory_except(self):
state = []
@contextfactory
def woohoo():
state.append(1)
try:
yield 42
except ZeroDivisionError, e:
state.append(e.args[0])
self.assertEqual(state, [1, 42, 999])
with woohoo() as x:
self.assertEqual(state, [1])
self.assertEqual(x, 42)
state.append(x)
raise ZeroDivisionError(999)
self.assertEqual(state, [1, 42, 999])
def test_contextfactory_attribs(self):
def attribs(**kw):
def decorate(func):
for k,v in kw.items():
setattr(func,k,v)
return func
return decorate
@contextfactory
@attribs(foo='bar')
def baz(spam):
"""Whee!"""
self.assertEqual(baz.__name__,'baz')
self.assertEqual(baz.foo, 'bar')
self.assertEqual(baz.__doc__, "Whee!")
class NestedTestCase(unittest.TestCase):
# XXX This needs more work
def test_nested(self):
@contextfactory
def a():
yield 1
@contextfactory
def b():
yield 2
@contextfactory
def c():
yield 3
with nested(a(), b(), c()) as (x, y, z):
self.assertEqual(x, 1)
self.assertEqual(y, 2)
self.assertEqual(z, 3)
def test_nested_cleanup(self):
state = []
@contextfactory
def a():
state.append(1)
try:
yield 2
finally:
state.append(3)
@contextfactory
def b():
state.append(4)
try:
yield 5
finally:
state.append(6)
try:
with nested(a(), b()) as (x, y):
state.append(x)
state.append(y)
1/0
except ZeroDivisionError:
self.assertEqual(state, [1, 4, 2, 5, 6, 3])
else:
self.fail("Didn't raise ZeroDivisionError")
def test_nested_right_exception(self):
state = []
@contextfactory
def a():
yield 1
class b(object):
def __context__(self):
return self
def __enter__(self):
return 2
def __exit__(self, *exc_info):
try:
raise Exception()
except:
pass
try:
with nested(a(), b()) as (x, y):
1/0
except ZeroDivisionError:
self.assertEqual((x, y), (1, 2))
except Exception:
self.fail("Reraised wrong exception")
else:
self.fail("Didn't raise ZeroDivisionError")
def test_nested_b_swallows(self):
@contextfactory
def a():
yield
@contextfactory
def b():
try:
yield
except:
# Swallow the exception
pass
try:
with nested(a(), b()):
1/0
except ZeroDivisionError:
self.fail("Didn't swallow ZeroDivisionError")
def test_nested_break(self):
@contextfactory
def a():
yield
state = 0
while True:
state += 1
with nested(a(), a()):
break
state += 10
self.assertEqual(state, 1)
def test_nested_continue(self):
@contextfactory
def a():
yield
state = 0
while state < 3:
state += 1
with nested(a(), a()):
continue
state += 10
self.assertEqual(state, 3)
def test_nested_return(self):
@contextfactory
def a():
try:
yield
except:
pass
def foo():
with nested(a(), a()):
return 1
return 10
self.assertEqual(foo(), 1)
class ClosingTestCase(unittest.TestCase):
# XXX This needs more work
def test_closing(self):
state = []
class C:
def close(self):
state.append(1)
x = C()
self.assertEqual(state, [])
with closing(x) as y:
self.assertEqual(x, y)
self.assertEqual(state, [1])
def test_closing_error(self):
state = []
class C:
def close(self):
state.append(1)
x = C()
self.assertEqual(state, [])
try:
with closing(x) as y:
self.assertEqual(x, y)
1/0
except ZeroDivisionError:
self.assertEqual(state, [1])
else:
self.fail("Didn't raise ZeroDivisionError")
class FileContextTestCase(unittest.TestCase):
def testWithOpen(self):
tfn = tempfile.mktemp()
try:
f = None
with open(tfn, "w") as f:
self.failIf(f.closed)
f.write("Booh\n")
self.failUnless(f.closed)
f = None
try:
with open(tfn, "r") as f:
self.failIf(f.closed)
self.assertEqual(f.read(), "Booh\n")
1/0
except ZeroDivisionError:
self.failUnless(f.closed)
else:
self.fail("Didn't raise ZeroDivisionError")
finally:
try:
os.remove(tfn)
except os.error:
pass
class LockContextTestCase(unittest.TestCase):
def boilerPlate(self, lock, locked):
self.failIf(locked())
with lock:
self.failUnless(locked())
self.failIf(locked())
try:
with lock:
self.failUnless(locked())
1/0
except ZeroDivisionError:
self.failIf(locked())
else:
self.fail("Didn't raise ZeroDivisionError")
def testWithLock(self):
lock = threading.Lock()
self.boilerPlate(lock, lock.locked)
def testWithRLock(self):
lock = threading.RLock()
self.boilerPlate(lock, lock._is_owned)
def testWithCondition(self):
lock = threading.Condition()
def locked():
return lock._is_owned()
self.boilerPlate(lock, locked)
def testWithSemaphore(self):
lock = threading.Semaphore()
def locked():
if lock.acquire(False):
lock.release()
return False
else:
return True
self.boilerPlate(lock, locked)
def testWithBoundedSemaphore(self):
lock = threading.BoundedSemaphore()
def locked():
if lock.acquire(False):
lock.release()
return False
else:
return True
self.boilerPlate(lock, locked)
class DecimalContextTestCase(unittest.TestCase):
# XXX Somebody should write more thorough tests for this
def testBasic(self):
ctx = decimal.getcontext()
orig_context = ctx.copy()
try:
ctx.prec = save_prec = decimal.ExtendedContext.prec + 5
with decimal.ExtendedContext:
self.assertEqual(decimal.getcontext().prec,
decimal.ExtendedContext.prec)
self.assertEqual(decimal.getcontext().prec, save_prec)
try:
with decimal.ExtendedContext:
self.assertEqual(decimal.getcontext().prec,
decimal.ExtendedContext.prec)
1/0
except ZeroDivisionError:
self.assertEqual(decimal.getcontext().prec, save_prec)
else:
self.fail("Didn't raise ZeroDivisionError")
finally:
decimal.setcontext(orig_context)
# This is needed to make the test actually run under regrtest.py!
def test_main():
run_suite(
unittest.defaultTestLoader.loadTestsFromModule(sys.modules[__name__])
)
if __name__ == "__main__":
test_main()