diff --git a/.coveragerc b/.coveragerc index 89ec6fe0..bf5bfda2 100644 --- a/.coveragerc +++ b/.coveragerc @@ -26,3 +26,6 @@ exclude_lines = # Don't complain if non-runnable code isn't run: if 0: if __name__ == .__main__.: + + # Don't complain about ineffective code: + pass diff --git a/src/aleph/sdk/base.py b/src/aleph/sdk/base.py new file mode 100644 index 00000000..a5b2c266 --- /dev/null +++ b/src/aleph/sdk/base.py @@ -0,0 +1,476 @@ +# An interface for all clients to implement. + +import logging +from abc import ABC, abstractmethod +from datetime import datetime +from pathlib import Path +from typing import ( + Any, + AsyncIterable, + Dict, + Iterable, + List, + Mapping, + Optional, + Tuple, + Type, + Union, +) + +from aleph_message.models import ( + AlephMessage, + MessagesResponse, + MessageType, + PostMessage, +) +from aleph_message.models.execution.program import Encoding +from aleph_message.status import MessageStatus + +from aleph.sdk.models import PostsResponse +from aleph.sdk.types import GenericMessage, StorageEnum + +DEFAULT_PAGE_SIZE = 200 + + +class BaseAlephClient(ABC): + @abstractmethod + async def fetch_aggregate( + self, + address: str, + key: str, + limit: int = 100, + ) -> Dict[str, Dict]: + """ + Fetch a value from the aggregate store by owner address and item key. + + :param address: Address of the owner of the aggregate + :param key: Key of the aggregate + :param limit: Maximum number of items to fetch (Default: 100) + """ + pass + + @abstractmethod + async def fetch_aggregates( + self, + address: str, + keys: Optional[Iterable[str]] = None, + limit: int = 100, + ) -> Dict[str, Dict]: + """ + Fetch key-value pairs from the aggregate store by owner address. + + :param address: Address of the owner of the aggregate + :param keys: Keys of the aggregates to fetch (Default: all items) + :param limit: Maximum number of items to fetch (Default: 100) + """ + pass + + @abstractmethod + async def get_posts( + self, + pagination: int = DEFAULT_PAGE_SIZE, + page: int = 1, + types: Optional[Iterable[str]] = None, + refs: Optional[Iterable[str]] = None, + addresses: Optional[Iterable[str]] = None, + tags: Optional[Iterable[str]] = None, + hashes: Optional[Iterable[str]] = None, + channels: Optional[Iterable[str]] = None, + chains: Optional[Iterable[str]] = None, + start_date: Optional[Union[datetime, float]] = None, + end_date: Optional[Union[datetime, float]] = None, + ignore_invalid_messages: Optional[bool] = True, + invalid_messages_log_level: Optional[int] = logging.NOTSET, + ) -> PostsResponse: + """ + Fetch a list of posts from the network. 
+ + :param pagination: Number of items to fetch (Default: 200) + :param page: Page to fetch, begins at 1 (Default: 1) + :param types: Types of posts to fetch (Default: all types) + :param refs: If set, only fetch posts that reference these hashes (in the "refs" field) + :param addresses: Addresses of the posts to fetch (Default: all addresses) + :param tags: Tags of the posts to fetch (Default: all tags) + :param hashes: Specific item_hashes to fetch + :param channels: Channels of the posts to fetch (Default: all channels) + :param chains: Chains of the posts to fetch (Default: all chains) + :param start_date: Earliest date to fetch messages from + :param end_date: Latest date to fetch messages from + :param ignore_invalid_messages: Ignore invalid messages (Default: True) + :param invalid_messages_log_level: Log level to use for invalid messages (Default: logging.NOTSET) + """ + pass + + async def get_posts_iterator( + self, + types: Optional[Iterable[str]] = None, + refs: Optional[Iterable[str]] = None, + addresses: Optional[Iterable[str]] = None, + tags: Optional[Iterable[str]] = None, + hashes: Optional[Iterable[str]] = None, + channels: Optional[Iterable[str]] = None, + chains: Optional[Iterable[str]] = None, + start_date: Optional[Union[datetime, float]] = None, + end_date: Optional[Union[datetime, float]] = None, + ) -> AsyncIterable[PostMessage]: + """ + Fetch all filtered posts, returning an async iterator and fetching them page by page. Might return duplicates + but will always return all posts. + + :param types: Types of posts to fetch (Default: all types) + :param refs: If set, only fetch posts that reference these hashes (in the "refs" field) + :param addresses: Addresses of the posts to fetch (Default: all addresses) + :param tags: Tags of the posts to fetch (Default: all tags) + :param hashes: Specific item_hashes to fetch + :param channels: Channels of the posts to fetch (Default: all channels) + :param chains: Chains of the posts to fetch (Default: all chains) + :param start_date: Earliest date to fetch messages from + :param end_date: Latest date to fetch messages from + """ + page = 1 + resp = None + while resp is None or len(resp.posts) > 0: + resp = await self.get_posts( + page=page, + types=types, + refs=refs, + addresses=addresses, + tags=tags, + hashes=hashes, + channels=channels, + chains=chains, + start_date=start_date, + end_date=end_date, + ) + page += 1 + for post in resp.posts: + yield post + + @abstractmethod + async def download_file( + self, + file_hash: str, + ) -> bytes: + """ + Get a file from the storage engine as raw bytes. + + Warning: Downloading large files can be slow and memory intensive. + + :param file_hash: The hash of the file to retrieve. 
+ """ + pass + + @abstractmethod + async def get_messages( + self, + pagination: int = DEFAULT_PAGE_SIZE, + page: int = 1, + message_type: Optional[MessageType] = None, + message_types: Optional[Iterable[MessageType]] = None, + content_types: Optional[Iterable[str]] = None, + content_keys: Optional[Iterable[str]] = None, + refs: Optional[Iterable[str]] = None, + addresses: Optional[Iterable[str]] = None, + tags: Optional[Iterable[str]] = None, + hashes: Optional[Iterable[str]] = None, + channels: Optional[Iterable[str]] = None, + chains: Optional[Iterable[str]] = None, + start_date: Optional[Union[datetime, float]] = None, + end_date: Optional[Union[datetime, float]] = None, + ignore_invalid_messages: Optional[bool] = True, + invalid_messages_log_level: Optional[int] = logging.NOTSET, + ) -> MessagesResponse: + """ + Fetch a list of messages from the network. + + :param pagination: Number of items to fetch (Default: 200) + :param page: Page to fetch, begins at 1 (Default: 1) + :param message_type: [DEPRECATED] Filter by message type, can be "AGGREGATE", "POST", "PROGRAM", "VM", "STORE" or "FORGET" + :param message_types: Filter by message types, can be any combination of "AGGREGATE", "POST", "PROGRAM", "VM", "STORE" or "FORGET" + :param content_types: Filter by content type + :param content_keys: Filter by aggregate key + :param refs: If set, only fetch posts that reference these hashes (in the "refs" field) + :param addresses: Addresses of the posts to fetch (Default: all addresses) + :param tags: Tags of the posts to fetch (Default: all tags) + :param hashes: Specific item_hashes to fetch + :param channels: Channels of the posts to fetch (Default: all channels) + :param chains: Filter by sender address chain + :param start_date: Earliest date to fetch messages from + :param end_date: Latest date to fetch messages from + :param ignore_invalid_messages: Ignore invalid messages (Default: True) + :param invalid_messages_log_level: Log level to use for invalid messages (Default: logging.NOTSET) + """ + pass + + async def get_messages_iterator( + self, + message_type: Optional[MessageType] = None, + content_types: Optional[Iterable[str]] = None, + content_keys: Optional[Iterable[str]] = None, + refs: Optional[Iterable[str]] = None, + addresses: Optional[Iterable[str]] = None, + tags: Optional[Iterable[str]] = None, + hashes: Optional[Iterable[str]] = None, + channels: Optional[Iterable[str]] = None, + chains: Optional[Iterable[str]] = None, + start_date: Optional[Union[datetime, float]] = None, + end_date: Optional[Union[datetime, float]] = None, + ) -> AsyncIterable[AlephMessage]: + """ + Fetch all filtered messages, returning an async iterator and fetching them page by page. Might return duplicates + but will always return all messages. 
+ + :param message_type: Filter by message type, can be "AGGREGATE", "POST", "PROGRAM", "VM", "STORE" or "FORGET" + :param content_types: Filter by content type + :param content_keys: Filter by content key + :param refs: If set, only fetch posts that reference these hashes (in the "refs" field) + :param addresses: Addresses of the posts to fetch (Default: all addresses) + :param tags: Tags of the posts to fetch (Default: all tags) + :param hashes: Specific item_hashes to fetch + :param channels: Channels of the posts to fetch (Default: all channels) + :param chains: Filter by sender address chain + :param start_date: Earliest date to fetch messages from + :param end_date: Latest date to fetch messages from + """ + page = 1 + resp = None + while resp is None or len(resp.messages) > 0: + resp = await self.get_messages( + page=page, + message_type=message_type, + content_types=content_types, + content_keys=content_keys, + refs=refs, + addresses=addresses, + tags=tags, + hashes=hashes, + channels=channels, + chains=chains, + start_date=start_date, + end_date=end_date, + ) + page += 1 + for message in resp.messages: + yield message + + @abstractmethod + async def get_message( + self, + item_hash: str, + message_type: Optional[Type[GenericMessage]] = None, + channel: Optional[str] = None, + ) -> GenericMessage: + """ + Get a single message from its `item_hash` and perform some basic validation. + + :param item_hash: Hash of the message to fetch + :param message_type: Type of message to fetch + :param channel: Channel of the message to fetch + """ + pass + + @abstractmethod + def watch_messages( + self, + message_type: Optional[MessageType] = None, + message_types: Optional[Iterable[MessageType]] = None, + content_types: Optional[Iterable[str]] = None, + content_keys: Optional[Iterable[str]] = None, + refs: Optional[Iterable[str]] = None, + addresses: Optional[Iterable[str]] = None, + tags: Optional[Iterable[str]] = None, + hashes: Optional[Iterable[str]] = None, + channels: Optional[Iterable[str]] = None, + chains: Optional[Iterable[str]] = None, + start_date: Optional[Union[datetime, float]] = None, + end_date: Optional[Union[datetime, float]] = None, + ) -> AsyncIterable[AlephMessage]: + """ + Iterate over current and future matching messages asynchronously. + + :param message_type: [DEPRECATED] Type of message to watch + :param message_types: Types of messages to watch + :param content_types: Content types to watch + :param content_keys: Filter by aggregate key + :param refs: References to watch + :param addresses: Addresses to watch + :param tags: Tags to watch + :param hashes: Hashes to watch + :param channels: Channels to watch + :param chains: Chains to watch + :param start_date: Start date from when to watch + :param end_date: End date until when to watch + """ + pass + + +class BaseAuthenticatedAlephClient(BaseAlephClient): + @abstractmethod + async def create_post( + self, + post_content: Any, + post_type: str, + ref: Optional[str] = None, + address: Optional[str] = None, + channel: Optional[str] = None, + inline: bool = True, + storage_engine: StorageEnum = StorageEnum.storage, + sync: bool = False, + ) -> Tuple[AlephMessage, MessageStatus]: + """ + Create a POST message on the Aleph network. It is associated with a channel and owned by an account. 
+ + :param post_content: The content of the message + :param post_type: An arbitrary content type that helps to describe the post_content + :param ref: A reference to a previous message that it replaces + :param address: The address that will be displayed as the author of the message + :param channel: The channel that the message will be posted on + :param inline: An optional flag to indicate if the content should be inlined in the message or not + :param storage_engine: An optional storage engine to use for the message, if not inlined (Default: "storage") + :param sync: If true, waits for the message to be processed by the API server (Default: False) + """ + pass + + @abstractmethod + async def create_aggregate( + self, + key: str, + content: Mapping[str, Any], + address: Optional[str] = None, + channel: Optional[str] = None, + inline: bool = True, + sync: bool = False, + ) -> Tuple[AlephMessage, MessageStatus]: + """ + Create an AGGREGATE message. It is meant to be used as a quick access storage associated with an account. + + :param key: Key to use to store the content + :param content: Content to store + :param address: Address to use to sign the message + :param channel: Channel to use (Default: "TEST") + :param inline: Whether to write content inside the message (Default: True) + :param sync: If true, waits for the message to be processed by the API server (Default: False) + """ + pass + + @abstractmethod + async def create_store( + self, + address: Optional[str] = None, + file_content: Optional[bytes] = None, + file_path: Optional[Union[str, Path]] = None, + file_hash: Optional[str] = None, + guess_mime_type: bool = False, + ref: Optional[str] = None, + storage_engine: StorageEnum = StorageEnum.storage, + extra_fields: Optional[dict] = None, + channel: Optional[str] = None, + sync: bool = False, + ) -> Tuple[AlephMessage, MessageStatus]: + """ + Create a STORE message to store a file on the Aleph network. + + Can be passed either a file path, an IPFS hash or the file's content as raw bytes. + + :param address: Address to display as the author of the message (Default: account.get_address()) + :param file_content: Byte stream of the file to store (Default: None) + :param file_path: Path to the file to store (Default: None) + :param file_hash: Hash of the file to store (Default: None) + :param guess_mime_type: Guess the MIME type of the file (Default: False) + :param ref: Reference to a previous message (Default: None) + :param storage_engine: Storage engine to use (Default: "storage") + :param extra_fields: Extra fields to add to the STORE message (Default: None) + :param channel: Channel to post the message to (Default: "TEST") + :param sync: If true, waits for the message to be processed by the API server (Default: False) + """ + pass + + @abstractmethod + async def create_program( + self, + program_ref: str, + entrypoint: str, + runtime: str, + environment_variables: Optional[Mapping[str, str]] = None, + storage_engine: StorageEnum = StorageEnum.storage, + channel: Optional[str] = None, + address: Optional[str] = None, + sync: bool = False, + memory: Optional[int] = None, + vcpus: Optional[int] = None, + timeout_seconds: Optional[float] = None, + persistent: bool = False, + encoding: Encoding = Encoding.zip, + volumes: Optional[List[Mapping]] = None, + subscriptions: Optional[List[Mapping]] = None, + metadata: Optional[Mapping[str, Any]] = None, + ) -> Tuple[AlephMessage, MessageStatus]: + """ + Post a (create) PROGRAM message. 
+ + :param program_ref: Reference to the program to run + :param entrypoint: Entrypoint to run + :param runtime: Runtime to use + :param environment_variables: Environment variables to pass to the program + :param storage_engine: Storage engine to use (Default: "storage") + :param channel: Channel to use (Default: "TEST") + :param address: Address to use (Default: account.get_address()) + :param sync: If true, waits for the message to be processed by the API server + :param memory: Memory in MB for the VM to be allocated (Default: 128) + :param vcpus: Number of vCPUs to allocate (Default: 1) + :param timeout_seconds: Timeout in seconds (Default: 30.0) + :param persistent: Whether the program should be persistent or not (Default: False) + :param encoding: Encoding to use (Default: Encoding.zip) + :param volumes: Volumes to mount + :param subscriptions: Patterns of Aleph messages to forward to the program's event receiver + :param metadata: Metadata to attach to the message + """ + pass + + @abstractmethod + async def forget( + self, + hashes: List[str], + reason: Optional[str], + storage_engine: StorageEnum = StorageEnum.storage, + channel: Optional[str] = None, + address: Optional[str] = None, + sync: bool = False, + ) -> Tuple[AlephMessage, MessageStatus]: + """ + Post a FORGET message to remove previous messages from the network. + + Targeted messages need to be signed by the same account that is attempting to forget them, + if the creating address did not delegate the access rights to the forgetting account. + + :param hashes: Hashes of the messages to forget + :param reason: Reason for forgetting the messages + :param storage_engine: Storage engine to use (Default: "storage") + :param channel: Channel to use (Default: "TEST") + :param address: Address to use (Default: account.get_address()) + :param sync: If true, waits for the message to be processed by the API server (Default: False) + """ + pass + + @abstractmethod + async def submit( + self, + content: Dict[str, Any], + message_type: MessageType, + channel: Optional[str] = None, + storage_engine: StorageEnum = StorageEnum.storage, + allow_inlining: bool = True, + sync: bool = False, + ) -> Tuple[AlephMessage, MessageStatus]: + """ + Submit a message to the network. This is a generic method that can be used to submit any type of message. + Prefer using the more specific methods to submit messages. 
+ + :param content: Content of the message + :param message_type: Type of the message + :param channel: Channel to use (Default: "TEST") + :param storage_engine: Storage engine to use (Default: "storage") + :param allow_inlining: Whether to allow inlining the content of the message (Default: True) + :param sync: If true, waits for the message to be processed by the API server (Default: False) + """ + pass diff --git a/src/aleph/sdk/client.py b/src/aleph/sdk/client.py index 0b2c1553..e73989af 100644 --- a/src/aleph/sdk/client.py +++ b/src/aleph/sdk/client.py @@ -51,6 +51,7 @@ from aleph.sdk.types import Account, GenericMessage, StorageEnum from aleph.sdk.utils import Writable, copy_async_readable_to_buffer +from .base import BaseAlephClient, BaseAuthenticatedAlephClient from .conf import settings from .exceptions import ( BroadcastError, @@ -59,7 +60,7 @@ MessageNotFoundError, MultipleMessagesError, ) -from .models import MessagesResponse +from .models import MessagesResponse, Post, PostsResponse from .utils import check_unix_socket_valid, get_message_type_value logger = logging.getLogger(__name__) @@ -140,6 +141,7 @@ def get_messages( pagination: int = 200, page: int = 1, message_type: Optional[MessageType] = None, + message_types: Optional[List[MessageType]] = None, content_types: Optional[Iterable[str]] = None, content_keys: Optional[Iterable[str]] = None, refs: Optional[Iterable[str]] = None, @@ -158,6 +160,7 @@ def get_messages( pagination=pagination, page=page, message_type=message_type, + message_types=message_types, content_types=content_types, content_keys=content_keys, refs=refs, @@ -215,7 +218,7 @@ def get_posts( chains: Optional[Iterable[str]] = None, start_date: Optional[Union[datetime, float]] = None, end_date: Optional[Union[datetime, float]] = None, - ) -> Dict[str, Dict]: + ) -> PostsResponse: return self._wrap( self.async_session.get_posts, pagination=pagination, @@ -469,7 +472,7 @@ def submit( ) -class AlephClient: +class AlephClient(BaseAlephClient): api_server: str http_session: aiohttp.ClientSession @@ -530,14 +533,6 @@ async def fetch_aggregate( key: str, limit: int = 100, ) -> Dict[str, Dict]: - """ - Fetch a value from the aggregate store by owner address and item key. - - :param address: Address of the owner of the aggregate - :param key: Key of the aggregate - :param limit: Maximum number of items to fetch (Default: 100) - """ - params: Dict[str, Any] = {"keys": key} if limit: params["limit"] = limit @@ -555,14 +550,6 @@ async def fetch_aggregates( keys: Optional[Iterable[str]] = None, limit: int = 100, ) -> Dict[str, Dict]: - """ - Fetch key-value pairs from the aggregate store by owner address. - - :param address: Address of the owner of the aggregate - :param keys: Keys of the aggregates to fetch (Default: all items) - :param limit: Maximum number of items to fetch (Default: 100) - """ - keys_str = ",".join(keys) if keys else "" params: Dict[str, Any] = {} if keys_str: @@ -591,22 +578,17 @@ async def get_posts( chains: Optional[Iterable[str]] = None, start_date: Optional[Union[datetime, float]] = None, end_date: Optional[Union[datetime, float]] = None, - ) -> Dict[str, Dict]: - """ - Fetch a list of posts from the network. 
- - :param pagination: Number of items to fetch (Default: 200) - :param page: Page to fetch, begins at 1 (Default: 1) - :param types: Types of posts to fetch (Default: all types) - :param refs: If set, only fetch posts that reference these hashes (in the "refs" field) - :param addresses: Addresses of the posts to fetch (Default: all addresses) - :param tags: Tags of the posts to fetch (Default: all tags) - :param hashes: Specific item_hashes to fetch - :param channels: Channels of the posts to fetch (Default: all channels) - :param chains: Chains of the posts to fetch (Default: all chains) - :param start_date: Earliest date to fetch messages from - :param end_date: Latest date to fetch messages from - """ + ignore_invalid_messages: Optional[bool] = True, + invalid_messages_log_level: Optional[int] = logging.NOTSET, + ) -> PostsResponse: + ignore_invalid_messages = ( + True if ignore_invalid_messages is None else ignore_invalid_messages + ) + invalid_messages_log_level = ( + logging.NOTSET + if invalid_messages_log_level is None + else invalid_messages_log_level + ) params: Dict[str, Any] = dict(pagination=pagination, page=page) @@ -636,7 +618,25 @@ async def get_posts( async with self.http_session.get("/api/v0/posts.json", params=params) as resp: resp.raise_for_status() - return await resp.json() + response_json = await resp.json() + posts_raw = response_json["posts"] + + posts: List[Post] = [] + for post_raw in posts_raw: + try: + posts.append(Post.parse_obj(post_raw)) + except ValidationError as e: + if not ignore_invalid_messages: + raise e + if invalid_messages_log_level: + logger.log(level=invalid_messages_log_level, msg=e) + return PostsResponse( + posts=posts, + pagination_page=response_json["pagination_page"], + pagination_total=response_json["pagination_total"], + pagination_per_page=response_json["pagination_per_page"], + pagination_item=response_json["pagination_item"], + ) async def download_file_to_buffer( self, @@ -722,7 +722,7 @@ async def get_messages( pagination: int = 200, page: int = 1, message_type: Optional[MessageType] = None, - message_types: Optional[List[MessageType]] = None, + message_types: Optional[Iterable[MessageType]] = None, content_types: Optional[Iterable[str]] = None, content_keys: Optional[Iterable[str]] = None, refs: Optional[Iterable[str]] = None, @@ -733,29 +733,9 @@ async def get_messages( chains: Optional[Iterable[str]] = None, start_date: Optional[Union[datetime, float]] = None, end_date: Optional[Union[datetime, float]] = None, - ignore_invalid_messages: bool = True, - invalid_messages_log_level: int = logging.NOTSET, + ignore_invalid_messages: Optional[bool] = True, + invalid_messages_log_level: Optional[int] = logging.NOTSET, ) -> MessagesResponse: - """ - Fetch a list of messages from the network. 
- - :param pagination: Number of items to fetch (Default: 200) - :param page: Page to fetch, begins at 1 (Default: 1) - :param message_type: [DEPRECATED] Filter by message type, can be "AGGREGATE", "POST", "PROGRAM", "VM", "STORE" or "FORGET" - :param message_types: Filter by message types, can be any combination of "AGGREGATE", "POST", "PROGRAM", "VM", "STORE" or "FORGET" - :param content_types: Filter by content type - :param content_keys: Filter by content key - :param refs: If set, only fetch posts that reference these hashes (in the "refs" field) - :param addresses: Addresses of the posts to fetch (Default: all addresses) - :param tags: Tags of the posts to fetch (Default: all tags) - :param hashes: Specific item_hashes to fetch - :param channels: Channels of the posts to fetch (Default: all channels) - :param chains: Filter by sender address chain - :param start_date: Earliest date to fetch messages from - :param end_date: Latest date to fetch messages from - :param ignore_invalid_messages: Ignore invalid messages (Default: False) - :param invalid_messages_log_level: Log level to use for invalid messages (Default: logging.NOTSET) - """ ignore_invalid_messages = ( True if ignore_invalid_messages is None else ignore_invalid_messages ) @@ -775,6 +755,7 @@ async def get_messages( params["msgType"] = message_type.value if message_types is not None: params["msgTypes"] = ",".join([t.value for t in message_types]) + print(params["msgTypes"]) if content_types is not None: params["contentTypes"] = ",".join(content_types) if content_keys is not None: @@ -842,13 +823,6 @@ async def get_message( message_type: Optional[Type[GenericMessage]] = None, channel: Optional[str] = None, ) -> GenericMessage: - """ - Get a single message from its `item_hash` and perform some basic validation. - - :param item_hash: Hash of the message to fetch - :param message_type: Type of message to fetch - :param channel: Channel of the message to fetch - """ messages_response = await self.get_messages( hashes=[item_hash], channels=[channel] if channel else None, @@ -874,6 +848,7 @@ async def watch_messages( message_type: Optional[MessageType] = None, message_types: Optional[Iterable[MessageType]] = None, content_types: Optional[Iterable[str]] = None, + content_keys: Optional[Iterable[str]] = None, refs: Optional[Iterable[str]] = None, addresses: Optional[Iterable[str]] = None, tags: Optional[Iterable[str]] = None, @@ -883,21 +858,6 @@ async def watch_messages( start_date: Optional[Union[datetime, float]] = None, end_date: Optional[Union[datetime, float]] = None, ) -> AsyncIterable[AlephMessage]: - """ - Iterate over current and future matching messages asynchronously. 
- - :param message_type: [DEPRECATED] Type of message to watch - :param message_types: Types of messages to watch - :param content_types: Content types to watch - :param refs: References to watch - :param addresses: Addresses to watch - :param tags: Tags to watch - :param hashes: Hashes to watch - :param channels: Channels to watch - :param chains: Chains to watch - :param start_date: Start date from when to watch - :param end_date: End date until when to watch - """ params: Dict[str, Any] = dict() if message_type is not None: @@ -910,6 +870,8 @@ async def watch_messages( params["msgTypes"] = ",".join([t.value for t in message_types]) if content_types is not None: params["contentTypes"] = ",".join(content_types) + if content_keys is not None: + params["contentKeys"] = ",".join(content_keys) if refs is not None: params["refs"] = ",".join(refs) if addresses is not None: @@ -948,7 +910,7 @@ async def watch_messages( break -class AuthenticatedAlephClient(AlephClient): +class AuthenticatedAlephClient(AlephClient, BaseAuthenticatedAlephClient): account: Account BROADCAST_MESSAGE_FIELDS = { @@ -986,8 +948,11 @@ async def __aenter__(self) -> "AuthenticatedAlephClient": return self async def ipfs_push(self, content: Mapping) -> str: - """Push arbitrary content as JSON to the IPFS service.""" + """ + Push arbitrary content as JSON to the IPFS service. + :param content: The dict-like content to upload + """ url = "/api/v0/ipfs/add_json" logger.debug(f"Pushing to IPFS on {url}") @@ -996,8 +961,11 @@ async def ipfs_push(self, content: Mapping) -> str: return (await resp.json()).get("hash") async def storage_push(self, content: Mapping) -> str: - """Push arbitrary content as JSON to the storage service.""" + """ + Push arbitrary content as JSON to the storage service. + :param content: The dict-like content to upload + """ url = "/api/v0/storage/add_json" logger.debug(f"Pushing to storage on {url}") @@ -1006,7 +974,11 @@ async def storage_push(self, content: Mapping) -> str: return (await resp.json()).get("hash") async def ipfs_push_file(self, file_content: Union[str, bytes]) -> str: - """Push a file to the IPFS service.""" + """ + Push a file to the IPFS service. + + :param file_content: The file content to upload + """ data = aiohttp.FormData() data.add_field("file", file_content) @@ -1018,7 +990,9 @@ async def ipfs_push_file(self, file_content: Union[str, bytes]) -> str: return (await resp.json()).get("hash") async def storage_push_file(self, file_content) -> str: - """Push a file to the storage service.""" + """ + Push a file to the storage service. + """ data = aiohttp.FormData() data.add_field("file", file_content) @@ -1161,18 +1135,6 @@ async def create_post( storage_engine: StorageEnum = StorageEnum.storage, sync: bool = False, ) -> Tuple[PostMessage, MessageStatus]: - """ - Create a POST message on the Aleph network. It is associated with a channel and owned by an account. 
- - :param post_content: The content of the message - :param post_type: An arbitrary content type that helps to describe the post_content - :param ref: A reference to a previous message that it replaces - :param address: The address that will be displayed as the author of the message - :param channel: The channel that the message will be posted on - :param inline: An optional flag to indicate if the content should be inlined in the message or not - :param storage_engine: An optional storage engine to use for the message, if not inlined (Default: "storage") - :param sync: If true, waits for the message to be processed by the API server (Default: False) - """ address = address or settings.ADDRESS_TO_USE or self.account.get_address() content = PostContent( @@ -1201,16 +1163,6 @@ async def create_aggregate( inline: bool = True, sync: bool = False, ) -> Tuple[AggregateMessage, MessageStatus]: - """ - Create an AGGREGATE message. It is meant to be used as a quick access storage associated with an account. - - :param key: Key to use to store the content - :param content: Content to store - :param address: Address to use to sign the message - :param channel: Channel to use (Default: "TEST") - :param inline: Whether to write content inside the message (Default: True) - :param sync: If true, waits for the message to be processed by the API server (Default: False) - """ address = address or settings.ADDRESS_TO_USE or self.account.get_address() content_ = AggregateContent( @@ -1241,22 +1193,6 @@ async def create_store( channel: Optional[str] = None, sync: bool = False, ) -> Tuple[StoreMessage, MessageStatus]: - """ - Create a STORE message to store a file on the Aleph network. - - Can be passed either a file path, an IPFS hash or the file's content as raw bytes. - - :param address: Address to display as the author of the message (Default: account.get_address()) - :param file_content: Byte stream of the file to store (Default: None) - :param file_path: Path to the file to store (Default: None) - :param file_hash: Hash of the file to store (Default: None) - :param guess_mime_type: Guess the MIME type of the file (Default: False) - :param ref: Reference to a previous message (Default: None) - :param storage_engine: Storage engine to use (Default: "storage") - :param extra_fields: Extra fields to add to the STORE message (Default: None) - :param channel: Channel to post the message to (Default: "TEST") - :param sync: If true, waits for the message to be processed by the API server (Default: False) - """ address = address or settings.ADDRESS_TO_USE or self.account.get_address() extra_fields = extra_fields or {} @@ -1277,7 +1213,7 @@ async def create_store( else: raise ValueError(f"Unknown storage engine: '{storage_engine}'") - assert file_hash, "File hash should be empty" + assert file_hash, "File hash should not be empty" if magic is None: pass @@ -1325,25 +1261,6 @@ async def create_program( subscriptions: Optional[List[Mapping]] = None, metadata: Optional[Mapping[str, Any]] = None, ) -> Tuple[ProgramMessage, MessageStatus]: - """ - Post a (create) PROGRAM message. 
- - :param program_ref: Reference to the program to run - :param entrypoint: Entrypoint to run - :param runtime: Runtime to use - :param environment_variables: Environment variables to pass to the program - :param storage_engine: Storage engine to use (Default: "storage") - :param channel: Channel to use (Default: "TEST") - :param address: Address to use (Default: account.get_address()) - :param sync: If true, waits for the message to be processed by the API server - :param memory: Memory in MB for the VM to be allocated (Default: 128) - :param vcpus: Number of vCPUs to allocate (Default: 1) - :param timeout_seconds: Timeout in seconds (Default: 30.0) - :param persistent: Whether the program should be persistent or not (Default: False) - :param encoding: Encoding to use (Default: Encoding.zip) - :param volumes: Volumes to mount - :param subscriptions: Patterns of Aleph messages to forward to the program's event receiver - """ address = address or settings.ADDRESS_TO_USE or self.account.get_address() volumes = volumes if volumes is not None else [] @@ -1421,19 +1338,6 @@ async def forget( address: Optional[str] = None, sync: bool = False, ) -> Tuple[ForgetMessage, MessageStatus]: - """ - Post a FORGET message to remove previous messages from the network. - - Targeted messages need to be signed by the same account that is attempting to forget them, - if the creating address did not delegate the access rights to the forgetting account. - - :param hashes: Hashes of the messages to forget - :param reason: Reason for forgetting the messages - :param storage_engine: Storage engine to use (Default: "storage") - :param channel: Channel to use (Default: "TEST") - :param address: Address to use (Default: account.get_address()) - :param sync: If true, waits for the message to be processed by the API server (Default: False) - """ address = address or settings.ADDRESS_TO_USE or self.account.get_address() content = ForgetContent( diff --git a/src/aleph/sdk/models.py b/src/aleph/sdk/models.py index f7dfcd6e..f5b1072b 100644 --- a/src/aleph/sdk/models.py +++ b/src/aleph/sdk/models.py @@ -1,14 +1,51 @@ -from typing import List +from typing import Any, Dict, List, Optional, Union -from aleph_message.models import AlephMessage -from pydantic import BaseModel +from aleph_message.models import AlephMessage, BaseMessage, ChainRef, ItemHash +from pydantic import BaseModel, Field -class MessagesResponse(BaseModel): - """Response from an Aleph node API on the path /api/v0/messages.json""" - - messages: List[AlephMessage] +class PaginationResponse(BaseModel): pagination_page: int pagination_total: int pagination_per_page: int pagination_item: str + + +class MessagesResponse(PaginationResponse): + """Response from an Aleph node API on the path /api/v0/messages.json""" + + messages: List[AlephMessage] + pagination_item = "messages" + + +class Post(BaseMessage): + """ + A post is a type of message that can be updated. Over the get_posts API + we get the latest version of a post. 
+ """ + + hash: ItemHash = Field(description="Hash of the content (sha256 by default)") + original_item_hash: ItemHash = Field( + description="Hash of the original content (sha256 by default)" + ) + original_signature: Optional[str] = Field( + description="Cryptographic signature of the original message by the sender" + ) + original_type: str = Field( + description="The original, user-generated 'content-type' of the POST message" + ) + content: Dict[str, Any] = Field( + description="The content.content of the POST message" + ) + type: str = Field(description="The content.type of the POST message") + address: str = Field(description="The address of the sender of the POST message") + ref: Optional[Union[str, ChainRef]] = Field( + description="Other message referenced by this one" + ) + + +class PostsResponse(PaginationResponse): + """Response from an Aleph node API on the path /api/v0/posts.json""" + + posts: List[Post] + pagination_item = "posts" diff --git a/src/aleph/sdk/vm/cache.py b/src/aleph/sdk/vm/cache.py index bad6da09..ff5ca7c8 100644 --- a/src/aleph/sdk/vm/cache.py +++ b/src/aleph/sdk/vm/cache.py @@ -110,7 +110,7 @@ async def keys(self, pattern: str = "*") -> List[str]: return await resp.json() -class TestVmCache(BaseVmCache): +class LocalVmCache(BaseVmCache): """This is a local, dict-based cache that can be used for testing purposes.""" def __init__(self): diff --git a/tests/integration/itest_forget.py b/tests/integration/itest_forget.py index 8339e316..29b6c6d9 100644 --- a/tests/integration/itest_forget.py +++ b/tests/integration/itest_forget.py @@ -107,10 +107,10 @@ async def test_forget_a_forget_message(fixture_account): account=fixture_account, api_server=TARGET_NODE ) as session: get_post_response = await session.get_posts(hashes=[post_hash]) - assert len(get_post_response["posts"]) == 1 - post = get_post_response["posts"][0] + assert len(get_post_response.posts) == 1 + post = get_post_response.posts[0] - forget_message_hash = post["forgotten_by"][0] + forget_message_hash = post.forgotten_by[0] forget_message, forget_status = await session.forget( hashes=[forget_message_hash], reason="I want to remember this post. 
Maybe I can forget I forgot it?", diff --git a/tests/unit/test_asynchronous_get.py b/tests/unit/test_asynchronous_get.py index 5a139328..db788e0b 100644 --- a/tests/unit/test_asynchronous_get.py +++ b/tests/unit/test_asynchronous_get.py @@ -3,10 +3,11 @@ from unittest.mock import AsyncMock import pytest -from aleph_message.models import MessagesResponse, MessageType +from aleph_message.models import MessagesResponse from aleph.sdk.client import AlephClient from aleph.sdk.conf import settings +from aleph.sdk.models import PostsResponse def make_mock_session(get_return_value: Dict[str, Any]) -> AlephClient: @@ -66,14 +67,10 @@ async def test_fetch_aggregates(): @pytest.mark.asyncio async def test_get_posts(): async with AlephClient(api_server=settings.API_HOST) as session: - response: MessagesResponse = await session.get_messages( - message_type=MessageType.post, - ) + response: PostsResponse = await session.get_posts() - messages = response.messages - assert len(messages) > 1 - for message in messages: - assert message.type == MessageType.post + posts = response.posts + assert len(posts) > 1 @pytest.mark.asyncio diff --git a/tests/unit/test_synchronous_get.py b/tests/unit/test_synchronous_get.py index d267fa07..eee26dcf 100644 --- a/tests/unit/test_synchronous_get.py +++ b/tests/unit/test_synchronous_get.py @@ -4,8 +4,9 @@ from aleph.sdk.conf import settings -def test_get_posts(): +def test_get_post_messages(): with AlephClient(api_server=settings.API_HOST) as session: + # TODO: Remove deprecated message_type parameter after message_types changes on pyaleph are deployed response: MessagesResponse = session.get_messages( pagination=2, message_type=MessageType.post, diff --git a/tests/unit/test_vm_cache.py b/tests/unit/test_vm_cache.py index 3e36f934..aa1a84fc 100644 --- a/tests/unit/test_vm_cache.py +++ b/tests/unit/test_vm_cache.py @@ -1,11 +1,11 @@ import pytest -from aleph.sdk.vm.cache import TestVmCache, sanitize_cache_key +from aleph.sdk.vm.cache import LocalVmCache, sanitize_cache_key @pytest.mark.asyncio async def test_local_vm_cache(): - cache = TestVmCache() + cache = LocalVmCache() assert (await cache.get("doesnotexist")) is None assert len(await cache.keys()) == 0 key = "thisdoesexist"
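
Taken together, the client and model changes above give `get_posts` a typed return value and add a paging helper inherited from `BaseAlephClient`. The sketch below shows how the new read API is meant to be used; it is a minimal, hedged example that relies only on the async context-manager usage visible in the updated unit tests, and the `"TEST"` channel filter is purely illustrative.

```python
# Minimal sketch of the typed posts API introduced in this diff.
# Assumes the async context-manager usage shown in the updated unit tests;
# the "TEST" channel filter is illustrative only.
import asyncio

from aleph.sdk.client import AlephClient
from aleph.sdk.conf import settings
from aleph.sdk.models import PostsResponse


async def main() -> None:
    async with AlephClient(api_server=settings.API_HOST) as session:
        # get_posts now returns a typed PostsResponse instead of a raw dict.
        response: PostsResponse = await session.get_posts(pagination=10)
        for post in response.posts:
            print(post.item_hash, post.type, post.address)

        # get_posts_iterator (inherited from BaseAlephClient) pages through
        # all matching posts; per its docstring it may yield duplicates across
        # page boundaries but will cover every matching post.
        async for post in session.get_posts_iterator(channels=["TEST"]):
            print(post.item_hash)
            break  # stop after the first post; this is only a demonstration


if __name__ == "__main__":
    asyncio.run(main())
```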
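
On the authenticated side, `create_post` keeps its `Tuple[AlephMessage, MessageStatus]` return and `sync` flag. The sketch below assumes an already-constructed `account` object (creating one is outside this diff) and the same `settings.API_HOST` value used above; the channel and post type are illustrative.

```python
# Hedged sketch of posting with the authenticated client from this diff.
# `account` stands for any aleph.sdk account object (e.g. the fixture account
# used by the integration tests); constructing one is outside this diff.
from aleph.sdk.client import AuthenticatedAlephClient
from aleph.sdk.conf import settings


async def publish_note(account) -> None:
    async with AuthenticatedAlephClient(
        account=account, api_server=settings.API_HOST
    ) as session:
        message, status = await session.create_post(
            post_content={"body": "hello"},
            post_type="note",   # arbitrary, user-defined content type
            channel="TEST",     # illustrative channel
            sync=True,          # wait for the API server to process the message
        )
        print(message.item_hash, status)
```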
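
Finally, the `TestVmCache` → `LocalVmCache` rename changes how callers spell the local, dict-based cache. A small sketch follows; only `get()` and `keys()` appear in this diff, so the `set()` call is an assumption based on the same `BaseVmCache` interface.

```python
# Quick sketch of the renamed local VM cache.
# Only get() and keys() appear in this diff; set() is assumed to belong to
# the same BaseVmCache interface.
import asyncio

from aleph.sdk.vm.cache import LocalVmCache


async def main() -> None:
    cache = LocalVmCache()
    assert await cache.get("missing") is None   # unknown keys return None
    await cache.set("greeting", b"hello")       # assumed interface, see note above
    print(await cache.keys("*"))                # lists the stored keys


if __name__ == "__main__":
    asyncio.run(main())
```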