From 44da3c649c77b5a4ffe9bafddeb75a5e8e84901d Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Wed, 28 Feb 2024 05:10:27 +0530 Subject: [PATCH] Add javadocs Signed-off-by: Shivansh Arora --- .../gateway/ReplicaShardBatchAllocator.java | 91 ++++++++++--------- .../ReplicaShardBatchAllocatorTest.java | 6 +- 2 files changed, 52 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java index e057fa354ece0..fbfde89f1705d 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java @@ -19,8 +19,11 @@ import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.common.collect.Tuple; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.gateway.AsyncShardFetch.FetchResult; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata; import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch; import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata; @@ -44,26 +47,31 @@ public abstract class ReplicaShardBatchAllocator extends ReplicaShardAllocator { * Process existing recoveries of replicas and see if we need to cancel them if we find a better * match. Today, a better match is one that can perform a no-op recovery while the previous recovery * has to copy segment files. 
+ * + * @param allocation the overall routing allocation + * @param shardBatches a list of shard batches to check for existing recoveries */ public void processExistingRecoveries(RoutingAllocation allocation, List> shardBatches) { RoutingNodes routingNodes = allocation.routingNodes(); List shardCancellationActions = new ArrayList<>(); + // iterate through the batches; each batch needs to be processed together, as the fetch call should be made for shards from the same batch for (List shardBatch : shardBatches) { - Set eligibleFetchShards = new HashSet<>(); + Set eligibleShards = new HashSet<>(); Set ineligibleShards = new HashSet<>(); boolean shardMatched; + // iterate over the shards to check for a match for each of them for (ShardRouting shard : shardBatch) { shardMatched = false; + // need to iterate over all the nodes to find the matching shard for (RoutingNode routingNode : routingNodes) { - if (routingNode.getByShardId(shard.shardId()) != null) { - ShardRouting shardFromRoutingNode = routingNode.getByShardId(shard.shardId()); - if (!shardFromRoutingNode.primary()) { - shardMatched = true; - if (shouldSkipFetchForRecovery(shardFromRoutingNode)) { - continue; - } - eligibleFetchShards.add(shardFromRoutingNode); + ShardRouting shardFromRoutingNode = routingNode.getByShardId(shard.shardId()); + if (shardFromRoutingNode != null && !shardFromRoutingNode.primary()) { + shardMatched = true; + if (shouldSkipFetchForRecovery(shardFromRoutingNode)) { + ineligibleShards.add(shardFromRoutingNode); + continue; } + eligibleShards.add(shardFromRoutingNode); } if (shardMatched) { break; @@ -71,15 +79,15 @@ public void processExistingRecoveries(RoutingAllocation allocation, List shardState = fetchData( - eligibleFetchShards, + eligibleShards, ineligibleShards, allocation ); if (!shardState.hasData()) { - logger.trace("{}: fetching new stores for initializing shard batch", eligibleFetchShards); + logger.trace("{}: fetching new stores for initializing shard batch", eligibleShards); continue; // 
still fetching } - for (ShardRouting shard : eligibleFetchShards) { + for (ShardRouting shard : eligibleShards) { Map nodeShardStores = convertToNodeStoreFilesMetadataMap(shard, shardState); Runnable cancellationAction = getShardCancellationAction(shard, allocation, nodeShardStores); @@ -94,8 +102,8 @@ public void processExistingRecoveries(RoutingAllocation allocation, List fetchData( - Set shardEligibleForFetch, - Set inEligibleShards, + Set eligibleShards, + Set ineligibleShards, RoutingAllocation allocation ); @@ -104,7 +112,8 @@ protected FetchResult makeAllocationDecision( ) { HashMap shardAllocationDecisions = new HashMap<>(); final boolean explain = allocation.debugDecision(); - final RoutingNodes routingNodes = allocation.routingNodes(); - Set shardsEligibleForFetch = new HashSet<>(); - Set shardsNotEligibleForFetch = new HashSet<>(); + Set eligibleShards = new HashSet<>(); + Set ineligibleShards = new HashSet<>(); HashMap>> nodeAllocationDecisions = new HashMap<>(); for (ShardRouting shard : shards) { if (!isResponsibleFor(shard)) { // this allocator n is not responsible for allocating this shard - shardsNotEligibleForFetch.add(shard); + ineligibleShards.add(shard); shardAllocationDecisions.put(shard, AllocateUnassignedDecision.NOT_TAKEN); continue; } @@ -151,21 +159,21 @@ public HashMap makeAllocationDecision( // so that we don't have to compute the decisions again nodeAllocationDecisions.put(shard, result); - shardsEligibleForFetch.add(shard); + eligibleShards.add(shard); } // Do not call fetchData if there are no eligible shards - if (shardsEligibleForFetch.isEmpty()) { + if (eligibleShards.isEmpty()) { return shardAllocationDecisions; } // only fetch data for eligible shards final FetchResult shardsState = fetchData( - shardsEligibleForFetch, - shardsNotEligibleForFetch, + eligibleShards, + ineligibleShards, allocation ); - for (ShardRouting unassignedShard : shardsEligibleForFetch) { + for (ShardRouting unassignedShard : eligibleShards) { Tuple> 
result = nodeAllocationDecisions.get(unassignedShard); shardAllocationDecisions.put( unassignedShard, @@ -180,25 +188,24 @@ public HashMap makeAllocationDecision( } return shardAllocationDecisions; } - private Map convertToNodeStoreFilesMetadataMap( - ShardRouting unassignedShard, - FetchResult data - ) { - if (!data.hasData()) { - // if we don't have any data, return null - return null; + ShardRouting unassignedShard, + FetchResult data) { + + if(!data.hasData()) { + return new HashMap<>(); } - return new HashMap<>( - data.getData() - .entrySet() - .stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - entry -> entry.getValue().getNodeStoreFilesMetadataBatch().get(unassignedShard.shardId()).storeFilesMetadata() - ) - ) - ); + + Map map = new HashMap<>(); + + data.getData().forEach((key, value) -> { + Map batch = value.getNodeStoreFilesMetadataBatch(); + NodeStoreFilesMetadata metadata = batch.get(unassignedShard.shardId()); + if (metadata != null && metadata.getStoreFileFetchException() == null) { + map.put(key, metadata.storeFilesMetadata()); + } + }); + + return map; } } diff --git a/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTest.java b/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTest.java index 70045a6388342..c76d0e5449bd7 100644 --- a/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTest.java +++ b/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTest.java @@ -814,12 +814,12 @@ public TestBatchAllocator addData( @Override protected AsyncShardFetch.FetchResult fetchData( - Set shardEligibleForFetch, - Set inEligibleShards, + Set eligibleShards, + Set ineligibleShards, RoutingAllocation allocation ) { fetchDataCalled.set(true); - eligibleShardFetchDataCount.set(shardEligibleForFetch.size()); + eligibleShardFetchDataCount.set(eligibleShards.size()); Map tData = null; if (data != null) { tData = new HashMap<>();