Skip to content

Commit

Permalink
Optimize stale file deletion
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Apr 9, 2024
1 parent 33cfeb0 commit 5f62745
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.RETAINED_MANIFESTS;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.COORDINATION_METADATA;
Expand Down Expand Up @@ -63,27 +66,62 @@ private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int
return indexStats;
}

public void testFullClusterRestoreStaleDelete() throws Exception {
public void testRemoteCleanupTaskUpdated() {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;

initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
setReplicaCount(0);
setReplicaCount(2);
setReplicaCount(0);
setReplicaCount(1);
setReplicaCount(0);
setReplicaCount(1);
setReplicaCount(0);
setReplicaCount(2);
setReplicaCount(0);
RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateService.class
);

assertEquals(
5,
remoteClusterStateService.getStaleFileDeletionTask().getInterval().getMinutes()
);
assertTrue(remoteClusterStateService.getStaleFileDeletionTask().isScheduled());

// now disable
client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), -1))
.get();

assertEquals(
-1,
remoteClusterStateService.getStaleFileDeletionTask().getInterval().getMillis()
);
assertFalse(remoteClusterStateService.getStaleFileDeletionTask().isScheduled());

// now set Clean up interval to 1 min
client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m"))
.get();
assertEquals(
1,
remoteClusterStateService.getStaleFileDeletionTask().getInterval().getMinutes()
);
}

public void testRemoteCleanupOnlyAfter10Updates() throws Exception {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;

initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateService.class
);

// set cleanup interval to 1 min
client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m"))
.get();

replicaCount = updateReplicaCountNTimes(9, replicaCount);

RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);

BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
Expand All @@ -95,14 +133,30 @@ public void testFullClusterRestoreStaleDelete() throws Exception {
)
.add("cluster-state")
.add(getClusterState().metadata().clusterUUID());
BlobPath manifestContainerPath = baseMetadataPath.add("manifest");

assertEquals(10, repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size());
assertBusy(() -> {
assertEquals(RETAINED_MANIFESTS - 1, repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size());
}, 1, TimeUnit.MINUTES);

replicaCount = updateReplicaCountNTimes(8, replicaCount);

// wait for 1 min, to ensure that clean up task ran and didn't clean up stale files because it was less than 10
Thread.sleep(60000);
assertNotEquals(RETAINED_MANIFESTS - 1, repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size());

// Do 2 more updates, now since the total successful state changes are more than 10, stale files will be cleaned up
replicaCount = updateReplicaCountNTimes(2, replicaCount);

assertBusy(() -> {
assertEquals(RETAINED_MANIFESTS - 1, repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size());
}, 1, TimeUnit.MINUTES);

Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestClusterState(
cluster().getClusterName(),
getClusterState().metadata().clusterUUID()
).getMetadata().getIndices();
assertEquals(0, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas());
assertEquals(replicaCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas());
assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards());
}

Expand Down Expand Up @@ -243,4 +297,16 @@ private void setReplicaCount(int replicaCount) {
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount))
.get();
}

