From 9cbe8027dc63f34ae468a9e22ad54aa4d2be7de6 Mon Sep 17 00:00:00 2001 From: Olivier Leger Date: Tue, 29 Oct 2024 10:01:01 -0400 Subject: [PATCH] Use context manager to disconnect celery beat signals --- kobo/apps/trash_bin/tasks.py | 32 ++++++----------- kobo/apps/trash_bin/utils.py | 66 ++++++++++++++++++++++-------------- 2 files changed, 50 insertions(+), 48 deletions(-) diff --git a/kobo/apps/trash_bin/tasks.py b/kobo/apps/trash_bin/tasks.py index e816151723..0e38329519 100644 --- a/kobo/apps/trash_bin/tasks.py +++ b/kobo/apps/trash_bin/tasks.py @@ -4,12 +4,11 @@ from django.conf import settings from django.contrib.auth import get_user_model from django.db import transaction -from django.db.models.signals import pre_delete, post_delete +from django.db.models.signals import post_delete from django.utils.timezone import now from django_celery_beat.models import ( ClockedSchedule, PeriodicTask, - PeriodicTasks, ) from requests.exceptions import HTTPError @@ -25,7 +24,11 @@ from .models import TrashStatus from .models.account import AccountTrash from .models.project import ProjectTrash -from .utils import delete_asset, replace_user_with_placeholder +from .utils import ( + delete_asset, + replace_user_with_placeholder, + signals_temporarily_disconnected, +) @celery_app.task( @@ -259,19 +262,11 @@ def empty_project_retry(sender=None, **kwargs): @celery_app.task def garbage_collector(): - deleted_tasks = 0 - deleted_users = 0 - deleted_projects = 0 - - try: - # Disconnect `PeriodicTasks` signals before performing bulk deletions - pre_delete.disconnect(PeriodicTasks.changed, sender=PeriodicTask) - post_delete.disconnect(PeriodicTasks.update_changed, sender=ClockedSchedule) + with signals_temporarily_disconnected(delete=True): with transaction.atomic(): - # Remove orphan periodic tasks - deleted_users, _ = PeriodicTask.objects.exclude( + PeriodicTask.objects.exclude( pk__in=AccountTrash.objects.values_list( 'periodic_task_id', flat=True ), @@ -279,7 +274,7 @@ def garbage_collector(): name__startswith=DELETE_USER_STR_PREFIX, clocked__isnull=False ).delete() - deleted_projects, _ = PeriodicTask.objects.exclude( + PeriodicTask.objects.exclude( pk__in=ProjectTrash.objects.values_list( 'periodic_task_id', flat=True ), @@ -288,15 +283,8 @@ def garbage_collector(): ).delete() # Then, remove clocked schedules - deleted_tasks, _ = ClockedSchedule.objects.exclude( + ClockedSchedule.objects.exclude( pk__in=PeriodicTask.objects.filter( clocked__isnull=False ).values_list('clocked_id', flat=True), ).delete() - finally: - # Reconnect `PeriodicTasks` signals after performing bulk deletions - pre_delete.connect(PeriodicTasks.changed, sender=PeriodicTask) - post_delete.connect(PeriodicTasks.update_changed, sender=ClockedSchedule) - - if deleted_users + deleted_projects + deleted_tasks: - PeriodicTasks.update_changed() diff --git a/kobo/apps/trash_bin/utils.py b/kobo/apps/trash_bin/utils.py index ba52ed12bb..cda5d4adf1 100644 --- a/kobo/apps/trash_bin/utils.py +++ b/kobo/apps/trash_bin/utils.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +from contextlib import contextmanager from copy import deepcopy from datetime import timedelta @@ -9,7 +10,7 @@ from django.db import IntegrityError, models, transaction from django.db.models import F, Q from django.db.models.signals import post_delete, post_save, pre_delete, pre_save -from django.utils.timezone import now +from django.utils import timezone from django_celery_beat.models import ( ClockedSchedule, PeriodicTask, @@ -154,12 +155,8 @@ def move_to_trash( ) ) - try: - # Disconnect signals before bulk-creating periodic tasks - pre_save.disconnect(PeriodicTasks.changed, sender=PeriodicTask) - post_save.disconnect(PeriodicTasks.update_changed, sender=ClockedSchedule) - - clocked_time = now() + timedelta(days=grace_period) + with signals_temporarily_disconnected(save=True): + clocked_time = timezone.now() + timedelta(days=grace_period) clocked = ClockedSchedule.objects.create(clocked_time=clocked_time) trash_model.objects.bulk_create(trash_objects) try: @@ -179,13 +176,6 @@ def move_to_trash( except IntegrityError: raise TrashIntegrityError - finally: - # Reconnect signals after bulk-creating periodic tasks - pre_save.connect(PeriodicTasks.changed, sender=PeriodicTask) - post_save.connect(PeriodicTasks.update_changed, sender=ClockedSchedule) - - # Force celery beat scheduler to refresh - PeriodicTasks.update_changed() # Update relationships between periodic task and trash objects updated_trash_objects = [] @@ -255,19 +245,9 @@ def put_back( for obj_dict in objects_list ] ) - try: - # Disconnect `PeriodicTasks` (plural) signals, until `PeriodicTask` (singular) - # delete query finishes to avoid unnecessary DB queries. - # see https://django-celery-beat.readthedocs.io/en/stable/reference/django-celery-beat.models.html#django_celery_beat.models.PeriodicTasks - pre_delete.disconnect(PeriodicTasks.changed, sender=PeriodicTask) - post_delete.disconnect(PeriodicTasks.update_changed, sender=ClockedSchedule) - PeriodicTask.objects.only('pk').filter(pk__in=periodic_task_ids).delete() - finally: - post_delete.connect(PeriodicTasks.update_changed, sender=ClockedSchedule) - pre_delete.connect(PeriodicTasks.changed, sender=PeriodicTask) - # Force celery beat scheduler to refresh - PeriodicTasks.update_changed() + with signals_temporarily_disconnected(delete=True): + PeriodicTask.objects.only('pk').filter(pk__in=periodic_task_ids).delete() def replace_user_with_placeholder( @@ -310,6 +290,40 @@ def replace_user_with_placeholder( return placeholder_user +@contextmanager +def signals_temporarily_disconnected(save=False, delete=False): + """ + Temporarily disconnects `PeriodicTasks` signals to prevent accumulating + update queries for Celery Beat while bulk operations are in progress. + + See https://django-celery-beat.readthedocs.io/en/stable/reference/django-celery-beat.models.html#django_celery_beat.models.PeriodicTasks + """ + + try: + if delete: + pre_delete.disconnect(PeriodicTasks.changed, sender=PeriodicTask) + post_delete.disconnect(PeriodicTasks.update_changed, sender=ClockedSchedule) + if save: + pre_save.disconnect(PeriodicTasks.changed, sender=PeriodicTask) + post_save.disconnect(PeriodicTasks.update_changed, sender=ClockedSchedule) + yield + finally: + if delete: + post_delete.connect(PeriodicTasks.update_changed, sender=ClockedSchedule) + pre_delete.connect(PeriodicTasks.changed, sender=PeriodicTask) + if save: + pre_save.connect(PeriodicTasks.changed, sender=PeriodicTask) + post_save.connect(PeriodicTasks.update_changed, sender=ClockedSchedule) + + few_minutes_ago = timezone.now() - timedelta(minutes=5) + + # Limit the number of `update_changed()` calls to prevent table locking, + # which can create a bottleneck. + if PeriodicTasks.objects.filter(last_update__lt=few_minutes_ago).exists(): + # Force celery beat scheduler to refresh + PeriodicTasks.update_changed() + + def _delete_submissions(request_author: settings.AUTH_USER_MODEL, asset: 'kpi.Asset'): (