Skip to content

Commit

Permalink
Renamed variables and added documentations
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Chandani <[email protected]>
  • Loading branch information
Gaurav614 committed Feb 23, 2024
1 parent 492c237 commit 74201a0
Showing 1 changed file with 20 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import java.util.stream.StreamSupport;

/**
* Allocator for the gateway
* Allocator for the gateway to assign batch of shards.
*
* @opensearch.internal
*/
Expand All @@ -60,7 +60,6 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {

private static final Logger logger = LogManager.getLogger(ShardsBatchGatewayAllocator.class);
private final long maxBatchSize;
// private final boolean batchMode;
private static final short DEFAULT_SHARD_BATCH_SIZE = 2000;

/**
Expand All @@ -76,8 +75,8 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {

private final RerouteService rerouteService;

private PrimaryShardBatchAllocator primaryBatchShardAllocator;
private ReplicaShardBatchAllocator replicaBatchShardAllocator;
private PrimaryShardBatchAllocator primaryShardBatchAllocator;
private ReplicaShardBatchAllocator replicaShardBatchAllocator;

private Set<String> lastSeenEphemeralIds = Collections.emptySet();

Expand All @@ -98,12 +97,11 @@ public ShardsBatchGatewayAllocator(
Settings settings
) {
this.rerouteService = rerouteService;
this.primaryBatchShardAllocator = new InternalPrimaryBatchShardAllocator();
this.replicaBatchShardAllocator = new InternalReplicaBatchShardAllocator();
this.primaryShardBatchAllocator = new InternalPrimaryBatchShardAllocator();
this.replicaShardBatchAllocator = new InternalReplicaBatchShardAllocator();
this.batchStartedAction = batchStartedAction;
this.batchStoreAction = batchStoreAction;
this.maxBatchSize = GATEWAY_ALLOCATOR_BATCH_SIZE.get(settings);
// this.batchMode = EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(settings);
}

@Override
Expand All @@ -122,9 +120,9 @@ public void cleanCaches() {
protected ShardsBatchGatewayAllocator() {
this.rerouteService = null;
this.batchStartedAction = null;
this.primaryBatchShardAllocator = null;
this.primaryShardBatchAllocator = null;
this.batchStoreAction = null;
this.replicaBatchShardAllocator = null;
this.replicaShardBatchAllocator = null;
this.maxBatchSize = DEFAULT_SHARD_BATCH_SIZE;
// this.batchMode = true;
}
Expand All @@ -134,7 +132,6 @@ protected ShardsBatchGatewayAllocator() {
@Override
public int getNumberOfInFlightFetches() {
int count = 0;
// If fetching is done in non batched-mode then maps to maintain batches will be empty and vice versa for batch-mode
for (ShardsBatch batch : batchIdToStartedShardBatch.values()) {
count += (batch.getNumberOfInFlightFetches() * batch.getBatchedShards().size());
}
Expand All @@ -161,21 +158,21 @@ public void applyFailedShards(final List<FailedShard> failedShards, final Routin

@Override
public void beforeAllocation(final RoutingAllocation allocation) {
assert primaryBatchShardAllocator != null;
assert replicaBatchShardAllocator != null;
assert primaryShardBatchAllocator != null;
assert replicaShardBatchAllocator != null;
ensureAsyncFetchStorePrimaryRecency(allocation);
}

@Override
public void afterPrimariesBeforeReplicas(RoutingAllocation allocation) {
assert replicaBatchShardAllocator != null;
assert replicaShardBatchAllocator != null;
List<Set<ShardRouting>> storedShardBatches = batchIdToStoreShardBatch.values()
.stream()
.map(ShardsBatch::getBatchedShardRoutings)
.collect(Collectors.toList());
if (allocation.routingNodes().hasInactiveShards()) {
// cancel existing recoveries if we have a better match
replicaBatchShardAllocator.processExistingRecoveries(allocation, storedShardBatches);
replicaShardBatchAllocator.processExistingRecoveries(allocation, storedShardBatches);
}
}

Expand All @@ -191,9 +188,9 @@ public void allocateUnassigned(
@Override
public void allocateAllUnassignedShards(final RoutingAllocation allocation, boolean primary) {

assert primaryBatchShardAllocator != null;
assert replicaBatchShardAllocator != null;
innerAllocateUnassignedBatch(allocation, primaryBatchShardAllocator, replicaBatchShardAllocator, primary);
assert primaryShardBatchAllocator != null;
assert replicaShardBatchAllocator != null;
innerAllocateUnassignedBatch(allocation, primaryShardBatchAllocator, replicaShardBatchAllocator, primary);
}

protected void innerAllocateUnassignedBatch(
Expand Down Expand Up @@ -256,14 +253,14 @@ else if (shardRouting.primary() == primary) {
while (iterator.hasNext()) {
ShardRouting currentShard = iterator.next();
if (batchSize > 0) {
ShardEntry sharEntry = new ShardEntry(
ShardEntry shardEntry = new ShardEntry(
new ShardAttributes(
currentShard.shardId(),
IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(currentShard.index()).getSettings())
),
currentShard
);
shardsToAddToCurrentBatch.put(currentShard.shardId(), sharEntry);
shardsToAddToCurrentBatch.put(currentShard.shardId(), shardEntry);
batchSize--;
iterator.remove();
}
Expand Down Expand Up @@ -364,11 +361,11 @@ public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting
}
assert getBatchId(unassignedShard, unassignedShard.primary()) != null;
if (unassignedShard.primary()) {
assert primaryBatchShardAllocator != null;
return primaryBatchShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger);
assert primaryShardBatchAllocator != null;
return primaryShardBatchAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger);
} else {
assert replicaBatchShardAllocator != null;
return replicaBatchShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger);
assert replicaShardBatchAllocator != null;
return replicaShardBatchAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger);
}
}

Expand Down

0 comments on commit 74201a0

Please sign in to comment.