Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-24425 WAL files move to NodeFileTree #11865

Merged
merged 18 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.junit.Test;

import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_FILTER;

/**
* Saves data using previous version of ignite and then load this data using actual version
*/
Expand Down Expand Up @@ -124,11 +126,7 @@ private void doTestStartupWithOldVersion(String ver) throws Exception {

NodeFileTree ft = ignite.context().pdsFolderResolver().fileTree();

File[] compressedSegments = ft.walArchive().listFiles(new FilenameFilter() {
@Override public boolean accept(File dir, String name) {
return name.endsWith(".wal.zip");
}
});
File[] compressedSegments = ft.walArchive().listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER);

final int actualCompressedWalSegments = compressedSegments == null ? 0 : compressedSegments.length;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.ignite.internal.pagemem.wal.record.CdcDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
Expand Down Expand Up @@ -250,10 +251,12 @@ private void checkDeleteLostSegmentLinks(List<Long> expBefore, List<Long> expAft

/** */
private void checkLinks(IgniteEx srv, List<Long> expLinks) {
File[] links = srv.context().pdsFolderResolver().fileTree().walCdc().listFiles(WAL_SEGMENT_FILE_FILTER);
NodeFileTree ft = srv.context().pdsFolderResolver().fileTree();

File[] links = ft.walCdc().listFiles(WAL_SEGMENT_FILE_FILTER);

assertEquals(expLinks.size(), links.length);
Arrays.stream(links).map(File::toPath).map(FileWriteAheadLogManager::segmentIndex)
Arrays.stream(links).map(File::toPath).map(ft::walSegmentIndex)
.allMatch(expLinks::contains);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
Expand Down Expand Up @@ -88,7 +87,6 @@
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_MANAGER_STOP_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.segmentIndex;
import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAllComponents;
import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAllComponents;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
Expand Down Expand Up @@ -489,9 +487,9 @@ public void consumeWalSegmentsUntilStopped() {
// Need unseen WAL segments only.
.filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile()) && !seen.contains(p))
.peek(seen::add) // Adds to seen.
.sorted(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex)) // Sort by segment index.
.sorted(Comparator.comparingLong(ft::walSegmentIndex)) // Sort by segment index.
.peek(p -> {
long nextSgmnt = segmentIndex(p);
long nextSgmnt = ft.walSegmentIndex(p);

if (lastSgmnt.get() != -1 && nextSgmnt - lastSgmnt.get() != 1) {
throw new IgniteException("Found missed segments. Some events are missed. Exiting! " +
Expand Down Expand Up @@ -561,7 +559,7 @@ private boolean consumeSegment(Path segment) {
if (walState != null)
builder.from(walState.get1());

long segmentIdx = segmentIndex(segment);
long segmentIdx = ft.walSegmentIndex(segment);

lastSegmentConsumptionTs.value(System.currentTimeMillis());

Expand Down Expand Up @@ -807,7 +805,7 @@ private void updateCaches() {
* @return {@code True} if segment file was deleted, {@code false} otherwise.
*/
private boolean removeProcessedOnFailover(Path segment) {
long segmentIdx = segmentIndex(segment);
long segmentIdx = ft.walSegmentIndex(segment);

if (segmentIdx > walState.get1().index()) {
throw new IgniteException("Found segment greater then saved state. Some events are missed. Exiting! " +
Expand Down Expand Up @@ -854,7 +852,7 @@ private void saveStateAndRemoveProcessed(T2<WALPointer, Integer> curState) throw
Path processedSegment = rmvIter.next();

// Can't delete current segment, because state points to it.
if (segmentIndex(processedSegment) >= curState.get1().index())
if (ft.walSegmentIndex(processedSegment) >= curState.get1().index())
continue;

// WAL segment is a hard link to a segment file in a specifal Change Data Capture folder.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.internal.cdc.CdcFileLockHolder;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand Down Expand Up @@ -94,24 +94,24 @@ protected CdcDeleteLostSegmentsJob(CdcDeleteLostSegmentLinksCommandArg arg, bool
if (!CU.isCdcEnabled(ignite.configuration()))
throw new IgniteException("CDC is not configured.");

File walCdcDir = ignite.context().pdsFolderResolver().fileTree().walCdc();
NodeFileTree ft = ignite.context().pdsFolderResolver().fileTree();

CdcFileLockHolder lock = new CdcFileLockHolder(walCdcDir.getAbsolutePath(), "Delete lost segments job", log);
CdcFileLockHolder lock = new CdcFileLockHolder(ft.walCdc().getAbsolutePath(), "Delete lost segments job", log);

try {
lock.tryLock(1);

try (Stream<Path> cdcFiles = Files.list(walCdcDir.toPath())) {
try (Stream<Path> cdcFiles = Files.list(ft.walCdc().toPath())) {
Set<File> delete = new HashSet<>();

AtomicLong lastSgmnt = new AtomicLong(-1);

cdcFiles
.filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile()))
.sorted(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex)
.sorted(Comparator.comparingLong(ft::walSegmentIndex)
.reversed()) // Sort by segment index.
.forEach(path -> {
long idx = FileWriteAheadLogManager.segmentIndex(path);
long idx = ft.walSegmentIndex(path);

if (lastSgmnt.get() == -1 || lastSgmnt.get() - idx == 1) {
lastSgmnt.set(idx);
Expand Down Expand Up @@ -139,7 +139,7 @@ protected CdcDeleteLostSegmentsJob(CdcDeleteLostSegmentLinksCommandArg arg, bool
log.info("Segment CDC link deleted [file=" + file.getAbsolutePath() + ']');
});

Path stateDir = walCdcDir.toPath().resolve(STATE_DIR);
Path stateDir = ft.walCdc().toPath().resolve(STATE_DIR);

if (stateDir.toFile().exists()) {
File walState = stateDir.resolve(WAL_STATE_FILE_NAME).toFile();
Expand All @@ -157,7 +157,7 @@ protected CdcDeleteLostSegmentsJob(CdcDeleteLostSegmentLinksCommandArg arg, bool
catch (IgniteCheckedException e) {
throw new RuntimeException("Failed to delete lost segment CDC links. " +
"Unable to acquire lock to lock CDC folder. Make sure a CDC app is shut down " +
"[dir=" + walCdcDir.getAbsolutePath() + ", reason=" + e.getMessage() + ']');
"[dir=" + ft.walCdc().getAbsolutePath() + ", reason=" + e.getMessage() + ']');
}
finally {
U.closeQuiet(lock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
Expand All @@ -48,6 +47,9 @@
import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_NAME_PATTERN;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_PATTERN;

/**
* Performs WAL cleanup clusterwide.
*/
Expand All @@ -56,12 +58,6 @@ public class WalTask extends VisorMultiNodeTask<WalDeleteCommandArg, WalTaskResu
/** */
private static final long serialVersionUID = 0L;

/** Pattern for segment file names. */
private static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal");

/** Pattern for compacted segment file names. */
private static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip");

/** WAL archive file filter. */
private static final FileFilter WAL_ARCHIVE_FILE_FILTER = new FileFilter() {
@Override public boolean accept(File file) {
Expand Down Expand Up @@ -137,6 +133,9 @@ private static class WalJob extends VisorJob<WalDeleteCommandArg, Collection<Str
@LoggerResource
private transient IgniteLogger log;

/** Node file tree. */
private transient NodeFileTree ft;

/**
* @param arg WAL task argument.
* @param debug Debug flag.
Expand All @@ -149,6 +148,7 @@ public WalJob(WalDeleteCommandArg arg, boolean debug) {
@Nullable @Override protected Collection<String> run(@Nullable WalDeleteCommandArg arg) throws IgniteException {
try {
GridKernalContext cctx = ignite.context();
ft = ignite.context().pdsFolderResolver().fileTree();

GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.cache().context().database();
FileWriteAheadLogManager wal = (FileWriteAheadLogManager)cctx.cache().context().wal();
Expand Down Expand Up @@ -194,10 +194,10 @@ Collection<String> getUnusedWalSegments(
sortWalFiles(walFiles);

// Obtain index of last archived WAL segment, it will not be deleted.
long lastArchIdx = getIndex(walFiles[walFiles.length - 1]);
long lastArchIdx = ft.walSegmentIndex(walFiles[walFiles.length - 1].toPath());

for (File f : walFiles) {
long fileIdx = getIndex(f);
long fileIdx = ft.walSegmentIndex(f.toPath());

if (fileIdx < maxIdx && fileIdx < lastArchIdx)
res.add(f.getAbsolutePath());
Expand Down Expand Up @@ -239,7 +239,7 @@ Collection<String> deleteUnusedWalSegments(
Collection<String> res = new ArrayList<>(num);

for (File walFile: walFiles) {
if (getIndex(walFile) < maxIdx && num > 0)
if (ft.walSegmentIndex(walFile.toPath()) < maxIdx && num > 0)
res.add(walFile.getAbsolutePath());
else
break;
Expand Down Expand Up @@ -272,8 +272,6 @@ private int resolveMaxReservedIndex(FileWriteAheadLogManager wal, WALPointer low
* @throws IgniteCheckedException if failed.
*/
private File getWalArchiveDir() throws IgniteCheckedException {
NodeFileTree ft = ignite.context().pdsFolderResolver().fileTree();

if (!ft.walArchive().exists())
throw new IgniteCheckedException("WAL archive directory does not exists" + ft.walArchive().getAbsolutePath());

Expand All @@ -288,19 +286,9 @@ private File getWalArchiveDir() throws IgniteCheckedException {
private void sortWalFiles(File[] files) {
Arrays.sort(files, new Comparator<File>() {
@Override public int compare(File o1, File o2) {
return Long.compare(getIndex(o1), getIndex(o2));
return Long.compare(ft.walSegmentIndex(o1.toPath()), ft.walSegmentIndex(o2.toPath()));
}
});
}
}

/**
* Get index from WAL segment file.
*
* @param file WAL segment file.
* @return Index of WAL segment file.
*/
private static long getIndex(File file) {
return Long.parseLong(file.getName().substring(0, 16));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.ignite.internal.pagemem.wal;

import java.io.File;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.DataStorageConfiguration;
Expand Down Expand Up @@ -235,12 +234,6 @@ public WALIterator replay(
*/
void startAutoReleaseSegments();

/**
* @param idx Segment index.
* @return Compressed archive segment.
*/
@Nullable File compactedSegment(long idx);

/**
* Blocks current thread while segment with the {@code idx} not compressed.
* If segment compressed, already, returns immediately.
Expand Down
Loading