splitChildAllocationIds = new ArrayList<>();
+ for (int c = 0; c < numberOfChildShards; c++) {
+ splitChildAllocationIds.add(UUIDs.randomBase64UUID());
+ }
+ return new AllocationId(allocationId.getId(), null, splitChildAllocationIds, null);
}
/**
@@ -148,7 +209,18 @@ public static AllocationId newRelocation(AllocationId allocationId) {
*/
public static AllocationId cancelRelocation(AllocationId allocationId) {
assert allocationId.getRelocationId() != null;
- return new AllocationId(allocationId.getId(), null);
+ return new AllocationId(allocationId.getId(), null, null, null);
+ }
+
+ /**
+ * Creates a new allocation id representing a cancelled split.
+ *
+ * Note that this is expected to be called on the allocation id
+ * of the *source* shard
+ */
+ public static AllocationId cancelSplit(AllocationId allocationId) {
+ assert allocationId.getSplitChildAllocationIds() != null;
+ return new AllocationId(allocationId.getId(), null, null, null);
}
/**
@@ -159,7 +231,18 @@ public static AllocationId cancelRelocation(AllocationId allocationId) {
*/
public static AllocationId finishRelocation(AllocationId allocationId) {
assert allocationId.getRelocationId() != null;
- return new AllocationId(allocationId.getId(), null);
+ return new AllocationId(allocationId.getId(), null, null, null);
+ }
+
+ /**
+ * Creates a new allocation id finalizing a split.
+ *
+ * Note that this is expected to be called on the allocation id
+ * of the *target* shard and thus it only needs to clear the parent allocation id.
+ */
+ public static AllocationId finishSplit(AllocationId allocationId) {
+ assert allocationId.getParentAllocationId() != null;
+ return new AllocationId(allocationId.getId(), null, null, null);
}
/**
@@ -177,27 +260,34 @@ public String getRelocationId() {
return relocationId;
}
+ /**
+ * The transient split child allocation ids holding the unique ids which are used for split.
+ */
+ public List getSplitChildAllocationIds() {
+ return splitChildAllocationIds;
+ }
+
+ /**
+ * The transient split parent allocation id holding the unique id that is used for split.
+ */
+ public String getParentAllocationId() {
+ return parentAllocationId;
+ }
+
@Override
public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
+ if (this == o) return true;
+ if (!(o instanceof AllocationId)) return false;
AllocationId that = (AllocationId) o;
- if (!id.equals(that.id)) {
- return false;
- }
- return !(relocationId != null ? !relocationId.equals(that.relocationId) : that.relocationId != null);
-
+ return Objects.equals(id, that.id)
+ && Objects.equals(relocationId, that.relocationId)
+ && Objects.equals(splitChildAllocationIds, that.splitChildAllocationIds)
+ && Objects.equals(parentAllocationId, that.parentAllocationId);
}
@Override
public int hashCode() {
- int result = id.hashCode();
- result = 31 * result + (relocationId != null ? relocationId.hashCode() : 0);
- return result;
+ return Objects.hash(id, relocationId, splitChildAllocationIds, parentAllocationId);
}
@Override
@@ -212,6 +302,16 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (relocationId != null) {
builder.field(RELOCATION_ID_KEY, relocationId);
}
+ if (splitChildAllocationIds != null) {
+ builder.startArray(SPLIT_CHILD_ALLOCATION_IDS);
+ for (String index : splitChildAllocationIds) {
+ builder.value(index);
+ }
+ builder.endArray();
+ }
+ if (parentAllocationId != null) {
+ builder.field(PARENT_ALLOCATION_ID, parentAllocationId);
+ }
builder.endObject();
return builder;
}
diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java
index 08574dddc007c..681284b6b26dc 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java
@@ -33,6 +33,7 @@
package org.opensearch.cluster.routing;
import org.apache.lucene.util.CollectionUtil;
+import org.opensearch.Version;
import org.opensearch.cluster.AbstractDiffable;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.metadata.IndexMetadata;
@@ -93,12 +94,15 @@ public class IndexRoutingTable extends AbstractDiffable
// shards with state set to UNASSIGNED
private final Map shards;
+ private final Map childReplicas;
+
private final List allActiveShards;
- IndexRoutingTable(Index index, final Map shards) {
+ IndexRoutingTable(Index index, final Map shards, Map childReplicas) {
this.index = index;
this.shuffler = new RotationShardShuffler(Randomness.get().nextInt());
this.shards = Collections.unmodifiableMap(shards);
+ this.childReplicas = Collections.unmodifiableMap(childReplicas);
List allActiveShards = new ArrayList<>();
for (IndexShardRoutingTable cursor : shards.values()) {
for (ShardRouting shardRouting : cursor) {
@@ -130,10 +134,12 @@ boolean validate(Metadata metadata) {
}
// check the number of shards
- if (indexMetadata.getNumberOfShards() != shards().size()) {
+ if (indexMetadata.getNumberOfShards() - indexMetadata.getSplitShardsMetadata().numberOfEmptyParentShards() != shards().size()) {
Set expected = new HashSet<>();
for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) {
- expected.add(i);
+ if (indexMetadata.getSplitShardsMetadata().isEmptyParentShard(i) == false) {
+ expected.add(i);
+ }
}
for (IndexShardRoutingTable indexShardRoutingTable : this) {
expected.remove(indexShardRoutingTable.shardId().id());
@@ -166,7 +172,7 @@ boolean validate(Metadata metadata) {
}
final Set inSyncAllocationIds = indexMetadata.inSyncAllocationIds(shardRouting.id());
if (shardRouting.active()
- && inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false
+ && (inSyncAllocationIds == null || inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false)
&& shardRouting.isSearchOnly() == false) {
throw new IllegalStateException(
"active shard routing "
@@ -242,6 +248,10 @@ public int numberOfNodesShardsAreAllocatedOn(String... excludedNodes) {
return nodes.size();
}
+ public Map getChildReplicas() {
+ return childReplicas;
+ }
+
public Map shards() {
return shards;
}
@@ -254,6 +264,10 @@ public IndexShardRoutingTable shard(int shardId) {
return shards.get(shardId);
}
+ public IndexShardRoutingTable childReplicaShard(int shardId) {
+ return childReplicas.get(shardId);
+ }
+
/**
* Returns true
if all shards are primary and active. Otherwise false
.
*/
@@ -350,6 +364,7 @@ public boolean equals(Object o) {
if (!index.equals(that.index)) return false;
if (!shards.equals(that.shards)) return false;
+ if (!childReplicas.equals(that.childReplicas)) return false;
return true;
}
@@ -358,6 +373,7 @@ public boolean equals(Object o) {
public int hashCode() {
int result = index.hashCode();
result = 31 * result + shards.hashCode();
+ result = 31 * result + childReplicas.hashCode();
return result;
}
@@ -374,6 +390,12 @@ public static IndexRoutingTable readFrom(StreamInput in) throws IOException {
for (int i = 0; i < size; i++) {
builder.addIndexShard(IndexShardRoutingTable.Builder.readFromThin(in, index));
}
+ if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
+ int noOfChildReplicas = in.readVInt();
+ for (int i = 0; i < noOfChildReplicas; i++) {
+ builder.addChildReplicaShard(IndexShardRoutingTable.Builder.readFromThin(in, index));
+ }
+ }
return builder.build();
}
@@ -389,6 +411,12 @@ public void writeTo(StreamOutput out) throws IOException {
for (IndexShardRoutingTable indexShard : this) {
IndexShardRoutingTable.Builder.writeToThin(indexShard, out);
}
+ if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
+ out.writeVInt(childReplicas.size());
+ for (IndexShardRoutingTable indexShard : childReplicas.values()) {
+ IndexShardRoutingTable.Builder.writeToThin(indexShard, out);
+ }
+ }
}
public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException {
@@ -410,6 +438,7 @@ public static class Builder {
private final Index index;
private final Map shards = new HashMap<>();
+ private final Map childReplicas = new HashMap<>();
public Builder(Index index) {
this.index = index;
@@ -642,6 +671,7 @@ private Builder initializeEmpty(IndexMetadata indexMetadata, UnassignedInfo unas
}
shards.put(shardNumber, indexShardRoutingBuilder.build());
}
+
return this;
}
@@ -757,6 +787,11 @@ public Builder addIndexShard(IndexShardRoutingTable indexShard) {
return this;
}
+ public Builder addChildReplicaShard(IndexShardRoutingTable indexShard) {
+ childReplicas.put(indexShard.shardId().id(), indexShard);
+ return this;
+ }
+
/**
* Adds a new shard routing (makes a copy of it), with reference data used from the index shard routing table
* if it needs to be created.
@@ -772,8 +807,20 @@ public Builder addShard(ShardRouting shard) {
return this;
}
+ public Builder addChildReplica(ShardRouting childReplica) {
+ assert childReplica.started() && childReplica.getParentShardId() != null && childReplica.primary() == false;
+ IndexShardRoutingTable childReplicaShard = childReplicas.get(childReplica.shardId().id());
+ if (childReplicaShard == null) {
+ childReplicaShard = new IndexShardRoutingTable.Builder(childReplica.shardId()).addShard(childReplica).build();
+ } else {
+ childReplicaShard = new IndexShardRoutingTable.Builder(childReplicaShard).addShard(childReplica).build();
+ }
+ childReplicas.put(childReplica.id(), childReplicaShard);
+ return this;
+ }
+
public IndexRoutingTable build() {
- return new IndexRoutingTable(index, shards);
+ return new IndexRoutingTable(index, shards, childReplicas);
}
}
diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java
index eb4177d7046ca..67cb1af79669c 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java
@@ -160,6 +160,16 @@ public class IndexShardRoutingTable extends AbstractDiffable childSplitShards);
+
/**
* Called when an unassigned shard's unassigned information was updated
*/
@@ -77,6 +95,21 @@ public interface RoutingChangesObserver {
*/
void relocationSourceRemoved(ShardRouting removedReplicaRelocationSource);
+ /**
+ * Called when split completes after child shards are started.
+ */
+ void splitCompleted(ShardRouting removedSplitSource, IndexMetadata indexMetadata);
+
+ /**
+ * Called when in-place split of child shards has failed.
+ */
+ void splitFailed(ShardRouting splitSource, IndexMetadata indexMetadata);
+
+ /**
+ * Called to determine if shard split has failed in current cluster update.
+ */
+ boolean isSplitOfShardFailed(ShardRouting parentShard);
+
/**
* Called when started replica is promoted to primary.
*/
@@ -104,11 +137,26 @@ public void shardStarted(ShardRouting initializingShard, ShardRouting startedSha
}
+ @Override
+ public void childReplicaStarted(ShardRouting initializingShard, ShardRouting parentShard, ShardRouting childReplica) {
+
+ }
+
+ @Override
+ public void childShardFailed(ShardRouting parentShard, ShardRouting childShard) {
+
+ }
+
@Override
public void relocationStarted(ShardRouting startedShard, ShardRouting targetRelocatingShard) {
}
+ @Override
+ public void splitStarted(ShardRouting startedShard, List childSplitShards) {
+
+ }
+
@Override
public void unassignedInfoUpdated(ShardRouting unassignedShard, UnassignedInfo newUnassignedInfo) {
@@ -119,6 +167,11 @@ public void shardFailed(ShardRouting activeShard, UnassignedInfo unassignedInfo)
}
+ @Override
+ public boolean isSplitOfShardFailed(ShardRouting parentShard) {
+ return false;
+ }
+
@Override
public void relocationCompleted(ShardRouting removedRelocationSource) {
@@ -129,6 +182,16 @@ public void relocationSourceRemoved(ShardRouting removedReplicaRelocationSource)
}
+ @Override
+ public void splitCompleted(ShardRouting removedSplitSource, IndexMetadata indexMetadata) {
+
+ }
+
+ @Override
+ public void splitFailed(ShardRouting splitSource, IndexMetadata indexMetadata) {
+
+ }
+
@Override
public void replicaPromoted(ShardRouting replicaShard) {
@@ -174,6 +237,24 @@ public void relocationStarted(ShardRouting startedShard, ShardRouting targetRelo
}
}
+ @Override
+ public void splitStarted(ShardRouting startedShard, List childSplitShards) {
+ for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
+ routingChangesObserver.splitStarted(startedShard, childSplitShards);
+ }
+ }
+
+ @Override
+ public boolean isSplitOfShardFailed(ShardRouting parentShard) {
+ for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
+ if (routingChangesObserver.isSplitOfShardFailed(parentShard)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
@Override
public void unassignedInfoUpdated(ShardRouting unassignedShard, UnassignedInfo newUnassignedInfo) {
for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
@@ -202,6 +283,34 @@ public void relocationSourceRemoved(ShardRouting removedReplicaRelocationSource)
}
}
+ @Override
+ public void childReplicaStarted(ShardRouting initializingShard, ShardRouting parentShard, ShardRouting childReplica) {
+ for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
+ routingChangesObserver.childReplicaStarted(initializingShard, parentShard, childReplica);
+ }
+ }
+
+ @Override
+ public void childShardFailed(ShardRouting parentShard, ShardRouting childShard) {
+ for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
+ routingChangesObserver.childShardFailed(parentShard, childShard);
+ }
+ }
+
+ @Override
+ public void splitCompleted(ShardRouting removedSplitSource, IndexMetadata indexMetadata) {
+ for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
+ routingChangesObserver.splitCompleted(removedSplitSource, indexMetadata);
+ }
+ }
+
+ @Override
+ public void splitFailed(ShardRouting splitSource, IndexMetadata indexMetadata) {
+ for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
+ routingChangesObserver.splitFailed(splitSource, indexMetadata);
+ }
+ }
+
@Override
public void replicaPromoted(ShardRouting replicaShard) {
for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java
index 24c3077960444..ccf29692c4f3f 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java
@@ -135,6 +135,7 @@ public Iterator iterator() {
private final LinkedHashSet initializingShards;
private final LinkedHashSet relocatingShards;
+ private final LinkedHashSet splittingShards;
private final HashMap> shardsByIndex;
@@ -146,6 +147,7 @@ public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shardRouti
this.shards = new BucketedShards(primaryShards, replicaShards);
this.relocatingShards = new LinkedHashSet<>();
this.initializingShards = new LinkedHashSet<>();
+ this.splittingShards = new LinkedHashSet<>();
this.shardsByIndex = new LinkedHashMap<>();
for (ShardRouting shardRouting : shardRoutings) {
@@ -153,6 +155,8 @@ public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shardRouti
initializingShards.add(shardRouting);
} else if (shardRouting.relocating()) {
relocatingShards.add(shardRouting);
+ } else if (shardRouting.splitting()) {
+ splittingShards.add(shardRouting);
}
shardsByIndex.computeIfAbsent(shardRouting.index(), k -> new LinkedHashSet<>()).add(shardRouting);
@@ -232,6 +236,8 @@ void add(ShardRouting shard) {
initializingShards.add(shard);
} else if (shard.relocating()) {
relocatingShards.add(shard);
+ } else if (shard.splitting()) {
+ splittingShards.add(shard);
}
shardsByIndex.computeIfAbsent(shard.index(), k -> new LinkedHashSet<>()).add(shard);
assert invariant();
@@ -253,6 +259,9 @@ void update(ShardRouting oldShard, ShardRouting newShard) {
} else if (oldShard.relocating()) {
boolean exist = relocatingShards.remove(oldShard);
assert exist : "expected shard " + oldShard + " to exist in relocatingShards";
+ } else if (oldShard.splitting()) {
+ boolean exist = splittingShards.remove(oldShard);
+ assert exist : "expected shard " + oldShard + " to exist in splittingShards";
}
shardsByIndex.get(oldShard.index()).remove(oldShard);
if (shardsByIndex.get(oldShard.index()).isEmpty()) {
@@ -262,6 +271,8 @@ void update(ShardRouting oldShard, ShardRouting newShard) {
initializingShards.add(newShard);
} else if (newShard.relocating()) {
relocatingShards.add(newShard);
+ } else if (newShard.splitting()) {
+ splittingShards.add(newShard);
}
shardsByIndex.computeIfAbsent(newShard.index(), k -> new LinkedHashSet<>()).add(newShard);
assert invariant();
@@ -277,6 +288,9 @@ void remove(ShardRouting shard) {
} else if (shard.relocating()) {
boolean exist = relocatingShards.remove(shard);
assert exist : "expected shard " + shard + " to exist in relocatingShards";
+ } else if (shard.splitting()) {
+ boolean exist = splittingShards.remove(shard);
+ assert exist : "expected shard " + shard + " to exist in splittingShards";
}
shardsByIndex.get(shard.index()).remove(shard);
if (shardsByIndex.get(shard.index()).isEmpty()) {
@@ -296,6 +310,8 @@ public int numberOfShardsWithState(ShardRoutingState... states) {
return initializingShards.size();
} else if (states[0] == ShardRoutingState.RELOCATING) {
return relocatingShards.size();
+ } else if (states[0] == ShardRoutingState.SPLITTING) {
+ return splittingShards.size();
}
}
@@ -321,6 +337,8 @@ public List shardsWithState(ShardRoutingState... states) {
return new ArrayList<>(initializingShards);
} else if (states[0] == ShardRoutingState.RELOCATING) {
return new ArrayList<>(relocatingShards);
+ } else if (states[0] == ShardRoutingState.SPLITTING) {
+ return new ArrayList<>(splittingShards);
}
}
@@ -361,6 +379,14 @@ public List shardsWithState(String index, ShardRoutingState... sta
shards.add(shardEntry);
}
return shards;
+ } else if (states[0] == ShardRoutingState.SPLITTING) {
+ for (ShardRouting shardEntry : splittingShards) {
+ if (shardEntry.getIndexName().equals(index) == false) {
+ continue;
+ }
+ shards.add(shardEntry);
+ }
+ return shards;
}
}
@@ -444,6 +470,13 @@ private boolean invariant() {
assert relocatingShards.size() == shardRoutingsRelocating.size();
assert relocatingShards.containsAll(shardRoutingsRelocating);
+ // splittingShards must be consistent with that in shards
+ Collection shardRoutingsSplitting = StreamSupport.stream(shards.spliterator(), false)
+ .filter(ShardRouting::splitting)
+ .collect(Collectors.toList());
+ assert splittingShards.size() == shardRoutingsSplitting.size();
+ assert splittingShards.containsAll(shardRoutingsSplitting);
+
final Map> shardRoutingsByIndex = StreamSupport.stream(shards.spliterator(), false)
.collect(Collectors.groupingBy(ShardRouting::index, Collectors.toSet()));
assert shardRoutingsByIndex.equals(shardsByIndex);
diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java
index 76111f623e0a5..8c1795466b716 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java
@@ -37,6 +37,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
+import org.opensearch.cluster.metadata.ShardRange;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
@@ -94,6 +95,8 @@ public class RoutingNodes implements Iterable {
private final Map> assignedShards = new HashMap<>();
+ private final List splittingShards = new ArrayList<>();
+
private final boolean readOnly;
private int inactivePrimaryCount = 0;
@@ -101,6 +104,7 @@ public class RoutingNodes implements Iterable {
private int inactiveShardCount = 0;
private int relocatingShards = 0;
+ private int splittingShardsCount = 0;
private final Map> nodesPerAttributeNames;
private final Map recoveriesPerNode = new HashMap<>();
@@ -152,12 +156,41 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) {
addInitialRecovery(targetShardRouting, indexShard.primary);
routingNode.add(targetShardRouting);
assignedShardsAdd(targetShardRouting);
+ } else if (shard.splitting()) {
+ splittingShardsCount++;
+ for (ShardRouting childShard : shard.getRecoveringChildShards()) {
+ // Replication source is parent primary in case of both child shards and their replicas.
+ if (childShard.started() == false) {
+ addInitialRecovery(childShard, indexShard.primary);
+ } else {
+ // Only replicas can start first if a portion of recovering child shards is seen started.
+ assert childShard.primary() == false;
+ }
+ if (childShard.primary()) {
+ routingNode.add(childShard);
+ } else {
+ RoutingNode replicaRoutingNode = nodesToShards.computeIfAbsent(
+ childShard.currentNodeId(),
+ k -> new RoutingNode(childShard.currentNodeId(), clusterState.nodes().get(childShard.currentNodeId()))
+ );
+ replicaRoutingNode.add(childShard);
+ }
+ assignedShardsAdd(childShard);
+ }
+ updateSplitSourceOutgoingRecovery(shard, true);
} else if (shard.initializing()) {
if (shard.primary()) {
inactivePrimaryCount++;
}
inactiveShardCount++;
addInitialRecovery(shard, indexShard.primary);
+ } else if (shard.started() && shard.primary()) {
+ IndexMetadata indexMetadata = metadata.getIndexSafe(indexRoutingTable.getIndex());
+ if (indexMetadata.getSplitShardsMetadata().isSplitOfShardInProgress(shard.id())) {
+ ShardRange[] childShardRanges = indexMetadata.getSplitShardsMetadata().getChildShardsOfParent(shard.shardId().id());
+ ShardRouting parentRouting = shard.createRecoveringChildShards(childShardRanges, indexMetadata.getNumberOfReplicas());
+ splittingShards.add(parentRouting);
+ }
}
} else {
unassignedShards.add(shard);
@@ -168,26 +201,30 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) {
}
private void addRecovery(ShardRouting routing) {
- updateRecoveryCounts(routing, true, findAssignedPrimaryIfPeerRecovery(routing));
+ updateRecoveryCounts(routing, true, findAssignedPrimaryIfPeerRecoveryOrSplit(routing));
}
private void removeRecovery(ShardRouting routing) {
- updateRecoveryCounts(routing, false, findAssignedPrimaryIfPeerRecovery(routing));
+ updateRecoveryCounts(routing, false, findAssignedPrimaryIfPeerRecoveryOrSplit(routing));
}
private void addInitialRecovery(ShardRouting routing, ShardRouting initialPrimaryShard) {
updateRecoveryCounts(routing, true, initialPrimaryShard);
}
- private void updateRecoveryCounts(final ShardRouting routing, final boolean increment, @Nullable final ShardRouting primary) {
+ private void updateSplitSourceOutgoingRecovery(ShardRouting splitSource, final boolean increment) {
+ final int howMany = increment ? 1 : -1;
+ Recoveries.getOrAdd(getRecoveries(splitSource), splitSource.currentNodeId()).addOutgoing(howMany);
+ }
+ private void updateRecoveryCounts(final ShardRouting routing, final boolean increment, @Nullable final ShardRouting primary) {
final int howMany = increment ? 1 : -1;
assert routing.initializing() : "routing must be initializing: " + routing;
// TODO: check primary == null || primary.active() after all tests properly add ReplicaAfterPrimaryActiveAllocationDecider
assert primary == null || primary.assignedToNode() : "shard is initializing but its primary is not assigned to a node";
- // Primary shard routing, excluding the relocating primaries.
- if (routing.primary() && (primary == null || primary == routing)) {
+ // Primary shard routing, excluding the relocating/splitting primaries in the following if condition.
+ if (routing.primary() && (primary == null || (primary == routing))) {
assert routing.relocatingNodeId() == null : "Routing must be a non relocating primary";
Recoveries.getOrAdd(initialPrimaryRecoveries, routing.currentNodeId()).addIncoming(howMany);
return;
@@ -195,7 +232,7 @@ private void updateRecoveryCounts(final ShardRouting routing, final boolean incr
Recoveries.getOrAdd(getRecoveries(routing), routing.currentNodeId()).addIncoming(howMany);
- if (routing.recoverySource().getType() == RecoverySource.Type.PEER) {
+ if (routing.recoverySource().getType() == RecoverySource.Type.PEER && !routing.isSplitTarget()) {
// add/remove corresponding outgoing recovery on node with primary shard
if (primary == null) {
throw new IllegalStateException("shard is peer recovering but primary is unassigned");
@@ -254,7 +291,7 @@ public int getInitialOutgoingRecoveries(String nodeId) {
}
@Nullable
- private ShardRouting findAssignedPrimaryIfPeerRecovery(ShardRouting routing) {
+ private ShardRouting findAssignedPrimaryIfPeerRecoveryOrSplit(ShardRouting routing) {
ShardRouting primary = null;
if (routing.recoverySource() != null && routing.recoverySource().getType() == RecoverySource.Type.PEER) {
List shardRoutings = assignedShards.get(routing.shardId());
@@ -272,6 +309,27 @@ private ShardRouting findAssignedPrimaryIfPeerRecovery(ShardRouting routing) {
}
}
}
+
+ if (primary != null) {
+ return primary;
+ }
+
+ if (routing.isSplitTarget()) {
+ List shardRoutings = assignedShards.get(routing.getParentShardId());
+ if (shardRoutings != null) {
+ for (ShardRouting shardRouting : shardRoutings) {
+ if (shardRouting.primary()) {
+ if (shardRouting.active()) {
+ return shardRouting;
+ } else if (primary == null) {
+ primary = shardRouting;
+ } else if (primary.getRecoveringChildShards() != null) {
+ primary = shardRouting;
+ }
+ }
+ }
+ }
+ }
return primary;
}
@@ -289,6 +347,10 @@ public UnassignedShards unassigned() {
return this.unassignedShards;
}
+ public List splitting() {
+ return this.splittingShards;
+ }
+
public RoutingNode node(String nodeId) {
return nodesToShards.get(nodeId);
}
@@ -334,6 +396,10 @@ public int getRelocatingShardCount() {
return relocatingShards;
}
+ public int getSplittingShardCount() {
+ return splittingShardsCount;
+ }
+
/**
* Returns all shards that are not in the state UNASSIGNED with the same shard
* ID as the given shard.
@@ -370,6 +436,23 @@ public ShardRouting activePrimary(ShardId shardId) {
return null;
}
+ /**
+ * Returns the primary child for the given shard id or null
if
+ * no child is found or the parent shard is not splitting.
+ */
+ public ShardRouting primaryChild(ShardId parentShardId, ShardId childShardId) {
+ for (ShardRouting shardRouting : assignedShards(parentShardId)) {
+ if (shardRouting.splitting()) {
+ for (ShardRouting childShard : shardRouting.getRecoveringChildShards()) {
+ if (childShard.shardId().id() == childShardId.id() && childShard.primary()) {
+ return childShard;
+ }
+ }
+ }
+ }
+ return null;
+ }
+
/**
* Returns one active replica shard for the given shard id or null
if
* no active replica is found.
@@ -560,6 +643,108 @@ public Tuple relocateShard(
return Tuple.tuple(source, target);
}
+ /**
+ * Update assigned child shards against the current splitting parent and their respective recoveries.
+ */
+ public void assignChildShards(
+ ShardRouting startedShard,
+ ShardRouting parentShard,
+ RoutingChangesObserver changes,
+ Map assignedRoutingNodes
+ ) {
+ ensureMutable();
+ assert parentShard.splitting();
+ splittingShardsCount++;
+ List assignedChildShards = parentShard.assignChildShards(assignedRoutingNodes);
+ for (ShardRouting assignedChildShard : assignedChildShards) {
+ node(assignedChildShard.currentNodeId()).add(assignedChildShard);
+ assignedShardsAdd(assignedChildShard);
+ addRecovery(assignedChildShard);
+ }
+ updateSplitSourceOutgoingRecovery(parentShard, true);
+ updateAssigned(startedShard, parentShard);
+ changes.splitStarted(startedShard, assignedChildShards);
+ }
+
+ public void startInPlaceChildShards(
+ Logger logger,
+ List childShards,
+ IndexMetadata indexMetadata,
+ RoutingChangesObserver routingChangesObserver,
+ RoutingTable routingTable
+ ) {
+ ensureMutable();
+ assert !childShards.isEmpty();
+ ShardRouting parentShard = getByAllocationId(
+ childShards.get(0).getParentShardId(),
+ childShards.get(0).allocationId().getParentAllocationId()
+ );
+ int validShardEvents = 0, invalidShardEvents = 0;
+ for (ShardRouting childShard : childShards) {
+ if (childShard.isSplitTargetOf(parentShard) == false ||
+ (childShard.isStartedChildReplica() == false && childShard.initializing() == false)) {
+ invalidShardEvents++;
+ } else {
+ validShardEvents++;
+ }
+ }
+
+ if (invalidShardEvents != 0) {
+ logger.error(
+ "Invalid shard started event for child shards received. Unknown child found."
+ + ", Parent shard is valid: ["
+ + (indexMetadata.getSplitShardsMetadata().isSplitOfShardInProgress(parentShard.shardId().id()) == true)
+ + "]. Failing all child shards and cancelling split."
+ );
+ // We just need to fail one child shard because failShard ensures that failure of any child shard
+ // fails all child shards and cancels split of source shard.
+ UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "");
+ failShard(logger, childShards.get(0), unassignedInfo, indexMetadata, routingChangesObserver);
+ return;
+ }
+
+ int startedChildPrimaries = 0, startedChildReplicas = 0;
+ ShardRouting updatedParent = parentShard;
+ for (ShardRouting childShard : childShards) {
+ if (childShard.primary() == false) {
+ if (childShard.initializing() == true) {
+ ShardRouting startedChild = started(childShard);
+ routingChangesObserver.childReplicaStarted(childShard, parentShard, startedChild);
+ updatedParent = updatedParent.updatedStartedReplicaOnParent(childShard, startedChild);
+ startedChildReplicas++;
+ }
+ } else {
+ assert childShard.initializing();
+ ShardRouting startedShard = started(childShard);
+ routingChangesObserver.shardStarted(childShard, startedShard);
+ startedChildPrimaries++;
+ }
+ }
+ updateAssigned(parentShard, updatedParent);
+
+ if (startedChildPrimaries == 0) {
+ assert startedChildReplicas > 0;
+ } else {
+ assert startedChildPrimaries == indexMetadata.getSplitShardsMetadata().
+ getChildShardsOfParent(parentShard.shardId().id()).length;
+ assert startedChildReplicas == 0;
+ for (ShardRouting childShard : parentShard.getRecoveringChildShards()) {
+ ShardRouting assignedChild = getByAllocationId(childShard.shardId(), childShard.allocationId().getId());
+ if (assignedChild.isSplitTarget()) {
+ assert assignedChild.primary() == false;
+ removeParentFromStartedChild(assignedChild);
+ assignedChild = getByAllocationId(childShard.shardId(), childShard.allocationId().getId());
+ }
+ assert assignedChild.started() && assignedChild.isSplitTarget() == false;
+ }
+ IndexShardRoutingTable shardRoutingTable = routingTable.shardRoutingTable(parentShard.shardId());
+ for (ShardRouting parent : shardRoutingTable.getShards()) {
+ remove(parent);
+ }
+ routingChangesObserver.splitCompleted(parentShard, indexMetadata);
+ }
+ }
+
/**
* Applies the relevant logic to start an initializing shard.
*
@@ -629,6 +814,7 @@ public ShardRouting startShard(Logger logger, ShardRouting initializingShard, Ro
* - If shard is a primary, this also fails initializing replicas.
* - If shard is an active primary, this also promotes an active replica to primary (if such a replica exists).
* - If shard is a relocating primary, this also removes the primary relocation target shard.
+ * - If shard is being split, this also removes target child shards.
* - If shard is a relocating replica, this promotes the replica relocation target to a full initializing replica, removing the
* relocation source information. This is possible as peer recovery is always done from the primary.
* - If shard is a (primary or replica) relocation target, this also clears the relocation information on the source shard.
@@ -642,6 +828,10 @@ public void failShard(
RoutingChangesObserver routingChangesObserver
) {
ensureMutable();
+ if (failedShard.getParentShardId() != null && getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId()) == null) {
+ // We already removed this child when parent failed.
+ return;
+ }
assert failedShard.assignedToNode() : "only assigned shards can be failed";
assert indexMetadata.getIndex().equals(failedShard.index()) : "shard failed for unknown index (shard entry: " + failedShard + ")";
assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId()) == failedShard
@@ -653,7 +843,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
logger.debug("{} failing shard {} with unassigned info ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
// if this is a primary, fail initializing replicas first (otherwise we move RoutingNodes into an inconsistent state)
- if (failedShard.primary()) {
+ if (failedShard.primary() && failedShard.getParentShardId() == null) {
List assignedShards = assignedShards(failedShard.shardId());
if (assignedShards.isEmpty() == false) {
// copy list to prevent ConcurrentModificationException
@@ -695,11 +885,34 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
removeRelocationSource(targetShard);
routingChangesObserver.relocationSourceRemoved(targetShard);
}
+ } else if (failedShard.splitting()) {
+ for (ShardRouting childShard : failedShard.getRecoveringChildShards()) {
+ assert childShard.getParentShardId() != null;
+ logger.trace("{} is removed due to the failure/cancellation of the source shard", childShard);
+ remove(childShard);
+ }
+ routingChangesObserver.splitFailed(failedShard, indexMetadata);
}
// fail actual shard
+ if (failedShard.getParentShardId() != null) {
+ ShardRouting parentShard = getByAllocationId(
+ failedShard.getParentShardId(),
+ failedShard.allocationId().getParentAllocationId()
+ );
+
+ if (parentShard.splitting() == true) {
+ childShardFailed(logger, failedShard, unassignedInfo, indexMetadata, routingChangesObserver, parentShard);
+ assert node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) == null : "failedShard "
+ + failedShard
+ + " was matched but wasn't removed";
+ }
+ return;
+ }
+
if (failedShard.initializing()) {
- if (failedShard.relocatingNodeId() == null) {
+ AllocationId failedShardAllocId = failedShard.allocationId();
+ if (failedShard.relocatingNodeId() == null && failedShardAllocId.getParentAllocationId() == null) {
if (failedShard.primary()) {
// promote active replica to primary if active replica exists (only the case for shadow replicas)
unassignPrimaryAndPromoteActiveReplicaIfExists(failedShard, unassignedInfo, routingChangesObserver);
@@ -708,7 +921,8 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
moveToUnassigned(failedShard, unassignedInfo);
}
} else {
- // The shard is a target of a relocating shard. In that case we only need to remove the target shard and cancel the source
+ // The shard is a target or child of a relocating shard. In that case we only need to remove the target/child shard(s) and
+ // cancel the source
// relocation. No shard is left unassigned
logger.trace(
"{} is a relocation target, resolving source to cancel relocation ({})",
@@ -745,6 +959,30 @@ assert node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) ==
+ " was matched but wasn't removed";
}
+ private void childShardFailed(
+ Logger logger,
+ ShardRouting failedShard,
+ UnassignedInfo unassignedInfo,
+ IndexMetadata indexMetadata,
+ RoutingChangesObserver routingChangesObserver,
+ ShardRouting parentShard
+ ) {
+
+ assert parentShard.isSplitSourceOf(failedShard);
+ logger.trace(
+ "{}, resolved source to [{}]. canceling split ... ({})",
+ failedShard.shardId(),
+ parentShard,
+ unassignedInfo.shortSummary()
+ );
+ cancelSplit(parentShard);
+
+ for (ShardRouting childShard : parentShard.getRecoveringChildShards()) {
+ remove(childShard);
+ }
+ routingChangesObserver.splitFailed(parentShard, indexMetadata);
+ }
+
private void unassignPrimaryAndPromoteActiveReplicaIfExists(
ShardRouting failedShard,
UnassignedInfo unassignedInfo,
@@ -786,7 +1024,7 @@ private void promoteReplicaToPrimary(ShardRouting activeReplica, RoutingChangesO
*/
private ShardRouting started(ShardRouting shard) {
assert shard.initializing() : "expected an initializing shard " + shard;
- if (shard.relocatingNodeId() == null) {
+ if (shard.relocatingNodeId() == null && !shard.isSplitTarget()) {
// if this is not a target shard for relocation, we need to update statistics
inactiveShardCount--;
if (shard.primary()) {
@@ -794,11 +1032,24 @@ private ShardRouting started(ShardRouting shard) {
}
}
removeRecovery(shard);
- ShardRouting startedShard = shard.moveToStarted();
+ ShardRouting startedShard;
+ if (shard.isSplitTarget() && shard.primary() == false) {
+ startedShard = shard.moveChildReplicaToStarted();
+ } else {
+ startedShard = shard.moveToStarted();
+ }
updateAssigned(shard, startedShard);
return startedShard;
}
+
+ private ShardRouting removeParentFromStartedChild(ShardRouting shard) {
+ assert shard.started();
+ ShardRouting parentRemoved = shard.removeParentFromReplica();
+ updateAssigned(shard, parentRemoved);
+ return parentRemoved;
+ }
+
/**
* Cancels a relocation of a shard that shard must relocating.
*
@@ -811,6 +1062,22 @@ private ShardRouting cancelRelocation(ShardRouting shard) {
return cancelledShard;
}
+ /**
+ * Cancels a relocation of a shard that shard must relocating.
+ *
+ * @return the shard after cancelling relocation
+ */
+ private ShardRouting cancelSplit(ShardRouting shard) {
+ splittingShardsCount--;
+ for (ShardRouting splittingShard : splittingShards) {
+ assert splittingShard.shardId().equals(shard.shardId()) == false;
+ }
+ ShardRouting cancelledShard = shard.cancelSplit();
+ updateAssigned(shard, cancelledShard);
+ updateSplitSourceOutgoingRecovery(shard, false);
+ return cancelledShard;
+ }
+
/**
* moves the assigned replica shard to primary.
*
@@ -835,7 +1102,7 @@ private ShardRouting promoteActiveReplicaShardToPrimary(ShardRouting replicaShar
private void remove(ShardRouting shard) {
assert shard.unassigned() == false : "only assigned shards can be removed here (" + shard + ")";
node(shard.currentNodeId()).remove(shard);
- if (shard.initializing() && shard.relocatingNodeId() == null) {
+ if (shard.initializing() && shard.relocatingNodeId() == null && !shard.isSplitTarget()) {
inactiveShardCount--;
assert inactiveShardCount >= 0;
if (shard.primary()) {
@@ -843,6 +1110,8 @@ private void remove(ShardRouting shard) {
}
} else if (shard.relocating()) {
shard = cancelRelocation(shard);
+ } else if (shard.splitting()) {
+ shard = cancelSplit(shard);
}
assignedShardsRemove(shard);
if (shard.initializing()) {
@@ -1215,10 +1484,11 @@ public static boolean assertShardStats(RoutingNodes routingNodes) {
int inactivePrimaryCount = 0;
int inactiveShardCount = 0;
int relocating = 0;
+ int splitting = 0;
Map indicesAndShards = new HashMap<>();
for (RoutingNode node : routingNodes) {
for (ShardRouting shard : node) {
- if (shard.initializing() && shard.relocatingNodeId() == null) {
+ if (shard.initializing() && shard.relocatingNodeId() == null && !shard.isSplitTarget()) {
inactiveShardCount++;
if (shard.primary()) {
inactivePrimaryCount++;
@@ -1226,6 +1496,8 @@ public static boolean assertShardStats(RoutingNodes routingNodes) {
}
if (shard.relocating()) {
relocating++;
+ } else if (shard.splitting()) {
+ splitting++;
}
Integer i = indicesAndShards.get(shard.index());
if (i == null) {
@@ -1272,12 +1544,12 @@ public static boolean assertShardStats(RoutingNodes routingNodes) {
}
}
- assertRecoveriesPerNode(routingNodes, routingNodes.initialPrimaryRecoveries, false, x -> isNonRelocatingPrimary(x));
+ assertRecoveriesPerNode(routingNodes, routingNodes.initialPrimaryRecoveries, false, x -> isNonRelocatingOrNonSplittingPrimary(x));
assertRecoveriesPerNode(
routingNodes,
Recoveries.unionRecoveries(routingNodes.recoveriesPerNode, routingNodes.initialReplicaRecoveries),
true,
- x -> !isNonRelocatingPrimary(x)
+ x -> !isNonRelocatingOrNonSplittingPrimary(x)
);
assert unassignedPrimaryCount == routingNodes.unassignedShards.getNumPrimaries() : "Unassigned primaries is ["
@@ -1305,6 +1577,11 @@ public static boolean assertShardStats(RoutingNodes routingNodes) {
+ "] but expected ["
+ relocating
+ "]";
+ assert routingNodes.getSplittingShardCount() == splitting : "Splitting shards mismatch ["
+ + routingNodes.getSplittingShardCount()
+ + "] but expected ["
+ + splitting
+ + "]";
return true;
}
@@ -1325,10 +1602,21 @@ private static void assertRecoveriesPerNode(
for (ShardRouting routing : routingNode) {
if (routing.initializing() && incomingCountFilter.apply(routing)) incoming++;
- if (verifyOutgoingRecoveries && routing.primary() && routing.isRelocationTarget() == false) {
+ if (verifyOutgoingRecoveries
+ && routing.primary()
+ && routing.isRelocationTarget() == false
+ && routing.isSplitTarget() == false) {
for (ShardRouting assigned : routingNodes.assignedShards.get(routing.shardId())) {
if (assigned.initializing() && assigned.recoverySource().getType() == RecoverySource.Type.PEER) {
outgoing++;
+ } else if (assigned.splitting()) {
+ for (ShardRouting childShardRouting : assigned.getRecoveringChildShards()) {
+ assert routingNodes.assignedShards.containsKey(childShardRouting.shardId());
+ for (ShardRouting assignedChildShard : routingNodes.assignedShards.get(childShardRouting.shardId())) {
+ assert assignedChildShard.getParentShardId().equals(assigned.shardId());
+ }
+ }
+ outgoing++;
}
}
}
@@ -1340,8 +1628,8 @@ private static void assertRecoveriesPerNode(
}
}
- private static boolean isNonRelocatingPrimary(ShardRouting routing) {
- return routing.primary() && routing.relocatingNodeId() == null;
+ private static boolean isNonRelocatingOrNonSplittingPrimary(ShardRouting routing) {
+ return routing.primary() && routing.relocatingNodeId() == null && !routing.splitting() && !routing.isSplitTarget();
}
private void ensureMutable() {
diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java
index 7128eb44bfb14..e801382158972 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java
@@ -173,6 +173,24 @@ public IndexShardRoutingTable shardRoutingTable(ShardId shardId) {
return shard;
}
+ /**
+ * All child replica shards for the provided {@link ShardId}
+ * @return All the child replica shard routing entries for the given index and shard id
+ * @throws IndexNotFoundException if provided index does not exist
+ * @throws ShardNotFoundException if provided shard id is unknown
+ */
+ public IndexShardRoutingTable childReplicaShardRoutingTable(ShardId shardId) {
+ IndexRoutingTable indexRouting = index(shardId.getIndexName());
+ if (indexRouting == null || indexRouting.getIndex().equals(shardId.getIndex()) == false) {
+ throw new IndexNotFoundException(shardId.getIndex());
+ }
+ IndexShardRoutingTable shard = indexRouting.childReplicaShard(shardId.id());
+ if (shard == null) {
+ throw new ShardNotFoundException(shardId);
+ }
+ return shard;
+ }
+
@Nullable
public ShardRouting getByAllocationId(ShardId shardId, String allocationId) {
final IndexRoutingTable indexRoutingTable = index(shardId.getIndexName());
@@ -494,8 +512,9 @@ public Builder updateNodes(long version, RoutingNodes routingNodes) {
for (RoutingNode routingNode : routingNodes) {
for (ShardRouting shardRoutingEntry : routingNode) {
// every relocating shard has a double entry, ignore the target one.
- if (shardRoutingEntry.initializing() && shardRoutingEntry.relocatingNodeId() != null) continue;
-
+ // Also, ignore initializing child shards.
+ if (shardRoutingEntry.initializing()
+ && (shardRoutingEntry.relocatingNodeId() != null || shardRoutingEntry.isSplitTarget())) continue;
addShard(indexRoutingTableBuilders, shardRoutingEntry);
}
}
@@ -522,7 +541,11 @@ private static void addShard(
indexBuilder = new IndexRoutingTable.Builder(index);
indexRoutingTableBuilders.put(index.getName(), indexBuilder);
}
- indexBuilder.addShard(shardRoutingEntry);
+ if (shardRoutingEntry.isStartedChildReplica()) {
+ indexBuilder.addChildReplica(shardRoutingEntry);
+ } else {
+ indexBuilder.addShard(shardRoutingEntry);
+ }
}
/**
diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java
index 13501a431d9f9..c47577c3485ad 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java
@@ -8,6 +8,7 @@
package org.opensearch.cluster.routing;
+import org.opensearch.Version;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.core.common.io.stream.StreamInput;
@@ -15,6 +16,7 @@
import org.opensearch.core.index.Index;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
import static org.opensearch.cluster.DiffableUtils.MapDiff;
@@ -96,12 +98,14 @@ public MapDiff> provid
public static class IndexRoutingTableIncrementalDiff implements Diff {
private final Diff