gh-98624 Add mutex to unittest.mock.NonCallableMock (#98688)

* Added lock to NonCallableMock in unittest.mock

* Add blurb

* Nitpick blurb

* Edit comment based on @Jason-Y-Z's review

* Add link to GH issue
This commit is contained in:
noah-weingarden 2022-10-28 03:51:18 -04:00 committed by GitHub
parent fbcafa6eee
commit 0346eddbe9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 27 deletions

View File

@ -35,6 +35,7 @@ from asyncio import iscoroutinefunction
from types import CodeType, ModuleType, MethodType
from unittest.util import safe_repr
from functools import wraps, partial
from threading import RLock
class InvalidSpecError(Exception):
@ -402,6 +403,14 @@ class Base(object):
class NonCallableMock(Base):
"""A non-callable version of `Mock`"""
# Store a mutex as a class attribute in order to protect concurrent access
# to mock attributes. Using a class attribute allows all NonCallableMock
# instances to share the mutex for simplicity.
#
# See https://github.com/python/cpython/issues/98624 for why this is
# necessary.
_lock = RLock()
def __new__(cls, /, *args, **kw):
# every instance has its own class
# so we can create magic methods on the
@ -644,35 +653,36 @@ class NonCallableMock(Base):
f"{name!r} is not a valid assertion. Use a spec "
f"for the mock if {name!r} is meant to be an attribute.")
result = self._mock_children.get(name)
if result is _deleted:
raise AttributeError(name)
elif result is None:
wraps = None
if self._mock_wraps is not None:
# XXXX should we get the attribute without triggering code
# execution?
wraps = getattr(self._mock_wraps, name)
with NonCallableMock._lock:
result = self._mock_children.get(name)
if result is _deleted:
raise AttributeError(name)
elif result is None:
wraps = None
if self._mock_wraps is not None:
# XXXX should we get the attribute without triggering code
# execution?
wraps = getattr(self._mock_wraps, name)
result = self._get_child_mock(
parent=self, name=name, wraps=wraps, _new_name=name,
_new_parent=self
)
self._mock_children[name] = result
elif isinstance(result, _SpecState):
try:
result = create_autospec(
result.spec, result.spec_set, result.instance,
result.parent, result.name
result = self._get_child_mock(
parent=self, name=name, wraps=wraps, _new_name=name,
_new_parent=self
)
except InvalidSpecError:
target_name = self.__dict__['_mock_name'] or self
raise InvalidSpecError(
f'Cannot autospec attr {name!r} from target '
f'{target_name!r} as it has already been mocked out. '
f'[target={self!r}, attr={result.spec!r}]')
self._mock_children[name] = result
self._mock_children[name] = result
elif isinstance(result, _SpecState):
try:
result = create_autospec(
result.spec, result.spec_set, result.instance,
result.parent, result.name
)
except InvalidSpecError:
target_name = self.__dict__['_mock_name'] or self
raise InvalidSpecError(
f'Cannot autospec attr {name!r} from target '
f'{target_name!r} as it has already been mocked out. '
f'[target={self!r}, attr={result.spec!r}]')
self._mock_children[name] = result
return result

View File

@ -0,0 +1,2 @@
Add a mutex to unittest.mock.NonCallableMock to protect concurrent access
to mock attributes.