Skip to content

Commit

Permalink
Address feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Roshan Khatri <[email protected]>
  • Loading branch information
roshkhatri committed Jul 18, 2024
1 parent aac7894 commit f0db157
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -3052,7 +3052,7 @@ int clusterProcessPacket(clusterLink *link) {
if (!link->node || nodeInHandshake(link->node)) {
freeClusterLink(link);
serverLog(
LL_WARNING,
LL_NOTICE,
"Closing link for node that sent a lightweight message of type %hu as its first message on the link",
type);
return 0;
Expand Down
12 changes: 6 additions & 6 deletions src/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ int pubsubPublishMessage(robj *channel, robj *message, int sharded) {
}

/* Publish messages to all the subscribers. */
int pubsubPublishMessages(robj *channel, robj **messages, int count, int sharded) {
int pubsubPublishMultiMessages(robj *channel, robj **messages, int count, int sharded) {
int total_receivers = 0;
for (int i = 0; i < count; i++) {
total_receivers += pubsubPublishMessage(channel, messages[i], sharded);
Expand Down Expand Up @@ -612,14 +612,14 @@ void punsubscribeCommand(client *c) {

/* This function wraps pubsubPublishMessage and also propagates the message to cluster.
* Used by the commands PUBLISH/SPUBLISH and their respective module APIs.*/
int pubsubPublishMessagesAndPropagateToCluster(robj *channel, robj **messages, int count, int sharded) {
int receivers = pubsubPublishMessages(channel, messages, count, sharded);
int pubsubPublishMultiMessagesAndPropagateToCluster(robj *channel, robj **messages, int count, int sharded) {
int receivers = pubsubPublishMultiMessages(channel, messages, count, sharded);
if (server.cluster_enabled) clusterPropagatePublish(channel, messages, count, sharded);
return receivers;
}

int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded) {
return pubsubPublishMessagesAndPropagateToCluster(channel, &message, 1, sharded);
return pubsubPublishMultiMessagesAndPropagateToCluster(channel, &message, 1, sharded);
}

/* PUBLISH <channel> <message> */
Expand All @@ -629,7 +629,7 @@ void publishCommand(client *c) {
return;
}

int receivers = pubsubPublishMessagesAndPropagateToCluster(c->argv[1], &c->argv[2], 1, 0);
int receivers = pubsubPublishMultiMessagesAndPropagateToCluster(c->argv[1], &c->argv[2], 1, 0);
if (!server.cluster_enabled) forceCommandPropagation(c, PROPAGATE_REPL);
addReplyLongLong(c, receivers);
}
Expand Down Expand Up @@ -715,7 +715,7 @@ void channelList(client *c, sds pat, kvstore *pubsub_channels) {

/* SPUBLISH <shardchannel> <message> */
void spublishCommand(client *c) {
int receivers = pubsubPublishMessagesAndPropagateToCluster(c->argv[1], &c->argv[2], 1, 1);
int receivers = pubsubPublishMultiMessagesAndPropagateToCluster(c->argv[1], &c->argv[2], 1, 1);
if (!server.cluster_enabled) forceCommandPropagation(c, PROPAGATE_REPL);
addReplyLongLong(c, receivers);
}
Expand Down
4 changes: 2 additions & 2 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3289,9 +3289,9 @@ int pubsubUnsubscribeShardAllChannels(client *c, int notify);
void pubsubShardUnsubscribeAllChannelsInSlot(unsigned int slot);
int pubsubUnsubscribeAllPatterns(client *c, int notify);
int pubsubPublishMessage(robj *channel, robj *message, int sharded);
int pubsubPublishMessages(robj *channel, robj **messages, int count, int sharded);
int pubsubPublishMultiMessages(robj *channel, robj **messages, int count, int sharded);
int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded);
int pubsubPublishMessagesAndPropagateToCluster(robj *channel, robj **messages, int count, int sharded);
int pubsubPublishMultiMessagesAndPropagateToCluster(robj *channel, robj **messages, int count, int sharded);
void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk);
int serverPubsubSubscriptionCount(void);
int serverPubsubShardSubscriptionCount(void);
Expand Down

0 comments on commit f0db157

Please sign in to comment.