Skip to content

Commit

Permalink
topic message update
Browse files Browse the repository at this point in the history
Signed-off-by: nadine.loepfe <[email protected]>
  • Loading branch information
nadineloepfe committed Feb 10, 2025
1 parent a611d01 commit 7ebe939
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 13 deletions.
54 changes: 52 additions & 2 deletions src/hiero_sdk_python/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
from hiero_sdk_python.query.transaction_get_receipt_query import TransactionGetReceiptQuery
from hiero_sdk_python.transaction.transaction_id import TransactionId

import random
import requests
from hiero_sdk_python.account.account_id import AccountId

Operator = namedtuple('Operator', ['account_id', 'private_key'])

class Client:
Expand Down Expand Up @@ -46,7 +50,7 @@ def __init__(self, network=None):
if not node_account_ids:
raise ValueError("No nodes available in the network configuration.")

initial_node_id = node_account_ids[0]
initial_node_id = node_account_ids[0]
self._switch_node(initial_node_id)

self._init_mirror_stub()
Expand All @@ -61,16 +65,26 @@ def _init_mirror_stub(self):
self.mirror_stub = mirror_consensus_grpc.ConsensusServiceStub(self.mirror_channel)

def set_operator(self, account_id, private_key):
"""
Sets the operator credentials (account ID and private key).
"""
self.operator_account_id = account_id
self.operator_private_key = private_key

@property
def operator(self):
"""
Returns an Operator namedtuple if both account ID and private key are set,
otherwise None.
"""
if self.operator_account_id and self.operator_private_key:
return Operator(account_id=self.operator_account_id, private_key=self.operator_private_key)
return None

def generate_transaction_id(self):
"""
Generates a new transaction ID, requiring that the operator_account_id is set.
"""
if self.operator_account_id is None:
raise ValueError("Operator account ID must be set to generate transaction ID.")
return TransactionId.generate(self.operator_account_id)
Expand All @@ -85,6 +99,9 @@ def get_node_account_ids(self):
raise ValueError("No nodes available in the network configuration.")

def get_transaction_receipt(self, transaction_id, max_attempts=10, sleep_seconds=2):
"""
Repeatedly queries for a transaction receipt until SUCCESS or certain retryable statuses.
"""
for attempt in range(max_attempts):
receipt_query = TransactionGetReceiptQuery()
receipt_query.set_transaction_id(transaction_id)
Expand All @@ -93,7 +110,13 @@ def get_transaction_receipt(self, transaction_id, max_attempts=10, sleep_seconds

if status == ResponseCode.SUCCESS:
return receipt
elif status in (ResponseCode.UNKNOWN, ResponseCode.BUSY, ResponseCode.RECEIPT_NOT_FOUND, ResponseCode.RECORD_NOT_FOUND, ResponseCode.PLATFORM_NOT_ACTIVE):
elif status in (
ResponseCode.UNKNOWN,
ResponseCode.BUSY,
ResponseCode.RECEIPT_NOT_FOUND,
ResponseCode.RECORD_NOT_FOUND,
ResponseCode.PLATFORM_NOT_ACTIVE
):
time.sleep(sleep_seconds)
continue
else:
Expand Down Expand Up @@ -137,3 +160,30 @@ def _switch_node(self, node_account_id):
self.crypto_stub = crypto_service_pb2_grpc.CryptoServiceStub(self.channel)
self.topic_stub = consensus_service_pb2_grpc.ConsensusServiceStub(self.channel)
self.node_account_id = node_account_id

def close(self):
"""
Closes any open gRPC channels and frees resources.
Call this when you are done using the Client to ensure a clean shutdown.
"""
if self.channel is not None:
self.channel.close()
self.channel = None

if self.mirror_channel is not None:
self.mirror_channel.close()
self.mirror_channel = None

self.token_stub = None
self.crypto_stub = None
self.topic_stub = None
self.mirror_stub = None

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
"""
Automatically close channels when exiting 'with' block.
"""
self.close()
3 changes: 2 additions & 1 deletion src/hiero_sdk_python/client/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Network:
'mainnet': 'https://mainnet-public.mirrornode.hedera.com',
'testnet': 'https://testnet.mirrornode.hedera.com',
'previewnet': 'https://previewnet.mirrornode.hedera.com',
'solo': 'http://localhost:8080'
'solo': 'http://localhost:8080'
}

