gh-76785: Expand How Interpreter Channels Handle Interpreter Finalization (gh-121805)

See 6b98b274b6 for an explanation of the problem and solution.  Here I've applied the solution to channels.
This commit is contained in:
Eric Snow 2024-07-15 13:43:59 -06:00 committed by GitHub
parent fd085a411e
commit 8b209fd4f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 898 additions and 306 deletions

View File

@ -0,0 +1,102 @@
"""Common code between queues and channels."""
class ItemInterpreterDestroyed(Exception):
"""Raised when trying to get an item whose interpreter was destroyed."""
class classonly:
"""A non-data descriptor that makes a value only visible on the class.
This is like the "classmethod" builtin, but does not show up on
instances of the class. It may be used as a decorator.
"""
def __init__(self, value):
self.value = value
self.getter = classmethod(value).__get__
self.name = None
def __set_name__(self, cls, name):
if self.name is not None:
raise TypeError('already used')
self.name = name
def __get__(self, obj, cls):
if obj is not None:
raise AttributeError(self.name)
# called on the class
return self.getter(None, cls)
class UnboundItem:
"""Represents a cross-interpreter item no longer bound to an interpreter.
An item is unbound when the interpreter that added it to the
cross-interpreter container is destroyed.
"""
__slots__ = ()
@classonly
def singleton(cls, kind, module, name='UNBOUND'):
doc = cls.__doc__.replace('cross-interpreter container', kind)
doc = doc.replace('cross-interpreter', kind)
subclass = type(
f'Unbound{kind.capitalize()}Item',
(cls,),
dict(
_MODULE=module,
_NAME=name,
__doc__=doc,
),
)
return object.__new__(subclass)
_MODULE = __name__
_NAME = 'UNBOUND'
def __new__(cls):
raise Exception(f'use {cls._MODULE}.{cls._NAME}')
def __repr__(self):
return f'{self._MODULE}.{self._NAME}'
# return f'interpreters.queues.UNBOUND'
UNBOUND = object.__new__(UnboundItem)
UNBOUND_ERROR = object()
UNBOUND_REMOVE = object()
_UNBOUND_CONSTANT_TO_FLAG = {
UNBOUND_REMOVE: 1,
UNBOUND_ERROR: 2,
UNBOUND: 3,
}
_UNBOUND_FLAG_TO_CONSTANT = {v: k
for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()}
def serialize_unbound(unbound):
op = unbound
try:
flag = _UNBOUND_CONSTANT_TO_FLAG[op]
except KeyError:
raise NotImplementedError(f'unsupported unbound replacement op {op!r}')
return flag,
def resolve_unbound(flag, exctype_destroyed):
try:
op = _UNBOUND_FLAG_TO_CONSTANT[flag]
except KeyError:
raise NotImplementedError(f'unsupported unbound replacement op {flag!r}')
if op is UNBOUND_REMOVE:
# "remove" not possible here
raise NotImplementedError
elif op is UNBOUND_ERROR:
raise exctype_destroyed("item's original interpreter destroyed")
elif op is UNBOUND:
return UNBOUND
else:
raise NotImplementedError(repr(op))

View File

@ -2,35 +2,68 @@
import time import time
import _interpchannels as _channels import _interpchannels as _channels
from . import _crossinterp
# aliases: # aliases:
from _interpchannels import ( from _interpchannels import (
ChannelError, ChannelNotFoundError, ChannelClosedError, ChannelError, ChannelNotFoundError, ChannelClosedError,
ChannelEmptyError, ChannelNotEmptyError, ChannelEmptyError, ChannelNotEmptyError,
) )
from ._crossinterp import (
UNBOUND_ERROR, UNBOUND_REMOVE,
)
__all__ = [ __all__ = [
'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE',
'create', 'list_all', 'create', 'list_all',
'SendChannel', 'RecvChannel', 'SendChannel', 'RecvChannel',
'ChannelError', 'ChannelNotFoundError', 'ChannelEmptyError', 'ChannelError', 'ChannelNotFoundError', 'ChannelEmptyError',
'ItemInterpreterDestroyed',
] ]
def create(): class ItemInterpreterDestroyed(ChannelError,
_crossinterp.ItemInterpreterDestroyed):
"""Raised from get() and get_nowait()."""
UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__)
def _serialize_unbound(unbound):
if unbound is UNBOUND:
unbound = _crossinterp.UNBOUND
return _crossinterp.serialize_unbound(unbound)
def _resolve_unbound(flag):
resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)
if resolved is _crossinterp.UNBOUND:
resolved = UNBOUND
return resolved
def create(*, unbounditems=UNBOUND):
"""Return (recv, send) for a new cross-interpreter channel. """Return (recv, send) for a new cross-interpreter channel.
The channel may be used to pass data safely between interpreters. The channel may be used to pass data safely between interpreters.
"unbounditems" sets the default for the send end of the channel.
See SendChannel.send() for supported values. The default value
is UNBOUND, which replaces the unbound item when received.
""" """
cid = _channels.create() unbound = _serialize_unbound(unbounditems)
recv, send = RecvChannel(cid), SendChannel(cid) unboundop, = unbound
cid = _channels.create(unboundop)
recv, send = RecvChannel(cid), SendChannel(cid, _unbound=unbound)
return recv, send return recv, send
def list_all(): def list_all():
"""Return a list of (recv, send) for all open channels.""" """Return a list of (recv, send) for all open channels."""
return [(RecvChannel(cid), SendChannel(cid)) return [(RecvChannel(cid), SendChannel(cid, _unbound=unbound))
for cid in _channels.list_all()] for cid, unbound in _channels.list_all()]
class _ChannelEnd: class _ChannelEnd:
@ -106,12 +139,15 @@ class RecvChannel(_ChannelEnd):
if timeout < 0: if timeout < 0:
raise ValueError(f'timeout value must be non-negative') raise ValueError(f'timeout value must be non-negative')
end = time.time() + timeout end = time.time() + timeout
obj = _channels.recv(self._id, _sentinel) obj, unboundop = _channels.recv(self._id, _sentinel)
while obj is _sentinel: while obj is _sentinel:
time.sleep(_delay) time.sleep(_delay)
if timeout is not None and time.time() >= end: if timeout is not None and time.time() >= end:
raise TimeoutError raise TimeoutError
obj = _channels.recv(self._id, _sentinel) obj, unboundop = _channels.recv(self._id, _sentinel)
if unboundop is not None:
assert obj is None, repr(obj)
return _resolve_unbound(unboundop)
return obj return obj
def recv_nowait(self, default=_NOT_SET): def recv_nowait(self, default=_NOT_SET):
@ -122,9 +158,13 @@ class RecvChannel(_ChannelEnd):
is the same as recv(). is the same as recv().
""" """
if default is _NOT_SET: if default is _NOT_SET:
return _channels.recv(self._id) obj, unboundop = _channels.recv(self._id)
else: else:
return _channels.recv(self._id, default) obj, unboundop = _channels.recv(self._id, default)
if unboundop is not None:
assert obj is None, repr(obj)
return _resolve_unbound(unboundop)
return obj
def close(self): def close(self):
_channels.close(self._id, recv=True) _channels.close(self._id, recv=True)
@ -135,43 +175,79 @@ class SendChannel(_ChannelEnd):
_end = 'send' _end = 'send'
def __new__(cls, cid, *, _unbound=None):
if _unbound is None:
try:
op = _channels.get_channel_defaults(cid)
_unbound = (op,)
except ChannelNotFoundError:
_unbound = _serialize_unbound(UNBOUND)
self = super().__new__(cls, cid)
self._unbound = _unbound
return self
@property @property
def is_closed(self): def is_closed(self):
info = self._info info = self._info
return info.closed or info.closing return info.closed or info.closing
def send(self, obj, timeout=None): def send(self, obj, timeout=None, *,
unbound=None,
):
"""Send the object (i.e. its data) to the channel's receiving end. """Send the object (i.e. its data) to the channel's receiving end.
This blocks until the object is received. This blocks until the object is received.
""" """
_channels.send(self._id, obj, timeout=timeout, blocking=True) if unbound is None:
unboundop, = self._unbound
else:
unboundop, = _serialize_unbound(unbound)
_channels.send(self._id, obj, unboundop, timeout=timeout, blocking=True)
def send_nowait(self, obj): def send_nowait(self, obj, *,
unbound=None,
):
"""Send the object to the channel's receiving end. """Send the object to the channel's receiving end.
If the object is immediately received then return True If the object is immediately received then return True
(else False). Otherwise this is the same as send(). (else False). Otherwise this is the same as send().
""" """
if unbound is None:
unboundop, = self._unbound
else:
unboundop, = _serialize_unbound(unbound)
# XXX Note that at the moment channel_send() only ever returns # XXX Note that at the moment channel_send() only ever returns
# None. This should be fixed when channel_send_wait() is added. # None. This should be fixed when channel_send_wait() is added.
# See bpo-32604 and gh-19829. # See bpo-32604 and gh-19829.
return _channels.send(self._id, obj, blocking=False) return _channels.send(self._id, obj, unboundop, blocking=False)
def send_buffer(self, obj, timeout=None): def send_buffer(self, obj, timeout=None, *,
unbound=None,
):
"""Send the object's buffer to the channel's receiving end. """Send the object's buffer to the channel's receiving end.
This blocks until the object is received. This blocks until the object is received.
""" """
_channels.send_buffer(self._id, obj, timeout=timeout, blocking=True) if unbound is None:
unboundop, = self._unbound
else:
unboundop, = _serialize_unbound(unbound)
_channels.send_buffer(self._id, obj, unboundop,
timeout=timeout, blocking=True)
def send_buffer_nowait(self, obj): def send_buffer_nowait(self, obj, *,
unbound=None,
):
"""Send the object's buffer to the channel's receiving end. """Send the object's buffer to the channel's receiving end.
If the object is immediately received then return True If the object is immediately received then return True
(else False). Otherwise this is the same as send(). (else False). Otherwise this is the same as send().
""" """
return _channels.send_buffer(self._id, obj, blocking=False) if unbound is None:
unboundop, = self._unbound
else:
unboundop, = _serialize_unbound(unbound)
return _channels.send_buffer(self._id, obj, unboundop, blocking=False)
def close(self): def close(self):
_channels.close(self._id, send=True) _channels.close(self._id, send=True)

View File

@ -5,11 +5,15 @@ import queue
import time import time
import weakref import weakref
import _interpqueues as _queues import _interpqueues as _queues
from . import _crossinterp
# aliases: # aliases:
from _interpqueues import ( from _interpqueues import (
QueueError, QueueNotFoundError, QueueError, QueueNotFoundError,
) )
from ._crossinterp import (
UNBOUND_ERROR, UNBOUND_REMOVE,
)
__all__ = [ __all__ = [
'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE', 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE',
@ -34,7 +38,8 @@ class QueueFull(QueueError, queue.Full):
""" """
class ItemInterpreterDestroyed(QueueError): class ItemInterpreterDestroyed(QueueError,
_crossinterp.ItemInterpreterDestroyed):
"""Raised from get() and get_nowait().""" """Raised from get() and get_nowait()."""
@ -42,57 +47,20 @@ _SHARED_ONLY = 0
_PICKLED = 1 _PICKLED = 1
class UnboundItem: UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__)
"""Represents a Queue item no longer bound to an interpreter.
An item is unbound when the interpreter that added it to the queue
is destroyed.
"""
__slots__ = ()
def __new__(cls):
return UNBOUND
def __repr__(self):
return f'interpreters.queues.UNBOUND'
UNBOUND = object.__new__(UnboundItem)
UNBOUND_ERROR = object()
UNBOUND_REMOVE = object()
_UNBOUND_CONSTANT_TO_FLAG = {
UNBOUND_REMOVE: 1,
UNBOUND_ERROR: 2,
UNBOUND: 3,
}
_UNBOUND_FLAG_TO_CONSTANT = {v: k
for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()}
def _serialize_unbound(unbound): def _serialize_unbound(unbound):
op = unbound if unbound is UNBOUND:
try: unbound = _crossinterp.UNBOUND
flag = _UNBOUND_CONSTANT_TO_FLAG[op] return _crossinterp.serialize_unbound(unbound)
except KeyError:
raise NotImplementedError(f'unsupported unbound replacement op {op!r}')
return flag,
def _resolve_unbound(flag): def _resolve_unbound(flag):
try: resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)
op = _UNBOUND_FLAG_TO_CONSTANT[flag] if resolved is _crossinterp.UNBOUND:
except KeyError: resolved = UNBOUND
raise NotImplementedError(f'unsupported unbound replacement op {flag!r}') return resolved
if op is UNBOUND_REMOVE:
# "remove" not possible here
raise NotImplementedError
elif op is UNBOUND_ERROR:
raise ItemInterpreterDestroyed("item's original interpreter destroyed")
elif op is UNBOUND:
return UNBOUND
else:
raise NotImplementedError(repr(op))
def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND): def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND):

