from smtplib import SMTPException
from django.core.signing import BadSignature, SignatureExpired
from django.core.exceptions import ObjectDoesNotExist
from django.contrib.auth import get_user_model
from django.contrib.auth.forms import SetPasswordForm, PasswordChangeForm
from django.db import transaction
from django.utils.module_loading import import_string
import graphene
from graphql_jwt.exceptions import JSONWebTokenError, JSONWebTokenExpired
from graphql_jwt.decorators import token_auth
from .forms import RegisterForm, EmailForm, UpdateAccountForm, PasswordLessRegisterForm
from .bases import Output
from .models import UserStatus
from .settings import graphql_auth_settings as app_settings
from .exceptions import (
UserAlreadyVerified,
UserNotVerified,
WrongUsage,
TokenScopeError,
EmailAlreadyInUse,
InvalidCredentials,
PasswordAlreadySetError,
)
from .constants import Messages, TokenAction
from .utils import revoke_user_refresh_token, get_token_payload, using_refresh_tokens
from .shortcuts import get_user_by_email, get_user_to_login
from .signals import user_registered, user_verified
from .decorators import (
password_confirmation_required,
verification_required,
secondary_email_required,
)
# The project's configured user model (AUTH_USER_MODEL); every mixin below
# reads USERNAME_FIELD / EMAIL_FIELD from it.
UserModel = get_user_model()

# Optional asynchronous mail sender. When EMAIL_ASYNC_TASK is a dotted path
# it is imported and the mixins call it as ``async_email_func(method, args)``
# instead of invoking the UserStatus mail method inline; otherwise mail is
# sent synchronously in the request.
if isinstance(app_settings.EMAIL_ASYNC_TASK, str) and app_settings.EMAIL_ASYNC_TASK:
    async_email_func = import_string(app_settings.EMAIL_ASYNC_TASK)
else:
    async_email_func = None
def add_dynamic_fields(cls):
    """Class decorator: attach the JWT output fields to a mutation mixin.

    Always adds a ``token`` string field; adds ``refresh_token`` as well when
    the project is configured to use refresh tokens. Returns ``cls``.
    """
    cls.token = graphene.Field(graphene.String)
    if using_refresh_tokens():
        cls.refresh_token = graphene.Field(graphene.String)
    return cls
@add_dynamic_fields
class RegisterMixin(Output):
    """
    Register user with fields defined in the settings.

    If the email field of the user model is part of the
    registration fields (default), check if there is
    no user with that email or as a secondary email.
    If it exists, it does not register the user,
    even if the email field is not defined as unique
    (default of the default django user model).

    When creating the user, it also creates a `UserStatus`
    related to that user, making it possible to track
    if the user is archived, verified and has a secondary
    email.

    Send account verification email.

    If allowed to not verified users login, return token.
    """

    # Password-less registration swaps in a form without password fields.
    form = (
        PasswordLessRegisterForm
        if app_settings.ALLOW_PASSWORDLESS_REGISTRATION
        else RegisterForm
    )

    @classmethod
    @token_auth
    def login_on_register(cls, root, info, **kwargs):
        # ``token_auth`` consumes the credentials in ``kwargs`` and fills the
        # ``token``/``refresh_token`` fields on the instance it returns.
        return cls()

    @staticmethod
    def _deliver(send, args):
        """Call a ``UserStatus`` mail method, through the async task if set.

        ``send`` is the bound method and ``args`` a tuple of its positional
        arguments (the async task receives the tuple as-is).
        """
        if async_email_func:
            async_email_func(send, args)
        else:
            send(*args)

    @classmethod
    def resolve_mutation(cls, root, info, **kwargs):
        """Create the user, send its e-mails and optionally log it in.

        Returns an instance of ``cls`` carrying ``success``/``errors``; when
        ``ALLOW_LOGIN_NOT_VERIFIED`` is on, the JWT fields are filled too.
        Everything through the mail dispatch runs in one transaction, so a
        mail failure (``SMTPException``) rolls the freshly created user back.
        """
        try:
            with transaction.atomic():
                form = cls.form(kwargs)
                if not form.is_valid():
                    return cls(success=False, errors=form.errors.get_json_data())

                email = kwargs.get(UserModel.EMAIL_FIELD, False)
                # The form only rejects addresses used as a *primary* email;
                # this also rejects ones already held as a secondary email.
                UserStatus.clean_email(email)
                user = form.save()

                # TODO CHECK FOR EMAIL ASYNC SETTING
                if app_settings.SEND_ACTIVATION_EMAIL is True and email:
                    cls._deliver(user.status.send_activation_email, (info,))
                if (
                    app_settings.ALLOW_PASSWORDLESS_REGISTRATION is True
                    and app_settings.SEND_PASSWORD_SET_EMAIL is True
                    and email
                ):
                    cls._deliver(user.status.send_password_set_email, (info,))

                user_registered.send(sender=cls, user=user)

                if not app_settings.ALLOW_LOGIN_NOT_VERIFIED:
                    return cls(success=True)

                # NOTE(review): with password-less registration ``password1``
                # is absent, so this authenticates with ``password=None`` —
                # confirm how ``token_auth`` reports that before relying on it.
                payload = cls.login_on_register(
                    root, info, password=kwargs.get("password1"), **kwargs
                )
                fields = {name: getattr(payload, name) for name in cls._meta.fields}
                return cls(**fields)
        except EmailAlreadyInUse:
            # Raised by ``UserStatus.clean_email`` when the address is taken
            # as a secondary email, which the RegisterForm cannot detect.
            return cls(
                success=False,
                errors={UserModel.EMAIL_FIELD: Messages.EMAIL_IN_USE},
            )
        except SMTPException:
            return cls(success=False, errors=Messages.EMAIL_FAIL)
class VerifyAccountMixin(Output):
    """
    Verify user account.

    Receive the token that was sent by email.
    If the token is valid, make the user verified
    by making the `user.status.verified` field true.
    """

    @classmethod
    def resolve_mutation(cls, root, info, **kwargs):
        """Mark the account behind the signed ``token`` as verified.

        Possession of the token is the only credential checked here. An
        expired signature (``SignatureExpired``), a tampered or wrong-purpose
        token (``BadSignature`` / ``TokenScopeError``) and an account that is
        already verified all yield ``success=False`` with a distinct message.
        """
        token = kwargs.get("token")
        try:
            UserStatus.verify(token)
        except UserAlreadyVerified:
            return cls(success=False, errors=Messages.ALREADY_VERIFIED)
        except SignatureExpired:
            return cls(success=False, errors=Messages.EXPIRED_TOKEN)
        except (BadSignature, TokenScopeError):
            return cls(success=False, errors=Messages.INVALID_TOKEN)
        return cls(success=True)
class VerifySecondaryEmailMixin(Output):
    """
    Verify user secondary email.

    Receive the token that was sent by email.
    User is already verified when using this mutation.

    If the token is valid, add the secondary email
    to `user.status.secondary_email` field.

    Note that until the secondary email is verified,
    it has not been saved anywhere beyond the token,
    so it can still be used to create a new account.
    After being verified, it will no longer be available.
    """

    @classmethod
    def resolve_mutation(cls, root, info, **kwargs):
        """Attach the secondary e-mail carried by the signed ``token``.

        Returns ``success=False`` with ``EMAIL_IN_USE`` when another account
        claimed the address between token issue and verification, and with
        ``EXPIRED_TOKEN`` / ``INVALID_TOKEN`` for a stale or tampered token.
        """
        token = kwargs.get("token")
        try:
            UserStatus.verify_secondary_email(token)
        except EmailAlreadyInUse:
            # The address was free when the token was sent; it is only
            # reserved once verified, so a race with registration is possible.
            return cls(success=False, errors=Messages.EMAIL_IN_USE)
        except SignatureExpired:
            return cls(success=False, errors=Messages.EXPIRED_TOKEN)
        except (BadSignature, TokenScopeError):
            return cls(success=False, errors=Messages.INVALID_TOKEN)
        return cls(success=True)
class ResendActivationEmailMixin(Output):
    """
    Sends activation email.

    It is called resend because theoretically
    the first activation email was sent when
    the user registered.

    If there is no user with the requested email,
    a successful response is returned.
    """

    @classmethod
    def resolve_mutation(cls, root, info, **kwargs):
        """Send a fresh activation mail to ``kwargs["email"]``.

        An unknown address is answered with ``success=True`` so the response
        does not reveal whether it is registered. An address that is already
        verified, however, gets ``ALREADY_VERIFIED`` back, so that case stays
        distinguishable from an unknown address.
        """
        email = kwargs.get("email")
        try:
            form = EmailForm({"email": email})
            if not form.is_valid():
                return cls(success=False, errors=form.errors.get_json_data())
            user = get_user_by_email(email)
            if async_email_func:
                async_email_func(user.status.resend_activation_email, (info,))
            else:
                user.status.resend_activation_email(info)
        except ObjectDoesNotExist:
            return cls(success=True)  # even if the user is not registered
        except SMTPException:
            return cls(success=False, errors=Messages.EMAIL_FAIL)
        except UserAlreadyVerified:
            return cls(success=False, errors={"email": Messages.ALREADY_VERIFIED})
        return cls(success=True)
class SendPasswordResetEmailMixin(Output):
    """
    Send password reset email.

    For non verified users, send an activation
    email instead.

    Accepts both primary and secondary email.

    If there is no user with the requested email,
    a successful response is returned.
    """

    @classmethod
    def _resend_activation(cls, info, email):
        """Fallback for an unverified account: resend its activation mail.

        Always returns ``success=False``; the error is ``EMAIL_FAIL`` when
        the mail could not be sent, else ``NOT_VERIFIED_PASSWORD_RESET``.
        """
        user = get_user_by_email(email)
        try:
            if async_email_func:
                async_email_func(user.status.resend_activation_email, (info,))
            else:
                user.status.resend_activation_email(info)
        except SMTPException:
            return cls(success=False, errors=Messages.EMAIL_FAIL)
        return cls(
            success=False,
            errors={"email": Messages.NOT_VERIFIED_PASSWORD_RESET},
        )

    @classmethod
    def resolve_mutation(cls, root, info, **kwargs):
        """Mail a password-reset link to ``kwargs["email"]``.

        Unknown addresses are answered with ``success=True``. An address that
        belongs to a not-yet-verified account gets an activation mail instead
        and ``NOT_VERIFIED_PASSWORD_RESET`` as the error — so, unlike unknown
        addresses, unverified accounts are observable from the response.
        """
        email = kwargs.get("email")
        try:
            form = EmailForm({"email": email})
            if not form.is_valid():
                return cls(success=False, errors=form.errors.get_json_data())
            user = get_user_by_email(email)
            if async_email_func:
                async_email_func(
                    user.status.send_password_reset_email, (info, [email])
                )
            else:
                user.status.send_password_reset_email(info, [email])
        except ObjectDoesNotExist:
            return cls(success=True)  # even if the user is not registered
        except SMTPException:
            return cls(success=False, errors=Messages.EMAIL_FAIL)
        except UserNotVerified:
            return cls._resend_activation(info, email)
        return cls(success=True)
class PasswordResetMixin(Output):
    """
    Change user password without old password.

    Receive the token that was sent by email.

    If token and new passwords are valid, update
    user password and in case of using refresh
    tokens, revoke all of them.

    Also, if user has not been verified yet, verify it.
    """

    form = SetPasswordForm

    @classmethod
    def resolve_mutation(cls, root, info, **kwargs):
        """Set a new password for the user identified by the reset ``token``.

        ``token`` is popped from ``kwargs``; the remaining fields feed
        ``SetPasswordForm``. The token must carry the ``PASSWORD_RESET``
        action and be younger than ``EXPIRATION_PASSWORD_RESET_TOKEN``.
        Refresh tokens are revoked before the new password is stored so
        sessions issued under the old one do not survive the reset, and an
        unverified account is marked verified (the mailed token proved
        control of the address), emitting ``user_verified``.
        """
        try:
            token = kwargs.pop("token")
            payload = get_token_payload(
                token,
                TokenAction.PASSWORD_RESET,
                app_settings.EXPIRATION_PASSWORD_RESET_TOKEN,
            )
            user = UserModel._default_manager.get(**payload)
        except SignatureExpired:
            return cls(success=False, errors=Messages.EXPIRED_TOKEN)
        except (BadSignature, TokenScopeError):
            return cls(success=False, errors=Messages.INVALID_TOKEN)

        form = cls.form(user, kwargs)
        if not form.is_valid():
            return cls(success=False, errors=form.errors.get_json_data())
        revoke_user_refresh_token(user)
        user = form.save()
        if user.status.verified is False:
            user.status.verified = True
            user.status.save(update_fields=["verified"])
            user_verified.send(sender=cls, user=user)
        return cls(success=True)
class PasswordSetMixin(Output):
    """
    Set user password - for passwordless registration

    Receive the token that was sent by email.

    If token and new passwords are valid, set
    user password and in case of using refresh
    tokens, revoke all of them.

    Also, if user has not been verified yet, verify it.
    """

    form = SetPasswordForm

    @classmethod
    def resolve_mutation(cls, root, info, **kwargs):
        """Give a password-less account its first password via ``token``.

        Mirrors ``PasswordResetMixin`` with the ``PASSWORD_SET`` token action
        and expiry, plus one guard: once the account has a usable password
        the (still unexpired) token is refused with ``PASSWORD_ALREADY_SET``,
        so it cannot be replayed to overwrite a password. Unlike the reset
        flow, no ``user_verified`` signal is sent when the flag is flipped.
        """
        try:
            token = kwargs.pop("token")
            payload = get_token_payload(
                token,
                TokenAction.PASSWORD_SET,
                app_settings.EXPIRATION_PASSWORD_SET_TOKEN,
            )
            user = UserModel._default_manager.get(**payload)
        except SignatureExpired:
            return cls(success=False, errors=Messages.EXPIRED_TOKEN)
        except (BadSignature, TokenScopeError):
            return cls(success=False, errors=Messages.INVALID_TOKEN)

        form = cls.form(user, kwargs)
        if not form.is_valid():
            return cls(success=False, errors=form.errors.get_json_data())
        # The guard runs only after form validation, as in the original flow.
        if user.has_usable_password():
            return cls(success=False, errors=Messages.PASSWORD_ALREADY_SET)
        revoke_user_refresh_token(user)
        user = form.save()
        if user.status.verified is False:
            user.status.verified = True
            user.status.save(update_fields=["verified"])
        return cls(success=True)
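class ObtainJSONWebTokenMixin(Output):
    """
    Obtain JSON web token for given user.

    Allow to perform login with different fields,
    and secondary email if set. The fields are
    defined on settings.

    Not verified users can login by default. This
    can be changes on settings.

    If user is archived, make it unarchive and
    return `unarchiving=True` on output.
    """

    @classmethod
    def resolve(cls, root, info, **kwargs):
        """Build the output once the JWT layer has authenticated the request."""
        unarchiving = kwargs.get("unarchiving", False)
        return cls(user=info.context.user, unarchiving=unarchiving)

    @classmethod
    def resolve_mutation(cls, root, info, **kwargs):
        """Resolve the login field to a user and delegate to ``parent_resolve``.

        ``kwargs`` must hold exactly ``password`` plus one lookup field. When
        the lookup field is not ``USERNAME_FIELD`` the user is found first and
        the credentials are rewritten to ``USERNAME_FIELD`` + password for the
        JWT backend. An archived account is restored only after its password
        has been checked here; a wrong password never changes account state.

        Raises:
            WrongUsage: if ``kwargs`` does not contain exactly two entries.
        """
        if len(kwargs) != 2:
            raise WrongUsage(
                "Must login with password and one of the following fields %s."
                % (app_settings.LOGIN_ALLOWED_FIELDS)
            )
        try:
            next_kwargs = None
            USERNAME_FIELD = UserModel.USERNAME_FIELD
            unarchiving = False
            if USERNAME_FIELD in kwargs:
                # Already in the shape the JWT backend wants.
                query_kwargs = {USERNAME_FIELD: kwargs[USERNAME_FIELD]}
                next_kwargs = kwargs
                password = kwargs.get("password")
            else:
                # Whatever remains after removing the password is the lookup.
                password = kwargs.pop("password")
                query_field, query_value = kwargs.popitem()
                query_kwargs = {query_field: query_value}
            user = get_user_to_login(**query_kwargs)
            if not next_kwargs:
                next_kwargs = {
                    "password": password,
                    USERNAME_FIELD: getattr(user, USERNAME_FIELD),
                }
            if user.status.archived is True:
                # Un-archiving mutates the account, so demand the correct
                # password first; previously any login attempt that merely
                # named an archived user restored it before ``parent_resolve``
                # got to check the credentials. The response for a wrong
                # password is unchanged (INVALID_CREDENTIALS).
                if not user.check_password(password):
                    raise InvalidCredentials
                UserStatus.unarchive(user)
                unarchiving = True
            if user.status.verified or app_settings.ALLOW_LOGIN_NOT_VERIFIED:
                return cls.parent_resolve(
                    root, info, unarchiving=unarchiving, **next_kwargs
                )
            # Unverified account while such logins are disallowed: the error
            # deliberately tells a correct password (NOT_VERIFIED) apart from
            # a wrong one (INVALID_CREDENTIALS).
            if user.check_password(password):
                raise UserNotVerified
            raise InvalidCredentials
        except (JSONWebTokenError, ObjectDoesNotExist, InvalidCredentials):
            return cls(success=False, errors=Messages.INVALID_CREDENTIALS)
        except UserNotVerified:
            return cls(success=False, errors=Messages.NOT_VERIFIED)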
class ArchiveOrDeleteMixin(Output):
    """Shared entry point for the archive and delete account mutations.

    Subclasses provide ``resolve_action(user, root=..., info=...)``. It only
    runs after both decorators pass: ``verification_required`` and
    ``password_confirmation_required`` (see ``decorators.py`` for exactly
    what each checks on the request).
    """

    @classmethod
    @verification_required
    @password_confirmation_required
    def resolve_mutation(cls, root, info, *args, **kwargs):
        cls.resolve_action(info.context.user, root=root, info=info)
        return cls(success=True)
class ArchiveAccountMixin(ArchiveOrDeleteMixin):
    """
    Archive account and revoke refresh tokens.

    User must be verified and confirm password.
    """

    @classmethod
    def resolve_action(cls, user, *args, **kwargs):
        """Archive ``user`` and kill its refresh tokens so no session outlives it."""
        UserStatus.archive(user)
        revoke_user_refresh_token(user=user)
class DeleteAccountMixin(ArchiveOrDeleteMixin):
    """
    Delete account permanently or make `user.is_active=False`.

    The behavior is defined on settings.

    Anyway user refresh tokens are revoked.

    User must be verified and confirm password.
    """

    @classmethod
    def resolve_action(cls, user, *args, **kwargs):
        """Delete or deactivate ``user`` per ``ALLOW_DELETE_ACCOUNT``.

        Refresh tokens are revoked on both paths so neither a deleted nor a
        deactivated account can keep an authenticated session alive.
        """
        if app_settings.ALLOW_DELETE_ACCOUNT:
            revoke_user_refresh_token(user=user)
            user.delete()
            return
        user.is_active = False
        user.save(update_fields=["is_active"])
        revoke_user_refresh_token(user=user)
@add_dynamic_fields
class PasswordChangeMixin(Output):
    """
    Change account password when user knows the old password.

    A new token and refresh token are sent. User must be verified.
    """

    form = PasswordChangeForm

    @classmethod
    @token_auth
    def login_on_password_change(cls, root, info, **kwargs):
        # ``token_auth`` authenticates with ``kwargs`` and fills the JWT
        # fields on the returned instance.
        return cls()

    @classmethod
    @verification_required
    @password_confirmation_required
    def resolve_mutation(cls, root, info, **kwargs):
        """Validate ``PasswordChangeForm`` for the request user and re-login.

        Refresh tokens are revoked before the new password is saved, so the
        token pair issued below is the only one that remains valid.
        """
        user = info.context.user
        form = cls.form(user, kwargs)
        if not form.is_valid():
            return cls(success=False, errors=form.errors.get_json_data())
        revoke_user_refresh_token(user)
        user = form.save()
        username_field = user.USERNAME_FIELD
        credentials = {username_field: getattr(user, username_field)}
        payload = cls.login_on_password_change(
            root, info, password=kwargs.get("new_password1"), **credentials
        )
        fields = {name: getattr(payload, name) for name in cls._meta.fields}
        return cls(**fields)
class UpdateAccountMixin(Output):
    """
    Update user model fields, defined on settings.

    User must be verified.
    """

    form = UpdateAccountForm

    @classmethod
    @verification_required
    def resolve_mutation(cls, root, info, **kwargs):
        """Apply a partial update of ``form.Meta.fields`` to the request user.

        Fields missing from ``kwargs`` are backfilled from the user so the
        form sees a complete payload and leaves them unchanged. Only the
        fields the form declares can be written.
        """
        user = info.context.user
        for name in cls.form.Meta.fields:
            # Read the current value only when the caller omitted the field.
            if name not in kwargs:
                kwargs[name] = getattr(user, name)
        form = cls.form(kwargs, instance=user)
        if not form.is_valid():
            return cls(success=False, errors=form.errors.get_json_data())
        form.save()
        return cls(success=True)
class VerifyOrRefreshOrRevokeTokenMixin(Output):
    """
    Same as `grapgql_jwt` implementation, with standard output.
    """

    @classmethod
    def resolve_mutation(cls, root, info, **kwargs):
        """Delegate to ``parent_resolve`` and map JWT errors onto ``errors``.

        An expired token yields ``EXPIRED_TOKEN``; any other JWT failure
        yields ``INVALID_TOKEN``. ``parent_resolve`` is supplied by the
        concrete mutation class.
        """
        try:
            result = cls.parent_resolve(root, info, **kwargs)
        except JSONWebTokenExpired:
            return cls(success=False, errors=Messages.EXPIRED_TOKEN)
        except JSONWebTokenError:
            return cls(success=False, errors=Messages.INVALID_TOKEN)
        return result
class SendSecondaryEmailActivationMixin(Output):
    """
    Send activation to secondary email.

    User must be verified and confirm password.
    """

    @classmethod
    @verification_required
    @password_confirmation_required
    def resolve_mutation(cls, root, info, **kwargs):
        """Mail a secondary-email activation token to ``kwargs["email"]``.

        ``EMAIL_IN_USE`` is returned when the address already belongs to an
        account; ``EMAIL_FAIL`` when the mail could not be sent.
        """
        email = kwargs.get("email")
        try:
            form = EmailForm({"email": email})
            if not form.is_valid():
                return cls(success=False, errors=form.errors.get_json_data())
            user = info.context.user
            if async_email_func:
                async_email_func(
                    user.status.send_secondary_email_activation, (info, email)
                )
            else:
                user.status.send_secondary_email_activation(info, email)
        except EmailAlreadyInUse:
            # The address is not reserved until verified, so another account
            # may have taken it meanwhile.
            return cls(success=False, errors={"email": Messages.EMAIL_IN_USE})
        except SMTPException:
            return cls(success=False, errors=Messages.EMAIL_FAIL)
        return cls(success=True)
class SwapEmailsMixin(Output):
    """
    Swap between primary and secondary emails.

    Require password confirmation.
    """

    @classmethod
    @secondary_email_required
    @password_confirmation_required
    def resolve_mutation(cls, root, info, **kwargs):
        """Swap the request user's primary and secondary e-mail addresses."""
        status = info.context.user.status
        status.swap_emails()
        return cls(success=True)
class RemoveSecondaryEmailMixin(Output):
    """
    Remove user secondary email.

    Require password confirmation.
    """

    @classmethod
    @secondary_email_required
    @password_confirmation_required
    def resolve_mutation(cls, root, info, **kwargs):
        """Drop the request user's secondary e-mail address."""
        status = info.context.user.status
        status.remove_secondary_email()
        return cls(success=True)