-
Notifications
You must be signed in to change notification settings - Fork 123
/
Copy pathasyncio.py
161 lines (128 loc) · 5.35 KB
/
asyncio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
"""Event loop implementation that uses the `asyncio` standard module.
The `asyncio` module was added to the Python standard library in 3.4, and it
provides a pure Python implementation of an event loop library. It is used
as a fallback in case pyuv is not available (on Python implementations other
than CPython).
Earlier python versions are supported through the `trollius` package, which
is a backport of `asyncio` that works on Python 2.6+.
"""
from __future__ import absolute_import
import logging
import os
import sys
from collections import deque
try:
# For python 3.4+, use the standard library module
import asyncio
except (ImportError, SyntaxError):
# Fallback to trollius
import trollius as asyncio
from .base import BaseEventLoop
# Module-level logger plus short aliases for the most used log methods.
logger = logging.getLogger(__name__)
debug, info, warn = (logger.debug, logger.info, logger.warning,)

# Event loop class instantiated by `AsyncioEventLoop._init`; the selector
# based loop is the default and is replaced below on Windows.
loop_cls = asyncio.SelectorEventLoop
if os.name == 'nt':
    try:
        from asyncio.windows_utils import PipeHandle
    except (ImportError, SyntaxError):
        # Same fallback as the main `asyncio` import above: when only
        # trollius is available, take `PipeHandle` from it as well.
        from trollius.windows_utils import PipeHandle
    import msvcrt

    # On windows use ProactorEventLoop which supports pipes and is backed by
    # the more powerful IOCP facility
    # NOTE: we override in the stdio case, because it doesn't work.
    loop_cls = asyncio.ProactorEventLoop
