"""Tests for asyncio's subprocess API.

Each test starts a short-lived child (a Python interpreter or a shell
command) through an explicitly passed event loop and checks the data it
echoes back or the exit status it reports.
"""

from asyncio import subprocess
import asyncio
import signal
import sys
import unittest
from test import support

if sys.platform != 'win32':
    from asyncio import unix_events


# Child that blocks (sleeps for an hour) unless something stops it.
PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']

# Child that sleeps for one second and then exits on its own.
PROGRAM_SLEEP_1SEC = [sys.executable, '-c', 'import time; time.sleep(1)']

# Child that copies everything it reads from stdin to stdout.
PROGRAM_CAT = [
    sys.executable,
    '-c',
    ';'.join((
        'import sys',
        'data = sys.stdin.buffer.read()',
        'sys.stdout.buffer.write(data)',
    )),
]


class SubprocessMixin:
    """Subprocess test cases shared by the platform-specific test classes.

    The methods only rely on ``self.loop``, which the concrete classes
    create in their setUp().
    """

    # NOTE: the test methods are described with comments rather than
    # docstrings so that unittest keeps reporting them by method name.

    def test_stdin_stdout(self):
        # Feed the child's stdin by hand, then read back stdout and the
        # exit status.
        @asyncio.coroutine
        def feed_and_collect(payload):
            child = yield from asyncio.create_subprocess_exec(
                *PROGRAM_CAT,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                loop=self.loop)

            # feed data
            child.stdin.write(payload)
            yield from child.stdin.drain()
            child.stdin.close()

            # get output and exitcode
            echoed = yield from child.stdout.read()
            status = yield from child.wait()
            return (status, echoed)

        # Bound the whole exchange to 10 seconds.
        bounded = asyncio.wait_for(feed_and_collect(b'some data'), 10.0,
                                   loop=self.loop)
        exitcode, stdout = self.loop.run_until_complete(bounded)
        self.assertEqual(exitcode, 0)
        self.assertEqual(stdout, b'some data')

    def test_communicate(self):
        # Same round trip as test_stdin_stdout, but through communicate().
        @asyncio.coroutine
        def exchange(payload):
            child = yield from asyncio.create_subprocess_exec(
                *PROGRAM_CAT,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                loop=self.loop)
            out, _ = yield from child.communicate(payload)
            return child.returncode, out

        # Bound the whole exchange to 10 seconds.
        bounded = asyncio.wait_for(exchange(b'some data'), 10.0,
                                   loop=self.loop)
        exitcode, stdout = self.loop.run_until_complete(bounded)
        self.assertEqual(exitcode, 0)
        self.assertEqual(stdout, b'some data')

    def test_shell(self):
        # A shell command's exit status is reported by wait().
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_shell('exit 7', loop=self.loop))
        exitcode = self.loop.run_until_complete(proc.wait())
        self.assertEqual(exitcode, 7)

    def test_start_new_session(self):
        # start the new process in a new session
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_shell('exit 8',
                                            start_new_session=True,
                                            loop=self.loop))
        exitcode = self.loop.run_until_complete(proc.wait())
        self.assertEqual(exitcode, 8)

    def test_kill(self):
        # kill() must end a child that would otherwise keep sleeping.
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_exec(*PROGRAM_BLOCKED, loop=self.loop))
        proc.kill()
        returncode = self.loop.run_until_complete(proc.wait())
        if sys.platform == 'win32':
            # expect 1 but sometimes get 0
            self.assertIsInstance(returncode, int)
            return
        # signal.SIGKILL is only looked up on the non-Windows path.
        self.assertEqual(-signal.SIGKILL, returncode)

    def test_terminate(self):
        # terminate() must end a child that would otherwise keep sleeping.
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_exec(*PROGRAM_BLOCKED, loop=self.loop))
        proc.terminate()
        returncode = self.loop.run_until_complete(proc.wait())
        if sys.platform == 'win32':
            # expect 1 but sometimes get 0
            self.assertIsInstance(returncode, int)
            return
        self.assertEqual(-signal.SIGTERM, returncode)

    @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
    def test_send_signal(self):
        # The return code is the negated number of the signal that was sent.
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_exec(*PROGRAM_BLOCKED, loop=self.loop))
        proc.send_signal(signal.SIGHUP)
        returncode = self.loop.run_until_complete(proc.wait())
        self.assertEqual(-signal.SIGHUP, returncode)

    def test_broken_pipe(self):
        # The child never reads its stdin and exits after one second, so
        # sending it PIPE_MAX_SIZE bytes is expected to raise
        # BrokenPipeError.
        oversized_input = b'x' * support.PIPE_MAX_SIZE

        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_exec(*PROGRAM_SLEEP_1SEC,
                                           stdin=subprocess.PIPE,
                                           loop=self.loop))
        with self.assertRaises(BrokenPipeError):
            self.loop.run_until_complete(proc.communicate(oversized_input))
        # Wait for the child so that it is finished before the test ends.
        self.loop.run_until_complete(proc.wait())


if sys.platform != 'win32':
    # Unix
    class SubprocessWatcherMixin(SubprocessMixin):
        """Run the shared tests with a child watcher attached to the loop.

        Subclasses set ``Watcher`` to the child watcher class to use.
        """

        Watcher = None

        def setUp(self):
            loop_policy = asyncio.get_event_loop_policy()
            self.loop = loop_policy.new_event_loop()

            # ensure that the event loop is passed explicitly in the code
            loop_policy.set_event_loop(None)

            child_watcher = self.Watcher()
            child_watcher.attach_loop(self.loop)
            loop_policy.set_child_watcher(child_watcher)

        def tearDown(self):
            loop_policy = asyncio.get_event_loop_policy()
            loop_policy.set_child_watcher(None)
            self.loop.close()
            loop_policy.set_event_loop(None)

    class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
                                     unittest.TestCase):
        """Shared subprocess tests using unix_events.SafeChildWatcher."""

        Watcher = unix_events.SafeChildWatcher

    class SubprocessFastWatcherTests(SubprocessWatcherMixin,
                                     unittest.TestCase):
        """Shared subprocess tests using unix_events.FastChildWatcher."""

        Watcher = unix_events.FastChildWatcher
else:
    # Windows
    class SubprocessProactorTests(SubprocessMixin, unittest.TestCase):
        """Shared subprocess tests running on a ProactorEventLoop."""

        def setUp(self):
            loop_policy = asyncio.get_event_loop_policy()
            self.loop = asyncio.ProactorEventLoop()

            # ensure that the event loop is passed explicitly in the code
            loop_policy.set_event_loop(None)

        def tearDown(self):
            loop_policy = asyncio.get_event_loop_policy()
            self.loop.close()
            loop_policy.set_event_loop(None)


if __name__ == '__main__':
    unittest.main()