diff --git a/src/cluster.c b/src/cluster.c index a79e280..c15240a 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -748,12 +748,16 @@ static int store_replica_nodes(dict *nodes, dict *replicas) { return VALKEY_OK; } -/* Parse a node from a single CLUSTER NODES line. Returns an allocated - * valkeyClusterNode as a pointer in `parsed_node`. +/* Parse a node from a single CLUSTER NODES line. + * Returns VALKEY_OK and an allocated valkeyClusterNode as a pointer in + * `parsed_node`, or VALKEY_ERR when the parsing fails. * Only parse primary nodes if the `parsed_primary_id` argument is NULL, * otherwise replicas are also parsed and its primary_id is returned by pointer - * via 'parsed_primary_id'. */ -static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line, + * via 'parsed_primary_id'. + * The valkeyContext used when sending the CLUSTER NODES command should be + * provided in `c` since its destination IP address is used when no IP address + * is found in the parsed string. */ +static int parse_cluster_nodes_line(valkeyClusterContext *cc, valkeyContext *c, char *line, valkeyClusterNode **parsed_node, char **parsed_primary_id) { char *p, *id = NULL, *addr = NULL, *flags = NULL, *primary_id = NULL, *link_state = NULL, *slots = NULL; @@ -788,13 +792,13 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line, while (*flags != '\0') { if ((p = strchr(flags, ',')) != NULL) *p = '\0'; - if (memcmp(flags, "master", 6) == 0) { + if (memcmp(flags, "master", 6) == 0) role = VALKEY_ROLE_PRIMARY; - break; - } - if (memcmp(flags, "slave", 5) == 0) { + else if (memcmp(flags, "slave", 5) == 0) role = VALKEY_ROLE_REPLICA; - break; + else if (memcmp(flags, "noaddr", 6) == 0) { + *parsed_node = NULL; + return VALKEY_OK; /* Skip nodes with 'noaddr'. */ } if (p == NULL) /* No more flags. */ break; @@ -825,31 +829,53 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line, if ((p = strchr(addr, PORT_CPORT_SEPARATOR)) != NULL) { *p = '\0'; } - node->addr = sdsnew(addr); - if (node->addr == NULL) - goto oom; - /* Get the host part */ + /* Find the required port separator. */ if ((p = strrchr(addr, IP_PORT_SEPARATOR)) == NULL) { valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid node address"); freeValkeyClusterNode(node); return VALKEY_ERR; } - *p = '\0'; - /* Skip nodes where address starts with ":0", i.e. 'noaddr'. */ - if (strlen(addr) == 0) { + /* Get the port (skip the found port separator). */ + int port = vk_atoi(p + 1, strlen(p + 1)); + if (port < 1 || port > UINT16_MAX) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid port"); freeValkeyClusterNode(node); - *parsed_node = NULL; - return VALKEY_OK; + return VALKEY_ERR; } - node->host = sdsnew(addr); - if (node->host == NULL) - goto oom; + node->port = port; - /* Get the port. */ - p++; // Skip separator character. - node->port = vk_atoi(p, strlen(p)); + /* Check that we received an ip/host address, i.e. the field + * does not start with the port separator. */ + if (p != addr) { + node->addr = sdsnew(addr); + if (node->addr == NULL) + goto oom; + + *p = '\0'; /* Cut port separator. */ + + node->host = sdsnew(addr); + if (node->host == NULL) + goto oom; + + } else { + /* We received an ip/host that is an empty string. According to the docs + * we can treat this as it means the same address we sent this command to. */ + node->host = sdsnew(c->tcp.host); + if (node->host == NULL) { + goto oom; + } + /* Create a new addr field using correct host:port */ + node->addr = sdsnew(node->host); + if (node->addr == NULL) { + goto oom; + } + node->addr = sdscatfmt(node->addr, ":%i", node->port); + if (node->addr == NULL) { + goto oom; + } + } /* No slot parsing needed for replicas, but return primary id. */ if (node->role == VALKEY_ROLE_REPLICA) { @@ -911,7 +937,7 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line, /** * Parse the "cluster nodes" command reply to nodes dict. */ -static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { +static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyContext *c, valkeyReply *reply) { dict *nodes = NULL; int slot_ranges_found = 0; int add_replicas = cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE; @@ -936,7 +962,7 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { char *primary_id; valkeyClusterNode *node; - if (parse_cluster_nodes_line(cc, line, &node, add_replicas ? &primary_id : NULL) != VALKEY_OK) + if (parse_cluster_nodes_line(cc, c, line, &node, add_replicas ? &primary_id : NULL) != VALKEY_OK) goto error; if (node == NULL) continue; /* Line skipped. */ @@ -1037,7 +1063,7 @@ static int clusterUpdateRouteHandleReply(valkeyClusterContext *cc, if (cc->flags & VALKEYCLUSTER_FLAG_ROUTE_USE_SLOTS) { nodes = parse_cluster_slots(cc, reply); } else { - nodes = parse_cluster_nodes(cc, reply); + nodes = parse_cluster_nodes(cc, c, reply); } freeReplyObject(reply); return updateNodesAndSlotmap(cc, nodes); @@ -2896,7 +2922,7 @@ void clusterNodesReplyCallback(valkeyAsyncContext *ac, void *r, } valkeyClusterContext *cc = acc->cc; - dict *nodes = parse_cluster_nodes(cc, reply); + dict *nodes = parse_cluster_nodes(cc, &ac->c, reply); if (updateNodesAndSlotmap(cc, nodes) != VALKEY_OK) { /* Ignore failures for now */ } diff --git a/tests/ut_slotmap_update.c b/tests/ut_slotmap_update.c index bfeec6e..7d19688 100644 --- a/tests/ut_slotmap_update.c +++ b/tests/ut_slotmap_update.c @@ -13,8 +13,9 @@ const char *__asan_default_options(void) { } #endif -/* Includes source file to test static functions. */ +/* Includes source files to test static functions. */ #include "cluster.c" +#include "valkey.c" #include @@ -37,6 +38,7 @@ valkeyReply *create_cluster_nodes_reply(const char *bulkstr) { /* Parse a cluster nodes reply from a basic deployment. */ void test_parse_cluster_nodes(bool parse_replicas) { valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyContext *c = valkeyContextInit(); valkeyClusterNode *node; cluster_slot *slot; dictIterator di; @@ -51,7 +53,7 @@ void test_parse_cluster_nodes(bool parse_replicas) { "6ec23923021cf3ffec47632106199cb7f496ce01 127.0.0.1:30005@31005,hostname5 slave 67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 0 1426238316232 5 connected\n" "824fe116063bc5fcf9f4ffd895bc17aee7731ac3 127.0.0.1:30006@31006,hostname6 slave 292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f 0 1426238317741 6 connected\n" "e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001,hostname1 myself,master - 0 0 1 connected 0-5460\n"); - dict *nodes = parse_cluster_nodes(cc, reply); + dict *nodes = parse_cluster_nodes(cc, c, reply); freeReplyObject(reply); assert(nodes); @@ -116,11 +118,13 @@ void test_parse_cluster_nodes(bool parse_replicas) { } dictRelease(nodes); + valkeyFree(c); valkeyClusterFree(cc); } void test_parse_cluster_nodes_during_failover(void) { valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyContext *c = valkeyContextInit(); valkeyClusterNode *node; cluster_slot *slot; dictIterator di; @@ -133,7 +137,7 @@ void test_parse_cluster_nodes_during_failover(void) { "ad0f5210dda1736a1b5467cd6e797f011a192097 10.10.10.125:7000@17000 slave 4394d8eb03de1f524b56cb385f0eb9052ce65283 0 1625255656366 1 connected\n" "8675cd30fdd4efa088634e50fbd5c0675238a35e 10.10.10.124:7000@17000 slave 22de56650b3714c1c42fc0d120f80c66c24d8795 0 1625255655360 3 connected\n" "4394d8eb03de1f524b56cb385f0eb9052ce65283 10.10.10.121:7000@17000 myself,master - 0 1625255653000 1 connected 0-5460\n"); - dict *nodes = parse_cluster_nodes(cc, reply); + dict *nodes = parse_cluster_nodes(cc, c, reply); freeReplyObject(reply); assert(nodes); @@ -178,12 +182,14 @@ void test_parse_cluster_nodes_during_failover(void) { assert(slot->end == 5460); dictRelease(nodes); + valkeyFree(c); valkeyClusterFree(cc); } -/* Skip nodes with no address, i.e with address :0 */ +/* Skip nodes with the `noaddr` flag. */ void test_parse_cluster_nodes_with_noaddr(void) { valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyContext *c = valkeyContextInit(); valkeyClusterNode *node; dictIterator di; @@ -191,11 +197,11 @@ void test_parse_cluster_nodes_with_noaddr(void) { "752d150249c157c7cb312b6b056517bbbecb42d2 :0@0 master,noaddr - 1658754833817 1658754833000 3 disconnected 5461-10922\n" "e839a12fbed631de867016f636d773e644562e72 127.0.0.0:6379@16379 myself,master - 0 1658755601000 1 connected 0-5460\n" "87f785c4a51f58c06e4be55de8c112210a811db9 127.0.0.2:6379@16379 master - 0 1658755602418 3 connected 10923-16383\n"); - dict *nodes = parse_cluster_nodes(cc, reply); + dict *nodes = parse_cluster_nodes(cc, c, reply); freeReplyObject(reply); assert(nodes); - assert(dictSize(nodes) == 2); /* Only 2 masters since ":0" is skipped. */ + assert(dictSize(nodes) == 2); /* Only 2 primaries since `noaddr` nodes are skipped. */ dictInitIterator(&di, nodes); /* Verify node 1 */ node = dictGetEntryVal(dictNext(&di)); @@ -205,12 +211,48 @@ void test_parse_cluster_nodes_with_noaddr(void) { assert(strcmp(node->addr, "127.0.0.2:6379") == 0); dictRelease(nodes); + valkeyFree(c); + valkeyClusterFree(cc); +} + +void test_parse_cluster_nodes_with_empty_ip(void) { + valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyClusterNode *node; + dictIterator di; + + /* Set the IP from which the response is received from. */ + valkeyContext *c = valkeyContextInit(); + c->tcp.host = strdup("127.0.0.99"); + + valkeyReply *reply = create_cluster_nodes_reply( + "752d150249c157c7cb312b6b056517bbbecb42d2 :6379@16379 myself,master - 0 0 0 connected 5461-10922\n" + "e839a12fbed631de867016f636d773e644562e72 127.0.0.1:6379@16379 master - 0 1658755601000 1 connected 0-5460\n" + "87f785c4a51f58c06e4be55de8c112210a811db9 127.0.0.2:6379@16379 master - 0 1658755602418 3 connected 10923-16383\n"); + dict *nodes = parse_cluster_nodes(cc, c, reply); + freeReplyObject(reply); + + assert(nodes); + assert(dictSize(nodes) == 3); + dictInitIterator(&di, nodes); + /* Verify node 1 */ + node = dictGetEntryVal(dictNext(&di)); + assert(strcmp(node->addr, "127.0.0.1:6379") == 0); + /* Verify node 2 */ + node = dictGetEntryVal(dictNext(&di)); + assert(strcmp(node->addr, "127.0.0.2:6379") == 0); + /* Verify node 3 */ + node = dictGetEntryVal(dictNext(&di)); + assert(strcmp(node->addr, "127.0.0.99:6379") == 0); /* Uses the IP from which the response was received from. */ + + dictRelease(nodes); + valkeyFree(c); valkeyClusterFree(cc); } /* Parse replies with additional importing and migrating information. */ void test_parse_cluster_nodes_with_special_slot_entries(void) { valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyContext *c = valkeyContextInit(); valkeyClusterNode *node; cluster_slot *slot; dictIterator di; @@ -220,7 +262,7 @@ void test_parse_cluster_nodes_with_special_slot_entries(void) { * importing slot information that will be ignored. */ valkeyReply *reply = create_cluster_nodes_reply( "4394d8eb03de1f524b56cb385f0eb9052ce65283 10.10.10.121:7000@17000 myself,master - 0 1625255653000 1 connected 0 2-5460 [0->-e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca] [1-<-292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f]\n"); - dict *nodes = parse_cluster_nodes(cc, reply); + dict *nodes = parse_cluster_nodes(cc, c, reply); freeReplyObject(reply); assert(nodes); @@ -243,12 +285,14 @@ void test_parse_cluster_nodes_with_special_slot_entries(void) { assert(slot->end == 5460); dictRelease(nodes); + valkeyFree(c); valkeyClusterFree(cc); } /* Parse a cluster nodes reply containing a primary with multiple replicas. */ void test_parse_cluster_nodes_with_multiple_replicas(void) { valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyContext *c = valkeyContextInit(); valkeyClusterNode *node; cluster_slot *slot; dictIterator di; @@ -263,7 +307,7 @@ void test_parse_cluster_nodes_with_multiple_replicas(void) { "e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001,hostname1 myself,master - 0 0 1 connected 0-16383\n" "67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1:30002@31002,hostname2 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238316232 2 connected\n" "292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f 127.0.0.1:30003@31003,hostname3 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238318243 3 connected\n"); - dict *nodes = parse_cluster_nodes(cc, reply); + dict *nodes = parse_cluster_nodes(cc, c, reply); freeReplyObject(reply); /* Verify master. */ @@ -306,19 +350,21 @@ void test_parse_cluster_nodes_with_multiple_replicas(void) { assert(node->role == VALKEY_ROLE_REPLICA); dictRelease(nodes); + valkeyFree(c); valkeyClusterFree(cc); } /* Give error when parsing erroneous data. */ void test_parse_cluster_nodes_with_parse_error(void) { valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyContext *c = valkeyContextInit(); valkeyReply *reply; dict *nodes; /* Missing link-state (and slots). */ reply = create_cluster_nodes_reply( "e839a12fbed631de867016f636d773e644562e72 127.0.0.0:30001@31001 myself,master - 0 1658755601000 1 \n"); - nodes = parse_cluster_nodes(cc, reply); + nodes = parse_cluster_nodes(cc, c, reply); freeReplyObject(reply); assert(nodes == NULL); assert(cc->err == VALKEY_ERR_OTHER); @@ -327,7 +373,7 @@ void test_parse_cluster_nodes_with_parse_error(void) { /* Missing port. */ reply = create_cluster_nodes_reply( "e839a12fbed631de867016f636d773e644562e72 127.0.0.0@31001 myself,master - 0 1658755601000 1 connected 0-5460\n"); - nodes = parse_cluster_nodes(cc, reply); + nodes = parse_cluster_nodes(cc, c, reply); freeReplyObject(reply); assert(nodes == NULL); assert(cc->err == VALKEY_ERR_OTHER); @@ -336,12 +382,22 @@ void test_parse_cluster_nodes_with_parse_error(void) { /* Missing port and cport. */ reply = create_cluster_nodes_reply( "e839a12fbed631de867016f636d773e644562e72 127.0.0.0 myself,master - 0 1658755601000 1 connected 0-5460\n"); - nodes = parse_cluster_nodes(cc, reply); + nodes = parse_cluster_nodes(cc, c, reply); + freeReplyObject(reply); + assert(nodes == NULL); + assert(cc->err == VALKEY_ERR_OTHER); + valkeyClusterClearError(cc); + + /* Invalid port. */ + reply = create_cluster_nodes_reply( + "e839a12fbed631de867016f636d773e644562e72 127.0.0.0:66000@67000 myself,master - 0 1658755601000 1 connected 0-5460\n"); + nodes = parse_cluster_nodes(cc, c, reply); freeReplyObject(reply); assert(nodes == NULL); assert(cc->err == VALKEY_ERR_OTHER); valkeyClusterClearError(cc); + valkeyFree(c); valkeyClusterFree(cc); } @@ -349,22 +405,24 @@ void test_parse_cluster_nodes_with_parse_error(void) { * i.e. `ip:port` instead of `ip:port@cport` */ void test_parse_cluster_nodes_with_legacy_format(void) { valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyContext *c = valkeyContextInit(); valkeyClusterNode *node; dictIterator di; valkeyReply *reply = create_cluster_nodes_reply( "e839a12fbed631de867016f636d773e644562e72 127.0.0.0:6379 myself,master - 0 1658755601000 1 connected 0-5460\n" "752d150249c157c7cb312b6b056517bbbecb42d2 :0 master,noaddr - 1658754833817 1658754833000 3 disconnected 5461-10922\n"); - dict *nodes = parse_cluster_nodes(cc, reply); + dict *nodes = parse_cluster_nodes(cc, c, reply); freeReplyObject(reply); assert(nodes); - assert(dictSize(nodes) == 1); /* Only 1 master since ":0" is skipped. */ + assert(dictSize(nodes) == 1); /* Only 1 primary since `noaddr` nodes are skipped. */ dictInitIterator(&di, nodes); node = dictGetEntryVal(dictNext(&di)); assert(strcmp(node->addr, "127.0.0.0:6379") == 0); dictRelease(nodes); + valkeyFree(c); valkeyClusterFree(cc); } @@ -373,6 +431,7 @@ int main(void) { test_parse_cluster_nodes(true /* replicas parsed */); test_parse_cluster_nodes_during_failover(); test_parse_cluster_nodes_with_noaddr(); + test_parse_cluster_nodes_with_empty_ip(); test_parse_cluster_nodes_with_special_slot_entries(); test_parse_cluster_nodes_with_multiple_replicas(); test_parse_cluster_nodes_with_parse_error();