diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt index a259a4b1b..9ff5c9fcb 100644 --- a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt @@ -57,6 +57,7 @@ abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) { CONDITION_NOT_MET("condition_not_met"), FAILED("failed"), COMPLETED("completed"), + TIMED_OUT("timed_out"), ; override fun toString(): String { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index 89de1efa5..d0a3e0269 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -93,6 +93,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedInde import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetryInfoMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StateMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData import org.opensearch.jobscheduler.spi.JobExecutionContext import org.opensearch.jobscheduler.spi.LockModel import org.opensearch.jobscheduler.spi.ScheduledJobParameter @@ -339,13 +340,18 @@ object ManagedIndexRunner : if (action?.hasTimedOut(currentActionMetaData) == true) { val info = mapOf("message" to "Action timed out") logger.error("Action=${action.type} has timed out") - val updated = updateManagedIndexMetaData( - managedIndexMetaData - .copy(actionMetaData = currentActionMetaData?.copy(failed = true), info = info), + + val updatedIndexMetaData = managedIndexMetaData.copy( + actionMetaData = currentActionMetaData?.copy(failed = true), + stepMetaData = step?.let { StepMetaData(it.name, System.currentTimeMillis(), Step.StepStatus.TIMED_OUT) }, + info = info, ) + + val updated = updateManagedIndexMetaData(updatedIndexMetaData) + if (updated.metadataSaved) { disableManagedIndexConfig(managedIndexConfig) - publishErrorNotification(policy, managedIndexMetaData) + publishErrorNotification(policy, updatedIndexMetaData) } return } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionTimeoutIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionTimeoutIT.kt index e4cfd8392..54bdd764c 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionTimeoutIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionTimeoutIT.kt @@ -8,8 +8,10 @@ package org.opensearch.indexmanagement.indexstatemanagement.action import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase import org.opensearch.indexmanagement.indexstatemanagement.step.open.AttemptOpenStep import org.opensearch.indexmanagement.indexstatemanagement.step.rollover.AttemptRolloverStep +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData import org.opensearch.indexmanagement.waitFor import java.time.Instant import java.util.Locale @@ -20,11 +22,12 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() { fun `test failed action`() { val indexName = "${testIndexName}_index_1" val policyID = "${testIndexName}_testPolicyName_1" - val testPolicy = """ + val testPolicy = + """ {"policy":{"description":"Default policy","default_state":"rolloverstate","states":[ {"name":"rolloverstate","actions":[{"timeout":"1s","rollover":{"min_doc_count":100}}], "transitions":[]}]}} - """.trimIndent() + """.trimIndent() createPolicyJson(testPolicy, policyID) @@ -52,16 +55,32 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() { waitFor { assertPredicatesOnMetaData( listOf( - indexName to listOf( - ActionMetaData.ACTION to fun(actionMetaDataMap: Any?): Boolean = - assertActionEquals( - ActionMetaData( - name = RolloverAction.name, startTime = Instant.now().toEpochMilli(), index = 0, - failed = true, consumedRetries = 0, lastRetryTime = null, actionProperties = null, - ), - actionMetaDataMap, - ), - ), + indexName to + listOf( + ActionMetaData.ACTION to + + fun(actionMetaDataMap: Any?): Boolean = + assertActionEquals( + ActionMetaData( + name = RolloverAction.name, + startTime = Instant.now().toEpochMilli(), + index = 0, + failed = true, + consumedRetries = 0, + lastRetryTime = null, + actionProperties = null, + ), + actionMetaDataMap, + ), + StepMetaData.STEP to + fun(stepMetaDataMap: Any?): Boolean = + assertStepEquals( + StepMetaData( + "attempt_rollover", Instant.now().toEpochMilli(), Step.StepStatus.TIMED_OUT, + ), + stepMetaDataMap, + ), + ), ), getExplainMap(indexName), strict = false, @@ -73,11 +92,12 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() { fun `test action timeout doesn't bleed over into next action`() { val indexName = "${testIndexName}_index_2" val policyID = "${testIndexName}_testPolicyName_2" - val testPolicy = """ - {"policy":{"description":"Default policy","default_state":"rolloverstate","states":[ - {"name":"rolloverstate","actions":[{"timeout": "5s","open":{}},{"timeout":"1s","rollover":{"min_doc_count":100}}], - "transitions":[]}]}} - """.trimIndent() + val testPolicy = + """ + {"policy":{"description":"Default policy","default_state":"rolloverstate","states":[ + {"name":"rolloverstate","actions":[{"timeout": "5s","open":{}},{"timeout":"1s","rollover":{"min_doc_count":100}}], + "transitions":[]}]}} + """.trimIndent() createPolicyJson(testPolicy, policyID) @@ -96,7 +116,14 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() { val expectedOpenInfoString = mapOf("message" to AttemptOpenStep.getSuccessMessage(indexName)).toString() waitFor { assertPredicatesOnMetaData( - listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedOpenInfoString == info.toString())), + listOf( + indexName to + listOf( + ManagedIndexMetaData.INFO to + + fun(info: Any?): Boolean = expectedOpenInfoString == info.toString(), + ), + ), getExplainMap(indexName), strict = false, )