From 8b209fd4f8a9bf9603888bda2c44b5cfd4ebf47a Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 15 Jul 2024 13:43:59 -0600 Subject: [PATCH] gh-76785: Expand How Interpreter Channels Handle Interpreter Finalization (gh-121805) See 6b98b274b6 for an explanation of the problem and solution. Here I've applied the solution to channels. --- Lib/test/support/interpreters/_crossinterp.py | 102 ++++++ Lib/test/support/interpreters/channels.py | 110 +++++- Lib/test/support/interpreters/queues.py | 60 +-- Lib/test/test__interpchannels.py | 275 +++++++------- Lib/test/test_interpreters/test_channels.py | 222 +++++++++++ Lib/test/test_interpreters/test_queues.py | 4 +- Modules/_interpchannelsmodule.c | 344 ++++++++++++++---- Modules/_interpqueuesmodule.c | 42 +-- Modules/_interpreters_common.h | 45 +++ 9 files changed, 898 insertions(+), 306 deletions(-) create mode 100644 Lib/test/support/interpreters/_crossinterp.py diff --git a/Lib/test/support/interpreters/_crossinterp.py b/Lib/test/support/interpreters/_crossinterp.py new file mode 100644 index 00000000000..544e197ba4c --- /dev/null +++ b/Lib/test/support/interpreters/_crossinterp.py @@ -0,0 +1,102 @@ +"""Common code between queues and channels.""" + + +class ItemInterpreterDestroyed(Exception): + """Raised when trying to get an item whose interpreter was destroyed.""" + + +class classonly: + """A non-data descriptor that makes a value only visible on the class. + + This is like the "classmethod" builtin, but does not show up on + instances of the class. It may be used as a decorator. + """ + + def __init__(self, value): + self.value = value + self.getter = classmethod(value).__get__ + self.name = None + + def __set_name__(self, cls, name): + if self.name is not None: + raise TypeError('already used') + self.name = name + + def __get__(self, obj, cls): + if obj is not None: + raise AttributeError(self.name) + # called on the class + return self.getter(None, cls) + + +class UnboundItem: + """Represents a cross-interpreter item no longer bound to an interpreter. + + An item is unbound when the interpreter that added it to the + cross-interpreter container is destroyed. + """ + + __slots__ = () + + @classonly + def singleton(cls, kind, module, name='UNBOUND'): + doc = cls.__doc__.replace('cross-interpreter container', kind) + doc = doc.replace('cross-interpreter', kind) + subclass = type( + f'Unbound{kind.capitalize()}Item', + (cls,), + dict( + _MODULE=module, + _NAME=name, + __doc__=doc, + ), + ) + return object.__new__(subclass) + + _MODULE = __name__ + _NAME = 'UNBOUND' + + def __new__(cls): + raise Exception(f'use {cls._MODULE}.{cls._NAME}') + + def __repr__(self): + return f'{self._MODULE}.{self._NAME}' +# return f'interpreters.queues.UNBOUND' + + +UNBOUND = object.__new__(UnboundItem) +UNBOUND_ERROR = object() +UNBOUND_REMOVE = object() + +_UNBOUND_CONSTANT_TO_FLAG = { + UNBOUND_REMOVE: 1, + UNBOUND_ERROR: 2, + UNBOUND: 3, +} +_UNBOUND_FLAG_TO_CONSTANT = {v: k + for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()} + + +def serialize_unbound(unbound): + op = unbound + try: + flag = _UNBOUND_CONSTANT_TO_FLAG[op] + except KeyError: + raise NotImplementedError(f'unsupported unbound replacement op {op!r}') + return flag, + + +def resolve_unbound(flag, exctype_destroyed): + try: + op = _UNBOUND_FLAG_TO_CONSTANT[flag] + except KeyError: + raise NotImplementedError(f'unsupported unbound replacement op {flag!r}') + if op is UNBOUND_REMOVE: + # "remove" not possible here + raise NotImplementedError + elif op is UNBOUND_ERROR: + raise exctype_destroyed("item's original interpreter destroyed") + elif op is UNBOUND: + return UNBOUND + else: + raise NotImplementedError(repr(op)) diff --git a/Lib/test/support/interpreters/channels.py b/Lib/test/support/interpreters/channels.py index fbae7e634cf..d2bd93d77f7 100644 --- a/Lib/test/support/interpreters/channels.py +++ b/Lib/test/support/interpreters/channels.py @@ -2,35 +2,68 @@ import time import _interpchannels as _channels +from . import _crossinterp # aliases: from _interpchannels import ( ChannelError, ChannelNotFoundError, ChannelClosedError, ChannelEmptyError, ChannelNotEmptyError, ) +from ._crossinterp import ( + UNBOUND_ERROR, UNBOUND_REMOVE, +) __all__ = [ + 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE', 'create', 'list_all', 'SendChannel', 'RecvChannel', 'ChannelError', 'ChannelNotFoundError', 'ChannelEmptyError', + 'ItemInterpreterDestroyed', ] -def create(): +class ItemInterpreterDestroyed(ChannelError, + _crossinterp.ItemInterpreterDestroyed): + """Raised from get() and get_nowait().""" + + +UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__) + + +def _serialize_unbound(unbound): + if unbound is UNBOUND: + unbound = _crossinterp.UNBOUND + return _crossinterp.serialize_unbound(unbound) + + +def _resolve_unbound(flag): + resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed) + if resolved is _crossinterp.UNBOUND: + resolved = UNBOUND + return resolved + + +def create(*, unbounditems=UNBOUND): """Return (recv, send) for a new cross-interpreter channel. The channel may be used to pass data safely between interpreters. + + "unbounditems" sets the default for the send end of the channel. + See SendChannel.send() for supported values. The default value + is UNBOUND, which replaces the unbound item when received. """ - cid = _channels.create() - recv, send = RecvChannel(cid), SendChannel(cid) + unbound = _serialize_unbound(unbounditems) + unboundop, = unbound + cid = _channels.create(unboundop) + recv, send = RecvChannel(cid), SendChannel(cid, _unbound=unbound) return recv, send def list_all(): """Return a list of (recv, send) for all open channels.""" - return [(RecvChannel(cid), SendChannel(cid)) - for cid in _channels.list_all()] + return [(RecvChannel(cid), SendChannel(cid, _unbound=unbound)) + for cid, unbound in _channels.list_all()] class _ChannelEnd: @@ -106,12 +139,15 @@ class RecvChannel(_ChannelEnd): if timeout < 0: raise ValueError(f'timeout value must be non-negative') end = time.time() + timeout - obj = _channels.recv(self._id, _sentinel) + obj, unboundop = _channels.recv(self._id, _sentinel) while obj is _sentinel: time.sleep(_delay) if timeout is not None and time.time() >= end: raise TimeoutError - obj = _channels.recv(self._id, _sentinel) + obj, unboundop = _channels.recv(self._id, _sentinel) + if unboundop is not None: + assert obj is None, repr(obj) + return _resolve_unbound(unboundop) return obj def recv_nowait(self, default=_NOT_SET): @@ -122,9 +158,13 @@ class RecvChannel(_ChannelEnd): is the same as recv(). """ if default is _NOT_SET: - return _channels.recv(self._id) + obj, unboundop = _channels.recv(self._id) else: - return _channels.recv(self._id, default) + obj, unboundop = _channels.recv(self._id, default) + if unboundop is not None: + assert obj is None, repr(obj) + return _resolve_unbound(unboundop) + return obj def close(self): _channels.close(self._id, recv=True) @@ -135,43 +175,79 @@ class SendChannel(_ChannelEnd): _end = 'send' + def __new__(cls, cid, *, _unbound=None): + if _unbound is None: + try: + op = _channels.get_channel_defaults(cid) + _unbound = (op,) + except ChannelNotFoundError: + _unbound = _serialize_unbound(UNBOUND) + self = super().__new__(cls, cid) + self._unbound = _unbound + return self + @property def is_closed(self): info = self._info return info.closed or info.closing - def send(self, obj, timeout=None): + def send(self, obj, timeout=None, *, + unbound=None, + ): """Send the object (i.e. its data) to the channel's receiving end. This blocks until the object is received. """ - _channels.send(self._id, obj, timeout=timeout, blocking=True) + if unbound is None: + unboundop, = self._unbound + else: + unboundop, = _serialize_unbound(unbound) + _channels.send(self._id, obj, unboundop, timeout=timeout, blocking=True) - def send_nowait(self, obj): + def send_nowait(self, obj, *, + unbound=None, + ): """Send the object to the channel's receiving end. If the object is immediately received then return True (else False). Otherwise this is the same as send(). """ + if unbound is None: + unboundop, = self._unbound + else: + unboundop, = _serialize_unbound(unbound) # XXX Note that at the moment channel_send() only ever returns # None. This should be fixed when channel_send_wait() is added. # See bpo-32604 and gh-19829. - return _channels.send(self._id, obj, blocking=False) + return _channels.send(self._id, obj, unboundop, blocking=False) - def send_buffer(self, obj, timeout=None): + def send_buffer(self, obj, timeout=None, *, + unbound=None, + ): """Send the object's buffer to the channel's receiving end. This blocks until the object is received. """ - _channels.send_buffer(self._id, obj, timeout=timeout, blocking=True) + if unbound is None: + unboundop, = self._unbound + else: + unboundop, = _serialize_unbound(unbound) + _channels.send_buffer(self._id, obj, unboundop, + timeout=timeout, blocking=True) - def send_buffer_nowait(self, obj): + def send_buffer_nowait(self, obj, *, + unbound=None, + ): """Send the object's buffer to the channel's receiving end. If the object is immediately received then return True (else False). Otherwise this is the same as send(). """ - return _channels.send_buffer(self._id, obj, blocking=False) + if unbound is None: + unboundop, = self._unbound + else: + unboundop, = _serialize_unbound(unbound) + return _channels.send_buffer(self._id, obj, unboundop, blocking=False) def close(self): _channels.close(self._id, send=True) diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index 402ceffd1bb..deb8e8613af 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -5,11 +5,15 @@ import queue import time import weakref import _interpqueues as _queues +from . import _crossinterp # aliases: from _interpqueues import ( QueueError, QueueNotFoundError, ) +from ._crossinterp import ( + UNBOUND_ERROR, UNBOUND_REMOVE, +) __all__ = [ 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE', @@ -34,7 +38,8 @@ class QueueFull(QueueError, queue.Full): """ -class ItemInterpreterDestroyed(QueueError): +class ItemInterpreterDestroyed(QueueError, + _crossinterp.ItemInterpreterDestroyed): """Raised from get() and get_nowait().""" @@ -42,57 +47,20 @@ _SHARED_ONLY = 0 _PICKLED = 1 -class UnboundItem: - """Represents a Queue item no longer bound to an interpreter. +UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__) - An item is unbound when the interpreter that added it to the queue - is destroyed. - """ - - __slots__ = () - - def __new__(cls): - return UNBOUND - - def __repr__(self): - return f'interpreters.queues.UNBOUND' - - -UNBOUND = object.__new__(UnboundItem) -UNBOUND_ERROR = object() -UNBOUND_REMOVE = object() - -_UNBOUND_CONSTANT_TO_FLAG = { - UNBOUND_REMOVE: 1, - UNBOUND_ERROR: 2, - UNBOUND: 3, -} -_UNBOUND_FLAG_TO_CONSTANT = {v: k - for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()} def _serialize_unbound(unbound): - op = unbound - try: - flag = _UNBOUND_CONSTANT_TO_FLAG[op] - except KeyError: - raise NotImplementedError(f'unsupported unbound replacement op {op!r}') - return flag, + if unbound is UNBOUND: + unbound = _crossinterp.UNBOUND + return _crossinterp.serialize_unbound(unbound) def _resolve_unbound(flag): - try: - op = _UNBOUND_FLAG_TO_CONSTANT[flag] - except KeyError: - raise NotImplementedError(f'unsupported unbound replacement op {flag!r}') - if op is UNBOUND_REMOVE: - # "remove" not possible here - raise NotImplementedError - elif op is UNBOUND_ERROR: - raise ItemInterpreterDestroyed("item's original interpreter destroyed") - elif op is UNBOUND: - return UNBOUND - else: - raise NotImplementedError(repr(op)) + resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed) + if resolved is _crossinterp.UNBOUND: + resolved = UNBOUND + return resolved def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND): diff --git a/Lib/test/test__interpchannels.py b/Lib/test/test__interpchannels.py index b76c58917c0..4a7f04b9df9 100644 --- a/Lib/test/test__interpchannels.py +++ b/Lib/test/test__interpchannels.py @@ -8,6 +8,8 @@ import unittest from test.support import import_helper +_channels = import_helper.import_module('_interpchannels') +from test.support.interpreters import _crossinterp from test.test__interpreters import ( _interpreters, _run_output, @@ -15,7 +17,7 @@ from test.test__interpreters import ( ) -_channels = import_helper.import_module('_interpchannels') +REPLACE = _crossinterp._UNBOUND_CONSTANT_TO_FLAG[_crossinterp.UNBOUND] # Additional tests are found in Lib/test/test_interpreters/test_channels.py. @@ -29,9 +31,19 @@ _channels = import_helper.import_module('_interpchannels') def recv_wait(cid): while True: try: - return _channels.recv(cid) + obj, unboundop = _channels.recv(cid) except _channels.ChannelEmptyError: time.sleep(0.1) + else: + assert unboundop is None, repr(unboundop) + return obj + + +def recv_nowait(cid, *args, unbound=False): + obj, unboundop = _channels.recv(cid, *args) + assert (unboundop is None) != unbound, repr(unboundop) + return obj + #@contextmanager #def run_threaded(id, source, **shared): @@ -212,7 +224,7 @@ def _run_action(cid, action, end, state): else: raise Exception('expected ChannelEmptyError') else: - _channels.recv(cid) + recv_nowait(cid) return state.decr() else: raise ValueError(end) @@ -235,7 +247,7 @@ def _run_action(cid, action, end, state): def clean_up_channels(): - for cid in _channels.list_all(): + for cid, _ in _channels.list_all(): try: _channels.destroy(cid) except _channels.ChannelNotFoundError: @@ -297,7 +309,7 @@ class ChannelIDTests(TestBase): _channels._channel_id(10, send=False, recv=False) def test_does_not_exist(self): - cid = _channels.create() + cid = _channels.create(REPLACE) with self.assertRaises(_channels.ChannelNotFoundError): _channels._channel_id(int(cid) + 1) # unforced @@ -319,9 +331,9 @@ class ChannelIDTests(TestBase): self.assertEqual(repr(cid), 'ChannelID(10)') def test_equality(self): - cid1 = _channels.create() + cid1 = _channels.create(REPLACE) cid2 = _channels._channel_id(int(cid1)) - cid3 = _channels.create() + cid3 = _channels.create(REPLACE) self.assertTrue(cid1 == cid1) self.assertTrue(cid1 == cid2) @@ -341,11 +353,11 @@ class ChannelIDTests(TestBase): self.assertTrue(cid1 != cid3) def test_shareable(self): - chan = _channels.create() + chan = _channels.create(REPLACE) - obj = _channels.create() + obj = _channels.create(REPLACE) _channels.send(chan, obj, blocking=False) - got = _channels.recv(chan) + got = recv_nowait(chan) self.assertEqual(got, obj) self.assertIs(type(got), type(obj)) @@ -356,15 +368,15 @@ class ChannelIDTests(TestBase): class ChannelTests(TestBase): def test_create_cid(self): - cid = _channels.create() + cid = _channels.create(REPLACE) self.assertIsInstance(cid, _channels.ChannelID) def test_sequential_ids(self): - before = _channels.list_all() - id1 = _channels.create() - id2 = _channels.create() - id3 = _channels.create() - after = _channels.list_all() + before = [cid for cid, _ in _channels.list_all()] + id1 = _channels.create(REPLACE) + id2 = _channels.create(REPLACE) + id3 = _channels.create(REPLACE) + after = [cid for cid, _ in _channels.list_all()] self.assertEqual(id2, int(id1) + 1) self.assertEqual(id3, int(id2) + 1) @@ -374,7 +386,7 @@ class ChannelTests(TestBase): id1 = _interpreters.create() out = _run_output(id1, dedent(""" import _interpchannels as _channels - cid = _channels.create() + cid = _channels.create(3) print(cid) """)) cid1 = int(out.strip()) @@ -382,7 +394,7 @@ class ChannelTests(TestBase): id2 = _interpreters.create() out = _run_output(id2, dedent(""" import _interpchannels as _channels - cid = _channels.create() + cid = _channels.create(3) print(cid) """)) cid2 = int(out.strip()) @@ -392,7 +404,7 @@ class ChannelTests(TestBase): def test_channel_list_interpreters_none(self): """Test listing interpreters for a channel with no associations.""" # Test for channel with no associated _interpreters. - cid = _channels.create() + cid = _channels.create(REPLACE) send_interps = _channels.list_interpreters(cid, send=True) recv_interps = _channels.list_interpreters(cid, send=False) self.assertEqual(send_interps, []) @@ -401,7 +413,7 @@ class ChannelTests(TestBase): def test_channel_list_interpreters_basic(self): """Test basic listing channel _interpreters.""" interp0, *_ = _interpreters.get_main() - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, "send", blocking=False) # Test for a channel that has one end associated to an interpreter. send_interps = _channels.list_interpreters(cid, send=True) @@ -412,7 +424,7 @@ class ChannelTests(TestBase): interp1 = _interpreters.create() _run_output(interp1, dedent(f""" import _interpchannels as _channels - obj = _channels.recv({cid}) + _channels.recv({cid}) """)) # Test for channel that has both ends associated to an interpreter. send_interps = _channels.list_interpreters(cid, send=True) @@ -426,7 +438,7 @@ class ChannelTests(TestBase): interp1 = _interpreters.create() interp2 = _interpreters.create() interp3 = _interpreters.create() - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, "send", blocking=False) _run_output(interp1, dedent(f""" @@ -435,11 +447,11 @@ class ChannelTests(TestBase): """)) _run_output(interp2, dedent(f""" import _interpchannels as _channels - obj = _channels.recv({cid}) + _channels.recv({cid}) """)) _run_output(interp3, dedent(f""" import _interpchannels as _channels - obj = _channels.recv({cid}) + _channels.recv({cid}) """)) send_interps = _channels.list_interpreters(cid, send=True) recv_interps = _channels.list_interpreters(cid, send=False) @@ -450,11 +462,11 @@ class ChannelTests(TestBase): """Test listing channel interpreters with a destroyed interpreter.""" interp0, *_ = _interpreters.get_main() interp1 = _interpreters.create() - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, "send", blocking=False) _run_output(interp1, dedent(f""" import _interpchannels as _channels - obj = _channels.recv({cid}) + _channels.recv({cid}) """)) # Should be one interpreter associated with each end. send_interps = _channels.list_interpreters(cid, send=True) @@ -476,16 +488,16 @@ class ChannelTests(TestBase): interp0, *_ = _interpreters.get_main() interp1 = _interpreters.create() interp2 = _interpreters.create() - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, "data", blocking=False) _run_output(interp1, dedent(f""" import _interpchannels as _channels - obj = _channels.recv({cid}) + _channels.recv({cid}) """)) _channels.send(cid, "data", blocking=False) _run_output(interp2, dedent(f""" import _interpchannels as _channels - obj = _channels.recv({cid}) + _channels.recv({cid}) """)) # Check the setup. send_interps = _channels.list_interpreters(cid, send=True) @@ -516,7 +528,7 @@ class ChannelTests(TestBase): """Test listing channel interpreters with a closed channel.""" interp0, *_ = _interpreters.get_main() interp1 = _interpreters.create() - cid = _channels.create() + cid = _channels.create(REPLACE) # Put something in the channel so that it's not empty. _channels.send(cid, "send", blocking=False) @@ -538,7 +550,7 @@ class ChannelTests(TestBase): """Test listing channel interpreters with a channel's send end closed.""" interp0, *_ = _interpreters.get_main() interp1 = _interpreters.create() - cid = _channels.create() + cid = _channels.create(REPLACE) # Put something in the channel so that it's not empty. _channels.send(cid, "send", blocking=False) @@ -570,7 +582,7 @@ class ChannelTests(TestBase): _channels.list_interpreters(cid, send=False) def test_allowed_types(self): - cid = _channels.create() + cid = _channels.create(REPLACE) objects = [ None, 'spam', @@ -580,7 +592,7 @@ class ChannelTests(TestBase): for obj in objects: with self.subTest(obj): _channels.send(cid, obj, blocking=False) - got = _channels.recv(cid) + got = recv_nowait(cid) self.assertEqual(got, obj) self.assertIs(type(got), type(obj)) @@ -589,7 +601,7 @@ class ChannelTests(TestBase): # XXX What about between interpreters? def test_run_string_arg_unresolved(self): - cid = _channels.create() + cid = _channels.create(REPLACE) interp = _interpreters.create() _interpreters.set___main___attrs(interp, dict(cid=cid.send)) @@ -598,7 +610,7 @@ class ChannelTests(TestBase): print(cid.end) _channels.send(cid, b'spam', blocking=False) """)) - obj = _channels.recv(cid) + obj = recv_nowait(cid) self.assertEqual(obj, b'spam') self.assertEqual(out.strip(), 'send') @@ -608,7 +620,7 @@ class ChannelTests(TestBase): # Note: this test caused crashes on some buildbots (bpo-33615). @unittest.skip('disabled until high-level channels exist') def test_run_string_arg_resolved(self): - cid = _channels.create() + cid = _channels.create(REPLACE) cid = _channels._channel_id(cid, _resolve=True) interp = _interpreters.create() @@ -618,7 +630,7 @@ class ChannelTests(TestBase): _channels.send(chan.id, b'spam', blocking=False) """), dict(chan=cid.send)) - obj = _channels.recv(cid) + obj = recv_nowait(cid) self.assertEqual(obj, b'spam') self.assertEqual(out.strip(), 'send') @@ -627,10 +639,10 @@ class ChannelTests(TestBase): # send/recv def test_send_recv_main(self): - cid = _channels.create() + cid = _channels.create(REPLACE) orig = b'spam' _channels.send(cid, orig, blocking=False) - obj = _channels.recv(cid) + obj = recv_nowait(cid) self.assertEqual(obj, orig) self.assertIsNot(obj, orig) @@ -639,27 +651,27 @@ class ChannelTests(TestBase): id1 = _interpreters.create() out = _run_output(id1, dedent(""" import _interpchannels as _channels - cid = _channels.create() + cid = _channels.create(REPLACE) orig = b'spam' _channels.send(cid, orig, blocking=False) - obj = _channels.recv(cid) + obj, _ = _channels.recv(cid) assert obj is not orig assert obj == orig """)) def test_send_recv_different_interpreters(self): - cid = _channels.create() + cid = _channels.create(REPLACE) id1 = _interpreters.create() out = _run_output(id1, dedent(f""" import _interpchannels as _channels _channels.send({cid}, b'spam', blocking=False) """)) - obj = _channels.recv(cid) + obj = recv_nowait(cid) self.assertEqual(obj, b'spam') def test_send_recv_different_threads(self): - cid = _channels.create() + cid = _channels.create(REPLACE) def f(): obj = recv_wait(cid) @@ -674,7 +686,7 @@ class ChannelTests(TestBase): self.assertEqual(obj, b'spam') def test_send_recv_different_interpreters_and_threads(self): - cid = _channels.create() + cid = _channels.create(REPLACE) id1 = _interpreters.create() out = None @@ -685,7 +697,7 @@ class ChannelTests(TestBase): import _interpchannels as _channels while True: try: - obj = _channels.recv({cid}) + obj, _ = _channels.recv({cid}) break except _channels.ChannelEmptyError: time.sleep(0.1) @@ -710,23 +722,23 @@ class ChannelTests(TestBase): _channels.recv(10) def test_recv_empty(self): - cid = _channels.create() + cid = _channels.create(REPLACE) with self.assertRaises(_channels.ChannelEmptyError): _channels.recv(cid) def test_recv_default(self): default = object() - cid = _channels.create() - obj1 = _channels.recv(cid, default) + cid = _channels.create(REPLACE) + obj1 = recv_nowait(cid, default) _channels.send(cid, None, blocking=False) _channels.send(cid, 1, blocking=False) _channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'eggs', blocking=False) - obj2 = _channels.recv(cid, default) - obj3 = _channels.recv(cid, default) - obj4 = _channels.recv(cid) - obj5 = _channels.recv(cid, default) - obj6 = _channels.recv(cid, default) + obj2 = recv_nowait(cid, default) + obj3 = recv_nowait(cid, default) + obj4 = recv_nowait(cid) + obj5 = recv_nowait(cid, default) + obj6 = recv_nowait(cid, default) self.assertIs(obj1, default) self.assertIs(obj2, None) @@ -737,7 +749,7 @@ class ChannelTests(TestBase): def test_recv_sending_interp_destroyed(self): with self.subTest('closed'): - cid1 = _channels.create() + cid1 = _channels.create(REPLACE) interp = _interpreters.create() _interpreters.run_string(interp, dedent(f""" import _interpchannels as _channels @@ -750,7 +762,7 @@ class ChannelTests(TestBase): _channels.recv(cid1) del cid1 with self.subTest('still open'): - cid2 = _channels.create() + cid2 = _channels.create(REPLACE) interp = _interpreters.create() _interpreters.run_string(interp, dedent(f""" import _interpchannels as _channels @@ -759,7 +771,8 @@ class ChannelTests(TestBase): _channels.send(cid2, b'eggs', blocking=False) _interpreters.destroy(interp) - _channels.recv(cid2) + recv_nowait(cid2, unbound=True) + recv_nowait(cid2, unbound=False) with self.assertRaisesRegex(RuntimeError, f'channel {cid2} is empty'): _channels.recv(cid2) @@ -770,9 +783,9 @@ class ChannelTests(TestBase): def test_send_buffer(self): buf = bytearray(b'spamspamspam') - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send_buffer(cid, buf, blocking=False) - obj = _channels.recv(cid) + obj = recv_nowait(cid) self.assertIsNot(obj, buf) self.assertIsInstance(obj, memoryview) @@ -794,12 +807,12 @@ class ChannelTests(TestBase): else: send = _channels.send - cid = _channels.create() + cid = _channels.create(REPLACE) try: started = time.monotonic() send(cid, obj, blocking=False) stopped = time.monotonic() - _channels.recv(cid) + recv_nowait(cid) finally: _channels.destroy(cid) delay = stopped - started # seconds @@ -813,7 +826,7 @@ class ChannelTests(TestBase): received = None obj = b'spam' wait = self.build_send_waiter(obj) - cid = _channels.create() + cid = _channels.create(REPLACE) def f(): nonlocal received wait() @@ -829,7 +842,7 @@ class ChannelTests(TestBase): received = None obj = bytearray(b'spam') wait = self.build_send_waiter(obj, buffer=True) - cid = _channels.create() + cid = _channels.create(REPLACE) def f(): nonlocal received wait() @@ -844,7 +857,7 @@ class ChannelTests(TestBase): def test_send_blocking_no_wait(self): received = None obj = b'spam' - cid = _channels.create() + cid = _channels.create(REPLACE) def f(): nonlocal received received = recv_wait(cid) @@ -858,7 +871,7 @@ class ChannelTests(TestBase): def test_send_buffer_blocking_no_wait(self): received = None obj = bytearray(b'spam') - cid = _channels.create() + cid = _channels.create(REPLACE) def f(): nonlocal received received = recv_wait(cid) @@ -873,20 +886,20 @@ class ChannelTests(TestBase): obj = b'spam' with self.subTest('non-blocking with timeout'): - cid = _channels.create() + cid = _channels.create(REPLACE) with self.assertRaises(ValueError): _channels.send(cid, obj, blocking=False, timeout=0.1) with self.subTest('timeout hit'): - cid = _channels.create() + cid = _channels.create(REPLACE) with self.assertRaises(TimeoutError): _channels.send(cid, obj, blocking=True, timeout=0.1) with self.assertRaises(_channels.ChannelEmptyError): - received = _channels.recv(cid) + received = recv_nowait(cid) print(repr(received)) with self.subTest('timeout not hit'): - cid = _channels.create() + cid = _channels.create(REPLACE) def f(): recv_wait(cid) t = threading.Thread(target=f) @@ -910,20 +923,20 @@ class ChannelTests(TestBase): obj = bytearray(b'spam') with self.subTest('non-blocking with timeout'): - cid = _channels.create() + cid = _channels.create(REPLACE) with self.assertRaises(ValueError): _channels.send_buffer(cid, obj, blocking=False, timeout=0.1) with self.subTest('timeout hit'): - cid = _channels.create() + cid = _channels.create(REPLACE) with self.assertRaises(TimeoutError): _channels.send_buffer(cid, obj, blocking=True, timeout=0.1) with self.assertRaises(_channels.ChannelEmptyError): - received = _channels.recv(cid) + received = recv_nowait(cid) print(repr(received)) with self.subTest('timeout not hit'): - cid = _channels.create() + cid = _channels.create(REPLACE) def f(): recv_wait(cid) t = threading.Thread(target=f) @@ -936,7 +949,7 @@ class ChannelTests(TestBase): wait = self.build_send_waiter(obj) with self.subTest('without timeout'): - cid = _channels.create() + cid = _channels.create(REPLACE) def f(): wait() _channels.close(cid, force=True) @@ -947,7 +960,7 @@ class ChannelTests(TestBase): t.join() with self.subTest('with timeout'): - cid = _channels.create() + cid = _channels.create(REPLACE) def f(): wait() _channels.close(cid, force=True) @@ -974,7 +987,7 @@ class ChannelTests(TestBase): wait = self.build_send_waiter(obj, buffer=True) with self.subTest('without timeout'): - cid = _channels.create() + cid = _channels.create(REPLACE) def f(): wait() _channels.close(cid, force=True) @@ -985,7 +998,7 @@ class ChannelTests(TestBase): t.join() with self.subTest('with timeout'): - cid = _channels.create() + cid = _channels.create(REPLACE) def f(): wait() _channels.close(cid, force=True) @@ -999,9 +1012,9 @@ class ChannelTests(TestBase): # close def test_close_single_user(self): - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) - _channels.recv(cid) + recv_nowait(cid) _channels.close(cid) with self.assertRaises(_channels.ChannelClosedError): @@ -1010,7 +1023,7 @@ class ChannelTests(TestBase): _channels.recv(cid) def test_close_multiple_users(self): - cid = _channels.create() + cid = _channels.create(REPLACE) id1 = _interpreters.create() id2 = _interpreters.create() _interpreters.run_string(id1, dedent(f""" @@ -1034,9 +1047,9 @@ class ChannelTests(TestBase): self.assertEqual(excsnap.type.__name__, 'ChannelClosedError') def test_close_multiple_times(self): - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) - _channels.recv(cid) + recv_nowait(cid) _channels.close(cid) with self.assertRaises(_channels.ChannelClosedError): @@ -1051,9 +1064,9 @@ class ChannelTests(TestBase): ] for send, recv in tests: with self.subTest((send, recv)): - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) - _channels.recv(cid) + recv_nowait(cid) _channels.close(cid, send=send, recv=recv) with self.assertRaises(_channels.ChannelClosedError): @@ -1062,56 +1075,56 @@ class ChannelTests(TestBase): _channels.recv(cid) def test_close_defaults_with_unused_items(self): - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'ham', blocking=False) with self.assertRaises(_channels.ChannelNotEmptyError): _channels.close(cid) - _channels.recv(cid) + recv_nowait(cid) _channels.send(cid, b'eggs', blocking=False) def test_close_recv_with_unused_items_unforced(self): - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'ham', blocking=False) with self.assertRaises(_channels.ChannelNotEmptyError): _channels.close(cid, recv=True) - _channels.recv(cid) + recv_nowait(cid) _channels.send(cid, b'eggs', blocking=False) - _channels.recv(cid) - _channels.recv(cid) + recv_nowait(cid) + recv_nowait(cid) _channels.close(cid, recv=True) def test_close_send_with_unused_items_unforced(self): - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'ham', blocking=False) _channels.close(cid, send=True) with self.assertRaises(_channels.ChannelClosedError): _channels.send(cid, b'eggs') - _channels.recv(cid) - _channels.recv(cid) + recv_nowait(cid) + recv_nowait(cid) with self.assertRaises(_channels.ChannelClosedError): _channels.recv(cid) def test_close_both_with_unused_items_unforced(self): - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'ham', blocking=False) with self.assertRaises(_channels.ChannelNotEmptyError): _channels.close(cid, recv=True, send=True) - _channels.recv(cid) + recv_nowait(cid) _channels.send(cid, b'eggs', blocking=False) - _channels.recv(cid) - _channels.recv(cid) + recv_nowait(cid) + recv_nowait(cid) _channels.close(cid, recv=True) def test_close_recv_with_unused_items_forced(self): - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'ham', blocking=False) _channels.close(cid, recv=True, force=True) @@ -1122,7 +1135,7 @@ class ChannelTests(TestBase): _channels.recv(cid) def test_close_send_with_unused_items_forced(self): - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'ham', blocking=False) _channels.close(cid, send=True, force=True) @@ -1133,7 +1146,7 @@ class ChannelTests(TestBase): _channels.recv(cid) def test_close_both_with_unused_items_forced(self): - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'ham', blocking=False) _channels.close(cid, send=True, recv=True, force=True) @@ -1144,7 +1157,7 @@ class ChannelTests(TestBase): _channels.recv(cid) def test_close_never_used(self): - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.close(cid) with self.assertRaises(_channels.ChannelClosedError): @@ -1153,7 +1166,7 @@ class ChannelTests(TestBase): _channels.recv(cid) def test_close_by_unassociated_interp(self): - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) interp = _interpreters.create() _interpreters.run_string(interp, dedent(f""" @@ -1166,11 +1179,11 @@ class ChannelTests(TestBase): _channels.close(cid) def test_close_used_multiple_times_by_single_user(self): - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False) - _channels.recv(cid) + recv_nowait(cid) _channels.close(cid, force=True) with self.assertRaises(_channels.ChannelClosedError): @@ -1179,7 +1192,7 @@ class ChannelTests(TestBase): _channels.recv(cid) def test_channel_list_interpreters_invalid_channel(self): - cid = _channels.create() + cid = _channels.create(REPLACE) # Test for invalid channel ID. with self.assertRaises(_channels.ChannelNotFoundError): _channels.list_interpreters(1000, send=True) @@ -1191,7 +1204,7 @@ class ChannelTests(TestBase): def test_channel_list_interpreters_invalid_args(self): # Tests for invalid arguments passed to the API. - cid = _channels.create() + cid = _channels.create(REPLACE) with self.assertRaises(TypeError): _channels.list_interpreters(cid) @@ -1240,9 +1253,9 @@ class ChannelReleaseTests(TestBase): """ def test_single_user(self): - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) - _channels.recv(cid) + recv_nowait(cid) _channels.release(cid, send=True, recv=True) with self.assertRaises(_channels.ChannelClosedError): @@ -1251,7 +1264,7 @@ class ChannelReleaseTests(TestBase): _channels.recv(cid) def test_multiple_users(self): - cid = _channels.create() + cid = _channels.create(REPLACE) id1 = _interpreters.create() id2 = _interpreters.create() _interpreters.run_string(id1, dedent(f""" @@ -1260,7 +1273,7 @@ class ChannelReleaseTests(TestBase): """)) out = _run_output(id2, dedent(f""" import _interpchannels as _channels - obj = _channels.recv({cid}) + obj, _ = _channels.recv({cid}) _channels.release({cid}) print(repr(obj)) """)) @@ -1271,9 +1284,9 @@ class ChannelReleaseTests(TestBase): self.assertEqual(out.strip(), "b'spam'") def test_no_kwargs(self): - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) - _channels.recv(cid) + recv_nowait(cid) _channels.release(cid) with self.assertRaises(_channels.ChannelClosedError): @@ -1282,16 +1295,16 @@ class ChannelReleaseTests(TestBase): _channels.recv(cid) def test_multiple_times(self): - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) - _channels.recv(cid) + recv_nowait(cid) _channels.release(cid, send=True, recv=True) with self.assertRaises(_channels.ChannelClosedError): _channels.release(cid, send=True, recv=True) def test_with_unused_items(self): - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'ham', blocking=False) _channels.release(cid, send=True, recv=True) @@ -1300,7 +1313,7 @@ class ChannelReleaseTests(TestBase): _channels.recv(cid) def test_never_used(self): - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.release(cid) with self.assertRaises(_channels.ChannelClosedError): @@ -1309,14 +1322,14 @@ class ChannelReleaseTests(TestBase): _channels.recv(cid) def test_by_unassociated_interp(self): - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) interp = _interpreters.create() _interpreters.run_string(interp, dedent(f""" import _interpchannels as _channels _channels.release({cid}) """)) - obj = _channels.recv(cid) + obj = recv_nowait(cid) _channels.release(cid) with self.assertRaises(_channels.ChannelClosedError): @@ -1325,7 +1338,7 @@ class ChannelReleaseTests(TestBase): def test_close_if_unassociated(self): # XXX Something's not right with this test... - cid = _channels.create() + cid = _channels.create(REPLACE) interp = _interpreters.create() _interpreters.run_string(interp, dedent(f""" import _interpchannels as _channels @@ -1338,21 +1351,21 @@ class ChannelReleaseTests(TestBase): def test_partially(self): # XXX Is partial close too weird/confusing? - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, None, blocking=False) - _channels.recv(cid) + recv_nowait(cid) _channels.send(cid, b'spam', blocking=False) _channels.release(cid, send=True) - obj = _channels.recv(cid) + obj = recv_nowait(cid) self.assertEqual(obj, b'spam') def test_used_multiple_times_by_single_user(self): - cid = _channels.create() + cid = _channels.create(REPLACE) _channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False) _channels.send(cid, b'spam', blocking=False) - _channels.recv(cid) + recv_nowait(cid) _channels.release(cid, send=True, recv=True) with self.assertRaises(_channels.ChannelClosedError): @@ -1428,9 +1441,9 @@ class ChannelCloseFixture(namedtuple('ChannelCloseFixture', def _new_channel(self, creator): if creator.name == 'main': - return _channels.create() + return _channels.create(REPLACE) else: - ch = _channels.create() + ch = _channels.create(REPLACE) run_interp(creator.id, f""" import _interpreters cid = _xxsubchannels.create() @@ -1439,7 +1452,7 @@ class ChannelCloseFixture(namedtuple('ChannelCloseFixture', _xxsubchannels.send({ch}, int(cid), blocking=False) del _interpreters """) - self._cid = _channels.recv(ch) + self._cid = recv_nowait(ch) return self._cid def _get_interpreter(self, interp): @@ -1657,7 +1670,7 @@ class ExhaustiveChannelTests(TestBase): ) fix.record_action(action, result) else: - _cid = _channels.create() + _cid = _channels.create(REPLACE) run_interp(interp.id, f""" result = helpers.run_action( {fix.cid}, @@ -1670,8 +1683,8 @@ class ExhaustiveChannelTests(TestBase): _channels.send({_cid}, b'X' if result.closed else b'', blocking=False) """) result = ChannelState( - pending=int.from_bytes(_channels.recv(_cid), 'little'), - closed=bool(_channels.recv(_cid)), + pending=int.from_bytes(recv_nowait(_cid), 'little'), + closed=bool(recv_nowait(_cid)), ) fix.record_action(action, result) @@ -1729,7 +1742,7 @@ class ExhaustiveChannelTests(TestBase): self.assertTrue(fix.state.closed) for _ in range(fix.state.pending): - _channels.recv(fix.cid) + recv_nowait(fix.cid) self._assert_closed_in_interp(fix) for interp in ('same', 'other'): diff --git a/Lib/test/test_interpreters/test_channels.py b/Lib/test/test_interpreters/test_channels.py index 6c37754142e..eada18f99d0 100644 --- a/Lib/test/test_interpreters/test_channels.py +++ b/Lib/test/test_interpreters/test_channels.py @@ -372,6 +372,228 @@ class TestSendRecv(TestBase): obj[4:8] = b'ham.' self.assertEqual(obj, buf) + def test_send_cleared_with_subinterpreter(self): + def common(rch, sch, unbound=None, presize=0): + if not unbound: + extraargs = '' + elif unbound is channels.UNBOUND: + extraargs = ', unbound=channels.UNBOUND' + elif unbound is channels.UNBOUND_ERROR: + extraargs = ', unbound=channels.UNBOUND_ERROR' + elif unbound is channels.UNBOUND_REMOVE: + extraargs = ', unbound=channels.UNBOUND_REMOVE' + else: + raise NotImplementedError(repr(unbound)) + interp = interpreters.create() + + _run_output(interp, dedent(f""" + from test.support.interpreters import channels + sch = channels.SendChannel({sch.id}) + obj1 = b'spam' + obj2 = b'eggs' + sch.send_nowait(obj1{extraargs}) + sch.send_nowait(obj2{extraargs}) + """)) + self.assertEqual( + _channels.get_count(rch.id), + presize + 2, + ) + + if presize == 0: + obj1 = rch.recv() + self.assertEqual(obj1, b'spam') + self.assertEqual( + _channels.get_count(rch.id), + presize + 1, + ) + + return interp + + with self.subTest('default'): # UNBOUND + rch, sch = channels.create() + interp = common(rch, sch) + del interp + self.assertEqual(_channels.get_count(rch.id), 1) + obj1 = rch.recv() + self.assertEqual(_channels.get_count(rch.id), 0) + self.assertIs(obj1, channels.UNBOUND) + self.assertEqual(_channels.get_count(rch.id), 0) + with self.assertRaises(channels.ChannelEmptyError): + rch.recv_nowait() + + with self.subTest('UNBOUND'): + rch, sch = channels.create() + interp = common(rch, sch, channels.UNBOUND) + del interp + self.assertEqual(_channels.get_count(rch.id), 1) + obj1 = rch.recv() + self.assertIs(obj1, channels.UNBOUND) + self.assertEqual(_channels.get_count(rch.id), 0) + with self.assertRaises(channels.ChannelEmptyError): + rch.recv_nowait() + + with self.subTest('UNBOUND_ERROR'): + rch, sch = channels.create() + interp = common(rch, sch, channels.UNBOUND_ERROR) + + del interp + self.assertEqual(_channels.get_count(rch.id), 1) + with self.assertRaises(channels.ItemInterpreterDestroyed): + rch.recv() + + self.assertEqual(_channels.get_count(rch.id), 0) + with self.assertRaises(channels.ChannelEmptyError): + rch.recv_nowait() + + with self.subTest('UNBOUND_REMOVE'): + rch, sch = channels.create() + + interp = common(rch, sch, channels.UNBOUND_REMOVE) + del interp + self.assertEqual(_channels.get_count(rch.id), 0) + with self.assertRaises(channels.ChannelEmptyError): + rch.recv_nowait() + + sch.send_nowait(b'ham', unbound=channels.UNBOUND_REMOVE) + self.assertEqual(_channels.get_count(rch.id), 1) + interp = common(rch, sch, channels.UNBOUND_REMOVE, 1) + self.assertEqual(_channels.get_count(rch.id), 3) + sch.send_nowait(42, unbound=channels.UNBOUND_REMOVE) + self.assertEqual(_channels.get_count(rch.id), 4) + del interp + self.assertEqual(_channels.get_count(rch.id), 2) + obj1 = rch.recv() + obj2 = rch.recv() + self.assertEqual(obj1, b'ham') + self.assertEqual(obj2, 42) + self.assertEqual(_channels.get_count(rch.id), 0) + with self.assertRaises(channels.ChannelEmptyError): + rch.recv_nowait() + + def test_send_cleared_with_subinterpreter_mixed(self): + rch, sch = channels.create() + interp = interpreters.create() + + # If we don't associate the main interpreter with the channel + # then the channel will be automatically closed when interp + # is destroyed. + sch.send_nowait(None) + rch.recv() + self.assertEqual(_channels.get_count(rch.id), 0) + + _run_output(interp, dedent(f""" + from test.support.interpreters import channels + sch = channels.SendChannel({sch.id}) + sch.send_nowait(1, unbound=channels.UNBOUND) + sch.send_nowait(2, unbound=channels.UNBOUND_ERROR) + sch.send_nowait(3) + sch.send_nowait(4, unbound=channels.UNBOUND_REMOVE) + sch.send_nowait(5, unbound=channels.UNBOUND) + """)) + self.assertEqual(_channels.get_count(rch.id), 5) + + del interp + self.assertEqual(_channels.get_count(rch.id), 4) + + obj1 = rch.recv() + self.assertIs(obj1, channels.UNBOUND) + self.assertEqual(_channels.get_count(rch.id), 3) + + with self.assertRaises(channels.ItemInterpreterDestroyed): + rch.recv() + self.assertEqual(_channels.get_count(rch.id), 2) + + obj2 = rch.recv() + self.assertIs(obj2, channels.UNBOUND) + self.assertEqual(_channels.get_count(rch.id), 1) + + obj3 = rch.recv() + self.assertIs(obj3, channels.UNBOUND) + self.assertEqual(_channels.get_count(rch.id), 0) + + def test_send_cleared_with_subinterpreter_multiple(self): + rch, sch = channels.create() + interp1 = interpreters.create() + interp2 = interpreters.create() + + sch.send_nowait(1) + _run_output(interp1, dedent(f""" + from test.support.interpreters import channels + rch = channels.RecvChannel({rch.id}) + sch = channels.SendChannel({sch.id}) + obj1 = rch.recv() + sch.send_nowait(2, unbound=channels.UNBOUND) + sch.send_nowait(obj1, unbound=channels.UNBOUND_REMOVE) + """)) + _run_output(interp2, dedent(f""" + from test.support.interpreters import channels + rch = channels.RecvChannel({rch.id}) + sch = channels.SendChannel({sch.id}) + obj2 = rch.recv() + obj1 = rch.recv() + """)) + self.assertEqual(_channels.get_count(rch.id), 0) + sch.send_nowait(3) + _run_output(interp1, dedent(""" + sch.send_nowait(4, unbound=channels.UNBOUND) + # interp closed here + sch.send_nowait(5, unbound=channels.UNBOUND_REMOVE) + sch.send_nowait(6, unbound=channels.UNBOUND) + """)) + _run_output(interp2, dedent(""" + sch.send_nowait(7, unbound=channels.UNBOUND_ERROR) + # interp closed here + sch.send_nowait(obj1, unbound=channels.UNBOUND_ERROR) + sch.send_nowait(obj2, unbound=channels.UNBOUND_REMOVE) + sch.send_nowait(8, unbound=channels.UNBOUND) + """)) + _run_output(interp1, dedent(""" + sch.send_nowait(9, unbound=channels.UNBOUND_REMOVE) + sch.send_nowait(10, unbound=channels.UNBOUND) + """)) + self.assertEqual(_channels.get_count(rch.id), 10) + + obj3 = rch.recv() + self.assertEqual(obj3, 3) + self.assertEqual(_channels.get_count(rch.id), 9) + + obj4 = rch.recv() + self.assertEqual(obj4, 4) + self.assertEqual(_channels.get_count(rch.id), 8) + + del interp1 + self.assertEqual(_channels.get_count(rch.id), 6) + + # obj5 was removed + + obj6 = rch.recv() + self.assertIs(obj6, channels.UNBOUND) + self.assertEqual(_channels.get_count(rch.id), 5) + + obj7 = rch.recv() + self.assertEqual(obj7, 7) + self.assertEqual(_channels.get_count(rch.id), 4) + + del interp2 + self.assertEqual(_channels.get_count(rch.id), 3) + + # obj1 + with self.assertRaises(channels.ItemInterpreterDestroyed): + rch.recv() + self.assertEqual(_channels.get_count(rch.id), 2) + + # obj2 was removed + + obj8 = rch.recv() + self.assertIs(obj8, channels.UNBOUND) + self.assertEqual(_channels.get_count(rch.id), 1) + + # obj9 was removed + + obj10 = rch.recv() + self.assertIs(obj10, channels.UNBOUND) + self.assertEqual(_channels.get_count(rch.id), 0) + if __name__ == '__main__': # Test needs to be a package, so we can do relative imports. diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py index 30d58a5b291..18f83d097eb 100644 --- a/Lib/test/test_interpreters/test_queues.py +++ b/Lib/test/test_interpreters/test_queues.py @@ -8,11 +8,11 @@ from test.support import import_helper, Py_DEBUG # Raise SkipTest if subinterpreters not supported. _queues = import_helper.import_module('_interpqueues') from test.support import interpreters -from test.support.interpreters import queues +from test.support.interpreters import queues, _crossinterp from .utils import _run_output, TestBase as _TestBase -REPLACE = queues._UNBOUND_CONSTANT_TO_FLAG[queues.UNBOUND] +REPLACE = _crossinterp._UNBOUND_CONSTANT_TO_FLAG[_crossinterp.UNBOUND] def get_num_queues(): diff --git a/Modules/_interpchannelsmodule.c b/Modules/_interpchannelsmodule.c index f0447475c49..a8b4a8d76b0 100644 --- a/Modules/_interpchannelsmodule.c +++ b/Modules/_interpchannelsmodule.c @@ -18,7 +18,9 @@ #endif #define REGISTERS_HEAP_TYPES +#define HAS_UNBOUND_ITEMS #include "_interpreters_common.h" +#undef HAS_UNBOUND_ITEMS #undef REGISTERS_HEAP_TYPES @@ -511,8 +513,14 @@ _waiting_finish_releasing(_waiting_t *waiting) struct _channelitem; typedef struct _channelitem { + /* The interpreter that added the item to the queue. + The actual bound interpid is found in item->data. + This is necessary because item->data might be NULL, + meaning the interpreter has been destroyed. */ + int64_t interpid; _PyCrossInterpreterData *data; _waiting_t *waiting; + int unboundop; struct _channelitem *next; } _channelitem; @@ -524,11 +532,22 @@ _channelitem_ID(_channelitem *item) static void _channelitem_init(_channelitem *item, - _PyCrossInterpreterData *data, _waiting_t *waiting) + int64_t interpid, _PyCrossInterpreterData *data, + _waiting_t *waiting, int unboundop) { + if (interpid < 0) { + interpid = _get_interpid(data); + } + else { + assert(data == NULL + || _PyCrossInterpreterData_INTERPID(data) < 0 + || interpid == _PyCrossInterpreterData_INTERPID(data)); + } *item = (_channelitem){ + .interpid = interpid, .data = data, .waiting = waiting, + .unboundop = unboundop, }; if (waiting != NULL) { waiting->itemid = _channelitem_ID(item); @@ -536,17 +555,15 @@ _channelitem_init(_channelitem *item, } static void -_channelitem_clear(_channelitem *item) +_channelitem_clear_data(_channelitem *item, int removed) { - item->next = NULL; - if (item->data != NULL) { // It was allocated in channel_send(). (void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE); item->data = NULL; } - if (item->waiting != NULL) { + if (item->waiting != NULL && removed) { if (item->waiting->status == WAITING_ACQUIRED) { _waiting_release(item->waiting, 0); } @@ -554,15 +571,23 @@ _channelitem_clear(_channelitem *item) } } +static void +_channelitem_clear(_channelitem *item) +{ + item->next = NULL; + _channelitem_clear_data(item, 1); +} + static _channelitem * -_channelitem_new(_PyCrossInterpreterData *data, _waiting_t *waiting) +_channelitem_new(int64_t interpid, _PyCrossInterpreterData *data, + _waiting_t *waiting, int unboundop) { _channelitem *item = GLOBAL_MALLOC(_channelitem); if (item == NULL) { PyErr_NoMemory(); return NULL; } - _channelitem_init(item, data, waiting); + _channelitem_init(item, interpid, data, waiting, unboundop); return item; } @@ -585,17 +610,48 @@ _channelitem_free_all(_channelitem *item) static void _channelitem_popped(_channelitem *item, - _PyCrossInterpreterData **p_data, _waiting_t **p_waiting) + _PyCrossInterpreterData **p_data, _waiting_t **p_waiting, + int *p_unboundop) { assert(item->waiting == NULL || item->waiting->status == WAITING_ACQUIRED); *p_data = item->data; *p_waiting = item->waiting; + *p_unboundop = item->unboundop; // We clear them here, so they won't be released in _channelitem_clear(). item->data = NULL; item->waiting = NULL; _channelitem_free(item); } +static int +_channelitem_clear_interpreter(_channelitem *item) +{ + assert(item->interpid >= 0); + if (item->data == NULL) { + // Its interpreter was already cleared (or it was never bound). + // For UNBOUND_REMOVE it should have been freed at that time. + assert(item->unboundop != UNBOUND_REMOVE); + return 0; + } + assert(_PyCrossInterpreterData_INTERPID(item->data) == item->interpid); + + switch (item->unboundop) { + case UNBOUND_REMOVE: + // The caller must free/clear it. + return 1; + case UNBOUND_ERROR: + case UNBOUND_REPLACE: + // We won't need the cross-interpreter data later + // so we completely throw it away. + _channelitem_clear_data(item, 0); + return 0; + default: + Py_FatalError("not reachable"); + return -1; + } +} + + typedef struct _channelqueue { int64_t count; _channelitem *first; @@ -634,9 +690,10 @@ _channelqueue_free(_channelqueue *queue) static int _channelqueue_put(_channelqueue *queue, - _PyCrossInterpreterData *data, _waiting_t *waiting) + int64_t interpid, _PyCrossInterpreterData *data, + _waiting_t *waiting, int unboundop) { - _channelitem *item = _channelitem_new(data, waiting); + _channelitem *item = _channelitem_new(interpid, data, waiting, unboundop); if (item == NULL) { return -1; } @@ -659,7 +716,8 @@ _channelqueue_put(_channelqueue *queue, static int _channelqueue_get(_channelqueue *queue, - _PyCrossInterpreterData **p_data, _waiting_t **p_waiting) + _PyCrossInterpreterData **p_data, _waiting_t **p_waiting, + int *p_unboundop) { _channelitem *item = queue->first; if (item == NULL) { @@ -671,7 +729,7 @@ _channelqueue_get(_channelqueue *queue, } queue->count -= 1; - _channelitem_popped(item, p_data, p_waiting); + _channelitem_popped(item, p_data, p_waiting, p_unboundop); return 0; } @@ -737,7 +795,8 @@ _channelqueue_remove(_channelqueue *queue, _channelitem_id_t itemid, } queue->count -= 1; - _channelitem_popped(item, p_data, p_waiting); + int unboundop; + _channelitem_popped(item, p_data, p_waiting, &unboundop); } static void @@ -748,14 +807,17 @@ _channelqueue_clear_interpreter(_channelqueue *queue, int64_t interpid) while (next != NULL) { _channelitem *item = next; next = item->next; - if (_PyCrossInterpreterData_INTERPID(item->data) == interpid) { + int remove = (item->interpid == interpid) + ? _channelitem_clear_interpreter(item) + : 0; + if (remove) { + _channelitem_free(item); if (prev == NULL) { - queue->first = item->next; + queue->first = next; } else { - prev->next = item->next; + prev->next = next; } - _channelitem_free(item); queue->count -= 1; } else { @@ -1018,12 +1080,15 @@ typedef struct _channel { PyThread_type_lock mutex; _channelqueue *queue; _channelends *ends; + struct { + int unboundop; + } defaults; int open; struct _channel_closing *closing; } _channel_state; static _channel_state * -_channel_new(PyThread_type_lock mutex) +_channel_new(PyThread_type_lock mutex, int unboundop) { _channel_state *chan = GLOBAL_MALLOC(_channel_state); if (chan == NULL) { @@ -1041,6 +1106,7 @@ _channel_new(PyThread_type_lock mutex) GLOBAL_FREE(chan); return NULL; } + chan->defaults.unboundop = unboundop; chan->open = 1; chan->closing = NULL; return chan; @@ -1061,7 +1127,8 @@ _channel_free(_channel_state *chan) static int _channel_add(_channel_state *chan, int64_t interpid, - _PyCrossInterpreterData *data, _waiting_t *waiting) + _PyCrossInterpreterData *data, _waiting_t *waiting, + int unboundop) { int res = -1; PyThread_acquire_lock(chan->mutex, WAIT_LOCK); @@ -1075,7 +1142,7 @@ _channel_add(_channel_state *chan, int64_t interpid, goto done; } - if (_channelqueue_put(chan->queue, data, waiting) != 0) { + if (_channelqueue_put(chan->queue, interpid, data, waiting, unboundop) != 0) { goto done; } // Any errors past this point must cause a _waiting_release() call. @@ -1088,7 +1155,8 @@ done: static int _channel_next(_channel_state *chan, int64_t interpid, - _PyCrossInterpreterData **p_data, _waiting_t **p_waiting) + _PyCrossInterpreterData **p_data, _waiting_t **p_waiting, + int *p_unboundop) { int err = 0; PyThread_acquire_lock(chan->mutex, WAIT_LOCK); @@ -1102,11 +1170,15 @@ _channel_next(_channel_state *chan, int64_t interpid, goto done; } - int empty = _channelqueue_get(chan->queue, p_data, p_waiting); - assert(empty == 0 || empty == ERR_CHANNEL_EMPTY); + int empty = _channelqueue_get(chan->queue, p_data, p_waiting, p_unboundop); assert(!PyErr_Occurred()); - if (empty && chan->closing != NULL) { - chan->open = 0; + if (empty) { + assert(empty == ERR_CHANNEL_EMPTY); + if (chan->closing != NULL) { + chan->open = 0; + } + err = ERR_CHANNEL_EMPTY; + goto done; } done: @@ -1528,18 +1600,27 @@ done: PyThread_release_lock(channels->mutex); } -static int64_t * +struct channel_id_and_info { + int64_t id; + int unboundop; +}; + +static struct channel_id_and_info * _channels_list_all(_channels *channels, int64_t *count) { - int64_t *cids = NULL; + struct channel_id_and_info *cids = NULL; PyThread_acquire_lock(channels->mutex, WAIT_LOCK); - int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen)); + struct channel_id_and_info *ids = + PyMem_NEW(struct channel_id_and_info, (Py_ssize_t)(channels->numopen)); if (ids == NULL) { goto done; } _channelref *ref = channels->head; for (int64_t i=0; ref != NULL; ref = ref->next, i++) { - ids[i] = ref->cid; + ids[i] = (struct channel_id_and_info){ + .id = ref->cid, + .unboundop = ref->chan->defaults.unboundop, + }; } *count = channels->numopen; @@ -1624,13 +1705,13 @@ _channel_finish_closing(_channel_state *chan) { // Create a new channel. static int64_t -channel_create(_channels *channels) +channel_create(_channels *channels, int unboundop) { PyThread_type_lock mutex = PyThread_allocate_lock(); if (mutex == NULL) { return ERR_CHANNEL_MUTEX_INIT; } - _channel_state *chan = _channel_new(mutex); + _channel_state *chan = _channel_new(mutex, unboundop); if (chan == NULL) { PyThread_free_lock(mutex); return -1; @@ -1662,7 +1743,7 @@ channel_destroy(_channels *channels, int64_t cid) // Optionally request to be notified when it is received. static int channel_send(_channels *channels, int64_t cid, PyObject *obj, - _waiting_t *waiting) + _waiting_t *waiting, int unboundop) { PyInterpreterState *interp = _get_current_interp(); if (interp == NULL) { @@ -1698,7 +1779,7 @@ channel_send(_channels *channels, int64_t cid, PyObject *obj, } // Add the data to the channel. - int res = _channel_add(chan, interpid, data, waiting); + int res = _channel_add(chan, interpid, data, waiting, unboundop); PyThread_release_lock(mutex); if (res != 0) { // We may chain an exception here: @@ -1735,7 +1816,7 @@ channel_clear_sent(_channels *channels, int64_t cid, _waiting_t *waiting) // Like channel_send(), but strictly wait for the object to be received. static int channel_send_wait(_channels *channels, int64_t cid, PyObject *obj, - PY_TIMEOUT_T timeout) + int unboundop, PY_TIMEOUT_T timeout) { // We use a stack variable here, so we must ensure that &waiting // is not held by any channel item at the point this function exits. @@ -1746,7 +1827,7 @@ channel_send_wait(_channels *channels, int64_t cid, PyObject *obj, } /* Queue up the object. */ - int res = channel_send(channels, cid, obj, &waiting); + int res = channel_send(channels, cid, obj, &waiting, unboundop); if (res < 0) { assert(waiting.status == WAITING_NO_STATUS); goto finally; @@ -1788,7 +1869,7 @@ finally: // The current interpreter gets associated with the recv end of the channel. // XXX Support a "wait" mutex? static int -channel_recv(_channels *channels, int64_t cid, PyObject **res) +channel_recv(_channels *channels, int64_t cid, PyObject **res, int *p_unboundop) { int err; *res = NULL; @@ -1816,13 +1897,15 @@ channel_recv(_channels *channels, int64_t cid, PyObject **res) // Pop off the next item from the channel. _PyCrossInterpreterData *data = NULL; _waiting_t *waiting = NULL; - err = _channel_next(chan, interpid, &data, &waiting); + err = _channel_next(chan, interpid, &data, &waiting, p_unboundop); PyThread_release_lock(mutex); if (err != 0) { return err; } else if (data == NULL) { + // The item was unbound. assert(!PyErr_Occurred()); + *res = NULL; return 0; } @@ -1915,6 +1998,23 @@ channel_is_associated(_channels *channels, int64_t cid, int64_t interpid, return (end != NULL && end->open); } +static int +_channel_get_count(_channels *channels, int64_t cid, Py_ssize_t *p_count) +{ + PyThread_type_lock mutex = NULL; + _channel_state *chan = NULL; + int err = _channels_lookup(channels, cid, &mutex, &chan); + if (err != 0) { + return err; + } + assert(chan != NULL); + int64_t count = chan->queue->count; + PyThread_release_lock(mutex); + + *p_count = (Py_ssize_t)count; + return 0; +} + /* channel info */ @@ -2767,9 +2867,22 @@ clear_interpreter(void *data) static PyObject * -channelsmod_create(PyObject *self, PyObject *Py_UNUSED(ignored)) +channelsmod_create(PyObject *self, PyObject *args, PyObject *kwds) { - int64_t cid = channel_create(&_globals.channels); + static char *kwlist[] = {"unboundop", NULL}; + int unboundop; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "i:create", kwlist, + &unboundop)) + { + return NULL; + } + if (!check_unbound(unboundop)) { + PyErr_Format(PyExc_ValueError, + "unsupported unboundop %d", unboundop); + return NULL; + } + + int64_t cid = channel_create(&_globals.channels, unboundop); if (cid < 0) { (void)handle_channel_error(-1, self, cid); return NULL; @@ -2796,7 +2909,7 @@ channelsmod_create(PyObject *self, PyObject *Py_UNUSED(ignored)) } PyDoc_STRVAR(channelsmod_create_doc, -"channel_create() -> cid\n\ +"channel_create(unboundop) -> cid\n\ \n\ Create a new cross-interpreter channel and return a unique generated ID."); @@ -2831,7 +2944,8 @@ static PyObject * channelsmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) { int64_t count = 0; - int64_t *cids = _channels_list_all(&_globals.channels, &count); + struct channel_id_and_info *cids = + _channels_list_all(&_globals.channels, &count); if (cids == NULL) { if (count == 0) { return PyList_New(0); @@ -2848,19 +2962,26 @@ channelsmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) ids = NULL; goto finally; } - int64_t *cur = cids; + struct channel_id_and_info *cur = cids; for (int64_t i=0; i < count; cur++, i++) { PyObject *cidobj = NULL; - int err = newchannelid(state->ChannelIDType, *cur, 0, + int err = newchannelid(state->ChannelIDType, cur->id, 0, &_globals.channels, 0, 0, (channelid **)&cidobj); - if (handle_channel_error(err, self, *cur)) { + if (handle_channel_error(err, self, cur->id)) { assert(cidobj == NULL); Py_SETREF(ids, NULL); break; } assert(cidobj != NULL); - PyList_SET_ITEM(ids, (Py_ssize_t)i, cidobj); + + PyObject *item = Py_BuildValue("Oi", cidobj, cur->unboundop); + Py_DECREF(cidobj); + if (item == NULL) { + Py_SETREF(ids, NULL); + break; + } + PyList_SET_ITEM(ids, (Py_ssize_t)i, item); } finally: @@ -2942,16 +3063,24 @@ receive end."); static PyObject * channelsmod_send(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL}; + static char *kwlist[] = {"cid", "obj", "unboundop", "blocking", "timeout", + NULL}; struct channel_id_converter_data cid_data = { .module = self, }; PyObject *obj; + int unboundop = UNBOUND_REPLACE; int blocking = 1; PyObject *timeout_obj = NULL; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|$pO:channel_send", kwlist, + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|i$pO:channel_send", kwlist, channel_id_converter, &cid_data, &obj, - &blocking, &timeout_obj)) { + &unboundop, &blocking, &timeout_obj)) + { + return NULL; + } + if (!check_unbound(unboundop)) { + PyErr_Format(PyExc_ValueError, + "unsupported unboundop %d", unboundop); return NULL; } @@ -2964,10 +3093,10 @@ channelsmod_send(PyObject *self, PyObject *args, PyObject *kwds) /* Queue up the object. */ int err = 0; if (blocking) { - err = channel_send_wait(&_globals.channels, cid, obj, timeout); + err = channel_send_wait(&_globals.channels, cid, obj, unboundop, timeout); } else { - err = channel_send(&_globals.channels, cid, obj, NULL); + err = channel_send(&_globals.channels, cid, obj, NULL, unboundop); } if (handle_channel_error(err, self, cid)) { return NULL; @@ -2985,17 +3114,24 @@ By default this waits for the object to be received."); static PyObject * channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL}; + static char *kwlist[] = {"cid", "obj", "unboundop", "blocking", "timeout", + NULL}; struct channel_id_converter_data cid_data = { .module = self, }; PyObject *obj; + int unboundop = UNBOUND_REPLACE; int blocking = 1; PyObject *timeout_obj = NULL; if (!PyArg_ParseTupleAndKeywords(args, kwds, - "O&O|$pO:channel_send_buffer", kwlist, + "O&O|i$pO:channel_send_buffer", kwlist, channel_id_converter, &cid_data, &obj, - &blocking, &timeout_obj)) { + &unboundop, &blocking, &timeout_obj)) { + return NULL; + } + if (!check_unbound(unboundop)) { + PyErr_Format(PyExc_ValueError, + "unsupported unboundop %d", unboundop); return NULL; } @@ -3013,10 +3149,11 @@ channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) /* Queue up the object. */ int err = 0; if (blocking) { - err = channel_send_wait(&_globals.channels, cid, tempobj, timeout); + err = channel_send_wait( + &_globals.channels, cid, tempobj, unboundop, timeout); } else { - err = channel_send(&_globals.channels, cid, tempobj, NULL); + err = channel_send(&_globals.channels, cid, tempobj, NULL, unboundop); } Py_DECREF(tempobj); if (handle_channel_error(err, self, cid)) { @@ -3048,25 +3185,28 @@ channelsmod_recv(PyObject *self, PyObject *args, PyObject *kwds) cid = cid_data.cid; PyObject *obj = NULL; - int err = channel_recv(&_globals.channels, cid, &obj); - if (handle_channel_error(err, self, cid)) { + int unboundop = 0; + int err = channel_recv(&_globals.channels, cid, &obj, &unboundop); + if (err == ERR_CHANNEL_EMPTY && dflt != NULL) { + // Use the default. + obj = Py_NewRef(dflt); + err = 0; + } + else if (handle_channel_error(err, self, cid)) { return NULL; } - Py_XINCREF(dflt); - if (obj == NULL) { - // Use the default. - if (dflt == NULL) { - (void)handle_channel_error(ERR_CHANNEL_EMPTY, self, cid); - return NULL; - } - obj = Py_NewRef(dflt); + else if (obj == NULL) { + // The item was unbound. + return Py_BuildValue("Oi", Py_None, unboundop); } - Py_XDECREF(dflt); - return obj; + + PyObject *res = Py_BuildValue("OO", obj, Py_None); + Py_DECREF(obj); + return res; } PyDoc_STRVAR(channelsmod_recv_doc, -"channel_recv(cid, [default]) -> obj\n\ +"channel_recv(cid, [default]) -> (obj, unboundop)\n\ \n\ Return a new object from the data at the front of the channel's queue.\n\ \n\ @@ -3167,6 +3307,34 @@ Close the channel for the current interpreter. 'send' and 'recv'\n\ (bool) may be used to indicate the ends to close. By default both\n\ ends are closed. Closing an already closed end is a noop."); +static PyObject * +channelsmod_get_count(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"cid", NULL}; + struct channel_id_converter_data cid_data = { + .module = self, + }; + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "O&:get_count", kwlist, + channel_id_converter, &cid_data)) { + return NULL; + } + int64_t cid = cid_data.cid; + + Py_ssize_t count = -1; + int err = _channel_get_count(&_globals.channels, cid, &count); + if (handle_channel_error(err, self, cid)) { + return NULL; + } + assert(count >= 0); + return PyLong_FromSsize_t(count); +} + +PyDoc_STRVAR(channelsmod_get_count_doc, +"get_count(cid)\n\ +\n\ +Return the number of items in the channel."); + static PyObject * channelsmod_get_info(PyObject *self, PyObject *args, PyObject *kwds) { @@ -3194,6 +3362,38 @@ PyDoc_STRVAR(channelsmod_get_info_doc, \n\ Return details about the channel."); +static PyObject * +channelsmod_get_channel_defaults(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"cid", NULL}; + struct channel_id_converter_data cid_data = { + .module = self, + }; + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "O&:get_channel_defaults", kwlist, + channel_id_converter, &cid_data)) { + return NULL; + } + int64_t cid = cid_data.cid; + + PyThread_type_lock mutex = NULL; + _channel_state *channel = NULL; + int err = _channels_lookup(&_globals.channels, cid, &mutex, &channel); + if (handle_channel_error(err, self, cid)) { + return NULL; + } + int unboundop = channel->defaults.unboundop; + PyThread_release_lock(mutex); + + PyObject *defaults = Py_BuildValue("i", unboundop); + return defaults; +} + +PyDoc_STRVAR(channelsmod_get_channel_defaults_doc, +"get_channel_defaults(cid)\n\ +\n\ +Return the channel's default values, set when it was created."); + static PyObject * channelsmod__channel_id(PyObject *self, PyObject *args, PyObject *kwds) { @@ -3240,8 +3440,8 @@ channelsmod__register_end_types(PyObject *self, PyObject *args, PyObject *kwds) } static PyMethodDef module_functions[] = { - {"create", channelsmod_create, - METH_NOARGS, channelsmod_create_doc}, + {"create", _PyCFunction_CAST(channelsmod_create), + METH_VARARGS | METH_KEYWORDS, channelsmod_create_doc}, {"destroy", _PyCFunction_CAST(channelsmod_destroy), METH_VARARGS | METH_KEYWORDS, channelsmod_destroy_doc}, {"list_all", channelsmod_list_all, @@ -3258,8 +3458,12 @@ static PyMethodDef module_functions[] = { METH_VARARGS | METH_KEYWORDS, channelsmod_close_doc}, {"release", _PyCFunction_CAST(channelsmod_release), METH_VARARGS | METH_KEYWORDS, channelsmod_release_doc}, + {"get_count", _PyCFunction_CAST(channelsmod_get_count), + METH_VARARGS | METH_KEYWORDS, channelsmod_get_count_doc}, {"get_info", _PyCFunction_CAST(channelsmod_get_info), METH_VARARGS | METH_KEYWORDS, channelsmod_get_info_doc}, + {"get_channel_defaults", _PyCFunction_CAST(channelsmod_get_channel_defaults), + METH_VARARGS | METH_KEYWORDS, channelsmod_get_channel_defaults_doc}, {"_channel_id", _PyCFunction_CAST(channelsmod__channel_id), METH_VARARGS | METH_KEYWORDS, NULL}, {"_register_end_types", _PyCFunction_CAST(channelsmod__register_end_types), diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c index 8e827891987..5dec240f02c 100644 --- a/Modules/_interpqueuesmodule.c +++ b/Modules/_interpqueuesmodule.c @@ -9,7 +9,9 @@ #include "pycore_crossinterp.h" // struct _xid #define REGISTERS_HEAP_TYPES +#define HAS_UNBOUND_ITEMS #include "_interpreters_common.h" +#undef HAS_UNBOUND_ITEMS #undef REGISTERS_HEAP_TYPES @@ -58,20 +60,6 @@ _release_xid_data(_PyCrossInterpreterData *data, int flags) return res; } -static inline int64_t -_get_interpid(_PyCrossInterpreterData *data) -{ - int64_t interpid; - if (data != NULL) { - interpid = _PyCrossInterpreterData_INTERPID(data); - assert(!PyErr_Occurred()); - } - else { - interpid = PyInterpreterState_GetID(PyInterpreterState_Get()); - } - return interpid; -} - static PyInterpreterState * _get_current_interp(void) { @@ -402,32 +390,6 @@ handle_queue_error(int err, PyObject *mod, int64_t qid) } -/* unbound items ************************************************************/ - -#define UNBOUND_REMOVE 1 -#define UNBOUND_ERROR 2 -#define UNBOUND_REPLACE 3 - -// It would also be possible to add UNBOUND_REPLACE where the replacement -// value is user-provided. There would be some limitations there, though. -// Another possibility would be something like UNBOUND_COPY, where the -// object is released but the underlying data is copied (with the "raw" -// allocator) and used when the item is popped off the queue. - -static int -check_unbound(int unboundop) -{ - switch (unboundop) { - case UNBOUND_REMOVE: - case UNBOUND_ERROR: - case UNBOUND_REPLACE: - return 1; - default: - return 0; - } -} - - /* the basic queue **********************************************************/ struct _queueitem; diff --git a/Modules/_interpreters_common.h b/Modules/_interpreters_common.h index 07120f6ccc7..0d2e0c9efd3 100644 --- a/Modules/_interpreters_common.h +++ b/Modules/_interpreters_common.h @@ -19,3 +19,48 @@ clear_xid_class(PyTypeObject *cls) return _PyCrossInterpreterData_UnregisterClass(cls); } #endif + + +static inline int64_t +_get_interpid(_PyCrossInterpreterData *data) +{ + int64_t interpid; + if (data != NULL) { + interpid = _PyCrossInterpreterData_INTERPID(data); + assert(!PyErr_Occurred()); + } + else { + interpid = PyInterpreterState_GetID(PyInterpreterState_Get()); + } + return interpid; +} + + +/* unbound items ************************************************************/ + +#ifdef HAS_UNBOUND_ITEMS + +#define UNBOUND_REMOVE 1 +#define UNBOUND_ERROR 2 +#define UNBOUND_REPLACE 3 + +// It would also be possible to add UNBOUND_REPLACE where the replacement +// value is user-provided. There would be some limitations there, though. +// Another possibility would be something like UNBOUND_COPY, where the +// object is released but the underlying data is copied (with the "raw" +// allocator) and used when the item is popped off the queue. + +static int +check_unbound(int unboundop) +{ + switch (unboundop) { + case UNBOUND_REMOVE: + case UNBOUND_ERROR: + case UNBOUND_REPLACE: + return 1; + default: + return 0; + } +} + +#endif