cpython/Lib/test/test_asyncio/test_subprocess.py

454 lines
17 KiB
Python
Raw Normal View History

import signal
import sys
import unittest
from unittest import mock
2014-12-18 07:29:53 -04:00
import asyncio
asyncio: sync with Tulip Issue #23347: send_signal(), kill() and terminate() methods of BaseSubprocessTransport now check if the transport was closed and if the process exited. Issue #23347: Refactor creation of subprocess transports. Changes on BaseSubprocessTransport: * Add a wait() method to wait until the child process exits * The constructor now accepts an optional waiter parameter. The _post_init() coroutine must not be called explicitly anymore. It makes subprocess transports closer to other transports, and it gives more freedom if we want later to change completely how subprocess transports are created. * close() now kills the process instead of kindly terminating it: the child process may ignore SIGTERM and continue to run. Call explicitly terminate() and wait() if you want to kindly terminate the child process. * close() now logs a warning in debug mode if the process is still running and needs to be killed * _make_subprocess_transport() is now fully asynchronous again: if the creation of the transport failed, wait asynchronously for the process exit. Before the wait was synchronous. This change requires close() to *kill*, and not terminate, the child process. * Remove the _kill_wait() method, replaced with a more aggressive close() method. It fixes _make_subprocess_transport() on error. BaseSubprocessTransport.close() calls the close() method of pipe transports, whereas _kill_wait() closed directly pipes of the subprocess.Popen object without unregistering file descriptors from the selector (which caused severe bugs). These changes simplify the code of subprocess.py.
2015-01-29 19:05:19 -04:00
from asyncio import base_subprocess
2014-12-18 07:29:53 -04:00
from asyncio import subprocess
from asyncio import test_utils
try:
from test import support
2014-12-18 07:29:53 -04:00
except ImportError:
from asyncio import test_support as support
if sys.platform != 'win32':
from asyncio import unix_events
# Command line of a child process that sleeps for an hour, i.e. one that
# stays blocked until a test signals or kills it.
PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']

