diff --git a/asyncio/base_subprocess.py b/asyncio/base_subprocess.py index 23742a16..0264028e 100644 --- a/asyncio/base_subprocess.py +++ b/asyncio/base_subprocess.py @@ -25,6 +25,7 @@ def __init__(self, loop, protocol, args, shell, self._pending_calls = collections.deque() self._pipes = {} self._finished = False + self._failed_before_start = False if stdin == subprocess.PIPE: self._pipes[0] = None @@ -34,25 +35,58 @@ def __init__(self, loop, protocol, args, shell, self._pipes[2] = None # Create the child process: set the _proc attribute + self._loop.create_task(self._create_child( + waiter, args=args, shell=shell, stdin=stdin, stdout=stdout, + stderr=stderr, bufsize=bufsize, start_kwargs=kwargs)) + + @coroutine + def _create_child(self, waiter, args, shell, stdin, stdout, stderr, + bufsize, start_kwargs): try: - self._start(args=args, shell=shell, stdin=stdin, stdout=stdout, - stderr=stderr, bufsize=bufsize, **kwargs) - except: - self.close() - raise + try: + start = self._start(args=args, shell=shell, stdin=stdin, + stdout=stdout, stderr=stderr, + bufsize=bufsize, **start_kwargs) + except: + self._failed_before_start = True + self.close() + raise - self._pid = self._proc.pid - self._extra['subprocess'] = self._proc + try: + if start is not None: + # _start is not required to be a coroutine + yield from start + except: + self.close() + raise - if self._loop.get_debug(): - if isinstance(args, (bytes, str)): - program = args - else: - program = args[0] - logger.debug('process %r created: pid %s', - program, self._pid) + self._pid = self._proc.pid + self._extra['subprocess'] = self._proc - self._loop.create_task(self._connect_pipes(waiter)) + if self._loop.get_debug(): + if isinstance(args, (bytes, str)): + program = args + else: + program = args[0] + logger.debug('process %r created: pid %s', program, self._pid) + + if self._closed: + # transport.close() may have been called concurrently, for + # instance if _make_subprocess_transport() has been cancelled. + if self._proc.stdin: + self._proc.stdin.close() + if self._proc.stdout: + self._proc.stdout.close() + if self._proc.stderr: + self._proc.stderr.close() + else: + yield from self._connect_pipes(waiter) + except Exception as exc: + if waiter is not None and not waiter.cancelled(): + waiter.set_exception(exc) + else: + if waiter is not None and not waiter.cancelled(): + waiter.set_result(None) def __repr__(self): info = [self.__class__.__name__] @@ -160,40 +194,33 @@ def kill(self): @coroutine def _connect_pipes(self, waiter): - try: - proc = self._proc - loop = self._loop - - if proc.stdin is not None: - _, pipe = yield from loop.connect_write_pipe( - lambda: WriteSubprocessPipeProto(self, 0), - proc.stdin) - self._pipes[0] = pipe - - if proc.stdout is not None: - _, pipe = yield from loop.connect_read_pipe( - lambda: ReadSubprocessPipeProto(self, 1), - proc.stdout) - self._pipes[1] = pipe - - if proc.stderr is not None: - _, pipe = yield from loop.connect_read_pipe( - lambda: ReadSubprocessPipeProto(self, 2), - proc.stderr) - self._pipes[2] = pipe - - assert self._pending_calls is not None - - loop.call_soon(self._protocol.connection_made, self) - for callback, data in self._pending_calls: - loop.call_soon(callback, *data) - self._pending_calls = None - except Exception as exc: - if waiter is not None and not waiter.cancelled(): - waiter.set_exception(exc) - else: - if waiter is not None and not waiter.cancelled(): - waiter.set_result(None) + proc = self._proc + loop = self._loop + + if proc.stdin is not None: + _, pipe = yield from loop.connect_write_pipe( + lambda: WriteSubprocessPipeProto(self, 0), + proc.stdin) + self._pipes[0] = pipe + + if proc.stdout is not None: + _, pipe = yield from loop.connect_read_pipe( + lambda: ReadSubprocessPipeProto(self, 1), + proc.stdout) + self._pipes[1] = pipe + + if proc.stderr is not None: + _, pipe = yield from loop.connect_read_pipe( + lambda: ReadSubprocessPipeProto(self, 2), + proc.stderr) + self._pipes[2] = pipe + + assert self._pending_calls is not None + + loop.call_soon(self._protocol.connection_made, self) + for callback, data in self._pending_calls: + loop.call_soon(callback, *data) + self._pending_calls = None def _call(self, cb, *data): if self._pending_calls is not None: @@ -233,6 +260,12 @@ def _wait(self): """Wait until the process exit and return the process return code. This method is a coroutine.""" + if self._failed_before_start: + # Let loop._make_subprocess_transport() call transport._wait() when + # an exception is raised asynchronously during the setup of the + # transport, it garantees that necessary cleanup will be performed. + return + if self._returncode is not None: return self._returncode diff --git a/asyncio/tmp_subprocess.py b/asyncio/tmp_subprocess.py new file mode 100644 index 00000000..a8bfd2bb --- /dev/null +++ b/asyncio/tmp_subprocess.py @@ -0,0 +1,301 @@ +import builtins +import errno +import io +import os +import subprocess +import threading +import warnings + +import _posixsubprocess +from subprocess import (SubprocessError, _PLATFORM_DEFAULT_CLOSE_FDS, PIPE, + _cleanup) + +_mswindows = msvcrt = False + + +class _Popen(subprocess.Popen): + def __init__(self, args, bufsize=-1, executable=None, + stdin=None, stdout=None, stderr=None, + preexec_fn=None, close_fds=_PLATFORM_DEFAULT_CLOSE_FDS, + shell=False, cwd=None, env=None, universal_newlines=False, + startupinfo=None, creationflags=0, + restore_signals=True, start_new_session=False, + pass_fds=()): + """Create new Popen instance.""" + _cleanup() + # Held while anything is calling waitpid before returncode has been + # updated to prevent clobbering returncode if wait() or poll() are + # called from multiple threads at once. After acquiring the lock, + # code must re-check self.returncode to see if another thread just + # finished a waitpid() call. + self._waitpid_lock = threading.Lock() + + self._input = None + self._communication_started = False + if bufsize is None: + bufsize = -1 # Restore default + if not isinstance(bufsize, int): + raise TypeError("bufsize must be an integer") + + if _mswindows: + if preexec_fn is not None: + raise ValueError("preexec_fn is not supported on Windows " + "platforms") + any_stdio_set = (stdin is not None or stdout is not None or + stderr is not None) + if close_fds is _PLATFORM_DEFAULT_CLOSE_FDS: + if any_stdio_set: + close_fds = False + else: + close_fds = True + elif close_fds and any_stdio_set: + raise ValueError( + "close_fds is not supported on Windows platforms" + " if you redirect stdin/stdout/stderr") + else: + # POSIX + if close_fds is _PLATFORM_DEFAULT_CLOSE_FDS: + close_fds = True + if pass_fds and not close_fds: + warnings.warn("pass_fds overriding close_fds.", RuntimeWarning) + close_fds = True + if startupinfo is not None: + raise ValueError("startupinfo is only supported on Windows " + "platforms") + if creationflags != 0: + raise ValueError("creationflags is only supported on Windows " + "platforms") + + self.args = args + self.stdin = None + self.stdout = None + self.stderr = None + self.pid = None + self.returncode = None + self.universal_newlines = universal_newlines + + # Input and output objects. The general principle is like + # this: + # + # Parent Child + # ------ ----- + # p2cwrite ---stdin---> p2cread + # c2pread <--stdout--- c2pwrite + # errread <--stderr--- errwrite + # + # On POSIX, the child objects are file descriptors. On + # Windows, these are Windows file handles. The parent objects + # are file descriptors on both platforms. The parent objects + # are -1 when not using PIPEs. The child objects are -1 + # when not redirecting. + + (p2cread, p2cwrite, + c2pread, c2pwrite, + errread, errwrite) = self._get_handles(stdin, stdout, stderr) + + # We wrap OS handles *before* launching the child, otherwise a + # quickly terminating child could make our fds unwrappable + # (see #8458). + + if _mswindows: + if p2cwrite != -1: + p2cwrite = msvcrt.open_osfhandle(p2cwrite.Detach(), 0) + if c2pread != -1: + c2pread = msvcrt.open_osfhandle(c2pread.Detach(), 0) + if errread != -1: + errread = msvcrt.open_osfhandle(errread.Detach(), 0) + + if p2cwrite != -1: + self.stdin = io.open(p2cwrite, 'wb', bufsize) + if universal_newlines: + self.stdin = io.TextIOWrapper(self.stdin, write_through=True, + line_buffering=(bufsize == 1)) + if c2pread != -1: + self.stdout = io.open(c2pread, 'rb', bufsize) + if universal_newlines: + self.stdout = io.TextIOWrapper(self.stdout) + if errread != -1: + self.stderr = io.open(errread, 'rb', bufsize) + if universal_newlines: + self.stderr = io.TextIOWrapper(self.stderr) + + self._child_pipes_to_close = set() + if stdin == PIPE: + self._child_pipes_to_close.add(p2cread) + if stdout == PIPE: + self._child_pipes_to_close.add(c2pwrite) + if stderr == PIPE: + self._child_pipes_to_close.add(errwrite) + if hasattr(self, '_devnull'): + self._child_pipes_to_close.add(self._devnull) + + try: + self._execute_child(args, executable, preexec_fn, close_fds, + pass_fds, cwd, env, + startupinfo, creationflags, shell, + p2cread, p2cwrite, + c2pread, c2pwrite, + errread, errwrite, + restore_signals, start_new_session) + except: + # Cleanup if the child failed starting. + self._cleanup_on_exec_failure() + raise + + def _cleanup_on_exec_failure(self): + for f in filter(None, (self.stdin, self.stdout, self.stderr)): + try: + f.close() + except OSError: + pass # Ignore EBADF or other errors. + + for fd in self._child_pipes_to_close: + try: + os.close(fd) + except OSError: + pass + + self._child_pipes_to_close.clear() + + if True: # XXX if unix + def _execute_child(self, args, executable, preexec_fn, close_fds, + pass_fds, cwd, env, + startupinfo, creationflags, shell, + p2cread, p2cwrite, + c2pread, c2pwrite, + errread, errwrite, + restore_signals, start_new_session): + """Execute program (POSIX version)""" + + if isinstance(args, (str, bytes)): + args = [args] + else: + args = list(args) + + if shell: + args = ["/bin/sh", "-c"] + args + if executable: + args[0] = executable + + if executable is None: + executable = args[0] + orig_executable = executable + + # For transferring possible exec failure from child to parent. + # Data format: "exception name:hex errno:description" + # Pickle is not used; it is complex and involves memory allocation. + errpipe_read, errpipe_write = self._get_exec_err_pipe() + # errpipe_write must not be in the standard io 0, 1, or 2 fd range. + low_fds_to_close = [] + while errpipe_write < 3: + low_fds_to_close.append(errpipe_write) + errpipe_write = os.dup(errpipe_write) + for low_fd in low_fds_to_close: + os.close(low_fd) + try: + try: + # We must avoid complex work that could involve + # malloc or free in the child process to avoid + # potential deadlocks, thus we do all this here. + # and pass it to fork_exec() + + if env is not None: + env_list = [os.fsencode(k) + b'=' + os.fsencode(v) + for k, v in env.items()] + else: + env_list = None # Use execv instead of execve. + executable = os.fsencode(executable) + if os.path.dirname(executable): + executable_list = (executable,) + else: + # This matches the behavior of os._execvpe(). + executable_list = tuple( + os.path.join(os.fsencode(dir), executable) + for dir in os.get_exec_path(env)) + fds_to_keep = set(pass_fds) + fds_to_keep.add(errpipe_write) + self.pid = _posixsubprocess.fork_exec( + args, executable_list, + close_fds, sorted(fds_to_keep), cwd, env_list, + p2cread, p2cwrite, c2pread, c2pwrite, + errread, errwrite, + errpipe_read, errpipe_write, + restore_signals, start_new_session, preexec_fn) + self._child_created = True + finally: + # be sure the FD is closed no matter what + os.close(errpipe_write) + + # self._devnull is not always defined. + to_close = set() + devnull_fd = getattr(self, '_devnull', None) + if p2cread != -1 and p2cwrite != -1 and p2cread != devnull_fd: + to_close.add(p2cread) + if c2pwrite != -1 and c2pread != -1 and c2pwrite != devnull_fd: + to_close.add(c2pwrite) + if errwrite != -1 and errread != -1 and errwrite != devnull_fd: + to_close.add(errwrite) + if devnull_fd is not None: + to_close.add(devnull_fd) + for fd in to_close: + os.close(fd) + # Prevent a double close of these fds from __init__ on error. + self._child_pipes_to_close.remove(fd) + except: + os.close(errpipe_read) + raise + + self._wait_exec_done(orig_executable, cwd, errpipe_read) + + def _get_exec_err_pipe(self): + return os.pipe() + + def _wait_exec_done(self, orig_executable, cwd, errpipe_read): + assert errpipe_read is not None + try: + # Wait for exec to fail or succeed; possibly raising an + # exception (limited in size) + errpipe_data = bytearray() + while True: + part = os.read(errpipe_read, 50000) + errpipe_data += part + if not part or len(errpipe_data) > 50000: + break + finally: + os.close(errpipe_read) + + if errpipe_data: + self._check_exec_result(orig_executable, cwd, errpipe_data) + + def _check_exec_result(self, orig_executable, cwd, errpipe_data): + try: + os.waitpid(self.pid, 0) + except ChildProcessError: + pass + try: + exception_name, hex_errno, err_msg = ( + errpipe_data.split(b':', 2)) + except ValueError: + exception_name = b'SubprocessError' + hex_errno = b'0' + err_msg = (b'Bad exception data from child: ' + + repr(errpipe_data)) + child_exception_type = getattr( + builtins, exception_name.decode('ascii'), + SubprocessError) + err_msg = err_msg.decode(errors="surrogatepass") + if issubclass(child_exception_type, OSError) and hex_errno: + errno_num = int(hex_errno, 16) + child_exec_never_called = (err_msg == "noexec") + if child_exec_never_called: + err_msg = "" + if errno_num != 0: + err_msg = os.strerror(errno_num) + if errno_num == errno.ENOENT: + if child_exec_never_called: + # The error must be from chdir(cwd). + err_msg += ': ' + repr(cwd) + else: + err_msg += ': ' + repr(orig_executable) + raise child_exception_type(errno_num, err_msg) + raise child_exception_type(err_msg) diff --git a/asyncio/unix_events.py b/asyncio/unix_events.py index 65b61db6..010e8472 100644 --- a/asyncio/unix_events.py +++ b/asyncio/unix_events.py @@ -24,6 +24,14 @@ from .coroutines import coroutine from .log import logger +# XXX temporary: a monkey-patched subprocess.Popen +if compat.PY34: + from .tmp_subprocess import _Popen +else: + # shows that we can fallback to an older version of subprocess.Popen + # safely: it will block, but asyncio will still work. + _Popen = subprocess.Popen + __all__ = ['SelectorEventLoop', 'AbstractChildWatcher', 'SafeChildWatcher', @@ -176,34 +184,27 @@ def _make_write_pipe_transport(self, pipe, protocol, waiter=None, def _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): - with events.get_child_watcher() as watcher: - waiter = self.create_future() - transp = _UnixSubprocessTransport(self, protocol, args, shell, - stdin, stdout, stderr, bufsize, - waiter=waiter, extra=extra, - **kwargs) - - watcher.add_child_handler(transp.get_pid(), - self._child_watcher_callback, transp) - try: - yield from waiter - except Exception as exc: - # Workaround CPython bug #23353: using yield/yield-from in an - # except block of a generator doesn't clear properly - # sys.exc_info() - err = exc - else: - err = None + waiter = self.create_future() + transp = _UnixSubprocessTransport( + self, protocol, args, shell, stdin, stdout, stderr, bufsize, + waiter=waiter, extra=extra, **kwargs) - if err is not None: - transp.close() - yield from transp._wait() - raise err + try: + yield from waiter + except Exception as exc: + # Workaround CPython bug #23353: using yield/yield-from in an + # except block of a generator doesn't clear properly + # sys.exc_info() + err = exc + else: + err = None - return transp + if err is not None: + transp.close() + yield from transp._wait() + raise err - def _child_watcher_callback(self, pid, returncode, transp): - self.call_soon_threadsafe(transp._process_exited, returncode) + return transp @coroutine def create_unix_connection(self, protocol_factory, path, *, @@ -665,8 +666,78 @@ def _set_inheritable(fd, inheritable): fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag) -class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport): +if hasattr(_Popen, "_wait_exec_done"): + class _NonBlockingPopen(_Popen): + """A modified Popen which performs IO operations using an event loop.""" + def __init__(self, loop, exec_waiter, watcher, *args, **kwargs): + self._loop = loop + self._watcher = watcher + self._exec_waiter = exec_waiter + super().__init__(*args, **kwargs) + + def _cleanup_on_exec_failure(self): + super()._cleanup_on_exec_failure() + self._exec_waiter = None + self._loop = None + self._watcher = None + + def _get_exec_err_pipe(self): + errpipe_read, errpipe_write = self._loop._socketpair() + errpipe_read.setblocking(False) + _set_inheritable(errpipe_write.fileno(), False) + return errpipe_read.detach(), errpipe_write.detach() + + def _wait_exec_done(self, orig_executable, cwd, errpipe_read): + errpipe_data = bytearray() + self._loop.add_reader(errpipe_read, self._read_errpipe, + orig_executable, cwd, errpipe_read, + errpipe_data) + + def _read_errpipe(self, orig_executable, cwd, errpipe_read, + errpipe_data): + try: + part = os.read(errpipe_read, 50000) + except BlockingIOError: + return + except Exception as exc: + self._loop.remove_reader(errpipe_read) + os.close(errpipe_read) + self._exec_waiter.set_exception(exc) + self._cleanup_on_exec_failure() + else: + if part and len(errpipe_data) <= 50000: + errpipe_data.extend(part) + return + self._loop.remove_reader(errpipe_read) + os.close(errpipe_read) + + if errpipe_data: + # asynchronously wait until the process terminated + self._watcher.add_child_handler( + self.pid, self._check_exec_result, orig_executable, + cwd, errpipe_data) + else: + if not self._exec_waiter.cancelled(): + self._exec_waiter.set_result(None) + self._exec_waiter = None + self._loop = None + self._watcher = None + + def _check_exec_result(self, pid, returncode, orig_executable, cwd, + errpipe_data): + try: + super()._check_exec_result(orig_executable, cwd, errpipe_data) + except Exception as exc: + if not self._exec_waiter.cancelled(): + self._exec_waiter.set_exception(exc) + self._cleanup_on_exec_failure() +else: + _NonBlockingPopen = None + + +class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport): + @coroutine def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): stdin_w = None if stdin == subprocess.PIPE: @@ -682,13 +753,39 @@ def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): # (Python 3.4 implements the PEP 446, socketpair returns # non-inheritable sockets) _set_inheritable(stdin_w.fileno(), False) - self._proc = subprocess.Popen( - args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, - universal_newlines=False, bufsize=bufsize, **kwargs) + + with events.get_child_watcher() as watcher: + try: + if _NonBlockingPopen: + exec_waiter = self._loop.create_future() + self._proc = _NonBlockingPopen( + self._loop, exec_waiter, watcher, args, shell=shell, + stdin=stdin, stdout=stdout, stderr=stderr, + universal_newlines=False, bufsize=bufsize, **kwargs) + yield from exec_waiter + else: + self._proc = subprocess.Popen( + args, shell=shell, stdin=stdin, stdout=stdout, + stderr=stderr, universal_newlines=False, + bufsize=bufsize, **kwargs) + except: + self._failed_before_start = True + # TODO stdin is probably closed by proc, but what about stdin_w + # so far? check this + if stdin_w is not None: + stdin_w.close() + raise + else: + watcher.add_child_handler(self._proc.pid, + self._child_watcher_callback) + if stdin_w is not None: stdin.close() self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize) + def _child_watcher_callback(self, pid, returncode): + self._loop.call_soon_threadsafe(self._process_exited, returncode) + class AbstractChildWatcher: """Abstract base class for monitoring child processes. diff --git a/tests/test_subprocess.py b/tests/test_subprocess.py index bba688bb..0c6a39fa 100644 --- a/tests/test_subprocess.py +++ b/tests/test_subprocess.py @@ -1,3 +1,4 @@ +import multiprocessing import signal import sys import unittest @@ -6,6 +7,7 @@ import asyncio from asyncio import base_subprocess +from asyncio import compat from asyncio import subprocess from asyncio import test_utils try: @@ -52,8 +54,9 @@ def create_transport(self, waiter=None): def test_proc_exited(self): waiter = asyncio.Future(loop=self.loop) transport, protocol = self.create_transport(waiter) - transport._process_exited(6) self.loop.run_until_complete(waiter) + transport._process_exited(6) + test_utils.run_briefly(self.loop) self.assertEqual(transport.get_returncode(), 6) @@ -164,6 +167,42 @@ def test_terminate(self): else: self.assertEqual(-signal.SIGTERM, returncode) + @unittest.skipIf(sys.platform == 'win32', "Don't support preexec_fn") + def test_exception_in_preexec(self): + def raise_exception(): + raise Exception("custom exception") + + args = PROGRAM_BLOCKED + create = asyncio.create_subprocess_exec( + *args, preexec_fn=raise_exception, loop=self.loop) + with self.assertRaises(Exception) as ctx: + self.loop.run_until_complete(create) + + if compat.PY34: + from subprocess import SubprocessError + self.assertIsInstance(ctx.exception, SubprocessError) + else: + self.assertIsInstance(ctx.exception, RuntimeError) + self.assertEqual("Exception occurred in preexec_fn.", + str(ctx.exception)) + + @unittest.skipIf(sys.platform == 'win32', "Don't support preexec_fn") + def test_cancel_during_preexec(self): + lock = multiprocessing.Lock() + lock.acquire() + + def block(): + lock.acquire() + lock.release() + + create = asyncio.create_subprocess_shell( + 'exit 7', preexec_fn=block, loop=self.loop) + task = self.loop.create_task(create) + self.loop.call_soon(task.cancel) + self.loop.call_soon(lock.release) + with self.assertRaises(asyncio.CancelledError): + self.loop.run_until_complete(task) + @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP") def test_send_signal(self): code = 'import time; print("sleeping", flush=True); time.sleep(3600)' @@ -447,7 +486,7 @@ def test_popen_error(self): if sys.platform == 'win32': target = 'asyncio.windows_utils.Popen' else: - target = 'subprocess.Popen' + target = 'asyncio.unix_events._NonBlockingPopen' with mock.patch(target) as popen: exc = ZeroDivisionError popen.side_effect = exc