22
22
import java .util .LinkedHashMap ;
23
23
import java .util .List ;
24
24
import java .util .Map ;
25
+ import java .util .concurrent .atomic .AtomicLong ;
25
26
import java .util .function .Supplier ;
26
27
import org .apache .ignite .IgniteCheckedException ;
27
28
import org .apache .ignite .IgniteLogger ;
28
29
import org .apache .ignite .configuration .DataRegionConfiguration ;
29
30
import org .apache .ignite .configuration .LoadAllWarmUpConfiguration ;
31
+ import org .apache .ignite .internal .GridKernalContext ;
32
+ import org .apache .ignite .internal .managers .communication .GridIoPolicy ;
30
33
import org .apache .ignite .internal .processors .cache .CacheGroupContext ;
31
34
import org .apache .ignite .internal .processors .cache .GridCacheProcessor ;
32
35
import org .apache .ignite .internal .processors .cache .distributed .dht .topology .GridDhtLocalPartition ;
33
36
import org .apache .ignite .internal .processors .cache .persistence .DataRegion ;
34
37
import org .apache .ignite .internal .processors .cache .persistence .pagemem .PageMemoryEx ;
35
38
import org .apache .ignite .internal .util .tostring .GridToStringExclude ;
36
39
import org .apache .ignite .internal .util .typedef .internal .S ;
40
+ import org .apache .ignite .internal .util .typedef .internal .U ;
37
41
42
+ import static java .util .stream .Collectors .averagingInt ;
38
43
import static java .util .stream .Collectors .toList ;
39
44
import static org .apache .ignite .internal .pagemem .PageIdAllocator .INDEX_PARTITION ;
45
+ import static org .apache .ignite .internal .util .IgniteUtils .doInParallel ;
40
46
41
47
/**
42
48
* "Load all" warm-up strategy, which loads pages to persistent data region
@@ -99,12 +105,32 @@ public LoadAllWarmUpStrategy(IgniteLogger log, Supplier<Collection<CacheGroupCon
99
105
loadDataInfo .keySet ().stream ().map (CacheGroupContext ::cacheOrGroupName ).collect (toList ()) + ']' );
100
106
}
101
107
102
- long loadedPageCnt = 0 ;
108
+ AtomicLong loadedPageCnt = new AtomicLong () ;
103
109
104
110
for (Map .Entry <CacheGroupContext , List <LoadPartition >> e : loadDataInfo .entrySet ()) {
105
111
CacheGroupContext grp = e .getKey ();
106
112
List <LoadPartition > parts = e .getValue ();
107
113
114
+ LoadPartition idxPart = parts .get (0 );
115
+
116
+ int avgPartPagesCnt = parts .stream ()
117
+ .filter (p -> p .part () != INDEX_PARTITION )
118
+ .map (LoadPartition ::pages )
119
+ .collect (averagingInt (i -> i )).intValue ();
120
+
121
+ if (avgPartPagesCnt != 0 && idxPart .pages () > avgPartPagesCnt ) {
122
+ // Split index partition into chunks to balance threads load. It's needed since
123
+ // index partition is usually much larger than the data one if indexing is enabled.
124
+ List <LoadPartition > newParts = new ArrayList <>(idxPart .pages () / avgPartPagesCnt + parts .size ());
125
+
126
+ for (int i = 0 ; i < idxPart .pages (); i += avgPartPagesCnt )
127
+ newParts .add (new LoadPartition (idxPart .part (), Math .min (idxPart .pages () - i , avgPartPagesCnt ), i ));
128
+
129
+ newParts .addAll (parts .subList (1 , parts .size ()));
130
+
131
+ parts = newParts ;
132
+ }
133
+
108
134
if (log .isInfoEnabled ()) {
109
135
log .info ("Start warm-up cache group, with estimated statistics [name=" + grp .cacheOrGroupName ()
110
136
+ ", partCnt=" + parts .size () + ", pageCnt="
@@ -113,31 +139,42 @@ public LoadAllWarmUpStrategy(IgniteLogger log, Supplier<Collection<CacheGroupCon
113
139
114
140
PageMemoryEx pageMemEx = (PageMemoryEx )region .pageMemory ();
115
141
116
- for (LoadPartition part : parts ) {
117
- long pageId = pageMemEx .partitionMetaPageId (grp .groupId (), part .part ());
142
+ GridKernalContext ctx = grp .shared ().kernalContext ();
118
143
119
- for (int i = 0 ; i < part .pages (); i ++, pageId ++, loadedPageCnt ++) {
120
- if (stop ) {
121
- if (log .isInfoEnabled ()) {
122
- log .info ("Stop warm-up cache group with loaded statistics [name="
123
- + grp .cacheOrGroupName () + ", pageCnt=" + loadedPageCnt
124
- + ", remainingPageCnt=" + (availableLoadPageCnt - loadedPageCnt ) + ']' );
125
- }
144
+ doInParallel (
145
+ U .availableThreadCount (ctx , GridIoPolicy .SYSTEM_POOL , 2 ),
146
+ ctx .pools ().getSystemExecutorService (),
147
+ parts ,
148
+ part -> {
149
+ long pageId = pageMemEx .partitionMetaPageId (grp .groupId (), part .part ());
126
150
127
- return ;
128
- }
151
+ pageId += part .startPageIdx ();
129
152
130
- long pagePtr = -1 ;
153
+ for (int i = 0 ; i < part .pages (); i ++, pageId ++, loadedPageCnt .incrementAndGet ()) {
154
+ if (stop ) {
155
+ if (log .isInfoEnabled ()) {
156
+ log .info ("Stop warm-up cache group with loaded statistics [name="
157
+ + grp .cacheOrGroupName () + ", pageCnt=" + loadedPageCnt .get ()
158
+ + ", remainingPageCnt=" + (availableLoadPageCnt - loadedPageCnt .get ()) + ']' );
159
+ }
131
160
132
- try {
133
- pagePtr = pageMemEx .acquirePage (grp .groupId (), pageId );
134
- }
135
- finally {
136
- if (pagePtr != -1 )
137
- pageMemEx .releasePage (grp .groupId (), pageId , pagePtr );
161
+ return null ;
162
+ }
163
+
164
+ long pagePtr = -1 ;
165
+
166
+ try {
167
+ pagePtr = pageMemEx .acquirePage (grp .groupId (), pageId );
168
+ }
169
+ finally {
170
+ if (pagePtr != -1 )
171
+ pageMemEx .releasePage (grp .groupId (), pageId , pagePtr );
172
+ }
138
173
}
174
+
175
+ return null ;
139
176
}
140
- }
177
+ );
141
178
}
142
179
}
143
180
@@ -194,14 +231,14 @@ protected Map<CacheGroupContext, List<LoadPartition>> loadDataInfo(
194
231
for (int j = -1 ; j < locParts .size () && availableLoadPageCnt > 0 ; j ++) {
195
232
int p = j == -1 ? INDEX_PARTITION : locParts .get (j ).id ();
196
233
197
- long partPageCnt = grp .shared ().pageStore ().pages (grp .groupId (), p );
234
+ int partPageCnt = grp .shared ().pageStore ().pages (grp .groupId (), p );
198
235
199
236
if (partPageCnt > 0 ) {
200
- long pageCnt = (availableLoadPageCnt - partPageCnt ) >= 0 ? partPageCnt : availableLoadPageCnt ;
237
+ int pageCnt = (availableLoadPageCnt - partPageCnt ) >= 0 ? partPageCnt : ( int ) availableLoadPageCnt ;
201
238
202
239
availableLoadPageCnt -= pageCnt ;
203
240
204
- loadableGrps .computeIfAbsent (grp , grpCtx -> new ArrayList <>()).add (new LoadPartition (p , pageCnt ));
241
+ loadableGrps .computeIfAbsent (grp , grpCtx -> new ArrayList <>()).add (new LoadPartition (p , pageCnt , 0 ));
205
242
}
206
243
}
207
244
}
@@ -217,20 +254,25 @@ static class LoadPartition {
217
254
private final int part ;
218
255
219
256
/** Number of pages to load. */
220
- private final long pages ;
257
+ private final int pages ;
258
+
259
+ /** Index of first page to load. */
260
+ private final int startPageIdx ;
221
261
222
262
/**
223
263
* Constructor.
224
264
*
225
265
* @param part Partition id.
226
266
* @param pages Number of pages to load.
267
+ * @param startPageIdx Index of first page to load.
227
268
*/
228
- public LoadPartition (int part , long pages ) {
269
+ public LoadPartition (int part , int pages , int startPageIdx ) {
229
270
assert part >= 0 : "Partition id cannot be negative." ;
230
271
assert pages > 0 : "Number of pages to load must be greater than zero." ;
231
272
232
273
this .part = part ;
233
274
this .pages = pages ;
275
+ this .startPageIdx = startPageIdx ;
234
276
}
235
277
236
278
/**
@@ -247,10 +289,19 @@ public int part() {
247
289
*
248
290
* @return Number of pages to load.
249
291
*/
250
- public long pages () {
292
+ public int pages () {
251
293
return pages ;
252
294
}
253
295
296
+ /**
297
+ * Return index of first page to load.
298
+ *
299
+ * @return Index of first page to load.
300
+ */
301
+ public int startPageIdx () {
302
+ return startPageIdx ;
303
+ }
304
+
254
305
/** {@inheritDoc} */
255
306
@ Override public String toString () {
256
307
return S .toString (LoadPartition .class , this );
0 commit comments