diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c
index a8d4c79c14..61799035d3 100644
--- a/src/cluster_legacy.c
+++ b/src/cluster_legacy.c
@@ -3041,7 +3041,18 @@ int clusterProcessPacket(clusterLink *link) {
     clusterMsg *hdr = (clusterMsg *)link->rcvbuf;
     uint16_t type = ntohs(hdr->type);
     mstime_t now = mstime();
+
+    uint16_t flags = ntohs(hdr->flags);
+    uint64_t sender_claimed_current_epoch = 0, sender_claimed_config_epoch = 0;
     clusterNode *sender = getNodeFromLinkAndMsg(link, hdr);
+    int sender_claims_to_be_primary = !memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, CLUSTER_NAMELEN);
+    int sender_last_reported_as_replica = sender && nodeIsReplica(sender);
+    int sender_last_reported_as_primary = sender && nodeIsPrimary(sender);
+
+    if (sender && (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA)) {
+        sender->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
+    }
+
     /* Update the last time we saw any data from this node. We
      * use this in order to avoid detecting a timeout from a node that
      * is just sending a lot of data in the cluster bus, for instance
@@ -3053,8 +3064,6 @@ int clusterProcessPacket(clusterLink *link) {
         return pubsubProcessLightPacket(link, type);
     }
 
-    uint16_t flags = ntohs(hdr->flags);
-    uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
 
     if (sender && (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA)) {
         sender->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
@@ -3067,13 +3076,13 @@ int clusterProcessPacket(clusterLink *link) {
 
     if (sender && !nodeInHandshake(sender)) {
         /* Update our currentEpoch if we see a newer epoch in the cluster. */
-        senderCurrentEpoch = ntohu64(hdr->currentEpoch);
-        senderConfigEpoch = ntohu64(hdr->configEpoch);
-        if (senderCurrentEpoch > server.cluster->currentEpoch) server.cluster->currentEpoch = senderCurrentEpoch;
+        sender_claimed_current_epoch = ntohu64(hdr->currentEpoch);
+        sender_claimed_config_epoch = ntohu64(hdr->configEpoch);
+        if (sender_claimed_current_epoch > server.cluster->currentEpoch)
+            server.cluster->currentEpoch = sender_claimed_current_epoch;
         /* Update the sender configEpoch if it is a primary publishing a newer one. */
-        if (!memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->replicaof)) &&
-            senderConfigEpoch > sender->configEpoch) {
-            sender->configEpoch = senderConfigEpoch;
+        if (sender_claims_to_be_primary && sender_claimed_config_epoch > sender->configEpoch) {
+            sender->configEpoch = sender_claimed_config_epoch;
             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG);
         }
         /* Update the replication offset info for this node. */
@@ -3235,36 +3244,36 @@ int clusterProcessPacket(clusterLink *link) {
     /* Check for role switch: replica -> primary or primary -> replica. */
     if (sender) {
         serverLog(LL_DEBUG, "node %.40s (%s) announces that it is a %s in shard %.40s", sender->name,
-                  sender->human_nodename,
-                  !memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->replicaof)) ? "primary" : "replica",
-                  sender->shard_id);
-        if (!memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->replicaof))) {
+                  sender->human_nodename, sender_claims_to_be_primary ? "primary" : "replica", sender->shard_id);
+        if (sender_claims_to_be_primary) {
             /* Node is a primary. */
             clusterSetNodeAsPrimary(sender);
         } else {
             /* Node is a replica. */
-            clusterNode *primary = clusterLookupNode(hdr->replicaof, CLUSTER_NAMELEN);
+            clusterNode *sender_claimed_primary = clusterLookupNode(hdr->replicaof, CLUSTER_NAMELEN);
 
-            if (clusterNodeIsPrimary(sender)) {
+            if (sender_last_reported_as_primary) {
                 /* Primary turned into a replica! Reconfigure the node. */
-                if (primary && areInSameShard(primary, sender)) {
+                if (sender_claimed_primary && areInSameShard(sender_claimed_primary, sender)) {
                     /* `sender` was a primary and was in the same shard as its new primary */
-                    if (sender->configEpoch > senderConfigEpoch) {
+                    if (sender->configEpoch > sender_claimed_config_epoch) {
                         serverLog(LL_NOTICE,
                                   "Ignore stale message from %.40s (%s) in shard %.40s;"
                                   " gossip config epoch: %llu, current config epoch: %llu",
                                   sender->name, sender->human_nodename, sender->shard_id,
-                                  (unsigned long long)senderConfigEpoch, (unsigned long long)sender->configEpoch);
+                                  (unsigned long long)sender_claimed_config_epoch,
+                                  (unsigned long long)sender->configEpoch);
                     } else {
                         /* `primary` is still a `replica` in this observer node's view;
                          * update its role and configEpoch */
-                        clusterSetNodeAsPrimary(primary);
-                        primary->configEpoch = senderConfigEpoch;
+                        clusterSetNodeAsPrimary(sender_claimed_primary);
+                        sender_claimed_primary->configEpoch = sender_claimed_config_epoch;
                         serverLog(LL_NOTICE,
                                   "A failover occurred in shard %.40s; node %.40s (%s)"
                                   " failed over to node %.40s (%s) with a config epoch of %llu",
-                                  sender->shard_id, sender->name, sender->human_nodename, primary->name,
-                                  primary->human_nodename, (unsigned long long)primary->configEpoch);
+                                  sender->shard_id, sender->name, sender->human_nodename,
+                                  sender_claimed_primary->name, sender_claimed_primary->human_nodename,
+                                  (unsigned long long)sender_claimed_primary->configEpoch);
                     }
                 } else {
                     /* `sender` was moved to another shard and has become a replica, remove its slot assignment */
@@ -3273,9 +3282,9 @@ int clusterProcessPacket(clusterLink *link) {
                               "Node %.40s (%s) is no longer primary of shard %.40s;"
                               " removed all %d slot(s) it used to own",
                               sender->name, sender->human_nodename, sender->shard_id, slots);
-                    if (primary != NULL) {
-                        serverLog(LL_NOTICE, "Node %.40s (%s) is now part of shard %.40s", sender->name,
-                                  sender->human_nodename, primary->shard_id);
+                    if (sender_claimed_primary != NULL) {
+                        serverLog(LL_NOTICE, "Node %.40s (%s) is now part of shard %.40s", sender->name,
+                                  sender->human_nodename, sender_claimed_primary->shard_id);
                     }
                 }
 
@@ -3287,17 +3296,17 @@ int clusterProcessPacket(clusterLink *link) {
             }
 
             /* Primary node changed for this replica? */
-            if (primary && sender->replicaof != primary) {
+            if (sender_claimed_primary && sender->replicaof != sender_claimed_primary) {
                 if (sender->replicaof) clusterNodeRemoveReplica(sender->replicaof, sender);
                 serverLog(LL_NOTICE, "Node %.40s (%s) is now a replica of node %.40s (%s) in shard %.40s",
-                          sender->name, sender->human_nodename, primary->name, primary->human_nodename,
-                          sender->shard_id);
-                clusterNodeAddReplica(primary, sender);
-                sender->replicaof = primary;
+                          sender->name, sender->human_nodename, sender_claimed_primary->name,
+                          sender_claimed_primary->human_nodename, sender->shard_id);
+                clusterNodeAddReplica(sender_claimed_primary, sender);
+                sender->replicaof = sender_claimed_primary;
 
                 /* Update the shard_id when a replica is connected to its
                  * primary in the very first time. */
-                updateShardId(sender, primary->shard_id);
+                updateShardId(sender, sender_claimed_primary->shard_id);
 
                 /* Update config. */
                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
@@ -3312,66 +3321,41 @@ int clusterProcessPacket(clusterLink *link) {
 
     /* Many checks are only needed if the set of served slots this
      * instance claims is different compared to the set of slots we have
-     * for it. Check this ASAP to avoid other computational expansive
-     * checks later. */
-    clusterNode *sender_primary = NULL; /* Sender or its primary if replica. */
-    int dirty_slots = 0;                /* Sender claimed slots don't match my view? */
-
-    if (sender) {
-        sender_primary = clusterNodeIsPrimary(sender) ? sender : sender->replicaof;
-        if (sender_primary) {
-            dirty_slots = memcmp(sender_primary->slots, hdr->myslots, sizeof(hdr->myslots)) != 0;
-
-            /* Force dirty when the sending shard owns no slots so that
-             * we have a chance to examine and repair slot migrating/importing
-             * states that involve empty shards. */
-            dirty_slots |= sender_primary->numslots == 0;
-        }
-    }
-
-    /* 1) If the sender of the message is a primary, and we detected that
-     * the set of slots it claims changed, scan the slots to see if we
-     * need to update our configuration. */
-    if (sender_primary && dirty_slots)
-        clusterUpdateSlotsConfigWith(sender_primary, senderConfigEpoch, hdr->myslots);
-
-    /* Explicitly check for a replication loop before attempting the replication
-     * chain folding logic. */
-    if (myself->replicaof && myself->replicaof->replicaof && myself->replicaof->replicaof != myself) {
-        /* Safeguard against sub-replicas. A replica's primary can turn itself
-         * into a replica if its last slot is removed. If no other node takes
-         * over the slot, there is nothing else to trigger replica migration. */
-        serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
-                  myself->replicaof->replicaof->name, myself->replicaof->name);
-        clusterSetPrimary(myself->replicaof->replicaof, 1);
-        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
-    }
-
-    /* 2) We also check for the reverse condition, that is, the sender
-     * claims to serve slots we know are served by a primary with a
-     * greater configEpoch. If this happens we inform the sender.
-     *
-     * This is useful because sometimes after a partition heals, a
-     * reappearing primary may be the last one to claim a given set of
-     * hash slots, but with a configuration that other instances know to
-     * be deprecated. Example:
-     *
-     * A and B are primary and replica for slots 1,2,3.
-     * A is partitioned away, B gets promoted.
-     * B is partitioned away, and A returns available.
-     *
-     * Usually B would PING A publishing its set of served slots and its
-     * configEpoch, but because of the partition B can't inform A of the
-     * new configuration, so other nodes that have an updated table must
-     * do it. In this way A will stop to act as a primary (or can try to
-     * failover if there are the conditions to win the election). */
-    if (sender && dirty_slots) {
-        int j;
-
-        for (j = 0; j < CLUSTER_SLOTS; j++) {
+     * for it or if there was a failover in the sender's shard. Check
+     * this ASAP to avoid other computationally expensive checks later. */
+
+    if (sender && sender_claims_to_be_primary &&
+        (sender_last_reported_as_replica || memcmp(sender->slots, hdr->myslots, sizeof(hdr->myslots)))) {
+        /* Make sure CLUSTER_NODE_PRIMARY has already been set on the sender by now. */
+        serverAssert(nodeIsPrimary(sender));
+
+        /* 1) If the sender of the message is a primary, and we detected that
+         * the set of slots it claims changed, scan the slots to see if we
+         * need to update our configuration. */
+        clusterUpdateSlotsConfigWith(sender, sender_claimed_config_epoch, hdr->myslots);
+
+        /* 2) We also check for the reverse condition, that is, the sender
+         * claims to serve slots we know are served by a primary with a
+         * greater configEpoch. If this happens we inform the sender.
+         *
+         * This is useful because sometimes after a partition heals, a
+         * reappearing primary may be the last one to claim a given set of
+         * hash slots, but with a configuration that other instances know to
+         * be deprecated. Example:
+         *
+         * A and B are primary and replica for slots 1,2,3.
+         * A is partitioned away, B gets promoted.
+         * B is partitioned away, and A returns available.
+         *
+         * Usually B would PING A publishing its set of served slots and its
+         * configEpoch, but because of the partition B can't inform A of the
+         * new configuration, so other nodes that have an updated table must
+         * do it. In this way A will stop acting as a primary (or can try to
+         * fail over if the conditions to win the election are met). */
+        for (int j = 0; j < CLUSTER_SLOTS; j++) {
             if (bitmapTestBit(hdr->myslots, j)) {
                 if (server.cluster->slots[j] == sender || isSlotUnclaimed(j)) continue;
-                if (server.cluster->slots[j]->configEpoch > senderConfigEpoch) {
+                if (server.cluster->slots[j]->configEpoch > sender_claimed_config_epoch) {
                     serverLog(LL_VERBOSE,
                               "Node %.40s has old slots configuration, sending "
                               "an UPDATE message about %.40s",
@@ -3387,10 +3371,22 @@ int clusterProcessPacket(clusterLink *link) {
         }
     }
 
+    /* Explicitly check for a replication loop before attempting the replication
+     * chain folding logic. */
+    if (myself->replicaof && myself->replicaof->replicaof && myself->replicaof->replicaof != myself) {
+        /* Safeguard against sub-replicas. A replica's primary can turn itself
+         * into a replica if its last slot is removed. If no other node takes
+         * over the slot, there is nothing else to trigger replica migration. */
+        serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
+                  myself->replicaof->replicaof->name, myself->replicaof->name);
+        clusterSetPrimary(myself->replicaof->replicaof, 1);
+        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
+    }
+
     /* If our config epoch collides with the sender's try to fix
      * the problem. */
-    if (sender && clusterNodeIsPrimary(myself) && clusterNodeIsPrimary(sender) &&
-        senderConfigEpoch == myself->configEpoch) {
+    if (sender && nodeIsPrimary(myself) && nodeIsPrimary(sender) &&
+        sender_claimed_config_epoch == myself->configEpoch) {
         clusterHandleConfigEpochCollision(sender);
     }
 
@@ -3442,7 +3438,7 @@ int clusterProcessPacket(clusterLink *link) {
         /* We consider this vote only if the sender is a primary serving
          * a non zero number of slots, and its currentEpoch is greater or
          * equal to epoch where this node started the election. */
-        if (clusterNodeIsVotingPrimary(sender) && senderCurrentEpoch >= server.cluster->failover_auth_epoch) {
+        if (clusterNodeIsVotingPrimary(sender) && sender_claimed_current_epoch >= server.cluster->failover_auth_epoch) {
            server.cluster->failover_auth_count++;
            /* Maybe we reached a quorum here, set a flag to make sure
             * we check ASAP. */
diff --git a/tests/unit/cluster/hostnames.tcl b/tests/unit/cluster/hostnames.tcl
index 04b32e380b..232c6cf818 100644
--- a/tests/unit/cluster/hostnames.tcl
+++ b/tests/unit/cluster/hostnames.tcl
@@ -156,18 +156,18 @@ test "Verify the nodes configured with prefer hostname only show hostname for ne
     # to accept our isolated nodes connections. At this point they will
     # start showing up in cluster slots.
     wait_for_condition 50 100 {
-        [llength [R 6 CLUSTER SLOTS]] eq 3
+        [llength [R 6 CLUSTER SLOTS]] eq 2
     } else {
         fail "Node did not learn about the 2 shards it can talk to"
     }
     wait_for_condition 50 100 {
-        [lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-1.com"
+        [lindex [get_slot_field [R 6 CLUSTER SLOTS] 0 2 3] 1] eq "shard-1.com"
     } else {
         fail "hostname for shard-1 didn't reach node 6"
     }
     wait_for_condition 50 100 {
-        [lindex [get_slot_field [R 6 CLUSTER SLOTS] 2 2 3] 1] eq "shard-2.com"
+        [lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-2.com"
     } else {
         fail "hostname for shard-2 didn't reach node 6"
     }
 
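
Note on the new trigger condition: the old code computed dirty_slots in two steps (a memcmp against the primary's bitmap, plus a forced-dirty case for empty shards) and acted on it in two separate blocks. The rewrite gates all of that behind one test: the sender claims to be a primary, and either we last knew it as a replica (a failover happened in its shard) or the slot bitmap it advertises differs from our local view. The sketch below is a standalone toy, not patch code: it assumes CLUSTER_SLOTS = 16384 as in cluster.h, re-implements bitmapTestBit/bitmapSetBit with the same bit layout the cluster code uses, and substitutes a simplified stand-in struct and a hypothetical needs_slot_reconciliation() helper for the real clusterNode/clusterMsg types.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define CLUSTER_SLOTS 16384 /* 2048-byte bitmap, one bit per hash slot */

/* Simplified stand-in for the parts of clusterNode used here. */
typedef struct {
    unsigned char slots[CLUSTER_SLOTS / 8]; /* our view of the node's slots */
    int is_replica;                         /* role we last recorded for it */
} toy_node;

/* Same bit layout as the real bitmapTestBit()/bitmapSetBit(). */
static int bitmapTestBit(unsigned char *bitmap, int pos) {
    return (bitmap[pos / 8] & (1 << (pos & 7))) != 0;
}

static void bitmapSetBit(unsigned char *bitmap, int pos) {
    bitmap[pos / 8] |= (1 << (pos & 7));
}

/* Hypothetical helper mirroring the patch's condition: reconcile only when
 * the sender claims to be a primary AND (it was last seen as a replica, or
 * its claimed bitmap differs from what we have recorded). */
static int needs_slot_reconciliation(const toy_node *known, const unsigned char *claimed, int claims_primary) {
    if (!claims_primary) return 0;
    return known->is_replica || memcmp(known->slots, claimed, CLUSTER_SLOTS / 8) != 0;
}

int main(void) {
    toy_node known = {{0}, 0};
    unsigned char claimed[CLUSTER_SLOTS / 8] = {0};

    bitmapSetBit(known.slots, 100); /* we believe the node owns slot 100 */
    bitmapSetBit(claimed, 100);     /* it still claims slot 100...       */
    bitmapSetBit(claimed, 200);     /* ...and now also claims slot 200   */

    printf("claims slot 200: %d\n", bitmapTestBit(claimed, 200));                    /* 1 */
    printf("reconcile needed: %d\n", needs_slot_reconciliation(&known, claimed, 1)); /* 1 */
    return 0;
}

In the patch itself the bitmap leg is a flat memcmp over two 2048-byte arrays (16384 slots / 8), so the fast-path check stays cheap relative to the per-slot scan it gates.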
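
The epoch handling in the first hunk follows the rules the surrounding comments describe: our currentEpoch is bumped monotonically to the highest value seen in the cluster, and a primary's configEpoch claim is applied only if it is not older than the one we already recorded, otherwise the message is logged as stale and ignored ("Ignore stale message..."). A minimal sketch of those two rules, under the same caveat as above (toy code with hypothetical names, not the patch's API):

#include <stdint.h>
#include <stdio.h>

/* Rule 1: our currentEpoch only ever moves forward. */
static void observe_current_epoch(uint64_t *ours, uint64_t claimed) {
    if (claimed > *ours) *ours = claimed;
}

/* Rule 2: a config claim older than what we already recorded is stale. */
static int claim_is_stale(uint64_t recorded, uint64_t claimed) {
    return recorded > claimed;
}

int main(void) {
    uint64_t current_epoch = 10, recorded_config_epoch = 7;

    observe_current_epoch(&current_epoch, 12); /* bumps to 12 */
    observe_current_epoch(&current_epoch, 11); /* stays at 12 */
    printf("currentEpoch: %llu\n", (unsigned long long)current_epoch);

    printf("claim 6 stale: %d\n", claim_is_stale(recorded_config_epoch, 6)); /* 1: ignore */
    printf("claim 8 stale: %d\n", claim_is_stale(recorded_config_epoch, 8)); /* 0: apply  */
    return 0;
}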