Skip to content

Commit 66a53af

Browse files
authored
Feature: download on ipfs client (#36)
Solutions: - Add a function called download_file_ipfs that downloads from ipfs.io/ipfs/. - Refactor the current download process to include: - Chunk size and storage into a buffer (BytesIO). (memory friendly) - Add a test for the download functionality.
1 parent 70f28da commit 66a53af

File tree

4 files changed

+151
-8
lines changed

4 files changed

+151
-8
lines changed

src/aleph/sdk/client.py

+78-7
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
TypeVar,
2424
Union,
2525
)
26+
from io import BytesIO
2627

2728
import aiohttp
2829
from aleph_message.models import (
@@ -31,6 +32,7 @@
3132
AlephMessage,
3233
ForgetContent,
3334
ForgetMessage,
35+
ItemHash,
3436
ItemType,
3537
MessageType,
3638
PostContent,
@@ -43,16 +45,17 @@
4345
)
4446
from aleph_message.models.execution.base import Encoding
4547
from aleph_message.status import MessageStatus
46-
from pydantic import ValidationError
48+
from pydantic import ValidationError, BaseModel
4749

4850
from aleph.sdk.types import Account, GenericMessage, StorageEnum
49-
51+
from aleph.sdk.utils import copy_async_readable_to_buffer, Writable, AsyncReadable
5052
from .conf import settings
5153
from .exceptions import (
5254
BroadcastError,
5355
InvalidMessageError,
5456
MessageNotFoundError,
5557
MultipleMessagesError,
58+
FileTooLarge,
5659
)
5760
from .models import MessagesResponse
5861
from .utils import check_unix_socket_valid, get_message_type_value
@@ -229,6 +232,12 @@ def get_posts(
229232
def download_file(self, file_hash: str) -> bytes:
230233
return self._wrap(self.async_session.download_file, file_hash=file_hash)
231234

235+
def download_file_ipfs(self, file_hash: str) -> bytes:
236+
return self._wrap(
237+
self.async_session.download_file_ipfs,
238+
file_hash=file_hash,
239+
)
240+
232241
def watch_messages(
233242
self,
234243
message_type: Optional[MessageType] = None,
@@ -609,6 +618,55 @@ async def get_posts(
609618
resp.raise_for_status()
610619
return await resp.json()
611620

621+
async def download_file_to_buffer(
622+
self,
623+
file_hash: str,
624+
output_buffer: Writable[bytes],
625+
) -> None:
626+
"""
627+
Download a file from the storage engine and write it to the specified output buffer.
628+
:param file_hash: The hash of the file to retrieve.
629+
:param output_buffer: Writable binary buffer. The file will be written to this buffer.
630+
"""
631+
632+
async with self.http_session.get(
633+
f"/api/v0/storage/raw/{file_hash}"
634+
) as response:
635+
if response.status == 200:
636+
await copy_async_readable_to_buffer(
637+
response.content, output_buffer, chunk_size=16 * 1024
638+
)
639+
if response.status == 413:
640+
ipfs_hash = ItemHash(file_hash)
641+
if ipfs_hash.item_type == ItemType.ipfs:
642+
return await self.download_file_ipfs_to_buffer(
643+
file_hash, output_buffer
644+
)
645+
else:
646+
raise FileTooLarge(f"The file from {file_hash} is too large")
647+
648+
async def download_file_ipfs_to_buffer(
649+
self,
650+
file_hash: str,
651+
output_buffer: Writable[bytes],
652+
) -> None:
653+
"""
654+
Download a file from the storage engine and write it to the specified output buffer.
655+
656+
:param file_hash: The hash of the file to retrieve.
657+
:param output_buffer: The binary output buffer to write the file data to.
658+
"""
659+
async with aiohttp.ClientSession() as session:
660+
async with session.get(
661+
f"https://ipfs.aleph.im/ipfs/{file_hash}"
662+
) as response:
663+
if response.status == 200:
664+
await copy_async_readable_to_buffer(
665+
response.content, output_buffer, chunk_size=16 * 1024
666+
)
667+
else:
668+
response.raise_for_status()
669+
612670
async def download_file(
613671
self,
614672
file_hash: str,
@@ -620,11 +678,24 @@ async def download_file(
620678
621679
:param file_hash: The hash of the file to retrieve.
622680
"""
623-
async with self.http_session.get(
624-
f"/api/v0/storage/raw/{file_hash}"
625-
) as response:
626-
response.raise_for_status()
627-
return await response.read()
681+
buffer = BytesIO()
682+
await self.download_file_to_buffer(file_hash, output_buffer=buffer)
683+
return buffer.getvalue()
684+
685+
async def download_file_ipfs(
686+
self,
687+
file_hash: str,
688+
) -> bytes:
689+
"""
690+
Get a file from the ipfs storage engine as raw bytes.
691+
692+
Warning: Downloading large files can be slow.
693+
694+
:param file_hash: The hash of the file to retrieve.
695+
"""
696+
buffer = BytesIO()
697+
await self.download_file_ipfs_to_buffer(file_hash, output_buffer=buffer)
698+
return buffer.getvalue()
628699

629700
async def get_messages(
630701
self,

src/aleph/sdk/exceptions.py

+8
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,11 @@ class BadSignatureError(Exception):
4242
"""
4343

4444
pass
45+
46+
47+
class FileTooLarge(Exception):
48+
"""
49+
A file is too large
50+
"""
51+
52+
pass

src/aleph/sdk/utils.py

+32-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@
1313
from aleph.sdk.conf import settings
1414
from aleph.sdk.types import GenericMessage
1515

16+
from typing import (
17+
Tuple,
18+
Type,
19+
TypeVar,
20+
Protocol,
21+
)
22+
1623
logger = logging.getLogger(__name__)
1724

1825
try:
@@ -47,7 +54,7 @@ def create_archive(path: Path) -> Tuple[Path, Encoding]:
4754
return archive_path, Encoding.zip
4855
elif os.path.isfile(path):
4956
if path.suffix == ".squashfs" or (
50-
magic and magic.from_file(path).startswith("Squashfs filesystem")
57+
magic and magic.from_file(path).startswith("Squashfs filesystem")
5158
):
5259
return path, Encoding.squashfs
5360
else:
@@ -79,6 +86,30 @@ def check_unix_socket_valid(unix_socket_path: str) -> bool:
7986
return True
8087

8188

89+
T = TypeVar("T", str, bytes, covariant=True)
90+
U = TypeVar("U", str, bytes, contravariant=True)
91+
92+
93+
class AsyncReadable(Protocol[T]):
94+
async def read(self, n: int = -1) -> T:
95+
...
96+
97+
98+
class Writable(Protocol[U]):
99+
def write(self, buffer: U) -> int:
100+
...
101+
102+
103+
async def copy_async_readable_to_buffer(
104+
readable: AsyncReadable[T], buffer: Writable[T], chunk_size: int
105+
):
106+
while True:
107+
chunk = await readable.read(chunk_size)
108+
if not chunk:
109+
break
110+
buffer.write(chunk)
111+
112+
82113
def enum_as_str(obj: Union[str, Enum]) -> str:
83114
"""Returns the value of an Enum, or the string itself when passing a string.
84115

tests/unit/test_download.py

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import pytest
2+
from aleph.sdk import AlephClient
3+
from aleph.sdk.conf import settings as sdk_settings
4+
5+
6+
@pytest.mark.parametrize(
7+
"file_hash,expected_size",
8+
[
9+
("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH", 5),
10+
("Qmdy5LaAL4eghxE7JD6Ah5o4PJGarjAV9st8az2k52i1vq", 5817703),
11+
],
12+
)
13+
@pytest.mark.asyncio
14+
async def test_download(file_hash: str, expected_size: int):
15+
async with AlephClient(api_server=sdk_settings.API_HOST) as client:
16+
file_content = await client.download_file(file_hash) # File is 5B
17+
file_size = len(file_content)
18+
assert file_size == expected_size
19+
20+
21+
@pytest.mark.parametrize(
22+
"file_hash,expected_size",
23+
[
24+
("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH", 5),
25+
("Qmdy5LaAL4eghxE7JD6Ah5o4PJGarjAV9st8az2k52i1vq", 5817703),
26+
],
27+
)
28+
@pytest.mark.asyncio
29+
async def test_download_ipfs(file_hash: str, expected_size: int):
30+
async with AlephClient(api_server=sdk_settings.API_HOST) as client:
31+
file_content = await client.download_file_ipfs(file_hash) ## 5817703 B FILE
32+
file_size = len(file_content)
33+
assert file_size == expected_size

0 commit comments

Comments
 (0)