Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Async-ify fetching content-addressed uri contents #166

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions ethpm/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,30 @@ def fetch_uri_contents(self, uri: URI) -> Union[bytes, URI]:
Fetch the contents stored at a URI.
"""
pass


class AsyncBaseURIBackend(ABC):
@abstractmethod
def can_resolve_uri(self, uri: URI) -> bool:
"""
Return a bool indicating whether this backend class can
resolve the given URI to it's contents.
"""
pass

@abstractmethod
def can_translate_uri(self, uri: URI) -> bool:
"""
Return a bool indicating whether this backend class can
translate the given URI to a corresponding content-addressed URI.
"""
pass

@abstractmethod
async def fetch_uri_contents(self, uri: URI) -> Union[bytes, URI]:
"""
Fetch the contents stored at a URI.
"""
pass


94 changes: 87 additions & 7 deletions ethpm/backends/ipfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
from pathlib import Path
from typing import Dict, List, Type

import asks
from eth_utils import import_string, to_bytes
import ipfshttpclient
import trio

from ethpm import ASSETS_DIR
from ethpm.backends.base import BaseURIBackend
from ethpm.backends.base import BaseURIBackend, AsyncBaseURIBackend
from ethpm.constants import (
DEFAULT_IPFS_BACKEND,
INFURA_GATEWAY_MULTIADDR,
Expand Down Expand Up @@ -87,6 +89,89 @@ def pin_assets(self, file_or_dir_path: Path) -> List[Dict[str, str]]:
)


#
# Async
#

class AsyncBaseIPFSBackend(AsyncBaseURIBackend):
"""
Base class for all URIs with an IPFS scheme.
"""

def can_resolve_uri(self, uri: str) -> bool:
"""
Return a bool indicating whether or not this backend
is capable of serving the content located at the URI.
"""
return is_ipfs_uri(uri)

def can_translate_uri(self, uri: str) -> bool:
"""
Return False. IPFS URIs cannot be used to point
to another content-addressed URI.
"""
return False


class AsyncLocal(AsyncBaseIPFSBackend):
@property
def base_uri(self):
return "http://127.0.0.1:8080/ipfs"

async def fetch_uri_contents(self, uri: str):
ipfs_hash = extract_ipfs_path_from_uri(uri)
try:
response = await asks.get(f"{self.base_uri}/{ipfs_hash}")
except:
raise CannotHandleURI
contents = response.content

validation_hash = generate_file_hash(contents)
if validation_hash != ipfs_hash:
raise ValidationError(
f"Hashed IPFS contents retrieved from uri: {uri} do not match its content hash."
)
return contents


class AsyncIPFS(AsyncBaseIPFSBackend):
@property
def base_uri(self):
return "https://ipfs.io/ipfs"

async def fetch_uri_contents(self, uri: str):
ipfs_hash = extract_ipfs_path_from_uri(uri)
response = await asks.get(f"{self.base_uri}/{ipfs_hash}")
contents = response.content

validation_hash = generate_file_hash(contents)
if validation_hash != ipfs_hash:
raise ValidationError(
f"Hashed IPFS contents retrieved from uri: {uri} do not match its content hash."
)
return contents


class AsyncInfura(AsyncBaseIPFSBackend):
@property
def base_uri(self) -> str:
return "https://ipfs.infura.io:5001/ipfs"

async def fetch_uri_contents(self, uri: str) -> bytes:
ipfs_hash = extract_ipfs_path_from_uri(uri)
response = await asks.get(f"{self.base_uri}/{ipfs_hash}")
contents = response.content
if contents == b'bad request\n':
raise CannotHandleURI

validation_hash = generate_file_hash(contents)
if validation_hash != ipfs_hash:
raise ValidationError(
f"Hashed IPFS contents retrieved from uri: {uri} do not match its content hash."
)
return contents


class IPFSGatewayBackend(IPFSOverHTTPBackend):
"""
Backend class for all IPFS URIs served over the IPFS gateway.
Expand All @@ -100,12 +185,7 @@ def base_uri(self) -> str:

def pin_assets(self, file_or_dir_path: Path) -> List[Dict[str, str]]:
raise CannotHandleURI(
"IPFS gateway is currently disabled, please use a different IPFS backend."
)

def fetch_uri_contents(self, uri: str) -> bytes:
raise CannotHandleURI(
"IPFS gateway is currently disabled, please use a different IPFS backend."
"IPFS gateway does not allow pinning assets, please use a different IPFS backend."
)


