diff --git a/ethpm/backends/base.py b/ethpm/backends/base.py
index 8735b39..0acda50 100644
--- a/ethpm/backends/base.py
+++ b/ethpm/backends/base.py
@@ -34,3 +34,32 @@ def fetch_uri_contents(self, uri: URI) -> Union[bytes, URI]:
         Fetch the contents stored at a URI.
         """
         pass
+
+
+class AsyncBaseURIBackend(ABC):
+    """
+    Base class for URI backends whose contents are fetched asynchronously.
+    """
+
+    @abstractmethod
+    def can_resolve_uri(self, uri: URI) -> bool:
+        """
+        Return a bool indicating whether this backend class can
+        resolve the given URI to its contents.
+        """
+        pass
+
+    @abstractmethod
+    def can_translate_uri(self, uri: URI) -> bool:
+        """
+        Return a bool indicating whether this backend class can
+        translate the given URI to a corresponding content-addressed URI.
+        """
+        pass
+
+    @abstractmethod
+    async def fetch_uri_contents(self, uri: URI) -> Union[bytes, URI]:
+        """
+        Fetch the contents stored at a URI.
+        """
+        pass
diff --git a/ethpm/backends/ipfs.py b/ethpm/backends/ipfs.py
index 596672b..52986ff 100644
--- a/ethpm/backends/ipfs.py
+++ b/ethpm/backends/ipfs.py
@@ -3,11 +3,13 @@
 from pathlib import Path
 from typing import Dict, List, Type
 
+import asks
 from eth_utils import import_string, to_bytes
 import ipfshttpclient
+import trio
 
 from ethpm import ASSETS_DIR
-from ethpm.backends.base import BaseURIBackend
+from ethpm.backends.base import AsyncBaseURIBackend, BaseURIBackend
 from ethpm.constants import (
     DEFAULT_IPFS_BACKEND,
     INFURA_GATEWAY_MULTIADDR,
@@ -87,6 +89,110 @@ def pin_assets(self, file_or_dir_path: Path) -> List[Dict[str, str]]:
             )
 
 
+#
+# Async
+#
+
+
+class AsyncBaseIPFSBackend(AsyncBaseURIBackend):
+    """
+    Base class for all async backends that serve URIs with an IPFS scheme
+    by requesting ``<base_uri>/<ipfs path>`` from an HTTP gateway.
+    """
+
+    @property
+    def base_uri(self) -> str:
+        """
+        Return the gateway prefix that IPFS paths are appended to.
+        """
+        raise NotImplementedError("Subclasses must define `base_uri`.")
+
+    def can_resolve_uri(self, uri: str) -> bool:
+        """
+        Return a bool indicating whether or not this backend
+        is capable of serving the content located at the URI.
+        """
+        return is_ipfs_uri(uri)
+
+    def can_translate_uri(self, uri: str) -> bool:
+        """
+        Return False. IPFS URIs cannot be used to point
+        to another content-addressed URI.
+        """
+        return False
+
+    def _check_gateway_response(self, contents: bytes) -> None:
+        """
+        Hook for subclasses to reject gateway-specific error payloads
+        by raising CannotHandleURI. Accepts every payload by default.
+        """
+        pass
+
+    async def fetch_uri_contents(self, uri: str) -> bytes:
+        """
+        Fetch the contents stored at an IPFS URI and verify that they
+        hash to the IPFS path named in the URI.
+
+        Raise CannotHandleURI if the gateway request fails or is rejected,
+        and ValidationError if the contents do not match their hash.
+        """
+        ipfs_hash = extract_ipfs_path_from_uri(uri)
+        gateway_url = f"{self.base_uri}/{ipfs_hash}"
+        try:
+            response = await asks.get(gateway_url)
+        except Exception as exc:
+            # Not a bare ``except``: trio signals cancellation with a
+            # BaseException, which must keep propagating to the nursery.
+            raise CannotHandleURI(
+                f"Unable to fetch {uri} from {gateway_url}."
+            ) from exc
+        contents = response.content
+        self._check_gateway_response(contents)
+
+        validation_hash = generate_file_hash(contents)
+        if validation_hash != ipfs_hash:
+            raise ValidationError(
+                f"Hashed IPFS contents retrieved from uri: {uri} do not match its content hash."
+            )
+        return contents
+
+
+class AsyncLocal(AsyncBaseIPFSBackend):
+    """
+    Async backend for the HTTP gateway of a local IPFS node.
+    """
+
+    @property
+    def base_uri(self) -> str:
+        return "http://127.0.0.1:8080/ipfs"
+
+
+class AsyncIPFS(AsyncBaseIPFSBackend):
+    """
+    Async backend for the public ipfs.io gateway.
+    """
+
+    @property
+    def base_uri(self) -> str:
+        return "https://ipfs.io/ipfs"
+
+
+class AsyncInfura(AsyncBaseIPFSBackend):
+    """
+    Async backend for the Infura IPFS gateway.
+    """
+
+    @property
+    def base_uri(self) -> str:
+        return "https://ipfs.infura.io:5001/ipfs"
+
+    def _check_gateway_response(self, contents: bytes) -> None:
+        if contents == b'bad request\n':
+            raise CannotHandleURI(
+                "Infura IPFS gateway rejected the request."
+            )
+
+
 class IPFSGatewayBackend(IPFSOverHTTPBackend):
     """
     Backend class for all IPFS URIs served over the IPFS gateway.
