Skip to content

Commit

Permalink
Add javadocs
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Feb 27, 2024
1 parent e3db300 commit 44da3c6
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.collect.Tuple;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata;

Expand All @@ -44,42 +47,47 @@ public abstract class ReplicaShardBatchAllocator extends ReplicaShardAllocator {
* Process existing recoveries of replicas and see if we need to cancel them if we find a better
* match. Today, a better match is one that can perform a no-op recovery while the previous recovery
* has to copy segment files.
*
* @param allocation the overall routing allocation
* @param shardBatches a list of shard batches to check for existing recoveries
*/
public void processExistingRecoveries(RoutingAllocation allocation, List<List<ShardRouting>> shardBatches) {
RoutingNodes routingNodes = allocation.routingNodes();
List<Runnable> shardCancellationActions = new ArrayList<>();
// iterate through the batches, each batch needs to be processed together as fetch call should be made for shards from same batch
for (List<ShardRouting> shardBatch : shardBatches) {
Set<ShardRouting> eligibleFetchShards = new HashSet<>();
Set<ShardRouting> eligibleShards = new HashSet<>();
Set<ShardRouting> ineligibleShards = new HashSet<>();
boolean shardMatched;
// iterate over shards to check for match for each of those
for (ShardRouting shard : shardBatch) {
shardMatched = false;
// need to iterate over all the nodes to find matching shard
for (RoutingNode routingNode : routingNodes) {
if (routingNode.getByShardId(shard.shardId()) != null) {
ShardRouting shardFromRoutingNode = routingNode.getByShardId(shard.shardId());
if (!shardFromRoutingNode.primary()) {
shardMatched = true;
if (shouldSkipFetchForRecovery(shardFromRoutingNode)) {
continue;
}
eligibleFetchShards.add(shardFromRoutingNode);
ShardRouting shardFromRoutingNode = routingNode.getByShardId(shard.shardId());
if (shardFromRoutingNode != null && !shardFromRoutingNode.primary()) {
shardMatched = true;
if (shouldSkipFetchForRecovery(shardFromRoutingNode)) {
ineligibleShards.add(shardFromRoutingNode);
continue;
}
eligibleShards.add(shardFromRoutingNode);
}
if (shardMatched) {
break;
}
}
}
AsyncShardFetch.FetchResult<NodeStoreFilesMetadataBatch> shardState = fetchData(
eligibleFetchShards,
eligibleShards,
ineligibleShards,
allocation
);
if (!shardState.hasData()) {
logger.trace("{}: fetching new stores for initializing shard batch", eligibleFetchShards);
logger.trace("{}: fetching new stores for initializing shard batch", eligibleShards);
continue; // still fetching
}
for (ShardRouting shard : eligibleFetchShards) {
for (ShardRouting shard : eligibleShards) {
Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores = convertToNodeStoreFilesMetadataMap(shard, shardState);

Runnable cancellationAction = getShardCancellationAction(shard, allocation, nodeShardStores);
Expand All @@ -94,8 +102,8 @@ public void processExistingRecoveries(RoutingAllocation allocation, List<List<Sh
}

abstract protected FetchResult<NodeStoreFilesMetadataBatch> fetchData(
Set<ShardRouting> shardEligibleForFetch,
Set<ShardRouting> inEligibleShards,
Set<ShardRouting> eligibleShards,
Set<ShardRouting> ineligibleShards,
RoutingAllocation allocation
);

Expand All @@ -104,7 +112,8 @@ protected FetchResult<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadat
ShardRouting shard,
RoutingAllocation allocation
) {
return null;
logger.error("fetchData for single shard called via batch allocator");
throw new IllegalStateException("ReplicaShardBatchAllocator should only be used for a batch of shards");
}

@Override
Expand All @@ -120,14 +129,13 @@ public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
) {
HashMap<ShardRouting, AllocateUnassignedDecision> shardAllocationDecisions = new HashMap<>();
final boolean explain = allocation.debugDecision();
final RoutingNodes routingNodes = allocation.routingNodes();
Set<ShardRouting> shardsEligibleForFetch = new HashSet<>();
Set<ShardRouting> shardsNotEligibleForFetch = new HashSet<>();
Set<ShardRouting> eligibleShards = new HashSet<>();
Set<ShardRouting> ineligibleShards = new HashSet<>();
HashMap<ShardRouting, Tuple<Decision, Map<String, NodeAllocationResult>>> nodeAllocationDecisions = new HashMap<>();
for (ShardRouting shard : shards) {
if (!isResponsibleFor(shard)) {
// this allocator is not responsible for allocating this shard
shardsNotEligibleForFetch.add(shard);
ineligibleShards.add(shard);
shardAllocationDecisions.put(shard, AllocateUnassignedDecision.NOT_TAKEN);
continue;
}
Expand All @@ -151,21 +159,21 @@ public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
// so that we don't have to compute the decisions again
nodeAllocationDecisions.put(shard, result);

shardsEligibleForFetch.add(shard);
eligibleShards.add(shard);
}

// Do not call fetchData if there are no eligible shards
if (shardsEligibleForFetch.isEmpty()) {
if (eligibleShards.isEmpty()) {
return shardAllocationDecisions;
}
// only fetch data for eligible shards
final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(
shardsEligibleForFetch,
shardsNotEligibleForFetch,
eligibleShards,
ineligibleShards,
allocation
);

for (ShardRouting unassignedShard : shardsEligibleForFetch) {
for (ShardRouting unassignedShard : eligibleShards) {
Tuple<Decision, Map<String, NodeAllocationResult>> result = nodeAllocationDecisions.get(unassignedShard);
shardAllocationDecisions.put(
unassignedShard,
Expand All @@ -180,25 +188,24 @@ public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
}
return shardAllocationDecisions;
}

private Map<DiscoveryNode, StoreFilesMetadata> convertToNodeStoreFilesMetadataMap(
ShardRouting unassignedShard,
FetchResult<NodeStoreFilesMetadataBatch> data
) {
if (!data.hasData()) {
// if we don't have any data, return null
return null;
ShardRouting unassignedShard,
FetchResult<NodeStoreFilesMetadataBatch> data) {

if(!data.hasData()) {
return new HashMap<>();
}
return new HashMap<>(
data.getData()
.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().getNodeStoreFilesMetadataBatch().get(unassignedShard.shardId()).storeFilesMetadata()
)
)
);

Map<DiscoveryNode, StoreFilesMetadata> map = new HashMap<>();

data.getData().forEach((key, value) -> {
Map<ShardId, NodeStoreFilesMetadata> batch = value.getNodeStoreFilesMetadataBatch();
NodeStoreFilesMetadata metadata = batch.get(unassignedShard.shardId());
if (metadata != null && metadata.getStoreFileFetchException() == null) {
map.put(key, metadata.storeFilesMetadata());
}
});

return map;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -814,12 +814,12 @@ public TestBatchAllocator addData(

@Override
protected AsyncShardFetch.FetchResult<NodeStoreFilesMetadataBatch> fetchData(
Set<ShardRouting> shardEligibleForFetch,
Set<ShardRouting> inEligibleShards,
Set<ShardRouting> eligibleShards,
Set<ShardRouting> ineligibleShards,
RoutingAllocation allocation
) {
fetchDataCalled.set(true);
eligibleShardFetchDataCount.set(shardEligibleForFetch.size());
eligibleShardFetchDataCount.set(eligibleShards.size());
Map<DiscoveryNode, NodeStoreFilesMetadataBatch> tData = null;
if (data != null) {
tData = new HashMap<>();
Expand Down

0 comments on commit 44da3c6

Please sign in to comment.