# Command line of a child process that copies everything it reads from its
# stdin to its stdout, in binary mode.
PROGRAM_CAT = [
    sys.executable,
    '-c',
    'import sys;'
    'data = sys.stdin.buffer.read();'
    'sys.stdout.buffer.write(data)',
]
asyncio: sync with Tulip Issue #23347: send_signal(), kill() and terminate() methods of BaseSubprocessTransport now check if the transport was closed and if the process exited. Issue #23347: Refactor creation of subprocess transports. Changes on BaseSubprocessTransport: * Add a wait() method to wait until the child process exits * The constructor now accepts an optional waiter parameter. The _post_init() coroutine must not be called explicitly anymore. It makes subprocess transports closer to other transports, and it gives more freedom if we want later to change completely how subprocess transports are created. * close() now kills the process instead of kindly terminating it: the child process may ignore SIGTERM and continue to run. Call explicitly terminate() and wait() if you want to kindly terminate the child process. * close() now logs a warning in debug mode if the process is still running and needs to be killed * _make_subprocess_transport() is now fully asynchronous again: if the creation of the transport failed, wait asynchronously for the process exit. Before the wait was synchronous. This change requires close() to *kill*, and not terminate, the child process. * Remove the _kill_wait() method, replaced with a more aggressive close() method. It fixes _make_subprocess_transport() on error. BaseSubprocessTransport.close() calls the close() method of pipe transports, whereas _kill_wait() closed directly pipes of the subprocess.Popen object without unregistering file descriptors from the selector (which caused severe bugs). These changes simplify the code of subprocess.py.
2015-01-29 19:05:19 -04:00
class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport whose child process is a mock object.

    ``_start()`` is overridden so that ``_proc`` becomes a ``mock.Mock``
    with no stdin, stdout or stderr pipe instead of a real process.
    """

    def _start(self, *args, **kwargs):
        # The arguments are accepted but deliberately ignored: nothing is
        # spawned, the "process" is a mock without any standard stream.
        self._proc = mock.Mock(stdin=None, stdout=None, stderr=None)
class SubprocessTransportTests(test_utils.TestCase):
    """Tests of the subprocess transport run against a mocked process."""

    def setUp(self):
        # Every test gets its own test loop, registered on the test case.
        self.loop = self.new_test_loop()
        self.set_event_loop(self.loop)

    def create_transport(self, waiter=None):
        """Create a TestSubprocessTransport bound to a mock protocol.

        *waiter* is forwarded to the transport constructor.  Return a
        ``(transport, protocol)`` tuple.
        """
        protocol = mock.Mock()
        # NOTE(review): presumably needed so that the auto-created Mock
        # attribute is not mistaken for a coroutine marker -- confirm.
        for callback in (protocol.connection_made, protocol.process_exited):
            callback._is_coroutine = False
        transport = TestSubprocessTransport(
            self.loop, protocol, ['test'], False,
            None, None, None, 0, waiter=waiter)
        return (transport, protocol)

    def test_proc_exited(self):
        """Check the transport and protocol state after the process exit."""
        waiter = asyncio.Future(loop=self.loop)
        transport, protocol = self.create_transport(waiter)
        transport._process_exited(6)
        self.loop.run_until_complete(waiter)

        self.assertEqual(transport.get_returncode(), 6)

        # all the protocol callbacks must have been invoked
        for callback in (protocol.connection_made,
                         protocol.process_exited,
                         protocol.connection_lost):
            self.assertTrue(callback.called)
        self.assertEqual(protocol.connection_lost.call_args[0], (None,))

        # the transport dropped its references but is not flagged as closed
        self.assertFalse(transport._closed)
        for attr_name in ('_loop', '_proc', '_protocol'):
            self.assertIsNone(getattr(transport, attr_name))

        # methods must raise ProcessLookupError if the process exited
        with self.assertRaises(ProcessLookupError):
            transport.send_signal(signal.SIGTERM)
        with self.assertRaises(ProcessLookupError):
            transport.terminate()
        with self.assertRaises(ProcessLookupError):
            transport.kill()

        transport.close()
asyncio: sync with Tulip Issue #23347: send_signal(), kill() and terminate() methods of BaseSubprocessTransport now check if the transport was closed and if the process exited. Issue #23347: Refactor creation of subprocess transports. Changes on BaseSubprocessTransport: * Add a wait() method to wait until the child process exits * The constructor now accepts an optional waiter parameter. The _post_init() coroutine must not be called explicitly anymore. It makes subprocess transports closer to other transports, and it gives more freedom if we want later to change completely how subprocess transports are created. * close() now kills the process instead of kindly terminating it: the child process may ignore SIGTERM and continue to run. Call explicitly terminate() and wait() if you want to kindly terminate the child process. * close() now logs a warning in debug mode if the process is still running and needs to be killed * _make_subprocess_transport() is now fully asynchronous again: if the creation of the transport failed, wait asynchronously for the process exit. Before the wait was synchronous. This change requires close() to *kill*, and not terminate, the child process. * Remove the _kill_wait() method, replaced with a more aggressive close() method. It fixes _make_subprocess_transport() on error. BaseSubprocessTransport.close() calls the close() method of pipe transports, whereas _kill_wait() closed directly pipes of the subprocess.Popen object without unregistering file descriptors from the selector (which caused severe bugs). These changes simplify the code of subprocess.py.
2015-01-29 19:05:19 -04:00
class SubprocessMixin:
    """Subprocess tests shared by the loop-specific test cases.

    The class is meant to be combined with a TestCase whose ``setUp()``
    stores the event loop under test in ``self.loop``.
    """

    def test_stdin_stdout(self):
        """Feed stdin by hand and read back what the child echoes."""

        @asyncio.coroutine
        def echo(payload):
            proc = yield from asyncio.create_subprocess_exec(
                *PROGRAM_CAT,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                loop=self.loop)

            # feed data
            stdin = proc.stdin
            stdin.write(payload)
            yield from stdin.drain()
            stdin.close()

            # get output and exitcode
            output = yield from proc.stdout.read()
            status = yield from proc.wait()
            return (status, output)

        coro = asyncio.wait_for(echo(b'some data'), 60.0, loop=self.loop)
        exitcode, stdout = self.loop.run_until_complete(coro)
        self.assertEqual(exitcode, 0)
        self.assertEqual(stdout, b'some data')

    def test_communicate(self):
        """Same round trip as test_stdin_stdout, through communicate()."""

        @asyncio.coroutine
        def echo(payload):
            proc = yield from asyncio.create_subprocess_exec(
                *PROGRAM_CAT,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                loop=self.loop)
            output, _ = yield from proc.communicate(payload)
            return proc.returncode, output

        coro = asyncio.wait_for(echo(b'some data'), 60.0, loop=self.loop)
        exitcode, stdout = self.loop.run_until_complete(coro)
        self.assertEqual(exitcode, 0)
        self.assertEqual(stdout, b'some data')

    def test_shell(self):
        """wait() returns the exit status of a shell command."""
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_shell('exit 7', loop=self.loop))
        status = self.loop.run_until_complete(proc.wait())
        self.assertEqual(status, 7)

    def test_start_new_session(self):
        """Same as test_shell, with start_new_session=True."""
        # start the new process in a new session
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_shell('exit 8',
                                            start_new_session=True,
                                            loop=self.loop))
        status = self.loop.run_until_complete(proc.wait())
        self.assertEqual(status, 8)

    def test_kill(self):
        """kill() ends a blocked child; on Unix the status is -SIGKILL."""
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_exec(*PROGRAM_BLOCKED, loop=self.loop))
        proc.kill()
        returncode = self.loop.run_until_complete(proc.wait())
        if sys.platform != 'win32':
            self.assertEqual(-signal.SIGKILL, returncode)
        else:
            # expect 1 but sometimes get 0
            self.assertIsInstance(returncode, int)

    def test_terminate(self):
        """terminate() ends a blocked child; on Unix the status is -SIGTERM."""
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_exec(*PROGRAM_BLOCKED, loop=self.loop))
        proc.terminate()
        returncode = self.loop.run_until_complete(proc.wait())
        if sys.platform != 'win32':
            self.assertEqual(-signal.SIGTERM, returncode)
        else:
            # expect 1 but sometimes get 0
            self.assertIsInstance(returncode, int)

    @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
    def test_send_signal(self):
        """send_signal(SIGHUP) ends the child with status -SIGHUP."""
        code = 'import time; print("sleeping", flush=True); time.sleep(3600)'
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_exec(sys.executable, '-c', code,
                                           stdout=subprocess.PIPE,
                                           loop=self.loop))

        @asyncio.coroutine
        def hang_up(child):
            # basic synchronization to wait until the program is sleeping
            first_line = yield from child.stdout.readline()
            self.assertEqual(first_line, b'sleeping\n')

            child.send_signal(signal.SIGHUP)
            return (yield from child.wait())

        returncode = self.loop.run_until_complete(hang_up(proc))
        self.assertEqual(-signal.SIGHUP, returncode)

    def prepare_broken_pipe_test(self):
        """Start a child that exits at once, without reading its stdin.

        Return a ``(proc, large_data)`` tuple where *large_data* is a bytes
        object of ``support.PIPE_MAX_SIZE`` bytes.
        """
        # buffer large enough to feed the whole pipe buffer
        large_data = b'x' * support.PIPE_MAX_SIZE

        # the program ends before the stdin can be fed
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_exec(
                sys.executable, '-c', 'pass',
                stdin=subprocess.PIPE,
                loop=self.loop))
        return (proc, large_data)

    def test_stdin_broken_pipe(self):
        """Writing to the stdin of an exited child makes drain() fail."""
        proc, large_data = self.prepare_broken_pipe_test()

        @asyncio.coroutine
        def write_stdin(child, data):
            child.stdin.write(data)
            yield from child.stdin.drain()

        coro = write_stdin(proc, large_data)
        # drain() must raise BrokenPipeError or ConnectionResetError
        with test_utils.disable_logger():
            with self.assertRaises((BrokenPipeError, ConnectionResetError)):
                self.loop.run_until_complete(coro)
        self.loop.run_until_complete(proc.wait())

    def test_communicate_ignore_broken_pipe(self):
        """communicate() completes although the child never reads stdin."""
        proc, large_data = self.prepare_broken_pipe_test()

        # communicate() must ignore BrokenPipeError when feeding stdin
        with test_utils.disable_logger():
            self.loop.run_until_complete(proc.communicate(large_data))
        self.loop.run_until_complete(proc.wait())

    def test_pause_reading(self):
        """The stdout pipe transport is paused and resumed past the limit."""
        limit = 10
        size = limit * 2 + 1

        @asyncio.coroutine
        def read_oversized_output():
            code = (
                'import sys\n'
                'sys.stdout.write("x" * %s)\n'
                'sys.stdout.flush()'
            ) % size

            real_connect_read_pipe = self.loop.connect_read_pipe

            @asyncio.coroutine
            def connect_read_pipe_mock(*args, **kw):
                # Create the real pipe transport, then replace its flow
                # control methods by mocks to record the calls.
                pipe_transport, pipe_protocol = (
                    yield from real_connect_read_pipe(*args, **kw))
                pipe_transport.pause_reading = mock.Mock()
                pipe_transport.resume_reading = mock.Mock()
                return (pipe_transport, pipe_protocol)

            self.loop.connect_read_pipe = connect_read_pipe_mock

            proc = yield from asyncio.create_subprocess_exec(
                sys.executable, '-c', code,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                limit=limit,
                loop=self.loop)
            stdout_transport = proc._transport.get_pipe_transport(1)

            output, _ = yield from proc.communicate()

            # The child process produced more than limit bytes of output,
            # the stream reader transport should pause the protocol to not
            # allocate too much memory.
            return (output, stdout_transport)

        # Issue #22685: Ensure that the stream reader pauses the protocol
        # when the child process produces too much data
        stdout, transport = self.loop.run_until_complete(
            read_oversized_output())

        self.assertEqual(stdout, b'x' * size)
        self.assertTrue(transport.pause_reading.called)
        self.assertTrue(transport.resume_reading.called)

    def test_stdin_not_inheritable(self):
        """communicate() completes with close_fds=False."""
        # asyncio issue #209: stdin must not be inheritable, otherwise
        # the Process.communicate() hangs

        @asyncio.coroutine
        def len_message(message):
            code = 'import sys; data = sys.stdin.read(); print(len(data))'
            proc = yield from asyncio.create_subprocess_exec(
                sys.executable, '-c', code,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                close_fds=False,
                loop=self.loop)
            out, _ = yield from proc.communicate(message)
            status = yield from proc.wait()
            return (out, status)

        output, exitcode = self.loop.run_until_complete(len_message(b'abc'))
        self.assertEqual(output.rstrip(), b'3')
        self.assertEqual(exitcode, 0)

    def test_cancel_process_wait(self):
        """A cancelled Process.wait() does not prevent a later wait()."""
        # Issue #23140: cancel Process.wait()

        @asyncio.coroutine
        def cancel_wait():
            proc = yield from asyncio.create_subprocess_exec(
                *PROGRAM_BLOCKED,
                loop=self.loop)

            # Create an internal future waiting on the process exit
            wait_task = self.loop.create_task(proc.wait())
            self.loop.call_soon(wait_task.cancel)
            try:
                yield from wait_task
            except asyncio.CancelledError:
                pass

            # Cancel the future
            wait_task.cancel()

            # Kill the process and wait until it is done
            proc.kill()
            yield from proc.wait()

        self.loop.run_until_complete(cancel_wait())

    def test_cancel_make_subprocess_transport_exec(self):
        """Cancel create_subprocess_exec() right after it was scheduled."""

        @asyncio.coroutine
        def cancel_make_transport():
            creation = self.loop.create_task(
                asyncio.create_subprocess_exec(*PROGRAM_BLOCKED,
                                               loop=self.loop))

            self.loop.call_soon(creation.cancel)
            try:
                yield from creation
            except asyncio.CancelledError:
                pass

        # ignore the log:
        # "Exception during subprocess creation, kill the subprocess"
        with test_utils.disable_logger():
            self.loop.run_until_complete(cancel_make_transport())

    def test_cancel_post_init(self):
        """Cancel loop.subprocess_exec() right after it was scheduled."""

        @asyncio.coroutine
        def cancel_make_transport():
            creation = self.loop.create_task(
                self.loop.subprocess_exec(asyncio.SubprocessProtocol,
                                          *PROGRAM_BLOCKED))

            self.loop.call_soon(creation.cancel)
            try:
                yield from creation
            except asyncio.CancelledError:
                pass

        # ignore the log:
        # "Exception during subprocess creation, kill the subprocess"
        with test_utils.disable_logger():
            self.loop.run_until_complete(cancel_make_transport())
            test_utils.run_briefly(self.loop)

    def test_close_kill_running(self):
        """transport.close() kills a child process that is still running."""

        @asyncio.coroutine
        def kill_running():
            transport, _ = yield from self.loop.subprocess_exec(
                asyncio.SubprocessProtocol, *PROGRAM_BLOCKED)

            proc = transport.get_extra_info('subprocess')
            real_kill = proc.kill
            kill_called = False

            def recording_kill():
                # Remember that kill() was requested, then really kill.
                nonlocal kill_called
                kill_called = True
                real_kill()

            proc.kill = recording_kill
            returncode = transport.get_returncode()
            transport.close()
            yield from transport._wait()
            return (returncode, kill_called)

        # Ignore "Close running child process: kill ..." log
        with test_utils.disable_logger():
            returncode, killed = self.loop.run_until_complete(kill_running())
        self.assertIsNone(returncode)

        # transport.close() must kill the process if it is still running
        self.assertTrue(killed)
        test_utils.run_briefly(self.loop)

    def test_close_dont_kill_finished(self):
        """transport.close() does not kill a child that already finished."""

        @asyncio.coroutine
        def close_finished():
            transport, _ = yield from self.loop.subprocess_exec(
                asyncio.SubprocessProtocol, *PROGRAM_BLOCKED)
            proc = transport.get_extra_info('subprocess')

            # kill the process (but asyncio is not notified immediately)
            proc.kill()
            proc.wait()

            # from now on, only record whether kill() gets called again
            proc.kill = mock.Mock()
            proc_returncode = proc.poll()
            transport_returncode = transport.get_returncode()
            transport.close()
            return (proc_returncode, transport_returncode, proc.kill.called)

        # Ignore "Unknown child process pid ..." log of SafeChildWatcher,
        # emitted because the test already consumes the exit status:
        # proc.wait()
        with test_utils.disable_logger():
            result = self.loop.run_until_complete(close_finished())
            test_utils.run_briefly(self.loop)

        proc_returncode, transport_return_code, killed = result

        self.assertIsNotNone(proc_returncode)
        self.assertIsNone(transport_return_code)

        # transport.close() must not kill the process if it finished, even if
        # the transport was not notified yet
        self.assertFalse(killed)
if sys.platform == 'win32':
    # Windows

    class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase):
        """Run the subprocess tests on a ProactorEventLoop."""

        def setUp(self):
            self.loop = asyncio.ProactorEventLoop()
            self.set_event_loop(self.loop)

else:
    # Unix

    class SubprocessWatcherMixin(SubprocessMixin):
        """Run the subprocess tests with a given child watcher class."""

        # Child watcher class to instantiate; set by the concrete test cases.
        Watcher = None

        def setUp(self):
            loop_policy = asyncio.get_event_loop_policy()
            self.loop = loop_policy.new_event_loop()
            self.set_event_loop(self.loop)

            # Install a fresh watcher attached to the test loop, and remove
            # it from the policy again when the test is over.
            child_watcher = self.Watcher()
            child_watcher.attach_loop(self.loop)
            loop_policy.set_child_watcher(child_watcher)
            self.addCleanup(loop_policy.set_child_watcher, None)

    class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
                                     test_utils.TestCase):
        """Subprocess tests using unix_events.SafeChildWatcher."""

        Watcher = unix_events.SafeChildWatcher

    class SubprocessFastWatcherTests(SubprocessWatcherMixin,
                                     test_utils.TestCase):
        """Subprocess tests using unix_events.FastChildWatcher."""

        Watcher = unix_events.FastChildWatcher
# Run the test suite when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()