From 511ddfd55925a38e47fd7fd652b7ff97392c8078 Mon Sep 17 00:00:00 2001 From: "nadine.loepfe" Date: Tue, 18 Feb 2025 15:40:00 +0100 Subject: [PATCH] fix codacy errors Signed-off-by: nadine.loepfe --- .../consensus/topic_message.py | 64 ++++++++++++------- src/hiero_sdk_python/crypto/private_key.py | 50 ++++++++++----- 2 files changed, 74 insertions(+), 40 deletions(-) diff --git a/src/hiero_sdk_python/consensus/topic_message.py b/src/hiero_sdk_python/consensus/topic_message.py index 61139be..35f6196 100644 --- a/src/hiero_sdk_python/consensus/topic_message.py +++ b/src/hiero_sdk_python/consensus/topic_message.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Optional, List, Union +from typing import Optional, List, Union, Dict from hiero_sdk_python.hapi.mirror import consensus_service_pb2 as mirror_proto def _to_datetime(ts_proto) -> datetime: @@ -30,16 +30,26 @@ class TopicMessage: def __init__( self, consensus_timestamp: datetime, - contents: bytes, - running_hash: bytes, - sequence_number: int, + message_data: Dict[str, Union[bytes, int]], chunks: List[TopicMessageChunk], transaction_id: Optional[str] = None, ): + """ + Args: + consensus_timestamp: The final consensus timestamp. + message_data: Dict with required fields: + { + "contents": bytes, + "running_hash": bytes, + "sequence_number": int + } + chunks: All individual chunks that form this message. + transaction_id: The transaction ID string if available. + """ self.consensus_timestamp = consensus_timestamp - self.contents = contents - self.running_hash = running_hash - self.sequence_number = sequence_number + self.contents = message_data["contents"] + self.running_hash = message_data["running_hash"] + self.sequence_number = message_data["sequence_number"] self.chunks = chunks self.transaction_id = transaction_id @@ -52,7 +62,7 @@ def of_single(cls, response: mirror_proto.ConsensusTopicResponse) -> "TopicMessa consensus_timestamp = chunk.consensus_timestamp contents = response.message running_hash = response.runningHash - sequence_number = response.sequence_number + sequence_number = chunk.sequence_number transaction_id = None if response.HasField("chunkInfo") and response.chunkInfo.HasField("initialTransactionID"): @@ -64,9 +74,11 @@ def of_single(cls, response: mirror_proto.ConsensusTopicResponse) -> "TopicMessa return cls( consensus_timestamp, - contents, - running_hash, - sequence_number, + { + "contents": contents, + "running_hash": running_hash, + "sequence_number": sequence_number, + }, [chunk], transaction_id ) @@ -87,9 +99,11 @@ def of_many(cls, responses: List[mirror_proto.ConsensusTopicResponse]) -> "Topic chunks.append(c) total_size += len(r.message) - if (transaction_id is None + if ( + transaction_id is None and r.HasField("chunkInfo") - and r.chunkInfo.HasField("initialTransactionID")): + and r.chunkInfo.HasField("initialTransactionID") + ): tx_id = r.chunkInfo.initialTransactionID transaction_id = ( f"{tx_id.shardNum}.{tx_id.realmNum}.{tx_id.accountNum}-" @@ -110,9 +124,11 @@ def of_many(cls, responses: List[mirror_proto.ConsensusTopicResponse]) -> "Topic return cls( consensus_timestamp, - bytes(contents), - running_hash, - sequence_number, + { + "contents": bytes(contents), + "running_hash": running_hash, + "sequence_number": sequence_number, + }, chunks, transaction_id ) @@ -131,14 +147,7 @@ def from_proto( If chunking is enabled and multiple chunks are detected, they are reassembled into one combined TopicMessage. Otherwise, a single chunk is returned as-is. """ - if isinstance(response_or_responses, mirror_proto.ConsensusTopicResponse): - response = response_or_responses - if chunking_enabled and response.HasField("chunkInfo") and response.chunkInfo.total > 1: - raise ValueError( - "Cannot handle multi-chunk in a single response. Pass all chunk responses in a list." - ) - return cls.of_single(response) - else: + if not isinstance(response_or_responses, mirror_proto.ConsensusTopicResponse): if not response_or_responses: raise ValueError("Empty response list provided to from_proto().") @@ -147,6 +156,13 @@ def from_proto( return cls.of_many(response_or_responses) + response = response_or_responses + if chunking_enabled and response.HasField("chunkInfo") and response.chunkInfo.total > 1: + raise ValueError( + "Cannot handle multi-chunk in a single response. Pass all chunk responses in a list." + ) + return cls.of_single(response) + def __str__(self): contents_str = self.contents.decode("utf-8", errors="replace") return ( diff --git a/src/hiero_sdk_python/crypto/private_key.py b/src/hiero_sdk_python/crypto/private_key.py index 937fa21..1832cf3 100644 --- a/src/hiero_sdk_python/crypto/private_key.py +++ b/src/hiero_sdk_python/crypto/private_key.py @@ -1,7 +1,7 @@ from cryptography.hazmat.primitives.asymmetric import ed25519, ec from cryptography.hazmat.primitives import serialization from cryptography.hazmat.backends import default_backend -from typing import Union +from typing import Optional, Union from hiero_sdk_python.crypto.public_key import PublicKey class PrivateKey: @@ -60,30 +60,48 @@ def from_bytes(cls, key_bytes: bytes) -> "PrivateKey": If the key is DER-encoded, tries to parse Ed25519 vs ECDSA. """ if len(key_bytes) == 32: - try: - ed_priv = ed25519.Ed25519PrivateKey.from_private_bytes(key_bytes) + ed_priv = cls._try_load_ed25519(key_bytes) + if ed_priv: return cls(ed_priv) - except Exception: - pass - try: - private_int = int.from_bytes(key_bytes, "big") - ec_priv = ec.derive_private_key(private_int, ec.SECP256K1(), default_backend()) + ec_priv = cls._try_load_ecdsa(key_bytes) + if ec_priv: return cls(ec_priv) - except Exception: - pass + der_key = cls._try_load_der(key_bytes) + if der_key: + return cls(der_key) + + raise ValueError("Failed to load private key from bytes.") + + @staticmethod + def _try_load_ed25519(key_bytes: bytes) -> Optional[ed25519.Ed25519PrivateKey]: + try: + return ed25519.Ed25519PrivateKey.from_private_bytes(key_bytes) + except Exception: + return None + + @staticmethod + def _try_load_ecdsa(key_bytes: bytes) -> Optional[ec.EllipticCurvePrivateKey]: + try: + private_int = int.from_bytes(key_bytes, "big") + return ec.derive_private_key(private_int, ec.SECP256K1(), default_backend()) + except Exception: + return None + + @staticmethod + def _try_load_der(key_bytes: bytes) -> Optional[Union[ed25519.Ed25519PrivateKey, ec.EllipticCurvePrivateKey]]: try: private_key = serialization.load_der_private_key(key_bytes, password=None) if isinstance(private_key, ed25519.Ed25519PrivateKey): - return cls(private_key) + return private_key if isinstance(private_key, ec.EllipticCurvePrivateKey): if not isinstance(private_key.curve, ec.SECP256K1): raise ValueError("Only secp256k1 ECDSA is supported.") - return cls(private_key) - raise ValueError("Unsupported private key type.") - except Exception as e: - raise ValueError(f"Failed to load private key (DER): {e}") + return private_key + return None + except Exception: + return None def sign(self, data: bytes) -> bytes: return self._private_key.sign(data) @@ -130,4 +148,4 @@ def is_ecdsa(self) -> bool: def __repr__(self): if self.is_ed25519(): return f"" - return f"" + return f"" \ No newline at end of file