Skip to content

Commit

Permalink
IGNITE-20697 Store crash recovery data to checkpoint recovery files - F…
Browse files Browse the repository at this point in the history
…ixes #11024.

Signed-off-by: Aleksey Plekhanov <[email protected]>
  • Loading branch information
alex-plekhanov committed Feb 9, 2025
1 parent 1bd8c8a commit fd6e15d
Show file tree
Hide file tree
Showing 63 changed files with 2,458 additions and 358 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.persistence.db;

import org.apache.ignite.configuration.DiskPageCompression;

/**
* Class containing tests for applying checkpoint recovery data with compression.
* <p>
* Declares no test methods of its own: it extends {@link IgnitePdsCheckpointRecoveryTest} and only overrides
* {@link #getCompression()} to return {@link DiskPageCompression#SNAPPY}.
* NOTE(review): presumably the inherited tests are re-run with this setting; the base class is not visible here.
*/
public class IgnitePdsCheckpointRecoveryWithCompressionTest extends IgnitePdsCheckpointRecoveryTest {
/**
* Overrides the compression choice of the base test.
*
* @return {@link DiskPageCompression#SNAPPY}.
*/
@Override protected DiskPageCompression getCompression() {
return DiskPageCompression.SNAPPY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCheckpointRecoveryWithCompressionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionAndPageCompressionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryWithPageCompressionAndTdeTest;
Expand Down Expand Up @@ -64,6 +65,9 @@ public static List<Class<?>> suite() {
suite.add(IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest.class);
suite.add(WalCompactionAndPageCompressionTest.class);

// Checkpoint recovery.
suite.add(IgnitePdsCheckpointRecoveryWithCompressionTest.class);

// Snapshots.
suite.add(SnapshotCompressionBasicTest.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ public class DataStorageConfiguration implements Serializable {
/** Value used to indicate the use of half of the {@link #getMaxWalArchiveSize}. */
public static final long HALF_MAX_WAL_ARCHIVE_SIZE = -1;

/**
* Default value for {@link #writeRecoveryDataOnCheckpoint} property: {@code false}, i.e. WAL physical records
* are used to store page recovery data.
*/
public static final boolean DFLT_WRITE_RECOVERY_DATA_ON_CP = false;

/**
* Default compression algorithm for checkpoint recovery data ({@link DiskPageCompression#SKIP_GARBAGE}).
* Also returned by {@link #getCheckpointRecoveryDataCompression()} when the configured algorithm is {@code null}.
* <p>
* NOTE(review): the name misspells "COMPRESSION" as "COMRESSION". It is a public constant, so it is left as is
* here; renaming it would break existing references.
*/
public static final DiskPageCompression DFLT_CP_RECOVERY_DATA_COMRESSION = DiskPageCompression.SKIP_GARBAGE;

/** Memory page size. */
private int pageSize = IgniteSystemProperties.getInteger(
IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE, 0);
Expand Down Expand Up @@ -339,6 +345,19 @@ public class DataStorageConfiguration implements Serializable {
/** Default memory allocator for all data regions. */
@Nullable private MemoryAllocator memoryAllocator = null;

/**
* Mode for storing page recovery data.
* If {@code true}, page recovery data will be written during checkpoint.
* If {@code false}, WAL physical records will be used to store page recovery data.
* Initialized to {@link #DFLT_WRITE_RECOVERY_DATA_ON_CP}.
*/
private boolean writeRecoveryDataOnCheckpoint = DFLT_WRITE_RECOVERY_DATA_ON_CP;

/**
* Compression algorithm for checkpoint recovery data. Initialized to {@link #DFLT_CP_RECOVERY_DATA_COMRESSION}.
* The setter accepts {@code null}; in that case {@link #getCheckpointRecoveryDataCompression()} reports the default.
*/
private DiskPageCompression cpRecoveryDataCompression = DFLT_CP_RECOVERY_DATA_COMRESSION;

/** Compression level for checkpoint recovery data. {@code null} (the initial value) stands for the default level. */
private Integer cpRecoveryDataCompressionLevel;

/**
* Creates valid durable memory configuration with all default values.
*/
Expand Down Expand Up @@ -1394,6 +1413,72 @@ public DataStorageConfiguration setMemoryAllocator(MemoryAllocator allocator) {
return this;
}

/**
* Gets mode for storing page recovery data.
*
* @return Flag defining mode for storing page recovery data. If {@code true}, recovery data will be written
* during checkpoint, if {@code false}, WAL physical records will be used to store recovery data.
* Initial value is {@link #DFLT_WRITE_RECOVERY_DATA_ON_CP}.
*/
public boolean isWriteRecoveryDataOnCheckpoint() {
return writeRecoveryDataOnCheckpoint;
}

/**
* Sets mode for storing page recovery data.
* The value is stored as is and can be read back with {@link #isWriteRecoveryDataOnCheckpoint()}.
*
* @param writeRecoveryDataOnCheckpoint If {@code true}, page recovery data will be written during checkpoint,
* if {@code false}, WAL physical records will be used to store page recovery data.
* Default is {@link #DFLT_WRITE_RECOVERY_DATA_ON_CP}.
* @return {@code this} for chaining.
*/
public DataStorageConfiguration setWriteRecoveryDataOnCheckpoint(boolean writeRecoveryDataOnCheckpoint) {
this.writeRecoveryDataOnCheckpoint = writeRecoveryDataOnCheckpoint;

return this;
}

/**
* Gets compression algorithm for checkpoint recovery data.
*
* @return Configured compression algorithm, or {@link #DFLT_CP_RECOVERY_DATA_COMRESSION} if the configured
* value is {@code null}.
*/
public DiskPageCompression getCheckpointRecoveryDataCompression() {
    // An explicitly configured algorithm wins; a null value falls back to the default.
    if (cpRecoveryDataCompression != null)
        return cpRecoveryDataCompression;

    return DFLT_CP_RECOVERY_DATA_COMRESSION;
}

/**
* Sets compression algorithm for checkpoint recovery data.
*
* @param cpRecoveryDataCompression Compression algorithm. {@code null} is accepted and stored as is;
* {@link #getCheckpointRecoveryDataCompression()} then returns {@link #DFLT_CP_RECOVERY_DATA_COMRESSION}.
* @return {@code this} for chaining.
*/
public DataStorageConfiguration setCheckpointRecoveryDataCompression(DiskPageCompression cpRecoveryDataCompression) {
this.cpRecoveryDataCompression = cpRecoveryDataCompression;

return this;
}

/**
* Gets {@link #getCheckpointRecoveryDataCompression()} algorithm specific compression level.
* The stored value is returned unchanged; no default is substituted here.
*
* @return Checkpoint recovery data compression level or {@code null} for default.
*/
public Integer getCheckpointRecoveryDataCompressionLevel() {
return cpRecoveryDataCompressionLevel;
}

/**
* Sets {@link #setCheckpointRecoveryDataCompression(DiskPageCompression)} algorithm specific compression level.
* The value is stored as is; this setter performs no range check against the bounds listed below.
*
* @param cpRecoveryDataCompressionLevel Checkpoint recovery data compression level or {@code null} to use default.
* {@link DiskPageCompression#ZSTD Zstd}: from {@code -131072} to {@code 22} (default {@code 3}).
* {@link DiskPageCompression#LZ4 LZ4}: from {@code 0} to {@code 17} (default {@code 0}).
* @return {@code this} for chaining.
*/
public DataStorageConfiguration setCheckpointRecoveryDataCompressionLevel(Integer cpRecoveryDataCompressionLevel) {
this.cpRecoveryDataCompressionLevel = cpRecoveryDataCompressionLevel;

return this;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DataStorageConfiguration.class, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1563,13 +1563,15 @@ private long requiredOffheap() {
for (DataRegionConfiguration dataReg : dataRegions) {
res += dataReg.getMaxSize();

res += U.checkpointBufferSize(dataReg);
res += U.checkpointBufferSize(memCfg, dataReg);
}
}

res += memCfg.getDefaultDataRegionConfiguration().getMaxSize();
DataRegionConfiguration dfltDataRegion = memCfg.getDefaultDataRegionConfiguration();

res += U.checkpointBufferSize(memCfg.getDefaultDataRegionConfiguration());
res += dfltDataRegion.getMaxSize();

res += U.checkpointBufferSize(memCfg, dfltDataRegion);

return res;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,12 @@ public WALIterator replay(
public int reserved(WALPointer low, WALPointer high);

/**
* Checks WAL disabled for cache group.
* Checks WAL page records disabled.
*
* @param grpId Group id.
* @param pageId Page id.
*/
public boolean disabled(int grpId, long pageId);
public boolean pageRecordsDisabled(int grpId, long pageId);

/**
* Getting local WAL segment size.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1012,13 +1012,16 @@ private WalStateResult awaitCheckpoint(CheckpointProgress cpFut, WalStatePropose
}

/**
* Checks WAL disabled for cache group.
* Checks WAL page records disabled.
*
* @param grpId Group id.
* @param pageId Page id.
* @return {@code True} if WAL page records are disabled, {@code false} otherwise.
*/
public boolean isDisabled(int grpId, long pageId) {
public boolean isPageRecordsDisabled(int grpId, long pageId) {
if (cctx.kernalContext().config().getDataStorageConfiguration().isWriteRecoveryDataOnCheckpoint())
return true;

CacheGroupContext ctx = cctx.cache().cacheGroup(grpId);

return ctx != null && (!ctx.walEnabled()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ public class DataStorageMetricsImpl {
/** */
private final AtomicLongMetric lastCpSplitAndSortPagesDuration;

/** */
private final AtomicLongMetric lastCpRecoveryDataWriteDuration;

/** */
private final AtomicLongMetric lastCpRecoveryDataSize;

/** */
private final AtomicLongMetric lastCpTotalPages;

Expand Down Expand Up @@ -155,6 +161,9 @@ public class DataStorageMetricsImpl {
/** */
private final HistogramMetricImpl cpSplitAndSortPagesHistogram;

/** */
private final HistogramMetricImpl cpRecoveryDataWriteHistogram;

/** */
private final HistogramMetricImpl cpHistogram;

Expand Down Expand Up @@ -246,6 +255,12 @@ public DataStorageMetricsImpl(
lastCpSplitAndSortPagesDuration = mreg.longMetric("LastCheckpointSplitAndSortPagesDuration",
"Duration of splitting and sorting checkpoint pages of the last checkpoint in milliseconds.");

lastCpRecoveryDataWriteDuration = mreg.longMetric("LastCheckpointRecoveryDataWriteDuration",
"Duration of checkpoint recovery data write in milliseconds.");

lastCpRecoveryDataSize = mreg.longMetric("LastCheckpointRecoveryDataSize",
"Size of checkpoint recovery data in bytes.");

lastCpTotalPages = mreg.longMetric("LastCheckpointTotalPagesNumber",
"Total number of pages written during the last checkpoint.");

Expand Down Expand Up @@ -312,6 +327,9 @@ public DataStorageMetricsImpl(
cpSplitAndSortPagesHistogram = mreg.histogram("CheckpointSplitAndSortPagesHistogram", cpBounds,
"Histogram of splitting and sorting checkpoint pages duration in milliseconds.");

cpRecoveryDataWriteHistogram = mreg.histogram("CheckpointRecoveryDataWriteHistogram", cpBounds,
"Histogram of checkpoint recovery data write duration in milliseconds.");

cpHistogram = mreg.histogram("CheckpointHistogram", cpBounds,
"Histogram of checkpoint duration in milliseconds.");

Expand Down Expand Up @@ -672,11 +690,13 @@ public boolean metricsEnabled() {
* @param walRecordFsyncDuration Duration of WAL fsync after logging {@link CheckpointRecord} on checkpoint begin.
* @param writeEntryDuration Duration of checkpoint entry buffer writing to file.
* @param splitAndSortPagesDuration Duration of splitting and sorting checkpoint pages.
* @param recoveryDataWriteDuration Recovery data write duration.
* @param duration Total checkpoint duration.
* @param start Checkpoint start time.
* @param totalPages Total number of all pages in checkpoint.
* @param dataPages Total number of data pages in checkpoint.
* @param cowPages Total number of COW-ed pages in checkpoint.
* @param recoveryDataSize Recovery data size, in bytes.
* @param storageSize Storage space allocated, in bytes.
* @param sparseStorageSize Storage space allocated adjusted for possible sparsity, in bytes.
*/
Expand All @@ -691,11 +711,13 @@ public void onCheckpoint(
long walRecordFsyncDuration,
long writeEntryDuration,
long splitAndSortPagesDuration,
long recoveryDataWriteDuration,
long duration,
long start,
long totalPages,
long dataPages,
long cowPages,
long recoveryDataSize,
long storageSize,
long sparseStorageSize
) {
Expand All @@ -712,11 +734,13 @@ public void onCheckpoint(
lastCpWalRecordFsyncDuration.value(walRecordFsyncDuration);
lastCpWriteEntryDuration.value(writeEntryDuration);
lastCpSplitAndSortPagesDuration.value(splitAndSortPagesDuration);
lastCpRecoveryDataWriteDuration.value(recoveryDataWriteDuration);
lastCpDuration.value(duration);
lastCpStart.value(start);
lastCpTotalPages.value(totalPages);
lastCpDataPages.value(dataPages);
lastCpCowPages.value(cowPages);
lastCpRecoveryDataSize.value(recoveryDataSize);
this.storageSize.value(storageSize);
this.sparseStorageSize.value(sparseStorageSize);

Expand All @@ -732,6 +756,7 @@ public void onCheckpoint(
cpWalRecordFsyncHistogram.value(walRecordFsyncDuration);
cpWriteEntryHistogram.value(writeEntryDuration);
cpSplitAndSortPagesHistogram.value(splitAndSortPagesDuration);
cpRecoveryDataWriteHistogram.value(recoveryDataWriteDuration);
cpHistogram.value(duration);
}

Expand Down
Loading

0 comments on commit fd6e15d

Please sign in to comment.