|
6 | 6 | from typing import Iterable, Optional |
7 | 7 |
|
8 | 8 | from django.db import models |
9 | | -from django.db.models import Q, QuerySet |
| 9 | +from django.db.models import Max, Min, Q, QuerySet |
10 | 10 | from opensearchpy.helpers import bulk, parallel_bulk |
11 | 11 | from opensearchpy.helpers.document import Document as DSLDocument |
12 | 12 |
|
@@ -101,29 +101,38 @@ def get_indexing_queryset( |
101 | 101 | """Divide the queryset into chunks.""" |
102 | 102 | chunk_size = self.django.queryset_pagination |
103 | 103 | qs = self.get_queryset(db_alias=db_alias, filter_=filter_, exclude=exclude, count=count) |
104 | | - qs = qs.order_by("pk") if not qs.query.is_sliced else qs |
105 | 104 | count = qs.count() |
106 | 105 | model = self.django.model.__name__ |
107 | 106 | action = action.present_participle.title() |
108 | 107 |
|
109 | | - i = 0 |
| 108 | + if self.django.order_indexing_queryset and not qs.query.is_ordered: |
| 109 | + qs = qs.order_by("pk") |
| 110 | + |
| 111 | + # To avoid loading large querysets into memory or |
| 112 | + # materialising them in temporary tables in the database, |
| 113 | + # the queryset can be processed in pk-range batches of chunk_size. |
| 114 | + result = qs.aggregate(min_pk=Min("pk"), max_pk=Max("pk")) |
| 115 | + min_value = result["min_pk"] |
| 116 | + max_value = result["max_pk"] + 1 |
| 117 | + |
110 | 118 | done = 0 |
| 119 | + current_batch = 0 |
| 120 | + total_batches = (max_value - min_value + chunk_size - 1) // chunk_size |
111 | 121 | start = time.time() |
112 | 122 | if verbose: |
113 | 123 | stdout.write(f"{action} {model}: 0% ({self._eta(start, done, count)})\r") |
114 | | - while done < count: |
115 | | - if verbose: |
116 | | - stdout.write(f"{action} {model}: {round(i / count * 100)}% ({self._eta(start, done, count)})\r") |
117 | 124 |
|
118 | | - for obj in qs[i : i + chunk_size]: |
| 125 | + for pk_offset in range(min_value, max_value, chunk_size): |
| 126 | + current_batch += 1 |
| 127 | + max_pk = min(pk_offset + self.django.queryset_pagination, max_value) |
| 128 | + batch_qs = qs.filter(pk__gte=pk_offset, pk__lt=max_pk) |
| 129 | + stdout.write(f"Processing batch {current_batch}/{total_batches} with pk from {pk_offset} to {max_pk - 1}\n") |
| 130 | + for obj in batch_qs: |
119 | 131 | done += 1 |
| 132 | + if done % chunk_size == 0: |
| 133 | + stdout.write(f"{action} {model}: {round(done / count * 100)}% ({self._eta(start, done, count)})\r") |
120 | 134 | yield obj |
121 | 135 |
|
122 | | - i = min(i + chunk_size, count) |
123 | | - |
124 | | - if verbose: |
125 | | - stdout.write(f"{action} {count} {model}: OK \n") |
126 | | - |
127 | 136 | def init_prepare(self): |
128 | 137 | """Initialise the data model preparers once here. |
129 | 138 |
|
|
0 commit comments