Skip to content

fix: deprecation warning, update poorman handshake #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 111 additions & 36 deletions hivemind_bus_client/identity.py
Original file line number Diff line number Diff line change
@@ -1,101 +1,176 @@
from os.path import basename, dirname
from poorman_handshake.asymmetric.utils import export_private_key, create_private_key

from poorman_handshake.asymmetric.utils import export_RSA_key, create_RSA_key
from json_database import JsonConfigXDG
from typing import Optional


class NodeIdentity:
"""
A class representing a node's identity within a HiveMind network.

Attributes:
IDENTITY_FILE (JsonConfigXDG): A configuration file containing the node's identity information.
"""

def __init__(self, identity_file=None):
def __init__(self, identity_file: Optional[str] = None):
"""
Initialize the NodeIdentity instance with an optional identity file.

Args:
identity_file (Optional[str]): Path to a custom identity file (default: None, uses default configuration).
"""
self.IDENTITY_FILE = identity_file or JsonConfigXDG("_identity", subfolder="hivemind")

@property
def name(self):
"""human readable label, not guaranteed unique
can describe functionality, brand, capabilities or something else...
def name(self) -> str:
"""
Get or set the human-readable label for the node.

The name is not guaranteed to be unique and can describe functionality, brand, capabilities, or other attributes.

Returns:
str: The name of the node, defaulting to "unnamed-node" if not set.
"""
if not self.IDENTITY_FILE.get("name") and self.IDENTITY_FILE.get("key"):
self.IDENTITY_FILE["name"] = basename(self.IDENTITY_FILE["key"])
return self.IDENTITY_FILE.get("name") or "unnamed-node"

@name.setter
def name(self, val):
def name(self, val: str):
"""Set the name of the node."""
self.IDENTITY_FILE["name"] = val

@property
def public_key(self):
"""ASCI public PGP key"""
def public_key(self) -> Optional[str]:
"""
Get or set the public RSA key for the node.

Returns:
Optional[str]: The public RSA key, if available.
"""
return self.IDENTITY_FILE.get("public_key")

@public_key.setter
def public_key(self, val):
def public_key(self, val: str):
"""Set the public RSA key for the node."""
self.IDENTITY_FILE["public_key"] = val

@property
def private_key(self):
"""path to PRIVATE .asc PGP key, this cryptographic key
uniquely identifies this device across the hive and proves it's identity"""
def private_key(self) -> str:
"""
Get or set the path to the private RSA PEM file for the node.

The private key is used to uniquely identify the device and prove its identity within the HiveMind network.

Returns:
str: The path to the private key file.
"""
return self.IDENTITY_FILE.get("secret_key") or \
f"{dirname(self.IDENTITY_FILE.path)}/{self.name}.asc"
f"{dirname(self.IDENTITY_FILE.path)}/{self.name}.pem"

@private_key.setter
def private_key(self, val):
def private_key(self, val: str):
"""Set the path to the private RSA PEM file for the node."""
self.IDENTITY_FILE["secret_key"] = val

@property
def password(self):
"""password is used to generate a session aes key on handshake.
It should be used instead of users manually setting an encryption key.
This password can be thought as identifying a sub-hive where all devices
can connect to each other (access keys still need to be valid)"""
def password(self) -> Optional[str]:
"""
Get or set the password for the node.

The password is used to generate a session AES key during the non-RSA handshake process.

Returns:
Optional[str]: The password used for session encryption.
"""
return self.IDENTITY_FILE.get("password")

@password.setter
def password(self, val):
def password(self, val: str):
"""Set the password for the node."""
self.IDENTITY_FILE["password"] = val

@property
def access_key(self):
def access_key(self) -> Optional[str]:
"""
Get or set the access key for the node.

Returns:
Optional[str]: The access key for the node.
"""
return self.IDENTITY_FILE.get("access_key")

@access_key.setter
def access_key(self, val):
def access_key(self, val: str):
"""Set the access key for the node."""
self.IDENTITY_FILE["access_key"] = val

@property
def site_id(self):
def site_id(self) -> Optional[str]:
"""
Get or set the site ID for the node.

Returns:
Optional[str]: The site ID for the node.
"""
return self.IDENTITY_FILE.get("site_id")

