cpython/Lib/test/test_interpreters/test_queues.py

479 lines
15 KiB
Python

import importlib
import pickle
import threading
from textwrap import dedent
import unittest
import time
from test.support import import_helper, Py_DEBUG
# Raise SkipTest if subinterpreters not supported.
_queues = import_helper.import_module('_interpqueues')
from test.support import interpreters
from test.support.interpreters import queues
from .utils import _run_output, TestBase as _TestBase
def get_num_queues():
return len(_queues.list_all())
class TestBase(_TestBase):
def tearDown(self):
for qid, _ in _queues.list_all():
try:
_queues.destroy(qid)
except Exception:
pass
class LowLevelTests(TestBase):
# The behaviors in the low-level module are important in as much
# as they are exercised by the high-level module. Therefore the
# most important testing happens in the high-level tests.
# These low-level tests cover corner cases that are not
# encountered by the high-level module, thus they
# mostly shouldn't matter as much.
def test_highlevel_reloaded(self):
# See gh-115490 (https://github.com/python/cpython/issues/115490).
importlib.reload(queues)
def test_create_destroy(self):
qid = _queues.create(2, 0)
_queues.destroy(qid)
self.assertEqual(get_num_queues(), 0)
with self.assertRaises(queues.QueueNotFoundError):
_queues.get(qid)
with self.assertRaises(queues.QueueNotFoundError):
_queues.destroy(qid)
def test_not_destroyed(self):
# It should have cleaned up any remaining queues.
stdout, stderr = self.assert_python_ok(
'-c',
dedent(f"""
import {_queues.__name__} as _queues
_queues.create(2, 0)
"""),
)
self.assertEqual(stdout, '')
if Py_DEBUG:
self.assertNotEqual(stderr, '')
else:
self.assertEqual(stderr, '')
def test_bind_release(self):
with self.subTest('typical'):
qid = _queues.create(2, 0)
_queues.bind(qid)
_queues.release(qid)
self.assertEqual(get_num_queues(), 0)
with self.subTest('bind too much'):
qid = _queues.create(2, 0)
_queues.bind(qid)
_queues.bind(qid)
_queues.release(qid)
_queues.destroy(qid)
self.assertEqual(get_num_queues(), 0)
with self.subTest('nested'):
qid = _queues.create(2, 0)
_queues.bind(qid)
_queues.bind(qid)
_queues.release(qid)
_queues.release(qid)
self.assertEqual(get_num_queues(), 0)
with self.subTest('release without binding'):
qid = _queues.create(2, 0)
with self.assertRaises(queues.QueueError):
_queues.release(qid)
class QueueTests(TestBase):
def test_create(self):
with self.subTest('vanilla'):
queue = queues.create()
self.assertEqual(queue.maxsize, 0)
with self.subTest('small maxsize'):
queue = queues.create(3)
self.assertEqual(queue.maxsize, 3)
with self.subTest('big maxsize'):
queue = queues.create(100)
self.assertEqual(queue.maxsize, 100)
with self.subTest('no maxsize'):
queue = queues.create(0)
self.assertEqual(queue.maxsize, 0)
with self.subTest('negative maxsize'):
queue = queues.create(-10)
self.assertEqual(queue.maxsize, -10)
with self.subTest('bad maxsize'):
with self.assertRaises(TypeError):
queues.create('1')
def test_shareable(self):
queue1 = queues.create()
interp = interpreters.create()
interp.exec(dedent(f"""
from test.support.interpreters import queues
queue1 = queues.Queue({queue1.id})
"""));
with self.subTest('same interpreter'):
queue2 = queues.create()
queue1.put(queue2, syncobj=True)
queue3 = queue1.get()
self.assertIs(queue3, queue2)
with self.subTest('from current interpreter'):
queue4 = queues.create()
queue1.put(queue4, syncobj=True)
out = _run_output(interp, dedent("""
queue4 = queue1.get()
print(queue4.id)
"""))
qid = int(out)
self.assertEqual(qid, queue4.id)
with self.subTest('from subinterpreter'):
out = _run_output(interp, dedent("""
queue5 = queues.create()
queue1.put(queue5, syncobj=True)
print(queue5.id)
"""))
qid = int(out)
queue5 = queue1.get()
self.assertEqual(queue5.id, qid)
def test_id_type(self):
queue = queues.create()
self.assertIsInstance(queue.id, int)
def test_custom_id(self):
with self.assertRaises(queues.QueueNotFoundError):
queues.Queue(1_000_000)
def test_id_readonly(self):
queue = queues.create()
with self.assertRaises(AttributeError):
queue.id = 1_000_000
def test_maxsize_readonly(self):
queue = queues.create(10)
with self.assertRaises(AttributeError):
queue.maxsize = 1_000_000
def test_hashable(self):
queue = queues.create()
expected = hash(queue.id)
actual = hash(queue)
self.assertEqual(actual, expected)
def test_equality(self):
queue1 = queues.create()
queue2 = queues.create()
self.assertEqual(queue1, queue1)
self.assertNotEqual(queue1, queue2)
def test_pickle(self):
queue = queues.create()
data = pickle.dumps(queue)
unpickled = pickle.loads(data)
self.assertEqual(unpickled, queue)
class TestQueueOps(TestBase):
def test_empty(self):
queue = queues.create()
before = queue.empty()
queue.put(None, syncobj=True)
during = queue.empty()
queue.get()
after = queue.empty()
self.assertIs(before, True)
self.assertIs(during, False)
self.assertIs(after, True)
def test_full(self):
expected = [False, False, False, True, False, False, False]
actual = []
queue = queues.create(3)
for _ in range(3):
actual.append(queue.full())
queue.put(None, syncobj=True)
actual.append(queue.full())
for _ in range(3):
queue.get()
actual.append(queue.full())
self.assertEqual(actual, expected)
def test_qsize(self):
expected = [0, 1, 2, 3, 2, 3, 2, 1, 0, 1, 0]
actual = []
queue = queues.create()
for _ in range(3):
actual.append(queue.qsize())
queue.put(None, syncobj=True)
actual.append(queue.qsize())
queue.get()
actual.append(queue.qsize())
queue.put(None, syncobj=True)
actual.append(queue.qsize())
for _ in range(3):
queue.get()
actual.append(queue.qsize())
queue.put(None, syncobj=True)
actual.append(queue.qsize())
queue.get()
actual.append(queue.qsize())
self.assertEqual(actual, expected)
def test_put_get_main(self):
expected = list(range(20))
for syncobj in (True, False):
kwds = dict(syncobj=syncobj)
with self.subTest(f'syncobj={syncobj}'):
queue = queues.create()
for i in range(20):
queue.put(i, **kwds)
actual = [queue.get() for _ in range(20)]
self.assertEqual(actual, expected)
def test_put_timeout(self):
for syncobj in (True, False):
kwds = dict(syncobj=syncobj)
with self.subTest(f'syncobj={syncobj}'):
queue = queues.create(2)
queue.put(None, **kwds)
queue.put(None, **kwds)
with self.assertRaises(queues.QueueFull):
queue.put(None, timeout=0.1, **kwds)
queue.get()
queue.put(None, **kwds)
def test_put_nowait(self):
for syncobj in (True, False):
kwds = dict(syncobj=syncobj)
with self.subTest(f'syncobj={syncobj}'):
queue = queues.create(2)
queue.put_nowait(None, **kwds)
queue.put_nowait(None, **kwds)
with self.assertRaises(queues.QueueFull):
queue.put_nowait(None, **kwds)
queue.get()
queue.put_nowait(None, **kwds)
def test_put_syncobj(self):
for obj in [
None,
True,
10,
'spam',
b'spam',
(0, 'a'),
]:
with self.subTest(repr(obj)):
queue = queues.create()
queue.put(obj, syncobj=True)
obj2 = queue.get()
self.assertEqual(obj2, obj)
queue.put(obj, syncobj=True)
obj2 = queue.get_nowait()
self.assertEqual(obj2, obj)
for obj in [
[1, 2, 3],
{'a': 13, 'b': 17},
]:
with self.subTest(repr(obj)):
queue = queues.create()
with self.assertRaises(interpreters.NotShareableError):
queue.put(obj, syncobj=True)
def test_put_not_syncobj(self):
for obj in [
None,
True,
10,
'spam',
b'spam',
(0, 'a'),
# not shareable
[1, 2, 3],
{'a': 13, 'b': 17},
]:
with self.subTest(repr(obj)):
queue = queues.create()
queue.put(obj, syncobj=False)
obj2 = queue.get()
self.assertEqual(obj2, obj)
queue.put(obj, syncobj=False)
obj2 = queue.get_nowait()
self.assertEqual(obj2, obj)
def test_get_timeout(self):
queue = queues.create()
with self.assertRaises(queues.QueueEmpty):
queue.get(timeout=0.1)
def test_get_nowait(self):
queue = queues.create()
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
def test_put_get_default_syncobj(self):
expected = list(range(20))
queue = queues.create(syncobj=True)
for methname in ('get', 'get_nowait'):
with self.subTest(f'{methname}()'):
get = getattr(queue, methname)
for i in range(20):
queue.put(i)
actual = [get() for _ in range(20)]
self.assertEqual(actual, expected)
obj = [1, 2, 3] # lists are not shareable
with self.assertRaises(interpreters.NotShareableError):
queue.put(obj)
def test_put_get_default_not_syncobj(self):
expected = list(range(20))
queue = queues.create(syncobj=False)
for methname in ('get', 'get_nowait'):
with self.subTest(f'{methname}()'):
get = getattr(queue, methname)
for i in range(20):
queue.put(i)
actual = [get() for _ in range(20)]
self.assertEqual(actual, expected)
obj = [1, 2, 3] # lists are not shareable
queue.put(obj)
obj2 = get()
self.assertEqual(obj, obj2)
self.assertIsNot(obj, obj2)
def test_put_get_same_interpreter(self):
interp = interpreters.create()
interp.exec(dedent("""
from test.support.interpreters import queues
queue = queues.create()
"""))
for methname in ('get', 'get_nowait'):
with self.subTest(f'{methname}()'):
interp.exec(dedent(f"""
orig = b'spam'
queue.put(orig, syncobj=True)
obj = queue.{methname}()
assert obj == orig, 'expected: obj == orig'
assert obj is not orig, 'expected: obj is not orig'
"""))
def test_put_get_different_interpreters(self):
interp = interpreters.create()
queue1 = queues.create()
queue2 = queues.create()
self.assertEqual(len(queues.list_all()), 2)
for methname in ('get', 'get_nowait'):
with self.subTest(f'{methname}()'):
obj1 = b'spam'
queue1.put(obj1, syncobj=True)
out = _run_output(
interp,
dedent(f"""
from test.support.interpreters import queues
queue1 = queues.Queue({queue1.id})
queue2 = queues.Queue({queue2.id})
assert queue1.qsize() == 1, 'expected: queue1.qsize() == 1'
obj = queue1.{methname}()
assert queue1.qsize() == 0, 'expected: queue1.qsize() == 0'
assert obj == b'spam', 'expected: obj == obj1'
# When going to another interpreter we get a copy.
assert id(obj) != {id(obj1)}, 'expected: obj is not obj1'
obj2 = b'eggs'
print(id(obj2))
assert queue2.qsize() == 0, 'expected: queue2.qsize() == 0'
queue2.put(obj2, syncobj=True)
assert queue2.qsize() == 1, 'expected: queue2.qsize() == 1'
"""))
self.assertEqual(len(queues.list_all()), 2)
self.assertEqual(queue1.qsize(), 0)
self.assertEqual(queue2.qsize(), 1)
get = getattr(queue2, methname)
obj2 = get()
self.assertEqual(obj2, b'eggs')
self.assertNotEqual(id(obj2), int(out))
def test_put_cleared_with_subinterpreter(self):
interp = interpreters.create()
queue = queues.create()
out = _run_output(
interp,
dedent(f"""
from test.support.interpreters import queues
queue = queues.Queue({queue.id})
obj1 = b'spam'
obj2 = b'eggs'
queue.put(obj1, syncobj=True)
queue.put(obj2, syncobj=True)
"""))
self.assertEqual(queue.qsize(), 2)
obj1 = queue.get()
self.assertEqual(obj1, b'spam')
self.assertEqual(queue.qsize(), 1)
del interp
self.assertEqual(queue.qsize(), 0)
def test_put_get_different_threads(self):
queue1 = queues.create()
queue2 = queues.create()
def f():
while True:
try:
obj = queue1.get(timeout=0.1)
break
except queues.QueueEmpty:
continue
queue2.put(obj, syncobj=True)
t = threading.Thread(target=f)
t.start()
orig = b'spam'
queue1.put(orig, syncobj=True)
obj = queue2.get()
t.join()
self.assertEqual(obj, orig)
self.assertIsNot(obj, orig)
if __name__ == '__main__':
# Test needs to be a package, so we can do relative imports.
unittest.main()