Fixed PR comments for GA-batcher-PR
Signed-off-by: Gaurav Chandani <[email protected]>
Gaurav614 committed Apr 9, 2024
1 parent 9673843 commit 5037a33
Showing 2 changed files with 26 additions and 26 deletions.
@@ -76,6 +76,7 @@
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalSettingsPlugin;
+import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.InternalTestCluster.RestartCallback;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
@@ -990,8 +991,8 @@ public void testBatchModeEnabled() throws Exception {
ensureGreen("test");
Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0));
Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1));
-internalCluster().stopRandomDataNode();
-internalCluster().stopRandomDataNode();
+internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0)));
+internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1)));
ensureRed("test");
ensureStableCluster(1);

@@ -1015,6 +1016,7 @@ public void testBatchModeEnabled() throws Exception {
ensureGreen("test");
assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
+assertEquals(0,gatewayAllocator.getNumberOfInFlightFetches());
}

public void testBatchModeDisabled() throws Exception {
@@ -1031,8 +1033,8 @@ public void testBatchModeDisabled() throws Exception {
ensureGreen("test");
Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0));
Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1));
-internalCluster().stopRandomDataNode();
-internalCluster().stopRandomDataNode();
+internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0)));
+internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1)));
ensureStableCluster(1);

logger.info("--> Now do a protective reroute");
@@ -1086,9 +1088,8 @@ public void testNBatchesCreationAndAssignment() throws Exception {
Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1));

internalCluster().stopCurrentClusterManagerNode();
-internalCluster().stopRandomDataNode();
-internalCluster().stopRandomDataNode();
-
+internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0)));
+internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1)));
// Now start cluster manager node and post that verify batches created
internalCluster().startClusterManagerOnlyNodes(
1,
@@ -1175,9 +1176,9 @@ public void testCulpritShardInBatch() throws Exception {
Settings node2DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(2));

internalCluster().stopCurrentClusterManagerNode();
-internalCluster().stopRandomDataNode();
-internalCluster().stopRandomDataNode();
-internalCluster().stopRandomDataNode();
+internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0)));
+internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1)));
+internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(2)));

// Now start cluster manager node and post that verify batches created
internalCluster().startClusterManagerOnlyNodes(
@@ -1217,7 +1218,7 @@ public void testCulpritShardInBatch() throws Exception {
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(2)).put(node2DataPathSettings).build());
ensureStableCluster(4);

-health = client().admin().cluster().health(Requests.clusterHealthRequest().waitForGreenStatus().timeout("10s")).actionGet();
+health = client().admin().cluster().health(Requests.clusterHealthRequest().waitForGreenStatus().timeout("1m")).actionGet();

assertEquals(RED, health.getStatus());
assertTrue(health.isTimedOut());
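Note on the test changes above: these tests capture node0DataPathSettings / node1DataPathSettings for specific data nodes and later restart exactly those nodes, so stopping nodes by name via internalCluster().stopRandomNode(InternalTestCluster.nameFilter(...)) keeps the stopped nodes in sync with the captured settings, whereas stopRandomDataNode() could stop a different node. Below is a minimal sketch of the name-filter idea using only JDK types; the class and helper are hypothetical stand-ins, not the OpenSearch test API.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in: stop specific named nodes instead of arbitrary data nodes.
public class StopByNameSketch {

    // Builds a predicate matching only the given node names (a name filter in spirit).
    static Predicate<String> nameFilter(String... names) {
        List<String> wanted = List.of(names);
        return wanted::contains;
    }

    public static void main(String[] args) {
        List<String> dataOnlyNodes = List.of("data-node-0", "data-node-1", "data-node-2");

        // Target exactly the nodes whose data-path settings were captured (index 0 and 1).
        Predicate<String> toStop = nameFilter(dataOnlyNodes.get(0), dataOnlyNodes.get(1));

        dataOnlyNodes.stream()
            .filter(toStop)
            .forEach(node -> System.out.println("stopping " + node)); // data-node-0, data-node-1
    }
}
```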
@@ -57,6 +57,7 @@
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
@@ -112,14 +113,12 @@ public ShardsBatchGatewayAllocator(

@Override
public void cleanCaches() {
-Releasables.close(
-batchIdToStartedShardBatch.values().stream().map(shardsBatch -> shardsBatch.asyncBatch).collect(Collectors.toList())
-);
-batchIdToStartedShardBatch.clear();
-Releasables.close(
-batchIdToStoreShardBatch.values().stream().map(shardsBatch -> shardsBatch.asyncBatch).collect(Collectors.toList())
-);
-batchIdToStoreShardBatch.clear();
+Stream.of(batchIdToStartedShardBatch, batchIdToStoreShardBatch).forEach(b -> {
+Releasables.close(
+b.values().stream().map(shardsBatch -> shardsBatch.asyncBatch).collect(Collectors.toList())
+);
+b.clear();
+});
}

// for tests
@@ -230,26 +229,26 @@ protected Set<String> createAndUpdateBatches(RoutingAllocation allocation, boole
RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned();
ConcurrentMap<String, ShardsBatch> currentBatches = primary ? batchIdToStartedShardBatch : batchIdToStoreShardBatch;
// get all batched shards
-Map<ShardId, String> currentBatchShards = new HashMap<>();
+Map<ShardId, String> currentBatchedShards = new HashMap<>();
for (Map.Entry<String, ShardsBatch> batchEntry : currentBatches.entrySet()) {
-batchEntry.getValue().getBatchedShards().forEach(shardId -> currentBatchShards.put(shardId, batchEntry.getKey()));
+batchEntry.getValue().getBatchedShards().forEach(shardId -> currentBatchedShards.put(shardId, batchEntry.getKey()));
}

-Set<ShardRouting> shardsToBatch = Sets.newHashSet();
+Set<ShardRouting> newShardsToBatch = Sets.newHashSet();
// add all unassigned shards to the batch if they are not already in a batch
unassigned.forEach(shardRouting -> {
-if ((currentBatchShards.containsKey(shardRouting.shardId()) == false) && (shardRouting.primary() == primary)) {
+if ((currentBatchedShards.containsKey(shardRouting.shardId()) == false) && (shardRouting.primary() == primary)) {
assert shardRouting.unassigned();
-shardsToBatch.add(shardRouting);
+newShardsToBatch.add(shardRouting);
}
// if shard is already batched update to latest shardRouting information in the batches
else if (shardRouting.primary() == primary) {
-String batchId = currentBatchShards.get(shardRouting.shardId());
+String batchId = currentBatchedShards.get(shardRouting.shardId());
batchesToBeAssigned.add(batchId);
currentBatches.get(batchId).batchInfo.get(shardRouting.shardId()).setShardRouting(shardRouting);
}
});
-Iterator<ShardRouting> iterator = shardsToBatch.iterator();
+Iterator<ShardRouting> iterator = newShardsToBatch.iterator();
assert maxBatchSize > 0 : "Shards batch size must be greater than 0";

long batchSize = maxBatchSize;
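Note on the cleanCaches() change above: the two identical close-and-clear sequences are collapsed into one Stream.of(...) pass over both batch maps, which is why java.util.stream.Stream is newly imported. Below is a minimal, self-contained sketch of the same close-and-clear pattern using plain JDK types; the Batch class and map names are hypothetical stand-ins for ShardsBatch and the allocator's maps, and the real code collects shardsBatch.asyncBatch into a list and closes it via Releasables.close.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

// Hypothetical stand-ins: Batch for ShardsBatch, the two maps for the allocator's batch maps.
public class CleanCachesSketch {

    static class Batch implements AutoCloseable {
        @Override
        public void close() {
            // In the real allocator this releases the batch's async fetch (asyncBatch).
        }
    }

    static final Map<String, Batch> batchIdToStartedShardBatch = new ConcurrentHashMap<>();
    static final Map<String, Batch> batchIdToStoreShardBatch = new ConcurrentHashMap<>();

    // Same shape as the refactor: one pass over both maps, close every batch, then clear the map.
    static void cleanCaches() {
        Stream.of(batchIdToStartedShardBatch, batchIdToStoreShardBatch).forEach(batches -> {
            batches.values().forEach(Batch::close);
            batches.clear();
        });
    }

    public static void main(String[] args) {
        batchIdToStartedShardBatch.put("batch-1", new Batch());
        batchIdToStoreShardBatch.put("batch-2", new Batch());
        cleanCaches();
        System.out.println(batchIdToStartedShardBatch.isEmpty() && batchIdToStoreShardBatch.isEmpty()); // true
    }
}
```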
