Skip to content

Commit

Permalink
Fix cluster info sent stats for message with light header
Browse files Browse the repository at this point in the history
Signed-off-by: Harkrishn Patro <[email protected]>
  • Loading branch information
hpatro committed Jan 16, 2025
1 parent d99457c commit 84596a0
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 11 deletions.
25 changes: 15 additions & 10 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -3859,7 +3859,7 @@ void clusterReadHandler(connection *conn) {
* It is guaranteed that this function will never have as a side effect
* the link to be invalidated, so it is safe to call this function
* from event handlers that will do stuff with the same link later. */
void clusterSendMessage(clusterLink *link, clusterMsgSendBlock *msgblock) {
void clusterSendMessage(clusterLink *link, clusterMsgSendBlock *msgblock, int is_light_hdr) {
if (!link) {
return;
}
Expand All @@ -3874,7 +3874,12 @@ void clusterSendMessage(clusterLink *link, clusterMsgSendBlock *msgblock) {
server.stat_cluster_links_memory += sizeof(listNode);

/* Populate sent messages stats. */
uint16_t type = ntohs(getMessageFromSendBlock(msgblock)->type);
uint16_t type;
if (is_light_hdr) {
type = ntohs(getLightMessageFromSendBlock(msgblock)->type) & ~CLUSTERMSG_MODIFIER_MASK;
} else {
type = ntohs(getMessageFromSendBlock(msgblock)->type);
}
if (type < CLUSTERMSG_TYPE_COUNT) server.cluster->stats_bus_messages_sent[type]++;
}

Expand All @@ -3893,7 +3898,7 @@ void clusterBroadcastMessage(clusterMsgSendBlock *msgblock) {
clusterNode *node = dictGetVal(de);

if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
clusterSendMessage(node->link, msgblock);
clusterSendMessage(node->link, msgblock, 0);
}
dictReleaseIterator(di);
}
Expand Down Expand Up @@ -4141,7 +4146,7 @@ void clusterSendPing(clusterLink *link, int type) {
hdr->count = htons(gossipcount);
hdr->totlen = htonl(totlen);

clusterSendMessage(link, msgblock);
clusterSendMessage(link, msgblock, 0);
clusterMsgSendBlockDecrRefCount(msgblock);
}

Expand Down Expand Up @@ -4262,7 +4267,7 @@ void clusterSendUpdate(clusterLink *link, clusterNode *node) {
hdr->data.update.nodecfg.slots[i] & (~server.cluster->owner_not_claiming_slot[i]);
}

clusterSendMessage(link, msgblock);
clusterSendMessage(link, msgblock, 0);
clusterMsgSendBlockDecrRefCount(msgblock);
}

Expand All @@ -4281,7 +4286,7 @@ void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type, cons
memcpy(hdr->data.module.msg.bulk_data, payload, len);

if (link)
clusterSendMessage(link, msgblock);
clusterSendMessage(link, msgblock, 0);
else
clusterBroadcastMessage(msgblock);

Expand Down Expand Up @@ -4336,12 +4341,12 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
while ((node = clusterNodeIterNext(&iter)) != NULL) {
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
if (nodeSupportsLightMsgHdr(node)) {
clusterSendMessage(node->link, msgblock_light);
clusterSendMessage(node->link, msgblock_light, 1);
} else {
if (msgblock == NULL) {
msgblock = clusterCreatePublishMsgBlock(channel, message, 0, sharded);
}
clusterSendMessage(node->link, msgblock);
clusterSendMessage(node->link, msgblock, 0);
}
}
clusterNodeIterReset(&iter);
Expand Down Expand Up @@ -4378,7 +4383,7 @@ void clusterSendFailoverAuth(clusterNode *node) {
uint32_t msglen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK, msglen);

clusterSendMessage(node->link, msgblock);
clusterSendMessage(node->link, msgblock, 0);
clusterMsgSendBlockDecrRefCount(msgblock);
}

Expand All @@ -4389,7 +4394,7 @@ void clusterSendMFStart(clusterNode *node) {
uint32_t msglen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_MFSTART, msglen);

clusterSendMessage(node->link, msgblock);
clusterSendMessage(node->link, msgblock, 0);
clusterMsgSendBlockDecrRefCount(msgblock);
}

Expand Down
18 changes: 17 additions & 1 deletion tests/unit/cluster/pubsub.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,21 @@ test "Test publishing to master" {
test "Test publishing to slave" {
test_cluster_publish 5 10
}

} ;# start_cluster

start_cluster 3 0 {tags {external:skip cluster}} {
test "Test cluster info stats for publish" {
R 0 PUBLISH hello world
assert_equal 2 [CI 0 cluster_stats_messages_publish_sent]
wait_for_condition 50 100 {
[CI 1 cluster_stats_messages_publish_received] eq 1
} else {
fail "node 2 didn't receive clusterbus publish packet"
}
wait_for_condition 50 100 {
[CI 2 cluster_stats_messages_publish_received] eq 1
} else {
fail "node 3 didn't receive clusterbus publish packet"
}
}
}

0 comments on commit 84596a0

Please sign in to comment.