From 49c6853c5861cfac2134add1c5ecda0a12b9600e Mon Sep 17 00:00:00 2001 From: Rahul Karajgikar Date: Mon, 22 Jul 2024 10:09:51 +0530 Subject: [PATCH 1/8] Fix for hasInitiatedFetching() in batch mode Signed-off-by: Rahul Karajgikar --- .../gateway/RecoveryFromGatewayIT.java | 196 ++++++++++++++++++ .../gateway/AsyncShardBatchFetch.java | 8 + .../gateway/ShardsBatchGatewayAllocator.java | 27 ++- 3 files changed, 230 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java index 4085cc3890f30..06fba3e942d4b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java @@ -1051,6 +1051,202 @@ public void testMultipleReplicaShardAssignmentWithDelayedAllocationAndDifferentN ensureGreen("test"); } + public void testAllocationExplainReturnsNoWhenExtraReplicaShardInNonBatchMode() throws Exception { + // Non batch mode - This test is to validate that we don't return AWAITING_INFO in allocation explain API when the deciders are + // returning NO + internalCluster().startClusterManagerOnlyNodes( + 1, + Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), false).build() + ); + internalCluster().startDataOnlyNodes(5); + createIndex( + "test", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 4).build() + ); + ensureGreen("test"); + ensureStableCluster(6); + + // Stop one of the nodes to make the cluster yellow + // We cannot directly create an index with replica = data node count because then the whole flow will get skipped due to + // INDEX_CREATED + List nodesWithReplicaShards = findNodesWithShard(false); + Settings replicaNodeDataPathSettings = 
internalCluster().dataPathSettings(nodesWithReplicaShards.get(0)); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodesWithReplicaShards.get(0))); + + ensureStableCluster(5); + ensureYellow("test"); + + logger.info("--> calling allocation explain API"); + // shard should have decision NO because there is no valid node for the extra replica to go to + assertEquals( + AllocationDecision.NO, + client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex("test") + .setShard(0) + .setPrimary(false) + .get() + .getExplanation() + .getShardAllocationDecision() + .getAllocateDecision() + .getAllocationDecision() + ); + + // Now creating a new index with too many replicas and trying again + createIndex( + "test2", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 5).build() + ); + + ensureYellowAndNoInitializingShards("test2"); + + logger.info("--> calling allocation explain API again"); + // shard should have decision NO because there are 6 replicas and 4 data nodes + assertEquals( + AllocationDecision.NO, + client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex("test2") + .setShard(0) + .setPrimary(false) + .get() + .getExplanation() + .getShardAllocationDecision() + .getAllocateDecision() + .getAllocationDecision() + ); + + logger.info("--> restarting the stopped node"); + internalCluster().startDataOnlyNode( + Settings.builder().put("node.name", nodesWithReplicaShards.get(0)).put(replicaNodeDataPathSettings).build() + ); + + ensureStableCluster(6); + ensureGreen("test"); + + logger.info("--> calling allocation explain API 3rd time"); + // shard should still have decision NO because there are 6 replicas and 5 data nodes + assertEquals( + AllocationDecision.NO, + client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex("test2") + .setShard(0) + .setPrimary(false) + .get() + .getExplanation() + .getShardAllocationDecision() + .getAllocateDecision() 
+ .getAllocationDecision() + ); + + internalCluster().startDataOnlyNodes(1); + + ensureStableCluster(7); + ensureGreen("test2"); + } + + public void testAllocationExplainReturnsNoWhenExtraReplicaShardInBatchMode() throws Exception { + // Batch mode - This test is to validate that we don't return AWAITING_INFO in allocation explain API when the deciders are + // returning NO + internalCluster().startClusterManagerOnlyNodes( + 1, + Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build() + ); + internalCluster().startDataOnlyNodes(5); + createIndex( + "test", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 4).build() + ); + ensureGreen("test"); + ensureStableCluster(6); + + // Stop one of the nodes to make the cluster yellow + // We cannot directly create an index with replica = data node count because then the whole flow will get skipped due to + // INDEX_CREATED + List nodesWithReplicaShards = findNodesWithShard(false); + Settings replicaNodeDataPathSettings = internalCluster().dataPathSettings(nodesWithReplicaShards.get(0)); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodesWithReplicaShards.get(0))); + + ensureStableCluster(5); + ensureYellow("test"); + + logger.info("--> calling allocation explain API"); + // shard should have decision NO because there is no valid node for the extra replica to go to + assertEquals( + AllocationDecision.NO, + client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex("test") + .setShard(0) + .setPrimary(false) + .get() + .getExplanation() + .getShardAllocationDecision() + .getAllocateDecision() + .getAllocationDecision() + ); + + // Now creating a new index with too many replicas and trying again + createIndex( + "test2", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 5).build() + ); + + 
ensureYellowAndNoInitializingShards("test2"); + + logger.info("--> calling allocation explain API again"); + // shard should have decision NO because there are 6 replicas and 4 data nodes + assertEquals( + AllocationDecision.NO, + client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex("test2") + .setShard(0) + .setPrimary(false) + .get() + .getExplanation() + .getShardAllocationDecision() + .getAllocateDecision() + .getAllocationDecision() + ); + + logger.info("--> restarting the stopped node"); + internalCluster().startDataOnlyNode( + Settings.builder().put("node.name", nodesWithReplicaShards.get(0)).put(replicaNodeDataPathSettings).build() + ); + + ensureStableCluster(6); + ensureGreen("test"); + + logger.info("--> calling allocation explain API 3rd time"); + // shard should still have decision NO because there are 6 replicas and 5 data nodes + assertEquals( + AllocationDecision.NO, + client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex("test2") + .setShard(0) + .setPrimary(false) + .get() + .getExplanation() + .getShardAllocationDecision() + .getAllocateDecision() + .getAllocationDecision() + ); + + internalCluster().startDataOnlyNodes(1); + + ensureStableCluster(7); + ensureGreen("test2"); + } + public void testNBatchesCreationAndAssignment() throws Exception { // we will reduce batch size to 5 to make sure we have enough batches to test assignment // Total number of primary shards = 50 (50 indices*1) diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java index 4f39a39cea678..df642a9f5a743 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java @@ -80,6 +80,14 @@ public synchronized void clearShard(ShardId shardId) { this.cache.deleteShard(shardId); } + public boolean hasEmptyCache() { + return this.cache.getCache().isEmpty(); 
+ } + + public AsyncShardFetchCache getCache() { + return this.cache; + } + /** * Cache implementation of transport actions returning batch of shards related data in the response. * Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShardsBatch} or diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 55f5388d8f454..9ce9db87979ed 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -576,8 +576,33 @@ protected AsyncShardFetch.FetchResult Date: Thu, 25 Jul 2024 18:30:54 +0530 Subject: [PATCH 2/8] Add changelog Signed-off-by: Rahul Karajgikar --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e88a084f7d7f6..d4c8c955bced4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added +- Fix for hasInitiatedFetching to fix allocation explain and manual reroute APIs ([#14972](https://github.com/opensearch-project/OpenSearch/pull/14972)) ### Dependencies - Bump `org.apache.commons:commons-lang3` from 3.14.0 to 3.15.0 ([#14861](https://github.com/opensearch-project/OpenSearch/pull/14861)) From b06f596473c955805557b7f5e77d4f3cfbc5f9e8 Mon Sep 17 00:00:00 2001 From: Rahul Karajgikar Date: Fri, 26 Jul 2024 13:08:05 +0530 Subject: [PATCH 3/8] Fix RecoveryFromGatewayIT Signed-off-by: Rahul Karajgikar --- .../opensearch/gateway/RecoveryFromGatewayIT.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java index 06fba3e942d4b..8f88afa46dd6e 
100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java @@ -797,7 +797,9 @@ public void testBatchModeEnabledWithoutTimeout() throws Exception { ); assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings())); assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches()); - assertEquals(1, gatewayAllocator.getNumberOfStoreShardBatches()); + // Replica shard would be marked ineligible since there are no data nodes. + // It would then be removed from any batch and batches would get deleted, so we would have 0 replica batches + assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); // Now start both data nodes and ensure batch mode is working logger.info("--> restarting the stopped nodes"); @@ -1300,7 +1302,9 @@ public void testNBatchesCreationAndAssignment() throws Exception { ); assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings())); assertEquals(10, gatewayAllocator.getNumberOfStartedShardBatches()); - assertEquals(10, gatewayAllocator.getNumberOfStoreShardBatches()); + // All replica shards would be marked ineligible since there are no data nodes. 
+ // They would then be removed from any batch and batches would get deleted, so we would have 0 replica batches + assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet(); assertFalse(health.isTimedOut()); assertEquals(RED, health.getStatus()); @@ -1389,7 +1393,9 @@ public void testCulpritShardInBatch() throws Exception { ); assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings())); assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches()); - assertEquals(1, gatewayAllocator.getNumberOfStoreShardBatches()); + // Replica shard would be marked ineligible since there are no data nodes. + // It would then be removed from any batch and batches would get deleted, so we would have 0 replica batches + assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); assertTrue(clusterRerouteResponse.isAcknowledged()); health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet(); assertFalse(health.isTimedOut()); From 3f024b7e3fbf3b4e0456a093ecb949d3e61be7d1 Mon Sep 17 00:00:00 2001 From: Rahul Karajgikar Date: Fri, 26 Jul 2024 16:50:37 +0530 Subject: [PATCH 4/8] Add assertion to IT Signed-off-by: Rahul Karajgikar --- .../gateway/RecoveryFromGatewayIT.java | 163 +++++++++--------- 1 file changed, 79 insertions(+), 84 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java index 8f88afa46dd6e..e4115d9d42ae2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java @@ -57,6 +57,7 @@ import 
org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.AllocationDecision; import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator; import org.opensearch.cluster.service.ClusterService; @@ -1080,20 +1081,19 @@ public void testAllocationExplainReturnsNoWhenExtraReplicaShardInNonBatchMode() logger.info("--> calling allocation explain API"); // shard should have decision NO because there is no valid node for the extra replica to go to - assertEquals( - AllocationDecision.NO, - client().admin() - .cluster() - .prepareAllocationExplain() - .setIndex("test") - .setShard(0) - .setPrimary(false) - .get() - .getExplanation() - .getShardAllocationDecision() - .getAllocateDecision() - .getAllocationDecision() - ); + AllocateUnassignedDecision aud = client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex("test") + .setShard(0) + .setPrimary(false) + .get() + .getExplanation() + .getShardAllocationDecision() + .getAllocateDecision(); + + assertEquals(AllocationDecision.NO, aud.getAllocationDecision()); + assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation()); // Now creating a new index with too many replicas and trying again createIndex( @@ -1105,20 +1105,19 @@ public void testAllocationExplainReturnsNoWhenExtraReplicaShardInNonBatchMode() logger.info("--> calling allocation explain API again"); // shard should have decision NO because there are 6 replicas and 4 data nodes - assertEquals( - AllocationDecision.NO, - client().admin() - .cluster() - .prepareAllocationExplain() - .setIndex("test2") - .setShard(0) - .setPrimary(false) - .get() - .getExplanation() - .getShardAllocationDecision() - .getAllocateDecision() - .getAllocationDecision() - ); + aud = 
client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex("test2") + .setShard(0) + .setPrimary(false) + .get() + .getExplanation() + .getShardAllocationDecision() + .getAllocateDecision(); + + assertEquals(AllocationDecision.NO, aud.getAllocationDecision()); + assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation()); logger.info("--> restarting the stopped node"); internalCluster().startDataOnlyNode( @@ -1130,20 +1129,19 @@ public void testAllocationExplainReturnsNoWhenExtraReplicaShardInNonBatchMode() logger.info("--> calling allocation explain API 3rd time"); // shard should still have decision NO because there are 6 replicas and 5 data nodes - assertEquals( - AllocationDecision.NO, - client().admin() - .cluster() - .prepareAllocationExplain() - .setIndex("test2") - .setShard(0) - .setPrimary(false) - .get() - .getExplanation() - .getShardAllocationDecision() - .getAllocateDecision() - .getAllocationDecision() - ); + aud = client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex("test2") + .setShard(0) + .setPrimary(false) + .get() + .getExplanation() + .getShardAllocationDecision() + .getAllocateDecision(); + + assertEquals(AllocationDecision.NO, aud.getAllocationDecision()); + assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation()); internalCluster().startDataOnlyNodes(1); @@ -1178,20 +1176,19 @@ public void testAllocationExplainReturnsNoWhenExtraReplicaShardInBatchMode() thr logger.info("--> calling allocation explain API"); // shard should have decision NO because there is no valid node for the extra replica to go to - assertEquals( - AllocationDecision.NO, - client().admin() - .cluster() - .prepareAllocationExplain() - .setIndex("test") - .setShard(0) - .setPrimary(false) - .get() - .getExplanation() - .getShardAllocationDecision() - .getAllocateDecision() - .getAllocationDecision() - ); + AllocateUnassignedDecision aud = 
client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex("test") + .setShard(0) + .setPrimary(false) + .get() + .getExplanation() + .getShardAllocationDecision() + .getAllocateDecision(); + + assertEquals(AllocationDecision.NO, aud.getAllocationDecision()); + assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation()); // Now creating a new index with too many replicas and trying again createIndex( @@ -1203,20 +1200,19 @@ public void testAllocationExplainReturnsNoWhenExtraReplicaShardInBatchMode() thr logger.info("--> calling allocation explain API again"); // shard should have decision NO because there are 6 replicas and 4 data nodes - assertEquals( - AllocationDecision.NO, - client().admin() - .cluster() - .prepareAllocationExplain() - .setIndex("test2") - .setShard(0) - .setPrimary(false) - .get() - .getExplanation() - .getShardAllocationDecision() - .getAllocateDecision() - .getAllocationDecision() - ); + aud = client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex("test2") + .setShard(0) + .setPrimary(false) + .get() + .getExplanation() + .getShardAllocationDecision() + .getAllocateDecision(); + + assertEquals(AllocationDecision.NO, aud.getAllocationDecision()); + assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation()); logger.info("--> restarting the stopped node"); internalCluster().startDataOnlyNode( @@ -1228,20 +1224,19 @@ public void testAllocationExplainReturnsNoWhenExtraReplicaShardInBatchMode() thr logger.info("--> calling allocation explain API 3rd time"); // shard should still have decision NO because there are 6 replicas and 5 data nodes - assertEquals( - AllocationDecision.NO, - client().admin() - .cluster() - .prepareAllocationExplain() - .setIndex("test2") - .setShard(0) - .setPrimary(false) - .get() - .getExplanation() - .getShardAllocationDecision() - .getAllocateDecision() - .getAllocationDecision() - ); + aud 
= client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex("test2") + .setShard(0) + .setPrimary(false) + .get() + .getExplanation() + .getShardAllocationDecision() + .getAllocateDecision(); + + assertEquals(AllocationDecision.NO, aud.getAllocationDecision()); + assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation()); internalCluster().startDataOnlyNodes(1); From b5d0dc82b5511f39f1587bd106abdf44c5df5284 Mon Sep 17 00:00:00 2001 From: Rahul Karajgikar Date: Fri, 26 Jul 2024 16:57:36 +0530 Subject: [PATCH 5/8] Add comment to javadoc Signed-off-by: Rahul Karajgikar --- .../org/opensearch/gateway/ShardsBatchGatewayAllocator.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 9ce9db87979ed..decaf474926c6 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -583,6 +583,8 @@ protected boolean hasInitiatedFetching(ShardRouting shard) { * Allocation explain and manual reroute APIs try to append shard store information (matching bytes) to the allocation decision. * However, these APIs do not want to trigger a new asyncFetch for these ineligible shards, unless the data from nodes is already there. * This function is used to see if a fetch has happened to decide if it is possible to append shard store info without a new async fetch. + * In the case when shard has a batch but no fetch has happened before, it would be because it is a new batch. + * In the case when shard has a batch, and a fetch has happened before, and no fetch is ongoing, it would be because we have already completed fetch for all nodes. * * In order to check if a fetch has ever happened, we check 2 things: * 1. 
If the shard batch cache is empty, we know that fetch has never happened so we return false. From 3cd70c56aa728662c33c545ba37d8ce6ceeb1d58 Mon Sep 17 00:00:00 2001 From: Rahul Karajgikar Date: Fri, 26 Jul 2024 17:56:38 +0530 Subject: [PATCH 6/8] Fix recently added IT Signed-off-by: Rahul Karajgikar --- .../org/opensearch/gateway/RecoveryFromGatewayIT.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java index e4115d9d42ae2..6d41d0227fd25 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java @@ -845,7 +845,9 @@ public void testBatchModeEnabledWithSufficientTimeoutAndClusterGreen() throws Ex ); assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings())); assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches()); - assertEquals(1, gatewayAllocator.getNumberOfStoreShardBatches()); + // Replica shard would be marked ineligible since there are no data nodes. 
+ // It would then be removed from any batch and batches would get deleted, so we would have 0 replica batches + assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); // Now start both data nodes and ensure batch mode is working logger.info("--> restarting the stopped nodes"); @@ -910,7 +912,9 @@ public void testBatchModeEnabledWithInSufficientTimeoutButClusterGreen() throws assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings())); assertEquals(10, gatewayAllocator.getNumberOfStartedShardBatches()); - assertEquals(10, gatewayAllocator.getNumberOfStoreShardBatches()); + // All replica shards would be marked ineligible since there are no data nodes. + // They would then be removed from any batch and batches would get deleted, so we would have 0 replica batches + assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet(); assertFalse(health.isTimedOut()); assertEquals(RED, health.getStatus()); From b29502bfed6abb1e6d2102ef76ec84bc7f366fe4 Mon Sep 17 00:00:00 2001 From: Rahul Karajgikar Date: Mon, 29 Jul 2024 10:29:05 +0530 Subject: [PATCH 7/8] Add null checks and update ITs Signed-off-by: Rahul Karajgikar --- .../gateway/RecoveryFromGatewayIT.java | 309 +++++++----------- .../gateway/ShardsBatchGatewayAllocator.java | 6 +- 2 files changed, 129 insertions(+), 186 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java index 6d41d0227fd25..eccc903dfac82 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java @@ -802,9 +802,22 @@ public void testBatchModeEnabledWithoutTimeout() 
throws Exception { // It would then be removed from any batch and batches would get deleted, so we would have 0 replica batches assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); - // Now start both data nodes and ensure batch mode is working - logger.info("--> restarting the stopped nodes"); + // Now start one data node + logger.info("--> restarting the first stopped node"); internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build()); + ensureStableCluster(2); + ensureYellow("test"); + assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches()); + assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); + assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches()); + + // calling reroute and asserting on reroute response + logger.info("--> calling reroute while cluster is yellow"); + clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get(); + assertTrue(clusterRerouteResponse.isAcknowledged()); + + // Now start last data node and ensure batch mode is working and cluster goes green + logger.info("--> restarting the second stopped node"); internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build()); ensureStableCluster(3); ensureGreen("test"); @@ -849,9 +862,22 @@ public void testBatchModeEnabledWithSufficientTimeoutAndClusterGreen() throws Ex // It would then be removed from any batch and batches would get deleted, so we would have 0 replica batches assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); - // Now start both data nodes and ensure batch mode is working - logger.info("--> restarting the stopped nodes"); + // Now start one data node and ensure batch mode is working + logger.info("--> restarting the first stopped node"); internalCluster().startDataOnlyNode(Settings.builder().put("node.name", 
dataOnlyNodes.get(0)).put(node0DataPathSettings).build()); + ensureStableCluster(2); + ensureYellow("test"); + assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches()); + assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); + assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches()); + + // calling reroute and asserting on reroute response + logger.info("--> calling reroute while cluster is yellow"); + clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get(); + assertTrue(clusterRerouteResponse.isAcknowledged()); + + // Now start last data node and ensure batch mode is working and cluster goes green + logger.info("--> restarting the second stopped node"); internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build()); ensureStableCluster(3); ensureGreen("test"); @@ -1061,191 +1087,13 @@ public void testMultipleReplicaShardAssignmentWithDelayedAllocationAndDifferentN public void testAllocationExplainReturnsNoWhenExtraReplicaShardInNonBatchMode() throws Exception { // Non batch mode - This test is to validate that we don't return AWAITING_INFO in allocation explain API when the deciders are // returning NO - internalCluster().startClusterManagerOnlyNodes( - 1, - Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), false).build() - ); - internalCluster().startDataOnlyNodes(5); - createIndex( - "test", - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 4).build() - ); - ensureGreen("test"); - ensureStableCluster(6); - - // Stop one of the nodes to make the cluster yellow - // We cannot directly create an index with replica = data node count because then the whole flow will get skipped due to - // INDEX_CREATED - List nodesWithReplicaShards = findNodesWithShard(false); - Settings replicaNodeDataPathSettings = 
internalCluster().dataPathSettings(nodesWithReplicaShards.get(0)); - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodesWithReplicaShards.get(0))); - - ensureStableCluster(5); - ensureYellow("test"); - - logger.info("--> calling allocation explain API"); - // shard should have decision NO because there is no valid node for the extra replica to go to - AllocateUnassignedDecision aud = client().admin() - .cluster() - .prepareAllocationExplain() - .setIndex("test") - .setShard(0) - .setPrimary(false) - .get() - .getExplanation() - .getShardAllocationDecision() - .getAllocateDecision(); - - assertEquals(AllocationDecision.NO, aud.getAllocationDecision()); - assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation()); - - // Now creating a new index with too many replicas and trying again - createIndex( - "test2", - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 5).build() - ); - - ensureYellowAndNoInitializingShards("test2"); - - logger.info("--> calling allocation explain API again"); - // shard should have decision NO because there are 6 replicas and 4 data nodes - aud = client().admin() - .cluster() - .prepareAllocationExplain() - .setIndex("test2") - .setShard(0) - .setPrimary(false) - .get() - .getExplanation() - .getShardAllocationDecision() - .getAllocateDecision(); - - assertEquals(AllocationDecision.NO, aud.getAllocationDecision()); - assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation()); - - logger.info("--> restarting the stopped node"); - internalCluster().startDataOnlyNode( - Settings.builder().put("node.name", nodesWithReplicaShards.get(0)).put(replicaNodeDataPathSettings).build() - ); - - ensureStableCluster(6); - ensureGreen("test"); - - logger.info("--> calling allocation explain API 3rd time"); - // shard should still have decision NO because there are 6 replicas 
and 5 data nodes - aud = client().admin() - .cluster() - .prepareAllocationExplain() - .setIndex("test2") - .setShard(0) - .setPrimary(false) - .get() - .getExplanation() - .getShardAllocationDecision() - .getAllocateDecision(); - - assertEquals(AllocationDecision.NO, aud.getAllocationDecision()); - assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation()); - - internalCluster().startDataOnlyNodes(1); - - ensureStableCluster(7); - ensureGreen("test2"); + this.allocationExplainReturnsNoWhenExtraReplicaShard(false); } public void testAllocationExplainReturnsNoWhenExtraReplicaShardInBatchMode() throws Exception { // Batch mode - This test is to validate that we don't return AWAITING_INFO in allocation explain API when the deciders are // returning NO - internalCluster().startClusterManagerOnlyNodes( - 1, - Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build() - ); - internalCluster().startDataOnlyNodes(5); - createIndex( - "test", - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 4).build() - ); - ensureGreen("test"); - ensureStableCluster(6); - - // Stop one of the nodes to make the cluster yellow - // We cannot directly create an index with replica = data node count because then the whole flow will get skipped due to - // INDEX_CREATED - List nodesWithReplicaShards = findNodesWithShard(false); - Settings replicaNodeDataPathSettings = internalCluster().dataPathSettings(nodesWithReplicaShards.get(0)); - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodesWithReplicaShards.get(0))); - - ensureStableCluster(5); - ensureYellow("test"); - - logger.info("--> calling allocation explain API"); - // shard should have decision NO because there is no valid node for the extra replica to go to - AllocateUnassignedDecision aud = client().admin() - .cluster() - .prepareAllocationExplain() - 
.setIndex("test") - .setShard(0) - .setPrimary(false) - .get() - .getExplanation() - .getShardAllocationDecision() - .getAllocateDecision(); - - assertEquals(AllocationDecision.NO, aud.getAllocationDecision()); - assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation()); - - // Now creating a new index with too many replicas and trying again - createIndex( - "test2", - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 5).build() - ); - - ensureYellowAndNoInitializingShards("test2"); - - logger.info("--> calling allocation explain API again"); - // shard should have decision NO because there are 6 replicas and 4 data nodes - aud = client().admin() - .cluster() - .prepareAllocationExplain() - .setIndex("test2") - .setShard(0) - .setPrimary(false) - .get() - .getExplanation() - .getShardAllocationDecision() - .getAllocateDecision(); - - assertEquals(AllocationDecision.NO, aud.getAllocationDecision()); - assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation()); - - logger.info("--> restarting the stopped node"); - internalCluster().startDataOnlyNode( - Settings.builder().put("node.name", nodesWithReplicaShards.get(0)).put(replicaNodeDataPathSettings).build() - ); - - ensureStableCluster(6); - ensureGreen("test"); - - logger.info("--> calling allocation explain API 3rd time"); - // shard should still have decision NO because there are 6 replicas and 5 data nodes - aud = client().admin() - .cluster() - .prepareAllocationExplain() - .setIndex("test2") - .setShard(0) - .setPrimary(false) - .get() - .getExplanation() - .getShardAllocationDecision() - .getAllocateDecision(); - - assertEquals(AllocationDecision.NO, aud.getAllocationDecision()); - assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation()); - - internalCluster().startDataOnlyNodes(1); - - 
ensureStableCluster(7); - ensureGreen("test2"); + this.allocationExplainReturnsNoWhenExtraReplicaShard(true); } public void testNBatchesCreationAndAssignment() throws Exception { @@ -1712,4 +1560,97 @@ private List findNodesWithShard(final boolean primary) { Collections.shuffle(requiredStartedShards, random()); return requiredStartedShards.stream().map(shard -> state.nodes().get(shard.currentNodeId()).getName()).collect(Collectors.toList()); } + + private void allocationExplainReturnsNoWhenExtraReplicaShard(boolean batchModeEnabled) throws Exception { + internalCluster().startClusterManagerOnlyNodes( + 1, + Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), batchModeEnabled).build() + ); + internalCluster().startDataOnlyNodes(5); + createIndex( + "test", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 4).build() + ); + ensureGreen("test"); + ensureStableCluster(6); + + // Stop one of the nodes to make the cluster yellow + // We cannot directly create an index with replica = data node count because then the whole flow will get skipped due to + // INDEX_CREATED + List nodesWithReplicaShards = findNodesWithShard(false); + Settings replicaNodeDataPathSettings = internalCluster().dataPathSettings(nodesWithReplicaShards.get(0)); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodesWithReplicaShards.get(0))); + + ensureStableCluster(5); + ensureYellow("test"); + + logger.info("--> calling allocation explain API"); + // shard should have decision NO because there is no valid node for the extra replica to go to + AllocateUnassignedDecision aud = client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex("test") + .setShard(0) + .setPrimary(false) + .get() + .getExplanation() + .getShardAllocationDecision() + .getAllocateDecision(); + + assertEquals(AllocationDecision.NO, aud.getAllocationDecision()); + assertEquals("cannot allocate 
because allocation is not permitted to any of the nodes", aud.getExplanation()); + + // Now creating a new index with too many replicas and trying again + createIndex( + "test2", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 5).build() + ); + + ensureYellowAndNoInitializingShards("test2"); + + logger.info("--> calling allocation explain API again"); + // shard should have decision NO because there are 6 replicas and 4 data nodes + aud = client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex("test2") + .setShard(0) + .setPrimary(false) + .get() + .getExplanation() + .getShardAllocationDecision() + .getAllocateDecision(); + + assertEquals(AllocationDecision.NO, aud.getAllocationDecision()); + assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation()); + + logger.info("--> restarting the stopped node"); + internalCluster().startDataOnlyNode( + Settings.builder().put("node.name", nodesWithReplicaShards.get(0)).put(replicaNodeDataPathSettings).build() + ); + + ensureStableCluster(6); + ensureGreen("test"); + + logger.info("--> calling allocation explain API 3rd time"); + // shard should still have decision NO because there are 6 replicas and 5 data nodes + aud = client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex("test2") + .setShard(0) + .setPrimary(false) + .get() + .getExplanation() + .getShardAllocationDecision() + .getAllocateDecision(); + + assertEquals(AllocationDecision.NO, aud.getAllocationDecision()); + assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation()); + + internalCluster().startDataOnlyNodes(1); + + ensureStableCluster(7); + ensureGreen("test2"); + } } diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 
decaf474926c6..673ed8dbaa1c3 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -594,12 +594,14 @@ protected boolean hasInitiatedFetching(ShardRouting shard) { * see {@link AsyncShardFetchCache#findNodesToFetch()} */ String batchId = getBatchId(shard, shard.primary()); + if (batchId == null) { + return false; + } logger.trace("Checking if fetching done for batch id {}", batchId); ShardsBatch shardsBatch = shard.primary() ? batchIdToStartedShardBatch.get(batchId) : batchIdToStoreShardBatch.get(batchId); - // if fetchData has never been called, the per node cache will be empty and have no nodes // this is because cache.fillShardCacheWithDataNodes(nodes) initialises this map and is called in AsyncShardFetch.fetchData - if (shardsBatch.getAsyncFetcher().hasEmptyCache()) { + if (shardsBatch == null || shardsBatch.getAsyncFetcher().hasEmptyCache()) { logger.trace("Batch cache is empty for batch {} ", batchId); return false; } From f1bcdc825e0a97eaf1050a35ead41cb9fc64025a Mon Sep 17 00:00:00 2001 From: Rahul Karajgikar Date: Mon, 29 Jul 2024 12:04:19 +0530 Subject: [PATCH 8/8] Add logging for decision Signed-off-by: Rahul Karajgikar --- .../java/org/opensearch/gateway/ReplicaShardBatchAllocator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java index 7c75f2a5d1a8f..0818b187271cb 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java @@ -183,7 +183,7 @@ private AllocateUnassignedDecision getUnassignedShardAllocationDecision( if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shardRouting))) { // only return early if we are not 
in explain mode, or we are in explain mode but we have not // yet attempted to fetch any shard data - logger.trace("{}: ignoring allocation, can't be allocated on any node", shardRouting); + logger.trace("{}: ignoring allocation, can't be allocated on any node. Decision: {}", shardRouting, allocationDecision.type()); return AllocateUnassignedDecision.no( UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()), result.v2() != null ? new ArrayList<>(result.v2().values()) : null