Skip to content

Commit

Permalink
Handle empty addresses in CLUSTER NODES responses (#148)
Browse files Browse the repository at this point in the history
When parsing a `CLUSTER NODES` response we will now treat an empty IP
string as it means the same endpoint as the response came from. This due
to that the docs mention that an empty string is returned for the IP
field when the node doesn't know its own IP address.
The created `valkeyClusterNode` will get its address from the
`valkeyContext` used when sending the command.

Additional changes are that we will use the `noaddr` flag instead of the
address length when deciding when a node should be skipped; and added
validation of the port.

Signed-off-by: Björn Svensson <[email protected]>
  • Loading branch information
bjosv authored Jan 15, 2025
1 parent cb617d7 commit c793fa1
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 41 deletions.
82 changes: 54 additions & 28 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -748,12 +748,16 @@ static int store_replica_nodes(dict *nodes, dict *replicas) {
return VALKEY_OK;
}

/* Parse a node from a single CLUSTER NODES line. Returns an allocated
* valkeyClusterNode as a pointer in `parsed_node`.
/* Parse a node from a single CLUSTER NODES line.
* Returns VALKEY_OK and an allocated valkeyClusterNode as a pointer in
* `parsed_node`, or VALKEY_ERR when the parsing fails.
* Only parse primary nodes if the `parsed_primary_id` argument is NULL,
* otherwise replicas are also parsed and its primary_id is returned by pointer
* via 'parsed_primary_id'. */
static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line,
* via 'parsed_primary_id'.
* The valkeyContext used when sending the CLUSTER NODES command should be
* provided in `c` since its destination IP address is used when no IP address
* is found in the parsed string. */
static int parse_cluster_nodes_line(valkeyClusterContext *cc, valkeyContext *c, char *line,
valkeyClusterNode **parsed_node, char **parsed_primary_id) {
char *p, *id = NULL, *addr = NULL, *flags = NULL, *primary_id = NULL,
*link_state = NULL, *slots = NULL;
Expand Down Expand Up @@ -788,13 +792,13 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line,
while (*flags != '\0') {
if ((p = strchr(flags, ',')) != NULL)
*p = '\0';
if (memcmp(flags, "master", 6) == 0) {
if (memcmp(flags, "master", 6) == 0)
role = VALKEY_ROLE_PRIMARY;
break;
}
if (memcmp(flags, "slave", 5) == 0) {
else if (memcmp(flags, "slave", 5) == 0)
role = VALKEY_ROLE_REPLICA;
break;
else if (memcmp(flags, "noaddr", 6) == 0) {
*parsed_node = NULL;
return VALKEY_OK; /* Skip nodes with 'noaddr'. */
}
if (p == NULL) /* No more flags. */
break;
Expand Down Expand Up @@ -825,31 +829,53 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line,
if ((p = strchr(addr, PORT_CPORT_SEPARATOR)) != NULL) {
*p = '\0';
}
node->addr = sdsnew(addr);
if (node->addr == NULL)
goto oom;

/* Get the host part */
/* Find the required port separator. */
if ((p = strrchr(addr, IP_PORT_SEPARATOR)) == NULL) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid node address");
freeValkeyClusterNode(node);
return VALKEY_ERR;
}
*p = '\0';

/* Skip nodes where address starts with ":0", i.e. 'noaddr'. */
if (strlen(addr) == 0) {
/* Get the port (skip the found port separator). */
int port = vk_atoi(p + 1, strlen(p + 1));
if (port < 1 || port > UINT16_MAX) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid port");
freeValkeyClusterNode(node);
*parsed_node = NULL;
return VALKEY_OK;
return VALKEY_ERR;
}
node->host = sdsnew(addr);
if (node->host == NULL)
goto oom;
node->port = port;

/* Get the port. */
p++; // Skip separator character.
node->port = vk_atoi(p, strlen(p));
/* Check that we received an ip/host address, i.e. the field
* does not start with the port separator. */
if (p != addr) {
node->addr = sdsnew(addr);
if (node->addr == NULL)
goto oom;

*p = '\0'; /* Cut port separator. */

node->host = sdsnew(addr);
if (node->host == NULL)
goto oom;

} else {
/* We received an ip/host that is an empty string. According to the docs
* we can treat this as it means the same address we sent this command to. */
node->host = sdsnew(c->tcp.host);
if (node->host == NULL) {
goto oom;
}
/* Create a new addr field using correct host:port */
node->addr = sdsnew(node->host);
if (node->addr == NULL) {
goto oom;
}
node->addr = sdscatfmt(node->addr, ":%i", node->port);
if (node->addr == NULL) {
goto oom;
}
}

/* No slot parsing needed for replicas, but return primary id. */
if (node->role == VALKEY_ROLE_REPLICA) {
Expand Down Expand Up @@ -911,7 +937,7 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line,
/**
* Parse the "cluster nodes" command reply to nodes dict.
*/
static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyContext *c, valkeyReply *reply) {
dict *nodes = NULL;
int slot_ranges_found = 0;
int add_replicas = cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE;
Expand All @@ -936,7 +962,7 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {

char *primary_id;
valkeyClusterNode *node;
if (parse_cluster_nodes_line(cc, line, &node, add_replicas ? &primary_id : NULL) != VALKEY_OK)
if (parse_cluster_nodes_line(cc, c, line, &node, add_replicas ? &primary_id : NULL) != VALKEY_OK)
goto error;
if (node == NULL)
continue; /* Line skipped. */
Expand Down Expand Up @@ -1037,7 +1063,7 @@ static int clusterUpdateRouteHandleReply(valkeyClusterContext *cc,
if (cc->flags & VALKEYCLUSTER_FLAG_ROUTE_USE_SLOTS) {
nodes = parse_cluster_slots(cc, reply);
} else {
nodes = parse_cluster_nodes(cc, reply);
nodes = parse_cluster_nodes(cc, c, reply);
}
freeReplyObject(reply);
return updateNodesAndSlotmap(cc, nodes);
Expand Down Expand Up @@ -2896,7 +2922,7 @@ void clusterNodesReplyCallback(valkeyAsyncContext *ac, void *r,
}

valkeyClusterContext *cc = acc->cc;
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, &ac->c, reply);
if (updateNodesAndSlotmap(cc, nodes) != VALKEY_OK) {
/* Ignore failures for now */
}
Expand Down
85 changes: 72 additions & 13 deletions tests/ut_slotmap_update.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ const char *__asan_default_options(void) {
}
#endif

/* Includes source file to test static functions. */
/* Includes source files to test static functions. */
#include "cluster.c"
#include "valkey.c"

#include <stdbool.h>

Expand All @@ -37,6 +38,7 @@ valkeyReply *create_cluster_nodes_reply(const char *bulkstr) {
/* Parse a cluster nodes reply from a basic deployment. */
void test_parse_cluster_nodes(bool parse_replicas) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyClusterNode *node;
cluster_slot *slot;
dictIterator di;
Expand All @@ -51,7 +53,7 @@ void test_parse_cluster_nodes(bool parse_replicas) {
"6ec23923021cf3ffec47632106199cb7f496ce01 127.0.0.1:30005@31005,hostname5 slave 67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 0 1426238316232 5 connected\n"
"824fe116063bc5fcf9f4ffd895bc17aee7731ac3 127.0.0.1:30006@31006,hostname6 slave 292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f 0 1426238317741 6 connected\n"
"e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001,hostname1 myself,master - 0 0 1 connected 0-5460\n");
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

assert(nodes);
Expand Down Expand Up @@ -116,11 +118,13 @@ void test_parse_cluster_nodes(bool parse_replicas) {
}

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

void test_parse_cluster_nodes_during_failover(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyClusterNode *node;
cluster_slot *slot;
dictIterator di;
Expand All @@ -133,7 +137,7 @@ void test_parse_cluster_nodes_during_failover(void) {
"ad0f5210dda1736a1b5467cd6e797f011a192097 10.10.10.125:7000@17000 slave 4394d8eb03de1f524b56cb385f0eb9052ce65283 0 1625255656366 1 connected\n"
"8675cd30fdd4efa088634e50fbd5c0675238a35e 10.10.10.124:7000@17000 slave 22de56650b3714c1c42fc0d120f80c66c24d8795 0 1625255655360 3 connected\n"
"4394d8eb03de1f524b56cb385f0eb9052ce65283 10.10.10.121:7000@17000 myself,master - 0 1625255653000 1 connected 0-5460\n");
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

assert(nodes);
Expand Down Expand Up @@ -178,24 +182,26 @@ void test_parse_cluster_nodes_during_failover(void) {
assert(slot->end == 5460);

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

/* Skip nodes with no address, i.e with address :0 */
/* Skip nodes with the `noaddr` flag. */
void test_parse_cluster_nodes_with_noaddr(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyClusterNode *node;
dictIterator di;

valkeyReply *reply = create_cluster_nodes_reply(
"752d150249c157c7cb312b6b056517bbbecb42d2 :0@0 master,noaddr - 1658754833817 1658754833000 3 disconnected 5461-10922\n"
"e839a12fbed631de867016f636d773e644562e72 127.0.0.0:6379@16379 myself,master - 0 1658755601000 1 connected 0-5460\n"
"87f785c4a51f58c06e4be55de8c112210a811db9 127.0.0.2:6379@16379 master - 0 1658755602418 3 connected 10923-16383\n");
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

assert(nodes);
assert(dictSize(nodes) == 2); /* Only 2 masters since ":0" is skipped. */
assert(dictSize(nodes) == 2); /* Only 2 primaries since `noaddr` nodes are skipped. */
dictInitIterator(&di, nodes);
/* Verify node 1 */
node = dictGetEntryVal(dictNext(&di));
Expand All @@ -205,12 +211,48 @@ void test_parse_cluster_nodes_with_noaddr(void) {
assert(strcmp(node->addr, "127.0.0.2:6379") == 0);

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

void test_parse_cluster_nodes_with_empty_ip(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyClusterNode *node;
dictIterator di;

/* Set the IP from which the response is received from. */
valkeyContext *c = valkeyContextInit();
c->tcp.host = strdup("127.0.0.99");

valkeyReply *reply = create_cluster_nodes_reply(
"752d150249c157c7cb312b6b056517bbbecb42d2 :6379@16379 myself,master - 0 0 0 connected 5461-10922\n"
"e839a12fbed631de867016f636d773e644562e72 127.0.0.1:6379@16379 master - 0 1658755601000 1 connected 0-5460\n"
"87f785c4a51f58c06e4be55de8c112210a811db9 127.0.0.2:6379@16379 master - 0 1658755602418 3 connected 10923-16383\n");
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

assert(nodes);
assert(dictSize(nodes) == 3);
dictInitIterator(&di, nodes);
/* Verify node 1 */
node = dictGetEntryVal(dictNext(&di));
assert(strcmp(node->addr, "127.0.0.1:6379") == 0);
/* Verify node 2 */
node = dictGetEntryVal(dictNext(&di));
assert(strcmp(node->addr, "127.0.0.2:6379") == 0);
/* Verify node 3 */
node = dictGetEntryVal(dictNext(&di));
assert(strcmp(node->addr, "127.0.0.99:6379") == 0); /* Uses the IP from which the response was received from. */

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

/* Parse replies with additional importing and migrating information. */
void test_parse_cluster_nodes_with_special_slot_entries(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyClusterNode *node;
cluster_slot *slot;
dictIterator di;
Expand All @@ -220,7 +262,7 @@ void test_parse_cluster_nodes_with_special_slot_entries(void) {
* importing slot information that will be ignored. */
valkeyReply *reply = create_cluster_nodes_reply(
"4394d8eb03de1f524b56cb385f0eb9052ce65283 10.10.10.121:7000@17000 myself,master - 0 1625255653000 1 connected 0 2-5460 [0->-e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca] [1-<-292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f]\n");
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

assert(nodes);
Expand All @@ -243,12 +285,14 @@ void test_parse_cluster_nodes_with_special_slot_entries(void) {
assert(slot->end == 5460);

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

/* Parse a cluster nodes reply containing a primary with multiple replicas. */
void test_parse_cluster_nodes_with_multiple_replicas(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyClusterNode *node;
cluster_slot *slot;
dictIterator di;
Expand All @@ -263,7 +307,7 @@ void test_parse_cluster_nodes_with_multiple_replicas(void) {
"e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001,hostname1 myself,master - 0 0 1 connected 0-16383\n"
"67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1:30002@31002,hostname2 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238316232 2 connected\n"
"292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f 127.0.0.1:30003@31003,hostname3 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238318243 3 connected\n");
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

/* Verify master. */
Expand Down Expand Up @@ -306,19 +350,21 @@ void test_parse_cluster_nodes_with_multiple_replicas(void) {
assert(node->role == VALKEY_ROLE_REPLICA);

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

/* Give error when parsing erroneous data. */
void test_parse_cluster_nodes_with_parse_error(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyReply *reply;
dict *nodes;

/* Missing link-state (and slots). */
reply = create_cluster_nodes_reply(
"e839a12fbed631de867016f636d773e644562e72 127.0.0.0:30001@31001 myself,master - 0 1658755601000 1 \n");
nodes = parse_cluster_nodes(cc, reply);
nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);
assert(nodes == NULL);
assert(cc->err == VALKEY_ERR_OTHER);
Expand All @@ -327,7 +373,7 @@ void test_parse_cluster_nodes_with_parse_error(void) {
/* Missing port. */
reply = create_cluster_nodes_reply(
"e839a12fbed631de867016f636d773e644562e72 127.0.0.0@31001 myself,master - 0 1658755601000 1 connected 0-5460\n");
nodes = parse_cluster_nodes(cc, reply);
nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);
assert(nodes == NULL);
assert(cc->err == VALKEY_ERR_OTHER);
Expand All @@ -336,35 +382,47 @@ void test_parse_cluster_nodes_with_parse_error(void) {
/* Missing port and cport. */
reply = create_cluster_nodes_reply(
"e839a12fbed631de867016f636d773e644562e72 127.0.0.0 myself,master - 0 1658755601000 1 connected 0-5460\n");
nodes = parse_cluster_nodes(cc, reply);
nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);
assert(nodes == NULL);
assert(cc->err == VALKEY_ERR_OTHER);
valkeyClusterClearError(cc);

/* Invalid port. */
reply = create_cluster_nodes_reply(
"e839a12fbed631de867016f636d773e644562e72 127.0.0.0:66000@67000 myself,master - 0 1658755601000 1 connected 0-5460\n");
nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);
assert(nodes == NULL);
assert(cc->err == VALKEY_ERR_OTHER);
valkeyClusterClearError(cc);

valkeyFree(c);
valkeyClusterFree(cc);
}

/* Redis pre-v4.0 returned node addresses without the clusterbus port,
* i.e. `ip:port` instead of `ip:port@cport` */
void test_parse_cluster_nodes_with_legacy_format(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyClusterNode *node;
dictIterator di;

valkeyReply *reply = create_cluster_nodes_reply(
"e839a12fbed631de867016f636d773e644562e72 127.0.0.0:6379 myself,master - 0 1658755601000 1 connected 0-5460\n"
"752d150249c157c7cb312b6b056517bbbecb42d2 :0 master,noaddr - 1658754833817 1658754833000 3 disconnected 5461-10922\n");
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

assert(nodes);
assert(dictSize(nodes) == 1); /* Only 1 master since ":0" is skipped. */
assert(dictSize(nodes) == 1); /* Only 1 primary since `noaddr` nodes are skipped. */
dictInitIterator(&di, nodes);
node = dictGetEntryVal(dictNext(&di));
assert(strcmp(node->addr, "127.0.0.0:6379") == 0);

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

Expand All @@ -373,6 +431,7 @@ int main(void) {
test_parse_cluster_nodes(true /* replicas parsed */);
test_parse_cluster_nodes_during_failover();
test_parse_cluster_nodes_with_noaddr();
test_parse_cluster_nodes_with_empty_ip();
test_parse_cluster_nodes_with_special_slot_entries();
test_parse_cluster_nodes_with_multiple_replicas();
test_parse_cluster_nodes_with_parse_error();
Expand Down

0 comments on commit c793fa1

Please sign in to comment.