Skip to content

Commit

Permalink
Upstream fetch
Browse files Browse the repository at this point in the history
Signed-off-by: Prudhvi Godithi <[email protected]>
  • Loading branch information
prudhvigodithi committed Feb 8, 2025
1 parent db5212b commit 64bb954
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import org.opensearch.action.ActionRequestBuilder;
import org.opensearch.action.support.clustermanager.AcknowledgedResponse;
import org.opensearch.client.OpenSearchClient;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.transport.client.OpenSearchClient;

@PublicApi(since = "1.0.0")
public class SearchOnlyRequestBuilder extends ActionRequestBuilder<SearchOnlyRequest, AcknowledgedResponse> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -229,6 +230,7 @@ private void handleShardSyncRequest(NodeSearchOnlyRequest request, TransportChan
);
throw new IllegalStateException(
String.format(
Locale.ROOT,
"Shard [%s] still has %d uncommitted operations after flush. Please wait and retry the scale down operation.",
shard.shardId(),
shard.translogStats().getUncommittedOperations()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ protected void clusterManagerOperation(
final String customDataPath = IndexMetadata.INDEX_DATA_PATH_SETTING.get(state.metadata().index(index).getSettings());
for (IndexShardRoutingTable routing : indexShardRoutingTables) {
final int shardId = routing.shardId().id();
ClusterShardHealth shardHealth = new ClusterShardHealth(shardId, routing);
ClusterShardHealth shardHealth = new ClusterShardHealth(shardId, routing, state.metadata().index(index));
if (request.shardStatuses().contains(shardHealth.getStatus())) {
shardsToFetch.add(Tuple.tuple(routing.shardId(), customDataPath));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public ClusterIndexHealth(final IndexMetadata indexMetadata, final IndexRoutingT
shards = new HashMap<>();
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
int shardId = shardRoutingTable.shardId().id();
shards.put(shardId, new ClusterShardHealth(shardId, shardRoutingTable));
shards.put(shardId, new ClusterShardHealth(shardId, shardRoutingTable, indexMetadata));
}

// update the index status
Expand Down Expand Up @@ -218,7 +218,7 @@ public ClusterIndexHealth(
if (isShardLevelHealthRequired) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
int shardId = indexShardRoutingTable.shardId().id();
ClusterShardHealth shardHealth = new ClusterShardHealth(shardId, indexShardRoutingTable);
ClusterShardHealth shardHealth = new ClusterShardHealth(shardId, indexShardRoutingTable, indexMetadata);
if (shardHealth.isPrimaryActive()) {
computeActivePrimaryShards++;
}
Expand Down Expand Up @@ -268,7 +268,8 @@ public ClusterIndexHealth(
ClusterHealthStatus shardHealth = ClusterShardHealth.getShardHealth(
primaryShard,
activeShardsPerShardId,
shardRoutingCountPerShardId
shardRoutingCountPerShardId,
indexMetadata
);
computeStatus = getIndexHealthStatus(shardHealth, computeStatus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.cluster.health;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -113,7 +114,7 @@ public final class ClusterShardHealth implements Writeable, ToXContentFragment {
private int delayedUnassignedShards;
private final boolean primaryActive;

public ClusterShardHealth(final int shardId, final IndexShardRoutingTable shardRoutingTable) {
public ClusterShardHealth(final int shardId, final IndexShardRoutingTable shardRoutingTable, final IndexMetadata indexMetadata) {
this.shardId = shardId;
int computeActiveShards = 0;
int computeRelocatingShards = 0;
Expand All @@ -139,13 +140,13 @@ public ClusterShardHealth(final int shardId, final IndexShardRoutingTable shardR
}
}
final ShardRouting primaryRouting = shardRoutingTable.primaryShard();
this.status = getShardHealth(primaryRouting, computeActiveShards, shardRoutingTable.size());
this.status = getShardHealth(primaryRouting, computeActiveShards, shardRoutingTable.size(), indexMetadata);
this.activeShards = computeActiveShards;
this.relocatingShards = computeRelocatingShards;
this.initializingShards = computeInitializingShards;
this.unassignedShards = computeUnassignedShards;
this.delayedUnassignedShards = computeDelayedUnassignedShards;
this.primaryActive = primaryRouting.active();
this.primaryActive = primaryRouting != null && primaryRouting.active();
}

public ClusterShardHealth(final StreamInput in) throws IOException {
Expand Down Expand Up @@ -230,9 +231,17 @@ public void writeTo(final StreamOutput out) throws IOException {
* Shard health is RED when the primary is not active.
* </p>
*/
public static ClusterHealthStatus getShardHealth(final ShardRouting primaryRouting, final int activeShards, final int totalShards) {
// TO DO
// assert primaryRouting != null : "Primary shard routing can't be null";
public static ClusterHealthStatus getShardHealth(
final ShardRouting primaryRouting,
final int activeShards,
final int totalShards,
final IndexMetadata indexMetadata
) {
if (primaryRouting == null) {
boolean isSearchOnlyEnabled = indexMetadata.getSettings()
.getAsBoolean(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false);
return isSearchOnlyEnabled ? ClusterHealthStatus.GREEN : ClusterHealthStatus.RED;
}
if (primaryRouting.active()) {
if (activeShards == totalShards) {
return ClusterHealthStatus.GREEN;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package org.opensearch.rest.action.admin.indices;

import org.opensearch.client.node.NodeClient;
import org.opensearch.core.common.Strings;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;
import org.opensearch.transport.client.node.NodeClient;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@

package org.opensearch.cluster.health;

import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.TestShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
Expand Down Expand Up @@ -64,8 +67,18 @@ public void testClusterShardGreenHealth() {
indexShardRoutingBuilder.addShard(
TestShardRouting.newShardRouting(indexName, shardID, "node_1", null, false, ShardRoutingState.STARTED)
);

IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
.settings(Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
)
.creationDate(System.currentTimeMillis());
IndexMetadata indexMetadata = indexMetadataBuilder.build();
IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingBuilder.build();
ClusterShardHealth clusterShardHealth = new ClusterShardHealth(shardID, indexShardRoutingTable);

ClusterShardHealth clusterShardHealth = new ClusterShardHealth(shardID, indexShardRoutingTable, indexMetadata);
assertEquals(2, clusterShardHealth.getActiveShards());
assertEquals(0, clusterShardHealth.getInitializingShards());
assertEquals(0, clusterShardHealth.getRelocatingShards());
Expand Down Expand Up @@ -112,7 +125,17 @@ public void testClusterShardYellowHealth() {
)
);
IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingBuilder.build();
ClusterShardHealth clusterShardHealth = new ClusterShardHealth(shardID, indexShardRoutingTable);

IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
.settings(Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
)
.creationDate(System.currentTimeMillis());
IndexMetadata indexMetadata = indexMetadataBuilder.build();

ClusterShardHealth clusterShardHealth = new ClusterShardHealth(shardID, indexShardRoutingTable, indexMetadata);
assertEquals(2, clusterShardHealth.getActiveShards());
assertEquals(1, clusterShardHealth.getInitializingShards());
assertEquals(1, clusterShardHealth.getRelocatingShards());
Expand Down Expand Up @@ -150,7 +173,17 @@ public void testClusterShardRedHealth() {
TestShardRouting.newShardRouting(indexName, shardID, null, null, false, ShardRoutingState.UNASSIGNED)
);
IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingBuilder.build();
ClusterShardHealth clusterShardHealth = new ClusterShardHealth(shardID, indexShardRoutingTable);

IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
.settings(Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
)
.creationDate(System.currentTimeMillis());
IndexMetadata indexMetadata = indexMetadataBuilder.build();

ClusterShardHealth clusterShardHealth = new ClusterShardHealth(shardID, indexShardRoutingTable, indexMetadata);
assertEquals(0, clusterShardHealth.getActiveShards());
assertEquals(0, clusterShardHealth.getInitializingShards());
assertEquals(0, clusterShardHealth.getRelocatingShards());
Expand All @@ -161,7 +194,15 @@ public void testClusterShardRedHealth() {
}

public void testShardRoutingNullCheck() {
assertThrows(AssertionError.class, () -> ClusterShardHealth.getShardHealth(null, 0, 0));
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder("test")
.settings(Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
);
IndexMetadata indexMetadata = indexMetadataBuilder.build();

assertThrows(AssertionError.class, () -> ClusterShardHealth.getShardHealth(null, 0, 0, indexMetadata));
}

@Override
Expand Down

0 comments on commit 64bb954

Please sign in to comment.