diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java index 4fc6550f2a3a6..d556206b8d341 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java @@ -23,11 +23,14 @@ import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateService.RETAINED_MANIFESTS; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_FILE_PREFIX; import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.COORDINATION_METADATA; @@ -63,27 +66,62 @@ private Map initialTestSetup(int shardCount, int replicaCount, int return indexStats; } - public void testFullClusterRestoreStaleDelete() throws Exception { + public void testRemoteCleanupTaskUpdated() { int shardCount = randomIntBetween(1, 2); int replicaCount = 1; int dataNodeCount = shardCount * (replicaCount + 1); int clusterManagerNodeCount = 1; initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount); - setReplicaCount(0); - setReplicaCount(2); - setReplicaCount(0); - setReplicaCount(1); - setReplicaCount(0); - setReplicaCount(1); - setReplicaCount(0); - setReplicaCount(2); - setReplicaCount(0); + RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance( + RemoteClusterStateService.class + ); + + assertEquals( + 5, + remoteClusterStateService.getStaleFileDeletionTask().getInterval().getMinutes() + ); + assertTrue(remoteClusterStateService.getStaleFileDeletionTask().isScheduled()); + + // now disable + client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), -1)) + .get(); + assertEquals( + -1, + remoteClusterStateService.getStaleFileDeletionTask().getInterval().getMillis() + ); + assertFalse(remoteClusterStateService.getStaleFileDeletionTask().isScheduled()); + + // now set Clean up interval to 1 min + client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m")) + .get(); + assertEquals( + 1, + remoteClusterStateService.getStaleFileDeletionTask().getInterval().getMinutes() + ); + } + + public void testRemoteCleanupOnlyAfter10Updates() throws Exception { + int shardCount = randomIntBetween(1, 2); + int replicaCount = 1; + int dataNodeCount = shardCount * (replicaCount + 1); + int clusterManagerNodeCount = 1; + + initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount); RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance( RemoteClusterStateService.class ); + // set cleanup interval to 1 min + client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m")) + .get(); + + replicaCount = updateReplicaCountNTimes(9, replicaCount); + RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class); BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME); @@ -95,14 +133,30 @@ public void testFullClusterRestoreStaleDelete() throws Exception { ) .add("cluster-state") .add(getClusterState().metadata().clusterUUID()); + BlobPath manifestContainerPath = baseMetadataPath.add("manifest"); - assertEquals(10, repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size()); + assertBusy(() -> { + assertEquals(RETAINED_MANIFESTS - 1, repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size()); + }, 1, TimeUnit.MINUTES); + + replicaCount = updateReplicaCountNTimes(8, replicaCount); + + // wait for 1 min, to ensure that clean up task ran and didn't clean up stale files because it was less than 10 + Thread.sleep(60000); + assertNotEquals(RETAINED_MANIFESTS - 1, repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size()); + + // Do 2 more updates, now since the total successful state changes are more than 10, stale files will be cleaned up + replicaCount = updateReplicaCountNTimes(2, replicaCount); + + assertBusy(() -> { + assertEquals(RETAINED_MANIFESTS - 1, repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size()); + }, 1, TimeUnit.MINUTES); Map indexMetadataMap = remoteClusterStateService.getLatestClusterState( cluster().getClusterName(), getClusterState().metadata().clusterUUID() ).getMetadata().getIndices(); - assertEquals(0, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas()); + assertEquals(replicaCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas()); assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards()); } @@ -243,4 +297,16 @@ private void setReplicaCount(int replicaCount) { .setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount)) .get(); } + + private int updateReplicaCountNTimes(int n, int initialCount) { + int newReplicaCount = randomIntBetween(0, 3);; + for (int i = 0; i < n; i++) { + while (newReplicaCount == initialCount) { + newReplicaCount = randomIntBetween(0, 3); + } + setReplicaCount(newReplicaCount); + initialCount = newReplicaCount; + } + return newReplicaCount; + } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 81ee55fcc31d4..b18832e7ada41 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -706,6 +706,7 @@ public void apply(Settings value, Settings current, Settings previous) { // Remote cluster state settings RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING, + RemoteClusterStateService.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, RemoteIndexMetadataManager.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, RemoteGlobalMetadataManager.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, RemoteManifestManager.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 1f7f40b8fb598..54295fe4fb183 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -15,6 +15,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.CheckedRunnable; import org.opensearch.common.Nullable; import org.opensearch.common.blobstore.BlobContainer; @@ -25,6 +26,7 @@ import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.AbstractAsyncTask; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; @@ -85,6 +87,7 @@ */ public class RemoteClusterStateService implements Closeable { public static final int RETAINED_MANIFESTS = 10; + public static final int SKIP_CLEANUP_STATE_CHANGES = 10; private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); @@ -98,6 +101,17 @@ public class RemoteClusterStateService implements Closeable { Property.Final ); + /** + * Setting to specify the interval to do run stale file cleanup job + */ + public static final Setting REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING = Setting.timeSetting( + "cluster.remote_store.state.cleanup_interval", + TimeValue.timeValueMinutes(5), + TimeValue.timeValueMillis(-1), + Property.NodeScope, + Property.Dynamic + ); + private final String nodeId; private final Supplier repositoriesService; private final Settings settings; @@ -113,7 +127,13 @@ public class RemoteClusterStateService implements Closeable { private RemoteGlobalMetadataManager remoteGlobalMetadataManager; private RemoteManifestManager remoteManifestManager; private ClusterSettings clusterSettings; + private TimeValue staleFileCleanupInterval; + private AsyncStaleFileDeletion staleFileDeletionTask; public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1; + private String latestClusterName; + private String latestClusterUUID; + private long lastCleanupAttemptState; + private boolean isClusterManagerNode; public RemoteClusterStateService( String nodeId, @@ -133,6 +153,10 @@ public RemoteClusterStateService( this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); this.remoteStateStats = new RemotePersistenceStats(); + this.staleFileCleanupInterval = clusterSettings.get(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING); + clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, this::updateCleanupInterval); + this.lastCleanupAttemptState = 0; + this.isClusterManagerNode = DiscoveryNode.isClusterManagerNode(settings); } private BlobStoreTransferService getBlobStoreTransferService() { @@ -314,7 +338,8 @@ public ClusterMetadataManifest writeIncrementalMetadata( firstUpload || !customsToUpload.isEmpty() ? allUploadedCustomMap : previousManifest.getCustomMetadataMap(), false ); - deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS); + this.latestClusterName = clusterState.getClusterName().value(); + this.latestClusterUUID = clusterState.metadata().clusterUUID(); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); remoteStateStats.stateSucceeded(); @@ -503,6 +528,49 @@ private UploadedMetadataResults writeMetadataInParallel( return response; } + public TimeValue getStaleFileCleanupInterval() { + return this.staleFileCleanupInterval; + } + + AsyncStaleFileDeletion getStaleFileDeletionTask() { // for testing + return this.staleFileDeletionTask; + } + + private void updateCleanupInterval(TimeValue updatedInterval) { + if (!isClusterManagerNode) return; + this.staleFileCleanupInterval = updatedInterval; + logger.info("updated remote state cleanup interval to {}", updatedInterval); + // After updating the interval, we need to close the current task and create a new one which will run with updated interval + if (!this.staleFileDeletionTask.getInterval().equals(updatedInterval)) { + this.staleFileDeletionTask.setInterval(updatedInterval); + } + } + + private void cleanUpStaleFiles() { + long cleanUpAttemptState = remoteStateStats.getSuccessCount(); + if ( + cleanUpAttemptState - lastCleanupAttemptState > SKIP_CLEANUP_STATE_CHANGES && + this.latestClusterName != null && this.latestClusterUUID != null + ) { + logger.info( + "Cleaning up stale remote state files for cluster [{}] with uuid [{}]. Last clean was done before {} updates", + this.latestClusterName, + this.latestClusterUUID, + cleanUpAttemptState - lastCleanupAttemptState + ); + deleteStaleClusterMetadata(this.latestClusterName, this.latestClusterUUID, RETAINED_MANIFESTS); + lastCleanupAttemptState = cleanUpAttemptState; + } else { + logger.info( + "Skipping cleanup of stale remote state files for cluster [{}] with uuid [{}]. Last clean was done before {} updates, which is less than threshold {}", + this.latestClusterName, + this.latestClusterUUID, + cleanUpAttemptState - lastCleanupAttemptState, + SKIP_CLEANUP_STATE_CHANGES + ); + } + } + @Nullable public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest) throws IOException { @@ -539,6 +607,9 @@ public Optional getLatestClusterMetadataManifest(String @Override public void close() throws IOException { + if (staleFileDeletionTask != null) { + staleFileDeletionTask.close(); + } if (blobStoreRepository != null) { IOUtils.close(blobStoreRepository); } @@ -556,6 +627,9 @@ public void start() { remoteGlobalMetadataManager = new RemoteGlobalMetadataManager(blobStoreRepository, clusterSettings); remoteIndexMetadataManager = new RemoteIndexMetadataManager(blobStoreRepository, clusterSettings); remoteManifestManager = new RemoteManifestManager(blobStoreRepository, clusterSettings, nodeId); + if (isClusterManagerNode) { + staleFileDeletionTask = new AsyncStaleFileDeletion(this); + } } private String fetchPreviousClusterUUID(String clusterName, String clusterUUID) { @@ -1000,4 +1074,23 @@ public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataMa public RemotePersistenceStats getStats() { return remoteStateStats; } + + static final class AsyncStaleFileDeletion extends AbstractAsyncTask { + private final RemoteClusterStateService remoteClusterStateService; + AsyncStaleFileDeletion(RemoteClusterStateService remoteClusterStateService) { + super(logger, remoteClusterStateService.threadpool, remoteClusterStateService.getStaleFileCleanupInterval(), true); + this.remoteClusterStateService = remoteClusterStateService; + rescheduleIfNecessary(); + } + + @Override + protected boolean mustReschedule() { + return true; + } + + @Override + protected void runInternal() { + remoteClusterStateService.cleanUpStaleFiles(); + } + } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 0327ddc887f2e..4636d42fb0799 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -33,6 +33,7 @@ import org.opensearch.common.network.NetworkModule; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.AbstractAsyncTask; import org.opensearch.core.ParseField; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesArray; @@ -84,6 +85,7 @@ import org.mockito.ArgumentMatchers; import static java.util.stream.Collectors.toList; +import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING; import static org.opensearch.gateway.remote.RemoteClusterStateService.RETAINED_MANIFESTS; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.FORMAT_PARAMS; @@ -93,6 +95,7 @@ import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.RemoteManifestManager.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteManifestManager.MANIFEST_FILE_PREFIX; +import static org.opensearch.node.NodeRoleSettings.NODE_ROLES_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; @@ -109,6 +112,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.test.OpenSearchIntegTestCase.client; public class RemoteClusterStateServiceTests extends OpenSearchTestCase { @@ -1310,6 +1314,52 @@ public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Excepti assertBusy(() -> assertEquals(1, callCount.get())); } + public void testRemoteCleanupTaskScheduled() { + AbstractAsyncTask cleanupTask = remoteClusterStateService.getStaleFileDeletionTask(); + assertNull(cleanupTask); + + remoteClusterStateService.start(); + assertNotNull(remoteClusterStateService.getStaleFileDeletionTask()); + assertTrue(remoteClusterStateService.getStaleFileDeletionTask().mustReschedule()); + assertEquals( + clusterSettings.get(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING), + remoteClusterStateService.getStaleFileDeletionTask().getInterval() + ); + assertTrue(remoteClusterStateService.getStaleFileDeletionTask().isScheduled()); + assertFalse(remoteClusterStateService.getStaleFileDeletionTask().isClosed()); + } + + public void testRemoteCleanupNotInitializedOnDataOnlyNode() { + String stateRepoTypeAttributeKey = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + "remote_store_repository" + ); + String stateRepoSettingsAttributeKeyPrefix = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + "remote_store_repository" + ); + Settings settings = Settings.builder() + .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "remote_store_repository") + .putList(NODE_ROLES_SETTING.getKey(), "d") + .put(stateRepoTypeAttributeKey, FsRepository.TYPE) + .put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath") + .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .build(); + ClusterSettings dataNodeClusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + remoteClusterStateService = new RemoteClusterStateService( + "test-node-id", + repositoriesServiceSupplier, + settings, + dataNodeClusterSettings, + () -> 0L, + threadPool + ); + remoteClusterStateService.start(); + assertNull(remoteClusterStateService.getStaleFileDeletionTask()); + } + private void mockObjectsForGettingPreviousClusterUUID(Map clusterUUIDsPointers) throws IOException { mockObjectsForGettingPreviousClusterUUID(clusterUUIDsPointers, false, Collections.emptyMap()); }