|
| 1 | +import json |
1 | 2 | import logging
|
| 3 | +import sys |
2 | 4 | from abc import ABC, abstractmethod
|
| 5 | +from functools import lru_cache |
3 | 6 | from pathlib import Path
|
4 | 7 | from typing import Dict, Optional
|
5 | 8 |
|
6 | 9 | from coincurve.keys import PrivateKey
|
| 10 | +from rich.prompt import Console, Prompt, Text |
7 | 11 | from typing_extensions import deprecated
|
| 12 | +from web3 import Web3 |
8 | 13 |
|
9 | 14 | from aleph.sdk.conf import settings
|
10 | 15 | from aleph.sdk.utils import enum_as_str
|
@@ -143,22 +148,145 @@ async def decrypt(self, content: bytes) -> bytes:
|
143 | 148 | raise NotImplementedError
|
144 | 149 |
|
145 | 150 |
|
146 |
| -# Start of the ugly stuff |
147 | 151 | def generate_key() -> bytes:
|
| 152 | + """ |
| 153 | + Generate a new private key. |
| 154 | +
|
| 155 | + Returns: |
| 156 | + bytes: The generated private key as bytes. |
| 157 | + """ |
| 158 | + |
148 | 159 | privkey = PrivateKey()
|
149 | 160 | return privkey.secret
|
150 | 161 |
|
151 | 162 |
|
| 163 | +def create_or_import_key() -> bytes: |
| 164 | + """ |
| 165 | + Create or import a private key. |
| 166 | +
|
| 167 | + This function allows the user to either import an existing private key |
| 168 | + or generate a new one. If the user chooses to import a key, they can |
| 169 | + enter a private key in hexadecimal format or a mnemonic seed phrase. |
| 170 | +
|
| 171 | + Returns: |
| 172 | + bytes: The private key as bytes. |
| 173 | + """ |
| 174 | + if Prompt.ask("Import an existing wallet", choices=["y", "n"], default="n") == "y": |
| 175 | + data = Prompt.ask("Enter your private key or mnemonic seed phrase") |
| 176 | + # private key |
| 177 | + if data.startswith("0x"): |
| 178 | + data = data[2:] |
| 179 | + if len(data) == 64: |
| 180 | + return bytes.fromhex(data) |
| 181 | + # mnemonic seed phrase |
| 182 | + elif len(data.split()) in [12, 24]: |
| 183 | + w3 = Web3() |
| 184 | + w3.eth.account.enable_unaudited_hdwallet_features() |
| 185 | + return w3.eth.account.from_mnemonic(data.strip()).key |
| 186 | + else: |
| 187 | + raise ValueError("Invalid private key or mnemonic seed phrase") |
| 188 | + else: |
| 189 | + return generate_key() |
| 190 | + |
| 191 | + |
| 192 | +def save_key(private_key: bytes, path: Path): |
| 193 | + """ |
| 194 | + Save a private key to a file. |
| 195 | +
|
| 196 | + Parameters: |
| 197 | + private_key (bytes): The private key as bytes. |
| 198 | + path (Path): The path to the private key file. |
| 199 | +
|
| 200 | + Returns: |
| 201 | + None |
| 202 | + """ |
| 203 | + w3 = Web3() |
| 204 | + address = None |
| 205 | + path.parent.mkdir(exist_ok=True, parents=True) |
| 206 | + if path.name.endswith(".key") or "pytest" in sys.modules: |
| 207 | + address = w3.to_checksum_address(w3.eth.account.from_key(private_key).address) |
| 208 | + path.write_bytes(private_key) |
| 209 | + elif path.name.endswith(".json"): |
| 210 | + address = w3.to_checksum_address(w3.eth.account.from_key(private_key).address) |
| 211 | + password = Prompt.ask( |
| 212 | + "Enter a password to encrypt your keystore", password=True |
| 213 | + ) |
| 214 | + keystore = w3.eth.account.encrypt(private_key, password) |
| 215 | + path.write_text(json.dumps(keystore)) |
| 216 | + else: |
| 217 | + raise ValueError("Unsupported private key file format") |
| 218 | + confirmation = Text.assemble( |
| 219 | + "\nYour address: ", |
| 220 | + Text(address, style="cyan"), |
| 221 | + "\nSaved file: ", |
| 222 | + Text(str(path), style="orange1"), |
| 223 | + "\n", |
| 224 | + ) |
| 225 | + Console().print(confirmation) |
| 226 | + |
| 227 | + |
| 228 | +@lru_cache(maxsize=1) |
| 229 | +def load_key(private_key_path: Path) -> bytes: |
| 230 | + """ |
| 231 | + Load a private key from a file. |
| 232 | +
|
| 233 | + This function supports two types of private key files: |
| 234 | + 1. Unencrypted .key files. |
| 235 | + 2. Encrypted .json keystore files. |
| 236 | +
|
| 237 | + Parameters: |
| 238 | + private_key_path (Path): The path to the private key file. |
| 239 | +
|
| 240 | + Returns: |
| 241 | + bytes: The private key as bytes. |
| 242 | +
|
| 243 | + Raises: |
| 244 | + FileNotFoundError: If the private key file does not exist. |
| 245 | + ValueError: If the private key file is not a .key or .json file. |
| 246 | + """ |
| 247 | + if not private_key_path.exists(): |
| 248 | + raise FileNotFoundError("Private key file not found") |
| 249 | + elif private_key_path.name.endswith(".key"): |
| 250 | + return private_key_path.read_bytes() |
| 251 | + elif private_key_path.name.endswith(".json"): |
| 252 | + keystore = private_key_path.read_text() |
| 253 | + password = Prompt.ask("Keystore password", password=True) |
| 254 | + try: |
| 255 | + return Web3().eth.account.decrypt(keystore, password) |
| 256 | + except ValueError: |
| 257 | + raise ValueError("Invalid password") |
| 258 | + else: |
| 259 | + raise ValueError("Unsupported private key file format") |
| 260 | + |
| 261 | + |
152 | 262 | def get_fallback_private_key(path: Optional[Path] = None) -> bytes:
|
| 263 | + """ |
| 264 | + Retrieve or create a fallback private key. |
| 265 | +
|
| 266 | + This function attempts to load a private key from the specified path. |
| 267 | + If the path is not provided, it defaults to the path specified in the |
| 268 | + settings. If the file does not exist or is empty, a new private key |
| 269 | + is generated and saved to the specified path. A symlink is also created |
| 270 | + to use this key by default. |
| 271 | +
|
| 272 | + Parameters: |
| 273 | + path (Optional[Path]): The path to the private key file. If not provided, |
| 274 | + the default path from settings is used. |
| 275 | +
|
| 276 | + Returns: |
| 277 | + bytes: The private key as bytes. |
| 278 | + """ |
153 | 279 | path = path or settings.PRIVATE_KEY_FILE
|
154 | 280 | private_key: bytes
|
155 | 281 | if path.exists() and path.stat().st_size > 0:
|
156 |
| - private_key = path.read_bytes() |
| 282 | + private_key = load_key(path) |
157 | 283 | else:
|
158 |
| - private_key = generate_key() |
159 |
| - path.parent.mkdir(exist_ok=True, parents=True) |
160 |
| - path.write_bytes(private_key) |
161 |
| - |
| 284 | + private_key = ( |
| 285 | + generate_key() |
| 286 | + if path.name.endswith(".key") or "pytest" in sys.modules |
| 287 | + else create_or_import_key() |
| 288 | + ) |
| 289 | + save_key(private_key, path) |
162 | 290 | default_key_path = path.parent / "default.key"
|
163 | 291 |
|
164 | 292 | # If the symlink exists but does not point to a file, delete it.
|
|
0 commit comments