diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 3830dea7d9b..1ae290a003f 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -686,6 +686,88 @@ class SubprocessMixin: self.assertIsNone(self.loop.run_until_complete(execute())) + async def check_stdout_output(self, coro, output): + proc = await coro + stdout, _ = await proc.communicate() + self.assertEqual(stdout, output) + self.assertEqual(proc.returncode, 0) + task = asyncio.create_task(proc.wait()) + await asyncio.sleep(0) + self.assertEqual(task.result(), proc.returncode) + + def test_create_subprocess_env_shell(self) -> None: + async def main() -> None: + cmd = f'''{sys.executable} -c "import os, sys; sys.stdout.write(os.getenv('FOO'))"''' + env = {"FOO": 'bar'} + proc = await asyncio.create_subprocess_shell( + cmd, env=env, stdout=subprocess.PIPE + ) + return proc + + self.loop.run_until_complete(self.check_stdout_output(main(), b'bar')) + + def test_create_subprocess_env_exec(self) -> None: + async def main() -> None: + cmd = [sys.executable, "-c", + "import os, sys; sys.stdout.write(os.getenv('FOO'))"] + env = {"FOO": 'baz'} + proc = await asyncio.create_subprocess_exec( + *cmd, env=env, stdout=subprocess.PIPE + ) + return proc + + self.loop.run_until_complete(self.check_stdout_output(main(), b'baz')) + + + def test_subprocess_concurrent_wait(self) -> None: + async def main() -> None: + proc = await asyncio.create_subprocess_exec( + *PROGRAM_CAT, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + ) + stdout, _ = await proc.communicate(b'some data') + self.assertEqual(stdout, b"some data") + self.assertEqual(proc.returncode, 0) + self.assertEqual(await asyncio.gather(*[proc.wait() for _ in range(10)]), + [proc.returncode] * 10) + + self.loop.run_until_complete(main()) + + def test_subprocess_consistent_callbacks(self): + events = [] + class MyProtocol(asyncio.SubprocessProtocol): + def __init__(self, exit_future: asyncio.Future) -> None: + self.exit_future = exit_future + + def pipe_data_received(self, fd, data) -> None: + events.append(('pipe_data_received', fd, data)) + + def pipe_connection_lost(self, fd, exc) -> None: + events.append('pipe_connection_lost') + + def process_exited(self) -> None: + events.append('process_exited') + self.exit_future.set_result(True) + + async def main() -> None: + loop = asyncio.get_running_loop() + exit_future = asyncio.Future() + code = 'import sys; sys.stdout.write("stdout"); sys.stderr.write("stderr")' + transport, _ = await loop.subprocess_exec(lambda: MyProtocol(exit_future), + sys.executable, '-c', code, stdin=None) + await exit_future + transport.close() + self.assertEqual(events, [ + ('pipe_data_received', 1, b'stdout'), + ('pipe_data_received', 2, b'stderr'), + 'pipe_connection_lost', + 'pipe_connection_lost', + 'process_exited', + ]) + + self.loop.run_until_complete(main()) + def test_subprocess_communicate_stdout(self): # See https://github.com/python/cpython/issues/100133 async def get_command_stdout(cmd, *args):