from asyncio import subprocess from asyncio import test_utils import asyncio import signal import sys import unittest from test import support if sys.platform != 'win32': from asyncio import unix_events # Program blocking PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)'] # Program copying input to output PROGRAM_CAT = [ sys.executable, '-c', ';'.join(('import sys', 'data = sys.stdin.buffer.read()', 'sys.stdout.buffer.write(data)'))] class SubprocessMixin: def test_stdin_stdout(self): args = PROGRAM_CAT @asyncio.coroutine def run(data): proc = yield from asyncio.create_subprocess_exec( *args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, loop=self.loop) # feed data proc.stdin.write(data) yield from proc.stdin.drain() proc.stdin.close() # get output and exitcode data = yield from proc.stdout.read() exitcode = yield from proc.wait() return (exitcode, data) task = run(b'some data') task = asyncio.wait_for(task, 60.0, loop=self.loop) exitcode, stdout = self.loop.run_until_complete(task) self.assertEqual(exitcode, 0) self.assertEqual(stdout, b'some data') def test_communicate(self): args = PROGRAM_CAT @asyncio.coroutine def run(data): proc = yield from asyncio.create_subprocess_exec( *args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, loop=self.loop) stdout, stderr = yield from proc.communicate(data) return proc.returncode, stdout task = run(b'some data') task = asyncio.wait_for(task, 60.0, loop=self.loop) exitcode, stdout = self.loop.run_until_complete(task) self.assertEqual(exitcode, 0) self.assertEqual(stdout, b'some data') def test_shell(self): create = asyncio.create_subprocess_shell('exit 7', loop=self.loop) proc = self.loop.run_until_complete(create) exitcode = self.loop.run_until_complete(proc.wait()) self.assertEqual(exitcode, 7) def test_start_new_session(self): # start the new process in a new session create = asyncio.create_subprocess_shell('exit 8', start_new_session=True, loop=self.loop) proc = self.loop.run_until_complete(create) exitcode = self.loop.run_until_complete(proc.wait()) self.assertEqual(exitcode, 8) def test_kill(self): args = PROGRAM_BLOCKED create = asyncio.create_subprocess_exec(*args, loop=self.loop) proc = self.loop.run_until_complete(create) proc.kill() returncode = self.loop.run_until_complete(proc.wait()) if sys.platform == 'win32': self.assertIsInstance(returncode, int) # expect 1 but sometimes get 0 else: self.assertEqual(-signal.SIGKILL, returncode) def test_terminate(self): args = PROGRAM_BLOCKED create = asyncio.create_subprocess_exec(*args, loop=self.loop) proc = self.loop.run_until_complete(create) proc.terminate() returncode = self.loop.run_until_complete(proc.wait()) if sys.platform == 'win32': self.assertIsInstance(returncode, int) # expect 1 but sometimes get 0 else: self.assertEqual(-signal.SIGTERM, returncode) @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP") def test_send_signal(self): code = 'import time; print("sleeping", flush=True); time.sleep(3600)' args = [sys.executable, '-c', code] create = asyncio.create_subprocess_exec(*args, loop=self.loop, stdout=subprocess.PIPE) proc = self.loop.run_until_complete(create) @asyncio.coroutine def send_signal(proc): # basic synchronization to wait until the program is sleeping line = yield from proc.stdout.readline() self.assertEqual(line, b'sleeping\n') proc.send_signal(signal.SIGHUP) returncode = (yield from proc.wait()) return returncode returncode = self.loop.run_until_complete(send_signal(proc)) self.assertEqual(-signal.SIGHUP, returncode) def prepare_broken_pipe_test(self): # buffer large enough to feed the whole pipe buffer large_data = b'x' * support.PIPE_MAX_SIZE # the program ends before the stdin can be feeded create = asyncio.create_subprocess_exec( sys.executable, '-c', 'pass', stdin=subprocess.PIPE, loop=self.loop) proc = self.loop.run_until_complete(create) return (proc, large_data) def test_stdin_broken_pipe(self): proc, large_data = self.prepare_broken_pipe_test() @asyncio.coroutine def write_stdin(proc, data): proc.stdin.write(data) yield from proc.stdin.drain() coro = write_stdin(proc, large_data) # drain() must raise BrokenPipeError or ConnectionResetError self.assertRaises((BrokenPipeError, ConnectionResetError), self.loop.run_until_complete, coro) self.loop.run_until_complete(proc.wait()) def test_communicate_ignore_broken_pipe(self): proc, large_data = self.prepare_broken_pipe_test() # communicate() must ignore BrokenPipeError when feeding stdin self.loop.run_until_complete(proc.communicate(large_data)) self.loop.run_until_complete(proc.wait()) if sys.platform != 'win32': # Unix class SubprocessWatcherMixin(SubprocessMixin): Watcher = None def setUp(self): policy = asyncio.get_event_loop_policy() self.loop = policy.new_event_loop() # ensure that the event loop is passed explicitly in asyncio policy.set_event_loop(None) watcher = self.Watcher() watcher.attach_loop(self.loop) policy.set_child_watcher(watcher) def tearDown(self): policy = asyncio.get_event_loop_policy() policy.set_child_watcher(None) self.loop.close() super().tearDown() class SubprocessSafeWatcherTests(SubprocessWatcherMixin, test_utils.TestCase): Watcher = unix_events.SafeChildWatcher class SubprocessFastWatcherTests(SubprocessWatcherMixin, test_utils.TestCase): Watcher = unix_events.FastChildWatcher else: # Windows class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase): def setUp(self): policy = asyncio.get_event_loop_policy() self.loop = asyncio.ProactorEventLoop() # ensure that the event loop is passed explicitly in asyncio policy.set_event_loop(None) def tearDown(self): policy = asyncio.get_event_loop_policy() self.loop.close() policy.set_event_loop(None) super().tearDown() if __name__ == '__main__': unittest.main()