@site_id.setter
def site_id(self, val):
def site_id(self, val: str):
"""Set the site ID for the node."""
self.IDENTITY_FILE["site_id"] = val

@property
def default_master(self):
def default_master(self) -> Optional[str]:
"""
Get or set the host for default master of the node.

Returns:
Optional[str]: The default master for the node.
"""
return self.IDENTITY_FILE.get("default_master")

@default_master.setter
def default_master(self, val):
def default_master(self, val: str):
"""Set the host for the default master of the node."""
self.IDENTITY_FILE["default_master"] = val

@property
def default_port(self):
def default_port(self) -> Optional[int]:
"""
Get or set the default port for the node.

Returns:
Optional[int]: The default port for the node.
"""
return self.IDENTITY_FILE.get("default_port")

@default_port.setter
def default_port(self, val):
def default_port(self, val: int):
"""Set the default port for the node."""
self.IDENTITY_FILE["default_port"] = val

def save(self):
def save(self) -> None:
"""
Save the current node identity to the identity file.
"""
self.IDENTITY_FILE.store()

def reload(self):
def reload(self) -> None:
"""
Reload the node identity from the identity file.
"""
self.IDENTITY_FILE.reload()

def create_keys(self):
key = create_private_key("HiveMindComs")
priv = f"{dirname(self.IDENTITY_FILE.path)}/HiveMindComs.asc"
export_private_key(priv, key)
pub = str(key.pubkey)
def create_keys(self) -> None:
"""
Generate a new RSA key pair (public and private) and store them in the identity file.

This method generates a new private key, stores it in a PEM file, and updates the node's public and private keys
in the identity file.
"""
secret, pub = create_RSA_key()
priv = f"{dirname(self.IDENTITY_FILE.path)}/HiveMindComs.pem"
export_RSA_key(secret, priv)
self.private_key = priv
self.public_key = pub
Comment on lines +165 to 176
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Add error handling and file permissions for key operations

The key generation implementation needs improvements for robustness and security:

  • No error handling is present for file operations in export_RSA_key and key file creation
  • File permissions for the private key file (HiveMindComs.pem) are not explicitly set
  • No existing key files in other formats were found that would require migration

Required changes:

  • Add try-except blocks around file operations in create_keys and export_RSA_key
  • Set restrictive permissions (600) for the private key file using os.chmod
  • Consider adding a check if the key file already exists before overwriting
🔗 Analysis chain

Verify key generation and storage

The key generation and storage logic has been updated to use RSA keys stored in PEM format. Please ensure:

  1. All existing key files are properly migrated
  2. Key file permissions are set appropriately
  3. Error handling for file operations is added

Let's check for any existing key file references:


Let me gather more information about the key generation and storage implementation.


Let me check one more aspect about error handling and file operations.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Look for other key file patterns that might need migration
rg -l "\.asc|\.pgp|\.gpg" 

# Check for file operation error handling
ast-grep --pattern 'export_RSA_key($_, $_)' -A 5

Length of output: 285


Script:

#!/bin/bash
# Check the implementation of create_RSA_key and export_RSA_key functions
ast-grep --pattern 'def create_RSA_key' -A 10
ast-grep --pattern 'def export_RSA_key' -A 10

# Look for any existing key file handling or migration code
rg -A 5 "HiveMindComs.pem"

# Check for file permission settings
rg -A 3 "chmod|os\.chmod"

Length of output: 447


Script:

#!/bin/bash
# Check for try-except blocks around file operations in identity.py
rg -A 5 "try:" hivemind_bus_client/identity.py

# Look for imports of os module and file operation related functions
rg "^import os|^from os" hivemind_bus_client/identity.py
rg "^from pathlib" hivemind_bus_client/identity.py

# Check the full Identity class implementation for file operation patterns
ast-grep --pattern 'class Identity' -A 10

Length of output: 240

3 changes: 1 addition & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
poorman-handshake>=0.2.2a3
poorman-handshake>=1.0.0,<2.0.0
ovos_bus_client>=0.0.6a19
ovos_utils>=0.0.38
bitstring>=4.1.1
PGPy>=0.6.0
cryptography>=41.0.1
pycryptodomex>=3.18.0
pybase64
Loading