Skip to content

Commit

Permalink
Upon being elected as master, prefer joins' node info to existing clu…
Browse files Browse the repository at this point in the history
…ster state (elastic#19743)

When we introduces [persistent node ids](elastic#19140) we were concerned that people may copy data folders from one to another resulting in two nodes competing for the same id in the cluster. To solve this we elected to not allow an incoming join if a different with same id already exists in the cluster, or if some other node already has the same transport address as the incoming join. The rationeel there was that it is better to prefer existing nodes and that we can rely on node fault detection to remove any node from the cluster that isn't correct any more, making room for the node that wants to join (and will keep trying).

Sadly there were two problems with this:
1) One minor and easy to fix - we didn't allow for the case where the existing node can have the same network address as the incoming one, but have a different ephemeral id (after node restart). This confused the logic in `AllocationService`, in this rare cases. The cluster is good enough to detect this and recover later on, but it's not clean.
2) The assumption that Node Fault Detection will clean up is *wrong* when the node just won an election (it wasn't master before) and needs to process the incoming joins in order to commit the cluster state and assume it's mastership. In those cases, the Node Fault Detection isn't active. 

This PR fixes these two and prefers incoming nodes to existing node when finishing an election. 
On top of the, on request by @ywelsch , `AllocationService` synchronization between the nodes of the cluster and it's routing table is now explicit rather than something we do all the time. The same goes for promotion of replicas to primaries.
  • Loading branch information
bleskes authored Aug 5, 2016
1 parent 3f6a3c0 commit 609a199
Show file tree
Hide file tree
Showing 88 changed files with 641 additions and 482 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void setUp() throws Exception {
RoutingTable routingTable = rb.build();
DiscoveryNodes.Builder nb = DiscoveryNodes.builder();
for (int i = 1; i <= numNodes; i++) {
nb.put(Allocators.newNode("node" + i, Collections.singletonMap("tag", "tag_" + (i % numTags))));
nb.add(Allocators.newNode("node" + i, Collections.singletonMap("tag", "tag_" + (i % numTags))));
}
initialClusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metaData(metaData).routingTable(routingTable).nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,10 @@ public Builder nodes(DiscoveryNodes nodes) {
return this;
}

public DiscoveryNodes nodes() {
return nodes;
}

public Builder routingResult(RoutingAllocation.Result routingResult) {
this.routingTable = routingResult.routingTable();
this.metaData = routingResult.metaData();
Expand Down Expand Up @@ -723,7 +727,6 @@ public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode) throw
public static ClusterState readFrom(StreamInput in, @Nullable DiscoveryNode localNode) throws IOException {
return PROTO.readFrom(in, localNode);
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,14 +357,14 @@ public DiscoveryNodes removeDeadMembers(Set<String> newNodes, String masterNodeI
Builder builder = new Builder().masterNodeId(masterNodeId).localNodeId(localNodeId);
for (DiscoveryNode node : this) {
if (newNodes.contains(node.getId())) {
builder.put(node);
builder.add(node);
}
}
return builder.build();
}

public DiscoveryNodes newNode(DiscoveryNode node) {
return new Builder(this).put(node).build();
return new Builder(this).add(node).build();
}

/**
Expand Down Expand Up @@ -554,8 +554,8 @@ private DiscoveryNodes readFrom(StreamInput in, DiscoveryNode localNode) throws
node = localNode;
}
// some one already built this and validated it's OK, skip the n2 scans
assert builder.validatePut(node) == null : "building disco nodes from network doesn't pass preflight: "
+ builder.validatePut(node);
assert builder.validateAdd(node) == null : "building disco nodes from network doesn't pass preflight: "
+ builder.validateAdd(node);
builder.putUnsafe(node);
}
return builder.build();
Expand Down Expand Up @@ -592,17 +592,27 @@ public Builder(DiscoveryNodes nodes) {

/**
* adds a disco node to the builder. Will throw an {@link IllegalArgumentException} if
* the supplied node doesn't pass the pre-flight checks performed by {@link #validatePut(DiscoveryNode)}
* the supplied node doesn't pass the pre-flight checks performed by {@link #validateAdd(DiscoveryNode)}
*/
public Builder put(DiscoveryNode node) {
final String preflight = validatePut(node);
public Builder add(DiscoveryNode node) {
final String preflight = validateAdd(node);
if (preflight != null) {
throw new IllegalArgumentException(preflight);
}
putUnsafe(node);
return this;
}

/**
* Get a node by its id
*
* @param nodeId id of the wanted node
* @return wanted node if it exists. Otherwise <code>null</code>
*/
@Nullable public DiscoveryNode get(String nodeId) {
return nodes.get(nodeId);
}

private void putUnsafe(DiscoveryNode node) {
nodes.put(node.getId(), node);
}
Expand Down Expand Up @@ -635,20 +645,20 @@ public Builder localNodeId(String localNodeId) {
*
* @return null if all is OK or an error message explaining why a node can not be added.
*
* Note: if this method returns a non-null value, calling {@link #put(DiscoveryNode)} will fail with an
* Note: if this method returns a non-null value, calling {@link #add(DiscoveryNode)} will fail with an
* exception
*/
private String validatePut(DiscoveryNode node) {
private String validateAdd(DiscoveryNode node) {
for (ObjectCursor<DiscoveryNode> cursor : nodes.values()) {
final DiscoveryNode existingNode = cursor.value;
if (node.getAddress().equals(existingNode.getAddress()) &&
node.getId().equals(existingNode.getId()) == false) {
return "can't add node " + node + ", found existing node " + existingNode + " with same address";
}
if (node.getId().equals(existingNode.getId()) &&
node.getAddress().equals(existingNode.getAddress()) == false) {
node.equals(existingNode) == false) {
return "can't add node " + node + ", found existing node " + existingNode
+ " with the same id, but a different address";
+ " with the same id but is a different node instance";
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,36 @@ public Result applyFailedShards(ClusterState clusterState, List<FailedRerouteAll
applyFailedShard(allocation, failedShard, unassignedInfo);
}
gatewayAllocator.applyFailedShards(allocation);

reroute(allocation);
String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.routingEntry.shardId().toString());
return buildResultAndLogHealthChange(allocation, "shards failed [" + failedShardsAsString + "] ...");
}

/**
* unassigned an shards that are associated with nodes that are no longer part of the cluster, potentially promoting replicas
* if needed.
*/
public RoutingAllocation.Result deassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason) {
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
clusterInfoService.getClusterInfo(), currentNanoTime(), false);

// first, clear from the shards any node id they used to belong to that is now dead
boolean changed = deassociateDeadNodes(allocation);

if (reroute) {
changed |= reroute(allocation);
}

if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}
return buildResultAndLogHealthChange(allocation, reason);
}

/**
* Removes delay markers from unassigned shards based on current time stamp. Returns true if markers were removed.
*/
Expand Down Expand Up @@ -352,13 +377,9 @@ private void logClusterHealthStateChange(ClusterStateHealth previousStateHealth,
}

private boolean reroute(RoutingAllocation allocation) {
boolean changed = false;
// first, clear from the shards any node id they used to belong to that is now dead
changed |= deassociateDeadNodes(allocation);
assert deassociateDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See deassociateDeadNodes";

// elect primaries *before* allocating unassigned, so backups of primaries that failed
// will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
changed |= electPrimariesAndUnassignedDanglingReplicas(allocation);
boolean changed = electPrimariesAndUnassignedDanglingReplicas(allocation);

// now allocate all the unassigned to available nodes
if (allocation.routingNodes().unassigned().size() > 0) {
Expand Down Expand Up @@ -390,8 +411,8 @@ private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation al
if (candidate != null) {
shardEntry = unassignedIterator.demotePrimaryToReplicaShard();
ShardRouting primarySwappedCandidate = routingNodes.promoteAssignedReplicaShardToPrimary(candidate);
changed = true;
if (primarySwappedCandidate.relocatingNodeId() != null) {
changed = true;
// its also relocating, make sure to move the other routing to primary
RoutingNode node = routingNodes.node(primarySwappedCandidate.relocatingNodeId());
if (node != null) {
Expand All @@ -406,7 +427,6 @@ private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation al
IndexMetaData index = allocation.metaData().getIndexSafe(primarySwappedCandidate.index());
if (IndexMetaData.isIndexUsingShadowReplicas(index.getSettings())) {
routingNodes.reinitShadowPrimary(primarySwappedCandidate);
changed = true;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public synchronized void setClusterStatePublisher(BiConsumer<ClusterChangedEvent

public synchronized void setLocalNode(DiscoveryNode localNode) {
assert clusterState.nodes().getLocalNodeId() == null : "local node is already set";
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()).put(localNode).localNodeId(localNode.getId());
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()).add(localNode).localNodeId(localNode.getId());
this.clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public boolean runOnlyOnMaster() {
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) {
nodesBuilder.put(discovery.localNode());
nodesBuilder.add(discovery.localNode());
}
nodesBuilder.localNodeId(master.localNode().getId()).masterNodeId(master.localNode().getId());
// remove the NO_MASTER block in this case
Expand All @@ -160,7 +160,7 @@ public boolean runOnlyOnMaster() {
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) {
nodesBuilder.put(discovery.localNode());
nodesBuilder.add(discovery.localNode());
}
nodesBuilder.localNodeId(master.localNode().getId()).masterNodeId(master.localNode().getId());
currentState = ClusterState.builder(currentState).nodes(nodesBuilder).build();
Expand Down Expand Up @@ -231,8 +231,8 @@ public ClusterState execute(ClusterState currentState) {
}
// reroute here, so we eagerly remove dead nodes from the routing
ClusterState updatedState = ClusterState.builder(currentState).nodes(newNodes).build();
RoutingAllocation.Result routingResult = master.allocationService.reroute(
ClusterState.builder(updatedState).build(), "elected as master");
RoutingAllocation.Result routingResult = master.allocationService.deassociateDeadNodes(
ClusterState.builder(updatedState).build(), true, "node stopped");
return ClusterState.builder(updatedState).routingResult(routingResult).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,7 @@ public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<Discov

final DiscoveryNodes currentNodes = currentState.nodes();
boolean nodesChanged = false;
ClusterState.Builder newState = ClusterState.builder(currentState);
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentNodes);
ClusterState.Builder newState;

if (joiningNodes.size() == 1 && joiningNodes.get(0).equals(FINISH_ELECTION_TASK)) {
return results.successes(joiningNodes).build(currentState);
Expand All @@ -423,16 +422,17 @@ public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<Discov
// use these joins to try and become the master.
// Note that we don't have to do any validation of the amount of joining nodes - the commit
// during the cluster state publishing guarantees that we have enough
nodesBuilder.masterNodeId(currentNodes.getLocalNodeId());
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks())
.removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
newState.blocks(clusterBlocks);
newState = becomeMasterAndTrimConflictingNodes(currentState, joiningNodes);
nodesChanged = true;
} else if (nodesBuilder.isLocalNodeElectedMaster() == false) {
} else if (currentNodes.isLocalNodeElectedMaster() == false) {
logger.trace("processing node joins, but we are not the master. current master: {}", currentNodes.getMasterNode());
throw new NotMasterException("Node [" + currentNodes.getLocalNode() + "] not master for join request");
} else {
newState = ClusterState.builder(currentState);
}

DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes());

assert nodesBuilder.isLocalNodeElectedMaster();

// processing any joins
Expand All @@ -443,7 +443,7 @@ public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<Discov
logger.debug("received a join request for an existing node [{}]", node);
} else {
try {
nodesBuilder.put(node);
nodesBuilder.add(node);
nodesChanged = true;
} catch (IllegalArgumentException e) {
results.failure(node, e);
Expand All @@ -468,6 +468,28 @@ public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<Discov
return results.build(newState.build());
}

private ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState currentState, List<DiscoveryNode> joiningNodes) {
assert currentState.nodes().getMasterNodeId() == null : currentState.prettyPrint();
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentState.nodes());
nodesBuilder.masterNodeId(currentState.nodes().getLocalNodeId());
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks())
.removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
for (final DiscoveryNode joiningNode : joiningNodes) {
final DiscoveryNode existingNode = nodesBuilder.get(joiningNode.getId());
if (existingNode != null && existingNode.equals(joiningNode) == false) {
logger.debug("removing existing node [{}], which conflicts with incoming join from [{}]", existingNode, joiningNode);
nodesBuilder.remove(existingNode.getId());
}
}

// now trim any left over dead nodes - either left there when the previous master stepped down
// or removed by us above
ClusterState tmpState = ClusterState.builder(currentState).nodes(nodesBuilder).blocks(clusterBlocks).build();
RoutingAllocation.Result result = allocationService.deassociateDeadNodes(tmpState, false,
"removed dead nodes on election");
return ClusterState.builder(tmpState).routingResult(result);
}

@Override
public boolean runOnlyOnMaster() {
// we validate that we are allowed to change the cluster state during cluster state processing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,8 @@ public BatchResult<Task> execute(final ClusterState currentState, final List<Tas
if (!electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes())) {
return resultBuilder.build(rejoin.apply(remainingNodesClusterState, "not enough master nodes"));
} else {
final RoutingAllocation.Result routingResult = allocationService.reroute(remainingNodesClusterState, describeTasks(tasks));
final RoutingAllocation.Result routingResult =
allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks));
return resultBuilder.build(ClusterState.builder(remainingNodesClusterState).routingResult(routingResult).build());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ private ClusterState applyUpdate(ClusterState currentState, ClusterChangedEvent
clusterStateChanged = true;
logger.info("[{}] adding node [{}]", tribeName, discoNode);
nodes.remove(tribe.getId()); // remove any existing node with the same id but different ephemeral id
nodes.put(discoNode);
nodes.add(discoNode);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ public void testTasksToXContentGrouping() throws Exception {
// First group by node
DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
for (TestNode testNode : this.testNodes) {
discoNodes.put(testNode.discoveryNode);
discoNodes.add(testNode.discoveryNode);
}
response.setDiscoveryNodes(discoNodes.build());
Map<String, Object> byNodes = serialize(response, new ToXContent.MapParams(Collections.singletonMap("group_by", "nodes")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ private ClusterState createInitialClusterState(AllocationService service) {
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING
.getDefault(Settings.EMPTY))
.metaData(metaData).routingTable(routingTable).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")))
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")))
.build();
RoutingTable prevRoutingTable = routingTable;
routingTable = service.reroute(clusterState, "reroute").routingTable();
Expand Down
Loading

0 comments on commit 609a199

Please sign in to comment.