| # Copyright (c) 2012 The Chromium OS Authors. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| # |
| # Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se> |
| # Licensed to PSF under a Contributor Agreement. |
| # See http://www.python.org/2.4/license for licensing details. |
| |
| """Subprocress execution |
| |
| This module holds a subclass of subprocess.Popen with our own required |
| features, mainly that we get access to the subprocess output while it |
| is running rather than just at the end. This makes it easiler to show |
| progress information and filter output in real time. |
| """ |
| |
| import errno |
| import os |
| import pty |
| import select |
| import subprocess |
| import sys |
| import unittest |
| |
| |
| # Import these here so the caller does not need to import subprocess also. |
| PIPE = subprocess.PIPE |
| STDOUT = subprocess.STDOUT |
| PIPE_PTY = -3 # Pipe output through a pty |
| stay_alive = True |
| |
| |
| class Popen(subprocess.Popen): |
| """Like subprocess.Popen with ptys and incremental output |
| |
| This class deals with running a child process and filtering its output on |
| both stdout and stderr while it is running. We do this so we can monitor |
| progress, and possibly relay the output to the user if requested. |
| |
| The class is similar to subprocess.Popen, the equivalent is something like: |
| |
| Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| |
| But this class has many fewer features, and two enhancement: |
| |
| 1. Rather than getting the output data only at the end, this class sends it |
| to a provided operation as it arrives. |
| 2. We use pseudo terminals so that the child will hopefully flush its output |
| to us as soon as it is produced, rather than waiting for the end of a |
| line. |
| |
| Use CommunicateFilter() to handle output from the subprocess. |
| |
| """ |
| |
| def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY, |
| shell=False, cwd=None, env=None, **kwargs): |
| """Cut-down constructor |
| |
| Args: |
| args: Program and arguments for subprocess to execute. |
| stdin: See subprocess.Popen() |
| stdout: See subprocess.Popen(), except that we support the sentinel |
| value of cros_subprocess.PIPE_PTY. |
| stderr: See subprocess.Popen(), except that we support the sentinel |
| value of cros_subprocess.PIPE_PTY. |
| shell: See subprocess.Popen() |
| cwd: Working directory to change to for subprocess, or None if none. |
| env: Environment to use for this subprocess, or None to inherit parent. |
| kwargs: No other arguments are supported at the moment. Passing other |
| arguments will cause a ValueError to be raised. |
| """ |
| stdout_pty = None |
| stderr_pty = None |
| |
| if stdout == PIPE_PTY: |
| stdout_pty = pty.openpty() |
| stdout = os.fdopen(stdout_pty[1]) |
| if stderr == PIPE_PTY: |
| stderr_pty = pty.openpty() |
| stderr = os.fdopen(stderr_pty[1]) |
| |
| super(Popen, self).__init__(args, stdin=stdin, |
| stdout=stdout, stderr=stderr, shell=shell, cwd=cwd, env=env, |
| **kwargs) |
| |
| # If we're on a PTY, we passed the slave half of the PTY to the subprocess. |
| # We want to use the master half on our end from now on. Setting this here |
| # does make some assumptions about the implementation of subprocess, but |
| # those assumptions are pretty minor. |
| |
| # Note that if stderr is STDOUT, then self.stderr will be set to None by |
| # this constructor. |
| if stdout_pty is not None: |
| self.stdout = os.fdopen(stdout_pty[0]) |
| if stderr_pty is not None: |
| self.stderr = os.fdopen(stderr_pty[0]) |
| |
| # Insist that unit tests exist for other arguments we don't support. |
| if kwargs: |
| raise ValueError("Unit tests do not test extra args - please add tests") |
| |
| def CommunicateFilter(self, output): |
| """Interact with process: Read data from stdout and stderr. |
| |
| This method runs until end-of-file is reached, then waits for the |
| subprocess to terminate. |
| |
| The output function is sent all output from the subprocess and must be |
| defined like this: |
| |
| def Output([self,] stream, data) |
| Args: |
| stream: the stream the output was received on, which will be |
| sys.stdout or sys.stderr. |
| data: a string containing the data |
| |
| Note: The data read is buffered in memory, so do not use this |
| method if the data size is large or unlimited. |
| |
| Args: |
| output: Function to call with each fragment of output. |
| |
| Returns: |
| A tuple (stdout, stderr, combined) which is the data received on |
| stdout, stderr and the combined data (interleaved stdout and stderr). |
| |
| Note that the interleaved output will only be sensible if you have |
| set both stdout and stderr to PIPE or PIPE_PTY. Even then it depends on |
| the timing of the output in the subprocess. If a subprocess flips |
| between stdout and stderr quickly in succession, by the time we come to |
| read the output from each we may see several lines in each, and will read |
| all the stdout lines, then all the stderr lines. So the interleaving |
| may not be correct. In this case you might want to pass |
| stderr=cros_subprocess.STDOUT to the constructor. |
| |
| This feature is still useful for subprocesses where stderr is |
| rarely used and indicates an error. |
| |
| Note also that if you set stderr to STDOUT, then stderr will be empty |
| and the combined output will just be the same as stdout. |
| """ |
| |
| read_set = [] |
| write_set = [] |
| stdout = None # Return |
| stderr = None # Return |
| |
| if self.stdin: |
| # Flush stdio buffer. This might block, if the user has |
| # been writing to .stdin in an uncontrolled fashion. |
| self.stdin.flush() |
| if input: |
| write_set.append(self.stdin) |
| else: |
| self.stdin.close() |
| if self.stdout: |
| read_set.append(self.stdout) |
| stdout = [] |
| if self.stderr and self.stderr != self.stdout: |
| read_set.append(self.stderr) |
| stderr = [] |
| combined = [] |
| |
| input_offset = 0 |
| while read_set or write_set: |
| try: |
| rlist, wlist, _ = select.select(read_set, write_set, [], 0.2) |
| except select.error as e: |
| if e.args[0] == errno.EINTR: |
| continue |
| raise |
| |
| if not stay_alive: |
| self.terminate() |
| |
| if self.stdin in wlist: |
| # When select has indicated that the file is writable, |
| # we can write up to PIPE_BUF bytes without risk |
| # blocking. POSIX defines PIPE_BUF >= 512 |
| chunk = input[input_offset : input_offset + 512] |
| bytes_written = os.write(self.stdin.fileno(), chunk) |
| input_offset += bytes_written |
| if input_offset >= len(input): |
| self.stdin.close() |
| write_set.remove(self.stdin) |
| |
| if self.stdout in rlist: |
| data = "" |
| # We will get an error on read if the pty is closed |
| try: |
| data = os.read(self.stdout.fileno(), 1024) |
| except OSError: |
| pass |
| if data == "": |
| self.stdout.close() |
| read_set.remove(self.stdout) |
| else: |
| stdout.append(data) |
| combined.append(data) |
| if output: |
| output(sys.stdout, data) |
| if self.stderr in rlist: |
| data = "" |
| # We will get an error on read if the pty is closed |
| try: |
| data = os.read(self.stderr.fileno(), 1024) |
| except OSError: |
| pass |
| if data == "": |
| self.stderr.close() |
| read_set.remove(self.stderr) |
| else: |
| stderr.append(data) |
| combined.append(data) |
| if output: |
| output(sys.stderr, data) |
| |
| # All data exchanged. Translate lists into strings. |
| if stdout is not None: |
| stdout = ''.join(stdout) |
| else: |
| stdout = '' |
| if stderr is not None: |
| stderr = ''.join(stderr) |
| else: |
| stderr = '' |
| combined = ''.join(combined) |
| |
| # Translate newlines, if requested. We cannot let the file |
| # object do the translation: It is based on stdio, which is |
| # impossible to combine with select (unless forcing no |
| # buffering). |
| if self.universal_newlines and hasattr(file, 'newlines'): |
| if stdout: |
| stdout = self._translate_newlines(stdout) |
| if stderr: |
| stderr = self._translate_newlines(stderr) |
| |
| self.wait() |
| return (stdout, stderr, combined) |
| |
| |
| # Just being a unittest.TestCase gives us 14 public methods. Unless we |
| # disable this, we can only have 6 tests in a TestCase. That's not enough. |
| # |
| # pylint: disable=R0904 |
| |
| class TestSubprocess(unittest.TestCase): |
| """Our simple unit test for this module""" |
| |
| class MyOperation: |
| """Provides a operation that we can pass to Popen""" |
| def __init__(self, input_to_send=None): |
| """Constructor to set up the operation and possible input. |
| |
| Args: |
| input_to_send: a text string to send when we first get input. We will |
| add \r\n to the string. |
| """ |
| self.stdout_data = '' |
| self.stderr_data = '' |
| self.combined_data = '' |
| self.stdin_pipe = None |
| self._input_to_send = input_to_send |
| if input_to_send: |
| pipe = os.pipe() |
| self.stdin_read_pipe = pipe[0] |
| self._stdin_write_pipe = os.fdopen(pipe[1], 'w') |
| |
| def Output(self, stream, data): |
| """Output handler for Popen. Stores the data for later comparison""" |
| if stream == sys.stdout: |
| self.stdout_data += data |
| if stream == sys.stderr: |
| self.stderr_data += data |
| self.combined_data += data |
| |
| # Output the input string if we have one. |
| if self._input_to_send: |
| self._stdin_write_pipe.write(self._input_to_send + '\r\n') |
| self._stdin_write_pipe.flush() |
| |
| def _BasicCheck(self, plist, oper): |
| """Basic checks that the output looks sane.""" |
| self.assertEqual(plist[0], oper.stdout_data) |
| self.assertEqual(plist[1], oper.stderr_data) |
| self.assertEqual(plist[2], oper.combined_data) |
| |
| # The total length of stdout and stderr should equal the combined length |
| self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2])) |
| |
| def test_simple(self): |
| """Simple redirection: Get process list""" |
| oper = TestSubprocess.MyOperation() |
| plist = Popen(['ps']).CommunicateFilter(oper.Output) |
| self._BasicCheck(plist, oper) |
| |
| def test_stderr(self): |
| """Check stdout and stderr""" |
| oper = TestSubprocess.MyOperation() |
| cmd = 'echo fred >/dev/stderr && false || echo bad' |
| plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output) |
| self._BasicCheck(plist, oper) |
| self.assertEqual(plist [0], 'bad\r\n') |
| self.assertEqual(plist [1], 'fred\r\n') |
| |
| def test_shell(self): |
| """Check with and without shell works""" |
| oper = TestSubprocess.MyOperation() |
| cmd = 'echo test >/dev/stderr' |
| self.assertRaises(OSError, Popen, [cmd], shell=False) |
| plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output) |
| self._BasicCheck(plist, oper) |
| self.assertEqual(len(plist [0]), 0) |
| self.assertEqual(plist [1], 'test\r\n') |
| |
| def test_list_args(self): |
| """Check with and without shell works using list arguments""" |
| oper = TestSubprocess.MyOperation() |
| cmd = ['echo', 'test', '>/dev/stderr'] |
| plist = Popen(cmd, shell=False).CommunicateFilter(oper.Output) |
| self._BasicCheck(plist, oper) |
| self.assertEqual(plist [0], ' '.join(cmd[1:]) + '\r\n') |
| self.assertEqual(len(plist [1]), 0) |
| |
| oper = TestSubprocess.MyOperation() |
| |
| # this should be interpreted as 'echo' with the other args dropped |
| cmd = ['echo', 'test', '>/dev/stderr'] |
| plist = Popen(cmd, shell=True).CommunicateFilter(oper.Output) |
| self._BasicCheck(plist, oper) |
| self.assertEqual(plist [0], '\r\n') |
| |
| def test_cwd(self): |
| """Check we can change directory""" |
| for shell in (False, True): |
| oper = TestSubprocess.MyOperation() |
| plist = Popen('pwd', shell=shell, cwd='/tmp').CommunicateFilter(oper.Output) |
| self._BasicCheck(plist, oper) |
| self.assertEqual(plist [0], '/tmp\r\n') |
| |
| def test_env(self): |
| """Check we can change environment""" |
| for add in (False, True): |
| oper = TestSubprocess.MyOperation() |
| env = os.environ |
| if add: |
| env ['FRED'] = 'fred' |
| cmd = 'echo $FRED' |
| plist = Popen(cmd, shell=True, env=env).CommunicateFilter(oper.Output) |
| self._BasicCheck(plist, oper) |
| self.assertEqual(plist [0], add and 'fred\r\n' or '\r\n') |
| |
| def test_extra_args(self): |
| """Check we can't add extra arguments""" |
| self.assertRaises(ValueError, Popen, 'true', close_fds=False) |
| |
| def test_basic_input(self): |
| """Check that incremental input works |
| |
| We set up a subprocess which will prompt for name. When we see this prompt |
| we send the name as input to the process. It should then print the name |
| properly to stdout. |
| """ |
| oper = TestSubprocess.MyOperation('Flash') |
| prompt = 'What is your name?: ' |
| cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt |
| plist = Popen([cmd], stdin=oper.stdin_read_pipe, |
| shell=True).CommunicateFilter(oper.Output) |
| self._BasicCheck(plist, oper) |
| self.assertEqual(len(plist [1]), 0) |
| self.assertEqual(plist [0], prompt + 'Hello Flash\r\r\n') |
| |
| def test_isatty(self): |
| """Check that ptys appear as terminals to the subprocess""" |
| oper = TestSubprocess.MyOperation() |
| cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; ' |
| 'else echo "not %d" >&%d; fi;') |
| both_cmds = '' |
| for fd in (1, 2): |
| both_cmds += cmd % (fd, fd, fd, fd, fd) |
| plist = Popen(both_cmds, shell=True).CommunicateFilter(oper.Output) |
| self._BasicCheck(plist, oper) |
| self.assertEqual(plist [0], 'terminal 1\r\n') |
| self.assertEqual(plist [1], 'terminal 2\r\n') |
| |
| # Now try with PIPE and make sure it is not a terminal |
| oper = TestSubprocess.MyOperation() |
| plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE, |
| shell=True).CommunicateFilter(oper.Output) |
| self._BasicCheck(plist, oper) |
| self.assertEqual(plist [0], 'not 1\n') |
| self.assertEqual(plist [1], 'not 2\n') |
| |
| if __name__ == '__main__': |
| unittest.main() |