Skip to content

Commit

Permalink
Add Convert-Index-To-Remote Action for issue #808 (#1302)
Browse files Browse the repository at this point in the history
Co-authored-by: Seung Yeon Joo <[email protected]>
Co-authored-by: bowenlan-amzn <[email protected]>
  • Loading branch information
3 people authored Feb 5, 2025
1 parent d289c6c commit 705f1ea
Show file tree
Hide file tree
Showing 13 changed files with 705 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.indexmanagement.indexstatemanagement.action.AliasActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.AllocationActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.CloseActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.ForceMergeActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.IndexPriorityActionParser
Expand Down Expand Up @@ -52,6 +53,7 @@ class ISMActionsParser private constructor() {
ShrinkActionParser(),
SnapshotActionParser(),
TransformActionParser(),
ConvertIndexToRemoteActionParser(),
)

val customActionExtensionMap = mutableMapOf<String, String>()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.indexmanagement.indexstatemanagement.step.restore.AttemptRestoreStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext

class ConvertIndexToRemoteAction(
val repository: String,
val snapshot: String,
index: Int,
) : Action(name, index) {

companion object {
const val name = "convert_index_to_remote"
const val REPOSITORY_FIELD = "repository"
const val SNAPSHOT_FIELD = "snapshot"
}

private val attemptRestoreStep = AttemptRestoreStep(this)

private val steps = listOf(attemptRestoreStep)

override fun getStepToExecute(context: StepContext): Step = attemptRestoreStep

override fun getSteps(): List<Step> = steps

override fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
builder.startObject(type)
builder.field(REPOSITORY_FIELD, repository)
builder.field(SNAPSHOT_FIELD, snapshot)
builder.endObject()
}

override fun populateAction(out: StreamOutput) {
out.writeString(repository)
out.writeString(snapshot)
out.writeInt(actionIndex)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParser.Token
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteAction.Companion.REPOSITORY_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteAction.Companion.SNAPSHOT_FIELD
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser

class ConvertIndexToRemoteActionParser : ActionParser() {
override fun fromStreamInput(sin: StreamInput): Action {
val repository = sin.readString()
val snapshot = sin.readString()
val index = sin.readInt()
return ConvertIndexToRemoteAction(repository, snapshot, index)
}

override fun fromXContent(xcp: XContentParser, index: Int): Action {
var repository: String? = null
var snapshot: String? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
REPOSITORY_FIELD -> repository = xcp.text()
SNAPSHOT_FIELD -> snapshot = xcp.text()
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ConvertIndexToRemoteAction.")
}
}

return ConvertIndexToRemoteAction(
repository = requireNotNull(repository) { "ConvertIndexToRemoteAction repository must be specified" },
snapshot = requireNotNull(snapshot) { "ConvertIndexToRemoteAction snapshot must be specified" },
index = index,
)
}

override fun getActionType(): String = ConvertIndexToRemoteAction.name
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.step.restore

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse
import org.opensearch.core.rest.RestStatus
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteAction
import org.opensearch.indexmanagement.opensearchapi.convertToMap
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.ScriptType
import org.opensearch.script.TemplateScript
import org.opensearch.snapshots.SnapshotException
import org.opensearch.snapshots.SnapshotState
import org.opensearch.transport.RemoteTransportException

class AttemptRestoreStep(private val action: ConvertIndexToRemoteAction) : Step(name) {

private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null
private var snapshotName: String? = null

@Suppress("TooGenericExceptionCaught", "ComplexMethod", "ReturnCount", "LongMethod")
override suspend fun execute(): Step {
val context = this.context ?: return this
val managedIndexMetadata = context.metadata
val indexName = context.metadata.index
val scriptService = context.scriptService
val repository = action.repository
val snapshot = action.snapshot

try {
val mutableInfo = mutableMapOf<String, String>()
val snapshotScript = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, snapshot, mapOf())
val defaultSnapshotPattern = snapshot.ifBlank { indexName }
val snapshotPattern = compileTemplate(snapshotScript, managedIndexMetadata, defaultSnapshotPattern, scriptService)

// List snapshots matching the pattern
val getSnapshotsRequest = GetSnapshotsRequest()
.repository(repository)
.snapshots(arrayOf("$snapshotPattern*"))
.ignoreUnavailable(true)
.verbose(true)

val getSnapshotsResponse: GetSnapshotsResponse = context.client.admin().cluster().suspendUntil {
getSnapshots(getSnapshotsRequest, it)
}
val snapshots = getSnapshotsResponse.snapshots
if (snapshots.isNullOrEmpty()) {
val message = getFailedMessage(indexName, "No snapshots found matching pattern [$snapshotPattern*]")
stepStatus = StepStatus.FAILED
info = mapOf("message" to message)
return this
}

val successfulSnapshots = snapshots.filter { it.state() == SnapshotState.SUCCESS }

if (successfulSnapshots.isEmpty()) {
val message = getFailedMessage(
indexName,
"No successful snapshots found matching pattern [$snapshotPattern*]",
)
stepStatus = StepStatus.FAILED
info = mapOf("message" to message)
return this
}

// Select the latest snapshot
val latestSnapshotInfo = successfulSnapshots.maxByOrNull { it.endTime() }!!
logger.info("Restoring snapshot info: $latestSnapshotInfo")

// Use the snapshot name from the selected SnapshotInfo
snapshotName = latestSnapshotInfo.snapshotId().name

// Proceed with the restore operation
val restoreSnapshotRequest = RestoreSnapshotRequest(repository, snapshotName)
.indices(indexName)
.storageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
.renamePattern("^(.*)\$")
.renameReplacement("$1_remote")
.waitForCompletion(false)
val response: RestoreSnapshotResponse = context.client.admin().cluster().suspendUntil {
restoreSnapshot(restoreSnapshotRequest, it)
}

when (response.status()) {
RestStatus.ACCEPTED, RestStatus.OK -> {
stepStatus = StepStatus.COMPLETED
mutableInfo["message"] = getSuccessMessage(indexName)
}
else -> {
val message = getFailedMessage(indexName, "Unexpected response status: ${response.status()}")
logger.warn("$message - $response")
stepStatus = StepStatus.FAILED
mutableInfo["message"] = message
mutableInfo["cause"] = response.toString()
}
}
info = mutableInfo.toMap()
} catch (e: RemoteTransportException) {
val cause = ExceptionsHelper.unwrapCause(e)
if (cause is SnapshotException) {
handleRestoreException(indexName, cause)
} else {
handleException(indexName, cause as Exception)
}
} catch (e: SnapshotException) {
handleRestoreException(indexName, e)
} catch (e: Exception) {
handleException(indexName, e)
}

return this
}

