mirror of https://github.com/python/cpython
gh-93649: Split watcher API tests from _testcapimodule.c (#99532)
This commit is contained in:
parent
9db1e17c80
commit
51d10354f4
|
@ -2,7 +2,6 @@
|
|||
# these are all functions _testcapi exports whose name begins with 'test_'.
|
||||
|
||||
from collections import OrderedDict
|
||||
from contextlib import contextmanager, ExitStack
|
||||
import _thread
|
||||
import importlib.machinery
|
||||
import importlib.util
|
||||
|
@ -20,7 +19,6 @@ import warnings
|
|||
import weakref
|
||||
from test import support
|
||||
from test.support import MISSING_C_DOCSTRINGS
|
||||
from test.support import catch_unraisable_exception
|
||||
from test.support import import_helper
|
||||
from test.support import threading_helper
|
||||
from test.support import warnings_helper
|
||||
|
@ -1705,333 +1703,5 @@ class Test_Pep523API(unittest.TestCase):
|
|||
self.do_test(func2)
|
||||
|
||||
|
||||
class TestDictWatchers(unittest.TestCase):
|
||||
# types of watchers testcapimodule can add:
|
||||
EVENTS = 0 # appends dict events as strings to global event list
|
||||
ERROR = 1 # unconditionally sets and signals a RuntimeException
|
||||
SECOND = 2 # always appends "second" to global event list
|
||||
|
||||
def add_watcher(self, kind=EVENTS):
|
||||
return _testcapi.add_dict_watcher(kind)
|
||||
|
||||
def clear_watcher(self, watcher_id):
|
||||
_testcapi.clear_dict_watcher(watcher_id)
|
||||
|
||||
@contextmanager
|
||||
def watcher(self, kind=EVENTS):
|
||||
wid = self.add_watcher(kind)
|
||||
try:
|
||||
yield wid
|
||||
finally:
|
||||
self.clear_watcher(wid)
|
||||
|
||||
def assert_events(self, expected):
|
||||
actual = _testcapi.get_dict_watcher_events()
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
def watch(self, wid, d):
|
||||
_testcapi.watch_dict(wid, d)
|
||||
|
||||
def unwatch(self, wid, d):
|
||||
_testcapi.unwatch_dict(wid, d)
|
||||
|
||||
def test_set_new_item(self):
|
||||
d = {}
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, d)
|
||||
d["foo"] = "bar"
|
||||
self.assert_events(["new:foo:bar"])
|
||||
|
||||
def test_set_existing_item(self):
|
||||
d = {"foo": "bar"}
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, d)
|
||||
d["foo"] = "baz"
|
||||
self.assert_events(["mod:foo:baz"])
|
||||
|
||||
def test_clone(self):
|
||||
d = {}
|
||||
d2 = {"foo": "bar"}
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, d)
|
||||
d.update(d2)
|
||||
self.assert_events(["clone"])
|
||||
|
||||
def test_no_event_if_not_watched(self):
|
||||
d = {}
|
||||
with self.watcher() as wid:
|
||||
d["foo"] = "bar"
|
||||
self.assert_events([])
|
||||
|
||||
def test_del(self):
|
||||
d = {"foo": "bar"}
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, d)
|
||||
del d["foo"]
|
||||
self.assert_events(["del:foo"])
|
||||
|
||||
def test_pop(self):
|
||||
d = {"foo": "bar"}
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, d)
|
||||
d.pop("foo")
|
||||
self.assert_events(["del:foo"])
|
||||
|
||||
def test_clear(self):
|
||||
d = {"foo": "bar"}
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, d)
|
||||
d.clear()
|
||||
self.assert_events(["clear"])
|
||||
|
||||
def test_dealloc(self):
|
||||
d = {"foo": "bar"}
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, d)
|
||||
del d
|
||||
self.assert_events(["dealloc"])
|
||||
|
||||
def test_unwatch(self):
|
||||
d = {}
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, d)
|
||||
d["foo"] = "bar"
|
||||
self.unwatch(wid, d)
|
||||
d["hmm"] = "baz"
|
||||
self.assert_events(["new:foo:bar"])
|
||||
|
||||
def test_error(self):
|
||||
d = {}
|
||||
with self.watcher(kind=self.ERROR) as wid:
|
||||
self.watch(wid, d)
|
||||
with catch_unraisable_exception() as cm:
|
||||
d["foo"] = "bar"
|
||||
self.assertIs(cm.unraisable.object, d)
|
||||
self.assertEqual(str(cm.unraisable.exc_value), "boom!")
|
||||
self.assert_events([])
|
||||
|
||||
def test_two_watchers(self):
|
||||
d1 = {}
|
||||
d2 = {}
|
||||
with self.watcher() as wid1:
|
||||
with self.watcher(kind=self.SECOND) as wid2:
|
||||
self.watch(wid1, d1)
|
||||
self.watch(wid2, d2)
|
||||
d1["foo"] = "bar"
|
||||
d2["hmm"] = "baz"
|
||||
self.assert_events(["new:foo:bar", "second"])
|
||||
|
||||
def test_watch_non_dict(self):
|
||||
with self.watcher() as wid:
|
||||
with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
|
||||
self.watch(wid, 1)
|
||||
|
||||
def test_watch_out_of_range_watcher_id(self):
|
||||
d = {}
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
|
||||
self.watch(-1, d)
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
|
||||
self.watch(8, d) # DICT_MAX_WATCHERS = 8
|
||||
|
||||
def test_watch_unassigned_watcher_id(self):
|
||||
d = {}
|
||||
with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
|
||||
self.watch(1, d)
|
||||
|
||||
def test_unwatch_non_dict(self):
|
||||
with self.watcher() as wid:
|
||||
with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
|
||||
self.unwatch(wid, 1)
|
||||
|
||||
def test_unwatch_out_of_range_watcher_id(self):
|
||||
d = {}
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
|
||||
self.unwatch(-1, d)
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
|
||||
self.unwatch(8, d) # DICT_MAX_WATCHERS = 8
|
||||
|
||||
def test_unwatch_unassigned_watcher_id(self):
|
||||
d = {}
|
||||
with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
|
||||
self.unwatch(1, d)
|
||||
|
||||
def test_clear_out_of_range_watcher_id(self):
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
|
||||
self.clear_watcher(-1)
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
|
||||
self.clear_watcher(8) # DICT_MAX_WATCHERS = 8
|
||||
|
||||
def test_clear_unassigned_watcher_id(self):
|
||||
with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
|
||||
self.clear_watcher(1)
|
||||
|
||||
|
||||
class TestTypeWatchers(unittest.TestCase):
|
||||
# types of watchers testcapimodule can add:
|
||||
TYPES = 0 # appends modified types to global event list
|
||||
ERROR = 1 # unconditionally sets and signals a RuntimeException
|
||||
WRAP = 2 # appends modified type wrapped in list to global event list
|
||||
|
||||
# duplicating the C constant
|
||||
TYPE_MAX_WATCHERS = 8
|
||||
|
||||
def add_watcher(self, kind=TYPES):
|
||||
return _testcapi.add_type_watcher(kind)
|
||||
|
||||
def clear_watcher(self, watcher_id):
|
||||
_testcapi.clear_type_watcher(watcher_id)
|
||||
|
||||
@contextmanager
|
||||
def watcher(self, kind=TYPES):
|
||||
wid = self.add_watcher(kind)
|
||||
try:
|
||||
yield wid
|
||||
finally:
|
||||
self.clear_watcher(wid)
|
||||
|
||||
def assert_events(self, expected):
|
||||
actual = _testcapi.get_type_modified_events()
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
def watch(self, wid, t):
|
||||
_testcapi.watch_type(wid, t)
|
||||
|
||||
def unwatch(self, wid, t):
|
||||
_testcapi.unwatch_type(wid, t)
|
||||
|
||||
def test_watch_type(self):
|
||||
class C: pass
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, C)
|
||||
C.foo = "bar"
|
||||
self.assert_events([C])
|
||||
|
||||
def test_event_aggregation(self):
|
||||
class C: pass
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, C)
|
||||
C.foo = "bar"
|
||||
C.bar = "baz"
|
||||
# only one event registered for both modifications
|
||||
self.assert_events([C])
|
||||
|
||||
def test_lookup_resets_aggregation(self):
|
||||
class C: pass
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, C)
|
||||
C.foo = "bar"
|
||||
# lookup resets type version tag
|
||||
self.assertEqual(C.foo, "bar")
|
||||
C.bar = "baz"
|
||||
# both events registered
|
||||
self.assert_events([C, C])
|
||||
|
||||
def test_unwatch_type(self):
|
||||
class C: pass
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, C)
|
||||
C.foo = "bar"
|
||||
self.assertEqual(C.foo, "bar")
|
||||
self.assert_events([C])
|
||||
self.unwatch(wid, C)
|
||||
C.bar = "baz"
|
||||
self.assert_events([C])
|
||||
|
||||
def test_clear_watcher(self):
|
||||
class C: pass
|
||||
# outer watcher is unused, it's just to keep events list alive
|
||||
with self.watcher() as _:
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, C)
|
||||
C.foo = "bar"
|
||||
self.assertEqual(C.foo, "bar")
|
||||
self.assert_events([C])
|
||||
C.bar = "baz"
|
||||
# Watcher on C has been cleared, no new event
|
||||
self.assert_events([C])
|
||||
|
||||
def test_watch_type_subclass(self):
|
||||
class C: pass
|
||||
class D(C): pass
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, D)
|
||||
C.foo = "bar"
|
||||
self.assert_events([D])
|
||||
|
||||
def test_error(self):
|
||||
class C: pass
|
||||
with self.watcher(kind=self.ERROR) as wid:
|
||||
self.watch(wid, C)
|
||||
with catch_unraisable_exception() as cm:
|
||||
C.foo = "bar"
|
||||
self.assertIs(cm.unraisable.object, C)
|
||||
self.assertEqual(str(cm.unraisable.exc_value), "boom!")
|
||||
self.assert_events([])
|
||||
|
||||
def test_two_watchers(self):
|
||||
class C1: pass
|
||||
class C2: pass
|
||||
with self.watcher() as wid1:
|
||||
with self.watcher(kind=self.WRAP) as wid2:
|
||||
self.assertNotEqual(wid1, wid2)
|
||||
self.watch(wid1, C1)
|
||||
self.watch(wid2, C2)
|
||||
C1.foo = "bar"
|
||||
C2.hmm = "baz"
|
||||
self.assert_events([C1, [C2]])
|
||||
|
||||
def test_watch_non_type(self):
|
||||
with self.watcher() as wid:
|
||||
with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
|
||||
self.watch(wid, 1)
|
||||
|
||||
def test_watch_out_of_range_watcher_id(self):
|
||||
class C: pass
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
|
||||
self.watch(-1, C)
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
|
||||
self.watch(self.TYPE_MAX_WATCHERS, C)
|
||||
|
||||
def test_watch_unassigned_watcher_id(self):
|
||||
class C: pass
|
||||
with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
|
||||
self.watch(1, C)
|
||||
|
||||
def test_unwatch_non_type(self):
|
||||
with self.watcher() as wid:
|
||||
with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
|
||||
self.unwatch(wid, 1)
|
||||
|
||||
def test_unwatch_out_of_range_watcher_id(self):
|
||||
class C: pass
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
|
||||
self.unwatch(-1, C)
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
|
||||
self.unwatch(self.TYPE_MAX_WATCHERS, C)
|
||||
|
||||
def test_unwatch_unassigned_watcher_id(self):
|
||||
class C: pass
|
||||
with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
|
||||
self.unwatch(1, C)
|
||||
|
||||
def test_clear_out_of_range_watcher_id(self):
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
|
||||
self.clear_watcher(-1)
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
|
||||
self.clear_watcher(self.TYPE_MAX_WATCHERS)
|
||||
|
||||
def test_clear_unassigned_watcher_id(self):
|
||||
with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
|
||||
self.clear_watcher(1)
|
||||
|
||||
def test_no_more_ids_available(self):
|
||||
contexts = [self.watcher() for i in range(self.TYPE_MAX_WATCHERS)]
|
||||
with ExitStack() as stack:
|
||||
for ctx in contexts:
|
||||
stack.enter_context(ctx)
|
||||
with self.assertRaisesRegex(RuntimeError, r"no more type watcher IDs"):
|
||||
self.add_watcher()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -0,0 +1,340 @@
|
|||
import unittest
|
||||
|
||||
from contextlib import contextmanager, ExitStack
|
||||
from test.support import catch_unraisable_exception, import_helper
|
||||
|
||||
|
||||
# Skip this test if the _testcapi module isn't available.
|
||||
_testcapi = import_helper.import_module('_testcapi')
|
||||
|
||||
|
||||
class TestDictWatchers(unittest.TestCase):
|
||||
# types of watchers testcapimodule can add:
|
||||
EVENTS = 0 # appends dict events as strings to global event list
|
||||
ERROR = 1 # unconditionally sets and signals a RuntimeException
|
||||
SECOND = 2 # always appends "second" to global event list
|
||||
|
||||
def add_watcher(self, kind=EVENTS):
|
||||
return _testcapi.add_dict_watcher(kind)
|
||||
|
||||
def clear_watcher(self, watcher_id):
|
||||
_testcapi.clear_dict_watcher(watcher_id)
|
||||
|
||||
@contextmanager
|
||||
def watcher(self, kind=EVENTS):
|
||||
wid = self.add_watcher(kind)
|
||||
try:
|
||||
yield wid
|
||||
finally:
|
||||
self.clear_watcher(wid)
|
||||
|
||||
def assert_events(self, expected):
|
||||
actual = _testcapi.get_dict_watcher_events()
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
def watch(self, wid, d):
|
||||
_testcapi.watch_dict(wid, d)
|
||||
|
||||
def unwatch(self, wid, d):
|
||||
_testcapi.unwatch_dict(wid, d)
|
||||
|
||||
def test_set_new_item(self):
|
||||
d = {}
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, d)
|
||||
d["foo"] = "bar"
|
||||
self.assert_events(["new:foo:bar"])
|
||||
|
||||
def test_set_existing_item(self):
|
||||
d = {"foo": "bar"}
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, d)
|
||||
d["foo"] = "baz"
|
||||
self.assert_events(["mod:foo:baz"])
|
||||
|
||||
def test_clone(self):
|
||||
d = {}
|
||||
d2 = {"foo": "bar"}
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, d)
|
||||
d.update(d2)
|
||||
self.assert_events(["clone"])
|
||||
|
||||
def test_no_event_if_not_watched(self):
|
||||
d = {}
|
||||
with self.watcher() as wid:
|
||||
d["foo"] = "bar"
|
||||
self.assert_events([])
|
||||
|
||||
def test_del(self):
|
||||
d = {"foo": "bar"}
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, d)
|
||||
del d["foo"]
|
||||
self.assert_events(["del:foo"])
|
||||
|
||||
def test_pop(self):
|
||||
d = {"foo": "bar"}
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, d)
|
||||
d.pop("foo")
|
||||
self.assert_events(["del:foo"])
|
||||
|
||||
def test_clear(self):
|
||||
d = {"foo": "bar"}
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, d)
|
||||
d.clear()
|
||||
self.assert_events(["clear"])
|
||||
|
||||
def test_dealloc(self):
|
||||
d = {"foo": "bar"}
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, d)
|
||||
del d
|
||||
self.assert_events(["dealloc"])
|
||||
|
||||
def test_unwatch(self):
|
||||
d = {}
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, d)
|
||||
d["foo"] = "bar"
|
||||
self.unwatch(wid, d)
|
||||
d["hmm"] = "baz"
|
||||
self.assert_events(["new:foo:bar"])
|
||||
|
||||
def test_error(self):
|
||||
d = {}
|
||||
with self.watcher(kind=self.ERROR) as wid:
|
||||
self.watch(wid, d)
|
||||
with catch_unraisable_exception() as cm:
|
||||
d["foo"] = "bar"
|
||||
self.assertIs(cm.unraisable.object, d)
|
||||
self.assertEqual(str(cm.unraisable.exc_value), "boom!")
|
||||
self.assert_events([])
|
||||
|
||||
def test_two_watchers(self):
|
||||
d1 = {}
|
||||
d2 = {}
|
||||
with self.watcher() as wid1:
|
||||
with self.watcher(kind=self.SECOND) as wid2:
|
||||
self.watch(wid1, d1)
|
||||
self.watch(wid2, d2)
|
||||
d1["foo"] = "bar"
|
||||
d2["hmm"] = "baz"
|
||||
self.assert_events(["new:foo:bar", "second"])
|
||||
|
||||
def test_watch_non_dict(self):
|
||||
with self.watcher() as wid:
|
||||
with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
|
||||
self.watch(wid, 1)
|
||||
|
||||
def test_watch_out_of_range_watcher_id(self):
|
||||
d = {}
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
|
||||
self.watch(-1, d)
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
|
||||
self.watch(8, d) # DICT_MAX_WATCHERS = 8
|
||||
|
||||
def test_watch_unassigned_watcher_id(self):
|
||||
d = {}
|
||||
with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
|
||||
self.watch(1, d)
|
||||
|
||||
def test_unwatch_non_dict(self):
|
||||
with self.watcher() as wid:
|
||||
with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
|
||||
self.unwatch(wid, 1)
|
||||
|
||||
def test_unwatch_out_of_range_watcher_id(self):
|
||||
d = {}
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
|
||||
self.unwatch(-1, d)
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
|
||||
self.unwatch(8, d) # DICT_MAX_WATCHERS = 8
|
||||
|
||||
def test_unwatch_unassigned_watcher_id(self):
|
||||
d = {}
|
||||
with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
|
||||
self.unwatch(1, d)
|
||||
|
||||
def test_clear_out_of_range_watcher_id(self):
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
|
||||
self.clear_watcher(-1)
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
|
||||
self.clear_watcher(8) # DICT_MAX_WATCHERS = 8
|
||||
|
||||
def test_clear_unassigned_watcher_id(self):
|
||||
with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
|
||||
self.clear_watcher(1)
|
||||
|
||||
|
||||
class TestTypeWatchers(unittest.TestCase):
|
||||
# types of watchers testcapimodule can add:
|
||||
TYPES = 0 # appends modified types to global event list
|
||||
ERROR = 1 # unconditionally sets and signals a RuntimeException
|
||||
WRAP = 2 # appends modified type wrapped in list to global event list
|
||||
|
||||
# duplicating the C constant
|
||||
TYPE_MAX_WATCHERS = 8
|
||||
|
||||
def add_watcher(self, kind=TYPES):
|
||||
return _testcapi.add_type_watcher(kind)
|
||||
|
||||
def clear_watcher(self, watcher_id):
|
||||
_testcapi.clear_type_watcher(watcher_id)
|
||||
|
||||
@contextmanager
|
||||
def watcher(self, kind=TYPES):
|
||||
wid = self.add_watcher(kind)
|
||||
try:
|
||||
yield wid
|
||||
finally:
|
||||
self.clear_watcher(wid)
|
||||
|
||||
def assert_events(self, expected):
|
||||
actual = _testcapi.get_type_modified_events()
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
def watch(self, wid, t):
|
||||
_testcapi.watch_type(wid, t)
|
||||
|
||||
def unwatch(self, wid, t):
|
||||
_testcapi.unwatch_type(wid, t)
|
||||
|
||||
def test_watch_type(self):
|
||||
class C: pass
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, C)
|
||||
C.foo = "bar"
|
||||
self.assert_events([C])
|
||||
|
||||
def test_event_aggregation(self):
|
||||
class C: pass
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, C)
|
||||
C.foo = "bar"
|
||||
C.bar = "baz"
|
||||
# only one event registered for both modifications
|
||||
self.assert_events([C])
|
||||
|
||||
def test_lookup_resets_aggregation(self):
|
||||
class C: pass
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, C)
|
||||
C.foo = "bar"
|
||||
# lookup resets type version tag
|
||||
self.assertEqual(C.foo, "bar")
|
||||
C.bar = "baz"
|
||||
# both events registered
|
||||
self.assert_events([C, C])
|
||||
|
||||
def test_unwatch_type(self):
|
||||
class C: pass
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, C)
|
||||
C.foo = "bar"
|
||||
self.assertEqual(C.foo, "bar")
|
||||
self.assert_events([C])
|
||||
self.unwatch(wid, C)
|
||||
C.bar = "baz"
|
||||
self.assert_events([C])
|
||||
|
||||
def test_clear_watcher(self):
|
||||
class C: pass
|
||||
# outer watcher is unused, it's just to keep events list alive
|
||||
with self.watcher() as _:
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, C)
|
||||
C.foo = "bar"
|
||||
self.assertEqual(C.foo, "bar")
|
||||
self.assert_events([C])
|
||||
C.bar = "baz"
|
||||
# Watcher on C has been cleared, no new event
|
||||
self.assert_events([C])
|
||||
|
||||
def test_watch_type_subclass(self):
|
||||
class C: pass
|
||||
class D(C): pass
|
||||
with self.watcher() as wid:
|
||||
self.watch(wid, D)
|
||||
C.foo = "bar"
|
||||
self.assert_events([D])
|
||||
|
||||
def test_error(self):
|
||||
class C: pass
|
||||
with self.watcher(kind=self.ERROR) as wid:
|
||||
self.watch(wid, C)
|
||||
with catch_unraisable_exception() as cm:
|
||||
C.foo = "bar"
|
||||
self.assertIs(cm.unraisable.object, C)
|
||||
self.assertEqual(str(cm.unraisable.exc_value), "boom!")
|
||||
self.assert_events([])
|
||||
|
||||
def test_two_watchers(self):
|
||||
class C1: pass
|
||||
class C2: pass
|
||||
with self.watcher() as wid1:
|
||||
with self.watcher(kind=self.WRAP) as wid2:
|
||||
self.assertNotEqual(wid1, wid2)
|
||||
self.watch(wid1, C1)
|
||||
self.watch(wid2, C2)
|
||||
C1.foo = "bar"
|
||||
C2.hmm = "baz"
|
||||
self.assert_events([C1, [C2]])
|
||||
|
||||
def test_watch_non_type(self):
|
||||
with self.watcher() as wid:
|
||||
with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
|
||||
self.watch(wid, 1)
|
||||
|
||||
def test_watch_out_of_range_watcher_id(self):
|
||||
class C: pass
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
|
||||
self.watch(-1, C)
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
|
||||
self.watch(self.TYPE_MAX_WATCHERS, C)
|
||||
|
||||
def test_watch_unassigned_watcher_id(self):
|
||||
class C: pass
|
||||
with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
|
||||
self.watch(1, C)
|
||||
|
||||
def test_unwatch_non_type(self):
|
||||
with self.watcher() as wid:
|
||||
with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
|
||||
self.unwatch(wid, 1)
|
||||
|
||||
def test_unwatch_out_of_range_watcher_id(self):
|
||||
class C: pass
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
|
||||
self.unwatch(-1, C)
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
|
||||
self.unwatch(self.TYPE_MAX_WATCHERS, C)
|
||||
|
||||
def test_unwatch_unassigned_watcher_id(self):
|
||||
class C: pass
|
||||
with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
|
||||
self.unwatch(1, C)
|
||||
|
||||
def test_clear_out_of_range_watcher_id(self):
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
|
||||
self.clear_watcher(-1)
|
||||
with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
|
||||
self.clear_watcher(self.TYPE_MAX_WATCHERS)
|
||||
|
||||
def test_clear_unassigned_watcher_id(self):
|
||||
with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
|
||||
self.clear_watcher(1)
|
||||
|
||||
def test_no_more_ids_available(self):
|
||||
contexts = [self.watcher() for i in range(self.TYPE_MAX_WATCHERS)]
|
||||
with ExitStack() as stack:
|
||||
for ctx in contexts:
|
||||
stack.enter_context(ctx)
|
||||
with self.assertRaisesRegex(RuntimeError, r"no more type watcher IDs"):
|
||||
self.add_watcher()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
|
@ -169,7 +169,7 @@
|
|||
@MODULE__XXTESTFUZZ_TRUE@_xxtestfuzz _xxtestfuzz/_xxtestfuzz.c _xxtestfuzz/fuzzer.c
|
||||
@MODULE__TESTBUFFER_TRUE@_testbuffer _testbuffer.c
|
||||
@MODULE__TESTINTERNALCAPI_TRUE@_testinternalcapi _testinternalcapi.c
|
||||
@MODULE__TESTCAPI_TRUE@_testcapi _testcapimodule.c _testcapi/vectorcall.c _testcapi/vectorcall_limited.c _testcapi/heaptype.c _testcapi/unicode.c _testcapi/getargs.c _testcapi/pytime.c _testcapi/datetime.c _testcapi/docstring.c _testcapi/mem.c
|
||||
@MODULE__TESTCAPI_TRUE@_testcapi _testcapimodule.c _testcapi/vectorcall.c _testcapi/vectorcall_limited.c _testcapi/heaptype.c _testcapi/unicode.c _testcapi/getargs.c _testcapi/pytime.c _testcapi/datetime.c _testcapi/docstring.c _testcapi/mem.c _testcapi/watchers.c
|
||||
|
||||
# Some testing modules MUST be built as shared libraries.
|
||||
*shared*
|
||||
|
|
|
@ -32,6 +32,7 @@ int _PyTestCapi_Init_PyTime(PyObject *module);
|
|||
int _PyTestCapi_Init_DateTime(PyObject *module);
|
||||
int _PyTestCapi_Init_Docstring(PyObject *module);
|
||||
int _PyTestCapi_Init_Mem(PyObject *module);
|
||||
int _PyTestCapi_Init_Watchers(PyObject *module);
|
||||
|
||||
#ifdef LIMITED_API_AVAILABLE
|
||||
int _PyTestCapi_Init_VectorcallLimited(PyObject *module);
|
||||
|
|
|
@ -0,0 +1,302 @@
|
|||
#include "parts.h"
|
||||
|
||||
|
||||
// Test dict watching
|
||||
static PyObject *g_dict_watch_events;
|
||||
static int g_dict_watchers_installed;
|
||||
|
||||
static int
|
||||
dict_watch_callback(PyDict_WatchEvent event,
|
||||
PyObject *dict,
|
||||
PyObject *key,
|
||||
PyObject *new_value)
|
||||
{
|
||||
PyObject *msg;
|
||||
switch (event) {
|
||||
case PyDict_EVENT_CLEARED:
|
||||
msg = PyUnicode_FromString("clear");
|
||||
break;
|
||||
case PyDict_EVENT_DEALLOCATED:
|
||||
msg = PyUnicode_FromString("dealloc");
|
||||
break;
|
||||
case PyDict_EVENT_CLONED:
|
||||
msg = PyUnicode_FromString("clone");
|
||||
break;
|
||||
case PyDict_EVENT_ADDED:
|
||||
msg = PyUnicode_FromFormat("new:%S:%S", key, new_value);
|
||||
break;
|
||||
case PyDict_EVENT_MODIFIED:
|
||||
msg = PyUnicode_FromFormat("mod:%S:%S", key, new_value);
|
||||
break;
|
||||
case PyDict_EVENT_DELETED:
|
||||
msg = PyUnicode_FromFormat("del:%S", key);
|
||||
break;
|
||||
default:
|
||||
msg = PyUnicode_FromString("unknown");
|
||||
}
|
||||
if (msg == NULL) {
|
||||
return -1;
|
||||
}
|
||||
assert(PyList_Check(g_dict_watch_events));
|
||||
if (PyList_Append(g_dict_watch_events, msg) < 0) {
|
||||
Py_DECREF(msg);
|
||||
return -1;
|
||||
}
|
||||
Py_DECREF(msg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
dict_watch_callback_second(PyDict_WatchEvent event,
|
||||
PyObject *dict,
|
||||
PyObject *key,
|
||||
PyObject *new_value)
|
||||
{
|
||||
PyObject *msg = PyUnicode_FromString("second");
|
||||
if (msg == NULL) {
|
||||
return -1;
|
||||
}
|
||||
int rc = PyList_Append(g_dict_watch_events, msg);
|
||||
Py_DECREF(msg);
|
||||
if (rc < 0) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
dict_watch_callback_error(PyDict_WatchEvent event,
|
||||
PyObject *dict,
|
||||
PyObject *key,
|
||||
PyObject *new_value)
|
||||
{
|
||||
PyErr_SetString(PyExc_RuntimeError, "boom!");
|
||||
return -1;
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
add_dict_watcher(PyObject *self, PyObject *kind)
|
||||
{
|
||||
int watcher_id;
|
||||
assert(PyLong_Check(kind));
|
||||
long kind_l = PyLong_AsLong(kind);
|
||||
if (kind_l == 2) {
|
||||
watcher_id = PyDict_AddWatcher(dict_watch_callback_second);
|
||||
}
|
||||
else if (kind_l == 1) {
|
||||
watcher_id = PyDict_AddWatcher(dict_watch_callback_error);
|
||||
}
|
||||
else {
|
||||
watcher_id = PyDict_AddWatcher(dict_watch_callback);
|
||||
}
|
||||
if (watcher_id < 0) {
|
||||
return NULL;
|
||||
}
|
||||
if (!g_dict_watchers_installed) {
|
||||
assert(!g_dict_watch_events);
|
||||
if (!(g_dict_watch_events = PyList_New(0))) {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
g_dict_watchers_installed++;
|
||||
return PyLong_FromLong(watcher_id);
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
clear_dict_watcher(PyObject *self, PyObject *watcher_id)
|
||||
{
|
||||
if (PyDict_ClearWatcher(PyLong_AsLong(watcher_id))) {
|
||||
return NULL;
|
||||
}
|
||||
g_dict_watchers_installed--;
|
||||
if (!g_dict_watchers_installed) {
|
||||
assert(g_dict_watch_events);
|
||||
Py_CLEAR(g_dict_watch_events);
|
||||
}
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
watch_dict(PyObject *self, PyObject *args)
|
||||
{
|
||||
PyObject *dict;
|
||||
int watcher_id;
|
||||
if (!PyArg_ParseTuple(args, "iO", &watcher_id, &dict)) {
|
||||
return NULL;
|
||||
}
|
||||
if (PyDict_Watch(watcher_id, dict)) {
|
||||
return NULL;
|
||||
}
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
unwatch_dict(PyObject *self, PyObject *args)
|
||||
{
|
||||
PyObject *dict;
|
||||
int watcher_id;
|
||||
if (!PyArg_ParseTuple(args, "iO", &watcher_id, &dict)) {
|
||||
return NULL;
|
||||
}
|
||||
if (PyDict_Unwatch(watcher_id, dict)) {
|
||||
return NULL;
|
||||
}
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
get_dict_watcher_events(PyObject *self, PyObject *Py_UNUSED(args))
|
||||
{
|
||||
if (!g_dict_watch_events) {
|
||||
PyErr_SetString(PyExc_RuntimeError, "no watchers active");
|
||||
return NULL;
|
||||
}
|
||||
return Py_NewRef(g_dict_watch_events);
|
||||
}
|
||||
|
||||
// Test type watchers
|
||||
static PyObject *g_type_modified_events;
|
||||
static int g_type_watchers_installed;
|
||||
|
||||
static int
|
||||
type_modified_callback(PyTypeObject *type)
|
||||
{
|
||||
assert(PyList_Check(g_type_modified_events));
|
||||
if(PyList_Append(g_type_modified_events, (PyObject *)type) < 0) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
type_modified_callback_wrap(PyTypeObject *type)
|
||||
{
|
||||
assert(PyList_Check(g_type_modified_events));
|
||||
PyObject *list = PyList_New(0);
|
||||
if (list == NULL) {
|
||||
return -1;
|
||||
}
|
||||
if (PyList_Append(list, (PyObject *)type) < 0) {
|
||||
Py_DECREF(list);
|
||||
return -1;
|
||||
}
|
||||
if (PyList_Append(g_type_modified_events, list) < 0) {
|
||||
Py_DECREF(list);
|
||||
return -1;
|
||||
}
|
||||
Py_DECREF(list);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
type_modified_callback_error(PyTypeObject *type)
|
||||
{
|
||||
PyErr_SetString(PyExc_RuntimeError, "boom!");
|
||||
return -1;
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
add_type_watcher(PyObject *self, PyObject *kind)
|
||||
{
|
||||
int watcher_id;
|
||||
assert(PyLong_Check(kind));
|
||||
long kind_l = PyLong_AsLong(kind);
|
||||
if (kind_l == 2) {
|
||||
watcher_id = PyType_AddWatcher(type_modified_callback_wrap);
|
||||
}
|
||||
else if (kind_l == 1) {
|
||||
watcher_id = PyType_AddWatcher(type_modified_callback_error);
|
||||
}
|
||||
else {
|
||||
watcher_id = PyType_AddWatcher(type_modified_callback);
|
||||
}
|
||||
if (watcher_id < 0) {
|
||||
return NULL;
|
||||
}
|
||||
if (!g_type_watchers_installed) {
|
||||
assert(!g_type_modified_events);
|
||||
if (!(g_type_modified_events = PyList_New(0))) {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
g_type_watchers_installed++;
|
||||
return PyLong_FromLong(watcher_id);
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
clear_type_watcher(PyObject *self, PyObject *watcher_id)
|
||||
{
|
||||
if (PyType_ClearWatcher(PyLong_AsLong(watcher_id))) {
|
||||
return NULL;
|
||||
}
|
||||
g_type_watchers_installed--;
|
||||
if (!g_type_watchers_installed) {
|
||||
assert(g_type_modified_events);
|
||||
Py_CLEAR(g_type_modified_events);
|
||||
}
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
get_type_modified_events(PyObject *self, PyObject *Py_UNUSED(args))
|
||||
{
|
||||
if (!g_type_modified_events) {
|
||||
PyErr_SetString(PyExc_RuntimeError, "no watchers active");
|
||||
return NULL;
|
||||
}
|
||||
return Py_NewRef(g_type_modified_events);
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
watch_type(PyObject *self, PyObject *args)
|
||||
{
|
||||
PyObject *type;
|
||||
int watcher_id;
|
||||
if (!PyArg_ParseTuple(args, "iO", &watcher_id, &type)) {
|
||||
return NULL;
|
||||
}
|
||||
if (PyType_Watch(watcher_id, type)) {
|
||||
return NULL;
|
||||
}
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
unwatch_type(PyObject *self, PyObject *args)
|
||||
{
|
||||
PyObject *type;
|
||||
int watcher_id;
|
||||
if (!PyArg_ParseTuple(args, "iO", &watcher_id, &type)) {
|
||||
return NULL;
|
||||
}
|
||||
if (PyType_Unwatch(watcher_id, type)) {
|
||||
return NULL;
|
||||
}
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
static PyMethodDef test_methods[] = {
|
||||
// Dict watchers.
|
||||
{"add_dict_watcher", add_dict_watcher, METH_O, NULL},
|
||||
{"clear_dict_watcher", clear_dict_watcher, METH_O, NULL},
|
||||
{"watch_dict", watch_dict, METH_VARARGS, NULL},
|
||||
{"unwatch_dict", unwatch_dict, METH_VARARGS, NULL},
|
||||
{"get_dict_watcher_events", get_dict_watcher_events, METH_NOARGS, NULL},
|
||||
|
||||
// Type watchers.
|
||||
{"add_type_watcher", add_type_watcher, METH_O, NULL},
|
||||
{"clear_type_watcher", clear_type_watcher, METH_O, NULL},
|
||||
{"watch_type", watch_type, METH_VARARGS, NULL},
|
||||
{"unwatch_type", unwatch_type, METH_VARARGS, NULL},
|
||||
{"get_type_modified_events", get_type_modified_events, METH_NOARGS, NULL},
|
||||
{NULL},
|
||||
};
|
||||
|
||||
int
|
||||
_PyTestCapi_Init_Watchers(PyObject *mod)
|
||||
{
|
||||
if (PyModule_AddFunctions(mod, test_methods) < 0) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
|
@ -3390,159 +3390,6 @@ test_tstate_capi(PyObject *self, PyObject *Py_UNUSED(args))
|
|||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
|
||||
// Test dict watching
|
||||
static PyObject *g_dict_watch_events;
|
||||
static int g_dict_watchers_installed;
|
||||
|
||||
static int
|
||||
dict_watch_callback(PyDict_WatchEvent event,
|
||||
PyObject *dict,
|
||||
PyObject *key,
|
||||
PyObject *new_value)
|
||||
{
|
||||
PyObject *msg;
|
||||
switch(event) {
|
||||
case PyDict_EVENT_CLEARED:
|
||||
msg = PyUnicode_FromString("clear");
|
||||
break;
|
||||
case PyDict_EVENT_DEALLOCATED:
|
||||
msg = PyUnicode_FromString("dealloc");
|
||||
break;
|
||||
case PyDict_EVENT_CLONED:
|
||||
msg = PyUnicode_FromString("clone");
|
||||
break;
|
||||
case PyDict_EVENT_ADDED:
|
||||
msg = PyUnicode_FromFormat("new:%S:%S", key, new_value);
|
||||
break;
|
||||
case PyDict_EVENT_MODIFIED:
|
||||
msg = PyUnicode_FromFormat("mod:%S:%S", key, new_value);
|
||||
break;
|
||||
case PyDict_EVENT_DELETED:
|
||||
msg = PyUnicode_FromFormat("del:%S", key);
|
||||
break;
|
||||
default:
|
||||
msg = PyUnicode_FromString("unknown");
|
||||
}
|
||||
if (!msg) {
|
||||
return -1;
|
||||
}
|
||||
assert(PyList_Check(g_dict_watch_events));
|
||||
if (PyList_Append(g_dict_watch_events, msg) < 0) {
|
||||
Py_DECREF(msg);
|
||||
return -1;
|
||||
}
|
||||
Py_DECREF(msg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
dict_watch_callback_second(PyDict_WatchEvent event,
|
||||
PyObject *dict,
|
||||
PyObject *key,
|
||||
PyObject *new_value)
|
||||
{
|
||||
PyObject *msg = PyUnicode_FromString("second");
|
||||
if (!msg) {
|
||||
return -1;
|
||||
}
|
||||
if (PyList_Append(g_dict_watch_events, msg) < 0) {
|
||||
Py_DECREF(msg);
|
||||
return -1;
|
||||
}
|
||||
Py_DECREF(msg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
dict_watch_callback_error(PyDict_WatchEvent event,
|
||||
PyObject *dict,
|
||||
PyObject *key,
|
||||
PyObject *new_value)
|
||||
{
|
||||
PyErr_SetString(PyExc_RuntimeError, "boom!");
|
||||
return -1;
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
add_dict_watcher(PyObject *self, PyObject *kind)
|
||||
{
|
||||
int watcher_id;
|
||||
assert(PyLong_Check(kind));
|
||||
long kind_l = PyLong_AsLong(kind);
|
||||
if (kind_l == 2) {
|
||||
watcher_id = PyDict_AddWatcher(dict_watch_callback_second);
|
||||
} else if (kind_l == 1) {
|
||||
watcher_id = PyDict_AddWatcher(dict_watch_callback_error);
|
||||
} else {
|
||||
watcher_id = PyDict_AddWatcher(dict_watch_callback);
|
||||
}
|
||||
if (watcher_id < 0) {
|
||||
return NULL;
|
||||
}
|
||||
if (!g_dict_watchers_installed) {
|
||||
assert(!g_dict_watch_events);
|
||||
if (!(g_dict_watch_events = PyList_New(0))) {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
g_dict_watchers_installed++;
|
||||
return PyLong_FromLong(watcher_id);
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
clear_dict_watcher(PyObject *self, PyObject *watcher_id)
|
||||
{
|
||||
if (PyDict_ClearWatcher(PyLong_AsLong(watcher_id))) {
|
||||
return NULL;
|
||||
}
|
||||
g_dict_watchers_installed--;
|
||||
if (!g_dict_watchers_installed) {
|
||||
assert(g_dict_watch_events);
|
||||
Py_CLEAR(g_dict_watch_events);
|
||||
}
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
watch_dict(PyObject *self, PyObject *args)
|
||||
{
|
||||
PyObject *dict;
|
||||
int watcher_id;
|
||||
if (!PyArg_ParseTuple(args, "iO", &watcher_id, &dict)) {
|
||||
return NULL;
|
||||
}
|
||||
if (PyDict_Watch(watcher_id, dict)) {
|
||||
return NULL;
|
||||
}
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
unwatch_dict(PyObject *self, PyObject *args)
|
||||
{
|
||||
PyObject *dict;
|
||||
int watcher_id;
|
||||
if (!PyArg_ParseTuple(args, "iO", &watcher_id, &dict)) {
|
||||
return NULL;
|
||||
}
|
||||
if (PyDict_Unwatch(watcher_id, dict)) {
|
||||
return NULL;
|
||||
}
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
get_dict_watcher_events(PyObject *self, PyObject *Py_UNUSED(args))
|
||||
{
|
||||
if (!g_dict_watch_events) {
|
||||
PyErr_SetString(PyExc_RuntimeError, "no watchers active");
|
||||
return NULL;
|
||||
}
|
||||
return Py_NewRef(g_dict_watch_events);
|
||||
}
|
||||
|
||||
|
||||
// Test PyFloat_Pack2(), PyFloat_Pack4() and PyFloat_Pack8()
|
||||
static PyObject *
|
||||
test_float_pack(PyObject *self, PyObject *args)
|
||||
|
@ -3988,128 +3835,6 @@ function_set_kw_defaults(PyObject *self, PyObject *args)
|
|||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
|
||||
// type watchers
|
||||
|
||||
static PyObject *g_type_modified_events;
|
||||
static int g_type_watchers_installed;
|
||||
|
||||
static int
|
||||
type_modified_callback(PyTypeObject *type)
|
||||
{
|
||||
assert(PyList_Check(g_type_modified_events));
|
||||
if(PyList_Append(g_type_modified_events, (PyObject *)type) < 0) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
type_modified_callback_wrap(PyTypeObject *type)
|
||||
{
|
||||
assert(PyList_Check(g_type_modified_events));
|
||||
PyObject *list = PyList_New(0);
|
||||
if (!list) {
|
||||
return -1;
|
||||
}
|
||||
if (PyList_Append(list, (PyObject *)type) < 0) {
|
||||
Py_DECREF(list);
|
||||
return -1;
|
||||
}
|
||||
if (PyList_Append(g_type_modified_events, list) < 0) {
|
||||
Py_DECREF(list);
|
||||
return -1;
|
||||
}
|
||||
Py_DECREF(list);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
type_modified_callback_error(PyTypeObject *type)
|
||||
{
|
||||
PyErr_SetString(PyExc_RuntimeError, "boom!");
|
||||
return -1;
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
add_type_watcher(PyObject *self, PyObject *kind)
|
||||
{
|
||||
int watcher_id;
|
||||
assert(PyLong_Check(kind));
|
||||
long kind_l = PyLong_AsLong(kind);
|
||||
if (kind_l == 2) {
|
||||
watcher_id = PyType_AddWatcher(type_modified_callback_wrap);
|
||||
} else if (kind_l == 1) {
|
||||
watcher_id = PyType_AddWatcher(type_modified_callback_error);
|
||||
} else {
|
||||
watcher_id = PyType_AddWatcher(type_modified_callback);
|
||||
}
|
||||
if (watcher_id < 0) {
|
||||
return NULL;
|
||||
}
|
||||
if (!g_type_watchers_installed) {
|
||||
assert(!g_type_modified_events);
|
||||
if (!(g_type_modified_events = PyList_New(0))) {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
g_type_watchers_installed++;
|
||||
return PyLong_FromLong(watcher_id);
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
clear_type_watcher(PyObject *self, PyObject *watcher_id)
|
||||
{
|
||||
if (PyType_ClearWatcher(PyLong_AsLong(watcher_id))) {
|
||||
return NULL;
|
||||
}
|
||||
g_type_watchers_installed--;
|
||||
if (!g_type_watchers_installed) {
|
||||
assert(g_type_modified_events);
|
||||
Py_CLEAR(g_type_modified_events);
|
||||
}
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
get_type_modified_events(PyObject *self, PyObject *Py_UNUSED(args))
|
||||
{
|
||||
if (!g_type_modified_events) {
|
||||
PyErr_SetString(PyExc_RuntimeError, "no watchers active");
|
||||
return NULL;
|
||||
}
|
||||
return Py_NewRef(g_type_modified_events);
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
watch_type(PyObject *self, PyObject *args)
|
||||
{
|
||||
PyObject *type;
|
||||
int watcher_id;
|
||||
if (!PyArg_ParseTuple(args, "iO", &watcher_id, &type)) {
|
||||
return NULL;
|
||||
}
|
||||
if (PyType_Watch(watcher_id, type)) {
|
||||
return NULL;
|
||||
}
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
unwatch_type(PyObject *self, PyObject *args)
|
||||
{
|
||||
PyObject *type;
|
||||
int watcher_id;
|
||||
if (!PyArg_ParseTuple(args, "iO", &watcher_id, &type)) {
|
||||
return NULL;
|
||||
}
|
||||
if (PyType_Unwatch(watcher_id, type)) {
|
||||
return NULL;
|
||||
}
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
|
||||
static PyObject *test_buildvalue_issue38913(PyObject *, PyObject *);
|
||||
|
||||
static PyMethodDef TestMethods[] = {
|
||||
|
@ -4259,11 +3984,6 @@ static PyMethodDef TestMethods[] = {
|
|||
{"settrace_to_record", settrace_to_record, METH_O, NULL},
|
||||
{"test_macros", test_macros, METH_NOARGS, NULL},
|
||||
{"clear_managed_dict", clear_managed_dict, METH_O, NULL},
|
||||
{"add_dict_watcher", add_dict_watcher, METH_O, NULL},
|
||||
{"clear_dict_watcher", clear_dict_watcher, METH_O, NULL},
|
||||
{"watch_dict", watch_dict, METH_VARARGS, NULL},
|
||||
{"unwatch_dict", unwatch_dict, METH_VARARGS, NULL},
|
||||
{"get_dict_watcher_events", get_dict_watcher_events, METH_NOARGS, NULL},
|
||||
{"function_get_code", function_get_code, METH_O, NULL},
|
||||
{"function_get_globals", function_get_globals, METH_O, NULL},
|
||||
{"function_get_module", function_get_module, METH_O, NULL},
|
||||
|
@ -4271,11 +3991,6 @@ static PyMethodDef TestMethods[] = {
|
|||
{"function_set_defaults", function_set_defaults, METH_VARARGS, NULL},
|
||||
{"function_get_kw_defaults", function_get_kw_defaults, METH_O, NULL},
|
||||
{"function_set_kw_defaults", function_set_kw_defaults, METH_VARARGS, NULL},
|
||||
{"add_type_watcher", add_type_watcher, METH_O, NULL},
|
||||
{"clear_type_watcher", clear_type_watcher, METH_O, NULL},
|
||||
{"watch_type", watch_type, METH_VARARGS, NULL},
|
||||
{"unwatch_type", unwatch_type, METH_VARARGS, NULL},
|
||||
{"get_type_modified_events", get_type_modified_events, METH_NOARGS, NULL},
|
||||
{NULL, NULL} /* sentinel */
|
||||
};
|
||||
|
||||
|
@ -5096,6 +4811,9 @@ PyInit__testcapi(void)
|
|||
if (_PyTestCapi_Init_Mem(m) < 0) {
|
||||
return NULL;
|
||||
}
|
||||
if (_PyTestCapi_Init_Watchers(m) < 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
#ifndef LIMITED_API_AVAILABLE
|
||||
PyModule_AddObjectRef(m, "LIMITED_API_AVAILABLE", Py_False);
|
||||
|
|
|
@ -103,6 +103,7 @@
|
|||
<ClCompile Include="..\Modules\_testcapi\datetime.c" />
|
||||
<ClCompile Include="..\Modules\_testcapi\docstring.c" />
|
||||
<ClCompile Include="..\Modules\_testcapi\mem.c" />
|
||||
<ClCompile Include="..\Modules\_testcapi\watchers.c" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<ResourceCompile Include="..\PC\python_nt.rc" />
|
||||
|
|
|
@ -39,6 +39,9 @@
|
|||
<ClCompile Include="..\Modules\_testcapi\mem.c">
|
||||
<Filter>Source Files</Filter>
|
||||
</ClCompile>
|
||||
<ClCompile Include="..\Modules\_testcapi\watchers.c">
|
||||
<Filter>Source Files</Filter>
|
||||
</ClCompile>
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<ResourceCompile Include="..\PC\python_nt.rc">
|
||||
|
|
Loading…
Reference in New Issue