Skip to content

Commit 871db13

Browse files
IGNITE-20697 Store crash recovery data to checkpoint recovery files
1 parent 20e82be commit 871db13

File tree

9 files changed

+56
-34
lines changed

9 files changed

+56
-34
lines changed

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java

+29-7
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@
6666
import org.apache.ignite.internal.util.GridConcurrentMultiPairQueue;
6767
import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
6868
import org.apache.ignite.internal.util.StripedExecutor;
69+
import org.apache.ignite.internal.util.function.ThrowableSupplier;
6970
import org.apache.ignite.internal.util.future.GridCompoundFuture;
70-
import org.apache.ignite.internal.util.future.GridFinishedFuture;
7171
import org.apache.ignite.internal.util.future.GridFutureAdapter;
7272
import org.apache.ignite.internal.util.typedef.T2;
7373
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -216,14 +216,16 @@ private IgniteThreadPoolExecutor initializeCheckpointPool() {
216216
* @param curr Current checkpoint event info.
217217
* @param tracker Checkpoint metrics tracker.
218218
* @param workProgressDispatcher Work progress dispatcher.
219+
* @param writeRecoveryData Write recovery data on checkpoint.
219220
* @return Checkpoint collected info.
220221
* @throws IgniteCheckedException if fail.
221222
*/
222223
public Checkpoint markCheckpointBegin(
223224
long cpTs,
224225
CheckpointProgressImpl curr,
225226
CheckpointMetricsTracker tracker,
226-
WorkProgressDispatcher workProgressDispatcher
227+
WorkProgressDispatcher workProgressDispatcher,
228+
boolean writeRecoveryData
227229
) throws IgniteCheckedException {
228230
Collection<DataRegion> checkpointedRegions = dataRegions.get();
229231

@@ -276,8 +278,26 @@ curr, new PartitionAllocationMap(), checkpointCollectPagesInfoPool, workProgress
276278

277279
fillCacheGroupState(cpRec);
278280

279-
//There are allowable to replace pages only after checkpoint entry was stored to disk.
280-
cpPagesHolder = beginAllCheckpoints(checkpointedRegions, curr.futureFor(MARKER_STORED_TO_DISK));
281+
IgniteInternalFuture<?> markerStoredToDiskFut = curr.futureFor(MARKER_STORED_TO_DISK);
282+
283+
// Pages are allowed to be replaced only after the checkpoint entry has been stored to disk.
284+
ThrowableSupplier<Boolean, IgniteCheckedException> allowToReplace = writeRecoveryData ?
285+
() -> {
286+
// If we write recovery data on checkpoint it's not safe to wait for MARKER_STORED_TO_DISK future,
287+
// recovery data writers acquire page memory segment locks to write the pages, and at the same time
288+
// another thread can lock page memory segment for writing during page replacement and wait for
289+
// marker stored to disk, so deadlock is possible.
290+
return curr.greaterOrEqualTo(MARKER_STORED_TO_DISK);
291+
} :
292+
() -> {
293+
// Waiting uninterruptibly is important: otherwise, if the client thread is interrupted, the node
294+
// would be stopped.
295+
markerStoredToDiskFut.getUninterruptibly();
296+
297+
return true;
298+
};
299+
300+
cpPagesHolder = beginAllCheckpoints(checkpointedRegions, allowToReplace);
281301

282302
curr.currentCheckpointPagesCount(cpPagesHolder.pagesNum());
283303

@@ -416,8 +436,10 @@ private void fillCacheGroupState(CheckpointRecord cpRec) throws IgniteCheckedExc
416436
* @return Holder of FullPageIds obtained from each PageMemory, overall number of dirty pages, and a flag showing whether at
417437
* least one user page became dirty since the last checkpoint.
418438
*/
419-
private CheckpointPagesInfoHolder beginAllCheckpoints(Collection<DataRegion> regions,
420-
IgniteInternalFuture<?> allowToReplace) {
439+
private CheckpointPagesInfoHolder beginAllCheckpoints(
440+
Collection<DataRegion> regions,
441+
ThrowableSupplier<Boolean, IgniteCheckedException> allowToReplace
442+
) {
421443
Collection<Map.Entry<PageMemoryEx, GridMultiCollectionWrapper<FullPageId>>> res =
422444
new ArrayList<>(regions.size());
423445

@@ -611,7 +633,7 @@ public void finalizeCheckpointOnRecovery(
611633

612634
Collection<DataRegion> regions = dataRegions.get();
613635

614-
CheckpointPagesInfoHolder cpPagesHolder = beginAllCheckpoints(regions, new GridFinishedFuture<>());
636+
CheckpointPagesInfoHolder cpPagesHolder = beginAllCheckpoints(regions, () -> Boolean.TRUE);
615637

616638
// Sort and split all dirty pages set to several stripes.
617639
GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> pages =

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,8 @@ private void doCheckpoint() {
402402
startCheckpointProgress();
403403

404404
try {
405-
chp = checkpointWorkflow.markCheckpointBegin(lastCpTs, curCpProgress, tracker, this);
405+
chp = checkpointWorkflow.markCheckpointBegin(lastCpTs, curCpProgress, tracker, this,
406+
writeRecoveryData);
406407

407408
tracker.onMarkEnd();
408409

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointPages.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
import java.util.Collection;
2121
import org.apache.ignite.IgniteCheckedException;
22-
import org.apache.ignite.internal.IgniteInternalFuture;
2322
import org.apache.ignite.internal.pagemem.FullPageId;
23+
import org.apache.ignite.internal.util.function.ThrowableSupplier;
2424

2525
/**
2626
* View of pages which should be stored during current checkpoint.
@@ -30,15 +30,15 @@ class CheckpointPages {
3030
private final Collection<FullPageId> segCheckpointPages;
3131

3232
/** The sign which allows to replace pages from a checkpoint by page replacer. */
33-
private final IgniteInternalFuture allowToReplace;
33+
private final ThrowableSupplier<Boolean, IgniteCheckedException> allowToReplace;
3434

3535
/**
3636
* @param pages Pages which would be stored to disk in current checkpoint.
37-
* @param replaceFuture The sign which allows to replace pages from a checkpoint by page replacer.
37+
* @param allowToReplace The sign which allows to replace pages from a checkpoint by page replacer.
3838
*/
39-
CheckpointPages(Collection<FullPageId> pages, IgniteInternalFuture replaceFuture) {
39+
CheckpointPages(Collection<FullPageId> pages, ThrowableSupplier<Boolean, IgniteCheckedException> allowToReplace) {
4040
segCheckpointPages = pages;
41-
allowToReplace = replaceFuture;
41+
this.allowToReplace = allowToReplace;
4242
}
4343

4444
/**
@@ -51,8 +51,9 @@ public boolean allowToSave(FullPageId fullPageId) throws IgniteCheckedException
5151
if (checkpointPages == null || allowToReplace == null)
5252
return false;
5353

54-
//Uninterruptibly is important because otherwise in case of interrupt of client thread node would be stopped.
55-
allowToReplace.getUninterruptibly();
54+
// Note: allowToReplace.get() may block on a future for some time (until the checkpoint marker is stored to disk).
55+
if (!allowToReplace.get())
56+
return false;
5657

5758
return checkpointPages.contains(fullPageId);
5859
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
2929
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
3030
import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
31+
import org.apache.ignite.internal.util.function.ThrowableSupplier;
3132

3233
/**
3334
* Page memory with some persistence related additions.
@@ -119,7 +120,7 @@ public long acquirePage(int grpId, long pageId, IoStatisticsHolder statHldr,
119120
* @throws IgniteException If checkpoint has been already started and was not finished.
120121
* @param allowToReplace The sign which allows to replace pages from a checkpoint by page replacer.
121122
*/
122-
public GridMultiCollectionWrapper<FullPageId> beginCheckpoint(IgniteInternalFuture allowToReplace)
123+
public GridMultiCollectionWrapper<FullPageId> beginCheckpoint(ThrowableSupplier<Boolean, IgniteCheckedException> allowToReplace)
123124
throws IgniteException;
124125

125126
/**
@@ -132,7 +133,7 @@ public GridMultiCollectionWrapper<FullPageId> beginCheckpoint(IgniteInternalFutu
132133
*{@link PageStoreWriter} will be called when the page will be ready to write.
133134
*
134135
* @param pageId Page ID to get byte buffer for. The page ID must be present in the collection returned by
135-
* the {@link #beginCheckpoint(IgniteInternalFuture)} method call.
136+
* the {@link #beginCheckpoint(ThrowableSupplier)} method call.
136137
* @param buf Temporary buffer to write changes into.
137138
* @param pageWriter Checkpoint page write context.
138139
* @param tracker Checkpoint metrics tracker.

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
8282
import org.apache.ignite.internal.util.GridUnsafe;
8383
import org.apache.ignite.internal.util.OffheapReadWriteLock;
84+
import org.apache.ignite.internal.util.function.ThrowableSupplier;
8485
import org.apache.ignite.internal.util.future.CountDownFuture;
8586
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
8687
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -1112,7 +1113,7 @@ boolean shouldThrottle(double dirtyRatioThreshold) {
11121113

11131114
/** {@inheritDoc} */
11141115
@Override public GridMultiCollectionWrapper<FullPageId> beginCheckpoint(
1115-
IgniteInternalFuture allowToReplace
1116+
ThrowableSupplier<Boolean, IgniteCheckedException> allowToReplace
11161117
) throws IgniteException {
11171118
if (segments == null)
11181119
return new GridMultiCollectionWrapper<>(Collections.emptyList());

modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
5353
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
5454
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
55-
import org.apache.ignite.internal.util.future.GridFinishedFuture;
5655
import org.apache.ignite.internal.util.typedef.internal.U;
5756
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
5857
import org.junit.Test;
@@ -347,7 +346,7 @@ private void generateWal(
347346
}
348347
}
349348

350-
Collection<FullPageId> pageIds = mem.beginCheckpoint(new GridFinishedFuture());
349+
Collection<FullPageId> pageIds = mem.beginCheckpoint(() -> Boolean.TRUE);
351350

352351
info("Acquired pages for checkpoint: " + pageIds.size());
353352

modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@
7474
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
7575
import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIO;
7676
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
77-
import org.apache.ignite.internal.util.future.GridFinishedFuture;
7877
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
7978
import org.apache.ignite.internal.util.lang.GridFilteredClosableIterator;
8079
import org.apache.ignite.internal.util.typedef.F;
@@ -571,7 +570,7 @@ public void testDirtyFlag() throws Exception {
571570
ig.context().cache().context().database().checkpointReadUnlock();
572571
}
573572

574-
Collection<FullPageId> cpPages = mem.beginCheckpoint(new GridFinishedFuture());
573+
Collection<FullPageId> cpPages = mem.beginCheckpoint(() -> Boolean.TRUE);
575574

576575
ig.context().cache().context().database().checkpointReadLock();
577576

@@ -901,7 +900,7 @@ private IgniteBiTuple<Map<FullPageId, Integer>, WALPointer> runCheckpointing(
901900
try {
902901
snapshot = new HashMap<>(resMap);
903902

904-
pageIds = mem.beginCheckpoint(new GridFinishedFuture());
903+
pageIds = mem.beginCheckpoint(() -> Boolean.TRUE);
905904

906905
checkpoints--;
907906

modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor;
4949
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
5050
import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
51-
import org.apache.ignite.internal.util.future.GridFinishedFuture;
5251
import org.apache.ignite.internal.util.typedef.internal.U;
5352
import org.apache.ignite.lang.IgniteOutClosure;
5453
import org.apache.ignite.logger.NullLogger;
@@ -120,7 +119,7 @@ public void testReplacementWithDelayCausesLockForRead() throws IgniteCheckedExce
120119
memory.releasePage(1, pageId, ptr);
121120
}
122121

123-
GridMultiCollectionWrapper<FullPageId> ids = memory.beginCheckpoint(new GridFinishedFuture());
122+
GridMultiCollectionWrapper<FullPageId> ids = memory.beginCheckpoint(() -> Boolean.TRUE);
124123
int cpPages = ids.size();
125124
log.info("Started CP with [" + cpPages + "] pages in it, created [" + markDirty + "] pages");
126125

@@ -183,7 +182,7 @@ public void testBackwardCompatibilityMode() throws IgniteCheckedException {
183182
memory.releasePage(1, pageId, ptr);
184183
}
185184

186-
GridMultiCollectionWrapper<FullPageId> ids = memory.beginCheckpoint(new GridFinishedFuture());
185+
GridMultiCollectionWrapper<FullPageId> ids = memory.beginCheckpoint(() -> Boolean.TRUE);
187186
int cpPages = ids.size();
188187
log.info("Started CP with [" + cpPages + "] pages in it, created [" + markDirty + "] pages");
189188

modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java

+7-8
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@
6060
import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
6161
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
6262
import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
63-
import org.apache.ignite.internal.util.future.GridFinishedFuture;
6463
import org.apache.ignite.internal.util.typedef.internal.U;
6564
import org.apache.ignite.lang.IgniteInClosure;
6665
import org.apache.ignite.lang.IgniteOutClosure;
@@ -135,7 +134,7 @@ public void testCheckpointBufferOverusageDontCauseWriteLockLeak() throws Excepti
135134
//Success
136135
}
137136

138-
memory.beginCheckpoint(new GridFinishedFuture());
137+
memory.beginCheckpoint(() -> Boolean.TRUE);
139138

140139
final AtomicReference<FullPageId> lastPage = new AtomicReference<>();
141140

@@ -246,14 +245,14 @@ public void testCheckpointProtocolWriteDirtyPageAfterWriteUnlock() throws Except
246245
writePage(memory, fullId, (byte)1);
247246
}
248247

249-
doCheckpoint(memory.beginCheckpoint(new GridFinishedFuture()), memory, pageStoreMgr);
248+
doCheckpoint(memory.beginCheckpoint(() -> Boolean.TRUE), memory, pageStoreMgr);
250249

251250
FullPageId cowPageId = allocated.get(0);
252251

253252
// Mark some pages as dirty.
254253
writePage(memory, cowPageId, (byte)2);
255254

256-
GridMultiCollectionWrapper<FullPageId> cpPages = memory.beginCheckpoint(new GridFinishedFuture());
255+
GridMultiCollectionWrapper<FullPageId> cpPages = memory.beginCheckpoint(() -> Boolean.TRUE);
257256

258257
assertEquals(1, cpPages.size());
259258

@@ -306,7 +305,7 @@ public void runThrottlingEmptifyCpBufFirst(PageMemoryImpl.ThrottlingPolicy plc)
306305
writePage(memory, fullId, (byte)1);
307306
}
308307

309-
GridMultiCollectionWrapper<FullPageId> markedPages = memory.beginCheckpoint(new GridFinishedFuture());
308+
GridMultiCollectionWrapper<FullPageId> markedPages = memory.beginCheckpoint(() -> Boolean.TRUE);
310309

311310
for (int i = 0; i < pagesForStartThrottling + (memory.checkpointBufferPagesSize() * 2 / 3); i++)
312311
writePage(memory, allocated.get(i), (byte)1);
@@ -392,7 +391,7 @@ public void testCheckpointProtocolCannotReplaceUnwrittenPage() throws Exception
392391
}
393392

394393
// CP Write lock.
395-
memory.beginCheckpoint(new GridFinishedFuture());
394+
memory.beginCheckpoint(() -> Boolean.TRUE);
396395
// CP Write unlock.
397396

398397
byte[] buf = new byte[PAGE_SIZE];
@@ -466,7 +465,7 @@ private void testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryI
466465
acquireAndReleaseWriteLock(memory, fullPageId);
467466
}
468467

469-
memory.beginCheckpoint(new GridFinishedFuture());
468+
memory.beginCheckpoint(() -> Boolean.TRUE);
470469

471470
CheckpointMetricsTracker mockTracker = Mockito.mock(CheckpointMetricsTracker.class);
472471

@@ -488,7 +487,7 @@ private void testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryI
488487
acquireAndReleaseWriteLock(memory, fullPageId);
489488
}
490489

491-
memory.beginCheckpoint(new GridFinishedFuture());
490+
memory.beginCheckpoint(() -> Boolean.TRUE);
492491

493492
Collections.shuffle(pages); // Mix pages in checkpoint with clean pages
494493

0 commit comments

Comments
 (0)