|
1 | 1 | import io
|
2 |
| -import time |
| 2 | +import os |
| 3 | +import platform |
3 | 4 | import weakref
|
| 5 | +from functools import lru_cache |
| 6 | +from pathlib import Path |
| 7 | +import warnings |
4 | 8 |
|
5 |
| -import asyncio |
6 | 9 | import aiohttp
|
7 | 10 |
|
8 | 11 | from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper
|
9 | 12 | from fsspec.exceptions import FSTimeoutError
|
10 | 13 |
|
11 |
| -from .utils import get_default_gateways |
12 |
| - |
13 | 14 | import logging
|
14 | 15 |
|
15 | 16 | logger = logging.getLogger("ipfsspec")
|
@@ -138,129 +139,98 @@ def __str__(self):
|
138 | 139 | return f"GW({self.url})"
|
139 | 140 |
|
140 | 141 |
|
141 |
| -class GatewayState: |
142 |
| - def __init__(self): |
143 |
| - self.reachable = True |
144 |
| - self.next_request_time = 0 |
145 |
| - self.backoff_time = 0 |
146 |
| - self.start_backoff = 1e-5 |
147 |
| - self.max_backoff = 5 |
148 |
| - |
149 |
| - def schedule_next(self): |
150 |
| - self.next_request_time = time.monotonic() + self.backoff_time |
151 |
| - |
152 |
| - def backoff(self): |
153 |
| - if self.backoff_time < self.start_backoff: |
154 |
| - self.backoff_time = self.start_backoff |
155 |
| - else: |
156 |
| - self.backoff_time *= 2 |
157 |
| - self.reachable = True |
158 |
| - self.schedule_next() |
159 |
| - |
160 |
| - def speedup(self, not_below=0): |
161 |
| - did_speed_up = False |
162 |
| - if self.backoff_time > not_below: |
163 |
| - self.backoff_time *= 0.9 |
164 |
| - did_speed_up = True |
165 |
| - self.reachable = True |
166 |
| - self.schedule_next() |
167 |
| - return did_speed_up |
168 |
| - |
169 |
| - def broken(self): |
170 |
| - self.backoff_time = self.max_backoff |
171 |
| - self.reachable = False |
172 |
| - self.schedule_next() |
173 |
| - |
174 |
| - def trying_to_reach(self): |
175 |
| - self.next_request_time = time.monotonic() + 1 |
176 |
| - |
177 |
| - |
178 |
| -class MultiGateway(AsyncIPFSGatewayBase): |
179 |
| - def __init__(self, gws, max_backoff_rounds=50): |
180 |
| - self.gws = [(GatewayState(), gw) for gw in gws] |
181 |
| - self.max_backoff_rounds = max_backoff_rounds |
182 |
| - |
183 |
| - @property |
184 |
| - def _gws_in_priority_order(self): |
185 |
| - now = time.monotonic() |
186 |
| - return sorted(self.gws, key=lambda x: max(now, x[0].next_request_time)) |
187 |
| - |
188 |
| - async def _gw_op(self, op): |
189 |
| - for _ in range(self.max_backoff_rounds): |
190 |
| - for state, gw in self._gws_in_priority_order: |
191 |
| - not_before = state.next_request_time |
192 |
| - if not state.reachable: |
193 |
| - state.trying_to_reach() |
194 |
| - else: |
195 |
| - state.schedule_next() |
196 |
| - now = time.monotonic() |
197 |
| - if not_before > now: |
198 |
| - await asyncio.sleep(not_before - now) |
199 |
| - logger.debug("tring %s", gw) |
200 |
| - try: |
201 |
| - res = await op(gw) |
202 |
| - if state.speedup(time.monotonic() - now): |
203 |
| - logger.debug("%s speedup", gw) |
204 |
| - return res |
205 |
| - except FileNotFoundError: # early exit if object doesn't exist |
206 |
| - raise |
207 |
| - except (RequestsTooQuick, aiohttp.ClientResponseError, asyncio.TimeoutError) as e: |
208 |
| - state.backoff() |
209 |
| - logger.debug("%s backoff %s", gw, e) |
210 |
| - break |
211 |
| - except IOError as e: |
212 |
| - exception = e |
213 |
| - state.broken() |
214 |
| - logger.debug("%s broken", gw) |
215 |
| - continue |
216 |
| - else: |
217 |
| - raise exception |
218 |
| - raise RequestsTooQuick() |
219 |
| - |
220 |
| - async def api_get(self, endpoint, session, **kwargs): |
221 |
| - return await self._gw_op(lambda gw: gw.api_get(endpoint, session, **kwargs)) |
222 |
| - |
223 |
| - async def api_post(self, endpoint, session, **kwargs): |
224 |
| - return await self._gw_op(lambda gw: gw.api_post(endpoint, session, **kwargs)) |
225 |
| - |
226 |
| - async def cid_head(self, path, session, headers=None, **kwargs): |
227 |
| - return await self._gw_op(lambda gw: gw.cid_head(path, session, headers=headers, **kwargs)) |
228 |
| - |
229 |
| - async def cid_get(self, path, session, headers=None, **kwargs): |
230 |
| - return await self._gw_op(lambda gw: gw.cid_get(path, session, headers=headers, **kwargs)) |
231 |
| - |
232 |
| - async def cat(self, path, session): |
233 |
| - return await self._gw_op(lambda gw: gw.cat(path, session)) |
234 |
| - |
235 |
| - async def ls(self, path, session): |
236 |
| - return await self._gw_op(lambda gw: gw.ls(path, session)) |
237 |
| - |
238 |
| - def state_report(self): |
239 |
| - return "\n".join(f"{s.next_request_time}, {gw}" for s, gw in self.gws) |
240 |
| - |
241 |
| - def __str__(self): |
242 |
| - return "Multi-GW(" + ", ".join(str(gw) for _, gw in self.gws) + ")" |
243 |
| - |
244 |
| - |
245 | 142 | async def get_client(**kwargs):
|
246 | 143 | timeout = aiohttp.ClientTimeout(sock_connect=1, sock_read=5)
|
247 | 144 | kwargs = {"timeout": timeout, **kwargs}
|
248 | 145 | return aiohttp.ClientSession(**kwargs)
|
249 | 146 |
|
250 | 147 |
|
251 |
| -DEFAULT_GATEWAY = None |
| 148 | +def gateway_from_file(gateway_path): |
| 149 | + if gateway_path.exists(): |
| 150 | + with open(gateway_path) as gw_file: |
| 151 | + ipfs_gateway = gw_file.readline().strip() |
| 152 | + logger.debug("using IPFS gateway from %s: %s", gateway_path, ipfs_gateway) |
| 153 | + return AsyncIPFSGateway(ipfs_gateway) |
| 154 | + return None |
252 | 155 |
|
253 | 156 |
|
| 157 | +@lru_cache |
254 | 158 | def get_gateway():
|
255 |
| - global DEFAULT_GATEWAY |
256 |
| - if DEFAULT_GATEWAY is None: |
257 |
| - use_gateway(*get_default_gateways()) |
258 |
| - return DEFAULT_GATEWAY |
259 |
| - |
260 |
| - |
261 |
| -def use_gateway(*urls): |
262 |
| - global DEFAULT_GATEWAY |
263 |
| - DEFAULT_GATEWAY = MultiGateway([AsyncIPFSGateway(url) for url in urls]) |
| 159 | + """ |
| 160 | + Get IPFS gateway according to IPIP-280 |
| 161 | +
|
| 162 | + see: https://github.com/ipfs/specs/pull/280 |
| 163 | + """ |
| 164 | + |
| 165 | + # IPFS_GATEWAY environment variable should override everything |
| 166 | + ipfs_gateway = os.environ.get("IPFS_GATEWAY", "") |
| 167 | + if ipfs_gateway: |
| 168 | + logger.debug("using IPFS gateway from IPFS_GATEWAY environment variable: %s", ipfs_gateway) |
| 169 | + return AsyncIPFSGateway(ipfs_gateway) |
| 170 | + |
| 171 | + # internal configuration: accept IPFSSPEC_GATEWAYS for backwards compatibility |
| 172 | + if ipfsspec_gateways := os.environ.get("IPFSSPEC_GATEWAYS", ""): |
| 173 | + ipfs_gateway = ipfsspec_gateways.split()[0] |
| 174 | + logger.debug("using IPFS gateway from IPFSSPEC_GATEWAYS environment variable: %s", ipfs_gateway) |
| 175 | + warnings.warn("The IPFSSPEC_GATEWAYS environment variable is deprecated, please configure your IPFS Gateway according to IPIP-280, e.g. by using the IPFS_GATEWAY environment variable or using the ~/.ipfs/gateway file.", DeprecationWarning) |
| 176 | + return AsyncIPFSGateway(ipfs_gateway) |
| 177 | + |
| 178 | + # check various well-known files for possible gateway configurations |
| 179 | + if ipfs_path := os.environ.get("IPFS_PATH", ""): |
| 180 | + if ipfs_gateway := gateway_from_file(Path(ipfs_path) / "gateway"): |
| 181 | + return ipfs_gateway |
| 182 | + |
| 183 | + if home := os.environ.get("HOME", ""): |
| 184 | + if ipfs_gateway := gateway_from_file(Path(home) / ".ipfs" / "gateway"): |
| 185 | + return ipfs_gateway |
| 186 | + |
| 187 | + if config_home := os.environ.get("XDG_CONFIG_HOME", ""): |
| 188 | + if ipfs_gateway := gateway_from_file(Path(config_home) / "ipfs" / "gateway"): |
| 189 | + return ipfs_gateway |
| 190 | + |
| 191 | + if ipfs_gateway := gateway_from_file(Path("/etc") / "ipfs" / "gateway"): |
| 192 | + return ipfs_gateway |
| 193 | + |
| 194 | + system = platform.system() |
| 195 | + |
| 196 | + if system == "Windows": |
| 197 | + candidates = [ |
| 198 | + Path(os.environ.get("LOCALAPPDATA")) / "ipfs" / "gateway", |
| 199 | + Path(os.environ.get("APPDATA")) / "ipfs" / "gateway", |
| 200 | + Path(os.environ.get("PROGRAMDATA")) / "ipfs" / "gateway", |
| 201 | + ] |
| 202 | + elif system == "Darwin": |
| 203 | + candidates = [ |
| 204 | + Path(os.environ.get("HOME")) / "Library" / "Application Support" / "ipfs" / "gateway", |
| 205 | + Path("/Library") / "Application Support" / "ipfs" / "gateway", |
| 206 | + ] |
| 207 | + elif system == "Linux": |
| 208 | + candidates = [ |
| 209 | + Path(os.environ.get("HOME")) / ".config" / "ipfs" / "gateway", |
| 210 | + Path("/etc") / "ipfs" / "gateway", |
| 211 | + ] |
| 212 | + else: |
| 213 | + candidates = [] |
| 214 | + |
| 215 | + for candidate in candidates: |
| 216 | + if ipfs_gateway := gateway_from_file(candidate): |
| 217 | + return ipfs_gateway |
| 218 | + |
| 219 | + # if we reach this point, no gateway is configured |
| 220 | + raise RuntimeError("IPFS Gateway could not be found!\n" |
| 221 | + "In order to access IPFS, you must configure an " |
| 222 | + "IPFS Gateway using a IPIP-280 configuration method. " |
| 223 | + "Possible options are: \n" |
| 224 | + " * set the environment variable IPFS_GATEWAY\n" |
| 225 | + " * write a gateway in the first line of the file ~/.ipfs/gateway\n" |
| 226 | + "\n" |
| 227 | + "It's always best to run your own IPFS gateway, e.g. by using " |
| 228 | + "IPFS Desktop (https://docs.ipfs.tech/install/ipfs-desktop/) or " |
| 229 | + "the command line version Kubo (https://docs.ipfs.tech/install/command-line/). " |
| 230 | + "If you can't run your own gateway, you may also try using the " |
| 231 | + "public IPFS gateway at https://ipfs.io or https://dweb.link . " |
| 232 | + "However, this is not recommended for productive use and you may experience " |
| 233 | + "severe performance issues.") |
264 | 234 |
|
265 | 235 |
|
266 | 236 | class AsyncIPFSFileSystem(AsyncFileSystem):
|
|
0 commit comments