Skip to content

Commit d0ee947

Browse files
Virgin Bittonqcoumes
authored andcommitted
feat: Add JoinField and update indexation command to select on which models to start the indexation
1 parent 4d4f437 commit d0ee947

File tree

4 files changed

+145
-42
lines changed

4 files changed

+145
-42
lines changed

django_opensearch_dsl/documents.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,11 @@ def search(cls, using=None, index=None):
6363
using=cls._get_using(using), index=cls._default_index(index), doc_type=[cls], model=cls.django.model
6464
)
6565

66-
def get_queryset(self, filter_: Optional[Q] = None, exclude: Optional[Q] = None, count: int = None) -> QuerySet:
66+
def get_queryset(
67+
self, db_alias: str = "default", filter_: Optional[Q] = None, exclude: Optional[Q] = None, count: int = None
68+
) -> QuerySet:
6769
"""Return the queryset that should be indexed by this doc type."""
68-
qs = self.django.model.objects.all()
70+
qs = self.django.model.objects.using(db_alias).all()
6971

7072
if filter_:
7173
qs = qs.filter(filter_)
@@ -88,6 +90,7 @@ def _eta(self, start, done, total): # pragma: no cover
8890

8991
def get_indexing_queryset(
9092
self,
93+
db_alias: str = "default",
9194
verbose: bool = False,
9295
filter_: Optional[Q] = None,
9396
exclude: Optional[Q] = None,
@@ -97,7 +100,7 @@ def get_indexing_queryset(
97100
) -> Iterable:
98101
"""Divide the queryset into chunks."""
99102
chunk_size = self.django.queryset_pagination
100-
qs = self.get_queryset(filter_=filter_, exclude=exclude, count=count)
103+
qs = self.get_queryset(db_alias=db_alias, filter_=filter_, exclude=exclude, count=count)
101104
qs = qs.order_by("pk") if not qs.query.is_sliced else qs
102105
count = qs.count()
103106
model = self.django.model.__name__

django_opensearch_dsl/fields.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,10 @@ class IpField(DODField, fields.Ip):
218218
"""Allow indexing of IPv4 and IPv6 addresses."""
219219

220220

221+
class JoinField(DODField, fields.Join):
222+
"""Allow indexing of Join fields (with parent/child relation)."""
223+
224+
221225
class LongField(DODField, fields.Long):
222226
"""Allow indexing of long.
223227

django_opensearch_dsl/management/commands/opensearch.py

Lines changed: 131 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from django.core.exceptions import FieldError
1313
from django.core.management import BaseCommand
1414
from django.core.management.base import OutputWrapper
15+
from django.db import DEFAULT_DB_ALIAS
1516
from django.db.models import Q
1617

1718
from django_opensearch_dsl.registries import registry
@@ -110,6 +111,13 @@ def _manage_index(self, action, indices, force, verbosity, ignore_error, **optio
110111
self.stdout.flush()
111112
try:
112113
if action == OpensearchAction.CREATE:
114+
# If current index depends on many different models, add them to
115+
# index._doc_types before indexing to make sure all mappings of different models
116+
# are taken into account.
117+
index_models = registry.get_indices_raw().get(index, None)
118+
if index_models:
119+
for model in list(index_models):
120+
index._doc_types.append(model)
113121
index.create()
114122
elif action == OpensearchAction.DELETE:
115123
index.delete()
@@ -133,14 +141,38 @@ def _manage_index(self, action, indices, force, verbosity, ignore_error, **optio
133141
self.stdout.write(f"{pp} index '{index._name}'... {self.style.SUCCESS('OK')}") # noqa
134142

135143
def _manage_document(
136-
self, action, indices, force, filters, excludes, verbosity, parallel, count, refresh, missing, **options
144+
self,
145+
action,
146+
indices,
147+
objects,
148+
force,
149+
filters,
150+
excludes,
151+
verbosity,
152+
parallel,
153+
count,
154+
refresh,
155+
missing,
156+
database,
157+
**options,
137158
): # noqa
138159
"""Manage the creation and deletion of indices."""
139160
action = OpensearchAction(action)
140161
known = registry.get_indices()
141162
filter_ = functools.reduce(operator.and_, (Q(**{k: v}) for k, v in filters)) if filters else None
142163
exclude = functools.reduce(operator.and_, (Q(**{k: v}) for k, v in excludes)) if excludes else None
143164

165+
# Filter existing objects
166+
valid_models = []
167+
registered_models = [m.__name__.lower() for m in registry.get_models()]
168+
if objects:
169+
for model in objects:
170+
if model.lower() in registered_models:
171+
valid_models.append(model)
172+
else:
173+
self.stderr.write(f"Unknown object '{model}', choices are: '{registered_models}'")
174+
exit(1)
175+
144176
# Filter indices
145177
if indices:
146178
# Ensure every given indices exists
@@ -165,23 +197,50 @@ def _manage_document(
165197
# Check field, preparing to display expected actions
166198
s = f"The following documents will be {action.past}:"
167199
kwargs_list = []
168-
for index in indices:
200+
201+
if objects:
202+
django_models = [m for m in registry.get_models() if m.__name__.lower() in valid_models]
203+
all_os_models = []
204+
selected_os_models = []
205+
indices_raw = registry.get_indices_raw()
206+
207+
for k, v in indices_raw.items():
208+
for model in list(v):
209+
all_os_models.append(model)
210+
211+
for os_model in all_os_models:
212+
if os_model.django.model in django_models and os_model.Index.name in list(i._name for i in indices):
213+
selected_os_models.append(os_model)
214+
169215
# Handle --missing
170216
exclude_ = exclude
171-
if missing and action == OpensearchAction.INDEX:
172-
q = Q(pk__in=[h.meta.id for h in index.search().extra(stored_fields=[]).scan()])
173-
exclude_ = exclude_ & q if exclude_ is not None else q
174-
175-
document = index._doc_types[0]() # noqa
176-
try:
177-
kwargs_list.append({"filter_": filter_, "exclude": exclude_, "count": count})
178-
qs = document.get_queryset(filter_=filter_, exclude=exclude_, count=count).count()
179-
except FieldError as e:
180-
model = index._doc_types[0].django.model.__name__ # noqa
181-
self.stderr.write(f"Error while filtering on '{model}' (from index '{index._name}'):\n{e}'") # noqa
182-
exit(1)
183-
else:
184-
s += f"\n\t- {qs} {document.django.model.__name__}."
217+
for model in selected_os_models:
218+
try:
219+
kwargs_list.append({"filter_": filter_, "exclude": exclude_, "count": count})
220+
qs = model().get_queryset(filter_=filter_, exclude=exclude_, count=count).count()
221+
except FieldError as e:
222+
self.stderr.write(f"Error while filtering on '{model.django.model.__name__}':\n{e}'") # noqa
223+
exit(1)
224+
else:
225+
s += f"\n\t- {qs} {model.django.model.__name__}."
226+
else:
227+
for index in indices:
228+
# Handle --missing
229+
exclude_ = exclude
230+
if missing and action == OpensearchAction.INDEX:
231+
q = Q(pk__in=[h.meta.id for h in index.search().extra(stored_fields=[]).scan()])
232+
exclude_ = exclude_ & q if exclude_ is not None else q
233+
234+
document = index._doc_types[0]() # noqa
235+
try:
236+
kwargs_list.append({"db_alias": database, "filter_": filter_, "exclude": exclude_, "count": count})
237+
qs = document.get_queryset(filter_=filter_, exclude=exclude_, count=count).count()
238+
except FieldError as e:
239+
model = index._doc_types[0].django.model.__name__ # noqa
240+
self.stderr.write(f"Error while filtering on '{model}' (from index '{index._name}'):\n{e}'") # noqa
241+
exit(1)
242+
else:
243+
s += f"\n\t- {qs} {document.django.model.__name__}."
185244

186245
# Display expected actions
187246
if verbosity or not force:
@@ -198,28 +257,53 @@ def _manage_document(
198257
exit(1)
199258

200259
result = "\n"
201-
for index, kwargs in zip(indices, kwargs_list):
202-
document = index._doc_types[0]() # noqa
203-
qs = document.get_indexing_queryset(stdout=self.stdout._out, verbose=verbosity, action=action, **kwargs)
204-
success, errors = document.update(
205-
qs, parallel=parallel, refresh=refresh, action=action, raise_on_error=False
206-
)
207-
208-
success_str = self.style.SUCCESS(success) if success else success
209-
errors_str = self.style.ERROR(len(errors)) if errors else len(errors)
210-
model = document.django.model.__name__
211-
212-
if verbosity == 1:
213-
result += f"{success_str} {model} successfully {action.past}, {errors_str} errors:\n"
214-
reasons = defaultdict(int)
215-
for e in errors: # Count occurrence of each error
216-
error = e.get(action, {"result": "unknown error"}).get("result", "unknown error")
217-
reasons[error] += 1
218-
for reasons, total in reasons.items():
219-
result += f" - {reasons} : {total}\n"
220-
221-
if verbosity > 1:
222-
result += f"{success_str} {model} successfully {action}d, {errors_str} errors:\n {errors}\n"
260+
if objects:
261+
for model, kwargs in zip(selected_os_models, kwargs_list):
262+
document = model() # noqa
263+
qs = document.get_indexing_queryset(stdout=self.stdout._out, verbose=verbosity, action=action, **kwargs)
264+
success, errors = document.update(
265+
qs, parallel=parallel, refresh=refresh, action=action, raise_on_error=False
266+
)
267+
268+
success_str = self.style.SUCCESS(success) if success else success
269+
errors_str = self.style.ERROR(len(errors)) if errors else len(errors)
270+
model = document.django.model.__name__
271+
272+
if verbosity == 1:
273+
result += f"{success_str} {model} successfully {action.past}, {errors_str} errors:\n"
274+
reasons = defaultdict(int)
275+
for e in errors: # Count occurrence of each error
276+
error = e.get(action, {"result": "unknown error"}).get("result", "unknown error")
277+
reasons[error] += 1
278+
for reasons, total in reasons.items():
279+
result += f" - {reasons} : {total}\n"
280+
281+
if verbosity > 1:
282+
result += f"{success_str} {model} successfully {action}d, {errors_str} errors:\n {errors}\n"
283+
284+
else:
285+
for index, kwargs in zip(indices, kwargs_list):
286+
document = index._doc_types[0]() # noqa
287+
qs = document.get_indexing_queryset(stdout=self.stdout._out, verbose=verbosity, action=action, **kwargs)
288+
success, errors = document.update(
289+
qs, parallel=parallel, refresh=refresh, action=action, raise_on_error=False
290+
)
291+
292+
success_str = self.style.SUCCESS(success) if success else success
293+
errors_str = self.style.ERROR(len(errors)) if errors else len(errors)
294+
model = document.django.model.__name__
295+
296+
if verbosity == 1:
297+
result += f"{success_str} {model} successfully {action.past}, {errors_str} errors:\n"
298+
reasons = defaultdict(int)
299+
for e in errors: # Count occurrence of each error
300+
error = e.get(action, {"result": "unknown error"}).get("result", "unknown error")
301+
reasons[error] += 1
302+
for reasons, total in reasons.items():
303+
result += f" - {reasons} : {total}\n"
304+
305+
if verbosity > 1:
306+
result += f"{success_str} {model} successfully {action}d, {errors_str} errors:\n {errors}\n"
223307

224308
if verbosity:
225309
self.stdout.write(result + "\n")
@@ -237,7 +321,7 @@ def add_arguments(self, parser):
237321
)
238322
subparser.set_defaults(func=self.__list_index)
239323

240-
# 'manage' subcommand
324+
# 'index' subcommand
241325
subparser = subparsers.add_parser(
242326
"index",
243327
help="Manage the creation an deletion of indices.",
@@ -288,6 +372,13 @@ def add_arguments(self, parser):
288372
OpensearchAction.UPDATE.value,
289373
],
290374
)
375+
subparser.add_argument(
376+
"-d",
377+
"--database",
378+
type=str,
379+
default=DEFAULT_DB_ALIAS,
380+
help="Nominates a database to use as source. Defaults to the 'default' database.",
381+
)
291382
subparser.add_argument(
292383
"-f",
293384
"--filters",
@@ -321,6 +412,7 @@ def add_arguments(self, parser):
321412
subparser.add_argument(
322413
"-i", "--indices", type=str, nargs="*", help="Only update documents on the given indices."
323414
)
415+
subparser.add_argument("-o", "--objects", type=str, nargs="*", help="Only update selected objects.")
324416
subparser.add_argument(
325417
"-c", "--count", type=int, default=None, help="Update at most COUNT objects (0 to index everything)."
326418
)

django_opensearch_dsl/registries.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,5 +180,9 @@ def __contains__(self, obj):
180180
f"'in <{type(self).__name__}>' requires a Model subclass as left operand, not {type(dict).__name__}"
181181
)
182182

183+
def get_indices_raw(self):
184+
"""Get all indices as they are store in the registry or the indices for a list of models."""
185+
return self._indices
186+
183187

184188
registry = DocumentRegistry()

0 commit comments

Comments
 (0)