private fun compileTemplate(
template: Script,
managedIndexMetaData: ManagedIndexMetaData,
defaultValue: String,
scriptService: ScriptService,
): String {
val contextMap =
managedIndexMetaData.convertToMap().filterKeys { key ->
key in validTopContextFields
}
val compiledValue =
scriptService.compile(template, TemplateScript.CONTEXT)
.newInstance(template.params + mapOf("ctx" to contextMap))
.execute()
return compiledValue.ifBlank { defaultValue }
}

private fun handleRestoreException(indexName: String, e: SnapshotException) {
val message = getFailedRestoreMessage(indexName)
logger.debug(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf<String, Any>("message" to message)
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
}

private fun handleException(indexName: String, e: Exception) {
val message = getFailedMessage(indexName, e.message ?: "Unknown error")
logger.error(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf<String, Any>("message" to message)
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
val currentActionMetaData = currentMetadata.actionMetaData
return currentMetadata.copy(
actionMetaData = currentActionMetaData?.copy(actionProperties = ActionProperties(snapshotName = snapshotName)),
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
transitionTo = null,
info = info,
)
}

override fun isIdempotent(): Boolean = false

companion object {
val validTopContextFields = setOf("index", "indexUuid")
const val name = "attempt_restore"
fun getFailedMessage(index: String, cause: String) = "Failed to start restore for [index=$index], cause: $cause"
fun getFailedRestoreMessage(index: String) = "Failed to start restore due to concurrent restore or snapshot in progress [index=$index]"
fun getSuccessMessage(index: String) = "Successfully started restore for [index=$index]"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class ActionValidation(
"read_write" -> ValidateReadWrite(settings, clusterService, jvmService).execute(indexName)
"replica_count" -> ValidateReplicaCount(settings, clusterService, jvmService).execute(indexName)
"snapshot" -> ValidateSnapshot(settings, clusterService, jvmService).execute(indexName)
"convert_index_to_remote" -> ValidateConvertIndexToRemote(settings, clusterService, jvmService).execute(indexName)
"transition" -> ValidateTransition(settings, clusterService, jvmService).execute(indexName)
"close" -> ValidateClose(settings, clusterService, jvmService).execute(indexName)
"index_priority" -> ValidateIndexPriority(settings, clusterService, jvmService).execute(indexName)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.validation

import org.apache.logging.log4j.LogManager
import org.opensearch.cluster.metadata.MetadataCreateIndexService
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate
import org.opensearch.indexmanagement.util.OpenForTesting
import org.opensearch.indices.InvalidIndexNameException
import org.opensearch.monitor.jvm.JvmService

@OpenForTesting
class ValidateConvertIndexToRemote(
settings: Settings,
clusterService: ClusterService,
jvmService: JvmService,
) : Validate(settings, clusterService, jvmService) {

private val logger = LogManager.getLogger(javaClass)

@Suppress("ReturnSuppressCount", "ReturnCount")
override fun execute(indexName: String): Validate {
// For restore action, check if index name is valid
if (!validIndexName(indexName)) {
validationStatus = ValidationStatus.FAILED
return this
}

// Optionally, check if the index already exists
// Depending on your requirements, you may want to allow or disallow restoring over existing indices
if (indexExists(indexName)) {
val message = getIndexAlreadyExistsMessage(indexName)
logger.warn(message)
validationStatus = ValidationStatus.FAILED
validationMessage = message
return this
}

validationMessage = getValidationPassedMessage(indexName)
return this
}

private fun indexExists(indexName: String): Boolean {
val indexExists = clusterService.state().metadata.indices.containsKey(indexName)
return indexExists
}

// Checks if the index name is valid according to OpenSearch naming conventions
private fun validIndexName(indexName: String): Boolean {
val exceptionGenerator: (String, String) -> RuntimeException = { name, reason ->
InvalidIndexNameException(name, reason)
}
try {
MetadataCreateIndexService.validateIndexOrAliasName(indexName, exceptionGenerator)
} catch (e: Exception) {
val message = getIndexNotValidMessage(indexName)
logger.warn(message)
validationMessage = message
return false
}
return true
}

@Suppress("TooManyFunctions")
companion object {
const val name = "validate_convert_index_to_remote"
fun getIndexAlreadyExistsMessage(index: String) = "Index [index=$index] already exists, cannot restore over existing index."
fun getIndexNotValidMessage(index: String) = "Index [index=$index] is not valid for restore action."
fun getValidationPassedMessage(index: String) = "Restore action validation passed for [index=$index]"
}
}
Loading

0 comments on commit 705f1ea

Please sign in to comment.