Skip to content

Commit

Permalink
fix codacy errors
Browse files Browse the repository at this point in the history
Signed-off-by: nadine.loepfe <[email protected]>
  • Loading branch information
nadineloepfe committed Feb 18, 2025
1 parent d1034ec commit 511ddfd
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 40 deletions.
64 changes: 40 additions & 24 deletions src/hiero_sdk_python/consensus/topic_message.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Optional, List, Union
from typing import Optional, List, Union, Dict
from hiero_sdk_python.hapi.mirror import consensus_service_pb2 as mirror_proto

def _to_datetime(ts_proto) -> datetime:
Expand Down Expand Up @@ -30,16 +30,26 @@ class TopicMessage:
def __init__(
self,
consensus_timestamp: datetime,
contents: bytes,
running_hash: bytes,
sequence_number: int,
message_data: Dict[str, Union[bytes, int]],
chunks: List[TopicMessageChunk],
transaction_id: Optional[str] = None,
):
"""
Args:
consensus_timestamp: The final consensus timestamp.
message_data: Dict with required fields:
{
"contents": bytes,
"running_hash": bytes,
"sequence_number": int
}
chunks: All individual chunks that form this message.
transaction_id: The transaction ID string if available.
"""
self.consensus_timestamp = consensus_timestamp
self.contents = contents
self.running_hash = running_hash
self.sequence_number = sequence_number
self.contents = message_data["contents"]
self.running_hash = message_data["running_hash"]
self.sequence_number = message_data["sequence_number"]
self.chunks = chunks
self.transaction_id = transaction_id

Expand All @@ -52,7 +62,7 @@ def of_single(cls, response: mirror_proto.ConsensusTopicResponse) -> "TopicMessa
consensus_timestamp = chunk.consensus_timestamp
contents = response.message
running_hash = response.runningHash
sequence_number = response.sequence_number
sequence_number = chunk.sequence_number

transaction_id = None
if response.HasField("chunkInfo") and response.chunkInfo.HasField("initialTransactionID"):
Expand All @@ -64,9 +74,11 @@ def of_single(cls, response: mirror_proto.ConsensusTopicResponse) -> "TopicMessa

return cls(
consensus_timestamp,
contents,
running_hash,
sequence_number,
{
"contents": contents,
"running_hash": running_hash,
"sequence_number": sequence_number,
},
[chunk],
transaction_id
)
Expand All @@ -87,9 +99,11 @@ def of_many(cls, responses: List[mirror_proto.ConsensusTopicResponse]) -> "Topic
chunks.append(c)
total_size += len(r.message)

if (transaction_id is None
if (
transaction_id is None
and r.HasField("chunkInfo")
and r.chunkInfo.HasField("initialTransactionID")):
and r.chunkInfo.HasField("initialTransactionID")
):
tx_id = r.chunkInfo.initialTransactionID
transaction_id = (
f"{tx_id.shardNum}.{tx_id.realmNum}.{tx_id.accountNum}-"
Expand All @@ -110,9 +124,11 @@ def of_many(cls, responses: List[mirror_proto.ConsensusTopicResponse]) -> "Topic

return cls(
consensus_timestamp,
bytes(contents),
running_hash,
sequence_number,
{
"contents": bytes(contents),
"running_hash": running_hash,
"sequence_number": sequence_number,
},
chunks,
transaction_id
)
Expand All @@ -131,14 +147,7 @@ def from_proto(
If chunking is enabled and multiple chunks are detected, they are reassembled
into one combined TopicMessage. Otherwise, a single chunk is returned as-is.
"""
if isinstance(response_or_responses, mirror_proto.ConsensusTopicResponse):
response = response_or_responses
if chunking_enabled and response.HasField("chunkInfo") and response.chunkInfo.total > 1:
raise ValueError(
"Cannot handle multi-chunk in a single response. Pass all chunk responses in a list."
)
return cls.of_single(response)
else:
if not isinstance(response_or_responses, mirror_proto.ConsensusTopicResponse):
if not response_or_responses:
raise ValueError("Empty response list provided to from_proto().")

Expand All @@ -147,6 +156,13 @@ def from_proto(

return cls.of_many(response_or_responses)

response = response_or_responses
if chunking_enabled and response.HasField("chunkInfo") and response.chunkInfo.total > 1:
raise ValueError(
"Cannot handle multi-chunk in a single response. Pass all chunk responses in a list."
)
return cls.of_single(response)

def __str__(self):
contents_str = self.contents.decode("utf-8", errors="replace")
return (
Expand Down
50 changes: 34 additions & 16 deletions src/hiero_sdk_python/crypto/private_key.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from cryptography.hazmat.primitives.asymmetric import ed25519, ec
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from typing import Union
from typing import Optional, Union
from hiero_sdk_python.crypto.public_key import PublicKey

class PrivateKey:
Expand Down Expand Up @@ -60,30 +60,48 @@ def from_bytes(cls, key_bytes: bytes) -> "PrivateKey":
If the key is DER-encoded, tries to parse Ed25519 vs ECDSA.
"""
if len(key_bytes) == 32:
try:
ed_priv = ed25519.Ed25519PrivateKey.from_private_bytes(key_bytes)
ed_priv = cls._try_load_ed25519(key_bytes)
if ed_priv:
return cls(ed_priv)
except Exception:
pass

try:
private_int = int.from_bytes(key_bytes, "big")
ec_priv = ec.derive_private_key(private_int, ec.SECP256K1(), default_backend())
ec_priv = cls._try_load_ecdsa(key_bytes)
if ec_priv:
return cls(ec_priv)
except Exception:
pass

der_key = cls._try_load_der(key_bytes)
if der_key:
return cls(der_key)

raise ValueError("Failed to load private key from bytes.")

@staticmethod
def _try_load_ed25519(key_bytes: bytes) -> Optional[ed25519.Ed25519PrivateKey]:
try:
return ed25519.Ed25519PrivateKey.from_private_bytes(key_bytes)
except Exception:
return None

@staticmethod
def _try_load_ecdsa(key_bytes: bytes) -> Optional[ec.EllipticCurvePrivateKey]:
try:
private_int = int.from_bytes(key_bytes, "big")
return ec.derive_private_key(private_int, ec.SECP256K1(), default_backend())
except Exception:
return None

@staticmethod
def _try_load_der(key_bytes: bytes) -> Optional[Union[ed25519.Ed25519PrivateKey, ec.EllipticCurvePrivateKey]]:
try:
private_key = serialization.load_der_private_key(key_bytes, password=None)
if isinstance(private_key, ed25519.Ed25519PrivateKey):
return cls(private_key)
return private_key
if isinstance(private_key, ec.EllipticCurvePrivateKey):
if not isinstance(private_key.curve, ec.SECP256K1):
raise ValueError("Only secp256k1 ECDSA is supported.")
return cls(private_key)
raise ValueError("Unsupported private key type.")
except Exception as e:
raise ValueError(f"Failed to load private key (DER): {e}")
return private_key
return None
except Exception:
return None

def sign(self, data: bytes) -> bytes:
return self._private_key.sign(data)
Expand Down Expand Up @@ -130,4 +148,4 @@ def is_ecdsa(self) -> bool:
def __repr__(self):
if self.is_ed25519():
return f"<PrivateKey (Ed25519) hex={self.to_string_raw()}>"
return f"<PrivateKey (ECDSA) hex={self.to_string_raw()}>"
return f"<PrivateKey (ECDSA) hex={self.to_string_raw()}>"

0 comments on commit 511ddfd

Please sign in to comment.