From 4a63473335118957a40fbb2b64b2f1a5cc1cc78b Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Wed, 10 Apr 2024 12:28:43 +0530 Subject: [PATCH] Resolved PR comments 1. Stop data nodes in test cases deterministically instead of randomly 2. Minor renaming of variables Signed-off-by: Gaurav Chandani --- .../gateway/RecoveryFromGatewayIT.java | 20 +++++----- .../gateway/ShardsBatchGatewayAllocator.java | 37 +++++++++---------- 2 files changed, 29 insertions(+), 28 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java index 9abbe058b6730..2454b643ad025 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java @@ -79,6 +79,7 @@ import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper; import org.opensearch.plugins.Plugin; import org.opensearch.test.InternalSettingsPlugin; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.InternalTestCluster.RestartCallback; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; @@ -774,8 +775,8 @@ public void testBatchModeEnabled() throws Exception { ensureGreen("test"); Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0)); Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1)); - internalCluster().stopRandomDataNode(); - internalCluster().stopRandomDataNode(); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1))); ensureRed("test"); ensureStableCluster(1); @@ -799,6 +800,7 @@ public void testBatchModeEnabled() throws Exception { ensureGreen("test"); 
assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches()); assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); + assertEquals(0,gatewayAllocator.getNumberOfInFlightFetches()); } public void testBatchModeDisabled() throws Exception { @@ -815,8 +817,8 @@ public void testBatchModeDisabled() throws Exception { ensureGreen("test"); Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0)); Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1)); - internalCluster().stopRandomDataNode(); - internalCluster().stopRandomDataNode(); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1))); ensureStableCluster(1); logger.info("--> Now do a protective reroute"); @@ -870,8 +872,8 @@ public void testNBatchesCreationAndAssignment() throws Exception { Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1)); internalCluster().stopCurrentClusterManagerNode(); - internalCluster().stopRandomDataNode(); - internalCluster().stopRandomDataNode(); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1))); // Now start cluster manager node and post that verify batches created internalCluster().startClusterManagerOnlyNodes( @@ -959,9 +961,9 @@ public void testCulpritShardInBatch() throws Exception { Settings node2DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(2)); internalCluster().stopCurrentClusterManagerNode(); - internalCluster().stopRandomDataNode(); - internalCluster().stopRandomDataNode(); - internalCluster().stopRandomDataNode(); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0))); + 
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(2))); // Now start cluster manager node and post that verify batches created internalCluster().startClusterManagerOnlyNodes( diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 5c66b3c91476f..69878d83dfa73 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -53,6 +53,7 @@ import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.stream.StreamSupport; /** @@ -108,14 +109,12 @@ public ShardsBatchGatewayAllocator( @Override public void cleanCaches() { - Releasables.close( - batchIdToStartedShardBatch.values().stream().map(shardsBatch -> shardsBatch.asyncBatch).collect(Collectors.toList()) - ); - batchIdToStartedShardBatch.clear(); - Releasables.close( - batchIdToStoreShardBatch.values().stream().map(shardsBatch -> shardsBatch.asyncBatch).collect(Collectors.toList()) - ); - batchIdToStoreShardBatch.clear(); + Stream.of(batchIdToStartedShardBatch, batchIdToStoreShardBatch).forEach(b -> { + Releasables.close( + b.values().stream().map(shardsBatch -> shardsBatch.asyncBatch).collect(Collectors.toList()) + ); + b.clear(); + }); } // for tests @@ -226,20 +225,20 @@ protected Set createAndUpdateBatches(RoutingAllocation allocation, boole RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned(); ConcurrentMap currentBatches = primary ? 
batchIdToStartedShardBatch : batchIdToStoreShardBatch; // get all batched shards - Map currentBatchShards = new HashMap<>(); + Map currentBatchedShards = new HashMap<>(); for (Map.Entry batchEntry : currentBatches.entrySet()) { batchEntry.getValue().getBatchedShards() - .forEach(shardId -> currentBatchShards.put(shardId, + .forEach(shardId -> currentBatchedShards.put(shardId, batchEntry.getKey())); } - Set shardsToBatch = Sets.newHashSet(); + Set newShardsToBatch = Sets.newHashSet(); Set batchedShardsToAssign = Sets.newHashSet(); // add all unassigned shards to the batch if they are not already in a batch unassigned.forEach(shardRouting -> { - if ((currentBatchShards.containsKey(shardRouting.shardId()) == false) && (shardRouting.primary() == primary)) { + if ((currentBatchedShards.containsKey(shardRouting.shardId()) == false) && (shardRouting.primary() == primary)) { assert shardRouting.unassigned(); - shardsToBatch.add(shardRouting); + newShardsToBatch.add(shardRouting); } // if shard is already batched update to latest shardRouting information in the batches // Replica shard assignment can be cancelled if we get a better match. These ShardRouting objects also @@ -249,7 +248,7 @@ protected Set createAndUpdateBatches(RoutingAllocation allocation, boole // failure in executeDecision of BaseGatewayShardAllocator. Previous non-batch mode flow also used to // pass ShardRouting object directly from unassignedIterator, so we're following the same behaviour. 
else if (shardRouting.primary() == primary) { - String batchId = currentBatchShards.get(shardRouting.shardId()); + String batchId = currentBatchedShards.get(shardRouting.shardId()); batchesToBeAssigned.add(batchId); currentBatches.get(batchId).batchInfo.get(shardRouting.shardId()).setShardRouting(shardRouting); batchedShardsToAssign.add(shardRouting.shardId()); @@ -258,11 +257,11 @@ else if (shardRouting.primary() == primary) { refreshShardBatches(currentBatches, batchedShardsToAssign); - Iterator iterator = shardsToBatch.iterator(); + Iterator iterator = newShardsToBatch.iterator(); assert maxBatchSize > 0 : "Shards batch size must be greater than 0"; long batchSize = maxBatchSize; - Map shardsToAddToCurrentBatch = new HashMap<>(); + Map perBatchShards = new HashMap<>(); while (iterator.hasNext()) { ShardRouting currentShard = iterator.next(); ShardEntry shardEntry = new ShardEntry( @@ -271,17 +270,17 @@ else if (shardRouting.primary() == primary) { ), currentShard ); - shardsToAddToCurrentBatch.put(currentShard.shardId(), shardEntry); + perBatchShards.put(currentShard.shardId(), shardEntry); batchSize--; iterator.remove(); // add to batch if batch size full or last shard in unassigned list if (batchSize == 0 || iterator.hasNext() == false) { String batchUUId = UUIDs.base64UUID(); - ShardsBatch shardsBatch = new ShardsBatch(batchUUId, shardsToAddToCurrentBatch, primary); + ShardsBatch shardsBatch = new ShardsBatch(batchUUId, perBatchShards, primary); // add the batch to list of current batches addBatch(shardsBatch, primary); batchesToBeAssigned.add(batchUUId); - shardsToAddToCurrentBatch.clear(); + perBatchShards.clear(); batchSize = maxBatchSize; } }