Skip to content

Commit

Permalink
Upstream Fetch
Browse files Browse the repository at this point in the history
Signed-off-by: Prudhvi Godithi <[email protected]>
  • Loading branch information
prudhvigodithi committed Feb 25, 2025
1 parent 6e2bb1d commit cb515eb
Show file tree
Hide file tree
Showing 23 changed files with 2,328 additions and 181 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@

package org.opensearch.indices.replication;

import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.settings.Settings;
Expand All @@ -21,7 +25,9 @@

import java.util.List;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

Expand Down Expand Up @@ -102,8 +108,8 @@ private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType

Settings settings = Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, replicationType)
.build();
Expand All @@ -114,13 +120,29 @@ private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType
ensureGreen(INDEX_NAME);
}

public void testRemoteStoreRestoreFailsForSearchOnlyIndex() throws Exception {
bootstrapIndexWithSearchReplicas();
assertAcked(client().admin().indices().prepareSearchOnly(INDEX_NAME).setSearchOnly(true).get());

GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings(INDEX_NAME).get();
assertEquals("true", settingsResponse.getSetting(INDEX_NAME, IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey()));

IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> {
PlainActionFuture<RestoreRemoteStoreResponse> future = PlainActionFuture.newFuture();
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), future);
future.actionGet();
});

assertTrue(exception.getMessage().contains("Cannot restore index [" + INDEX_NAME + "] because search-only mode is enabled"));
}

