forked from superdesk/newsroom-core
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
af9573d
commit 6322856
Showing
2 changed files
with
53 additions
and
76 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,104 +1,80 @@ | ||
import base64 | ||
import hmac | ||
import hashlib | ||
import json | ||
|
||
from typing import Any | ||
|
||
from superdesk.utc import utcnow | ||
from superdesk.core import get_app_config | ||
from superdesk.core.auth.token_auth import TokenAuthorization | ||
from superdesk.core.types import Request | ||
from superdesk import get_resource_service | ||
from superdesk.errors import SuperdeskApiError | ||
from superdesk.core.auth.user_auth import UserAuthProtocol | ||
from authlib.jose import jwt | ||
from authlib.jose.errors import BadSignatureError, ExpiredTokenError, DecodeError | ||
from superdesk.core import get_app_config | ||
from time import time | ||
import logging | ||
from newsroom.auth.utils import get_current_request | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
class JWTTokenAuth(UserAuthProtocol): | ||
|
||
class JWTTokenAuth(TokenAuthorization): | ||
""" | ||
Implements Async JWT authentication by extending UserAuthProtocol. | ||
Implements Async JWT authentication by extending the new async TokenAuthorization. | ||
""" | ||
|
||
@staticmethod | ||
def _decode_token(token: str) -> dict[str, Any]: | ||
""" | ||
Decodes a JWT token uses manual base64 decoding. | ||
""" | ||
try: | ||
header_b64, payload_b64, signature_b64 = token.split(".") | ||
payload = json.loads(base64.urlsafe_b64decode(payload_b64 + "==").decode()) | ||
|
||
return payload | ||
except Exception: | ||
raise SuperdeskApiError.unauthorizedError() | ||
|
||
@staticmethod | ||
def _verify_signature(token: str, secret: str) -> bool: | ||
def get_token_from_request(self, request: Request) -> str | None: | ||
""" | ||
Verifies the JWT signature manually using HMAC-SHA256. | ||
Extracts the token from `Authorization` header. | ||
""" | ||
try: | ||
header_b64, payload_b64, signature_b64 = token.split(".") | ||
unsigned_token = f"{header_b64}.{payload_b64}" | ||
auth = (request.get_header("Authorization") or "").strip() | ||
if auth.lower().startswith(("token", "bearer", "basic")): | ||
return auth.split(" ")[1] if " " in auth else None | ||
return auth if auth else None | ||
|
||
expected_signature = ( | ||
base64.urlsafe_b64encode(hmac.new(secret.encode(), unsigned_token.encode(), hashlib.sha256).digest()) | ||
.decode() | ||
.strip("=") | ||
) | ||
|
||
return hmac.compare_digest(expected_signature, signature_b64) | ||
except Exception: | ||
return False | ||
|
||
async def authenticate(self, request: Request): | ||
def authenticate(self, request: Request = None): | ||
""" | ||
Extracts the JWT token, verifies it, and starts a session. | ||
Validates the JWT token and authenticates the user. | ||
""" | ||
token = request.get_header("Authorization") | ||
if token: | ||
token = token.strip() | ||
if token.lower().startswith(("token", "bearer")): | ||
token = token.split(" ")[1] if " " in token else "" | ||
else: | ||
token = request.storage.session.get("session_token") | ||
|
||
if request is None: | ||
request = get_current_request() | ||
token = self.get_token_from_request(request) | ||
if not token: | ||
await self.stop_session(request) | ||
logger.warning("Missing Authorization token") | ||
raise SuperdeskApiError.unauthorizedError() | ||
|
||
secret = get_app_config("AUTH_SERVER_SHARED_SECRET") | ||
|
||
if not secret: | ||
logger.warning("AUTH_SERVER_SHARED_SECRET is not configured in default settings") | ||
raise SuperdeskApiError.unauthorizedError() | ||
|
||
if not self._verify_signature(token, secret): | ||
raise SuperdeskApiError.unauthorizedError() | ||
|
||
payload = self._decode_token(token) | ||
exp = payload.get("exp") | ||
|
||
if exp and utcnow().timestamp() > exp: | ||
try: | ||
decoded_jwt = jwt.decode(token, key=secret) | ||
decoded_jwt.validate_exp(now=int(time()), leeway=0) | ||
except (BadSignatureError, ExpiredTokenError, DecodeError) as e: | ||
logger.error(f"JWT authentication failed: {e}") | ||
raise SuperdeskApiError.unauthorizedError() | ||
|
||
await self.start_session(request, payload) | ||
self.start_session(request, decoded_jwt) | ||
|
||
async def start_session(self, request: Request, payload: dict[str, Any]): | ||
def start_session(self, request: Request, token_data: dict): | ||
""" | ||
Starts a session and stores the user data. | ||
Starts a session by storing token data. | ||
""" | ||
user_id = payload.get("client_id") | ||
user_service = get_resource_service("users") | ||
user = user_service.find_one(req=None, _id=user_id) | ||
|
||
if not user: | ||
raise SuperdeskApiError.unauthorizedError() | ||
|
||
request.storage.request.set("auth_token", payload) | ||
request.storage.request.set("user_id", user_id) | ||
request.storage.request.set("user", user) | ||
request.storage.request.set("auth_token", token_data) | ||
request.storage.request.set("user_id", token_data.get("client_id")) | ||
|
||
def get_current_user(self, request: Request) -> dict[str, Any] | None: | ||
def get_current_user(self, request: Request): | ||
""" | ||
Retrieves the current user from the session. | ||
""" | ||
return request.storage.request.get("user") | ||
return request.storage.request.get("user_id") | ||
|
||
def authorized(self, allowed_roles, resource, method) -> bool: | ||
""" | ||
Checks if the request is authorized by validating the token. | ||
""" | ||
request = get_current_request() | ||
token = self.get_token_from_request(request) | ||
if not token: | ||
logger.warning("No token found in request") | ||
return False | ||
|
||
try: | ||
self.authenticate(request) | ||
return True | ||
except SuperdeskApiError: | ||
return False # Return False instead of raising an error |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters