Skip to content

Commit

Permalink
IGNITE-20697 Store crash recovery data to checkpoint recovery files
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-plekhanov committed Dec 12, 2023
1 parent 34b9616 commit 4516c86
Show file tree
Hide file tree
Showing 66 changed files with 2,573 additions and 564 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@
import org.apache.ignite.configuration.DiskPageCompression;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.ThreadLocalDirectByteBuffer;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand All @@ -41,16 +39,12 @@
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.WRITE;
import static org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE;
import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE;
import static org.apache.ignite.internal.util.GridUnsafe.NATIVE_BYTE_ORDER;

/**
* Compression processor.
*/
public class CompressionProcessorImpl extends CompressionProcessor {
/** Max page size. */
private final ThreadLocalDirectByteBuffer compactBuf = new ThreadLocalDirectByteBuffer(MAX_PAGE_SIZE, NATIVE_BYTE_ORDER);

/** A bit more than max page size, extra space is required by compressors. */
private final ThreadLocalDirectByteBuffer compressBuf =
new ThreadLocalDirectByteBuffer(maxCompressedBufferSize(MAX_PAGE_SIZE), NATIVE_BYTE_ORDER);
Expand Down Expand Up @@ -92,84 +86,6 @@ public CompressionProcessorImpl(GridKernalContext ctx) {
checkPunchHole(storagePath, fsBlockSize);
}

/**
 * Compacts the given page and additionally compresses it with the requested algorithm
 * when compression releases strictly more file-system blocks than compaction alone.
 *
 * @param page Page buffer (position 0, limit at least {@code pageSize}).
 * @param pageSize Real page size in bytes.
 * @param blockSize File-system block size in bytes (must be a power of 2).
 * @param compression Compression algorithm (must not be {@code null} or {@code DISABLED}).
 * @param compressLevel Compression level passed through to the compressor.
 * @return Compacted or compressed page buffer with header metadata set, or the original
 *      {@code page} when no whole blocks would be released.
 * @throws IgniteCheckedException If compaction or page IO resolution fails.
 */
@Override public ByteBuffer compressPage(
ByteBuffer page,
int pageSize,
int blockSize,
DiskPageCompression compression,
int compressLevel
) throws IgniteCheckedException {
assert compression != null && compression != DiskPageCompression.DISABLED : compression;
assert U.isPow2(blockSize) : blockSize;
assert page.position() == 0 && page.limit() >= pageSize;

// Remember the caller-visible limit so it can be restored in the finally block.
int oldPageLimit = page.limit();

try {
// Page size will be less than page limit when TDE is enabled. To make compaction and compression work
// correctly we need to set limit to real page size.
page.limit(pageSize);

ByteBuffer compactPage = doCompactPage(page, pageSize);

// Compacted length is communicated via the buffer's limit.
int compactSize = compactPage.limit();

assert compactSize <= pageSize : compactSize;

// If no need to compress further or configured just to skip garbage.
if (compactSize < blockSize || compression == SKIP_GARBAGE)
return setCompactionInfo(compactPage, compactSize);

ByteBuffer compressedPage = doCompressPage(compression, compactPage, compactSize, compressLevel);

assert compressedPage.position() == 0;
int compressedSize = compressedPage.limit();

// Savings are measured in whole file-system blocks, since only whole blocks can be punched out.
int freeCompactBlocks = (pageSize - compactSize) / blockSize;
int freeCompressedBlocks = (pageSize - compressedSize) / blockSize;

// Prefer the cheaper compacted form unless compression frees strictly more blocks.
if (freeCompactBlocks >= freeCompressedBlocks) {
if (freeCompactBlocks == 0)
return page; // No blocks will be released.

return setCompactionInfo(compactPage, compactSize);
}

return setCompressionInfo(compressedPage, compression, compressedSize, compactSize);
}
finally {
// Restore the original limit (may exceed pageSize when TDE is enabled).
page.limit(oldPageLimit);
}
}

/**
 * Compacts a page by dropping garbage via its {@link CompactablePageIO}, or copies it
 * into a direct thread-local buffer when the page type does not support compaction.
 *
 * @param page Page buffer.
 * @param pageSize Page size.
 * @return Compacted page buffer: the thread-local compact buffer, or {@code page} itself
 *      when it is already direct and its IO is not compactable.
 * @throws IgniteCheckedException If the page IO cannot be resolved.
 */
private ByteBuffer doCompactPage(ByteBuffer page, int pageSize) throws IgniteCheckedException {
PageIO io = PageIO.getPageIO(page);

ByteBuffer compactPage = compactBuf.get();

if (io instanceof CompactablePageIO) {
// Drop the garbage from the page.
((CompactablePageIO)io).compactPage(page, compactPage, pageSize);
}
else {
// Direct buffer is required as output of this method.
if (page.isDirect())
return page;

// NOTE(review): copies the whole backing array from offset 0 — assumes a zero
// arrayOffset() and an array no larger than the thread-local buffer; confirm callers.
PageUtils.putBytes(GridUnsafe.bufferAddress(compactPage), 0, page.array());

compactPage.limit(pageSize);
}

return compactPage;
}

/** Check if filesystem actually supports punching holes. */
private void checkPunchHole(Path storagePath, int fsBlockSz) throws IgniteException {
ByteBuffer buf = null;
Expand Down Expand Up @@ -198,41 +114,19 @@ private void checkPunchHole(Path storagePath, int fsBlockSz) throws IgniteExcept
}
}

