Resolved PR comments
1. Stop data nodes in test cases deterministically instead of randomly (a short sketch of this pattern follows the file summary below)
2. Minor rename of variables

Signed-off-by: Gaurav Chandani <[email protected]>
Gaurav614 committed Apr 10, 2024
1 parent 52c9d1c commit 4a63473
Showing 2 changed files with 29 additions and 28 deletions.
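As a reference for change (1), below is a minimal, hypothetical sketch of the deterministic stop pattern the tests switch to. It assumes the OpenSearch internal test framework (OpenSearchIntegTestCase, InternalTestCluster); the class name, index name, and node counts are illustrative and not part of this commit.

```java
import java.util.List;

import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

public class DeterministicNodeStopSketch extends OpenSearchIntegTestCase {

    public void testStopSpecificDataNodes() throws Exception {
        internalCluster().startClusterManagerOnlyNodes(1);
        List<String> dataOnlyNodes = internalCluster().startDataOnlyNodes(2);
        createIndex("test");
        ensureGreen("test");

        // Before this commit the tests called internalCluster().stopRandomDataNode() twice,
        // stopping whichever data nodes the test cluster happened to pick. A name filter
        // stops exactly the nodes returned by startDataOnlyNodes(), in a known order.
        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0)));
        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1)));

        ensureStableCluster(1);
    }
}
```

nameFilter builds a predicate over node settings, so stopRandomNode removes exactly the named node rather than an arbitrary data node, which keeps later assertions on batch counts deterministic.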
@@ -79,6 +79,7 @@
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalSettingsPlugin;
+ import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.InternalTestCluster.RestartCallback;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
@@ -774,8 +775,8 @@ public void testBatchModeEnabled() throws Exception {
ensureGreen("test");
Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0));
Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1));
- internalCluster().stopRandomDataNode();
- internalCluster().stopRandomDataNode();
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0)));
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1)));
ensureRed("test");
ensureStableCluster(1);

@@ -799,6 +800,7 @@ public void testBatchModeEnabled() throws Exception {
ensureGreen("test");
assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
+ assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches());
}

public void testBatchModeDisabled() throws Exception {
@@ -815,8 +817,8 @@ public void testBatchModeDisabled() throws Exception {
ensureGreen("test");
Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0));
Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1));
- internalCluster().stopRandomDataNode();
- internalCluster().stopRandomDataNode();
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0)));
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1)));
ensureStableCluster(1);

logger.info("--> Now do a protective reroute");
@@ -870,8 +872,8 @@ public void testNBatchesCreationAndAssignment() throws Exception {
Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1));

internalCluster().stopCurrentClusterManagerNode();
- internalCluster().stopRandomDataNode();
- internalCluster().stopRandomDataNode();
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0)));
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1)));

// Now start cluster manager node and post that verify batches created
internalCluster().startClusterManagerOnlyNodes(
@@ -959,9 +961,9 @@ public void testCulpritShardInBatch() throws Exception {
Settings node2DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(2));

internalCluster().stopCurrentClusterManagerNode();
- internalCluster().stopRandomDataNode();
- internalCluster().stopRandomDataNode();
- internalCluster().stopRandomDataNode();
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0)));
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1)));
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(2)));

// Now start cluster manager node and post that verify batches created
internalCluster().startClusterManagerOnlyNodes(
@@ -53,6 +53,7 @@
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+ import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
@@ -108,14 +109,12 @@ public ShardsBatchGatewayAllocator(

@Override
public void cleanCaches() {
- Releasables.close(
- batchIdToStartedShardBatch.values().stream().map(shardsBatch -> shardsBatch.asyncBatch).collect(Collectors.toList())
- );
- batchIdToStartedShardBatch.clear();
- Releasables.close(
- batchIdToStoreShardBatch.values().stream().map(shardsBatch -> shardsBatch.asyncBatch).collect(Collectors.toList())
- );
- batchIdToStoreShardBatch.clear();
+ Stream.of(batchIdToStartedShardBatch, batchIdToStoreShardBatch).forEach(b -> {
+ Releasables.close(
+ b.values().stream().map(shardsBatch -> shardsBatch.asyncBatch).collect(Collectors.toList())
+ );
+ b.clear();
+ });
}

// for tests
@@ -226,20 +225,20 @@ protected Set<String> createAndUpdateBatches(RoutingAllocation allocation, boole
RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned();
ConcurrentMap<String, ShardsBatch> currentBatches = primary ? batchIdToStartedShardBatch : batchIdToStoreShardBatch;
// get all batched shards
- Map<ShardId, String> currentBatchShards = new HashMap<>();
+ Map<ShardId, String> currentBatchedShards = new HashMap<>();
for (Map.Entry<String, ShardsBatch> batchEntry : currentBatches.entrySet()) {
batchEntry.getValue().getBatchedShards()
- .forEach(shardId -> currentBatchShards.put(shardId,
+ .forEach(shardId -> currentBatchedShards.put(shardId,
batchEntry.getKey()));
}

- Set<ShardRouting> shardsToBatch = Sets.newHashSet();
+ Set<ShardRouting> newShardsToBatch = Sets.newHashSet();
Set<ShardId> batchedShardsToAssign = Sets.newHashSet();
// add all unassigned shards to the batch if they are not already in a batch
unassigned.forEach(shardRouting -> {
- if ((currentBatchShards.containsKey(shardRouting.shardId()) == false) && (shardRouting.primary() == primary)) {
+ if ((currentBatchedShards.containsKey(shardRouting.shardId()) == false) && (shardRouting.primary() == primary)) {
assert shardRouting.unassigned();
- shardsToBatch.add(shardRouting);
+ newShardsToBatch.add(shardRouting);
}
// if shard is already batched update to latest shardRouting information in the batches
// Replica shard assignment can be cancelled if we get a better match. These ShardRouting objects also
@@ -249,7 +248,7 @@
// failure in executeDecision of BaseGatewayShardAllocator. Previous non-batch mode flow also used to
// pass ShardRouting object directly from unassignedIterator, so we're following the same behaviour.
else if (shardRouting.primary() == primary) {
- String batchId = currentBatchShards.get(shardRouting.shardId());
+ String batchId = currentBatchedShards.get(shardRouting.shardId());
batchesToBeAssigned.add(batchId);
currentBatches.get(batchId).batchInfo.get(shardRouting.shardId()).setShardRouting(shardRouting);
batchedShardsToAssign.add(shardRouting.shardId());
@@ -258,11 +257,11 @@

refreshShardBatches(currentBatches, batchedShardsToAssign);

- Iterator<ShardRouting> iterator = shardsToBatch.iterator();
+ Iterator<ShardRouting> iterator = newShardsToBatch.iterator();
assert maxBatchSize > 0 : "Shards batch size must be greater than 0";

long batchSize = maxBatchSize;
- Map<ShardId, ShardEntry> shardsToAddToCurrentBatch = new HashMap<>();
+ Map<ShardId, ShardEntry> perBatchShards = new HashMap<>();
while (iterator.hasNext()) {
ShardRouting currentShard = iterator.next();
ShardEntry shardEntry = new ShardEntry(
@@ -271,17 +270,17 @@
),
currentShard
);
- shardsToAddToCurrentBatch.put(currentShard.shardId(), shardEntry);
+ perBatchShards.put(currentShard.shardId(), shardEntry);
batchSize--;
iterator.remove();
// add to batch if batch size full or last shard in unassigned list
if (batchSize == 0 || iterator.hasNext() == false) {
String batchUUId = UUIDs.base64UUID();
- ShardsBatch shardsBatch = new ShardsBatch(batchUUId, shardsToAddToCurrentBatch, primary);
+ ShardsBatch shardsBatch = new ShardsBatch(batchUUId, perBatchShards, primary);
// add the batch to list of current batches
addBatch(shardsBatch, primary);
batchesToBeAssigned.add(batchUUId);
- shardsToAddToCurrentBatch.clear();
+ perBatchShards.clear();
batchSize = maxBatchSize;
}
}
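The while loop in createAndUpdateBatches() above fills a per-batch map until maxBatchSize entries are collected or the unassigned iterator runs out, then seals the map into a new ShardsBatch and starts over. A simplified, self-contained sketch of that chunking logic, with integers and strings standing in for ShardId and ShardEntry and UUID.randomUUID() in place of UUIDs.base64UUID(), is:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Simplified stand-in for the batching loop: group items into fixed-size
// batches, flushing the in-progress map when it is full or input runs out.
public class ShardBatchingSketch {

    static Map<String, Map<Integer, String>> createBatches(List<Integer> shardIds, long maxBatchSize) {
        Map<String, Map<Integer, String>> batches = new HashMap<>();
        Map<Integer, String> perBatchShards = new HashMap<>();
        long batchSize = maxBatchSize;
        Iterator<Integer> iterator = new ArrayList<>(shardIds).iterator();
        while (iterator.hasNext()) {
            Integer shardId = iterator.next();
            perBatchShards.put(shardId, "entry-" + shardId); // stands in for ShardEntry
            batchSize--;
            iterator.remove();
            // seal the batch when it is full or this was the last shard
            if (batchSize == 0 || iterator.hasNext() == false) {
                String batchId = UUID.randomUUID().toString();      // UUIDs.base64UUID() in the real code
                batches.put(batchId, new HashMap<>(perBatchShards)); // addBatch(...) in the real code
                perBatchShards.clear();
                batchSize = maxBatchSize;
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        Map<String, Map<Integer, String>> batches = createBatches(List.of(1, 2, 3, 4, 5), 2);
        System.out.println(batches.size()); // 3 batches of sizes 2, 2 and 1
    }
}
```

For five shards and a batch size of 2 this yields three batches, mirroring how the allocator groups unassigned shards before assignment.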
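The cleanCaches() change further up collapses two copies of the same close-and-clear sequence into a single loop over both batch maps. A plain-Java sketch of that pattern, with HashMap and AutoCloseable standing in for the allocator's concurrent batch maps and Releasable async batches, is:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

// Minimal sketch of the cleanup pattern from cleanCaches(): instead of
// repeating "close everything, then clear" once per map, stream over both
// maps and apply the same two steps to each.
public class BatchCacheCleanupSketch {
    static final Map<String, AutoCloseable> startedShardBatches = new HashMap<>();
    static final Map<String, AutoCloseable> storeShardBatches = new HashMap<>();

    static void cleanCaches() {
        Stream.of(startedShardBatches, storeShardBatches).forEach(batches -> {
            batches.values().forEach(batch -> {
                try {
                    batch.close(); // Releasables.close(...) plays this role in the real code
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            batches.clear();
        });
    }

    public static void main(String[] args) {
        startedShardBatches.put("batch-1", () -> System.out.println("released batch-1"));
        storeShardBatches.put("batch-2", () -> System.out.println("released batch-2"));
        cleanCaches();
        System.out.println(startedShardBatches.isEmpty() && storeShardBatches.isEmpty()); // true
    }
}
```

Running main releases both batches and leaves both maps empty; the real method does the same with Releasables.close over each map's asyncBatch values.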
