Issue #14373: Fixed threaded test for lru_cache(). Added new threaded test.

This commit is contained in:
Serhiy Storchaka 2015-06-08 11:14:31 +03:00
parent 93cfeb93ac
commit 77cb197aaa
1 changed files with 47 additions and 14 deletions

View File

@ -1102,43 +1102,76 @@ class TestLRU:
@unittest.skipUnless(threading, 'This test requires threading.')
def test_lru_cache_threaded(self):
n, m = 5, 11
def orig(x, y):
return 3 * x + y
f = self.module.lru_cache(maxsize=20)(orig)
f = self.module.lru_cache(maxsize=n*m)(orig)
hits, misses, maxsize, currsize = f.cache_info()
self.assertEqual(currsize, 0)
start = threading.Event()
def full(f, *args):
for _ in range(10):
start.wait(10)
for _ in range(m):
f(*args)
def clear(f):
for _ in range(10):
def clear():
start.wait(10)
for _ in range(2*m):
f.cache_clear()
orig_si = sys.getswitchinterval()
sys.setswitchinterval(1e-6)
try:
# create 5 threads in order to fill cache
# create n threads in order to fill cache
threads = [threading.Thread(target=full, args=[f, k, k])
for k in range(5)]
for k in range(n)]
with support.start_threads(threads):
pass
start.set()
hits, misses, maxsize, currsize = f.cache_info()
self.assertEqual(hits, 45)
self.assertEqual(misses, 5)
self.assertEqual(currsize, 5)
self.assertLessEqual(misses, n)
self.assertEqual(hits, m*n - misses)
self.assertEqual(currsize, n)
# create 5 threads in order to fill cache and 1 to clear it
threads = [threading.Thread(target=clear, args=[f])]
# create n threads in order to fill cache and 1 to clear it
threads = [threading.Thread(target=clear)]
threads += [threading.Thread(target=full, args=[f, k, k])
for k in range(5)]
for k in range(n)]
start.clear()
with support.start_threads(threads):
pass
start.set()
finally:
sys.setswitchinterval(orig_si)
@unittest.skipUnless(threading, 'This test requires threading.')
def test_lru_cache_threaded2(self):
# Simultaneous call with the same arguments
n, m = 5, 7
start = threading.Barrier(n+1)
pause = threading.Barrier(n+1)
stop = threading.Barrier(n+1)
@self.module.lru_cache(maxsize=m*n)
def f(x):
pause.wait(10)
return 3 * x
self.assertEqual(f.cache_info(), (0, 0, m*n, 0))
def test():
for i in range(m):
start.wait(10)
self.assertEqual(f(i), 3 * i)
stop.wait(10)
threads = [threading.Thread(target=test) for k in range(n)]
with support.start_threads(threads):
for i in range(m):
start.wait(10)
stop.reset()
pause.wait(10)
start.reset()
stop.wait(10)
pause.reset()
self.assertEqual(f.cache_info(), (0, (i+1)*n, m*n, i+1))
def test_need_for_rlock(self):
# This will deadlock on an LRU cache that uses a regular lock