-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Search only replicas (scale to zero) with Reader/Writer Separation #17299
base: main
Are you sure you want to change the base?
Changes from all commits
5c32f43
1d71948
e82f050
e89b812
db5212b
97b4d0e
6e2bb1d
cb515eb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,11 @@ | |
|
||
package org.opensearch.indices.replication; | ||
|
||
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; | ||
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse; | ||
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; | ||
import org.opensearch.action.search.SearchResponse; | ||
import org.opensearch.action.support.PlainActionFuture; | ||
import org.opensearch.cluster.metadata.IndexMetadata; | ||
import org.opensearch.cluster.metadata.Metadata; | ||
import org.opensearch.common.settings.Settings; | ||
|
@@ -21,7 +25,9 @@ | |
|
||
import java.util.List; | ||
|
||
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; | ||
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; | ||
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; | ||
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; | ||
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; | ||
|
||
|
@@ -102,8 +108,8 @@ private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType | |
|
||
Settings settings = Settings.builder() | ||
.put(super.indexSettings()) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) | ||
.put(SETTING_NUMBER_OF_SHARDS, 1) | ||
.put(SETTING_NUMBER_OF_REPLICAS, 1) | ||
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0) | ||
.put(IndexMetadata.SETTING_REPLICATION_TYPE, replicationType) | ||
.build(); | ||
|
@@ -114,13 +120,29 @@ private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType | |
ensureGreen(INDEX_NAME); | ||
} | ||
|
||
public void testRemoteStoreRestoreFailsForSearchOnlyIndex() throws Exception { | ||
bootstrapIndexWithSearchReplicas(); | ||
assertAcked(client().admin().indices().prepareSearchOnly(INDEX_NAME).setSearchOnly(true).get()); | ||
|
||
GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings(INDEX_NAME).get(); | ||
assertEquals("true", settingsResponse.getSetting(INDEX_NAME, IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey())); | ||
|
||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> { | ||
PlainActionFuture<RestoreRemoteStoreResponse> future = PlainActionFuture.newFuture(); | ||
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), future); | ||
future.actionGet(); | ||
}); | ||
|
||
assertTrue(exception.getMessage().contains("Cannot restore index [" + INDEX_NAME + "] because search-only mode is enabled")); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how are we handling multiple indices where some are SO and some aren't? Also we should add a bit more to this message soemething like - There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about something like
Honoring the same if for example
In before commit I have added a silent ignore, but with existing latest code it throws error stopping the operation. |
||
} | ||
|
||
private void bootstrapIndexWithSearchReplicas() throws InterruptedException { | ||
startCluster(3); | ||
|
||
Settings settings = Settings.builder() | ||
.put(super.indexSettings()) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) | ||
.put(SETTING_NUMBER_OF_SHARDS, 1) | ||
.put(SETTING_NUMBER_OF_REPLICAS, 1) | ||
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) | ||
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) | ||
.build(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.action.admin.indices.scale.searchonly; | ||
|
||
import org.opensearch.core.common.io.stream.StreamInput; | ||
import org.opensearch.core.common.io.stream.StreamOutput; | ||
import org.opensearch.core.index.shard.ShardId; | ||
import org.opensearch.transport.TransportRequest; | ||
|
||
import java.io.IOException; | ||
import java.util.List; | ||
|
||
/** | ||
* A transport request sent to nodes to facilitate shard synchronization during search-only scaling operations. | ||
* <p> | ||
* This request is sent from the cluster manager to data nodes that host primary shards for the target index | ||
* during scale operations. It contains the index name and a list of shard IDs that need to be synchronized | ||
* before completing a scale-down operation. | ||
* <p> | ||
* When a node receives this request, it performs final sync and flush operations on the specified shards, | ||
* ensuring all operations are committed and the remote store is synced. This is a crucial step in | ||
* the scale-down process to ensure no data loss occurs when the index transitions to search-only mode. | ||
*/ | ||
class NodeSearchOnlyRequest extends TransportRequest { | ||
private final String index; | ||
private final List<ShardId> shardIds; | ||
|
||
/** | ||
* Constructs a new NodeSearchOnlyRequest. | ||
* | ||
* @param index the name of the index being scaled | ||
* @param shardIds the list of shard IDs to be synchronized on the target node | ||
*/ | ||
NodeSearchOnlyRequest(String index, List<ShardId> shardIds) { | ||
this.index = index; | ||
this.shardIds = shardIds; | ||
} | ||
|
||
/** | ||
* Deserialization constructor. | ||
* | ||
* @param in the stream input to read from | ||
* @throws IOException if there is an I/O error during deserialization | ||
*/ | ||
NodeSearchOnlyRequest(StreamInput in) throws IOException { | ||
super(in); | ||
this.index = in.readString(); | ||
this.shardIds = in.readList(ShardId::new); | ||
} | ||
|
||
/** | ||
* Serializes this request to the given output stream. | ||
* | ||
* @param out the output stream to write to | ||
* @throws IOException if there is an I/O error during serialization | ||
*/ | ||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
super.writeTo(out); | ||
out.writeString(index); | ||
out.writeList(shardIds); | ||
} | ||
|
||
/** | ||
* Returns the index name associated with this request. | ||
* | ||
* @return the index name | ||
*/ | ||
String getIndex() { | ||
return index; | ||
} | ||
|
||
/** | ||
* Returns the list of shard IDs to be synchronized. | ||
* | ||
* @return the list of shard IDs | ||
*/ | ||
List<ShardId> getShardIds() { | ||
return shardIds; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.action.admin.indices.scale.searchonly; | ||
|
||
import org.opensearch.cluster.node.DiscoveryNode; | ||
import org.opensearch.core.common.io.stream.StreamInput; | ||
import org.opensearch.core.common.io.stream.StreamOutput; | ||
import org.opensearch.core.transport.TransportResponse; | ||
|
||
import java.io.IOException; | ||
import java.util.List; | ||
|
||
/** | ||
* Response sent from nodes after processing a {@link NodeSearchOnlyRequest} during search-only scaling operations. | ||
* <p> | ||
* This response contains information about the node that processed the request and the results of | ||
* synchronization attempts for each requested shard. The cluster manager uses these responses to | ||
* determine whether it's safe to proceed with finalizing a scale-down operation. | ||
* <p> | ||
* Each response includes details about whether shards have any uncommitted operations or need | ||
* additional synchronization, which would indicate the scale operation should be delayed until | ||
* the cluster reaches a stable state. | ||
*/ | ||
class NodeSearchOnlyResponse extends TransportResponse { | ||
private final DiscoveryNode node; | ||
private final List<ShardSearchOnlyResponse> shardResponses; | ||
|
||
/** | ||
* Constructs a new NodeSearchOnlyResponse. | ||
* | ||
* @param node the node that processed the synchronization request | ||
* @param shardResponses the list of responses from individual shard synchronization attempts | ||
*/ | ||
NodeSearchOnlyResponse(DiscoveryNode node, List<ShardSearchOnlyResponse> shardResponses) { | ||
this.node = node; | ||
this.shardResponses = shardResponses; | ||
} | ||
|
||
/** | ||
* Deserialization constructor. | ||
* | ||
* @param in the stream input to read from | ||
* @throws IOException if there is an I/O error during deserialization | ||
*/ | ||
NodeSearchOnlyResponse(StreamInput in) throws IOException { | ||
node = new DiscoveryNode(in); | ||
shardResponses = in.readList(ShardSearchOnlyResponse::new); | ||
} | ||
|
||
/** | ||
* Serializes this response to the given output stream. | ||
* | ||
* @param out the output stream to write to | ||
* @throws IOException if there is an I/O error during serialization | ||
*/ | ||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
node.writeTo(out); | ||
out.writeList(shardResponses); | ||
} | ||
|
||
/** | ||
* Returns the node that processed the synchronization request. | ||
* | ||
* @return the discovery node information | ||
*/ | ||
public DiscoveryNode getNode() { | ||
return node; | ||
} | ||
|
||
/** | ||
* Returns the list of shard-level synchronization responses. | ||
* <p> | ||
* These responses contain critical information about the state of each shard, | ||
* including whether there are uncommitted operations or if additional synchronization | ||
* is needed before the scale operation can safely proceed. | ||
* | ||
* @return the list of shard responses | ||
*/ | ||
public List<ShardSearchOnlyResponse> getShardResponses() { | ||
return shardResponses; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have used these settings to test the search only replicas with remote store as filesystem, will remove them and change to default once I have the green tests and finalized the PR review.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we should add a param to easily set these from command line with ./gradlew run.