DEFAULT_NODES = {
Expand Down Expand Up @@ -97,6 +97,7 @@ def _fetch_nodes_from_mirror_node(self):
url = f"{base_url}/api/v1/network/nodes?limit=100&order=desc"

try:
import requests
response = requests.get(url)
response.raise_for_status()
data = response.json()
Expand Down
29 changes: 19 additions & 10 deletions src/hiero_sdk_python/query/topic_message_query.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from datetime import datetime
from typing import Optional, Callable, Union
import threading

from hiero_sdk_python.consensus.topic_message import TopicMessage
from hiero_sdk_python.hapi.mirror import consensus_service_pb2 as mirror_proto
from hiero_sdk_python.hapi.services import basic_types_pb2, timestamp_pb2
from hiero_sdk_python.consensus.topic_id import TopicId


class TopicMessageQuery:
"""
A query to subscribe to messages from a specific HCS topic, via a mirror node.
Expand All @@ -19,21 +21,20 @@ def __init__(
limit: Optional[int] = None,
chunking_enabled: bool = False,
):
"""
Initializes a new TopicMessageQuery instance.
Args:
topic_id (str or TopicId, optional): The ID of the topic to subscribe to.
start_time (datetime, optional): Start time for the subscription.
end_time (datetime, optional): End time for the subscription.
limit (int, optional): Maximum number of messages to retrieve.
chunking_enabled (bool, optional): Whether to enable chunking.
"""
self._topic_id = self._parse_topic_id(topic_id) if topic_id else None
self._start_time = self._parse_timestamp(start_time) if start_time else None
self._end_time = self._parse_timestamp(end_time) if end_time else None
self._limit = limit
self._chunking_enabled = chunking_enabled
self._completion_handler: Optional[Callable[[], None]] = None

def set_completion_handler(self, handler: Callable[[], None]):
"""
Assign a callback that will be invoked when the subscription
completes (i.e., the mirror node closes the stream).
"""
self._completion_handler = handler
return self

def _parse_topic_id(self, topic_id: Union[str, TopicId]):
if isinstance(topic_id, str):
Expand Down Expand Up @@ -78,6 +79,12 @@ def subscribe(
on_message: Callable[[TopicMessage], None],
on_error: Optional[Callable[[Exception], None]] = None,
):
"""
Subscribes to the given topic on the mirror node via client.mirror_stub.
The on_message callback is invoked for each received TopicMessage.
The optional on_error callback is invoked if an exception occurs in the subscription thread.
If a completion handler has been set (via set_completion_handler), it is called when the stream ends.
"""
if not self._topic_id:
raise ValueError("Topic ID must be set before subscribing.")
if not client.mirror_stub:
Expand All @@ -97,6 +104,8 @@ def run_stream():
for response in message_stream:
msg_obj = TopicMessage.from_proto(response)
on_message(msg_obj)
if self._completion_handler:
self._completion_handler()
except Exception as e:
if on_error:
on_error(e)
Expand Down
35 changes: 35 additions & 0 deletions src/hiero_sdk_python/utils/subscription_handle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import threading

class SubscriptionHandle:
"""
Represents a handle to an ongoing subscription.
Calling .cancel() will signal the subscription thread to stop.
"""
def __init__(self):
self._cancelled = threading.Event()
self._thread = None

def cancel(self):
"""
Signals to cancel the subscription.
"""
self._cancelled.set()

def is_cancelled(self) -> bool:
"""
Returns True if this subscription is already cancelled.
"""
return self._cancelled.is_set()

def set_thread(self, thread: threading.Thread):
"""
(Optional) Store the thread object for reference.
"""
self._thread = thread

def join(self, timeout=None):
"""
(Optional) Wait for the subscription thread to end.
"""
if self._thread:
self._thread.join(timeout)

0 comments on commit 7ebe939

Please sign in to comment.