Skip to content

Commit

Permalink
Fix for failure IT's
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep Kumawat <[email protected]>
  • Loading branch information
Sandeep Kumawat committed Feb 25, 2025
1 parent 37f1490 commit 3ce8970
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 546 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -244,12 +243,4 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOExceptio
return closeable.get();
}
}

protected boolean warmIndexSegmentReplicationEnabled() {
return Objects.equals(
IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(),
IndexModule.DataLocalityType.PARTIAL.name()
);
}

}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
ensureOpen();
// Skip flushing for indices with partial locality (warm indices)
// For these indices, we don't need to commit as we will sync from the remote store on re-open
if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
if (engineConfig.getIndexSettings().isStoreLocalityPartial()) {
return;
}
// readLock is held here to wait/block any concurrent close that acquires the writeLock.
Expand Down Expand Up @@ -447,7 +447,9 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
latestSegmentInfos.changed();
}
try {
commitSegmentInfos(latestSegmentInfos);
if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
commitSegmentInfos(latestSegmentInfos);
}
} catch (IOException e) {
// mark the store corrupted unless we are closing as result of engine failure.
// in this case Engine#failShard will handle store corruption.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5160,8 +5160,10 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
store.deleteQuiet(file);
}
}
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
: "There should not be any segments file in the dir";
if (indexSettings.isStoreLocalityPartial() == false) {
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
: "There should not be any segments file in the dir";
}
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
syncSegmentSuccess = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,14 @@ public CompositeDirectory(Directory localDirectory, Directory remoteDirectory, F
*
* @throws IOException in case of I/O error
*/
// TODO: Revisit listAll() implementation, Check if we should include the remote files as well.
@Override
public String[] listAll() throws IOException {
ensureOpen();
logger.trace("Composite Directory[{}]: listAll() called", this::toString);
String[] localFiles = localDirectory.listAll();
Set<String> allFiles = new HashSet<>(Arrays.asList(localFiles));
// String[] remoteFiles = getRemoteFiles();
// allFiles.addAll(Arrays.asList(remoteFiles));
logger.trace("Composite Directory[{}]: Local Directory files - {}", this::toString, () -> Arrays.toString(localFiles));
// logger.trace("Composite Directory[{}]: Remote Directory files - {}", this::toString, () -> Arrays.toString(remoteFiles));
logger.trace("listAll Composite Directory[{}]: Local Directory files - {}", this::toString, () -> Arrays.toString(localFiles));
Set<String> nonBlockLuceneFiles = allFiles.stream()
.filter(file -> !FileTypeUtils.isBlockFile(file))
.collect(Collectors.toUnmodifiableSet());
Expand All @@ -113,12 +111,17 @@ public void deleteFile(String name) throws IOException {
if (FileTypeUtils.isTempFile(name)) {
localDirectory.deleteFile(name);
} else if (Arrays.asList(listAll()).contains(name) == false) {
logger.debug("The file [{}] does not exist", name);
// we should not fail here as localDirectory might not contain this file.
// throw new NoSuchFileException("File " + name + " not found in directory");
logger.debug("The file [{}] does not exist in local directory", name);
// we should not throw exception in this case as localDirectory might not contain this file.
} else {
localDirectory.deleteFile(name);
fileCache.remove(getFilePath(name));
// It is possible that filecache doesn't have the file, but localdirectory contains the file. We will delete it from the
// localDirectory.
if (fileCache.get(getFilePath(name)) == null) {
logger.info("The file [{}] exist in local but not part of FileCache, deleting it from local", name);
localDirectory.deleteFile(name);
} else {
fileCache.remove(getFilePath(name));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ public void logCurrentState() {
if (cache.size() > 0) {
logger.trace("SegmentedCache " + i);
((LRUCache<K, V>) cache).logCurrentState();
} else {
logger.trace("SegmentedCache is empty");
}
i++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void setup() throws IOException {

public void testListAll() throws IOException {
String[] actualFileNames = compositeDirectory.listAll();
String[] expectedFileNames = new String[] { "_0.cfe", "_0.cfs", "_0.si", "_1.cfe", "_2.cfe", "segments_1", "temp_file.tmp" };
String[] expectedFileNames = new String[] { "_1.cfe", "_2.cfe", "temp_file.tmp" };
assertArrayEquals(expectedFileNames, actualFileNames);
}

Expand Down

0 comments on commit 3ce8970

Please sign in to comment.