Skip to content

Commit

Permalink
Address PR comment
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed May 2, 2024
1 parent 1febb68 commit 8f5d7d7
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteClusterStateCleanupManagerIT extends RemoteStoreBaseIntegTestCase {
Expand Down Expand Up @@ -82,7 +83,7 @@ public void testRemoteCleanupTaskUpdated() {

public void testRemoteCleanupDeleteStale() throws Exception {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 3;
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;

Expand All @@ -97,61 +98,48 @@ public void testRemoteCleanupDeleteStale() throws Exception {

assertTrue(response.isAcknowledged());

// update replica count to simulate cluster state changes, so that we verify number of manifest files
replicaCount = updateReplicaCountNTimes(RETAINED_MANIFESTS + 2 * SKIP_CLEANUP_STATE_CHANGES, replicaCount);
// update cluster state 21 times to ensure that clean up has run after this will upload 42 manifest files
// to repository, if manifest files are less than that it means clean up has run
updateClusterStateNTimes(RETAINED_MANIFESTS + SKIP_CLEANUP_STATE_CHANGES + 1);

RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
BlobPath baseMetadataPath = repository.basePath()
.add(
Base64.getUrlEncoder()
.withoutPadding()
.encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
)
.add("cluster-state")
.add(getClusterState().metadata().clusterUUID());
BlobPath manifestContainerPath = baseMetadataPath.add("manifest");

assertBusy(() -> {
RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
BlobPath baseMetadataPath = repository.basePath()
.add(
Base64.getUrlEncoder()
.withoutPadding()
.encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
)
.add("cluster-state")
.add(getClusterState().metadata().clusterUUID());
BlobPath manifestContainerPath = baseMetadataPath.add("manifest");
int manifestFiles = repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size();
logger.info("number of current manifest file: {}", manifestFiles);
// we can't guarantee that we have same number of manifest as Retained manifest in our repo as there can be other queued task
// other than replica count change which can upload new manifest files, that's why we check that number of manifests is between
// Retained manifests and Retained manifests + Skip cleanup state changes
// Retained manifests and Retained manifests + 2 * Skip cleanup state changes (each cluster state update uploads 2 manifests)
assertTrue(
"Current number of manifest files: " + manifestFiles,
manifestFiles >= RETAINED_MANIFESTS && manifestFiles <= RETAINED_MANIFESTS + SKIP_CLEANUP_STATE_CHANGES
manifestFiles >= RETAINED_MANIFESTS && manifestFiles < RETAINED_MANIFESTS + 2 * SKIP_CLEANUP_STATE_CHANGES
);
}, 5000, TimeUnit.MILLISECONDS);

RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateService.class
);
Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestClusterState(
cluster().getClusterName(),
getClusterState().metadata().clusterUUID()
).getMetadata().getIndices();
assertEquals(replicaCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas());
assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards());
}

private void setReplicaCount(int replicaCount) {
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount))
.get();
ensureGreen(INDEX_NAME);
}, 500, TimeUnit.MILLISECONDS);
}

private int updateReplicaCountNTimes(int n, int initialCount) {
private void updateClusterStateNTimes(int n) {
int newReplicaCount = randomIntBetween(0, 3);
;
for (int i = 0; i < n; i++) {
while (newReplicaCount == initialCount) {
newReplicaCount = randomIntBetween(0, 3);
}
setReplicaCount(newReplicaCount);
initialCount = newReplicaCount;
for (int i = n; i>0; i--) {
ClusterUpdateSettingsResponse response = client().admin().cluster()
.prepareUpdateSettings()
.setPersistentSettings(
Settings.builder()
.put(
CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.getKey(),
i, TimeUnit.SECONDS
)
).get();
assertTrue(response.isAcknowledged());
}
return newReplicaCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,10 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat
previousManifest.getGlobalMetadataFileName(),
true
);
remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, committedManifest);
if (!previousManifest.isClusterUUIDCommitted() && committedManifest.isClusterUUIDCommitted()) {
remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, committedManifest);
}

return committedManifest;
}

Expand Down

0 comments on commit 8f5d7d7

Please sign in to comment.