15
15
Module for internal utilities.
16
16
"""
17
17
18
+ import base64
18
19
import os as _os
19
20
import json as _json
21
+ import re as _re
20
22
import typing as _typing
21
23
import dataclasses as _dataclasses
22
24
import datetime as _dt
29
31
P = _typing .ParamSpec ("P" )
30
32
R = _typing .TypeVar ("R" )
31
33
34
+ JWT_REGEX = _re .compile (
35
+ r"^[a-zA-Z0-9\-_=]+?\.[a-zA-Z0-9\-_=]+?\.([a-zA-Z0-9\-_=]+)?$" )
36
+
32
37
33
38
class Sentinel :
34
39
"""Internal class for RESET_VALUE."""
@@ -204,9 +209,13 @@ def as_dict(self) -> dict:
204
209
205
210
206
211
def _on_call_check_auth_token (
207
- request : _Request
212
+ request : _Request ,
213
+ verify_token : bool = True ,
208
214
) -> None | _typing .Literal [OnCallTokenState .INVALID ] | dict [str , _typing .Any ]:
209
- """Validates the auth token in a callable request."""
215
+ """
216
+ Validates the auth token in a callable request.
217
+ If verify_token is False, the token will be decoded without verification.
218
+ """
210
219
authorization = request .headers .get ("Authorization" )
211
220
if authorization is None :
212
221
return None
@@ -215,13 +224,15 @@ def _on_call_check_auth_token(
215
224
return OnCallTokenState .INVALID
216
225
try :
217
226
id_token = authorization .replace ("Bearer " , "" )
218
- auth_token = _auth .verify_id_token (id_token )
227
+ if verify_token :
228
+ auth_token = _auth .verify_id_token (id_token )
229
+ else :
230
+ auth_token = _unsafe_decode_id_token (id_token )
219
231
return auth_token
220
232
# pylint: disable=broad-except
221
233
except Exception as err :
222
234
_logging .error (f"Error validating token: { err } " )
223
235
return OnCallTokenState .INVALID
224
- return OnCallTokenState .INVALID
225
236
226
237
227
238
def _on_call_check_app_token (
@@ -240,23 +251,44 @@ def _on_call_check_app_token(
240
251
return OnCallTokenState .INVALID
241
252
242
253
243
- def on_call_check_tokens (request : _Request ,) -> _OnCallTokenVerification :
254
+ def _unsafe_decode_id_token (token : str ):
255
+ # Check if the token matches the JWT pattern
256
+ if not JWT_REGEX .match (token ):
257
+ return {}
258
+
259
+ # Split the token by '.' and decode each component from base64
260
+ components = [base64 .urlsafe_b64decode (f"{ s } ==" ) for s in token .split ("." )]
261
+
262
+ # Attempt to parse the payload (second component) as JSON
263
+ payload = components [1 ].decode ("utf-8" )
264
+ try :
265
+ payload = _json .loads (payload )
266
+ except _json .JSONDecodeError :
267
+ # If there's an error during parsing, ignore it and return the payload as is
268
+ pass
269
+
270
+ return payload
271
+
272
+
273
+ def on_call_check_tokens (request : _Request ,
274
+ verify_token : bool = True ) -> _OnCallTokenVerification :
244
275
"""Check tokens"""
245
276
verifications = _OnCallTokenVerification ()
246
277
247
- auth_token = _on_call_check_auth_token (request )
278
+ auth_token = _on_call_check_auth_token (request , verify_token = verify_token )
248
279
if auth_token is None :
249
280
verifications .auth = OnCallTokenState .MISSING
250
281
elif isinstance (auth_token , dict ):
251
282
verifications .auth = OnCallTokenState .VALID
252
283
verifications .auth_token = auth_token
253
284
254
- app_token = _on_call_check_app_token (request )
255
- if app_token is None :
256
- verifications .app = OnCallTokenState .MISSING
257
- elif isinstance (app_token , dict ):
258
- verifications .app = OnCallTokenState .VALID
259
- verifications .app_token = app_token
285
+ if verify_token :
286
+ app_token = _on_call_check_app_token (request )
287
+ if app_token is None :
288
+ verifications .app = OnCallTokenState .MISSING
289
+ elif isinstance (app_token , dict ):
290
+ verifications .app = OnCallTokenState .VALID
291
+ verifications .app_token = app_token
260
292
261
293
log_payload = {
262
294
** verifications .as_dict (),
@@ -266,7 +298,7 @@ def on_call_check_tokens(request: _Request,) -> _OnCallTokenVerification:
266
298
}
267
299
268
300
errs = []
269
- if verifications .app == OnCallTokenState .INVALID :
301
+ if verify_token and verifications .app == OnCallTokenState .INVALID :
270
302
errs .append (("AppCheck token was rejected." , log_payload ))
271
303
272
304
if verifications .auth == OnCallTokenState .INVALID :
0 commit comments