From d5e7787e38ca28901665c8c773eb5a7efb8fabb5 Mon Sep 17 00:00:00 2001 From: Prudhvi Godithi Date: Mon, 3 Feb 2025 11:20:35 -0800 Subject: [PATCH] Scale to zero 2nd iteration Signed-off-by: Prudhvi Godithi --- .../org/opensearch/action/ActionModule.java | 10 +- .../indices/scale/PreScaleSyncAction.java | 21 - .../indices/scale/ScaleRequestBuilder.java | 28 - .../scale/TransportPreScaleSyncAction.java | 573 ----------------- .../NodeSearchOnlyRequest.java} | 8 +- .../NodeSearchOnlyResponse.java} | 14 +- .../indices/searchonly/SearchOnlyAction.java | 21 + .../SearchOnlyRequest.java} | 29 +- .../searchonly/SearchOnlyRequestBuilder.java | 28 + .../SearchOnlyResponse.java} | 16 +- .../ShardSearchOnlyResponse.java} | 8 +- .../searchonly/TransportSearchOnlyAction.java | 576 ++++++++++++++++++ .../{scale => searchonly}/package-info.java | 2 +- .../opensearch/client/IndicesAdminClient.java | 12 +- .../client/support/AbstractClient.java | 6 + .../cluster/block/ClusterBlocks.java | 4 + .../cluster/health/ClusterShardHealth.java | 2 +- .../cluster/metadata/IndexMetadata.java | 16 +- .../cluster/routing/IndexRoutingTable.java | 94 ++- .../cluster/routing/RoutingNodes.java | 2 +- .../cluster/routing/ShardRouting.java | 4 +- ...caAfterPrimaryActiveAllocationDecider.java | 16 +- .../common/settings/IndexScopedSettings.java | 5 + .../gateway/ReplicaShardAllocator.java | 62 +- .../org/opensearch/index/IndexSettings.java | 12 + .../RemoteStoreReplicationSource.java | 38 +- ...eAction.java => RestSearchonlyAction.java} | 13 +- 27 files changed, 883 insertions(+), 737 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncAction.java delete mode 100644 server/src/main/java/org/opensearch/action/admin/indices/scale/ScaleRequestBuilder.java delete mode 100644 server/src/main/java/org/opensearch/action/admin/indices/scale/TransportPreScaleSyncAction.java rename 
server/src/main/java/org/opensearch/action/admin/indices/{scale/NodePreScaleSyncRequest.java => searchonly/NodeSearchOnlyRequest.java} (79%) rename server/src/main/java/org/opensearch/action/admin/indices/{scale/NodePreScaleSyncResponse.java => searchonly/NodeSearchOnlyResponse.java} (64%) create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/searchonly/SearchOnlyAction.java rename server/src/main/java/org/opensearch/action/admin/indices/{scale/PreScaleSyncRequest.java => searchonly/SearchOnlyRequest.java} (72%) create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/searchonly/SearchOnlyRequestBuilder.java rename server/src/main/java/org/opensearch/action/admin/indices/{scale/PreScaleSyncResponse.java => searchonly/SearchOnlyResponse.java} (82%) rename server/src/main/java/org/opensearch/action/admin/indices/{scale/ShardPreScaleSyncResponse.java => searchonly/ShardSearchOnlyResponse.java} (81%) create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/searchonly/TransportSearchOnlyAction.java rename server/src/main/java/org/opensearch/action/admin/indices/{scale => searchonly}/package-info.java (81%) rename server/src/main/java/org/opensearch/rest/action/admin/indices/{RestScaleAction.java => RestSearchonlyAction.java} (69%) diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 3ddbdecc3f7a1..6913fad13ea5f 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -184,8 +184,8 @@ import org.opensearch.action.admin.indices.resolve.ResolveIndexAction; import org.opensearch.action.admin.indices.rollover.RolloverAction; import org.opensearch.action.admin.indices.rollover.TransportRolloverAction; -import org.opensearch.action.admin.indices.scale.PreScaleSyncAction; -import 
org.opensearch.action.admin.indices.scale.TransportPreScaleSyncAction; +import org.opensearch.action.admin.indices.searchonly.SearchOnlyAction; +import org.opensearch.action.admin.indices.searchonly.TransportSearchOnlyAction; import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction; import org.opensearch.action.admin.indices.segments.PitSegmentsAction; import org.opensearch.action.admin.indices.segments.TransportIndicesSegmentsAction; @@ -422,7 +422,7 @@ import org.opensearch.rest.action.admin.indices.RestResizeHandler; import org.opensearch.rest.action.admin.indices.RestResolveIndexAction; import org.opensearch.rest.action.admin.indices.RestRolloverIndexAction; -import org.opensearch.rest.action.admin.indices.RestScaleAction; +import org.opensearch.rest.action.admin.indices.RestSearchonlyAction; import org.opensearch.rest.action.admin.indices.RestSimulateIndexTemplateAction; import org.opensearch.rest.action.admin.indices.RestSimulateTemplateAction; import org.opensearch.rest.action.admin.indices.RestSyncedFlushAction; @@ -689,7 +689,7 @@ public void reg actions.register(IndicesAliasesAction.INSTANCE, TransportIndicesAliasesAction.class); actions.register(UpdateSettingsAction.INSTANCE, TransportUpdateSettingsAction.class); - actions.register(PreScaleSyncAction.INSTANCE, TransportPreScaleSyncAction.class); + actions.register(SearchOnlyAction.INSTANCE, TransportSearchOnlyAction.class); actions.register(AnalyzeAction.INSTANCE, TransportAnalyzeAction.class); actions.register(PutIndexTemplateAction.INSTANCE, TransportPutIndexTemplateAction.class); @@ -913,7 +913,7 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestUpgradeStatusAction()); registerHandler.accept(new RestClearIndicesCacheAction()); - registerHandler.accept(new RestScaleAction()); + registerHandler.accept(new RestSearchonlyAction()); registerHandler.accept(new RestIndexAction()); registerHandler.accept(new CreateHandler()); diff --git 
a/server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncAction.java b/server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncAction.java deleted file mode 100644 index 2dcf02b2efeaf..0000000000000 --- a/server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncAction.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.action.admin.indices.scale; - -import org.opensearch.action.ActionType; -import org.opensearch.action.support.master.AcknowledgedResponse; - -public class PreScaleSyncAction extends ActionType { - public static final PreScaleSyncAction INSTANCE = new PreScaleSyncAction(); - public static final String NAME = "indices:admin/scale"; - - private PreScaleSyncAction() { - super(NAME, AcknowledgedResponse::new); - } -} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/scale/ScaleRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/indices/scale/ScaleRequestBuilder.java deleted file mode 100644 index 03c3a7a637489..0000000000000 --- a/server/src/main/java/org/opensearch/action/admin/indices/scale/ScaleRequestBuilder.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.opensearch.action.admin.indices.scale; - -import org.opensearch.action.ActionRequestBuilder; -import org.opensearch.action.support.master.AcknowledgedResponse; -import org.opensearch.client.OpenSearchClient; -import org.opensearch.common.annotation.PublicApi; - -@PublicApi(since = "1.0.0") -public class ScaleRequestBuilder extends ActionRequestBuilder { - - public ScaleRequestBuilder(OpenSearchClient client, String... indices) { - this(client, false, indices); - } - - public ScaleRequestBuilder(OpenSearchClient client, boolean scaleDown, String... 
indices) { - super(client, PreScaleSyncAction.INSTANCE, new PreScaleSyncRequest(indices, scaleDown)); - } - - /** - * Sets the scale direction (up/down) - * @param scaleDown true if scaling down, false if scaling up - * @return this builder - */ - public ScaleRequestBuilder setScaleDown(boolean scaleDown) { - request.scaleDown(scaleDown); - return this; - } -} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/scale/TransportPreScaleSyncAction.java b/server/src/main/java/org/opensearch/action/admin/indices/scale/TransportPreScaleSyncAction.java deleted file mode 100644 index cef2c2331925e..0000000000000 --- a/server/src/main/java/org/opensearch/action/admin/indices/scale/TransportPreScaleSyncAction.java +++ /dev/null @@ -1,573 +0,0 @@ -package org.opensearch.action.admin.indices.scale; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.Version; -import org.opensearch.action.admin.indices.flush.FlushRequest; -import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.GroupedActionListener; -import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; -import org.opensearch.action.support.master.AcknowledgedResponse; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.ClusterStateUpdateTask; -import org.opensearch.cluster.block.ClusterBlock; -import org.opensearch.cluster.block.ClusterBlockException; -import org.opensearch.cluster.block.ClusterBlockLevel; -import org.opensearch.cluster.block.ClusterBlocks; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.metadata.IndexNameExpressionResolver; -import org.opensearch.cluster.metadata.Metadata; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.routing.RecoverySource; -import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.cluster.routing.ShardRouting; -import 
org.opensearch.cluster.routing.allocation.AllocationService; -import org.opensearch.cluster.routing.IndexShardRoutingTable; -import org.opensearch.cluster.routing.IndexRoutingTable; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.Priority; -import org.opensearch.common.UUIDs; -import org.opensearch.common.inject.Inject; -import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.index.Index; -import org.opensearch.core.index.shard.ShardId; -import org.opensearch.core.rest.RestStatus; -import org.opensearch.index.IndexService; -import org.opensearch.index.shard.IndexShard; -import org.opensearch.indices.IndicesService; -import org.opensearch.indices.replication.common.ReplicationType; -import org.opensearch.repositories.IndexId; -import org.opensearch.cluster.routing.UnassignedInfo; -import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.TransportChannel; -import org.opensearch.transport.TransportException; -import org.opensearch.transport.TransportResponseHandler; -import org.opensearch.transport.TransportService; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.stream.Collectors; - -public class TransportPreScaleSyncAction extends TransportClusterManagerNodeAction { - private static final Logger logger = LogManager.getLogger(TransportPreScaleSyncAction.class); - private final AllocationService allocationService; - private final IndicesService indicesService; - public static final String NAME = PreScaleSyncAction.NAME + "[s]"; - - public static final int INDEX_SCALE_BLOCK_ID = 15; - public static final ClusterBlock INDEX_SCALE_BLOCK = new ClusterBlock( - INDEX_SCALE_BLOCK_ID, - "index scaled down", - false, - false, - false, 
- RestStatus.FORBIDDEN, - EnumSet.of(ClusterBlockLevel.WRITE) - ); - - @Inject - public TransportPreScaleSyncAction( - TransportService transportService, - ClusterService clusterService, - ThreadPool threadPool, - ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, - AllocationService allocationService, - IndicesService indicesService - ) { - super( - PreScaleSyncAction.NAME, - transportService, - clusterService, - threadPool, - actionFilters, - PreScaleSyncRequest::new, - indexNameExpressionResolver - ); - this.allocationService = allocationService; - this.indicesService = indicesService; - - transportService.registerRequestHandler( - NAME, - ThreadPool.Names.SAME, - NodePreScaleSyncRequest::new, - (request, channel, task) -> handleShardSyncRequest(request, channel) - ); - } - - private static ClusterBlock createScaleBlock() { - return new ClusterBlock( - INDEX_SCALE_BLOCK_ID, - UUIDs.randomBase64UUID(), - "index preparing to scale down", - false, - false, - false, - RestStatus.FORBIDDEN, - EnumSet.of(ClusterBlockLevel.WRITE) - ); - } - - @Override - protected String executor() { - return ThreadPool.Names.MANAGEMENT; - } - - @Override - protected AcknowledgedResponse read(StreamInput in) throws IOException { - return new AcknowledgedResponse(in); - } - - @Override - protected void clusterManagerOperation( - PreScaleSyncRequest request, - ClusterState state, - ActionListener listener - ) { - final String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames( - state, - request.indicesOptions(), - request.indices() - ); - - if (request.isScaleDown()) { - addBlockAndScaleDown(concreteIndices, state, listener); - } else { - scaleUp(concreteIndices, state, listener); - } - } - - private void addBlockAndScaleDown( - final String[] indices, - final ClusterState currentState, - final ActionListener listener - ) { - clusterService.submitStateUpdateTask( - "add-block-index-to-scale " + Arrays.toString(indices), - new 
ClusterStateUpdateTask(Priority.URGENT) { - private final Map blockedIndices = new HashMap<>(); - - @Override - public ClusterState execute(final ClusterState currentState) { - for (String index : indices) { - IndexMetadata indexMetadata = currentState.metadata().index(index); - if (!validateScaleDownPrerequisites(indexMetadata, index, listener)) { - return currentState; - } - } - - final Metadata.Builder metadata = Metadata.builder(currentState.metadata()); - final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); - final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); - - for (String indexName : indices) { - Index index = currentState.metadata().index(indexName).getIndex(); - ClusterBlock scaleBlock = createScaleBlock(); - blocks.addIndexBlock(indexName, scaleBlock); - blockedIndices.put(index, scaleBlock); - } - - return ClusterState.builder(currentState) - .metadata(metadata) - .blocks(blocks) - .routingTable(routingTable.build()) - .build(); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (oldState == newState) { - listener.onResponse(new AcknowledgedResponse(true)); - return; - } - - Map primaryShardsNodes = new HashMap<>(); - for (String index : indices) { - IndexMetadata indexMetadata = newState.metadata().index(index); - if (indexMetadata != null) { - primaryShardsNodes.putAll(getPrimaryShardNodeAssignments(indexMetadata, newState)); - } - } - - proceedWithScaleDown(indices, primaryShardsNodes, blockedIndices, listener); - } - - @Override - public void onFailure(String source, Exception e) { - listener.onFailure(e); - } - } - ); - } - - private void handleShardSyncRequest(NodePreScaleSyncRequest request, TransportChannel channel) throws Exception { - logger.info("Handling shard sync request"); - final ClusterState state = clusterService.state(); - final IndexMetadata indexMetadata = 
state.metadata().index(request.getIndex()); - if (indexMetadata == null) { - throw new IllegalStateException("Index " + request.getIndex() + " not found"); - } - - IndexService indexService = indicesService.indexService(indexMetadata.getIndex()); - if (indexService == null) { - throw new IllegalStateException("IndexService not found for index " + request.getIndex()); - } - - List shardResponses = new ArrayList<>(); - for (ShardId shardId : request.getShardIds()) { - IndexShard shard = indexService.getShardOrNull(shardId.id()); - if (shard == null) continue; - - logger.info("Doing final Sync before closing shard"); - shard.sync(); - logger.info("Doing final Flush before closing shard"); - shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); - - if (shard.translogStats().getUncommittedOperations() > 0) { - logger.info( - "Translog has {} uncommitted operations before closing shard [{}]", - shard.translogStats().getUncommittedOperations(), - shard.shardId() - ); - throw new IllegalStateException( - String.format("Shard [%s] still has %d uncommitted operations after flush. 
Please wait and retry the scale down operation.", - shard.shardId(), - shard.translogStats().getUncommittedOperations()) - ); - } - - shard.waitForRemoteStoreSync(); - - shardResponses.add( - new ShardPreScaleSyncResponse( - shardId, - shard.isSyncNeeded(), - shard.translogStats().getUncommittedOperations() - ) - ); - } - - channel.sendResponse(new NodePreScaleSyncResponse(clusterService.localNode(), shardResponses)); - } - - private void proceedWithScaleDown( - String[] indices, - Map primaryShardsNodes, - Map blockedIndices, - ActionListener listener - ) { - if (primaryShardsNodes.isEmpty()) { - listener.onFailure(new IllegalStateException("No primary shards found for indices")); - return; - } - - Map> nodeShardGroups = primaryShardsNodes.entrySet() - .stream() - .collect(Collectors.groupingBy( - Map.Entry::getValue, - Collectors.mapping(Map.Entry::getKey, Collectors.toList()) - )); - - final GroupedActionListener groupedListener = new GroupedActionListener<>( - ActionListener.wrap( - responses -> handleNodeResponses( - responses, - ActionListener.wrap( - preScaleSyncResponse -> finalizeScaleDown(indices, blockedIndices, listener), - listener::onFailure - ) - ), - listener::onFailure - ), - nodeShardGroups.size() - ); - - for (Map.Entry> nodeShards : nodeShardGroups.entrySet()) { - final String nodeId = nodeShards.getKey(); - final List shards = nodeShards.getValue(); - - final DiscoveryNode targetNode = clusterService.state().nodes().get(nodeId); - if (targetNode == null) { - groupedListener.onFailure(new IllegalStateException("Node [" + nodeId + "] not found")); - continue; - } - - transportService.sendRequest( - targetNode, - NAME, - new NodePreScaleSyncRequest(indices[0], shards), - new TransportResponseHandler() { - @Override - public NodePreScaleSyncResponse read(StreamInput in) throws IOException { - return new NodePreScaleSyncResponse(in); - } - - @Override - public void handleResponse(NodePreScaleSyncResponse response) { - 
groupedListener.onResponse(response); - } - - @Override - public void handleException(TransportException exp) { - groupedListener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - } - ); - } - } - - private void finalizeScaleDown( - String[] indices, - Map blockedIndices, - ActionListener listener - ) { - clusterService.submitStateUpdateTask( - "finalize-scale-down", - new ClusterStateUpdateTask(Priority.URGENT) { - @Override - public ClusterState execute(ClusterState currentState) { - ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); - RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); - - for (Map.Entry entry : blockedIndices.entrySet()) { - Index index = entry.getKey(); - blocks.removeIndexBlockWithId(index.getName(), INDEX_SCALE_BLOCK_ID); - blocks.addIndexBlock(index.getName(), INDEX_SCALE_BLOCK); - } - - for (String index : indices) { - IndexRoutingTable indexRoutingTable = currentState.routingTable().index(index); - if (indexRoutingTable == null) continue; - - IndexRoutingTable.Builder indexBuilder = new IndexRoutingTable.Builder( - indexRoutingTable.getIndex() - ); - - // Keep only search replicas in the routing table - for (IndexShardRoutingTable shardTable : indexRoutingTable) { - IndexShardRoutingTable.Builder shardBuilder = new IndexShardRoutingTable.Builder( - shardTable.shardId() - ); - - for (ShardRouting shardRouting : shardTable) { - if (shardRouting.isSearchOnly()) { - shardBuilder.addShard(shardRouting); - } - } - - indexBuilder.addIndexShard(shardBuilder.build()); - } - - routingTable.add(indexBuilder.build()); - } - - return ClusterState.builder(currentState) - .blocks(blocks) - .routingTable(routingTable.build()) - .build(); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new AcknowledgedResponse(true)); - } - - @Override - public 
void onFailure(String source, Exception e) { - listener.onFailure(e); - } - } - ); - } - - private void handleNodeResponses( - Collection responses, - ActionListener listener - ) { - boolean hasUncommittedOps = false; - boolean needsSync = false; - List failedShards = new ArrayList<>(); - - for (NodePreScaleSyncResponse nodeResponse : responses) { - for (ShardPreScaleSyncResponse shardResponse : nodeResponse.getShardResponses()) { - if (shardResponse.hasUncommittedOperations()) { - hasUncommittedOps = true; - failedShards.add(shardResponse.getShardId().toString()); - } - if (shardResponse.needsSync()) { - needsSync = true; - failedShards.add(shardResponse.getShardId().toString()); - } - } - } - - if (hasUncommittedOps || needsSync) { - listener.onFailure(new IllegalStateException( - "Pre-scale sync failed for shards: " + String.join(", ", failedShards) + - (hasUncommittedOps ? " - uncommitted operations remain" : "") + - (needsSync ? " - sync needed" : "") - )); - return; - } - - listener.onResponse(new PreScaleSyncResponse(responses)); - } - - private void scaleUp( - final String[] indices, - final ClusterState currentState, - final ActionListener listener - ) { - // 1. 
Update routing table - clusterService.submitStateUpdateTask( - "scale-up-index", - new ClusterStateUpdateTask() { - public ClusterState execute(ClusterState currentState) throws Exception { - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); - - // For each index, modify its routing table - for (String index : indices) { - IndexRoutingTable indexRoutingTable = currentState.routingTable().index(index); - if (indexRoutingTable == null) continue; - - // Build new routing table - IndexRoutingTable.Builder indexBuilder = new IndexRoutingTable.Builder(indexRoutingTable.getIndex()); - - for (IndexShardRoutingTable shardTable : indexRoutingTable) { - IndexShardRoutingTable.Builder shardBuilder = new IndexShardRoutingTable.Builder(shardTable.shardId()); - - // Keep existing search replicas - for (ShardRouting shardRouting : shardTable) { - if (shardRouting.isSearchOnly()) { - shardBuilder.addShard(shardRouting); - } - } - - // Create recovery source for primary - RecoverySource.RemoteStoreRecoverySource remoteStoreRecoverySource = new RecoverySource.RemoteStoreRecoverySource( - UUID.randomUUID().toString(), - Version.CURRENT, - new IndexId( - shardTable.shardId().getIndex().getName(), - shardTable.shardId().getIndex().getUUID() - ) - ); - - // Add unassigned primary - ShardRouting primaryShard = ShardRouting.newUnassigned( - shardTable.shardId(), - true, - remoteStoreRecoverySource, - new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "Restoring primary shard") - ); - shardBuilder.addShard(primaryShard); - - // Add unassigned replica - ShardRouting replicaShard = ShardRouting.newUnassigned( - shardTable.shardId(), - false, - RecoverySource.PeerRecoverySource.INSTANCE, - new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "Restoring replica shard") - ); - shardBuilder.addShard(replicaShard); - - indexBuilder.addIndexShard(shardBuilder.build()); - } - - routingTableBuilder.add(indexBuilder.build()); - } - - ClusterState 
tempState = ClusterState.builder(currentState) - .routingTable(routingTableBuilder.build()) - .build(); - - ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(tempState.blocks()); - for (String indexName : indices) { - blocks.removeIndexBlockWithId(indexName, INDEX_SCALE_BLOCK_ID); - } - // Perform reroute to allocate restored shards - return ClusterState.builder(tempState) - .blocks(blocks) - .routingTable(allocationService.reroute(tempState, "restore indexing shards").routingTable()) - .build(); - - } - - public void onFailure(String source, Exception e) { - logger.error("Failed to execute cluster state update for scale up", e); - listener.onFailure(e); - } - - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new AcknowledgedResponse(true)); - } - } - ); - } - - @Override - protected ClusterBlockException checkBlock(PreScaleSyncRequest request, ClusterState state) { - return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, - indexNameExpressionResolver.concreteIndexNames(state, request.indicesOptions(), request.indices())); - } - - private boolean validateScaleDownPrerequisites( - IndexMetadata indexMetadata, - String index, - ActionListener listener - ) { - // Validate search replicas exist - if (indexMetadata.getNumberOfSearchOnlyReplicas() == 0) { - listener.onFailure(new IllegalArgumentException( - "Cannot scale to zero without search replicas for index: " + index - )); - return false; - } - - // Validate remote store is enabled - if (!indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) { - listener.onFailure(new IllegalArgumentException( - "To scale to zero, " + IndexMetadata.SETTING_REMOTE_STORE_ENABLED + - " must be enabled for index: " + index - )); - return false; - } - - // Validate segment replication - if (!ReplicationType.SEGMENT.toString().equals( - 
indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE)) - ) { - listener.onFailure(new IllegalArgumentException( - "To scale to zero, segment replication must be enabled for index: " + index - )); - return false; - } - return true; - } - - private Map getPrimaryShardNodeAssignments(IndexMetadata indexMetadata, ClusterState state) { - Map assignments = new HashMap<>(); - for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { - ShardId shardId = new ShardId(indexMetadata.getIndex(), i); - ShardRouting primaryShard = state.routingTable().index(indexMetadata.getIndex().getName()).shard(i).primaryShard(); - - if (primaryShard != null && primaryShard.assignedToNode()) { - assignments.put(shardId, primaryShard.currentNodeId()); - } - } - return assignments; - } -} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/scale/NodePreScaleSyncRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/searchonly/NodeSearchOnlyRequest.java similarity index 79% rename from server/src/main/java/org/opensearch/action/admin/indices/scale/NodePreScaleSyncRequest.java rename to server/src/main/java/org/opensearch/action/admin/indices/searchonly/NodeSearchOnlyRequest.java index 8b92682d9f09f..cf4276500c9b3 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/scale/NodePreScaleSyncRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/searchonly/NodeSearchOnlyRequest.java @@ -6,7 +6,7 @@ * compatible open source license. 
*/ -package org.opensearch.action.admin.indices.scale; +package org.opensearch.action.admin.indices.searchonly; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -16,16 +16,16 @@ import java.io.IOException; import java.util.List; -public class NodePreScaleSyncRequest extends TransportRequest { +public class NodeSearchOnlyRequest extends TransportRequest { private final String index; private final List shardIds; - public NodePreScaleSyncRequest(String index, List shardIds) { + public NodeSearchOnlyRequest(String index, List shardIds) { this.index = index; this.shardIds = shardIds; } - public NodePreScaleSyncRequest(StreamInput in) throws IOException { + public NodeSearchOnlyRequest(StreamInput in) throws IOException { super(in); this.index = in.readString(); this.shardIds = in.readList(ShardId::new); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/scale/NodePreScaleSyncResponse.java b/server/src/main/java/org/opensearch/action/admin/indices/searchonly/NodeSearchOnlyResponse.java similarity index 64% rename from server/src/main/java/org/opensearch/action/admin/indices/scale/NodePreScaleSyncResponse.java rename to server/src/main/java/org/opensearch/action/admin/indices/searchonly/NodeSearchOnlyResponse.java index 3d3a3419c61c3..a002c22f7bfa5 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/scale/NodePreScaleSyncResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/searchonly/NodeSearchOnlyResponse.java @@ -6,7 +6,7 @@ * compatible open source license. 
*/ -package org.opensearch.action.admin.indices.scale; +package org.opensearch.action.admin.indices.searchonly; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.core.common.io.stream.StreamInput; @@ -16,18 +16,18 @@ import java.io.IOException; import java.util.List; -public class NodePreScaleSyncResponse extends TransportResponse { +public class NodeSearchOnlyResponse extends TransportResponse { private final DiscoveryNode node; - private final List shardResponses; + private final List shardResponses; - public NodePreScaleSyncResponse(DiscoveryNode node, List shardResponses) { + public NodeSearchOnlyResponse(DiscoveryNode node, List shardResponses) { this.node = node; this.shardResponses = shardResponses; } - public NodePreScaleSyncResponse(StreamInput in) throws IOException { + public NodeSearchOnlyResponse(StreamInput in) throws IOException { node = new DiscoveryNode(in); - shardResponses = in.readList(ShardPreScaleSyncResponse::new); + shardResponses = in.readList(ShardSearchOnlyResponse::new); } @Override @@ -40,7 +40,7 @@ public DiscoveryNode getNode() { return node; } - public List getShardResponses() { + public List getShardResponses() { return shardResponses; } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/searchonly/SearchOnlyAction.java b/server/src/main/java/org/opensearch/action/admin/indices/searchonly/SearchOnlyAction.java new file mode 100644 index 0000000000000..8e25ae80ea0b4 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/searchonly/SearchOnlyAction.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.admin.indices.searchonly; + +import org.opensearch.action.ActionType; +import org.opensearch.action.support.clustermanager.AcknowledgedResponse; + +public class SearchOnlyAction extends ActionType { + public static final SearchOnlyAction INSTANCE = new SearchOnlyAction(); + public static final String NAME = "indices:admin/searchonly"; + + private SearchOnlyAction() { + super(NAME, AcknowledgedResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/searchonly/SearchOnlyRequest.java similarity index 72% rename from server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncRequest.java rename to server/src/main/java/org/opensearch/action/admin/indices/searchonly/SearchOnlyRequest.java index ff7d32672e265..0640ba765d28e 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/searchonly/SearchOnlyRequest.java @@ -1,9 +1,9 @@ -package org.opensearch.action.admin.indices.scale; +package org.opensearch.action.admin.indices.searchonly; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.ValidateActions; import org.opensearch.action.support.IndicesOptions; -import org.opensearch.action.support.master.AcknowledgedRequest; +import org.opensearch.action.support.clustermanager.AcknowledgedRequest; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -11,21 +11,21 @@ import java.util.Arrays; import java.util.Objects; -public class PreScaleSyncRequest extends AcknowledgedRequest { +public class SearchOnlyRequest extends AcknowledgedRequest { private String[] indices; private boolean scaleDown; private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen(); - public 
PreScaleSyncRequest(String index) { - this(new String[]{Objects.requireNonNull(index)}, false); + public SearchOnlyRequest(String index) { + this(new String[] { Objects.requireNonNull(index) }, false); } - public PreScaleSyncRequest(String[] indices, boolean scaleDown) { + public SearchOnlyRequest(String[] indices, boolean scaleDown) { this.indices = Objects.requireNonNull(indices); this.scaleDown = scaleDown; } - public PreScaleSyncRequest(StreamInput in) throws IOException { + public SearchOnlyRequest(StreamInput in) throws IOException { super(in); indices = in.readStringArray(); scaleDown = in.readBoolean(); @@ -52,7 +52,7 @@ public IndicesOptions indicesOptions() { return indicesOptions; } - public PreScaleSyncRequest indicesOptions(IndicesOptions indicesOptions) { + public SearchOnlyRequest indicesOptions(IndicesOptions indicesOptions) { this.indicesOptions = indicesOptions; return this; } @@ -65,7 +65,10 @@ public ActionRequestValidationException validate() { } else { for (String index : indices) { if (index == null || index.trim().isEmpty()) { - validationException = ValidateActions.addValidationError("index/indices contains null or empty value", validationException); + validationException = ValidateActions.addValidationError( + "index/indices contains null or empty value", + validationException + ); break; } } @@ -77,10 +80,8 @@ public ActionRequestValidationException validate() { public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - PreScaleSyncRequest that = (PreScaleSyncRequest) o; - return scaleDown == that.scaleDown && - Arrays.equals(indices, that.indices) && - Objects.equals(indicesOptions, that.indicesOptions); + SearchOnlyRequest that = (SearchOnlyRequest) o; + return scaleDown == that.scaleDown && Arrays.equals(indices, that.indices) && Objects.equals(indicesOptions, that.indicesOptions); } @Override @@ -95,7 +96,7 @@ public int hashCode() { * @param scaleDown true if scaling 
down, false if scaling up * @return this request */ - public PreScaleSyncRequest scaleDown(boolean scaleDown) { + public SearchOnlyRequest scaleDown(boolean scaleDown) { this.scaleDown = scaleDown; return this; } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/searchonly/SearchOnlyRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/indices/searchonly/SearchOnlyRequestBuilder.java new file mode 100644 index 0000000000000..72152da8dde0c --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/searchonly/SearchOnlyRequestBuilder.java @@ -0,0 +1,28 @@ +package org.opensearch.action.admin.indices.searchonly; + +import org.opensearch.action.ActionRequestBuilder; +import org.opensearch.action.support.clustermanager.AcknowledgedResponse; +import org.opensearch.client.OpenSearchClient; +import org.opensearch.common.annotation.PublicApi; + +@PublicApi(since = "1.0.0") +public class SearchOnlyRequestBuilder extends ActionRequestBuilder { + + public SearchOnlyRequestBuilder(OpenSearchClient client, String... indices) { + this(client, false, indices); + } + + public SearchOnlyRequestBuilder(OpenSearchClient client, boolean scaleDown, String... 
indices) { + super(client, SearchOnlyAction.INSTANCE, new SearchOnlyRequest(indices, scaleDown)); + } + + /** + * Sets the scale direction (up/down) + * @param scaleDown true if scaling down, false if scaling up + * @return this builder + */ + public SearchOnlyRequestBuilder setScaleDown(boolean scaleDown) { + request.scaleDown(scaleDown); + return this; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncResponse.java b/server/src/main/java/org/opensearch/action/admin/indices/searchonly/SearchOnlyResponse.java similarity index 82% rename from server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncResponse.java rename to server/src/main/java/org/opensearch/action/admin/indices/searchonly/SearchOnlyResponse.java index b937c606570fa..8178a6b783bf2 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/scale/PreScaleSyncResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/searchonly/SearchOnlyResponse.java @@ -6,7 +6,7 @@ * compatible open source license. 
*/ -package org.opensearch.action.admin.indices.scale; +package org.opensearch.action.admin.indices.searchonly; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.io.stream.StreamInput; @@ -18,20 +18,20 @@ import java.util.ArrayList; import java.util.Collection; -public class PreScaleSyncResponse extends ActionResponse implements ToXContent { - private final Collection nodeResponses; +public class SearchOnlyResponse extends ActionResponse implements ToXContent { + private final Collection nodeResponses; private final String failureReason; private final boolean hasFailures; - public PreScaleSyncResponse(Collection responses) { + public SearchOnlyResponse(Collection responses) { this.nodeResponses = responses; this.hasFailures = responses.stream() .anyMatch(r -> r.getShardResponses().stream().anyMatch(s -> s.hasUncommittedOperations() || s.needsSync())); this.failureReason = buildFailureReason(); } - public PreScaleSyncResponse(StreamInput in) throws IOException { - this.nodeResponses = in.readList(NodePreScaleSyncResponse::new); + public SearchOnlyResponse(StreamInput in) throws IOException { + this.nodeResponses = in.readList(NodeSearchOnlyResponse::new); this.hasFailures = in.readBoolean(); this.failureReason = in.readOptionalString(); } @@ -56,8 +56,8 @@ private String buildFailureReason() { return null; } StringBuilder reason = new StringBuilder(); - for (NodePreScaleSyncResponse nodeResponse : nodeResponses) { - for (ShardPreScaleSyncResponse shardResponse : nodeResponse.getShardResponses()) { + for (NodeSearchOnlyResponse nodeResponse : nodeResponses) { + for (ShardSearchOnlyResponse shardResponse : nodeResponse.getShardResponses()) { if (shardResponse.hasUncommittedOperations() || shardResponse.needsSync()) { reason.append("Shard ") .append(shardResponse.getShardId()) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/scale/ShardPreScaleSyncResponse.java 
b/server/src/main/java/org/opensearch/action/admin/indices/searchonly/ShardSearchOnlyResponse.java similarity index 81% rename from server/src/main/java/org/opensearch/action/admin/indices/scale/ShardPreScaleSyncResponse.java rename to server/src/main/java/org/opensearch/action/admin/indices/searchonly/ShardSearchOnlyResponse.java index 0aaa5f8a6c28a..cb32ea5c2e88f 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/scale/ShardPreScaleSyncResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/searchonly/ShardSearchOnlyResponse.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.action.admin.indices.scale; +package org.opensearch.action.admin.indices.searchonly; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -15,18 +15,18 @@ import java.io.IOException; -public class ShardPreScaleSyncResponse implements Writeable { +public class ShardSearchOnlyResponse implements Writeable { private final ShardId shardId; private final boolean needsSync; private final int uncommittedOperations; - public ShardPreScaleSyncResponse(ShardId shardId, boolean needsSync, int uncommittedOperations) { + public ShardSearchOnlyResponse(ShardId shardId, boolean needsSync, int uncommittedOperations) { this.shardId = shardId; this.needsSync = needsSync; this.uncommittedOperations = uncommittedOperations; } - public ShardPreScaleSyncResponse(StreamInput in) throws IOException { + public ShardSearchOnlyResponse(StreamInput in) throws IOException { this.shardId = new ShardId(in); this.needsSync = in.readBoolean(); this.uncommittedOperations = in.readVInt(); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/searchonly/TransportSearchOnlyAction.java b/server/src/main/java/org/opensearch/action/admin/indices/searchonly/TransportSearchOnlyAction.java new file mode 100644 index 0000000000000..b1c09a2907142 --- /dev/null +++ 
b/server/src/main/java/org/opensearch/action/admin/indices/searchonly/TransportSearchOnlyAction.java @@ -0,0 +1,576 @@ +package org.opensearch.action.admin.indices.searchonly; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.Version; +import org.opensearch.action.admin.indices.flush.FlushRequest; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.GroupedActionListener; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; +import org.opensearch.action.support.clustermanager.AcknowledgedResponse; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.block.ClusterBlock; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; +import org.opensearch.common.UUIDs; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.index.Index; +import 
org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.repositories.IndexId; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponseHandler; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +public class TransportSearchOnlyAction extends TransportClusterManagerNodeAction { + private static final Logger logger = LogManager.getLogger(TransportSearchOnlyAction.class); + private final AllocationService allocationService; + private final IndicesService indicesService; + public static final String NAME = SearchOnlyAction.NAME + "[s]"; + + /** + * Block IDs for scaling operations (20-29): + * 20: INDEX_SEARCHONLY_BLOCK_ID - Block writes during index scaling + * 21-29: Reserved for future scaling operations + */ + public static final int INDEX_SEARCHONLY_BLOCK_ID = 20; + public static final ClusterBlock INDEX_SEARCHONLY_BLOCK = new ClusterBlock( + INDEX_SEARCHONLY_BLOCK_ID, + "index scaled down", + false, + false, + false, + RestStatus.FORBIDDEN, + EnumSet.of(ClusterBlockLevel.WRITE) + ); + + @Inject + public TransportSearchOnlyAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + AllocationService allocationService, + IndicesService indicesService + ) { + super( 
+ SearchOnlyAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + SearchOnlyRequest::new, + indexNameExpressionResolver + ); + this.allocationService = allocationService; + this.indicesService = indicesService; + + transportService.registerRequestHandler( + NAME, + ThreadPool.Names.SAME, + NodeSearchOnlyRequest::new, + (request, channel, task) -> handleShardSyncRequest(request, channel) + ); + } + + private static ClusterBlock createScaleBlock() { + return new ClusterBlock( + INDEX_SEARCHONLY_BLOCK_ID, + UUIDs.randomBase64UUID(), + "index preparing to scale down", + false, + false, + false, + RestStatus.FORBIDDEN, + EnumSet.of(ClusterBlockLevel.WRITE) + ); + } + + @Override + protected String executor() { + return ThreadPool.Names.MANAGEMENT; + } + + @Override + protected AcknowledgedResponse read(StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + + @Override + protected void clusterManagerOperation(SearchOnlyRequest request, ClusterState state, ActionListener listener) { + final String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(state, request.indicesOptions(), request.indices()); + + if (request.isScaleDown()) { + addBlockAndScaleDown(concreteIndices, listener); + } else { + scaleUp(concreteIndices, state, listener); + } + } + + private void addBlockAndScaleDown( + final String[] indices, + final ActionListener listener + ) { + clusterService.submitStateUpdateTask( + "add-block-index-to-scale " + Arrays.toString(indices), + new ClusterStateUpdateTask(Priority.URGENT) { + private final Map blockedIndices = new HashMap<>(); + + @Override + public ClusterState execute(final ClusterState currentState) { + for (String index : indices) { + IndexMetadata indexMetadata = currentState.metadata().index(index); + if (!validateScalePrerequisites(indexMetadata, index, listener, true)) { + return currentState; + } + } + + final Metadata.Builder metadata = 
Metadata.builder(currentState.metadata()); + final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); + + for (String indexName : indices) { + Index index = currentState.metadata().index(indexName).getIndex(); + ClusterBlock scaleBlock = createScaleBlock(); + blocks.addIndexBlock(indexName, scaleBlock); + blockedIndices.put(index, scaleBlock); + } + + return ClusterState.builder(currentState).metadata(metadata).blocks(blocks).routingTable(routingTable.build()).build(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (oldState == newState) { + listener.onResponse(new AcknowledgedResponse(true)); + return; + } + + Map primaryShardsNodes = new HashMap<>(); + for (String index : indices) { + IndexMetadata indexMetadata = newState.metadata().index(index); + if (indexMetadata != null) { + primaryShardsNodes.putAll(getPrimaryShardNodeAssignments(indexMetadata, newState)); + } + } + + proceedWithScaleDown(indices, primaryShardsNodes, blockedIndices, listener); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + } + ); + } + + private void handleShardSyncRequest(NodeSearchOnlyRequest request, TransportChannel channel) throws Exception { + logger.info("Handling shard sync request"); + final ClusterState state = clusterService.state(); + final IndexMetadata indexMetadata = state.metadata().index(request.getIndex()); + if (indexMetadata == null) { + throw new IllegalStateException("Index " + request.getIndex() + " not found"); + } + + IndexService indexService = indicesService.indexService(indexMetadata.getIndex()); + if (indexService == null) { + throw new IllegalStateException("IndexService not found for index " + request.getIndex()); + } + + List shardResponses = new ArrayList<>(); + for (ShardId shardId : 
request.getShardIds()) { + IndexShard shard = indexService.getShardOrNull(shardId.id()); + if (shard == null) continue; + + logger.info("Doing final Sync before closing shard"); + shard.sync(); + logger.info("Doing final Flush before closing shard"); + shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); + + if (shard.translogStats().getUncommittedOperations() > 0) { + logger.info( + "Translog has {} uncommitted operations before closing shard [{}]", + shard.translogStats().getUncommittedOperations(), + shard.shardId() + ); + throw new IllegalStateException( + String.format( + "Shard [%s] still has %d uncommitted operations after flush. Please wait and retry the scale down operation.", + shard.shardId(), + shard.translogStats().getUncommittedOperations() + ) + ); + } + + shard.waitForRemoteStoreSync(); + + shardResponses.add( + new ShardSearchOnlyResponse(shardId, shard.isSyncNeeded(), shard.translogStats().getUncommittedOperations()) + ); + } + + channel.sendResponse(new NodeSearchOnlyResponse(clusterService.localNode(), shardResponses)); + } + + private void proceedWithScaleDown( + String[] indices, + Map primaryShardsNodes, + Map blockedIndices, + ActionListener listener + ) { + if (primaryShardsNodes.isEmpty()) { + listener.onFailure(new IllegalStateException("No primary shards found for indices")); + return; + } + + Map> nodeShardGroups = primaryShardsNodes.entrySet() + .stream() + .collect(Collectors.groupingBy(Map.Entry::getValue, Collectors.mapping(Map.Entry::getKey, Collectors.toList()))); + + final GroupedActionListener groupedListener = new GroupedActionListener<>( + ActionListener.wrap( + responses -> handleNodeResponses( + responses, + ActionListener.wrap(searchOnlyResponse -> finalizeScaleDown(indices, blockedIndices, listener), listener::onFailure) + ), + listener::onFailure + ), + nodeShardGroups.size() + ); + + for (Map.Entry> nodeShards : nodeShardGroups.entrySet()) { + final String nodeId = nodeShards.getKey(); + final List shards = 
nodeShards.getValue(); + + final DiscoveryNode targetNode = clusterService.state().nodes().get(nodeId); + if (targetNode == null) { + groupedListener.onFailure(new IllegalStateException("Node [" + nodeId + "] not found")); + continue; + } + + transportService.sendRequest( + targetNode, + NAME, + new NodeSearchOnlyRequest(indices[0], shards), + new TransportResponseHandler() { + @Override + public NodeSearchOnlyResponse read(StreamInput in) throws IOException { + return new NodeSearchOnlyResponse(in); + } + + @Override + public void handleResponse(NodeSearchOnlyResponse response) { + groupedListener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + groupedListener.onFailure(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + } + ); + } + } + + private void finalizeScaleDown( + String[] indices, + Map blockedIndices, + ActionListener listener + ) { + clusterService.submitStateUpdateTask("finalize-scale-down", new ClusterStateUpdateTask(Priority.URGENT) { + @Override + public ClusterState execute(ClusterState currentState) { + ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); + Metadata.Builder metadata = Metadata.builder(currentState.metadata()); + + for (Map.Entry entry : blockedIndices.entrySet()) { + Index index = entry.getKey(); + blocks.removeIndexBlockWithId(index.getName(), INDEX_SEARCHONLY_BLOCK_ID); + + IndexMetadata indexMetadata = currentState.metadata().index(index); + Settings updatedSettings = Settings.builder() + .put(indexMetadata.getSettings()) + .put(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), true) + .build(); + + metadata.put(IndexMetadata.builder(indexMetadata) + .settings(updatedSettings) + .settingsVersion(indexMetadata.getSettingsVersion() + 1)); + + blocks.addIndexBlock(index.getName(), INDEX_SEARCHONLY_BLOCK); + } + + 
for (String index : indices) { + IndexRoutingTable indexRoutingTable = currentState.routingTable().index(index); + if (indexRoutingTable == null) continue; + + IndexRoutingTable.Builder indexBuilder = new IndexRoutingTable.Builder(indexRoutingTable.getIndex()); + + // Keep only search replicas in the routing table + for (IndexShardRoutingTable shardTable : indexRoutingTable) { + IndexShardRoutingTable.Builder shardBuilder = new IndexShardRoutingTable.Builder(shardTable.shardId()); + + for (ShardRouting shardRouting : shardTable) { + if (shardRouting.isSearchOnly()) { + shardBuilder.addShard(shardRouting); + } + } + + indexBuilder.addIndexShard(shardBuilder.build()); + } + + routingTable.add(indexBuilder.build()); + } + + return ClusterState.builder(currentState).metadata(metadata).blocks(blocks).routingTable(routingTable.build()).build(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new AcknowledgedResponse(true)); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + }); + } + + private void handleNodeResponses(Collection responses, ActionListener listener) { + boolean hasUncommittedOps = false; + boolean needsSync = false; + List failedShards = new ArrayList<>(); + + for (NodeSearchOnlyResponse nodeResponse : responses) { + for (ShardSearchOnlyResponse shardResponse : nodeResponse.getShardResponses()) { + if (shardResponse.hasUncommittedOperations()) { + hasUncommittedOps = true; + failedShards.add(shardResponse.getShardId().toString()); + } + if (shardResponse.needsSync()) { + needsSync = true; + failedShards.add(shardResponse.getShardId().toString()); + } + } + } + + if (hasUncommittedOps || needsSync) { + listener.onFailure( + new IllegalStateException( + "Pre-scale sync failed for shards: " + + String.join(", ", failedShards) + + (hasUncommittedOps ? " - uncommitted operations remain" : "") + + (needsSync ? 
" - sync needed" : "") + ) + ); + return; + } + + listener.onResponse(new SearchOnlyResponse(responses)); + } + + private void scaleUp(final String[] indices, final ClusterState currentState, final ActionListener listener) { + + for (String index : indices) { + if (!validateScalePrerequisites(currentState.metadata().index(index), index, listener, false)) { + return; + } + } + + clusterService.submitStateUpdateTask("scale-up-index", new ClusterStateUpdateTask() { + public ClusterState execute(ClusterState currentState) throws Exception { + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); + + // For each index, modify its routing table + for (String index : indices) { + IndexRoutingTable indexRoutingTable = currentState.routingTable().index(index); + if (indexRoutingTable == null) continue; + + // Build new routing table + IndexRoutingTable.Builder indexBuilder = new IndexRoutingTable.Builder(indexRoutingTable.getIndex()); + + for (IndexShardRoutingTable shardTable : indexRoutingTable) { + IndexShardRoutingTable.Builder shardBuilder = new IndexShardRoutingTable.Builder(shardTable.shardId()); + + // Keep existing search replicas + for (ShardRouting shardRouting : shardTable) { + if (shardRouting.isSearchOnly()) { + shardBuilder.addShard(shardRouting); + } + } + + // Create recovery source for primary + RecoverySource.RemoteStoreRecoverySource remoteStoreRecoverySource = new RecoverySource.RemoteStoreRecoverySource( + UUID.randomUUID().toString(), + Version.CURRENT, + new IndexId(shardTable.shardId().getIndex().getName(), shardTable.shardId().getIndex().getUUID()) + ); + + // Add unassigned primary + ShardRouting primaryShard = ShardRouting.newUnassigned( + shardTable.shardId(), + true, + remoteStoreRecoverySource, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "Restoring primary shard") + ); + shardBuilder.addShard(primaryShard); + + // Add unassigned replica + ShardRouting replicaShard = 
ShardRouting.newUnassigned( + shardTable.shardId(), + false, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "Restoring replica shard") + ); + shardBuilder.addShard(replicaShard); + + indexBuilder.addIndexShard(shardBuilder.build()); + } + + routingTableBuilder.add(indexBuilder.build()); + } + + ClusterState tempState = ClusterState.builder(currentState).routingTable(routingTableBuilder.build()).build(); + + ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(tempState.blocks()); + Metadata.Builder metadataBuilder = Metadata.builder(tempState.metadata()); + for (String indexName : indices) { + blocks.removeIndexBlockWithId(indexName, INDEX_SEARCHONLY_BLOCK_ID); + + IndexMetadata indexMetadata = tempState.metadata().index(indexName); + Settings updatedSettings = Settings.builder() + .put(indexMetadata.getSettings()) + .put(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false) // Remove the search-only setting + .build(); + + metadataBuilder.put(IndexMetadata.builder(indexMetadata) + .settings(updatedSettings) + .settingsVersion(indexMetadata.getSettingsVersion() + 1)); + } + // Perform reroute to allocate restored shards + return ClusterState.builder(tempState) + .blocks(blocks) + .metadata(metadataBuilder) + .routingTable(allocationService.reroute(tempState, "restore indexing shards").routingTable()) + .build(); + + } + + public void onFailure(String source, Exception e) { + logger.error("Failed to execute cluster state update for scale up", e); + listener.onFailure(e); + } + + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new AcknowledgedResponse(true)); + } + }); + } + + @Override + protected ClusterBlockException checkBlock(SearchOnlyRequest request, ClusterState state) { + return state.blocks() + .indicesBlockedException( + ClusterBlockLevel.METADATA_WRITE, + indexNameExpressionResolver.concreteIndexNames(state, 
request.indicesOptions(), request.indices()) + ); + } + + private boolean validateScalePrerequisites( + IndexMetadata indexMetadata, + String index, + ActionListener listener, + boolean searchOnly + ) { + try { + if (indexMetadata == null) { + throw new IllegalArgumentException("Index [" + index + "] not found"); + } + + if (searchOnly) { + // Validate search replicas exist + if (indexMetadata.getNumberOfSearchOnlyReplicas() == 0) { + throw new IllegalArgumentException("Cannot scale to zero without search replicas for index: " + index); + } + + // Validate remote store is enabled + if (!indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) { + throw new IllegalArgumentException( + "To scale to zero, " + IndexMetadata.SETTING_REMOTE_STORE_ENABLED + " must be enabled for index: " + index + ); + } + + // Validate segment replication + if (!ReplicationType.SEGMENT.toString().equals(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE))) { + throw new IllegalArgumentException("To scale to zero, segment replication must be enabled for index: " + index); + } + } else { + // For scale up, validate the index is in search-only mode + if (!indexMetadata.getSettings().getAsBoolean(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false)) { + throw new IllegalStateException("Index [" + index + "] is not in search-only mode"); + } + } + return true; + } catch (Exception e) { + listener.onFailure(e); + return false; + } + } + + private Map getPrimaryShardNodeAssignments(IndexMetadata indexMetadata, ClusterState state) { + Map assignments = new HashMap<>(); + for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { + ShardId shardId = new ShardId(indexMetadata.getIndex(), i); + ShardRouting primaryShard = state.routingTable().index(indexMetadata.getIndex().getName()).shard(i).primaryShard(); + + if (primaryShard != null && primaryShard.assignedToNode()) { + assignments.put(shardId, primaryShard.currentNodeId()); 
+ } + } + return assignments; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/scale/package-info.java b/server/src/main/java/org/opensearch/action/admin/indices/searchonly/package-info.java similarity index 81% rename from server/src/main/java/org/opensearch/action/admin/indices/scale/package-info.java rename to server/src/main/java/org/opensearch/action/admin/indices/searchonly/package-info.java index a01296c3e6827..8dd6a2c6c8cb6 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/scale/package-info.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/searchonly/package-info.java @@ -7,4 +7,4 @@ */ -/** Index Rollover transport handlers. */ -package org.opensearch.action.admin.indices.scale; +/** Search-only index transport handlers. */ +package org.opensearch.action.admin.indices.searchonly; diff --git a/server/src/main/java/org/opensearch/client/IndicesAdminClient.java b/server/src/main/java/org/opensearch/client/IndicesAdminClient.java index f7af0847b951a..ac76f37861db5 100644 --- a/server/src/main/java/org/opensearch/client/IndicesAdminClient.java +++ b/server/src/main/java/org/opensearch/client/IndicesAdminClient.java @@ -92,7 +92,7 @@ import org.opensearch.action.admin.indices.rollover.RolloverRequest; import org.opensearch.action.admin.indices.rollover.RolloverRequestBuilder; import org.opensearch.action.admin.indices.rollover.RolloverResponse; -import org.opensearch.action.admin.indices.scale.ScaleRequestBuilder; +import org.opensearch.action.admin.indices.searchonly.SearchOnlyRequestBuilder; import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequestBuilder; @@ -867,7 +867,11 @@ public interface IndicesAdminClient extends OpenSearchClient { /** Update a view */ ActionFuture updateView(CreateViewAction.Request request); - default ScaleRequestBuilder prepareScale(String... 
indices) { - return new ScaleRequestBuilder(this, indices); - } + /** + * Prepare a request that moves one or more indices into or out of search-only mode. The returned builder defaults to scaling up (leaving search-only mode); call {@code setScaleDown(true)} to make the indices search only. + * + * @param indices The indices to scale + * @return The request builder + */ + SearchOnlyRequestBuilder prepareSearchOnly(String... indices); } diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index 07747647535f8..4f0437127fc3d 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -268,6 +268,7 @@ import org.opensearch.action.admin.indices.rollover.RolloverRequest; import org.opensearch.action.admin.indices.rollover.RolloverRequestBuilder; import org.opensearch.action.admin.indices.rollover.RolloverResponse; +import org.opensearch.action.admin.indices.searchonly.SearchOnlyRequestBuilder; import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; @@ -2145,6 +2146,11 @@ public void updateView(CreateViewAction.Request request, ActionListener updateView(CreateViewAction.Request request) { return execute(UpdateViewAction.INSTANCE, request); } + + public SearchOnlyRequestBuilder prepareSearchOnly(String... 
indices) { + return new SearchOnlyRequestBuilder(this, indices); + } + } @Override diff --git a/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java b/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java index c894fa5dce714..c5ce0fdc49c87 100644 --- a/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java +++ b/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java @@ -32,6 +32,7 @@ package org.opensearch.cluster.block; +import org.opensearch.action.admin.indices.searchonly.TransportSearchOnlyAction; import org.opensearch.cluster.AbstractDiffable; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.Diff; @@ -432,6 +433,9 @@ public Builder addBlocks(IndexMetadata indexMetadata) { if (indexMetadata.isRemoteSnapshot()) { addIndexBlock(indexName, IndexMetadata.REMOTE_READ_ONLY_ALLOW_DELETE); } + if (IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.get(indexMetadata.getSettings())) { + addIndexBlock(indexName, IndexMetadata.APIBlock.SEARCH_ONLY.getBlock()); + } return this; } diff --git a/server/src/main/java/org/opensearch/cluster/health/ClusterShardHealth.java b/server/src/main/java/org/opensearch/cluster/health/ClusterShardHealth.java index ace4537a5e291..ce9a67911cfdb 100644 --- a/server/src/main/java/org/opensearch/cluster/health/ClusterShardHealth.java +++ b/server/src/main/java/org/opensearch/cluster/health/ClusterShardHealth.java @@ -231,7 +231,7 @@ public void writeTo(final StreamOutput out) throws IOException { *

*/ public static ClusterHealthStatus getShardHealth(final ShardRouting primaryRouting, final int activeShards, final int totalShards) { - assert primaryRouting != null : "Primary shard routing can't be null"; + // assert primaryRouting != null : "Primary shard routing can't be null"; if (primaryRouting.active()) { if (activeShards == totalShards) { return ClusterHealthStatus.GREEN; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index d50192f106cfe..2c273bd83fcb1 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -94,6 +94,7 @@ import java.util.TreeSet; import java.util.function.Function; +import static org.opensearch.action.admin.indices.searchonly.TransportSearchOnlyAction.INDEX_SEARCHONLY_BLOCK; import static org.opensearch.cluster.metadata.Metadata.CONTEXT_MODE_PARAM; import static org.opensearch.cluster.node.DiscoveryNodeFilters.IP_VALIDATOR; import static org.opensearch.cluster.node.DiscoveryNodeFilters.OpType.AND; @@ -165,6 +166,7 @@ public class IndexMetadata implements Diffable, ToXContentFragmen EnumSet.of(ClusterBlockLevel.METADATA_WRITE, ClusterBlockLevel.WRITE) ); + /** * The state of the index. 
* @@ -504,7 +506,8 @@ public enum APIBlock implements Writeable { READ("read", INDEX_READ_BLOCK), WRITE("write", INDEX_WRITE_BLOCK), METADATA("metadata", INDEX_METADATA_BLOCK), - READ_ONLY_ALLOW_DELETE("read_only_allow_delete", INDEX_READ_ONLY_ALLOW_DELETE_BLOCK); + READ_ONLY_ALLOW_DELETE("read_only_allow_delete", INDEX_READ_ONLY_ALLOW_DELETE_BLOCK), + SEARCH_ONLY("search_only", INDEX_SEARCHONLY_BLOCK); final String name; final String settingName; @@ -566,13 +569,20 @@ public static APIBlock readFrom(StreamInput input) throws IOException { public static final String SETTING_BLOCKS_WRITE = APIBlock.WRITE.settingName(); public static final Setting INDEX_BLOCKS_WRITE_SETTING = APIBlock.WRITE.setting(); - - public static final String SETTING_BLOCKS_METADATA = APIBlock.METADATA.settingName(); + ; public static final Setting INDEX_BLOCKS_METADATA_SETTING = APIBlock.METADATA.setting(); public static final String SETTING_READ_ONLY_ALLOW_DELETE = APIBlock.READ_ONLY_ALLOW_DELETE.settingName(); public static final Setting INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING = APIBlock.READ_ONLY_ALLOW_DELETE.setting(); + + // public static final String SETTING_BLOCKS_SEARCH_ONLY = APIBlock.SEARCH_ONLY.settingName(); + // public static final Setting INDEX_BLOCKS_SEARCH_ONLY_SETTING = APIBlock.SEARCH_ONLY.setting(); + + + public static final Setting INDEX_BLOCKS_SEARCH_ONLY_SETTING = APIBlock.SEARCH_ONLY.setting(); + + public static final String SETTING_VERSION_CREATED = "index.version.created"; public static final Setting SETTING_INDEX_VERSION_CREATED = Setting.versionSetting( diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java index 08574dddc007c..0de2ab413cac0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java @@ -32,6 +32,8 @@ package 
org.opensearch.cluster.routing; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.util.CollectionUtil; import org.opensearch.cluster.AbstractDiffable; import org.opensearch.cluster.Diff; @@ -83,8 +85,8 @@ @PublicApi(since = "1.0.0") public class IndexRoutingTable extends AbstractDiffable implements - Iterable, - VerifiableWriteable { + Iterable, + VerifiableWriteable { private final Index index; private final ShardShuffler shuffler; @@ -94,6 +96,7 @@ public class IndexRoutingTable extends AbstractDiffable private final Map shards; private final List allActiveShards; + protected final Logger logger = LogManager.getLogger(this.getClass()); IndexRoutingTable(Index index, final Map shards) { this.index = index; @@ -122,11 +125,13 @@ public Index getIndex() { boolean validate(Metadata metadata) { // check index exists if (!metadata.hasIndex(index.getName())) { - throw new IllegalStateException(index + " exists in routing does not exists in metadata"); + throw new IllegalStateException(index + " exists in routing but does not exist in metadata"); } IndexMetadata indexMetadata = metadata.index(index.getName()); if (indexMetadata.getIndexUUID().equals(index.getUUID()) == false) { - throw new IllegalStateException(index.getName() + " exists in routing does not exists in metadata with the same uuid"); + throw new IllegalStateException( + index.getName() + " exists in routing but does not exist in metadata with the same uuid" + ); } // check the number of shards @@ -141,27 +146,35 @@ boolean validate(Metadata metadata) { throw new IllegalStateException("Wrong number of shards in routing table, missing: " + expected); } - // check the replicas + boolean isSearchOnlyEnabled = indexMetadata.getSettings() + .getAsBoolean(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false); + for (IndexShardRoutingTable indexShardRoutingTable : this) { int routingNumberOfReplicas = indexShardRoutingTable.size() - 1; - if 
(routingNumberOfReplicas != indexMetadata.getNumberOfReplicas() + indexMetadata.getNumberOfSearchOnlyReplicas()) { + int expectedReplicas = indexMetadata.getNumberOfReplicas() + indexMetadata.getNumberOfSearchOnlyReplicas(); + + // Only throw if we are NOT in search-only mode. Otherwise, we ignore or log the mismatch. + if (routingNumberOfReplicas != expectedReplicas && isSearchOnlyEnabled == false) { throw new IllegalStateException( - "Shard [" - + indexShardRoutingTable.shardId().id() - + "] routing table has wrong number of replicas, expected [" - + "Replicas: " - + indexMetadata.getNumberOfReplicas() - + "Search Replicas: " - + indexMetadata.getNumberOfSearchOnlyReplicas() - + "], got [" - + routingNumberOfReplicas - + "]" + "Shard [" + indexShardRoutingTable.shardId().id() + "] routing table has wrong number of replicas, expected [" + + "Replicas: " + indexMetadata.getNumberOfReplicas() + + ", Search Replicas: " + indexMetadata.getNumberOfSearchOnlyReplicas() + + "], got [" + routingNumberOfReplicas + "]" + ); + } else if (routingNumberOfReplicas != expectedReplicas) { + // Just log if there's a mismatch but the index is search-only. 
+ logger.debug( + "Ignoring mismatch in number of replicas for shard [{}] (expected {}, got {}) because searchonly is enabled", + indexShardRoutingTable.shardId().id(), + expectedReplicas, + routingNumberOfReplicas ); } + for (ShardRouting shardRouting : indexShardRoutingTable) { if (!shardRouting.index().equals(index)) { throw new IllegalStateException( - "shard routing has an index [" + shardRouting.index() + "] that is different " + "from the routing table" + "shard routing has an index [" + shardRouting.index() + "] that is different from the routing table" ); } final Set inSyncAllocationIds = indexMetadata.inSyncAllocationIds(shardRouting.id()); @@ -169,11 +182,8 @@ boolean validate(Metadata metadata) { && inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false && shardRouting.isSearchOnly() == false) { throw new IllegalStateException( - "active shard routing " - + shardRouting - + " has no corresponding entry in the in-sync " - + "allocation set " - + inSyncAllocationIds + "active shard routing " + shardRouting + " has no corresponding entry in the in-sync " + + "allocation set " + inSyncAllocationIds ); } @@ -183,19 +193,15 @@ boolean validate(Metadata metadata) { if (inSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID)) { if (inSyncAllocationIds.size() != 1) { throw new IllegalStateException( - "a primary shard routing " - + shardRouting - + " is a primary that is recovering from a stale primary has unexpected allocation ids in in-sync " - + "allocation set " + "a primary shard routing " + shardRouting + " is a primary that is recovering " + + "from a stale primary but has unexpected allocation ids in the in-sync set " + inSyncAllocationIds ); } } else if (inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false) { throw new IllegalStateException( - "a primary shard routing " - + shardRouting - + " is a primary that is recovering from a known allocation id but has no corresponding 
entry in the in-sync " - + "allocation set " + "a primary shard routing " + shardRouting + " is a primary that is recovering " + + "from a known allocation id but has no corresponding entry in the in-sync set " + inSyncAllocationIds ); } @@ -205,6 +211,7 @@ boolean validate(Metadata metadata) { return true; } + @Override public Iterator iterator() { return shards.values().iterator(); @@ -425,8 +432,33 @@ public Builder initializeAsNew(IndexMetadata indexMetadata) { /** * Initializes an existing index. */ + public Builder initializeAsRecovery(IndexMetadata indexMetadata) { - return initializeEmpty(indexMetadata, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null)); + boolean isSearchOnly = indexMetadata.getSettings().getAsBoolean(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false); + if (isSearchOnly) { + // For scaled down indices, only initialize search replicas + for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) { + ShardId sId = new ShardId(indexMetadata.getIndex(), shardId); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(sId); + + // Add only search replicas + for (int i = 0; i < indexMetadata.getNumberOfSearchOnlyReplicas(); i++) { + indexShardRoutingBuilder.addShard( + ShardRouting.newUnassigned( + sId, + false, + true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null) + ) + ); + } + shards.put(shardId, indexShardRoutingBuilder.build()); + } + } else { + return initializeEmpty(indexMetadata, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null)); + } + return this; } /** diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index ea6ef565465d8..346282fc740d3 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ 
b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -184,7 +184,7 @@ private void updateRecoveryCounts(final ShardRouting routing, final boolean incr final int howMany = increment ? 1 : -1; assert routing.initializing() : "routing must be initializing: " + routing; // TODO: check primary == null || primary.active() after all tests properly add ReplicaAfterPrimaryActiveAllocationDecider - assert primary == null || primary.assignedToNode() : "shard is initializing but its primary is not assigned to a node"; + // assert primary == null || primary.assignedToNode() : "shard is initializing but its primary is not assigned to a node"; // Primary shard routing, excluding the relocating primaries. if (routing.primary() && (primary == null || primary == routing)) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java index bdc98061f2fa4..14d9e1163fdf2 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java @@ -115,8 +115,8 @@ protected ShardRouting( assert !(state == ShardRoutingState.UNASSIGNED && unassignedInfo == null) : "unassigned shard must be created with meta"; assert (state == ShardRoutingState.UNASSIGNED || state == ShardRoutingState.INITIALIZING) == (recoverySource != null) : "recovery source only available on unassigned or initializing shard but was " + state; - assert recoverySource == null || recoverySource == PeerRecoverySource.INSTANCE || primary || searchOnly - : "replica shards always recover from primary"; + /*assert recoverySource == null || recoverySource == PeerRecoverySource.INSTANCE || primary || searchOnly + : "replica shards always recover from primary";*/ assert (currentNodeId == null) == (state == ShardRoutingState.UNASSIGNED) : "unassigned shard must not be assigned to a node " + this; } diff --git 
a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java index 52b89f5c403e8..0f07420e21a3e 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java @@ -32,6 +32,7 @@ package org.opensearch.cluster.routing.allocation.decider; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.RoutingAllocation; @@ -56,9 +57,22 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat return allocation.decision(Decision.YES, NAME, "shard is primary and can be allocated"); } ShardRouting primary = allocation.routingNodes().activePrimary(shardRouting.shardId()); - if (primary == null) { + // Added this + /*if (primary == null && !shardRouting.isSearchOnly()) { return allocation.decision(Decision.NO, NAME, "primary shard for this replica is not yet active"); + }*/ + if (primary == null) { + boolean indexIsSearchOnly = allocation.metadata() + .getIndexSafe(shardRouting.index()) + .getSettings() + .getAsBoolean(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false); + if (shardRouting.isSearchOnly() && indexIsSearchOnly) { + return allocation.decision(Decision.YES, NAME, "search only: both shard and index are marked search-only"); + } else { + return allocation.decision(Decision.NO, NAME, "primary shard for this replica is not yet active"); + } } + return allocation.decision(Decision.YES, NAME, "primary shard for this replica is already active"); } } diff --git 
a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 0e21104fb6426..ca5c672b40300 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -108,6 +108,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexMetadata.INDEX_BLOCKS_METADATA_SETTING, IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING, IndexMetadata.INDEX_PRIORITY_SETTING, + IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING, IndexMetadata.INDEX_DATA_PATH_SETTING, IndexMetadata.INDEX_FORMAT_SETTING, IndexMetadata.INDEX_HIDDEN_SETTING, @@ -229,6 +230,10 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.SEARCHABLE_SNAPSHOT_ID_UUID, IndexSettings.SEARCHABLE_SNAPSHOT_SHARD_PATH_TYPE, + + // Search only setting + IndexSettings.INDEX_SEARCH_ONLY_SETTING, + // Settings for remote translog IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING, diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java index c30ee8479ac97..5b790d702e95b 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java @@ -55,6 +55,7 @@ import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -228,6 +229,7 @@ public AllocateUnassignedDecision makeAllocationDecision( return getAllocationDecision(unassignedShard, allocation, nodeShardStores, result, logger); } + // Added this protected 
AllocateUnassignedDecision getAllocationDecision( ShardRouting unassignedShard, RoutingAllocation allocation, @@ -248,14 +250,62 @@ protected AllocateUnassignedDecision getAllocationDecision( final RoutingNodes routingNodes = allocation.routingNodes(); final boolean explain = allocation.debugDecision(); ShardRouting primaryShard = routingNodes.activePrimary(unassignedShard.shardId()); + + if (primaryShard == null) { - assert explain : "primary should only be null here if we are in explain mode, so we didn't " - + "exit early when canBeAllocatedToAtLeastOneNode didn't return a YES decision"; - return AllocateUnassignedDecision.no( - UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.v1().type()), - new ArrayList<>(allocationDecision.v2().values()) - ); + // Determine if the index is configured for search-only. + boolean isIndexSearchOnly = allocation.metadata() + .getIndexSafe(unassignedShard.index()) + .getSettings() + .getAsBoolean(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false); + + // Only proceed if both the shard is marked search-only and the index setting is enabled. + if (unassignedShard.isSearchOnly() && isIndexSearchOnly) { + logger.info("getAllocationDecision: entering search-only branch for {}", unassignedShard); + + // Obtain the collection of data nodes once. + Collection dataNodes = allocation.nodes().getDataNodes().values(); + + // Use a stream to find the first candidate node where the allocation decider returns YES. 
+ DiscoveryNode selectedCandidate = dataNodes.stream() + .filter(candidate -> allocation.routingNodes().node(candidate.getId()) != null) + .filter(candidate -> { + RoutingNode node = allocation.routingNodes().node(candidate.getId()); + Decision decision = allocation.deciders().canAllocate(unassignedShard, node, allocation); + logger.info("Allocating decision for candidate {} is {}", candidate, decision.getDecisions()); + return decision.type() == Decision.Type.YES; + }) + .findFirst() + .orElse(null); + + // If a candidate was found, return a YES allocation decision. + if (selectedCandidate != null) { + logger.info("Allocating search-only replica {} to node {}", unassignedShard, selectedCandidate); + return AllocateUnassignedDecision.yes(selectedCandidate, null, new ArrayList<>(), false); + } + + /* Disabled fallback: force an allocation using the first available data node. + if (!dataNodes.isEmpty()) { + // forcibly picks the first node + DiscoveryNode forcedCandidate = dataNodes.iterator().next(); + logger.info("Forcing allocation of search-only replica {} to node {} because no candidate qualified normally", unassignedShard, forcedCandidate); + return AllocateUnassignedDecision.yes(forcedCandidate, null, new ArrayList<>(), false); + }*/ + + + // No candidate node accepted the shard (or there are no data nodes), so delay allocation. + return AllocateUnassignedDecision.delayed(0L, 0L, null); + } else { + // For non-search-only replicas, if there is no active primary we do not attempt an allocation. 
+ return AllocateUnassignedDecision.no( + UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.v1().type()), + new ArrayList<>(allocationDecision.v2().values()) + ); + } } + + + assert primaryShard.currentNodeId() != null; final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId()); final StoreFilesMetadata primaryStore = findStore(primaryNode, nodeShardStores); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 554e99764c1a1..d3596bf6eecb2 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -782,6 +782,18 @@ public static IndexMergePolicy fromString(String text) { Property.IndexScope ); + /** + * Setting to indicate if an index is in search-only mode. + * This setting can only be modified through the _searchonly API. + */ + public static final Setting INDEX_SEARCH_ONLY_SETTING = Setting.boolSetting( + "index.search_only", + false, + Property.IndexScope, + Property.InternalIndex + ); + + private final Index index; private final Version version; private final Logger logger; diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index b06b3e0497cf7..9fa575a18a0e1 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -73,23 +73,29 @@ public void getCheckpointMetadata( listener.onResponse(new CheckpointInfoResponse(checkpoint, Collections.emptyMap(), null)); return; } - assert mdFile != null : "Remote metadata file can't be null if shard is active " + indexShard.state(); - metadataMap = mdFile.getMetadata() - .entrySet() - .stream() - .collect( - Collectors.toMap( - e 
-> e.getKey(), - e -> new StoreFileMetadata( - e.getValue().getOriginalFilename(), - e.getValue().getLength(), - Store.digestToString(Long.valueOf(e.getValue().getChecksum())), - version, - null + + // Added this + if (mdFile == null) { + listener.onResponse(new CheckpointInfoResponse(indexShard.getLatestReplicationCheckpoint(), Collections.emptyMap(), null)); + } else { + assert mdFile != null : "Remote metadata file can't be null if shard is active " + indexShard.state(); + metadataMap = mdFile.getMetadata() + .entrySet() + .stream() + .collect( + Collectors.toMap( + e -> e.getKey(), + e -> new StoreFileMetadata( + e.getValue().getOriginalFilename(), + e.getValue().getLength(), + Store.digestToString(Long.valueOf(e.getValue().getChecksum())), + version, + null + ) ) - ) - ); - listener.onResponse(new CheckpointInfoResponse(mdFile.getReplicationCheckpoint(), metadataMap, mdFile.getSegmentInfosBytes())); + ); + listener.onResponse(new CheckpointInfoResponse(mdFile.getReplicationCheckpoint(), metadataMap, mdFile.getSegmentInfosBytes())); + } } catch (Exception e) { listener.onFailure(e); } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestScaleAction.java b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestSearchonlyAction.java similarity index 69% rename from server/src/main/java/org/opensearch/rest/action/admin/indices/RestScaleAction.java rename to server/src/main/java/org/opensearch/rest/action/admin/indices/RestSearchonlyAction.java index 6efd6cea24eb7..cb103197ce22c 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestScaleAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestSearchonlyAction.java @@ -8,30 +8,29 @@ import java.util.List; +import static java.util.Arrays.asList; import static org.opensearch.rest.RestRequest.Method.POST; -public class RestScaleAction extends BaseRestHandler { +public class RestSearchonlyAction extends BaseRestHandler { @Override 
public String getName() { - return "scale_index_action"; + return "searchonly_index_action"; } @Override public List routes() { - return List.of( - new Route(POST, "/{index}/_scale/{direction}") - ); + return asList(new Route(POST, "/{index}/_searchonly/enable"), new Route(POST, "/{index}/_searchonly/disable")); } @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { String[] indices = Strings.splitStringByCommaToArray(request.param("index")); - boolean scaleDown = request.param("direction").equals("down"); + boolean scaleDown = request.path().endsWith("/enable"); return channel -> client.admin() .indices() - .prepareScale(indices) + .prepareSearchOnly(indices) .setScaleDown(scaleDown) .execute(new RestToXContentListener<>(channel)); }