Skip to content

Commit

Permalink
Connect/Disconnect PeriodicTasks signal on bulk actions
Browse files Browse the repository at this point in the history
  • Loading branch information
noliveleger committed Oct 28, 2024
1 parent cac4be3 commit af4c3fd
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 49 deletions.
71 changes: 42 additions & 29 deletions kobo/apps/trash_bin/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from django.conf import settings
from django.contrib.auth import get_user_model
from django.db import transaction
from django.db.models.signals import post_delete
from django.db.models.signals import pre_delete, post_delete
from django.utils.timezone import now
from django_celery_beat.models import (
ClockedSchedule,
Expand Down Expand Up @@ -205,8 +205,6 @@ def empty_project(project_trash_id: int):

@task_failure.connect(sender=empty_account)
def empty_account_failure(sender=None, **kwargs):
# Force scheduler to refresh
PeriodicTasks.update_changed()

exception = kwargs['exception']
account_trash_id = kwargs['args'][0]
Expand Down Expand Up @@ -234,8 +232,6 @@ def empty_account_retry(sender=None, **kwargs):

@task_failure.connect(sender=empty_project)
def empty_project_failure(sender=None, **kwargs):
# Force scheduler to refresh
PeriodicTasks.update_changed()

exception = kwargs['exception']
project_trash_id = kwargs['args'][0]
Expand Down Expand Up @@ -263,27 +259,44 @@ def empty_project_retry(sender=None, **kwargs):

@celery_app.task
def garbage_collector():
with transaction.atomic():
# Remove orphan periodic tasks
PeriodicTask.objects.exclude(
pk__in=AccountTrash.objects.values_list(
'periodic_task_id', flat=True
),
).filter(
name__startswith=DELETE_USER_STR_PREFIX, clocked__isnull=False
).delete()

PeriodicTask.objects.exclude(
pk__in=ProjectTrash.objects.values_list(
'periodic_task_id', flat=True
),
).filter(
name__startswith=DELETE_PROJECT_STR_PREFIX, clocked__isnull=False
).delete()

# Then, remove clocked schedules
ClockedSchedule.objects.exclude(
pk__in=PeriodicTask.objects.filter(
clocked__isnull=False
).values_list('clocked_id', flat=True),
).delete()
deleted_tasks = 0
deleted_users = 0
deleted_projects = 0

try:
# Disconnect `PeriodicTasks` signals before performing bulk deletions
pre_delete.disconnect(PeriodicTasks.changed, sender=PeriodicTask)
post_delete.disconnect(PeriodicTasks.update_changed, sender=ClockedSchedule)

with transaction.atomic():

# Remove orphan periodic tasks
deleted_users, _ = PeriodicTask.objects.exclude(
pk__in=AccountTrash.objects.values_list(
'periodic_task_id', flat=True
),
).filter(
name__startswith=DELETE_USER_STR_PREFIX, clocked__isnull=False
).delete()

deleted_projects, _ = PeriodicTask.objects.exclude(
pk__in=ProjectTrash.objects.values_list(
'periodic_task_id', flat=True
),
).filter(
name__startswith=DELETE_PROJECT_STR_PREFIX, clocked__isnull=False
).delete()

# Then, remove clocked schedules
deleted_tasks, _ = ClockedSchedule.objects.exclude(
pk__in=PeriodicTask.objects.filter(
clocked__isnull=False
).values_list('clocked_id', flat=True),
).delete()
finally:
# Reconnect `PeriodicTasks` signals after performing bulk deletions
pre_delete.connect(PeriodicTasks.changed, sender=PeriodicTask)
post_delete.connect(PeriodicTasks.update_changed, sender=ClockedSchedule)

if deleted_users + deleted_projects + deleted_tasks:
PeriodicTasks.update_changed()
52 changes: 32 additions & 20 deletions kobo/apps/trash_bin/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from django.conf import settings
from django.db import IntegrityError, models, transaction
from django.db.models import F, Q
from django.db.models.signals import pre_delete
from django.db.models.signals import post_delete, post_save, pre_delete, pre_save
from django.utils.timezone import now
from django_celery_beat.models import (
ClockedSchedule,
Expand Down Expand Up @@ -103,9 +103,6 @@ def move_to_trash(
username and primary key is retained after deleting all other data.
"""

clocked_time = now() + timedelta(days=grace_period)
clocked = ClockedSchedule.objects.create(clocked_time=clocked_time)

(
trash_model,
fk_field_name,
Expand Down Expand Up @@ -158,24 +155,37 @@ def move_to_trash(
)

try:
# Disconnect signals before bulk-creating periodic tasks
pre_save.disconnect(PeriodicTasks.changed, sender=PeriodicTask)
post_save.disconnect(PeriodicTasks.update_changed, sender=ClockedSchedule)

clocked_time = now() + timedelta(days=grace_period)
clocked = ClockedSchedule.objects.create(clocked_time=clocked_time)
trash_model.objects.bulk_create(trash_objects)
try:
periodic_tasks = PeriodicTask.objects.bulk_create(
[
PeriodicTask(
clocked=clocked,
name=task_name_placeholder.format(**ato.metadata),
task=f'kobo.apps.trash_bin.tasks.{task}',
args=json.dumps([ato.id]),
one_off=True,
enabled=not empty_manually,
)
for ato in trash_objects
],
)

periodic_tasks = PeriodicTask.objects.bulk_create(
[
PeriodicTask(
clocked=clocked,
name=task_name_placeholder.format(**ato.metadata),
task=f'kobo.apps.trash_bin.tasks.{task}',
args=json.dumps([ato.id]),
one_off=True,
enabled=not empty_manually,
)
for ato in trash_objects
],
)
except IntegrityError:
raise TrashIntegrityError
finally:
# Reconnect signals after bulk-creating periodic tasks
pre_save.connect(PeriodicTasks.changed, sender=PeriodicTask)
post_save.connect(PeriodicTasks.update_changed, sender=ClockedSchedule)

except IntegrityError as e:
raise TrashIntegrityError
# Force celery beat scheduler to refresh
PeriodicTasks.update_changed()

# Update relationships between periodic task and trash objects
updated_trash_objects = []
Expand Down Expand Up @@ -246,12 +256,14 @@ def put_back(
]
)
try:
# Disconnect `PeriodicTasks` (plural) signal, until `PeriodicTask` (singular)
# Disconnect `PeriodicTasks` (plural) signals, until `PeriodicTask` (singular)
# delete query finishes to avoid unnecessary DB queries.
# see https://django-celery-beat.readthedocs.io/en/stable/reference/django-celery-beat.models.html#django_celery_beat.models.PeriodicTasks
pre_delete.disconnect(PeriodicTasks.changed, sender=PeriodicTask)
post_delete.disconnect(PeriodicTasks.update_changed, sender=ClockedSchedule)
PeriodicTask.objects.only('pk').filter(pk__in=periodic_task_ids).delete()
finally:
post_delete.connect(PeriodicTasks.update_changed, sender=ClockedSchedule)
pre_delete.connect(PeriodicTasks.changed, sender=PeriodicTask)

# Force celery beat scheduler to refresh
Expand Down

0 comments on commit af4c3fd

Please sign in to comment.