Skip to content

Commit abba062

Browse files
authored
More efficient sort in tryRelocateShard (#128063) (#128159)
No need to do this via an allocation-heavy `Stream`, we can just put the objects straight into an array, sort them in-place, and keep hold of the array to avoid having to allocate anything on the next iteration. Also slims down `BY_DESCENDING_SHARD_ID`: it's always sorting the same index so we don't need to look at `ShardId#index` in the comparison, nor do we really need multiple layers of vtable lookups, we can just compare the shard IDs directly. Relates #128021
1 parent 6fb106d commit abba062

File tree

2 files changed

+28
-7
lines changed

2 files changed

+28
-7
lines changed

docs/changelog/128063.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 128063
2+
summary: More efficient sort in `tryRelocateShard`
3+
area: Allocation
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import java.util.Map;
5555
import java.util.Set;
5656
import java.util.function.BiFunction;
57-
import java.util.stream.StreamSupport;
5857

5958
import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type.REPLACE;
6059
import static org.elasticsearch.cluster.routing.ExpectedShardSizeEstimator.getExpectedShardSize;
@@ -1206,7 +1205,13 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s
12061205
return AllocateUnassignedDecision.fromDecision(decision, minNode != null ? minNode.routingNode.node() : null, nodeDecisions);
12071206
}
12081207

1209-
private static final Comparator<ShardRouting> BY_DESCENDING_SHARD_ID = Comparator.comparing(ShardRouting::shardId).reversed();
1208+
private static final Comparator<ShardRouting> BY_DESCENDING_SHARD_ID = (s1, s2) -> Integer.compare(s2.id(), s1.id());
1209+
1210+
/**
1211+
* Scratch space for accumulating/sorting the {@link ShardRouting} instances when contemplating moving the shards away from a node
1212+
* in {@link #tryRelocateShard} - re-used to avoid extraneous allocations etc.
1213+
*/
1214+
private ShardRouting[] shardRoutingsOnMaxWeightNode;
12101215

12111216
/**
12121217
* Tries to find a relocation from the max node to the minimal node for an arbitrary shard of the given index on the
@@ -1217,13 +1222,24 @@ private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, String id
12171222
final ModelIndex index = maxNode.getIndex(idx);
12181223
if (index != null) {
12191224
logger.trace("Try relocating shard of [{}] from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId());
1220-
final Iterable<ShardRouting> shardRoutings = StreamSupport.stream(index.spliterator(), false)
1221-
.filter(ShardRouting::started) // cannot rebalance unassigned, initializing or relocating shards anyway
1222-
.sorted(BY_DESCENDING_SHARD_ID) // check in descending order of shard id so that the decision is deterministic
1223-
::iterator;
1225+
if (shardRoutingsOnMaxWeightNode == null || shardRoutingsOnMaxWeightNode.length < index.numShards()) {
1226+
shardRoutingsOnMaxWeightNode = new ShardRouting[index.numShards() * 2]; // oversized so reuse is more likely
1227+
}
1228+
1229+
int startedShards = 0;
1230+
for (final var shardRouting : index) {
1231+
if (shardRouting.started()) { // cannot rebalance unassigned, initializing or relocating shards anyway
1232+
shardRoutingsOnMaxWeightNode[startedShards] = shardRouting;
1233+
startedShards += 1;
1234+
}
1235+
}
1236+
// check in descending order of shard id so that the decision is deterministic
1237+
ArrayUtil.timSort(shardRoutingsOnMaxWeightNode, 0, startedShards, BY_DESCENDING_SHARD_ID);
12241238

12251239
final AllocationDeciders deciders = allocation.deciders();
1226-
for (ShardRouting shard : shardRoutings) {
1240+
for (int shardIndex = 0; shardIndex < startedShards; shardIndex++) {
1241+
final ShardRouting shard = shardRoutingsOnMaxWeightNode[shardIndex];
1242+
12271243
final Decision rebalanceDecision = deciders.canRebalance(shard, allocation);
12281244
if (rebalanceDecision.type() == Type.NO) {
12291245
continue;

0 commit comments

Comments
 (0)