mirror of https://github.com/python/cpython
601 lines
19 KiB
Python
601 lines
19 KiB
Python
import importlib
|
|
import pickle
|
|
import threading
|
|
from textwrap import dedent
|
|
import unittest
|
|
import time
|
|
|
|
from test.support import import_helper
|
|
# Raise SkipTest if subinterpreters not supported.
|
|
_channels = import_helper.import_module('_interpchannels')
|
|
from test.support import interpreters
|
|
from test.support.interpreters import channels
|
|
from .utils import _run_output, TestBase
|
|
|
|
|
|
class LowLevelTests(TestBase):
|
|
|
|
# The behaviors in the low-level module is important in as much
|
|
# as it is exercised by the high-level module. Therefore the
|
|
# most # important testing happens in the high-level tests.
|
|
# These low-level tests cover corner cases that are not
|
|
# encountered by the high-level module, thus they
|
|
# mostly shouldn't matter as much.
|
|
|
|
# Additional tests are found in Lib/test/test__interpchannels.py.
|
|
# XXX Those should be either moved to LowLevelTests or eliminated
|
|
# in favor of high-level tests in this file.
|
|
|
|
def test_highlevel_reloaded(self):
|
|
# See gh-115490 (https://github.com/python/cpython/issues/115490).
|
|
importlib.reload(channels)
|
|
|
|
|
|
class TestChannels(TestBase):
|
|
|
|
def test_create(self):
|
|
r, s = channels.create()
|
|
self.assertIsInstance(r, channels.RecvChannel)
|
|
self.assertIsInstance(s, channels.SendChannel)
|
|
|
|
def test_list_all(self):
|
|
self.assertEqual(channels.list_all(), [])
|
|
created = set()
|
|
for _ in range(3):
|
|
ch = channels.create()
|
|
created.add(ch)
|
|
after = set(channels.list_all())
|
|
self.assertEqual(after, created)
|
|
|
|
def test_shareable(self):
|
|
interp = interpreters.create()
|
|
rch, sch = channels.create()
|
|
|
|
self.assertTrue(
|
|
interpreters.is_shareable(rch))
|
|
self.assertTrue(
|
|
interpreters.is_shareable(sch))
|
|
|
|
sch.send_nowait(rch)
|
|
sch.send_nowait(sch)
|
|
rch2 = rch.recv()
|
|
sch2 = rch.recv()
|
|
|
|
interp.prepare_main(rch=rch, sch=sch)
|
|
sch.send_nowait(rch)
|
|
sch.send_nowait(sch)
|
|
interp.exec(dedent("""
|
|
rch2 = rch.recv()
|
|
sch2 = rch.recv()
|
|
assert rch2 == rch
|
|
assert sch2 == sch
|
|
|
|
sch.send_nowait(rch2)
|
|
sch.send_nowait(sch2)
|
|
"""))
|
|
rch3 = rch.recv()
|
|
sch3 = rch.recv()
|
|
|
|
self.assertEqual(rch2, rch)
|
|
self.assertEqual(sch2, sch)
|
|
self.assertEqual(rch3, rch)
|
|
self.assertEqual(sch3, sch)
|
|
|
|
def test_is_closed(self):
|
|
rch, sch = channels.create()
|
|
rbefore = rch.is_closed
|
|
sbefore = sch.is_closed
|
|
rch.close()
|
|
rafter = rch.is_closed
|
|
safter = sch.is_closed
|
|
|
|
self.assertFalse(rbefore)
|
|
self.assertFalse(sbefore)
|
|
self.assertTrue(rafter)
|
|
self.assertTrue(safter)
|
|
|
|
|
|
class TestRecvChannelAttrs(TestBase):
|
|
|
|
def test_id_type(self):
|
|
rch, _ = channels.create()
|
|
self.assertIsInstance(rch.id, _channels.ChannelID)
|
|
|
|
def test_custom_id(self):
|
|
rch = channels.RecvChannel(1)
|
|
self.assertEqual(rch.id, 1)
|
|
|
|
with self.assertRaises(TypeError):
|
|
channels.RecvChannel('1')
|
|
|
|
def test_id_readonly(self):
|
|
rch = channels.RecvChannel(1)
|
|
with self.assertRaises(AttributeError):
|
|
rch.id = 2
|
|
|
|
def test_equality(self):
|
|
ch1, _ = channels.create()
|
|
ch2, _ = channels.create()
|
|
self.assertEqual(ch1, ch1)
|
|
self.assertNotEqual(ch1, ch2)
|
|
|
|
def test_pickle(self):
|
|
ch, _ = channels.create()
|
|
data = pickle.dumps(ch)
|
|
unpickled = pickle.loads(data)
|
|
self.assertEqual(unpickled, ch)
|
|
|
|
|
|
class TestSendChannelAttrs(TestBase):
|
|
|
|
def test_id_type(self):
|
|
_, sch = channels.create()
|
|
self.assertIsInstance(sch.id, _channels.ChannelID)
|
|
|
|
def test_custom_id(self):
|
|
sch = channels.SendChannel(1)
|
|
self.assertEqual(sch.id, 1)
|
|
|
|
with self.assertRaises(TypeError):
|
|
channels.SendChannel('1')
|
|
|
|
def test_id_readonly(self):
|
|
sch = channels.SendChannel(1)
|
|
with self.assertRaises(AttributeError):
|
|
sch.id = 2
|
|
|
|
def test_equality(self):
|
|
_, ch1 = channels.create()
|
|
_, ch2 = channels.create()
|
|
self.assertEqual(ch1, ch1)
|
|
self.assertNotEqual(ch1, ch2)
|
|
|
|
def test_pickle(self):
|
|
_, ch = channels.create()
|
|
data = pickle.dumps(ch)
|
|
unpickled = pickle.loads(data)
|
|
self.assertEqual(unpickled, ch)
|
|
|
|
|
|
class TestSendRecv(TestBase):
|
|
|
|
def test_send_recv_main(self):
|
|
r, s = channels.create()
|
|
orig = b'spam'
|
|
s.send_nowait(orig)
|
|
obj = r.recv()
|
|
|
|
self.assertEqual(obj, orig)
|
|
self.assertIsNot(obj, orig)
|
|
|
|
def test_send_recv_same_interpreter(self):
|
|
interp = interpreters.create()
|
|
interp.exec(dedent("""
|
|
from test.support.interpreters import channels
|
|
r, s = channels.create()
|
|
orig = b'spam'
|
|
s.send_nowait(orig)
|
|
obj = r.recv()
|
|
assert obj == orig, 'expected: obj == orig'
|
|
assert obj is not orig, 'expected: obj is not orig'
|
|
"""))
|
|
|
|
@unittest.skip('broken (see BPO-...)')
|
|
def test_send_recv_different_interpreters(self):
|
|
r1, s1 = channels.create()
|
|
r2, s2 = channels.create()
|
|
orig1 = b'spam'
|
|
s1.send_nowait(orig1)
|
|
out = _run_output(
|
|
interpreters.create(),
|
|
dedent(f"""
|
|
obj1 = r.recv()
|
|
assert obj1 == b'spam', 'expected: obj1 == orig1'
|
|
# When going to another interpreter we get a copy.
|
|
assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1'
|
|
orig2 = b'eggs'
|
|
print(id(orig2))
|
|
s.send_nowait(orig2)
|
|
"""),
|
|
channels=dict(r=r1, s=s2),
|
|
)
|
|
obj2 = r2.recv()
|
|
|
|
self.assertEqual(obj2, b'eggs')
|
|
self.assertNotEqual(id(obj2), int(out))
|
|
|
|
def test_send_recv_different_threads(self):
|
|
r, s = channels.create()
|
|
|
|
def f():
|
|
while True:
|
|
try:
|
|
obj = r.recv()
|
|
break
|
|
except channels.ChannelEmptyError:
|
|
time.sleep(0.1)
|
|
s.send(obj)
|
|
t = threading.Thread(target=f)
|
|
t.start()
|
|
|
|
orig = b'spam'
|
|
s.send(orig)
|
|
obj = r.recv()
|
|
t.join()
|
|
|
|
self.assertEqual(obj, orig)
|
|
self.assertIsNot(obj, orig)
|
|
|
|
def test_send_recv_nowait_main(self):
|
|
r, s = channels.create()
|
|
orig = b'spam'
|
|
s.send_nowait(orig)
|
|
obj = r.recv_nowait()
|
|
|
|
self.assertEqual(obj, orig)
|
|
self.assertIsNot(obj, orig)
|
|
|
|
def test_send_recv_nowait_main_with_default(self):
|
|
r, _ = channels.create()
|
|
obj = r.recv_nowait(None)
|
|
|
|
self.assertIsNone(obj)
|
|
|
|
def test_send_recv_nowait_same_interpreter(self):
|
|
interp = interpreters.create()
|
|
interp.exec(dedent("""
|
|
from test.support.interpreters import channels
|
|
r, s = channels.create()
|
|
orig = b'spam'
|
|
s.send_nowait(orig)
|
|
obj = r.recv_nowait()
|
|
assert obj == orig, 'expected: obj == orig'
|
|
# When going back to the same interpreter we get the same object.
|
|
assert obj is not orig, 'expected: obj is not orig'
|
|
"""))
|
|
|
|
@unittest.skip('broken (see BPO-...)')
|
|
def test_send_recv_nowait_different_interpreters(self):
|
|
r1, s1 = channels.create()
|
|
r2, s2 = channels.create()
|
|
orig1 = b'spam'
|
|
s1.send_nowait(orig1)
|
|
out = _run_output(
|
|
interpreters.create(),
|
|
dedent(f"""
|
|
obj1 = r.recv_nowait()
|
|
assert obj1 == b'spam', 'expected: obj1 == orig1'
|
|
# When going to another interpreter we get a copy.
|
|
assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1'
|
|
orig2 = b'eggs'
|
|
print(id(orig2))
|
|
s.send_nowait(orig2)
|
|
"""),
|
|
channels=dict(r=r1, s=s2),
|
|
)
|
|
obj2 = r2.recv_nowait()
|
|
|
|
self.assertEqual(obj2, b'eggs')
|
|
self.assertNotEqual(id(obj2), int(out))
|
|
|
|
def test_recv_timeout(self):
|
|
r, _ = channels.create()
|
|
with self.assertRaises(TimeoutError):
|
|
r.recv(timeout=1)
|
|
|
|
def test_recv_channel_does_not_exist(self):
|
|
ch = channels.RecvChannel(1_000_000)
|
|
with self.assertRaises(channels.ChannelNotFoundError):
|
|
ch.recv()
|
|
|
|
def test_send_channel_does_not_exist(self):
|
|
ch = channels.SendChannel(1_000_000)
|
|
with self.assertRaises(channels.ChannelNotFoundError):
|
|
ch.send(b'spam')
|
|
|
|
def test_recv_nowait_channel_does_not_exist(self):
|
|
ch = channels.RecvChannel(1_000_000)
|
|
with self.assertRaises(channels.ChannelNotFoundError):
|
|
ch.recv_nowait()
|
|
|
|
def test_send_nowait_channel_does_not_exist(self):
|
|
ch = channels.SendChannel(1_000_000)
|
|
with self.assertRaises(channels.ChannelNotFoundError):
|
|
ch.send_nowait(b'spam')
|
|
|
|
def test_recv_nowait_empty(self):
|
|
ch, _ = channels.create()
|
|
with self.assertRaises(channels.ChannelEmptyError):
|
|
ch.recv_nowait()
|
|
|
|
def test_recv_nowait_default(self):
|
|
default = object()
|
|
rch, sch = channels.create()
|
|
obj1 = rch.recv_nowait(default)
|
|
sch.send_nowait(None)
|
|
sch.send_nowait(1)
|
|
sch.send_nowait(b'spam')
|
|
sch.send_nowait(b'eggs')
|
|
obj2 = rch.recv_nowait(default)
|
|
obj3 = rch.recv_nowait(default)
|
|
obj4 = rch.recv_nowait()
|
|
obj5 = rch.recv_nowait(default)
|
|
obj6 = rch.recv_nowait(default)
|
|
|
|
self.assertIs(obj1, default)
|
|
self.assertIs(obj2, None)
|
|
self.assertEqual(obj3, 1)
|
|
self.assertEqual(obj4, b'spam')
|
|
self.assertEqual(obj5, b'eggs')
|
|
self.assertIs(obj6, default)
|
|
|
|
def test_send_buffer(self):
|
|
buf = bytearray(b'spamspamspam')
|
|
obj = None
|
|
rch, sch = channels.create()
|
|
|
|
def f():
|
|
nonlocal obj
|
|
while True:
|
|
try:
|
|
obj = rch.recv()
|
|
break
|
|
except channels.ChannelEmptyError:
|
|
time.sleep(0.1)
|
|
t = threading.Thread(target=f)
|
|
t.start()
|
|
|
|
sch.send_buffer(buf)
|
|
t.join()
|
|
|
|
self.assertIsNot(obj, buf)
|
|
self.assertIsInstance(obj, memoryview)
|
|
self.assertEqual(obj, buf)
|
|
|
|
buf[4:8] = b'eggs'
|
|
self.assertEqual(obj, buf)
|
|
obj[4:8] = b'ham.'
|
|
self.assertEqual(obj, buf)
|
|
|
|
def test_send_buffer_nowait(self):
|
|
buf = bytearray(b'spamspamspam')
|
|
rch, sch = channels.create()
|
|
sch.send_buffer_nowait(buf)
|
|
obj = rch.recv()
|
|
|
|
self.assertIsNot(obj, buf)
|
|
self.assertIsInstance(obj, memoryview)
|
|
self.assertEqual(obj, buf)
|
|
|
|
buf[4:8] = b'eggs'
|
|
self.assertEqual(obj, buf)
|
|
obj[4:8] = b'ham.'
|
|
self.assertEqual(obj, buf)
|
|
|
|
def test_send_cleared_with_subinterpreter(self):
|
|
def common(rch, sch, unbound=None, presize=0):
|
|
if not unbound:
|
|
extraargs = ''
|
|
elif unbound is channels.UNBOUND:
|
|
extraargs = ', unbound=channels.UNBOUND'
|
|
elif unbound is channels.UNBOUND_ERROR:
|
|
extraargs = ', unbound=channels.UNBOUND_ERROR'
|
|
elif unbound is channels.UNBOUND_REMOVE:
|
|
extraargs = ', unbound=channels.UNBOUND_REMOVE'
|
|
else:
|
|
raise NotImplementedError(repr(unbound))
|
|
interp = interpreters.create()
|
|
|
|
_run_output(interp, dedent(f"""
|
|
from test.support.interpreters import channels
|
|
sch = channels.SendChannel({sch.id})
|
|
obj1 = b'spam'
|
|
obj2 = b'eggs'
|
|
sch.send_nowait(obj1{extraargs})
|
|
sch.send_nowait(obj2{extraargs})
|
|
"""))
|
|
self.assertEqual(
|
|
_channels.get_count(rch.id),
|
|
presize + 2,
|
|
)
|
|
|
|
if presize == 0:
|
|
obj1 = rch.recv()
|
|
self.assertEqual(obj1, b'spam')
|
|
self.assertEqual(
|
|
_channels.get_count(rch.id),
|
|
presize + 1,
|
|
)
|
|
|
|
return interp
|
|
|
|
with self.subTest('default'): # UNBOUND
|
|
rch, sch = channels.create()
|
|
interp = common(rch, sch)
|
|
del interp
|
|
self.assertEqual(_channels.get_count(rch.id), 1)
|
|
obj1 = rch.recv()
|
|
self.assertEqual(_channels.get_count(rch.id), 0)
|
|
self.assertIs(obj1, channels.UNBOUND)
|
|
self.assertEqual(_channels.get_count(rch.id), 0)
|
|
with self.assertRaises(channels.ChannelEmptyError):
|
|
rch.recv_nowait()
|
|
|
|
with self.subTest('UNBOUND'):
|
|
rch, sch = channels.create()
|
|
interp = common(rch, sch, channels.UNBOUND)
|
|
del interp
|
|
self.assertEqual(_channels.get_count(rch.id), 1)
|
|
obj1 = rch.recv()
|
|
self.assertIs(obj1, channels.UNBOUND)
|
|
self.assertEqual(_channels.get_count(rch.id), 0)
|
|
with self.assertRaises(channels.ChannelEmptyError):
|
|
rch.recv_nowait()
|
|
|
|
with self.subTest('UNBOUND_ERROR'):
|
|
rch, sch = channels.create()
|
|
interp = common(rch, sch, channels.UNBOUND_ERROR)
|
|
|
|
del interp
|
|
self.assertEqual(_channels.get_count(rch.id), 1)
|
|
with self.assertRaises(channels.ItemInterpreterDestroyed):
|
|
rch.recv()
|
|
|
|
self.assertEqual(_channels.get_count(rch.id), 0)
|
|
with self.assertRaises(channels.ChannelEmptyError):
|
|
rch.recv_nowait()
|
|
|
|
with self.subTest('UNBOUND_REMOVE'):
|
|
rch, sch = channels.create()
|
|
|
|
interp = common(rch, sch, channels.UNBOUND_REMOVE)
|
|
del interp
|
|
self.assertEqual(_channels.get_count(rch.id), 0)
|
|
with self.assertRaises(channels.ChannelEmptyError):
|
|
rch.recv_nowait()
|
|
|
|
sch.send_nowait(b'ham', unbound=channels.UNBOUND_REMOVE)
|
|
self.assertEqual(_channels.get_count(rch.id), 1)
|
|
interp = common(rch, sch, channels.UNBOUND_REMOVE, 1)
|
|
self.assertEqual(_channels.get_count(rch.id), 3)
|
|
sch.send_nowait(42, unbound=channels.UNBOUND_REMOVE)
|
|
self.assertEqual(_channels.get_count(rch.id), 4)
|
|
del interp
|
|
self.assertEqual(_channels.get_count(rch.id), 2)
|
|
obj1 = rch.recv()
|
|
obj2 = rch.recv()
|
|
self.assertEqual(obj1, b'ham')
|
|
self.assertEqual(obj2, 42)
|
|
self.assertEqual(_channels.get_count(rch.id), 0)
|
|
with self.assertRaises(channels.ChannelEmptyError):
|
|
rch.recv_nowait()
|
|
|
|
def test_send_cleared_with_subinterpreter_mixed(self):
|
|
rch, sch = channels.create()
|
|
interp = interpreters.create()
|
|
|
|
# If we don't associate the main interpreter with the channel
|
|
# then the channel will be automatically closed when interp
|
|
# is destroyed.
|
|
sch.send_nowait(None)
|
|
rch.recv()
|
|
self.assertEqual(_channels.get_count(rch.id), 0)
|
|
|
|
_run_output(interp, dedent(f"""
|
|
from test.support.interpreters import channels
|
|
sch = channels.SendChannel({sch.id})
|
|
sch.send_nowait(1, unbound=channels.UNBOUND)
|
|
sch.send_nowait(2, unbound=channels.UNBOUND_ERROR)
|
|
sch.send_nowait(3)
|
|
sch.send_nowait(4, unbound=channels.UNBOUND_REMOVE)
|
|
sch.send_nowait(5, unbound=channels.UNBOUND)
|
|
"""))
|
|
self.assertEqual(_channels.get_count(rch.id), 5)
|
|
|
|
del interp
|
|
self.assertEqual(_channels.get_count(rch.id), 4)
|
|
|
|
obj1 = rch.recv()
|
|
self.assertIs(obj1, channels.UNBOUND)
|
|
self.assertEqual(_channels.get_count(rch.id), 3)
|
|
|
|
with self.assertRaises(channels.ItemInterpreterDestroyed):
|
|
rch.recv()
|
|
self.assertEqual(_channels.get_count(rch.id), 2)
|
|
|
|
obj2 = rch.recv()
|
|
self.assertIs(obj2, channels.UNBOUND)
|
|
self.assertEqual(_channels.get_count(rch.id), 1)
|
|
|
|
obj3 = rch.recv()
|
|
self.assertIs(obj3, channels.UNBOUND)
|
|
self.assertEqual(_channels.get_count(rch.id), 0)
|
|
|
|
def test_send_cleared_with_subinterpreter_multiple(self):
|
|
rch, sch = channels.create()
|
|
interp1 = interpreters.create()
|
|
interp2 = interpreters.create()
|
|
|
|
sch.send_nowait(1)
|
|
_run_output(interp1, dedent(f"""
|
|
from test.support.interpreters import channels
|
|
rch = channels.RecvChannel({rch.id})
|
|
sch = channels.SendChannel({sch.id})
|
|
obj1 = rch.recv()
|
|
sch.send_nowait(2, unbound=channels.UNBOUND)
|
|
sch.send_nowait(obj1, unbound=channels.UNBOUND_REMOVE)
|
|
"""))
|
|
_run_output(interp2, dedent(f"""
|
|
from test.support.interpreters import channels
|
|
rch = channels.RecvChannel({rch.id})
|
|
sch = channels.SendChannel({sch.id})
|
|
obj2 = rch.recv()
|
|
obj1 = rch.recv()
|
|
"""))
|
|
self.assertEqual(_channels.get_count(rch.id), 0)
|
|
sch.send_nowait(3)
|
|
_run_output(interp1, dedent("""
|
|
sch.send_nowait(4, unbound=channels.UNBOUND)
|
|
# interp closed here
|
|
sch.send_nowait(5, unbound=channels.UNBOUND_REMOVE)
|
|
sch.send_nowait(6, unbound=channels.UNBOUND)
|
|
"""))
|
|
_run_output(interp2, dedent("""
|
|
sch.send_nowait(7, unbound=channels.UNBOUND_ERROR)
|
|
# interp closed here
|
|
sch.send_nowait(obj1, unbound=channels.UNBOUND_ERROR)
|
|
sch.send_nowait(obj2, unbound=channels.UNBOUND_REMOVE)
|
|
sch.send_nowait(8, unbound=channels.UNBOUND)
|
|
"""))
|
|
_run_output(interp1, dedent("""
|
|
sch.send_nowait(9, unbound=channels.UNBOUND_REMOVE)
|
|
sch.send_nowait(10, unbound=channels.UNBOUND)
|
|
"""))
|
|
self.assertEqual(_channels.get_count(rch.id), 10)
|
|
|
|
obj3 = rch.recv()
|
|
self.assertEqual(obj3, 3)
|
|
self.assertEqual(_channels.get_count(rch.id), 9)
|
|
|
|
obj4 = rch.recv()
|
|
self.assertEqual(obj4, 4)
|
|
self.assertEqual(_channels.get_count(rch.id), 8)
|
|
|
|
del interp1
|
|
self.assertEqual(_channels.get_count(rch.id), 6)
|
|
|
|
# obj5 was removed
|
|
|
|
obj6 = rch.recv()
|
|
self.assertIs(obj6, channels.UNBOUND)
|
|
self.assertEqual(_channels.get_count(rch.id), 5)
|
|
|
|
obj7 = rch.recv()
|
|
self.assertEqual(obj7, 7)
|
|
self.assertEqual(_channels.get_count(rch.id), 4)
|
|
|
|
del interp2
|
|
self.assertEqual(_channels.get_count(rch.id), 3)
|
|
|
|
# obj1
|
|
with self.assertRaises(channels.ItemInterpreterDestroyed):
|
|
rch.recv()
|
|
self.assertEqual(_channels.get_count(rch.id), 2)
|
|
|
|
# obj2 was removed
|
|
|
|
obj8 = rch.recv()
|
|
self.assertIs(obj8, channels.UNBOUND)
|
|
self.assertEqual(_channels.get_count(rch.id), 1)
|
|
|
|
# obj9 was removed
|
|
|
|
obj10 = rch.recv()
|
|
self.assertIs(obj10, channels.UNBOUND)
|
|
self.assertEqual(_channels.get_count(rch.id), 0)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# Test needs to be a package, so we can do relative imports.
|
|
unittest.main()
|