Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle empty addresses in CLUSTER NODES responses #148

Merged
merged 4 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 47 additions & 25 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ static int store_replica_nodes(dict *nodes, dict *replicas) {
* Only parse primary nodes if the `parsed_primary_id` argument is NULL,
* otherwise replicas are also parsed and its primary_id is returned by pointer
* via 'parsed_primary_id'. */
static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line,
static int parse_cluster_nodes_line(valkeyClusterContext *cc, valkeyContext *c, char *line,
bjosv marked this conversation as resolved.
Show resolved Hide resolved
valkeyClusterNode **parsed_node, char **parsed_primary_id) {
char *p, *id = NULL, *addr = NULL, *flags = NULL, *primary_id = NULL,
*link_state = NULL, *slots = NULL;
Expand Down Expand Up @@ -788,13 +788,13 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line,
while (*flags != '\0') {
if ((p = strchr(flags, ',')) != NULL)
*p = '\0';
if (memcmp(flags, "master", 6) == 0) {
if (memcmp(flags, "master", 6) == 0)
role = VALKEY_ROLE_PRIMARY;
break;
}
if (memcmp(flags, "slave", 5) == 0) {
else if (memcmp(flags, "slave", 5) == 0)
role = VALKEY_ROLE_REPLICA;
break;
else if (memcmp(flags, "noaddr", 6) == 0) {
*parsed_node = NULL;
return VALKEY_OK; /* Skip nodes with 'noaddr'. */
}
if (p == NULL) /* No more flags. */
break;
Expand Down Expand Up @@ -825,31 +825,53 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line,
if ((p = strchr(addr, PORT_CPORT_SEPARATOR)) != NULL) {
*p = '\0';
}
node->addr = sdsnew(addr);
if (node->addr == NULL)
goto oom;

/* Get the host part */
/* Find the required port separator. */
if ((p = strrchr(addr, IP_PORT_SEPARATOR)) == NULL) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid node address");
freeValkeyClusterNode(node);
return VALKEY_ERR;
}
*p = '\0';

/* Skip nodes where address starts with ":0", i.e. 'noaddr'. */
if (strlen(addr) == 0) {
/* Get the port (skip the found port separator). */
int port = vk_atoi(p + 1, strlen(p + 1));
if (port < 1 || port > UINT16_MAX) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid port");
freeValkeyClusterNode(node);
*parsed_node = NULL;
return VALKEY_OK;
return VALKEY_ERR;
}
node->host = sdsnew(addr);
if (node->host == NULL)
goto oom;
node->port = port;

/* Get the port. */
p++; // Skip separator character.
node->port = vk_atoi(p, strlen(p));
/* Check that we received an ip/host address, i.e. the field
* does not start with the port separator. */
if (p != addr) {
node->addr = sdsnew(addr);
if (node->addr == NULL)
goto oom;

*p = '\0'; /* Cut port separator. */

node->host = sdsnew(addr);
if (node->host == NULL)
goto oom;

} else {
/* We received an ip/host that is an empty string. According to the docs
* we can treat this as it means the same address we sent this command to. */
node->host = sdsnew(c->tcp.host);
if (node->host == NULL) {
goto oom;
}
/* Create a new addr field using correct host:port */
node->addr = sdsnew(node->host);
if (node->addr == NULL) {
goto oom;
}
node->addr = sdscatfmt(node->addr, ":%i", node->port);
if (node->addr == NULL) {
goto oom;
}
}

/* No slot parsing needed for replicas, but return primary id. */
if (node->role == VALKEY_ROLE_REPLICA) {
Expand Down Expand Up @@ -911,7 +933,7 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line,
/**
* Parse the "cluster nodes" command reply to nodes dict.
*/
static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyContext *c, valkeyReply *reply) {
dict *nodes = NULL;
int slot_ranges_found = 0;
int add_replicas = cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE;
Expand All @@ -936,7 +958,7 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {

char *primary_id;
valkeyClusterNode *node;
if (parse_cluster_nodes_line(cc, line, &node, add_replicas ? &primary_id : NULL) != VALKEY_OK)
if (parse_cluster_nodes_line(cc, c, line, &node, add_replicas ? &primary_id : NULL) != VALKEY_OK)
goto error;
if (node == NULL)
continue; /* Line skipped. */
Expand Down Expand Up @@ -1037,7 +1059,7 @@ static int clusterUpdateRouteHandleReply(valkeyClusterContext *cc,
if (cc->flags & VALKEYCLUSTER_FLAG_ROUTE_USE_SLOTS) {
nodes = parse_cluster_slots(cc, reply);
} else {
nodes = parse_cluster_nodes(cc, reply);
nodes = parse_cluster_nodes(cc, c, reply);
}
freeReplyObject(reply);
return updateNodesAndSlotmap(cc, nodes);
Expand Down Expand Up @@ -2896,7 +2918,7 @@ void clusterNodesReplyCallback(valkeyAsyncContext *ac, void *r,
}

valkeyClusterContext *cc = acc->cc;
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, &ac->c, reply);
if (updateNodesAndSlotmap(cc, nodes) != VALKEY_OK) {
/* Ignore failures for now */
}
Expand Down
85 changes: 72 additions & 13 deletions tests/ut_slotmap_update.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ const char *__asan_default_options(void) {
}
#endif

/* Includes source file to test static functions. */
/* Includes source files to test static functions. */
#include "cluster.c"
#include "valkey.c"

#include <stdbool.h>

Expand All @@ -37,6 +38,7 @@ valkeyReply *create_cluster_nodes_reply(const char *bulkstr) {
/* Parse a cluster nodes reply from a basic deployment. */
void test_parse_cluster_nodes(bool parse_replicas) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyClusterNode *node;
cluster_slot *slot;
dictIterator di;
Expand All @@ -51,7 +53,7 @@ void test_parse_cluster_nodes(bool parse_replicas) {
"6ec23923021cf3ffec47632106199cb7f496ce01 127.0.0.1:30005@31005,hostname5 slave 67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 0 1426238316232 5 connected\n"
"824fe116063bc5fcf9f4ffd895bc17aee7731ac3 127.0.0.1:30006@31006,hostname6 slave 292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f 0 1426238317741 6 connected\n"
"e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001,hostname1 myself,master - 0 0 1 connected 0-5460\n");
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

assert(nodes);
Expand Down Expand Up @@ -116,11 +118,13 @@ void test_parse_cluster_nodes(bool parse_replicas) {
}

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

void test_parse_cluster_nodes_during_failover(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyClusterNode *node;
cluster_slot *slot;
dictIterator di;
Expand All @@ -133,7 +137,7 @@ void test_parse_cluster_nodes_during_failover(void) {
"ad0f5210dda1736a1b5467cd6e797f011a192097 10.10.10.125:7000@17000 slave 4394d8eb03de1f524b56cb385f0eb9052ce65283 0 1625255656366 1 connected\n"
"8675cd30fdd4efa088634e50fbd5c0675238a35e 10.10.10.124:7000@17000 slave 22de56650b3714c1c42fc0d120f80c66c24d8795 0 1625255655360 3 connected\n"
"4394d8eb03de1f524b56cb385f0eb9052ce65283 10.10.10.121:7000@17000 myself,master - 0 1625255653000 1 connected 0-5460\n");
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

assert(nodes);
Expand Down Expand Up @@ -178,24 +182,26 @@ void test_parse_cluster_nodes_during_failover(void) {
assert(slot->end == 5460);

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

/* Skip nodes with no address, i.e with address :0 */
/* Skip nodes with the `noaddr` flag. */
void test_parse_cluster_nodes_with_noaddr(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyClusterNode *node;
dictIterator di;

valkeyReply *reply = create_cluster_nodes_reply(
"752d150249c157c7cb312b6b056517bbbecb42d2 :0@0 master,noaddr - 1658754833817 1658754833000 3 disconnected 5461-10922\n"
"e839a12fbed631de867016f636d773e644562e72 127.0.0.0:6379@16379 myself,master - 0 1658755601000 1 connected 0-5460\n"
"87f785c4a51f58c06e4be55de8c112210a811db9 127.0.0.2:6379@16379 master - 0 1658755602418 3 connected 10923-16383\n");
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

assert(nodes);
assert(dictSize(nodes) == 2); /* Only 2 masters since ":0" is skipped. */
assert(dictSize(nodes) == 2); /* Only 2 primaries since `noaddr` nodes are skipped. */
dictInitIterator(&di, nodes);
/* Verify node 1 */
node = dictGetEntryVal(dictNext(&di));
Expand All @@ -205,12 +211,48 @@ void test_parse_cluster_nodes_with_noaddr(void) {
assert(strcmp(node->addr, "127.0.0.2:6379") == 0);

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

void test_parse_cluster_nodes_with_empty_ip(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyClusterNode *node;
dictIterator di;

/* Set the IP from which the response is received from. */
valkeyContext *c = valkeyContextInit();
c->tcp.host = strdup("127.0.0.99");

valkeyReply *reply = create_cluster_nodes_reply(
"752d150249c157c7cb312b6b056517bbbecb42d2 :6379@16379 myself,master - 0 0 0 connected 5461-10922\n"
"e839a12fbed631de867016f636d773e644562e72 127.0.0.1:6379@16379 master - 0 1658755601000 1 connected 0-5460\n"
"87f785c4a51f58c06e4be55de8c112210a811db9 127.0.0.2:6379@16379 master - 0 1658755602418 3 connected 10923-16383\n");
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

assert(nodes);
assert(dictSize(nodes) == 3);
dictInitIterator(&di, nodes);
/* Verify node 1 */
node = dictGetEntryVal(dictNext(&di));
assert(strcmp(node->addr, "127.0.0.1:6379") == 0);
/* Verify node 2 */
node = dictGetEntryVal(dictNext(&di));
assert(strcmp(node->addr, "127.0.0.2:6379") == 0);
/* Verify node 3 */
node = dictGetEntryVal(dictNext(&di));
assert(strcmp(node->addr, "127.0.0.99:6379") == 0); /* Uses the IP from which the response was received from. */

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

/* Parse replies with additional importing and migrating information. */
void test_parse_cluster_nodes_with_special_slot_entries(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyClusterNode *node;
cluster_slot *slot;
dictIterator di;
Expand All @@ -220,7 +262,7 @@ void test_parse_cluster_nodes_with_special_slot_entries(void) {
* importing slot information that will be ignored. */
valkeyReply *reply = create_cluster_nodes_reply(
"4394d8eb03de1f524b56cb385f0eb9052ce65283 10.10.10.121:7000@17000 myself,master - 0 1625255653000 1 connected 0 2-5460 [0->-e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca] [1-<-292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f]\n");
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

assert(nodes);
Expand All @@ -243,12 +285,14 @@ void test_parse_cluster_nodes_with_special_slot_entries(void) {
assert(slot->end == 5460);

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

/* Parse a cluster nodes reply containing a primary with multiple replicas. */
void test_parse_cluster_nodes_with_multiple_replicas(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyClusterNode *node;
cluster_slot *slot;
dictIterator di;
Expand All @@ -263,7 +307,7 @@ void test_parse_cluster_nodes_with_multiple_replicas(void) {
"e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001,hostname1 myself,master - 0 0 1 connected 0-16383\n"
"67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1:30002@31002,hostname2 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238316232 2 connected\n"
"292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f 127.0.0.1:30003@31003,hostname3 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238318243 3 connected\n");
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

/* Verify master. */
Expand Down Expand Up @@ -306,19 +350,21 @@ void test_parse_cluster_nodes_with_multiple_replicas(void) {
assert(node->role == VALKEY_ROLE_REPLICA);

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

/* Give error when parsing erroneous data. */
void test_parse_cluster_nodes_with_parse_error(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyReply *reply;
dict *nodes;

/* Missing link-state (and slots). */
reply = create_cluster_nodes_reply(
"e839a12fbed631de867016f636d773e644562e72 127.0.0.0:30001@31001 myself,master - 0 1658755601000 1 \n");
nodes = parse_cluster_nodes(cc, reply);
nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);
assert(nodes == NULL);
assert(cc->err == VALKEY_ERR_OTHER);
Expand All @@ -327,7 +373,7 @@ void test_parse_cluster_nodes_with_parse_error(void) {
/* Missing port. */
reply = create_cluster_nodes_reply(
"e839a12fbed631de867016f636d773e644562e72 127.0.0.0@31001 myself,master - 0 1658755601000 1 connected 0-5460\n");
nodes = parse_cluster_nodes(cc, reply);
nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);
assert(nodes == NULL);
assert(cc->err == VALKEY_ERR_OTHER);
Expand All @@ -336,35 +382,47 @@ void test_parse_cluster_nodes_with_parse_error(void) {
/* Missing port and cport. */
reply = create_cluster_nodes_reply(
"e839a12fbed631de867016f636d773e644562e72 127.0.0.0 myself,master - 0 1658755601000 1 connected 0-5460\n");
nodes = parse_cluster_nodes(cc, reply);
nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);
assert(nodes == NULL);
assert(cc->err == VALKEY_ERR_OTHER);
valkeyClusterClearError(cc);

/* Invalid port. */
reply = create_cluster_nodes_reply(
"e839a12fbed631de867016f636d773e644562e72 127.0.0.0:66000@67000 myself,master - 0 1658755601000 1 connected 0-5460\n");
nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);
assert(nodes == NULL);
assert(cc->err == VALKEY_ERR_OTHER);
valkeyClusterClearError(cc);

valkeyFree(c);
valkeyClusterFree(cc);
}

/* Redis pre-v4.0 returned node addresses without the clusterbus port,
* i.e. `ip:port` instead of `ip:port@cport` */
void test_parse_cluster_nodes_with_legacy_format(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyContext *c = valkeyContextInit();
valkeyClusterNode *node;
dictIterator di;

valkeyReply *reply = create_cluster_nodes_reply(
"e839a12fbed631de867016f636d773e644562e72 127.0.0.0:6379 myself,master - 0 1658755601000 1 connected 0-5460\n"
"752d150249c157c7cb312b6b056517bbbecb42d2 :0 master,noaddr - 1658754833817 1658754833000 3 disconnected 5461-10922\n");
dict *nodes = parse_cluster_nodes(cc, reply);
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);

assert(nodes);
assert(dictSize(nodes) == 1); /* Only 1 master since ":0" is skipped. */
assert(dictSize(nodes) == 1); /* Only 1 primary since `noaddr` nodes are skipped. */
dictInitIterator(&di, nodes);
node = dictGetEntryVal(dictNext(&di));
assert(strcmp(node->addr, "127.0.0.0:6379") == 0);

dictRelease(nodes);
valkeyFree(c);
valkeyClusterFree(cc);
}

Expand All @@ -373,6 +431,7 @@ int main(void) {
test_parse_cluster_nodes(true /* replicas parsed */);
test_parse_cluster_nodes_during_failover();
test_parse_cluster_nodes_with_noaddr();
test_parse_cluster_nodes_with_empty_ip();
test_parse_cluster_nodes_with_special_slot_entries();
test_parse_cluster_nodes_with_multiple_replicas();
test_parse_cluster_nodes_with_parse_error();
Expand Down
Loading