/**
 * Marks the page header as compacted-only ({@code SKIP_GARBAGE}) — compressed and
 * compacted sizes are set to the same value.
 *
 * @param page Page.
 * @param compactSize Compacted page size.
 * @return The given page.
 */
private static ByteBuffer setCompactionInfo(ByteBuffer page, int compactSize) {
return setCompressionInfo(page, SKIP_GARBAGE, compactSize, compactSize);
}

/**
 * Writes compression metadata (type code, compressed size, compacted size) into the
 * page header.
 *
 * @param page Page.
 * @param compression Compression algorithm.
 * @param compressedSize Compressed size; must fit into a signed short.
 * @param compactedSize Compact size; must fit into a signed short.
 * @return The given page.
 */
private static ByteBuffer setCompressionInfo(ByteBuffer page, DiskPageCompression compression, int compressedSize, int compactedSize) {
// Sizes are stored as shorts in the page header, hence the Short.MAX_VALUE bound.
assert compressedSize >= 0 && compressedSize <= Short.MAX_VALUE : compressedSize;
assert compactedSize >= 0 && compactedSize <= Short.MAX_VALUE : compactedSize;

PageIO.setCompressionType(page, getCompressionType(compression));
PageIO.setCompressedSize(page, (short)compressedSize);
PageIO.setCompactedSize(page, (short)compactedSize);

return page;
}

