26
26
27
27
# XXX temporary: a monkey-patched subprocess.Popen
28
28
if compat .PY34 :
29
- from . import tmp_subprocess
29
+ from .tmp_subprocess import _Popen
30
30
else :
31
- # Python 3.3 has a different version of Popen
32
- from . import tmp_subprocess33 as tmp_subprocess
31
+ # shows that we can fallback to an older version of subprocess.Popen
32
+ # safely: it will block, but asyncio will still work.
33
+ _Popen = subprocess .Popen
33
34
34
35
35
36
__all__ = ['SelectorEventLoop' ,
@@ -665,97 +666,108 @@ def _set_inheritable(fd, inheritable):
665
666
fcntl .fcntl (fd , fcntl .F_SETFD , old & ~ cloexec_flag )
666
667
667
668
668
- class _NonBlockingPopen (tmp_subprocess ._Popen ):
669
- """A modified Popen which performs IO operations using an event loop."""
670
- # TODO can we include the stdin trick in popen?
671
- def __init__ (self , loop , exec_waiter , watcher , * args , ** kwargs ):
672
- self ._loop = loop
673
- self ._watcher = watcher
674
- self ._exec_waiter = exec_waiter
675
- super ().__init__ (* args , ** kwargs )
676
-
677
- def _cleanup_on_exec_failure (self ):
678
- super ()._cleanup_on_exec_failure ()
679
- self ._exec_waiter = None
680
- self ._loop = None
681
- self ._watcher = None
682
-
683
- def _get_exec_err_pipe (self ):
684
- errpipe_read , errpipe_write = self ._loop ._socketpair ()
685
- errpipe_read .setblocking (False )
686
- _set_inheritable (errpipe_write .fileno (), False )
687
- return errpipe_read .detach (), errpipe_write .detach ()
669
+ if hasattr (_Popen , "_wait_exec_done" ):
670
+ class _NonBlockingPopen (_Popen ):
671
+ """A modified Popen which performs IO operations using an event loop."""
672
+ def __init__ (self , loop , exec_waiter , watcher , * args , ** kwargs ):
673
+ self ._loop = loop
674
+ self ._watcher = watcher
675
+ self ._exec_waiter = exec_waiter
676
+ super ().__init__ (* args , ** kwargs )
688
677
689
- def _wait_exec_done (self , orig_executable , cwd , errpipe_read ):
690
- errpipe_data = bytearray ()
691
- self ._loop .add_reader (errpipe_read , self ._read_errpipe ,
692
- orig_executable , cwd , errpipe_read , errpipe_data )
693
-
694
- def _read_errpipe (self , orig_executable , cwd , errpipe_read , errpipe_data ):
695
- try :
696
- part = os .read (errpipe_read , 50000 )
697
- except BlockingIOError :
698
- return
699
- except Exception as exc :
700
- self ._loop .remove_reader (errpipe_read )
701
- os .close (errpipe_read )
702
- self ._exec_waiter .set_exception (exc )
703
- self ._cleanup_on_exec_failure ()
704
- else :
705
- if part and len (errpipe_data ) <= 50000 :
706
- errpipe_data .extend (part )
678
+ def _cleanup_on_exec_failure (self ):
679
+ super ()._cleanup_on_exec_failure ()
680
+ self ._exec_waiter = None
681
+ self ._loop = None
682
+ self ._watcher = None
683
+
684
+ def _get_exec_err_pipe (self ):
685
+ errpipe_read , errpipe_write = self ._loop ._socketpair ()
686
+ errpipe_read .setblocking (False )
687
+ _set_inheritable (errpipe_write .fileno (), False )
688
+ return errpipe_read .detach (), errpipe_write .detach ()
689
+
690
+ def _wait_exec_done (self , orig_executable , cwd , errpipe_read ):
691
+ errpipe_data = bytearray ()
692
+ self ._loop .add_reader (errpipe_read , self ._read_errpipe ,
693
+ orig_executable , cwd , errpipe_read ,
694
+ errpipe_data )
695
+
696
+ def _read_errpipe (self , orig_executable , cwd , errpipe_read ,
697
+ errpipe_data ):
698
+ try :
699
+ part = os .read (errpipe_read , 50000 )
700
+ except BlockingIOError :
707
701
return
702
+ except Exception as exc :
703
+ self ._loop .remove_reader (errpipe_read )
704
+ os .close (errpipe_read )
705
+ self ._exec_waiter .set_exception (exc )
706
+ self ._cleanup_on_exec_failure ()
707
+ else :
708
+ if part and len (errpipe_data ) <= 50000 :
709
+ errpipe_data .extend (part )
710
+ return
708
711
709
- self ._loop .remove_reader (errpipe_read )
710
- os .close (errpipe_read )
712
+ self ._loop .remove_reader (errpipe_read )
713
+ os .close (errpipe_read )
711
714
712
- if errpipe_data :
713
- # asynchronously wait until the process terminated
714
- self ._watcher .add_child_handler (
715
- self .pid , self ._check_exec_result , orig_executable ,
716
- cwd , errpipe_data )
717
- else :
715
+ if errpipe_data :
716
+ # asynchronously wait until the process terminated
717
+ self ._watcher .add_child_handler (
718
+ self .pid , self ._check_exec_result , orig_executable ,
719
+ cwd , errpipe_data )
720
+ else :
721
+ if not self ._exec_waiter .cancelled ():
722
+ self ._exec_waiter .set_result (None )
723
+ self ._exec_waiter = None
724
+ self ._loop = None
725
+ self ._watcher = None
726
+
727
+ def _check_exec_result (self , pid , returncode , orig_executable , cwd ,
728
+ errpipe_data ):
729
+ try :
730
+ super ()._check_exec_result (orig_executable , cwd , errpipe_data )
731
+ except Exception as exc :
718
732
if not self ._exec_waiter .cancelled ():
719
- self ._exec_waiter .set_result (None )
720
- self ._exec_waiter = None
721
- self ._loop = None
722
- self ._watcher = None
723
-
724
- def _check_exec_result (self , pid , returncode , orig_executable , cwd ,
725
- errpipe_data ):
726
- try :
727
- super ()._check_exec_result (orig_executable , cwd , errpipe_data )
728
- except Exception as exc :
729
- if not self ._exec_waiter .cancelled ():
730
- self ._exec_waiter .set_exception (exc )
731
- self ._cleanup_on_exec_failure ()
733
+ self ._exec_waiter .set_exception (exc )
734
+ self ._cleanup_on_exec_failure ()
735
+ else :
736
+ _NonBlockingPopen = None
732
737
733
738
734
739
class _UnixSubprocessTransport (base_subprocess .BaseSubprocessTransport ):
735
740
@coroutine
736
741
def _start (self , args , shell , stdin , stdout , stderr , bufsize , ** kwargs ):
742
+ stdin_w = None
743
+ if stdin == subprocess .PIPE :
744
+ # Use a socket pair for stdin, since not all platforms
745
+ # support selecting read events on the write end of a
746
+ # socket (which we use in order to detect closing of the
747
+ # other end). Notably this is needed on AIX, and works
748
+ # just fine on other platforms.
749
+ stdin , stdin_w = self ._loop ._socketpair ()
750
+
751
+ # Mark the write end of the stdin pipe as non-inheritable,
752
+ # needed by close_fds=False on Python 3.3 and older
753
+ # (Python 3.4 implements the PEP 446, socketpair returns
754
+ # non-inheritable sockets)
755
+ _set_inheritable (stdin_w .fileno (), False )
756
+
737
757
with events .get_child_watcher () as watcher :
738
- stdin_w = None
739
- if stdin == subprocess .PIPE :
740
- # Use a socket pair for stdin, since not all platforms
741
- # support selecting read events on the write end of a
742
- # socket (which we use in order to detect closing of the
743
- # other end). Notably this is needed on AIX, and works
744
- # just fine on other platforms.
745
- stdin , stdin_w = self ._loop ._socketpair ()
746
-
747
- # Mark the write end of the stdin pipe as non-inheritable,
748
- # needed by close_fds=False on Python 3.3 and older
749
- # (Python 3.4 implements the PEP 446, socketpair returns
750
- # non-inheritable sockets)
751
- _set_inheritable (stdin_w .fileno (), False )
752
- exec_waiter = self ._loop .create_future ()
753
758
try :
754
- self ._proc = _NonBlockingPopen (
755
- self ._loop , exec_waiter , watcher , args , shell = shell ,
756
- stdin = stdin , stdout = stdout , stderr = stderr ,
757
- universal_newlines = False , bufsize = bufsize , ** kwargs )
758
- yield from exec_waiter
759
+ if _NonBlockingPopen :
760
+ exec_waiter = self ._loop .create_future ()
761
+ self ._proc = _NonBlockingPopen (
762
+ self ._loop , exec_waiter , watcher , args , shell = shell ,
763
+ stdin = stdin , stdout = stdout , stderr = stderr ,
764
+ universal_newlines = False , bufsize = bufsize , ** kwargs )
765
+ yield from exec_waiter
766
+ else :
767
+ self ._proc = subprocess .Popen (
768
+ args , shell = shell , stdin = stdin , stdout = stdout ,
769
+ stderr = stderr , universal_newlines = False ,
770
+ bufsize = bufsize , ** kwargs )
759
771
except :
760
772
self ._failed_before_start = True
761
773
# TODO stdin is probably closed by proc, but what about stdin_w
@@ -766,9 +778,10 @@ def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
766
778
else :
767
779
watcher .add_child_handler (self ._proc .pid ,
768
780
self ._child_watcher_callback )
769
- if stdin_w is not None :
770
- stdin .close ()
771
- self ._proc .stdin = open (stdin_w .detach (), 'wb' , buffering = bufsize )
781
+
782
+ if stdin_w is not None :
783
+ stdin .close ()
784
+ self ._proc .stdin = open (stdin_w .detach (), 'wb' , buffering = bufsize )
772
785
773
786
def _child_watcher_callback (self , pid , returncode ):
774
787
self ._loop .call_soon_threadsafe (self ._process_exited , returncode )
0 commit comments