Test specialization's thread-safety (GH-105953)

This commit is contained in:
Brandt Bucher 2023-06-22 15:02:38 -07:00 committed by GitHub
parent cd5280367a
commit 403a574b00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 511 additions and 1 deletions

View File

@ -1,3 +1,6 @@
import dis
import threading
import types
import unittest
@ -481,6 +484,513 @@ class TestCallCache(unittest.TestCase):
f()
class TestRacesDoNotCrash(unittest.TestCase):
# Careful with these. Bigger numbers have a higher chance of catching bugs,
# but you can also burn through a *ton* of type/dict/function versions:
ITEMS = 1000
LOOPS = 4
WARMUPS = 2
WRITERS = 2
def assert_specialized(self, f, opname):
instructions = dis.get_instructions(f, adaptive=True)
opnames = {instruction.opname for instruction in instructions}
self.assertIn(opname, opnames)
def assert_races_do_not_crash(
self, opname, get_items, read, write, *, check_items=False
):
# This might need a few dozen loops in some cases:
for _ in range(self.LOOPS):
items = get_items()
# Reset:
if check_items:
for item in items:
item.__code__ = item.__code__.replace()
else:
read.__code__ = read.__code__.replace()
# Specialize:
for _ in range(self.WARMUPS):
read(items)
if check_items:
for item in items:
self.assert_specialized(item, opname)
else:
self.assert_specialized(read, opname)
# Create writers:
writers = []
for _ in range(self.WRITERS):
writer = threading.Thread(target=write, args=[items])
writers.append(writer)
# Run:
for writer in writers:
writer.start()
read(items) # BOOM!
for writer in writers:
writer.join()
def test_binary_subscr_getitem(self):
def get_items():
class C:
__getitem__ = lambda self, item: None
items = []
for _ in range(self.ITEMS):
item = C()
items.append(item)
return items
def read(items):
for item in items:
try:
item[None]
except TypeError:
pass
def write(items):
for item in items:
try:
del item.__getitem__
except AttributeError:
pass
type(item).__getitem__ = lambda self, item: None
opname = "BINARY_SUBSCR_GETITEM"
self.assert_races_do_not_crash(opname, get_items, read, write)
def test_binary_subscr_list_int(self):
def get_items():
items = []
for _ in range(self.ITEMS):
item = [None]
items.append(item)
return items
def read(items):
for item in items:
try:
item[0]
except IndexError:
pass
def write(items):
for item in items:
item.clear()
item.append(None)
opname = "BINARY_SUBSCR_LIST_INT"
self.assert_races_do_not_crash(opname, get_items, read, write)
def test_for_iter_gen(self):
def get_items():
def g():
yield
yield
items = []
for _ in range(self.ITEMS):
item = g()
items.append(item)
return items
def read(items):
for item in items:
try:
for _ in item:
break
except ValueError:
pass
def write(items):
for item in items:
try:
for _ in item:
break
except ValueError:
pass
opname = "FOR_ITER_GEN"
self.assert_races_do_not_crash(opname, get_items, read, write)
def test_for_iter_list(self):
def get_items():
items = []
for _ in range(self.ITEMS):
item = [None]
items.append(item)
return items
def read(items):
for item in items:
for item in item:
break
def write(items):
for item in items:
item.clear()
item.append(None)
opname = "FOR_ITER_LIST"
self.assert_races_do_not_crash(opname, get_items, read, write)
def test_load_attr_class(self):
def get_items():
class C:
a = object()
items = []
for _ in range(self.ITEMS):
item = C
items.append(item)
return items
def read(items):
for item in items:
try:
item.a
except AttributeError:
pass
def write(items):
for item in items:
try:
del item.a
except AttributeError:
pass
item.a = object()
opname = "LOAD_ATTR_CLASS"
self.assert_races_do_not_crash(opname, get_items, read, write)
def test_load_attr_getattribute_overridden(self):
def get_items():
class C:
__getattribute__ = lambda self, name: None
items = []
for _ in range(self.ITEMS):
item = C()
items.append(item)
return items
def read(items):
for item in items:
try:
item.a
except AttributeError:
pass
def write(items):
for item in items:
try:
del item.__getattribute__
except AttributeError:
pass
type(item).__getattribute__ = lambda self, name: None
opname = "LOAD_ATTR_GETATTRIBUTE_OVERRIDDEN"
self.assert_races_do_not_crash(opname, get_items, read, write)
def test_load_attr_instance_value(self):
def get_items():
class C:
pass
items = []
for _ in range(self.ITEMS):
item = C()
item.a = None
items.append(item)
return items
def read(items):
for item in items:
item.a
def write(items):
for item in items:
item.__dict__[None] = None
opname = "LOAD_ATTR_INSTANCE_VALUE"
self.assert_races_do_not_crash(opname, get_items, read, write)
def test_load_attr_method_lazy_dict(self):
def get_items():
class C(Exception):
m = lambda self: None
items = []
for _ in range(self.ITEMS):
item = C()
items.append(item)
return items
def read(items):
for item in items:
try:
item.m()
except AttributeError:
pass
def write(items):
for item in items:
try:
del item.m
except AttributeError:
pass
type(item).m = lambda self: None
opname = "LOAD_ATTR_METHOD_LAZY_DICT"
self.assert_races_do_not_crash(opname, get_items, read, write)
def test_load_attr_method_no_dict(self):
def get_items():
class C:
__slots__ = ()
m = lambda self: None
items = []
for _ in range(self.ITEMS):
item = C()
items.append(item)
return items
def read(items):
for item in items:
try:
item.m()
except AttributeError:
pass
def write(items):
for item in items:
try:
del item.m
except AttributeError:
pass
type(item).m = lambda self: None
opname = "LOAD_ATTR_METHOD_NO_DICT"
self.assert_races_do_not_crash(opname, get_items, read, write)
def test_load_attr_method_with_values(self):
def get_items():
class C:
m = lambda self: None
items = []
for _ in range(self.ITEMS):
item = C()
items.append(item)
return items
def read(items):
for item in items:
try:
item.m()
except AttributeError:
pass
def write(items):
for item in items:
try:
del item.m
except AttributeError:
pass
type(item).m = lambda self: None
opname = "LOAD_ATTR_METHOD_WITH_VALUES"
self.assert_races_do_not_crash(opname, get_items, read, write)
def test_load_attr_module(self):
def get_items():
items = []
for _ in range(self.ITEMS):
item = types.ModuleType("<item>")
items.append(item)
return items
def read(items):
for item in items:
try:
item.__name__
except AttributeError:
pass
def write(items):
for item in items:
d = item.__dict__.copy()
item.__dict__.clear()
item.__dict__.update(d)
opname = "LOAD_ATTR_MODULE"
self.assert_races_do_not_crash(opname, get_items, read, write)
def test_load_attr_property(self):
def get_items():
class C:
a = property(lambda self: None)
items = []
for _ in range(self.ITEMS):
item = C()
items.append(item)
return items
def read(items):
for item in items:
try:
item.a
except AttributeError:
pass
def write(items):
for item in items:
try:
del type(item).a
except AttributeError:
pass
type(item).a = property(lambda self: None)
opname = "LOAD_ATTR_PROPERTY"
self.assert_races_do_not_crash(opname, get_items, read, write)
def test_load_attr_with_hint(self):
def get_items():
class C:
pass
items = []
for _ in range(self.ITEMS):
item = C()
item.__dict__
item.a = None
items.append(item)
return items
def read(items):
for item in items:
item.a
def write(items):
for item in items:
item.__dict__[None] = None
opname = "LOAD_ATTR_WITH_HINT"
self.assert_races_do_not_crash(opname, get_items, read, write)
def test_load_global_module(self):
def get_items():
items = []
for _ in range(self.ITEMS):
item = eval("lambda: x", {"x": None})
items.append(item)
return items
def read(items):
for item in items:
item()
def write(items):
for item in items:
item.__globals__[None] = None
opname = "LOAD_GLOBAL_MODULE"
self.assert_races_do_not_crash(
opname, get_items, read, write, check_items=True
)
def test_store_attr_instance_value(self):
def get_items():
class C:
pass
items = []
for _ in range(self.ITEMS):
item = C()
items.append(item)
return items
def read(items):
for item in items:
item.a = None
def write(items):
for item in items:
item.__dict__[None] = None
opname = "STORE_ATTR_INSTANCE_VALUE"
self.assert_races_do_not_crash(opname, get_items, read, write)
def test_store_attr_with_hint(self):
def get_items():
class C:
pass
items = []
for _ in range(self.ITEMS):
item = C()
item.__dict__
items.append(item)
return items
def read(items):
for item in items:
item.a = None
def write(items):
for item in items:
item.__dict__[None] = None
opname = "STORE_ATTR_WITH_HINT"
self.assert_races_do_not_crash(opname, get_items, read, write)
def test_store_subscr_list_int(self):
def get_items():
items = []
for _ in range(self.ITEMS):
item = [None]
items.append(item)
return items
def read(items):
for item in items:
try:
item[0] = None
except IndexError:
pass
def write(items):
for item in items:
item.clear()
item.append(None)
opname = "STORE_SUBSCR_LIST_INT"
self.assert_races_do_not_crash(opname, get_items, read, write)
def test_unpack_sequence_list(self):
def get_items():
items = []
for _ in range(self.ITEMS):
item = [None]
items.append(item)
return items
def read(items):
for item in items:
try:
[_] = item
except ValueError:
pass
def write(items):
for item in items:
item.clear()
item.append(None)
opname = "UNPACK_SEQUENCE_LIST"
self.assert_races_do_not_crash(opname, get_items, read, write)
if __name__ == "__main__":
import unittest
unittest.main()