Skip to content

Commit

Permalink
Merge branch 'unstable' into new-clstrbs-msg
Browse files Browse the repository at this point in the history
Signed-off-by: Roshan Khatri <[email protected]>
  • Loading branch information
roshkhatri committed Jul 16, 2024
2 parents f4da935 + 66d0f7d commit e5232e1
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 93 deletions.
176 changes: 86 additions & 90 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -3041,7 +3041,18 @@ int clusterProcessPacket(clusterLink *link) {
clusterMsg *hdr = (clusterMsg *)link->rcvbuf;
uint16_t type = ntohs(hdr->type);
mstime_t now = mstime();

uint16_t flags = ntohs(hdr->flags);
uint64_t sender_claimed_current_epoch = 0, sender_claimed_config_epoch = 0;
clusterNode *sender = getNodeFromLinkAndMsg(link, hdr);
int sender_claims_to_be_primary = !memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, CLUSTER_NAMELEN);
int sender_last_reported_as_replica = sender && nodeIsReplica(sender);
int sender_last_reported_as_primary = sender && nodeIsPrimary(sender);

if (sender && (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA)) {
sender->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
}

/* Update the last time we saw any data from this node. We
* use this in order to avoid detecting a timeout from a node that
* is just sending a lot of data in the cluster bus, for instance
Expand All @@ -3053,8 +3064,6 @@ int clusterProcessPacket(clusterLink *link) {
return pubsubProcessLightPacket(link, type);
}

uint16_t flags = ntohs(hdr->flags);
uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;

if (sender && (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA)) {
sender->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
Expand All @@ -3067,13 +3076,13 @@ int clusterProcessPacket(clusterLink *link) {

if (sender && !nodeInHandshake(sender)) {
/* Update our currentEpoch if we see a newer epoch in the cluster. */
senderCurrentEpoch = ntohu64(hdr->currentEpoch);
senderConfigEpoch = ntohu64(hdr->configEpoch);
if (senderCurrentEpoch > server.cluster->currentEpoch) server.cluster->currentEpoch = senderCurrentEpoch;
sender_claimed_current_epoch = ntohu64(hdr->currentEpoch);
sender_claimed_config_epoch = ntohu64(hdr->configEpoch);
if (sender_claimed_current_epoch > server.cluster->currentEpoch)
server.cluster->currentEpoch = sender_claimed_current_epoch;
/* Update the sender configEpoch if it is a primary publishing a newer one. */
if (!memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->replicaof)) &&
senderConfigEpoch > sender->configEpoch) {
sender->configEpoch = senderConfigEpoch;
if (sender_claims_to_be_primary && sender_claimed_config_epoch > sender->configEpoch) {
sender->configEpoch = sender_claimed_config_epoch;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG);
}
/* Update the replication offset info for this node. */
Expand Down Expand Up @@ -3235,36 +3244,36 @@ int clusterProcessPacket(clusterLink *link) {
/* Check for role switch: replica -> primary or primary -> replica. */
if (sender) {
serverLog(LL_DEBUG, "node %.40s (%s) announces that it is a %s in shard %.40s", sender->name,
sender->human_nodename,
!memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->replicaof)) ? "primary" : "replica",
sender->shard_id);
if (!memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->replicaof))) {
sender->human_nodename, sender_claims_to_be_primary ? "primary" : "replica", sender->shard_id);
if (sender_claims_to_be_primary) {
/* Node is a primary. */
clusterSetNodeAsPrimary(sender);
} else {
/* Node is a replica. */
clusterNode *primary = clusterLookupNode(hdr->replicaof, CLUSTER_NAMELEN);
clusterNode *sender_claimed_primary = clusterLookupNode(hdr->replicaof, CLUSTER_NAMELEN);

if (clusterNodeIsPrimary(sender)) {
if (sender_last_reported_as_primary) {
/* Primary turned into a replica! Reconfigure the node. */
if (primary && areInSameShard(primary, sender)) {
if (sender_claimed_primary && areInSameShard(sender_claimed_primary, sender)) {
/* `sender` was a primary and was in the same shard as its new primary */
if (sender->configEpoch > senderConfigEpoch) {
if (sender->configEpoch > sender_claimed_config_epoch) {
serverLog(LL_NOTICE,
"Ignore stale message from %.40s (%s) in shard %.40s;"
" gossip config epoch: %llu, current config epoch: %llu",
sender->name, sender->human_nodename, sender->shard_id,
(unsigned long long)senderConfigEpoch, (unsigned long long)sender->configEpoch);
(unsigned long long)sender_claimed_config_epoch,
(unsigned long long)sender->configEpoch);
} else {
/* `primary` is still a `replica` in this observer node's view;
* update its role and configEpoch */
clusterSetNodeAsPrimary(primary);
primary->configEpoch = senderConfigEpoch;
clusterSetNodeAsPrimary(sender_claimed_primary);
sender_claimed_primary->configEpoch = sender_claimed_config_epoch;
serverLog(LL_NOTICE,
"A failover occurred in shard %.40s; node %.40s (%s)"
" failed over to node %.40s (%s) with a config epoch of %llu",
sender->shard_id, sender->name, sender->human_nodename, primary->name,
primary->human_nodename, (unsigned long long)primary->configEpoch);
sender->shard_id, sender->name, sender->human_nodename,
sender_claimed_primary->name, sender_claimed_primary->human_nodename,
(unsigned long long)sender_claimed_primary->configEpoch);
}
} else {
/* `sender` was moved to another shard and has become a replica, remove its slot assignment */
Expand All @@ -3273,9 +3282,9 @@ int clusterProcessPacket(clusterLink *link) {
"Node %.40s (%s) is no longer primary of shard %.40s;"
" removed all %d slot(s) it used to own",
sender->name, sender->human_nodename, sender->shard_id, slots);
if (primary != NULL) {
if (sender_claimed_primary != NULL) {
serverLog(LL_NOTICE, "Node %.40s (%s) is now part of shard %.40s", sender->name,
sender->human_nodename, primary->shard_id);
sender->human_nodename, sender_claimed_primary->shard_id);
}
}

Expand All @@ -3287,17 +3296,17 @@ int clusterProcessPacket(clusterLink *link) {
}

/* Primary node changed for this replica? */
if (primary && sender->replicaof != primary) {
if (sender_claimed_primary && sender->replicaof != sender_claimed_primary) {
if (sender->replicaof) clusterNodeRemoveReplica(sender->replicaof, sender);
serverLog(LL_NOTICE, "Node %.40s (%s) is now a replica of node %.40s (%s) in shard %.40s",
sender->name, sender->human_nodename, primary->name, primary->human_nodename,
sender->shard_id);
clusterNodeAddReplica(primary, sender);
sender->replicaof = primary;
sender->name, sender->human_nodename, sender_claimed_primary->name,
sender_claimed_primary->human_nodename, sender->shard_id);
clusterNodeAddReplica(sender_claimed_primary, sender);
sender->replicaof = sender_claimed_primary;

/* Update the shard_id when a replica is connected to its
* primary in the very first time. */
updateShardId(sender, primary->shard_id);
updateShardId(sender, sender_claimed_primary->shard_id);

/* Update config. */
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
Expand All @@ -3312,66 +3321,41 @@ int clusterProcessPacket(clusterLink *link) {

/* Many checks are only needed if the set of served slots this
* instance claims is different compared to the set of slots we have
* for it. Check this ASAP to avoid other computational expansive
* checks later. */
clusterNode *sender_primary = NULL; /* Sender or its primary if replica. */
int dirty_slots = 0; /* Sender claimed slots don't match my view? */

if (sender) {
sender_primary = clusterNodeIsPrimary(sender) ? sender : sender->replicaof;
if (sender_primary) {
dirty_slots = memcmp(sender_primary->slots, hdr->myslots, sizeof(hdr->myslots)) != 0;

/* Force dirty when the sending shard owns no slots so that
* we have a chance to examine and repair slot migrating/importing
* states that involve empty shards. */
dirty_slots |= sender_primary->numslots == 0;
}
}

/* 1) If the sender of the message is a primary, and we detected that
* the set of slots it claims changed, scan the slots to see if we
* need to update our configuration. */
if (sender_primary && dirty_slots)
clusterUpdateSlotsConfigWith(sender_primary, senderConfigEpoch, hdr->myslots);

/* Explicitly check for a replication loop before attempting the replication
* chain folding logic. */
if (myself->replicaof && myself->replicaof->replicaof && myself->replicaof->replicaof != myself) {
/* Safeguard against sub-replicas. A replica's primary can turn itself
* into a replica if its last slot is removed. If no other node takes
* over the slot, there is nothing else to trigger replica migration. */
serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
myself->replicaof->replicaof->name, myself->replicaof->name);
clusterSetPrimary(myself->replicaof->replicaof, 1);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}

/* 2) We also check for the reverse condition, that is, the sender
* claims to serve slots we know are served by a primary with a
* greater configEpoch. If this happens we inform the sender.
*
* This is useful because sometimes after a partition heals, a
* reappearing primary may be the last one to claim a given set of
* hash slots, but with a configuration that other instances know to
* be deprecated. Example:
*
* A and B are primary and replica for slots 1,2,3.
* A is partitioned away, B gets promoted.
* B is partitioned away, and A returns available.
*
* Usually B would PING A publishing its set of served slots and its
* configEpoch, but because of the partition B can't inform A of the
* new configuration, so other nodes that have an updated table must
* do it. In this way A will stop to act as a primary (or can try to
* failover if there are the conditions to win the election). */
if (sender && dirty_slots) {
int j;

for (j = 0; j < CLUSTER_SLOTS; j++) {
* for it or if there was a failover in the sender's shard. Check
* this ASAP to avoid other computational expensive checks later.*/

if (sender && sender_claims_to_be_primary &&
(sender_last_reported_as_replica || memcmp(sender->slots, hdr->myslots, sizeof(hdr->myslots)))) {
/* Make sure CLUSTER_NODE_PRIMARY has already been set by now on sender */
serverAssert(nodeIsPrimary(sender));

/* 1) If the sender of the message is a primary, and we detected that
* the set of slots it claims changed, scan the slots to see if we
* need to update our configuration. */
clusterUpdateSlotsConfigWith(sender, sender_claimed_config_epoch, hdr->myslots);

/* 2) We also check for the reverse condition, that is, the sender
* claims to serve slots we know are served by a primary with a
* greater configEpoch. If this happens we inform the sender.
*
* This is useful because sometimes after a partition heals, a
* reappearing primary may be the last one to claim a given set of
* hash slots, but with a configuration that other instances know to
* be deprecated. Example:
*
* A and B are primary and replica for slots 1,2,3.
* A is partitioned away, B gets promoted.
* B is partitioned away, and A returns available.
*
* Usually B would PING A publishing its set of served slots and its
* configEpoch, but because of the partition B can't inform A of the
* new configuration, so other nodes that have an updated table must
* do it. In this way A will stop to act as a primary (or can try to
* failover if there are the conditions to win the election). */
for (int j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(hdr->myslots, j)) {
if (server.cluster->slots[j] == sender || isSlotUnclaimed(j)) continue;
if (server.cluster->slots[j]->configEpoch > senderConfigEpoch) {
if (server.cluster->slots[j]->configEpoch > sender_claimed_config_epoch) {
serverLog(LL_VERBOSE,
"Node %.40s has old slots configuration, sending "
"an UPDATE message about %.40s",
Expand All @@ -3387,10 +3371,22 @@ int clusterProcessPacket(clusterLink *link) {
}
}

/* Explicitly check for a replication loop before attempting the replication
* chain folding logic. */
if (myself->replicaof && myself->replicaof->replicaof && myself->replicaof->replicaof != myself) {
/* Safeguard against sub-replicas. A replica's primary can turn itself
* into a replica if its last slot is removed. If no other node takes
* over the slot, there is nothing else to trigger replica migration. */
serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
myself->replicaof->replicaof->name, myself->replicaof->name);
clusterSetPrimary(myself->replicaof->replicaof, 1);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}

/* If our config epoch collides with the sender's try to fix
* the problem. */
if (sender && clusterNodeIsPrimary(myself) && clusterNodeIsPrimary(sender) &&
senderConfigEpoch == myself->configEpoch) {
if (sender && nodeIsPrimary(myself) && nodeIsPrimary(sender) &&
sender_claimed_config_epoch == myself->configEpoch) {
clusterHandleConfigEpochCollision(sender);
}

Expand Down Expand Up @@ -3442,7 +3438,7 @@ int clusterProcessPacket(clusterLink *link) {
/* We consider this vote only if the sender is a primary serving
* a non zero number of slots, and its currentEpoch is greater or
* equal to epoch where this node started the election. */
if (clusterNodeIsVotingPrimary(sender) && senderCurrentEpoch >= server.cluster->failover_auth_epoch) {
if (clusterNodeIsVotingPrimary(sender) && sender_claimed_current_epoch >= server.cluster->failover_auth_epoch) {
server.cluster->failover_auth_count++;
/* Maybe we reached a quorum here, set a flag to make sure
* we check ASAP. */
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/cluster/hostnames.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -156,18 +156,18 @@ test "Verify the nodes configured with prefer hostname only show hostname for ne
# to accept our isolated nodes connections. At this point they will
# start showing up in cluster slots.
wait_for_condition 50 100 {
[llength [R 6 CLUSTER SLOTS]] eq 3
[llength [R 6 CLUSTER SLOTS]] eq 2
} else {
fail "Node did not learn about the 2 shards it can talk to"
}
wait_for_condition 50 100 {
[lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-1.com"
[lindex [get_slot_field [R 6 CLUSTER SLOTS] 0 2 3] 1] eq "shard-1.com"
} else {
fail "hostname for shard-1 didn't reach node 6"
}

wait_for_condition 50 100 {
[lindex [get_slot_field [R 6 CLUSTER SLOTS] 2 2 3] 1] eq "shard-2.com"
[lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-2.com"
} else {
fail "hostname for shard-2 didn't reach node 6"
}
Expand Down

0 comments on commit e5232e1

Please sign in to comment.