From c120a4587494989454057a9f3c96526f4b82f237 Mon Sep 17 00:00:00 2001 From: Harkrishn Patro Date: Wed, 27 Mar 2024 21:29:44 -0700 Subject: [PATCH] Sharded pubsub command execution within multi/exec (#13) Allow SPUBLISH command within multi/exec on replica. --- src/cluster.c | 15 +++---- tests/unit/cluster/sharded-pubsub.tcl | 56 +++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 7 deletions(-) create mode 100644 tests/unit/cluster/sharded-pubsub.tcl diff --git a/src/cluster.c b/src/cluster.c index 2a9d33a57d..60f5cc3b4e 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1030,9 +1030,11 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in mc.cmd = cmd; } - int is_pubsubshard = cmd->proc == ssubscribeCommand || - cmd->proc == sunsubscribeCommand || - cmd->proc == spublishCommand; + uint64_t cmd_flags = getCommandFlags(c); + + /* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */ + int pubsubshard_included = (cmd_flags & CMD_PUBSUB) || + (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_PUBSUB)); /* Check that all the keys are in the same hash slot, and obtain this * slot and the node associated. */ @@ -1109,7 +1111,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in * node until the migration completes with CLUSTER SETSLOT * NODE . */ int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE; - if ((migrating_slot || importing_slot) && !is_pubsubshard) + if ((migrating_slot || importing_slot) && !pubsubshard_included) { if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++; else existing_keys++; @@ -1122,11 +1124,10 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in * without redirections or errors in all the cases. */ if (n == NULL) return myself; - uint64_t cmd_flags = getCommandFlags(c); /* Cluster is globally down but we got keys? We only serve the request * if it is a read command and when allow_reads_when_down is enabled. */ if (!isClusterHealthy()) { - if (is_pubsubshard) { + if (pubsubshard_included) { if (!server.cluster_allow_pubsubshard_when_down) { if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE; return NULL; @@ -1189,7 +1190,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in * is serving, we can reply without redirection. */ int is_write_command = (cmd_flags & CMD_WRITE) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE)); - if (((c->flags & CLIENT_READONLY) || is_pubsubshard) && + if (((c->flags & CLIENT_READONLY) || pubsubshard_included) && !is_write_command && clusterNodeIsSlave(myself) && clusterNodeGetSlaveof(myself) == n) diff --git a/tests/unit/cluster/sharded-pubsub.tcl b/tests/unit/cluster/sharded-pubsub.tcl new file mode 100644 index 0000000000..b5b19ff481 --- /dev/null +++ b/tests/unit/cluster/sharded-pubsub.tcl @@ -0,0 +1,56 @@ +start_cluster 1 1 {tags {external:skip cluster}} { + set primary_id 0 + set replica1_id 1 + + set primary [Rn $primary_id] + set replica [Rn $replica1_id] + + test "Sharded pubsub publish behavior within multi/exec" { + foreach {node} {primary replica} { + set node [set $node] + $node MULTI + $node SPUBLISH ch1 "hello" + $node EXEC + } + } + + test "Sharded pubsub within multi/exec with cross slot operation" { + $primary MULTI + $primary SPUBLISH ch1 "hello" + $primary GET foo + catch {[$primary EXEC]} err + assert_match {CROSSSLOT*} $err + } + + test "Sharded pubsub publish behavior within multi/exec with read operation on primary" { + $primary MULTI + $primary SPUBLISH foo "hello" + $primary GET foo + $primary EXEC + } {0 {}} + + test "Sharded pubsub publish behavior within multi/exec with read operation on replica" { + $replica MULTI + $replica SPUBLISH foo "hello" + catch {[$replica GET foo]} err + assert_match {MOVED*} $err + catch {[$replica EXEC]} err + assert_match {EXECABORT*} $err + } + + test "Sharded pubsub publish behavior within multi/exec with write operation on primary" { + $primary MULTI + $primary SPUBLISH foo "hello" + $primary SET foo bar + $primary EXEC + } {0 OK} + + test "Sharded pubsub publish behavior within multi/exec with write operation on replica" { + $replica MULTI + $replica SPUBLISH foo "hello" + catch {[$replica SET foo bar]} err + assert_match {MOVED*} $err + catch {[$replica EXEC]} err + assert_match {EXECABORT*} $err + } +} \ No newline at end of file