View File

@ -8,6 +8,8 @@ import unittest
from test.support import import_helper from test.support import import_helper
_channels = import_helper.import_module('_interpchannels')
from test.support.interpreters import _crossinterp
from test.test__interpreters import ( from test.test__interpreters import (
_interpreters, _interpreters,
_run_output, _run_output,
@ -15,7 +17,7 @@ from test.test__interpreters import (
) )
_channels = import_helper.import_module('_interpchannels') REPLACE = _crossinterp._UNBOUND_CONSTANT_TO_FLAG[_crossinterp.UNBOUND]
# Additional tests are found in Lib/test/test_interpreters/test_channels.py. # Additional tests are found in Lib/test/test_interpreters/test_channels.py.
@ -29,9 +31,19 @@ _channels = import_helper.import_module('_interpchannels')
def recv_wait(cid): def recv_wait(cid):
while True: while True:
try: try:
return _channels.recv(cid) obj, unboundop = _channels.recv(cid)
except _channels.ChannelEmptyError: except _channels.ChannelEmptyError:
time.sleep(0.1) time.sleep(0.1)
else:
assert unboundop is None, repr(unboundop)
return obj
def recv_nowait(cid, *args, unbound=False):
obj, unboundop = _channels.recv(cid, *args)
assert (unboundop is None) != unbound, repr(unboundop)
return obj
#@contextmanager #@contextmanager
#def run_threaded(id, source, **shared): #def run_threaded(id, source, **shared):
@ -212,7 +224,7 @@ def _run_action(cid, action, end, state):
else: else:
raise Exception('expected ChannelEmptyError') raise Exception('expected ChannelEmptyError')
else: else:
_channels.recv(cid) recv_nowait(cid)
return state.decr() return state.decr()
else: else:
raise ValueError(end) raise ValueError(end)
@ -235,7 +247,7 @@ def _run_action(cid, action, end, state):
def clean_up_channels(): def clean_up_channels():
for cid in _channels.list_all(): for cid, _ in _channels.list_all():
try: try:
_channels.destroy(cid) _channels.destroy(cid)
except _channels.ChannelNotFoundError: except _channels.ChannelNotFoundError:
@ -297,7 +309,7 @@ class ChannelIDTests(TestBase):
_channels._channel_id(10, send=False, recv=False) _channels._channel_id(10, send=False, recv=False)
def test_does_not_exist(self): def test_does_not_exist(self):
cid = _channels.create() cid = _channels.create(REPLACE)
with self.assertRaises(_channels.ChannelNotFoundError): with self.assertRaises(_channels.ChannelNotFoundError):
_channels._channel_id(int(cid) + 1) # unforced _channels._channel_id(int(cid) + 1) # unforced
@ -319,9 +331,9 @@ class ChannelIDTests(TestBase):
self.assertEqual(repr(cid), 'ChannelID(10)') self.assertEqual(repr(cid), 'ChannelID(10)')
def test_equality(self): def test_equality(self):
cid1 = _channels.create() cid1 = _channels.create(REPLACE)
cid2 = _channels._channel_id(int(cid1)) cid2 = _channels._channel_id(int(cid1))
cid3 = _channels.create() cid3 = _channels.create(REPLACE)
self.assertTrue(cid1 == cid1) self.assertTrue(cid1 == cid1)
self.assertTrue(cid1 == cid2) self.assertTrue(cid1 == cid2)
@ -341,11 +353,11 @@ class ChannelIDTests(TestBase):
self.assertTrue(cid1 != cid3) self.assertTrue(cid1 != cid3)
def test_shareable(self): def test_shareable(self):
chan = _channels.create() chan = _channels.create(REPLACE)
obj = _channels.create() obj = _channels.create(REPLACE)
_channels.send(chan, obj, blocking=False) _channels.send(chan, obj, blocking=False)
got = _channels.recv(chan) got = recv_nowait(chan)
self.assertEqual(got, obj) self.assertEqual(got, obj)
self.assertIs(type(got), type(obj)) self.assertIs(type(got), type(obj))
@ -356,15 +368,15 @@ class ChannelIDTests(TestBase):
class ChannelTests(TestBase): class ChannelTests(TestBase):
def test_create_cid(self): def test_create_cid(self):
cid = _channels.create() cid = _channels.create(REPLACE)
self.assertIsInstance(cid, _channels.ChannelID) self.assertIsInstance(cid, _channels.ChannelID)
def test_sequential_ids(self): def test_sequential_ids(self):
before = _channels.list_all() before = [cid for cid, _ in _channels.list_all()]
id1 = _channels.create() id1 = _channels.create(REPLACE)
id2 = _channels.create() id2 = _channels.create(REPLACE)
id3 = _channels.create() id3 = _channels.create(REPLACE)
after = _channels.list_all() after = [cid for cid, _ in _channels.list_all()]
self.assertEqual(id2, int(id1) + 1) self.assertEqual(id2, int(id1) + 1)
self.assertEqual(id3, int(id2) + 1) self.assertEqual(id3, int(id2) + 1)
@ -374,7 +386,7 @@ class ChannelTests(TestBase):
id1 = _interpreters.create() id1 = _interpreters.create()
out = _run_output(id1, dedent(""" out = _run_output(id1, dedent("""
import _interpchannels as _channels import _interpchannels as _channels
cid = _channels.create() cid = _channels.create(3)
print(cid) print(cid)
""")) """))
cid1 = int(out.strip()) cid1 = int(out.strip())
@ -382,7 +394,7 @@ class ChannelTests(TestBase):
id2 = _interpreters.create() id2 = _interpreters.create()
out = _run_output(id2, dedent(""" out = _run_output(id2, dedent("""
import _interpchannels as _channels import _interpchannels as _channels
cid = _channels.create() cid = _channels.create(3)
print(cid) print(cid)
""")) """))
cid2 = int(out.strip()) cid2 = int(out.strip())
@ -392,7 +404,7 @@ class ChannelTests(TestBase):
def test_channel_list_interpreters_none(self): def test_channel_list_interpreters_none(self):
"""Test listing interpreters for a channel with no associations.""" """Test listing interpreters for a channel with no associations."""
# Test for channel with no associated _interpreters. # Test for channel with no associated _interpreters.
cid = _channels.create() cid = _channels.create(REPLACE)
send_interps = _channels.list_interpreters(cid, send=True) send_interps = _channels.list_interpreters(cid, send=True)
recv_interps = _channels.list_interpreters(cid, send=False) recv_interps = _channels.list_interpreters(cid, send=False)
self.assertEqual(send_interps, []) self.assertEqual(send_interps, [])
@ -401,7 +413,7 @@ class ChannelTests(TestBase):
def test_channel_list_interpreters_basic(self): def test_channel_list_interpreters_basic(self):
"""Test basic listing channel _interpreters.""" """Test basic listing channel _interpreters."""
interp0, *_ = _interpreters.get_main() interp0, *_ = _interpreters.get_main()
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, "send", blocking=False) _channels.send(cid, "send", blocking=False)
# Test for a channel that has one end associated to an interpreter. # Test for a channel that has one end associated to an interpreter.
send_interps = _channels.list_interpreters(cid, send=True) send_interps = _channels.list_interpreters(cid, send=True)
@ -412,7 +424,7 @@ class ChannelTests(TestBase):
interp1 = _interpreters.create() interp1 = _interpreters.create()
_run_output(interp1, dedent(f""" _run_output(interp1, dedent(f"""
import _interpchannels as _channels import _interpchannels as _channels
obj = _channels.recv({cid}) _channels.recv({cid})
""")) """))
# Test for channel that has both ends associated to an interpreter. # Test for channel that has both ends associated to an interpreter.
send_interps = _channels.list_interpreters(cid, send=True) send_interps = _channels.list_interpreters(cid, send=True)
@ -426,7 +438,7 @@ class ChannelTests(TestBase):
interp1 = _interpreters.create() interp1 = _interpreters.create()
interp2 = _interpreters.create() interp2 = _interpreters.create()
interp3 = _interpreters.create() interp3 = _interpreters.create()
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, "send", blocking=False) _channels.send(cid, "send", blocking=False)
_run_output(interp1, dedent(f""" _run_output(interp1, dedent(f"""
@ -435,11 +447,11 @@ class ChannelTests(TestBase):
""")) """))
_run_output(interp2, dedent(f""" _run_output(interp2, dedent(f"""
import _interpchannels as _channels import _interpchannels as _channels
obj = _channels.recv({cid}) _channels.recv({cid})
""")) """))
_run_output(interp3, dedent(f""" _run_output(interp3, dedent(f"""
import _interpchannels as _channels import _interpchannels as _channels
obj = _channels.recv({cid}) _channels.recv({cid})
""")) """))
send_interps = _channels.list_interpreters(cid, send=True) send_interps = _channels.list_interpreters(cid, send=True)
recv_interps = _channels.list_interpreters(cid, send=False) recv_interps = _channels.list_interpreters(cid, send=False)
@ -450,11 +462,11 @@ class ChannelTests(TestBase):
"""Test listing channel interpreters with a destroyed interpreter.""" """Test listing channel interpreters with a destroyed interpreter."""
interp0, *_ = _interpreters.get_main() interp0, *_ = _interpreters.get_main()
interp1 = _interpreters.create() interp1 = _interpreters.create()
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, "send", blocking=False) _channels.send(cid, "send", blocking=False)
_run_output(interp1, dedent(f""" _run_output(interp1, dedent(f"""
import _interpchannels as _channels import _interpchannels as _channels
obj = _channels.recv({cid}) _channels.recv({cid})
""")) """))
# Should be one interpreter associated with each end. # Should be one interpreter associated with each end.
send_interps = _channels.list_interpreters(cid, send=True) send_interps = _channels.list_interpreters(cid, send=True)
@ -476,16 +488,16 @@ class ChannelTests(TestBase):
interp0, *_ = _interpreters.get_main() interp0, *_ = _interpreters.get_main()
interp1 = _interpreters.create() interp1 = _interpreters.create()
interp2 = _interpreters.create() interp2 = _interpreters.create()
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, "data", blocking=False) _channels.send(cid, "data", blocking=False)
_run_output(interp1, dedent(f""" _run_output(interp1, dedent(f"""
import _interpchannels as _channels import _interpchannels as _channels
obj = _channels.recv({cid}) _channels.recv({cid})
""")) """))
_channels.send(cid, "data", blocking=False) _channels.send(cid, "data", blocking=False)
_run_output(interp2, dedent(f""" _run_output(interp2, dedent(f"""
import _interpchannels as _channels import _interpchannels as _channels
obj = _channels.recv({cid}) _channels.recv({cid})
""")) """))
# Check the setup. # Check the setup.
send_interps = _channels.list_interpreters(cid, send=True) send_interps = _channels.list_interpreters(cid, send=True)
@ -516,7 +528,7 @@ class ChannelTests(TestBase):
"""Test listing channel interpreters with a closed channel.""" """Test listing channel interpreters with a closed channel."""
interp0, *_ = _interpreters.get_main() interp0, *_ = _interpreters.get_main()
interp1 = _interpreters.create() interp1 = _interpreters.create()
cid = _channels.create() cid = _channels.create(REPLACE)
# Put something in the channel so that it's not empty. # Put something in the channel so that it's not empty.
_channels.send(cid, "send", blocking=False) _channels.send(cid, "send", blocking=False)
@ -538,7 +550,7 @@ class ChannelTests(TestBase):
"""Test listing channel interpreters with a channel's send end closed.""" """Test listing channel interpreters with a channel's send end closed."""
interp0, *_ = _interpreters.get_main() interp0, *_ = _interpreters.get_main()
interp1 = _interpreters.create() interp1 = _interpreters.create()
cid = _channels.create() cid = _channels.create(REPLACE)
# Put something in the channel so that it's not empty. # Put something in the channel so that it's not empty.
_channels.send(cid, "send", blocking=False) _channels.send(cid, "send", blocking=False)
@ -570,7 +582,7 @@ class ChannelTests(TestBase):
_channels.list_interpreters(cid, send=False) _channels.list_interpreters(cid, send=False)
def test_allowed_types(self): def test_allowed_types(self):
cid = _channels.create() cid = _channels.create(REPLACE)
objects = [ objects = [
None, None,
'spam', 'spam',
@ -580,7 +592,7 @@ class ChannelTests(TestBase):
for obj in objects: for obj in objects:
with self.subTest(obj): with self.subTest(obj):
_channels.send(cid, obj, blocking=False) _channels.send(cid, obj, blocking=False)
got = _channels.recv(cid) got = recv_nowait(cid)
self.assertEqual(got, obj) self.assertEqual(got, obj)
self.assertIs(type(got), type(obj)) self.assertIs(type(got), type(obj))
@ -589,7 +601,7 @@ class ChannelTests(TestBase):
# XXX What about between interpreters? # XXX What about between interpreters?
def test_run_string_arg_unresolved(self): def test_run_string_arg_unresolved(self):
cid = _channels.create() cid = _channels.create(REPLACE)
interp = _interpreters.create() interp = _interpreters.create()
_interpreters.set___main___attrs(interp, dict(cid=cid.send)) _interpreters.set___main___attrs(interp, dict(cid=cid.send))
@ -598,7 +610,7 @@ class ChannelTests(TestBase):
print(cid.end) print(cid.end)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
""")) """))
obj = _channels.recv(cid) obj = recv_nowait(cid)
self.assertEqual(obj, b'spam') self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send') self.assertEqual(out.strip(), 'send')
@ -608,7 +620,7 @@ class ChannelTests(TestBase):
# Note: this test caused crashes on some buildbots (bpo-33615). # Note: this test caused crashes on some buildbots (bpo-33615).
@unittest.skip('disabled until high-level channels exist') @unittest.skip('disabled until high-level channels exist')
def test_run_string_arg_resolved(self): def test_run_string_arg_resolved(self):
cid = _channels.create() cid = _channels.create(REPLACE)
cid = _channels._channel_id(cid, _resolve=True) cid = _channels._channel_id(cid, _resolve=True)
interp = _interpreters.create() interp = _interpreters.create()
@ -618,7 +630,7 @@ class ChannelTests(TestBase):
_channels.send(chan.id, b'spam', blocking=False) _channels.send(chan.id, b'spam', blocking=False)
"""), """),
dict(chan=cid.send)) dict(chan=cid.send))
obj = _channels.recv(cid) obj = recv_nowait(cid)
self.assertEqual(obj, b'spam') self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send') self.assertEqual(out.strip(), 'send')
@ -627,10 +639,10 @@ class ChannelTests(TestBase):
# send/recv # send/recv
def test_send_recv_main(self): def test_send_recv_main(self):
cid = _channels.create() cid = _channels.create(REPLACE)
orig = b'spam' orig = b'spam'
_channels.send(cid, orig, blocking=False) _channels.send(cid, orig, blocking=False)
obj = _channels.recv(cid) obj = recv_nowait(cid)
self.assertEqual(obj, orig) self.assertEqual(obj, orig)
self.assertIsNot(obj, orig) self.assertIsNot(obj, orig)
@ -639,27 +651,27 @@ class ChannelTests(TestBase):
id1 = _interpreters.create() id1 = _interpreters.create()
out = _run_output(id1, dedent(""" out = _run_output(id1, dedent("""
import _interpchannels as _channels import _interpchannels as _channels
cid = _channels.create() cid = _channels.create(REPLACE)
orig = b'spam' orig = b'spam'
_channels.send(cid, orig, blocking=False) _channels.send(cid, orig, blocking=False)
obj = _channels.recv(cid) obj, _ = _channels.recv(cid)
assert obj is not orig assert obj is not orig
assert obj == orig assert obj == orig
""")) """))
def test_send_recv_different_interpreters(self): def test_send_recv_different_interpreters(self):
cid = _channels.create() cid = _channels.create(REPLACE)
id1 = _interpreters.create() id1 = _interpreters.create()
out = _run_output(id1, dedent(f""" out = _run_output(id1, dedent(f"""
import _interpchannels as _channels import _interpchannels as _channels
_channels.send({cid}, b'spam', blocking=False) _channels.send({cid}, b'spam', blocking=False)
""")) """))
obj = _channels.recv(cid) obj = recv_nowait(cid)
self.assertEqual(obj, b'spam') self.assertEqual(obj, b'spam')
def test_send_recv_different_threads(self): def test_send_recv_different_threads(self):
cid = _channels.create() cid = _channels.create(REPLACE)
def f(): def f():
obj = recv_wait(cid) obj = recv_wait(cid)
@ -674,7 +686,7 @@ class ChannelTests(TestBase):
self.assertEqual(obj, b'spam') self.assertEqual(obj, b'spam')
def test_send_recv_different_interpreters_and_threads(self): def test_send_recv_different_interpreters_and_threads(self):
cid = _channels.create() cid = _channels.create(REPLACE)
id1 = _interpreters.create() id1 = _interpreters.create()
out = None out = None
@ -685,7 +697,7 @@ class ChannelTests(TestBase):
import _interpchannels as _channels import _interpchannels as _channels
while True: while True:
try: try:
obj = _channels.recv({cid}) obj, _ = _channels.recv({cid})
break break
except _channels.ChannelEmptyError: except _channels.ChannelEmptyError:
time.sleep(0.1) time.sleep(0.1)
@ -710,23 +722,23 @@ class ChannelTests(TestBase):
_channels.recv(10) _channels.recv(10)
def test_recv_empty(self): def test_recv_empty(self):
cid = _channels.create() cid = _channels.create(REPLACE)
with self.assertRaises(_channels.ChannelEmptyError): with self.assertRaises(_channels.ChannelEmptyError):
_channels.recv(cid) _channels.recv(cid)
def test_recv_default(self): def test_recv_default(self):
default = object() default = object()
cid = _channels.create() cid = _channels.create(REPLACE)
obj1 = _channels.recv(cid, default) obj1 = recv_nowait(cid, default)
_channels.send(cid, None, blocking=False) _channels.send(cid, None, blocking=False)
_channels.send(cid, 1, blocking=False) _channels.send(cid, 1, blocking=False)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'eggs', blocking=False) _channels.send(cid, b'eggs', blocking=False)
obj2 = _channels.recv(cid, default) obj2 = recv_nowait(cid, default)
obj3 = _channels.recv(cid, default) obj3 = recv_nowait(cid, default)
obj4 = _channels.recv(cid) obj4 = recv_nowait(cid)
obj5 = _channels.recv(cid, default) obj5 = recv_nowait(cid, default)
obj6 = _channels.recv(cid, default) obj6 = recv_nowait(cid, default)
self.assertIs(obj1, default) self.assertIs(obj1, default)
self.assertIs(obj2, None) self.assertIs(obj2, None)
@ -737,7 +749,7 @@ class ChannelTests(TestBase):
def test_recv_sending_interp_destroyed(self): def test_recv_sending_interp_destroyed(self):
with self.subTest('closed'): with self.subTest('closed'):
cid1 = _channels.create() cid1 = _channels.create(REPLACE)
interp = _interpreters.create() interp = _interpreters.create()
_interpreters.run_string(interp, dedent(f""" _interpreters.run_string(interp, dedent(f"""
import _interpchannels as _channels import _interpchannels as _channels
@ -750,7 +762,7 @@ class ChannelTests(TestBase):
_channels.recv(cid1) _channels.recv(cid1)
del cid1 del cid1
with self.subTest('still open'): with self.subTest('still open'):
cid2 = _channels.create() cid2 = _channels.create(REPLACE)
interp = _interpreters.create() interp = _interpreters.create()
_interpreters.run_string(interp, dedent(f""" _interpreters.run_string(interp, dedent(f"""
import _interpchannels as _channels import _interpchannels as _channels
@ -759,7 +771,8 @@ class ChannelTests(TestBase):
_channels.send(cid2, b'eggs', blocking=False) _channels.send(cid2, b'eggs', blocking=False)
_interpreters.destroy(interp) _interpreters.destroy(interp)
_channels.recv(cid2) recv_nowait(cid2, unbound=True)
recv_nowait(cid2, unbound=False)
with self.assertRaisesRegex(RuntimeError, with self.assertRaisesRegex(RuntimeError,
f'channel {cid2} is empty'): f'channel {cid2} is empty'):
_channels.recv(cid2) _channels.recv(cid2)
@ -770,9 +783,9 @@ class ChannelTests(TestBase):
def test_send_buffer(self): def test_send_buffer(self):
buf = bytearray(b'spamspamspam') buf = bytearray(b'spamspamspam')
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send_buffer(cid, buf, blocking=False) _channels.send_buffer(cid, buf, blocking=False)
obj = _channels.recv(cid) obj = recv_nowait(cid)
self.assertIsNot(obj, buf) self.assertIsNot(obj, buf)
self.assertIsInstance(obj, memoryview) self.assertIsInstance(obj, memoryview)
@ -794,12 +807,12 @@ class ChannelTests(TestBase):
else: else:
send = _channels.send send = _channels.send
cid = _channels.create() cid = _channels.create(REPLACE)
try: try:
started = time.monotonic() started = time.monotonic()
send(cid, obj, blocking=False) send(cid, obj, blocking=False)
stopped = time.monotonic() stopped = time.monotonic()
_channels.recv(cid) recv_nowait(cid)
finally: finally:
_channels.destroy(cid) _channels.destroy(cid)
delay = stopped - started # seconds delay = stopped - started # seconds
@ -813,7 +826,7 @@ class ChannelTests(TestBase):
received = None received = None
obj = b'spam' obj = b'spam'
wait = self.build_send_waiter(obj) wait = self.build_send_waiter(obj)
cid = _channels.create() cid = _channels.create(REPLACE)
def f(): def f():
nonlocal received nonlocal received
wait() wait()
@ -829,7 +842,7 @@ class ChannelTests(TestBase):
received = None received = None
obj = bytearray(b'spam') obj = bytearray(b'spam')
wait = self.build_send_waiter(obj, buffer=True) wait = self.build_send_waiter(obj, buffer=True)
cid = _channels.create() cid = _channels.create(REPLACE)
def f(): def f():
nonlocal received nonlocal received
wait() wait()
@ -844,7 +857,7 @@ class ChannelTests(TestBase):
def test_send_blocking_no_wait(self): def test_send_blocking_no_wait(self):
received = None received = None
obj = b'spam' obj = b'spam'
cid = _channels.create() cid = _channels.create(REPLACE)
def f(): def f():
nonlocal received nonlocal received
received = recv_wait(cid) received = recv_wait(cid)
@ -858,7 +871,7 @@ class ChannelTests(TestBase):
def test_send_buffer_blocking_no_wait(self): def test_send_buffer_blocking_no_wait(self):
received = None received = None
obj = bytearray(b'spam') obj = bytearray(b'spam')
cid = _channels.create() cid = _channels.create(REPLACE)
def f(): def f():
nonlocal received nonlocal received
received = recv_wait(cid) received = recv_wait(cid)
@ -873,20 +886,20 @@ class ChannelTests(TestBase):
obj = b'spam' obj = b'spam'
with self.subTest('non-blocking with timeout'): with self.subTest('non-blocking with timeout'):
cid = _channels.create() cid = _channels.create(REPLACE)
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
_channels.send(cid, obj, blocking=False, timeout=0.1) _channels.send(cid, obj, blocking=False, timeout=0.1)
with self.subTest('timeout hit'): with self.subTest('timeout hit'):
cid = _channels.create() cid = _channels.create(REPLACE)
with self.assertRaises(TimeoutError): with self.assertRaises(TimeoutError):
_channels.send(cid, obj, blocking=True, timeout=0.1) _channels.send(cid, obj, blocking=True, timeout=0.1)
with self.assertRaises(_channels.ChannelEmptyError): with self.assertRaises(_channels.ChannelEmptyError):
received = _channels.recv(cid) received = recv_nowait(cid)
print(repr(received)) print(repr(received))
with self.subTest('timeout not hit'): with self.subTest('timeout not hit'):
cid = _channels.create() cid = _channels.create(REPLACE)
def f(): def f():
recv_wait(cid) recv_wait(cid)
t = threading.Thread(target=f) t = threading.Thread(target=f)
@ -910,20 +923,20 @@ class ChannelTests(TestBase):
obj = bytearray(b'spam') obj = bytearray(b'spam')
with self.subTest('non-blocking with timeout'): with self.subTest('non-blocking with timeout'):
cid = _channels.create() cid = _channels.create(REPLACE)
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
_channels.send_buffer(cid, obj, blocking=False, timeout=0.1) _channels.send_buffer(cid, obj, blocking=False, timeout=0.1)
with self.subTest('timeout hit'): with self.subTest('timeout hit'):
cid = _channels.create() cid = _channels.create(REPLACE)
with self.assertRaises(TimeoutError): with self.assertRaises(TimeoutError):
_channels.send_buffer(cid, obj, blocking=True, timeout=0.1) _channels.send_buffer(cid, obj, blocking=True, timeout=0.1)
with self.assertRaises(_channels.ChannelEmptyError): with self.assertRaises(_channels.ChannelEmptyError):
received = _channels.recv(cid) received = recv_nowait(cid)
print(repr(received)) print(repr(received))
with self.subTest('timeout not hit'): with self.subTest('timeout not hit'):
cid = _channels.create() cid = _channels.create(REPLACE)
def f(): def f():
recv_wait(cid) recv_wait(cid)
t = threading.Thread(target=f) t = threading.Thread(target=f)
@ -936,7 +949,7 @@ class ChannelTests(TestBase):
wait = self.build_send_waiter(obj) wait = self.build_send_waiter(obj)
with self.subTest('without timeout'): with self.subTest('without timeout'):
cid = _channels.create() cid = _channels.create(REPLACE)
def f(): def f():
wait() wait()
_channels.close(cid, force=True) _channels.close(cid, force=True)
@ -947,7 +960,7 @@ class ChannelTests(TestBase):
t.join() t.join()
with self.subTest('with timeout'): with self.subTest('with timeout'):
cid = _channels.create() cid = _channels.create(REPLACE)
def f(): def f():
wait() wait()
_channels.close(cid, force=True) _channels.close(cid, force=True)
@ -974,7 +987,7 @@ class ChannelTests(TestBase):
wait = self.build_send_waiter(obj, buffer=True) wait = self.build_send_waiter(obj, buffer=True)
with self.subTest('without timeout'): with self.subTest('without timeout'):
cid = _channels.create() cid = _channels.create(REPLACE)
def f(): def f():
wait() wait()
_channels.close(cid, force=True) _channels.close(cid, force=True)
@ -985,7 +998,7 @@ class ChannelTests(TestBase):
t.join() t.join()
with self.subTest('with timeout'): with self.subTest('with timeout'):
cid = _channels.create() cid = _channels.create(REPLACE)
def f(): def f():
wait() wait()
_channels.close(cid, force=True) _channels.close(cid, force=True)
@ -999,9 +1012,9 @@ class ChannelTests(TestBase):
# close # close
def test_close_single_user(self): def test_close_single_user(self):
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.recv(cid) recv_nowait(cid)
_channels.close(cid) _channels.close(cid)
with self.assertRaises(_channels.ChannelClosedError): with self.assertRaises(_channels.ChannelClosedError):
@ -1010,7 +1023,7 @@ class ChannelTests(TestBase):
_channels.recv(cid) _channels.recv(cid)
def test_close_multiple_users(self): def test_close_multiple_users(self):
cid = _channels.create() cid = _channels.create(REPLACE)
id1 = _interpreters.create() id1 = _interpreters.create()
id2 = _interpreters.create() id2 = _interpreters.create()
_interpreters.run_string(id1, dedent(f""" _interpreters.run_string(id1, dedent(f"""
@ -1034,9 +1047,9 @@ class ChannelTests(TestBase):
self.assertEqual(excsnap.type.__name__, 'ChannelClosedError') self.assertEqual(excsnap.type.__name__, 'ChannelClosedError')
def test_close_multiple_times(self): def test_close_multiple_times(self):
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.recv(cid) recv_nowait(cid)
_channels.close(cid) _channels.close(cid)
with self.assertRaises(_channels.ChannelClosedError): with self.assertRaises(_channels.ChannelClosedError):
@ -1051,9 +1064,9 @@ class ChannelTests(TestBase):
] ]
for send, recv in tests: for send, recv in tests:
with self.subTest((send, recv)): with self.subTest((send, recv)):
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.recv(cid) recv_nowait(cid)
_channels.close(cid, send=send, recv=recv) _channels.close(cid, send=send, recv=recv)
with self.assertRaises(_channels.ChannelClosedError): with self.assertRaises(_channels.ChannelClosedError):
@ -1062,56 +1075,56 @@ class ChannelTests(TestBase):
_channels.recv(cid) _channels.recv(cid)
def test_close_defaults_with_unused_items(self): def test_close_defaults_with_unused_items(self):
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False) _channels.send(cid, b'ham', blocking=False)
with self.assertRaises(_channels.ChannelNotEmptyError): with self.assertRaises(_channels.ChannelNotEmptyError):
_channels.close(cid) _channels.close(cid)
_channels.recv(cid) recv_nowait(cid)
_channels.send(cid, b'eggs', blocking=False) _channels.send(cid, b'eggs', blocking=False)
def test_close_recv_with_unused_items_unforced(self): def test_close_recv_with_unused_items_unforced(self):
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False) _channels.send(cid, b'ham', blocking=False)
with self.assertRaises(_channels.ChannelNotEmptyError): with self.assertRaises(_channels.ChannelNotEmptyError):
_channels.close(cid, recv=True) _channels.close(cid, recv=True)
_channels.recv(cid) recv_nowait(cid)
_channels.send(cid, b'eggs', blocking=False) _channels.send(cid, b'eggs', blocking=False)
_channels.recv(cid) recv_nowait(cid)
_channels.recv(cid) recv_nowait(cid)
_channels.close(cid, recv=True) _channels.close(cid, recv=True)
def test_close_send_with_unused_items_unforced(self): def test_close_send_with_unused_items_unforced(self):
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False) _channels.send(cid, b'ham', blocking=False)
_channels.close(cid, send=True) _channels.close(cid, send=True)
with self.assertRaises(_channels.ChannelClosedError): with self.assertRaises(_channels.ChannelClosedError):
_channels.send(cid, b'eggs') _channels.send(cid, b'eggs')
_channels.recv(cid) recv_nowait(cid)
_channels.recv(cid) recv_nowait(cid)
with self.assertRaises(_channels.ChannelClosedError): with self.assertRaises(_channels.ChannelClosedError):
_channels.recv(cid) _channels.recv(cid)
def test_close_both_with_unused_items_unforced(self): def test_close_both_with_unused_items_unforced(self):
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False) _channels.send(cid, b'ham', blocking=False)
with self.assertRaises(_channels.ChannelNotEmptyError): with self.assertRaises(_channels.ChannelNotEmptyError):
_channels.close(cid, recv=True, send=True) _channels.close(cid, recv=True, send=True)
_channels.recv(cid) recv_nowait(cid)
_channels.send(cid, b'eggs', blocking=False) _channels.send(cid, b'eggs', blocking=False)
_channels.recv(cid) recv_nowait(cid)
_channels.recv(cid) recv_nowait(cid)
_channels.close(cid, recv=True) _channels.close(cid, recv=True)
def test_close_recv_with_unused_items_forced(self): def test_close_recv_with_unused_items_forced(self):
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False) _channels.send(cid, b'ham', blocking=False)
_channels.close(cid, recv=True, force=True) _channels.close(cid, recv=True, force=True)
@ -1122,7 +1135,7 @@ class ChannelTests(TestBase):
_channels.recv(cid) _channels.recv(cid)
def test_close_send_with_unused_items_forced(self): def test_close_send_with_unused_items_forced(self):
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False) _channels.send(cid, b'ham', blocking=False)
_channels.close(cid, send=True, force=True) _channels.close(cid, send=True, force=True)
@ -1133,7 +1146,7 @@ class ChannelTests(TestBase):
_channels.recv(cid) _channels.recv(cid)
def test_close_both_with_unused_items_forced(self): def test_close_both_with_unused_items_forced(self):
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False) _channels.send(cid, b'ham', blocking=False)
_channels.close(cid, send=True, recv=True, force=True) _channels.close(cid, send=True, recv=True, force=True)
@ -1144,7 +1157,7 @@ class ChannelTests(TestBase):
_channels.recv(cid) _channels.recv(cid)
def test_close_never_used(self): def test_close_never_used(self):
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.close(cid) _channels.close(cid)
with self.assertRaises(_channels.ChannelClosedError): with self.assertRaises(_channels.ChannelClosedError):
@ -1153,7 +1166,7 @@ class ChannelTests(TestBase):
_channels.recv(cid) _channels.recv(cid)
def test_close_by_unassociated_interp(self): def test_close_by_unassociated_interp(self):
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
interp = _interpreters.create() interp = _interpreters.create()
_interpreters.run_string(interp, dedent(f""" _interpreters.run_string(interp, dedent(f"""
@ -1166,11 +1179,11 @@ class ChannelTests(TestBase):
_channels.close(cid) _channels.close(cid)
def test_close_used_multiple_times_by_single_user(self): def test_close_used_multiple_times_by_single_user(self):
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.recv(cid) recv_nowait(cid)
_channels.close(cid, force=True) _channels.close(cid, force=True)
with self.assertRaises(_channels.ChannelClosedError): with self.assertRaises(_channels.ChannelClosedError):
@ -1179,7 +1192,7 @@ class ChannelTests(TestBase):
_channels.recv(cid) _channels.recv(cid)
def test_channel_list_interpreters_invalid_channel(self): def test_channel_list_interpreters_invalid_channel(self):
cid = _channels.create() cid = _channels.create(REPLACE)
# Test for invalid channel ID. # Test for invalid channel ID.
with self.assertRaises(_channels.ChannelNotFoundError): with self.assertRaises(_channels.ChannelNotFoundError):
_channels.list_interpreters(1000, send=True) _channels.list_interpreters(1000, send=True)
@ -1191,7 +1204,7 @@ class ChannelTests(TestBase):
def test_channel_list_interpreters_invalid_args(self): def test_channel_list_interpreters_invalid_args(self):
# Tests for invalid arguments passed to the API. # Tests for invalid arguments passed to the API.
cid = _channels.create() cid = _channels.create(REPLACE)
with self.assertRaises(TypeError): with self.assertRaises(TypeError):
_channels.list_interpreters(cid) _channels.list_interpreters(cid)
@ -1240,9 +1253,9 @@ class ChannelReleaseTests(TestBase):
""" """
def test_single_user(self): def test_single_user(self):
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.recv(cid) recv_nowait(cid)
_channels.release(cid, send=True, recv=True) _channels.release(cid, send=True, recv=True)
with self.assertRaises(_channels.ChannelClosedError): with self.assertRaises(_channels.ChannelClosedError):
@ -1251,7 +1264,7 @@ class ChannelReleaseTests(TestBase):
_channels.recv(cid) _channels.recv(cid)
def test_multiple_users(self): def test_multiple_users(self):
cid = _channels.create() cid = _channels.create(REPLACE)
id1 = _interpreters.create() id1 = _interpreters.create()
id2 = _interpreters.create() id2 = _interpreters.create()
_interpreters.run_string(id1, dedent(f""" _interpreters.run_string(id1, dedent(f"""
@ -1260,7 +1273,7 @@ class ChannelReleaseTests(TestBase):
""")) """))
out = _run_output(id2, dedent(f""" out = _run_output(id2, dedent(f"""
import _interpchannels as _channels import _interpchannels as _channels
obj = _channels.recv({cid}) obj, _ = _channels.recv({cid})
_channels.release({cid}) _channels.release({cid})
print(repr(obj)) print(repr(obj))
""")) """))
@ -1271,9 +1284,9 @@ class ChannelReleaseTests(TestBase):
self.assertEqual(out.strip(), "b'spam'") self.assertEqual(out.strip(), "b'spam'")
def test_no_kwargs(self): def test_no_kwargs(self):
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.recv(cid) recv_nowait(cid)
_channels.release(cid) _channels.release(cid)
with self.assertRaises(_channels.ChannelClosedError): with self.assertRaises(_channels.ChannelClosedError):
@ -1282,16 +1295,16 @@ class ChannelReleaseTests(TestBase):
_channels.recv(cid) _channels.recv(cid)
def test_multiple_times(self): def test_multiple_times(self):
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.recv(cid) recv_nowait(cid)
_channels.release(cid, send=True, recv=True) _channels.release(cid, send=True, recv=True)
with self.assertRaises(_channels.ChannelClosedError): with self.assertRaises(_channels.ChannelClosedError):
_channels.release(cid, send=True, recv=True) _channels.release(cid, send=True, recv=True)
def test_with_unused_items(self): def test_with_unused_items(self):
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False) _channels.send(cid, b'ham', blocking=False)
_channels.release(cid, send=True, recv=True) _channels.release(cid, send=True, recv=True)
@ -1300,7 +1313,7 @@ class ChannelReleaseTests(TestBase):
_channels.recv(cid) _channels.recv(cid)
def test_never_used(self): def test_never_used(self):
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.release(cid) _channels.release(cid)
with self.assertRaises(_channels.ChannelClosedError): with self.assertRaises(_channels.ChannelClosedError):
@ -1309,14 +1322,14 @@ class ChannelReleaseTests(TestBase):
_channels.recv(cid) _channels.recv(cid)
def test_by_unassociated_interp(self): def test_by_unassociated_interp(self):
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
interp = _interpreters.create() interp = _interpreters.create()
_interpreters.run_string(interp, dedent(f""" _interpreters.run_string(interp, dedent(f"""
import _interpchannels as _channels import _interpchannels as _channels
_channels.release({cid}) _channels.release({cid})
""")) """))
obj = _channels.recv(cid) obj = recv_nowait(cid)
_channels.release(cid) _channels.release(cid)
with self.assertRaises(_channels.ChannelClosedError): with self.assertRaises(_channels.ChannelClosedError):
@ -1325,7 +1338,7 @@ class ChannelReleaseTests(TestBase):
def test_close_if_unassociated(self): def test_close_if_unassociated(self):
# XXX Something's not right with this test... # XXX Something's not right with this test...
cid = _channels.create() cid = _channels.create(REPLACE)
interp = _interpreters.create() interp = _interpreters.create()
_interpreters.run_string(interp, dedent(f""" _interpreters.run_string(interp, dedent(f"""
import _interpchannels as _channels import _interpchannels as _channels
@ -1338,21 +1351,21 @@ class ChannelReleaseTests(TestBase):
def test_partially(self): def test_partially(self):
# XXX Is partial close too weird/confusing? # XXX Is partial close too weird/confusing?
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, None, blocking=False) _channels.send(cid, None, blocking=False)
_channels.recv(cid) recv_nowait(cid)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.release(cid, send=True) _channels.release(cid, send=True)
obj = _channels.recv(cid) obj = recv_nowait(cid)
self.assertEqual(obj, b'spam') self.assertEqual(obj, b'spam')
def test_used_multiple_times_by_single_user(self): def test_used_multiple_times_by_single_user(self):
cid = _channels.create() cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False)
_channels.recv(cid) recv_nowait(cid)
_channels.release(cid, send=True, recv=True) _channels.release(cid, send=True, recv=True)
with self.assertRaises(_channels.ChannelClosedError): with self.assertRaises(_channels.ChannelClosedError):
@ -1428,9 +1441,9 @@ class ChannelCloseFixture(namedtuple('ChannelCloseFixture',
def _new_channel(self, creator): def _new_channel(self, creator):
if creator.name == 'main': if creator.name == 'main':
return _channels.create() return _channels.create(REPLACE)
else: else:
ch = _channels.create() ch = _channels.create(REPLACE)
run_interp(creator.id, f""" run_interp(creator.id, f"""
import _interpreters import _interpreters
cid = _xxsubchannels.create() cid = _xxsubchannels.create()
@ -1439,7 +1452,7 @@ class ChannelCloseFixture(namedtuple('ChannelCloseFixture',
_xxsubchannels.send({ch}, int(cid), blocking=False) _xxsubchannels.send({ch}, int(cid), blocking=False)
del _interpreters del _interpreters
""") """)
self._cid = _channels.recv(ch) self._cid = recv_nowait(ch)
return self._cid return self._cid
def _get_interpreter(self, interp): def _get_interpreter(self, interp):
@ -1657,7 +1670,7 @@ class ExhaustiveChannelTests(TestBase):
) )
fix.record_action(action, result) fix.record_action(action, result)
else: else:
_cid = _channels.create() _cid = _channels.create(REPLACE)
run_interp(interp.id, f""" run_interp(interp.id, f"""
result = helpers.run_action( result = helpers.run_action(
{fix.cid}, {fix.cid},
@ -1670,8 +1683,8 @@ class ExhaustiveChannelTests(TestBase):
_channels.send({_cid}, b'X' if result.closed else b'', blocking=False) _channels.send({_cid}, b'X' if result.closed else b'', blocking=False)
""") """)
result = ChannelState( result = ChannelState(
pending=int.from_bytes(_channels.recv(_cid), 'little'), pending=int.from_bytes(recv_nowait(_cid), 'little'),
closed=bool(_channels.recv(_cid)), closed=bool(recv_nowait(_cid)),
) )
fix.record_action(action, result) fix.record_action(action, result)
@ -1729,7 +1742,7 @@ class ExhaustiveChannelTests(TestBase):
self.assertTrue(fix.state.closed) self.assertTrue(fix.state.closed)
for _ in range(fix.state.pending): for _ in range(fix.state.pending):
_channels.recv(fix.cid) recv_nowait(fix.cid)
self._assert_closed_in_interp(fix) self._assert_closed_in_interp(fix)
for interp in ('same', 'other'): for interp in ('same', 'other'):

View File

@ -372,6 +372,228 @@ class TestSendRecv(TestBase):
obj[4:8] = b'ham.' obj[4:8] = b'ham.'
self.assertEqual(obj, buf) self.assertEqual(obj, buf)
def test_send_cleared_with_subinterpreter(self):
def common(rch, sch, unbound=None, presize=0):
if not unbound:
extraargs = ''
elif unbound is channels.UNBOUND:
extraargs = ', unbound=channels.UNBOUND'
elif unbound is channels.UNBOUND_ERROR:
extraargs = ', unbound=channels.UNBOUND_ERROR'
elif unbound is channels.UNBOUND_REMOVE:
extraargs = ', unbound=channels.UNBOUND_REMOVE'
else:
raise NotImplementedError(repr(unbound))
interp = interpreters.create()
_run_output(interp, dedent(f"""
from test.support.interpreters import channels
sch = channels.SendChannel({sch.id})
obj1 = b'spam'
obj2 = b'eggs'
sch.send_nowait(obj1{extraargs})
sch.send_nowait(obj2{extraargs})
"""))
self.assertEqual(
_channels.get_count(rch.id),
presize + 2,
)
if presize == 0:
obj1 = rch.recv()
self.assertEqual(obj1, b'spam')
self.assertEqual(
_channels.get_count(rch.id),
presize + 1,
)
return interp
with self.subTest('default'): # UNBOUND
rch, sch = channels.create()
interp = common(rch, sch)
del interp
self.assertEqual(_channels.get_count(rch.id), 1)
obj1 = rch.recv()
self.assertEqual(_channels.get_count(rch.id), 0)
self.assertIs(obj1, channels.UNBOUND)
self.assertEqual(_channels.get_count(rch.id), 0)
with self.assertRaises(channels.ChannelEmptyError):
rch.recv_nowait()
with self.subTest('UNBOUND'):
rch, sch = channels.create()
interp = common(rch, sch, channels.UNBOUND)
del interp
self.assertEqual(_channels.get_count(rch.id), 1)
obj1 = rch.recv()
self.assertIs(obj1, channels.UNBOUND)
self.assertEqual(_channels.get_count(rch.id), 0)
with self.assertRaises(channels.ChannelEmptyError):
rch.recv_nowait()
with self.subTest('UNBOUND_ERROR'):
rch, sch = channels.create()
interp = common(rch, sch, channels.UNBOUND_ERROR)
del interp
self.assertEqual(_channels.get_count(rch.id), 1)
with self.assertRaises(channels.ItemInterpreterDestroyed):
rch.recv()
self.assertEqual(_channels.get_count(rch.id), 0)
with self.assertRaises(channels.ChannelEmptyError):
rch.recv_nowait()
with self.subTest('UNBOUND_REMOVE'):
rch, sch = channels.create()
interp = common(rch, sch, channels.UNBOUND_REMOVE)
del interp
self.assertEqual(_channels.get_count(rch.id), 0)
with self.assertRaises(channels.ChannelEmptyError):
rch.recv_nowait()
sch.send_nowait(b'ham', unbound=channels.UNBOUND_REMOVE)
self.assertEqual(_channels.get_count(rch.id), 1)
interp = common(rch, sch, channels.UNBOUND_REMOVE, 1)
self.assertEqual(_channels.get_count(rch.id), 3)
sch.send_nowait(42, unbound=channels.UNBOUND_REMOVE)
self.assertEqual(_channels.get_count(rch.id), 4)
del interp
self.assertEqual(_channels.get_count(rch.id), 2)
obj1 = rch.recv()
obj2 = rch.recv()
self.assertEqual(obj1, b'ham')
self.assertEqual(obj2, 42)
self.assertEqual(_channels.get_count(rch.id), 0)
with self.assertRaises(channels.ChannelEmptyError):
rch.recv_nowait()
def test_send_cleared_with_subinterpreter_mixed(self):
rch, sch = channels.create()
interp = interpreters.create()
# If we don't associate the main interpreter with the channel
# then the channel will be automatically closed when interp
# is destroyed.
sch.send_nowait(None)
rch.recv()
self.assertEqual(_channels.get_count(rch.id), 0)
_run_output(interp, dedent(f"""
from test.support.interpreters import channels
sch = channels.SendChannel({sch.id})
sch.send_nowait(1, unbound=channels.UNBOUND)
sch.send_nowait(2, unbound=channels.UNBOUND_ERROR)
sch.send_nowait(3)
sch.send_nowait(4, unbound=channels.UNBOUND_REMOVE)
sch.send_nowait(5, unbound=channels.UNBOUND)
"""))
self.assertEqual(_channels.get_count(rch.id), 5)
del interp
self.assertEqual(_channels.get_count(rch.id), 4)
obj1 = rch.recv()
self.assertIs(obj1, channels.UNBOUND)
self.assertEqual(_channels.get_count(rch.id), 3)
with self.assertRaises(channels.ItemInterpreterDestroyed):
rch.recv()
self.assertEqual(_channels.get_count(rch.id), 2)
obj2 = rch.recv()
self.assertIs(obj2, channels.UNBOUND)
self.assertEqual(_channels.get_count(rch.id), 1)
obj3 = rch.recv()
self.assertIs(obj3, channels.UNBOUND)
self.assertEqual(_channels.get_count(rch.id), 0)
def test_send_cleared_with_subinterpreter_multiple(self):
rch, sch = channels.create()
interp1 = interpreters.create()
interp2 = interpreters.create()
sch.send_nowait(1)
_run_output(interp1, dedent(f"""
from test.support.interpreters import channels
rch = channels.RecvChannel({rch.id})
sch = channels.SendChannel({sch.id})
obj1 = rch.recv()
sch.send_nowait(2, unbound=channels.UNBOUND)
sch.send_nowait(obj1, unbound=channels.UNBOUND_REMOVE)
"""))
_run_output(interp2, dedent(f"""
from test.support.interpreters import channels
rch = channels.RecvChannel({rch.id})
sch = channels.SendChannel({sch.id})
obj2 = rch.recv()
obj1 = rch.recv()
"""))
self.assertEqual(_channels.get_count(rch.id), 0)
sch.send_nowait(3)
_run_output(interp1, dedent("""
sch.send_nowait(4, unbound=channels.UNBOUND)
# interp closed here
sch.send_nowait(5, unbound=channels.UNBOUND_REMOVE)
sch.send_nowait(6, unbound=channels.UNBOUND)
"""))
_run_output(interp2, dedent("""
sch.send_nowait(7, unbound=channels.UNBOUND_ERROR)
# interp closed here
sch.send_nowait(obj1, unbound=channels.UNBOUND_ERROR)
sch.send_nowait(obj2, unbound=channels.UNBOUND_REMOVE)
sch.send_nowait(8, unbound=channels.UNBOUND)
"""))
_run_output(interp1, dedent("""
sch.send_nowait(9, unbound=channels.UNBOUND_REMOVE)
sch.send_nowait(10, unbound=channels.UNBOUND)
"""))
self.assertEqual(_channels.get_count(rch.id), 10)
obj3 = rch.recv()
self.assertEqual(obj3, 3)
self.assertEqual(_channels.get_count(rch.id), 9)
obj4 = rch.recv()
self.assertEqual(obj4, 4)
self.assertEqual(_channels.get_count(rch.id), 8)
del interp1
self.assertEqual(_channels.get_count(rch.id), 6)
# obj5 was removed
obj6 = rch.recv()
self.assertIs(obj6, channels.UNBOUND)
self.assertEqual(_channels.get_count(rch.id), 5)
obj7 = rch.recv()
self.assertEqual(obj7, 7)
self.assertEqual(_channels.get_count(rch.id), 4)
del interp2
self.assertEqual(_channels.get_count(rch.id), 3)
# obj1
with self.assertRaises(channels.ItemInterpreterDestroyed):
rch.recv()
self.assertEqual(_channels.get_count(rch.id), 2)
# obj2 was removed
obj8 = rch.recv()
self.assertIs(obj8, channels.UNBOUND)
self.assertEqual(_channels.get_count(rch.id), 1)
# obj9 was removed
obj10 = rch.recv()
self.assertIs(obj10, channels.UNBOUND)
self.assertEqual(_channels.get_count(rch.id), 0)
if __name__ == '__main__': if __name__ == '__main__':
# Test needs to be a package, so we can do relative imports. # Test needs to be a package, so we can do relative imports.

View File

@ -8,11 +8,11 @@ from test.support import import_helper, Py_DEBUG
# Raise SkipTest if subinterpreters not supported. # Raise SkipTest if subinterpreters not supported.
_queues = import_helper.import_module('_interpqueues') _queues = import_helper.import_module('_interpqueues')
from test.support import interpreters from test.support import interpreters
from test.support.interpreters import queues from test.support.interpreters import queues, _crossinterp
from .utils import _run_output, TestBase as _TestBase from .utils import _run_output, TestBase as _TestBase
REPLACE = queues._UNBOUND_CONSTANT_TO_FLAG[queues.UNBOUND] REPLACE = _crossinterp._UNBOUND_CONSTANT_TO_FLAG[_crossinterp.UNBOUND]
def get_num_queues(): def get_num_queues():

View File

@ -18,7 +18,9 @@
#endif #endif
#define REGISTERS_HEAP_TYPES #define REGISTERS_HEAP_TYPES
#define HAS_UNBOUND_ITEMS
#include "_interpreters_common.h" #include "_interpreters_common.h"
#undef HAS_UNBOUND_ITEMS
#undef REGISTERS_HEAP_TYPES #undef REGISTERS_HEAP_TYPES
@ -511,8 +513,14 @@ _waiting_finish_releasing(_waiting_t *waiting)
struct _channelitem; struct _channelitem;
typedef struct _channelitem { typedef struct _channelitem {
/* The interpreter that added the item to the queue.
The actual bound interpid is found in item->data.
This is necessary because item->data might be NULL,
meaning the interpreter has been destroyed. */
int64_t interpid;
_PyCrossInterpreterData *data; _PyCrossInterpreterData *data;
_waiting_t *waiting; _waiting_t *waiting;
int unboundop;
struct _channelitem *next; struct _channelitem *next;
} _channelitem; } _channelitem;
@ -524,11 +532,22 @@ _channelitem_ID(_channelitem *item)
static void static void
_channelitem_init(_channelitem *item, _channelitem_init(_channelitem *item,
_PyCrossInterpreterData *data, _waiting_t *waiting) int64_t interpid, _PyCrossInterpreterData *data,
_waiting_t *waiting, int unboundop)
{ {
if (interpid < 0) {
interpid = _get_interpid(data);
}
else {
assert(data == NULL
|| _PyCrossInterpreterData_INTERPID(data) < 0
|| interpid == _PyCrossInterpreterData_INTERPID(data));
}
*item = (_channelitem){ *item = (_channelitem){
.interpid = interpid,
.data = data, .data = data,
.waiting = waiting, .waiting = waiting,
.unboundop = unboundop,
}; };
if (waiting != NULL) { if (waiting != NULL) {
waiting->itemid = _channelitem_ID(item); waiting->itemid = _channelitem_ID(item);
@ -536,17 +555,15 @@ _channelitem_init(_channelitem *item,
} }
static void static void
_channelitem_clear(_channelitem *item) _channelitem_clear_data(_channelitem *item, int removed)
{ {
item->next = NULL;
if (item->data != NULL) { if (item->data != NULL) {
// It was allocated in channel_send(). // It was allocated in channel_send().
(void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE); (void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
item->data = NULL; item->data = NULL;
} }
if (item->waiting != NULL) { if (item->waiting != NULL && removed) {
if (item->waiting->status == WAITING_ACQUIRED) { if (item->waiting->status == WAITING_ACQUIRED) {
_waiting_release(item->waiting, 0); _waiting_release(item->waiting, 0);
} }
@ -554,15 +571,23 @@ _channelitem_clear(_channelitem *item)
} }
} }
static void
_channelitem_clear(_channelitem *item)
{
item->next = NULL;
_channelitem_clear_data(item, 1);
}
static _channelitem * static _channelitem *
_channelitem_new(_PyCrossInterpreterData *data, _waiting_t *waiting) _channelitem_new(int64_t interpid, _PyCrossInterpreterData *data,
_waiting_t *waiting, int unboundop)
{ {
_channelitem *item = GLOBAL_MALLOC(_channelitem); _channelitem *item = GLOBAL_MALLOC(_channelitem);
if (item == NULL) { if (item == NULL) {
PyErr_NoMemory(); PyErr_NoMemory();
return NULL; return NULL;
} }
_channelitem_init(item, data, waiting); _channelitem_init(item, interpid, data, waiting, unboundop);
return item; return item;
} }
@ -585,17 +610,48 @@ _channelitem_free_all(_channelitem *item)
static void static void
_channelitem_popped(_channelitem *item, _channelitem_popped(_channelitem *item,
_PyCrossInterpreterData **p_data, _waiting_t **p_waiting) _PyCrossInterpreterData **p_data, _waiting_t **p_waiting,
int *p_unboundop)
{ {
assert(item->waiting == NULL || item->waiting->status == WAITING_ACQUIRED); assert(item->waiting == NULL || item->waiting->status == WAITING_ACQUIRED);
*p_data = item->data; *p_data = item->data;
*p_waiting = item->waiting; *p_waiting = item->waiting;
*p_unboundop = item->unboundop;
// We clear them here, so they won't be released in _channelitem_clear(). // We clear them here, so they won't be released in _channelitem_clear().
item->data = NULL; item->data = NULL;
item->waiting = NULL; item->waiting = NULL;
_channelitem_free(item); _channelitem_free(item);
} }
static int
_channelitem_clear_interpreter(_channelitem *item)
{
assert(item->interpid >= 0);
if (item->data == NULL) {
// Its interpreter was already cleared (or it was never bound).
// For UNBOUND_REMOVE it should have been freed at that time.
assert(item->unboundop != UNBOUND_REMOVE);
return 0;
}
assert(_PyCrossInterpreterData_INTERPID(item->data) == item->interpid);
switch (item->unboundop) {
case UNBOUND_REMOVE:
// The caller must free/clear it.
return 1;
case UNBOUND_ERROR:
case UNBOUND_REPLACE:
// We won't need the cross-interpreter data later
// so we completely throw it away.
_channelitem_clear_data(item, 0);
return 0;
default:
Py_FatalError("not reachable");
return -1;
}
}
typedef struct _channelqueue { typedef struct _channelqueue {
int64_t count; int64_t count;
_channelitem *first; _channelitem *first;
@ -634,9 +690,10 @@ _channelqueue_free(_channelqueue *queue)
static int static int
_channelqueue_put(_channelqueue *queue, _channelqueue_put(_channelqueue *queue,
_PyCrossInterpreterData *data, _waiting_t *waiting) int64_t interpid, _PyCrossInterpreterData *data,
_waiting_t *waiting, int unboundop)
{ {
_channelitem *item = _channelitem_new(data, waiting); _channelitem *item = _channelitem_new(interpid, data, waiting, unboundop);
if (item == NULL) { if (item == NULL) {
return -1; return -1;
} }
@ -659,7 +716,8 @@ _channelqueue_put(_channelqueue *queue,
static int static int
_channelqueue_get(_channelqueue *queue, _channelqueue_get(_channelqueue *queue,
_PyCrossInterpreterData **p_data, _waiting_t **p_waiting) _PyCrossInterpreterData **p_data, _waiting_t **p_waiting,
int *p_unboundop)
{ {
_channelitem *item = queue->first; _channelitem *item = queue->first;
if (item == NULL) { if (item == NULL) {
@ -671,7 +729,7 @@ _channelqueue_get(_channelqueue *queue,
} }
queue->count -= 1; queue->count -= 1;
_channelitem_popped(item, p_data, p_waiting); _channelitem_popped(item, p_data, p_waiting, p_unboundop);
return 0; return 0;
} }
@ -737,7 +795,8 @@ _channelqueue_remove(_channelqueue *queue, _channelitem_id_t itemid,
} }
queue->count -= 1; queue->count -= 1;
_channelitem_popped(item, p_data, p_waiting); int unboundop;
_channelitem_popped(item, p_data, p_waiting, &unboundop);
} }
static void static void
@ -748,14 +807,17 @@ _channelqueue_clear_interpreter(_channelqueue *queue, int64_t interpid)
while (next != NULL) { while (next != NULL) {
_channelitem *item = next; _channelitem *item = next;
next = item->next; next = item->next;
if (_PyCrossInterpreterData_INTERPID(item->data) == interpid) { int remove = (item->interpid == interpid)
? _channelitem_clear_interpreter(item)
: 0;
if (remove) {
_channelitem_free(item);
if (prev == NULL) { if (prev == NULL) {
queue->first = item->next; queue->first = next;
} }
else { else {
prev->next = item->next; prev->next = next;
} }
_channelitem_free(item);
queue->count -= 1; queue->count -= 1;
} }
else { else {
@ -1018,12 +1080,15 @@ typedef struct _channel {
PyThread_type_lock mutex; PyThread_type_lock mutex;
_channelqueue *queue; _channelqueue *queue;
_channelends *ends; _channelends *ends;
struct {
int unboundop;
} defaults;
int open; int open;
struct _channel_closing *closing; struct _channel_closing *closing;
} _channel_state; } _channel_state;
static _channel_state * static _channel_state *
_channel_new(PyThread_type_lock mutex) _channel_new(PyThread_type_lock mutex, int unboundop)
{ {
_channel_state *chan = GLOBAL_MALLOC(_channel_state); _channel_state *chan = GLOBAL_MALLOC(_channel_state);
if (chan == NULL) { if (chan == NULL) {
@ -1041,6 +1106,7 @@ _channel_new(PyThread_type_lock mutex)
GLOBAL_FREE(chan); GLOBAL_FREE(chan);
return NULL; return NULL;
} }
chan->defaults.unboundop = unboundop;
chan->open = 1; chan->open = 1;
chan->closing = NULL; chan->closing = NULL;
return chan; return chan;
@ -1061,7 +1127,8 @@ _channel_free(_channel_state *chan)
static int static int
_channel_add(_channel_state *chan, int64_t interpid, _channel_add(_channel_state *chan, int64_t interpid,
_PyCrossInterpreterData *data, _waiting_t *waiting) _PyCrossInterpreterData *data, _waiting_t *waiting,
int unboundop)
{ {
int res = -1; int res = -1;
PyThread_acquire_lock(chan->mutex, WAIT_LOCK); PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
@ -1075,7 +1142,7 @@ _channel_add(_channel_state *chan, int64_t interpid,
goto done; goto done;
} }
if (_channelqueue_put(chan->queue, data, waiting) != 0) { if (_channelqueue_put(chan->queue, interpid, data, waiting, unboundop) != 0) {
goto done; goto done;
} }
// Any errors past this point must cause a _waiting_release() call. // Any errors past this point must cause a _waiting_release() call.
@ -1088,7 +1155,8 @@ done:
static int static int
_channel_next(_channel_state *chan, int64_t interpid, _channel_next(_channel_state *chan, int64_t interpid,
_PyCrossInterpreterData **p_data, _waiting_t **p_waiting) _PyCrossInterpreterData **p_data, _waiting_t **p_waiting,
int *p_unboundop)
{ {
int err = 0; int err = 0;
PyThread_acquire_lock(chan->mutex, WAIT_LOCK); PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
@ -1102,12 +1170,16 @@ _channel_next(_channel_state *chan, int64_t interpid,
goto done; goto done;
} }
int empty = _channelqueue_get(chan->queue, p_data, p_waiting); int empty = _channelqueue_get(chan->queue, p_data, p_waiting, p_unboundop);
assert(empty == 0 || empty == ERR_CHANNEL_EMPTY);
assert(!PyErr_Occurred()); assert(!PyErr_Occurred());
if (empty && chan->closing != NULL) { if (empty) {
assert(empty == ERR_CHANNEL_EMPTY);
if (chan->closing != NULL) {
chan->open = 0; chan->open = 0;
} }
err = ERR_CHANNEL_EMPTY;
goto done;
}
done: done:
PyThread_release_lock(chan->mutex); PyThread_release_lock(chan->mutex);
@ -1528,18 +1600,27 @@ done:
PyThread_release_lock(channels->mutex); PyThread_release_lock(channels->mutex);
} }
static int64_t * struct channel_id_and_info {
int64_t id;
int unboundop;
};
static struct channel_id_and_info *
_channels_list_all(_channels *channels, int64_t *count) _channels_list_all(_channels *channels, int64_t *count)
{ {
int64_t *cids = NULL; struct channel_id_and_info *cids = NULL;
PyThread_acquire_lock(channels->mutex, WAIT_LOCK); PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen)); struct channel_id_and_info *ids =
PyMem_NEW(struct channel_id_and_info, (Py_ssize_t)(channels->numopen));
if (ids == NULL) { if (ids == NULL) {
goto done; goto done;
} }
_channelref *ref = channels->head; _channelref *ref = channels->head;
for (int64_t i=0; ref != NULL; ref = ref->next, i++) { for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
ids[i] = ref->cid; ids[i] = (struct channel_id_and_info){
.id = ref->cid,
.unboundop = ref->chan->defaults.unboundop,
};
} }
*count = channels->numopen; *count = channels->numopen;
@ -1624,13 +1705,13 @@ _channel_finish_closing(_channel_state *chan) {
// Create a new channel. // Create a new channel.
static int64_t static int64_t
channel_create(_channels *channels) channel_create(_channels *channels, int unboundop)
{ {
PyThread_type_lock mutex = PyThread_allocate_lock(); PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) { if (mutex == NULL) {
return ERR_CHANNEL_MUTEX_INIT; return ERR_CHANNEL_MUTEX_INIT;
} }
_channel_state *chan = _channel_new(mutex); _channel_state *chan = _channel_new(mutex, unboundop);
if (chan == NULL) { if (chan == NULL) {
PyThread_free_lock(mutex); PyThread_free_lock(mutex);
return -1; return -1;
@ -1662,7 +1743,7 @@ channel_destroy(_channels *channels, int64_t cid)
// Optionally request to be notified when it is received. // Optionally request to be notified when it is received.
static int static int
channel_send(_channels *channels, int64_t cid, PyObject *obj, channel_send(_channels *channels, int64_t cid, PyObject *obj,
_waiting_t *waiting) _waiting_t *waiting, int unboundop)
{ {
PyInterpreterState *interp = _get_current_interp(); PyInterpreterState *interp = _get_current_interp();
if (interp == NULL) { if (interp == NULL) {
@ -1698,7 +1779,7 @@ channel_send(_channels *channels, int64_t cid, PyObject *obj,
} }
// Add the data to the channel. // Add the data to the channel.
int res = _channel_add(chan, interpid, data, waiting); int res = _channel_add(chan, interpid, data, waiting, unboundop);
PyThread_release_lock(mutex); PyThread_release_lock(mutex);
if (res != 0) { if (res != 0) {
// We may chain an exception here: // We may chain an exception here:
@ -1735,7 +1816,7 @@ channel_clear_sent(_channels *channels, int64_t cid, _waiting_t *waiting)
// Like channel_send(), but strictly wait for the object to be received. // Like channel_send(), but strictly wait for the object to be received.
static int static int
channel_send_wait(_channels *channels, int64_t cid, PyObject *obj, channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
PY_TIMEOUT_T timeout) int unboundop, PY_TIMEOUT_T timeout)
{ {
// We use a stack variable here, so we must ensure that &waiting // We use a stack variable here, so we must ensure that &waiting
// is not held by any channel item at the point this function exits. // is not held by any channel item at the point this function exits.
@ -1746,7 +1827,7 @@ channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
} }
/* Queue up the object. */ /* Queue up the object. */
int res = channel_send(channels, cid, obj, &waiting); int res = channel_send(channels, cid, obj, &waiting, unboundop);
if (res < 0) { if (res < 0) {
assert(waiting.status == WAITING_NO_STATUS); assert(waiting.status == WAITING_NO_STATUS);
goto finally; goto finally;
@ -1788,7 +1869,7 @@ finally:
// The current interpreter gets associated with the recv end of the channel. // The current interpreter gets associated with the recv end of the channel.
// XXX Support a "wait" mutex? // XXX Support a "wait" mutex?
static int static int
channel_recv(_channels *channels, int64_t cid, PyObject **res) channel_recv(_channels *channels, int64_t cid, PyObject **res, int *p_unboundop)
{ {
int err; int err;
*res = NULL; *res = NULL;
@ -1816,13 +1897,15 @@ channel_recv(_channels *channels, int64_t cid, PyObject **res)
// Pop off the next item from the channel. // Pop off the next item from the channel.
_PyCrossInterpreterData *data = NULL; _PyCrossInterpreterData *data = NULL;
_waiting_t *waiting = NULL; _waiting_t *waiting = NULL;
err = _channel_next(chan, interpid, &data, &waiting); err = _channel_next(chan, interpid, &data, &waiting, p_unboundop);
PyThread_release_lock(mutex); PyThread_release_lock(mutex);
if (err != 0) { if (err != 0) {
return err; return err;
} }
else if (data == NULL) { else if (data == NULL) {
// The item was unbound.
assert(!PyErr_Occurred()); assert(!PyErr_Occurred());
*res = NULL;
return 0; return 0;
} }
@ -1915,6 +1998,23 @@ channel_is_associated(_channels *channels, int64_t cid, int64_t interpid,
return (end != NULL && end->open); return (end != NULL && end->open);
} }
static int
_channel_get_count(_channels *channels, int64_t cid, Py_ssize_t *p_count)
{
PyThread_type_lock mutex = NULL;
_channel_state *chan = NULL;
int err = _channels_lookup(channels, cid, &mutex, &chan);
if (err != 0) {
return err;
}
assert(chan != NULL);
int64_t count = chan->queue->count;
PyThread_release_lock(mutex);
*p_count = (Py_ssize_t)count;
return 0;
}
/* channel info */ /* channel info */
@ -2767,9 +2867,22 @@ clear_interpreter(void *data)
static PyObject * static PyObject *
channelsmod_create(PyObject *self, PyObject *Py_UNUSED(ignored)) channelsmod_create(PyObject *self, PyObject *args, PyObject *kwds)
{ {
int64_t cid = channel_create(&_globals.channels); static char *kwlist[] = {"unboundop", NULL};
int unboundop;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "i:create", kwlist,
&unboundop))
{
return NULL;
}
if (!check_unbound(unboundop)) {
PyErr_Format(PyExc_ValueError,
"unsupported unboundop %d", unboundop);
return NULL;
}
int64_t cid = channel_create(&_globals.channels, unboundop);
if (cid < 0) { if (cid < 0) {
(void)handle_channel_error(-1, self, cid); (void)handle_channel_error(-1, self, cid);
return NULL; return NULL;
@ -2796,7 +2909,7 @@ channelsmod_create(PyObject *self, PyObject *Py_UNUSED(ignored))
} }
PyDoc_STRVAR(channelsmod_create_doc, PyDoc_STRVAR(channelsmod_create_doc,
"channel_create() -> cid\n\ "channel_create(unboundop) -> cid\n\
\n\ \n\
Create a new cross-interpreter channel and return a unique generated ID."); Create a new cross-interpreter channel and return a unique generated ID.");
@ -2831,7 +2944,8 @@ static PyObject *
channelsmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) channelsmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
{ {
int64_t count = 0; int64_t count = 0;
int64_t *cids = _channels_list_all(&_globals.channels, &count); struct channel_id_and_info *cids =
_channels_list_all(&_globals.channels, &count);
if (cids == NULL) { if (cids == NULL) {
if (count == 0) { if (count == 0) {
return PyList_New(0); return PyList_New(0);
@ -2848,19 +2962,26 @@ channelsmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
ids = NULL; ids = NULL;
goto finally; goto finally;
} }
int64_t *cur = cids; struct channel_id_and_info *cur = cids;
for (int64_t i=0; i < count; cur++, i++) { for (int64_t i=0; i < count; cur++, i++) {
PyObject *cidobj = NULL; PyObject *cidobj = NULL;
int err = newchannelid(state->ChannelIDType, *cur, 0, int err = newchannelid(state->ChannelIDType, cur->id, 0,
&_globals.channels, 0, 0, &_globals.channels, 0, 0,
(channelid **)&cidobj); (channelid **)&cidobj);
if (handle_channel_error(err, self, *cur)) { if (handle_channel_error(err, self, cur->id)) {
assert(cidobj == NULL); assert(cidobj == NULL);
Py_SETREF(ids, NULL); Py_SETREF(ids, NULL);
break; break;
} }
assert(cidobj != NULL); assert(cidobj != NULL);
PyList_SET_ITEM(ids, (Py_ssize_t)i, cidobj);
PyObject *item = Py_BuildValue("Oi", cidobj, cur->unboundop);
Py_DECREF(cidobj);
if (item == NULL) {
Py_SETREF(ids, NULL);
break;
}
PyList_SET_ITEM(ids, (Py_ssize_t)i, item);
} }
finally: finally:
@ -2942,16 +3063,24 @@ receive end.");
static PyObject * static PyObject *
channelsmod_send(PyObject *self, PyObject *args, PyObject *kwds) channelsmod_send(PyObject *self, PyObject *args, PyObject *kwds)
{ {
static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL}; static char *kwlist[] = {"cid", "obj", "unboundop", "blocking", "timeout",
NULL};
struct channel_id_converter_data cid_data = { struct channel_id_converter_data cid_data = {
.module = self, .module = self,
}; };
PyObject *obj; PyObject *obj;
int unboundop = UNBOUND_REPLACE;
int blocking = 1; int blocking = 1;
PyObject *timeout_obj = NULL; PyObject *timeout_obj = NULL;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|$pO:channel_send", kwlist, if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|i$pO:channel_send", kwlist,
channel_id_converter, &cid_data, &obj, channel_id_converter, &cid_data, &obj,
&blocking, &timeout_obj)) { &unboundop, &blocking, &timeout_obj))
{
return NULL;
}
if (!check_unbound(unboundop)) {
PyErr_Format(PyExc_ValueError,
"unsupported unboundop %d", unboundop);
return NULL; return NULL;
} }
@ -2964,10 +3093,10 @@ channelsmod_send(PyObject *self, PyObject *args, PyObject *kwds)
/* Queue up the object. */ /* Queue up the object. */
int err = 0; int err = 0;
if (blocking) { if (blocking) {
err = channel_send_wait(&_globals.channels, cid, obj, timeout); err = channel_send_wait(&_globals.channels, cid, obj, unboundop, timeout);
} }
else { else {
err = channel_send(&_globals.channels, cid, obj, NULL); err = channel_send(&_globals.channels, cid, obj, NULL, unboundop);
} }
if (handle_channel_error(err, self, cid)) { if (handle_channel_error(err, self, cid)) {
return NULL; return NULL;
@ -2985,17 +3114,24 @@ By default this waits for the object to be received.");
static PyObject * static PyObject *
channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
{ {
static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL}; static char *kwlist[] = {"cid", "obj", "unboundop", "blocking", "timeout",
NULL};
struct channel_id_converter_data cid_data = { struct channel_id_converter_data cid_data = {
.module = self, .module = self,
}; };
PyObject *obj; PyObject *obj;
int unboundop = UNBOUND_REPLACE;
int blocking = 1; int blocking = 1;
PyObject *timeout_obj = NULL; PyObject *timeout_obj = NULL;
if (!PyArg_ParseTupleAndKeywords(args, kwds, if (!PyArg_ParseTupleAndKeywords(args, kwds,
"O&O|$pO:channel_send_buffer", kwlist, "O&O|i$pO:channel_send_buffer", kwlist,
channel_id_converter, &cid_data, &obj, channel_id_converter, &cid_data, &obj,
&blocking, &timeout_obj)) { &unboundop, &blocking, &timeout_obj)) {
return NULL;
}
if (!check_unbound(unboundop)) {
PyErr_Format(PyExc_ValueError,
"unsupported unboundop %d", unboundop);
return NULL; return NULL;
} }
@ -3013,10 +3149,11 @@ channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
/* Queue up the object. */ /* Queue up the object. */
int err = 0; int err = 0;
if (blocking) { if (blocking) {
err = channel_send_wait(&_globals.channels, cid, tempobj, timeout); err = channel_send_wait(
&_globals.channels, cid, tempobj, unboundop, timeout);
} }
else { else {
err = channel_send(&_globals.channels, cid, tempobj, NULL); err = channel_send(&_globals.channels, cid, tempobj, NULL, unboundop);
} }
Py_DECREF(tempobj); Py_DECREF(tempobj);
if (handle_channel_error(err, self, cid)) { if (handle_channel_error(err, self, cid)) {
@ -3048,25 +3185,28 @@ channelsmod_recv(PyObject *self, PyObject *args, PyObject *kwds)
cid = cid_data.cid; cid = cid_data.cid;
PyObject *obj = NULL; PyObject *obj = NULL;
int err = channel_recv(&_globals.channels, cid, &obj); int unboundop = 0;
if (handle_channel_error(err, self, cid)) { int err = channel_recv(&_globals.channels, cid, &obj, &unboundop);
return NULL; if (err == ERR_CHANNEL_EMPTY && dflt != NULL) {
}
Py_XINCREF(dflt);
if (obj == NULL) {
// Use the default. // Use the default.
if (dflt == NULL) { obj = Py_NewRef(dflt);
(void)handle_channel_error(ERR_CHANNEL_EMPTY, self, cid); err = 0;
}
else if (handle_channel_error(err, self, cid)) {
return NULL; return NULL;
} }
obj = Py_NewRef(dflt); else if (obj == NULL) {
// The item was unbound.
return Py_BuildValue("Oi", Py_None, unboundop);
} }
Py_XDECREF(dflt);
return obj; PyObject *res = Py_BuildValue("OO", obj, Py_None);
Py_DECREF(obj);
return res;
} }
PyDoc_STRVAR(channelsmod_recv_doc, PyDoc_STRVAR(channelsmod_recv_doc,
"channel_recv(cid, [default]) -> obj\n\ "channel_recv(cid, [default]) -> (obj, unboundop)\n\
\n\ \n\
Return a new object from the data at the front of the channel's queue.\n\ Return a new object from the data at the front of the channel's queue.\n\
\n\ \n\
@ -3167,6 +3307,34 @@ Close the channel for the current interpreter. 'send' and 'recv'\n\
(bool) may be used to indicate the ends to close. By default both\n\ (bool) may be used to indicate the ends to close. By default both\n\
ends are closed. Closing an already closed end is a noop."); ends are closed. Closing an already closed end is a noop.");
static PyObject *
channelsmod_get_count(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"cid", NULL};
struct channel_id_converter_data cid_data = {
.module = self,
};
if (!PyArg_ParseTupleAndKeywords(args, kwds,
"O&:get_count", kwlist,
channel_id_converter, &cid_data)) {
return NULL;
}
int64_t cid = cid_data.cid;
Py_ssize_t count = -1;
int err = _channel_get_count(&_globals.channels, cid, &count);
if (handle_channel_error(err, self, cid)) {
return NULL;
}
assert(count >= 0);
return PyLong_FromSsize_t(count);
}
PyDoc_STRVAR(channelsmod_get_count_doc,
"get_count(cid)\n\
\n\
Return the number of items in the channel.");
static PyObject * static PyObject *
channelsmod_get_info(PyObject *self, PyObject *args, PyObject *kwds) channelsmod_get_info(PyObject *self, PyObject *args, PyObject *kwds)
{ {
@ -3194,6 +3362,38 @@ PyDoc_STRVAR(channelsmod_get_info_doc,
\n\ \n\
Return details about the channel."); Return details about the channel.");
static PyObject *
channelsmod_get_channel_defaults(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"cid", NULL};
struct channel_id_converter_data cid_data = {
.module = self,
};
if (!PyArg_ParseTupleAndKeywords(args, kwds,
"O&:get_channel_defaults", kwlist,
channel_id_converter, &cid_data)) {
return NULL;
}
int64_t cid = cid_data.cid;
PyThread_type_lock mutex = NULL;
_channel_state *channel = NULL;
int err = _channels_lookup(&_globals.channels, cid, &mutex, &channel);
if (handle_channel_error(err, self, cid)) {
return NULL;
}
int unboundop = channel->defaults.unboundop;
PyThread_release_lock(mutex);
PyObject *defaults = Py_BuildValue("i", unboundop);
return defaults;
}
PyDoc_STRVAR(channelsmod_get_channel_defaults_doc,
"get_channel_defaults(cid)\n\
\n\
Return the channel's default values, set when it was created.");
static PyObject * static PyObject *
channelsmod__channel_id(PyObject *self, PyObject *args, PyObject *kwds) channelsmod__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
{ {
@ -3240,8 +3440,8 @@ channelsmod__register_end_types(PyObject *self, PyObject *args, PyObject *kwds)
} }
static PyMethodDef module_functions[] = { static PyMethodDef module_functions[] = {
{"create", channelsmod_create, {"create", _PyCFunction_CAST(channelsmod_create),
METH_NOARGS, channelsmod_create_doc}, METH_VARARGS | METH_KEYWORDS, channelsmod_create_doc},
{"destroy", _PyCFunction_CAST(channelsmod_destroy), {"destroy", _PyCFunction_CAST(channelsmod_destroy),
METH_VARARGS | METH_KEYWORDS, channelsmod_destroy_doc}, METH_VARARGS | METH_KEYWORDS, channelsmod_destroy_doc},
{"list_all", channelsmod_list_all, {"list_all", channelsmod_list_all,
@ -3258,8 +3458,12 @@ static PyMethodDef module_functions[] = {
METH_VARARGS | METH_KEYWORDS, channelsmod_close_doc}, METH_VARARGS | METH_KEYWORDS, channelsmod_close_doc},
{"release", _PyCFunction_CAST(channelsmod_release), {"release", _PyCFunction_CAST(channelsmod_release),
METH_VARARGS | METH_KEYWORDS, channelsmod_release_doc}, METH_VARARGS | METH_KEYWORDS, channelsmod_release_doc},
{"get_count", _PyCFunction_CAST(channelsmod_get_count),
METH_VARARGS | METH_KEYWORDS, channelsmod_get_count_doc},
{"get_info", _PyCFunction_CAST(channelsmod_get_info), {"get_info", _PyCFunction_CAST(channelsmod_get_info),
METH_VARARGS | METH_KEYWORDS, channelsmod_get_info_doc}, METH_VARARGS | METH_KEYWORDS, channelsmod_get_info_doc},
{"get_channel_defaults", _PyCFunction_CAST(channelsmod_get_channel_defaults),
METH_VARARGS | METH_KEYWORDS, channelsmod_get_channel_defaults_doc},
{"_channel_id", _PyCFunction_CAST(channelsmod__channel_id), {"_channel_id", _PyCFunction_CAST(channelsmod__channel_id),
METH_VARARGS | METH_KEYWORDS, NULL}, METH_VARARGS | METH_KEYWORDS, NULL},
{"_register_end_types", _PyCFunction_CAST(channelsmod__register_end_types), {"_register_end_types", _PyCFunction_CAST(channelsmod__register_end_types),

View File

@ -9,7 +9,9 @@
#include "pycore_crossinterp.h" // struct _xid #include "pycore_crossinterp.h" // struct _xid
#define REGISTERS_HEAP_TYPES #define REGISTERS_HEAP_TYPES
#define HAS_UNBOUND_ITEMS
#include "_interpreters_common.h" #include "_interpreters_common.h"
#undef HAS_UNBOUND_ITEMS
#undef REGISTERS_HEAP_TYPES #undef REGISTERS_HEAP_TYPES
@ -58,20 +60,6 @@ _release_xid_data(_PyCrossInterpreterData *data, int flags)
return res; return res;
} }
static inline int64_t
_get_interpid(_PyCrossInterpreterData *data)
{
int64_t interpid;
if (data != NULL) {
interpid = _PyCrossInterpreterData_INTERPID(data);
assert(!PyErr_Occurred());
}
else {
interpid = PyInterpreterState_GetID(PyInterpreterState_Get());
}
return interpid;
}
static PyInterpreterState * static PyInterpreterState *
_get_current_interp(void) _get_current_interp(void)
{ {
@ -402,32 +390,6 @@ handle_queue_error(int err, PyObject *mod, int64_t qid)
} }
/* unbound items ************************************************************/
#define UNBOUND_REMOVE 1
#define UNBOUND_ERROR 2
#define UNBOUND_REPLACE 3
// It would also be possible to add UNBOUND_REPLACE where the replacement
// value is user-provided. There would be some limitations there, though.
// Another possibility would be something like UNBOUND_COPY, where the
// object is released but the underlying data is copied (with the "raw"
// allocator) and used when the item is popped off the queue.
static int
check_unbound(int unboundop)
{
switch (unboundop) {
case UNBOUND_REMOVE:
case UNBOUND_ERROR:
case UNBOUND_REPLACE:
return 1;
default:
return 0;
}
}
/* the basic queue **********************************************************/ /* the basic queue **********************************************************/
struct _queueitem; struct _queueitem;

View File

@ -19,3 +19,48 @@ clear_xid_class(PyTypeObject *cls)
return _PyCrossInterpreterData_UnregisterClass(cls); return _PyCrossInterpreterData_UnregisterClass(cls);
} }
#endif #endif
static inline int64_t
_get_interpid(_PyCrossInterpreterData *data)
{
int64_t interpid;
if (data != NULL) {
interpid = _PyCrossInterpreterData_INTERPID(data);
assert(!PyErr_Occurred());
}
else {
interpid = PyInterpreterState_GetID(PyInterpreterState_Get());
}
return interpid;
}
/* unbound items ************************************************************/
#ifdef HAS_UNBOUND_ITEMS
#define UNBOUND_REMOVE 1
#define UNBOUND_ERROR 2
#define UNBOUND_REPLACE 3
// It would also be possible to add UNBOUND_REPLACE where the replacement
// value is user-provided. There would be some limitations there, though.
// Another possibility would be something like UNBOUND_COPY, where the
// object is released but the underlying data is copied (with the "raw"
// allocator) and used when the item is popped off the queue.
static int
check_unbound(int unboundop)
{
switch (unboundop) {
case UNBOUND_REMOVE:
case UNBOUND_ERROR:
case UNBOUND_REPLACE:
return 1;
default:
return 0;
}
}
#endif