Skip to content

Commit

Permalink
Use context manager to disconnect celery beat signals
Browse files Browse the repository at this point in the history
  • Loading branch information
noliveleger committed Oct 29, 2024
1 parent af4c3fd commit 9cbe802
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 48 deletions.
32 changes: 10 additions & 22 deletions kobo/apps/trash_bin/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
from django.conf import settings
from django.contrib.auth import get_user_model
from django.db import transaction
from django.db.models.signals import pre_delete, post_delete
from django.db.models.signals import post_delete
from django.utils.timezone import now
from django_celery_beat.models import (
ClockedSchedule,
PeriodicTask,
PeriodicTasks,
)
from requests.exceptions import HTTPError

Expand All @@ -25,7 +24,11 @@
from .models import TrashStatus
from .models.account import AccountTrash
from .models.project import ProjectTrash
from .utils import delete_asset, replace_user_with_placeholder
from .utils import (
delete_asset,
replace_user_with_placeholder,
signals_temporarily_disconnected,
)


@celery_app.task(
Expand Down Expand Up @@ -259,27 +262,19 @@ def empty_project_retry(sender=None, **kwargs):

@celery_app.task
def garbage_collector():
deleted_tasks = 0
deleted_users = 0
deleted_projects = 0

try:
# Disconnect `PeriodicTasks` signals before performing bulk deletions
pre_delete.disconnect(PeriodicTasks.changed, sender=PeriodicTask)
post_delete.disconnect(PeriodicTasks.update_changed, sender=ClockedSchedule)

with signals_temporarily_disconnected(delete=True):
with transaction.atomic():

# Remove orphan periodic tasks
deleted_users, _ = PeriodicTask.objects.exclude(
PeriodicTask.objects.exclude(
pk__in=AccountTrash.objects.values_list(
'periodic_task_id', flat=True
),
).filter(
name__startswith=DELETE_USER_STR_PREFIX, clocked__isnull=False
).delete()

deleted_projects, _ = PeriodicTask.objects.exclude(
PeriodicTask.objects.exclude(
pk__in=ProjectTrash.objects.values_list(
'periodic_task_id', flat=True
),
Expand All @@ -288,15 +283,8 @@ def garbage_collector():
).delete()

# Then, remove clocked schedules
deleted_tasks, _ = ClockedSchedule.objects.exclude(
ClockedSchedule.objects.exclude(
pk__in=PeriodicTask.objects.filter(
clocked__isnull=False
).values_list('clocked_id', flat=True),
).delete()
finally:
# Reconnect `PeriodicTasks` signals after performing bulk deletions
pre_delete.connect(PeriodicTasks.changed, sender=PeriodicTask)
post_delete.connect(PeriodicTasks.update_changed, sender=ClockedSchedule)

if deleted_users + deleted_projects + deleted_tasks:
PeriodicTasks.update_changed()
66 changes: 40 additions & 26 deletions kobo/apps/trash_bin/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import json
from contextlib import contextmanager
from copy import deepcopy
from datetime import timedelta

Expand All @@ -9,7 +10,7 @@
from django.db import IntegrityError, models, transaction
from django.db.models import F, Q
from django.db.models.signals import post_delete, post_save, pre_delete, pre_save
from django.utils.timezone import now
from django.utils import timezone
from django_celery_beat.models import (
ClockedSchedule,
PeriodicTask,
Expand Down Expand Up @@ -154,12 +155,8 @@ def move_to_trash(
)
)

try:
# Disconnect signals before bulk-creating periodic tasks
pre_save.disconnect(PeriodicTasks.changed, sender=PeriodicTask)
post_save.disconnect(PeriodicTasks.update_changed, sender=ClockedSchedule)

clocked_time = now() + timedelta(days=grace_period)
with signals_temporarily_disconnected(save=True):
clocked_time = timezone.now() + timedelta(days=grace_period)
clocked = ClockedSchedule.objects.create(clocked_time=clocked_time)
trash_model.objects.bulk_create(trash_objects)
try:
Expand All @@ -179,13 +176,6 @@ def move_to_trash(

except IntegrityError:
raise TrashIntegrityError
finally:
# Reconnect signals after bulk-creating periodic tasks
pre_save.connect(PeriodicTasks.changed, sender=PeriodicTask)
post_save.connect(PeriodicTasks.update_changed, sender=ClockedSchedule)

# Force celery beat scheduler to refresh
PeriodicTasks.update_changed()

# Update relationships between periodic task and trash objects
updated_trash_objects = []
Expand Down Expand Up @@ -255,19 +245,9 @@ def put_back(
for obj_dict in objects_list
]
)
try:
# Disconnect `PeriodicTasks` (plural) signals, until `PeriodicTask` (singular)
# delete query finishes to avoid unnecessary DB queries.
# see https://django-celery-beat.readthedocs.io/en/stable/reference/django-celery-beat.models.html#django_celery_beat.models.PeriodicTasks
pre_delete.disconnect(PeriodicTasks.changed, sender=PeriodicTask)
post_delete.disconnect(PeriodicTasks.update_changed, sender=ClockedSchedule)
PeriodicTask.objects.only('pk').filter(pk__in=periodic_task_ids).delete()
finally:
post_delete.connect(PeriodicTasks.update_changed, sender=ClockedSchedule)
pre_delete.connect(PeriodicTasks.changed, sender=PeriodicTask)

# Force celery beat scheduler to refresh
PeriodicTasks.update_changed()
with signals_temporarily_disconnected(delete=True):
PeriodicTask.objects.only('pk').filter(pk__in=periodic_task_ids).delete()


def replace_user_with_placeholder(
Expand Down Expand Up @@ -310,6 +290,40 @@ def replace_user_with_placeholder(
return placeholder_user


@contextmanager
def signals_temporarily_disconnected(save=False, delete=False):
"""
Temporarily disconnects `PeriodicTasks` signals to prevent accumulating
update queries for Celery Beat while bulk operations are in progress.
See https://django-celery-beat.readthedocs.io/en/stable/reference/django-celery-beat.models.html#django_celery_beat.models.PeriodicTasks
"""

try:
if delete:
pre_delete.disconnect(PeriodicTasks.changed, sender=PeriodicTask)
post_delete.disconnect(PeriodicTasks.update_changed, sender=ClockedSchedule)
if save:
pre_save.disconnect(PeriodicTasks.changed, sender=PeriodicTask)
post_save.disconnect(PeriodicTasks.update_changed, sender=ClockedSchedule)
yield
finally:
if delete:
post_delete.connect(PeriodicTasks.update_changed, sender=ClockedSchedule)
pre_delete.connect(PeriodicTasks.changed, sender=PeriodicTask)
if save:
pre_save.connect(PeriodicTasks.changed, sender=PeriodicTask)
post_save.connect(PeriodicTasks.update_changed, sender=ClockedSchedule)

few_minutes_ago = timezone.now() - timedelta(minutes=5)

# Limit the number of `update_changed()` calls to prevent table locking,
# which can create a bottleneck.
if PeriodicTasks.objects.filter(last_update__lt=few_minutes_ago).exists():
# Force celery beat scheduler to refresh
PeriodicTasks.update_changed()


def _delete_submissions(request_author: settings.AUTH_USER_MODEL, asset: 'kpi.Asset'):

(
Expand Down

0 comments on commit 9cbe802

Please sign in to comment.