diff --git a/api/app.py b/api/app.py index d979377..93bcf58 100644 --- a/api/app.py +++ b/api/app.py @@ -53,8 +53,6 @@ from dataclasses import dataclass -from typing import List, Optional - from pydantic import BaseModel class DeviceBind(BaseModel): @@ -1012,12 +1010,12 @@ class ClientSSLError(Exception): from starlette.status import HTTP_401_UNAUTHORIZED, HTTP_404_NOT_FOUND from datetime import datetime import hashlib -import os from Crypto.PublicKey import RSA from Crypto.Signature import pkcs1_15 from Crypto.Hash import SHA256 -from pydantic import BaseModel -from typing import Optional +from Crypto.Cipher import PKCS1_OAEP +import binascii +from base64 import b64encode, b64decode security_basic = HTTPBasic() API_KEY_NAME = "access_token" @@ -1056,41 +1054,50 @@ def load_rsa_keys(): def generate_api_key(email: str, password: str) -> str: - """Generate a signed API key.""" - timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f") - salt = os.urandom(16).hex() - payload = f"{email}:{password}:{timestamp}:{salt}" - api_key = hashlib.sha256(payload.encode()).hexdigest() + """Generate a signed and encrypted API key.""" + timestamp = datetime.utcnow().strftime("%Y%m%d%H%M%S%f") + payload = f"{email}:{password}:{timestamp}" + # Encrypt the payload + cipher = PKCS1_OAEP.new(PUBLIC_KEY) + encrypted_payload = cipher.encrypt(payload.encode()) + encrypted_hex = binascii.hexlify(encrypted_payload).decode() + + # Sign the encrypted payload + hash_obj = SHA256.new(encrypted_payload) + signature = pkcs1_15.new(PRIVATE_KEY).sign(hash_obj) + signature_hex = binascii.hexlify(signature).decode() + + return f"{encrypted_hex}:{signature_hex}" + +def decrypt_api_key(api_key: str) -> dict: + """Verify and decrypt an API key.""" + try: + encrypted_payload_hex, signature_hex = api_key.split(':') + encrypted_payload = binascii.unhexlify(encrypted_payload_hex) + signature = binascii.unhexlify(signature_hex) - # Sign the API key - key_hash = SHA256.new(api_key.encode()) - signature = pkcs1_15.new(PRIVATE_KEY).sign(key_hash) - signed_api_key = api_key + ":" + signature.hex() + # Verify the signature + hash_obj = SHA256.new(encrypted_payload) + pkcs1_15.new(PUBLIC_KEY).verify(hash_obj, signature) - return signed_api_key + # Decrypt the payload + cipher = PKCS1_OAEP.new(PRIVATE_KEY) + payload = cipher.decrypt(encrypted_payload) + email, password, timestamp = payload.decode().split(':') -def decrypt_api_key(api_key: str): - """Decrypt API key to extract the email and password.""" - try: - api_key_part, signature = api_key.rsplit(':', 1) - key_hash = SHA256.new(api_key_part.encode()) - # Verify the signature - pkcs1_15.new(PUBLIC_KEY).verify(key_hash, bytes.fromhex(signature)) - - # Decode the base string - decoded_bytes = bytes.fromhex(api_key_part) - decoded_string = decoded_bytes.decode('utf-8') # Assuming the input was UTF-8-encoded - email, password, timestamp, salt = decoded_string.split(':') - - return {"email": email, "password": password} - except ValueError: # Catches all errors related to cryptographic operations + return {"email": email, "password": password, "timestamp": timestamp} + except (ValueError, IndexError, TypeError, binascii.Error) as e: raise HTTPException(status_code=403, detail="Invalid API key") def verify_api_key(api_key: str) -> bool: try: - api_key, signature = api_key.rsplit(':', 1) - key_hash = SHA256.new(api_key.encode()) - pkcs1_15.new(PUBLIC_KEY).verify(key_hash, bytes.fromhex(signature)) + encrypted_payload_hex, signature_hex = api_key.split(':') + encrypted_payload = binascii.unhexlify(encrypted_payload_hex) + signature = binascii.unhexlify(signature_hex) + + # Verify the signature + hash_obj = SHA256.new(encrypted_payload) + pkcs1_15.new(PUBLIC_KEY).verify(hash_obj, signature) return True except Exception: return False @@ -1133,7 +1140,7 @@ async def auth(renpho: RenphoWeight = Depends(get_current_user)): return APIResponse(status="success", message="Authentication successful.") @app.get("/generate_api_key", response_model=APIResponse) -def generate_key(request: Request, email: str, password: str): +def generate_key(request: Request, email: str, password: str, renpho: RenphoWeight = Depends(get_current_user)): """Generate API key for a user.""" try: api_key = generate_api_key(email, password)