gh-61215: New mock to wait for multi-threaded events to happen (#16094)

mock: Add `ThreadingMock` class

Add a new class that allows to wait for a call to happen by using
`Event` objects. This mock class can be used to test and validate
expectations of multithreading code.

It uses two attributes for events to distinguish calls with any argument
and calls with specific arguments.

The calls with specific arguments need a lock to prevent two calls in
parallel from creating the same event twice.

The timeout is configured at class and constructor level to allow users
to set a timeout, we considered passing it as an argument to the
function but it could collide with a function parameter. Alternatively
we also considered passing it as positional only but from an API
caller perspective it was unclear what the first number meant on the
function call, think `mock.wait_until_called(1, "arg1", "arg2")`, where
1 is the timeout.

Lastly we also considered adding the new attributes to magic mock
directly rather than having a custom mock class for multi threading
scenarios, but we preferred to have specialised class that can be
composed if necessary. Additionally, having added it to `MagicMock`
directly would have resulted in `AsyncMock` having this logic, which
would not work as expected, since when if user "waits" on a
coroutine does not have the same meaning as waiting on a standard
call.

Co-authored-by: Karthikeyan Singaravelan <tir.karthi@gmail.com>
This commit is contained in:
Mario Corchero 2023-07-03 07:56:54 +01:00 committed by GitHub
parent 0355625d94
commit d65b783b69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 422 additions and 0 deletions

View File

@ -205,8 +205,10 @@ The Mock Class
import asyncio
import inspect
import unittest
import threading
from unittest.mock import sentinel, DEFAULT, ANY
from unittest.mock import patch, call, Mock, MagicMock, PropertyMock, AsyncMock
from unittest.mock import ThreadingMock
from unittest.mock import mock_open
:class:`Mock` is a flexible mock object intended to replace the use of stubs and
@ -1099,6 +1101,51 @@ object::
[call('foo'), call('bar')]
.. class:: ThreadingMock(spec=None, side_effect=None, return_value=DEFAULT, wraps=None, name=None, spec_set=None, unsafe=False, *, timeout=UNSET, **kwargs)
A version of :class:`MagicMock` for multithreading tests. The
:class:`ThreadingMock` object provides extra methods to wait for a call to
be invoked, rather than assert on it immediately.
The default timeout is specified by the ``timeout`` argument, or if unset by the
:attr:`ThreadingMock.DEFAULT_TIMEOUT` attribute, which defaults to blocking (``None``).
You can configure the global default timeout by setting :attr:`ThreadingMock.DEFAULT_TIMEOUT`.
.. method:: wait_until_called(*, timeout=UNSET)
Waits until the mock is called.
If a timeout was passed at the creation of the mock or if a timeout
argument is passed to this function, the function raises an
:exc:`AssertionError` if the call is not performed in time.
>>> mock = ThreadingMock()
>>> thread = threading.Thread(target=mock)
>>> thread.start()
>>> mock.wait_until_called(timeout=1)
>>> thread.join()
.. method:: wait_until_any_call(*args, **kwargs)
Waits until the the mock is called with the specified arguments.
If a timeout was passed at the creation of the mock
the function raises an :exc:`AssertionError` if the call is not performed in time.
>>> mock = ThreadingMock()
>>> thread = threading.Thread(target=mock, args=("arg1", "arg2",), kwargs={"arg": "thing"})
>>> thread.start()
>>> mock.wait_until_any_call("arg1", "arg2", arg="thing")
>>> thread.join()
.. attribute:: DEFAULT_TIMEOUT
Global default timeout in seconds to create instances of :class:`ThreadingMock`.
.. versionadded:: 3.13
Calling
~~~~~~~

View File

@ -0,0 +1,278 @@
import time
import unittest
import concurrent.futures
from unittest.mock import patch, ThreadingMock, call
class Something:
def method_1(self):
pass
def method_2(self):
pass
class TestThreadingMock(unittest.TestCase):
def _call_after_delay(self, func, /, *args, **kwargs):
time.sleep(kwargs.pop("delay"))
func(*args, **kwargs)
def setUp(self):
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
def tearDown(self):
self._executor.shutdown()
def run_async(self, func, /, *args, delay=0, **kwargs):
self._executor.submit(
self._call_after_delay, func, *args, **kwargs, delay=delay
)
def _make_mock(self, *args, **kwargs):
return ThreadingMock(*args, **kwargs)
def test_spec(self):
waitable_mock = self._make_mock(spec=Something)
with patch(f"{__name__}.Something", waitable_mock) as m:
something = m()
self.assertIsInstance(something.method_1, ThreadingMock)
self.assertIsInstance(something.method_1().method_2(), ThreadingMock)
with self.assertRaises(AttributeError):
m.test
def test_side_effect(self):
waitable_mock = self._make_mock()
with patch(f"{__name__}.Something", waitable_mock):
something = Something()
something.method_1.side_effect = [1]
self.assertEqual(something.method_1(), 1)
def test_instance_check(self):
waitable_mock = self._make_mock()
with patch(f"{__name__}.Something", waitable_mock):
something = Something()
self.assertIsInstance(something.method_1, ThreadingMock)
self.assertIsInstance(something.method_1().method_2(), ThreadingMock)
def test_dynamic_child_mocks_are_threading_mocks(self):
waitable_mock = self._make_mock()
self.assertIsInstance(waitable_mock.child, ThreadingMock)
def test_dynamic_child_mocks_inherit_timeout(self):
mock1 = self._make_mock()
self.assertIs(mock1._mock_wait_timeout, None)
mock2 = self._make_mock(timeout=2)
self.assertEqual(mock2._mock_wait_timeout, 2)
mock3 = self._make_mock(timeout=3)
self.assertEqual(mock3._mock_wait_timeout, 3)
self.assertIs(mock1.child._mock_wait_timeout, None)
self.assertEqual(mock2.child._mock_wait_timeout, 2)
self.assertEqual(mock3.child._mock_wait_timeout, 3)
self.assertEqual(mock2.really().__mul__().complex._mock_wait_timeout, 2)
def test_no_name_clash(self):
waitable_mock = self._make_mock()
waitable_mock._event = "myevent"
waitable_mock.event = "myevent"
waitable_mock.timeout = "mytimeout"
waitable_mock("works")
waitable_mock.wait_until_called()
waitable_mock.wait_until_any_call("works")
def test_wait_success(self):
waitable_mock = self._make_mock(spec=Something)
with patch(f"{__name__}.Something", waitable_mock):
something = Something()
self.run_async(something.method_1, delay=0.01)
something.method_1.wait_until_called()
something.method_1.wait_until_any_call()
something.method_1.assert_called()
def test_wait_success_with_instance_timeout(self):
waitable_mock = self._make_mock(timeout=1)
with patch(f"{__name__}.Something", waitable_mock):
something = Something()
self.run_async(something.method_1, delay=0.01)
something.method_1.wait_until_called()
something.method_1.wait_until_any_call()
something.method_1.assert_called()
def test_wait_failed_with_instance_timeout(self):
waitable_mock = self._make_mock(timeout=0.01)
with patch(f"{__name__}.Something", waitable_mock):
something = Something()
self.run_async(something.method_1, delay=0.5)
self.assertRaises(AssertionError, waitable_mock.method_1.wait_until_called)
self.assertRaises(
AssertionError, waitable_mock.method_1.wait_until_any_call
)
def test_wait_success_with_timeout_override(self):
waitable_mock = self._make_mock(timeout=0.01)
with patch(f"{__name__}.Something", waitable_mock):
something = Something()
self.run_async(something.method_1, delay=0.05)
something.method_1.wait_until_called(timeout=1)
def test_wait_failed_with_timeout_override(self):
waitable_mock = self._make_mock(timeout=1)
with patch(f"{__name__}.Something", waitable_mock):
something = Something()
self.run_async(something.method_1, delay=0.1)
with self.assertRaises(AssertionError):
something.method_1.wait_until_called(timeout=0.05)
with self.assertRaises(AssertionError):
something.method_1.wait_until_any_call(timeout=0.05)
def test_wait_success_called_before(self):
waitable_mock = self._make_mock()
with patch(f"{__name__}.Something", waitable_mock):
something = Something()
something.method_1()
something.method_1.wait_until_called()
something.method_1.wait_until_any_call()
something.method_1.assert_called()
def test_wait_magic_method(self):
waitable_mock = self._make_mock()
with patch(f"{__name__}.Something", waitable_mock):
something = Something()
self.run_async(something.method_1.__str__, delay=0.01)
something.method_1.__str__.wait_until_called()
something.method_1.__str__.assert_called()
def test_wait_until_any_call_positional(self):
waitable_mock = self._make_mock(spec=Something)
with patch(f"{__name__}.Something", waitable_mock):
something = Something()
self.run_async(something.method_1, 1, delay=0.1)
self.run_async(something.method_1, 2, delay=0.2)
self.run_async(something.method_1, 3, delay=0.3)
self.assertNotIn(call(1), something.method_1.mock_calls)
something.method_1.wait_until_any_call(1)
something.method_1.assert_called_with(1)
self.assertNotIn(call(2), something.method_1.mock_calls)
self.assertNotIn(call(3), something.method_1.mock_calls)
something.method_1.wait_until_any_call(3)
self.assertIn(call(2), something.method_1.mock_calls)
something.method_1.wait_until_any_call(2)
def test_wait_until_any_call_keywords(self):
waitable_mock = self._make_mock(spec=Something)
with patch(f"{__name__}.Something", waitable_mock):
something = Something()
self.run_async(something.method_1, a=1, delay=0.1)
self.run_async(something.method_1, b=2, delay=0.2)
self.run_async(something.method_1, c=3, delay=0.3)
self.assertNotIn(call(a=1), something.method_1.mock_calls)
something.method_1.wait_until_any_call(a=1)
something.method_1.assert_called_with(a=1)
self.assertNotIn(call(b=2), something.method_1.mock_calls)
self.assertNotIn(call(c=3), something.method_1.mock_calls)
something.method_1.wait_until_any_call(c=3)
self.assertIn(call(b=2), something.method_1.mock_calls)
something.method_1.wait_until_any_call(b=2)
def test_wait_until_any_call_no_argument_fails_when_called_with_arg(self):
waitable_mock = self._make_mock(timeout=0.01)
with patch(f"{__name__}.Something", waitable_mock):
something = Something()
something.method_1(1)
something.method_1.assert_called_with(1)
with self.assertRaises(AssertionError):
something.method_1.wait_until_any_call()
something.method_1()
something.method_1.wait_until_any_call()
def test_wait_until_any_call_global_default(self):
with patch.object(ThreadingMock, "DEFAULT_TIMEOUT"):
ThreadingMock.DEFAULT_TIMEOUT = 0.01
m = self._make_mock()
with self.assertRaises(AssertionError):
m.wait_until_any_call()
with self.assertRaises(AssertionError):
m.wait_until_called()
m()
m.wait_until_any_call()
assert ThreadingMock.DEFAULT_TIMEOUT != 0.01
def test_wait_until_any_call_change_global_and_override(self):
with patch.object(ThreadingMock, "DEFAULT_TIMEOUT"):
ThreadingMock.DEFAULT_TIMEOUT = 0.01
m1 = self._make_mock()
self.run_async(m1, delay=0.1)
with self.assertRaises(AssertionError):
m1.wait_until_called()
m2 = self._make_mock(timeout=1)
self.run_async(m2, delay=0.1)
m2.wait_until_called()
m3 = self._make_mock()
self.run_async(m3, delay=0.1)
m3.wait_until_called(timeout=1)
m4 = self._make_mock()
self.run_async(m4, delay=0.1)
m4.wait_until_called(timeout=None)
m5 = self._make_mock(timeout=None)
self.run_async(m5, delay=0.1)
m5.wait_until_called()
assert ThreadingMock.DEFAULT_TIMEOUT != 0.01
def test_reset_mock_resets_wait(self):
m = self._make_mock(timeout=0.01)
with self.assertRaises(AssertionError):
m.wait_until_called()
with self.assertRaises(AssertionError):
m.wait_until_any_call()
m()
m.wait_until_called()
m.wait_until_any_call()
m.assert_called_once()
m.reset_mock()
with self.assertRaises(AssertionError):
m.wait_until_called()
with self.assertRaises(AssertionError):
m.wait_until_any_call()
m()
m.wait_until_called()
m.wait_until_any_call()
m.assert_called_once()
if __name__ == "__main__":
unittest.main()

View File

@ -14,6 +14,7 @@ __all__ = (
'call',
'create_autospec',
'AsyncMock',
'ThreadingMock',
'FILTER_DIR',
'NonCallableMock',
'NonCallableMagicMock',
@ -32,6 +33,7 @@ import sys
import builtins
import pkgutil
from asyncio import iscoroutinefunction
import threading
from types import CodeType, ModuleType, MethodType
from unittest.util import safe_repr
from functools import wraps, partial
@ -3003,6 +3005,98 @@ class PropertyMock(Mock):
self(val)
_timeout_unset = sentinel.TIMEOUT_UNSET
class ThreadingMixin(Base):
DEFAULT_TIMEOUT = None
def _get_child_mock(self, /, **kw):
if "timeout" in kw:
kw["timeout"] = kw.pop("timeout")
elif isinstance(kw.get("parent"), ThreadingMixin):
kw["timeout"] = kw["parent"]._mock_wait_timeout
elif isinstance(kw.get("_new_parent"), ThreadingMixin):
kw["timeout"] = kw["_new_parent"]._mock_wait_timeout
return super()._get_child_mock(**kw)
def __init__(self, *args, timeout=_timeout_unset, **kwargs):
super().__init__(*args, **kwargs)
if timeout is _timeout_unset:
timeout = self.DEFAULT_TIMEOUT
self.__dict__["_mock_event"] = threading.Event() # Event for any call
self.__dict__["_mock_calls_events"] = [] # Events for each of the calls
self.__dict__["_mock_calls_events_lock"] = threading.Lock()
self.__dict__["_mock_wait_timeout"] = timeout
def reset_mock(self, /, *args, **kwargs):
"""
See :func:`.Mock.reset_mock()`
"""
super().reset_mock(*args, **kwargs)
self.__dict__["_mock_event"] = threading.Event()
self.__dict__["_mock_calls_events"] = []
def __get_event(self, expected_args, expected_kwargs):
with self._mock_calls_events_lock:
for args, kwargs, event in self._mock_calls_events:
if (args, kwargs) == (expected_args, expected_kwargs):
return event
new_event = threading.Event()
self._mock_calls_events.append((expected_args, expected_kwargs, new_event))
return new_event
def _mock_call(self, *args, **kwargs):
ret_value = super()._mock_call(*args, **kwargs)
call_event = self.__get_event(args, kwargs)
call_event.set()
self._mock_event.set()
return ret_value
def wait_until_called(self, *, timeout=_timeout_unset):
"""Wait until the mock object is called.
`timeout` - time to wait for in seconds, waits forever otherwise.
Defaults to the constructor provided timeout.
Use None to block undefinetively.
"""
if timeout is _timeout_unset:
timeout = self._mock_wait_timeout
if not self._mock_event.wait(timeout=timeout):
msg = (f"{self._mock_name or 'mock'} was not called before"
f" timeout({timeout}).")
raise AssertionError(msg)
def wait_until_any_call(self, *args, **kwargs):
"""Wait until the mock object is called with given args.
Waits for the timeout in seconds provided in the constructor.
"""
event = self.__get_event(args, kwargs)
if not event.wait(timeout=self._mock_wait_timeout):
expected_string = self._format_mock_call_signature(args, kwargs)
raise AssertionError(f'{expected_string} call not found')
class ThreadingMock(ThreadingMixin, MagicMixin, Mock):
"""
A mock that can be used to wait until on calls happening
in a different thread.
The constructor can take a `timeout` argument which
controls the timeout in seconds for all `wait` calls of the mock.
You can change the default timeout of all instances via the
`ThreadingMock.DEFAULT_TIMEOUT` attribute.
If no timeout is set, it will block undefinetively.
"""
pass
def seal(mock):
"""Disable the automatic generation of child mocks.

View File

@ -0,0 +1,3 @@
Add ``ThreadingMock`` to :mod:`unittest.mock` that can be used to create
Mock objects that can wait until they are called. Patch by Karthikeyan
Singaravelan and Mario Corchero.