From fb8af4c786c073a64264ccb478364e85502aca64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Thu, 23 Jan 2025 14:03:55 +0100 Subject: [PATCH] Retry when an async slotmap update fails (#159) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Small fix to reattempt failing slotmap updates. - Handle empty and NULL addresses in `CLUSTER SLOTS` responses. As described in cluster slots docs: "Clients may treat the empty string in the same way as NULL, that is the same endpoint it used to send the current command to" - clusterclient_async received a new flag --async-initial-update to enable usage of valkeyClusterAsyncConnect2, which avoids blocking initial slotmap update. - Use short timeout when scheduling processing of next command in clusterclient_async and allow the event engine to read from sockets before next command, which improves predictability in tests. Signed-off-by: Björn Svensson --- src/cluster.c | 96 ++++++++----------- tests/CMakeLists.txt | 8 ++ tests/clusterclient_async.c | 45 ++++++--- .../connect-during-cluster-startup-test.sh | 90 +++++++++++++++++ ...luster-startup-using-cluster-nodes-test.sh | 68 +++++++++++++ ...o-all-nodes-during-scaledown-test-async.sh | 20 +--- tests/ut_slotmap_update.c | 64 ++++++++++--- 7 files changed, 295 insertions(+), 96 deletions(-) create mode 100755 tests/scripts/connect-during-cluster-startup-test.sh create mode 100755 tests/scripts/connect-during-cluster-startup-using-cluster-nodes-test.sh diff --git a/src/cluster.c b/src/cluster.c index c15240aa..6f277420 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -389,37 +389,9 @@ static int authenticate(valkeyClusterContext *cc, valkeyContext *c) { * Return a new node with the "cluster slots" command reply. */ static valkeyClusterNode *node_get_with_slots(valkeyClusterContext *cc, - valkeyReply *host_elem, - valkeyReply *port_elem, + char *host, int port, uint8_t role) { - valkeyClusterNode *node = NULL; - - if (host_elem == NULL || port_elem == NULL) { - return NULL; - } - - if (host_elem->type != VALKEY_REPLY_STRING || host_elem->len <= 0) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "Command(cluster slots) reply error: " - "node ip is not string."); - goto error; - } - - if (port_elem->type != VALKEY_REPLY_INTEGER) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "Command(cluster slots) reply error: " - "node port is not integer."); - goto error; - } - - if (port_elem->integer < 1 || port_elem->integer > UINT16_MAX) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "Command(cluster slots) reply error: " - "node port is not valid."); - goto error; - } - - node = createValkeyClusterNode(); + valkeyClusterNode *node = createValkeyClusterNode(); if (node == NULL) { goto oom; } @@ -433,29 +405,26 @@ static valkeyClusterNode *node_get_with_slots(valkeyClusterContext *cc, node->slots->free = listClusterSlotDestructor; } - node->addr = sdsnewlen(host_elem->str, host_elem->len); + node->addr = sdsnew(host); if (node->addr == NULL) { goto oom; } - node->addr = sdscatfmt(node->addr, ":%i", port_elem->integer); + node->addr = sdscatfmt(node->addr, ":%i", port); if (node->addr == NULL) { goto oom; } - node->host = sdsnewlen(host_elem->str, host_elem->len); + node->host = sdsnew(host); if (node->host == NULL) { goto oom; } node->name = NULL; - node->port = (int)port_elem->integer; + node->port = port; node->role = role; return node; oom: valkeyClusterSetError(cc, VALKEY_ERR_OOM, "Out of memory"); - // passthrough - -error: if (node != NULL) { sdsfree(node->addr); sdsfree(node->host); @@ -510,7 +479,8 @@ static void cluster_nodes_swap_ctx(dict *nodes_f, dict *nodes_t) { /** * Parse the "cluster slots" command reply to nodes dict. */ -static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) { +static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyContext *c, + valkeyReply *reply) { int ret; cluster_slot *slot = NULL; dict *nodes = NULL; @@ -594,22 +564,39 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) { elem_ip = elem_nodes->element[0]; elem_port = elem_nodes->element[1]; - if (elem_ip == NULL || elem_port == NULL || - elem_ip->type != VALKEY_REPLY_STRING || - elem_port->type != VALKEY_REPLY_INTEGER) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "Command(cluster slots) reply error: " - "ip or port is not correct."); + /* Validate ip element. Accept a NULL value ip (NIL type) since + * we will handle the unknown endpoint special. */ + if (elem_ip == NULL || (elem_ip->type != VALKEY_REPLY_STRING && + elem_ip->type != VALKEY_REPLY_NIL)) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid node address"); goto error; } + /* Validate port element. */ + if (elem_port == NULL || elem_port->type != VALKEY_REPLY_INTEGER || + (elem_port->integer < 1 || elem_port->integer > UINT16_MAX)) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid port"); + goto error; + } + + /* Get the received ip/host. According to the docs an unknown + * endpoint or an empty string can be treated as it means + * the same address as we sent this command to. + * An unknown endpoint has the type VALKEY_REPLY_NIL and its + * length is initiated to zero. */ + char *host = (elem_ip->len > 0) ? elem_ip->str : c->tcp.host; + if (host == NULL) { + goto oom; + } + int port = elem_port->integer; + if (idx == 2) { /* Parse a primary node. */ - sds address = sdsnewlen(elem_ip->str, elem_ip->len); + sds address = sdsnew(host); if (address == NULL) { goto oom; } - address = sdscatfmt(address, ":%i", elem_port->integer); + address = sdscatfmt(address, ":%i", port); if (address == NULL) { goto oom; } @@ -628,8 +615,7 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) { break; } - primary = node_get_with_slots(cc, elem_ip, elem_port, - VALKEY_ROLE_PRIMARY); + primary = node_get_with_slots(cc, host, port, VALKEY_ROLE_PRIMARY); if (primary == NULL) { goto error; } @@ -654,7 +640,7 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) { slot = NULL; } else if (cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) { - replica = node_get_with_slots(cc, elem_ip, elem_port, + replica = node_get_with_slots(cc, host, port, VALKEY_ROLE_REPLICA); if (replica == NULL) { goto error; @@ -1061,7 +1047,7 @@ static int clusterUpdateRouteHandleReply(valkeyClusterContext *cc, dict *nodes; if (cc->flags & VALKEYCLUSTER_FLAG_ROUTE_USE_SLOTS) { - nodes = parse_cluster_slots(cc, reply); + nodes = parse_cluster_slots(cc, c, reply); } else { nodes = parse_cluster_nodes(cc, c, reply); } @@ -2889,7 +2875,6 @@ int valkeyClusterAsyncSetDisconnectCallback(valkeyClusterAsyncContext *acc, /* Reply callback function for CLUSTER SLOTS */ void clusterSlotsReplyCallback(valkeyAsyncContext *ac, void *r, void *privdata) { - UNUSED(ac); valkeyReply *reply = (valkeyReply *)r; valkeyClusterAsyncContext *acc = (valkeyClusterAsyncContext *)privdata; acc->lastSlotmapUpdateAttempt = vk_usec_now(); @@ -2901,16 +2886,16 @@ void clusterSlotsReplyCallback(valkeyAsyncContext *ac, void *r, } valkeyClusterContext *cc = acc->cc; - dict *nodes = parse_cluster_slots(cc, reply); + dict *nodes = parse_cluster_slots(cc, &ac->c, reply); if (updateNodesAndSlotmap(cc, nodes) != VALKEY_OK) { - /* Ignore failures for now */ + /* Retry using available nodes */ + updateSlotMapAsync(acc, NULL); } } /* Reply callback function for CLUSTER NODES */ void clusterNodesReplyCallback(valkeyAsyncContext *ac, void *r, void *privdata) { - UNUSED(ac); valkeyReply *reply = (valkeyReply *)r; valkeyClusterAsyncContext *acc = (valkeyClusterAsyncContext *)privdata; acc->lastSlotmapUpdateAttempt = vk_usec_now(); @@ -2924,7 +2909,8 @@ void clusterNodesReplyCallback(valkeyAsyncContext *ac, void *r, valkeyClusterContext *cc = acc->cc; dict *nodes = parse_cluster_nodes(cc, &ac->c, reply); if (updateNodesAndSlotmap(cc, nodes) != VALKEY_OK) { - /* Ignore failures for now */ + /* Retry using available nodes */ + updateSlotMapAsync(acc, NULL); } } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index a7bdb30c..39db3f50 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -279,4 +279,12 @@ if (LIBEVENT_LIBRARY) COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/client-disconnect-without-slotmap-update-test.sh" "$" WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/") + add_test(NAME connect-during-cluster-startup-test-async + COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/connect-during-cluster-startup-test.sh" + "$" + WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/") + add_test(NAME connect-during-cluster-startup-using-cluster-nodes-test-async + COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/connect-during-cluster-startup-using-cluster-nodes-test.sh" + "$" + WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/") endif() diff --git a/tests/clusterclient_async.c b/tests/clusterclient_async.c index d19f68d6..e81c5fee 100644 --- a/tests/clusterclient_async.c +++ b/tests/clusterclient_async.c @@ -56,6 +56,8 @@ char cmd_history[HISTORY_DEPTH][CMD_SIZE]; int num_running = 0; int resend_failed_cmd = 0; int send_to_all = 0; +int show_events = 0; +int async_initial_update = 0; void sendNextCommand(evutil_socket_t, short, void *); @@ -100,8 +102,9 @@ void replyCallback(valkeyClusterAsyncContext *acc, void *r, void *privdata) { if (--num_running == 0) { /* Schedule a read from stdin and send next command */ + struct timeval timeout = {0, 10}; struct event_base *base = acc->attach_data; - event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, NULL); + event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, &timeout); } } @@ -172,8 +175,9 @@ void sendNextCommand(evutil_socket_t fd, short kind, void *arg) { printf("error: %s\n", acc->errstr); /* Schedule a read from stdin and handle next command. */ + struct timeval timeout = {0, 10}; struct event_base *base = acc->attach_data; - event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, NULL); + event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, &timeout); } } @@ -189,7 +193,17 @@ void sendNextCommand(evutil_socket_t fd, short kind, void *arg) { void eventCallback(const valkeyClusterContext *cc, int event, void *privdata) { (void)cc; - (void)privdata; + if (event == VALKEYCLUSTER_EVENT_READY) { + /* Schedule a read from stdin and send next command. */ + valkeyClusterAsyncContext *acc = (valkeyClusterAsyncContext *)privdata; + struct timeval timeout = {0, 10}; + struct event_base *base = acc->attach_data; + event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, &timeout); + } + + if (!show_events) + return; + const char *e = NULL; switch (event) { case VALKEYCLUSTER_EVENT_SLOTMAP_UPDATED: @@ -224,7 +238,6 @@ void disconnectCallback(const valkeyAsyncContext *ac, int status) { int main(int argc, char **argv) { int use_cluster_slots = 1; // Get topology via CLUSTER SLOTS - int show_events = 0; int show_connection_events = 0; int optind; @@ -235,6 +248,8 @@ int main(int argc, char **argv) { show_events = 1; } else if (strcmp(argv[optind], "--connection-events") == 0) { show_connection_events = 1; + } else if (strcmp(argv[optind], "--async-initial-update") == 0) { + async_initial_update = 1; } else { fprintf(stderr, "Unknown argument: '%s'\n", argv[optind]); } @@ -254,28 +269,30 @@ int main(int argc, char **argv) { valkeyClusterSetOptionTimeout(acc->cc, timeout); valkeyClusterSetOptionConnectTimeout(acc->cc, timeout); valkeyClusterSetOptionMaxRetry(acc->cc, 1); + valkeyClusterSetEventCallback(acc->cc, eventCallback, acc); if (use_cluster_slots) { valkeyClusterSetOptionRouteUseSlots(acc->cc); } - if (show_events) { - valkeyClusterSetEventCallback(acc->cc, eventCallback, NULL); - } if (show_connection_events) { valkeyClusterAsyncSetConnectCallback(acc, connectCallback); valkeyClusterAsyncSetDisconnectCallback(acc, disconnectCallback); } - if (valkeyClusterConnect2(acc->cc) != VALKEY_OK) { - printf("Connect error: %s\n", acc->cc->errstr); - exit(2); - } - struct event_base *base = event_base_new(); int status = valkeyClusterLibeventAttach(acc, base); assert(status == VALKEY_OK); - /* Schedule a read from stdin and send next command */ - event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, NULL); + if (async_initial_update) { + if (valkeyClusterAsyncConnect2(acc) != VALKEY_OK) { + printf("Connect error: %s\n", acc->errstr); + exit(2); + } + } else { + if (valkeyClusterConnect2(acc->cc) != VALKEY_OK) { + printf("Connect error: %s\n", acc->cc->errstr); + exit(2); + } + } event_base_dispatch(base); diff --git a/tests/scripts/connect-during-cluster-startup-test.sh b/tests/scripts/connect-during-cluster-startup-test.sh new file mode 100755 index 00000000..ca23189f --- /dev/null +++ b/tests/scripts/connect-during-cluster-startup-test.sh @@ -0,0 +1,90 @@ +#!/bin/sh +# +# Connect to a cluster which is in the processes of starting up. +# +# The first attempt to get the slotmap will receive a reply without any +# slot information and this should result in a retry. +# The following slotmap updates tests the handling of an nil/empty IP address. +# +# The client is configured to use the CLUSTER SLOTS command. +# +# Usage: $0 /path/to/clusterclient-binary + +clientprog=${1:-./clusterclient_async} +testname=connect-during-cluster-startup-test + +# Sync process just waiting for server to be ready to accept connection. +perl -we 'use sigtrap "handler", sub{exit}, "CONT"; sleep 1; die "timeout"' & +syncpid=$! + +# Start simulated server. +timeout 5s ./simulated-valkey.pl -p 7400 -d --sigcont $syncpid <<'EOF' & +# The initial slotmap is not covering any slots, expect a retry since it's not accepted. +EXPECT CONNECT +EXPECT ["CLUSTER", "SLOTS"] +SEND [] + +# The node has now been delegated a few slots and should be accepted. +# Respond with an unknown endpoint (nil) to test that current connection IP is used instead. +EXPECT ["CLUSTER", "SLOTS"] +SEND *1\r\n*3\r\n:0\r\n:10\r\n*3\r\n$-1\r\n:7400\r\n$40\r\nf5378fa2ad1fbd569f01ba2fe29fa8feb36cdfb8\r\n + +# The node has now been delegated all slots. +# Use empty address to test that current connection IP is used instead. +EXPECT ["CLUSTER", "SLOTS"] +SEND [[0, 16383, ["", 7400, "f5378fa2ad1fbd569f01ba2fe29fa8feb36cdfb8"]]] + +EXPECT ["SET", "foo", "bar3"] +SEND +OK +EXPECT CLOSE +EOF +server=$! + +# Wait until server is ready to accept client connection. +wait $syncpid; + +# Run client which will fetch the initial slotmap asynchronously. +timeout 3s "$clientprog" --events --async-initial-update 127.0.0.1:7400 > "$testname.out" <<'EOF' +# Slot not yet handled, will trigger a slotmap update which will be throttled. +SET foo bar1 + +# Wait to avoid slotmap update throttling. +!sleep + +# A command will fail directly, but a slotmap update is scheduled. +SET foo bar2 + +# Allow slotmap update to finish. +!sleep + +SET foo bar3 +EOF +clientexit=$? + +# Wait for server to exit. +wait $server; serverexit=$? + +# Check exit status on server. +if [ $serverexit -ne 0 ]; then + echo "Simulated server exited with status $serverexit" + exit $serverexit +fi +# Check exit status on client. +if [ $clientexit -ne 0 ]; then + echo "$clientprog exited with status $clientexit" + exit $clientexit +fi + +# Check the output from the client. +expected="Event: slotmap-updated +Event: ready +error: slot not served by any node +error: slot not served by any node +Event: slotmap-updated +OK +Event: free-context" + +echo "$expected" | diff -u - "$testname.out" || exit 99 + +# Clean up. +rm "$testname.out" diff --git a/tests/scripts/connect-during-cluster-startup-using-cluster-nodes-test.sh b/tests/scripts/connect-during-cluster-startup-using-cluster-nodes-test.sh new file mode 100755 index 00000000..99abff38 --- /dev/null +++ b/tests/scripts/connect-during-cluster-startup-using-cluster-nodes-test.sh @@ -0,0 +1,68 @@ +#!/bin/sh +# +# Connect to a cluster which is in the processes of starting up. +# +# The first attempt to get the slotmap will receive a reply without any +# slot information and this should result in a retry. +# +# The client is configured to use the CLUSTER NODES command. +# +# Usage: $0 /path/to/clusterclient-binary + +clientprog=${1:-./clusterclient_async} +testname=connect-during-cluster-startup-using-cluster-nodes-test + +# Sync process just waiting for server to be ready to accept connection. +perl -we 'use sigtrap "handler", sub{exit}, "CONT"; sleep 1; die "timeout"' & +syncpid=$! + +# Start simulated server. +timeout 5s ./simulated-valkey.pl -p 7400 -d --sigcont $syncpid <<'EOF' & +# The initial slotmap is not covering any slots, expect a retry. +EXPECT CONNECT +EXPECT ["CLUSTER", "NODES"] +SEND "8adca41945787ad1c9e725a40a43cf72bd4c6ad4 :7400@17400 myself,master - 0 0 0 connected\n" + +# The node has now been delegated slots. +EXPECT ["CLUSTER", "NODES"] +SEND "8adca41945787ad1c9e725a40a43cf72bd4c6ad4 :7400@17400 myself,master - 0 0 1 connected 0-16383\n" + +EXPECT ["SET", "foo", "bar"] +SEND +OK +EXPECT CLOSE +EOF +server=$! + +# Wait until server is ready to accept client connection. +wait $syncpid; + +# Run client which will fetch the initial slotmap asynchronously using CLUSTER NODES. +timeout 3s "$clientprog" --events --use-cluster-nodes --async-initial-update 127.0.0.1:7400 > "$testname.out" <<'EOF' +SET foo bar +EOF +clientexit=$? + +# Wait for server to exit. +wait $server; serverexit=$? + +# Check exit status on server. +if [ $serverexit -ne 0 ]; then + echo "Simulated server exited with status $serverexit" + exit $serverexit +fi +# Check exit status on client. +if [ $clientexit -ne 0 ]; then + echo "$clientprog exited with status $clientexit" + exit $clientexit +fi + +# Check the output from the client. +expected="Event: slotmap-updated +Event: ready +OK +Event: free-context" + +echo "$expected" | diff -u - "$testname.out" || exit 99 + +# Clean up. +rm "$testname.out" diff --git a/tests/scripts/dbsize-to-all-nodes-during-scaledown-test-async.sh b/tests/scripts/dbsize-to-all-nodes-during-scaledown-test-async.sh index 399694e5..7240b443 100755 --- a/tests/scripts/dbsize-to-all-nodes-during-scaledown-test-async.sh +++ b/tests/scripts/dbsize-to-all-nodes-during-scaledown-test-async.sh @@ -81,26 +81,14 @@ if [ $clientexit -ne 0 ]; then exit $clientexit fi -# Check the output from clusterclient, which depends on timing. -# Client sends the second 'DBSIZE' to node #2 just after node #2 closes its socket. -expected1="10 +# Check the output from clusterclient. +expected="10 20 -error: Server closed the connection +error: Connection refused 11 12" -# Client sends the second 'DBSIZE' to node #2 just before node #2 closes its socket. -expected2="10 -20 -error: Connection reset by peer -11 -12" - -# The reply "11" from node #1 can come before or after the socket error from node #2. -# Therefore, we sort before comparing. -diff -u <(echo "$expected1" | sort) <(sort "$testname.out") || \ - diff -u <(echo "$expected2" | sort) <(sort "$testname.out") || \ - exit 99 +echo "$expected" | diff -u - "$testname.out" || exit 99 # Clean up rm "$testname.out" diff --git a/tests/ut_slotmap_update.c b/tests/ut_slotmap_update.c index a2f43dda..ad6931c5 100644 --- a/tests/ut_slotmap_update.c +++ b/tests/ut_slotmap_update.c @@ -515,6 +515,7 @@ void test_parse_cluster_nodes_with_legacy_format(void) { /* Parse a cluster slots reply from a basic deployment. */ void test_parse_cluster_slots(bool parse_replicas) { valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyContext *c = valkeyContextInit(); valkeyClusterNode *node; cluster_slot *slot; dictIterator di; @@ -530,7 +531,7 @@ void test_parse_cluster_slots(bool parse_replicas) { " [10923, 16383, ['127.0.0.1', 30003, '292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f', ['hostname', 'localhost']]" " ['127.0.0.1', 30006, '824fe116063bc5fcf9f4ffd895bc17aee7731ac3', ['hostname', 'localhost']]]]"); - dict *nodes = parse_cluster_slots(cc, reply); + dict *nodes = parse_cluster_slots(cc, c, reply); freeReplyObject(reply); assert(nodes); @@ -592,46 +593,84 @@ void test_parse_cluster_slots(bool parse_replicas) { } dictRelease(nodes); + valkeyFree(c); valkeyClusterFree(cc); } void test_parse_cluster_slots_with_empty_ip(void) { valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyClusterNode *node; + dictIterator di; + + /* Set the IP from which the response is received from. */ + valkeyContext *c = valkeyContextInit(); + c->tcp.host = strdup("127.0.0.99"); valkeyReply *reply = create_cluster_slots_reply( "[[0, 5460, ['', 6379, 'e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca']]," " [5461, 10922, ['127.0.0.1', 6379, '67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1']]," - " [10923, 16383, ['127.0.0.1', 6379, '292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f']]]"); + " [10923, 16383, ['127.0.0.2', 6379, '292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f']]]"); - dict *nodes = parse_cluster_slots(cc, reply); + dict *nodes = parse_cluster_slots(cc, c, reply); freeReplyObject(reply); - assert(nodes == NULL); - assert(cc->err == VALKEY_ERR_OTHER); + assert(nodes); + assert(dictSize(nodes) == 3); + dictInitIterator(&di, nodes); + /* Verify node 1 */ + node = dictGetEntryVal(dictNext(&di)); + assert(strcmp(node->addr, "127.0.0.1:6379") == 0); + /* Verify node 2 */ + node = dictGetEntryVal(dictNext(&di)); + assert(strcmp(node->addr, "127.0.0.2:6379") == 0); + /* Verify node 3 */ + node = dictGetEntryVal(dictNext(&di)); + assert(strcmp(node->addr, "127.0.0.99:6379") == 0); /* Uses the IP from which the response was received from. */ + dictRelease(nodes); + valkeyFree(c); valkeyClusterFree(cc); } void test_parse_cluster_slots_with_null_ip(void) { valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyClusterNode *node; + dictIterator di; + + /* Set the IP from which the response is received from. */ + valkeyContext *c = valkeyContextInit(); + c->tcp.host = strdup("127.0.0.99"); valkeyReply *reply = create_cluster_slots_reply( "[[0, 5460, [null, 6379, 'e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca']]," " [5461, 10922, ['127.0.0.1', 6379, '67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1']]," - " [10923, 16383, ['127.0.0.1', 6379, '292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f']]]"); + " [10923, 16383, ['127.0.0.2', 6379, '292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f']]]"); - dict *nodes = parse_cluster_slots(cc, reply); + dict *nodes = parse_cluster_slots(cc, c, reply); freeReplyObject(reply); - assert(nodes == NULL); - assert(cc->err == VALKEY_ERR_OTHER); + assert(nodes); + assert(dictSize(nodes) == 3); + dictInitIterator(&di, nodes); + /* Verify node 1 */ + node = dictGetEntryVal(dictNext(&di)); + assert(strcmp(node->addr, "127.0.0.1:6379") == 0); + /* Verify node 2 */ + node = dictGetEntryVal(dictNext(&di)); + assert(strcmp(node->addr, "127.0.0.2:6379") == 0); + /* Verify node 3 */ + node = dictGetEntryVal(dictNext(&di)); + assert(strcmp(node->addr, "127.0.0.99:6379") == 0); /* Uses the IP from which the response was received from. */ + dictRelease(nodes); + valkeyFree(c); valkeyClusterFree(cc); } /* Parse a cluster slots reply containing a primary with multiple replicas. */ void test_parse_cluster_slots_with_multiple_replicas(void) { valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyContext *c = valkeyContextInit(); valkeyClusterNode *node; cluster_slot *slot; dictIterator di; @@ -647,7 +686,7 @@ void test_parse_cluster_slots_with_multiple_replicas(void) { " ['127.0.0.1', 30002, '6ec23923021cf3ffec47632106199cb7f496ce01']," " ['127.0.0.1', 30003, '824fe116063bc5fcf9f4ffd895bc17aee7731ac3']]]"); - dict *nodes = parse_cluster_slots(cc, reply); + dict *nodes = parse_cluster_slots(cc, c, reply); freeReplyObject(reply); /* Verify primary. */ @@ -684,11 +723,13 @@ void test_parse_cluster_slots_with_multiple_replicas(void) { assert(node->role == VALKEY_ROLE_REPLICA); dictRelease(nodes); + valkeyFree(c); valkeyClusterFree(cc); } void test_parse_cluster_slots_with_noncontiguous_slots(void) { valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyContext *c = valkeyContextInit(); valkeyClusterNode *node; cluster_slot *slot; dictIterator di; @@ -704,7 +745,7 @@ void test_parse_cluster_slots_with_noncontiguous_slots(void) { " [4, 5460, ['127.0.0.1', 30001, 'e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca']," " ['127.0.0.1', 30004, '07c37dfeb235213a872192d90877d0cd55635b91']]]"); - dict *nodes = parse_cluster_slots(cc, reply); + dict *nodes = parse_cluster_slots(cc, c, reply); freeReplyObject(reply); /* Verify primary. */ @@ -736,6 +777,7 @@ void test_parse_cluster_slots_with_noncontiguous_slots(void) { assert(node->role == VALKEY_ROLE_REPLICA); dictRelease(nodes); + valkeyFree(c); valkeyClusterFree(cc); }