diff --git a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java index 49de6a6e6efaca..1d159eb8f67b7c 100644 --- a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java +++ b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java @@ -30,9 +30,7 @@ import org.apache.ignite.configuration.DiskPageCompression; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.ThreadLocalDirectByteBuffer; -import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO; -import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.typedef.internal.U; @@ -41,16 +39,12 @@ import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.WRITE; import static org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE; -import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE; import static org.apache.ignite.internal.util.GridUnsafe.NATIVE_BYTE_ORDER; /** * Compression processor. */ public class CompressionProcessorImpl extends CompressionProcessor { - /** Max page size. */ - private final ThreadLocalDirectByteBuffer compactBuf = new ThreadLocalDirectByteBuffer(MAX_PAGE_SIZE, NATIVE_BYTE_ORDER); - /** A bit more than max page size, extra space is required by compressors. */ private final ThreadLocalDirectByteBuffer compressBuf = new ThreadLocalDirectByteBuffer(maxCompressedBufferSize(MAX_PAGE_SIZE), NATIVE_BYTE_ORDER); @@ -92,84 +86,6 @@ public CompressionProcessorImpl(GridKernalContext ctx) { checkPunchHole(storagePath, fsBlockSize); } - /** {@inheritDoc} */ - @Override public ByteBuffer compressPage( - ByteBuffer page, - int pageSize, - int blockSize, - DiskPageCompression compression, - int compressLevel - ) throws IgniteCheckedException { - assert compression != null && compression != DiskPageCompression.DISABLED : compression; - assert U.isPow2(blockSize) : blockSize; - assert page.position() == 0 && page.limit() >= pageSize; - - int oldPageLimit = page.limit(); - - try { - // Page size will be less than page limit when TDE is enabled. To make compaction and compression work - // correctly we need to set limit to real page size. - page.limit(pageSize); - - ByteBuffer compactPage = doCompactPage(page, pageSize); - - int compactSize = compactPage.limit(); - - assert compactSize <= pageSize : compactSize; - - // If no need to compress further or configured just to skip garbage. - if (compactSize < blockSize || compression == SKIP_GARBAGE) - return setCompactionInfo(compactPage, compactSize); - - ByteBuffer compressedPage = doCompressPage(compression, compactPage, compactSize, compressLevel); - - assert compressedPage.position() == 0; - int compressedSize = compressedPage.limit(); - - int freeCompactBlocks = (pageSize - compactSize) / blockSize; - int freeCompressedBlocks = (pageSize - compressedSize) / blockSize; - - if (freeCompactBlocks >= freeCompressedBlocks) { - if (freeCompactBlocks == 0) - return page; // No blocks will be released. - - return setCompactionInfo(compactPage, compactSize); - } - - return setCompressionInfo(compressedPage, compression, compressedSize, compactSize); - } - finally { - page.limit(oldPageLimit); - } - } - - /** - * @param page Page buffer. - * @param pageSize Page size. - * @return Compacted page buffer. - */ - private ByteBuffer doCompactPage(ByteBuffer page, int pageSize) throws IgniteCheckedException { - PageIO io = PageIO.getPageIO(page); - - ByteBuffer compactPage = compactBuf.get(); - - if (io instanceof CompactablePageIO) { - // Drop the garbage from the page. - ((CompactablePageIO)io).compactPage(page, compactPage, pageSize); - } - else { - // Direct buffer is required as output of this method. - if (page.isDirect()) - return page; - - PageUtils.putBytes(GridUnsafe.bufferAddress(compactPage), 0, page.array()); - - compactPage.limit(pageSize); - } - - return compactPage; - } - /** Check if filesystem actually supports punching holes. */ private void checkPunchHole(Path storagePath, int fsBlockSz) throws IgniteException { ByteBuffer buffer = null; @@ -198,33 +114,6 @@ private void checkPunchHole(Path storagePath, int fsBlockSz) throws IgniteExcept } } - /** - * @param page Page. - * @param compactSize Compacted page size. - * @return The given page. - */ - private static ByteBuffer setCompactionInfo(ByteBuffer page, int compactSize) { - return setCompressionInfo(page, SKIP_GARBAGE, compactSize, compactSize); - } - - /** - * @param page Page. - * @param compression Compression algorithm. - * @param compressedSize Compressed size. - * @param compactedSize Compact size. - * @return The given page. - */ - private static ByteBuffer setCompressionInfo(ByteBuffer page, DiskPageCompression compression, int compressedSize, int compactedSize) { - assert compressedSize >= 0 && compressedSize <= Short.MAX_VALUE : compressedSize; - assert compactedSize >= 0 && compactedSize <= Short.MAX_VALUE : compactedSize; - - PageIO.setCompressionType(page, getCompressionType(compression)); - PageIO.setCompressedSize(page, (short)compressedSize); - PageIO.setCompactedSize(page, (short)compactedSize); - - return page; - } - /** * @param compression Compression algorithm. * @param compactPage Compacted page. @@ -232,7 +121,12 @@ private static ByteBuffer setCompressionInfo(ByteBuffer page, DiskPageCompressio * @param compressLevel Compression level. * @return Compressed page. */ - private ByteBuffer doCompressPage(DiskPageCompression compression, ByteBuffer compactPage, int compactSize, int compressLevel) { + @Override protected ByteBuffer doCompressPage( + DiskPageCompression compression, + ByteBuffer compactPage, + int compactSize, + int compressLevel + ) { switch (compression) { case ZSTD: return compressPageZstd(compactPage, compactSize, compressLevel); @@ -319,99 +213,46 @@ private static void copyPageHeader(ByteBuffer compactPage, ByteBuffer compressed compactPage.limit(compactSize); } - /** - * @param compression Compression. - * @return Level. - */ - private static byte getCompressionType(DiskPageCompression compression) { - if (compression == DiskPageCompression.DISABLED) - return UNCOMPRESSED_PAGE; - - switch (compression) { - case ZSTD: - return ZSTD_COMPRESSED_PAGE; - - case LZ4: - return LZ4_COMPRESSED_PAGE; - - case SNAPPY: - return SNAPPY_COMPRESSED_PAGE; - - case SKIP_GARBAGE: - return COMPACTED_PAGE; - } - throw new IllegalStateException("Unexpected compression: " + compression); - } - /** {@inheritDoc} */ - @Override public void decompressPage(ByteBuffer page, int pageSize) throws IgniteCheckedException { - assert page.capacity() >= pageSize : "capacity=" + page.capacity() + ", pageSize=" + pageSize; - - byte compressType = PageIO.getCompressionType(page); - - if (compressType == UNCOMPRESSED_PAGE) - return; // Nothing to do. + @Override protected void doDecompressPage(int compressType, ByteBuffer page, int compressedSize, int compactSize) { + ByteBuffer dst = compressBuf.get(); - short compressedSize = PageIO.getCompressedSize(page); - short compactSize = PageIO.getCompactedSize(page); + // Position on a part that needs to be decompressed. + page.limit(compressedSize) + .position(PageIO.COMMON_HEADER_END); - assert compactSize <= pageSize && compactSize >= compressedSize; + // LZ4 needs this limit to be exact. + dst.limit(compactSize - PageIO.COMMON_HEADER_END); - if (compressType == COMPACTED_PAGE) { - // Just setup bounds before restoring the page. - page.position(0).limit(compactSize); - } - else { - ByteBuffer dst = compressBuf.get(); - - // Position on a part that needs to be decompressed. - page.limit(compressedSize) - .position(PageIO.COMMON_HEADER_END); - - // LZ4 needs this limit to be exact. - dst.limit(compactSize - PageIO.COMMON_HEADER_END); - - switch (compressType) { - case ZSTD_COMPRESSED_PAGE: - Zstd.decompress(dst, page); - dst.flip(); - - break; + switch (compressType) { + case ZSTD_COMPRESSED_PAGE: + Zstd.decompress(dst, page); + dst.flip(); - case LZ4_COMPRESSED_PAGE: - Lz4.decompress(page, dst); - dst.flip(); + break; - break; + case LZ4_COMPRESSED_PAGE: + Lz4.decompress(page, dst); + dst.flip(); - case SNAPPY_COMPRESSED_PAGE: - try { - Snappy.uncompress(page, dst); - } - catch (IOException e) { - throw new IgniteException(e); - } - break; - - default: - throw new IgniteException("Unknown compression: " + compressType); - } - - page.position(PageIO.COMMON_HEADER_END).limit(compactSize); - page.put(dst).flip(); - assert page.limit() == compactSize; - } + break; - PageIO io = PageIO.getPageIO(page); + case SNAPPY_COMPRESSED_PAGE: + try { + Snappy.uncompress(page, dst); + } + catch (IOException e) { + throw new IgniteException(e); + } + break; - if (io instanceof CompactablePageIO) - ((CompactablePageIO)io).restorePage(page, pageSize); - else { - assert compactSize == pageSize - : "Wrong compacted page size [compactSize=" + compactSize + ", pageSize=" + pageSize + ']'; + default: + throw new IgniteException("Unknown compression: " + compressType); } - setCompressionInfo(page, DiskPageCompression.DISABLED, 0, 0); + page.position(PageIO.COMMON_HEADER_END).limit(compactSize); + page.put(dst).flip(); + assert page.limit() == compactSize; } /** */ diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java index f5bf5598a692bf..40f36be257ea06 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java @@ -194,6 +194,12 @@ public class DataStorageConfiguration implements Serializable { /** Value used to indicate the use of half of the {@link #getMaxWalArchiveSize}. */ public static final long HALF_MAX_WAL_ARCHIVE_SIZE = -1; + /** Default value for {@link #writeRecoveryDataOnCheckpoint} property. */ + public static final boolean DFLT_WRITE_RECOVERY_DATA_ON_CP = false; + + /** Default compression algorithm for checkpoint recovery data. */ + public static final DiskPageCompression DFLT_CP_RECOVERY_DATA_COMRESSION = DiskPageCompression.SKIP_GARBAGE; + /** Memory page size. */ private int pageSize = IgniteSystemProperties.getInteger( IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE, 0); @@ -349,6 +355,19 @@ public class DataStorageConfiguration implements Serializable { /** Default memory allocator for all data regions. */ @Nullable private MemoryAllocator memoryAllocator = null; + /** + * Mode for storing page recovery data. + * If {@code true}, page recovery data will be written during checkpoint. + * If {@code false}, WAL physical records will be used to store page recovery data. + */ + private boolean writeRecoveryDataOnCheckpoint = DFLT_WRITE_RECOVERY_DATA_ON_CP; + + /** Compression algorithm for checkpoint recovery data. */ + private DiskPageCompression cpRecoveryDataCompression = DFLT_CP_RECOVERY_DATA_COMRESSION; + + /** Compression level for checkpoint recovery data. */ + private Integer cpRecoveryDataCompressionLevel; + /** * Creates valid durable memory configuration with all default values. */ @@ -1404,6 +1423,72 @@ public DataStorageConfiguration setMemoryAllocator(MemoryAllocator allocator) { return this; } + /** + * @return Flag defining mode for storing page recovery data. If {@code true}, recovery data will be written + * during checkpoint, if {@code false}, WAL physical records will be used to store recovery data. + */ + public boolean isWriteRecoveryDataOnCheckpoint() { + return writeRecoveryDataOnCheckpoint; + } + + /** + * Sets mode for storing page recovery data. + * + * @param writeRecoveryDataOnCheckpoint If {@code true}, page recovery data will be written during checkpoint, + * if {@code false}, WAL physical records will be used to store page recovery data. + * Default is {@link #DFLT_WRITE_RECOVERY_DATA_ON_CP}. + * @return {@code this} for chaining. + */ + public DataStorageConfiguration setWriteRecoveryDataOnCheckpoint(boolean writeRecoveryDataOnCheckpoint) { + this.writeRecoveryDataOnCheckpoint = writeRecoveryDataOnCheckpoint; + + return this; + } + + /** + * Gets compression algorithm for checkpoint recovery data. + * + * @return Page compression algorithm. + */ + public DiskPageCompression getCheckpointRecoveryDataCompression() { + return cpRecoveryDataCompression == null ? DFLT_CP_RECOVERY_DATA_COMRESSION : cpRecoveryDataCompression; + } + + /** + * Sets compression algorithm for checkpoint recovery data. + * + * @param cpRecoveryDataCompression Compression algorithm. + * @return {@code this} for chaining. + */ + public DataStorageConfiguration setCheckpointRecoveryDataCompression(DiskPageCompression cpRecoveryDataCompression) { + this.cpRecoveryDataCompression = cpRecoveryDataCompression; + + return this; + } + + /** + * Gets {@link #getCheckpointRecoveryDataCompression()} algorithm specific compression level. + * + * @return Checkpoint recovery data compression level or {@code null} for default. + */ + public Integer getCheckpointRecoveryDataCompressionLevel() { + return cpRecoveryDataCompressionLevel; + } + + /** + * Sets {@link #setCheckpointRecoveryDataCompression(DiskPageCompression)} algorithm specific compression level. + * + * @param cpRecoveryDataCompressionLevel Checkpoint recovery data compression level or {@code null} to use default. + * {@link DiskPageCompression#ZSTD Zstd}: from {@code -131072} to {@code 22} (default {@code 3}). + * {@link DiskPageCompression#LZ4 LZ4}: from {@code 0} to {@code 17} (default {@code 0}). + * @return {@code this} for chaining. + */ + public DataStorageConfiguration setCheckpointRecoveryDataCompressionLevel(Integer cpRecoveryDataCompressionLevel) { + this.cpRecoveryDataCompressionLevel = cpRecoveryDataCompressionLevel; + + return this; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DataStorageConfiguration.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java index 35a01a0ad83325..54ea4c12cfb99b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java @@ -208,12 +208,12 @@ public WALIterator replay( public int reserved(WALPointer low, WALPointer high); /** - * Checks WAL disabled for cache group. + * Checks WAL page records disabled. * * @param grpId Group id. * @param pageId Page id. */ - public boolean disabled(int grpId, long pageId); + public boolean pageRecordsDisabled(int grpId, long pageId); /** * Getting local WAL segment size. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java index 1b7a104f074d44..58ee111c3e81e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java @@ -1012,13 +1012,16 @@ private WalStateResult awaitCheckpoint(CheckpointProgress cpFut, WalStatePropose } /** - * Checks WAL disabled for cache group. + * Checks WAL page records disabled. * * @param grpId Group id. * @param pageId Page id. * @return {@code True} if WAL disable for group. {@code False} If not. */ - public boolean isDisabled(int grpId, long pageId) { + public boolean isPageRecordsDisabled(int grpId, long pageId) { + if (cctx.kernalContext().config().getDataStorageConfiguration().isWriteRecoveryDataOnCheckpoint()) + return true; + CacheGroupContext ctx = cctx.cache().cacheGroup(grpId); return ctx != null && (!ctx.walEnabled() diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java index 49ca4ef059fb52..80444e7d102234 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java @@ -89,6 +89,9 @@ public class DataStorageMetricsImpl { /** */ private final AtomicLongMetric lastCpSplitAndSortPagesDuration; + /** */ + private final AtomicLongMetric lastCpRecoveryDataWriteDuration; + /** */ private final AtomicLongMetric lastCpTotalPages; @@ -155,6 +158,9 @@ public class DataStorageMetricsImpl { /** */ private final HistogramMetricImpl cpSplitAndSortPagesHistogram; + /** */ + private final HistogramMetricImpl cpRecoveryDataWriteHistogram; + /** */ private final HistogramMetricImpl cpHistogram; @@ -246,6 +252,9 @@ public DataStorageMetricsImpl( lastCpSplitAndSortPagesDuration = mreg.longMetric("LastCheckpointSplitAndSortPagesDuration", "Duration of splitting and sorting checkpoint pages of the last checkpoint in milliseconds."); + lastCpRecoveryDataWriteDuration = mreg.longMetric("LastCheckpointRecoveryDataWriteDuration", + "Duration of checkpoint recovery data write in milliseconds."); + lastCpTotalPages = mreg.longMetric("LastCheckpointTotalPagesNumber", "Total number of pages written during the last checkpoint."); @@ -308,6 +317,9 @@ public DataStorageMetricsImpl( cpSplitAndSortPagesHistogram = mreg.histogram("CheckpointSplitAndSortPagesHistogram", cpBounds, "Histogram of splitting and sorting checkpoint pages duration in milliseconds."); + cpRecoveryDataWriteHistogram = mreg.histogram("CheckpointRecoveryDataWriteHistogram", cpBounds, + "Histogram of checkpoint recovery data write duration in milliseconds."); + cpHistogram = mreg.histogram("CheckpointHistogram", cpBounds, "Histogram of checkpoint duration in milliseconds."); @@ -658,6 +670,7 @@ public boolean metricsEnabled() { * @param walRecordFsyncDuration Duration of WAL fsync after logging {@link CheckpointRecord} on checkpoint begin. * @param writeEntryDuration Duration of checkpoint entry buffer writing to file. * @param splitAndSortPagesDuration Duration of splitting and sorting checkpoint pages. + * @param recoveryDataWriteDuration Recovery data write duration. * @param duration Total checkpoint duration. * @param start Checkpoint start time. * @param totalPages Total number of all pages in checkpoint. @@ -677,6 +690,7 @@ public void onCheckpoint( long walRecordFsyncDuration, long writeEntryDuration, long splitAndSortPagesDuration, + long recoveryDataWriteDuration, long duration, long start, long totalPages, @@ -698,6 +712,7 @@ public void onCheckpoint( lastCpWalRecordFsyncDuration.value(walRecordFsyncDuration); lastCpWriteEntryDuration.value(writeEntryDuration); lastCpSplitAndSortPagesDuration.value(splitAndSortPagesDuration); + lastCpRecoveryDataWriteDuration.value(recoveryDataWriteDuration); lastCpDuration.value(duration); lastCpStart.value(start); lastCpTotalPages.value(totalPages); @@ -718,6 +733,7 @@ public void onCheckpoint( cpWalRecordFsyncHistogram.value(walRecordFsyncDuration); cpWriteEntryHistogram.value(writeEntryDuration); cpSplitAndSortPagesHistogram.value(splitAndSortPagesDuration); + cpRecoveryDataWriteHistogram.value(recoveryDataWriteDuration); cpHistogram.value(duration); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 6cd08378b248c2..e704347f1be2ae 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -66,9 +66,9 @@ import org.apache.ignite.internal.mem.DirectMemoryProvider; import org.apache.ignite.internal.mem.DirectMemoryRegion; import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp; +import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.PageIdAllocator; import org.apache.ignite.internal.pagemem.PageMemory; -import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; import org.apache.ignite.internal.pagemem.store.PageStore; import org.apache.ignite.internal.pagemem.wal.WALIterator; @@ -110,6 +110,7 @@ import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointManager; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgress; +import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointRecoveryFile; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointStatus; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.Checkpointer; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.LightweightCheckpointManager; @@ -129,6 +130,7 @@ import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; @@ -143,6 +145,7 @@ import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.StripedExecutor; import org.apache.ignite.internal.util.TimeBag; +import org.apache.ignite.internal.util.future.CountDownFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; @@ -2132,9 +2135,84 @@ private RestoreBinaryState performBinaryMemoryRestore( ", walArchive=" + persistenceCfg.getWalArchivePath() + "]"); } + long start = U.currentTimeMillis(); + + AtomicLong applied = new AtomicLong(); + CacheStripedExecutor exec = new CacheStripedExecutor(cctx.kernalContext().pools().getStripedExecutorService()); - long start = U.currentTimeMillis(); + boolean restoredFromCheckpointRecoveryFiles = false; + + // Try to restore from checkpoint recovery files. + if (apply) { + List recoveryFiles = checkpointManager.checkpointRecoveryFiles(status.cpStartId); + + // TODO Perhaps some consistency check that all files are found should be added. + if (!recoveryFiles.isEmpty() ) { + if (log.isInfoEnabled()) { + recoveryFiles.sort(Comparator.comparing(CheckpointRecoveryFile::checkpointerIndex)); + + String files = recoveryFiles.size() == 1 ? recoveryFiles.get(0).file().getName() : + recoveryFiles.get(0).file().getName() + " .. " + + recoveryFiles.get(recoveryFiles.size() - 1).file().getName(); + + log.info("Start physical recovery from checkpoint recovery files [" + files + ']'); + } + + CountDownFuture cpRecoveryFut = new CountDownFuture(recoveryFiles.size()); + + recoveryFiles.forEach(cpRecoveryFile -> { + exec.submit(() -> { + Throwable err = null; + + try { + cpRecoveryFile.forAllPages(cacheGroupsPredicate::apply, (fullPageId, buf) -> { + if (skipRemovedIndexUpdates(fullPageId.groupId(), partId(fullPageId.pageId()))) + return; + + try { + PageMemoryEx pageMem = getPageMemoryForCacheGroup(fullPageId.groupId()); + + if (pageMem == null) + return; + + applyPage(pageMem, fullPageId, buf); + + applied.incrementAndGet(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + }); + } + catch (Throwable e) { + err = e; + } + finally { + try { + cpRecoveryFile.close(); + } + catch (Exception e) { + if (err == null) + err = e; + else + err.addSuppressed(e); + } + cpRecoveryFut.onDone(err); + } + // Use file index instead of grpId to define stripe of stripped executor. + }, cpRecoveryFile.checkpointerIndex(), 0); + }); + + cpRecoveryFut.get(); + + restoredFromCheckpointRecoveryFiles = true; + + // Fall through to restore from WAL after restoring from checkpoint recovery files. + // There will be no page snapshot records and page delta records in WAL in this case, but we still + // need to apply PART_META_UPDATE_STATE/PARTITION_DESTROY records. + } + } long lastArchivedSegment = cctx.wal().lastArchivedSegment(); @@ -2142,8 +2220,6 @@ private RestoreBinaryState performBinaryMemoryRestore( RestoreBinaryState restoreBinaryState = new RestoreBinaryState(status, it, lastArchivedSegment, cacheGroupsPredicate); - AtomicLong applied = new AtomicLong(); - try { while (restoreBinaryState.hasNext()) { if (exec.error()) @@ -2157,6 +2233,8 @@ private RestoreBinaryState performBinaryMemoryRestore( switch (rec.type()) { case PAGE_RECORD: if (restoreBinaryState.needApplyBinaryUpdate()) { + assert !restoredFromCheckpointRecoveryFiles; + PageSnapshot pageSnapshot = (PageSnapshot)rec; // Here we do not require tag check because we may be applying memory changes after @@ -2235,6 +2313,8 @@ private RestoreBinaryState performBinaryMemoryRestore( default: if (restoreBinaryState.needApplyBinaryUpdate() && rec instanceof PageDeltaRecord) { + assert !restoredFromCheckpointRecoveryFiles; + PageDeltaRecord pageDelta = (PageDeltaRecord)rec; int groupId = pageDelta.groupId(); @@ -2317,9 +2397,19 @@ public void stripedApplyPage( * @param pageSnapshotRecord Page snapshot record. * @throws IgniteCheckedException If failed. */ - public void applyPageSnapshot(PageMemoryEx pageMem, PageSnapshot pageSnapshotRecord) throws IgniteCheckedException { - int grpId = pageSnapshotRecord.fullPageId().groupId(); - long pageId = pageSnapshotRecord.fullPageId().pageId(); + private void applyPageSnapshot(PageMemoryEx pageMem, PageSnapshot pageSnapshotRecord) throws IgniteCheckedException { + applyPage(pageMem, pageSnapshotRecord.fullPageId(), pageSnapshotRecord.pageDataBuffer()); + } + + /** + * @param pageMem Page memory. + * @param fullPageId Full page ID. + * @param buf Page buffer to apply. + * @throws IgniteCheckedException If failed. + */ + private void applyPage(PageMemoryEx pageMem, FullPageId fullPageId, ByteBuffer buf) throws IgniteCheckedException { + int grpId = fullPageId.groupId(); + long pageId = fullPageId.pageId(); long page = pageMem.acquirePage(grpId, pageId, IoStatisticsHolderNoOp.INSTANCE, true); @@ -2327,14 +2417,14 @@ public void applyPageSnapshot(PageMemoryEx pageMem, PageSnapshot pageSnapshotRec long pageAddr = pageMem.writeLock(grpId, pageId, page, true); try { - PageUtils.putBytes(pageAddr, 0, pageSnapshotRecord.pageData()); + ByteBuffer pageMemBuf = pageMem.pageBuffer(pageAddr); - if (PageIO.getCompressionType(pageAddr) != CompressionProcessor.UNCOMPRESSED_PAGE) { - int realPageSize = pageMem.realPageSize(pageSnapshotRecord.groupId()); + PageHandler.copyMemory(buf, 0, pageMemBuf, 0, buf.remaining()); - assert pageSnapshotRecord.pageDataSize() <= realPageSize : pageSnapshotRecord.pageDataSize(); + if (PageIO.getCompressionType(pageAddr) != CompressionProcessor.UNCOMPRESSED_PAGE) { + int realPageSize = pageMem.realPageSize(grpId); - cctx.kernalContext().compress().decompressPage(pageMem.pageBuffer(pageAddr), realPageSize); + cctx.kernalContext().compress().decompressPage(pageMemBuf, realPageSize); } } finally { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpoint.java index bc842dd98dcf61..1ae38f2ad817d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpoint.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpoint.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.persistence.checkpoint; import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; import org.apache.ignite.internal.util.GridConcurrentMultiPairQueue; import org.apache.ignite.lang.IgniteBiTuple; @@ -36,6 +37,9 @@ class Checkpoint { /** Checkpoint progress status. */ final CheckpointProgressImpl progress; + /** Checkpoint WAL record. */ + final CheckpointRecord cpRecord; + /** WAL segments fully covered by this checkpoint. */ IgniteBiTuple walSegsCoveredRange; @@ -48,15 +52,18 @@ class Checkpoint { * @param cpEntry Checkpoint entry. * @param cpPages Pages to write to the page store. * @param progress Checkpoint progress status. + * @param cpRecord Checkpoint WAL record. */ Checkpoint( @Nullable CheckpointEntry cpEntry, GridConcurrentMultiPairQueue cpPages, - CheckpointProgressImpl progress + CheckpointProgressImpl progress, + CheckpointRecord cpRecord ) { this.cpEntry = cpEntry; this.cpPages = cpPages; this.progress = progress; + this.cpRecord = cpRecord; pagesSize = cpPages.initialSize(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java index 9445307b1f5335..08f1850220c903 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Collection; +import java.util.List; import java.util.UUID; import java.util.concurrent.Executor; import java.util.function.Function; @@ -36,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.persistence.DataRegion; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; +import org.apache.ignite.internal.processors.cache.persistence.StorageException; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; @@ -78,6 +80,9 @@ public class CheckpointManager { /** Checkpoint markers storage which mark the start and end of each checkpoint. */ private final CheckpointMarkersStorage checkpointMarkersStorage; + /** Storage for checkpoint recovery files. */ + private final CheckpointRecoveryFileStorage checkpointRecoveryFileStorage; + /** Timeout checkpoint lock which should be used while write to memory happened. */ final CheckpointTimeoutLock checkpointTimeoutLock; @@ -173,6 +178,7 @@ public CheckpointManager( }; checkpointPagesWriterFactory = new CheckpointPagesWriterFactory( + cacheProcessor.context().kernalContext(), logger, (pageMemEx, fullPage, buf, tag) -> pageStoreManager.write(fullPage.groupId(), fullPage.pageId(), buf, tag, true), persStoreMetrics, @@ -180,6 +186,9 @@ public CheckpointManager( pageMemoryGroupResolver ); + checkpointRecoveryFileStorage = new CheckpointRecoveryFileStorage(cacheProcessor.context().kernalContext(), + checkpointDirectory(), ioFactory); + checkpointerProvider = () -> new Checkpointer( igniteInstanceName, checkpointThreadName, @@ -191,6 +200,7 @@ public CheckpointManager( cacheProcessor, checkpointWorkflow, checkpointPagesWriterFactory, + checkpointRecoveryFileStorage, persistenceCfg.getCheckpointFrequency(), persistenceCfg.getCheckpointThreads(), cpFreqDeviation @@ -303,6 +313,13 @@ public CheckpointHistory checkpointHistory() { return checkpointMarkersStorage.history(); } + /** + * @return List of checkpoint recovery files. + */ + public List checkpointRecoveryFiles(UUID cpId) throws StorageException { + return checkpointRecoveryFileStorage.list(cpId::equals); + } + /** * Initialize checkpoint storage. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMarkersStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMarkersStorage.java index badbc9cc9247d0..dc4821fe8206b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMarkersStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMarkersStorage.java @@ -445,6 +445,8 @@ private void removeCheckpointFiles(CheckpointEntry cpEntry) throws IgniteChecked } /** + * Writes checkpoint entry buffer {@code entryBuf} to specified checkpoint file with 2-phase protocol. + * * @param entryBuf Buffer which would be written to disk. * @param cp Prepared checkpoint entry. * @param type Type of checkpoint marker. @@ -484,7 +486,7 @@ private void writeCheckpointEntry( } /** - * Writes checkpoint entry buffer {@code entryBuf} to specified checkpoint file with 2-phase protocol. + * Creates and writes checkpoint entry to checkpoint marker file. * * @param cpTs Checkpoint timestamp. * @param cpId Checkpoint id. @@ -502,14 +504,28 @@ public CheckpointEntry writeCheckpointEntry( CheckpointEntryType type, boolean skipSync ) throws StorageException { - CheckpointEntry entry = prepareCheckpointEntry( - tmpWriteBuf, - cpTs, - cpId, - ptr, - rec, - type - ); + CheckpointEntry entry = createCheckPointEntry(cpTs, ptr, cpId, rec, type); + + writeCheckpointEntry(entry, rec, type, skipSync); + + return entry; + } + + /** + * Writes checkpoint entry to checkpoint marker file. + * + * @param entry Checkpoint entry. + * @param rec Checkpoint WAL record. + * @param type Checkpoint type. + * @throws StorageException If failed to write checkpoint entry. + */ + public void writeCheckpointEntry( + CheckpointEntry entry, + @Nullable CheckpointRecord rec, + CheckpointEntryType type, + boolean skipSync + ) throws StorageException { + prepareCheckpointEntryBuf(tmpWriteBuf, entry.checkpointMark()); if (type == CheckpointEntryType.START) cpHistory.addCheckpoint(entry, rec.cacheGroupStates()); @@ -517,30 +533,15 @@ public CheckpointEntry writeCheckpointEntry( writeCheckpointEntry(tmpWriteBuf, entry, type, skipSync); onEarliestCheckpointMapChanged(); - - return entry; } /** - * Prepares checkpoint entry containing WAL pointer to checkpoint record. Writes into given {@code ptrBuf} WAL - * pointer content. + * Writes into given {@code entryBuf} WAL pointer content. * * @param entryBuf Buffer to fill - * @param cpTs Checkpoint timestamp. - * @param cpId Checkpoint id. * @param ptr WAL pointer containing record. - * @param rec Checkpoint WAL record. - * @param type Checkpoint type. - * @return Checkpoint entry. */ - private CheckpointEntry prepareCheckpointEntry( - ByteBuffer entryBuf, - long cpTs, - UUID cpId, - WALPointer ptr, - @Nullable CheckpointRecord rec, - CheckpointEntryType type - ) { + private void prepareCheckpointEntryBuf(ByteBuffer entryBuf, WALPointer ptr) { assert ptr != null; entryBuf.rewind(); @@ -552,8 +553,6 @@ private CheckpointEntry prepareCheckpointEntry( entryBuf.putInt(ptr.length()); entryBuf.flip(); - - return createCheckPointEntry(cpTs, ptr, cpId, rec, type); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriter.java index 9349ee1b38f693..3a508ca679558f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriter.java @@ -194,7 +194,7 @@ private GridConcurrentMultiPairQueue writePages( PageStoreWriter pageStoreWriter = pageStoreWriters.computeIfAbsent(pageMem, pageMemEx -> createPageStoreWriter(pageMemEx, pagesToRetry)); - pageMem.checkpointWritePage(fullId, tmpWriteBuf, pageStoreWriter, tracker); + pageMem.checkpointWritePage(fullId, tmpWriteBuf, pageStoreWriter, tracker, false); if (throttlingEnabled) { while (pageMem.isCpBufferOverflowThresholdExceeded()) { @@ -205,7 +205,7 @@ private GridConcurrentMultiPairQueue writePages( tmpWriteBuf.rewind(); - pageMem.checkpointWritePage(cpPageId, tmpWriteBuf, pageStoreWriter, tracker); + pageMem.checkpointWritePage(cpPageId, tmpWriteBuf, pageStoreWriter, tracker, false); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriterFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriterFactory.java index 4713b83e0e3b26..a261823a69c822 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriterFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriterFactory.java @@ -21,12 +21,14 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.BooleanSupplier; import java.util.function.Function; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.store.PageStore; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; @@ -38,6 +40,7 @@ import org.apache.ignite.internal.util.future.CountDownFuture; import org.apache.ignite.internal.util.lang.IgniteThrowableFunction; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.worker.WorkProgressDispatcher; import org.jsr166.ConcurrentLinkedHashMap; /** @@ -46,6 +49,9 @@ * It holds all dependency which is needed for creation of checkpoint writer and recovery checkpoint writer. */ public class CheckpointPagesWriterFactory { + /** Context. */ + private final GridKernalContext ctx; + /** Logger. */ private final IgniteLogger log; @@ -65,6 +71,7 @@ public class CheckpointPagesWriterFactory { private final CheckpointPagesWriter.CheckpointPageWriter checkpointPageWriter; /** + * @param ctx Context. * @param logger Logger. * @param checkpointPageWriter Checkpoint page writer. * @param persStoreMetrics Persistence metrics. @@ -73,6 +80,7 @@ public class CheckpointPagesWriterFactory { * @param pageMemoryGroupResolver Page memory resolver. */ CheckpointPagesWriterFactory( + GridKernalContext ctx, Function, IgniteLogger> logger, CheckpointPagesWriter.CheckpointPageWriter checkpointPageWriter, DataStorageMetricsImpl persStoreMetrics, @@ -80,6 +88,7 @@ public class CheckpointPagesWriterFactory { ThreadLocal threadBuf, IgniteThrowableFunction pageMemoryGroupResolver ) { + this.ctx = ctx; this.log = logger.apply(getClass()); this.persStoreMetrics = persStoreMetrics; this.threadBuf = threadBuf; @@ -98,7 +107,7 @@ public class CheckpointPagesWriterFactory { * @param shutdownNow Checker of stop operation. * @return Instance of page checkpint writer. */ - CheckpointPagesWriter build( + Runnable buildCheckpointPagesWriter( CheckpointMetricsTracker tracker, GridConcurrentMultiPairQueue cpPages, ConcurrentLinkedHashMap updStores, @@ -124,6 +133,38 @@ CheckpointPagesWriter build( ); } + /** + * @param recoveryDataFile File to write recovery data. + * @param cpPages List of pages to write. + * @param cacheGrpIds Set of cache groups to process (cache groups with WAL enabled). + * @param doneWriteFut Write done future. + * @param workProgressDispatcher Work progress dispatcher. + * @param curCpProgress Current checkpoint data. + * @param shutdownNow Checker of stop operation. + * @return Instance of page checkpint writer. + */ + Runnable buildRecoveryDataWriter( + CheckpointRecoveryFile recoveryDataFile, + GridConcurrentMultiPairQueue cpPages, + Set cacheGrpIds, + CountDownFuture doneWriteFut, + WorkProgressDispatcher workProgressDispatcher, + CheckpointProgressImpl curCpProgress, + BooleanSupplier shutdownNow + ) { + return new RecoveryDataWriter( + ctx, + recoveryDataFile, + cpPages, + cacheGrpIds, + doneWriteFut, + workProgressDispatcher, + log, + curCpProgress, + shutdownNow + ); + } + /** * @param pages List of pages to write. * @param updStores Updated page store storage. @@ -131,7 +172,7 @@ CheckpointPagesWriter build( * @param cpPagesCnt Count of checkpointed pages. * @return Instance of page checkpint writer. */ - Runnable buildRecovery( + Runnable buildRecoveryFinalizer( GridConcurrentMultiPairQueue pages, Collection updStores, AtomicReference writePagesError, @@ -168,7 +209,7 @@ Runnable buildRecovery( // Write page content to page store via pageStoreWriter. // Tracker is null, because no need to track checkpoint metrics on recovery. - pageMem.checkpointWritePage(res.getValue(), tmpWriteBuf, pageStoreWriter, null); + pageMem.checkpointWritePage(res.getValue(), tmpWriteBuf, pageStoreWriter, null, false); // Add number of handled pages. pagesWritten++; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgress.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgress.java index 61bfe538275c37..18cb1716be5c0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgress.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgress.java @@ -78,6 +78,11 @@ public interface CheckpointProgress { */ public AtomicInteger evictedPagesCounter(); + /** + * @return Counter for written recovery pages during current checkpoint. Not null only if checkpoint is running. + */ + public AtomicInteger writtenRecoveryPagesCounter(); + /** * @return Number of pages in current checkpoint. If checkpoint is not running, returns 0. */ @@ -114,6 +119,13 @@ public interface CheckpointProgress { */ public void updateEvictedPages(int delta); + /** + * Update written recovery pages counter. + * + * @param delta Pages num to update. + */ + public void updateWrittenRecoveryPages(int delta); + /** Clear cp progress counters. */ public void clearCounters(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgressImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgressImpl.java index 196954b5558080..6bbadb49454fae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgressImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgressImpl.java @@ -62,6 +62,9 @@ public class CheckpointProgressImpl implements CheckpointProgress { /** Counter for evicted checkpoint pages. Not null only if checkpoint is running. */ private volatile AtomicInteger evictedPagesCntr; + /** Counter for written recovery pages. Not null only if checkpoint is running. */ + private volatile AtomicInteger writtenRecoveryPagesCntr; + /** Number of pages in current checkpoint at the beginning of checkpoint. */ private volatile int currCheckpointPagesCnt; @@ -185,10 +188,10 @@ public void reason(String reason) { } /** {@inheritDoc} */ - @Override public void updateWrittenPages(int deltha) { - A.ensure(deltha > 0, "param must be positive"); + @Override public void updateWrittenPages(int delta) { + A.ensure(delta > 0, "param must be positive"); - writtenPagesCntr.addAndGet(deltha); + writtenPagesCntr.addAndGet(delta); } /** {@inheritDoc} */ @@ -197,10 +200,10 @@ public void reason(String reason) { } /** {@inheritDoc} */ - @Override public void updateSyncedPages(int deltha) { - A.ensure(deltha > 0, "param must be positive"); + @Override public void updateSyncedPages(int delta) { + A.ensure(delta > 0, "param must be positive"); - syncedPagesCntr.addAndGet(deltha); + syncedPagesCntr.addAndGet(delta); } /** {@inheritDoc} */ @@ -216,6 +219,21 @@ public void reason(String reason) { evictedPagesCounter().addAndGet(delta); } + /** {@inheritDoc} */ + @Override public AtomicInteger writtenRecoveryPagesCounter() { + return writtenRecoveryPagesCntr; + } + + /** {@inheritDoc} */ + @Override public void updateWrittenRecoveryPages(int delta) { + A.ensure(delta > 0, "param must be positive"); + + AtomicInteger cntr = writtenRecoveryPagesCounter(); + + if (cntr != null) + cntr.addAndGet(delta); + } + /** {@inheritDoc} */ @Override public int currentCheckpointPagesCount() { return currCheckpointPagesCnt; @@ -233,6 +251,7 @@ public void reason(String reason) { writtenPagesCntr = new AtomicInteger(); syncedPagesCntr = new AtomicInteger(); evictedPagesCntr = new AtomicInteger(); + writtenRecoveryPagesCntr = new AtomicInteger(); } /** {@inheritDoc} */ @@ -242,6 +261,7 @@ public void reason(String reason) { writtenPagesCntr = null; syncedPagesCntr = null; evictedPagesCntr = null; + writtenRecoveryPagesCntr = null; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointRecoveryFile.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointRecoveryFile.java new file mode 100644 index 00000000000000..2ed017f9601dc0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointRecoveryFile.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.checkpoint; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.UUID; +import java.util.function.BiConsumer; +import java.util.function.IntPredicate; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.managers.encryption.GroupKey; +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.encryption.EncryptionSpi; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE; + +/** + * File for recovery on failure during checkpoint. + * Not thread safe. + */ +public class CheckpointRecoveryFile implements AutoCloseable { + /** Record header size. */ + private static final int HEADER_SIZE = + 8 /* pageId */ + + 4 /* grpId */ + + 4 /* pageSize */ + + 4 /* CRC */ + + 1 /* encrypted flag */ + + 1 /* encryption key id */; + + /** Record header buffer. */ + private final ByteBuffer hdrBuf = ByteBuffer.allocateDirect(HEADER_SIZE).order(ByteOrder.nativeOrder()); + + /** Buffer for encrypted data. */ + private final ByteBuffer encBuf; + + /** Context. */ + private final GridKernalContext ctx; + + /** Encryption SPI */ + private final EncryptionSpi encSpi; + + /** File IO to read/write recovery data. */ + private final FileIO fileIo; + + /** Checkpoint timestamp. */ + private final long cpTs; + + /** Checkpoint ID. */ + private final UUID cpId; + + /** Checkpointer index. */ + private final int cpIdx; + + /** */ + private final File file; + + /** + * Ctor. + */ + CheckpointRecoveryFile(GridKernalContext ctx, long cpTs, UUID cpId, int cpIdx, File file, FileIO fileIo) { + this.ctx = ctx; + this.cpTs = cpTs; + this.cpId = cpId; + this.cpIdx = cpIdx; + this.file = file; + this.fileIo = fileIo; + + encSpi = ctx.encryption().enabled() ? ctx.config().getEncryptionSpi() : null; + encBuf = encSpi != null ? ByteBuffer.allocateDirect(encSpi.encryptedSize(MAX_PAGE_SIZE)) : null; + } + + /** Checkpoint timestamp. */ + public long checkpointTs() { + return cpTs; + } + + /** Checkpoint ID. */ + public UUID checkpointId() { + return cpId; + } + + /** Checkpointer index. */ + public int checkpointerIndex() { + return cpIdx; + } + + /** Recovery file. */ + public File file() { + return file; + } + + /** */ + public void fsync() throws IOException { + fileIo.force(); + } + + /** {@inheritDoc} */ + @Override public void close() throws Exception { + fileIo.close(); + } + + /** */ + public void writePage(FullPageId fullPageId, ByteBuffer buf) throws IOException { + // Encrypt if required. + byte encFlag = 0; + byte encKeyId = 0; + + if (encSpi != null) { + GroupKey grpKey = ctx.encryption().getActiveKey(fullPageId.groupId()); + + if (grpKey != null) { + encFlag = 1; + encKeyId = grpKey.id(); + encBuf.clear(); + encBuf.limit(encSpi.encryptedSize(buf.remaining())); + encSpi.encrypt(buf, grpKey.key(), encBuf); + encBuf.rewind(); + buf = encBuf; + } + } + + hdrBuf.clear(); + hdrBuf.putLong(fullPageId.pageId()); + hdrBuf.putInt(fullPageId.groupId()); + hdrBuf.putInt(buf.remaining()); + // We have dedicated CRC field in the page structure, but we intentionally store CRC to record header, since + // page buffer can be encrypted. + hdrBuf.putInt(FastCrc.calcCrc(buf, buf.remaining())); + hdrBuf.put(encFlag); + hdrBuf.put(encKeyId); + + hdrBuf.rewind(); + buf.rewind(); + + fileIo.writeFully(hdrBuf); + fileIo.writeFully(buf); + } + + /** */ + private @Nullable FullPageId readPage(IntPredicate cacheGrpPredicate, ByteBuffer buf) throws IOException { + // Read header. + hdrBuf.clear(); + long pos = fileIo.position(); // For error messages. + + int read = fileIo.readFully(hdrBuf); + + if (read <= 0) + return null; + + if (read < hdrBuf.capacity()) { + throw new IOException("Recovery file buffer underflow [file=" + file.getName() + + ", pos=" + pos + ", expSize=" + hdrBuf.capacity() + ", read=" + read + ']'); + } + + hdrBuf.rewind(); + FullPageId fullPageId = new FullPageId(hdrBuf.getLong(), hdrBuf.getInt()); + int pageSize = hdrBuf.getInt(); + int storedCrc = hdrBuf.getInt(); + byte encFlag = hdrBuf.get(); + byte encKeyId = hdrBuf.get(); + + if (!cacheGrpPredicate.test(fullPageId.groupId())) { + fileIo.position(fileIo.position() + pageSize); + buf.clear(); + buf.limit(0); + return fullPageId; + } + + ByteBuffer decBuf = buf; // Buffer for decrypted data. + + if (encFlag != 0) + buf = encBuf; + + // Read page data. + buf.clear(); + assert buf.capacity() >= pageSize; + buf.limit(pageSize); + read = fileIo.readFully(buf); + + if (read < pageSize) { + throw new IOException("Recovery file buffer underflow [file=" + file.getName() + + ", pos=" + pos + ", expSize=" + pageSize + ", read=" + read + ']'); + } + + buf.rewind(); + + int calcCrc = FastCrc.calcCrc(buf, buf.remaining()); + + if (storedCrc != calcCrc) { + throw new IOException("CRC validation failed [file=" + file.getName() + ", pos=" + pos + + ", storedCrc=" + U.hexInt(storedCrc) + ", calcCrc=" + U.hexInt(calcCrc) + "]"); + } + + buf.rewind(); + + if (encFlag != 0) { + if (encSpi == null) { + throw new IOException("Found encrypted record, but encryption is disabled [file=" + + file.getName() + ", pos=" + pos + ']'); + } + + GroupKey grpKey = ctx.encryption().groupKey(fullPageId.groupId(), encKeyId); + + if (grpKey == null) { + throw new IOException("Not found encryption key id [file=" + + file.getName() + ", pos=" + pos + ", grpId=" + fullPageId.groupId() + ", keyId=" + encKeyId + ']'); + } + + EncryptionSpi encSpi = ctx.config().getEncryptionSpi(); + + decBuf.clear(); + encSpi.decrypt(buf, grpKey.key(), decBuf); + decBuf.limit(decBuf.position()); + decBuf.rewind(); + } + + return fullPageId; + } + + /** + * Preforms action on all pages stored in recovery file. + */ + public void forAllPages( + IntPredicate cacheGrpPredicate, + BiConsumer action + ) throws IOException { + fileIo.position(0); + + ByteBuffer buf = ByteBuffer.allocateDirect(MAX_PAGE_SIZE).order(ByteOrder.nativeOrder()); + + FullPageId fullPageId = readPage(cacheGrpPredicate, buf); + + while (fullPageId != null) { + if (cacheGrpPredicate.test(fullPageId.groupId())) + action.accept(fullPageId, buf); + + fullPageId = readPage(cacheGrpPredicate, buf); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointRecoveryFileStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointRecoveryFileStorage.java new file mode 100644 index 00000000000000..dc1a7d79ca5a2e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointRecoveryFileStorage.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.checkpoint; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.function.Predicate; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.persistence.StorageException; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.READ; +import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; +import static java.nio.file.StandardOpenOption.WRITE; + +/** + * + */ +public class CheckpointRecoveryFileStorage { + /** Checkpoint recovery file name pattern. */ + public static final Pattern FILE_NAME_PATTERN = Pattern.compile("(\\d+)-(.*)-RECOVERY-(\\d+)\\.bin"); + + /** Context. */ + private final GridKernalContext ctx; + + /** + * File IO factory. + */ + private final FileIOFactory fileIoFactory; + + /** + * Recovery file location. + */ + private final File dir; + + /** + * Ctor. + */ + public CheckpointRecoveryFileStorage(GridKernalContext ctx, File dir, FileIOFactory fileIoFactory) throws StorageException { + this.ctx = ctx; + this.dir = dir; + this.fileIoFactory = fileIoFactory; + + if (!U.mkdirs(dir)) + throw new StorageException("Failed to create directory for checkpoint recovery files [dir=" + dir + ']'); + + } + + /** */ + private static String fileName(long cpTs, UUID cpId, int idx) { + return cpTs + "-" + cpId + "-" + "RECOVERY-" + idx + ".bin"; + } + + /** + * Factory method. + */ + public CheckpointRecoveryFile create(long cpTs, UUID cpId, int idx) throws StorageException { + File file = new File(dir, fileName(cpTs, cpId, idx)); + + try { + FileIO fileIO = fileIoFactory.create(file, CREATE, TRUNCATE_EXISTING, WRITE); + + return new CheckpointRecoveryFile(ctx, cpTs, cpId, idx, file, fileIO); + } + catch (IOException e) { + throw new StorageException("Failed to create checkpoint recovery file [file=" + file + ']', e); + } + } + + /** + * Gets list of recovery files, satisfying given predicate (or all files in storage if predicate is null). + * + * @return List of recovery files. + */ + public List list(@Nullable Predicate predicate) throws StorageException { + File[] files = dir.listFiles(f -> f.isFile() && f.getName().contains("-RECOVERY-")); + List fileList = new ArrayList<>(); + + for (File file : files) { + Matcher matcher = FILE_NAME_PATTERN.matcher(file.getName()); + if (matcher.matches()) { + long ts = Long.parseLong(matcher.group(1)); + UUID id = UUID.fromString(matcher.group(2)); + int idx = Integer.parseInt(matcher.group(3)); + + if (predicate == null || predicate.test(id)) { + try { + fileList.add(new CheckpointRecoveryFile(ctx, ts, id, idx, file, fileIoFactory.create(file, READ))); + } + catch (IOException e) { + throw new StorageException("Failed to open checkpoint recovery file [file=" + file + ']', e); + } + } + } + } + + return fileList; + } + + /** + * Deletes all recovery files in storage. + */ + public void clear() throws StorageException { + File[] files = dir.listFiles(f -> f.isFile() + && f.getName().contains("-RECOVERY-") + && FILE_NAME_PATTERN.matcher(f.getName()).matches()); + + for (File file : files) { + if (!file.delete()) + throw new StorageException("Failed to checkpoint recovery file [file=" + file + ']'); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java index bf6518933ad429..da60a45e9b14f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.persistence.DataRegion; +import org.apache.ignite.internal.processors.cache.persistence.StorageException; import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap; @@ -315,37 +316,38 @@ curr, new PartitionAllocationMap(), checkpointCollectPagesInfoPool, workProgress tracker.onWalCpRecordFsyncEnd(); - CheckpointEntry checkpointEntry = null; - - if (checkpointMarkersStorage != null) - checkpointEntry = checkpointMarkersStorage.writeCheckpointEntry( - cpTs, - cpRec.checkpointId(), - cpPtr, - cpRec, - CheckpointEntryType.START, - skipSync - ); - - curr.transitTo(MARKER_STORED_TO_DISK); - - tracker.onSplitAndSortCpPagesStart(); - GridConcurrentMultiPairQueue cpPages = splitAndSortCpPagesIfNeeded(cpPagesHolder); tracker.onSplitAndSortCpPagesEnd(); - return new Checkpoint(checkpointEntry, cpPages, curr); + CheckpointEntry cpEntry = new CheckpointEntry(cpTs, cpPtr, cpRec.checkpointId(), cpRec.cacheGroupStates()); + + return new Checkpoint(cpEntry, cpPages, curr, cpRec); } else { if (ctx0.walFlush() && wal != null) wal.flush(null, true); - return new Checkpoint(null, GridConcurrentMultiPairQueue.EMPTY, curr); + return new Checkpoint(null, GridConcurrentMultiPairQueue.EMPTY, curr, cpRec); } } + /** Stores begin checkpoint marker to disk. */ + public void storeBeginMarker(Checkpoint cp) throws StorageException { + if (checkpointMarkersStorage != null && cp.cpEntry != null) { + checkpointMarkersStorage.writeCheckpointEntry( + cp.cpEntry, + cp.cpRecord, + CheckpointEntryType.START, + skipSync + ); + + cp.progress.transitTo(MARKER_STORED_TO_DISK); + } + + } + /** * Fill cache group state in checkpoint record. * @@ -627,7 +629,7 @@ public void finalizeCheckpointOnRecovery( for (int stripeIdx = 0; stripeIdx < exec.stripesCount(); stripeIdx++) exec.execute( stripeIdx, - checkpointPagesWriterFactory.buildRecovery(pages, updStores, writePagesError, cpPagesCnt) + checkpointPagesWriterFactory.buildRecoveryFinalizer(pages, updStores, writePagesError, cpPagesCnt) ); // Await completion all write tasks. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java index e09d91451a0278..ae7c3a138bc3e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java @@ -18,8 +18,10 @@ package org.apache.ignite.internal.processors.cache.persistence.checkpoint; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -34,6 +36,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.failure.FailureContext; import org.apache.ignite.failure.FailureType; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.LongJVMPauseDetector; @@ -42,10 +45,12 @@ import org.apache.ignite.internal.pagemem.store.PageStore; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; +import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog; import org.apache.ignite.internal.processors.cache.persistence.CheckpointState; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager; +import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; @@ -105,8 +110,9 @@ public class Checkpointer extends GridWorker { "checkpointListenersExecuteTime=%dms, " + "checkpointLockHoldTime=%dms, " + "walCpRecordFsyncDuration=%dms, " + - "writeCheckpointEntryDuration=%dms, " + "splitAndSortCpPagesDuration=%dms, " + + "writeRecoveryDataDuration=%dms, " + + "writeCheckpointEntryDuration=%dms, " + "%s" + "pages=%d, " + "reason='%s']"; @@ -149,6 +155,9 @@ public class Checkpointer extends GridWorker { /** Factory for the creation of page-write workers. */ private final CheckpointPagesWriterFactory checkpointPagesWriterFactory; + /** Storage for checkpoint recovery files. */ + @Nullable private final CheckpointRecoveryFileStorage checkpointRecoveryFileStorage; + /** The number of IO-bound threads which will write pages to disk. */ private final int checkpointWritePageThreads; @@ -173,6 +182,9 @@ public class Checkpointer extends GridWorker { /** Performance statistics processor. */ private final PerformanceStatisticsProcessor psproc; + /** Write recovery data during checkpoint. */ + private final boolean writeRecoveryData; + /** For testing only. */ private GridFutureAdapter enableChangeApplied; @@ -205,6 +217,7 @@ public class Checkpointer extends GridWorker { GridCacheProcessor cacheProcessor, CheckpointWorkflow checkpoint, CheckpointPagesWriterFactory factory, + @Nullable CheckpointRecoveryFileStorage checkpointRecoveryFileStorage, long checkpointFrequency, int checkpointWritePageThreads, Supplier cpFreqDeviation @@ -220,7 +233,10 @@ public class Checkpointer extends GridWorker { this.checkpointWritePageThreads = Math.max(checkpointWritePageThreads, 1); this.checkpointWritePagesPool = initializeCheckpointPool(); this.cpFreqDeviation = cpFreqDeviation; - this.psproc = cacheProcessor.context().kernalContext().performanceStatistics(); + GridKernalContext ctx = cacheProcessor.context().kernalContext(); + this.psproc = ctx.performanceStatistics(); + this.writeRecoveryData = ctx.config().getDataStorageConfiguration().isWriteRecoveryDataOnCheckpoint(); + this.checkpointRecoveryFileStorage = checkpointRecoveryFileStorage; scheduledCp = new CheckpointProgressImpl(nextCheckpointInterval()); } @@ -386,6 +402,48 @@ private void doCheckpoint() { try { chp = checkpointWorkflow.markCheckpointBegin(lastCpTs, curCpProgress, tracker, this); + + tracker.onMarkEnd(); + + currentProgress().initCounters(chp.pagesSize); + + if (checkpointRecoveryFileStorage != null) { + checkpointRecoveryFileStorage.clear(); + + if (chp.hasDelta() && writeRecoveryData) { + if (log.isInfoEnabled()) { + log.info(String.format("Checkpoint recovery data write started [" + + "checkpointId=%s, " + + "startPtr=%s, " + + "pages=%d, " + + "checkpointBeforeLockTime=%dms, " + + "checkpointLockWait=%dms, " + + "checkpointListenersExecuteTime=%dms, " + + "checkpointLockHoldTime=%dms, " + + "walCpRecordFsyncDuration=%dms, " + + "splitAndSortCpPagesDuration=%dms, " + + "reason='%s']", + chp.cpEntry == null ? "" : chp.cpEntry.checkpointId(), + chp.cpEntry == null ? "" : chp.cpEntry.checkpointMark(), + chp.pagesSize, + tracker.beforeLockDuration(), + tracker.lockWaitDuration(), + tracker.listenersExecuteDuration(), + tracker.lockHoldDuration(), + tracker.walCpRecordFsyncDuration(), + tracker.splitAndSortCpPagesDuration(), + chp.progress.reason() + )); + } + + writeRecoveryData(chp); + } + } + tracker.onWriteRecoveryDataEnd(); + + checkpointWorkflow.storeBeginMarker(chp); + + tracker.onCpMarkerStoreEnd(); } catch (Exception e) { if (curCpProgress != null) @@ -399,8 +457,6 @@ private void doCheckpoint() { updateHeartbeat(); - currentProgress().initCounters(chp.pagesSize); - if (chp.hasDelta()) { if (log.isInfoEnabled()) { long possibleJvmPauseDur = possibleLongJvmPauseDuration(tracker); @@ -415,8 +471,9 @@ private void doCheckpoint() { tracker.listenersExecuteDuration(), tracker.lockHoldDuration(), tracker.walCpRecordFsyncDuration(), - tracker.writeCheckpointEntryDuration(), tracker.splitAndSortCpPagesDuration(), + tracker.recoveryDataWriteDuration(), + tracker.writeCheckpointEntryDuration(), possibleJvmPauseDur > 0 ? "possibleJvmPauseDuration=" + possibleJvmPauseDur + "ms, " : "", chp.pagesSize, chp.progress.reason() @@ -454,12 +511,14 @@ private void doCheckpoint() { if (chp.hasDelta() || destroyedPartitionsCnt > 0) { if (log.isInfoEnabled()) { log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " + - "walSegmentsCovered=%s, markDuration=%dms, pagesWrite=%dms, fsync=%dms, total=%dms]", + "walSegmentsCovered=%s, markDuration=%dms, recoveryWrite=%dms, pagesWrite=%dms, " + + "fsync=%dms, total=%dms]", chp.cpEntry != null ? chp.cpEntry.checkpointId() : "", chp.pagesSize, chp.cpEntry != null ? chp.cpEntry.checkpointMark() : "", walRangeStr(chp.walSegsCoveredRange), tracker.markDuration(), + tracker.recoveryDataWriteDuration(), tracker.pagesWriteDuration(), tracker.fsyncDuration(), tracker.totalDuration())); @@ -475,6 +534,53 @@ private void doCheckpoint() { } } + /** + * Writes data required for storage recovery in case of crash during write pages phase. + * + * @param cp Current checkpoint + */ + void writeRecoveryData(Checkpoint cp) throws IgniteCheckedException { + IgniteThreadPoolExecutor pageWritePool = checkpointWritePagesPool; + + int threads = pageWritePool == null ? 1 : pageWritePool.getMaximumPoolSize(); + + // Cache group to process: cache groups with enabled WAL and reserved system group placeholders (without context). + Set cacheGrpIds = new HashSet<>(cp.cpRecord.cacheGroupStates().keySet()); + cacheGrpIds.add(TxLog.TX_LOG_CACHE_ID); + cacheGrpIds.add(MetaStorage.METASTORAGE_CACHE_ID); + + CountDownFuture doneFut = new CountDownFuture(threads); + + for (int i = 0; i < checkpointWritePageThreads; i++) { + CheckpointRecoveryFile file = checkpointRecoveryFileStorage.create(cp.cpEntry.timestamp(), cp.cpEntry.checkpointId(), i); + + Runnable write = checkpointPagesWriterFactory.buildRecoveryDataWriter( + file, + cp.cpPages, + cacheGrpIds, + doneFut, + this, + cp.progress, + this::isShutdownNow + ); + + if (pageWritePool == null) + write.run(); + else { + try { + pageWritePool.execute(write); + } + catch (RejectedExecutionException ignore) { + // Run the task synchronously. + write.run(); + } + } + } + + // Wait and check for errors. + doneFut.get(); + } + /** * @param workProgressDispatcher Work progress dispatcher. * @param tracker Checkpoint metrics tracker. @@ -489,6 +595,8 @@ boolean writePages( WorkProgressDispatcher workProgressDispatcher, BooleanSupplier shutdownNow ) throws IgniteCheckedException { + cpPages.rewind(); + IgniteThreadPoolExecutor pageWritePool = checkpointWritePagesPool; int checkpointWritePageThreads = pageWritePool == null ? 1 : pageWritePool.getMaximumPoolSize(); @@ -501,7 +609,7 @@ boolean writePages( tracker.onPagesWriteStart(); for (int i = 0; i < checkpointWritePageThreads; i++) { - Runnable write = checkpointPagesWriterFactory.build( + Runnable write = checkpointPagesWriterFactory.buildCheckpointPagesWriter( tracker, cpPages, updStores, @@ -580,6 +688,7 @@ private void updateMetrics(Checkpoint chp, CheckpointMetricsTracker tracker) { tracker.walCpRecordFsyncDuration(), tracker.writeCheckpointEntryDuration(), tracker.splitAndSortCpPagesDuration(), + tracker.recoveryDataWriteDuration(), tracker.totalDuration(), tracker.checkpointStartTime(), chp.pagesSize, @@ -601,6 +710,7 @@ private void updateMetrics(Checkpoint chp, CheckpointMetricsTracker tracker) { tracker.walCpRecordFsyncDuration(), tracker.writeCheckpointEntryDuration(), tracker.splitAndSortCpPagesDuration(), + tracker.recoveryDataWriteDuration(), tracker.totalDuration(), tracker.checkpointStartTime(), chp.pagesSize, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java index 8a82c6e626f42b..6f427b716c64c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java @@ -129,6 +129,7 @@ public LightweightCheckpointManager( }; checkpointPagesWriterFactory = new CheckpointPagesWriterFactory( + cacheProcessor.context().kernalContext(), logger, (pageMemEx, fullPage, buf, tag) -> pageMemEx.pageManager().write(fullPage.groupId(), fullPage.pageId(), buf, tag, true), @@ -149,6 +150,7 @@ public LightweightCheckpointManager( cacheProcessor, checkpointWorkflow, checkpointPagesWriterFactory, + null, persistenceCfg.getCheckpointFrequency(), persistenceCfg.getCheckpointThreads(), () -> 0 diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/RecoveryDataWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/RecoveryDataWriter.java new file mode 100644 index 00000000000000..7a642b4064acd7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/RecoveryDataWriter.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.checkpoint; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BooleanSupplier; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.DiskPageCompression; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.NodeStoppingException; +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter; +import org.apache.ignite.internal.processors.cache.persistence.StorageException; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl; +import org.apache.ignite.internal.processors.compress.CompressionProcessor; +import org.apache.ignite.internal.util.GridConcurrentMultiPairQueue; +import org.apache.ignite.internal.util.future.CountDownFuture; +import org.apache.ignite.internal.util.worker.WorkProgressDispatcher; + +import static org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE; +import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getType; +import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getVersion; +import static org.apache.ignite.internal.util.IgniteUtils.hexLong; + +/** + * Implementation of writer which able to store recovery data to disk during checkpoint. + */ +public class RecoveryDataWriter implements Runnable { + /** File to write recovery data. */ + private final CheckpointRecoveryFile file; + + /** Context. */ + private final GridKernalContext ctx; + + /** Logger. */ + private final IgniteLogger log; + + /** Collection of page IDs to write under this task. Overall pages to write may be greater than this collection. */ + private final GridConcurrentMultiPairQueue writePageIds; + + /** Set of cache groups to process. */ + private final Set cacheGrpIds; + + /** Future which should be finished when all pages would be written. */ + private final CountDownFuture doneFut; + + /** Work progress dispatcher. */ + private final WorkProgressDispatcher workProgressDispatcher; + + /** Current checkpoint. This field is updated only by checkpoint thread. */ + private final CheckpointProgressImpl curCpProgress; + + /** Shutdown now. */ + private final BooleanSupplier shutdownNow; + + /** Page data buffer. */ + private final ByteBuffer pageBuf = ByteBuffer.allocateDirect(MAX_PAGE_SIZE).order(ByteOrder.nativeOrder()); + + /** */ + private final DiskPageCompression compressionType; + + /** */ + private final int compressionLevel; + + /** + * Creates task for write recovery data. + * + * @param ctx Context. + * @param file Checkpoint recovery file. + * @param writePageIds Collection of page IDs to write. + * @param doneFut Done future. + * @param workProgressDispatcher Work progress dispatcher. + * @param log Logger. + * @param curCpProgress Checkpoint progress. + * @param shutdownNow Shutdown supplier. + */ + RecoveryDataWriter( + GridKernalContext ctx, + CheckpointRecoveryFile file, + GridConcurrentMultiPairQueue writePageIds, + Set cacheGrpIds, + CountDownFuture doneFut, + WorkProgressDispatcher workProgressDispatcher, + IgniteLogger log, + CheckpointProgressImpl curCpProgress, + BooleanSupplier shutdownNow + ) { + this.ctx = ctx; + this.file = file; + this.writePageIds = writePageIds; + this.cacheGrpIds = cacheGrpIds; + this.doneFut = doneFut; + this.workProgressDispatcher = workProgressDispatcher; + this.log = log; + this.curCpProgress = curCpProgress; + this.shutdownNow = shutdownNow; + + DataStorageConfiguration dsCfg = ctx.config().getDataStorageConfiguration(); + compressionType = dsCfg.getCheckpointRecoveryDataCompression(); + compressionLevel = CompressionProcessor.getCompressionLevel(dsCfg.getCheckpointRecoveryDataCompressionLevel(), + compressionType); + } + + /** {@inheritDoc} */ + @Override public void run() { + GridConcurrentMultiPairQueue writePageIds = this.writePageIds; + + Throwable err = null; + + try { + GridConcurrentMultiPairQueue pagesToRetry = writePages(writePageIds); + + if (!pagesToRetry.isEmpty()) { + if (log.isInfoEnabled()) { + log.info(pagesToRetry.initialSize() + " recovery pages were not written yet due to " + + "unsuccessful page write lock acquisition and will be retried"); + } + + while (!pagesToRetry.isEmpty()) + pagesToRetry = writePages(pagesToRetry); + } + + if (shutdownNow.getAsBoolean()) { + doneFut.onDone(new NodeStoppingException("Node is stopping.")); + + return; + } + + workProgressDispatcher.blockingSectionBegin(); + + try { + file.fsync(); + } + finally { + workProgressDispatcher.blockingSectionEnd(); + } + } + catch (Throwable e) { + err = e; + } + finally { + try { + file.close(); + } + catch (Exception e) { + if (err == null) + err = e; + else + err.addSuppressed(e); + } + doneFut.onDone(err); + } + } + + /** + * @param writePageIds Collections of pages to write. + * @return pagesToRetry Pages which should be retried. + */ + private GridConcurrentMultiPairQueue writePages( + GridConcurrentMultiPairQueue writePageIds + ) throws IgniteCheckedException { + Map> pagesToRetry = new HashMap<>(); + + Map pageStoreWriters = new HashMap<>(); + + GridConcurrentMultiPairQueue.Result res = + new GridConcurrentMultiPairQueue.Result<>(); + + while (writePageIds.next(res)) { + if (shutdownNow.getAsBoolean()) + break; + + workProgressDispatcher.updateHeartbeat(); + + FullPageId fullId = res.getValue(); + + PageMemoryEx pageMem = res.getKey(); + + if (!cacheGrpIds.contains(fullId.groupId())) + continue; + + pageBuf.rewind(); + pageBuf.limit(pageMem.pageSize()); + + PageStoreWriter pageStoreWriter = + pageStoreWriters.computeIfAbsent(pageMem, pageMemEx -> createPageStoreWriter(pageMemEx, pagesToRetry)); + + pageMem.checkpointWritePage(fullId, pageBuf, pageStoreWriter, null, true); + } + + return pagesToRetry.isEmpty() ? + GridConcurrentMultiPairQueue.EMPTY : + new GridConcurrentMultiPairQueue<>(pagesToRetry); + } + + /** + * Factory method for create {@link PageStoreWriter}. + * + * @param pageMemEx Page memory. + * @param pagesToRetry List pages for retry. + * @return Checkpoint page write context. + */ + private PageStoreWriter createPageStoreWriter( + PageMemoryEx pageMemEx, + Map> pagesToRetry + ) { + return new PageStoreWriter() { + /** {@inheritDoc} */ + @Override public void writePage(FullPageId fullPageId, ByteBuffer buf, int tag) throws IgniteCheckedException { + if (tag == PageMemoryImpl.TRY_AGAIN_TAG) { + pagesToRetry.computeIfAbsent(pageMemEx, k -> new ArrayList<>()).add(fullPageId); + + return; + } + + long pageId = fullPageId.pageId(); + + assert getType(buf) != 0 : "Invalid state. Type is 0! pageId = " + hexLong(pageId); + assert getVersion(buf) != 0 : "Invalid state. Version is 0! pageId = " + hexLong(pageId); + + buf.limit(pageMemEx.realPageSize(fullPageId.groupId())); + + if (compressionType != DiskPageCompression.DISABLED) { + buf = ctx.compress().compressPage(buf, buf.remaining(), 1, + compressionType, compressionLevel); + } + + try { + file.writePage(fullPageId, buf); + } + catch (IOException e) { + throw new StorageException("Failed to write page to recovery file [file=" + file.file() + ']', e); + } + + curCpProgress.updateWrittenRecoveryPages(1); + } + }; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java index 3ecdd22f962c9a..aa925dfe8acad7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java @@ -66,7 +66,7 @@ public interface FileIO extends AutoCloseable { * * @param destBuf Destination byte buffer. * - * @return Number of written bytes. + * @return Number of read bytes. * * @throws IOException If some I/O error occurs. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointMetricsTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointMetricsTracker.java index 474f837b67485c..3a1c32d4c4c373 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointMetricsTracker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointMetricsTracker.java @@ -54,7 +54,7 @@ public class CheckpointMetricsTracker { private volatile int cowPages; /** */ - private long cpStart = System.currentTimeMillis(); + private final long cpStart = System.currentTimeMillis(); /** */ private long cpLockWaitStart; @@ -62,6 +62,9 @@ public class CheckpointMetricsTracker { /** */ private long cpMarkStart; + /** */ + private long cpMarkEnd; + /** */ private long cpLockRelease; @@ -81,11 +84,14 @@ public class CheckpointMetricsTracker { private long walCpRecordFsyncEnd; /** */ - private long splitAndSortCpPagesStart; + private long cpMarkerStoreEnd; /** */ private long splitAndSortCpPagesEnd; + /** */ + private long cpRecoveryDataWriteEnd; + /** */ private long listenersExecEnd; @@ -125,6 +131,11 @@ public void onMarkStart() { cpMarkStart = System.currentTimeMillis(); } + /** */ + public void onMarkEnd() { + cpMarkEnd = System.currentTimeMillis(); + } + /** */ public void onLockRelease() { cpLockRelease = System.currentTimeMillis(); @@ -156,8 +167,8 @@ public void onWalCpRecordFsyncStart() { } /** */ - public void onSplitAndSortCpPagesStart() { - splitAndSortCpPagesStart = System.currentTimeMillis(); + public void onCpMarkerStoreEnd() { + cpMarkerStoreEnd = System.currentTimeMillis(); } /** */ @@ -170,6 +181,11 @@ public void onWalCpRecordFsyncEnd() { walCpRecordFsyncEnd = System.currentTimeMillis(); } + /** */ + public void onWriteRecoveryDataEnd() { + cpRecoveryDataWriteEnd = System.currentTimeMillis(); + } + /** * @return Total checkpoint duration. */ @@ -202,7 +218,7 @@ public long listenersExecuteDuration() { * @return Checkpoint mark duration. */ public long markDuration() { - return cpPagesWriteStart - cpMarkStart; + return cpMarkEnd - cpMarkStart; } /** @@ -234,19 +250,26 @@ public long walCpRecordFsyncDuration() { } /** - * @return Duration of checkpoint entry buffer writing to file. - * - * @see CheckpointMarkersStorage#writeCheckpointEntry(long, UUID, WALPointer, CheckpointRecord, CheckpointEntryType, boolean) + * @return Duration of splitting and sorting checkpoint pages. */ - public long writeCheckpointEntryDuration() { - return splitAndSortCpPagesStart - walCpRecordFsyncEnd; + public long splitAndSortCpPagesDuration() { + return splitAndSortCpPagesEnd - walCpRecordFsyncEnd; } /** - * @return Duration of splitting and sorting checkpoint pages. + * @return Duration of writing recovery data. */ - public long splitAndSortCpPagesDuration() { - return splitAndSortCpPagesEnd - splitAndSortCpPagesStart; + public long recoveryDataWriteDuration() { + return cpRecoveryDataWriteEnd - cpMarkEnd; + } + + /** + * @return Duration of checkpoint entry buffer writing to file. + * + * @see CheckpointMarkersStorage#writeCheckpointEntry(long, UUID, WALPointer, CheckpointRecord, CheckpointEntryType, boolean) + */ + public long writeCheckpointEntryDuration() { + return cpMarkerStoreEnd - cpRecoveryDataWriteEnd; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java index a592b835c5b0f5..01345b4190b106 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java @@ -136,13 +136,15 @@ public GridMultiCollectionWrapper beginCheckpoint(IgniteInternalFutu * @param buf Temporary buffer to write changes into. * @param pageWriter Checkpoint page write context. * @param tracker Checkpoint metrics tracker. + * @param keepDirty Don't reset dirty flag on page. * @throws IgniteCheckedException If failed to obtain page data. */ public void checkpointWritePage( FullPageId pageId, ByteBuffer buf, PageStoreWriter pageWriter, - CheckpointMetricsTracker tracker + CheckpointMetricsTracker tracker, + boolean keepDirty ) throws IgniteCheckedException; /** */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index 4d554a894085a2..43c054190a290f 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -617,7 +617,7 @@ else if (throttlingPlc == ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY) trackingIO.initNewPage(pageAddr, pageId, realPageSize(grpId), metrics); - if (!ctx.wal().disabled(fullId.groupId(), fullId.pageId())) { + if (!ctx.wal().pageRecordsDisabled(fullId.groupId(), fullId.pageId())) { if (!ctx.wal().isAlwaysWriteFullPages()) ctx.wal().log( new InitNewPageRecord( @@ -959,6 +959,8 @@ private void tryToRestorePage(FullPageId fullId, ByteBuffer buf) throws IgniteCh ByteBuffer curPage = null; ByteBuffer lastValidPage = null; + // TODO Try to restore from checkpoint recovery files. + try (WALIterator it = walMgr.replay(null)) { for (IgniteBiTuple tuple : it) { switch (tuple.getValue().type()) { @@ -1172,7 +1174,8 @@ private boolean isThrottlingEnabled() { FullPageId fullId, ByteBuffer buf, PageStoreWriter pageStoreWriter, - CheckpointMetricsTracker metricsTracker + CheckpointMetricsTracker metricsTracker, + boolean keepDirty ) throws IgniteCheckedException { assert buf.remaining() == pageSize(); @@ -1243,7 +1246,7 @@ private boolean isThrottlingEnabled() { } } - copyPageForCheckpoint(absPtr, fullId, buf, tag, pageSingleAcquire, pageStoreWriter, metricsTracker); + copyPageForCheckpoint(absPtr, fullId, buf, tag, pageSingleAcquire, pageStoreWriter, metricsTracker, keepDirty); } /** @@ -1253,6 +1256,7 @@ private boolean isThrottlingEnabled() { * @param pageSingleAcquire Page is acquired only once. We don't pin the page second time (until page will not be * copied) in case checkpoint temporary buffer is used. * @param pageStoreWriter Checkpoint page write context. + * @param keepDirty Keep page in checkpoint buffer and don't reset dirty flag. */ private void copyPageForCheckpoint( long absPtr, @@ -1261,7 +1265,8 @@ private void copyPageForCheckpoint( Integer tag, boolean pageSingleAcquire, PageStoreWriter pageStoreWriter, - CheckpointMetricsTracker tracker + CheckpointMetricsTracker tracker, + boolean keepDirty ) throws IgniteCheckedException { assert absPtr != 0; assert PageHeader.isAcquired(absPtr) || !isInCheckpoint(fullId); @@ -1286,7 +1291,9 @@ private void copyPageForCheckpoint( return; } - if (!clearCheckpoint(fullId)) { + boolean inCheckpoint = keepDirty ? isInCheckpoint(fullId) : clearCheckpoint(fullId); + + if (!inCheckpoint) { rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, OffheapReadWriteLock.TAG_LOCK_ALWAYS); if (!pageSingleAcquire) @@ -1299,20 +1306,22 @@ private void copyPageForCheckpoint( long tmpRelPtr = PageHeader.tempBufferPointer(absPtr); if (tmpRelPtr != INVALID_REL_PTR) { - PageHeader.tempBufferPointer(absPtr, INVALID_REL_PTR); - long tmpAbsPtr = checkpointPool.absolute(tmpRelPtr); - copyInBuffer(tmpAbsPtr, buf); + copyToBuffer(tmpAbsPtr, buf); + + if (!keepDirty) { + PageHeader.tempBufferPointer(absPtr, INVALID_REL_PTR); - PageHeader.fullPageId(tmpAbsPtr, NULL_PAGE); + PageHeader.fullPageId(tmpAbsPtr, NULL_PAGE); - GridUnsafe.zeroMemory(tmpAbsPtr + PAGE_OVERHEAD, pageSize()); + GridUnsafe.zeroMemory(tmpAbsPtr + PAGE_OVERHEAD, pageSize()); - if (tracker != null) - tracker.onCowPageWritten(); + if (tracker != null) + tracker.onCowPageWritten(); - releaseCheckpointBufferPage(tmpRelPtr); + releaseCheckpointBufferPage(tmpRelPtr); + } // Need release again because we pin page when resolve abs pointer, // and page did not have tmp buffer page. @@ -1320,9 +1329,10 @@ private void copyPageForCheckpoint( PageHeader.releasePage(absPtr); } else { - copyInBuffer(absPtr, buf); + copyToBuffer(absPtr, buf); - PageHeader.dirty(absPtr, false); + if (!keepDirty) + PageHeader.dirty(absPtr, false); } assert PageIO.getType(buf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId()); @@ -1351,11 +1361,11 @@ private void copyPageForCheckpoint( /** * @param absPtr Absolute ptr. - * @param buf Tmp buffer. + * @param out Output buffer. */ - private void copyInBuffer(long absPtr, ByteBuffer buf) { - if (buf.isDirect()) { - long tmpPtr = GridUnsafe.bufferAddress(buf); + private void copyToBuffer(long absPtr, ByteBuffer out) { + if (out.isDirect()) { + long tmpPtr = GridUnsafe.bufferAddress(out); GridUnsafe.copyMemory(absPtr + PAGE_OVERHEAD, tmpPtr, pageSize()); @@ -1363,7 +1373,7 @@ private void copyInBuffer(long absPtr, ByteBuffer buf) { assert PageIO.getCrc(tmpPtr) == 0; //TODO GG-11480 } else { - byte[] arr = buf.array(); + byte[] arr = out.array(); assert arr != null; assert arr.length == pageSize(); @@ -1860,7 +1870,7 @@ private void setDirty(FullPageId pageId, long absPtr, boolean dirty, boolean for * */ void beforeReleaseWrite(FullPageId pageId, long ptr, boolean pageWalRec) throws IgniteCheckedException { - boolean walIsNotDisabled = walMgr != null && !walMgr.disabled(pageId.groupId(), pageId.pageId()); + boolean walIsNotDisabled = walMgr != null && !walMgr.pageRecordsDisabled(pageId.groupId(), pageId.pageId()); boolean pageRecOrAlwaysWriteFullPage = walMgr != null && (pageWalRec || walMgr.isAlwaysWriteFullPages()); if (pageRecOrAlwaysWriteFullPage && walIsNotDisabled) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java index 39ce3b4873f1c1..b62b7d4154caac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java @@ -490,7 +490,7 @@ public static boolean isWalDeltaRecordNeeded( @Nullable Boolean walPlc) { // If the page is clean, then it is either newly allocated or just after checkpoint. // In both cases we have to write full page contents to WAL. - return wal != null && !wal.isAlwaysWriteFullPages() && walPlc != TRUE && !wal.disabled(cacheId, pageId) && + return wal != null && !wal.isAlwaysWriteFullPages() && walPlc != TRUE && !wal.pageRecordsDisabled(cacheId, pageId) && (walPlc == FALSE || pageMem.isDirty(cacheId, pageId, page)); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index c1f04bd3baab7b..8fcb9d7cdf502b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -161,8 +161,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readPosition; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader; -import static org.apache.ignite.internal.processors.compress.CompressionProcessor.checkCompressionLevelBounds; -import static org.apache.ignite.internal.processors.compress.CompressionProcessor.getDefaultCompressionLevel; +import static org.apache.ignite.internal.processors.compress.CompressionProcessor.getCompressionLevel; import static org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty.detachedBooleanProperty; import static org.apache.ignite.internal.util.io.GridFileUtils.ensureHardLinkAvailable; @@ -600,9 +599,7 @@ public void setFileIOFactory(FileIOFactory ioFactory) { cctx.kernalContext().compress().checkPageCompressionSupported(); - pageCompressionLevel = dsCfg.getWalPageCompressionLevel() != null ? - checkCompressionLevelBounds(dsCfg.getWalPageCompressionLevel(), pageCompression) : - getDefaultCompressionLevel(pageCompression); + pageCompressionLevel = getCompressionLevel(dsCfg.getWalPageCompressionLevel(), pageCompression); } } } @@ -1259,8 +1256,8 @@ private boolean segmentReservedOrLocked(long absIdx) { } /** {@inheritDoc} */ - @Override public boolean disabled(int grpId, long pageId) { - return cctx.walState().isDisabled(grpId, pageId); + @Override public boolean pageRecordsDisabled(int grpId, long pageId) { + return cctx.walState().isPageRecordsDisabled(grpId, pageId); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionHandler.java index a1dd5146aad827..faa1eb7e0b69c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionHandler.java @@ -115,9 +115,7 @@ public static CompressionHandler create( CompressionProcessor comprProc = ctx.compress(); - int lvl = cfg.getDiskPageCompressionLevel() == null ? - CompressionProcessor.getDefaultCompressionLevel(diskPageCompr) : - CompressionProcessor.checkCompressionLevelBounds(cfg.getDiskPageCompressionLevel(), diskPageCompr); + int lvl = CompressionProcessor.getCompressionLevel(cfg.getDiskPageCompressionLevel(), diskPageCompr); File dbPath = ctx.pdsFolderResolver().resolveFolders().persistentStoreRootPath(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java index fe760712161ed4..20abe4cd36d5ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java @@ -23,8 +23,17 @@ import org.apache.ignite.configuration.DiskPageCompression; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteComponentType; +import org.apache.ignite.internal.ThreadLocalDirectByteBuffer; +import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.typedef.internal.U; + +import static org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE; +import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE; +import static org.apache.ignite.internal.util.GridUnsafe.NATIVE_BYTE_ORDER; /** * Compression processor. @@ -65,6 +74,9 @@ public class CompressionProcessor extends GridProcessorAdapter { /** */ protected static final byte SNAPPY_COMPRESSED_PAGE = 4; + /** Max page size. */ + private final ThreadLocalDirectByteBuffer compactBuf = new ThreadLocalDirectByteBuffer(MAX_PAGE_SIZE, NATIVE_BYTE_ORDER); + /** * @param ctx Kernal context. */ @@ -72,6 +84,16 @@ public CompressionProcessor(GridKernalContext ctx) { super(ctx); } + /** + * @param compressLevel Compression level. + * @param compression Compression algorithm. + * @return Compression level. + */ + public static int getCompressionLevel(Integer compressLevel, DiskPageCompression compression) { + return compressLevel != null ? checkCompressionLevelBounds(compressLevel, compression) : + getDefaultCompressionLevel(compression); + } + /** * @param compression Compression algorithm. * @return Default compression level. @@ -134,7 +156,7 @@ private static T fail() throws IgniteCheckedException { } /** - * Checks weither page compression is supported. + * Checks weither page compression can be used for page file storage. * * @throws IgniteCheckedException If compression is not supported. */ @@ -143,6 +165,8 @@ public void checkPageCompressionSupported() throws IgniteCheckedException { } /** + * Checks weither page file storage supports compression. + * * @param storagePath Storage path. * @param pageSize Page size. * @throws IgniteCheckedException If compression is not supported. @@ -151,10 +175,66 @@ public void checkPageCompressionSupported(Path storagePath, int pageSize) throws fail(); } + /** + * @param page Page. + * @param compactSize Compacted page size. + * @return The given page. + */ + protected static ByteBuffer setCompactionInfo(ByteBuffer page, int compactSize) { + return setCompressionInfo(page, SKIP_GARBAGE, compactSize, compactSize); + } + + /** + * @param page Page. + * @param compression Compression algorithm. + * @param compressedSize Compressed size. + * @param compactedSize Compact size. + * @return The given page. + */ + protected static ByteBuffer setCompressionInfo( + ByteBuffer page, + DiskPageCompression compression, + int compressedSize, + int compactedSize + ) { + assert compressedSize >= 0 && compressedSize <= Short.MAX_VALUE : compressedSize; + assert compactedSize >= 0 && compactedSize <= Short.MAX_VALUE : compactedSize; + + PageIO.setCompressionType(page, getCompressionType(compression)); + PageIO.setCompressedSize(page, (short)compressedSize); + PageIO.setCompactedSize(page, (short)compactedSize); + + return page; + } + + /** + * @param compression Compression. + * @return Level. + */ + private static byte getCompressionType(DiskPageCompression compression) { + if (compression == DiskPageCompression.DISABLED) + return UNCOMPRESSED_PAGE; + + switch (compression) { + case ZSTD: + return ZSTD_COMPRESSED_PAGE; + + case LZ4: + return LZ4_COMPRESSED_PAGE; + + case SNAPPY: + return SNAPPY_COMPRESSED_PAGE; + + case SKIP_GARBAGE: + return COMPACTED_PAGE; + } + throw new IllegalStateException("Unexpected compression: " + compression); + } + /** * @param page Page buffer. * @param pageSize Page size. - * @param storeBlockSize Store block size. + * @param blockSize Store block size. * @param compression Compression algorithm. * @param compressLevel Compression level. * @return Possibly compressed buffer. @@ -163,11 +243,94 @@ public void checkPageCompressionSupported(Path storagePath, int pageSize) throws public ByteBuffer compressPage( ByteBuffer page, int pageSize, - int storeBlockSize, + int blockSize, DiskPageCompression compression, int compressLevel ) throws IgniteCheckedException { - return fail(); + assert compression != null && compression != DiskPageCompression.DISABLED : compression; + assert U.isPow2(blockSize) : blockSize; + assert page.position() == 0 && page.limit() >= pageSize; + + int oldPageLimit = page.limit(); + + try { + // Page size will be less than page limit when TDE is enabled. To make compaction and compression work + // correctly we need to set limit to real page size. + page.limit(pageSize); + + ByteBuffer compactPage = doCompactPage(page, pageSize); + + int compactSize = compactPage.limit(); + + assert compactSize <= pageSize : compactSize; + + // If no need to compress further or configured just to skip garbage. + if (compactSize < blockSize || compression == SKIP_GARBAGE) + return setCompactionInfo(compactPage, compactSize); + + ByteBuffer compressedPage = doCompressPage(compression, compactPage, compactSize, compressLevel); + + assert compressedPage.position() == 0; + int compressedSize = compressedPage.limit(); + + int freeCompactBlocks = (pageSize - compactSize) / blockSize; + int freeCompressedBlocks = (pageSize - compressedSize) / blockSize; + + if (freeCompactBlocks >= freeCompressedBlocks) { + if (freeCompactBlocks == 0) + return page; // No blocks will be released. + + return setCompactionInfo(compactPage, compactSize); + } + + return setCompressionInfo(compressedPage, compression, compressedSize, compactSize); + } + finally { + page.limit(oldPageLimit); + } + } + + /** + * @param page Page buffer. + * @param pageSize Page size. + * @return Compacted page buffer. + */ + protected ByteBuffer doCompactPage(ByteBuffer page, int pageSize) throws IgniteCheckedException { + PageIO io = PageIO.getPageIO(page); + + ByteBuffer compactPage = compactBuf.get(); + + if (io instanceof CompactablePageIO) { + // Drop the garbage from the page. + ((CompactablePageIO)io).compactPage(page, compactPage, pageSize); + } + else { + // Direct buffer is required as output of this method. + if (page.isDirect()) + return page; + + PageUtils.putBytes(GridUnsafe.bufferAddress(compactPage), 0, page.array()); + + compactPage.limit(pageSize); + } + + return compactPage; + } + + /** + * @param compression Compression algorithm. + * @param compactPage Compacted page. + * @param compactSize Compacted page size. + * @param compressLevel Compression level. + * @return Compressed page. + */ + protected ByteBuffer doCompressPage( + DiskPageCompression compression, + ByteBuffer compactPage, + int compactSize, + int compressLevel + ) { + throw new IllegalStateException("Unsupported compression: " + compression); } /** @@ -176,7 +339,39 @@ public ByteBuffer compressPage( * @throws IgniteCheckedException If failed. */ public void decompressPage(ByteBuffer page, int pageSize) throws IgniteCheckedException { - if (PageIO.getCompressionType(page) != UNCOMPRESSED_PAGE) - fail(); + assert page.capacity() >= pageSize : "capacity=" + page.capacity() + ", pageSize=" + pageSize; + + byte compressType = PageIO.getCompressionType(page); + + if (compressType == UNCOMPRESSED_PAGE) + return; // Nothing to do. + + short compressedSize = PageIO.getCompressedSize(page); + short compactSize = PageIO.getCompactedSize(page); + + assert compactSize <= pageSize && compactSize >= compressedSize; + + if (compressType == COMPACTED_PAGE) { + // Just setup bounds before restoring the page. + page.position(0).limit(compactSize); + } + else + doDecompressPage(compressType, page, compressedSize, compactSize); + + PageIO io = PageIO.getPageIO(page); + + if (io instanceof CompactablePageIO) + ((CompactablePageIO)io).restorePage(page, pageSize); + else { + assert compactSize == pageSize + : "Wrong compacted page size [compactSize=" + compactSize + ", pageSize=" + pageSize + ']'; + } + + setCompressionInfo(page, DiskPageCompression.DISABLED, 0, 0); + } + + /** */ + protected void doDecompressPage(int compressType, ByteBuffer page, int compressedSize, int compactSize) { + throw new IllegalStateException("Unsupported compression: " + compressType); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsReader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsReader.java index b0c8244734a32d..ef040b395cbfcf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsReader.java @@ -57,6 +57,7 @@ import static org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY_ROWS; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.TASK; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.TX_COMMIT; +import static org.apache.ignite.internal.processors.performancestatistics.OperationType.VERSION; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.cacheOperation; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.cacheRecordSize; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.cacheStartRecordSize; @@ -145,6 +146,7 @@ public void read(List filesOrDirs) throws IOException { try (FileIO io = ioFactory.create(file)) { fileIo = io; + boolean first = true; while (true) { if (io.read(buf) <= 0) { @@ -166,7 +168,8 @@ public void read(List filesOrDirs) throws IOException { buf.mark(); - while (deserialize(buf, nodeId)) { + while (deserialize(buf, nodeId, first)) { + first = false; if (forwardRead != null && forwardRead.found) { if (forwardRead.resetBuf) { buf.limit(0); @@ -201,9 +204,10 @@ public void read(List filesOrDirs) throws IOException { /** * @param buf Buffer. * @param nodeId Node id. + * @param firstRecord Is it first record in the file. * @return {@code True} if operation deserialized. {@code False} if not enough bytes. */ - private boolean deserialize(ByteBuffer buf, UUID nodeId) throws IOException { + private boolean deserialize(ByteBuffer buf, UUID nodeId, boolean firstRecord) throws IOException { if (buf.remaining() < 1) return false; @@ -211,7 +215,20 @@ private boolean deserialize(ByteBuffer buf, UUID nodeId) throws IOException { OperationType opType = OperationType.of(opTypeByte); - if (cacheOperation(opType)) { + if (firstRecord && opType != VERSION) + throw new IgniteException("Unsupported file format"); + + if (opType == VERSION) { + short ver = buf.getShort(); + + if (ver != FilePerformanceStatisticsWriter.FILE_FORMAT_VERSION) { + throw new IgniteException("Unsupported file format version [fileVer=" + ver + ", supportedVer=" + + FilePerformanceStatisticsWriter.FILE_FORMAT_VERSION + ']'); + } + + return true; + } + else if (cacheOperation(opType)) { if (buf.remaining() < cacheRecordSize()) return false; @@ -459,6 +476,7 @@ else if (opType == CHECKPOINT) { long walCpRecordFsyncDuration = buf.getLong(); long writeCheckpointEntryDuration = buf.getLong(); long splitAndSortCpPagesDuration = buf.getLong(); + long recoveryDataWriteDuration = buf.getLong(); long totalDuration = buf.getLong(); long cpStartTime = buf.getLong(); int pagesSize = buf.getInt(); @@ -477,6 +495,7 @@ else if (opType == CHECKPOINT) { walCpRecordFsyncDuration, writeCheckpointEntryDuration, splitAndSortCpPagesDuration, + recoveryDataWriteDuration, totalDuration, cpStartTime, pagesSize, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsWriter.java index 293b675738e791..1aac2d6cc59af5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsWriter.java @@ -96,6 +96,12 @@ public class FilePerformanceStatisticsWriter { /** Default maximum cached strings threshold. String caching will stop on threshold excess. */ public static final int DFLT_CACHED_STRINGS_THRESHOLD = 10 * 1024; + /** + * File format version. This version should be incremented each time when format of existing events are + * changed (fields added/removed) to avoid unexpected non-informative errors on deserialization. + */ + public static final short FILE_FORMAT_VERSION = 1; + /** File writer thread name. */ static final String WRITER_THREAD_NAME = "performance-statistics-writer"; @@ -159,6 +165,8 @@ public FilePerformanceStatisticsWriter(GridKernalContext ctx) throws IgniteCheck ringByteBuf = new SegmentedRingByteBuffer(bufSize, fileMaxSize, SegmentedRingByteBuffer.BufferMode.DIRECT); fileWriter = new FileWriter(ctx, log); + + doWrite(OperationType.VERSION, Short.BYTES, buf -> buf.putShort(FILE_FORMAT_VERSION)); } /** Starts collecting performance statistics. */ @@ -377,6 +385,7 @@ File file() { * @param walCpRecordFsyncDuration Wal cp record fsync duration. * @param writeCpEntryDuration Write checkpoint entry duration. * @param splitAndSortCpPagesDuration Split and sort cp pages duration. + * @param recoveryDataWriteDuration Recovery data write duration in milliseconds. * @param totalDuration Total duration in milliseconds. * @param cpStartTime Checkpoint start time in milliseconds. * @param pagesSize Pages size. @@ -394,6 +403,7 @@ public void checkpoint( long walCpRecordFsyncDuration, long writeCpEntryDuration, long splitAndSortCpPagesDuration, + long recoveryDataWriteDuration, long totalDuration, long cpStartTime, int pagesSize, @@ -411,6 +421,7 @@ public void checkpoint( buf.putLong(walCpRecordFsyncDuration); buf.putLong(writeCpEntryDuration); buf.putLong(splitAndSortCpPagesDuration); + buf.putLong(recoveryDataWriteDuration); buf.putLong(totalDuration); buf.putLong(cpStartTime); buf.putInt(pagesSize); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java index df18195b6538df..9f09bc9ac3d890 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java @@ -91,7 +91,10 @@ public enum OperationType { QUERY_ROWS(20), /** Custom query property. */ - QUERY_PROPERTY(21); + QUERY_PROPERTY(21), + + /** Version */ + VERSION(255); /** Cache operations. */ public static final EnumSet CACHE_OPS = EnumSet.of(CACHE_GET, CACHE_PUT, CACHE_REMOVE, @@ -217,7 +220,7 @@ public static int jobRecordSize() { /** @return Checkpoint record size. */ public static int checkpointRecordSize() { - return 8 * 12 + 4 * 3; + return 9 * 12 + 4 * 3; } /** @return Pages write throttle record size. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsHandler.java index cc1a203a03faf2..f8c9b58a056b10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsHandler.java @@ -131,6 +131,7 @@ void queryReads(UUID nodeId, GridCacheQueryType type, UUID queryNodeId, long id, * @param walCpRecordFsyncDuration Wal cp record fsync duration. * @param writeCpEntryDuration Write checkpoint entry duration. * @param splitAndSortCpPagesDuration Split and sort cp pages duration. + * @param recoveryDataWriteDuration Recovery data write duration. * @param totalDuration Total duration in milliseconds. * @param cpStartTime Checkpoint start time in milliseconds. * @param pagesSize Pages size. @@ -149,6 +150,7 @@ void checkpoint( long walCpRecordFsyncDuration, long writeCpEntryDuration, long splitAndSortCpPagesDuration, + long recoveryDataWriteDuration, long totalDuration, long cpStartTime, int pagesSize, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java index fbafb352afe54a..6ea39beba5f583 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java @@ -257,6 +257,7 @@ public void checkpoint( long walCpRecordFsyncDuration, long writeCpEntryDuration, long splitAndSortCpPagesDuration, + long recoveryDataWriteDuration, long totalDuration, long cpStartTime, int pagesSize, @@ -272,6 +273,7 @@ public void checkpoint( walCpRecordFsyncDuration, writeCpEntryDuration, splitAndSortCpPagesDuration, + recoveryDataWriteDuration, totalDuration, cpStartTime, pagesSize, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentMultiPairQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentMultiPairQueue.java index 858d9332111dfa..dcc637efeeaf88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentMultiPairQueue.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentMultiPairQueue.java @@ -146,6 +146,13 @@ public boolean next(Result res) { return true; } + /** + * Rewind the queue to start iterating from the beginning. + */ + public void rewind() { + pos.set(0); + } + /** * @return {@code true} if empty. */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/encryption/EncryptionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/encryption/EncryptionSpi.java index 31f1b8416cfad6..b7d7d5412aeef7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/encryption/EncryptionSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/encryption/EncryptionSpi.java @@ -81,6 +81,28 @@ public interface EncryptionSpi extends IgniteSpi { */ byte[] decrypt(byte[] data, Serializable key); + /** + * Decrypts data encrypted with {@link #encrypt(ByteBuffer, Serializable, ByteBuffer)}. + * Note: Default method implementation was introduced for compatibility. This implementation is not effective + * for direct byte buffers, since it requires additional array creation and copy. + * It's better to have own implementation of this method in SPI. + * + * @param data Data to decrypt. + * @param key Encryption key. + * @param res Destination of the decrypted data. + */ + default void decrypt(ByteBuffer data, Serializable key, ByteBuffer res) { + byte[] arr; + + if (data.hasArray()) + arr = data.array(); + else { + arr = new byte[data.remaining()]; + data.get(arr); + } + res.put(decrypt(arr, key)); + } + /** * Decrypts data encrypted with {@link #encryptNoPadding(ByteBuffer, Serializable, ByteBuffer)} * diff --git a/modules/core/src/main/java/org/apache/ignite/spi/encryption/keystore/KeystoreEncryptionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/encryption/keystore/KeystoreEncryptionSpi.java index effe8bf3439d1c..7613c5bccc6642 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/encryption/keystore/KeystoreEncryptionSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/encryption/keystore/KeystoreEncryptionSpi.java @@ -217,8 +217,22 @@ public class KeystoreEncryptionSpi extends IgniteSpiAdapter implements Encryptio } } + /** {@inheritDoc} */ + @Override public void decrypt(ByteBuffer data, Serializable key, ByteBuffer res) { + doDecryption(data, aesWithPadding.get(), key, res); + } + /** {@inheritDoc} */ @Override public void decryptNoPadding(ByteBuffer data, Serializable key, ByteBuffer res) { + doDecryption(data, aesWithoutPadding.get(), key, res); + } + + /** + * @param data Plain data. + * @param cipher Cipher. + * @param key Encryption key. + */ + private void doEncryption(ByteBuffer data, Cipher cipher, Serializable key, ByteBuffer res) { assert key instanceof KeystoreEncryptionKey; ensureStarted(); @@ -226,28 +240,27 @@ public class KeystoreEncryptionSpi extends IgniteSpiAdapter implements Encryptio try { SecretKeySpec keySpec = new SecretKeySpec(((KeystoreEncryptionKey)key).key().getEncoded(), CIPHER_ALGO); - Cipher cipher = aesWithoutPadding.get(); - - byte[] iv = new byte[cipher.getBlockSize()]; + byte[] iv = initVector(cipher); - data.get(iv); + res.put(iv); - cipher.init(DECRYPT_MODE, keySpec, new IvParameterSpec(iv)); + cipher.init(ENCRYPT_MODE, keySpec, new IvParameterSpec(iv)); cipher.doFinal(data, res); } - catch (InvalidAlgorithmParameterException | InvalidKeyException | IllegalBlockSizeException | - ShortBufferException | BadPaddingException e) { + catch (ShortBufferException | InvalidAlgorithmParameterException | InvalidKeyException | + IllegalBlockSizeException | BadPaddingException e) { throw new IgniteSpiException(e); } } /** - * @param data Plain data. + * @param data Data to decrypt. * @param cipher Cipher. * @param key Encryption key. + * @param res Destination of the decrypted data. */ - private void doEncryption(ByteBuffer data, Cipher cipher, Serializable key, ByteBuffer res) { + private void doDecryption(ByteBuffer data, Cipher cipher, Serializable key, ByteBuffer res) { assert key instanceof KeystoreEncryptionKey; ensureStarted(); @@ -255,16 +268,16 @@ private void doEncryption(ByteBuffer data, Cipher cipher, Serializable key, Byte try { SecretKeySpec keySpec = new SecretKeySpec(((KeystoreEncryptionKey)key).key().getEncoded(), CIPHER_ALGO); - byte[] iv = initVector(cipher); + byte[] iv = new byte[cipher.getBlockSize()]; - res.put(iv); + data.get(iv); - cipher.init(ENCRYPT_MODE, keySpec, new IvParameterSpec(iv)); + cipher.init(DECRYPT_MODE, keySpec, new IvParameterSpec(iv)); cipher.doFinal(data, res); } - catch (ShortBufferException | InvalidAlgorithmParameterException | InvalidKeyException | - IllegalBlockSizeException | BadPaddingException e) { + catch (InvalidAlgorithmParameterException | InvalidKeyException | IllegalBlockSizeException | + ShortBufferException | BadPaddingException e) { throw new IgniteSpiException(e); } } @@ -346,10 +359,17 @@ private int encryptedSize(int dataSize, String algo) { switch (algo) { case AES_WITH_PADDING: + // One extra block is added to the start of the message to store initial vector. + // If original data is not aligned to block size we pad bytes to align to block size. + // If original data is aligned to block size we pad 1 additional block. + // In general, we pad `blockSize - (dataSize % blockSize)` bytes. + // See RFC2315 for more information about padding. cntBlocks = 2; break; case AES_WITHOUT_PADDING: + // We use AES_WITHOUT_PADDING only for data aligned to block size. In this case only one extra block + // is added to store initial vector. cntBlocks = 1; break; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java index 3373819303820b..01464269f76102 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java @@ -384,7 +384,7 @@ private void generateWal( if (pageIds.contains(fullId)) { long cpStart = System.nanoTime(); - mem.checkpointWritePage(fullId, tmpBuf, pageStoreWriter, null); + mem.checkpointWritePage(fullId, tmpBuf, pageStoreWriter, null, false); long cpEnd = System.nanoTime(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCheckpointRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCheckpointRecoveryTest.java new file mode 100644 index 00000000000000..6664e2b9ec3131 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCheckpointRecoveryTest.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.db; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.file.OpenOption; +import java.util.Collection; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteState; +import org.apache.ignite.Ignition; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.StopNodeFailureHandler; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.encryption.AbstractEncryptionTest; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.spi.encryption.keystore.KeystoreEncryptionSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.GridAbstractTest; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointRecoveryFileStorage.FILE_NAME_PATTERN; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX; + +/** + * Class containing tests for applying checkpoint recovery data. + */ +@RunWith(Parameterized.class) +public class IgnitePdsCheckpointRecoveryTest extends GridCommonAbstractTest { + /** */ + private static final int KEYS_CNT = 10_000; + + /** */ + private final AtomicBoolean fail = new AtomicBoolean(); + + /** */ + @Parameterized.Parameter(0) + public boolean encrypt; + + /** */ + @Parameterized.Parameters(name = "encrypt={0}") + public static Collection parameters() { + return F.asList(new Object[] {false}, new Object[] {true}); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + KeystoreEncryptionSpi encSpi = new KeystoreEncryptionSpi(); + + encSpi.setKeyStorePath(AbstractEncryptionTest.KEYSTORE_PATH); + encSpi.setKeyStorePassword(AbstractEncryptionTest.KEYSTORE_PASSWORD.toCharArray()); + + return super.getConfiguration(igniteInstanceName) + .setFailureHandler(new StopNodeFailureHandler()) + .setEncryptionSpi(encSpi) + .setDataStorageConfiguration(new DataStorageConfiguration() + .setFileIOFactory(new PageStoreSpoilingFileIOFactory(fail)) + .setWriteRecoveryDataOnCheckpoint(true) + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(true) + )); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** */ + @Test + public void testRecoverFromCheckpointRecoveryFiles() throws Exception { + IgniteEx ignite = startGrid(0); + ignite.cluster().state(ClusterState.ACTIVE); + + CacheConfiguration cacheCfg = GridAbstractTest.defaultCacheConfiguration() + .setEncryptionEnabled(encrypt); + + IgniteCache cache = ignite.createCache(cacheCfg); + + for (int i = 0; i < KEYS_CNT; i++) + cache.put(i, i); + + AtomicInteger val = new AtomicInteger(KEYS_CNT); + + IgniteInternalFuture fut = GridTestUtils.runAsync(() -> { + while (true) + cache.put(ThreadLocalRandom.current().nextInt(KEYS_CNT), val.incrementAndGet()); + }); + + File cpDir = dbMgr(ignite).checkpointDirectory(); + + fail.set(true); + + try { + forceCheckpoint(); + } + catch (Throwable ignore) { + // Expected. + } + + try { + fut.get(10_000); + } + catch (Throwable ignore) { + // Expected. + } + + assertTrue(GridTestUtils.waitForCondition( + () -> Ignition.state(getTestIgniteInstanceName(0)) == IgniteState.STOPPED_ON_FAILURE, + 10_000 + )); + + fail.set(false); + + assertTrue(cpDir.listFiles(((dir, name) -> FILE_NAME_PATTERN.matcher(name).matches())).length > 0); + + ignite = startGrid(0); + IgniteCache cache0 = ignite.cache(DEFAULT_CACHE_NAME); + + int max = 0; + for (int i = 0; i < KEYS_CNT; i++) + max = Math.max(max, cache0.get(i)); + + // There are two cases possible: + // 1. Failure during put before writting cache entry ta WAL, in this case, after restore we will get last value + // in cache: val.get() - 1 + // 2. Failure during put after writting cache entry ta WAL, in this case, after restore we will get last value + // in cache: val.get() + assertTrue("Expected value between " + (val.get() - 1) + " and " + val.get() + ", actual value: " + max, + max >= val.get() - 1 && max <= val.get()); + } + + /** */ + private static final class PageStoreSpoilingFileIOFactory implements FileIOFactory { + /** */ + private final FileIOFactory delegateFactory; + + /** */ + private final AtomicBoolean failFlag; + + /** */ + PageStoreSpoilingFileIOFactory(AtomicBoolean failFlag) { + delegateFactory = new RandomAccessFileIOFactory(); + + this.failFlag = failFlag; + } + + /** {@inheritDoc}*/ + @Override public FileIO create(File file, OpenOption... modes) throws IOException { + FileIO delegate = delegateFactory.create(file, modes); + + return file.getName().startsWith(PART_FILE_PREFIX) ? new PageStoreSpoiling(delegate) : delegate; + } + + /** */ + final class PageStoreSpoiling extends FileIODecorator { + /** */ + AtomicInteger spoiledPages = new AtomicInteger(); + + /** + * @param delegate File I/O delegate + */ + public PageStoreSpoiling(FileIO delegate) { + super(delegate); + } + + /** {@inheritDoc} */ + @Override public int writeFully(ByteBuffer srcBuf, long position) throws IOException { + if (failFlag.get()) { + // Spoil first 10 pages and after that throw an exception. + if (spoiledPages.getAndIncrement() > 10) + throw new IOException("Test exception."); + else { + srcBuf = ByteBuffer.allocate(srcBuf.remaining()).order(ByteOrder.nativeOrder()); + ThreadLocalRandom.current().nextBytes(srcBuf.array()); + } + } + + return delegate.writeFully(srcBuf, position); + } + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java index f18337e42bfc7e..3fcde63d8eb451 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java @@ -603,7 +603,7 @@ public void testDirtyFlag() throws Exception { buf.rewind(); mem.checkpointWritePage(fullId, buf, (fullPageId, buffer, tag) -> { - }, null); + }, null, false); buf.position(PageIO.COMMON_HEADER_END); @@ -940,7 +940,7 @@ private IgniteBiTuple, WALPointer> runCheckpointing( }; while (true) { - mem.checkpointWritePage(fullId, tmpBuf, pageStoreWriter, null); + mem.checkpointWritePage(fullId, tmpBuf, pageStoreWriter, null, false); tag = tag0.get(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java index 03418defa38225..358c28b7ff74b5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java @@ -119,7 +119,7 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager { } /** {@inheritDoc} */ - @Override public boolean disabled(int grpId, long pageId) { + @Override public boolean pageRecordsDisabled(int grpId, long pageId) { return false; } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java index c71f81b4c292af..7e36fa1d33fae5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java @@ -342,7 +342,7 @@ private void doCheckpoint( ByteBuffer buf = ByteBuffer.wrap(data); - memory.checkpointWritePage(cpPage, buf, pageStoreWriter, null); + memory.checkpointWritePage(cpPage, buf, pageStoreWriter, null, false); while (memory.isCpBufferOverflowThresholdExceeded()) { FullPageId cpPageId = memory.pullPageFromCpBuffer(); @@ -356,7 +356,7 @@ private void doCheckpoint( tmpWriteBuf.rewind(); - memory.checkpointWritePage(cpPageId, tmpWriteBuf, pageStoreWriter, null); + memory.checkpointWritePage(cpPageId, tmpWriteBuf, pageStoreWriter, null, false); } } @@ -416,7 +416,7 @@ public void testCheckpointProtocolCannotReplaceUnwrittenPage() throws Exception assertTrue("Should oom before check replaced page.", oom); assertTrue("Missing page: " + fullPageId, memory.hasLoadedPage(fullPageId)); - }, null); + }, null, false); } /** @@ -474,7 +474,7 @@ private void testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryI memory.checkpointWritePage(checkpointPage, ByteBuffer.allocate(PAGE_SIZE), (fullPageId, buffer, tag) -> { // No-op. - }, mockTracker); + }, mockTracker, false); memory.finishCheckpoint(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/AbstractPerformanceStatisticsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/AbstractPerformanceStatisticsTest.java index 4384d5471bd8e5..cc0d4ad41003cf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/AbstractPerformanceStatisticsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/AbstractPerformanceStatisticsTest.java @@ -223,6 +223,7 @@ public static class TestHandler implements PerformanceStatisticsHandler { long walCpRecordFsyncDuration, long writeCpEntryDuration, long splitAndSortCpPagesDuration, + long recoveryDataWriteDuration, long totalDuration, long cpStartTime, int pagesSize, diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/CheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/CheckpointTest.java index 62ab155073500e..57d3b7337d842a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/CheckpointTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/CheckpointTest.java @@ -125,6 +125,7 @@ public void testCheckpoint() throws Exception { long walCpRecordFsyncDuration, long writeCpEntryDuration, long splitAndSortCpPagesDuration, + long recoveryDataWriteDuration, long totalDuration, long cpStartTime, int pagesSize, @@ -150,6 +151,8 @@ public void testCheckpoint() throws Exception { writeCpEntryDuration); assertEquals(mreg.findMetric("LastCheckpointSplitAndSortPagesDuration").value(), splitAndSortCpPagesDuration); + assertEquals(mreg.findMetric("LastCheckpointRecoveryDataWriteDuration").value(), + recoveryDataWriteDuration); assertEquals(mreg.findMetric("LastCheckpointDuration").value(), totalDuration); assertEquals(lastStart.value(), cpStartTime); assertEquals(mreg.findMetric("LastCheckpointTotalPagesNumber").value(), pagesSize); diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite8.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite8.java index c8a8ae67bee147..30504e654dd74e 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite8.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite8.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.persistence.NoUnnecessaryRebalanceTest; import org.apache.ignite.internal.processors.cache.persistence.PagesPossibleCorruptionDiagnosticTest; import org.apache.ignite.internal.processors.cache.persistence.PendingTreeCorruptionTest; +import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCheckpointRecoveryTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageReplacementDuringPartitionClearTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsTransactionsHangTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.HistoricalReservationTest; @@ -105,6 +106,8 @@ public static List> suite(Collection ignoredTests) { GridTestUtils.addTestIfNeeded(suite, PagesPossibleCorruptionDiagnosticTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, MaintenancePersistenceTaskTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgnitePdsCheckpointRecoveryTest.class, ignoredTests); + return suite; }