Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-20697 Move page recovery data from WAL #11024

Closed
wants to merge 8 commits into from
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.persistence.db;

import org.apache.ignite.configuration.DiskPageCompression;

/**
* Class containing tests for applying checkpoint recovery data with compression.
* Declares no tests of its own: it extends {@link IgnitePdsCheckpointRecoveryTest} and only overrides
* {@link #getCompression()} to return {@link DiskPageCompression#SNAPPY}.
*/
public class IgnitePdsCheckpointRecoveryWithCompressionTest extends IgnitePdsCheckpointRecoveryTest {
/** @return {@link DiskPageCompression#SNAPPY}, replacing the value supplied by the parent test class. */
@Override protected DiskPageCompression getCompression() {
return DiskPageCompression.SNAPPY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCheckpointRecoveryWithCompressionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionAndPageCompressionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryWithPageCompressionAndTdeTest;
Expand Down Expand Up @@ -64,6 +65,9 @@ public static List<Class<?>> suite() {
suite.add(IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest.class);
suite.add(WalCompactionAndPageCompressionTest.class);

// Checkpoint recovery.
suite.add(IgnitePdsCheckpointRecoveryWithCompressionTest.class);

// Snapshots.
suite.add(SnapshotCompressionBasicTest.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ public class DataStorageConfiguration implements Serializable {
/** Value used to indicate the use of half of the {@link #getMaxWalArchiveSize}. */
public static final long HALF_MAX_WAL_ARCHIVE_SIZE = -1;

/**
* Default value for {@link #writeRecoveryDataOnCheckpoint} property: {@code false}, i.e. WAL physical records
* are used to store page recovery data.
*/
public static final boolean DFLT_WRITE_RECOVERY_DATA_ON_CP = false;

/**
* Default compression algorithm for checkpoint recovery data: {@link DiskPageCompression#SKIP_GARBAGE}.
* NOTE(review): the identifier spells "COMRESSION" (missing "P"). The constant is public, so correcting the
* name would require keeping this one as a deprecated alias.
*/
public static final DiskPageCompression DFLT_CP_RECOVERY_DATA_COMRESSION = DiskPageCompression.SKIP_GARBAGE;

/** Memory page size. */
private int pageSize = IgniteSystemProperties.getInteger(
IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE, 0);
Expand Down Expand Up @@ -339,6 +345,19 @@ public class DataStorageConfiguration implements Serializable {
/** Default memory allocator for all data regions. */
@Nullable private MemoryAllocator memoryAllocator = null;

/**
* Mode for storing page recovery data.
* If {@code true}, page recovery data will be written during checkpoint.
* If {@code false}, WAL physical records will be used to store page recovery data.
* Initialized to {@link #DFLT_WRITE_RECOVERY_DATA_ON_CP}.
*/
private boolean writeRecoveryDataOnCheckpoint = DFLT_WRITE_RECOVERY_DATA_ON_CP;

/**
* Compression algorithm for checkpoint recovery data, initialized to {@link #DFLT_CP_RECOVERY_DATA_COMRESSION}.
* The setter stores its argument unchecked, so this field may become {@code null}; the getter then falls back
* to the default.
*/
private DiskPageCompression cpRecoveryDataCompression = DFLT_CP_RECOVERY_DATA_COMRESSION;

/** Compression level for checkpoint recovery data; {@code null} (the initial value) means the default level. */
private Integer cpRecoveryDataCompressionLevel;

/**
* Creates valid durable memory configuration with all default values.
*/
Expand Down Expand Up @@ -1394,6 +1413,72 @@ public DataStorageConfiguration setMemoryAllocator(MemoryAllocator allocator) {
return this;
}

/**
* Gets mode for storing page recovery data.
*
* @return Flag defining mode for storing page recovery data. If {@code true}, recovery data will be written
* during checkpoint, if {@code false}, WAL physical records will be used to store recovery data.
* @see #setWriteRecoveryDataOnCheckpoint(boolean)
*/
public boolean isWriteRecoveryDataOnCheckpoint() {
return writeRecoveryDataOnCheckpoint;
}

/**
* Sets mode for storing page recovery data. The flag is stored as is.
*
* @param writeRecoveryDataOnCheckpoint If {@code true}, page recovery data will be written during checkpoint,
* if {@code false}, WAL physical records will be used to store page recovery data.
* Default is {@link #DFLT_WRITE_RECOVERY_DATA_ON_CP}.
* @return {@code this} for chaining.
* @see #isWriteRecoveryDataOnCheckpoint()
*/
public DataStorageConfiguration setWriteRecoveryDataOnCheckpoint(boolean writeRecoveryDataOnCheckpoint) {
this.writeRecoveryDataOnCheckpoint = writeRecoveryDataOnCheckpoint;

return this;
}

/**
 * Returns the compression algorithm applied to checkpoint recovery data.
 *
 * @return Configured page compression algorithm, or {@link #DFLT_CP_RECOVERY_DATA_COMRESSION} when the
 *      configured value is {@code null}.
 */
public DiskPageCompression getCheckpointRecoveryDataCompression() {
    // An explicitly configured algorithm always wins over the default.
    if (cpRecoveryDataCompression != null)
        return cpRecoveryDataCompression;

    return DFLT_CP_RECOVERY_DATA_COMRESSION;
}

/**
* Sets compression algorithm for checkpoint recovery data.
*
* @param cpRecoveryDataCompression Compression algorithm. The value is stored without a null check; if
* {@code null} is passed, {@link #getCheckpointRecoveryDataCompression()} returns
* {@link #DFLT_CP_RECOVERY_DATA_COMRESSION}.
* @return {@code this} for chaining.
*/
public DataStorageConfiguration setCheckpointRecoveryDataCompression(DiskPageCompression cpRecoveryDataCompression) {
this.cpRecoveryDataCompression = cpRecoveryDataCompression;

return this;
}

/**
* Gets {@link #getCheckpointRecoveryDataCompression()} algorithm specific compression level.
* The stored value is returned as is; no default is substituted here.
*
* @return Checkpoint recovery data compression level or {@code null} for default.
*/
public Integer getCheckpointRecoveryDataCompressionLevel() {
return cpRecoveryDataCompressionLevel;
}

/**
* Sets {@link #setCheckpointRecoveryDataCompression(DiskPageCompression)} algorithm specific compression level.
* The value is only stored by this method; the ranges listed below are not checked here.
*
* @param cpRecoveryDataCompressionLevel Checkpoint recovery data compression level or {@code null} to use default.
* {@link DiskPageCompression#ZSTD Zstd}: from {@code -131072} to {@code 22} (default {@code 3}).
* {@link DiskPageCompression#LZ4 LZ4}: from {@code 0} to {@code 17} (default {@code 0}).
* @return {@code this} for chaining.
*/
public DataStorageConfiguration setCheckpointRecoveryDataCompressionLevel(Integer cpRecoveryDataCompressionLevel) {
this.cpRecoveryDataCompressionLevel = cpRecoveryDataCompressionLevel;

return this;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DataStorageConfiguration.class, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1563,13 +1563,15 @@ private long requiredOffheap() {
for (DataRegionConfiguration dataReg : dataRegions) {
res += dataReg.getMaxSize();

res += U.checkpointBufferSize(dataReg);
res += U.checkpointBufferSize(memCfg, dataReg);
}
}

res += memCfg.getDefaultDataRegionConfiguration().getMaxSize();
DataRegionConfiguration dfltDataRegion = memCfg.getDefaultDataRegionConfiguration();

res += U.checkpointBufferSize(memCfg.getDefaultDataRegionConfiguration());
res += dfltDataRegion.getMaxSize();

res += U.checkpointBufferSize(memCfg, dfltDataRegion);

return res;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,12 @@ public WALIterator replay(
public int reserved(WALPointer low, WALPointer high);

/**
* Checks WAL disabled for cache group.
* Checks WAL page records disabled.
*
* @param grpId Group id.
* @param pageId Page id.
*/
public boolean disabled(int grpId, long pageId);
public boolean pageRecordsDisabled(int grpId, long pageId);

/**
* Getting local WAL segment size.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1012,13 +1012,16 @@ private WalStateResult awaitCheckpoint(CheckpointProgress cpFut, WalStatePropose
}

/**
* Checks WAL disabled for cache group.
* Checks WAL page records disabled.
*
* @param grpId Group id.
* @param pageId Page id.
* @return {@code True} if WAL page records are disabled for the given group and page, {@code false} otherwise.
*/
public boolean isDisabled(int grpId, long pageId) {
public boolean isPageRecordsDisabled(int grpId, long pageId) {
if (cctx.kernalContext().config().getDataStorageConfiguration().isWriteRecoveryDataOnCheckpoint())
return true;

CacheGroupContext ctx = cctx.cache().cacheGroup(grpId);

return ctx != null && (!ctx.walEnabled()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ public class DataStorageMetricsImpl {
/** Duration of splitting and sorting checkpoint pages of the last checkpoint, in milliseconds. */
private final AtomicLongMetric lastCpSplitAndSortPagesDuration;

/** Duration of checkpoint recovery data write, in milliseconds; updated on each checkpoint. */
private final AtomicLongMetric lastCpRecoveryDataWriteDuration;

/** Size of checkpoint recovery data, in bytes; updated on each checkpoint. */
private final AtomicLongMetric lastCpRecoveryDataSize;

/** Total number of pages written during the last checkpoint. */
private final AtomicLongMetric lastCpTotalPages;

Expand Down Expand Up @@ -155,6 +161,9 @@ public class DataStorageMetricsImpl {
/** Histogram of splitting and sorting checkpoint pages duration, in milliseconds. */
private final HistogramMetricImpl cpSplitAndSortPagesHistogram;

/** Histogram of checkpoint recovery data write duration, in milliseconds. */
private final HistogramMetricImpl cpRecoveryDataWriteHistogram;

/** Histogram of checkpoint duration, in milliseconds. */
private final HistogramMetricImpl cpHistogram;

Expand Down Expand Up @@ -246,6 +255,12 @@ public DataStorageMetricsImpl(
lastCpSplitAndSortPagesDuration = mreg.longMetric("LastCheckpointSplitAndSortPagesDuration",
"Duration of splitting and sorting checkpoint pages of the last checkpoint in milliseconds.");

lastCpRecoveryDataWriteDuration = mreg.longMetric("LastCheckpointRecoveryDataWriteDuration",
"Duration of checkpoint recovery data write in milliseconds.");

lastCpRecoveryDataSize = mreg.longMetric("LastCheckpointRecoveryDataSize",
"Size of checkpoint recovery data in bytes.");

lastCpTotalPages = mreg.longMetric("LastCheckpointTotalPagesNumber",
"Total number of pages written during the last checkpoint.");

Expand Down Expand Up @@ -312,6 +327,9 @@ public DataStorageMetricsImpl(
cpSplitAndSortPagesHistogram = mreg.histogram("CheckpointSplitAndSortPagesHistogram", cpBounds,
"Histogram of splitting and sorting checkpoint pages duration in milliseconds.");

cpRecoveryDataWriteHistogram = mreg.histogram("CheckpointRecoveryDataWriteHistogram", cpBounds,
"Histogram of checkpoint recovery data write duration in milliseconds.");

cpHistogram = mreg.histogram("CheckpointHistogram", cpBounds,
"Histogram of checkpoint duration in milliseconds.");

Expand Down Expand Up @@ -672,11 +690,13 @@ public boolean metricsEnabled() {
* @param walRecordFsyncDuration Duration of WAL fsync after logging {@link CheckpointRecord} on checkpoint begin.
* @param writeEntryDuration Duration of checkpoint entry buffer writing to file.
* @param splitAndSortPagesDuration Duration of splitting and sorting checkpoint pages.
* @param recoveryDataWriteDuration Recovery data write duration.
* @param duration Total checkpoint duration.
* @param start Checkpoint start time.
* @param totalPages Total number of all pages in checkpoint.
* @param dataPages Total number of data pages in checkpoint.
* @param cowPages Total number of COW-ed pages in checkpoint.
* @param recoveryDataSize Recovery data size, in bytes.
* @param storageSize Storage space allocated, in bytes.
* @param sparseStorageSize Storage space allocated adjusted for possible sparsity, in bytes.
*/
Expand All @@ -691,11 +711,13 @@ public void onCheckpoint(
long walRecordFsyncDuration,
long writeEntryDuration,
long splitAndSortPagesDuration,
long recoveryDataWriteDuration,
long duration,
long start,
long totalPages,
long dataPages,
long cowPages,
long recoveryDataSize,
long storageSize,
long sparseStorageSize
) {
Expand All @@ -712,11 +734,13 @@ public void onCheckpoint(
lastCpWalRecordFsyncDuration.value(walRecordFsyncDuration);
lastCpWriteEntryDuration.value(writeEntryDuration);
lastCpSplitAndSortPagesDuration.value(splitAndSortPagesDuration);
lastCpRecoveryDataWriteDuration.value(recoveryDataWriteDuration);
lastCpDuration.value(duration);
lastCpStart.value(start);
lastCpTotalPages.value(totalPages);
lastCpDataPages.value(dataPages);
lastCpCowPages.value(cowPages);
lastCpRecoveryDataSize.value(recoveryDataSize);
this.storageSize.value(storageSize);
this.sparseStorageSize.value(sparseStorageSize);

Expand All @@ -732,6 +756,7 @@ public void onCheckpoint(
cpWalRecordFsyncHistogram.value(walRecordFsyncDuration);
cpWriteEntryHistogram.value(writeEntryDuration);
cpSplitAndSortPagesHistogram.value(splitAndSortPagesDuration);
cpRecoveryDataWriteHistogram.value(recoveryDataWriteDuration);
cpHistogram.value(duration);
}

Expand Down
Loading