@@ -100,12 +206,7 @@ def base_uri(self) -> str:
 
     def pin_assets(self, file_or_dir_path: Path) -> List[Dict[str, str]]:
         raise CannotHandleURI(
-            "IPFS gateway is currently disabled, please use a different IPFS backend."
-        )
-
-    def fetch_uri_contents(self, uri: str) -> bytes:
-        raise CannotHandleURI(
-            "IPFS gateway is currently disabled, please use a different IPFS backend."
+            "IPFS gateway does not allow pinning assets, please use a different IPFS backend."
         )
 
 
diff --git a/ethpm/constants.py b/ethpm/constants.py
index 8e58e14..08babd0 100644
--- a/ethpm/constants.py
+++ b/ethpm/constants.py
@@ -5,7 +5,7 @@
 
 DEFAULT_IPFS_BACKEND = "ethpm.backends.ipfs.InfuraIPFSBackend"
 
-IPFS_GATEWAY_PREFIX = "https://ipfs.io/ipfs/"
+IPFS_GATEWAY_PREFIX = "/dns4/ipfs.io/tcp/443/https"
 
 # TODO Deprecate in favor of a better scheme for fetching registry URIs.
 # Please play nice and don't use this key for any shenanigans, thanks!
diff --git a/ethpm/utils/backend.py b/ethpm/utils/backend.py
index 8d9cc0d..a032b2f 100644
--- a/ethpm/utils/backend.py
+++ b/ethpm/utils/backend.py
@@ -4,12 +4,17 @@
 from eth_typing import URI
 from eth_utils import to_tuple
 from ipfshttpclient.exceptions import ConnectionError
+import trio
 
-from ethpm.backends.base import BaseURIBackend
+from ethpm.backends.base import AsyncBaseURIBackend, BaseURIBackend
 from ethpm.backends.http import GithubOverHTTPSBackend
 from ethpm.backends.ipfs import (
+    AsyncIPFS,
+    AsyncInfura,
+    AsyncLocal,
     DummyIPFSBackend,
     InfuraIPFSBackend,
+    IPFSGatewayBackend,
     LocalIPFSBackend,
     get_ipfs_backend_class,
 )
@@ -18,16 +23,27 @@
 
 URI_BACKENDS = [
     InfuraIPFSBackend,
+    IPFSGatewayBackend,
     DummyIPFSBackend,
     LocalIPFSBackend,
     GithubOverHTTPSBackend,
     RegistryURIBackend,
 ]
+
+ASYNC_URI_BACKENDS = [
+    AsyncIPFS,
+    AsyncInfura,
+    AsyncLocal,
+]
 
 logger = logging.getLogger("ethpm.utils.backend")
 
 
 def resolve_uri_contents(uri: URI, fingerprint: bool = None) -> bytes:
