Skip to content
This repository was archived by the owner on Nov 23, 2017. It is now read-only.

make fork + exec non blocking on unix #428

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 82 additions & 49 deletions asyncio/base_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def __init__(self, loop, protocol, args, shell,
self._pending_calls = collections.deque()
self._pipes = {}
self._finished = False
self._failed_before_start = False

if stdin == subprocess.PIPE:
self._pipes[0] = None
Expand All @@ -34,25 +35,58 @@ def __init__(self, loop, protocol, args, shell,
self._pipes[2] = None

# Create the child process: set the _proc attribute
self._loop.create_task(self._create_child(
waiter, args=args, shell=shell, stdin=stdin, stdout=stdout,
stderr=stderr, bufsize=bufsize, start_kwargs=kwargs))

@coroutine
def _create_child(self, waiter, args, shell, stdin, stdout, stderr,
bufsize, start_kwargs):
try:
self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
stderr=stderr, bufsize=bufsize, **kwargs)
except:
self.close()
raise
try:
start = self._start(args=args, shell=shell, stdin=stdin,
stdout=stdout, stderr=stderr,
bufsize=bufsize, **start_kwargs)
except:
self._failed_before_start = True
self.close()
raise

self._pid = self._proc.pid
self._extra['subprocess'] = self._proc
try:
if start is not None:
# _start is not required to be a coroutine
yield from start
except:
self.close()
raise

if self._loop.get_debug():
if isinstance(args, (bytes, str)):
program = args
else:
program = args[0]
logger.debug('process %r created: pid %s',
program, self._pid)
self._pid = self._proc.pid
self._extra['subprocess'] = self._proc

self._loop.create_task(self._connect_pipes(waiter))
if self._loop.get_debug():
if isinstance(args, (bytes, str)):
program = args
else:
program = args[0]
logger.debug('process %r created: pid %s', program, self._pid)

if self._closed:
# transport.close() may have been called concurrently, for
# instance if _make_subprocess_transport() has been cancelled.
if self._proc.stdin:
self._proc.stdin.close()
if self._proc.stdout:
self._proc.stdout.close()
if self._proc.stderr:
self._proc.stderr.close()
else:
yield from self._connect_pipes(waiter)
except Exception as exc:
if waiter is not None and not waiter.cancelled():
waiter.set_exception(exc)
else:
if waiter is not None and not waiter.cancelled():
waiter.set_result(None)

def __repr__(self):
info = [self.__class__.__name__]
Expand Down Expand Up @@ -160,40 +194,33 @@ def kill(self):

@coroutine
def _connect_pipes(self, waiter):
try:
proc = self._proc
loop = self._loop

if proc.stdin is not None:
_, pipe = yield from loop.connect_write_pipe(
lambda: WriteSubprocessPipeProto(self, 0),
proc.stdin)
self._pipes[0] = pipe

if proc.stdout is not None:
_, pipe = yield from loop.connect_read_pipe(
lambda: ReadSubprocessPipeProto(self, 1),
proc.stdout)
self._pipes[1] = pipe

if proc.stderr is not None:
_, pipe = yield from loop.connect_read_pipe(
lambda: ReadSubprocessPipeProto(self, 2),
proc.stderr)
self._pipes[2] = pipe

assert self._pending_calls is not None

loop.call_soon(self._protocol.connection_made, self)
for callback, data in self._pending_calls:
loop.call_soon(callback, *data)
self._pending_calls = None
except Exception as exc:
if waiter is not None and not waiter.cancelled():
waiter.set_exception(exc)
else:
if waiter is not None and not waiter.cancelled():
waiter.set_result(None)
proc = self._proc
loop = self._loop

if proc.stdin is not None:
_, pipe = yield from loop.connect_write_pipe(
lambda: WriteSubprocessPipeProto(self, 0),
proc.stdin)
self._pipes[0] = pipe

if proc.stdout is not None:
_, pipe = yield from loop.connect_read_pipe(
lambda: ReadSubprocessPipeProto(self, 1),
proc.stdout)
self._pipes[1] = pipe

if proc.stderr is not None:
_, pipe = yield from loop.connect_read_pipe(
lambda: ReadSubprocessPipeProto(self, 2),
proc.stderr)
self._pipes[2] = pipe

assert self._pending_calls is not None

loop.call_soon(self._protocol.connection_made, self)
for callback, data in self._pending_calls:
loop.call_soon(callback, *data)
self._pending_calls = None

def _call(self, cb, *data):
if self._pending_calls is not None:
Expand Down Expand Up @@ -233,6 +260,12 @@ def _wait(self):
"""Wait until the process exit and return the process return code.

This method is a coroutine."""
if self._failed_before_start:
# Let loop._make_subprocess_transport() call transport._wait() when
# an exception is raised asynchronously during the setup of the
# transport, it garantees that necessary cleanup will be performed.
return

if self._returncode is not None:
return self._returncode

Expand Down
Loading