/**
* @param compression Compression algorithm.
* @param compactPage Compacted page.
* @param compactSize Compacted page size.
* @param compressLevel Compression level.
* @return Compressed page.
*/
private ByteBuffer doCompressPage(DiskPageCompression compression, ByteBuffer compactPage, int compactSize, int compressLevel) {
@Override protected ByteBuffer doCompressPage(
DiskPageCompression compression,
ByteBuffer compactPage,
int compactSize,
int compressLevel
) {
switch (compression) {
case ZSTD:
return compressPageZstd(compactPage, compactSize, compressLevel);
Expand Down Expand Up @@ -319,99 +213,46 @@ private static void copyPageHeader(ByteBuffer compactPage, ByteBuffer compressed
compactPage.limit(compactSize);
}

/**
 * Maps a disk page compression algorithm to the compression type code stored in the
 * page header.
 *
 * @param compression Compression algorithm (including {@code DISABLED}).
 * @return Page header compression type byte.
 */
private static byte getCompressionType(DiskPageCompression compression) {
    switch (compression) {
        case DISABLED:
            return UNCOMPRESSED_PAGE;

        case ZSTD:
            return ZSTD_COMPRESSED_PAGE;

        case LZ4:
            return LZ4_COMPRESSED_PAGE;

        case SNAPPY:
            return SNAPPY_COMPRESSED_PAGE;

        case SKIP_GARBAGE:
            return COMPACTED_PAGE;

        default:
            throw new IllegalStateException("Unexpected compression: " + compression);
    }
}

/** {@inheritDoc} */
@Override public void decompressPage(ByteBuffer page, int pageSize) throws IgniteCheckedException {
assert page.capacity() >= pageSize : "capacity=" + page.capacity() + ", pageSize=" + pageSize;

byte compressType = PageIO.getCompressionType(page);

if (compressType == UNCOMPRESSED_PAGE)
return; // Nothing to do.
@Override protected void doDecompressPage(int compressType, ByteBuffer page, int compressedSize, int compactSize) {
ByteBuffer dst = compressBuf.get();

short compressedSize = PageIO.getCompressedSize(page);
short compactSize = PageIO.getCompactedSize(page);
// Position on a part that needs to be decompressed.
page.limit(compressedSize)
.position(PageIO.COMMON_HEADER_END);

assert compactSize <= pageSize && compactSize >= compressedSize;
// LZ4 needs this limit to be exact.
dst.limit(compactSize - PageIO.COMMON_HEADER_END);

if (compressType == COMPACTED_PAGE) {
// Just setup bounds before restoring the page.
page.position(0).limit(compactSize);
}
else {
ByteBuffer dst = compressBuf.get();

// Position on a part that needs to be decompressed.
page.limit(compressedSize)
.position(PageIO.COMMON_HEADER_END);

// LZ4 needs this limit to be exact.
dst.limit(compactSize - PageIO.COMMON_HEADER_END);

switch (compressType) {
case ZSTD_COMPRESSED_PAGE:
Zstd.decompress(dst, page);
dst.flip();

break;
switch (compressType) {
case ZSTD_COMPRESSED_PAGE:
Zstd.decompress(dst, page);
dst.flip();

case LZ4_COMPRESSED_PAGE:
Lz4.decompress(page, dst);
dst.flip();
break;

break;
case LZ4_COMPRESSED_PAGE:
Lz4.decompress(page, dst);
dst.flip();

case SNAPPY_COMPRESSED_PAGE:
try {
Snappy.uncompress(page, dst);
}
catch (IOException e) {
throw new IgniteException(e);
}
break;

default:
throw new IgniteException("Unknown compression: " + compressType);
}

page.position(PageIO.COMMON_HEADER_END).limit(compactSize);
page.put(dst).flip();
assert page.limit() == compactSize;
}
break;

PageIO io = PageIO.getPageIO(page);
case SNAPPY_COMPRESSED_PAGE:
try {
Snappy.uncompress(page, dst);
}
catch (IOException e) {
throw new IgniteException(e);
}
break;

if (io instanceof CompactablePageIO)
((CompactablePageIO)io).restorePage(page, pageSize);
else {
assert compactSize == pageSize
: "Wrong compacted page size [compactSize=" + compactSize + ", pageSize=" + pageSize + ']';
default:
throw new IgniteException("Unknown compression: " + compressType);
}

setCompressionInfo(page, DiskPageCompression.DISABLED, 0, 0);
page.position(PageIO.COMMON_HEADER_END).limit(compactSize);
page.put(dst).flip();
assert page.limit() == compactSize;
}

/** */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.persistence.db;

import org.apache.ignite.configuration.DiskPageCompression;

/**
 * Class containing tests for applying checkpoint recovery data with compression.
 * Reuses the parent test suite, overriding only the page compression algorithm.
 */
public class IgnitePdsCheckpointRecoveryWithCompressionTest extends IgnitePdsCheckpointRecoveryTest {
/** {@inheritDoc} */
@Override protected DiskPageCompression getCompression() {
return DiskPageCompression.SNAPPY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCheckpointRecoveryWithCompressionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionAndPageCompressionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryWithPageCompressionAndTdeTest;
Expand Down Expand Up @@ -64,6 +65,9 @@ public static List<Class<?>> suite() {
suite.add(IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest.class);
suite.add(WalCompactionAndPageCompressionTest.class);

// Checkpoint recovery.
suite.add(IgnitePdsCheckpointRecoveryWithCompressionTest.class);

// Snapshots.
suite.add(SnapshotCompressionBasicTest.class);

Expand Down
Loading

0 comments on commit 4516c86

Please sign in to comment.