Expand Down
2 changes: 1 addition & 1 deletion ethpm/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

DEFAULT_IPFS_BACKEND = "ethpm.backends.ipfs.InfuraIPFSBackend"

IPFS_GATEWAY_PREFIX = "https://ipfs.io/ipfs/"
IPFS_GATEWAY_PREFIX = "/dns4/ipfs.io/tcp/443/https"

# TODO Deprecate in favor of a better scheme for fetching registry URIs.
# Please play nice and don't use this key for any shenanigans, thanks!
Expand Down
59 changes: 58 additions & 1 deletion ethpm/utils/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@
from eth_typing import URI
from eth_utils import to_tuple
from ipfshttpclient.exceptions import ConnectionError
import trio

from ethpm.backends.base import BaseURIBackend
from ethpm.backends.http import GithubOverHTTPSBackend
from ethpm.backends.ipfs import (
AsyncIPFS,
AsyncInfura,
AsyncLocal,
DummyIPFSBackend,
InfuraIPFSBackend,
IPFSGatewayBackend,
LocalIPFSBackend,
get_ipfs_backend_class,
)
Expand All @@ -18,16 +23,25 @@

URI_BACKENDS = [
InfuraIPFSBackend,
IPFSGatewayBackend,
DummyIPFSBackend,
LocalIPFSBackend,
GithubOverHTTPSBackend,
RegistryURIBackend,
]
ASYNC_URI_BACKENDS = [
AsyncIPFS,
AsyncInfura,
AsyncLocal,
]

logger = logging.getLogger("ethpm.utils.backend")


def resolve_uri_contents(uri: URI, fingerprint: bool = None) -> bytes:
"""
synchronous fetching single supported c-a uri
"""
resolvable_backends = get_resolvable_backends_for_uri(uri)
if resolvable_backends:
for backend in resolvable_backends:
Expand All @@ -45,13 +59,44 @@ def resolve_uri_contents(uri: URI, fingerprint: bool = None) -> bytes:
"Registry URIs must point to a resolvable content-addressed URI."
)
package_id = RegistryURIBackend().fetch_uri_contents(uri)
return resolve_uri_contents(package_id, True)
return resolve_uri_contents(package_id, fingerprint=True)

raise CannotHandleURI(
f"URI: {uri} cannot be resolved by any of the available backends."
)


async def async_resolve_uris(uris):
"""
takes list of any supported content-addressed uris and returns dict {uri=> contents}
NO registry uris!
"""
results = {}
async with trio.open_nursery() as nursery:
for uri in uris:
nursery.start_soon(async_resolve_uri_contents, uri, results)
return results


async def async_resolve_uri_contents(uri, results):
async_backends = async_get_resolvable_backends_for_uri(uri)
send_channel, receive_channel = trio.open_memory_channel(0)
async def jockey(async_fn):
try:
await send_channel.send(await async_fn(uri))
except CannotHandleURI:
pass

async with trio.open_nursery() as nursery:
for backend in async_backends:
nursery.start_soon(jockey, backend().fetch_uri_contents)
# will this hang if no backends can serve uri?
winner = await receive_channel.receive()
nursery.cancel_scope.cancel()
# mutation acceptable here?
results[uri] = winner


@to_tuple
def get_translatable_backends_for_uri(
uri: URI
Expand All @@ -64,6 +109,18 @@ def get_translatable_backends_for_uri(
except ConnectionError:
logger.debug("No local IPFS node available on port 5001.", exc_info=True)

@to_tuple
def async_get_resolvable_backends_for_uri(
uri: URI
) -> Generator[Type[BaseURIBackend], None, None]:
for backend_class in ASYNC_URI_BACKENDS:
try:
if backend_class().can_resolve_uri(uri): # type: ignore
yield backend_class
except ConnectionError:
logger.debug(
"No local IPFS node available on port 5001.", exc_info=True
)

@to_tuple
def get_resolvable_backends_for_uri(
Expand Down
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@
url='https://github.com/ethpm/py-ethpm',
include_package_data=True,
install_requires=[
'asks>=2.3.5,<3',
'eth-utils>=1.6.0,<2',
'ipfshttpclient>=0.4.12,<1',
'jsonschema>=2.6.0,<3',
'protobuf>=3.0.0,<4',
'rlp>=1.0.1,<2',
"trio>=0.11,<0.12",
'web3[tester]>=5.0.0b1,<6',
],
setup_requires=['setuptools-markdown'],
Expand Down