private void bootstrapIndexWithSearchReplicas() throws InterruptedException {
startCluster(3);

Settings settings = Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@
import org.opensearch.rest.action.admin.indices.RestResizeHandler;
import org.opensearch.rest.action.admin.indices.RestResolveIndexAction;
import org.opensearch.rest.action.admin.indices.RestRolloverIndexAction;
import org.opensearch.rest.action.admin.indices.RestSearchOnlyAction;
import org.opensearch.rest.action.admin.indices.RestScaleSearchOnlyAction;
import org.opensearch.rest.action.admin.indices.RestSimulateIndexTemplateAction;
import org.opensearch.rest.action.admin.indices.RestSimulateTemplateAction;
import org.opensearch.rest.action.admin.indices.RestSyncedFlushAction;
Expand Down Expand Up @@ -910,7 +910,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestUpgradeAction());
registerHandler.accept(new RestUpgradeStatusAction());
registerHandler.accept(new RestClearIndicesCacheAction());
registerHandler.accept(new RestSearchOnlyAction());
registerHandler.accept(new RestScaleSearchOnlyAction());
registerHandler.accept(new RestIndexAction());
registerHandler.accept(new CreateHandler());
registerHandler.accept(new AutoIdHandler(nodesInCluster));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
* index state compatibility, and configuration prerequisites such as remote store
* and segment replication settings.
*/
class ScaleOperationValidator {
class SearchOnlyOperationValidator {

/**
* Validates that the given index meets the prerequisites for the scale operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ public SearchOnlyRequestBuilder(OpenSearchClient client, boolean scaleDown, Stri
* <li>Scale up an index from search-only mode back to full read-write operation</li>
* </ul>
*
* @param scaleDown true if scaling down to search-only mode, false if scaling up to normal operation
* @param searchOnly true if scaling down to search-only mode, false if scaling up to normal operation
* @return this builder (for method chaining)
*/
public SearchOnlyRequestBuilder setScaleDown(boolean scaleDown) {
request.scaleDown(scaleDown);
public SearchOnlyRequestBuilder setSearchOnly(boolean searchOnly) {
request.scaleDown(searchOnly);
return this;

Check warning on line 65 in server/src/main/java/org/opensearch/action/admin/indices/scale/searchonly/SearchOnlyRequestBuilder.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/scale/searchonly/SearchOnlyRequestBuilder.java#L64-L65

Added lines #L64 - L65 were not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class TransportSearchOnlyAction extends TransportClusterManagerNodeAction
private final IndicesService indicesService;
private final TransportService transportService;

private final ScaleOperationValidator validator;
private final SearchOnlyOperationValidator validator;
private final SearchOnlyClusterStateBuilder searchOnlyClusterStateBuilder;
private final SearchOnlyShardSyncManager searchOnlyShardSyncManager;

Expand Down Expand Up @@ -131,7 +131,7 @@ public TransportSearchOnlyAction(
this.allocationService = allocationService;
this.indicesService = indicesService;
this.transportService = transportService;
this.validator = new ScaleOperationValidator();
this.validator = new SearchOnlyOperationValidator();
this.searchOnlyClusterStateBuilder = new SearchOnlyClusterStateBuilder();
this.searchOnlyShardSyncManager = new SearchOnlyShardSyncManager(clusterService, transportService, NAME);

Expand Down Expand Up @@ -239,7 +239,7 @@ private void finalizeScaleDown(String index, ActionListener<AcknowledgedResponse
/**
* Handles an incoming shard sync request from another node.
*/
private void handleShardSyncRequest(NodeSearchOnlyRequest request, TransportChannel channel) throws Exception {
void handleShardSyncRequest(NodeSearchOnlyRequest request, TransportChannel channel) throws Exception {
logger.info("Handling shard sync request for index [{}]", request.getIndex());
ClusterState state = clusterService.state();

Expand Down Expand Up @@ -277,7 +277,7 @@ private List<ShardSearchOnlyResponse> syncShards(IndexService indexService, List
return shardResponses;
}

private ShardSearchOnlyResponse syncSingleShard(IndexShard shard) throws Exception {
ShardSearchOnlyResponse syncSingleShard(IndexShard shard) throws Exception {
logger.info("Performing final sync and flush for shard {}", shard.shardId());
shard.sync();
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
Expand Down Expand Up @@ -331,7 +331,7 @@ protected ClusterBlockException checkBlock(SearchOnlyRequest request, ClusterSta
* <li>Initiates shard synchronization after the block is applied</li>
* </ul>
*/
private class AddBlockClusterStateUpdateTask extends ClusterStateUpdateTask {
class AddBlockClusterStateUpdateTask extends ClusterStateUpdateTask {
private final String index;
private final Map<Index, ClusterBlock> blockedIndices;
private final ActionListener<AcknowledgedResponse> listener;
Expand Down Expand Up @@ -390,7 +390,7 @@ public void onFailure(String source, Exception e) {
* <li>Updates the routing table to remove non-search-only shards</li>
* </ul>
*/
private class FinalizeScaleDownTask extends ClusterStateUpdateTask {
class FinalizeScaleDownTask extends ClusterStateUpdateTask {
private final String index;
private final ActionListener<AcknowledgedResponse> listener;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,6 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) {
// also fill replicaSet information
for (final IndexRoutingTable indexRoutingTable : routingTable.indicesRouting().values()) {
for (IndexShardRoutingTable indexShard : indexRoutingTable) {
// TO DO
// assert indexShard.primary != null;
IndexMetadata idxMetadata = metadata.index(indexShard.shardId().getIndex());
boolean isSearchOnly = false;
if (idxMetadata != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,8 @@ protected ShardRouting(
assert !(state == ShardRoutingState.UNASSIGNED && unassignedInfo == null) : "unassigned shard must be created with meta";
assert (state == ShardRoutingState.UNASSIGNED || state == ShardRoutingState.INITIALIZING) == (recoverySource != null)
: "recovery source only available on unassigned or initializing shard but was " + state;
// TO DO
/*assert recoverySource == null || recoverySource == PeerRecoverySource.INSTANCE || primary || searchOnly
: "replica shards always recover from primary";*/
assert recoverySource == null || recoverySource == PeerRecoverySource.INSTANCE || primary || searchOnly
: "replica shards always recover from primary";
assert (currentNodeId == null) == (state == ShardRoutingState.UNASSIGNED) : "unassigned shard must not be assigned to a node "
+ this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.SEARCHABLE_SNAPSHOT_ID_UUID,
IndexSettings.SEARCHABLE_SNAPSHOT_SHARD_PATH_TYPE,

// Index Search only setting
IndexSettings.INDEX_SEARCH_ONLY_SETTING,

// Settings for remote translog
IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,6 @@ protected AllocateUnassignedDecision getAllocationDecision(

// Only proceed if both the shard is marked search-only and the index setting is enabled.
if (unassignedShard.isSearchOnly() && isIndexSearchOnly) {
logger.info("getAllocationDecision: entering search-only branch for {}", unassignedShard);

// Obtain the collection of data nodes once.
Collection<DiscoveryNode> dataNodes = allocation.nodes().getDataNodes().values();

Check warning on line 264 in server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java#L264

Added line #L264 was not covered by tests

Expand All @@ -271,15 +269,13 @@ protected AllocateUnassignedDecision getAllocationDecision(
.filter(candidate -> {
RoutingNode node = allocation.routingNodes().node(candidate.getId());
Decision decision = allocation.deciders().canAllocate(unassignedShard, node, allocation);

Check warning on line 271 in server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java#L269-L271

Added lines #L269 - L271 were not covered by tests
logger.info("Allocating decision for candidate {} is {}", candidate, decision.getDecisions());
return decision.type() == Decision.Type.YES;
})
.findFirst()
.orElse(null);

Check warning on line 275 in server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java#L274-L275

Added lines #L274 - L275 were not covered by tests

// If a candidate was found, return a YES allocation decision.
if (selectedCandidate != null) {
logger.info("Allocating search-only replica {} to node {}", unassignedShard, selectedCandidate);
return AllocateUnassignedDecision.yes(selectedCandidate, null, new ArrayList<>(), false);

Check warning on line 279 in server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java#L279

Added line #L279 was not covered by tests
}

Expand Down
11 changes: 0 additions & 11 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -782,17 +782,6 @@ public static IndexMergePolicy fromString(String text) {
Property.IndexScope
);

/**
* Setting to indicate if an index is in search-only mode.
* This setting can only be modified through the _searchonly API.
*/
public static final Setting<Boolean> INDEX_SEARCH_ONLY_SETTING = Setting.boolSetting(
"index.search_only",
false,
Property.IndexScope,
Property.InternalIndex
);

private final Index index;
private final Version version;
private final Logger logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,40 +167,31 @@ public RemoteRestoreResult restore(
IndicesOptions.fromOptions(true, true, true, true)
);

boolean allSearchOnly = true;
for (String indexName : filteredIndices) {
IndexMetadata indexMetadata = currentState.metadata().index(indexName);
if (indexMetadata == null) {
logger.warn("Skipping restore: index [{}] does not exist.", indexName);
logger.warn("Index restore is not supported for non-existent index. Skipping: {}", indexName);
continue;

Check warning on line 174 in server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java#L174

Added line #L174 was not covered by tests
}

boolean isSearchOnly = indexMetadata.getSettings()
.getAsBoolean(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false);

Check warning on line 177 in server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java#L176-L177

Added lines #L176 - L177 were not covered by tests

if (isSearchOnly) {
logger.warn("Skipping _remotestore/_restore for index [{}] as search-only mode is enabled.", indexName);
} else if (restoreAllShards && IndexMetadata.State.CLOSE.equals(indexMetadata.getState()) == false) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Cannot restore index [%s] because search-only mode is enabled", indexName)

Check warning on line 180 in server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java#L179-L180

Added lines #L179 - L180 were not covered by tests
);
}
if (restoreAllShards && IndexMetadata.State.CLOSE.equals(indexMetadata.getState()) == false) {
throw new IllegalStateException(
String.format(
Locale.ROOT,
"cannot restore index [%s] because an open index with same name/uuid already exists in the cluster.",
indexName
) + " Close the existing index."
);
} else {
allSearchOnly = false;
indexMetadataMap.put(indexName, new Tuple<>(false, indexMetadata));
}
}

if (allSearchOnly) {
throw new IllegalArgumentException(
"Skipping _remotestore/_restore for all selected indices as search-only mode is enabled."
);
indexMetadataMap.put(indexName, new Tuple<>(false, indexMetadata));

Check warning on line 192 in server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java#L192

Added line #L192 was not covered by tests
}
}

return executeRestore(currentState, indexMetadataMap, restoreAllShards, remoteState);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.rest.action.admin.indices;

import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;
import org.opensearch.transport.client.node.NodeClient;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;
import static org.opensearch.rest.RestRequest.Method.POST;

/**
* Rest action for scaling index operations
*
* @opensearch.internal
*/
public class RestScaleSearchOnlyAction extends BaseRestHandler {

private static final String SEARCH_ONLY_FIELD = "search_only";

@Override
public List<Route> routes() {
return unmodifiableList(asList(new Route(POST, "/{index}/_scale")));
}

@Override
public String getName() {
return "search_only_index_action";
}

@Override
protected RestChannelConsumer prepareRequest(final RestRequest request, NodeClient client) throws IOException {
String index = request.param("index");
if (index == null || index.trim().isEmpty()) {
throw new IllegalArgumentException("index is required");
}

// Parse the request body first to get the scale down value
final boolean searchOnly = parseSearchOnlyValue(request);

// Then use the final value in the lambda
return channel -> client.admin()
.indices()
.prepareSearchOnly(index)
.setSearchOnly(searchOnly)
.execute(new RestToXContentListener<>(channel));

Check warning on line 58 in server/src/main/java/org/opensearch/rest/action/admin/indices/RestScaleSearchOnlyAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/admin/indices/RestScaleSearchOnlyAction.java#L55-L58

Added lines #L55 - L58 were not covered by tests
}

/**
* Parses and validates the search_only parameter from the request body.
*/
private boolean parseSearchOnlyValue(RestRequest request) {
try {
Map<String, Object> source;
try {
source = request.contentParser().map();
} catch (Exception e) {
throw new IllegalArgumentException("Request body must be valid JSON", e);
}
for (String key : source.keySet()) {
if (!SEARCH_ONLY_FIELD.equals(key)) {
throw new IllegalArgumentException("Unknown parameter [" + key + "]. Only [" + SEARCH_ONLY_FIELD + "] is allowed.");
}
}
if (!source.containsKey(SEARCH_ONLY_FIELD)) {
throw new IllegalArgumentException("Parameter [" + SEARCH_ONLY_FIELD + "] is required");
}
Object value = source.get(SEARCH_ONLY_FIELD);
if (!(value instanceof Boolean)) {
throw new IllegalArgumentException("Parameter [" + SEARCH_ONLY_FIELD + "] must be a boolean (true or false)");
}
return (Boolean) value;
} catch (Exception e) {
if (e instanceof IllegalArgumentException) {
throw e;
}
throw new IllegalArgumentException("Request body must be valid JSON", e);

Check warning on line 89 in server/src/main/java/org/opensearch/rest/action/admin/indices/RestScaleSearchOnlyAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/admin/indices/RestScaleSearchOnlyAction.java#L89

Added line #L89 was not covered by tests
}
}
}
Loading

0 comments on commit cb515eb

Please sign in to comment.