Skip to content

Commit d1034ec

Browse files
committed
return subscription handler, add msg chunks, signing commit
Signed-off-by: nadine.loepfe <[email protected]>
1 parent 5c4f05a commit d1034ec

File tree

5 files changed

+202
-68
lines changed

5 files changed

+202
-68
lines changed

examples/query_topic_message.py

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,7 @@
33
from datetime import datetime
44
from dotenv import load_dotenv
55

6-
from hiero_sdk_python import (
7-
Network,
8-
Client,
9-
TopicMessageQuery,
10-
)
6+
from hiero_sdk_python import Network, Client, TopicMessageQuery
117

128
load_dotenv()
139

@@ -20,9 +16,6 @@ def on_message_handler(topic_message):
2016

2117
def on_error_handler(e):
2218
print(f"Subscription error: {e}")
23-
if "Stream removed" in str(e):
24-
print("Reconnecting due to stream removal...")
25-
query_topic_messages()
2619

2720
query = TopicMessageQuery(
2821
topic_id=os.getenv('TOPIC_ID'),
@@ -31,15 +24,21 @@ def on_error_handler(e):
3124
chunking_enabled=True
3225
)
3326

34-
query.subscribe(
27+
handle = query.subscribe(
3528
client,
3629
on_message=on_message_handler,
3730
on_error=on_error_handler
3831
)
3932

40-
print("Subscription started. Waiting for messages...")
41-
while True:
42-
time.sleep(10)
33+
print("Subscription started. Press Ctrl+C to cancel...")
34+
try:
35+
while True:
36+
time.sleep(10)
37+
except KeyboardInterrupt:
38+
print("Cancelling subscription...")
39+
handle.cancel()
40+
handle.join()
41+
print("Subscription cancelled. Exiting.")
4342

4443
if __name__ == "__main__":
4544
query_topic_messages()

src/hiero_sdk_python/client/client.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,13 @@
1010

1111
from hiero_sdk_python.hapi.mirror import (
1212
consensus_service_pb2_grpc as mirror_consensus_grpc,
13-
mirror_network_service_pb2_grpc as mirror_network_grpc
1413
)
1514

1615
from .network import Network
1716
from hiero_sdk_python.response_code import ResponseCode
1817
from hiero_sdk_python.query.transaction_get_receipt_query import TransactionGetReceiptQuery
1918
from hiero_sdk_python.transaction.transaction_id import TransactionId
2019

21-
import random
22-
import requests
23-
from hiero_sdk_python.account.account_id import AccountId
24-
2520
Operator = namedtuple('Operator', ['account_id', 'private_key'])
2621

2722
class Client:

src/hiero_sdk_python/client/network.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ def _fetch_nodes_from_mirror_node(self):
9797
url = f"{base_url}/api/v1/network/nodes?limit=100&order=desc"
9898

9999
try:
100-
import requests
101100
response = requests.get(url)
102101
response.raise_for_status()
103102
data = response.json()

src/hiero_sdk_python/consensus/topic_message.py

Lines changed: 140 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,160 @@
1-
from hiero_sdk_python.hapi.mirror import consensus_service_pb2 as mirror_proto
21
from datetime import datetime
2+
from typing import Optional, List, Union
3+
from hiero_sdk_python.hapi.mirror import consensus_service_pb2 as mirror_proto
4+
5+
def _to_datetime(ts_proto) -> datetime:
6+
"""
7+
Convert a protobuf Timestamp to a Python datetime (UTC).
8+
"""
9+
return datetime.utcfromtimestamp(ts_proto.seconds + ts_proto.nanos / 1e9)
10+
11+
12+
class TopicMessageChunk:
13+
"""
14+
Represents a single chunk within a chunked topic message.
15+
Mirrors the Java 'TopicMessageChunk'.
16+
"""
17+
18+
def __init__(self, response: mirror_proto.ConsensusTopicResponse):
19+
self.consensus_timestamp = _to_datetime(response.consensusTimestamp)
20+
self.content_size = len(response.message)
21+
self.running_hash = response.runningHash
22+
self.sequence_number = response.sequenceNumber
23+
324

425
class TopicMessage:
526
"""
6-
Represents a single message returned from a Hedera Mirror Node subscription.
27+
Represents a Hedera TopicMessage, possibly composed of multiple chunks.
728
"""
829

9-
def __init__(self, consensus_timestamp, message, running_hash, sequence_number,
10-
running_hash_version=None, chunk_info=None, chunks=None, transaction_id=None):
30+
def __init__(
31+
self,
32+
consensus_timestamp: datetime,
33+
contents: bytes,
34+
running_hash: bytes,
35+
sequence_number: int,
36+
chunks: List[TopicMessageChunk],
37+
transaction_id: Optional[str] = None,
38+
):
1139
self.consensus_timestamp = consensus_timestamp
12-
self.message = message or b""
13-
self.running_hash = running_hash or b""
14-
self.sequence_number = sequence_number or 0
15-
self.running_hash_version = running_hash_version
16-
self.chunk_info = chunk_info
40+
self.contents = contents
41+
self.running_hash = running_hash
42+
self.sequence_number = sequence_number
1743
self.chunks = chunks
1844
self.transaction_id = transaction_id
1945

2046
@classmethod
21-
def from_proto(cls, response: mirror_proto.ConsensusTopicResponse) -> "TopicMessage":
47+
def of_single(cls, response: mirror_proto.ConsensusTopicResponse) -> "TopicMessage":
2248
"""
23-
Parse a Mirror Node response into a simpler object.
49+
Build a TopicMessage from a single-chunk response.
2450
"""
25-
transaction_id = (
26-
response.chunkInfo.initialTransactionID
27-
if response.HasField("chunkInfo") and response.chunkInfo.HasField("initialTransactionID")
28-
else None
51+
chunk = TopicMessageChunk(response)
52+
consensus_timestamp = chunk.consensus_timestamp
53+
contents = response.message
54+
running_hash = response.runningHash
55+
sequence_number = response.sequence_number
56+
57+
transaction_id = None
58+
if response.HasField("chunkInfo") and response.chunkInfo.HasField("initialTransactionID"):
59+
tx_id = response.chunkInfo.initialTransactionID
60+
transaction_id = (
61+
f"{tx_id.shardNum}.{tx_id.realmNum}.{tx_id.accountNum}-"
62+
f"{tx_id.transactionValidStart.seconds}.{tx_id.transactionValidStart.nanos}"
63+
)
64+
65+
return cls(
66+
consensus_timestamp,
67+
contents,
68+
running_hash,
69+
sequence_number,
70+
[chunk],
71+
transaction_id
2972
)
73+
74+
@classmethod
75+
def of_many(cls, responses: List[mirror_proto.ConsensusTopicResponse]) -> "TopicMessage":
76+
"""
77+
Reassemble multiple chunk responses into a single TopicMessage.
78+
"""
79+
sorted_responses = sorted(responses, key=lambda r: r.chunkInfo.number)
80+
81+
chunks = []
82+
total_size = 0
83+
transaction_id = None
84+
85+
for r in sorted_responses:
86+
c = TopicMessageChunk(r)
87+
chunks.append(c)
88+
total_size += len(r.message)
89+
90+
if (transaction_id is None
91+
and r.HasField("chunkInfo")
92+
and r.chunkInfo.HasField("initialTransactionID")):
93+
tx_id = r.chunkInfo.initialTransactionID
94+
transaction_id = (
95+
f"{tx_id.shardNum}.{tx_id.realmNum}.{tx_id.accountNum}-"
96+
f"{tx_id.transactionValidStart.seconds}.{tx_id.transactionValidStart.nanos}"
97+
)
98+
99+
contents = bytearray(total_size)
100+
offset = 0
101+
for r in sorted_responses:
102+
end = offset + len(r.message)
103+
contents[offset:end] = r.message
104+
offset = end
105+
106+
last_r = sorted_responses[-1]
107+
consensus_timestamp = _to_datetime(last_r.consensusTimestamp)
108+
running_hash = last_r.runningHash
109+
sequence_number = last_r.sequenceNumber
110+
30111
return cls(
31-
consensus_timestamp=response.consensusTimestamp,
32-
message=response.message,
33-
running_hash=response.runningHash,
34-
sequence_number=response.sequenceNumber,
35-
running_hash_version=response.runningHashVersion if response.runningHashVersion != 0 else None,
36-
chunk_info=response.chunkInfo if response.HasField("chunkInfo") else None,
37-
transaction_id=transaction_id,
112+
consensus_timestamp,
113+
bytes(contents),
114+
running_hash,
115+
sequence_number,
116+
chunks,
117+
transaction_id
38118
)
39119

40-
def __str__(self):
120+
@classmethod
121+
def from_proto(
122+
cls,
123+
response_or_responses: Union[mirror_proto.ConsensusTopicResponse, List[mirror_proto.ConsensusTopicResponse]],
124+
chunking_enabled: bool = False
125+
) -> "TopicMessage":
41126
"""
42-
Returns a nicely formatted string representation of the topic message.
127+
Creates a TopicMessage from either:
128+
- A single ConsensusTopicResponse
129+
- A list of responses (for multi-chunk)
130+
131+
If chunking is enabled and multiple chunks are detected, they are reassembled
132+
into one combined TopicMessage. Otherwise, a single chunk is returned as-is.
43133
"""
44-
timestamp = datetime.utcfromtimestamp(self.consensus_timestamp.seconds).strftime('%Y-%m-%d %H:%M:%S UTC')
45-
message = self.message.decode('utf-8', errors='ignore')
46-
running_hash = self.running_hash.hex()
47-
48-
formatted_message = (
49-
f"Received Topic Message:\n"
50-
f" - Timestamp: {timestamp}\n"
51-
f" - Sequence Number: {self.sequence_number}\n"
52-
f" - Message: {message}\n"
53-
f" - Running Hash: {running_hash}\n"
134+
if isinstance(response_or_responses, mirror_proto.ConsensusTopicResponse):
135+
response = response_or_responses
136+
if chunking_enabled and response.HasField("chunkInfo") and response.chunkInfo.total > 1:
137+
raise ValueError(
138+
"Cannot handle multi-chunk in a single response. Pass all chunk responses in a list."
139+
)
140+
return cls.of_single(response)
141+
else:
142+
if not response_or_responses:
143+
raise ValueError("Empty response list provided to from_proto().")
144+
145+
if not chunking_enabled and len(response_or_responses) == 1:
146+
return cls.of_single(response_or_responses[0])
147+
148+
return cls.of_many(response_or_responses)
149+
150+
def __str__(self):
151+
contents_str = self.contents.decode("utf-8", errors="replace")
152+
return (
153+
f"TopicMessage("
154+
f"consensus_timestamp={self.consensus_timestamp}, "
155+
f"sequence_number={self.sequence_number}, "
156+
f"contents='{contents_str[:40]}{'...' if len(contents_str) > 40 else ''}', "
157+
f"chunk_count={len(self.chunks)}, "
158+
f"transaction_id={self.transaction_id}"
159+
f")"
54160
)
55-
if self.running_hash_version:
56-
formatted_message += f" - Running Hash Version: {self.running_hash_version}\n"
57-
if self.chunk_info:
58-
formatted_message += f" - Chunk Info: {self.chunk_info}\n"
59-
if self.transaction_id:
60-
formatted_message += f" - Transaction ID: {self.transaction_id}\n"
61-
return formatted_message

src/hiero_sdk_python/query/topic_message_query.py

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
1-
from datetime import datetime
2-
from typing import Optional, Callable, Union
3-
import threading
41
import time
2+
import threading
3+
from datetime import datetime
4+
from typing import Optional, Callable, Union, Dict, List
55

6-
from hiero_sdk_python.consensus.topic_message import TopicMessage
76
from hiero_sdk_python.hapi.mirror import consensus_service_pb2 as mirror_proto
87
from hiero_sdk_python.hapi.services import basic_types_pb2, timestamp_pb2
98
from hiero_sdk_python.consensus.topic_id import TopicId
9+
from hiero_sdk_python.consensus.topic_message import TopicMessage
10+
from hiero_sdk_python.utils.subscription_handle import SubscriptionHandle
11+
from hiero_sdk_python import Client
1012

1113

1214
class TopicMessageQuery:
1315
"""
1416
A query to subscribe to messages from a specific HCS topic, via a mirror node.
17+
18+
If `chunking_enabled=True`, multi-chunk messages are automatically reassembled
19+
before invoking `on_message`.
1520
"""
1621

1722
def __init__(
@@ -83,10 +88,11 @@ def set_chunking_enabled(self, enabled: bool):
8388

8489
def subscribe(
8590
self,
86-
client,
91+
client: Client,
8792
on_message: Callable[[TopicMessage], None],
8893
on_error: Optional[Callable[[Exception], None]] = None,
89-
):
94+
) -> SubscriptionHandle:
95+
9096
if not self._topic_id:
9197
raise ValueError("Topic ID must be set before subscribing.")
9298
if not client.mirror_stub:
@@ -100,25 +106,61 @@ def subscribe(
100106
if self._limit is not None:
101107
request.limit = self._limit
102108

109+
subscription_handle = SubscriptionHandle()
110+
111+
pending_chunks: Dict[str, List[mirror_proto.ConsensusTopicResponse]] = {}
112+
103113
def run_stream():
104114
attempt = 0
105-
while attempt < self._max_attempts:
115+
while attempt < self._max_attempts and not subscription_handle.is_cancelled():
106116
try:
107117
message_stream = client.mirror_stub.subscribeTopic(request)
118+
108119
for response in message_stream:
109-
msg_obj = TopicMessage.from_proto(response)
110-
on_message(msg_obj)
120+
if subscription_handle.is_cancelled():
121+
return
122+
123+
if (not self._chunking_enabled
124+
or not response.HasField("chunkInfo")
125+
or response.chunkInfo.total <= 1):
126+
msg_obj = TopicMessage.of_single(response)
127+
on_message(msg_obj)
128+
continue
129+
130+
initial_tx_id = response.chunkInfo.initialTransactionID
131+
tx_id_str = (f"{initial_tx_id.shardNum}."
132+
f"{initial_tx_id.realmNum}."
133+
f"{initial_tx_id.accountNum}-"
134+
f"{initial_tx_id.transactionValidStart.seconds}."
135+
f"{initial_tx_id.transactionValidStart.nanos}")
136+
if tx_id_str not in pending_chunks:
137+
pending_chunks[tx_id_str] = []
138+
pending_chunks[tx_id_str].append(response)
139+
140+
if len(pending_chunks[tx_id_str]) == response.chunkInfo.total:
141+
chunk_list = pending_chunks.pop(tx_id_str)
142+
msg_obj = TopicMessage.of_many(chunk_list)
143+
on_message(msg_obj)
144+
111145
if self._completion_handler:
112146
self._completion_handler()
113147
return
148+
114149
except Exception as e:
150+
if subscription_handle.is_cancelled():
151+
return
152+
115153
attempt += 1
116154
if attempt >= self._max_attempts:
117155
if on_error:
118156
on_error(e)
119157
return
158+
120159
delay = min(0.5 * (2 ** (attempt - 1)), self._max_backoff)
121160
time.sleep(delay)
122161

123162
thread = threading.Thread(target=run_stream, daemon=True)
163+
subscription_handle.set_thread(thread)
124164
thread.start()
165+
166+
return subscription_handle

0 commit comments

Comments
 (0)