From 52c9d1cc2c60a05cd0765a79e8da63ad95aec290 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Thu, 4 Apr 2024 12:45:56 +0530 Subject: [PATCH] Remove shards from batches if they are not present in unassigned list from allocation object Signed-off-by: Aman Khare --- .../gateway/RecoveryFromGatewayIT.java | 49 +++++++++++++++++++ .../gateway/ShardsBatchGatewayAllocator.java | 28 ++++++++++- 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java index 5e84ce18ff243..9abbe058b6730 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java @@ -42,11 +42,13 @@ import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.opensearch.action.admin.indices.recovery.RecoveryResponse; import org.opensearch.action.admin.indices.stats.IndexStats; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.action.admin.indices.stats.ShardStats; import org.opensearch.action.support.ActionTestUtils; +import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Requests; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.ElectionSchedulerFactory; @@ -1187,6 +1189,53 @@ public void testShardStoreFetchCorruptedIndexUsingBatchAction() throws Exception assertNodeStoreFilesMetadataSuccessCase(nodeStoreFilesMetadata.get(shardId2), shardId2); } + public void testDeleteRedIndexInBatchMode() throws Exception { + internalCluster().startClusterManagerOnlyNodes( + 1, + Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build() + ); + List dataOnlyNodes = internalCluster().startDataOnlyNodes(2); + createIndex( + "test", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + createIndex( + "test1", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + createIndex( + "test2", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + createIndex( + "testg", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build() + ); + + ensureGreen("test", "test1", "test2", "testg"); + internalCluster().stopRandomDataNode(); + ensureStableCluster(2); + + ShardsBatchGatewayAllocator gatewayAllocator = internalCluster().getInstance( + ShardsBatchGatewayAllocator.class, + internalCluster().getClusterManagerName() + ); + ensureRed("test", "test1", "test2"); + + assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings())); + + logger.info("--> Now do a reroute so batches are created"); // to avoid any race condition in test + ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get(); + assertTrue(clusterRerouteResponse.isAcknowledged()); + + AcknowledgedResponse deleteIndexResponse = client().admin().indices().prepareDelete("test").get(); + assertTrue(deleteIndexResponse.isAcknowledged()); + + ensureYellow("testg"); + IndicesExistsResponse indexExistResponse = client().admin().indices().prepareExists("test").get(); + assertFalse(indexExistResponse.isExists()); + } + private void prepareIndices(String[] indices, int numberOfPrimaryShards, int numberOfReplicaShards) { for (String index : indices) { createIndex( diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 9d327e351338a..5c66b3c91476f 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -234,6 +234,7 @@ protected Set createAndUpdateBatches(RoutingAllocation allocation, boole } Set shardsToBatch = Sets.newHashSet(); + Set batchedShardsToAssign = Sets.newHashSet(); // add all unassigned shards to the batch if they are not already in a batch unassigned.forEach(shardRouting -> { if ((currentBatchShards.containsKey(shardRouting.shardId()) == false) && (shardRouting.primary() == primary)) { @@ -251,8 +252,12 @@ else if (shardRouting.primary() == primary) { String batchId = currentBatchShards.get(shardRouting.shardId()); batchesToBeAssigned.add(batchId); currentBatches.get(batchId).batchInfo.get(shardRouting.shardId()).setShardRouting(shardRouting); + batchedShardsToAssign.add(shardRouting.shardId()); } }); + + refreshShardBatches(currentBatches, batchedShardsToAssign); + Iterator iterator = shardsToBatch.iterator(); assert maxBatchSize > 0 : "Shards batch size must be greater than 0"; @@ -283,6 +288,23 @@ else if (shardRouting.primary() == primary) { return batchesToBeAssigned; } + private void refreshShardBatches(ConcurrentMap currentBatches, Set batchedShardsToAssign) { + // cleanup shard from batches if they are not present in unassigned list from allocation object. This is + // needed as AllocationService.reroute can also be called directly by API flows for example DeleteIndices. + // So, as part of calling reroute, those shards will be removed from allocation object. It'll handle the + // scenarios where shards can be removed from unassigned list without "start" or "failed" event. + for (Map.Entry batchEntry : currentBatches.entrySet()) { + Iterator shardIdIterator = batchEntry.getValue().getBatchedShards().iterator(); + while (shardIdIterator.hasNext()) { + ShardId shardId = shardIdIterator.next(); + if (batchedShardsToAssign.contains(shardId) == false) { + shardIdIterator.remove(); + batchEntry.getValue().clearShardFromCache(shardId); + } + } + } + } + private void addBatch(ShardsBatch shardsBatch, boolean primary) { ConcurrentMap batches = primary ? batchIdToStartedShardBatch : batchIdToStoreShardBatch; if (batches.containsKey(shardsBatch.getBatchId())) { @@ -640,11 +662,15 @@ private TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata buildEm private void removeFromBatch(ShardRouting shard) { removeShard(shard.shardId()); - asyncBatch.clearShard(shard.shardId()); + clearShardFromCache(shard.shardId()); // assert that fetcher and shards are the same as batched shards assert batchInfo.size() == asyncBatch.shardAttributesMap.size() : "Shards size is not equal to fetcher size"; } + private void clearShardFromCache(ShardId shardId) { + asyncBatch.clearShard(shardId); + } + public List getBatchedShardRoutings() { return batchInfo.values().stream().map(ShardEntry::getShardRouting).collect(Collectors.toList()); }