+    """
+    Synchronously resolve a single supported content-addressed URI
+    (or a registry URI that points to one) to its contents.
+    """
     resolvable_backends = get_resolvable_backends_for_uri(uri)
     if resolvable_backends:
         for backend in resolvable_backends:
@@ -45,13 +61,68 @@ def resolve_uri_contents(uri: URI, fingerprint: bool = None) -> bytes:
                 "Registry URIs must point to a resolvable content-addressed URI."
             )
         package_id = RegistryURIBackend().fetch_uri_contents(uri)
-        return resolve_uri_contents(package_id, True)
+        return resolve_uri_contents(package_id, fingerprint=True)
 
     raise CannotHandleURI(
         f"URI: {uri} cannot be resolved by any of the available backends."
     )
 
 
+async def async_resolve_uris(uris):
+    """
+    Concurrently resolve an iterable of supported content-addressed URIs
+    and return a dict mapping each URI to its contents.
+
+    Registry URIs are not supported. CannotHandleURI is raised (possibly
+    grouped by trio with other task failures) if a URI cannot be served.
+    """
+    results = {}
+    async with trio.open_nursery() as nursery:
+        for uri in uris:
+            nursery.start_soon(async_resolve_uri_contents, uri, results)
+    return results
+
+
+async def async_resolve_uri_contents(uri, results):
+    """
+    Race every async backend that can resolve ``uri`` and store the first
+    successfully fetched contents under ``results[uri]``.
+
+    Raise CannotHandleURI if no backend is able to serve the URI.
+    """
+    async_backends = async_get_resolvable_backends_for_uri(uri)
+    send_channel, receive_channel = trio.open_memory_channel(0)
+
+    async def jockey(async_fn, channel):
+        # Every task owns a clone of the send channel and closes it on exit,
+        # so the receiver gets EndOfChannel once all backends have given up
+        # instead of waiting forever.
+        async with channel:
+            try:
+                contents = await async_fn(uri)
+            except CannotHandleURI:
+                return
+            await channel.send(contents)
+
+    async with trio.open_nursery() as nursery:
+        async with send_channel:
+            for backend in async_backends:
+                nursery.start_soon(
+                    jockey, backend().fetch_uri_contents, send_channel.clone()
+                )
+        try:
+            winner = await receive_channel.receive()
+        except trio.EndOfChannel:
+            raise CannotHandleURI(
+                f"URI: {uri} cannot be resolved by any of the available backends."
+            )
+        # The race is decided: stop the backends that are still fetching.
+        nursery.cancel_scope.cancel()
+    # Trio tasks only switch at ``await`` points, so writing to the dict
+    # shared with the sibling tasks is safe without a lock.
+    results[uri] = winner
+
+
 @to_tuple
 def get_translatable_backends_for_uri(
     uri: URI
@@ -64,6 +135,23 @@ def get_translatable_backends_for_uri(
         except ConnectionError:
             logger.debug("No local IPFS node available on port 5001.", exc_info=True)
 
+
+@to_tuple
+def async_get_resolvable_backends_for_uri(
+    uri: URI
+) -> Generator[Type[AsyncBaseURIBackend], None, None]:
+    """
+    Yield every async backend class that is able to resolve the given URI.
+    """
+    for backend_class in ASYNC_URI_BACKENDS:
+        try:
+            if backend_class().can_resolve_uri(uri):  # type: ignore
+                yield backend_class
+        except ConnectionError:
+            logger.debug(
+                "No local IPFS node available on port 5001.", exc_info=True
+            )
+
 
 @to_tuple
 def get_resolvable_backends_for_uri(
diff --git a/setup.py b/setup.py
index 30d5490..b771a39 100644
--- a/setup.py
+++ b/setup.py
@@ -56,11 +56,13 @@
     url='https://github.com/ethpm/py-ethpm',
     include_package_data=True,
     install_requires=[
+        'asks>=2.3.5,<3',
         'eth-utils>=1.6.0,<2',
         'ipfshttpclient>=0.4.12,<1',
         'jsonschema>=2.6.0,<3',
         'protobuf>=3.0.0,<4',
         'rlp>=1.0.1,<2',
+        'trio>=0.11,<0.12',
         'web3[tester]>=5.0.0b1,<6',
     ],
     setup_requires=['setuptools-markdown'],