forked from rrcarlosr/Jetpack
398 lines
15 KiB
Python
398 lines
15 KiB
Python
# Copyright (c) 2012 The Chromium OS Authors.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
#
|
|
# Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se>
|
|
# Licensed to PSF under a Contributor Agreement.
|
|
# See http://www.python.org/2.4/license for licensing details.
|
|
|
|
"""Subprocress execution
|
|
|
|
This module holds a subclass of subprocess.Popen with our own required
|
|
features, mainly that we get access to the subprocess output while it
|
|
is running rather than just at the end. This makes it easiler to show
|
|
progress information and filter output in real time.
|
|
"""
|
|
|
|
import errno
|
|
import os
|
|
import pty
|
|
import select
|
|
import subprocess
|
|
import sys
|
|
import unittest
|
|
|
|
|
|
# Import these here so the caller does not need to import subprocess also.
|
|
PIPE = subprocess.PIPE
|
|
STDOUT = subprocess.STDOUT
|
|
PIPE_PTY = -3 # Pipe output through a pty
|
|
stay_alive = True
|
|
|
|
|
|
class Popen(subprocess.Popen):
|
|
"""Like subprocess.Popen with ptys and incremental output
|
|
|
|
This class deals with running a child process and filtering its output on
|
|
both stdout and stderr while it is running. We do this so we can monitor
|
|
progress, and possibly relay the output to the user if requested.
|
|
|
|
The class is similar to subprocess.Popen, the equivalent is something like:
|
|
|
|
Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
|
|
But this class has many fewer features, and two enhancement:
|
|
|
|
1. Rather than getting the output data only at the end, this class sends it
|
|
to a provided operation as it arrives.
|
|
2. We use pseudo terminals so that the child will hopefully flush its output
|
|
to us as soon as it is produced, rather than waiting for the end of a
|
|
line.
|
|
|
|
Use CommunicateFilter() to handle output from the subprocess.
|
|
|
|
"""
|
|
|
|
def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY,
|
|
shell=False, cwd=None, env=None, **kwargs):
|
|
"""Cut-down constructor
|
|
|
|
Args:
|
|
args: Program and arguments for subprocess to execute.
|
|
stdin: See subprocess.Popen()
|
|
stdout: See subprocess.Popen(), except that we support the sentinel
|
|
value of cros_subprocess.PIPE_PTY.
|
|
stderr: See subprocess.Popen(), except that we support the sentinel
|
|
value of cros_subprocess.PIPE_PTY.
|
|
shell: See subprocess.Popen()
|
|
cwd: Working directory to change to for subprocess, or None if none.
|
|
env: Environment to use for this subprocess, or None to inherit parent.
|
|
kwargs: No other arguments are supported at the moment. Passing other
|
|
arguments will cause a ValueError to be raised.
|
|
"""
|
|
stdout_pty = None
|
|
stderr_pty = None
|
|
|
|
if stdout == PIPE_PTY:
|
|
stdout_pty = pty.openpty()
|
|
stdout = os.fdopen(stdout_pty[1])
|
|
if stderr == PIPE_PTY:
|
|
stderr_pty = pty.openpty()
|
|
stderr = os.fdopen(stderr_pty[1])
|
|
|
|
super(Popen, self).__init__(args, stdin=stdin,
|
|
stdout=stdout, stderr=stderr, shell=shell, cwd=cwd, env=env,
|
|
**kwargs)
|
|
|
|
# If we're on a PTY, we passed the slave half of the PTY to the subprocess.
|
|
# We want to use the master half on our end from now on. Setting this here
|
|
# does make some assumptions about the implementation of subprocess, but
|
|
# those assumptions are pretty minor.
|
|
|
|
# Note that if stderr is STDOUT, then self.stderr will be set to None by
|
|
# this constructor.
|
|
if stdout_pty is not None:
|
|
self.stdout = os.fdopen(stdout_pty[0])
|
|
if stderr_pty is not None:
|
|
self.stderr = os.fdopen(stderr_pty[0])
|
|
|
|
# Insist that unit tests exist for other arguments we don't support.
|
|
if kwargs:
|
|
raise ValueError("Unit tests do not test extra args - please add tests")
|
|
|
|
def CommunicateFilter(self, output):
|
|
"""Interact with process: Read data from stdout and stderr.
|
|
|
|
This method runs until end-of-file is reached, then waits for the
|
|
subprocess to terminate.
|
|
|
|
The output function is sent all output from the subprocess and must be
|
|
defined like this:
|
|
|
|
def Output([self,] stream, data)
|
|
Args:
|
|
stream: the stream the output was received on, which will be
|
|
sys.stdout or sys.stderr.
|
|
data: a string containing the data
|
|
|
|
Note: The data read is buffered in memory, so do not use this
|
|
method if the data size is large or unlimited.
|
|
|
|
Args:
|
|
output: Function to call with each fragment of output.
|
|
|
|
Returns:
|
|
A tuple (stdout, stderr, combined) which is the data received on
|
|
stdout, stderr and the combined data (interleaved stdout and stderr).
|
|
|
|
Note that the interleaved output will only be sensible if you have
|
|
set both stdout and stderr to PIPE or PIPE_PTY. Even then it depends on
|
|
the timing of the output in the subprocess. If a subprocess flips
|
|
between stdout and stderr quickly in succession, by the time we come to
|
|
read the output from each we may see several lines in each, and will read
|
|
all the stdout lines, then all the stderr lines. So the interleaving
|
|
may not be correct. In this case you might want to pass
|
|
stderr=cros_subprocess.STDOUT to the constructor.
|
|
|
|
This feature is still useful for subprocesses where stderr is
|
|
rarely used and indicates an error.
|
|
|
|
Note also that if you set stderr to STDOUT, then stderr will be empty
|
|
and the combined output will just be the same as stdout.
|
|
"""
|
|
|
|
read_set = []
|
|
write_set = []
|
|
stdout = None # Return
|
|
stderr = None # Return
|
|
|
|
if self.stdin:
|
|
# Flush stdio buffer. This might block, if the user has
|
|
# been writing to .stdin in an uncontrolled fashion.
|
|
self.stdin.flush()
|
|
if input:
|
|
write_set.append(self.stdin)
|
|
else:
|
|
self.stdin.close()
|
|
if self.stdout:
|
|
read_set.append(self.stdout)
|
|
stdout = []
|
|
if self.stderr and self.stderr != self.stdout:
|
|
read_set.append(self.stderr)
|
|
stderr = []
|
|
combined = []
|
|
|
|
input_offset = 0
|
|
while read_set or write_set:
|
|
try:
|
|
rlist, wlist, _ = select.select(read_set, write_set, [], 0.2)
|
|
except select.error, e:
|
|
if e.args[0] == errno.EINTR:
|
|
continue
|
|
raise
|
|
|
|
if not stay_alive:
|
|
self.terminate()
|
|
|
|
if self.stdin in wlist:
|
|
# When select has indicated that the file is writable,
|
|
# we can write up to PIPE_BUF bytes without risk
|
|
# blocking. POSIX defines PIPE_BUF >= 512
|
|
chunk = input[input_offset : input_offset + 512]
|
|
bytes_written = os.write(self.stdin.fileno(), chunk)
|
|
input_offset += bytes_written
|
|
if input_offset >= len(input):
|
|
self.stdin.close()
|
|
write_set.remove(self.stdin)
|
|
|
|
if self.stdout in rlist:
|
|
data = ""
|
|
# We will get an error on read if the pty is closed
|
|
try:
|
|
data = os.read(self.stdout.fileno(), 1024)
|
|
except OSError:
|
|
pass
|
|
if data == "":
|
|
self.stdout.close()
|
|
read_set.remove(self.stdout)
|
|
else:
|
|
stdout.append(data)
|
|
combined.append(data)
|
|
if output:
|
|
output(sys.stdout, data)
|
|
if self.stderr in rlist:
|
|
data = ""
|
|
# We will get an error on read if the pty is closed
|
|
try:
|
|
data = os.read(self.stderr.fileno(), 1024)
|
|
except OSError:
|
|
pass
|
|
if data == "":
|
|
self.stderr.close()
|
|
read_set.remove(self.stderr)
|
|
else:
|
|
stderr.append(data)
|
|
combined.append(data)
|
|
if output:
|
|
output(sys.stderr, data)
|
|
|
|
# All data exchanged. Translate lists into strings.
|
|
if stdout is not None:
|
|
stdout = ''.join(stdout)
|
|
else:
|
|
stdout = ''
|
|
if stderr is not None:
|
|
stderr = ''.join(stderr)
|
|
else:
|
|
stderr = ''
|
|
combined = ''.join(combined)
|
|
|
|
# Translate newlines, if requested. We cannot let the file
|
|
# object do the translation: It is based on stdio, which is
|
|
# impossible to combine with select (unless forcing no
|
|
# buffering).
|
|
if self.universal_newlines and hasattr(file, 'newlines'):
|
|
if stdout:
|
|
stdout = self._translate_newlines(stdout)
|
|
if stderr:
|
|
stderr = self._translate_newlines(stderr)
|
|
|
|
self.wait()
|
|
return (stdout, stderr, combined)
|
|
|
|
|
|
# Just being a unittest.TestCase gives us 14 public methods. Unless we
|
|
# disable this, we can only have 6 tests in a TestCase. That's not enough.
|
|
#
|
|
# pylint: disable=R0904
|
|
|
|
class TestSubprocess(unittest.TestCase):
|
|
"""Our simple unit test for this module"""
|
|
|
|
class MyOperation:
|
|
"""Provides a operation that we can pass to Popen"""
|
|
def __init__(self, input_to_send=None):
|
|
"""Constructor to set up the operation and possible input.
|
|
|
|
Args:
|
|
input_to_send: a text string to send when we first get input. We will
|
|
add \r\n to the string.
|
|
"""
|
|
self.stdout_data = ''
|
|
self.stderr_data = ''
|
|
self.combined_data = ''
|
|
self.stdin_pipe = None
|
|
self._input_to_send = input_to_send
|
|
if input_to_send:
|
|
pipe = os.pipe()
|
|
self.stdin_read_pipe = pipe[0]
|
|
self._stdin_write_pipe = os.fdopen(pipe[1], 'w')
|
|
|
|
def Output(self, stream, data):
|
|
"""Output handler for Popen. Stores the data for later comparison"""
|
|
if stream == sys.stdout:
|
|
self.stdout_data += data
|
|
if stream == sys.stderr:
|
|
self.stderr_data += data
|
|
self.combined_data += data
|
|
|
|
# Output the input string if we have one.
|
|
if self._input_to_send:
|
|
self._stdin_write_pipe.write(self._input_to_send + '\r\n')
|
|
self._stdin_write_pipe.flush()
|
|
|
|
def _BasicCheck(self, plist, oper):
|
|
"""Basic checks that the output looks sane."""
|
|
self.assertEqual(plist[0], oper.stdout_data)
|
|
self.assertEqual(plist[1], oper.stderr_data)
|
|
self.assertEqual(plist[2], oper.combined_data)
|
|
|
|
# The total length of stdout and stderr should equal the combined length
|
|
self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2]))
|
|
|
|
def test_simple(self):
|
|
"""Simple redirection: Get process list"""
|
|
oper = TestSubprocess.MyOperation()
|
|
plist = Popen(['ps']).CommunicateFilter(oper.Output)
|
|
self._BasicCheck(plist, oper)
|
|
|
|
def test_stderr(self):
|
|
"""Check stdout and stderr"""
|
|
oper = TestSubprocess.MyOperation()
|
|
cmd = 'echo fred >/dev/stderr && false || echo bad'
|
|
plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
|
|
self._BasicCheck(plist, oper)
|
|
self.assertEqual(plist [0], 'bad\r\n')
|
|
self.assertEqual(plist [1], 'fred\r\n')
|
|
|
|
def test_shell(self):
|
|
"""Check with and without shell works"""
|
|
oper = TestSubprocess.MyOperation()
|
|
cmd = 'echo test >/dev/stderr'
|
|
self.assertRaises(OSError, Popen, [cmd], shell=False)
|
|
plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
|
|
self._BasicCheck(plist, oper)
|
|
self.assertEqual(len(plist [0]), 0)
|
|
self.assertEqual(plist [1], 'test\r\n')
|
|
|
|
def test_list_args(self):
|
|
"""Check with and without shell works using list arguments"""
|
|
oper = TestSubprocess.MyOperation()
|
|
cmd = ['echo', 'test', '>/dev/stderr']
|
|
plist = Popen(cmd, shell=False).CommunicateFilter(oper.Output)
|
|
self._BasicCheck(plist, oper)
|
|
self.assertEqual(plist [0], ' '.join(cmd[1:]) + '\r\n')
|
|
self.assertEqual(len(plist [1]), 0)
|
|
|
|
oper = TestSubprocess.MyOperation()
|
|
|
|
# this should be interpreted as 'echo' with the other args dropped
|
|
cmd = ['echo', 'test', '>/dev/stderr']
|
|
plist = Popen(cmd, shell=True).CommunicateFilter(oper.Output)
|
|
self._BasicCheck(plist, oper)
|
|
self.assertEqual(plist [0], '\r\n')
|
|
|
|
def test_cwd(self):
|
|
"""Check we can change directory"""
|
|
for shell in (False, True):
|
|
oper = TestSubprocess.MyOperation()
|
|
plist = Popen('pwd', shell=shell, cwd='/tmp').CommunicateFilter(oper.Output)
|
|
self._BasicCheck(plist, oper)
|
|
self.assertEqual(plist [0], '/tmp\r\n')
|
|
|
|
def test_env(self):
|
|
"""Check we can change environment"""
|
|
for add in (False, True):
|
|
oper = TestSubprocess.MyOperation()
|
|
env = os.environ
|
|
if add:
|
|
env ['FRED'] = 'fred'
|
|
cmd = 'echo $FRED'
|
|
plist = Popen(cmd, shell=True, env=env).CommunicateFilter(oper.Output)
|
|
self._BasicCheck(plist, oper)
|
|
self.assertEqual(plist [0], add and 'fred\r\n' or '\r\n')
|
|
|
|
def test_extra_args(self):
|
|
"""Check we can't add extra arguments"""
|
|
self.assertRaises(ValueError, Popen, 'true', close_fds=False)
|
|
|
|
def test_basic_input(self):
|
|
"""Check that incremental input works
|
|
|
|
We set up a subprocess which will prompt for name. When we see this prompt
|
|
we send the name as input to the process. It should then print the name
|
|
properly to stdout.
|
|
"""
|
|
oper = TestSubprocess.MyOperation('Flash')
|
|
prompt = 'What is your name?: '
|
|
cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt
|
|
plist = Popen([cmd], stdin=oper.stdin_read_pipe,
|
|
shell=True).CommunicateFilter(oper.Output)
|
|
self._BasicCheck(plist, oper)
|
|
self.assertEqual(len(plist [1]), 0)
|
|
self.assertEqual(plist [0], prompt + 'Hello Flash\r\r\n')
|
|
|
|
def test_isatty(self):
|
|
"""Check that ptys appear as terminals to the subprocess"""
|
|
oper = TestSubprocess.MyOperation()
|
|
cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; '
|
|
'else echo "not %d" >&%d; fi;')
|
|
both_cmds = ''
|
|
for fd in (1, 2):
|
|
both_cmds += cmd % (fd, fd, fd, fd, fd)
|
|
plist = Popen(both_cmds, shell=True).CommunicateFilter(oper.Output)
|
|
self._BasicCheck(plist, oper)
|
|
self.assertEqual(plist [0], 'terminal 1\r\n')
|
|
self.assertEqual(plist [1], 'terminal 2\r\n')
|
|
|
|
# Now try with PIPE and make sure it is not a terminal
|
|
oper = TestSubprocess.MyOperation()
|
|
plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
|
shell=True).CommunicateFilter(oper.Output)
|
|
self._BasicCheck(plist, oper)
|
|
self.assertEqual(plist [0], 'not 1\n')
|
|
self.assertEqual(plist [1], 'not 2\n')
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|