Skip to content

Commit b7d4974

Browse files
Bidaya0CorrosiveKidAndrii Rusanovbidaya0
authored
Celery signal processor implement (#414)
* Implement the Celery signal processor The `CelerySignalProcessor` allows automatic updates on the index as delayed background tasks using Celery. NB: We cannot process deletes as background tasks. By the time the Celery worker would pick up the delete job, the model instance would already deleted. We can get around this by setting Celery to use `pickle` and sending the object to the worker, but using `pickle` opens the application up to security concerns. * Draft implementation for different signal processor tests * requirements.txt update * signal.py revert revert signals.py * update * add delete task * add delete and delete_related perpare * fix import bug * update documents and bug fixes. * Celery signal processor branch bak (#2) * add requiremnts deps * add ci * change optional require, update signal handle ways. * change update task * change update task * change test case class. --------- Co-authored-by: Rasika Amaratissa <[email protected]> Co-authored-by: Andrii Rusanov <[email protected]> Co-authored-by: bidaya0 <[email protected]>
1 parent f6fa886 commit b7d4974

File tree

11 files changed

+202
-12
lines changed

11 files changed

+202
-12
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ jobs:
122122
run: |
123123
TOX_ENV=$(echo "py${{ matrix.python-version }}-django-${{ matrix.django-version }}-es${{ matrix.es-dsl-version }}" | tr -d .)
124124
python -m tox -e $TOX_ENV -- --elasticsearch
125+
python -m tox -e $TOX_ENV -- --elasticsearch --signal-processor celery
125126
126127
- name: Publish Coverage Report
127128
uses: codecov/codecov-action@v1

django_elasticsearch_dsl/documents.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,13 @@ def _get_actions(self, object_list, action):
219219
for object_instance in object_list:
220220
if action == 'delete' or self.should_index_object(object_instance):
221221
yield self._prepare_action(object_instance, action)
222+
223+
def get_actions(self, object_list, action):
224+
"""
225+
Generate the elasticsearch payload.
226+
"""
227+
return self._get_actions(object_list, action)
228+
222229

223230
def _bulk(self, *args, **kwargs):
224231
"""Helper for switching between normal and parallel bulk operation"""

django_elasticsearch_dsl/registries.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,5 +174,11 @@ def get_indices(self, models=None):
174174

175175
return set(iterkeys(self._indices))
176176

177+
def __contains__(self, model):
178+
"""
179+
Checks that model is in registry
180+
"""
181+
return model in self._models or model in self._related_models
182+
177183

178184
registry = DocumentRegistry()

django_elasticsearch_dsl/signals.py

Lines changed: 125 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@
77
from __future__ import absolute_import
88

99
from django.db import models
10+
from django.apps import apps
1011
from django.dispatch import Signal
11-
1212
from .registries import registry
13-
13+
from django.core.exceptions import ObjectDoesNotExist
14+
from importlib import import_module
15+
# Sent after document indexing is completed
16+
post_index = Signal()
1417

1518
class BaseSignalProcessor(object):
1619
"""Base signal processor.
@@ -96,6 +99,124 @@ def teardown(self):
9699
models.signals.m2m_changed.disconnect(self.handle_m2m_changed)
97100
models.signals.pre_delete.disconnect(self.handle_pre_delete)
98101

102+
try:
103+
from celery import shared_task
104+
except ImportError:
105+
pass
106+
else:
107+
class CelerySignalProcessor(RealTimeSignalProcessor):
108+
"""Celery signal processor.
109+
110+
Allows automatic updates on the index as delayed background tasks using
111+
Celery.
112+
113+
NB: We cannot process deletes as background tasks.
114+
By the time the Celery worker would pick up the delete job, the
115+
model instance would already be deleted. We can get around this by
116+
setting Celery to use `pickle` and sending the object to the worker,
117+
but using `pickle` opens the application up to security concerns.
118+
"""
99119

100-
# Sent after document indexing is completed
101-
post_index = Signal()
120+
def handle_save(self, sender, instance, **kwargs):
121+
"""Handle save with a Celery task.
122+
123+
Given an individual model instance, update the object in the index.
124+
Update the related objects as well.
125+
"""
126+
pk = instance.pk
127+
app_label = instance._meta.app_label
128+
model_name = instance.__class__.__name__
129+
130+
self.registry_update_task.delay(pk, app_label, model_name)
131+
self.registry_update_related_task.delay(pk, app_label, model_name)
132+
133+
def handle_pre_delete(self, sender, instance, **kwargs):
134+
"""Handle removing of instance object from related models instance.
135+
We need to do this before the real delete otherwise the relation
136+
doesn't exist anymore and we can't get the related model's instance.
137+
"""
138+
self.prepare_registry_delete_related_task(instance)
139+
140+
def handle_delete(self, sender, instance, **kwargs):
141+
"""Handle delete.
142+
143+
Given an individual model instance, delete the object from index.
144+
"""
145+
self.prepare_registry_delete_task(instance)
146+
147+
def prepare_registry_delete_related_task(self, instance):
148+
"""
149+
Select its related instance before this instance was deleted.
150+
And pass that to celery.
151+
"""
152+
action = 'index'
153+
for doc in registry._get_related_doc(instance):
154+
doc_instance = doc(related_instance_to_ignore=instance)
155+
try:
156+
related = doc_instance.get_instances_from_related(instance)
157+
except ObjectDoesNotExist:
158+
related = None
159+
if related is not None:
160+
doc_instance.update(related)
161+
if isinstance(related, models.Model):
162+
object_list = [related]
163+
else:
164+
object_list = related
165+
bulk_data = list(doc_instance._get_actions(object_list, action)),
166+
self.registry_delete_task.delay(doc_instance.__class__.__name__, bulk_data)
167+
168+
@shared_task()
169+
def registry_delete_task(doc_label, data):
170+
"""
171+
Handle the bulk delete data on the registry as a Celery task.
172+
The different implementations used are due to the difference between delete and update operations.
173+
The update operation can re-read the updated data from the database to ensure eventual consistency,
174+
but the delete needs to be processed before the database record is deleted to obtain the associated data.
175+
"""
176+
doc_instance = import_module(doc_label)
177+
parallel = True
178+
doc_instance._bulk(bulk_data, parallel=parallel)
179+
180+
def prepare_registry_delete_task(self, instance):
181+
"""
182+
Prepare the delete payload before the database record is deleted.
183+
"""
184+
action = 'delete'
185+
for doc in registry._get_related_doc(instance):
186+
doc_instance = doc(related_instance_to_ignore=instance)
187+
try:
188+
related = doc_instance.get_instances_from_related(instance)
189+
except ObjectDoesNotExist:
190+
related = None
191+
if related is not None:
192+
doc_instance.update(related)
193+
if isinstance(related, models.Model):
194+
object_list = [related]
195+
else:
196+
object_list = related
197+
bulk_data = list(doc_instance.get_actions(object_list, action)),
198+
self.registry_delete_task.delay(doc_instance.__class__.__name__, bulk_data)
199+
200+
@shared_task()
201+
def registry_update_task(pk, app_label, model_name):
202+
"""Handle the update on the registry as a Celery task."""
203+
try:
204+
model = apps.get_model(app_label, model_name)
205+
except LookupError:
206+
pass
207+
else:
208+
registry.update(
209+
model.objects.get(pk=pk)
210+
)
211+
212+
@shared_task()
213+
def registry_update_related_task(pk, app_label, model_name):
214+
"""Handle the related update on the registry as a Celery task."""
215+
try:
216+
model = apps.get_model(app_label, model_name)
217+
except LookupError:
218+
pass
219+
else:
220+
registry.update_related(
221+
model.objects.get(pk=pk)
222+
)

docs/source/settings.rst

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,15 @@ An example:
3737
3838
Defaults to ``django_elasticsearch_dsl.signals.RealTimeSignalProcessor``.
3939

40-
You could, for instance, make a ``CelerySignalProcessor`` which would add
41-
update jobs to the queue to for delayed processing.
40+
Options: ``django_elasticsearch_dsl.signals.RealTimeSignalProcessor`` \ ``django_elasticsearch_dsl.signals.CelerySignalProcessor``
41+
42+
In this ``CelerySignalProcessor`` implementation,
43+
Create and update operations record the primary key of the changed row and defer the lookup of associated data to the background task, ensuring eventual consistency.
44+
Delete operations are processed to obtain associated data before database records are deleted.
45+
And celery needs to be pre-configured in the django project, for example `Using Celery with Django <https://docs.celeryq.dev/en/stable/django/first-steps-with-django.html>`.
46+
47+
You could, for instance, make a ``CustomSignalProcessor`` which would apply
48+
update jobs as your wish.
4249

4350
ELASTICSEARCH_DSL_PARALLEL
4451
==========================

requirements.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,2 @@
11
django>=1.9.6
22
elasticsearch-dsl>=7.0.0,<8.0.0
3-

requirements_test.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mock>=1.0.1
33
flake8>=2.1.0
44
tox>=1.7.0
55
Pillow==6.2.0
6+
celery>=4.1.0
67

78

89
# Additional test requirements go here

runtests.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,19 @@
22
import sys
33
import argparse
44

5+
from celery import Celery
6+
57
try:
68
from django.conf import settings
79
from django.test.utils import get_runner
810

9-
def get_settings():
11+
def get_settings(signal_processor):
12+
PROCESSOR_CLASSES = {
13+
'realtime': 'django_elasticsearch_dsl.signals.RealTimeSignalProcessor',
14+
'celery': 'django_elasticsearch_dsl.signals.CelerySignalProcessor',
15+
}
16+
17+
signal_processor = PROCESSOR_CLASSES[signal_processor]
1018
settings.configure(
1119
DEBUG=True,
1220
USE_TZ=True,
@@ -31,6 +39,10 @@ def get_settings():
3139
},
3240
},
3341
DEFAULT_AUTO_FIELD="django.db.models.BigAutoField",
42+
CELERY_BROKER_URL='memory://localhost/',
43+
CELERY_TASK_ALWAYS_EAGER=True,
44+
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
45+
ELASTICSEARCH_DSL_SIGNAL_PROCESSOR=signal_processor
3446
)
3547

3648
try:
@@ -41,6 +53,9 @@ def get_settings():
4153
else:
4254
setup()
4355

56+
app = Celery()
57+
app.config_from_object('django.conf:settings', namespace='CELERY')
58+
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
4459
return settings
4560

4661
except ImportError:
@@ -59,6 +74,13 @@ def make_parser():
5974
const='localhost:9200',
6075
help="To run integration test against an Elasticsearch server",
6176
)
77+
parser.add_argument(
78+
'--signal-processor',
79+
nargs='?',
80+
default='realtime',
81+
choices=('realtime', 'celery'),
82+
help='Defines which signal backend to choose'
83+
)
6284
return parser
6385

6486

@@ -70,7 +92,9 @@ def run_tests(*test_args):
7092
if not test_args:
7193
test_args = ['tests']
7294

73-
settings = get_settings()
95+
signal_processor = args.signal_processor
96+
97+
settings = get_settings(signal_processor)
7498
TestRunner = get_runner(settings)
7599
test_runner = TestRunner()
76100

setup.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,7 @@
7272
'Programming Language :: Python :: 3.9',
7373
'Programming Language :: Python :: 3.10',
7474
],
75+
extras_require={
76+
'celery': ["celery>=4.1.0"],
77+
}
7578
)

tests/test_documents.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import json
22
from unittest import TestCase
3+
from unittest import SkipTest
4+
35

46
import django
57
from django.db import models
@@ -64,7 +66,19 @@ class Index:
6466
doc_type = 'car_document'
6567

6668

67-
class DocTypeTestCase(TestCase):
69+
class BaseDocTypeTestCase(object):
70+
TARGET_PROCESSOR = None
71+
72+
@classmethod
73+
def setUpClass(cls):
74+
from django.conf import settings
75+
if cls.TARGET_PROCESSOR != settings.ELASTICSEARCH_DSL_SIGNAL_PROCESSOR:
76+
raise SkipTest(
77+
"Skipped because {} is required, not {}".format(
78+
cls.TARGET_PROCESSOR, settings.ELASTICSEARCH_DSL_SIGNAL_PROCESSOR
79+
)
80+
)
81+
super(BaseDocTypeTestCase,cls).setUpClass()
6882

6983
def test_model_class_added(self):
7084
self.assertEqual(CarDocument.django.model, Car)
@@ -538,3 +552,10 @@ def should_index_object(self, obj):
538552
data_body = mock_bulk.call_args[1]['body']
539553
self.assertTrue(article1.slug in data_body)
540554
self.assertTrue(article2.slug not in data_body)
555+
556+
class RealTimeDocTypeTestCase(BaseDocTypeTestCase, TestCase):
557+
TARGET_PROCESSOR = 'django_elasticsearch_dsl.signals.RealTimeSignalProcessor'
558+
559+
560+
class CeleryDocTypeTestCase(BaseDocTypeTestCase, TestCase):
561+
TARGET_PROCESSOR = 'django_elasticsearch_dsl.signals.CelerySignalProcessor'

tests/test_integration.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import django
55
from django.core.management import call_command
6-
from django.test import TestCase
6+
from django.test import TestCase, TransactionTestCase
77
if django.VERSION < (4, 0):
88
from django.utils.translation import ugettext_lazy as _
99
else:
@@ -29,7 +29,7 @@
2929

3030

3131
@unittest.skipUnless(is_es_online(), 'Elasticsearch is offline')
32-
class IntegrationTestCase(ESTestCase, TestCase):
32+
class IntegrationTestCase(ESTestCase, TransactionTestCase):
3333
def setUp(self):
3434
super(IntegrationTestCase, self).setUp()
3535
self.manufacturer = Manufacturer(

0 commit comments

Comments
 (0)