IGNITE-20697 Store crash recovery data to checkpoint recovery files (WIP)
alex-plekhanov committed Nov 2, 2023
1 parent 27ed13a commit b7ed255
Showing 43 changed files with 1,806 additions and 356 deletions.
CompressionProcessorImpl.java
@@ -30,9 +30,7 @@
import org.apache.ignite.configuration.DiskPageCompression;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.ThreadLocalDirectByteBuffer;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -41,16 +39,12 @@
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.WRITE;
import static org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE;
import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE;
import static org.apache.ignite.internal.util.GridUnsafe.NATIVE_BYTE_ORDER;

/**
* Compression processor.
*/
public class CompressionProcessorImpl extends CompressionProcessor {
/** Compaction buffer of max page size. */
private final ThreadLocalDirectByteBuffer compactBuf = new ThreadLocalDirectByteBuffer(MAX_PAGE_SIZE, NATIVE_BYTE_ORDER);

/** A bit more than max page size, extra space is required by compressors. */
private final ThreadLocalDirectByteBuffer compressBuf =
new ThreadLocalDirectByteBuffer(maxCompressedBufferSize(MAX_PAGE_SIZE), NATIVE_BYTE_ORDER);
@@ -92,84 +86,6 @@ public CompressionProcessorImpl(GridKernalContext ctx) {
checkPunchHole(storagePath, fsBlockSize);
}

/** {@inheritDoc} */
@Override public ByteBuffer compressPage(
ByteBuffer page,
int pageSize,
int blockSize,
DiskPageCompression compression,
int compressLevel
) throws IgniteCheckedException {
assert compression != null && compression != DiskPageCompression.DISABLED : compression;
assert U.isPow2(blockSize) : blockSize;
assert page.position() == 0 && page.limit() >= pageSize;

int oldPageLimit = page.limit();

try {
// Page size will be less than page limit when TDE is enabled. To make compaction and compression work
// correctly we need to set limit to real page size.
page.limit(pageSize);

ByteBuffer compactPage = doCompactPage(page, pageSize);

int compactSize = compactPage.limit();

assert compactSize <= pageSize : compactSize;

// If no need to compress further or configured just to skip garbage.
if (compactSize < blockSize || compression == SKIP_GARBAGE)
return setCompactionInfo(compactPage, compactSize);

ByteBuffer compressedPage = doCompressPage(compression, compactPage, compactSize, compressLevel);

assert compressedPage.position() == 0;
int compressedSize = compressedPage.limit();

int freeCompactBlocks = (pageSize - compactSize) / blockSize;
int freeCompressedBlocks = (pageSize - compressedSize) / blockSize;

if (freeCompactBlocks >= freeCompressedBlocks) {
if (freeCompactBlocks == 0)
return page; // No blocks will be released.

return setCompactionInfo(compactPage, compactSize);
}

return setCompressionInfo(compressedPage, compression, compressedSize, compactSize);
}
finally {
page.limit(oldPageLimit);
}
}
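The removed compressPage above implements the choice between the compacted and the compressed page image. A standalone sketch of that block-release heuristic (plain Java with hypothetical names, not Ignite API), assuming space is reclaimed by punching block-sized holes in sparse files:

class BlockReleaseHeuristic {
    enum Choice { ORIGINAL, COMPACTED, COMPRESSED }

    /** Mirrors the free-block comparison in compressPage: a smaller image only pays off if it frees whole filesystem blocks. */
    static Choice choose(int pageSize, int blockSize, int compactSize, int compressedSize) {
        int freeCompactBlocks = (pageSize - compactSize) / blockSize;
        int freeCompressedBlocks = (pageSize - compressedSize) / blockSize;

        if (freeCompactBlocks >= freeCompressedBlocks)
            return freeCompactBlocks == 0 ? Choice.ORIGINAL : Choice.COMPACTED;

        return Choice.COMPRESSED;
    }

    public static void main(String[] args) {
        // 16 KB page, 4 KB fs blocks: compacting to 13 KB frees no whole block,
        // compressing to 7 KB frees two, so the compressed image is stored.
        System.out.println(choose(16384, 4096, 13312, 7168)); // COMPRESSED
    }
}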

/**
* @param page Page buffer.
* @param pageSize Page size.
* @return Compacted page buffer.
*/
private ByteBuffer doCompactPage(ByteBuffer page, int pageSize) throws IgniteCheckedException {
PageIO io = PageIO.getPageIO(page);

ByteBuffer compactPage = compactBuf.get();

if (io instanceof CompactablePageIO) {
// Drop the garbage from the page.
((CompactablePageIO)io).compactPage(page, compactPage, pageSize);
}
else {
// Direct buffer is required as output of this method.
if (page.isDirect())
return page;

PageUtils.putBytes(GridUnsafe.bufferAddress(compactPage), 0, page.array());

compactPage.limit(pageSize);
}

return compactPage;
}

/** Check if filesystem actually supports punching holes. */
private void checkPunchHole(Path storagePath, int fsBlockSz) throws IgniteException {
ByteBuffer buffer = null;
@@ -198,41 +114,19 @@ private void checkPunchHole(Path storagePath, int fsBlockSz) throws IgniteExcept
}
}

/**
* @param page Page.
* @param compactSize Compacted page size.
* @return The given page.
*/
private static ByteBuffer setCompactionInfo(ByteBuffer page, int compactSize) {
return setCompressionInfo(page, SKIP_GARBAGE, compactSize, compactSize);
}

/**
* @param page Page.
* @param compression Compression algorithm.
* @param compressedSize Compressed size.
* @param compactedSize Compact size.
* @return The given page.
*/
private static ByteBuffer setCompressionInfo(ByteBuffer page, DiskPageCompression compression, int compressedSize, int compactedSize) {
assert compressedSize >= 0 && compressedSize <= Short.MAX_VALUE : compressedSize;
assert compactedSize >= 0 && compactedSize <= Short.MAX_VALUE : compactedSize;

PageIO.setCompressionType(page, getCompressionType(compression));
PageIO.setCompressedSize(page, (short)compressedSize);
PageIO.setCompactedSize(page, (short)compactedSize);

return page;
}

/**
* @param compression Compression algorithm.
* @param compactPage Compacted page.
* @param compactSize Compacted page size.
* @param compressLevel Compression level.
* @return Compressed page.
*/
private ByteBuffer doCompressPage(DiskPageCompression compression, ByteBuffer compactPage, int compactSize, int compressLevel) {
@Override protected ByteBuffer doCompressPage(
DiskPageCompression compression,
ByteBuffer compactPage,
int compactSize,
int compressLevel
) {
switch (compression) {
case ZSTD:
return compressPageZstd(compactPage, compactSize, compressLevel);
@@ -319,99 +213,46 @@ private static void copyPageHeader(ByteBuffer compactPage, ByteBuffer compressed
compactPage.limit(compactSize);
}

/**
* @param compression Compression.
* @return Level.
*/
private static byte getCompressionType(DiskPageCompression compression) {
if (compression == DiskPageCompression.DISABLED)
return UNCOMPRESSED_PAGE;

switch (compression) {
case ZSTD:
return ZSTD_COMPRESSED_PAGE;

case LZ4:
return LZ4_COMPRESSED_PAGE;

case SNAPPY:
return SNAPPY_COMPRESSED_PAGE;

case SKIP_GARBAGE:
return COMPACTED_PAGE;
}
throw new IllegalStateException("Unexpected compression: " + compression);
}

/** {@inheritDoc} */
@Override public void decompressPage(ByteBuffer page, int pageSize) throws IgniteCheckedException {
    assert page.capacity() >= pageSize : "capacity=" + page.capacity() + ", pageSize=" + pageSize;

    byte compressType = PageIO.getCompressionType(page);

    if (compressType == UNCOMPRESSED_PAGE)
        return; // Nothing to do.

    short compressedSize = PageIO.getCompressedSize(page);
    short compactSize = PageIO.getCompactedSize(page);

    assert compactSize <= pageSize && compactSize >= compressedSize;

    if (compressType == COMPACTED_PAGE) {
        // Just setup bounds before restoring the page.
        page.position(0).limit(compactSize);
    }
    else {
        ByteBuffer dst = compressBuf.get();

        // Position on a part that needs to be decompressed.
        page.limit(compressedSize)
            .position(PageIO.COMMON_HEADER_END);

        // LZ4 needs this limit to be exact.
        dst.limit(compactSize - PageIO.COMMON_HEADER_END);

        switch (compressType) {
            case ZSTD_COMPRESSED_PAGE:
                Zstd.decompress(dst, page);
                dst.flip();

                break;

            case LZ4_COMPRESSED_PAGE:
                Lz4.decompress(page, dst);
                dst.flip();

                break;

            case SNAPPY_COMPRESSED_PAGE:
                try {
                    Snappy.uncompress(page, dst);
                }
                catch (IOException e) {
                    throw new IgniteException(e);
                }
                break;

            default:
                throw new IgniteException("Unknown compression: " + compressType);
        }

        page.position(PageIO.COMMON_HEADER_END).limit(compactSize);
        page.put(dst).flip();
        assert page.limit() == compactSize;
    }

    PageIO io = PageIO.getPageIO(page);

    if (io instanceof CompactablePageIO)
        ((CompactablePageIO)io).restorePage(page, pageSize);
    else {
        assert compactSize == pageSize
            : "Wrong compacted page size [compactSize=" + compactSize + ", pageSize=" + pageSize + ']';
    }

    setCompressionInfo(page, DiskPageCompression.DISABLED, 0, 0);
}

@Override protected void doDecompressPage(int compressType, ByteBuffer page, int compressedSize, int compactSize) {
    ByteBuffer dst = compressBuf.get();

    // Position on a part that needs to be decompressed.
    page.limit(compressedSize)
        .position(PageIO.COMMON_HEADER_END);

    // LZ4 needs this limit to be exact.
    dst.limit(compactSize - PageIO.COMMON_HEADER_END);

    switch (compressType) {
        case ZSTD_COMPRESSED_PAGE:
            Zstd.decompress(dst, page);
            dst.flip();

            break;

        case LZ4_COMPRESSED_PAGE:
            Lz4.decompress(page, dst);
            dst.flip();

            break;

        case SNAPPY_COMPRESSED_PAGE:
            try {
                Snappy.uncompress(page, dst);
            }
            catch (IOException e) {
                throw new IgniteException(e);
            }
            break;

        default:
            throw new IgniteException("Unknown compression: " + compressType);
    }

    page.position(PageIO.COMMON_HEADER_END).limit(compactSize);
    page.put(dst).flip();
    assert page.limit() == compactSize;
}
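The bound bookkeeping above is the subtle part of doDecompressPage, so here is a self-contained sketch of the header-preserving decode pattern it follows (plain java.nio; HEADER_END is an illustrative stand-in for PageIO.COMMON_HEADER_END, and an identity copy stands in for the Zstd/Lz4/Snappy calls):

import java.nio.ByteBuffer;

class HeaderPreservingDecode {
    /** Illustrative stand-in; the real offset is PageIO.COMMON_HEADER_END. */
    static final int HEADER_END = 40;

    static void decode(ByteBuffer page, int compressedSize, int compactSize, ByteBuffer dst) {
        // Bound the source to the compressed tail; the page header is never compressed.
        page.limit(compressedSize).position(HEADER_END);

        // Codecs like LZ4 need the destination limit to match the decoded size exactly.
        dst.clear().limit(compactSize - HEADER_END);

        dst.put(page).flip(); // identity copy standing in for real decompression

        // Write the decoded tail back in place, right after the untouched header.
        page.limit(compactSize).position(HEADER_END);
        page.put(dst).flip();
    }

    public static void main(String[] args) {
        ByteBuffer page = ByteBuffer.allocateDirect(4096);
        // With the identity "codec" the compressed and compacted sizes coincide.
        decode(page, 1024, 1024, ByteBuffer.allocateDirect(4096));
        System.out.println(page.limit()); // 1024 == compactSize
    }
}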

/** */
DataStorageConfiguration.java
@@ -194,6 +194,12 @@ public class DataStorageConfiguration implements Serializable {
/** Value used to indicate the use of half of the {@link #getMaxWalArchiveSize}. */
public static final long HALF_MAX_WAL_ARCHIVE_SIZE = -1;

/** Default value for {@link #writeRecoveryDataOnCheckpoint} property. */
public static final boolean DFLT_WRITE_RECOVERY_DATA_ON_CP = false;

/** Default compression algorithm for checkpoint recovery data. */
public static final DiskPageCompression DFLT_CP_RECOVERY_DATA_COMPRESSION = DiskPageCompression.SKIP_GARBAGE;

/** Memory page size. */
private int pageSize = IgniteSystemProperties.getInteger(
IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE, 0);
@@ -349,6 +355,19 @@ public class DataStorageConfiguration implements Serializable {
/** Default memory allocator for all data regions. */
@Nullable private MemoryAllocator memoryAllocator = null;

/**
* Mode for storing page recovery data.
* If {@code true}, page recovery data will be written during checkpoint.
* If {@code false}, WAL physical records will be used to store page recovery data.
*/
private boolean writeRecoveryDataOnCheckpoint = DFLT_WRITE_RECOVERY_DATA_ON_CP;

/** Compression algorithm for checkpoint recovery data. */
private DiskPageCompression cpRecoveryDataCompression = DFLT_CP_RECOVERY_DATA_COMPRESSION;

/** Compression level for checkpoint recovery data. */
private Integer cpRecoveryDataCompressionLevel;

/**
* Creates valid durable memory configuration with all default values.
*/
@@ -1404,6 +1423,72 @@ public DataStorageConfiguration setMemoryAllocator(MemoryAllocator allocator) {
return this;
}

/**
* @return Flag defining mode for storing page recovery data. If {@code true}, recovery data will be written
* during checkpoint, if {@code false}, WAL physical records will be used to store recovery data.
*/
public boolean isWriteRecoveryDataOnCheckpoint() {
return writeRecoveryDataOnCheckpoint;
}

/**
* Sets mode for storing page recovery data.
*
* @param writeRecoveryDataOnCheckpoint If {@code true}, page recovery data will be written during checkpoint,
* if {@code false}, WAL physical records will be used to store page recovery data.
* Default is {@link #DFLT_WRITE_RECOVERY_DATA_ON_CP}.
* @return {@code this} for chaining.
*/
public DataStorageConfiguration setWriteRecoveryDataOnCheckpoint(boolean writeRecoveryDataOnCheckpoint) {
this.writeRecoveryDataOnCheckpoint = writeRecoveryDataOnCheckpoint;

return this;
}

/**
* Gets compression algorithm for checkpoint recovery data.
*
* @return Page compression algorithm.
*/
public DiskPageCompression getCheckpointRecoveryDataCompression() {
return cpRecoveryDataCompression == null ? DFLT_CP_RECOVERY_DATA_COMPRESSION : cpRecoveryDataCompression;
}

/**
* Sets compression algorithm for checkpoint recovery data.
*
* @param cpRecoveryDataCompression Compression algorithm.
* @return {@code this} for chaining.
*/
public DataStorageConfiguration setCheckpointRecoveryDataCompression(DiskPageCompression cpRecoveryDataCompression) {
this.cpRecoveryDataCompression = cpRecoveryDataCompression;

return this;
}

/**
* Gets {@link #getCheckpointRecoveryDataCompression()} algorithm-specific compression level.
*
* @return Checkpoint recovery data compression level or {@code null} for default.
*/
public Integer getCheckpointRecoveryDataCompressionLevel() {
return cpRecoveryDataCompressionLevel;
}

/**
* Sets {@link #setCheckpointRecoveryDataCompression(DiskPageCompression)} algorithm-specific compression level.
*
* @param cpRecoveryDataCompressionLevel Checkpoint recovery data compression level or {@code null} to use default.
* {@link DiskPageCompression#ZSTD Zstd}: from {@code -131072} to {@code 22} (default {@code 3}).
* {@link DiskPageCompression#LZ4 LZ4}: from {@code 0} to {@code 17} (default {@code 0}).
* @return {@code this} for chaining.
*/
public DataStorageConfiguration setCheckpointRecoveryDataCompressionLevel(Integer cpRecoveryDataCompressionLevel) {
this.cpRecoveryDataCompressionLevel = cpRecoveryDataCompressionLevel;

return this;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DataStorageConfiguration.class, this);
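For context, a hedged usage sketch of the configuration properties this commit introduces; the node startup around them is illustrative, and LZ4/ZSTD/Snappy compression of recovery data additionally requires the ignite-compress module on the classpath:

import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.DiskPageCompression;
import org.apache.ignite.configuration.IgniteConfiguration;

public class CheckpointRecoveryDataExample {
    public static void main(String[] args) {
        DataStorageConfiguration dsCfg = new DataStorageConfiguration()
            // Store page recovery data in checkpoint files instead of WAL physical records.
            .setWriteRecoveryDataOnCheckpoint(true)
            // Compress recovery data; SKIP_GARBAGE is the default per this diff.
            .setCheckpointRecoveryDataCompression(DiskPageCompression.LZ4)
            // LZ4 accepts levels 0..17, 0 being the default, per the javadoc above.
            .setCheckpointRecoveryDataCompressionLevel(5);

        dsCfg.setDefaultDataRegionConfiguration(
            new DataRegionConfiguration().setPersistenceEnabled(true));

        Ignition.start(new IgniteConfiguration().setDataStorageConfiguration(dsCfg)).close();
    }
}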
