forked from AllenInstitute/AllenSDK
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhttp_engine.py
More file actions
240 lines (192 loc) · 7.13 KB
/
http_engine.py
File metadata and controls
240 lines (192 loc) · 7.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
import functools
import os
import asyncio
import time
import warnings
import logging
from typing import Optional, Iterable, Callable, AsyncIterator, Awaitable
import requests
import aiohttp
import nest_asyncio
from tqdm.auto import tqdm
# Maximum wall-clock time allowed for a single streaming download.
DEFAULT_TIMEOUT = 20 * 60 # seconds
# Number of bytes requested per chunk when streaming a response body.
DEFAULT_CHUNKSIZE = 1024 * 10 # bytes
class HttpEngine:
    """ Simple tool for making streaming http requests against one host. """

    def __init__(
        self,
        scheme: str,
        host: str,
        timeout: float = DEFAULT_TIMEOUT,
        chunksize: int = DEFAULT_CHUNKSIZE,
        **kwargs
    ):
        """ Simple tool for making streaming http requests.

        Parameters
        ----------
        scheme :
            e.g "http" or "https"
        host :
            will be used as the base for request urls
        timeout :
            requests taking longer than this (in seconds) will raise a
            `requests.Timeout` error. The clock on this timeout starts running
            when the initial request is made.
        chunksize :
            When streaming data, how many bytes ought to be requested at once.
        **kwargs :
            unused. Defined here so that parameters can fall through from
            subclasses
        """
        self.scheme = scheme
        self.host = host
        self.timeout = timeout
        self.chunksize = chunksize

    def _build_url(self, route):
        # All requests are rooted at this engine's scheme and host.
        return f"{self.scheme}://{self.host}/{route}"

    def stream(self, route):
        """ Makes an http request and returns an iterator over the response.

        Parameters
        ----------
        route :
            the http route (under this object's host) to request against.

        Yields
        ------
        bytes :
            successive chunks of the response body.

        Raises
        ------
        requests.Timeout :
            if streaming the response exceeds this engine's timeout.
        """
        url = self._build_url(route)
        start_time = time.perf_counter()

        # Context managers ensure the connection and progress bar are
        # released even if the consumer abandons this generator early.
        with requests.get(url, stream=True) as response:
            response_b = None
            if "Content-length" in response.headers:
                response_b = float(response.headers["Content-length"])

            size_message = (
                f"{response_b / 1024 ** 2:3.3f}MiB"
                if response_b is not None
                else "potentially large"
            )
            logging.warning(f"downloading a {size_message} file from {url}")

            with tqdm(
                unit="B", total=response_b, unit_scale=True, desc="Downloading"
            ) as progress:
                for chunk in response.iter_content(self.chunksize):
                    if chunk:  # filter out keep-alive new chunks
                        progress.update(len(chunk))
                        yield chunk

                    # Check inside the loop (rather than once at the end) so
                    # that a stalled download is aborted as soon as the budget
                    # is exceeded, instead of raising only after completion.
                    elapsed = time.perf_counter() - start_time
                    if elapsed > self.timeout:
                        raise requests.Timeout(
                            f"Download took {elapsed} seconds, but timeout "
                            f"was set to {self.timeout}"
                        )

    @staticmethod
    def write_bytes(path: str, stream: Iterable[bytes]):
        # Thin convenience wrapper over the module-level helper.
        write_from_stream(path, stream)
