Skip to content

Commit

Permalink
IGNITE-24390 Move WAL dirs to NodeFileTree (#11841)
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov authored Feb 5, 2025
1 parent 9e04bd9 commit ae1cad6
Show file tree
Hide file tree
Showing 44 changed files with 355 additions and 535 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ private void cleanPersistenceDir() throws Exception {
SharedFileTree sft = new SharedFileTree(U.defaultWorkDirectory());

U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), "cp", false));
U.delete(sft.db());
U.delete(sft.marshaller());
U.delete(sft.binaryMetaRoot());
U.delete(sft.marshaller().getParentFile());
}

/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package org.apache.ignite.compatibility.persistence;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
Expand Down Expand Up @@ -224,36 +224,11 @@ private void assertNodeIndexesInFolder(Integer... indexes) throws IgniteCheckedE
private void assertPdsDirsDefaultExist(String subDirName) throws IgniteCheckedException {
NodeFileTree ft = nodeFileTree(subDirName);

assertTrue(ft.binaryMeta().exists() && ft.binaryMeta().isDirectory());
Consumer<File> check = dir -> assertTrue(dir.exists() && dir.isDirectory());

assertDirectoryExist(PersistentStoreConfiguration.DFLT_WAL_STORE_PATH, subDirName);
assertDirectoryExist(PersistentStoreConfiguration.DFLT_WAL_ARCHIVE_PATH, subDirName);
assertDirectoryExist(PdsFolderResolver.DB_DEFAULT_FOLDER, subDirName);
check.accept(ft.binaryMeta());
check.accept(ft.wal());
check.accept(ft.walArchive());
check.accept(ft.nodeStorage());
}

