
Commit

Modify the Integ test
Signed-off-by: Shivansh Arora <[email protected]>
shiv0408 committed Apr 30, 2024
1 parent 3408bd7 commit 00a36ab
Showing 2 changed files with 34 additions and 49 deletions.
@@ -26,6 +26,7 @@
 import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT;
 import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING;
 import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
+import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
 import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
 
 @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@@ -79,63 +80,47 @@ public void testRemoteCleanupTaskUpdated() {
         assertEquals(1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMinutes());
     }
 
-    public void testRemoteCleanupOnlyAfter10Updates() throws Exception {
+    public void testRemoteCleanupDeleteStale() throws Exception {
         int shardCount = randomIntBetween(1, 2);
-        int replicaCount = 1;
+        int replicaCount = 3;
         int dataNodeCount = shardCount * (replicaCount + 1);
         int clusterManagerNodeCount = 1;
 
         initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
 
-        // set cleanup interval to 100 ms
+        // set cleanup interval to 100 ms to make the test faster
         ClusterUpdateSettingsResponse response = client().admin()
             .cluster()
             .prepareUpdateSettings()
             .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "100ms"))
             .get();
 
-        assertEquals(true, response.isAcknowledged());
-
-        replicaCount = updateReplicaCountNTimes(9, replicaCount);
-
-        RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
-
-        BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
-        BlobPath baseMetadataPath = repository.basePath()
-            .add(
-                Base64.getUrlEncoder()
-                    .withoutPadding()
-                    .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
-            )
-            .add("cluster-state")
-            .add(getClusterState().metadata().clusterUUID());
-        BlobPath manifestContainerPath = baseMetadataPath.add("manifest");
-
-        assertBusy(() -> {
-            assertEquals(
-                RETAINED_MANIFESTS - 1,
-                repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size()
-            );
-        }, 500, TimeUnit.MILLISECONDS);
-
-        replicaCount = updateReplicaCountNTimes(8, replicaCount);
-
-        // wait for 1 min, to ensure that clean up task ran and didn't clean up stale files because it was less than 10
-        Thread.sleep(100);
-        assertNotEquals(
-            RETAINED_MANIFESTS - 1,
-            repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size()
-        );
+        assertTrue(response.isAcknowledged());
 
-        // Do 2 more updates, now since the total successful state changes are more than 10, stale files will be cleaned up
-        replicaCount = updateReplicaCountNTimes(2, replicaCount);
+        // update replica count to simulate cluster state changes, so that we verify number of manifest files
+        replicaCount = updateReplicaCountNTimes(RETAINED_MANIFESTS + 2 * SKIP_CLEANUP_STATE_CHANGES, replicaCount);
 
         assertBusy(() -> {
-            assertEquals(
-                RETAINED_MANIFESTS - 1,
-                repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size()
+            RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
+            BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
+            BlobPath baseMetadataPath = repository.basePath()
+                .add(
+                    Base64.getUrlEncoder()
+                        .withoutPadding()
+                        .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
+                )
+                .add("cluster-state")
+                .add(getClusterState().metadata().clusterUUID());
+            BlobPath manifestContainerPath = baseMetadataPath.add("manifest");
+            int manifestFiles = repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size();
+            // we can't guarantee that we have same number of manifest as Retained manifest in our repo as there can be other queued task
+            // other than replica count change which can upload new manifest files, that's why we check that number of manifests is between
+            // Retained manifests and Retained manifests + Skip cleanup state changes
+            assertTrue(
+                "Current number of manifest files: " + manifestFiles,
+                manifestFiles >= RETAINED_MANIFESTS && manifestFiles <= RETAINED_MANIFESTS + SKIP_CLEANUP_STATE_CHANGES
             );
-        }, 100, TimeUnit.MILLISECONDS);
+        }, 5000, TimeUnit.MILLISECONDS);
 
         RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
             RemoteClusterStateService.class
@@ -65,8 +65,8 @@ public class RemoteClusterStateCleanupManager implements Closeable {
         Setting.Property.Dynamic
     );
     private static final Logger logger = LogManager.getLogger(RemoteClusterStateCleanupManager.class);
-    private RemoteClusterStateService remoteClusterStateService;
-    private RemotePersistenceStats remoteStateStats;
+    private final RemoteClusterStateService remoteClusterStateService;
+    private final RemotePersistenceStats remoteStateStats;
     private BlobStoreTransferService blobStoreTransferService;
     private volatile TimeValue staleFileCleanupInterval;
     private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false);
@@ -239,8 +239,8 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
             deleteClusterMetadata(
                 clusterName,
                 clusterUUID,
-                blobMetadata.subList(0, manifestsToRetain - 1),
-                blobMetadata.subList(manifestsToRetain - 1, blobMetadata.size())
+                blobMetadata.subList(0, manifestsToRetain),
+                blobMetadata.subList(manifestsToRetain, blobMetadata.size())
             );
         }
         deleteStaleMetadataRunning.set(false);
@@ -271,8 +271,8 @@ public void onFailure(Exception e) {
     * @param clusterUUIDs clusteUUIDs for which the remote state needs to be purged
     */
    void deleteStaleUUIDsClusterMetadata(String clusterName, List<String> clusterUUIDs) {
-        clusterUUIDs.forEach(clusterUUID -> {
-            getBlobStoreTransferService().deleteAsync(
+        clusterUUIDs.forEach(
+            clusterUUID -> getBlobStoreTransferService().deleteAsync(
                 ThreadPool.Names.REMOTE_PURGE,
                 remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID),
                 new ActionListener<>() {
@@ -293,8 +293,8 @@ public void onFailure(Exception e) {
                         remoteStateStats.cleanUpAttemptFailed();
                     }
                 }
-            );
-        });
+            )
+        );
     }
 
     private void deleteStalePaths(String clusterName, String clusterUUID, List<String> stalePaths) throws IOException {
