From 8fef3fe7317390a3b1f61dfebaff541e23811633 Mon Sep 17 00:00:00 2001 From: Martin Richard Date: Mon, 26 Sep 2016 15:01:53 +0200 Subject: [PATCH 1/7] make fork + exec non blocking on unix --- asyncio/base_subprocess.py | 119 +++++++++------ asyncio/tmp_subprocess.py | 301 +++++++++++++++++++++++++++++++++++++ asyncio/unix_events.py | 183 ++++++++++++++++------ tests/test_subprocess.py | 34 ++++- 4 files changed, 541 insertions(+), 96 deletions(-) create mode 100644 asyncio/tmp_subprocess.py diff --git a/asyncio/base_subprocess.py b/asyncio/base_subprocess.py index 23742a16..05c8b1d9 100644 --- a/asyncio/base_subprocess.py +++ b/asyncio/base_subprocess.py @@ -34,25 +34,53 @@ def __init__(self, loop, protocol, args, shell, self._pipes[2] = None # Create the child process: set the _proc attribute + self._loop.create_task(self._create_child( + waiter, args=args, shell=shell, stdin=stdin, stdout=stdout, + stderr=stderr, bufsize=bufsize, start_kwargs=kwargs)) + + @coroutine + def _create_child(self, waiter, args, shell, stdin, stdout, stderr, + bufsize, start_kwargs): try: - self._start(args=args, shell=shell, stdin=stdin, stdout=stdout, - stderr=stderr, bufsize=bufsize, **kwargs) - except: - self.close() - raise + try: + start = self._start(args=args, shell=shell, stdin=stdin, + stdout=stdout, stderr=stderr, + bufsize=bufsize, **start_kwargs) + + if start is not None: + # _start is not required to be a coroutine + yield from start + except: + self.close() + raise - self._pid = self._proc.pid - self._extra['subprocess'] = self._proc + self._pid = self._proc.pid + self._extra['subprocess'] = self._proc - if self._loop.get_debug(): - if isinstance(args, (bytes, str)): - program = args + if self._loop.get_debug(): + if isinstance(args, (bytes, str)): + program = args + else: + program = args[0] + logger.debug('process %r created: pid %s', program, self._pid) + + if self._closed: + # transport.close() may have been called concurrently, for + # instance if _make_subprocess_transport() has been cancelled. + if self._proc.stdin: + self._proc.stdin.close() + if self._proc.stdout: + self._proc.stdout.close() + if self._proc.stderr: + self._proc.stderr.close() else: - program = args[0] - logger.debug('process %r created: pid %s', - program, self._pid) - - self._loop.create_task(self._connect_pipes(waiter)) + yield from self._connect_pipes(waiter) + except Exception as exc: + if waiter is not None and not waiter.cancelled(): + waiter.set_exception(exc) + else: + if waiter is not None and not waiter.cancelled(): + waiter.set_result(None) def __repr__(self): info = [self.__class__.__name__] @@ -160,40 +188,33 @@ def kill(self): @coroutine def _connect_pipes(self, waiter): - try: - proc = self._proc - loop = self._loop - - if proc.stdin is not None: - _, pipe = yield from loop.connect_write_pipe( - lambda: WriteSubprocessPipeProto(self, 0), - proc.stdin) - self._pipes[0] = pipe - - if proc.stdout is not None: - _, pipe = yield from loop.connect_read_pipe( - lambda: ReadSubprocessPipeProto(self, 1), - proc.stdout) - self._pipes[1] = pipe - - if proc.stderr is not None: - _, pipe = yield from loop.connect_read_pipe( - lambda: ReadSubprocessPipeProto(self, 2), - proc.stderr) - self._pipes[2] = pipe - - assert self._pending_calls is not None - - loop.call_soon(self._protocol.connection_made, self) - for callback, data in self._pending_calls: - loop.call_soon(callback, *data) - self._pending_calls = None - except Exception as exc: - if waiter is not None and not waiter.cancelled(): - waiter.set_exception(exc) - else: - if waiter is not None and not waiter.cancelled(): - waiter.set_result(None) + proc = self._proc + loop = self._loop + + if proc.stdin is not None: + _, pipe = yield from loop.connect_write_pipe( + lambda: WriteSubprocessPipeProto(self, 0), + proc.stdin) + self._pipes[0] = pipe + + if proc.stdout is not None: + _, pipe = yield from loop.connect_read_pipe( + lambda: ReadSubprocessPipeProto(self, 1), + proc.stdout) + self._pipes[1] = pipe + + if proc.stderr is not None: + _, pipe = yield from loop.connect_read_pipe( + lambda: ReadSubprocessPipeProto(self, 2), + proc.stderr) + self._pipes[2] = pipe + + assert self._pending_calls is not None + + loop.call_soon(self._protocol.connection_made, self) + for callback, data in self._pending_calls: + loop.call_soon(callback, *data) + self._pending_calls = None def _call(self, cb, *data): if self._pending_calls is not None: diff --git a/asyncio/tmp_subprocess.py b/asyncio/tmp_subprocess.py new file mode 100644 index 00000000..a8bfd2bb --- /dev/null +++ b/asyncio/tmp_subprocess.py @@ -0,0 +1,301 @@ +import builtins +import errno +import io +import os +import subprocess +import threading +import warnings + +import _posixsubprocess +from subprocess import (SubprocessError, _PLATFORM_DEFAULT_CLOSE_FDS, PIPE, + _cleanup) + +_mswindows = msvcrt = False + + +class _Popen(subprocess.Popen): + def __init__(self, args, bufsize=-1, executable=None, + stdin=None, stdout=None, stderr=None, + preexec_fn=None, close_fds=_PLATFORM_DEFAULT_CLOSE_FDS, + shell=False, cwd=None, env=None, universal_newlines=False, + startupinfo=None, creationflags=0, + restore_signals=True, start_new_session=False, + pass_fds=()): + """Create new Popen instance.""" + _cleanup() + # Held while anything is calling waitpid before returncode has been + # updated to prevent clobbering returncode if wait() or poll() are + # called from multiple threads at once. After acquiring the lock, + # code must re-check self.returncode to see if another thread just + # finished a waitpid() call. + self._waitpid_lock = threading.Lock() + + self._input = None + self._communication_started = False + if bufsize is None: + bufsize = -1 # Restore default + if not isinstance(bufsize, int): + raise TypeError("bufsize must be an integer") + + if _mswindows: + if preexec_fn is not None: + raise ValueError("preexec_fn is not supported on Windows " + "platforms") + any_stdio_set = (stdin is not None or stdout is not None or + stderr is not None) + if close_fds is _PLATFORM_DEFAULT_CLOSE_FDS: + if any_stdio_set: + close_fds = False + else: + close_fds = True + elif close_fds and any_stdio_set: + raise ValueError( + "close_fds is not supported on Windows platforms" + " if you redirect stdin/stdout/stderr") + else: + # POSIX + if close_fds is _PLATFORM_DEFAULT_CLOSE_FDS: + close_fds = True + if pass_fds and not close_fds: + warnings.warn("pass_fds overriding close_fds.", RuntimeWarning) + close_fds = True + if startupinfo is not None: + raise ValueError("startupinfo is only supported on Windows " + "platforms") + if creationflags != 0: + raise ValueError("creationflags is only supported on Windows " + "platforms") + + self.args = args + self.stdin = None + self.stdout = None + self.stderr = None + self.pid = None + self.returncode = None + self.universal_newlines = universal_newlines + + # Input and output objects. The general principle is like + # this: + # + # Parent Child + # ------ ----- + # p2cwrite ---stdin---> p2cread + # c2pread <--stdout--- c2pwrite + # errread <--stderr--- errwrite + # + # On POSIX, the child objects are file descriptors. On + # Windows, these are Windows file handles. The parent objects + # are file descriptors on both platforms. The parent objects + # are -1 when not using PIPEs. The child objects are -1 + # when not redirecting. + + (p2cread, p2cwrite, + c2pread, c2pwrite, + errread, errwrite) = self._get_handles(stdin, stdout, stderr) + + # We wrap OS handles *before* launching the child, otherwise a + # quickly terminating child could make our fds unwrappable + # (see #8458). + + if _mswindows: + if p2cwrite != -1: + p2cwrite = msvcrt.open_osfhandle(p2cwrite.Detach(), 0) + if c2pread != -1: + c2pread = msvcrt.open_osfhandle(c2pread.Detach(), 0) + if errread != -1: + errread = msvcrt.open_osfhandle(errread.Detach(), 0) + + if p2cwrite != -1: + self.stdin = io.open(p2cwrite, 'wb', bufsize) + if universal_newlines: + self.stdin = io.TextIOWrapper(self.stdin, write_through=True, + line_buffering=(bufsize == 1)) + if c2pread != -1: + self.stdout = io.open(c2pread, 'rb', bufsize) + if universal_newlines: + self.stdout = io.TextIOWrapper(self.stdout) + if errread != -1: + self.stderr = io.open(errread, 'rb', bufsize) + if universal_newlines: + self.stderr = io.TextIOWrapper(self.stderr) + + self._child_pipes_to_close = set() + if stdin == PIPE: + self._child_pipes_to_close.add(p2cread) + if stdout == PIPE: + self._child_pipes_to_close.add(c2pwrite) + if stderr == PIPE: + self._child_pipes_to_close.add(errwrite) + if hasattr(self, '_devnull'): + self._child_pipes_to_close.add(self._devnull) + + try: + self._execute_child(args, executable, preexec_fn, close_fds, + pass_fds, cwd, env, + startupinfo, creationflags, shell, + p2cread, p2cwrite, + c2pread, c2pwrite, + errread, errwrite, + restore_signals, start_new_session) + except: + # Cleanup if the child failed starting. + self._cleanup_on_exec_failure() + raise + + def _cleanup_on_exec_failure(self): + for f in filter(None, (self.stdin, self.stdout, self.stderr)): + try: + f.close() + except OSError: + pass # Ignore EBADF or other errors. + + for fd in self._child_pipes_to_close: + try: + os.close(fd) + except OSError: + pass + + self._child_pipes_to_close.clear() + + if True: # XXX if unix + def _execute_child(self, args, executable, preexec_fn, close_fds, + pass_fds, cwd, env, + startupinfo, creationflags, shell, + p2cread, p2cwrite, + c2pread, c2pwrite, + errread, errwrite, + restore_signals, start_new_session): + """Execute program (POSIX version)""" + + if isinstance(args, (str, bytes)): + args = [args] + else: + args = list(args) + + if shell: + args = ["/bin/sh", "-c"] + args + if executable: + args[0] = executable + + if executable is None: + executable = args[0] + orig_executable = executable + + # For transferring possible exec failure from child to parent. + # Data format: "exception name:hex errno:description" + # Pickle is not used; it is complex and involves memory allocation. + errpipe_read, errpipe_write = self._get_exec_err_pipe() + # errpipe_write must not be in the standard io 0, 1, or 2 fd range. + low_fds_to_close = [] + while errpipe_write < 3: + low_fds_to_close.append(errpipe_write) + errpipe_write = os.dup(errpipe_write) + for low_fd in low_fds_to_close: + os.close(low_fd) + try: + try: + # We must avoid complex work that could involve + # malloc or free in the child process to avoid + # potential deadlocks, thus we do all this here. + # and pass it to fork_exec() + + if env is not None: + env_list = [os.fsencode(k) + b'=' + os.fsencode(v) + for k, v in env.items()] + else: + env_list = None # Use execv instead of execve. + executable = os.fsencode(executable) + if os.path.dirname(executable): + executable_list = (executable,) + else: + # This matches the behavior of os._execvpe(). + executable_list = tuple( + os.path.join(os.fsencode(dir), executable) + for dir in os.get_exec_path(env)) + fds_to_keep = set(pass_fds) + fds_to_keep.add(errpipe_write) + self.pid = _posixsubprocess.fork_exec( + args, executable_list, + close_fds, sorted(fds_to_keep), cwd, env_list, + p2cread, p2cwrite, c2pread, c2pwrite, + errread, errwrite, + errpipe_read, errpipe_write, + restore_signals, start_new_session, preexec_fn) + self._child_created = True + finally: + # be sure the FD is closed no matter what + os.close(errpipe_write) + + # self._devnull is not always defined. + to_close = set() + devnull_fd = getattr(self, '_devnull', None) + if p2cread != -1 and p2cwrite != -1 and p2cread != devnull_fd: + to_close.add(p2cread) + if c2pwrite != -1 and c2pread != -1 and c2pwrite != devnull_fd: + to_close.add(c2pwrite) + if errwrite != -1 and errread != -1 and errwrite != devnull_fd: + to_close.add(errwrite) + if devnull_fd is not None: + to_close.add(devnull_fd) + for fd in to_close: + os.close(fd) + # Prevent a double close of these fds from __init__ on error. + self._child_pipes_to_close.remove(fd) + except: + os.close(errpipe_read) + raise + + self._wait_exec_done(orig_executable, cwd, errpipe_read) + + def _get_exec_err_pipe(self): + return os.pipe() + + def _wait_exec_done(self, orig_executable, cwd, errpipe_read): + assert errpipe_read is not None + try: + # Wait for exec to fail or succeed; possibly raising an + # exception (limited in size) + errpipe_data = bytearray() + while True: + part = os.read(errpipe_read, 50000) + errpipe_data += part + if not part or len(errpipe_data) > 50000: + break + finally: + os.close(errpipe_read) + + if errpipe_data: + self._check_exec_result(orig_executable, cwd, errpipe_data) + + def _check_exec_result(self, orig_executable, cwd, errpipe_data): + try: + os.waitpid(self.pid, 0) + except ChildProcessError: + pass + try: + exception_name, hex_errno, err_msg = ( + errpipe_data.split(b':', 2)) + except ValueError: + exception_name = b'SubprocessError' + hex_errno = b'0' + err_msg = (b'Bad exception data from child: ' + + repr(errpipe_data)) + child_exception_type = getattr( + builtins, exception_name.decode('ascii'), + SubprocessError) + err_msg = err_msg.decode(errors="surrogatepass") + if issubclass(child_exception_type, OSError) and hex_errno: + errno_num = int(hex_errno, 16) + child_exec_never_called = (err_msg == "noexec") + if child_exec_never_called: + err_msg = "" + if errno_num != 0: + err_msg = os.strerror(errno_num) + if errno_num == errno.ENOENT: + if child_exec_never_called: + # The error must be from chdir(cwd). + err_msg += ': ' + repr(cwd) + else: + err_msg += ': ' + repr(orig_executable) + raise child_exception_type(errno_num, err_msg) + raise child_exception_type(err_msg) diff --git a/asyncio/unix_events.py b/asyncio/unix_events.py index 65b61db6..3b853e75 100644 --- a/asyncio/unix_events.py +++ b/asyncio/unix_events.py @@ -24,6 +24,9 @@ from .coroutines import coroutine from .log import logger +# XXX temporary: a monkey-patched subprocess.Popen +from . import tmp_subprocess + __all__ = ['SelectorEventLoop', 'AbstractChildWatcher', 'SafeChildWatcher', @@ -176,34 +179,27 @@ def _make_write_pipe_transport(self, pipe, protocol, waiter=None, def _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): - with events.get_child_watcher() as watcher: - waiter = self.create_future() - transp = _UnixSubprocessTransport(self, protocol, args, shell, - stdin, stdout, stderr, bufsize, - waiter=waiter, extra=extra, - **kwargs) - - watcher.add_child_handler(transp.get_pid(), - self._child_watcher_callback, transp) - try: - yield from waiter - except Exception as exc: - # Workaround CPython bug #23353: using yield/yield-from in an - # except block of a generator doesn't clear properly - # sys.exc_info() - err = exc - else: - err = None + waiter = self.create_future() + transp = _UnixSubprocessTransport( + self, protocol, args, shell, stdin, stdout, stderr, bufsize, + waiter=waiter, extra=extra, **kwargs) - if err is not None: - transp.close() - yield from transp._wait() - raise err + try: + yield from waiter + except Exception as exc: + # Workaround CPython bug #23353: using yield/yield-from in an + # except block of a generator doesn't clear properly + # sys.exc_info() + err = exc + else: + err = None - return transp + if err is not None: + transp.close() + yield from transp._wait() + raise err - def _child_watcher_callback(self, pid, returncode, transp): - self.call_soon_threadsafe(transp._process_exited, returncode) + return transp @coroutine def create_unix_connection(self, protocol_factory, path, *, @@ -665,29 +661,126 @@ def _set_inheritable(fd, inheritable): fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag) +class _NonBlockingPopen(tmp_subprocess._Popen): + """A modified Popen which performs IO operations using an event loop.""" + # TODO can we include the stdin trick in popen? + def __init__(self, loop, exec_waiter, watcher, *args, **kwargs): + self._loop = loop + self._watcher = watcher + self._exec_waiter = exec_waiter + super().__init__(*args, **kwargs) + + def _cleanup_on_exec_failure(self): + super()._cleanup_on_exec_failure() + self._exec_waiter = None + self._loop = None + self._watcher = None + + def _get_exec_err_pipe(self): + errpipe_read, errpipe_write = self._loop._socketpair() + return errpipe_read.detach(), errpipe_write.detach() + + def _wait_exec_done(self, orig_executable, cwd, errpipe_read): + errpipe_data = bytearray() + self._loop.add_reader(errpipe_read, self._read_errpipe, + orig_executable, cwd, errpipe_read, errpipe_data) + + def _read_errpipe(self, orig_executable, cwd, errpipe_read, errpipe_data): + try: + part = os.read(errpipe_read, 50000) + except BlockingIOError: + return + except Exception as exc: + self._loop.remove_reader(errpipe_read) + os.close(errpipe_read) + self._exec_waiter.set_exception(exc) + self._cleanup_on_exec_failure() + else: + if part and len(errpipe_data) <= 50000: + errpipe_data.extend(part) + return + + self._loop.remove_reader(errpipe_read) + os.close(errpipe_read) + + if errpipe_data: + # asynchronously wait until the process terminated + self._watcher.add_child_handler( + self.pid, self._check_exec_result, orig_executable, + cwd, errpipe_data) + else: + if not self._exec_waiter.cancelled(): + self._exec_waiter.set_result(None) + self._exec_waiter = None + self._loop = None + self._watcher = None + + def _check_exec_result(self, pid, returncode, orig_executable, cwd, + errpipe_data): + try: + super()._check_exec_result(orig_executable, cwd, errpipe_data) + except Exception as exc: + if not self._exec_waiter.cancelled(): + self._exec_waiter.set_exception(exc) + self._cleanup_on_exec_failure() + + class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._failed_before_exec = False + @coroutine def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): - stdin_w = None - if stdin == subprocess.PIPE: - # Use a socket pair for stdin, since not all platforms - # support selecting read events on the write end of a - # socket (which we use in order to detect closing of the - # other end). Notably this is needed on AIX, and works - # just fine on other platforms. - stdin, stdin_w = self._loop._socketpair() - - # Mark the write end of the stdin pipe as non-inheritable, - # needed by close_fds=False on Python 3.3 and older - # (Python 3.4 implements the PEP 446, socketpair returns - # non-inheritable sockets) - _set_inheritable(stdin_w.fileno(), False) - self._proc = subprocess.Popen( - args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, - universal_newlines=False, bufsize=bufsize, **kwargs) - if stdin_w is not None: - stdin.close() - self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize) + with events.get_child_watcher() as watcher: + stdin_w = None + if stdin == subprocess.PIPE: + # Use a socket pair for stdin, since not all platforms + # support selecting read events on the write end of a + # socket (which we use in order to detect closing of the + # other end). Notably this is needed on AIX, and works + # just fine on other platforms. + stdin, stdin_w = self._loop._socketpair() + + # Mark the write end of the stdin pipe as non-inheritable, + # needed by close_fds=False on Python 3.3 and older + # (Python 3.4 implements the PEP 446, socketpair returns + # non-inheritable sockets) + _set_inheritable(stdin_w.fileno(), False) + exec_waiter = self._loop.create_future() + try: + self._proc = _NonBlockingPopen( + self._loop, exec_waiter, watcher, args, shell=shell, + stdin=stdin, stdout=stdout, stderr=stderr, + universal_newlines=False, bufsize=bufsize, **kwargs) + yield from exec_waiter + except: + self._failed_before_exec = True + # TODO stdin is probably closed by proc, but what about stdin_w + # so far? check this + if stdin_w is not None: + stdin_w.close() + raise + else: + watcher.add_child_handler(self._proc.pid, + self._child_watcher_callback) + if stdin_w is not None: + stdin.close() + self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize) + + def _child_watcher_callback(self, pid, returncode): + self._loop.call_soon_threadsafe(self._process_exited, returncode) + + @coroutine + def _wait(self): + if self._failed_before_exec: + # let loop._make_subprocess_transport() call transport._wait() when + # an excpetion is raised asynchronously during the setup of the + # transport, which garantees that necessary cleanup will be + # performed + return + else: + return (yield from super()._wait()) class AbstractChildWatcher: diff --git a/tests/test_subprocess.py b/tests/test_subprocess.py index bba688bb..3bd60411 100644 --- a/tests/test_subprocess.py +++ b/tests/test_subprocess.py @@ -1,8 +1,11 @@ +import multiprocessing import signal import sys +import time import unittest import warnings from unittest import mock +from subprocess import SubprocessError import asyncio from asyncio import base_subprocess @@ -52,8 +55,9 @@ def create_transport(self, waiter=None): def test_proc_exited(self): waiter = asyncio.Future(loop=self.loop) transport, protocol = self.create_transport(waiter) - transport._process_exited(6) self.loop.run_until_complete(waiter) + transport._process_exited(6) + test_utils.run_briefly(self.loop) self.assertEqual(transport.get_returncode(), 6) @@ -164,6 +168,32 @@ def test_terminate(self): else: self.assertEqual(-signal.SIGTERM, returncode) + def test_exception_in_preexec(self): + def raise_exception(): + raise Exception("custom exception") + + args = PROGRAM_BLOCKED + create = asyncio.create_subprocess_exec( + *args, preexec_fn=raise_exception, loop=self.loop) + with self.assertRaises(SubprocessError): + self.loop.run_until_complete(create) + + def test_cancel_during_preexec(self): + lock = multiprocessing.Lock() + lock.acquire() + + def block(): + lock.acquire() + lock.release() + + create = asyncio.create_subprocess_shell( + 'exit 7', preexec_fn=block, loop=self.loop) + task = self.loop.create_task(create) + self.loop.call_soon(task.cancel) + self.loop.call_soon(lock.release) + with self.assertRaises(asyncio.CancelledError): + self.loop.run_until_complete(task) + @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP") def test_send_signal(self): code = 'import time; print("sleeping", flush=True); time.sleep(3600)' @@ -447,7 +477,7 @@ def test_popen_error(self): if sys.platform == 'win32': target = 'asyncio.windows_utils.Popen' else: - target = 'subprocess.Popen' + target = 'asyncio.unix_events._NonBlockingPopen' with mock.patch(target) as popen: exc = ZeroDivisionError popen.side_effect = exc From 6aa4e300b0d45a8cbf38ffa96aa473fddd580ebb Mon Sep 17 00:00:00 2001 From: Martin Richard Date: Tue, 27 Sep 2016 15:12:24 +0200 Subject: [PATCH 2/7] compatibility with Python 3.3 --- asyncio/tmp_subprocess33.py | 289 ++++++++++++++++++++++++++++++++++++ asyncio/unix_events.py | 7 +- tests/test_subprocess.py | 13 +- 3 files changed, 305 insertions(+), 4 deletions(-) create mode 100644 asyncio/tmp_subprocess33.py diff --git a/asyncio/tmp_subprocess33.py b/asyncio/tmp_subprocess33.py new file mode 100644 index 00000000..a35ba6bb --- /dev/null +++ b/asyncio/tmp_subprocess33.py @@ -0,0 +1,289 @@ +import builtins +import errno +import io +import os +import subprocess +import warnings + +import _posixsubprocess +from subprocess import (PIPE, _PLATFORM_DEFAULT_CLOSE_FDS, _create_pipe, + _cleanup, _eintr_retry_call, ) + +mswindows = msvcrt = False + + +# Popen for python 3.3 +class _Popen(subprocess.Popen): + def __init__(self, args, bufsize=-1, executable=None, + stdin=None, stdout=None, stderr=None, + preexec_fn=None, close_fds=_PLATFORM_DEFAULT_CLOSE_FDS, + shell=False, cwd=None, env=None, universal_newlines=False, + startupinfo=None, creationflags=0, + restore_signals=True, start_new_session=False, + pass_fds=()): + """Create new Popen instance.""" + _cleanup() + + self._input = None + self._communication_started = False + if bufsize is None: + bufsize = -1 # Restore default + if not isinstance(bufsize, int): + raise TypeError("bufsize must be an integer") + + if mswindows: + if preexec_fn is not None: + raise ValueError("preexec_fn is not supported on Windows " + "platforms") + any_stdio_set = (stdin is not None or stdout is not None or + stderr is not None) + if close_fds is _PLATFORM_DEFAULT_CLOSE_FDS: + if any_stdio_set: + close_fds = False + else: + close_fds = True + elif close_fds and any_stdio_set: + raise ValueError( + "close_fds is not supported on Windows platforms" + " if you redirect stdin/stdout/stderr") + else: + # POSIX + if close_fds is _PLATFORM_DEFAULT_CLOSE_FDS: + close_fds = True + if pass_fds and not close_fds: + warnings.warn("pass_fds overriding close_fds.", RuntimeWarning) + close_fds = True + if startupinfo is not None: + raise ValueError("startupinfo is only supported on Windows " + "platforms") + if creationflags != 0: + raise ValueError("creationflags is only supported on Windows " + "platforms") + + self.args = args + self.stdin = None + self.stdout = None + self.stderr = None + self.pid = None + self.returncode = None + self.universal_newlines = universal_newlines + + # Input and output objects. The general principle is like + # this: + # + # Parent Child + # ------ ----- + # p2cwrite ---stdin---> p2cread + # c2pread <--stdout--- c2pwrite + # errread <--stderr--- errwrite + # + # On POSIX, the child objects are file descriptors. On + # Windows, these are Windows file handles. The parent objects + # are file descriptors on both platforms. The parent objects + # are -1 when not using PIPEs. The child objects are -1 + # when not redirecting. + + (p2cread, p2cwrite, + c2pread, c2pwrite, + errread, errwrite) = self._get_handles(stdin, stdout, stderr) + + # We wrap OS handles *before* launching the child, otherwise a + # quickly terminating child could make our fds unwrappable + # (see #8458). + + if mswindows: + if p2cwrite != -1: + p2cwrite = msvcrt.open_osfhandle(p2cwrite.Detach(), 0) + if c2pread != -1: + c2pread = msvcrt.open_osfhandle(c2pread.Detach(), 0) + if errread != -1: + errread = msvcrt.open_osfhandle(errread.Detach(), 0) + + if p2cwrite != -1: + self.stdin = io.open(p2cwrite, 'wb', bufsize) + if universal_newlines: + self.stdin = io.TextIOWrapper(self.stdin, write_through=True) + if c2pread != -1: + self.stdout = io.open(c2pread, 'rb', bufsize) + if universal_newlines: + self.stdout = io.TextIOWrapper(self.stdout) + if errread != -1: + self.stderr = io.open(errread, 'rb', bufsize) + if universal_newlines: + self.stderr = io.TextIOWrapper(self.stderr) + + self._child_pipes_to_close = set() + if stdin == PIPE: + self._child_pipes_to_close.add(p2cread) + if stdout == PIPE: + self._child_pipes_to_close.add(c2pwrite) + if stderr == PIPE: + self._child_pipes_to_close.add(errwrite) + if hasattr(self, '_devnull'): + self._child_pipes_to_close.add(self._devnull) + + try: + self._execute_child(args, executable, preexec_fn, close_fds, + pass_fds, cwd, env, + startupinfo, creationflags, shell, + p2cread, p2cwrite, + c2pread, c2pwrite, + errread, errwrite, + restore_signals, start_new_session) + except: + # Cleanup if the child failed starting. + self._cleanup_on_exec_failure() + raise + + def _cleanup_on_exec_failure(self): + for f in filter(None, (self.stdin, self.stdout, self.stderr)): + try: + f.close() + except OSError: + pass # Ignore EBADF or other errors. + + for fd in self._child_pipes_to_close: + try: + os.close(fd) + except OSError: + pass + + self._child_pipes_to_close.clear() + + if True: # XXX if unix + def _execute_child(self, args, executable, preexec_fn, close_fds, + pass_fds, cwd, env, + startupinfo, creationflags, shell, + p2cread, p2cwrite, + c2pread, c2pwrite, + errread, errwrite, + restore_signals, start_new_session): + """Execute program (POSIX version)""" + + if isinstance(args, (str, bytes)): + args = [args] + else: + args = list(args) + + if shell: + args = ["/bin/sh", "-c"] + args + if executable: + args[0] = executable + + if executable is None: + executable = args[0] + orig_executable = executable + + # For transferring possible exec failure from child to parent. + # Data format: "exception name:hex errno:description" + # Pickle is not used; it is complex and involves memory allocation. + errpipe_read, errpipe_write = self._get_exec_err_pipe() + try: + try: + # We must avoid complex work that could involve + # malloc or free in the child process to avoid + # potential deadlocks, thus we do all this here. + # and pass it to fork_exec() + + if env is not None: + env_list = [os.fsencode(k) + b'=' + os.fsencode(v) + for k, v in env.items()] + else: + env_list = None # Use execv instead of execve. + executable = os.fsencode(executable) + if os.path.dirname(executable): + executable_list = (executable,) + else: + # This matches the behavior of os._execvpe(). + executable_list = tuple( + os.path.join(os.fsencode(dir), executable) + for dir in os.get_exec_path(env)) + fds_to_keep = set(pass_fds) + fds_to_keep.add(errpipe_write) + self.pid = _posixsubprocess.fork_exec( + args, executable_list, + close_fds, sorted(fds_to_keep), cwd, env_list, + p2cread, p2cwrite, c2pread, c2pwrite, + errread, errwrite, + errpipe_read, errpipe_write, + restore_signals, start_new_session, preexec_fn) + self._child_created = True + finally: + # be sure the FD is closed no matter what + os.close(errpipe_write) + + # self._devnull is not always defined. + devnull_fd = getattr(self, '_devnull', None) + to_close = set() + if p2cread != -1 and p2cwrite != -1 and p2cread != devnull_fd: + to_close.add(p2cread) + if c2pwrite != -1 and c2pread != -1 and c2pwrite != devnull_fd: + to_close.add(c2pwrite) + if errwrite != -1 and errread != -1 and errwrite != devnull_fd: + to_close.add(errwrite) + if devnull_fd is not None: + to_close.add(devnull_fd) + for fd in to_close: + os.close(fd) + # Prevent a double close of these fds from __init__ on error. + self._child_pipes_to_close.remove(fd) + except: + os.close(errpipe_read) + raise + + self._wait_exec_done(orig_executable, cwd, errpipe_read) + + def _get_exec_err_pipe(self): + return _create_pipe() + + def _wait_exec_done(self, orig_executable, cwd, errpipe_read): + assert errpipe_read is not None + try: + # Wait for exec to fail or succeed; possibly raising an + # exception (limited in size) + errpipe_data = bytearray() + while True: + part = _eintr_retry_call(os.read, errpipe_read, 50000) + errpipe_data += part + if not part or len(errpipe_data) > 50000: + break + finally: + # be sure the FD is closed no matter what + os.close(errpipe_read) + + if errpipe_data: + self._check_exec_result(orig_executable, cwd, errpipe_data) + + def _check_exec_result(self, orig_executable, cwd, errpipe_data): + try: + _eintr_retry_call(os.waitpid, self.pid, 0) + except OSError as e: + if e.errno != errno.ECHILD: + raise + try: + exception_name, hex_errno, err_msg = ( + errpipe_data.split(b':', 2)) + except ValueError: + exception_name = b'RuntimeError' + hex_errno = b'0' + err_msg = (b'Bad exception data from child: ' + + repr(errpipe_data)) + child_exception_type = getattr( + builtins, exception_name.decode('ascii'), + RuntimeError) + err_msg = err_msg.decode(errors="surrogatepass") + if issubclass(child_exception_type, OSError) and hex_errno: + errno_num = int(hex_errno, 16) + child_exec_never_called = (err_msg == "noexec") + if child_exec_never_called: + err_msg = "" + if errno_num != 0: + err_msg = os.strerror(errno_num) + if errno_num == errno.ENOENT: + if child_exec_never_called: + # The error must be from chdir(cwd). + err_msg += ': ' + repr(cwd) + else: + err_msg += ': ' + repr(orig_executable) + raise child_exception_type(errno_num, err_msg) + raise child_exception_type(err_msg) diff --git a/asyncio/unix_events.py b/asyncio/unix_events.py index 3b853e75..28d930ef 100644 --- a/asyncio/unix_events.py +++ b/asyncio/unix_events.py @@ -25,7 +25,11 @@ from .log import logger # XXX temporary: a monkey-patched subprocess.Popen -from . import tmp_subprocess +if compat.PY34: + from . import tmp_subprocess +else: + # Python 3.3 has a different version of Popen + from . import tmp_subprocess33 as tmp_subprocess __all__ = ['SelectorEventLoop', @@ -678,6 +682,7 @@ def _cleanup_on_exec_failure(self): def _get_exec_err_pipe(self): errpipe_read, errpipe_write = self._loop._socketpair() + _set_inheritable(errpipe_write.fileno(), False) return errpipe_read.detach(), errpipe_write.detach() def _wait_exec_done(self, orig_executable, cwd, errpipe_read): diff --git a/tests/test_subprocess.py b/tests/test_subprocess.py index 3bd60411..d6238b04 100644 --- a/tests/test_subprocess.py +++ b/tests/test_subprocess.py @@ -1,14 +1,13 @@ import multiprocessing import signal import sys -import time import unittest import warnings from unittest import mock -from subprocess import SubprocessError import asyncio from asyncio import base_subprocess +from asyncio import compat from asyncio import subprocess from asyncio import test_utils try: @@ -175,9 +174,17 @@ def raise_exception(): args = PROGRAM_BLOCKED create = asyncio.create_subprocess_exec( *args, preexec_fn=raise_exception, loop=self.loop) - with self.assertRaises(SubprocessError): + with self.assertRaises(Exception) as ctx: self.loop.run_until_complete(create) + if compat.PY34: + from subprocess import SubprocessError + self.assertIsInstance(ctx.exception, SubprocessError) + else: + self.assertIsInstance(ctx.exception, RuntimeError) + self.assertEqual("Exception occurred in preexec_fn.", + str(ctx.exception)) + def test_cancel_during_preexec(self): lock = multiprocessing.Lock() lock.acquire() From 71cf40645e41d83e89261df70197f2ed39945d03 Mon Sep 17 00:00:00 2001 From: Martin Richard Date: Wed, 5 Oct 2016 18:37:26 +0200 Subject: [PATCH 3/7] preexec_fn not supported on windows --- tests/test_subprocess.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_subprocess.py b/tests/test_subprocess.py index d6238b04..0c6a39fa 100644 --- a/tests/test_subprocess.py +++ b/tests/test_subprocess.py @@ -167,6 +167,7 @@ def test_terminate(self): else: self.assertEqual(-signal.SIGTERM, returncode) + @unittest.skipIf(sys.platform == 'win32', "Don't support preexec_fn") def test_exception_in_preexec(self): def raise_exception(): raise Exception("custom exception") @@ -185,6 +186,7 @@ def raise_exception(): self.assertEqual("Exception occurred in preexec_fn.", str(ctx.exception)) + @unittest.skipIf(sys.platform == 'win32', "Don't support preexec_fn") def test_cancel_during_preexec(self): lock = multiprocessing.Lock() lock.acquire() From a77c0ffe1a41e6fe076aaa87ea299a20ec80f8b5 Mon Sep 17 00:00:00 2001 From: Martin Richard Date: Thu, 6 Oct 2016 12:16:49 +0200 Subject: [PATCH 4/7] handle asynchronous exceptions when creating SubprocessTransport --- asyncio/base_subprocess.py | 12 ++++++++++++ asyncio/unix_events.py | 17 +---------------- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/asyncio/base_subprocess.py b/asyncio/base_subprocess.py index 05c8b1d9..0264028e 100644 --- a/asyncio/base_subprocess.py +++ b/asyncio/base_subprocess.py @@ -25,6 +25,7 @@ def __init__(self, loop, protocol, args, shell, self._pending_calls = collections.deque() self._pipes = {} self._finished = False + self._failed_before_start = False if stdin == subprocess.PIPE: self._pipes[0] = None @@ -46,7 +47,12 @@ def _create_child(self, waiter, args, shell, stdin, stdout, stderr, start = self._start(args=args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, bufsize=bufsize, **start_kwargs) + except: + self._failed_before_start = True + self.close() + raise + try: if start is not None: # _start is not required to be a coroutine yield from start @@ -254,6 +260,12 @@ def _wait(self): """Wait until the process exit and return the process return code. This method is a coroutine.""" + if self._failed_before_start: + # Let loop._make_subprocess_transport() call transport._wait() when + # an exception is raised asynchronously during the setup of the + # transport, it garantees that necessary cleanup will be performed. + return + if self._returncode is not None: return self._returncode diff --git a/asyncio/unix_events.py b/asyncio/unix_events.py index 28d930ef..e6d84023 100644 --- a/asyncio/unix_events.py +++ b/asyncio/unix_events.py @@ -731,10 +731,6 @@ def _check_exec_result(self, pid, returncode, orig_executable, cwd, class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._failed_before_exec = False - @coroutine def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): with events.get_child_watcher() as watcher: @@ -760,7 +756,7 @@ def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): universal_newlines=False, bufsize=bufsize, **kwargs) yield from exec_waiter except: - self._failed_before_exec = True + self._failed_before_start = True # TODO stdin is probably closed by proc, but what about stdin_w # so far? check this if stdin_w is not None: @@ -776,17 +772,6 @@ def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): def _child_watcher_callback(self, pid, returncode): self._loop.call_soon_threadsafe(self._process_exited, returncode) - @coroutine - def _wait(self): - if self._failed_before_exec: - # let loop._make_subprocess_transport() call transport._wait() when - # an excpetion is raised asynchronously during the setup of the - # transport, which garantees that necessary cleanup will be - # performed - return - else: - return (yield from super()._wait()) - class AbstractChildWatcher: """Abstract base class for monitoring child processes. From 0cf2e73a93c9d0b99698ae2fcceaad9fdf9e4bae Mon Sep 17 00:00:00 2001 From: Martin Richard Date: Tue, 11 Oct 2016 13:58:19 +0200 Subject: [PATCH 5/7] errpipe_read must be non-blocking --- asyncio/unix_events.py | 1 + 1 file changed, 1 insertion(+) diff --git a/asyncio/unix_events.py b/asyncio/unix_events.py index e6d84023..3239ee95 100644 --- a/asyncio/unix_events.py +++ b/asyncio/unix_events.py @@ -682,6 +682,7 @@ def _cleanup_on_exec_failure(self): def _get_exec_err_pipe(self): errpipe_read, errpipe_write = self._loop._socketpair() + errpipe_read.setblocking(False) _set_inheritable(errpipe_write.fileno(), False) return errpipe_read.detach(), errpipe_write.detach() From ef41fc838a00ce182f58bd36b487e2fa7039d4ce Mon Sep 17 00:00:00 2001 From: Martin Richard Date: Wed, 9 Nov 2016 11:47:28 +0100 Subject: [PATCH 6/7] make asyncio backward compatible with older Popen --- asyncio/unix_events.py | 183 ++++++++++++++++++++++------------------- 1 file changed, 98 insertions(+), 85 deletions(-) diff --git a/asyncio/unix_events.py b/asyncio/unix_events.py index 3239ee95..010e8472 100644 --- a/asyncio/unix_events.py +++ b/asyncio/unix_events.py @@ -26,10 +26,11 @@ # XXX temporary: a monkey-patched subprocess.Popen if compat.PY34: - from . import tmp_subprocess + from .tmp_subprocess import _Popen else: - # Python 3.3 has a different version of Popen - from . import tmp_subprocess33 as tmp_subprocess + # shows that we can fallback to an older version of subprocess.Popen + # safely: it will block, but asyncio will still work. + _Popen = subprocess.Popen __all__ = ['SelectorEventLoop', @@ -665,97 +666,108 @@ def _set_inheritable(fd, inheritable): fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag) -class _NonBlockingPopen(tmp_subprocess._Popen): - """A modified Popen which performs IO operations using an event loop.""" - # TODO can we include the stdin trick in popen? - def __init__(self, loop, exec_waiter, watcher, *args, **kwargs): - self._loop = loop - self._watcher = watcher - self._exec_waiter = exec_waiter - super().__init__(*args, **kwargs) - - def _cleanup_on_exec_failure(self): - super()._cleanup_on_exec_failure() - self._exec_waiter = None - self._loop = None - self._watcher = None - - def _get_exec_err_pipe(self): - errpipe_read, errpipe_write = self._loop._socketpair() - errpipe_read.setblocking(False) - _set_inheritable(errpipe_write.fileno(), False) - return errpipe_read.detach(), errpipe_write.detach() +if hasattr(_Popen, "_wait_exec_done"): + class _NonBlockingPopen(_Popen): + """A modified Popen which performs IO operations using an event loop.""" + def __init__(self, loop, exec_waiter, watcher, *args, **kwargs): + self._loop = loop + self._watcher = watcher + self._exec_waiter = exec_waiter + super().__init__(*args, **kwargs) - def _wait_exec_done(self, orig_executable, cwd, errpipe_read): - errpipe_data = bytearray() - self._loop.add_reader(errpipe_read, self._read_errpipe, - orig_executable, cwd, errpipe_read, errpipe_data) - - def _read_errpipe(self, orig_executable, cwd, errpipe_read, errpipe_data): - try: - part = os.read(errpipe_read, 50000) - except BlockingIOError: - return - except Exception as exc: - self._loop.remove_reader(errpipe_read) - os.close(errpipe_read) - self._exec_waiter.set_exception(exc) - self._cleanup_on_exec_failure() - else: - if part and len(errpipe_data) <= 50000: - errpipe_data.extend(part) + def _cleanup_on_exec_failure(self): + super()._cleanup_on_exec_failure() + self._exec_waiter = None + self._loop = None + self._watcher = None + + def _get_exec_err_pipe(self): + errpipe_read, errpipe_write = self._loop._socketpair() + errpipe_read.setblocking(False) + _set_inheritable(errpipe_write.fileno(), False) + return errpipe_read.detach(), errpipe_write.detach() + + def _wait_exec_done(self, orig_executable, cwd, errpipe_read): + errpipe_data = bytearray() + self._loop.add_reader(errpipe_read, self._read_errpipe, + orig_executable, cwd, errpipe_read, + errpipe_data) + + def _read_errpipe(self, orig_executable, cwd, errpipe_read, + errpipe_data): + try: + part = os.read(errpipe_read, 50000) + except BlockingIOError: return + except Exception as exc: + self._loop.remove_reader(errpipe_read) + os.close(errpipe_read) + self._exec_waiter.set_exception(exc) + self._cleanup_on_exec_failure() + else: + if part and len(errpipe_data) <= 50000: + errpipe_data.extend(part) + return - self._loop.remove_reader(errpipe_read) - os.close(errpipe_read) + self._loop.remove_reader(errpipe_read) + os.close(errpipe_read) - if errpipe_data: - # asynchronously wait until the process terminated - self._watcher.add_child_handler( - self.pid, self._check_exec_result, orig_executable, - cwd, errpipe_data) - else: + if errpipe_data: + # asynchronously wait until the process terminated + self._watcher.add_child_handler( + self.pid, self._check_exec_result, orig_executable, + cwd, errpipe_data) + else: + if not self._exec_waiter.cancelled(): + self._exec_waiter.set_result(None) + self._exec_waiter = None + self._loop = None + self._watcher = None + + def _check_exec_result(self, pid, returncode, orig_executable, cwd, + errpipe_data): + try: + super()._check_exec_result(orig_executable, cwd, errpipe_data) + except Exception as exc: if not self._exec_waiter.cancelled(): - self._exec_waiter.set_result(None) - self._exec_waiter = None - self._loop = None - self._watcher = None - - def _check_exec_result(self, pid, returncode, orig_executable, cwd, - errpipe_data): - try: - super()._check_exec_result(orig_executable, cwd, errpipe_data) - except Exception as exc: - if not self._exec_waiter.cancelled(): - self._exec_waiter.set_exception(exc) - self._cleanup_on_exec_failure() + self._exec_waiter.set_exception(exc) + self._cleanup_on_exec_failure() +else: + _NonBlockingPopen = None class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport): @coroutine def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): + stdin_w = None + if stdin == subprocess.PIPE: + # Use a socket pair for stdin, since not all platforms + # support selecting read events on the write end of a + # socket (which we use in order to detect closing of the + # other end). Notably this is needed on AIX, and works + # just fine on other platforms. + stdin, stdin_w = self._loop._socketpair() + + # Mark the write end of the stdin pipe as non-inheritable, + # needed by close_fds=False on Python 3.3 and older + # (Python 3.4 implements the PEP 446, socketpair returns + # non-inheritable sockets) + _set_inheritable(stdin_w.fileno(), False) + with events.get_child_watcher() as watcher: - stdin_w = None - if stdin == subprocess.PIPE: - # Use a socket pair for stdin, since not all platforms - # support selecting read events on the write end of a - # socket (which we use in order to detect closing of the - # other end). Notably this is needed on AIX, and works - # just fine on other platforms. - stdin, stdin_w = self._loop._socketpair() - - # Mark the write end of the stdin pipe as non-inheritable, - # needed by close_fds=False on Python 3.3 and older - # (Python 3.4 implements the PEP 446, socketpair returns - # non-inheritable sockets) - _set_inheritable(stdin_w.fileno(), False) - exec_waiter = self._loop.create_future() try: - self._proc = _NonBlockingPopen( - self._loop, exec_waiter, watcher, args, shell=shell, - stdin=stdin, stdout=stdout, stderr=stderr, - universal_newlines=False, bufsize=bufsize, **kwargs) - yield from exec_waiter + if _NonBlockingPopen: + exec_waiter = self._loop.create_future() + self._proc = _NonBlockingPopen( + self._loop, exec_waiter, watcher, args, shell=shell, + stdin=stdin, stdout=stdout, stderr=stderr, + universal_newlines=False, bufsize=bufsize, **kwargs) + yield from exec_waiter + else: + self._proc = subprocess.Popen( + args, shell=shell, stdin=stdin, stdout=stdout, + stderr=stderr, universal_newlines=False, + bufsize=bufsize, **kwargs) except: self._failed_before_start = True # TODO stdin is probably closed by proc, but what about stdin_w @@ -766,9 +778,10 @@ def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): else: watcher.add_child_handler(self._proc.pid, self._child_watcher_callback) - if stdin_w is not None: - stdin.close() - self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize) + + if stdin_w is not None: + stdin.close() + self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize) def _child_watcher_callback(self, pid, returncode): self._loop.call_soon_threadsafe(self._process_exited, returncode) From b3a81e057ae161e67189e4768c62725593fee8cb Mon Sep 17 00:00:00 2001 From: Martin Richard Date: Wed, 9 Nov 2016 11:52:38 +0100 Subject: [PATCH 7/7] drop the mockey-patch of Popen in python 3.3 --- asyncio/tmp_subprocess33.py | 289 ------------------------------------ 1 file changed, 289 deletions(-) delete mode 100644 asyncio/tmp_subprocess33.py diff --git a/asyncio/tmp_subprocess33.py b/asyncio/tmp_subprocess33.py deleted file mode 100644 index a35ba6bb..00000000 --- a/asyncio/tmp_subprocess33.py +++ /dev/null @@ -1,289 +0,0 @@ -import builtins -import errno -import io -import os -import subprocess -import warnings - -import _posixsubprocess -from subprocess import (PIPE, _PLATFORM_DEFAULT_CLOSE_FDS, _create_pipe, - _cleanup, _eintr_retry_call, ) - -mswindows = msvcrt = False - - -# Popen for python 3.3 -class _Popen(subprocess.Popen): - def __init__(self, args, bufsize=-1, executable=None, - stdin=None, stdout=None, stderr=None, - preexec_fn=None, close_fds=_PLATFORM_DEFAULT_CLOSE_FDS, - shell=False, cwd=None, env=None, universal_newlines=False, - startupinfo=None, creationflags=0, - restore_signals=True, start_new_session=False, - pass_fds=()): - """Create new Popen instance.""" - _cleanup() - - self._input = None - self._communication_started = False - if bufsize is None: - bufsize = -1 # Restore default - if not isinstance(bufsize, int): - raise TypeError("bufsize must be an integer") - - if mswindows: - if preexec_fn is not None: - raise ValueError("preexec_fn is not supported on Windows " - "platforms") - any_stdio_set = (stdin is not None or stdout is not None or - stderr is not None) - if close_fds is _PLATFORM_DEFAULT_CLOSE_FDS: - if any_stdio_set: - close_fds = False - else: - close_fds = True - elif close_fds and any_stdio_set: - raise ValueError( - "close_fds is not supported on Windows platforms" - " if you redirect stdin/stdout/stderr") - else: - # POSIX - if close_fds is _PLATFORM_DEFAULT_CLOSE_FDS: - close_fds = True - if pass_fds and not close_fds: - warnings.warn("pass_fds overriding close_fds.", RuntimeWarning) - close_fds = True - if startupinfo is not None: - raise ValueError("startupinfo is only supported on Windows " - "platforms") - if creationflags != 0: - raise ValueError("creationflags is only supported on Windows " - "platforms") - - self.args = args - self.stdin = None - self.stdout = None - self.stderr = None - self.pid = None - self.returncode = None - self.universal_newlines = universal_newlines - - # Input and output objects. The general principle is like - # this: - # - # Parent Child - # ------ ----- - # p2cwrite ---stdin---> p2cread - # c2pread <--stdout--- c2pwrite - # errread <--stderr--- errwrite - # - # On POSIX, the child objects are file descriptors. On - # Windows, these are Windows file handles. The parent objects - # are file descriptors on both platforms. The parent objects - # are -1 when not using PIPEs. The child objects are -1 - # when not redirecting. - - (p2cread, p2cwrite, - c2pread, c2pwrite, - errread, errwrite) = self._get_handles(stdin, stdout, stderr) - - # We wrap OS handles *before* launching the child, otherwise a - # quickly terminating child could make our fds unwrappable - # (see #8458). - - if mswindows: - if p2cwrite != -1: - p2cwrite = msvcrt.open_osfhandle(p2cwrite.Detach(), 0) - if c2pread != -1: - c2pread = msvcrt.open_osfhandle(c2pread.Detach(), 0) - if errread != -1: - errread = msvcrt.open_osfhandle(errread.Detach(), 0) - - if p2cwrite != -1: - self.stdin = io.open(p2cwrite, 'wb', bufsize) - if universal_newlines: - self.stdin = io.TextIOWrapper(self.stdin, write_through=True) - if c2pread != -1: - self.stdout = io.open(c2pread, 'rb', bufsize) - if universal_newlines: - self.stdout = io.TextIOWrapper(self.stdout) - if errread != -1: - self.stderr = io.open(errread, 'rb', bufsize) - if universal_newlines: - self.stderr = io.TextIOWrapper(self.stderr) - - self._child_pipes_to_close = set() - if stdin == PIPE: - self._child_pipes_to_close.add(p2cread) - if stdout == PIPE: - self._child_pipes_to_close.add(c2pwrite) - if stderr == PIPE: - self._child_pipes_to_close.add(errwrite) - if hasattr(self, '_devnull'): - self._child_pipes_to_close.add(self._devnull) - - try: - self._execute_child(args, executable, preexec_fn, close_fds, - pass_fds, cwd, env, - startupinfo, creationflags, shell, - p2cread, p2cwrite, - c2pread, c2pwrite, - errread, errwrite, - restore_signals, start_new_session) - except: - # Cleanup if the child failed starting. - self._cleanup_on_exec_failure() - raise - - def _cleanup_on_exec_failure(self): - for f in filter(None, (self.stdin, self.stdout, self.stderr)): - try: - f.close() - except OSError: - pass # Ignore EBADF or other errors. - - for fd in self._child_pipes_to_close: - try: - os.close(fd) - except OSError: - pass - - self._child_pipes_to_close.clear() - - if True: # XXX if unix - def _execute_child(self, args, executable, preexec_fn, close_fds, - pass_fds, cwd, env, - startupinfo, creationflags, shell, - p2cread, p2cwrite, - c2pread, c2pwrite, - errread, errwrite, - restore_signals, start_new_session): - """Execute program (POSIX version)""" - - if isinstance(args, (str, bytes)): - args = [args] - else: - args = list(args) - - if shell: - args = ["/bin/sh", "-c"] + args - if executable: - args[0] = executable - - if executable is None: - executable = args[0] - orig_executable = executable - - # For transferring possible exec failure from child to parent. - # Data format: "exception name:hex errno:description" - # Pickle is not used; it is complex and involves memory allocation. - errpipe_read, errpipe_write = self._get_exec_err_pipe() - try: - try: - # We must avoid complex work that could involve - # malloc or free in the child process to avoid - # potential deadlocks, thus we do all this here. - # and pass it to fork_exec() - - if env is not None: - env_list = [os.fsencode(k) + b'=' + os.fsencode(v) - for k, v in env.items()] - else: - env_list = None # Use execv instead of execve. - executable = os.fsencode(executable) - if os.path.dirname(executable): - executable_list = (executable,) - else: - # This matches the behavior of os._execvpe(). - executable_list = tuple( - os.path.join(os.fsencode(dir), executable) - for dir in os.get_exec_path(env)) - fds_to_keep = set(pass_fds) - fds_to_keep.add(errpipe_write) - self.pid = _posixsubprocess.fork_exec( - args, executable_list, - close_fds, sorted(fds_to_keep), cwd, env_list, - p2cread, p2cwrite, c2pread, c2pwrite, - errread, errwrite, - errpipe_read, errpipe_write, - restore_signals, start_new_session, preexec_fn) - self._child_created = True - finally: - # be sure the FD is closed no matter what - os.close(errpipe_write) - - # self._devnull is not always defined. - devnull_fd = getattr(self, '_devnull', None) - to_close = set() - if p2cread != -1 and p2cwrite != -1 and p2cread != devnull_fd: - to_close.add(p2cread) - if c2pwrite != -1 and c2pread != -1 and c2pwrite != devnull_fd: - to_close.add(c2pwrite) - if errwrite != -1 and errread != -1 and errwrite != devnull_fd: - to_close.add(errwrite) - if devnull_fd is not None: - to_close.add(devnull_fd) - for fd in to_close: - os.close(fd) - # Prevent a double close of these fds from __init__ on error. - self._child_pipes_to_close.remove(fd) - except: - os.close(errpipe_read) - raise - - self._wait_exec_done(orig_executable, cwd, errpipe_read) - - def _get_exec_err_pipe(self): - return _create_pipe() - - def _wait_exec_done(self, orig_executable, cwd, errpipe_read): - assert errpipe_read is not None - try: - # Wait for exec to fail or succeed; possibly raising an - # exception (limited in size) - errpipe_data = bytearray() - while True: - part = _eintr_retry_call(os.read, errpipe_read, 50000) - errpipe_data += part - if not part or len(errpipe_data) > 50000: - break - finally: - # be sure the FD is closed no matter what - os.close(errpipe_read) - - if errpipe_data: - self._check_exec_result(orig_executable, cwd, errpipe_data) - - def _check_exec_result(self, orig_executable, cwd, errpipe_data): - try: - _eintr_retry_call(os.waitpid, self.pid, 0) - except OSError as e: - if e.errno != errno.ECHILD: - raise - try: - exception_name, hex_errno, err_msg = ( - errpipe_data.split(b':', 2)) - except ValueError: - exception_name = b'RuntimeError' - hex_errno = b'0' - err_msg = (b'Bad exception data from child: ' + - repr(errpipe_data)) - child_exception_type = getattr( - builtins, exception_name.decode('ascii'), - RuntimeError) - err_msg = err_msg.decode(errors="surrogatepass") - if issubclass(child_exception_type, OSError) and hex_errno: - errno_num = int(hex_errno, 16) - child_exec_never_called = (err_msg == "noexec") - if child_exec_never_called: - err_msg = "" - if errno_num != 0: - err_msg = os.strerror(errno_num) - if errno_num == errno.ENOENT: - if child_exec_never_called: - # The error must be from chdir(cwd). - err_msg += ': ' + repr(cwd) - else: - err_msg += ': ' + repr(orig_executable) - raise child_exception_type(errno_num, err_msg) - raise child_exception_type(err_msg)