Skip to content

Commit

Permalink
Evaluate and execute decisions for shards in a batch together and mak…
Browse files Browse the repository at this point in the history
…e allocation decision.
  • Loading branch information
Swetha Guptha committed May 29, 2024
1 parent 828b854 commit 19b6530
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing

int primariesInRecovery = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node.nodeId());

logger.debug( "ThrottlingAllocationDecider decision, throttle: [{}] primary recovery limit [{}],"
+ " primaries in recovery [{}] invoked for [{}] on node [{}]",
primariesInRecovery >= primariesInitialRecoveries, primariesInitialRecoveries,
primariesInRecovery, shardRouting, node.node() );
if (primariesInRecovery >= primariesInitialRecoveries) {
// TODO: Should index creation not be throttled for primary shards?
return allocation.decision(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
Expand All @@ -46,9 +45,7 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;

/**
* An abstract class that implements basic functionality for allocating
Expand Down Expand Up @@ -81,21 +78,6 @@ public void allocateUnassigned(
executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler);
}

/**
* Allocate Batch of unassigned shard to nodes where valid copies of the shard already exists
* @param shardRoutings the shards to allocate
* @param allocation the allocation state container object
*/
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting shardRouting = iterator.next();
if (shardRoutings.contains(shardRouting)) {
allocateUnassigned(shardRouting, allocation, iterator);
}
}
}

protected void executeDecision(
ShardRouting shardRouting,
AllocateUnassignedDecision allocateUnassignedDecision,
Expand All @@ -119,8 +101,6 @@ protected void executeDecision(
}
}

public void allocateUnassignedBatch(String batchId, RoutingAllocation allocation) {}

protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation allocation) {
if (shardRouting.primary()) {
if (shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
Expand Down Expand Up @@ -149,21 +129,6 @@ public abstract AllocateUnassignedDecision makeAllocationDecision(
Logger logger
);

public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
List<ShardRouting> unassignedShardBatch,
RoutingAllocation allocation,
Logger logger
) {

return (HashMap<ShardRouting, AllocateUnassignedDecision>) unassignedShardBatch.stream()
.collect(
Collectors.toMap(
unassignedShard -> unassignedShard,
unassignedShard -> makeAllocationDecision(unassignedShard, allocation, logger)
)
);
}

/**
* Builds decisions for all nodes in the cluster, so that the explain API can provide information on
* allocation decisions for each node, while still waiting to allocate the shard (e.g. due to fetching shard data).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShard;
import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -62,6 +58,21 @@ protected FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedS
}

@Override
public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, Logger logger) {
AllocateUnassignedDecision decision = getInEligibleShardDecision(unassignedShard, allocation);
if (decision != null) {
return decision;
}
final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(List.of(unassignedShard), Collections.emptyList(), allocation);
List<NodeGatewayStartedShard> nodeGatewayStartedShards = adaptToNodeShardStates(unassignedShard, shardsState);
return getAllocationDecision(unassignedShard, allocation, nodeGatewayStartedShards, logger);
}

/**
* Allocate Batch of unassigned shard to nodes where valid copies of the shard already exists
* @param shardRoutings the shards to allocate
* @param allocation the allocation state container object
*/
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
HashMap<ShardId, AllocateUnassignedDecision> ineligibleShardAllocationDecisions = new HashMap<>();
List<ShardRouting> eligibleShards = new ArrayList<>();
Expand All @@ -80,22 +91,22 @@ public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAll
// only fetch data for eligible shards
final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(eligibleShards, inEligibleShards, allocation);

List<ShardId> shardIdsFromBatch = shardRoutings.stream()
Set<ShardId> shardIdsFromBatch = shardRoutings.stream()
.map(shardRouting -> shardRouting.shardId())
.collect(Collectors.toList());
.collect(Collectors.toSet());
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting unassignedShard = iterator.next();
AllocateUnassignedDecision allocateUnassignedDecision;
AllocateUnassignedDecision allocationDecision;

if (shardIdsFromBatch.contains(unassignedShard.shardId())) {
if (ineligibleShardAllocationDecisions.containsKey(unassignedShard.shardId())) {
allocateUnassignedDecision = ineligibleShardAllocationDecisions.get(unassignedShard.shardId());
allocationDecision = ineligibleShardAllocationDecisions.get(unassignedShard.shardId());
} else {
List<NodeGatewayStartedShard> nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState);
allocateUnassignedDecision = getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger);
allocationDecision = getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger);
}
executeDecision(unassignedShard, allocateUnassignedDecision, allocation, iterator);
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,33 @@ protected FetchResult<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadat
}

@Override
public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, Logger logger) {
if (isResponsibleFor(unassignedShard) == false) {
return AllocateUnassignedDecision.NOT_TAKEN;
}
Tuple<Decision, Map<String, NodeAllocationResult>> result = canBeAllocatedToAtLeastOneNode(unassignedShard, allocation);
Decision allocateDecision = result.v1();
if (allocateDecision.type() != Decision.Type.YES
&& (allocation.debugDecision() == false || hasInitiatedFetching(unassignedShard) == false)) {
// only return early if we are not in explain mode, or we are in explain mode but we have not
// yet attempted to fetch any shard data
logger.trace("{}: ignoring allocation, can't be allocated on any node", unassignedShard);
return AllocateUnassignedDecision.no(
UnassignedInfo.AllocationStatus.fromDecision(allocateDecision.type()),
result.v2() != null ? new ArrayList<>(result.v2().values()) : null
);
}

final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(List.of(unassignedShard), Collections.emptyList(), allocation);
Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores = convertToNodeStoreFilesMetadataMap(unassignedShard, shardsState);
return getAllocationDecision(unassignedShard, allocation, nodeShardStores, result, logger);
}

/**
* Allocate Batch of unassigned shard to nodes where valid copies of the shard already exists
* @param shardRoutings the shards to allocate
* @param allocation the allocation state container object
*/
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
HashMap<ShardId, AllocateUnassignedDecision> ineligibleShardAllocationDecisions = new HashMap<>();
final boolean explain = allocation.debugDecision();
Expand Down Expand Up @@ -147,6 +174,7 @@ public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAll
ShardRouting unassignedShard = iterator.next();

if (shardIdsFromBatch.contains(unassignedShard.shardId())) {
logger.info("Executing allocation decision for shard {}", shardIdsFromBatch);
AllocateUnassignedDecision allocateUnassignedDecision;
if (ineligibleShardAllocationDecisions.containsKey(unassignedShard.shardId())) {
allocateUnassignedDecision = ineligibleShardAllocationDecisions.get(unassignedShard.shardId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
Expand Down Expand Up @@ -83,18 +81,6 @@ private void allocateAllUnassignedBatch(final RoutingAllocation allocation) {
batchAllocator.allocateUnassignedBatch(shardsToBatch, allocation);
}

public void testMakeAllocationDecisionForReplicaShard() {
final RoutingAllocation allocation = routingAllocationWithOnePrimary(noAllocationDeciders(), CLUSTER_RECOVERED, "allocId1");

List<ShardRouting> replicaShards = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).replicaShards();
List<ShardRouting> shards = new ArrayList<>(replicaShards);
HashMap<ShardRouting, AllocateUnassignedDecision> allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger);
// verify we get decisions for all the shards
assertEquals(shards.size(), allDecisions.size());
assertEquals(shards, new ArrayList<>(allDecisions.keySet()));
assertFalse(allDecisions.get(replicaShards.get(0)).isDecisionTaken());
}

public void testAllocateUnassignedBatch() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder().build(),
Expand Down

0 comments on commit 19b6530

Please sign in to comment.