private int updateReplicaCountNTimes(int n, int initialCount) {
int newReplicaCount = randomIntBetween(0, 3);;
for (int i = 0; i < n; i++) {
while (newReplicaCount == initialCount) {
newReplicaCount = randomIntBetween(0, 3);
}
setReplicaCount(newReplicaCount);
initialCount = newReplicaCount;
}
return newReplicaCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,7 @@ public void apply(Settings value, Settings current, Settings previous) {

// Remote cluster state settings
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING,
RemoteIndexMetadataManager.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteGlobalMetadataManager.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteManifestManager.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.BlobContainer;
Expand All @@ -25,6 +26,7 @@
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractAsyncTask;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
Expand Down Expand Up @@ -85,6 +87,7 @@
*/
public class RemoteClusterStateService implements Closeable {
public static final int RETAINED_MANIFESTS = 10;
public static final int SKIP_CLEANUP_STATE_CHANGES = 10;

private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class);

Expand All @@ -98,6 +101,17 @@ public class RemoteClusterStateService implements Closeable {
Property.Final
);

/**
* Setting to specify the interval to do run stale file cleanup job
*/
public static final Setting<TimeValue> REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING = Setting.timeSetting(
"cluster.remote_store.state.cleanup_interval",
TimeValue.timeValueMinutes(5),
TimeValue.timeValueMillis(-1),
Property.NodeScope,
Property.Dynamic
);

private final String nodeId;
private final Supplier<RepositoriesService> repositoriesService;
private final Settings settings;
Expand All @@ -113,7 +127,13 @@ public class RemoteClusterStateService implements Closeable {
private RemoteGlobalMetadataManager remoteGlobalMetadataManager;
private RemoteManifestManager remoteManifestManager;
private ClusterSettings clusterSettings;
private TimeValue staleFileCleanupInterval;
private AsyncStaleFileDeletion staleFileDeletionTask;
public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1;
private String latestClusterName;
private String latestClusterUUID;
private long lastCleanupAttemptState;
private boolean isClusterManagerNode;

public RemoteClusterStateService(
String nodeId,
Expand All @@ -133,6 +153,10 @@ public RemoteClusterStateService(
this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
this.remoteStateStats = new RemotePersistenceStats();
this.staleFileCleanupInterval = clusterSettings.get(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING);
clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, this::updateCleanupInterval);
this.lastCleanupAttemptState = 0;
this.isClusterManagerNode = DiscoveryNode.isClusterManagerNode(settings);
}

private BlobStoreTransferService getBlobStoreTransferService() {
Expand Down Expand Up @@ -314,7 +338,8 @@ public ClusterMetadataManifest writeIncrementalMetadata(
firstUpload || !customsToUpload.isEmpty() ? allUploadedCustomMap : previousManifest.getCustomMetadataMap(),
false
);
deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS);
this.latestClusterName = clusterState.getClusterName().value();
this.latestClusterUUID = clusterState.metadata().clusterUUID();

final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateSucceeded();
Expand Down Expand Up @@ -503,6 +528,49 @@ private UploadedMetadataResults writeMetadataInParallel(
return response;
}

public TimeValue getStaleFileCleanupInterval() {
return this.staleFileCleanupInterval;
}

AsyncStaleFileDeletion getStaleFileDeletionTask() { // for testing
return this.staleFileDeletionTask;
}

private void updateCleanupInterval(TimeValue updatedInterval) {
if (!isClusterManagerNode) return;
this.staleFileCleanupInterval = updatedInterval;
logger.info("updated remote state cleanup interval to {}", updatedInterval);
// After updating the interval, we need to close the current task and create a new one which will run with updated interval
if (!this.staleFileDeletionTask.getInterval().equals(updatedInterval)) {
this.staleFileDeletionTask.setInterval(updatedInterval);
}
}

private void cleanUpStaleFiles() {
long cleanUpAttemptState = remoteStateStats.getSuccessCount();
if (
cleanUpAttemptState - lastCleanupAttemptState > SKIP_CLEANUP_STATE_CHANGES &&
this.latestClusterName != null && this.latestClusterUUID != null
) {
logger.info(
"Cleaning up stale remote state files for cluster [{}] with uuid [{}]. Last clean was done before {} updates",
this.latestClusterName,
this.latestClusterUUID,
cleanUpAttemptState - lastCleanupAttemptState
);
deleteStaleClusterMetadata(this.latestClusterName, this.latestClusterUUID, RETAINED_MANIFESTS);
lastCleanupAttemptState = cleanUpAttemptState;
} else {
logger.info(
"Skipping cleanup of stale remote state files for cluster [{}] with uuid [{}]. Last clean was done before {} updates, which is less than threshold {}",
this.latestClusterName,
this.latestClusterUUID,
cleanUpAttemptState - lastCleanupAttemptState,
SKIP_CLEANUP_STATE_CHANGES
);
}
}

@Nullable
public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest)
throws IOException {
Expand Down Expand Up @@ -539,6 +607,9 @@ public Optional<ClusterMetadataManifest> getLatestClusterMetadataManifest(String

@Override
public void close() throws IOException {
if (staleFileDeletionTask != null) {
staleFileDeletionTask.close();
}
if (blobStoreRepository != null) {
IOUtils.close(blobStoreRepository);
}
Expand All @@ -556,6 +627,9 @@ public void start() {
remoteGlobalMetadataManager = new RemoteGlobalMetadataManager(blobStoreRepository, clusterSettings);
remoteIndexMetadataManager = new RemoteIndexMetadataManager(blobStoreRepository, clusterSettings);
remoteManifestManager = new RemoteManifestManager(blobStoreRepository, clusterSettings, nodeId);
if (isClusterManagerNode) {
staleFileDeletionTask = new AsyncStaleFileDeletion(this);
}
}

private String fetchPreviousClusterUUID(String clusterName, String clusterUUID) {
Expand Down Expand Up @@ -1000,4 +1074,23 @@ public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataMa
public RemotePersistenceStats getStats() {
return remoteStateStats;
}

static final class AsyncStaleFileDeletion extends AbstractAsyncTask {
private final RemoteClusterStateService remoteClusterStateService;
AsyncStaleFileDeletion(RemoteClusterStateService remoteClusterStateService) {
super(logger, remoteClusterStateService.threadpool, remoteClusterStateService.getStaleFileCleanupInterval(), true);
this.remoteClusterStateService = remoteClusterStateService;
rescheduleIfNecessary();
}

@Override
protected boolean mustReschedule() {
return true;
}

@Override
protected void runInternal() {
remoteClusterStateService.cleanUpStaleFiles();
}
}
}
Loading

0 comments on commit 5f62745

Please sign in to comment.