Skip to content

Commit

Permalink
IGNITE-20697 Store crash recovery data to checkpoint recovery files
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-plekhanov committed Nov 29, 2023
1 parent 871db13 commit 619220d
Show file tree
Hide file tree
Showing 12 changed files with 160 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1576,13 +1576,13 @@ private long requiredOffheap() {
for (DataRegionConfiguration dataReg : dataRegions) {
res += dataReg.getMaxSize();

res += U.checkpointBufferSize(dataReg);
res += U.checkpointBufferSize(memCfg, dataReg);
}
}

res += memCfg.getDefaultDataRegionConfiguration().getMaxSize();

res += U.checkpointBufferSize(memCfg.getDefaultDataRegionConfiguration());
res += U.checkpointBufferSize(memCfg, memCfg.getDefaultDataRegionConfiguration());

return res;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ private long[] calculateFragmentSizes(String regionName, int concLvl, long cache
long cacheSize = regCfg.getMaxSize();

// Checkpoint buffer size can not be greater than cache size, it does not make sense.
long chpBufSize = checkpointBufferSize(regCfg);
long chpBufSize = checkpointBufferSize(dsCfg, regCfg);

if (chpBufSize > cacheSize) {
U.quietAndInfo(log,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,10 @@ public void reason(String reason) {
@Override public void updateEvictedPages(int delta) {
A.ensure(delta > 0, "param must be positive");

if (evictedPagesCounter() != null)
evictedPagesCounter().addAndGet(delta);
AtomicInteger cntr = evictedPagesCounter();

if (cntr != null)
cntr.addAndGet(delta);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,18 @@ class CheckpointBufferOverflowWatchdog {
* {@link PagesWriteThrottlePolicy#CP_BUF_FILL_THRESHOLD} of the buffer is filled) and, hence, writer threads need
* to be throttled.
*
* @return {@code true} iff Checkpoint Buffer is in danger zone
* @return {@code true} if Checkpoint Buffer is in danger zone
*/
boolean isInDangerZone() {
    // The cast truncates, so the limit is a whole number of pages at the fill threshold.
    int dangerZoneStart = (int)(pageMemory.checkpointBufferPagesSize() * CP_BUF_FILL_THRESHOLD);

    // Strictly more used pages than the limit means writers have to be throttled.
    return dangerZoneStart < pageMemory.checkpointBufferPagesCount();
}

/**
 * Reports how full the Checkpoint Buffer currently is, as the ratio of the checkpoint buffer pages count
 * to the checkpoint buffer pages size reported by the page memory.
 *
 * @return Checkpoint Buffer fill rate.
 */
double fillRate() {
    double usedPages = (double)pageMemory.checkpointBufferPagesCount();
    double capacityPages = pageMemory.checkpointBufferPagesSize();

    return usedPages / capacityPages;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
/**
* Logic used to protect memory (mainly, Checkpoint Buffer) from exhaustion using exponential backoff.
*/
class ExponentialBackoffThrottlingStrategy {
class ExponentialBackoffThrottlingStrategy implements ThrottlingStrategy {
/**
* Starting throttle time. Limits write speed to 1000 MB/s.
*/
Expand All @@ -36,21 +36,13 @@ class ExponentialBackoffThrottlingStrategy {
*/
private final ExponentialBackoff backoff = new ExponentialBackoff(STARTING_THROTTLE_NANOS, BACKOFF_RATIO);

/**
* Computes next duration (in nanos) to throttle a thread to protect Checkpoint Buffer.
*
* @return park time in nanos
*/
long protectionParkTime() {
/** {@inheritDoc} */
@Override public long protectionParkTime() {
return backoff.nextDuration();
}

/**
* Resets the backoff counter. Invoked when no throttling is needed anymore.
*
* @return {@code true} iff the backoff was not already in a reset state
*/
boolean resetBackoff() {
/** {@inheritDoc} */
@Override public boolean reset() {
return backoff.reset();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.persistence.pagemem;

import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.ignite.internal.processors.cache.persistence.pagemem.PagesWriteThrottlePolicy.CP_BUF_FILL_THRESHOLD;

/**
 * Checkpoint Buffer protection strategy that derives the park time from the current buffer fill rate
 * instead of from the number of consecutive throttling attempts.
 * <p>
 * Below {@code CP_BUF_FILL_THRESHOLD} no parking is requested. From the threshold upwards the park time grows
 * exponentially with the fill rate, starting at the minimum park time at the threshold.
 */
class FillRateBasedThrottlingStrategy implements ThrottlingStrategy {
    /** Shortest park time, 10 microseconds. Produced when the fill rate is exactly at the threshold. */
    private static final long MIN_PARK_NANOS = 10_000L;

    /** Longest intended park time, 1 second. Corresponds to a fill rate of 1 (up to floating point rounding). */
    private static final long MAX_PARK_NANOS = 1_000_000_000L;

    /**
     * Natural logarithm of the max-to-min park time ratio. Used as the scale of the exponent so that the
     * park time spans the whole min..max range while the fill rate goes from the threshold to 1.
     */
    private static final double LOG_PARK_RANGE = Math.log((double)MAX_PARK_NANOS / MIN_PARK_NANOS);

    /** Source of the current Checkpoint Buffer fill rate. */
    private final CheckpointBufferOverflowWatchdog watchdog;

    /** Set once a non-trivial park time was requested; cleared by {@link #reset()}. */
    private final AtomicBoolean throttled = new AtomicBoolean();

    /**
     * @param watchdog Watchdog used to query the Checkpoint Buffer fill rate.
     */
    FillRateBasedThrottlingStrategy(CheckpointBufferOverflowWatchdog watchdog) {
        this.watchdog = watchdog;
    }

    /** {@inheritDoc} */
    @Override public long protectionParkTime() {
        double fillRate = watchdog.fillRate();

        // Buffer is not in the danger zone yet: nothing to wait for and the state stays untouched.
        if (fillRate < CP_BUF_FILL_THRESHOLD)
            return 0;

        throttled.set(true);

        // Distance above the threshold, scaled by the width of the danger zone (1 - threshold).
        double aboveThreshold = fillRate - CP_BUF_FILL_THRESHOLD;
        double exponent = LOG_PARK_RANGE * aboveThreshold / (1 - CP_BUF_FILL_THRESHOLD);

        return (long)(Math.exp(exponent) * MIN_PARK_NANOS);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Returns {@code true} only for the caller that actually flips the "throttling started" flag back.
     */
    @Override public boolean reset() {
        return throttled.compareAndSet(true, false);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -427,12 +427,19 @@ public PageMemoryImpl(
* Resolves instance of {@link PagesWriteThrottlePolicy} according to chosen throttle policy.
*/
private void initWriteThrottle() {
boolean fillRateBasedCpBufProtection = ctx.kernalContext().config().getDataStorageConfiguration()
.isWriteRecoveryDataOnCheckpoint();

if (throttlingPlc == ThrottlingPolicy.SPEED_BASED)
writeThrottle = new PagesWriteSpeedBasedThrottle(this, cpProgressProvider, stateChecker, log);
else if (throttlingPlc == ThrottlingPolicy.TARGET_RATIO_BASED)
writeThrottle = new PagesWriteThrottle(this, cpProgressProvider, stateChecker, false, log);
else if (throttlingPlc == ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY)
writeThrottle = new PagesWriteThrottle(this, null, stateChecker, true, log);
else if (throttlingPlc == ThrottlingPolicy.TARGET_RATIO_BASED) {
writeThrottle = new PagesWriteThrottle(this, cpProgressProvider, stateChecker,
false, fillRateBasedCpBufProtection, log);
}
else if (throttlingPlc == ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY) {
writeThrottle = new PagesWriteThrottle(this, null, stateChecker,
true, fillRateBasedCpBufProtection, log);
}
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private long computeThrottlingParkTime(boolean isPageInCheckpoint, long curNanoT
if (isPageInCheckpoint) {
// The fact that we are here means that we checked whether CP Buffer is in danger zone and found that
// it is ok, so its protector may relax, hence we reset it.
cpBufferProtector.resetBackoff();
cpBufferProtector.reset();
}
return cleanPagesProtector.protectionParkTime(curNanoTime);
}
Expand Down Expand Up @@ -230,7 +230,7 @@ long getCleanPagesProtectionParkTime(

/** {@inheritDoc} */
@Override public void onFinishCheckpoint() {
cpBufferProtector.resetBackoff();
cpBufferProtector.reset();

cleanPagesProtector.finish();
markSpeedAndAvgParkTime.finishInterval();
Expand Down Expand Up @@ -306,7 +306,7 @@ public double throttleWeight() {
/** {@inheritDoc} */
@Override public void wakeupThrottledThreads() {
if (!isCpBufferOverflowThresholdExceeded()) {
cpBufferProtector.resetBackoff();
cpBufferProtector.reset();

unparkParkedThreads();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
private final CheckpointLockStateChecker stateChecker;

/** In-checkpoint protection logic. */
private final ExponentialBackoffThrottlingStrategy inCheckpointProtection
= new ExponentialBackoffThrottlingStrategy();
private final ThrottlingStrategy inCheckpointProtection;

/** Not-in-checkpoint protection logic. */
private final ExponentialBackoffThrottlingStrategy notInCheckpointProtection
Expand All @@ -64,12 +63,15 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
* @param cpProgress Database manager.
* @param stateChecker checkpoint lock state checker.
* @param throttleOnlyPagesInCheckpoint If true, throttle will only protect from checkpoint buffer overflow.
* @param fillRateBasedCpBufProtection If true, fill rate based throttling will be used to protect from
* checkpoint buffer overflow.
* @param log Logger.
*/
public PagesWriteThrottle(PageMemoryImpl pageMemory,
IgniteOutClosure<CheckpointProgress> cpProgress,
CheckpointLockStateChecker stateChecker,
boolean throttleOnlyPagesInCheckpoint,
boolean fillRateBasedCpBufProtection,
IgniteLogger log
) {
this.pageMemory = pageMemory;
Expand All @@ -78,6 +80,8 @@ public PagesWriteThrottle(PageMemoryImpl pageMemory,
this.throttleOnlyPagesInCheckpoint = throttleOnlyPagesInCheckpoint;
cpBufferWatchdog = new CheckpointBufferOverflowWatchdog(pageMemory);
this.log = log;
inCheckpointProtection = fillRateBasedCpBufProtection ? new FillRateBasedThrottlingStrategy(cpBufferWatchdog) :
new ExponentialBackoffThrottlingStrategy();

assert throttleOnlyPagesInCheckpoint || cpProgress != null
: "cpProgress must be not null if ratio based throttling mode is used";
Expand All @@ -95,11 +99,13 @@ public PagesWriteThrottle(PageMemoryImpl pageMemory,
if (!shouldThrottle && !throttleOnlyPagesInCheckpoint) {
CheckpointProgress progress = cpProgress.apply();

AtomicInteger writtenPagesCntr = progress == null ? null : cpProgress.apply().writtenPagesCounter();
AtomicInteger writtenPagesCntr = progress == null ? null : progress.writtenPagesCounter();
AtomicInteger writtenRecoveryPagesCntr = progress == null ? null : progress.writtenRecoveryPagesCounter();

if (progress == null || writtenPagesCntr == null)
if (progress == null || writtenPagesCntr == null || writtenRecoveryPagesCntr == null)
return; // Don't throttle if checkpoint is not running.

int cpWrittenRecoveryPages = writtenRecoveryPagesCntr.get();
int cpWrittenPages = writtenPagesCntr.get();

int cpTotalPages = progress.currentCheckpointPagesCount();
Expand All @@ -109,7 +115,8 @@ public PagesWriteThrottle(PageMemoryImpl pageMemory,
shouldThrottle = pageMemory.shouldThrottle(3.0 / 4);
}
else {
double dirtyRatioThreshold = ((double)cpWrittenPages) / cpTotalPages;
double dirtyRatioThreshold = cpWrittenRecoveryPages == 0 ? ((double)cpWrittenPages) / cpTotalPages :
(cpWrittenRecoveryPages + cpWrittenPages) / 2d / cpTotalPages;

// Starting with 0.05 to avoid throttle right after checkpoint start
// 7/12 is maximum ratio of dirty pages
Expand All @@ -119,8 +126,7 @@ public PagesWriteThrottle(PageMemoryImpl pageMemory,
}
}

ExponentialBackoffThrottlingStrategy exponentialThrottle = isPageInCheckpoint
? inCheckpointProtection : notInCheckpointProtection;
ThrottlingStrategy exponentialThrottle = isPageInCheckpoint ? inCheckpointProtection : notInCheckpointProtection;

if (shouldThrottle) {
long throttleParkTimeNs = exponentialThrottle.protectionParkTime();
Expand Down Expand Up @@ -155,7 +161,7 @@ public PagesWriteThrottle(PageMemoryImpl pageMemory,
pageMemory.metrics().addThrottlingTime(U.currentTimeMillis() - startTime);
}
else {
boolean backoffWasAlreadyStarted = exponentialThrottle.resetBackoff();
boolean backoffWasAlreadyStarted = exponentialThrottle.reset();

if (isPageInCheckpoint && backoffWasAlreadyStarted)
unparkParkedThreads();
Expand All @@ -165,7 +171,7 @@ public PagesWriteThrottle(PageMemoryImpl pageMemory,
/** {@inheritDoc} */
@Override public void wakeupThrottledThreads() {
if (!isCpBufferOverflowThresholdExceeded()) {
inCheckpointProtection.resetBackoff();
inCheckpointProtection.reset();

unparkParkedThreads();
}
Expand All @@ -184,8 +190,8 @@ private void unparkParkedThreads() {

/** {@inheritDoc} */
@Override public void onFinishCheckpoint() {
inCheckpointProtection.resetBackoff();
notInCheckpointProtection.resetBackoff();
inCheckpointProtection.reset();
notInCheckpointProtection.reset();
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
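/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.persistence.pagemem;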

/**
 * Contract of a throttling strategy that protects memory from exhaustion by telling writer threads
 * how long they should be parked.
 */
public interface ThrottlingStrategy {
    /**
     * Calculates how long a thread should be throttled on this attempt.
     *
     * @return Park time in nanoseconds.
     */
    long protectionParkTime();

    /**
     * Returns the strategy to its initial (reset) state. Called once throttling is no longer required.
     *
     * @return {@code true} if the strategy was not in the reset state before this call.
     */
    boolean reset();
}
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,15 @@ public abstract class IgniteUtils {
/** Minimum checkpointing page buffer size (may be adjusted by Ignite). */
public static final Long DFLT_MIN_CHECKPOINTING_PAGE_BUFFER_SIZE = GB / 4;

/** Default minimum checkpointing page buffer size (may be adjusted by Ignite). */
public static final Long DFLT_MAX_CHECKPOINTING_PAGE_BUFFER_SIZE = 2 * GB;
/** Default maximum checkpointing page buffer size (when recovery data stored in WAL). */
public static final Long DFLT_MAX_CHECKPOINTING_PAGE_BUFFER_SIZE_WAL_RECOVERY = 2 * GB;

/**
* Default maximum checkpointing page buffer size (when recovery data stored on checkpoint).
* In this mode checkpoint duration can be twice as long as for mode with storing recovery data to WAL.
* Also, checkpoint buffer pages can't be released during write recovery data phase, so we need larger buffer size.
*/
public static final Long DFLT_MAX_CHECKPOINTING_PAGE_BUFFER_SIZE_CP_RECOVERY = 5 * GB;

/** @see IgniteSystemProperties#IGNITE_MBEAN_APPEND_CLASS_LOADER_ID */
public static final boolean DFLT_MBEAN_APPEND_CLASS_LOADER_ID = true;
Expand Down Expand Up @@ -11160,19 +11167,21 @@ public static <T> T fromBytes(byte[] data) {
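* @param dsCfg Data storage configuration; its "write recovery data on checkpoint" flag selects the default maximum buffer size.
* @param regCfg Configuration.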
* @return Checkpoint buffer size.
*/
public static long checkpointBufferSize(DataRegionConfiguration regCfg) {
public static long checkpointBufferSize(DataStorageConfiguration dsCfg, DataRegionConfiguration regCfg) {
if (!regCfg.isPersistenceEnabled())
return 0L;

long res = regCfg.getCheckpointPageBufferSize();

if (res == 0L) {
long maxCpPageBufSize = dsCfg.isWriteRecoveryDataOnCheckpoint() ?
DFLT_MAX_CHECKPOINTING_PAGE_BUFFER_SIZE_CP_RECOVERY :
DFLT_MAX_CHECKPOINTING_PAGE_BUFFER_SIZE_WAL_RECOVERY;

if (regCfg.getMaxSize() < GB)
res = Math.min(DFLT_MIN_CHECKPOINTING_PAGE_BUFFER_SIZE, regCfg.getMaxSize());
else if (regCfg.getMaxSize() < 8 * GB)
res = regCfg.getMaxSize() / 4;
else
res = DFLT_MAX_CHECKPOINTING_PAGE_BUFFER_SIZE;
res = Math.min(regCfg.getMaxSize() / 4, maxCpPageBufSize);
}

return res;
Expand All @@ -11194,7 +11203,7 @@ public static long adjustedWalHistorySize(DataStorageConfiguration dsCfg, @Nulla

if (dsCfg.getDataRegionConfigurations() != null) {
for (DataRegionConfiguration regCfg : dsCfg.getDataRegionConfigurations()) {
long cpBufSize = checkpointBufferSize(regCfg);
long cpBufSize = checkpointBufferSize(dsCfg, regCfg);

if (cpBufSize > regCfg.getMaxSize())
cpBufSize = regCfg.getMaxSize();
Expand All @@ -11207,7 +11216,7 @@ public static long adjustedWalHistorySize(DataStorageConfiguration dsCfg, @Nulla
{
DataRegionConfiguration regCfg = dsCfg.getDefaultDataRegionConfiguration();

long cpBufSize = checkpointBufferSize(regCfg);
long cpBufSize = checkpointBufferSize(dsCfg, regCfg);

if (cpBufSize > regCfg.getMaxSize())
cpBufSize = regCfg.getMaxSize();
Expand Down
Loading

0 comments on commit 619220d

Please sign in to comment.