diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index 7ebc3cb4f3e..36ae312b079 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -1,10 +1,11 @@ """Event loop and event loop policy.""" -__all__ = ['AbstractEventLoopPolicy', 'DefaultEventLoopPolicy', +__all__ = ['AbstractEventLoopPolicy', 'AbstractEventLoop', 'AbstractServer', 'Handle', 'TimerHandle', 'get_event_loop_policy', 'set_event_loop_policy', 'get_event_loop', 'set_event_loop', 'new_event_loop', + 'get_child_watcher', 'set_child_watcher', ] import subprocess @@ -318,8 +319,18 @@ class AbstractEventLoopPolicy: """XXX""" raise NotImplementedError + # Child processes handling (Unix only). -class DefaultEventLoopPolicy(threading.local, AbstractEventLoopPolicy): + def get_child_watcher(self): + """XXX""" + raise NotImplementedError + + def set_child_watcher(self, watcher): + """XXX""" + raise NotImplementedError + + +class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy): """Default policy implementation for accessing the event loop. In this policy, each thread has its own event loop. However, we @@ -332,28 +343,34 @@ class DefaultEventLoopPolicy(threading.local, AbstractEventLoopPolicy): associated). """ - _loop = None - _set_called = False + _loop_factory = None + + class _Local(threading.local): + _loop = None + _set_called = False + + def __init__(self): + self._local = self._Local() def get_event_loop(self): """Get the event loop. This may be None or an instance of EventLoop. """ - if (self._loop is None and - not self._set_called and + if (self._local._loop is None and + not self._local._set_called and isinstance(threading.current_thread(), threading._MainThread)): - self._loop = self.new_event_loop() - assert self._loop is not None, \ + self._local._loop = self.new_event_loop() + assert self._local._loop is not None, \ ('There is no current event loop in thread %r.' % threading.current_thread().name) - return self._loop + return self._local._loop def set_event_loop(self, loop): """Set the event loop.""" - self._set_called = True + self._local._set_called = True assert loop is None or isinstance(loop, AbstractEventLoop) - self._loop = loop + self._local._loop = loop def new_event_loop(self): """Create a new event loop. @@ -361,12 +378,7 @@ class DefaultEventLoopPolicy(threading.local, AbstractEventLoopPolicy): You must call set_event_loop() to make this the current event loop. """ - if sys.platform == 'win32': # pragma: no cover - from . import windows_events - return windows_events.SelectorEventLoop() - else: # pragma: no cover - from . import unix_events - return unix_events.SelectorEventLoop() + return self._loop_factory() # Event loop policy. The policy itself is always global, even if the @@ -375,12 +387,22 @@ class DefaultEventLoopPolicy(threading.local, AbstractEventLoopPolicy): # call to get_event_loop_policy(). _event_loop_policy = None +# Lock for protecting the on-the-fly creation of the event loop policy. +_lock = threading.Lock() + + +def _init_event_loop_policy(): + global _event_loop_policy + with _lock: + if _event_loop_policy is None: # pragma: no branch + from . import DefaultEventLoopPolicy + _event_loop_policy = DefaultEventLoopPolicy() + def get_event_loop_policy(): """XXX""" - global _event_loop_policy if _event_loop_policy is None: - _event_loop_policy = DefaultEventLoopPolicy() + _init_event_loop_policy() return _event_loop_policy @@ -404,3 +426,13 @@ def set_event_loop(loop): def new_event_loop(): """XXX""" return get_event_loop_policy().new_event_loop() + + +def get_child_watcher(): + """XXX""" + return get_event_loop_policy().get_child_watcher() + + +def set_child_watcher(watcher): + """XXX""" + return get_event_loop_policy().set_child_watcher(watcher) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index c95ad488c00..dd57fe8e797 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -8,6 +8,7 @@ import socket import stat import subprocess import sys +import threading from . import base_subprocess @@ -20,7 +21,10 @@ from . import transports from .log import logger -__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR'] +__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR', + 'AbstractChildWatcher', 'SafeChildWatcher', + 'FastChildWatcher', 'DefaultEventLoopPolicy', + ] STDIN = 0 STDOUT = 1 @@ -31,7 +35,7 @@ if sys.platform == 'win32': # pragma: no cover raise ImportError('Signals are not really supported on Windows') -class SelectorEventLoop(selector_events.BaseSelectorEventLoop): +class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): """Unix event loop Adds signal handling to SelectorEventLoop @@ -40,17 +44,10 @@ class SelectorEventLoop(selector_events.BaseSelectorEventLoop): def __init__(self, selector=None): super().__init__(selector) self._signal_handlers = {} - self._subprocesses = {} def _socketpair(self): return socket.socketpair() - def close(self): - handler = self._signal_handlers.get(signal.SIGCHLD) - if handler is not None: - self.remove_signal_handler(signal.SIGCHLD) - super().close() - def add_signal_handler(self, sig, callback, *args): """Add a handler for a signal. UNIX only. @@ -152,49 +149,20 @@ class SelectorEventLoop(selector_events.BaseSelectorEventLoop): def _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): - self._reg_sigchld() - transp = _UnixSubprocessTransport(self, protocol, args, shell, - stdin, stdout, stderr, bufsize, - extra=None, **kwargs) - self._subprocesses[transp.get_pid()] = transp + with events.get_child_watcher() as watcher: + transp = _UnixSubprocessTransport(self, protocol, args, shell, + stdin, stdout, stderr, bufsize, + extra=None, **kwargs) + watcher.add_child_handler(transp.get_pid(), + self._child_watcher_callback, transp) yield from transp._post_init() return transp - def _reg_sigchld(self): - if signal.SIGCHLD not in self._signal_handlers: - self.add_signal_handler(signal.SIGCHLD, self._sig_chld) + def _child_watcher_callback(self, pid, returncode, transp): + self.call_soon_threadsafe(transp._process_exited, returncode) - def _sig_chld(self): - try: - # Because of signal coalescing, we must keep calling waitpid() as - # long as we're able to reap a child. - while True: - try: - pid, status = os.waitpid(-1, os.WNOHANG) - except ChildProcessError: - break # No more child processes exist. - if pid == 0: - break # All remaining child processes are still alive. - elif os.WIFSIGNALED(status): - # A child process died because of a signal. - returncode = -os.WTERMSIG(status) - elif os.WIFEXITED(status): - # A child process exited (e.g. sys.exit()). - returncode = os.WEXITSTATUS(status) - else: - # A child exited, but we don't understand its status. - # This shouldn't happen, but if it does, let's just - # return that status; perhaps that helps debug it. - returncode = status - transp = self._subprocesses.get(pid) - if transp is not None: - transp._process_exited(returncode) - except Exception: - logger.exception('Unknown exception in SIGCHLD handler') - - def _subprocess_closed(self, transport): - pid = transport.get_pid() - self._subprocesses.pop(pid, None) + def _subprocess_closed(self, transp): + pass def _set_nonblocking(fd): @@ -423,3 +391,335 @@ class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport): if stdin_w is not None: stdin.close() self._proc.stdin = open(stdin_w.detach(), 'rb', buffering=bufsize) + + +class AbstractChildWatcher: + """Abstract base class for monitoring child processes. + + Objects derived from this class monitor a collection of subprocesses and + report their termination or interruption by a signal. + + New callbacks are registered with .add_child_handler(). Starting a new + process must be done within a 'with' block to allow the watcher to suspend + its activity until the new process if fully registered (this is needed to + prevent a race condition in some implementations). + + Example: + with watcher: + proc = subprocess.Popen("sleep 1") + watcher.add_child_handler(proc.pid, callback) + + Notes: + Implementations of this class must be thread-safe. + + Since child watcher objects may catch the SIGCHLD signal and call + waitpid(-1), there should be only one active object per process. + """ + + def add_child_handler(self, pid, callback, *args): + """Register a new child handler. + + Arrange for callback(pid, returncode, *args) to be called when + process 'pid' terminates. Specifying another callback for the same + process replaces the previous handler. + + Note: callback() must be thread-safe + """ + raise NotImplementedError() + + def remove_child_handler(self, pid): + """Removes the handler for process 'pid'. + + The function returns True if the handler was successfully removed, + False if there was nothing to remove.""" + + raise NotImplementedError() + + def set_loop(self, loop): + """Reattach the watcher to another event loop. + + Note: loop may be None + """ + raise NotImplementedError() + + def close(self): + """Close the watcher. + + This must be called to make sure that any underlying resource is freed. + """ + raise NotImplementedError() + + def __enter__(self): + """Enter the watcher's context and allow starting new processes + + This function must return self""" + raise NotImplementedError() + + def __exit__(self, a, b, c): + """Exit the watcher's context""" + raise NotImplementedError() + + +class BaseChildWatcher(AbstractChildWatcher): + + def __init__(self, loop): + self._loop = None + self._callbacks = {} + + self.set_loop(loop) + + def close(self): + self.set_loop(None) + self._callbacks.clear() + + def _do_waitpid(self, expected_pid): + raise NotImplementedError() + + def _do_waitpid_all(self): + raise NotImplementedError() + + def set_loop(self, loop): + assert loop is None or isinstance(loop, events.AbstractEventLoop) + + if self._loop is not None: + self._loop.remove_signal_handler(signal.SIGCHLD) + + self._loop = loop + if loop is not None: + loop.add_signal_handler(signal.SIGCHLD, self._sig_chld) + + # Prevent a race condition in case a child terminated + # during the switch. + self._do_waitpid_all() + + def remove_child_handler(self, pid): + try: + del self._callbacks[pid] + return True + except KeyError: + return False + + def _sig_chld(self): + try: + self._do_waitpid_all() + except Exception: + logger.exception('Unknown exception in SIGCHLD handler') + + def _compute_returncode(self, status): + if os.WIFSIGNALED(status): + # The child process died because of a signal. + return -os.WTERMSIG(status) + elif os.WIFEXITED(status): + # The child process exited (e.g sys.exit()). + return os.WEXITSTATUS(status) + else: + # The child exited, but we don't understand its status. + # This shouldn't happen, but if it does, let's just + # return that status; perhaps that helps debug it. + return status + + +class SafeChildWatcher(BaseChildWatcher): + """'Safe' child watcher implementation. + + This implementation avoids disrupting other code spawning processes by + polling explicitly each process in the SIGCHLD handler instead of calling + os.waitpid(-1). + + This is a safe solution but it has a significant overhead when handling a + big number of children (O(n) each time SIGCHLD is raised) + """ + + def __enter__(self): + return self + + def __exit__(self, a, b, c): + pass + + def add_child_handler(self, pid, callback, *args): + self._callbacks[pid] = callback, args + + # Prevent a race condition in case the child is already terminated. + self._do_waitpid(pid) + + def _do_waitpid_all(self): + + for pid in list(self._callbacks): + self._do_waitpid(pid) + + def _do_waitpid(self, expected_pid): + assert expected_pid > 0 + + try: + pid, status = os.waitpid(expected_pid, os.WNOHANG) + except ChildProcessError: + # The child process is already reaped + # (may happen if waitpid() is called elsewhere). + pid = expected_pid + returncode = 255 + logger.warning( + "Unknown child process pid %d, will report returncode 255", + pid) + else: + if pid == 0: + # The child process is still alive. + return + + returncode = self._compute_returncode(status) + + try: + callback, args = self._callbacks.pop(pid) + except KeyError: # pragma: no cover + # May happen if .remove_child_handler() is called + # after os.waitpid() returns. + pass + else: + callback(pid, returncode, *args) + + +class FastChildWatcher(BaseChildWatcher): + """'Fast' child watcher implementation. + + This implementation reaps every terminated processes by calling + os.waitpid(-1) directly, possibly breaking other code spawning processes + and waiting for their termination. + + There is no noticeable overhead when handling a big number of children + (O(1) each time a child terminates). + """ + def __init__(self, loop): + super().__init__(loop) + + self._lock = threading.Lock() + self._zombies = {} + self._forks = 0 + + def close(self): + super().close() + self._zombies.clear() + + def __enter__(self): + with self._lock: + self._forks += 1 + + return self + + def __exit__(self, a, b, c): + with self._lock: + self._forks -= 1 + + if self._forks or not self._zombies: + return + + collateral_victims = str(self._zombies) + self._zombies.clear() + + logger.warning( + "Caught subprocesses termination from unknown pids: %s", + collateral_victims) + + def add_child_handler(self, pid, callback, *args): + assert self._forks, "Must use the context manager" + + self._callbacks[pid] = callback, args + + try: + # Ensure that the child is not already terminated. + # (raise KeyError if still alive) + returncode = self._zombies.pop(pid) + + # Child is dead, therefore we can fire the callback immediately. + # First we remove it from the dict. + # (raise KeyError if .remove_child_handler() was called in-between) + del self._callbacks[pid] + except KeyError: + pass + else: + callback(pid, returncode, *args) + + def _do_waitpid_all(self): + # Because of signal coalescing, we must keep calling waitpid() as + # long as we're able to reap a child. + while True: + try: + pid, status = os.waitpid(-1, os.WNOHANG) + except ChildProcessError: + # No more child processes exist. + return + else: + if pid == 0: + # A child process is still alive. + return + + returncode = self._compute_returncode(status) + + try: + callback, args = self._callbacks.pop(pid) + except KeyError: + # unknown child + with self._lock: + if self._forks: + # It may not be registered yet. + self._zombies[pid] = returncode + continue + + logger.warning( + "Caught subprocess termination from unknown pid: " + "%d -> %d", pid, returncode) + else: + callback(pid, returncode, *args) + + +class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): + """XXX""" + _loop_factory = _UnixSelectorEventLoop + + def __init__(self): + super().__init__() + self._watcher = None + + def _init_watcher(self): + with events._lock: + if self._watcher is None: # pragma: no branch + if isinstance(threading.current_thread(), + threading._MainThread): + self._watcher = SafeChildWatcher(self._local._loop) + else: + self._watcher = SafeChildWatcher(None) + + def set_event_loop(self, loop): + """Set the event loop. + + As a side effect, if a child watcher was set before, then calling + .set_event_loop() from the main thread will call .set_loop(loop) on the + child watcher. + """ + + super().set_event_loop(loop) + + if self._watcher is not None and \ + isinstance(threading.current_thread(), threading._MainThread): + self._watcher.set_loop(loop) + + def get_child_watcher(self): + """Get the child watcher + + If not yet set, a SafeChildWatcher object is automatically created. + """ + if self._watcher is None: + self._init_watcher() + + return self._watcher + + def set_child_watcher(self, watcher): + """Set the child watcher""" + + assert watcher is None or isinstance(watcher, AbstractChildWatcher) + + if self._watcher is not None: + self._watcher.close() + + self._watcher = watcher + +SelectorEventLoop = _UnixSelectorEventLoop +DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index d7444bdf655..64fe38617d7 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -7,6 +7,7 @@ import weakref import struct import _winapi +from . import events from . import base_subprocess from . import futures from . import proactor_events @@ -17,7 +18,9 @@ from .log import logger from . import _overlapped -__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor'] +__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor', + 'DefaultEventLoopPolicy', + ] NULL = 0 @@ -108,7 +111,7 @@ class PipeServer(object): __del__ = close -class SelectorEventLoop(selector_events.BaseSelectorEventLoop): +class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop): """Windows version of selector event loop.""" def _socketpair(self): @@ -453,3 +456,13 @@ class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport): f = self._loop._proactor.wait_for_handle(int(self._proc._handle)) f.add_done_callback(callback) + + +SelectorEventLoop = _WindowsSelectorEventLoop + + +class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): + _loop_factory = SelectorEventLoop + + +DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 4af9aa93c8b..00bd4085c13 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -1308,8 +1308,17 @@ else: from asyncio import selectors from asyncio import unix_events + class UnixEventLoopTestsMixin(EventLoopTestsMixin): + def setUp(self): + super().setUp() + events.set_child_watcher(unix_events.SafeChildWatcher(self.loop)) + + def tearDown(self): + events.set_child_watcher(None) + super().tearDown() + if hasattr(selectors, 'KqueueSelector'): - class KqueueEventLoopTests(EventLoopTestsMixin, + class KqueueEventLoopTests(UnixEventLoopTestsMixin, SubprocessTestsMixin, unittest.TestCase): @@ -1318,7 +1327,7 @@ else: selectors.KqueueSelector()) if hasattr(selectors, 'EpollSelector'): - class EPollEventLoopTests(EventLoopTestsMixin, + class EPollEventLoopTests(UnixEventLoopTestsMixin, SubprocessTestsMixin, unittest.TestCase): @@ -1326,7 +1335,7 @@ else: return unix_events.SelectorEventLoop(selectors.EpollSelector()) if hasattr(selectors, 'PollSelector'): - class PollEventLoopTests(EventLoopTestsMixin, + class PollEventLoopTests(UnixEventLoopTestsMixin, SubprocessTestsMixin, unittest.TestCase): @@ -1334,7 +1343,7 @@ else: return unix_events.SelectorEventLoop(selectors.PollSelector()) # Should always exist. - class SelectEventLoopTests(EventLoopTestsMixin, + class SelectEventLoopTests(UnixEventLoopTestsMixin, SubprocessTestsMixin, unittest.TestCase): @@ -1557,25 +1566,36 @@ class ProtocolsAbsTests(unittest.TestCase): class PolicyTests(unittest.TestCase): + def create_policy(self): + if sys.platform == "win32": + from asyncio import windows_events + return windows_events.DefaultEventLoopPolicy() + else: + from asyncio import unix_events + return unix_events.DefaultEventLoopPolicy() + def test_event_loop_policy(self): policy = events.AbstractEventLoopPolicy() self.assertRaises(NotImplementedError, policy.get_event_loop) self.assertRaises(NotImplementedError, policy.set_event_loop, object()) self.assertRaises(NotImplementedError, policy.new_event_loop) + self.assertRaises(NotImplementedError, policy.get_child_watcher) + self.assertRaises(NotImplementedError, policy.set_child_watcher, + object()) def test_get_event_loop(self): - policy = events.DefaultEventLoopPolicy() - self.assertIsNone(policy._loop) + policy = self.create_policy() + self.assertIsNone(policy._local._loop) loop = policy.get_event_loop() self.assertIsInstance(loop, events.AbstractEventLoop) - self.assertIs(policy._loop, loop) + self.assertIs(policy._local._loop, loop) self.assertIs(loop, policy.get_event_loop()) loop.close() def test_get_event_loop_after_set_none(self): - policy = events.DefaultEventLoopPolicy() + policy = self.create_policy() policy.set_event_loop(None) self.assertRaises(AssertionError, policy.get_event_loop) @@ -1583,7 +1603,7 @@ class PolicyTests(unittest.TestCase): def test_get_event_loop_thread(self, m_current_thread): def f(): - policy = events.DefaultEventLoopPolicy() + policy = self.create_policy() self.assertRaises(AssertionError, policy.get_event_loop) th = threading.Thread(target=f) @@ -1591,14 +1611,14 @@ class PolicyTests(unittest.TestCase): th.join() def test_new_event_loop(self): - policy = events.DefaultEventLoopPolicy() + policy = self.create_policy() loop = policy.new_event_loop() self.assertIsInstance(loop, events.AbstractEventLoop) loop.close() def test_set_event_loop(self): - policy = events.DefaultEventLoopPolicy() + policy = self.create_policy() old_loop = policy.get_event_loop() self.assertRaises(AssertionError, policy.set_event_loop, object()) @@ -1621,7 +1641,7 @@ class PolicyTests(unittest.TestCase): old_policy = events.get_event_loop_policy() - policy = events.DefaultEventLoopPolicy() + policy = self.create_policy() events.set_event_loop_policy(policy) self.assertIs(policy, events.get_event_loop_policy()) self.assertIsNot(policy, old_policy) diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index f29e7afec15..a4d835e3f4c 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -3,10 +3,12 @@ import gc import errno import io +import os import pprint import signal import stat import sys +import threading import unittest import unittest.mock @@ -181,124 +183,6 @@ class SelectorEventLoopTests(unittest.TestCase): self.assertRaises( RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP) - @unittest.mock.patch('os.WTERMSIG') - @unittest.mock.patch('os.WEXITSTATUS') - @unittest.mock.patch('os.WIFSIGNALED') - @unittest.mock.patch('os.WIFEXITED') - @unittest.mock.patch('os.waitpid') - def test__sig_chld(self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, - m_WEXITSTATUS, m_WTERMSIG): - m_waitpid.side_effect = [(7, object()), ChildProcessError] - m_WIFEXITED.return_value = True - m_WIFSIGNALED.return_value = False - m_WEXITSTATUS.return_value = 3 - transp = unittest.mock.Mock() - self.loop._subprocesses[7] = transp - - self.loop._sig_chld() - transp._process_exited.assert_called_with(3) - self.assertFalse(m_WTERMSIG.called) - - @unittest.mock.patch('os.WTERMSIG') - @unittest.mock.patch('os.WEXITSTATUS') - @unittest.mock.patch('os.WIFSIGNALED') - @unittest.mock.patch('os.WIFEXITED') - @unittest.mock.patch('os.waitpid') - def test__sig_chld_signal(self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, - m_WEXITSTATUS, m_WTERMSIG): - m_waitpid.side_effect = [(7, object()), ChildProcessError] - m_WIFEXITED.return_value = False - m_WIFSIGNALED.return_value = True - m_WTERMSIG.return_value = 1 - transp = unittest.mock.Mock() - self.loop._subprocesses[7] = transp - - self.loop._sig_chld() - transp._process_exited.assert_called_with(-1) - self.assertFalse(m_WEXITSTATUS.called) - - @unittest.mock.patch('os.WTERMSIG') - @unittest.mock.patch('os.WEXITSTATUS') - @unittest.mock.patch('os.WIFSIGNALED') - @unittest.mock.patch('os.WIFEXITED') - @unittest.mock.patch('os.waitpid') - def test__sig_chld_zero_pid(self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, - m_WEXITSTATUS, m_WTERMSIG): - m_waitpid.side_effect = [(0, object()), ChildProcessError] - transp = unittest.mock.Mock() - self.loop._subprocesses[7] = transp - - self.loop._sig_chld() - self.assertFalse(transp._process_exited.called) - self.assertFalse(m_WIFSIGNALED.called) - self.assertFalse(m_WIFEXITED.called) - self.assertFalse(m_WTERMSIG.called) - self.assertFalse(m_WEXITSTATUS.called) - - @unittest.mock.patch('os.WTERMSIG') - @unittest.mock.patch('os.WEXITSTATUS') - @unittest.mock.patch('os.WIFSIGNALED') - @unittest.mock.patch('os.WIFEXITED') - @unittest.mock.patch('os.waitpid') - def test__sig_chld_not_registered_subprocess(self, m_waitpid, - m_WIFEXITED, m_WIFSIGNALED, - m_WEXITSTATUS, m_WTERMSIG): - m_waitpid.side_effect = [(7, object()), ChildProcessError] - m_WIFEXITED.return_value = True - m_WIFSIGNALED.return_value = False - m_WEXITSTATUS.return_value = 3 - - self.loop._sig_chld() - self.assertFalse(m_WTERMSIG.called) - - @unittest.mock.patch('os.WTERMSIG') - @unittest.mock.patch('os.WEXITSTATUS') - @unittest.mock.patch('os.WIFSIGNALED') - @unittest.mock.patch('os.WIFEXITED') - @unittest.mock.patch('os.waitpid') - def test__sig_chld_unknown_status(self, m_waitpid, - m_WIFEXITED, m_WIFSIGNALED, - m_WEXITSTATUS, m_WTERMSIG): - m_waitpid.side_effect = [(7, object()), ChildProcessError] - m_WIFEXITED.return_value = False - m_WIFSIGNALED.return_value = False - transp = unittest.mock.Mock() - self.loop._subprocesses[7] = transp - - self.loop._sig_chld() - self.assertTrue(transp._process_exited.called) - self.assertFalse(m_WEXITSTATUS.called) - self.assertFalse(m_WTERMSIG.called) - - @unittest.mock.patch('asyncio.unix_events.logger') - @unittest.mock.patch('os.WTERMSIG') - @unittest.mock.patch('os.WEXITSTATUS') - @unittest.mock.patch('os.WIFSIGNALED') - @unittest.mock.patch('os.WIFEXITED') - @unittest.mock.patch('os.waitpid') - def test__sig_chld_unknown_status_in_handler(self, m_waitpid, - m_WIFEXITED, m_WIFSIGNALED, - m_WEXITSTATUS, m_WTERMSIG, - m_log): - m_waitpid.side_effect = Exception - transp = unittest.mock.Mock() - self.loop._subprocesses[7] = transp - - self.loop._sig_chld() - self.assertFalse(transp._process_exited.called) - self.assertFalse(m_WIFSIGNALED.called) - self.assertFalse(m_WIFEXITED.called) - self.assertFalse(m_WTERMSIG.called) - self.assertFalse(m_WEXITSTATUS.called) - m_log.exception.assert_called_with( - 'Unknown exception in SIGCHLD handler') - - @unittest.mock.patch('os.waitpid') - def test__sig_chld_process_error(self, m_waitpid): - m_waitpid.side_effect = ChildProcessError - self.loop._sig_chld() - self.assertTrue(m_waitpid.called) - class UnixReadPipeTransportTests(unittest.TestCase): @@ -777,5 +661,872 @@ class UnixWritePipeTransportTests(unittest.TestCase): self.assertFalse(self.protocol.connection_lost.called) +class AbstractChildWatcherTests(unittest.TestCase): + + def test_not_implemented(self): + f = unittest.mock.Mock() + watcher = unix_events.AbstractChildWatcher() + self.assertRaises( + NotImplementedError, watcher.add_child_handler, f, f) + self.assertRaises( + NotImplementedError, watcher.remove_child_handler, f) + self.assertRaises( + NotImplementedError, watcher.set_loop, f) + self.assertRaises( + NotImplementedError, watcher.close) + self.assertRaises( + NotImplementedError, watcher.__enter__) + self.assertRaises( + NotImplementedError, watcher.__exit__, f, f, f) + + +class BaseChildWatcherTests(unittest.TestCase): + + def test_not_implemented(self): + f = unittest.mock.Mock() + watcher = unix_events.BaseChildWatcher(None) + self.assertRaises( + NotImplementedError, watcher._do_waitpid, f) + + +class ChildWatcherTestsMixin: + instance = None + + ignore_warnings = unittest.mock.patch.object(unix_events.logger, "warning") + + def setUp(self): + self.loop = test_utils.TestLoop() + self.running = False + self.zombies = {} + + assert ChildWatcherTestsMixin.instance is None + ChildWatcherTestsMixin.instance = self + + with unittest.mock.patch.object( + self.loop, "add_signal_handler") as self.m_add_signal_handler: + self.watcher = self.create_watcher(self.loop) + + def tearDown(self): + ChildWatcherTestsMixin.instance = None + + def waitpid(pid, flags): + self = ChildWatcherTestsMixin.instance + if isinstance(self.watcher, unix_events.SafeChildWatcher) or pid != -1: + self.assertGreater(pid, 0) + try: + if pid < 0: + return self.zombies.popitem() + else: + return pid, self.zombies.pop(pid) + except KeyError: + pass + if self.running: + return 0, 0 + else: + raise ChildProcessError() + + def add_zombie(self, pid, returncode): + self.zombies[pid] = returncode + 32768 + + def WIFEXITED(status): + return status >= 32768 + + def WIFSIGNALED(status): + return 32700 < status < 32768 + + def WEXITSTATUS(status): + self = ChildWatcherTestsMixin.instance + self.assertTrue(type(self).WIFEXITED(status)) + return status - 32768 + + def WTERMSIG(status): + self = ChildWatcherTestsMixin.instance + self.assertTrue(type(self).WIFSIGNALED(status)) + return 32768 - status + + def test_create_watcher(self): + self.m_add_signal_handler.assert_called_once_with( + signal.SIGCHLD, self.watcher._sig_chld) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_sigchld(self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, + m_WEXITSTATUS, m_WTERMSIG): + # register a child + callback = unittest.mock.Mock() + + with self.watcher: + self.running = True + self.watcher.add_child_handler(42, callback, 9, 10, 14) + + self.assertFalse(callback.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # child is running + self.watcher._sig_chld() + + self.assertFalse(callback.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # child terminates (returncode 12) + self.running = False + self.add_zombie(42, 12) + self.watcher._sig_chld() + + self.assertTrue(m_WIFEXITED.called) + self.assertTrue(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + callback.assert_called_once_with(42, 12, 9, 10, 14) + + m_WIFSIGNALED.reset_mock() + m_WIFEXITED.reset_mock() + m_WEXITSTATUS.reset_mock() + callback.reset_mock() + + # ensure that the child is effectively reaped + self.add_zombie(42, 13) + with self.ignore_warnings: + self.watcher._sig_chld() + + self.assertFalse(callback.called) + self.assertFalse(m_WTERMSIG.called) + + m_WIFSIGNALED.reset_mock() + m_WIFEXITED.reset_mock() + m_WEXITSTATUS.reset_mock() + + # sigchld called again + self.zombies.clear() + self.watcher._sig_chld() + + self.assertFalse(callback.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_sigchld_two_children(self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, + m_WEXITSTATUS, m_WTERMSIG): + callback1 = unittest.mock.Mock() + callback2 = unittest.mock.Mock() + + # register child 1 + with self.watcher: + self.running = True + self.watcher.add_child_handler(43, callback1, 7, 8) + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # register child 2 + with self.watcher: + self.watcher.add_child_handler(44, callback2, 147, 18) + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # childen are running + self.watcher._sig_chld() + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # child 1 terminates (signal 3) + self.add_zombie(43, -3) + self.watcher._sig_chld() + + callback1.assert_called_once_with(43, -3, 7, 8) + self.assertFalse(callback2.called) + self.assertTrue(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertTrue(m_WTERMSIG.called) + + m_WIFSIGNALED.reset_mock() + m_WIFEXITED.reset_mock() + m_WTERMSIG.reset_mock() + callback1.reset_mock() + + # child 2 still running + self.watcher._sig_chld() + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # child 2 terminates (code 108) + self.add_zombie(44, 108) + self.running = False + self.watcher._sig_chld() + + callback2.assert_called_once_with(44, 108, 147, 18) + self.assertFalse(callback1.called) + self.assertTrue(m_WIFEXITED.called) + self.assertTrue(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + m_WIFSIGNALED.reset_mock() + m_WIFEXITED.reset_mock() + m_WEXITSTATUS.reset_mock() + callback2.reset_mock() + + # ensure that the children are effectively reaped + self.add_zombie(43, 14) + self.add_zombie(44, 15) + with self.ignore_warnings: + self.watcher._sig_chld() + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WTERMSIG.called) + + m_WIFSIGNALED.reset_mock() + m_WIFEXITED.reset_mock() + m_WEXITSTATUS.reset_mock() + + # sigchld called again + self.zombies.clear() + self.watcher._sig_chld() + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_sigchld_two_children_terminating_together( + self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS, + m_WTERMSIG): + callback1 = unittest.mock.Mock() + callback2 = unittest.mock.Mock() + + # register child 1 + with self.watcher: + self.running = True + self.watcher.add_child_handler(45, callback1, 17, 8) + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # register child 2 + with self.watcher: + self.watcher.add_child_handler(46, callback2, 1147, 18) + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # childen are running + self.watcher._sig_chld() + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # child 1 terminates (code 78) + # child 2 terminates (signal 5) + self.add_zombie(45, 78) + self.add_zombie(46, -5) + self.running = False + self.watcher._sig_chld() + + callback1.assert_called_once_with(45, 78, 17, 8) + callback2.assert_called_once_with(46, -5, 1147, 18) + self.assertTrue(m_WIFSIGNALED.called) + self.assertTrue(m_WIFEXITED.called) + self.assertTrue(m_WEXITSTATUS.called) + self.assertTrue(m_WTERMSIG.called) + + m_WIFSIGNALED.reset_mock() + m_WIFEXITED.reset_mock() + m_WTERMSIG.reset_mock() + m_WEXITSTATUS.reset_mock() + callback1.reset_mock() + callback2.reset_mock() + + # ensure that the children are effectively reaped + self.add_zombie(45, 14) + self.add_zombie(46, 15) + with self.ignore_warnings: + self.watcher._sig_chld() + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WTERMSIG.called) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_sigchld_race_condition( + self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS, + m_WTERMSIG): + # register a child + callback = unittest.mock.Mock() + + with self.watcher: + # child terminates before being registered + self.add_zombie(50, 4) + self.watcher._sig_chld() + + self.watcher.add_child_handler(50, callback, 1, 12) + + callback.assert_called_once_with(50, 4, 1, 12) + callback.reset_mock() + + # ensure that the child is effectively reaped + self.add_zombie(50, -1) + with self.ignore_warnings: + self.watcher._sig_chld() + + self.assertFalse(callback.called) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_sigchld_replace_handler( + self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS, + m_WTERMSIG): + callback1 = unittest.mock.Mock() + callback2 = unittest.mock.Mock() + + # register a child + with self.watcher: + self.running = True + self.watcher.add_child_handler(51, callback1, 19) + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # register the same child again + with self.watcher: + self.watcher.add_child_handler(51, callback2, 21) + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # child terminates (signal 8) + self.running = False + self.add_zombie(51, -8) + self.watcher._sig_chld() + + callback2.assert_called_once_with(51, -8, 21) + self.assertFalse(callback1.called) + self.assertTrue(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertTrue(m_WTERMSIG.called) + + m_WIFSIGNALED.reset_mock() + m_WIFEXITED.reset_mock() + m_WTERMSIG.reset_mock() + callback2.reset_mock() + + # ensure that the child is effectively reaped + self.add_zombie(51, 13) + with self.ignore_warnings: + self.watcher._sig_chld() + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(m_WTERMSIG.called) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_sigchld_remove_handler(self, m_waitpid, m_WIFEXITED, + m_WIFSIGNALED, m_WEXITSTATUS, m_WTERMSIG): + callback = unittest.mock.Mock() + + # register a child + with self.watcher: + self.running = True + self.watcher.add_child_handler(52, callback, 1984) + + self.assertFalse(callback.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # unregister the child + self.watcher.remove_child_handler(52) + + self.assertFalse(callback.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # child terminates (code 99) + self.running = False + self.add_zombie(52, 99) + with self.ignore_warnings: + self.watcher._sig_chld() + + self.assertFalse(callback.called) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_sigchld_unknown_status(self, m_waitpid, m_WIFEXITED, + m_WIFSIGNALED, m_WEXITSTATUS, m_WTERMSIG): + callback = unittest.mock.Mock() + + # register a child + with self.watcher: + self.running = True + self.watcher.add_child_handler(53, callback, -19) + + self.assertFalse(callback.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # terminate with unknown status + self.zombies[53] = 1178 + self.running = False + self.watcher._sig_chld() + + callback.assert_called_once_with(53, 1178, -19) + self.assertTrue(m_WIFEXITED.called) + self.assertTrue(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + callback.reset_mock() + m_WIFEXITED.reset_mock() + m_WIFSIGNALED.reset_mock() + + # ensure that the child is effectively reaped + self.add_zombie(53, 101) + with self.ignore_warnings: + self.watcher._sig_chld() + + self.assertFalse(callback.called) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_remove_child_handler(self, m_waitpid, m_WIFEXITED, + m_WIFSIGNALED, m_WEXITSTATUS, m_WTERMSIG): + callback1 = unittest.mock.Mock() + callback2 = unittest.mock.Mock() + callback3 = unittest.mock.Mock() + + # register children + with self.watcher: + self.running = True + self.watcher.add_child_handler(54, callback1, 1) + self.watcher.add_child_handler(55, callback2, 2) + self.watcher.add_child_handler(56, callback3, 3) + + # remove child handler 1 + self.assertTrue(self.watcher.remove_child_handler(54)) + + # remove child handler 2 multiple times + self.assertTrue(self.watcher.remove_child_handler(55)) + self.assertFalse(self.watcher.remove_child_handler(55)) + self.assertFalse(self.watcher.remove_child_handler(55)) + + # all children terminate + self.add_zombie(54, 0) + self.add_zombie(55, 1) + self.add_zombie(56, 2) + self.running = False + with self.ignore_warnings: + self.watcher._sig_chld() + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + callback3.assert_called_once_with(56, 2, 3) + + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_sigchld_unhandled_exception(self, m_waitpid): + callback = unittest.mock.Mock() + + # register a child + with self.watcher: + self.running = True + self.watcher.add_child_handler(57, callback) + + # raise an exception + m_waitpid.side_effect = ValueError + + with unittest.mock.patch.object(unix_events.logger, + "exception") as m_exception: + + self.assertEqual(self.watcher._sig_chld(), None) + self.assertTrue(m_exception.called) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_sigchld_child_reaped_elsewhere( + self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS, + m_WTERMSIG): + + # register a child + callback = unittest.mock.Mock() + + with self.watcher: + self.running = True + self.watcher.add_child_handler(58, callback) + + self.assertFalse(callback.called) + self.assertFalse(m_WIFEXITED.called) + self.assertFalse(m_WIFSIGNALED.called) + self.assertFalse(m_WEXITSTATUS.called) + self.assertFalse(m_WTERMSIG.called) + + # child terminates + self.running = False + self.add_zombie(58, 4) + + # waitpid is called elsewhere + os.waitpid(58, os.WNOHANG) + + m_waitpid.reset_mock() + + # sigchld + with self.ignore_warnings: + self.watcher._sig_chld() + + callback.assert_called(m_waitpid) + if isinstance(self.watcher, unix_events.FastChildWatcher): + # here the FastChildWatche enters a deadlock + # (there is no way to prevent it) + self.assertFalse(callback.called) + else: + callback.assert_called_once_with(58, 255) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_sigchld_unknown_pid_during_registration( + self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS, + m_WTERMSIG): + + # register two children + callback1 = unittest.mock.Mock() + callback2 = unittest.mock.Mock() + + with self.ignore_warnings, self.watcher: + self.running = True + # child 1 terminates + self.add_zombie(591, 7) + # an unknown child terminates + self.add_zombie(593, 17) + + self.watcher._sig_chld() + + self.watcher.add_child_handler(591, callback1) + self.watcher.add_child_handler(592, callback2) + + callback1.assert_called_once_with(591, 7) + self.assertFalse(callback2.called) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_set_loop( + self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS, + m_WTERMSIG): + + # register a child + callback = unittest.mock.Mock() + + with self.watcher: + self.running = True + self.watcher.add_child_handler(60, callback) + + # attach a new loop + old_loop = self.loop + self.loop = test_utils.TestLoop() + + with unittest.mock.patch.object( + old_loop, + "remove_signal_handler") as m_old_remove_signal_handler, \ + unittest.mock.patch.object( + self.loop, + "add_signal_handler") as m_new_add_signal_handler: + + self.watcher.set_loop(self.loop) + + m_old_remove_signal_handler.assert_called_once_with( + signal.SIGCHLD) + m_new_add_signal_handler.assert_called_once_with( + signal.SIGCHLD, self.watcher._sig_chld) + + # child terminates + self.running = False + self.add_zombie(60, 9) + self.watcher._sig_chld() + + callback.assert_called_once_with(60, 9) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_set_loop_race_condition( + self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS, + m_WTERMSIG): + + # register 3 children + callback1 = unittest.mock.Mock() + callback2 = unittest.mock.Mock() + callback3 = unittest.mock.Mock() + + with self.watcher: + self.running = True + self.watcher.add_child_handler(61, callback1) + self.watcher.add_child_handler(62, callback2) + self.watcher.add_child_handler(622, callback3) + + # detach the loop + old_loop = self.loop + self.loop = None + + with unittest.mock.patch.object( + old_loop, "remove_signal_handler") as m_remove_signal_handler: + + self.watcher.set_loop(None) + + m_remove_signal_handler.assert_called_once_with( + signal.SIGCHLD) + + # child 1 & 2 terminate + self.add_zombie(61, 11) + self.add_zombie(62, -5) + + # SIGCHLD was not catched + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + self.assertFalse(callback3.called) + + # attach a new loop + self.loop = test_utils.TestLoop() + + with unittest.mock.patch.object( + self.loop, "add_signal_handler") as m_add_signal_handler: + + self.watcher.set_loop(self.loop) + + m_add_signal_handler.assert_called_once_with( + signal.SIGCHLD, self.watcher._sig_chld) + callback1.assert_called_once_with(61, 11) # race condition! + callback2.assert_called_once_with(62, -5) # race condition! + self.assertFalse(callback3.called) + + callback1.reset_mock() + callback2.reset_mock() + + # child 3 terminates + self.running = False + self.add_zombie(622, 19) + self.watcher._sig_chld() + + self.assertFalse(callback1.called) + self.assertFalse(callback2.called) + callback3.assert_called_once_with(622, 19) + + @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG) + @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS) + @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED) + @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED) + @unittest.mock.patch('os.waitpid', wraps=waitpid) + def test_close( + self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS, + m_WTERMSIG): + + # register two children + callback1 = unittest.mock.Mock() + callback2 = unittest.mock.Mock() + + with self.watcher: + self.running = True + # child 1 terminates + self.add_zombie(63, 9) + # other child terminates + self.add_zombie(65, 18) + self.watcher._sig_chld() + + self.watcher.add_child_handler(63, callback1) + self.watcher.add_child_handler(64, callback1) + + self.assertEqual(len(self.watcher._callbacks), 1) + if isinstance(self.watcher, unix_events.FastChildWatcher): + self.assertEqual(len(self.watcher._zombies), 1) + + with unittest.mock.patch.object( + self.loop, + "remove_signal_handler") as m_remove_signal_handler: + + self.watcher.close() + + m_remove_signal_handler.assert_called_once_with( + signal.SIGCHLD) + self.assertFalse(self.watcher._callbacks) + if isinstance(self.watcher, unix_events.FastChildWatcher): + self.assertFalse(self.watcher._zombies) + + +class SafeChildWatcherTests (ChildWatcherTestsMixin, unittest.TestCase): + def create_watcher(self, loop): + return unix_events.SafeChildWatcher(loop) + + +class FastChildWatcherTests (ChildWatcherTestsMixin, unittest.TestCase): + def create_watcher(self, loop): + return unix_events.FastChildWatcher(loop) + + +class PolicyTests(unittest.TestCase): + + def create_policy(self): + return unix_events.DefaultEventLoopPolicy() + + def test_get_child_watcher(self): + policy = self.create_policy() + self.assertIsNone(policy._watcher) + + watcher = policy.get_child_watcher() + self.assertIsInstance(watcher, unix_events.SafeChildWatcher) + + self.assertIs(policy._watcher, watcher) + + self.assertIs(watcher, policy.get_child_watcher()) + self.assertIsNone(watcher._loop) + + def test_get_child_watcher_after_set(self): + policy = self.create_policy() + watcher = unix_events.FastChildWatcher(None) + + policy.set_child_watcher(watcher) + self.assertIs(policy._watcher, watcher) + self.assertIs(watcher, policy.get_child_watcher()) + + def test_get_child_watcher_with_mainloop_existing(self): + policy = self.create_policy() + loop = policy.get_event_loop() + + self.assertIsNone(policy._watcher) + watcher = policy.get_child_watcher() + + self.assertIsInstance(watcher, unix_events.SafeChildWatcher) + self.assertIs(watcher._loop, loop) + + loop.close() + + def test_get_child_watcher_thread(self): + + def f(): + policy.set_event_loop(policy.new_event_loop()) + + self.assertIsInstance(policy.get_event_loop(), + events.AbstractEventLoop) + watcher = policy.get_child_watcher() + + self.assertIsInstance(watcher, unix_events.SafeChildWatcher) + self.assertIsNone(watcher._loop) + + policy.get_event_loop().close() + + policy = self.create_policy() + + th = threading.Thread(target=f) + th.start() + th.join() + + def test_child_watcher_replace_mainloop_existing(self): + policy = self.create_policy() + loop = policy.get_event_loop() + + watcher = policy.get_child_watcher() + + self.assertIs(watcher._loop, loop) + + new_loop = policy.new_event_loop() + policy.set_event_loop(new_loop) + + self.assertIs(watcher._loop, new_loop) + + policy.set_event_loop(None) + + self.assertIs(watcher._loop, None) + + loop.close() + new_loop.close() + + if __name__ == '__main__': unittest.main()