class AsyncioEventLoop(BaseEventLoop, asyncio.Protocol,
                       asyncio.SubprocessProtocol):
    """`BaseEventLoop` subclass that uses `asyncio` as a backend.

    The instance is its own asyncio protocol: `_fact` returns `self`, so the
    transport callbacks (`connection_made`, `data_received`, ...) of every
    connection created by the `_connect_*` methods are delivered to this
    object.
    """

    @staticmethod
    def _lost_connection_reason(exc):
        """Return the value reported to `_on_error` for a lost connection.

        A falsy `exc` (asyncio passes None on a regular EOF) yields 'EOF'.
        Otherwise the first exception argument is used; exceptions created
        without arguments fall back to their string form or class name
        instead of raising `IndexError`.
        """
        if not exc:
            return 'EOF'
        if exc.args:
            return exc.args[0]
        return str(exc) or type(exc).__name__

    def connection_made(self, transport):
        """Used to signal `asyncio.Protocol` of a successful connection."""
        self._transport = transport
        # Keep the transport exactly as received: `_close` closes this one.
        self._raw_transport = transport
        if isinstance(transport, asyncio.SubprocessTransport):
            # For a child process, writes go to the pipe of fd 0 (stdin).
            self._transport = transport.get_pipe_transport(0)

    def connection_lost(self, exc):
        """Used to signal `asyncio.Protocol` of a lost connection."""
        self._on_error(self._lost_connection_reason(exc))

    def data_received(self, data):
        """Used to signal `asyncio.Protocol` of incoming data."""
        if self._on_data:
            self._on_data(data)
            return
        # No data callback yet: buffer until `_run` drains the queue.
        self._queued_data.append(data)

    def pipe_connection_lost(self, fd, exc):
        """Used to signal `asyncio.SubprocessProtocol` of a lost connection."""
        self._on_error(self._lost_connection_reason(exc))

    def pipe_data_received(self, fd, data):
        """Used to signal `asyncio.SubprocessProtocol` of incoming data."""
        if fd == 2:  # stderr fd number
            self._on_stderr(data)
        elif self._on_data:
            self._on_data(data)
        else:
            # No data callback yet: buffer until `_run` drains the queue.
            self._queued_data.append(data)

    def process_exited(self):
        """Used to signal `asyncio.SubprocessProtocol` when the child exits."""
        self._on_error('EOF')

    def _init(self):
        """Create the loop and the state shared by the protocol callbacks."""
        self._loop = loop_cls()
        self._queued_data = deque()
        # Protocol factory handed to asyncio; always yields this instance.
        self._fact = lambda: self
        self._raw_transport = None
        self._child_watcher = None
        # `get_child_watcher` is deprecated since Python 3.12 and removed in
        # 3.14, so only attach a watcher where the API still exists.
        if os.name != 'nt' and hasattr(asyncio, 'get_child_watcher'):
            self._child_watcher = asyncio.get_child_watcher()
            self._child_watcher.attach_loop(self._loop)

    def _connect_tcp(self, address, port):
        """Connect to `address`:`port`, blocking until the connection is up."""
        coroutine = self._loop.create_connection(self._fact, address, port)
        self._loop.run_until_complete(coroutine)

    def _connect_socket(self, path):
        """Connect to the named pipe (Windows) or unix socket at `path`."""
        if os.name == 'nt':
            coroutine = self._loop.create_pipe_connection(self._fact, path)
        else:
            coroutine = self._loop.create_unix_connection(self._fact, path)
        self._loop.run_until_complete(coroutine)

    def _connect_stdio(self):
        """Use the process' own stdin/stdout as the connection."""
        if os.name == 'nt':
            pipe = PipeHandle(msvcrt.get_osfhandle(sys.stdin.fileno()))
        else:
            pipe = sys.stdin
        coroutine = self._loop.connect_read_pipe(self._fact, pipe)
        self._loop.run_until_complete(coroutine)
        debug("native stdin connection successful")

        # Make sure subprocesses don't clobber stdout,
        # send the output to stderr instead.
        rename_stdout = os.dup(sys.stdout.fileno())
        os.dup2(sys.stderr.fileno(), sys.stdout.fileno())

        # The write side uses the duplicate of the original stdout.
        if os.name == 'nt':
            pipe = PipeHandle(msvcrt.get_osfhandle(rename_stdout))
        else:
            pipe = os.fdopen(rename_stdout, 'wb')
        coroutine = self._loop.connect_write_pipe(self._fact, pipe)
        self._loop.run_until_complete(coroutine)
        debug("native stdout connection successful")

    def _connect_child(self, argv):
        """Spawn `argv` as a child process and connect to it."""
        coroutine = self._loop.subprocess_exec(self._fact, *argv)
        self._loop.run_until_complete(coroutine)

    def _start_reading(self):
        """Do nothing; this backend has no explicit read-start step."""
        pass

    def _send(self, data):
        """Write `data` to the current transport."""
        self._transport.write(data)

    def _run(self):
        """Deliver data queued so far, then run the loop until stopped."""
        while self._queued_data:
            self._on_data(self._queued_data.popleft())
        self._loop.run_forever()

    def _stop(self):
        """Stop the running loop."""
        self._loop.stop()

    def _close(self):
        """Close the transport, if one was ever made, and then the loop."""
        if self._raw_transport is not None:
            self._raw_transport.close()
        self._loop.close()

    def _threadsafe_call(self, fn):
        """Schedule `fn` on the loop through `call_soon_threadsafe`."""
        self._loop.call_soon_threadsafe(fn)

    def _setup_signals(self, signals):
        """Route each signal in `signals` to `_on_signal` (no-op on Windows)."""
        if os.name == 'nt':
            # add_signal_handler is not supported in win32
            self._signals = []
            return

        # Remember what was registered so `_teardown_signals` can undo it.
        self._signals = list(signals)
        for signum in self._signals:
            self._loop.add_signal_handler(signum, self._on_signal, signum)

    def _teardown_signals(self):
        """Remove the handlers installed by `_setup_signals`."""
        for signum in self._signals:
            self._loop.remove_signal_handler(signum)