Skip to content

Commit

Permalink
Use List instead of Set, adding more comments
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Feb 27, 2024
1 parent b6b7d2f commit 698247c
Show file tree
Hide file tree
Showing 2 changed files with 243 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.collect.Tuple;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
import org.opensearch.indices.store.StoreFilesMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -45,38 +48,48 @@ public abstract class ReplicaShardBatchAllocator extends ReplicaShardAllocator {
* Process existing recoveries of replicas and see if we need to cancel them if we find a better
* match. Today, a better match is one that can perform a no-op recovery while the previous recovery
* has to copy segment files.
*
* @param allocation the overall routing allocation
* @param shardBatches a list of shard batches to check for existing recoveries
*/
public void processExistingRecoveries(RoutingAllocation allocation, List<Set<ShardRouting>> shardBatches) {
public void processExistingRecoveries(RoutingAllocation allocation, List<List<ShardRouting>> shardBatches) {
RoutingNodes routingNodes = allocation.routingNodes();
List<Runnable> shardCancellationActions = new ArrayList<>();
for (Set<ShardRouting> shardBatch : shardBatches) {
Set<ShardRouting> eligibleFetchShards = new HashSet<>();
// iterate through the batches, each batch needs to be processed together as fetch call should be made for shards from same batch
for (List<ShardRouting> shardBatch : shardBatches) {
Set<ShardRouting> eligibleShards = new HashSet<>();
Set<ShardRouting> ineligibleShards = new HashSet<>();
boolean shardMatched;
// iterate over shards to check for match for each of those
for (ShardRouting shard : shardBatch) {
shardMatched = false;
// need to iterate over all the nodes to find matching shard
for (RoutingNode routingNode : routingNodes) {
if (routingNode.getByShardId(shard.shardId()) != null) {
ShardRouting shardFromRoutingNode = routingNode.getByShardId(shard.shardId());
if (!shardFromRoutingNode.primary()){
shardMatched = true;
if (shouldSkipFetchForRecovery(shardFromRoutingNode)) {
continue;
}
eligibleFetchShards.add(shardFromRoutingNode);
ShardRouting shardFromRoutingNode = routingNode.getByShardId(shard.shardId());
if (shardFromRoutingNode != null && !shardFromRoutingNode.primary()) {
shardMatched = true;
if (shouldSkipFetchForRecovery(shardFromRoutingNode)) {
ineligibleShards.add(shardFromRoutingNode);
continue;
}
eligibleShards.add(shardFromRoutingNode);
}
if (shardMatched){
if (shardMatched) {
break;
}
}
}
AsyncBatchShardFetch.FetchResult<NodeStoreFilesMetadataBatch> shardState = fetchData(eligibleFetchShards, ineligibleShards, allocation);
if (shardState.hasData() == false) {
logger.trace("{}: fetching new stores for initializing shard batch", eligibleFetchShards);
AsyncShardFetch.FetchResult<NodeStoreFilesMetadataBatch> shardState = fetchData(
eligibleShards,
ineligibleShards,
allocation
);
if (!shardState.hasData()) {
logger.trace("{}: fetching new stores for initializing shard batch", eligibleShards);
continue; // still fetching
}
for (ShardRouting shard : eligibleFetchShards) {
Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores = getNodeShardStores(shard, shardState);
for (ShardRouting shard : eligibleShards) {
Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores = convertToNodeStoreFilesMetadataMap(shard, shardState);

Runnable cancellationAction = getShardCancellationAction(shard, allocation, nodeShardStores);
if (cancellationAction != null) {
Expand All @@ -89,33 +102,41 @@ public void processExistingRecoveries(RoutingAllocation allocation, List<Set<Sha
}
}


abstract protected FetchResult<NodeStoreFilesMetadataBatch> fetchData(Set<ShardRouting> shardEligibleForFetch,
Set<ShardRouting> inEligibleShards,
RoutingAllocation allocation);
abstract protected FetchResult<NodeStoreFilesMetadataBatch> fetchData(
Set<ShardRouting> eligibleShards,
Set<ShardRouting> ineligibleShards,
RoutingAllocation allocation
);

@Override
protected FetchResult<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata> fetchData(ShardRouting shard, RoutingAllocation allocation) {
return null;
protected FetchResult<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata> fetchData(
ShardRouting shard,
RoutingAllocation allocation
) {
logger.error("fetchData for single shard called via batch allocator");
throw new IllegalStateException("ReplicaShardBatchAllocator should only be used for a batch of shards");
}

@Override
public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, Logger logger) {
return null;
return makeAllocationDecision(Collections.singletonList(unassignedShard), allocation, logger).get(unassignedShard);
}

@Override
public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(Set<ShardRouting> shards, RoutingAllocation allocation, Logger logger) {
public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
List<ShardRouting> shards,
RoutingAllocation allocation,
Logger logger
) {
HashMap<ShardRouting, AllocateUnassignedDecision> shardAllocationDecisions = new HashMap<>();
final boolean explain = allocation.debugDecision();
final RoutingNodes routingNodes = allocation.routingNodes();
Set<ShardRouting> shardsEligibleForFetch = new HashSet<>();
Set<ShardRouting> shardsNotEligibleForFetch = new HashSet<>();
Set<ShardRouting> eligibleShards = new HashSet<>();
Set<ShardRouting> ineligibleShards = new HashSet<>();
HashMap<ShardRouting, Tuple<Decision, Map<String, NodeAllocationResult>>> nodeAllocationDecisions = new HashMap<>();
for (ShardRouting shard : shards) {
if (!isResponsibleFor(shard)) {
// this allocator n is not responsible for allocating this shard
shardsNotEligibleForFetch.add(shard);
ineligibleShards.add(shard);
shardAllocationDecisions.put(shard, AllocateUnassignedDecision.NOT_TAKEN);
continue;
}
Expand All @@ -126,50 +147,66 @@ public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
// only return early if we are not in explain mode, or we are in explain mode but we have not
// yet attempted to fetch any shard data
logger.trace("{}: ignoring allocation, can't be allocated on any node", shard);
shardAllocationDecisions.put(shard,
AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()),
result.v2() != null ? new ArrayList<>(result.v2().values()) : null));
shardAllocationDecisions.put(
shard,
AllocateUnassignedDecision.no(
UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()),
result.v2() != null ? new ArrayList<>(result.v2().values()) : null
)
);
continue;
}
// storing the nodeDecisions in nodeAllocationDecisions if the decision is not YES
// so that we don't have to compute the decisions again
nodeAllocationDecisions.put(shard, result);

shardsEligibleForFetch.add(shard);
eligibleShards.add(shard);
}

// Do not call fetchData if there are no eligible shards
if (shardsEligibleForFetch.isEmpty()) {
if (eligibleShards.isEmpty()) {
return shardAllocationDecisions;
}
// only fetch data for eligible shards
final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(shardsEligibleForFetch, shardsNotEligibleForFetch, allocation);

for (ShardRouting unassignedShard : shardsEligibleForFetch) {
if (!shardsState.hasData()) {
logger.trace("{}: ignoring allocation, still fetching shard stores", unassignedShard);
allocation.setHasPendingAsyncFetch();
List<NodeAllocationResult> nodeDecisions = null;
if (explain) {
nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation);
}
shardAllocationDecisions.put(unassignedShard,
AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions));
continue;
}
final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(
eligibleShards,
ineligibleShards,
allocation
);

for (ShardRouting unassignedShard : eligibleShards) {
Tuple<Decision, Map<String, NodeAllocationResult>> result = nodeAllocationDecisions.get(unassignedShard);
shardAllocationDecisions.put(unassignedShard, getAllocationDecision(unassignedShard, allocation, getNodeShardStores(unassignedShard, shardsState), result, logger));
shardAllocationDecisions.put(
unassignedShard,
getAllocationDecision(
unassignedShard,
allocation,
convertToNodeStoreFilesMetadataMap(unassignedShard, shardsState),
result,
logger
)
);
}
return shardAllocationDecisions;
}
private Map<DiscoveryNode, StoreFilesMetadata> convertToNodeStoreFilesMetadataMap(
ShardRouting unassignedShard,
FetchResult<NodeStoreFilesMetadataBatch> data) {

private Map<DiscoveryNode, StoreFilesMetadata> getNodeShardStores(ShardRouting unassignedShard, FetchResult<NodeStoreFilesMetadataBatch> data) {
assert data.hasData();
return new HashMap<>(
data.getData().entrySet().stream().collect(Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().getNodeStoreFilesMetadataBatch().get(unassignedShard.shardId()).storeFilesMetadata()
))
);
if(!data.hasData()) {
return new HashMap<>();
}

Map<DiscoveryNode, StoreFilesMetadata> map = new HashMap<>();

data.getData().forEach((key, value) -> {
Map<ShardId, NodeStoreFilesMetadata> batch = value.getNodeStoreFilesMetadataBatch();
NodeStoreFilesMetadata metadata = batch.get(unassignedShard.shardId());
if (metadata != null && metadata.getStoreFileFetchException() == null) {
map.put(key, metadata.storeFilesMetadata());
}
});

return map;
}
}
Loading

0 comments on commit 698247c

Please sign in to comment.