diff --git a/django_elasticsearch_dsl/signals.py b/django_elasticsearch_dsl/signals.py index 48f42249..5701424c 100644 --- a/django_elasticsearch_dsl/signals.py +++ b/django_elasticsearch_dsl/signals.py @@ -144,58 +144,98 @@ def handle_delete(self, sender, instance, **kwargs): """ self.prepare_registry_delete_task(instance) + + @shared_task() + def registry_delete_related_task(doc_module, doc_class, object_ids, action): + """ + A Celery task that fetches the latest data for given object IDs and performs the required indexing action. + This version uses the custom `get_queryset()` method defined in the document class. + + Instead of deleting the related objects we update it so that the deleted connection between + the deleted model and the related model is updated into elasticsearch. + """ + doc_instance = getattr(import_module(doc_module), doc_class)() + model = doc_instance.django.model + + # Fetch the latest instances from the database + #object_list = model.objects.filter(pk__in=object_ids).all() + # Use the custom queryset method if available + object_list = doc_instance.get_queryset().filter(pk__in=object_ids) + if not object_list: + return + + # Generate the bulk update data + bulk_data = list(doc_instance._get_actions(object_list, action)) + + if bulk_data: + doc_instance._bulk(bulk_data, parallel=True) + + def prepare_registry_delete_related_task(self, instance): """ - Select its related instance before this instance was deleted. - And pass that to celery. + Collect IDs of related instances before the main instance is deleted and queue these IDs + for indexing in Elasticsearch through a registry_delete_related_task. """ - action = 'index' - for doc in registry._get_related_doc(instance): - doc_instance = doc(related_instance_to_ignore=instance) + related_docs = list(registry._get_related_doc(instance)) + if not related_docs: + return + + for doc_class in related_docs: + doc_instance = doc_class() try: related = doc_instance.get_instances_from_related(instance) except ObjectDoesNotExist: related = None - if related is not None: - doc_instance.update(related) + + if related: if isinstance(related, models.Model): - object_list = [related] + object_ids = [related.pk] else: - object_list = related - bulk_data = list(doc_instance._get_actions(object_list, action)), - self.registry_delete_task.delay(doc_instance.__class__.__name__, bulk_data) + object_ids = [obj.pk for obj in related if hasattr(obj, 'pk')] + + action = 'index' # Set the operation as 'index' + # Send only the IDs to the task + self.registry_delete_related_task.delay(doc_class.__module__, doc_class.__name__, object_ids, action) + @shared_task() - def registry_delete_task(doc_label, data): + def registry_delete_task(doc_module, doc_class, bulk_data): """ Handle the bulk delete data on the registry as a Celery task. The different implementations used are due to the difference between delete and update operations. The update operation can re-read the updated data from the database to ensure eventual consistency, but the delete needs to be processed before the database record is deleted to obtain the associated data. """ - doc_instance = import_module(doc_label) + doc_instance = getattr(import_module(doc_module), doc_class)() parallel = True doc_instance._bulk(bulk_data, parallel=parallel) + def prepare_registry_delete_task(self, instance): """ - Get the prepare did before database record deleted. + Prepare deletion of the instance itself from Elasticsearch. """ action = 'delete' - for doc in registry._get_related_doc(instance): - doc_instance = doc(related_instance_to_ignore=instance) - try: - related = doc_instance.get_instances_from_related(instance) - except ObjectDoesNotExist: - related = None - if related is not None: - doc_instance.update(related) - if isinstance(related, models.Model): - object_list = [related] - else: - object_list = related - bulk_data = list(doc_instance.get_actions(object_list, action)), - self.registry_delete_task.delay(doc_instance.__class__.__name__, bulk_data) + + # Find all documents in the registry that are related to the instance's model class + if instance.__class__ not in registry._models: + return + + bulk_data = [] + for doc_class in registry._models[instance.__class__]: + doc_instance = doc_class() # Create an instance of the document + if isinstance(instance, models.Model): + object_list = [instance] + else: + object_list = instance + + # Assuming get_actions method prepares the correct delete actions for Elasticsearch + bulk_data.extend(list(doc_instance._get_actions(object_list, action))) + + if bulk_data: + # Ensure registry_delete_task is prepared to handle bulk deletion + self.registry_delete_task.delay(doc_instance.__module__, doc_instance.__class__.__name__, bulk_data) + @shared_task() def registry_update_task(pk, app_label, model_name): @@ -205,9 +245,13 @@ def registry_update_task(pk, app_label, model_name): except LookupError: pass else: - registry.update( - model.objects.get(pk=pk) - ) + try: + registry.update( + model.objects.get(pk=pk) + ) + except ObjectDoesNotExist as e: + print(f'Error registry_update_task: {e}') + @shared_task() def registry_update_related_task(pk, app_label, model_name): @@ -217,6 +261,9 @@ def registry_update_related_task(pk, app_label, model_name): except LookupError: pass else: - registry.update_related( - model.objects.get(pk=pk) - ) + try: + registry.update_related( + model.objects.get(pk=pk) + ) + except ObjectDoesNotExist as e: + print(f'Error registry_update_related_task: {e}')