/**
* Checks one folder existence
*
* @param subFolderNames subfolders array to touch
* @throws IgniteCheckedException if IO error occur
*/
private void assertDirectoryExist(String... subFolderNames) throws IgniteCheckedException {
File curFolder = new File(U.defaultWorkDirectory());

for (String name : subFolderNames) {
curFolder = new File(curFolder, name);
}

final String path;
try {
path = curFolder.getCanonicalPath();
}
catch (IOException e) {
throw new IgniteCheckedException("Failed to convert path: [" + curFolder.getAbsolutePath() + "]", e);
}

assertTrue("Directory " + Arrays.asList(subFolderNames).toString()
+ " is expected to exist [" + path + "]", curFolder.exists() && curFolder.isDirectory());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,7 @@ private void checkDeleteLostSegmentLinks(List<Long> expBefore, List<Long> expAft

/** */
private void checkLinks(IgniteEx srv, List<Long> expLinks) {
FileWriteAheadLogManager wal0 = (FileWriteAheadLogManager)srv.context().cache().context().wal(true);

File[] links = wal0.walCdcDirectory().listFiles(WAL_SEGMENT_FILE_FILTER);
File[] links = srv.context().pdsFolderResolver().fileTree().walCdc().listFiles(WAL_SEGMENT_FILE_FILTER);

assertEquals(expLinks.size(), links.length);
Arrays.stream(links).map(File::toPath).map(FileWriteAheadLogManager::segmentIndex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,8 @@ public String getWalPath() {
* @return {@code this} for chaining.
*/
public DataStorageConfiguration setWalPath(String walStorePath) {
A.notNull(walStorePath, "WAL");

this.walPath = walStorePath;

return this;
Expand All @@ -772,6 +774,8 @@ public String getWalArchivePath() {
* @return {@code this} for chaining.
*/
public DataStorageConfiguration setWalArchivePath(String walArchivePath) {
A.notNull(walArchivePath, "WAL archive");

this.walArchivePath = walArchivePath;

return this;
Expand All @@ -797,6 +801,8 @@ public String getCdcWalPath() {
*/
@IgniteExperimental
public DataStorageConfiguration setCdcWalPath(String cdcWalPath) {
A.notNull(cdcWalPath, "CDC WAL");

this.cdcWalPath = cdcWalPath;

return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,6 @@ public class PersistentStoreConfiguration implements Serializable {
/** Default wal always write full pages. */
public static final boolean DFLT_WAL_ALWAYS_WRITE_FULL_PAGES = false;

/** Default wal directory. */
public static final String DFLT_WAL_STORE_PATH = "db/wal";

/** Default wal archive directory. */
public static final String DFLT_WAL_ARCHIVE_PATH = "db/wal/archive";

/** Default write throttling enabled. */
public static final boolean DFLT_WRITE_THROTTLING_ENABLED = false;

Expand Down Expand Up @@ -115,10 +109,10 @@ public class PersistentStoreConfiguration implements Serializable {
private int walSegmentSize = DFLT_WAL_SEGMENT_SIZE;

/** Directory where WAL is stored (work directory) */
private String walStorePath = DFLT_WAL_STORE_PATH;
private String walStorePath = DataStorageConfiguration.DFLT_WAL_PATH;

/** WAL archive path. */
private String walArchivePath = DFLT_WAL_ARCHIVE_PATH;
private String walArchivePath = DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH;

/** Metrics enabled flag. */
private boolean metricsEnabled = DFLT_METRICS_ENABLED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
Expand Down Expand Up @@ -226,12 +225,6 @@ public class CdcMain implements Runnable {
/** Logger. */
private final IgniteLogger log;

/** Change Data Capture directory. */
private Path cdcDir;

/** Database directory. */
private File dbDir;

/** Ignite folders. */
private NodeFileTree ft;

Expand Down Expand Up @@ -325,10 +318,10 @@ public void runX() throws Exception {
}

try (CdcFileLockHolder lock = lockPds()) {
Files.createDirectories(cdcDir.resolve(STATE_DIR));
Files.createDirectories(ft.walCdc().toPath().resolve(STATE_DIR));

if (log.isInfoEnabled()) {
log.info("Change Data Capture [dir=" + cdcDir + ']');
log.info("Change Data Capture [dir=" + ft.walCdc() + ']');
log.info("Ignite node Binary meta [dir=" + ft.binaryMeta() + ']');
log.info("Ignite node Marshaller [dir=" + ft.marshaller() + ']');
}
Expand All @@ -340,7 +333,7 @@ public void runX() throws Exception {
try {
kctx.resource().injectGeneric(consumer.consumer());

state = createState(cdcDir.resolve(STATE_DIR));
state = createState(ft.walCdc().toPath().resolve(STATE_DIR));

walState = state.loadWalState();
typesState = state.loadTypesState();
Expand Down Expand Up @@ -379,7 +372,6 @@ protected CdcConsumerState createState(Path stateDir) {
}

/**
* @return Kernal instance.
* @throws IgniteCheckedException If failed.
*/
private void startStandaloneKernal() throws IgniteCheckedException {
Expand Down Expand Up @@ -424,7 +416,7 @@ private void startStandaloneKernal() throws IgniteCheckedException {
private void initMetrics() {
mreg.objectMetric(BINARY_META_DIR, String.class, "Binary meta directory").value(ft.binaryMeta().getAbsolutePath());
mreg.objectMetric(MARSHALLER_DIR, String.class, "Marshaller directory").value(ft.marshaller().getAbsolutePath());
mreg.objectMetric(CDC_DIR, String.class, "CDC directory").value(cdcDir.toFile().getAbsolutePath());
mreg.objectMetric(CDC_DIR, String.class, "CDC directory").value(ft.walCdc().getAbsolutePath());

curSegmentIdx = mreg.longMetric(CUR_SEG_IDX, "Current segment index");
committedSegmentIdx = mreg.longMetric(COMMITTED_SEG_IDX, "Committed segment index");
Expand Down Expand Up @@ -489,7 +481,7 @@ public void consumeWalSegmentsUntilStopped() {
return;
}

try (Stream<Path> cdcFiles = Files.list(cdcDir)) {
try (Stream<Path> cdcFiles = Files.list(ft.walCdc().toPath())) {
Set<Path> exists = new HashSet<>();

Iterator<Path> segments = cdcFiles
Expand Down Expand Up @@ -757,14 +749,14 @@ private void updateMappings() {
/** Search for new or changed {@link CdcCacheEvent} and notifies the consumer. */
private void updateCaches() {
try {
if (!dbDir.exists())
if (!ft.nodeStorage().exists())
return;

Set<Integer> destroyed = new HashSet<>(cachesState.keySet());

Iterator<CdcCacheEvent> cacheEvts = GridLocalConfigManager
.readCachesData(
dbDir,
ft.nodeStorage(),
kctx.marshallerContext().jdkMarshaller(),
igniteCfg)
.entrySet().stream()
Expand Down Expand Up @@ -887,35 +879,18 @@ private CdcFileLockHolder tryLock(File dbStoreDirWithSubdirectory) {
return null;
}

File cdcRoot = new File(igniteCfg.getDataStorageConfiguration().getCdcWalPath());

if (!cdcRoot.isAbsolute()) {
cdcRoot = new File(
igniteCfg.getWorkDirectory(),
igniteCfg.getDataStorageConfiguration().getCdcWalPath()
);
}

if (!cdcRoot.exists()) {
log.warning("CDC root directory not exists. Should be created by Ignite Node. " +
"Is Change Data Capture enabled in IgniteConfiguration? [dir=" + cdcRoot + ']');
String folderName = dbStoreDirWithSubdirectory.getName();

return null;
}
ft = new NodeFileTree(igniteCfg, folderName);

Path cdcDir = Paths.get(cdcRoot.getAbsolutePath(), dbStoreDirWithSubdirectory.getName());

if (!Files.exists(cdcDir)) {
if (!ft.walCdc().exists()) {
log.warning("CDC directory not exists. Should be created by Ignite Node. " +
"Is Change Data Capture enabled in IgniteConfiguration? [dir=" + cdcDir + ']');
"Is Change Data Capture enabled in IgniteConfiguration? [dir=" + ft.walCdc() + ']');

return null;
}

this.cdcDir = cdcDir;
this.dbDir = dbStoreDirWithSubdirectory;

CdcFileLockHolder lock = new CdcFileLockHolder(cdcDir.toString(), "cdc.lock", log);
CdcFileLockHolder lock = new CdcFileLockHolder(ft.walCdc().toString(), "cdc.lock", log);

try {
lock.tryLock(cdcCfg.getLockTimeout());
Expand All @@ -926,7 +901,7 @@ private CdcFileLockHolder tryLock(File dbStoreDirWithSubdirectory) {
U.closeQuiet(lock);

if (log.isInfoEnabled()) {
log.info("Unable to acquire lock to lock CDC folder [dir=" + cdcRoot + "]" + NL +
log.info("Unable to acquire lock to lock CDC folder [dir=" + ft.walCdc() + "]" + NL +
"Reason: " + e.getMessage());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.util.typedef.internal.U;

import static org.apache.ignite.internal.cdc.CdcConsumerState.CDC_MODE_FILE_NAME;
Expand All @@ -45,7 +44,7 @@ public class CdcUtilityActiveCdcManager extends GridCacheSharedManagerAdapter im
if (fut.localJoinExchange() || fut.activateCluster()) {
try {
File cdcModeFile = Paths.get(
((FileWriteAheadLogManager)cctx.wal(true)).walCdcDirectory().getAbsolutePath(),
cctx.kernalContext().pdsFolderResolver().fileTree().walCdc().getAbsolutePath(),
STATE_DIR,
CDC_MODE_FILE_NAME).toFile();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.ignite.internal.cdc.CdcFileLockHolder;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.VisorMultiNodeTask;
Expand Down Expand Up @@ -90,13 +91,11 @@ protected CdcDeleteLostSegmentsJob(CdcDeleteLostSegmentLinksCommandArg arg, bool

/** {@inheritDoc} */
@Override protected Void run(CdcDeleteLostSegmentLinksCommandArg arg) throws IgniteException {
FileWriteAheadLogManager wal = (FileWriteAheadLogManager)ignite.context().cache().context().wal(true);

File walCdcDir = wal.walCdcDirectory();

if (walCdcDir == null)
if (!CU.isCdcEnabled(ignite.configuration()))
throw new IgniteException("CDC is not configured.");

File walCdcDir = ignite.context().pdsFolderResolver().fileTree().walCdc();

CdcFileLockHolder lock = new CdcFileLockHolder(walCdcDir.getAbsolutePath(), "Delete lost segments job", log);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,10 @@
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.management.wal.WalPrintCommand.WalPrintCommandArg;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.task.GridInternal;
Expand Down Expand Up @@ -274,32 +272,12 @@ private int resolveMaxReservedIndex(FileWriteAheadLogManager wal, WALPointer low
* @throws IgniteCheckedException if failed.
*/
private File getWalArchiveDir() throws IgniteCheckedException {
IgniteConfiguration igCfg = ignite.context().config();
NodeFileTree ft = ignite.context().pdsFolderResolver().fileTree();

DataStorageConfiguration dsCfg = igCfg.getDataStorageConfiguration();
if (!ft.walArchive().exists())
throw new IgniteCheckedException("WAL archive directory does not exists" + ft.walArchive().getAbsolutePath());

PdsFolderSettings resFldrs = ignite.context().pdsFolderResolver().resolveFolders();

String consId = resFldrs.folderName();

File dir;

if (dsCfg.getWalArchivePath() != null) {
File workDir0 = new File(dsCfg.getWalArchivePath());

dir = workDir0.isAbsolute() ?
new File(workDir0, consId) :
new File(U.resolveWorkDirectory(igCfg.getWorkDirectory(), dsCfg.getWalArchivePath(), false),
consId);
}
else
dir = new File(U.resolveWorkDirectory(igCfg.getWorkDirectory(),
DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH, false), consId);

if (!dir.exists())
throw new IgniteCheckedException("WAL archive directory does not exists" + dir.getAbsolutePath());

return dir;
return ft.walArchive();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,6 @@ public WALIterator replay(
*/
void startAutoReleaseSegments();

/**
* Archive directory if any.
*
* @return Archive directory.
*/
@Nullable File archiveDir();

/**
* @param idx Segment index.
* @return Compressed archive segment.
Expand Down
Loading

0 comments on commit ae1cad6

Please sign in to comment.