diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkAction.kt index d91bd281b..0a599fd0c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkAction.kt @@ -28,6 +28,7 @@ class ShrinkAction( val percentageOfSourceShards: Double?, val targetIndexTemplate: Script?, val aliases: List?, + val switchAliases: Boolean = false, val forceUnsafe: Boolean?, index: Int ) : Action(name, index) { @@ -104,6 +105,7 @@ class ShrinkAction( if (percentageOfSourceShards != null) builder.field(PERCENTAGE_OF_SOURCE_SHARDS_FIELD, percentageOfSourceShards) if (targetIndexTemplate != null) builder.field(TARGET_INDEX_TEMPLATE_FIELD, targetIndexTemplate) if (aliases != null) { builder.aliasesField(aliases) } + builder.field(SWITCH_ALIASES, switchAliases) if (forceUnsafe != null) builder.field(FORCE_UNSAFE_FIELD, forceUnsafe) builder.endObject() } @@ -120,6 +122,7 @@ class ShrinkAction( } else { out.writeBoolean(false) } + out.writeBoolean(switchAliases) out.writeOptionalBoolean(forceUnsafe) out.writeInt(actionIndex) } @@ -131,6 +134,7 @@ class ShrinkAction( const val MAX_SHARD_SIZE_FIELD = "max_shard_size" const val TARGET_INDEX_TEMPLATE_FIELD = "target_index_name_template" const val ALIASES_FIELD = "aliases" + const val SWITCH_ALIASES = "switch_aliases" const val FORCE_UNSAFE_FIELD = "force_unsafe" const val LOCK_SOURCE_JOB_ID = "shrink-node_name" fun getSecurityFailureMessage(failure: String) = "Shrink action failed because of missing permissions: $failure" diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionParser.kt index 4d32de907..cfaedf169 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionParser.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionParser.kt @@ -15,6 +15,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.C import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.MAX_SHARD_SIZE_FIELD import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.NUM_NEW_SHARDS_FIELD import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.PERCENTAGE_OF_SOURCE_SHARDS_FIELD +import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.SWITCH_ALIASES import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.TARGET_INDEX_TEMPLATE_FIELD import org.opensearch.indexmanagement.spi.indexstatemanagement.Action import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser @@ -27,10 +28,11 @@ class ShrinkActionParser : ActionParser() { val percentageOfSourceShards = sin.readOptionalDouble() val targetIndexTemplate = if (sin.readBoolean()) Script(sin) else null val aliases = if (sin.readBoolean()) sin.readList(::Alias) else null + val switchAliases = sin.readBoolean() val forceUnsafe = sin.readOptionalBoolean() val index = sin.readInt() - return ShrinkAction(numNewShards, maxShardSize, percentageOfSourceShards, targetIndexTemplate, aliases, forceUnsafe, index) + return ShrinkAction(numNewShards, maxShardSize, percentageOfSourceShards, targetIndexTemplate, aliases, switchAliases, forceUnsafe, index) } @Suppress("NestedBlockDepth") @@ -40,6 +42,7 @@ class ShrinkActionParser : ActionParser() { var percentageOfSourceShards: Double? = null var targetIndexTemplate: Script? = null var aliases: List? = null + var switchAliases = false var forceUnsafe: Boolean? = null ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) @@ -63,12 +66,13 @@ class ShrinkActionParser : ActionParser() { } } } + SWITCH_ALIASES -> switchAliases = xcp.booleanValue() FORCE_UNSAFE_FIELD -> forceUnsafe = xcp.booleanValue() else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ShrinkAction.") } } - return ShrinkAction(numNewShards, maxShardSize, percentageOfSourceShards, targetIndexTemplate, aliases, forceUnsafe, index) + return ShrinkAction(numNewShards, maxShardSize, percentageOfSourceShards, targetIndexTemplate, aliases, switchAliases, forceUnsafe, index) } override fun getActionType(): String { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt index 906360039..33c7fb111 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt @@ -5,6 +5,8 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.shrink +import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest +import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions import org.opensearch.action.admin.indices.stats.IndicesStatsRequest import org.opensearch.action.admin.indices.stats.IndicesStatsResponse import org.opensearch.action.support.master.AcknowledgedResponse @@ -12,13 +14,14 @@ import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction -import org.opensearch.indexmanagement.indexstatemanagement.util.resetReadOnlyAndRouting import org.opensearch.indexmanagement.indexstatemanagement.util.deleteShrinkLock import org.opensearch.indexmanagement.indexstatemanagement.util.getActionStartTime import org.opensearch.indexmanagement.indexstatemanagement.util.issueUpdateSettingsRequest +import org.opensearch.indexmanagement.indexstatemanagement.util.resetReadOnlyAndRouting import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ShrinkActionProperties import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData import java.time.Duration @@ -45,8 +48,15 @@ class WaitForShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru if (!deleteShrinkLock(localShrinkActionProperties, context.lockService, logger)) { logger.error("Failed to delete Shrink action lock on node [${localShrinkActionProperties.nodeName}]") } - stepStatus = StepStatus.COMPLETED - info = mapOf("message" to SUCCESS_MESSAGE) + + if (switchAliases(context, localShrinkActionProperties)) { + stepStatus = StepStatus.COMPLETED + info = mapOf("message" to SUCCESS_MESSAGE) + } else { + stepStatus = StepStatus.FAILED + info = mapOf("message" to "Shrink failed due to aliases switch failure.") + } + return this } @@ -91,6 +101,64 @@ class WaitForShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru } } + private suspend fun switchAliases(context: StepContext, shrinkActionProperties: ShrinkActionProperties): Boolean { + + val sourceIndexName = context.metadata.index + val targetIndexName = shrinkActionProperties.targetIndexName + + if (!action.switchAliases) { + logger.info("Switch aliases disabled from [$sourceIndexName] to [$targetIndexName].") + return true + } + + logger.info("Switching aliases from [$sourceIndexName] to [$targetIndexName].") + + val targetIndexAliasesNames = context + .clusterService + .state() + .metadata() + .index(targetIndexName) + .aliases + .keys + val sourceIndexAliases = context + .clusterService + .state() + .metadata() + .index(sourceIndexName) + .aliases + .values + + val req = IndicesAliasesRequest() + sourceIndexAliases.map { it.alias }.forEach { req.addAliasAction(AliasActions(AliasActions.Type.REMOVE).index(sourceIndexName).alias(it)) } + + sourceIndexAliases + .filterNot { targetIndexAliasesNames.contains(it.alias) } + .map { + AliasActions(AliasActions.Type.ADD) + .index(targetIndexName) + .alias(it.alias) + .filter(it.filter?.string()) + .indexRouting(it.indexRouting) + .searchRouting(it.searchRouting) + .isHidden(it.isHidden) + .writeIndex(it.writeIndex()) + } + .forEach { req.addAliasAction(it) } + + return try { + val response: AcknowledgedResponse = context.client.admin().indices().suspendUntil { aliases(req, it) } + if (response.isAcknowledged) { + logger.info("Aliases switched successfully from [$sourceIndexName] to [$targetIndexName].") + } else { + logger.error("Switching aliases from [$sourceIndexName] to [$targetIndexName] failed.") + } + response.isAcknowledged + } catch (e: Exception) { + logger.error("Switching aliases from [$sourceIndexName] to [$targetIndexName] failed due to exception.", e) + false + } + } + override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData { return currentMetadata.copy( actionMetaData = currentMetadata.actionMetaData?.copy( diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index bdcdcea31..4c138a267 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 20 + "schema_version": 21 }, "dynamic": "strict", "properties": { @@ -551,6 +551,9 @@ "type": "object", "enabled": false }, + "switch_aliases": { + "type": "boolean" + }, "force_unsafe": { "type": "boolean" } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index 80813c19a..6f56a174f 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -42,7 +42,7 @@ import javax.management.remote.JMXServiceURL abstract class IndexManagementRestTestCase : ODFERestTestCase() { - val configSchemaVersion = 20 + val configSchemaVersion = 21 val historySchemaVersion = 7 // Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt index 0693d77d0..d6fa0bed7 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt @@ -153,16 +153,17 @@ fun randomShrinkAction( percentageOfSourceShards: Double? = null, targetIndexTemplate: Script? = if (randomBoolean()) randomTemplateScript(randomAlphaOfLength(10)) else null, aliases: List? = if (randomBoolean()) randomList(10) { randomAlias() } else null, + switchAliases: Boolean = randomBoolean(), forceUnsafe: Boolean? = if (randomBoolean()) randomBoolean() else null ): ShrinkAction { if (numNewShards == null && maxShardSize == null && percentageOfSourceShards == null) { when (randomInt(2)) { - 0 -> return ShrinkAction(abs(randomInt()) + 1, null, null, targetIndexTemplate, aliases, forceUnsafe, 0) - 1 -> return ShrinkAction(null, randomByteSizeValue(), null, targetIndexTemplate, aliases, forceUnsafe, 0) - 2 -> return ShrinkAction(null, null, randomDoubleBetween(0.0, 1.0, true), targetIndexTemplate, aliases, forceUnsafe, 0) + 0 -> return ShrinkAction(abs(randomInt()) + 1, null, null, targetIndexTemplate, aliases, switchAliases, forceUnsafe, 0) + 1 -> return ShrinkAction(null, randomByteSizeValue(), null, targetIndexTemplate, aliases, switchAliases, forceUnsafe, 0) + 2 -> return ShrinkAction(null, null, randomDoubleBetween(0.0, 1.0, true), targetIndexTemplate, aliases, switchAliases, forceUnsafe, 0) } } - return ShrinkAction(numNewShards, maxShardSize, percentageOfSourceShards, targetIndexTemplate, aliases, forceUnsafe, 0) + return ShrinkAction(numNewShards, maxShardSize, percentageOfSourceShards, targetIndexTemplate, aliases, switchAliases, forceUnsafe, 0) } fun randomReadOnlyActionConfig(): ReadOnlyAction { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt index 5f2e54cbc..7b08d5ad7 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.action import org.apache.hc.core5.http.ContentType import org.apache.hc.core5.http.io.entity.StringEntity import org.apache.logging.log4j.LogManager +import org.junit.Assert import org.junit.Assume import org.junit.Before import org.opensearch.action.admin.indices.alias.Alias @@ -17,6 +18,8 @@ import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_R import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING import org.opensearch.common.settings.Settings import org.opensearch.core.common.unit.ByteSizeValue +import org.opensearch.core.rest.RestStatus +import org.opensearch.core.xcontent.MediaTypeRegistry import org.opensearch.index.query.QueryBuilders import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase @@ -30,7 +33,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.step.shrink.WaitForSh import org.opensearch.indexmanagement.makeRequest import org.opensearch.indexmanagement.spi.indexstatemanagement.Step import org.opensearch.indexmanagement.waitFor -import org.opensearch.core.rest.RestStatus import org.opensearch.script.Script import org.opensearch.script.ScriptType import java.time.Instant @@ -237,6 +239,143 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() { assertShrinkActionRun(indexName, policyID, excludedNode) } + @Suppress("UNCHECKED_CAST") + fun `test switch aliases`() { + val indexName = "${testIndexName}_index_4" + val aliasToSwitch = Alias("${indexName}_alias_to_switch") + .writeIndex(false) + .isHidden(false) + .filter("""{"term":{"switch":"switch"}}""") + .routing("1") + + val aliasToOverride = Alias("${indexName}_alias_to_override") + .writeIndex(true) + .isHidden(false) + .filter("""{"term":{"overridden":"overridden"}}""") + .routing("2") + + val aliasToAdd = Alias("${indexName}_alias_to_add") + .writeIndex(false) + .isHidden(false) + .filter("""{"term":{"add":"add"}}""") + .routing("3") + + val policyID = "${testIndexName}_testPolicyName_3" + + val shrinkAction = ShrinkAction( + numNewShards = null, + maxShardSize = null, + percentageOfSourceShards = 0.5, + targetIndexTemplate = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "{{ctx.index}}$testIndexSuffix", mapOf()), + aliases = listOf(aliasToOverride, aliasToAdd), + switchAliases = true, + forceUnsafe = true, + index = 0 + ) + val states = listOf(State("ShrinkState", listOf(shrinkAction), listOf())) + + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 11L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + + createPolicy(policy, policyID) + createIndex(indexName, policyID, null, "0", "3", "") + changeAlias( + index = indexName, alias = aliasToSwitch.name(), action = "add", filter = aliasToSwitch.filter(), isWriteIndex = aliasToSwitch.writeIndex(), isHidden = aliasToSwitch.isHidden, + routing = aliasToSwitch.indexRouting().toInt(), indexRouting = aliasToSwitch.indexRouting().toInt(), searchRouting = aliasToSwitch.searchRouting().toInt() + ) + changeAlias( + index = indexName, alias = aliasToOverride.name(), action = "add", filter = aliasToOverride.filter(), isWriteIndex = false, isHidden = aliasToOverride.isHidden, + routing = aliasToOverride.indexRouting().toInt(), indexRouting = aliasToOverride.indexRouting().toInt(), searchRouting = aliasToOverride.searchRouting().toInt() + ) + + insertSampleData(indexName, 3) + + // Will change the startTime each execution so that it triggers in 2 seconds + // First execution: Policy is initialized + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor(Instant.ofEpochSecond(60)) { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } + // Starts AttemptMoveShardsStep + updateManagedIndexConfigStartTime(managedIndexConfig) + + val targetIndexName = indexName + testIndexSuffix + waitFor(Instant.ofEpochSecond(60)) { + assertEquals(targetIndexName, getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.targetIndexName) + assertEquals("true", getIndexBlocksWriteSetting(indexName)) + assertNotNull("Couldn't find node to shrink onto.", getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName) + val settings = getFlatSettings(indexName) + val nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName + assertTrue(settings.containsKey("index.routing.allocation.require._name")) + assertEquals(nodeToShrink, settings["index.routing.allocation.require._name"]) + assertEquals( + AttemptMoveShardsStep.getSuccessMessage(nodeToShrink), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + + val nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName + + // starts WaitForMoveShardsStep + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor(Instant.ofEpochSecond(60)) { + assertEquals( + WaitForMoveShardsStep.getSuccessMessage(nodeToShrink), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + // Wait for move should finish before this. Starts AttemptShrinkStep + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor(Instant.ofEpochSecond(50)) { + assertTrue("Target index is not created", indexExists(targetIndexName)) + assertEquals( + AttemptShrinkStep.getSuccessMessage(targetIndexName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + + // starts WaitForShrinkStep + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor(Instant.ofEpochSecond(60)) { + val sourceIndexAliases = getAlias(indexName, "") + assertTrue("Source index aliases list must be empty after alias switch.", sourceIndexAliases.isEmpty()) + + val targetIndexAliases = getAlias(targetIndexName, "") + assertEquals("Target index aliases count is incorrect.", 3, targetIndexAliases.size) + + assertTrue("Target index must contain shrink action alias.", targetIndexAliases.containsKey(aliasToAdd.name())) + assertAliasesEqual(aliasToAdd, targetIndexAliases[aliasToAdd.name()]) + + assertTrue("Target index must contain switched source index alias.", targetIndexAliases.containsKey(aliasToSwitch.name())) + assertAliasesEqual(aliasToSwitch, targetIndexAliases[aliasToSwitch.name()]) + + assertTrue("Target index must contain shrink action alias which overrides source index alias.", targetIndexAliases.containsKey(aliasToOverride.name())) + assertAliasesEqual(aliasToOverride, targetIndexAliases[aliasToOverride.name()]) + } + } + + @Suppress("UNCHECKED_CAST") + private fun assertAliasesEqual(expectedAlas: Alias, actualAliasRaw: Any?) { + Assert.assertNotNull("Actual alias to compare must not be null.", actualAliasRaw) + val actualAlias = actualAliasRaw as Map + assertEquals(expectedAlas.writeIndex() ?: false, actualAlias["is_write_index"] ?: false) + assertEquals(expectedAlas.isHidden ?: false, actualAlias["is_hidden"] ?: false) + assertEquals(expectedAlas.searchRouting(), actualAlias["search_routing"]) + assertEquals(expectedAlas.indexRouting(), actualAlias["index_routing"]) + + val builder = MediaTypeRegistry.contentBuilder(MediaTypeRegistry.JSON) + builder.map(actualAlias["filter"] as Map) + val actualFilter = builder.toString() + assertEquals(expectedAlas.filter(), actualFilter) + } + fun `test no-op with single source index primary shard`() { val logger = LogManager.getLogger(::ShrinkActionIT) val indexName = "${testIndexName}_index_shard_noop" diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index bdcdcea31..4c138a267 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 20 + "schema_version": 21 }, "dynamic": "strict", "properties": { @@ -551,6 +551,9 @@ "type": "object", "enabled": false }, + "switch_aliases": { + "type": "boolean" + }, "force_unsafe": { "type": "boolean" }