# Signature of the consumer handed to AsyncHttpEngine.stream coroutines: an
# awaitable callback that receives an async iterator of response byte chunks.
AsyncStreamCallbackType = Callable[[AsyncIterator[bytes]], Awaitable[None]]
class AsyncHttpEngine(HttpEngine):
    """ Asynchronous counterpart of HttpEngine, backed by aiohttp. """

    def __init__(
        self,
        scheme: str,
        host: str,
        session: Optional[aiohttp.ClientSession] = None,
        **kwargs
    ):
        """ Simple tool for making asynchronous streaming http requests.

        Parameters
        ----------
        scheme :
            e.g "http" or "https"
        host :
            will be used as the base for request urls
        session :
            If provided, this preconstructed session will be used rather than
            a new one. Keep in mind that AsyncHttpEngine closes its session
            when it is garbage collected!
        **kwargs :
            Will be passed to parent.
        """
        super(AsyncHttpEngine, self).__init__(scheme, host, **kwargs)

        self._session = None
        if session:
            self._session = session
            warnings.warn(
                "Received preconstructed session, ignoring timeout parameter."
            )

    @property
    def session(self):
        # Construct lazily so that merely building an engine does not require
        # an event loop; the timeout only applies to sessions we own.
        if self._session is None:
            self._session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=self.timeout)
            )
        return self._session

    async def _stream_coroutine(
        self,
        route: str,
        callback: AsyncStreamCallbackType
    ):
        # Hand the chunked response iterator to the caller-supplied consumer.
        url = self._build_url(route)
        async with self.session.get(url) as response:
            await callback(response.content.iter_chunked(self.chunksize))

    def stream(
        self,
        route: str
    ) -> Callable[[AsyncStreamCallbackType], Awaitable[None]]:
        """ Returns a coroutine which
        - makes an http request
        - exposes internally an asynchronous iterator over the response
        - takes a callback parameter, which should consume the iterator.

        Parameters
        ----------
        route :
            the http route (under this object's host) to request against.

        Notes
        -----
        To use this method, you will need an appropriate consumer. For
        instance, If you want to write the streamed data to a local file, you
        can use write_bytes_from_coroutine.

        Examples
        --------
        >>> engine = AsyncHttpEngine("http", "examplehost")
        >>> stream_coro = engine.stream("example/route")
        >>> write_bytes_from_coroutine("example/file/path.txt", stream_coro)
        """
        return functools.partial(self._stream_coroutine, route)

    def __del__(self):
        # Close an owned, still-open session to avoid leaking its connector.
        # Closing needs a loop; nest_asyncio permits driving a possibly
        # already-running loop from this synchronous teardown hook.
        session = getattr(self, "_session", None)
        if session is not None and not session.closed:
            nest_asyncio.apply()
            loop = asyncio.get_event_loop()
            loop.run_until_complete(session.close())

    @staticmethod
    def write_bytes(
        path: str,
        coroutine: Callable[[AsyncStreamCallbackType], Awaitable[None]]):
        # Thin convenience wrapper over the module-level helper.
        write_bytes_from_coroutine(path, coroutine)
def write_bytes_from_coroutine(
    path: str,
    coroutine: "Callable[[AsyncStreamCallbackType], Awaitable[None]]"
):
    """ Utility for streaming http from an asynchronous requester to a file.

    Parameters
    ----------
    path :
        Write to this file
    coroutine :
        The source of the data. Needs to have a specific structure, namely:
        - the first-position parameter of the coroutine ought to accept a
        callback. This callback ought to itself be awaitable.
        - within the coroutine, this callback ought to be called with a
        single argument. That single argument should be an asynchronous
        iterator.
        Please see AsyncHttpEngine.stream (and
        AsyncHttpEngine._stream_coroutine) for an example.
    """
    # os.makedirs("") raises FileNotFoundError, so only create parent
    # directories when the path actually has a directory component.
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)

    async def callback(file_, iterable):
        # Drain the async iterator, writing each chunk as it arrives.
        async for chunk in iterable:
            file_.write(chunk)

    async def wrapper():
        with open(path, "wb") as file_:
            callback_ = functools.partial(callback, file_)
            await coroutine(callback_)

    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        running_loop = None

    if running_loop is None:
        # Ordinary synchronous context: let asyncio.run manage the loop
        # lifecycle (get_event_loop with no running loop is deprecated).
        asyncio.run(wrapper())
    else:
        # Already inside a running loop (e.g. Jupyter): nest_asyncio allows
        # a re-entrant run_until_complete on that loop.
        nest_asyncio.apply()
        running_loop.run_until_complete(wrapper())
def write_from_stream(path: str, stream: Iterable[bytes]):
    """ Persist an iterable of byte chunks to a file.

    Parameters
    ----------
    path :
        write to this file
    stream :
        iterable yielding bytes to be written
    """
    with open(path, "wb") as destination:
        # writelines accepts any iterable of bytes and writes each item
        # in turn, exactly like a manual write-per-chunk loop.
        destination.writelines(stream)